use core::cmp::Ordering;
use core::iter::FusedIterator;
use alloc::boxed::Box;
use alloc::vec::Vec;
use crate::curve::curve_types::CurveType;
use crate::curve::{Curve, PartitionResult};
use crate::iterators::curve::{AggregationIterator, CurveSplitIterator};
use crate::iterators::peek::Peeker;
use crate::iterators::CurveIterator;
use crate::server::{AggregatedServerDemand, ConstrainedServerDemand, ServerProperties};
use crate::time::TimeUnit;
use crate::window::WindowEnd;
use crate::window::{Demand, Window};
type AggregateDemandWindow = <AggregatedServerDemand as CurveType>::WindowKind;
#[derive(Debug, Clone)]
pub struct ConstrainedServerDemandIterator<I> {
server_properties: ServerProperties,
demand:
Peeker<Box<CurveSplitIterator<AggregateDemandWindow, I>>, Window<AggregateDemandWindow>>,
spill: Option<Window<<AggregatedServerDemand as CurveType>::WindowKind>>,
remainder: Vec<Window<<ConstrainedServerDemand as CurveType>::WindowKind>>,
}
impl<'a, I> ConstrainedServerDemandIterator<I>
where
I: CurveIterator<CurveKind = AggregatedServerDemand>,
{
pub fn new(server_properties: ServerProperties, aggregated_demand: I) -> Self {
let split = CurveSplitIterator::new(aggregated_demand, server_properties.interval);
ConstrainedServerDemandIterator {
server_properties,
demand: Peeker::new(Box::new(split)),
spill: None,
remainder: Vec::new(),
}
}
}
impl<I: CurveIterator> FusedIterator for ConstrainedServerDemandIterator<I>
where
Self: Iterator,
CurveSplitIterator<<AggregatedServerDemand as CurveType>::WindowKind, I>: FusedIterator,
{
}
impl<I> CurveIterator for ConstrainedServerDemandIterator<I>
where
I: CurveIterator<CurveKind = AggregatedServerDemand>,
{
type CurveKind = ConstrainedServerDemand;
fn next_window(&mut self) -> Option<Window<<Self::CurveKind as CurveType>::WindowKind>> {
#![allow(clippy::option_if_let_else)]
if let Some(window) = self.remainder.pop() {
Some(window)
} else {
let next_group = self.demand.peek_ref();
let spill = self.spill.take();
match (next_group, spill) {
(None, None) => None,
(Some(group_head), Some(spill)) => {
let k_group_head = group_head.start / self.server_properties.interval;
let k_spill = spill.start / self.server_properties.interval;
match k_group_head.cmp(&k_spill) {
Ordering::Less => {
unreachable!("Groups are processed in order and spill can only go into the future")
}
Ordering::Equal => {
let mut windows = alloc::vec![group_head.take()];
for window in &mut self.demand {
if window.budget_group(self.server_properties.interval)
== k_group_head
{
windows.push(window);
} else {
self.demand.restore_peek(window);
break;
}
}
let next_group: Curve<AggregatedServerDemand> =
unsafe { Curve::from_windows_unchecked(windows) };
let curve: Curve<_> = AggregationIterator::new(alloc::vec![
next_group.into_iter(),
Curve::new(spill).into_iter(),
])
.collect_curve();
self.process_group(k_spill, curve)
}
Ordering::Greater => {
let curve = Curve::new(spill);
self.process_group(k_spill, curve)
}
}
}
(Some(group_head), None) => {
let k_group_head = group_head.start / self.server_properties.interval;
let mut windows = alloc::vec![group_head.take()];
for window in &mut self.demand {
if window.budget_group(self.server_properties.interval) == k_group_head {
windows.push(window);
} else {
self.demand.restore_peek(window);
break;
}
}
let next_group: Curve<AggregatedServerDemand> =
unsafe { Curve::from_windows_unchecked(windows) };
let curve = next_group;
self.process_group(k_group_head, curve)
}
(None, Some(spill)) => {
let k = spill.start / self.server_properties.interval;
let curve = Curve::new(spill);
self.process_group(k, curve)
}
}
}
}
}
impl<I> ConstrainedServerDemandIterator<I>
where
I: CurveIterator<CurveKind = AggregatedServerDemand>,
{
fn process_group(
&mut self,
k_group_head: usize,
curve: Curve<AggregatedServerDemand>,
) -> Option<Window<Demand>> {
let PartitionResult { index, head, tail } =
curve.partition(k_group_head, self.server_properties);
let mut windows = curve.into_windows();
self.remainder.reserve(windows.len().min(index) + 1);
self.remainder.extend(
windows
.drain(..index)
.chain(core::iter::once(head).filter(|window| !window.is_empty()))
.rev(),
);
let delta_k: WindowEnd = tail.length()
+ windows
.into_iter()
.skip(1)
.map(|window| window.length())
.sum::<WindowEnd>();
if delta_k > TimeUnit::ZERO {
let spill_start = (k_group_head + 1) * self.server_properties.interval;
self.spill = Some(Window::new(spill_start, spill_start + delta_k));
}
let result = self.remainder.pop();
assert!(result.is_some());
result
}
}