Skip to main content

taktora_executor/
grid.rs

1//! Absolute-grid cyclic dispatch: the scheduling time source (`CyclicClock`),
2//! the dispatch-mode toggle (`DispatchMode`), and the pure `GridTimer` state
3//! machine for `REQ_0268` / `ADR_0100`.
4//!
5//! This module is deliberately free of iceoryx2 and of the telemetry
6//! `MonotonicClock`: scheduling time is a *distinct* role from telemetry
7//! measurement, so a test telemetry clock can never alter dispatch timing.
8
9use std::time::{Duration, Instant};
10
/// Monotonic nanosecond time source used for **scheduling** cyclic dispatch.
///
/// Distinct from [`crate::MonotonicClock`] (telemetry) by design: the type
/// distinction guarantees a telemetry mock can never be wired as the scheduler.
/// A future fieldbus distributed-clock source is just another implementation.
///
/// Implementors must be `Send + Sync + 'static`, as required by the supertrait
/// bounds on this trait.
pub trait CyclicClock: Send + Sync + 'static {
    /// Nanoseconds since this clock's epoch. Monotonic non-decreasing.
    ///
    /// Each implementation picks its own epoch, so a reading is only meaningful
    /// relative to other readings taken from the same clock.
    fn now_nanos(&self) -> u64;
}
20
21/// Production scheduling clock over `CLOCK_MONOTONIC` (via `Instant`).
22#[derive(Debug)]
23pub struct MonotonicCyclicClock {
24    epoch: Instant,
25}
26
27impl MonotonicCyclicClock {
28    /// Construct a clock whose epoch is the current instant.
29    #[must_use]
30    pub fn new() -> Self {
31        Self {
32            epoch: Instant::now(),
33        }
34    }
35}
36
37impl Default for MonotonicCyclicClock {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl CyclicClock for MonotonicCyclicClock {
44    fn now_nanos(&self) -> u64 {
45        u64::try_from(self.epoch.elapsed().as_nanos()).unwrap_or(u64::MAX)
46    }
47}
48
/// Strategy used to time cyclic dispatch.
///
/// `Grid` is the absolute-grid timer of `REQ_0268`; `Legacy` is the pre-fix
/// `attach_interval` path. The [`Default`] is **platform-conditional**:
///
/// * Linux → `Grid`, the production absolute-grid `timerfd` path.
/// * Anything else → `Legacy`. On non-Linux dev hosts `Grid` is only a
///   self-computed-`epoll`-timeout fallback — not the real-time target — and
///   its millisecond-rounding jitter makes tight timing tests flaky on loaded
///   CI, so the stable `attach_interval` path is the better default there.
///
/// The Linux production behaviour is unchanged.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DispatchMode {
    /// Self-computed absolute grid; the production default on Linux.
    Grid,
    /// iceoryx2 `attach_interval` relative timer; the default on non-Linux dev
    /// hosts (and the opt-in legacy path on Linux).
    Legacy,
}

impl Default for DispatchMode {
    /// `Grid` when compiled for Linux, `Legacy` for every other target.
    fn default() -> Self {
        #[cfg(target_os = "linux")]
        {
            Self::Grid
        }
        #[cfg(not(target_os = "linux"))]
        {
            Self::Legacy
        }
    }
}
76
/// Deterministic absolute-grid timer: one nominal target per cyclic task, each
/// moved forward by whole periods only, so wake-up lateness never compounds
/// into the schedule.
///
/// The timer owns no clock and performs no I/O. Callers sample `now` from a
/// [`CyclicClock`] and pass it in, which keeps the whole state machine
/// deterministic and unit-testable.
//
// `redundant_pub_crate` is allowed: `GridTimer` lives in this private module
// and is driven from `dispatch_loop`, so a `pub(crate)` type here reads as
// redundant under clippy. Every field is read — `epoch` by `take_due`
// (skip-realign), `period_ns`/`next` by the dispatch loop.
#[allow(clippy::redundant_pub_crate)]
#[derive(Debug)]
pub(crate) struct GridTimer {
    /// Scheduling epoch (ns), sampled once at dispatch-loop entry.
    epoch: u64,
    /// Period (ns) of each cyclic task; index-aligned with `next`. Every
    /// cadence shares `epoch`, so all periods phase-align there (harmonic grid).
    period_ns: Vec<u64>,
    /// Next absolute grid target (ns) of each cyclic task: `epoch + slot·period`.
    next: Vec<u64>,
    /// Skipped-slot carry per task (`REQ_0840`): slots that the realign of the
    /// *previous* dispatch passed over unserved. Reported on the task's next
    /// dispatch (backward-looking), `0` in steady state, consumed exactly once.
    carry: Vec<u64>,
    /// Whether each task has been dispatched at least once. A realign on the
    /// very first dispatch sets no carry: the task's lateness grid anchors at
    /// that dispatch, so earlier slots do not exist on its own grid (`REQ_0840`).
    served: Vec<bool>,
}

impl GridTimer {
    /// Build a timer anchored at `epoch` (the scheduling `now_nanos()` taken at
    /// dispatch entry) with one entry in `periods` per cyclic task. Task *k*
    /// gets its first target at `epoch + period_k` (saturating).
    pub(crate) fn new(epoch: u64, periods: Vec<u64>) -> Self {
        let task_count = periods.len();
        let first_targets: Vec<u64> = periods
            .iter()
            .map(|&period| epoch.saturating_add(period))
            .collect();
        Self {
            epoch,
            period_ns: periods,
            next: first_targets,
            carry: vec![0; task_count],
            served: vec![false; task_count],
        }
    }

    /// How long to sleep before the earliest pending grid target. Zero when a
    /// target is already due — a zero `epoll` timeout polls and catches up.
    //
    // Used only on the non-Linux self-computed-timeout path: on Linux the master
    // timerfd owns the wake (the wait blocks with `Duration::MAX`), so this is
    // dead there (REQ_0268 / ADR_0100).
    #[cfg_attr(target_os = "linux", allow(dead_code))]
    pub(crate) fn next_timeout(&self, now: u64) -> Duration {
        // No cyclic targets → no grid-driven wakeup. The fallback must be
        // `Duration::MAX` exactly (not `u64::MAX` nanos): the WaitSet treats
        // `Duration::MAX` as "block indefinitely on fds", whereas a near-MAX
        // value is issued as a `timed_wait` that overflows to
        // `WaitSetRunError::InternalError`. This keeps an event-only executor
        // blocking on its fds identically to Legacy.
        self.next.iter().min().map_or(Duration::MAX, |&earliest| {
            Duration::from_nanos(earliest.saturating_sub(now))
        })
    }

    /// Current nominal target for task `i` (test/inspection helper).
    ///
    /// # Panics
    /// Panics if `i` is not a valid task index.
    #[cfg(test)]
    pub(crate) fn next_target(&self, i: usize) -> u64 {
        self.next[i]
    }

    /// Gather every cyclic task that is due at `now` into `due`, which is
    /// cleared first.
    ///
    /// Each entry is `(task index, carry, late_by)`:
    ///
    /// * `carry` — the skipped-slot count left behind by the task's previous
    ///   realign, consumed by this call (`REQ_0840`).
    /// * `late_by` — how far `now` lies past the nominal slot this dispatch
    ///   serves (`REQ_0106`). The record path back-dates the task's
    ///   lateness-grid epoch by the FIRST dispatch's `late_by`, anchoring the
    ///   grid at the nominal slot. Both values are same-clock differences, so
    ///   they cross the scheduling/telemetry clock-domain boundary safely.
    ///
    /// A due task is dispatched exactly once. Its target then moves one period
    /// ahead in the normal case. If the wake was late by ≥1 whole slot, the
    /// target instead snaps closed-form to the next *future* grid point
    /// (skip-realign, `ADR_0100`) and the abandoned slot count is stored as
    /// carry for the task's NEXT dispatch. A burst of stale cycles is never
    /// replayed, which would be wrong for cyclic control.
    pub(crate) fn take_due(&mut self, now: u64, due: &mut Vec<(usize, u64, u64)>) {
        due.clear();
        let epoch = self.epoch;
        // Walk the four per-task columns in lock-step; `new` builds them with
        // identical lengths, so zipping visits every task.
        let tasks = self
            .next
            .iter_mut()
            .zip(&self.period_ns)
            .zip(&mut self.carry)
            .zip(&mut self.served)
            .enumerate();
        for (task, (((target, &period), carry), served)) in tasks {
            if now < *target {
                continue;
            }
            // Hand over (and reset) whatever the previous realign left behind.
            let reported_skips = std::mem::take(carry);
            if period == 0 {
                // Unreachable post-registration (rejected per REQ_0268);
                // continue skips served/carry bookkeeping intentionally.
                due.push((task, reported_skips, 0));
                continue;
            }
            let stepped = target.saturating_add(period);
            if stepped > now {
                // Normal case: one period ahead is already in the future.
                *target = stepped;
            } else {
                // Missed >= 1 whole slot: closed-form snap to the next future
                // grid point. The task is still dispatched once; never burst.
                let slots_passed = now.saturating_sub(epoch) / period;
                let snapped =
                    epoch.saturating_add(slots_passed.saturating_add(1).saturating_mul(period));
                if *served {
                    // Abandoned slots strictly after the one served by this
                    // dispatch, up to (exclusive) the realigned target (REQ_0840).
                    *carry = snapped.saturating_sub(stepped) / period;
                }
                *target = snapped;
            }
            // In both branches the slot this dispatch SERVES is one period
            // before the updated target, so `late_by` shares one closed form.
            let late_by = now.saturating_sub(target.saturating_sub(period));
            due.push((task, reported_skips, late_by));
            *served = true;
        }
    }
}
200
201/// The base tick for a PLC-style master timer: the GCD of all declared cyclic
202/// periods (ns), so every task's period is an integer number of base ticks and
203/// the single timer hits every task's grid point. Returns 0 when there are no
204/// cyclic tasks (caller arms no timer). Zero-valued periods are ignored (they
205/// are rejected at registration, `REQ_0268`).
206// `redundant_pub_crate`: this module is private, so `pub(crate)` looks redundant,
207// but the symbol is consumed by `executor::dispatch_loop` (the master timer).
208// `dead_code`: only the Linux master-timer path calls `base_period`; on non-Linux
209// the `GridTimer` drives dispatch via `next_timeout`, so it is genuinely unused.
210#[allow(clippy::redundant_pub_crate)]
211#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
212pub(crate) fn base_period(periods: &[u64]) -> u64 {
213    periods.iter().copied().filter(|p| *p != 0).fold(0, gcd)
214}
215
// Greatest common divisor by Euclid's algorithm; `gcd(x, 0)` and `gcd(0, x)`
// are both `x`. Called only from `base_period`, so it shares the same
// non-Linux dead-code fate.
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
const fn gcd(a: u64, b: u64) -> u64 {
    let mut divisor = a;
    let mut remainder = b;
    while remainder != 0 {
        let rest = divisor % remainder;
        divisor = remainder;
        remainder = rest;
    }
    divisor
}
221
// Unit tests for the scheduling clock, the dispatch-mode default, the
// `GridTimer` state machine and `base_period`. All `GridTimer` tests feed
// hand-picked `now` values, so they never read a real clock.
#[cfg(test)]
mod tests {
    use super::*;

    /// Two consecutive readings of the production clock never go backwards.
    #[test]
    fn monotonic_cyclic_clock_is_non_decreasing() {
        let c = MonotonicCyclicClock::new();
        let a = c.now_nanos();
        let b = c.now_nanos();
        assert!(
            b >= a,
            "CLOCK_MONOTONIC must not go backwards: {a} then {b}"
        );
    }

    #[test]
    fn dispatch_mode_default_is_grid_on_linux_legacy_elsewhere() {
        // Production default: the absolute-grid timerfd path on Linux; the
        // stable attach_interval fallback on non-Linux dev hosts (REQ_0268).
        #[cfg(target_os = "linux")]
        assert_eq!(DispatchMode::default(), DispatchMode::Grid);
        #[cfg(not(target_os = "linux"))]
        assert_eq!(DispatchMode::default(), DispatchMode::Legacy);
    }

    #[test]
    fn single_period_advances_on_absolute_grid_with_zero_drift() {
        // period 1000ns, epoch 0. Wake late by a varying jitter each cycle and
        // confirm the *nominal target* never absorbs that jitter (no drift).
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();

        // Cycle 1: woke at 1005 (5ns late). Due once; next target -> 2000, not 2005.
        t.take_due(1005, &mut due);
        assert_eq!(due, vec![(0, 0, 5)]);
        assert_eq!(t.next_target(0), 2000);

        // Cycle 2: woke at 2012 (12ns late). Due once; next target -> 3000.
        t.take_due(2012, &mut due);
        assert_eq!(due, vec![(0, 0, 12)]);
        assert_eq!(t.next_target(0), 3000);

        // Not yet due at 2999.
        t.take_due(2999, &mut due);
        assert_eq!(due, Vec::<(usize, u64, u64)>::new());
        assert_eq!(t.next_target(0), 3000);
    }

    #[test]
    fn stall_skips_whole_slots_and_dispatches_once() {
        // period 1000, epoch 0. We were starved until 3500 (slots 1,2,3 missed).
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();

        t.take_due(3500, &mut due);
        // Dispatched exactly once — no burst replay of the 3 missed cycles.
        assert_eq!(due, vec![(0, 0, 500)]);
        // Re-aligned to the next *future* slot: floor(3500/1000)+1 = 4 -> 4000.
        assert_eq!(t.next_target(0), 4000);
        assert!(
            t.next_target(0) > 3500,
            "target must be strictly in the future"
        );
    }

    /// A stalled wake that lands exactly on a grid point is reported as
    /// zero-late and realigns to the following slot.
    #[test]
    fn stall_realign_is_exact_on_a_slot_boundary() {
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();
        // Exactly on slot 3's boundary.
        t.take_due(3000, &mut due);
        assert_eq!(due, vec![(0, 0, 0)]);
        assert_eq!(t.next_target(0), 4000);
    }

    #[test]
    // ns form is deliberate: the grid is a nanosecond domain, so timeouts read
    // clearest in the same unit as the period under test.
    #[allow(clippy::duration_suboptimal_units)]
    fn next_timeout_is_distance_to_earliest_target() {
        let t = GridTimer::new(0, vec![1000]);
        assert_eq!(t.next_timeout(0), Duration::from_nanos(1000));
        assert_eq!(t.next_timeout(250), Duration::from_nanos(750));
        // Already past the target -> zero (catch up immediately).
        assert_eq!(t.next_timeout(1500), Duration::from_nanos(0));
    }

    #[test]
    fn empty_grid_next_timeout_is_duration_max() {
        // No cyclic tasks: the timer must yield `Duration::MAX` exactly so the
        // WaitSet blocks on fds (event-only executor) instead of issuing a
        // near-MAX timed wait that overflows to an InternalError.
        let t = GridTimer::new(0, vec![]);
        assert_eq!(t.next_timeout(0), Duration::MAX);
        assert_eq!(t.next_timeout(12_345), Duration::MAX);
    }

    /// `base_period` yields the GCD of the non-zero periods, and 0 for an
    /// empty slice.
    #[test]
    fn base_period_is_gcd_of_declared_periods() {
        assert_eq!(base_period(&[1_000_000]), 1_000_000); // single task → its period
        assert_eq!(base_period(&[2_000_000, 4_000_000]), 2_000_000); // harmonic → smaller
        assert_eq!(base_period(&[2_000_000, 3_000_000]), 1_000_000); // coprime → gcd
        assert_eq!(base_period(&[1_000_000, 1_000_000]), 1_000_000); // duplicates
        assert_eq!(base_period(&[0, 2_000_000]), 2_000_000); // zero entry ignored
        assert_eq!(base_period(&[]), 0); // no cyclic tasks
    }

    #[test]
    // ns form is deliberate: targets and periods read clearest in the same unit.
    #[allow(clippy::duration_suboptimal_units)]
    fn multi_period_picks_earliest_and_coalesces_coincident_slots() {
        // Two cadences sharing one epoch: 1000ns and 2000ns (harmonic grid).
        let mut t = GridTimer::new(0, vec![1000, 2000]);
        let mut due = Vec::new();

        // Earliest target is task0 at 1000.
        assert_eq!(t.next_timeout(0), Duration::from_nanos(1000));

        // At 1000: only the 1000ns task is due.
        t.take_due(1000, &mut due);
        assert_eq!(due, vec![(0, 0, 0)]);

        // Next earliest: both targets now at 2000.
        assert_eq!(t.next_timeout(1000), Duration::from_nanos(1000));

        // At 2000: both cadences coincide -> both due in one wake.
        t.take_due(2000, &mut due);
        assert_eq!(due, vec![(0, 0, 0), (1, 0, 0)]);
        assert_eq!(t.next_target(0), 3000);
        assert_eq!(t.next_target(1), 4000);
    }

    /// With no whole-slot miss, the carry element of every due entry stays 0;
    /// only `late_by` reflects the wake jitter.
    #[test]
    fn due_entries_carry_zero_skips_in_steady_state() {
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();
        t.take_due(1002, &mut due);
        assert_eq!(due, vec![(0, 0, 2)]);
        t.take_due(2005, &mut due);
        assert_eq!(due, vec![(0, 0, 5)]);
    }

    #[test]
    fn realign_carries_abandoned_slots_to_the_next_dispatch_exactly_once() {
        // period 1000, epoch 0. Dispatch on-grid once, then starve.
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();
        t.take_due(1002, &mut due); // serves slot 1; next -> 2000
        assert_eq!(due, vec![(0, 0, 2)]);

        // Starved until 3500: serves the overdue slot-2 target, realigns to 4000.
        // Slot 3 (t=3000) is abandoned: carry = (4000 - 3000) / 1000 = 1.
        // The starved dispatch itself reports 0 — backward-looking semantics:
        // nothing was passed over between slot 1 (previous) and slot 2 (this).
        t.take_due(3500, &mut due);
        assert_eq!(due, vec![(0, 0, 500)]);
        assert_eq!(t.next_target(0), 4000);

        // The NEXT dispatch consumes the carry (slot 2 -> slot 4 skipped slot 3).
        t.take_due(4000, &mut due);
        assert_eq!(due, vec![(0, 1, 0)]);

        // Consumed exactly once: back to 0 afterwards.
        t.take_due(5000, &mut due);
        assert_eq!(due, vec![(0, 0, 0)]);
    }

    #[test]
    fn realign_on_a_tasks_first_dispatch_sets_no_carry() {
        // Starved before the task ever dispatched: slots before the first
        // dispatch do not exist on the task's own lateness grid (REQ_0840).
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();
        t.take_due(3500, &mut due); // first dispatch, realigns 2000 -> 4000
        assert_eq!(due, vec![(0, 0, 500)]);
        assert_eq!(t.next_target(0), 4000);
        t.take_due(4000, &mut due); // no carry from the first-dispatch realign
        assert_eq!(due, vec![(0, 0, 0)]);
    }

    #[test]
    fn due_entries_report_late_by_vs_the_last_passed_grid_point() {
        // `late_by` anchors the task's lateness-grid epoch (REQ_0106): it is
        // the distance from the dispatch to the most recent grid point at or
        // before `now`. For a whole-slot miss the abandoned slots are NOT
        // part of it — they do not exist on the task's own grid (REQ_0840) —
        // so the anchor lands on the lattice point this dispatch realigns
        // from, keeping later slots' lateness offset-free.
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();
        // Late wake within the slot: late vs its own target.
        t.take_due(1700, &mut due);
        assert_eq!(due, vec![(0, 0, 700)]);
        // Whole-slot miss (next=2000 overdue, realign to 4000): late is
        // measured vs the last passed lattice point (3000), not the overdue
        // 2000 target.
        t.take_due(3400, &mut due);
        assert_eq!(due, vec![(0, 0, 400)]);
    }

    /// A starvation spanning several slots stores the whole abandoned count as
    /// carry, and the next on-grid dispatch reports it.
    #[test]
    fn multi_slot_starvation_carries_the_full_abandoned_count() {
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();
        t.take_due(1000, &mut due); // first dispatch on-grid; next -> 2000
        // Starved until 5500: serves slot-2 target, realign to 6000.
        // Abandoned: slots at 3000, 4000, 5000 -> carry = (6000 - 3000)/1000 = 3.
        t.take_due(5500, &mut due);
        assert_eq!(due, vec![(0, 0, 500)]);
        assert_eq!(t.next_target(0), 6000);
        t.take_due(6000, &mut due);
        assert_eq!(due, vec![(0, 3, 0)]);
    }

    #[test]
    fn back_to_back_realigns_hand_over_carry_without_loss_or_doubling() {
        // Pins the mem::take-then-reassign ordering across two consecutive
        // starvations: the first realign's carry must be consumed by the second
        // starvation's due entry (not stale/doubled), and the second realign's
        // fresh carry must be delivered on the very next on-grid wake.
        // epoch 0, period 1000.
        let mut t = GridTimer::new(0, vec![1000]);
        let mut due = Vec::new();

        // Dispatch on-grid: serves slot 1 (target 1000); next -> 2000.
        t.take_due(1000, &mut due);
        assert_eq!(due, vec![(0, 0, 0)]);

        // First starvation: now=4500 >= next=2000.
        //   stepped = 2000 + 1000 = 3000; 3000 <= 4500 -> realign.
        //   slots_passed = 4500/1000 = 4; snapped = (4+1)*1000 = 5000.
        //   carry = (5000 - 3000) / 1000 = 2.  Abandoned: slots 3000, 4000.
        //   Due entry reports 0 (backward-looking; nothing before slot-2 skipped).
        t.take_due(4500, &mut due);
        assert_eq!(due, vec![(0, 0, 500)]);
        assert_eq!(t.next_target(0), 5000);

        // Second starvation BEFORE on-grid wake: now=7500 >= next=5000.
        //   mem::take delivers the first realign's carry (2) in this due entry.
        //   stepped = 5000 + 1000 = 6000; 6000 <= 7500 -> realign.
        //   slots_passed = 7500/1000 = 7; snapped = (7+1)*1000 = 8000.
        //   carry = (8000 - 6000) / 1000 = 2.  (Second realign's fresh carry.)
        t.take_due(7500, &mut due);
        assert_eq!(due, vec![(0, 2, 500)]);
        assert_eq!(t.next_target(0), 8000);

        // On-grid wake: consumes the second realign's carry, then drains to 0.
        t.take_due(8000, &mut due);
        assert_eq!(due, vec![(0, 2, 0)]);
        t.take_due(9000, &mut due);
        assert_eq!(due, vec![(0, 0, 0)]);
    }
}