// limen_core/scheduling.rs
//! Scheduling traits: readiness and dequeue policy (EDF hooks).
//!
//! How a runtime composes `NodeSummary` (contract):
//!
//! - `readiness`
//!   * Compute from **input occupancies** and **output pressure**.
//!   * If all inputs empty → `NotReady`.
//!   * If any input non-empty AND any output watermark ≥ soft → `ReadyUnderPressure`.
//!   * If any input non-empty AND all outputs below soft → `Ready`.
//!
//! - `backpressure`
//!   * Take the **max** watermark over all outputs (BelowSoft < BetweenSoftAndHard < AtOrAboveHard).
//!
//! - `earliest_deadline`
//!   * If any input has an absolute deadline → use the **minimum** of present deadlines.
//!   * Else if `NodePolicy.deadline.default_deadline_ns` is `Some(d)` → synthesize as `now + d`.
//!   * Else → `None`.
//!
//! Notes:
//! * Watermarks come from edge occupancy snapshots (`EdgeOccupancy`), which the runtime fills
//!   via `GraphApi::write_all_edge_occupancies` and refreshes via `refresh_occupancies_for_node`.
//! * Dequeue strategies (FIFO, EDF, QoS-weighted) live in runtimes; `DequeuePolicy` stays unchanged.

use crate::node::StepResult;
use crate::policy::WatermarkState;
use crate::types::{DeadlineNs, NodeIndex, Ticks};

/// Readiness level derived from inputs and backpressure state.
///
/// Contract followed by runtimes:
/// - `NotReady`: all inputs empty (no work).
/// - `ReadyUnderPressure`: some input available AND at least one output watermark is
///   `BetweenSoftAndHard` or `AtOrAboveHard`.
/// - `Ready`: some input available AND all outputs are `BelowSoft`.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Readiness {
    /// All inputs are empty — there is no work to do.
    NotReady,
    /// Input(s) available but at least one output is under pressure.
    ReadyUnderPressure,
    /// Input(s) available and no output is under pressure.
    Ready,
}

46/// A minimal summary of a node for scheduling decisions.
47///
48/// - `index`: the node identifier in the graph.
49/// - `earliest_deadline`: minimum absolute deadline among ready inputs, or synthesized `now + default_deadline_ns`
50/// if policy provides a default and inputs lack deadlines; otherwise `None`.
51/// - `readiness`: derived from input availability and output pressure (see `Readiness` contract above).
52/// - `backpressure`: the maximum watermark state across all outputs.
53#[non_exhaustive]
54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55pub struct NodeSummary {
56 /// Index of the node in the graph.
57 index: NodeIndex,
58 /// Earliest absolute deadline (if any) among its ready inputs.
59 earliest_deadline: Option<DeadlineNs>,
60 /// Readiness level.
61 readiness: Readiness,
62 /// Current backpressure state from outputs.
63 backpressure: WatermarkState,
64}
65
66impl NodeSummary {
67 /// Constructs a new `NodeSummary` from the provided fields.
68 pub const fn new(
69 index: NodeIndex,
70 earliest_deadline: Option<DeadlineNs>,
71 readiness: Readiness,
72 backpressure: WatermarkState,
73 ) -> Self {
74 Self {
75 index,
76 earliest_deadline,
77 readiness,
78 backpressure,
79 }
80 }
81
82 /// Index of the node in the graph.
83 #[inline]
84 pub const fn index(&self) -> &NodeIndex {
85 &self.index
86 }
87
88 /// Earliest absolute deadline (if any) among its ready inputs.
89 #[inline]
90 pub const fn earliest_deadline(&self) -> &Option<DeadlineNs> {
91 &self.earliest_deadline
92 }
93
94 /// Readiness level.
95 #[inline]
96 pub const fn readiness(&self) -> &Readiness {
97 &self.readiness
98 }
99
100 /// Current backpressure state from outputs.
101 #[inline]
102 pub const fn backpressure(&self) -> &WatermarkState {
103 &self.backpressure
104 }
105}
106
107/// A dequeue policy selects the next node to run from a set of summaries.
108pub trait DequeuePolicy {
109 /// Select the next node index to step, or `None` if none should run.
110 fn select_next(&mut self, candidates: &[NodeSummary]) -> Option<NodeIndex>;
111}
112
// ---------------------------------------------------------------------------
// Concurrent worker scheduling
// ---------------------------------------------------------------------------

/// Scheduling decision for a concurrent worker.
///
/// Returned by [`WorkerScheduler::decide`] to control per-worker stepping in
/// scoped concurrent execution.
///
/// `#[non_exhaustive]` — future variants will be added as planned work lands:
/// - `StepBatch(usize)` (C1 batch semantics — step N messages in one call)
/// - `StepWithBudget { max_micros: u64 }` (R11 execution measurement)
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerDecision {
    /// Step the node once.
    Step,
    /// Wait before calling `decide()` again. Duration in microseconds.
    WaitMicros(u64),
    /// This worker should exit.
    Stop,
}

136/// Per-worker state snapshot for scheduling decisions.
137///
138/// Populated by the codegen-generated worker loop before each
139/// [`WorkerScheduler::decide`] call. Edge occupancy is queried from concrete
140/// edge types (no dyn dispatch) and summarised here.
141///
142/// **Extensibility model:** `#[non_exhaustive]` — fields will be added as
143/// planned work lands. The codegen loop queries edges using concrete types
144/// (no dyn) and populates this snapshot. The scheduler never touches edges
145/// directly — it only sees the snapshot. New edge capabilities
146/// (peek_header, mailbox, urgency) are exposed by adding fields here, with
147/// the codegen loop doing the actual querying.
148///
149/// **Planned extensions (one field per planned item):**
150/// - `earliest_deadline: Option<DeadlineNs>` (R1 — from peeked input headers)
151/// - `max_input_urgency: Option<Urgency>` (R1 — highest urgency among inputs)
152/// - `inputs_fresh: bool` (R2 — all inputs within max_age threshold)
153/// - `input_liveness_ok: bool` (R5 — all inputs within liveness threshold)
154/// - `micros_since_last_step: u64` (R6 — for node liveness violation detection)
155/// - `criticality: CriticalityClass` (R8 — node's criticality tier)
156/// - `input_count: usize` / `inputs_ready_count: usize` (C2 — N→M per-port)
157#[non_exhaustive]
158#[derive(Debug, Clone, Copy)]
159pub struct WorkerState {
160 /// Index of this node in the graph.
161 pub node_index: usize,
162 /// Total number of nodes in the graph.
163 pub node_count: usize,
164 /// Current clock tick (monotonic). Enables freshness/liveness calculations
165 /// without the scheduler needing a clock reference.
166 pub current_tick: Ticks,
167 /// Readiness derived from input availability and output pressure.
168 pub readiness: Readiness,
169 /// Max output backpressure (watermark across all outputs).
170 pub backpressure: WatermarkState,
171 /// Result of the last step, if any.
172 pub last_step: Option<StepResult>,
173 /// Whether the last step errored (NodeError details go to telemetry).
174 pub last_error: bool,
175}
176
177impl WorkerState {
178 /// Construct a new initial worker state for a given node.
179 #[inline]
180 pub const fn new(node_index: usize, node_count: usize, current_tick: Ticks) -> Self {
181 Self {
182 node_index,
183 node_count,
184 current_tick,
185 readiness: Readiness::NotReady,
186 backpressure: WatermarkState::BelowSoft,
187 last_step: None,
188 last_error: false,
189 }
190 }
191}
192
193/// Per-worker scheduling for concurrent execution.
194///
195/// Called from worker threads via **static dispatch** — the concrete scheduler
196/// type is a generic parameter on `ScopedGraphApi::run_scoped`, so no `dyn`.
197///
198/// Sequential runtimes use [`DequeuePolicy`] (centralized, one node per tick).
199/// Concurrent runtimes use `WorkerScheduler` (per-worker, one decision per step).
200pub trait WorkerScheduler: Send + Sync {
201 /// Decide what this worker should do next.
202 fn decide(&self, state: &WorkerState) -> WorkerDecision;
203}