rta_for_fps_lib/iterators/curve/
aggregate.rs

//! Module for the implementation of the Curve aggregate operation using iterators
2
3use alloc::vec::Vec;
4use core::iter::Fuse;
5
6use crate::curve::curve_types::CurveType;
7use crate::curve::Aggregate;
8use crate::iterators::peek::Peeker;
9use crate::iterators::{CurveIterator, CurveIteratorIterator, ReclassifyIterator};
10use crate::server::{
11    ActualServerExecution, AggregatedServerDemand, ConstrainedServerDemand,
12    HigherPriorityServerDemand, HigherPriorityServerExecution,
13};
14use crate::task::curve_types::{HigherPriorityTaskDemand, TaskDemand};
15use crate::window::Window;
16use core::fmt::Debug;
17
18/// Iterator for Aggregating two Curve Iterators
19///
20/// Aggregate multiple (Demand) Curves as defined in Definition 5. of the paper
21///
22#[derive(Debug, Clone)]
23pub struct AggregationIterator<I, W> {
24    /// The CurveIterators to aggregate
25    curves: Vec<Peeker<Fuse<CurveIteratorIterator<I>>, Window<W>>>,
26}
27
28impl<I, W> AggregationIterator<I, W>
29where
30    I: CurveIterator,
31    I::CurveKind: CurveType<WindowKind = W>,
32{
33    /// Create a new `AggregationIterator`
34    #[must_use]
35    pub fn new(curves: Vec<I>) -> Self {
36        AggregationIterator {
37            curves: curves
38                .into_iter()
39                .map(|curve| Peeker::new(curve.fuse_curve()))
40                .collect(),
41        }
42    }
43}
44
45impl<I, W> CurveIterator for AggregationIterator<I, W>
46where
47    I: CurveIterator,
48    I::CurveKind: CurveType<WindowKind = W>,
49    W: Debug,
50{
51    type CurveKind = I::CurveKind;
52
53    fn next_window(&mut self) -> Option<Window<W>> {
54        // find curve with earliest peek
55        let result = self
56            .curves
57            .iter_mut()
58            .enumerate()
59            .filter_map(|(index, element)| {
60                element
61                    .peek_ref()
62                    .map(|some_ref| (index, some_ref.start, some_ref))
63            })
64            .min_by_key(|(_, start, _)| *start)
65            .map(|(index, _, some_ref)| (index, some_ref.take()));
66
67        // take peek
68        if let Some((original_index, first_peek)) = result {
69            let mut overlap: Window<_> = first_peek;
70
71            // the index that was last aggregated into overlap
72            // if we reach it again without aggregating more we are done
73            let mut aggregate_index = original_index;
74
75            'outer: loop {
76                let (tail, head) = self.curves.split_at_mut(original_index + 1);
77
78                // start after index and cycle through all elements
79                // until we reach and process an index again without aggregating since our last visit
80                let iter = head
81                    .iter_mut()
82                    .enumerate()
83                    .map(move |(i, element)| (i + original_index + 1, element))
84                    .chain(tail.iter_mut().enumerate());
85
86                for (index, element) in iter {
87                    if let Some(peek) = element.peek_ref() {
88                        if let Some(overlap_window) = overlap
89                            .aggregate(&*peek)
90                            .filter(|_| !overlap.adjacent(&*peek))
91                        {
92                            // update last aggregated index
93                            aggregate_index = index;
94                            // replace overlap with new overlap_window
95                            overlap = overlap_window;
96                            // clear the peek as we have used it
97                            peek.take();
98                            continue;
99                        }
100                    }
101
102                    if aggregate_index == index {
103                        // reached this again without aggregating
104                        break 'outer Some(overlap);
105                    }
106                }
107            }
108        } else {
109            None
110        }
111    }
112}
113
114/// Trait to mark which curve types may be aggregated into which other curve types
115pub trait AggregateInto<Result = Self>: CurveType {}
116
117impl<T: CurveType> AggregateInto for T {}
118
119impl AggregateInto<HigherPriorityServerExecution> for ActualServerExecution {}
120impl AggregateInto<HigherPriorityServerDemand> for ConstrainedServerDemand {}
121
122impl AggregateInto<AggregatedServerDemand> for TaskDemand {}
123impl AggregateInto<HigherPriorityTaskDemand> for TaskDemand {}
124
125impl<AI, O, W> Aggregate<AI> for ReclassifyIterator<AggregationIterator<AI, W>, O>
126where
127    <AI as CurveIterator>::CurveKind: AggregateInto<O>,
128    AI: CurveIterator,
129    AI::CurveKind: CurveType<WindowKind = W>,
130    W: Debug,
131{
132    fn aggregate<I>(iter: I) -> Self
133    where
134        I: Iterator<Item = AI>,
135    {
136        AggregationIterator::new(iter.collect()).reclassify()
137    }
138}