// limen_core/scheduling.rs
//! Scheduling traits: readiness and dequeue policy (EDF hooks).
//!
//! How a runtime composes `NodeSummary` (contract):
//!
//! - `readiness`
//!   * Compute from **input occupancies** and **output pressure**.
//!   * If all inputs empty → `NotReady`.
//!   * If any input non-empty AND any output watermark ≥ soft → `ReadyUnderPressure`.
//!   * If any input non-empty AND all outputs below soft → `Ready`.
//!
//! - `backpressure`
//!   * Take the **max** watermark over all outputs (BelowSoft < BetweenSoftAndHard < AtOrAboveHard).
//!
//! - `earliest_deadline`
//!   * If any input has an absolute deadline → use the **minimum** of present deadlines.
//!   * Else if `NodePolicy.deadline.default_deadline_ns` is `Some(d)` → synthesize as `now + d`.
//!   * Else → `None`.
//!
//! Notes:
//! * Watermarks come from edge occupancy snapshots (`EdgeOccupancy`), which the runtime fills
//!   via `GraphApi::write_all_edge_occupancies` and refreshes via `refresh_occupancies_for_node`.
//! * Dequeue strategies (FIFO, EDF, QoS-weighted) live in runtimes; `DequeuePolicy` stays unchanged.

use crate::node::StepResult;
use crate::policy::WatermarkState;
use crate::types::{DeadlineNs, NodeIndex, Ticks};

/// Readiness level derived from inputs and backpressure state.
///
/// Contract followed by runtimes:
/// - `NotReady`: all inputs empty (no work).
/// - `ReadyUnderPressure`: some input available AND at least one output watermark is
///   `BetweenSoftAndHard` or `AtOrAboveHard`.
/// - `Ready`: some input available AND all outputs are `BelowSoft`.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Readiness {
    /// All inputs empty: no work available for this node.
    NotReady,
    /// Input(s) available, but at least one output watermark is at or above soft.
    ReadyUnderPressure,
    /// Input(s) available and every output watermark is below soft.
    Ready,
}
45
/// A minimal summary of a node for scheduling decisions.
///
/// - `index`: the node identifier in the graph.
/// - `earliest_deadline`: minimum absolute deadline among ready inputs, or synthesized `now + default_deadline_ns`
///   if policy provides a default and inputs lack deadlines; otherwise `None`.
/// - `readiness`: derived from input availability and output pressure (see `Readiness` contract above).
/// - `backpressure`: the maximum watermark state across all outputs.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NodeSummary {
    /// Index of the node in the graph.
    index: NodeIndex,
    /// Earliest absolute deadline (if any) among its ready inputs.
    earliest_deadline: Option<DeadlineNs>,
    /// Readiness level.
    readiness: Readiness,
    /// Current backpressure state from outputs.
    backpressure: WatermarkState,
}
65
66impl NodeSummary {
67    /// Constructs a new `NodeSummary` from the provided fields.
68    pub const fn new(
69        index: NodeIndex,
70        earliest_deadline: Option<DeadlineNs>,
71        readiness: Readiness,
72        backpressure: WatermarkState,
73    ) -> Self {
74        Self {
75            index,
76            earliest_deadline,
77            readiness,
78            backpressure,
79        }
80    }
81
82    /// Index of the node in the graph.
83    #[inline]
84    pub const fn index(&self) -> &NodeIndex {
85        &self.index
86    }
87
88    /// Earliest absolute deadline (if any) among its ready inputs.
89    #[inline]
90    pub const fn earliest_deadline(&self) -> &Option<DeadlineNs> {
91        &self.earliest_deadline
92    }
93
94    /// Readiness level.
95    #[inline]
96    pub const fn readiness(&self) -> &Readiness {
97        &self.readiness
98    }
99
100    /// Current backpressure state from outputs.
101    #[inline]
102    pub const fn backpressure(&self) -> &WatermarkState {
103        &self.backpressure
104    }
105}
106
/// A dequeue policy selects the next node to run from a set of summaries.
///
/// Concrete strategies (FIFO, EDF, QoS-weighted) live in runtimes; this trait
/// is the hook a sequential runtime calls to pick one node per tick.
pub trait DequeuePolicy {
    /// Select the next node index to step, or `None` if none should run.
    fn select_next(&mut self, candidates: &[NodeSummary]) -> Option<NodeIndex>;
}
112
// ---------------------------------------------------------------------------
// Concurrent worker scheduling
// ---------------------------------------------------------------------------
116
/// Scheduling decision for a concurrent worker.
///
/// Returned by [`WorkerScheduler::decide`] to control per-worker stepping in
/// scoped concurrent execution.
///
/// `#[non_exhaustive]` — future variants will be added as planned work lands:
/// - `StepBatch(usize)` (C1 batch semantics — step N messages in one call)
/// - `StepWithBudget { max_micros: u64 }` (R11 execution measurement)
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerDecision {
    /// Step the node once.
    Step,
    /// Wait before calling `decide()` again. Duration in microseconds.
    WaitMicros(u64),
    /// This worker should exit.
    Stop,
}
135
/// Per-worker state snapshot for scheduling decisions.
///
/// Populated by the codegen-generated worker loop before each
/// [`WorkerScheduler::decide`] call. Edge occupancy is queried from concrete
/// edge types (no dyn dispatch) and summarised here.
///
/// **Extensibility model:** `#[non_exhaustive]` — fields will be added as
/// planned work lands. The codegen loop queries edges using concrete types
/// (no dyn) and populates this snapshot. The scheduler never touches edges
/// directly — it only sees the snapshot. New edge capabilities
/// (peek_header, mailbox, urgency) are exposed by adding fields here, with
/// the codegen loop doing the actual querying.
///
/// **Planned extensions (one field per planned item):**
/// - `earliest_deadline: Option<DeadlineNs>` (R1 — from peeked input headers)
/// - `max_input_urgency: Option<Urgency>` (R1 — highest urgency among inputs)
/// - `inputs_fresh: bool` (R2 — all inputs within max_age threshold)
/// - `input_liveness_ok: bool` (R5 — all inputs within liveness threshold)
/// - `micros_since_last_step: u64` (R6 — for node liveness violation detection)
/// - `criticality: CriticalityClass` (R8 — node's criticality tier)
/// - `input_count: usize` / `inputs_ready_count: usize` (C2 — N→M per-port)
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub struct WorkerState {
    /// Index of this node in the graph.
    pub node_index: usize,
    /// Total number of nodes in the graph.
    pub node_count: usize,
    /// Current clock tick (monotonic). Enables freshness/liveness calculations
    /// without the scheduler needing a clock reference.
    pub current_tick: Ticks,
    /// Readiness derived from input availability and output pressure.
    pub readiness: Readiness,
    /// Max output backpressure (watermark across all outputs).
    pub backpressure: WatermarkState,
    /// Result of the last step, if any.
    pub last_step: Option<StepResult>,
    /// Whether the last step errored (NodeError details go to telemetry).
    pub last_error: bool,
}
176
177impl WorkerState {
178    /// Construct a new initial worker state for a given node.
179    #[inline]
180    pub const fn new(node_index: usize, node_count: usize, current_tick: Ticks) -> Self {
181        Self {
182            node_index,
183            node_count,
184            current_tick,
185            readiness: Readiness::NotReady,
186            backpressure: WatermarkState::BelowSoft,
187            last_step: None,
188            last_error: false,
189        }
190    }
191}
192
/// Per-worker scheduling for concurrent execution.
///
/// Called from worker threads via **static dispatch** — the concrete scheduler
/// type is a generic parameter on `ScopedGraphApi::run_scoped`, so no `dyn`.
/// `Send + Sync` is required because one scheduler instance is shared across
/// worker threads.
///
/// Sequential runtimes use [`DequeuePolicy`] (centralized, one node per tick).
/// Concurrent runtimes use `WorkerScheduler` (per-worker, one decision per step).
pub trait WorkerScheduler: Send + Sync {
    /// Decide what this worker should do next, given its current snapshot.
    fn decide(&self, state: &WorkerState) -> WorkerDecision;
}