rta_for_fps_lib/iterators/server/
constrained_demand.rs1use core::cmp::Ordering;
5use core::iter::FusedIterator;
6
7use alloc::boxed::Box;
8use alloc::vec::Vec;
9
10use crate::curve::curve_types::CurveType;
11use crate::curve::{Curve, PartitionResult};
12use crate::iterators::curve::{AggregationIterator, CurveSplitIterator};
13use crate::iterators::peek::Peeker;
14use crate::iterators::CurveIterator;
15use crate::server::{AggregatedServerDemand, ConstrainedServerDemand, ServerProperties};
16use crate::time::TimeUnit;
17use crate::window::WindowEnd;
18use crate::window::{Demand, Window};
19
/// Shorthand for the window kind carried by the windows of an
/// [`AggregatedServerDemand`] curve, as declared by its [`CurveType`] impl.
type AggregateDemandWindow = <AggregatedServerDemand as CurveType>::WindowKind;
23
24#[derive(Debug, Clone)]
30pub struct ConstrainedServerDemandIterator<I> {
31 server_properties: ServerProperties,
33 demand:
35 Peeker<Box<CurveSplitIterator<AggregateDemandWindow, I>>, Window<AggregateDemandWindow>>,
36 spill: Option<Window<<AggregatedServerDemand as CurveType>::WindowKind>>,
38 remainder: Vec<Window<<ConstrainedServerDemand as CurveType>::WindowKind>>,
40}
41
42impl<'a, I> ConstrainedServerDemandIterator<I>
43where
44 I: CurveIterator<CurveKind = AggregatedServerDemand>,
45{
46 pub fn new(server_properties: ServerProperties, aggregated_demand: I) -> Self {
49 let split = CurveSplitIterator::new(aggregated_demand, server_properties.interval);
51 ConstrainedServerDemandIterator {
52 server_properties,
53 demand: Peeker::new(Box::new(split)),
54 spill: None,
55 remainder: Vec::new(),
56 }
57 }
58}
59
60impl<I: CurveIterator> FusedIterator for ConstrainedServerDemandIterator<I>
61where
62 Self: Iterator,
63 CurveSplitIterator<<AggregatedServerDemand as CurveType>::WindowKind, I>: FusedIterator,
64{
65}
66
67impl<I> CurveIterator for ConstrainedServerDemandIterator<I>
68where
69 I: CurveIterator<CurveKind = AggregatedServerDemand>,
70{
71 type CurveKind = ConstrainedServerDemand;
72
73 fn next_window(&mut self) -> Option<Window<<Self::CurveKind as CurveType>::WindowKind>> {
75 #![allow(clippy::option_if_let_else)] if let Some(window) = self.remainder.pop() {
78 Some(window)
79 } else {
80 let next_group = self.demand.peek_ref();
81 let spill = self.spill.take();
82
83 match (next_group, spill) {
84 (None, None) => None,
85 (Some(group_head), Some(spill)) => {
86 let k_group_head = group_head.start / self.server_properties.interval;
87 let k_spill = spill.start / self.server_properties.interval;
88
89 match k_group_head.cmp(&k_spill) {
90 Ordering::Less => {
91 unreachable!("Groups are processed in order and spill can only go into the future")
92 }
93 Ordering::Equal => {
94 let mut windows = alloc::vec![group_head.take()];
97
98 for window in &mut self.demand {
99 if window.budget_group(self.server_properties.interval)
100 == k_group_head
101 {
102 windows.push(window);
103 } else {
104 self.demand.restore_peek(window);
105 break;
106 }
107 }
108
109 let next_group: Curve<AggregatedServerDemand> =
111 unsafe { Curve::from_windows_unchecked(windows) };
112
113 let curve: Curve<_> = AggregationIterator::new(alloc::vec![
115 next_group.into_iter(),
116 Curve::new(spill).into_iter(),
117 ])
118 .collect_curve();
119
120 self.process_group(k_spill, curve)
121 }
122 Ordering::Greater => {
123 let curve = Curve::new(spill);
125 self.process_group(k_spill, curve)
126 }
127 }
128 }
129 (Some(group_head), None) => {
130 let k_group_head = group_head.start / self.server_properties.interval;
131 let mut windows = alloc::vec![group_head.take()];
134
135 for window in &mut self.demand {
136 if window.budget_group(self.server_properties.interval) == k_group_head {
137 windows.push(window);
138 } else {
139 self.demand.restore_peek(window);
140 break;
141 }
142 }
143
144 let next_group: Curve<AggregatedServerDemand> =
146 unsafe { Curve::from_windows_unchecked(windows) };
147
148 let curve = next_group;
149
150 self.process_group(k_group_head, curve)
151 }
152 (None, Some(spill)) => {
153 let k = spill.start / self.server_properties.interval;
156
157 let curve = Curve::new(spill);
158
159 self.process_group(k, curve)
160 }
161 }
162 }
163 }
164}
165
166impl<I> ConstrainedServerDemandIterator<I>
167where
168 I: CurveIterator<CurveKind = AggregatedServerDemand>,
169{
170 fn process_group(
172 &mut self,
173 k_group_head: usize,
174 curve: Curve<AggregatedServerDemand>,
175 ) -> Option<Window<Demand>> {
176 let PartitionResult { index, head, tail } =
177 curve.partition(k_group_head, self.server_properties);
178
179 let mut windows = curve.into_windows();
180
181 self.remainder.reserve(windows.len().min(index) + 1);
182
183 self.remainder.extend(
184 windows
185 .drain(..index)
186 .chain(core::iter::once(head).filter(|window| !window.is_empty()))
187 .rev(),
188 );
189
190 let delta_k: WindowEnd = tail.length()
191 + windows
192 .into_iter()
193 .skip(1)
194 .map(|window| window.length())
195 .sum::<WindowEnd>();
196
197 if delta_k > TimeUnit::ZERO {
198 let spill_start = (k_group_head + 1) * self.server_properties.interval;
199 self.spill = Some(Window::new(spill_start, spill_start + delta_k));
200 }
201
202 let result = self.remainder.pop();
203 assert!(result.is_some());
204 result
205 }
206}