rta_for_fps_lib/iterators/server/
constrained_demand.rs

1//! Module for the implementation of the `CurveIterator`s used to calculate
2//! the constrained demand curve of a Server
3
4use core::cmp::Ordering;
5use core::iter::FusedIterator;
6
7use alloc::boxed::Box;
8use alloc::vec::Vec;
9
10use crate::curve::curve_types::CurveType;
11use crate::curve::{Curve, PartitionResult};
12use crate::iterators::curve::{AggregationIterator, CurveSplitIterator};
13use crate::iterators::peek::Peeker;
14use crate::iterators::CurveIterator;
15use crate::server::{AggregatedServerDemand, ConstrainedServerDemand, ServerProperties};
16use crate::time::TimeUnit;
17use crate::window::WindowEnd;
18use crate::window::{Demand, Window};
19
20/// Type alias for the `WindowKind` of the `AggregatedServerDemand` `CurveType`
21/// to reduce type complexity
22type AggregateDemandWindow = <AggregatedServerDemand as CurveType>::WindowKind;
23
24/// `CurveIterator` for `ConstrainedServerDemand`
25///
26/// used to calculate a Servers constrained demand curve,
27/// using the aggregated server demand curve
28/// based on the Algorithm 1. from the paper and described in Section 5.1 of the paper
29#[derive(Debug, Clone)]
30pub struct ConstrainedServerDemandIterator<I> {
31    /// The Server for which to calculate the constrained demand
32    server_properties: ServerProperties,
33    /// The remaining aggregated Demand of the Server
34    demand:
35        Peeker<Box<CurveSplitIterator<AggregateDemandWindow, I>>, Window<AggregateDemandWindow>>,
36    /// The spill from the previous group
37    spill: Option<Window<<AggregatedServerDemand as CurveType>::WindowKind>>,
38    /// Remaining windows till we need to process the next group
39    remainder: Vec<Window<<ConstrainedServerDemand as CurveType>::WindowKind>>,
40}
41
42impl<'a, I> ConstrainedServerDemandIterator<I>
43where
44    I: CurveIterator<CurveKind = AggregatedServerDemand>,
45{
46    /// Create a new `InternalConstrainedServerDemandIterator`
47    /// the main part for calculating the Constraint Server Demand Curve
48    pub fn new(server_properties: ServerProperties, aggregated_demand: I) -> Self {
49        // Algorithm 1. (1)
50        let split = CurveSplitIterator::new(aggregated_demand, server_properties.interval);
51        ConstrainedServerDemandIterator {
52            server_properties,
53            demand: Peeker::new(Box::new(split)),
54            spill: None,
55            remainder: Vec::new(),
56        }
57    }
58}
59
60impl<I: CurveIterator> FusedIterator for ConstrainedServerDemandIterator<I>
61where
62    Self: Iterator,
63    CurveSplitIterator<<AggregatedServerDemand as CurveType>::WindowKind, I>: FusedIterator,
64{
65}
66
67impl<I> CurveIterator for ConstrainedServerDemandIterator<I>
68where
69    I: CurveIterator<CurveKind = AggregatedServerDemand>,
70{
71    type CurveKind = ConstrainedServerDemand;
72
73    // Algorithm 1. (2)
74    fn next_window(&mut self) -> Option<Window<<Self::CurveKind as CurveType>::WindowKind>> {
75        #![allow(clippy::option_if_let_else)] // false positive, can't use map_or as the same value is moved in both branches
76
77        if let Some(window) = self.remainder.pop() {
78            Some(window)
79        } else {
80            let next_group = self.demand.peek_ref();
81            let spill = self.spill.take();
82
83            match (next_group, spill) {
84                (None, None) => None,
85                (Some(group_head), Some(spill)) => {
86                    let k_group_head = group_head.start / self.server_properties.interval;
87                    let k_spill = spill.start / self.server_properties.interval;
88
89                    match k_group_head.cmp(&k_spill) {
90                        Ordering::Less => {
91                            unreachable!("Groups are processed in order and spill can only go into the future")
92                        }
93                        Ordering::Equal => {
94                            // spill spilled into next_group
95
96                            let mut windows = alloc::vec![group_head.take()];
97
98                            for window in &mut self.demand {
99                                if window.budget_group(self.server_properties.interval)
100                                    == k_group_head
101                                {
102                                    windows.push(window);
103                                } else {
104                                    self.demand.restore_peek(window);
105                                    break;
106                                }
107                            }
108
109                            // collect next_group
110                            let next_group: Curve<AggregatedServerDemand> =
111                                unsafe { Curve::from_windows_unchecked(windows) };
112
113                            // Handle next_group and spill
114                            let curve: Curve<_> = AggregationIterator::new(alloc::vec![
115                                next_group.into_iter(),
116                                Curve::new(spill).into_iter(),
117                            ])
118                            .collect_curve();
119
120                            self.process_group(k_spill, curve)
121                        }
122                        Ordering::Greater => {
123                            // spill not spilled into group, next group consists only of spill
124                            let curve = Curve::new(spill);
125                            self.process_group(k_spill, curve)
126                        }
127                    }
128                }
129                (Some(group_head), None) => {
130                    let k_group_head = group_head.start / self.server_properties.interval;
131                    // no spill, only next group
132
133                    let mut windows = alloc::vec![group_head.take()];
134
135                    for window in &mut self.demand {
136                        if window.budget_group(self.server_properties.interval) == k_group_head {
137                            windows.push(window);
138                        } else {
139                            self.demand.restore_peek(window);
140                            break;
141                        }
142                    }
143
144                    // collect next_group
145                    let next_group: Curve<AggregatedServerDemand> =
146                        unsafe { Curve::from_windows_unchecked(windows) };
147
148                    let curve = next_group;
149
150                    self.process_group(k_group_head, curve)
151                }
152                (None, Some(spill)) => {
153                    // only spill remaining
154
155                    let k = spill.start / self.server_properties.interval;
156
157                    let curve = Curve::new(spill);
158
159                    self.process_group(k, curve)
160                }
161            }
162        }
163    }
164}
165
166impl<I> ConstrainedServerDemandIterator<I>
167where
168    I: CurveIterator<CurveKind = AggregatedServerDemand>,
169{
170    /// Process the group with index `k_group_head` and `demand `curve`
171    fn process_group(
172        &mut self,
173        k_group_head: usize,
174        curve: Curve<AggregatedServerDemand>,
175    ) -> Option<Window<Demand>> {
176        let PartitionResult { index, head, tail } =
177            curve.partition(k_group_head, self.server_properties);
178
179        let mut windows = curve.into_windows();
180
181        self.remainder.reserve(windows.len().min(index) + 1);
182
183        self.remainder.extend(
184            windows
185                .drain(..index)
186                .chain(core::iter::once(head).filter(|window| !window.is_empty()))
187                .rev(),
188        );
189
190        let delta_k: WindowEnd = tail.length()
191            + windows
192                .into_iter()
193                .skip(1)
194                .map(|window| window.length())
195                .sum::<WindowEnd>();
196
197        if delta_k > TimeUnit::ZERO {
198            let spill_start = (k_group_head + 1) * self.server_properties.interval;
199            self.spill = Some(Window::new(spill_start, spill_start + delta_k));
200        }
201
202        let result = self.remainder.pop();
203        assert!(result.is_some());
204        result
205    }
206}