taktora_executor/grid.rs
1//! Absolute-grid cyclic dispatch: the scheduling time source (`CyclicClock`),
2//! the dispatch-mode toggle (`DispatchMode`), and the pure `GridTimer` state
3//! machine for `REQ_0268` / `ADR_0100`.
4//!
5//! This module is deliberately free of iceoryx2 and of the telemetry
6//! `MonotonicClock`: scheduling time is a *distinct* role from telemetry
7//! measurement, so a test telemetry clock can never alter dispatch timing.
8
9use std::time::{Duration, Instant};
10
11/// Monotonic nanosecond time source used for **scheduling** cyclic dispatch.
12///
13/// Distinct from [`crate::MonotonicClock`] (telemetry) by design: the type
14/// distinction guarantees a telemetry mock can never be wired as the scheduler.
15/// A future fieldbus distributed-clock source is just another implementation.
16pub trait CyclicClock: Send + Sync + 'static {
17 /// Nanoseconds since this clock's epoch. Monotonic non-decreasing.
18 fn now_nanos(&self) -> u64;
19}
20
21/// Production scheduling clock over `CLOCK_MONOTONIC` (via `Instant`).
22#[derive(Debug)]
23pub struct MonotonicCyclicClock {
24 epoch: Instant,
25}
26
27impl MonotonicCyclicClock {
28 /// Construct a clock whose epoch is the current instant.
29 #[must_use]
30 pub fn new() -> Self {
31 Self {
32 epoch: Instant::now(),
33 }
34 }
35}
36
37impl Default for MonotonicCyclicClock {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl CyclicClock for MonotonicCyclicClock {
44 fn now_nanos(&self) -> u64 {
45 u64::try_from(self.epoch.elapsed().as_nanos()).unwrap_or(u64::MAX)
46 }
47}
48
49/// Cyclic dispatch timing strategy.
50///
51/// `Grid` is the absolute-grid timer of `REQ_0268`; `Legacy` is the pre-fix
52/// `attach_interval` path. The [`Default`] is **platform-conditional**: `Grid`
53/// on Linux (the production absolute-grid `timerfd` path), `Legacy` on non-Linux
54/// dev hosts. On non-Linux `Grid` is only a self-computed-`epoll`-timeout
55/// fallback — not the real-time target — and its millisecond-rounding jitter
56/// makes tight timing tests flaky on loaded CI, so the stable `attach_interval`
57/// path is the better default there. The Linux production behaviour is unchanged.
58#[derive(Clone, Copy, Debug, PartialEq, Eq)]
59pub enum DispatchMode {
60 /// Self-computed absolute grid; the production default on Linux.
61 Grid,
62 /// iceoryx2 `attach_interval` relative timer; the default on non-Linux dev
63 /// hosts (and the opt-in legacy path on Linux).
64 Legacy,
65}
66
67impl Default for DispatchMode {
68 fn default() -> Self {
69 if cfg!(target_os = "linux") {
70 Self::Grid
71 } else {
72 Self::Legacy
73 }
74 }
75}
76
77/// Pure absolute-grid timer. Holds one nominal target per cyclic task; advances
78/// each target by exactly one period per dispatch so lateness never compounds.
79///
80/// No clock and no I/O: callers pass `now` (read from a [`CyclicClock`]) in, so
81/// the whole state machine is deterministic and unit-testable.
82//
83// `redundant_pub_crate` is allowed: `GridTimer` lives in this private module
84// and is driven from `dispatch_loop`, so a `pub(crate)` type here reads as
85// redundant under clippy. Every field is read — `epoch` by `take_due`
86// (skip-realign), `period_ns`/`next` by the dispatch loop.
87#[allow(clippy::redundant_pub_crate)]
88#[derive(Debug)]
89pub(crate) struct GridTimer {
90 /// Scheduling epoch (ns), sampled once at dispatch-loop entry.
91 epoch: u64,
92 /// Per cyclic task period (ns); index-aligned with `next`. All cadences
93 /// share `epoch`, so every period phase-aligns at the epoch (harmonic grid).
94 period_ns: Vec<u64>,
95 /// Per cyclic task next absolute grid target (ns); `epoch + slot·period`.
96 next: Vec<u64>,
97}
98
99impl GridTimer {
100 /// `epoch` = scheduling `now_nanos()` at dispatch entry; one `period` per
101 /// cyclic task. First target for task *k* is `epoch + period_k`.
102 pub(crate) fn new(epoch: u64, periods: Vec<u64>) -> Self {
103 let next = periods.iter().map(|p| epoch.saturating_add(*p)).collect();
104 Self {
105 epoch,
106 period_ns: periods,
107 next,
108 }
109 }
110
111 /// Time to sleep until the earliest pending grid target (zero if already
112 /// due — a zero `epoll` timeout polls and catches up).
113 //
114 // Used only on the non-Linux self-computed-timeout path: on Linux the master
115 // timerfd owns the wake (the wait blocks with `Duration::MAX`), so this is
116 // dead there (REQ_0268 / ADR_0100).
117 #[cfg_attr(target_os = "linux", allow(dead_code))]
118 pub(crate) fn next_timeout(&self, now: u64) -> Duration {
119 // No cyclic targets → no grid-driven wakeup. Return `Duration::MAX`
120 // exactly (not `u64::MAX` nanos): the WaitSet treats `Duration::MAX`
121 // as "block indefinitely on fds" and dispatches a near-MAX `timed_wait`
122 // that overflows to `WaitSetRunError::InternalError`. This keeps an
123 // event-only executor blocking on its fds identically to Legacy.
124 let Some(earliest) = self.next.iter().copied().min() else {
125 return Duration::MAX;
126 };
127 Duration::from_nanos(earliest.saturating_sub(now))
128 }
129
130 /// Current nominal target for task `i` (test/inspection helper).
131 #[cfg(test)]
132 pub(crate) fn next_target(&self, i: usize) -> u64 {
133 self.next[i]
134 }
135
136 /// Collect cyclic tasks due at `now` into `due` (cleared first). A due task
137 /// is dispatched exactly once; its target then advances by one period in the
138 /// normal case, or — if the wake was late by ≥1 whole slot — snaps closed-form
139 /// to the next *future* grid point (skip-realign, `ADR_0100`). Never replays a
140 /// burst of stale cycles, which is wrong for cyclic control.
141 pub(crate) fn take_due(&mut self, now: u64, due: &mut Vec<usize>) {
142 due.clear();
143 for (i, next) in self.next.iter_mut().enumerate() {
144 if now >= *next {
145 due.push(i);
146 let period = self.period_ns[i];
147 if period == 0 {
148 continue;
149 }
150 let stepped = next.saturating_add(period);
151 *next = if stepped > now {
152 // Normal case: one period ahead is already in the future.
153 stepped
154 } else {
155 // Missed >= 1 whole slot: closed-form snap to the next
156 // future grid point. Dispatch once (above); never burst.
157 let slots_passed = now.saturating_sub(self.epoch) / period;
158 self.epoch
159 .saturating_add(slots_passed.saturating_add(1).saturating_mul(period))
160 };
161 }
162 }
163 }
164}
165
166/// The base tick for a PLC-style master timer: the GCD of all declared cyclic
167/// periods (ns), so every task's period is an integer number of base ticks and
168/// the single timer hits every task's grid point. Returns 0 when there are no
169/// cyclic tasks (caller arms no timer). Zero-valued periods are ignored (they
170/// are rejected at registration, `REQ_0268`).
171// `redundant_pub_crate`: this module is private, so `pub(crate)` looks redundant,
172// but the symbol is consumed by `executor::dispatch_loop` (the master timer).
173// `dead_code`: only the Linux master-timer path calls `base_period`; on non-Linux
174// the `GridTimer` drives dispatch via `next_timeout`, so it is genuinely unused.
175#[allow(clippy::redundant_pub_crate)]
176#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
177pub(crate) fn base_period(periods: &[u64]) -> u64 {
178 periods.iter().copied().filter(|p| *p != 0).fold(0, gcd)
179}
180
181// Called only from `base_period`, so it shares the same non-Linux dead-code fate.
182#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
183const fn gcd(a: u64, b: u64) -> u64 {
184 if b == 0 { a } else { gcd(b, a % b) }
185}
186
187#[cfg(test)]
188mod tests {
189 use super::*;
190
191 #[test]
192 fn monotonic_cyclic_clock_is_non_decreasing() {
193 let c = MonotonicCyclicClock::new();
194 let a = c.now_nanos();
195 let b = c.now_nanos();
196 assert!(
197 b >= a,
198 "CLOCK_MONOTONIC must not go backwards: {a} then {b}"
199 );
200 }
201
202 #[test]
203 fn dispatch_mode_default_is_grid_on_linux_legacy_elsewhere() {
204 // Production default: the absolute-grid timerfd path on Linux; the
205 // stable attach_interval fallback on non-Linux dev hosts (REQ_0268).
206 #[cfg(target_os = "linux")]
207 assert_eq!(DispatchMode::default(), DispatchMode::Grid);
208 #[cfg(not(target_os = "linux"))]
209 assert_eq!(DispatchMode::default(), DispatchMode::Legacy);
210 }
211
212 #[test]
213 fn single_period_advances_on_absolute_grid_with_zero_drift() {
214 // period 1000ns, epoch 0. Wake late by a varying jitter each cycle and
215 // confirm the *nominal target* never absorbs that jitter (no drift).
216 let mut t = GridTimer::new(0, vec![1000]);
217 let mut due = Vec::new();
218
219 // Cycle 1: woke at 1005 (5ns late). Due once; next target -> 2000, not 2005.
220 t.take_due(1005, &mut due);
221 assert_eq!(due, vec![0]);
222 assert_eq!(t.next_target(0), 2000);
223
224 // Cycle 2: woke at 2012 (12ns late). Due once; next target -> 3000.
225 t.take_due(2012, &mut due);
226 assert_eq!(due, vec![0]);
227 assert_eq!(t.next_target(0), 3000);
228
229 // Not yet due at 2999.
230 t.take_due(2999, &mut due);
231 assert_eq!(due, Vec::<usize>::new());
232 assert_eq!(t.next_target(0), 3000);
233 }
234
235 #[test]
236 fn stall_skips_whole_slots_and_dispatches_once() {
237 // period 1000, epoch 0. We were starved until 3500 (slots 1,2,3 missed).
238 let mut t = GridTimer::new(0, vec![1000]);
239 let mut due = Vec::new();
240
241 t.take_due(3500, &mut due);
242 // Dispatched exactly once — no burst replay of the 3 missed cycles.
243 assert_eq!(due, vec![0]);
244 // Re-aligned to the next *future* slot: floor(3500/1000)+1 = 4 -> 4000.
245 assert_eq!(t.next_target(0), 4000);
246 assert!(
247 t.next_target(0) > 3500,
248 "target must be strictly in the future"
249 );
250 }
251
252 #[test]
253 fn stall_realign_is_exact_on_a_slot_boundary() {
254 let mut t = GridTimer::new(0, vec![1000]);
255 let mut due = Vec::new();
256 // Exactly on slot 3's boundary.
257 t.take_due(3000, &mut due);
258 assert_eq!(due, vec![0]);
259 assert_eq!(t.next_target(0), 4000);
260 }
261
262 #[test]
263 // ns form is deliberate: the grid is a nanosecond domain, so timeouts read
264 // clearest in the same unit as the period under test.
265 #[allow(clippy::duration_suboptimal_units)]
266 fn next_timeout_is_distance_to_earliest_target() {
267 let t = GridTimer::new(0, vec![1000]);
268 assert_eq!(t.next_timeout(0), Duration::from_nanos(1000));
269 assert_eq!(t.next_timeout(250), Duration::from_nanos(750));
270 // Already past the target -> zero (catch up immediately).
271 assert_eq!(t.next_timeout(1500), Duration::from_nanos(0));
272 }
273
274 #[test]
275 fn empty_grid_next_timeout_is_duration_max() {
276 // No cyclic tasks: the timer must yield `Duration::MAX` exactly so the
277 // WaitSet blocks on fds (event-only executor) instead of issuing a
278 // near-MAX timed wait that overflows to an InternalError.
279 let t = GridTimer::new(0, vec![]);
280 assert_eq!(t.next_timeout(0), Duration::MAX);
281 assert_eq!(t.next_timeout(12_345), Duration::MAX);
282 }
283
284 #[test]
285 fn base_period_is_gcd_of_declared_periods() {
286 assert_eq!(base_period(&[1_000_000]), 1_000_000); // single task → its period
287 assert_eq!(base_period(&[2_000_000, 4_000_000]), 2_000_000); // harmonic → smaller
288 assert_eq!(base_period(&[2_000_000, 3_000_000]), 1_000_000); // coprime → gcd
289 assert_eq!(base_period(&[1_000_000, 1_000_000]), 1_000_000); // duplicates
290 assert_eq!(base_period(&[0, 2_000_000]), 2_000_000); // zero entry ignored
291 assert_eq!(base_period(&[]), 0); // no cyclic tasks
292 }
293
294 #[test]
295 // ns form is deliberate: targets and periods read clearest in the same unit.
296 #[allow(clippy::duration_suboptimal_units)]
297 fn multi_period_picks_earliest_and_coalesces_coincident_slots() {
298 // Two cadences sharing one epoch: 1000ns and 2000ns (harmonic grid).
299 let mut t = GridTimer::new(0, vec![1000, 2000]);
300 let mut due = Vec::new();
301
302 // Earliest target is task0 at 1000.
303 assert_eq!(t.next_timeout(0), Duration::from_nanos(1000));
304
305 // At 1000: only the 1ms task is due.
306 t.take_due(1000, &mut due);
307 assert_eq!(due, vec![0]);
308
309 // Next earliest: both targets now at 2000.
310 assert_eq!(t.next_timeout(1000), Duration::from_nanos(1000));
311
312 // At 2000: both cadences coincide -> both due in one wake.
313 t.take_due(2000, &mut due);
314 assert_eq!(due, vec![0, 1]);
315 assert_eq!(t.next_target(0), 3000);
316 assert_eq!(t.next_target(1), 4000);
317 }
318}