rta_for_fps_lib/
system.rs

1//! Module for the System type
2
3use crate::curve::AggregateExt;
4use crate::iterators::curve::{AggregationIterator, CapacityCheckIterator, InverseCurveIterator};
5
6use crate::server::{
7    ActualServerExecution, ConstrainedServerDemand, HigherPriorityServerDemand, Server,
8    UnconstrainedServerExecution,
9};
10
11use crate::curve::curve_types::CurveType;
12use crate::iterators::server::actual_execution::ActualServerExecutionIterator;
13use crate::iterators::{ClonableCurveIterator, CurveIterator, ReclassifyIterator};
14use crate::time::TimeUnit;
15use alloc::boxed::Box;
16use alloc::vec::Vec;
17
18/// Type representing a System of Servers
19#[derive(Debug)]
20pub struct System<'a> {
21    /// The Servers of the System
22    servers: &'a [Server<'a>],
23}
24
25impl<'a> System<'a> {
26    /// Create a new System from a slice of Servers,
27    /// indexed by their priority,
28    /// lowest index being the highest priority
29    #[must_use]
30    pub const fn new(servers: &'a [Server<'a>]) -> System<'a> {
31        System { servers }
32    }
33
34    /// Get a slice reference to the systems servers
35    #[must_use]
36    pub const fn as_servers(&self) -> &'a [Server<'a>] {
37        self.servers
38    }
39
40    /// Calculate the aggregated higher priority demand curve
41    /// by aggregating the aggregated demand curves of all Servers with higher priority
42    /// (lower value) than `index`.
43    ///
44    /// Based on the papers Definition 12.
45    #[must_use]
46    pub fn aggregated_higher_priority_demand_curve_iter<'b, CSDCI>(
47        constrained_demand_curves: CSDCI,
48    ) -> impl CurveIterator<CurveKind = HigherPriorityServerDemand> + Clone + 'b
49    where
50        CSDCI::Item: CurveIterator<CurveKind = ConstrainedServerDemand> + Clone + 'b,
51        CSDCI: IntoIterator,
52    {
53        constrained_demand_curves
54            .into_iter()
55            .aggregate::<ReclassifyIterator<_, _>>()
56    }
57
58    pub fn aggregated_higher_priority_actual_execution_curve_iter<'b>(
59        &'b self,
60        server_index: usize,
61    ) -> AggregationIterator<
62        Box<dyn ClonableCurveIterator<'b, CurveKind = ActualServerExecution> + 'b>,
63        <ActualServerExecution as CurveType>::WindowKind,
64    > {
65        let mut curves: Vec<Box<dyn ClonableCurveIterator<CurveKind = ActualServerExecution>>> =
66            alloc::vec::Vec::with_capacity(2);
67
68        if server_index > 0 {
69            curves.push(Box::new(
70                self.fixed_actual_execution_curve_iter(server_index - 1),
71            ));
72            if server_index > 1 {
73                let curve = self
74                    .aggregated_higher_priority_actual_execution_curve_iter(server_index - 1)
75                    .reclassify();
76                curves.push(Box::new(curve));
77            }
78        }
79
80        AggregationIterator::new(curves)
81    }
82
83    /// Calculate the system wide hyper period
84    /// accounting for all servers and tasks
85    /// up to and including the server with priority `server_index`
86    ///
87    /// Section 7.1 ยง2 Sentence 3, allows to exclude lower priority servers from the swh period calculation,
88    /// when analysing tasks of a server
89    #[must_use]
90    pub fn system_wide_hyper_period(&self, server_index: usize) -> TimeUnit {
91        self.servers[..=server_index]
92            .iter()
93            .map(|server| server.properties.interval)
94            .chain(
95                self.servers
96                    .iter()
97                    .flat_map(|server| server.as_tasks().iter().map(|task| task.interval)),
98            )
99            .fold(TimeUnit::ONE, TimeUnit::lcm)
100    }
101
102    pub fn analysis_end(&self, server_index: usize) -> TimeUnit {
103        let res = self.servers[..=server_index]
104            .iter()
105            .map(|server| (server.properties.interval, TimeUnit::ZERO))
106            .chain(self.servers.iter().flat_map(|server| {
107                server
108                    .as_tasks()
109                    .iter()
110                    .map(|task| (task.interval, task.offset))
111            }))
112            .fold((TimeUnit::ONE, TimeUnit::ZERO), |acc, next| {
113                (TimeUnit::lcm(acc.0, next.0), acc.1.max(next.1))
114            });
115        res.0 + res.1
116    }
117
118    /// Calculate the unconstrained execution curve
119    /// for the server with priority `index`.
120    ///
121    /// See Definition 13. of the paper for reference
122    #[must_use]
123    pub fn original_unconstrained_server_execution_curve_iter(
124        &self,
125        server_index: usize,
126    ) -> impl CurveIterator<CurveKind = UnconstrainedServerExecution> + Clone + '_ {
127        #![allow(clippy::redundant_closure_for_method_calls)]
128
129        let csdi = self.servers[..server_index]
130            .iter()
131            .map(|server| server.constraint_demand_curve_iter());
132
133        let ahpc = System::aggregated_higher_priority_demand_curve_iter(csdi);
134
135        InverseCurveIterator::new(ahpc)
136    }
137
138    #[must_use]
139    pub fn fixed_unconstrained_server_execution_curve_iter(
140        &self,
141        server_index: usize,
142    ) -> impl CurveIterator<CurveKind = UnconstrainedServerExecution> + Clone + '_ {
143        let ahpc = self.aggregated_higher_priority_actual_execution_curve_iter(server_index);
144
145        InverseCurveIterator::new(ahpc)
146    }
147
148    /// Calculate the Constrained Execution Curve using Algorithm 4. from the paper
149    /// TODO more detail, what do the parameters mean
150    /// # Panics
151    ///
152    /// When a server is not guaranteed its capacity every interval
153    ///
154    #[must_use]
155    pub fn original_actual_execution_curve_iter(
156        &self,
157        server_index: usize,
158    ) -> impl CurveIterator<CurveKind = ActualServerExecution> + Clone + '_ {
159        let unchecked_unconstrained_execution =
160            self.original_unconstrained_server_execution_curve_iter(server_index);
161
162        let props = self.servers[server_index].properties;
163
164        // split unconstrained execution curve into groups every server.interval
165        // and check that each group has at least server.capacity of capacity
166        let checked_unconstrained_execution = CapacityCheckIterator::new(
167            unchecked_unconstrained_execution,
168            props.capacity,
169            props.interval,
170        );
171
172        let constrained_demand = self.servers[server_index].constraint_demand_curve_iter();
173
174        ActualServerExecutionIterator::new(
175            self.servers[server_index].properties,
176            checked_unconstrained_execution,
177            constrained_demand,
178        )
179    }
180
181    #[must_use]
182    pub fn fixed_actual_execution_curve_iter(
183        &self,
184        server_index: usize,
185    ) -> impl CurveIterator<CurveKind = ActualServerExecution> + Clone + '_ {
186        let unchecked_unconstrained_execution =
187            self.fixed_unconstrained_server_execution_curve_iter(server_index);
188
189        let props = self.servers[server_index].properties;
190
191        // split unconstrained execution curve into groups every server.interval
192        // and check that each group has at least server.capacity of capacity
193        let checked_unconstrained_execution = CapacityCheckIterator::new(
194            unchecked_unconstrained_execution,
195            props.capacity,
196            props.interval,
197        );
198
199        let constrained_demand = self.servers[server_index].constraint_demand_curve_iter();
200
201        ActualServerExecutionIterator::new(
202            self.servers[server_index].properties,
203            checked_unconstrained_execution,
204            constrained_demand,
205        )
206    }
207}