1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
//! Module that defines the finite Curve type
//!
//! and all associated functions

use alloc::vec::Vec;
use core::fmt::Debug;

use curve_types::CurveType;

use crate::iterators::curve::{CurveDeltaIterator, Delta};
use crate::iterators::CurveIterator;
use crate::server::{ServerKind, ServerProperties};
use crate::time::{TimeUnit, UnitNumber};
use crate::window::window_types::WindowType;
use crate::window::WindowEnd;
use crate::window::{Demand, Overlap, Window};

pub mod curve_types;

/// A Curve is an ordered Set of non-overlapping Windows
///
/// The windows are ordered by their start
#[derive(Debug, Eq)]
pub struct Curve<C: CurveType> {
    /// windows contains an ordered Set of non-overlapping non-empty windows
    windows: Vec<Window<C::WindowKind>>,
}

impl<C: CurveType> PartialEq for Curve<C> {
    fn eq(&self, other: &Self) -> bool {
        self.windows.eq(&other.windows)
    }
}

impl<C: CurveType> Clone for Curve<C> {
    fn clone(&self) -> Self {
        Curve {
            windows: self.windows.clone(),
        }
    }
}

impl<T: CurveType> Curve<T> {
    /// Create a new Curve from the provided window
    ///
    /// May return a Curve with no Windows when the provided Window is empty
    #[must_use]
    pub fn new(window: Window<T::WindowKind>) -> Self {
        let windows = if window.is_empty() {
            // Empty windows can be ignored
            alloc::vec![]
        } else {
            // A Curve with only a single has
            // the windows always ordered and non-overlapping
            alloc::vec![window]
        };

        Self { windows }
    }

    /// Returns a slice reference to the contained windows
    #[must_use]
    pub fn as_windows(&self) -> &[Window<T::WindowKind>] {
        self.windows.as_slice()
    }

    /// Consumes self and returns the contained Windows
    #[must_use]
    pub fn into_windows(self) -> Vec<Window<T::WindowKind>> {
        self.windows
    }

    /// Create a new empty Curve
    #[must_use]
    pub fn empty() -> Self {
        Self {
            windows: alloc::vec![],
        }
    }

    /// Create a new Curve from the given Vector of Windows
    /// without checking or guaranteeing that the Curve invariants are met
    /// by the list of windows.
    ///
    /// # Safety
    /// Windows need to be non-overlapping and
    /// ordered based on start, to fulfill invariants of curve
    #[must_use]
    pub unsafe fn from_windows_unchecked(windows: Vec<Window<T::WindowKind>>) -> Self {
        Self { windows }
    }

    /// Return the Curves Capacity as defined by Definition 3. in the paper
    #[must_use]
    pub fn capacity(&self) -> WindowEnd {
        self.windows.iter().map(Window::length).sum()
    }

    /// Return true if the Capacity of the Curve is 0
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.windows
            .iter()
            .map(Window::length)
            .all(|c| c == TimeUnit::ZERO)
    }

    /// Change the `CurveType` of the Curve,
    /// requires that the `WindowType` of both [`CurveTypes`](trait@CurveType) is the same
    #[must_use]
    pub fn reclassify<C: CurveType<WindowKind = T::WindowKind>>(self) -> Curve<C> {
        Curve {
            windows: self.windows,
        }
    }

    /// compare the curve to a curve iterator
    /// consuming the iterator in the process
    pub fn eq_curve_iterator<CI: CurveIterator<CurveKind = T>>(&self, mut other: CI) -> bool {
        let mut windows = self.as_windows().iter();
        loop {
            match (windows.next(), other.next_window()) {
                (None, None) => break true,
                (Some(_), None) | (None, Some(_)) => break false,
                (Some(left), Some(right)) => {
                    if left != &right {
                        break false;
                    }
                }
            }
        }
    }
}

impl<T: CurveType<WindowKind = Demand>> Curve<T> {
    /// Partition the Curve as Defined by Algorithms 2. and 3. of the paper
    ///
    /// The implementation here deviates from the paper by returning an exclusive index while the paper uses an inclusive index
    #[must_use]
    pub fn partition(
        &self,
        group_index: UnitNumber,
        server_properties: ServerProperties,
    ) -> PartitionResult {
        match server_properties.server_type {
            ServerKind::Deferrable => {
                // Algorithm 2.

                // Note for Step (1):
                // The paper indexes the Windows 0-based but iterates starting at 1
                // this appears to be a mix-up between 0-based and 1-based indexing
                // which is mixed throughout the paper

                // Note index is i+1 rather than i
                // as 0 is used in case the first window is larges than the server capacity
                // meaning index is exclusive here rather than inclusive as in the paper

                // (1)
                let (index, sum) = self
                    .windows
                    .iter()
                    .enumerate()
                    .scan(TimeUnit::ZERO, |acc, (index, window)| {
                        match window.length() {
                            WindowEnd::Finite(length) => {
                                *acc += length;
                                (*acc <= server_properties.capacity).then(|| (index + 1, *acc))
                            }
                            WindowEnd::Infinite => None,
                        }
                    })
                    .last()
                    .unwrap_or((0, TimeUnit::ZERO));

                // (2)
                let remaining_capacity = server_properties.capacity - sum;

                let (head, tail) = self.windows.get(index).map_or_else(
                    || (Window::empty(), Window::empty()),
                    |window| {
                        if remaining_capacity > TimeUnit::ZERO {
                            // we have remaining capacity and a window to fill the remaining budget
                            let head_start = window.start;
                            let tail_end = window.end;
                            let split = head_start + remaining_capacity;
                            let head = Window::new(head_start, Some(split));
                            let tail = Window::new(split, tail_end);
                            (head, tail)
                        } else {
                            // no capacity left set window as tail
                            (Window::empty(), window.clone())
                        }
                    },
                );

                PartitionResult { index, head, tail }
            }
            ServerKind::Periodic => {
                // Algorithm 3.
                // (1)

                let limit = group_index * server_properties.interval + server_properties.capacity;

                // Note index is i+1 rather than i,
                // as 0 is used to indicate that the first window is already past the limit
                // index need therefore be treated as exclusive rather than inclusive as in the paper

                let index = self
                    .windows
                    .iter()
                    .enumerate()
                    .filter_map(|(index, window)| (window.end < limit).then(|| index + 1))
                    .last()
                    .unwrap_or(0);

                // (2)
                let (head, tail) = self.windows.get(index).map_or_else(
                    || (Window::empty(), Window::empty()),
                    |window| {
                        if window.start < limit && limit < window.end {
                            // window crosses the limit, split it at the limit
                            let head = Window::new(window.start, limit);
                            let tail = Window::new(limit, window.end);
                            (head, tail)
                        } else {
                            // Window won't be split as it does not contain the limit
                            // just set the window as the tail
                            (Window::empty(), window.clone())
                        }
                    },
                );

                PartitionResult { index, head, tail }
            }
        }
    }
}

/// Return Type for [`CurveDeltaIterator::collect_delta`]
#[derive(Debug, PartialEq)]
pub struct CurveDeltaResult<
    P: CurveType,
    Q: CurveType,
    R: CurveType<WindowKind = Overlap<P::WindowKind, Q::WindowKind>>,
> {
    /// The remaining supply, can be 0-2 Windows
    pub remaining_supply: Curve<P>,
    /// The (used) Overlap between Supply and Demand
    pub overlap: Curve<R>,
    /// The remaining Demand that could not be fulfilled by the Supply
    pub remaining_demand: Curve<Q>,
}

impl<DW: WindowType, SW: WindowType, DI, SI> CurveDeltaIterator<DW, SW, DI, SI>
where
    DI: CurveIterator,
    DI::CurveKind: CurveType<WindowKind = DW>,
    SI: CurveIterator,
    SI::CurveKind: CurveType<WindowKind = SW>,
{
    /// collect the complete `CurveDeltaIterator`
    ///
    /// # Warning
    ///
    /// Won't terminate if `CurveDelaIterator` is infinite as it will try to consume the complete iterator
    ///
    #[must_use]
    pub fn collect_delta<R: CurveType<WindowKind = Overlap<SW, DW>>>(
        self,
    ) -> CurveDeltaResult<SI::CurveKind, DI::CurveKind, R>
    where
        Self: Iterator<Item = Delta<SW, DW, SI, DI>>,
    {
        let mut result = CurveDeltaResult {
            remaining_supply: Curve::empty(),
            overlap: Curve::empty(),
            remaining_demand: Curve::empty(),
        };

        for delta in self {
            match delta {
                Delta::RemainingSupply(supply) => result.remaining_supply.windows.push(supply),
                Delta::Overlap(overlap) => result.overlap.windows.push(overlap),
                Delta::RemainingDemand(demand) => result.remaining_demand.windows.push(demand),
                Delta::EndSupply(supply) => supply
                    .into_iterator()
                    .for_each(|window| result.remaining_supply.windows.push(window)),
                Delta::EndDemand(demand) => {
                    demand.for_each(|window| result.remaining_demand.windows.push(window))
                }
            }
        }

        result
    }
}

/// Return Type for [`Curve::partition`](Curve::partition)
#[derive(Debug)]
pub struct PartitionResult {
    /// The exclusive index up to which all demand fits into the current partition
    ///
    /// Note: the paper uses an inclusive index
    pub index: usize,

    /// If there is a window on the partitioning boundary
    /// this contains the split before the boundary, otherwise this contains an empty window
    pub head: Window<Demand>,

    /// If there is a window on the partitioning boundary
    /// this contains the split after the boundary,
    /// otherwise if there is no window on the boundary this contains the first window
    /// after the boundary or an empty window if there is no window after the boundary
    pub tail: Window<Demand>,
}

/// Extension trait to allow calling aggregate on an iterator
pub trait AggregateExt: Iterator + Sized {
    /// aggregate all iterator elements
    /// acts similar to [`core::iter::Iterator::sum`]
    #[must_use]
    fn aggregate<A: Aggregate<Self::Item>>(self) -> A {
        A::aggregate(self)
    }
}

impl<I: Iterator> AggregateExt for I {}

/// Trait used by the `AggregateExt` Extension trait
pub trait Aggregate<A = Self> {
    /// aggregate all elements of `iter` into a new Self
    /// pendant to [`core::iter::Sum`]
    #[must_use]
    fn aggregate<I>(iter: I) -> Self
    where
        I: Iterator<Item = A>;
}