rtlola_frontend/mir/
schedule.rs

1use std::ops::Not;
2use std::time::Duration;
3
4use num::rational::Rational64 as Rational;
5use num::{One, ToPrimitive};
6use uom::num_traits::Inv;
7use uom::si::rational64::Time as UOM_Time;
8use uom::si::time::{nanosecond, second};
9
10use super::PacingLocality;
11use crate::mir::{OutputReference, PacingType, RtLolaMir, Stream};
12
13/// This enum represents the different tasks that have to be executed periodically.
14#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
15pub enum Task {
16    /// Evaluate the stream referred to by the OutputReference
17    Evaluate(OutputReference),
18    /// Spawn the stream referred to by the OutputReference
19    Spawn(OutputReference),
20    /// Evaluate the close condition referred to by the OutputReference
21    Close(OutputReference),
22}
23
24/// This struct represents a single deadline inside a [Schedule].
25///
26/// All deadlines are meant to lie within a hyper-period, i.e., they represent a family of points in time rather than
27/// a single one.  The deadline contains information on what streams need to be evaluated when due.
28///
29/// # Example
30/// See example of [Schedule::deadlines].
31#[derive(Debug, Clone)]
32pub struct Deadline {
33    /// The time delay between the current deadline and the next.
34    pub pause: Duration,
35    /// The set of streams affected by this deadline.
36    pub due: Vec<Task>,
37}
38
39///
40/// A schedule for the periodic streams.
41///
42/// The schedule is a sequence of deadlines and describes a single hyper-period.  Hence, the sequences is meant to repeat afterwards.
43#[derive(Debug, Clone)]
44pub struct Schedule {
45    /// The `hyper_period` is the duration after which the schedule is meant to repeat.
46    ///
47    /// It is therefore the least common multiple of all periods. If there are no statically scheduled streams, the hyper-period is `None`.
48    /// # Example:  
49    /// If there are three streams, one running at 0.5Hz, one with 1Hz, and one with 2Hz.  The hyper-period then is 2000ms.
50    pub hyper_period: Option<Duration>,
51
52    /// A sequence of deadlines within a hyper-period.
53    ///
54    /// Deadlines represent points in time at which periodic stream needs to be updated.  Deadlines may not be empty.
55    /// The first deadline is due [Deadline::pause] time units after the start of the schedule.  Subsequent deadlines are due [Deadline::pause]
56    /// time units after their predecessor.
57    ///
58    /// # Example:  
59    /// Assume there are two periodic streams, `a` running at 1Hz and `b` running at 2Hz.  The deadlines are thus {`b`} at time 500ms and {`a`, `b`}
60    /// 500ms later.  Then, the schedule repeats.
61    ///
62    /// # See Also
63    /// * [Deadline]
64    pub deadlines: Vec<Deadline>,
65}
66
67impl Schedule {
68    /// Initiates the computation of a [Schedule] for the given Mir.
69    /// # Fail
70    /// Fails if the resulting schedule would require at least 10^7 deadlines.
71    pub(crate) fn from(ir: &RtLolaMir) -> Result<Schedule, String> {
72        let stream_periods = ir
73            .time_driven
74            .iter()
75            .filter(|tds| tds.locality == PacingLocality::Global)
76            .map(|tds| tds.period());
77        let spawn_periods = ir.outputs.iter().filter_map(|o| {
78            if let PacingType::GlobalPeriodic(freq) = &o.spawn.pacing {
79                Some(UOM_Time::new::<second>(
80                    freq.get::<uom::si::frequency::hertz>().inv(),
81                ))
82            } else {
83                None
84            }
85        });
86        let close_periods = ir.outputs.iter().filter_map(|o| {
87            if let PacingType::GlobalPeriodic(freq) = &o.close.pacing {
88                Some(UOM_Time::new::<second>(
89                    freq.get::<uom::si::frequency::hertz>().inv(),
90                ))
91            } else {
92                None
93            }
94        });
95        let periods: Vec<UOM_Time> = stream_periods
96            .chain(spawn_periods)
97            .chain(close_periods)
98            .collect();
99        if periods.is_empty() {
100            // Nothing to schedule here
101            return Ok(Schedule {
102                hyper_period: None,
103                deadlines: vec![],
104            });
105        }
106        let gcd = Self::find_extend_period(&periods);
107        let hyper_period = Self::find_hyper_period(&periods);
108
109        let extend_steps = Self::build_extend_steps(ir, gcd, hyper_period)?;
110        let extend_steps = Self::apply_periodicity(&extend_steps);
111        let mut deadlines = Self::condense_deadlines(gcd, extend_steps);
112        Self::sort_deadlines(ir, &mut deadlines);
113
114        let hyper_period = Duration::from_nanos(
115            hyper_period
116                .get::<nanosecond>()
117                .to_integer()
118                .to_u64()
119                .unwrap(),
120        );
121        Ok(Schedule {
122            hyper_period: Some(hyper_period),
123            deadlines,
124        })
125    }
126
127    /// Determines the maximal amount of time the process can wait between successive checks for
128    /// due deadlines without missing one.
129    fn find_extend_period(rates: &[UOM_Time]) -> UOM_Time {
130        assert!(!rates.is_empty());
131        let rates: Vec<Rational> = rates.iter().map(|r| r.get::<nanosecond>()).collect();
132        let gcd = math::rational_gcd_all(&rates);
133        UOM_Time::new::<nanosecond>(gcd)
134    }
135
136    /// Determines the hyper period of the given `rates`.
137    fn find_hyper_period(rates: &[UOM_Time]) -> UOM_Time {
138        assert!(!rates.is_empty());
139        let rates: Vec<Rational> = rates.iter().map(|r| r.get::<nanosecond>()).collect();
140        let lcm = math::rational_lcm_all(&rates);
141        let lcm = math::rational_lcm(lcm, Rational::one()); // needs to be multiple of 1 ns
142        UOM_Time::new::<nanosecond>(lcm)
143    }
144
145    /// Takes a vec of gcd-sized intervals. In each interval, there are streams that need
146    /// to be scheduled periodically at this point in time.
147    /// Example:
148    /// Hyper-period: 2 seconds, gcd: 500ms, streams: (c @ .5Hz), (b @ 1Hz), (a @ 2Hz)
149    /// Input:  `[[a] [b]   []  [c]]`
150    /// Output: `[[a] [a,b] [a] [a,b,c]]`
151    fn apply_periodicity(steps: &[Vec<Task>]) -> Vec<Vec<Task>> {
152        // Whenever there are streams in a cell at index `i`,
153        // add them to every cell with index k*i within bounds, where k > 1.
154        // k = 0 would always schedule them initially, so this must be skipped.
155        // TODO: Skip last half of the array.
156        let mut res = vec![Vec::new(); steps.len()];
157        for (ix, streams) in steps.iter().enumerate() {
158            if !streams.is_empty() {
159                let mut k = 1;
160                while let Some(target) = res.get_mut(k * (ix + 1) - 1) {
161                    target.extend(streams);
162                    k += 1;
163                }
164            }
165        }
166        res
167    }
168
169    /// Build extend steps for each gcd-sized time interval up to the hyper period.
170    /// Example:
171    /// Hyper-period: 2 seconds, gcd: 500ms, streams: (c @ .5Hz), (b @ 1Hz), (a @ 2Hz)
172    /// Result: `[[a] [b] [] [c]]`
173    /// Meaning: `a` starts being scheduled after one gcd, `b` after two gcds, `c` after 4 gcds.
174    fn build_extend_steps(
175        ir: &RtLolaMir,
176        gcd: UOM_Time,
177        hyper_period: UOM_Time,
178    ) -> Result<Vec<Vec<Task>>, String> {
179        let num_steps = hyper_period.get::<second>() / gcd.get::<second>();
180        assert!(num_steps.is_integer());
181        let num_steps = num_steps.to_integer() as usize;
182        if num_steps >= 10_000_000 {
183            return Err("stream frequencies are too incompatible to generate schedule".to_string());
184        }
185        let mut extend_steps = vec![Vec::new(); num_steps];
186        for s in ir
187            .time_driven
188            .iter()
189            .filter(|tds| tds.locality == PacingLocality::Global)
190        {
191            let ix = s.period().get::<second>() / gcd.get::<second>();
192            // Period must be integer multiple of gcd by def of gcd
193            assert!(ix.is_integer());
194            let ix = ix.to_integer() as usize;
195            let ix = ix - 1;
196            extend_steps[ix].push(Task::Evaluate(s.reference.out_ix()));
197        }
198        let periodic_spawns = ir.outputs.iter().filter_map(|o| match &o.spawn.pacing {
199            PacingType::GlobalPeriodic(freq) => Some((
200                o.reference.out_ix(),
201                UOM_Time::new::<second>(freq.get::<uom::si::frequency::hertz>().inv()),
202            )),
203            _ => None,
204        });
205        for (out_ix, period) in periodic_spawns {
206            let ix = period.get::<second>() / gcd.get::<second>();
207            // Period must be integer multiple of gcd by def of gcd
208            assert!(ix.is_integer());
209            let ix = ix.to_integer() as usize;
210            let ix = ix - 1;
211            extend_steps[ix].push(Task::Spawn(out_ix));
212        }
213
214        let periodic_close = ir.outputs.iter().filter_map(|o| {
215            if let PacingType::GlobalPeriodic(freq) = &o.close.pacing {
216                o.close.has_self_reference.not().then(|| {
217                    (
218                        o.reference.out_ix(),
219                        UOM_Time::new::<second>(freq.get::<uom::si::frequency::hertz>().inv()),
220                    )
221                })
222            } else {
223                None
224            }
225        });
226        for (out_ix, period) in periodic_close {
227            let ix = period.get::<second>() / gcd.get::<second>();
228            // Period must be integer multiple of gcd by def of gcd
229            assert!(ix.is_integer());
230            let ix = ix.to_integer() as usize;
231            let ix = ix - 1;
232            extend_steps[ix].push(Task::Close(out_ix));
233        }
234        Ok(extend_steps)
235    }
236
237    /// Transforms `extend_steps` into a sequence of [Deadline]s.  
238    ///
239    /// `gcd` represents the minimal time step possible between two consecutive deadlines. Each entry in `extend_steps`
240    /// represents a minimal time step in the schedule.  The resulting Deadlines summarize these entries without containing
241    /// gaps.  So for every deadline, [Deadline::due] will contain at least one entry.
242    ///
243    /// # Panics
244    /// Panics if the last entry/-ies of `extend_steps` are empty.
245    fn condense_deadlines(gcd: UOM_Time, extend_steps: Vec<Vec<Task>>) -> Vec<Deadline> {
246        let mut empty_counter = 0;
247        let mut deadlines: Vec<Deadline> = vec![];
248        for step in extend_steps.iter() {
249            if step.is_empty() {
250                empty_counter += 1;
251                continue;
252            }
253            let pause = gcd.get::<nanosecond>() * (empty_counter + 1);
254            let pause = Duration::from_nanos(pause.to_integer() as u64);
255            empty_counter = 0;
256            let deadline = Deadline {
257                pause,
258                due: step.clone(),
259            };
260            deadlines.push(deadline);
261        }
262        // There cannot be some gcd periods left at the end of the hyper-period.
263        assert!(empty_counter == 0);
264        deadlines
265    }
266
267    fn sort_deadlines(ir: &RtLolaMir, deadlines: &mut Vec<Deadline>) {
268        for deadline in deadlines {
269            deadline.due.sort_by_key(|s| match s {
270                Task::Evaluate(sref) => ir.outputs[*sref].eval_layer().inner(),
271                Task::Spawn(sref) => ir.outputs[*sref].spawn_layer().inner(),
272                Task::Close(_) => usize::MAX,
273            });
274        }
275    }
276}
277mod math {
278    use num::integer::{gcd as num_gcd, lcm as num_lcm};
279    use num::rational::Rational64 as Rational;
280
281    pub(crate) fn rational_gcd(a: Rational, b: Rational) -> Rational {
282        let numer = num_gcd(*a.numer(), *b.numer());
283        let denom = num_lcm(*a.denom(), *b.denom());
284        Rational::new(numer, denom)
285    }
286
287    pub(crate) fn rational_lcm(a: Rational, b: Rational) -> Rational {
288        let numer = num_lcm(*a.numer(), *b.numer());
289        let denom = num_gcd(*a.denom(), *b.denom());
290        Rational::new(numer, denom)
291    }
292
293    pub(crate) fn rational_gcd_all(v: &[Rational]) -> Rational {
294        assert!(!v.is_empty());
295        v.iter().fold(v[0], |a, b| rational_gcd(a, *b))
296    }
297
298    pub(crate) fn rational_lcm_all(v: &[Rational]) -> Rational {
299        assert!(!v.is_empty());
300        v.iter().fold(v[0], |a, b| rational_lcm(a, *b))
301    }
302}
303
304#[cfg(test)]
305mod tests {
306    use num::{FromPrimitive, ToPrimitive};
307
308    use super::math::*;
309    use super::*;
310    use crate::mir::schedule::Task::{Close, Evaluate, Spawn};
311    use crate::mir::RtLolaMir;
312    use crate::ParserConfig;
313
314    macro_rules! rat {
315        ($i:expr) => {
316            Rational::from_i64($i).unwrap()
317        };
318        ($n:expr, $d:expr) => {
319            Rational::new($n, $d)
320        };
321    }
322
323    macro_rules! assert_eq_with_sort {
324        ($left:expr, $right:expr) => {
325            $left.sort();
326            $right.sort();
327            assert_eq!($left, $right)
328        };
329    }
330    #[test]
331    fn test_gcd() {
332        assert_eq!(rational_gcd(rat!(3), rat!(18)), rat!(3));
333        assert_eq!(rational_gcd(rat!(18), rat!(3)), rat!(3));
334        assert_eq!(rational_gcd(rat!(1), rat!(25)), rat!(1));
335        assert_eq!(rational_gcd(rat!(5), rat!(13)), rat!(1));
336        assert_eq!(rational_gcd(rat!(25), rat!(40)), rat!(5));
337        assert_eq!(rational_gcd(rat!(7), rat!(7)), rat!(7));
338        assert_eq!(rational_gcd(rat!(7), rat!(7)), rat!(7));
339
340        assert_eq!(rational_gcd(rat!(1, 4), rat!(1, 2)), rat!(1, 4));
341        assert_eq!(rational_lcm(rat!(1, 4), rat!(1, 2)), rat!(1, 2));
342        assert_eq!(rational_gcd(rat!(2, 3), rat!(1, 8)), rat!(1, 24));
343        assert_eq!(rational_lcm(rat!(2, 3), rat!(1, 8)), rat!(2));
344    }
345
346    fn to_ir(spec: &str) -> RtLolaMir {
347        let cfg = ParserConfig::for_string(String::from(spec));
348        crate::parse(&cfg).expect("spec was invalid")
349    }
350
351    /// Divides two durations. If `rhs` is not a divider of `lhs`, a warning is emitted and the
352    /// rounding strategy `round_up` is applied.
353    fn divide_durations(lhs: Duration, rhs: Duration, round_up: bool) -> usize {
354        // The division of durations is currently unstable (feature duration_float) because
355        // it falls back to using floats which cannot necessarily represent the durations
356        // accurately. We, however, fall back to nanoseconds as u128. Regardless, some inaccuracies
357        // might occur, rendering this code TODO *not stable for real-time devices!*
358        let lhs = lhs.as_nanos();
359        let rhs = rhs.as_nanos();
360        let representable = lhs % rhs == 0;
361        let mut div = lhs / rhs;
362        if !representable {
363            println!("Warning: Spec unstable: Cannot accurately represent extend periods.");
364            // TODO: Introduce better mechanism for emitting such warnings.
365            if round_up {
366                div += 1;
367            }
368        }
369        div as usize
370    }
371
372    #[test]
373    #[ignore] //Depends on Typechecker, NYI
374    fn test_extension_rate_extraction() {
375        let input = "input a: UInt64\n";
376        let hz50 = "output b: UInt64 @50Hz := 1\n";
377        let hz40 = "output c: UInt64 @40Hz := 2\n";
378        let ms20 = "output d: UInt64 @20ms := 3\n"; // 50Hz
379        let ms1 = "output e: UInt64 @1ms := 4\n"; // 100Hz
380
381        let case1 = (format!("{}{}", input, hz50), 20_000_000);
382        let case2 = (format!("{}{}", input, hz40), 25_000_000);
383        let case3 = (format!("{}{}{}", input, hz50, hz40), 5_000_000);
384        let case4 = (format!("{}{}{}", input, hz50, ms1), 1_000_000);
385        let case5 = (format!("{}{}{}{}", input, hz50, ms20, ms1), 1_000_000);
386
387        let cases = [case1, case2, case3, case4, case5];
388        for (spec, expected) in cases.iter() {
389            let periods: Vec<_> = to_ir(spec).time_driven.iter().map(|s| s.period()).collect();
390            let was = Schedule::find_extend_period(&periods);
391            let was = was.get::<nanosecond>().to_integer().to_u64().expect("");
392            assert_eq!(*expected, was);
393        }
394    }
395
396    #[test]
397    fn test_divide_durations_round_down() {
398        type TestDurations = ((u64, u32), (u64, u32), usize);
399        let case1: TestDurations = ((1, 0), (1, 0), 1);
400        let case2: TestDurations = ((1, 0), (0, 100_000_000), 10);
401        let case3: TestDurations = ((1, 0), (0, 100_000), 10_000);
402        let case4: TestDurations = ((1, 0), (0, 20_000), 50_000);
403        let case5: TestDurations = ((0, 40_000), (0, 30_000), 1);
404        let case6: TestDurations = ((3, 1_000), (3, 5_000), 0);
405
406        let cases = [case1, case2, case3, case4, case5, case6];
407        for (a, b, expected) in &cases {
408            let to_dur = |(s, n)| Duration::new(s, n);
409            let was = divide_durations(to_dur(*a), to_dur(*b), false);
410            assert_eq!(was, *expected, "Expected {}, but was {}.", expected, was);
411        }
412    }
413
414    #[test]
415    fn test_divide_durations_round_up() {
416        type TestDurations = ((u64, u32), (u64, u32), usize);
417        let case1: TestDurations = ((1, 0), (1, 0), 1);
418        let case2: TestDurations = ((1, 0), (0, 100_000_000), 10);
419        let case3: TestDurations = ((1, 0), (0, 100_000), 10_000);
420        let case4: TestDurations = ((1, 0), (0, 20_000), 50_000);
421        let case5: TestDurations = ((0, 40_000), (0, 30_000), 2);
422        let case6: TestDurations = ((3, 1_000), (3, 5_000), 1);
423
424        let cases = [case1, case2, case3, case4, case5, case6];
425        for (a, b, expected) in &cases {
426            let to_dur = |(s, n)| Duration::new(s, n);
427            let was = divide_durations(to_dur(*a), to_dur(*b), true);
428            assert_eq!(was, *expected, "Expected {}, but was {}.", expected, was);
429        }
430    }
431
432    #[test]
433    fn test_spawn_close_scheduled() {
434        let ir = to_ir(
435            "input a:UInt64\n\
436                          output x @1Hz := a.hold(or: 42)\n\
437                          output y close when x = 42 eval with a\n\
438                          output z spawn @0.5Hz when a.hold(or: 42) = 1337 eval with a - 15
439       ",
440        );
441        let mut schedule = ir.compute_schedule().expect("failed to compute schedule");
442        assert_eq_with_sort!(schedule.deadlines[0].due, vec![Evaluate(0), Close(1)]);
443        assert_eq_with_sort!(
444            schedule.deadlines[1].due,
445            vec![Evaluate(0), Spawn(2), Close(1)]
446        );
447    }
448}