Skip to main content

taktora_executor/
grid.rs

1//! Absolute-grid cyclic dispatch: the scheduling time source (`CyclicClock`),
2//! the dispatch-mode toggle (`DispatchMode`), and the pure `GridTimer` state
3//! machine for `REQ_0268` / `ADR_0100`.
4//!
5//! This module is deliberately free of iceoryx2 and of the telemetry
6//! `MonotonicClock`: scheduling time is a *distinct* role from telemetry
7//! measurement, so a test telemetry clock can never alter dispatch timing.
8
9use std::time::{Duration, Instant};
10
11/// Monotonic nanosecond time source used for **scheduling** cyclic dispatch.
12///
13/// Distinct from [`crate::MonotonicClock`] (telemetry) by design: the type
14/// distinction guarantees a telemetry mock can never be wired as the scheduler.
15/// A future fieldbus distributed-clock source is just another implementation.
16pub trait CyclicClock: Send + Sync + 'static {
17    /// Nanoseconds since this clock's epoch. Monotonic non-decreasing.
18    fn now_nanos(&self) -> u64;
19}
20
21/// Production scheduling clock over `CLOCK_MONOTONIC` (via `Instant`).
22#[derive(Debug)]
23pub struct MonotonicCyclicClock {
24    epoch: Instant,
25}
26
27impl MonotonicCyclicClock {
28    /// Construct a clock whose epoch is the current instant.
29    #[must_use]
30    pub fn new() -> Self {
31        Self {
32            epoch: Instant::now(),
33        }
34    }
35}
36
37impl Default for MonotonicCyclicClock {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl CyclicClock for MonotonicCyclicClock {
44    fn now_nanos(&self) -> u64 {
45        u64::try_from(self.epoch.elapsed().as_nanos()).unwrap_or(u64::MAX)
46    }
47}
48
49/// Cyclic dispatch timing strategy.
50///
51/// `Grid` is the absolute-grid timer of `REQ_0268`; `Legacy` is the pre-fix
52/// `attach_interval` path. The [`Default`] is **platform-conditional**: `Grid`
53/// on Linux (the production absolute-grid `timerfd` path), `Legacy` on non-Linux
54/// dev hosts. On non-Linux `Grid` is only a self-computed-`epoll`-timeout
55/// fallback — not the real-time target — and its millisecond-rounding jitter
56/// makes tight timing tests flaky on loaded CI, so the stable `attach_interval`
57/// path is the better default there. The Linux production behaviour is unchanged.
58#[derive(Clone, Copy, Debug, PartialEq, Eq)]
59pub enum DispatchMode {
60    /// Self-computed absolute grid; the production default on Linux.
61    Grid,
62    /// iceoryx2 `attach_interval` relative timer; the default on non-Linux dev
63    /// hosts (and the opt-in legacy path on Linux).
64    Legacy,
65}
66
67impl Default for DispatchMode {
68    fn default() -> Self {
69        if cfg!(target_os = "linux") {
70            Self::Grid
71        } else {
72            Self::Legacy
73        }
74    }
75}
76
77/// Pure absolute-grid timer. Holds one nominal target per cyclic task; advances
78/// each target by exactly one period per dispatch so lateness never compounds.
79///
80/// No clock and no I/O: callers pass `now` (read from a [`CyclicClock`]) in, so
81/// the whole state machine is deterministic and unit-testable.
82//
83// `redundant_pub_crate` is allowed: `GridTimer` lives in this private module
84// and is driven from `dispatch_loop`, so a `pub(crate)` type here reads as
85// redundant under clippy. Every field is read — `epoch` by `take_due`
86// (skip-realign), `period_ns`/`next` by the dispatch loop.
87#[allow(clippy::redundant_pub_crate)]
88#[derive(Debug)]
89pub(crate) struct GridTimer {
90    /// Scheduling epoch (ns), sampled once at dispatch-loop entry.
91    epoch: u64,
92    /// Per cyclic task period (ns); index-aligned with `next`. All cadences
93    /// share `epoch`, so every period phase-aligns at the epoch (harmonic grid).
94    period_ns: Vec<u64>,
95    /// Per cyclic task next absolute grid target (ns); `epoch + slot·period`.
96    next: Vec<u64>,
97}
98
99impl GridTimer {
100    /// `epoch` = scheduling `now_nanos()` at dispatch entry; one `period` per
101    /// cyclic task. First target for task *k* is `epoch + period_k`.
102    pub(crate) fn new(epoch: u64, periods: Vec<u64>) -> Self {
103        let next = periods.iter().map(|p| epoch.saturating_add(*p)).collect();
104        Self {
105            epoch,
106            period_ns: periods,
107            next,
108        }
109    }
110
111    /// Time to sleep until the earliest pending grid target (zero if already
112    /// due — a zero `epoll` timeout polls and catches up).
113    //
114    // Used only on the non-Linux self-computed-timeout path: on Linux the master
115    // timerfd owns the wake (the wait blocks with `Duration::MAX`), so this is
116    // dead there (REQ_0268 / ADR_0100).
117    #[cfg_attr(target_os = "linux", allow(dead_code))]
118    pub(crate) fn next_timeout(&self, now: u64) -> Duration {
119        // No cyclic targets → no grid-driven wakeup. Return `Duration::MAX`
120        // exactly (not `u64::MAX` nanos): the WaitSet treats `Duration::MAX`
121        // as "block indefinitely on fds" and dispatches a near-MAX `timed_wait`
122        // that overflows to `WaitSetRunError::InternalError`. This keeps an
123        // event-only executor blocking on its fds identically to Legacy.
124        let Some(earliest) = self.next.iter().copied().min() else {
125            return Duration::MAX;
126        };
127        Duration::from_nanos(earliest.saturating_sub(now))
128    }
129
130    /// Current nominal target for task `i` (test/inspection helper).
131    #[cfg(test)]
132    pub(crate) fn next_target(&self, i: usize) -> u64 {
133        self.next[i]
134    }
135
136    /// Collect cyclic tasks due at `now` into `due` (cleared first). A due task
137    /// is dispatched exactly once; its target then advances by one period in the
138    /// normal case, or — if the wake was late by ≥1 whole slot — snaps closed-form
139    /// to the next *future* grid point (skip-realign, `ADR_0100`). Never replays a
140    /// burst of stale cycles, which is wrong for cyclic control.
141    pub(crate) fn take_due(&mut self, now: u64, due: &mut Vec<usize>) {
142        due.clear();
143        for (i, next) in self.next.iter_mut().enumerate() {
144            if now >= *next {
145                due.push(i);
146                let period = self.period_ns[i];
147                if period == 0 {
148                    continue;
149                }
150                let stepped = next.saturating_add(period);
151                *next = if stepped > now {
152                    // Normal case: one period ahead is already in the future.
153                    stepped
154                } else {
155                    // Missed >= 1 whole slot: closed-form snap to the next
156                    // future grid point. Dispatch once (above); never burst.
157                    let slots_passed = now.saturating_sub(self.epoch) / period;
158                    self.epoch
159                        .saturating_add(slots_passed.saturating_add(1).saturating_mul(period))
160                };
161            }
162        }
163    }
164}
165
166/// The base tick for a PLC-style master timer: the GCD of all declared cyclic
167/// periods (ns), so every task's period is an integer number of base ticks and
168/// the single timer hits every task's grid point. Returns 0 when there are no
169/// cyclic tasks (caller arms no timer). Zero-valued periods are ignored (they
170/// are rejected at registration, `REQ_0268`).
171// `redundant_pub_crate`: this module is private, so `pub(crate)` looks redundant,
172// but the symbol is consumed by `executor::dispatch_loop` (the master timer).
173// `dead_code`: only the Linux master-timer path calls `base_period`; on non-Linux
174// the `GridTimer` drives dispatch via `next_timeout`, so it is genuinely unused.
175#[allow(clippy::redundant_pub_crate)]
176#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
177pub(crate) fn base_period(periods: &[u64]) -> u64 {
178    periods.iter().copied().filter(|p| *p != 0).fold(0, gcd)
179}
180
181// Called only from `base_period`, so it shares the same non-Linux dead-code fate.
182#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
183const fn gcd(a: u64, b: u64) -> u64 {
184    if b == 0 { a } else { gcd(b, a % b) }
185}
186
187#[cfg(test)]
188mod tests {
189    use super::*;
190
191    #[test]
192    fn monotonic_cyclic_clock_is_non_decreasing() {
193        let c = MonotonicCyclicClock::new();
194        let a = c.now_nanos();
195        let b = c.now_nanos();
196        assert!(
197            b >= a,
198            "CLOCK_MONOTONIC must not go backwards: {a} then {b}"
199        );
200    }
201
202    #[test]
203    fn dispatch_mode_default_is_grid_on_linux_legacy_elsewhere() {
204        // Production default: the absolute-grid timerfd path on Linux; the
205        // stable attach_interval fallback on non-Linux dev hosts (REQ_0268).
206        #[cfg(target_os = "linux")]
207        assert_eq!(DispatchMode::default(), DispatchMode::Grid);
208        #[cfg(not(target_os = "linux"))]
209        assert_eq!(DispatchMode::default(), DispatchMode::Legacy);
210    }
211
212    #[test]
213    fn single_period_advances_on_absolute_grid_with_zero_drift() {
214        // period 1000ns, epoch 0. Wake late by a varying jitter each cycle and
215        // confirm the *nominal target* never absorbs that jitter (no drift).
216        let mut t = GridTimer::new(0, vec![1000]);
217        let mut due = Vec::new();
218
219        // Cycle 1: woke at 1005 (5ns late). Due once; next target -> 2000, not 2005.
220        t.take_due(1005, &mut due);
221        assert_eq!(due, vec![0]);
222        assert_eq!(t.next_target(0), 2000);
223
224        // Cycle 2: woke at 2012 (12ns late). Due once; next target -> 3000.
225        t.take_due(2012, &mut due);
226        assert_eq!(due, vec![0]);
227        assert_eq!(t.next_target(0), 3000);
228
229        // Not yet due at 2999.
230        t.take_due(2999, &mut due);
231        assert_eq!(due, Vec::<usize>::new());
232        assert_eq!(t.next_target(0), 3000);
233    }
234
235    #[test]
236    fn stall_skips_whole_slots_and_dispatches_once() {
237        // period 1000, epoch 0. We were starved until 3500 (slots 1,2,3 missed).
238        let mut t = GridTimer::new(0, vec![1000]);
239        let mut due = Vec::new();
240
241        t.take_due(3500, &mut due);
242        // Dispatched exactly once — no burst replay of the 3 missed cycles.
243        assert_eq!(due, vec![0]);
244        // Re-aligned to the next *future* slot: floor(3500/1000)+1 = 4 -> 4000.
245        assert_eq!(t.next_target(0), 4000);
246        assert!(
247            t.next_target(0) > 3500,
248            "target must be strictly in the future"
249        );
250    }
251
252    #[test]
253    fn stall_realign_is_exact_on_a_slot_boundary() {
254        let mut t = GridTimer::new(0, vec![1000]);
255        let mut due = Vec::new();
256        // Exactly on slot 3's boundary.
257        t.take_due(3000, &mut due);
258        assert_eq!(due, vec![0]);
259        assert_eq!(t.next_target(0), 4000);
260    }
261
262    #[test]
263    // ns form is deliberate: the grid is a nanosecond domain, so timeouts read
264    // clearest in the same unit as the period under test.
265    #[allow(clippy::duration_suboptimal_units)]
266    fn next_timeout_is_distance_to_earliest_target() {
267        let t = GridTimer::new(0, vec![1000]);
268        assert_eq!(t.next_timeout(0), Duration::from_nanos(1000));
269        assert_eq!(t.next_timeout(250), Duration::from_nanos(750));
270        // Already past the target -> zero (catch up immediately).
271        assert_eq!(t.next_timeout(1500), Duration::from_nanos(0));
272    }
273
274    #[test]
275    fn empty_grid_next_timeout_is_duration_max() {
276        // No cyclic tasks: the timer must yield `Duration::MAX` exactly so the
277        // WaitSet blocks on fds (event-only executor) instead of issuing a
278        // near-MAX timed wait that overflows to an InternalError.
279        let t = GridTimer::new(0, vec![]);
280        assert_eq!(t.next_timeout(0), Duration::MAX);
281        assert_eq!(t.next_timeout(12_345), Duration::MAX);
282    }
283
284    #[test]
285    fn base_period_is_gcd_of_declared_periods() {
286        assert_eq!(base_period(&[1_000_000]), 1_000_000); // single task → its period
287        assert_eq!(base_period(&[2_000_000, 4_000_000]), 2_000_000); // harmonic → smaller
288        assert_eq!(base_period(&[2_000_000, 3_000_000]), 1_000_000); // coprime → gcd
289        assert_eq!(base_period(&[1_000_000, 1_000_000]), 1_000_000); // duplicates
290        assert_eq!(base_period(&[0, 2_000_000]), 2_000_000); // zero entry ignored
291        assert_eq!(base_period(&[]), 0); // no cyclic tasks
292    }
293
294    #[test]
295    // ns form is deliberate: targets and periods read clearest in the same unit.
296    #[allow(clippy::duration_suboptimal_units)]
297    fn multi_period_picks_earliest_and_coalesces_coincident_slots() {
298        // Two cadences sharing one epoch: 1000ns and 2000ns (harmonic grid).
299        let mut t = GridTimer::new(0, vec![1000, 2000]);
300        let mut due = Vec::new();
301
302        // Earliest target is task0 at 1000.
303        assert_eq!(t.next_timeout(0), Duration::from_nanos(1000));
304
305        // At 1000: only the 1ms task is due.
306        t.take_due(1000, &mut due);
307        assert_eq!(due, vec![0]);
308
309        // Next earliest: both targets now at 2000.
310        assert_eq!(t.next_timeout(1000), Duration::from_nanos(1000));
311
312        // At 2000: both cadences coincide -> both due in one wake.
313        t.take_due(2000, &mut due);
314        assert_eq!(due, vec![0, 1]);
315        assert_eq!(t.next_target(0), 3000);
316        assert_eq!(t.next_target(1), 4000);
317    }
318}