taktora_executor/grid.rs
1//! Absolute-grid cyclic dispatch: the scheduling time source (`CyclicClock`),
2//! the dispatch-mode toggle (`DispatchMode`), and the pure `GridTimer` state
3//! machine for `REQ_0268` / `ADR_0100`.
4//!
5//! This module is deliberately free of iceoryx2 and of the telemetry
6//! `MonotonicClock`: scheduling time is a *distinct* role from telemetry
7//! measurement, so a test telemetry clock can never alter dispatch timing.
8
9use std::time::{Duration, Instant};
10
/// Monotonic nanosecond time source that drives the **scheduling** of cyclic
/// dispatch.
///
/// This is intentionally a different type from [`crate::MonotonicClock`]
/// (the telemetry clock). Because the two roles are distinct types, a
/// telemetry mock can never be wired in as the scheduler by accident.
/// Alternative scheduling sources, such as a future fieldbus distributed
/// clock, are simply further implementations of this trait.
pub trait CyclicClock: Send + Sync + 'static {
    /// Nanoseconds elapsed since this clock's epoch; successive readings never
    /// decrease.
    fn now_nanos(&self) -> u64;
}
20
21/// Production scheduling clock over `CLOCK_MONOTONIC` (via `Instant`).
22#[derive(Debug)]
23pub struct MonotonicCyclicClock {
24 epoch: Instant,
25}
26
27impl MonotonicCyclicClock {
28 /// Construct a clock whose epoch is the current instant.
29 #[must_use]
30 pub fn new() -> Self {
31 Self {
32 epoch: Instant::now(),
33 }
34 }
35}
36
37impl Default for MonotonicCyclicClock {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl CyclicClock for MonotonicCyclicClock {
44 fn now_nanos(&self) -> u64 {
45 u64::try_from(self.epoch.elapsed().as_nanos()).unwrap_or(u64::MAX)
46 }
47}
48
/// Strategy used to time cyclic dispatch.
///
/// * [`DispatchMode::Grid`]: the absolute-grid timer of `REQ_0268`.
/// * [`DispatchMode::Legacy`]: the pre-fix `attach_interval` path.
///
/// The [`Default`] depends on the target platform.
///
/// * **Linux** gets `Grid`, the production absolute-grid `timerfd` path.
/// * **Non-Linux dev hosts** get `Legacy`. There, `Grid` can only fall back to
///   a self-computed `epoll` timeout, which is not the real-time target. Its
///   millisecond-rounding jitter also makes tight timing tests flaky on loaded
///   CI, so the stable `attach_interval` path is the better choice.
///
/// Linux production behaviour is unaffected by this split.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DispatchMode {
    /// Self-computed absolute grid; the production default on Linux.
    Grid,
    /// iceoryx2 `attach_interval` relative timer. It is the default on
    /// non-Linux dev hosts and the opt-in legacy path on Linux.
    Legacy,
}

impl Default for DispatchMode {
    /// `Grid` when compiled for Linux, `Legacy` on every other target.
    fn default() -> Self {
        if !cfg!(target_os = "linux") {
            return Self::Legacy;
        }
        Self::Grid
    }
}
76
/// Pure absolute-grid timer. It holds one nominal target per cyclic task and
/// advances each target by exactly one period per dispatch, so wake-up
/// lateness never compounds.
///
/// The timer owns no clock and performs no I/O. Callers read `now` from a
/// [`CyclicClock`] and pass it in, which keeps the whole state machine
/// deterministic and unit-testable.
//
// `redundant_pub_crate` is allowed for two reasons. This module is private,
// so clippy reports the `pub(crate)` type as redundant. The type is still
// driven from `dispatch_loop` in another module.
//
// All fields are private and are read only by this impl:
// - `epoch` by `take_due` (skip-realign);
// - `period_ns` and `next` by `next_timeout` and `take_due`;
// - `carry` and `served` by `take_due`.
#[allow(clippy::redundant_pub_crate)]
#[derive(Debug)]
pub(crate) struct GridTimer {
    /// Scheduling epoch (ns), sampled once at dispatch-loop entry.
    epoch: u64,
    /// Period (ns) of each cyclic task, index-aligned with `next`. Every
    /// cadence is counted from the shared `epoch`, so all periods
    /// phase-align there (harmonic grid).
    period_ns: Vec<u64>,
    /// Next absolute grid target (ns) of each cyclic task, of the form
    /// `epoch + slot * period`.
    next: Vec<u64>,
    /// Skipped-slot carry per task (`REQ_0840`).
    ///
    /// Counts the slots that the *previous* dispatch's realign passed over
    /// unserved. It is reported on the task's next dispatch, so it looks
    /// backward. It is `0` in steady state and is handed out exactly once.
    carry: Vec<u64>,
    /// Whether each task has been dispatched at least once.
    ///
    /// A realign on a task's very first dispatch records no carry. The task's
    /// lateness grid is anchored at that dispatch, so earlier slots are not
    /// part of its own grid (`REQ_0840`).
    served: Vec<bool>,
}

impl GridTimer {
    /// Build a timer anchored at `epoch` (the scheduling `now_nanos()` read
    /// at dispatch entry), with one `period` per cyclic task. Task *k* first
    /// falls due at `epoch + period_k`.
    pub(crate) fn new(epoch: u64, periods: Vec<u64>) -> Self {
        let next: Vec<u64> = periods
            .iter()
            .map(|&period| epoch.saturating_add(period))
            .collect();
        let task_count = next.len();
        Self {
            epoch,
            next,
            carry: vec![0; task_count],
            served: vec![false; task_count],
            period_ns: periods,
        }
    }

    /// Time left until the earliest pending grid target. Returns zero when a
    /// target is already due; a zero `epoll` timeout polls and catches up.
    //
    // Only the non-Linux self-computed-timeout path calls this. On Linux the
    // master timerfd owns the wake and the wait blocks with `Duration::MAX`,
    // so the method is dead code there (REQ_0268 / ADR_0100).
    #[cfg_attr(target_os = "linux", allow(dead_code))]
    pub(crate) fn next_timeout(&self, now: u64) -> Duration {
        // With no cyclic targets there is no grid-driven wakeup. Returning
        // exactly `Duration::MAX` (not `u64::MAX` nanoseconds) makes the
        // WaitSet block indefinitely on its fds. A near-MAX `timed_wait`
        // instead overflows into `WaitSetRunError::InternalError`. This keeps
        // an event-only executor behaving exactly like Legacy.
        self.next.iter().min().map_or(Duration::MAX, |&earliest| {
            Duration::from_nanos(earliest.saturating_sub(now))
        })
    }

    /// Current nominal target of task `i` (test/inspection helper).
    #[cfg(test)]
    pub(crate) fn next_target(&self, i: usize) -> u64 {
        self.next[i]
    }

    /// Fill `due` (cleared first) with every cyclic task due at `now`, as
    /// `(task_index, carry, late_by)` tuples.
    ///
    /// * `carry`: the skipped-slot count (`REQ_0840`) left by this task's
    ///   previous realign. It is handed out exactly once and is `0` in steady
    ///   state.
    /// * `late_by`: how far `now` lies past the nominal slot this dispatch
    ///   serves (`REQ_0106`). The record path back-dates the task's
    ///   lateness-grid epoch by the FIRST dispatch's `late_by`, which anchors
    ///   that grid on the nominal slot.
    ///
    /// Both values are differences taken on a single clock, so they cross the
    /// scheduling/telemetry clock-domain boundary safely.
    ///
    /// Each due task is dispatched exactly once per call. Normally its target
    /// then moves forward by one period. If the wake was late by at least one
    /// whole slot, the target instead snaps in closed form to the first grid
    /// point strictly after `now` (skip-realign, `ADR_0100`). If the task had
    /// already been dispatched before, the number of abandoned slots is stored
    /// as carry for its NEXT dispatch. Stale cycles are never replayed in a
    /// burst, since that is wrong for cyclic control.
    pub(crate) fn take_due(&mut self, now: u64, due: &mut Vec<(usize, u64, u64)>) {
        due.clear();
        let epoch = self.epoch;
        // The four per-task vectors are index-aligned: `new` builds them with
        // the same length and nothing ever resizes them.
        let tasks = self
            .next
            .iter_mut()
            .zip(&self.period_ns)
            .zip(self.carry.iter_mut())
            .zip(self.served.iter_mut())
            .enumerate();
        for (index, (((target, &period), pending_carry), served)) in tasks {
            if now < *target {
                continue;
            }
            let reported_carry = std::mem::take(pending_carry);
            if period == 0 {
                // Unreachable once registration has run (it rejects zero
                // periods per REQ_0268). The served/carry bookkeeping is
                // skipped on purpose.
                due.push((index, reported_carry, 0));
                continue;
            }
            let one_step = target.saturating_add(period);
            let new_target = if one_step > now {
                // Normal case: a single period ahead is already in the future.
                one_step
            } else {
                // At least one whole slot was missed. Snap in closed form to
                // the next future grid point and dispatch once; never burst.
                let whole_slots = now.saturating_sub(epoch) / period;
                let realigned =
                    epoch.saturating_add(whole_slots.saturating_add(1).saturating_mul(period));
                if *served {
                    // Count the abandoned slots strictly after the one this
                    // dispatch serves, up to (but excluding) the realigned
                    // target (REQ_0840).
                    *pending_carry = realigned.saturating_sub(one_step) / period;
                }
                realigned
            };
            *target = new_target;
            // In both branches, the slot served by this dispatch lies one
            // period before the new target.
            let served_slot = new_target.saturating_sub(period);
            due.push((index, reported_carry, now.saturating_sub(served_slot)));
            *served = true;
        }
    }
}
200
/// Base tick for a PLC-style master timer: the greatest common divisor of all
/// declared cyclic periods (ns).
///
/// Every task period is then a whole number of base ticks, so one timer
/// lands on every task's grid points. Returns `0` when there are no cyclic
/// tasks, in which case the caller arms no timer. Zero periods are skipped;
/// registration rejects them anyway (`REQ_0268`).
//
// `redundant_pub_crate`: this module is private, so clippy reports
// `pub(crate)` as redundant. The function is still consumed by
// `executor::dispatch_loop` to arm the master timer.
// `dead_code`: only the Linux master-timer path calls `base_period`. On
// non-Linux hosts, `GridTimer::next_timeout` drives dispatch instead, so this
// function is genuinely unused there.
#[allow(clippy::redundant_pub_crate)]
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
pub(crate) fn base_period(periods: &[u64]) -> u64 {
    let mut tick = 0;
    for &period in periods {
        if period != 0 {
            tick = gcd(tick, period);
        }
    }
    tick
}

// Iterative Euclid; gcd(x, 0) == x. Only `base_period` calls it, so it is
// likewise dead code on non-Linux targets.
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
const fn gcd(mut a: u64, mut b: u64) -> u64 {
    while b != 0 {
        let remainder = a % b;
        a = b;
        b = remainder;
    }
    a
}
221
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn monotonic_cyclic_clock_is_non_decreasing() {
        let c = MonotonicCyclicClock::new();
        let a = c.now_nanos();
        let b = c.now_nanos();
        assert!(
            b >= a,
            "CLOCK_MONOTONIC must not go backwards: {a} then {b}"
        );
    }

    #[test]
    fn dispatch_mode_default_is_grid_on_linux_legacy_elsewhere() {
        // Production default: the absolute-grid timerfd path on Linux; the
        // stable attach_interval fallback on non-Linux dev hosts (REQ_0268).
        #[cfg(target_os = "linux")]
        assert_eq!(DispatchMode::default(), DispatchMode::Grid);
        #[cfg(not(target_os = "linux"))]
        assert_eq!(DispatchMode::default(), DispatchMode::Legacy);
    }

    #[test]
    fn single_period_advances_on_absolute_grid_with_zero_drift() {
        // period 1000ns, epoch 0. Wake late by a varying jitter each cycle and
        // confirm the *nominal target* never absorbs that jitter (no drift).
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();

        // Cycle 1: woke at 1005 (5ns late). Due once; next target -> 2000, not 2005.
        t.take_due(1005, &mut due);
        assert_eq!(due, vec![(0, 0, 5)]);
        assert_eq!(t.next_target(0), 2000);

        // Cycle 2: woke at 2012 (12ns late). Due once; next target -> 3000.
        t.take_due(2012, &mut due);
        assert_eq!(due, vec![(0, 0, 12)]);
        assert_eq!(t.next_target(0), 3000);

        // Not yet due at 2999.
        t.take_due(2999, &mut due);
        assert_eq!(due, Vec::<(usize, u64, u64)>::new());
        assert_eq!(t.next_target(0), 3000);
    }

    #[test]
    fn stall_skips_whole_slots_and_dispatches_once() {
        // period 1000, epoch 0. We were starved until 3500 (slots 1,2,3 missed).
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();

        t.take_due(3500, &mut due);
        // Dispatched exactly once — no burst replay of the 3 missed cycles.
        assert_eq!(due, vec![(0, 0, 500)]);
        // Re-aligned to the next *future* slot: floor(3500/1000)+1 = 4 -> 4000.
        assert_eq!(t.next_target(0), 4000);
        assert!(
            t.next_target(0) > 3500,
            "target must be strictly in the future"
        );
    }

    #[test]
    fn stall_realign_is_exact_on_a_slot_boundary() {
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();
        // Exactly on slot 3's boundary.
        t.take_due(3000, &mut due);
        assert_eq!(due, vec![(0, 0, 0)]);
        assert_eq!(t.next_target(0), 4000);
    }

    #[test]
    // ns form is deliberate: the grid is a nanosecond domain, so timeouts read
    // clearest in the same unit as the period under test.
    #[allow(clippy::duration_suboptimal_units)]
    fn next_timeout_is_distance_to_earliest_target() {
        let t = GridTimer::new(0, vec![1000]);
        assert_eq!(t.next_timeout(0), Duration::from_nanos(1000));
        assert_eq!(t.next_timeout(250), Duration::from_nanos(750));
        // Already past the target -> zero (catch up immediately).
        assert_eq!(t.next_timeout(1500), Duration::from_nanos(0));
    }

    #[test]
    fn empty_grid_next_timeout_is_duration_max() {
        // No cyclic tasks: the timer must yield `Duration::MAX` exactly so the
        // WaitSet blocks on fds (event-only executor) instead of issuing a
        // near-MAX timed wait that overflows to an InternalError.
        let t = GridTimer::new(0, vec![]);
        assert_eq!(t.next_timeout(0), Duration::MAX);
        assert_eq!(t.next_timeout(12_345), Duration::MAX);
    }

    #[test]
    fn base_period_is_gcd_of_declared_periods() {
        assert_eq!(base_period(&[1_000_000]), 1_000_000); // single task → its period
        assert_eq!(base_period(&[2_000_000, 4_000_000]), 2_000_000); // harmonic → smaller
        assert_eq!(base_period(&[2_000_000, 3_000_000]), 1_000_000); // coprime → gcd
        assert_eq!(base_period(&[1_000_000, 1_000_000]), 1_000_000); // duplicates
        assert_eq!(base_period(&[0, 2_000_000]), 2_000_000); // zero entry ignored
        assert_eq!(base_period(&[]), 0); // no cyclic tasks
    }

    #[test]
    // ns form is deliberate: targets and periods read clearest in the same unit.
    #[allow(clippy::duration_suboptimal_units)]
    fn multi_period_picks_earliest_and_coalesces_coincident_slots() {
        // Two cadences sharing one epoch: 1000ns and 2000ns (harmonic grid).
        let mut t = GridTimer::new(0, vec![1000, 2000]);
        let mut due = Vec::new();

        // Earliest target is task0 at 1000.
        assert_eq!(t.next_timeout(0), Duration::from_nanos(1000));

        // At 1000: only the 1000ns task is due.
        t.take_due(1000, &mut due);
        assert_eq!(due, vec![(0, 0, 0)]);

        // Next earliest: both targets now at 2000.
        assert_eq!(t.next_timeout(1000), Duration::from_nanos(1000));

        // At 2000: both cadences coincide -> both due in one wake.
        t.take_due(2000, &mut due);
        assert_eq!(due, vec![(0, 0, 0), (1, 0, 0)]);
        assert_eq!(t.next_target(0), 3000);
        assert_eq!(t.next_target(1), 4000);
    }

    #[test]
    fn due_entries_carry_zero_skips_in_steady_state() {
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();
        t.take_due(1002, &mut due);
        assert_eq!(due, vec![(0, 0, 2)]);
        t.take_due(2005, &mut due);
        assert_eq!(due, vec![(0, 0, 5)]);
    }

    #[test]
    fn realign_carries_abandoned_slots_to_the_next_dispatch_exactly_once() {
        // period 1000, epoch 0. Dispatch on-grid once, then starve.
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();
        t.take_due(1002, &mut due); // serves slot 1; next -> 2000
        assert_eq!(due, vec![(0, 0, 2)]);

        // Starved until 3500: serves the overdue slot-2 target, realigns to 4000.
        // Slot 3 (t=3000) is abandoned: carry = (4000 - 3000) / 1000 = 1.
        // The starved dispatch itself reports carry 0 (backward-looking
        // semantics: nothing was passed over between slot 1, the previous
        // dispatch, and slot 2, this one). Its late_by (500) is measured from
        // the last passed lattice point (3000).
        t.take_due(3500, &mut due);
        assert_eq!(due, vec![(0, 0, 500)]);
        assert_eq!(t.next_target(0), 4000);

        // The NEXT dispatch consumes the carry (slot 2 -> slot 4 skipped slot 3).
        t.take_due(4000, &mut due);
        assert_eq!(due, vec![(0, 1, 0)]);

        // Consumed exactly once: back to 0 afterwards.
        t.take_due(5000, &mut due);
        assert_eq!(due, vec![(0, 0, 0)]);
    }

    #[test]
    fn realign_on_a_tasks_first_dispatch_sets_no_carry() {
        // Starved before the task ever dispatched: slots before the first
        // dispatch do not exist on the task's own lateness grid (REQ_0840).
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();
        t.take_due(3500, &mut due); // first dispatch, realigns 2000 -> 4000
        assert_eq!(due, vec![(0, 0, 500)]);
        assert_eq!(t.next_target(0), 4000);
        t.take_due(4000, &mut due); // no carry from the first-dispatch realign
        assert_eq!(due, vec![(0, 0, 0)]);
    }

    #[test]
    fn due_entries_report_late_by_vs_the_last_passed_grid_point() {
        // `late_by` anchors the task's lateness-grid epoch (REQ_0106). It is
        // the distance from the dispatch to the most recent grid point at or
        // before `now`. For a whole-slot miss, the abandoned slots are NOT
        // part of it, because they do not exist on the task's own grid
        // (REQ_0840). The anchor therefore lands on the lattice point this
        // dispatch realigns from, which keeps later slots' lateness
        // offset-free.
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();
        // Late wake within the slot: late vs its own target.
        t.take_due(1700, &mut due);
        assert_eq!(due, vec![(0, 0, 700)]);
        // Whole-slot miss (next=2000 overdue, realign to 4000): late is
        // measured vs the last passed lattice point (3000), not the overdue
        // 2000 target.
        t.take_due(3400, &mut due);
        assert_eq!(due, vec![(0, 0, 400)]);
    }

    #[test]
    fn multi_slot_starvation_carries_the_full_abandoned_count() {
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();
        t.take_due(1000, &mut due); // first dispatch on-grid; next -> 2000
        // Starved until 5500: serves slot-2 target, realign to 6000.
        // Abandoned: slots at 3000, 4000, 5000 -> carry = (6000 - 3000)/1000 = 3.
        t.take_due(5500, &mut due);
        assert_eq!(due, vec![(0, 0, 500)]);
        assert_eq!(t.next_target(0), 6000);
        t.take_due(6000, &mut due);
        assert_eq!(due, vec![(0, 3, 0)]);
    }

    #[test]
    fn back_to_back_realigns_hand_over_carry_without_loss_or_doubling() {
        // Pins the mem::take-then-reassign ordering across two consecutive
        // starvations: the first realign's carry must be consumed by the second
        // starvation's due entry (not stale/doubled), and the second realign's
        // fresh carry must be delivered on the very next on-grid wake.
        // epoch 0, period 1000.
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();

        // Dispatch on-grid: serves slot 1 (target 1000); next -> 2000.
        t.take_due(1000, &mut due);
        assert_eq!(due, vec![(0, 0, 0)]);

        // First starvation: now=4500 >= next=2000.
        // stepped = 2000 + 1000 = 3000; 3000 <= 4500 -> realign.
        // slots_passed = 4500/1000 = 4; snapped = (4+1)*1000 = 5000.
        // carry = (5000 - 3000) / 1000 = 2. Abandoned: slots 3000, 4000.
        // Due entry reports carry 0 (backward-looking; nothing before slot-2 skipped).
        t.take_due(4500, &mut due);
        assert_eq!(due, vec![(0, 0, 500)]);
        assert_eq!(t.next_target(0), 5000);

        // Second starvation BEFORE on-grid wake: now=7500 >= next=5000.
        // mem::take delivers the first realign's carry (2) in this due entry.
        // stepped = 5000 + 1000 = 6000; 6000 <= 7500 -> realign.
        // slots_passed = 7500/1000 = 7; snapped = (7+1)*1000 = 8000.
        // carry = (8000 - 6000) / 1000 = 2. (Second realign's fresh carry.)
        t.take_due(7500, &mut due);
        assert_eq!(due, vec![(0, 2, 500)]);
        assert_eq!(t.next_target(0), 8000);

        // On-grid wake: consumes the second realign's carry, then drains to 0.
        t.take_due(8000, &mut due);
        assert_eq!(due, vec![(0, 2, 0)]);
        t.take_due(9000, &mut due);
        assert_eq!(due, vec![(0, 0, 0)]);
    }

    #[test]
    fn pending_carry_survives_non_due_wakes_until_the_next_dispatch() {
        // The carry is consumed by the task's next *dispatch*, not by the next
        // `take_due` call: an early wake that dispatches nothing must leave it
        // untouched.
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();
        t.take_due(1000, &mut due); // serves slot 1; next -> 2000
        t.take_due(3500, &mut due); // realign to 4000; carry 1 (slot 3 abandoned)
        assert_eq!(due, vec![(0, 0, 500)]);

        // Early wake before the realigned target: nothing due.
        t.take_due(3900, &mut due);
        assert_eq!(due, Vec::<(usize, u64, u64)>::new());

        // The carry is still delivered on the real next dispatch.
        t.take_due(4000, &mut due);
        assert_eq!(due, vec![(0, 1, 0)]);
    }

    #[test]
    fn realign_is_per_task_in_a_multi_period_grid() {
        // Cadences 1000ns and 3000ns. A stall that costs the fast task a whole
        // slot is only a partial-slot delay for the slow one: each task must
        // realign (and carry) independently.
        let mut t = GridTimer::new(0, vec![1000, 3000]);
        let mut due = Vec::new();
        t.take_due(1000, &mut due);
        assert_eq!(due, vec![(0, 0, 0)]);

        // At 3500 task0 (next 2000) realigns to 4000 with carry 1, while
        // task1 (next 3000) is merely 500ns late and steps normally to 6000.
        t.take_due(3500, &mut due);
        assert_eq!(due, vec![(0, 0, 500), (1, 0, 500)]);
        assert_eq!(t.next_target(0), 4000);
        assert_eq!(t.next_target(1), 6000);

        // Only task0 owes a carry.
        t.take_due(4000, &mut due);
        assert_eq!(due, vec![(0, 1, 0)]);
    }

    #[test]
    // ns form is deliberate: targets and periods read clearest in the same unit.
    #[allow(clippy::duration_suboptimal_units)]
    fn realign_is_measured_from_a_nonzero_epoch() {
        // Every other test uses epoch 0, which hides whether the snap math is
        // epoch-relative. With epoch 5000 the grid is 6000, 7000, 8000, ...
        let mut t = GridTimer::new(5000, vec![1000]);
        let mut due = Vec::new();
        assert_eq!(t.next_target(0), 6000);
        assert_eq!(t.next_timeout(5000), Duration::from_nanos(1000));

        // Starved until 8500: slots_passed = (8500 - 5000) / 1000 = 3, so the
        // target snaps to 5000 + 4 * 1000 = 9000 and late_by is measured from
        // the lattice point 8000.
        t.take_due(8500, &mut due);
        assert_eq!(due, vec![(0, 0, 500)]);
        assert_eq!(t.next_target(0), 9000);

        t.take_due(9000, &mut due);
        assert_eq!(due, vec![(0, 0, 0)]);
        assert_eq!(t.next_target(0), 10_000);
    }
}