1use std::ops::Not;
2use std::time::Duration;
3
4use num::rational::Rational64 as Rational;
5use num::{One, ToPrimitive};
6use uom::num_traits::Inv;
7use uom::si::rational64::Time as UOM_Time;
8use uom::si::time::{nanosecond, second};
9
10use super::PacingLocality;
11use crate::mir::{OutputReference, PacingType, RtLolaMir, Stream};
12
13#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
15pub enum Task {
16 Evaluate(OutputReference),
18 Spawn(OutputReference),
20 Close(OutputReference),
22}
23
24#[derive(Debug, Clone)]
32pub struct Deadline {
33 pub pause: Duration,
35 pub due: Vec<Task>,
37}
38
39#[derive(Debug, Clone)]
44pub struct Schedule {
45 pub hyper_period: Option<Duration>,
51
52 pub deadlines: Vec<Deadline>,
65}
66
67impl Schedule {
68 pub(crate) fn from(ir: &RtLolaMir) -> Result<Schedule, String> {
72 let stream_periods = ir
73 .time_driven
74 .iter()
75 .filter(|tds| tds.locality == PacingLocality::Global)
76 .map(|tds| tds.period());
77 let spawn_periods = ir.outputs.iter().filter_map(|o| {
78 if let PacingType::GlobalPeriodic(freq) = &o.spawn.pacing {
79 Some(UOM_Time::new::<second>(
80 freq.get::<uom::si::frequency::hertz>().inv(),
81 ))
82 } else {
83 None
84 }
85 });
86 let close_periods = ir.outputs.iter().filter_map(|o| {
87 if let PacingType::GlobalPeriodic(freq) = &o.close.pacing {
88 Some(UOM_Time::new::<second>(
89 freq.get::<uom::si::frequency::hertz>().inv(),
90 ))
91 } else {
92 None
93 }
94 });
95 let periods: Vec<UOM_Time> = stream_periods
96 .chain(spawn_periods)
97 .chain(close_periods)
98 .collect();
99 if periods.is_empty() {
100 return Ok(Schedule {
102 hyper_period: None,
103 deadlines: vec![],
104 });
105 }
106 let gcd = Self::find_extend_period(&periods);
107 let hyper_period = Self::find_hyper_period(&periods);
108
109 let extend_steps = Self::build_extend_steps(ir, gcd, hyper_period)?;
110 let extend_steps = Self::apply_periodicity(&extend_steps);
111 let mut deadlines = Self::condense_deadlines(gcd, extend_steps);
112 Self::sort_deadlines(ir, &mut deadlines);
113
114 let hyper_period = Duration::from_nanos(
115 hyper_period
116 .get::<nanosecond>()
117 .to_integer()
118 .to_u64()
119 .unwrap(),
120 );
121 Ok(Schedule {
122 hyper_period: Some(hyper_period),
123 deadlines,
124 })
125 }
126
127 fn find_extend_period(rates: &[UOM_Time]) -> UOM_Time {
130 assert!(!rates.is_empty());
131 let rates: Vec<Rational> = rates.iter().map(|r| r.get::<nanosecond>()).collect();
132 let gcd = math::rational_gcd_all(&rates);
133 UOM_Time::new::<nanosecond>(gcd)
134 }
135
136 fn find_hyper_period(rates: &[UOM_Time]) -> UOM_Time {
138 assert!(!rates.is_empty());
139 let rates: Vec<Rational> = rates.iter().map(|r| r.get::<nanosecond>()).collect();
140 let lcm = math::rational_lcm_all(&rates);
141 let lcm = math::rational_lcm(lcm, Rational::one()); UOM_Time::new::<nanosecond>(lcm)
143 }
144
145 fn apply_periodicity(steps: &[Vec<Task>]) -> Vec<Vec<Task>> {
152 let mut res = vec![Vec::new(); steps.len()];
157 for (ix, streams) in steps.iter().enumerate() {
158 if !streams.is_empty() {
159 let mut k = 1;
160 while let Some(target) = res.get_mut(k * (ix + 1) - 1) {
161 target.extend(streams);
162 k += 1;
163 }
164 }
165 }
166 res
167 }
168
169 fn build_extend_steps(
175 ir: &RtLolaMir,
176 gcd: UOM_Time,
177 hyper_period: UOM_Time,
178 ) -> Result<Vec<Vec<Task>>, String> {
179 let num_steps = hyper_period.get::<second>() / gcd.get::<second>();
180 assert!(num_steps.is_integer());
181 let num_steps = num_steps.to_integer() as usize;
182 if num_steps >= 10_000_000 {
183 return Err("stream frequencies are too incompatible to generate schedule".to_string());
184 }
185 let mut extend_steps = vec![Vec::new(); num_steps];
186 for s in ir
187 .time_driven
188 .iter()
189 .filter(|tds| tds.locality == PacingLocality::Global)
190 {
191 let ix = s.period().get::<second>() / gcd.get::<second>();
192 assert!(ix.is_integer());
194 let ix = ix.to_integer() as usize;
195 let ix = ix - 1;
196 extend_steps[ix].push(Task::Evaluate(s.reference.out_ix()));
197 }
198 let periodic_spawns = ir.outputs.iter().filter_map(|o| match &o.spawn.pacing {
199 PacingType::GlobalPeriodic(freq) => Some((
200 o.reference.out_ix(),
201 UOM_Time::new::<second>(freq.get::<uom::si::frequency::hertz>().inv()),
202 )),
203 _ => None,
204 });
205 for (out_ix, period) in periodic_spawns {
206 let ix = period.get::<second>() / gcd.get::<second>();
207 assert!(ix.is_integer());
209 let ix = ix.to_integer() as usize;
210 let ix = ix - 1;
211 extend_steps[ix].push(Task::Spawn(out_ix));
212 }
213
214 let periodic_close = ir.outputs.iter().filter_map(|o| {
215 if let PacingType::GlobalPeriodic(freq) = &o.close.pacing {
216 o.close.has_self_reference.not().then(|| {
217 (
218 o.reference.out_ix(),
219 UOM_Time::new::<second>(freq.get::<uom::si::frequency::hertz>().inv()),
220 )
221 })
222 } else {
223 None
224 }
225 });
226 for (out_ix, period) in periodic_close {
227 let ix = period.get::<second>() / gcd.get::<second>();
228 assert!(ix.is_integer());
230 let ix = ix.to_integer() as usize;
231 let ix = ix - 1;
232 extend_steps[ix].push(Task::Close(out_ix));
233 }
234 Ok(extend_steps)
235 }
236
237 fn condense_deadlines(gcd: UOM_Time, extend_steps: Vec<Vec<Task>>) -> Vec<Deadline> {
246 let mut empty_counter = 0;
247 let mut deadlines: Vec<Deadline> = vec![];
248 for step in extend_steps.iter() {
249 if step.is_empty() {
250 empty_counter += 1;
251 continue;
252 }
253 let pause = gcd.get::<nanosecond>() * (empty_counter + 1);
254 let pause = Duration::from_nanos(pause.to_integer() as u64);
255 empty_counter = 0;
256 let deadline = Deadline {
257 pause,
258 due: step.clone(),
259 };
260 deadlines.push(deadline);
261 }
262 assert!(empty_counter == 0);
264 deadlines
265 }
266
267 fn sort_deadlines(ir: &RtLolaMir, deadlines: &mut Vec<Deadline>) {
268 for deadline in deadlines {
269 deadline.due.sort_by_key(|s| match s {
270 Task::Evaluate(sref) => ir.outputs[*sref].eval_layer().inner(),
271 Task::Spawn(sref) => ir.outputs[*sref].spawn_layer().inner(),
272 Task::Close(_) => usize::MAX,
273 });
274 }
275 }
276}
277mod math {
278 use num::integer::{gcd as num_gcd, lcm as num_lcm};
279 use num::rational::Rational64 as Rational;
280
281 pub(crate) fn rational_gcd(a: Rational, b: Rational) -> Rational {
282 let numer = num_gcd(*a.numer(), *b.numer());
283 let denom = num_lcm(*a.denom(), *b.denom());
284 Rational::new(numer, denom)
285 }
286
287 pub(crate) fn rational_lcm(a: Rational, b: Rational) -> Rational {
288 let numer = num_lcm(*a.numer(), *b.numer());
289 let denom = num_gcd(*a.denom(), *b.denom());
290 Rational::new(numer, denom)
291 }
292
293 pub(crate) fn rational_gcd_all(v: &[Rational]) -> Rational {
294 assert!(!v.is_empty());
295 v.iter().fold(v[0], |a, b| rational_gcd(a, *b))
296 }
297
298 pub(crate) fn rational_lcm_all(v: &[Rational]) -> Rational {
299 assert!(!v.is_empty());
300 v.iter().fold(v[0], |a, b| rational_lcm(a, *b))
301 }
302}
303
304#[cfg(test)]
305mod tests {
306 use num::{FromPrimitive, ToPrimitive};
307
308 use super::math::*;
309 use super::*;
310 use crate::mir::schedule::Task::{Close, Evaluate, Spawn};
311 use crate::mir::RtLolaMir;
312 use crate::ParserConfig;
313
314 macro_rules! rat {
315 ($i:expr) => {
316 Rational::from_i64($i).unwrap()
317 };
318 ($n:expr, $d:expr) => {
319 Rational::new($n, $d)
320 };
321 }
322
323 macro_rules! assert_eq_with_sort {
324 ($left:expr, $right:expr) => {
325 $left.sort();
326 $right.sort();
327 assert_eq!($left, $right)
328 };
329 }
330 #[test]
331 fn test_gcd() {
332 assert_eq!(rational_gcd(rat!(3), rat!(18)), rat!(3));
333 assert_eq!(rational_gcd(rat!(18), rat!(3)), rat!(3));
334 assert_eq!(rational_gcd(rat!(1), rat!(25)), rat!(1));
335 assert_eq!(rational_gcd(rat!(5), rat!(13)), rat!(1));
336 assert_eq!(rational_gcd(rat!(25), rat!(40)), rat!(5));
337 assert_eq!(rational_gcd(rat!(7), rat!(7)), rat!(7));
338 assert_eq!(rational_gcd(rat!(7), rat!(7)), rat!(7));
339
340 assert_eq!(rational_gcd(rat!(1, 4), rat!(1, 2)), rat!(1, 4));
341 assert_eq!(rational_lcm(rat!(1, 4), rat!(1, 2)), rat!(1, 2));
342 assert_eq!(rational_gcd(rat!(2, 3), rat!(1, 8)), rat!(1, 24));
343 assert_eq!(rational_lcm(rat!(2, 3), rat!(1, 8)), rat!(2));
344 }
345
346 fn to_ir(spec: &str) -> RtLolaMir {
347 let cfg = ParserConfig::for_string(String::from(spec));
348 crate::parse(&cfg).expect("spec was invalid")
349 }
350
351 fn divide_durations(lhs: Duration, rhs: Duration, round_up: bool) -> usize {
354 let lhs = lhs.as_nanos();
359 let rhs = rhs.as_nanos();
360 let representable = lhs % rhs == 0;
361 let mut div = lhs / rhs;
362 if !representable {
363 println!("Warning: Spec unstable: Cannot accurately represent extend periods.");
364 if round_up {
366 div += 1;
367 }
368 }
369 div as usize
370 }
371
372 #[test]
373 #[ignore] fn test_extension_rate_extraction() {
375 let input = "input a: UInt64\n";
376 let hz50 = "output b: UInt64 @50Hz := 1\n";
377 let hz40 = "output c: UInt64 @40Hz := 2\n";
378 let ms20 = "output d: UInt64 @20ms := 3\n"; let ms1 = "output e: UInt64 @1ms := 4\n"; let case1 = (format!("{}{}", input, hz50), 20_000_000);
382 let case2 = (format!("{}{}", input, hz40), 25_000_000);
383 let case3 = (format!("{}{}{}", input, hz50, hz40), 5_000_000);
384 let case4 = (format!("{}{}{}", input, hz50, ms1), 1_000_000);
385 let case5 = (format!("{}{}{}{}", input, hz50, ms20, ms1), 1_000_000);
386
387 let cases = [case1, case2, case3, case4, case5];
388 for (spec, expected) in cases.iter() {
389 let periods: Vec<_> = to_ir(spec).time_driven.iter().map(|s| s.period()).collect();
390 let was = Schedule::find_extend_period(&periods);
391 let was = was.get::<nanosecond>().to_integer().to_u64().expect("");
392 assert_eq!(*expected, was);
393 }
394 }
395
396 #[test]
397 fn test_divide_durations_round_down() {
398 type TestDurations = ((u64, u32), (u64, u32), usize);
399 let case1: TestDurations = ((1, 0), (1, 0), 1);
400 let case2: TestDurations = ((1, 0), (0, 100_000_000), 10);
401 let case3: TestDurations = ((1, 0), (0, 100_000), 10_000);
402 let case4: TestDurations = ((1, 0), (0, 20_000), 50_000);
403 let case5: TestDurations = ((0, 40_000), (0, 30_000), 1);
404 let case6: TestDurations = ((3, 1_000), (3, 5_000), 0);
405
406 let cases = [case1, case2, case3, case4, case5, case6];
407 for (a, b, expected) in &cases {
408 let to_dur = |(s, n)| Duration::new(s, n);
409 let was = divide_durations(to_dur(*a), to_dur(*b), false);
410 assert_eq!(was, *expected, "Expected {}, but was {}.", expected, was);
411 }
412 }
413
414 #[test]
415 fn test_divide_durations_round_up() {
416 type TestDurations = ((u64, u32), (u64, u32), usize);
417 let case1: TestDurations = ((1, 0), (1, 0), 1);
418 let case2: TestDurations = ((1, 0), (0, 100_000_000), 10);
419 let case3: TestDurations = ((1, 0), (0, 100_000), 10_000);
420 let case4: TestDurations = ((1, 0), (0, 20_000), 50_000);
421 let case5: TestDurations = ((0, 40_000), (0, 30_000), 2);
422 let case6: TestDurations = ((3, 1_000), (3, 5_000), 1);
423
424 let cases = [case1, case2, case3, case4, case5, case6];
425 for (a, b, expected) in &cases {
426 let to_dur = |(s, n)| Duration::new(s, n);
427 let was = divide_durations(to_dur(*a), to_dur(*b), true);
428 assert_eq!(was, *expected, "Expected {}, but was {}.", expected, was);
429 }
430 }
431
432 #[test]
433 fn test_spawn_close_scheduled() {
434 let ir = to_ir(
435 "input a:UInt64\n\
436 output x @1Hz := a.hold(or: 42)\n\
437 output y close when x = 42 eval with a\n\
438 output z spawn @0.5Hz when a.hold(or: 42) = 1337 eval with a - 15
439 ",
440 );
441 let mut schedule = ir.compute_schedule().expect("failed to compute schedule");
442 assert_eq_with_sort!(schedule.deadlines[0].due, vec![Evaluate(0), Close(1)]);
443 assert_eq_with_sort!(
444 schedule.deadlines[1].due,
445 vec![Evaluate(0), Spawn(2), Close(1)]
446 );
447 }
448}