limen_core/graph.rs
1//! Graph trait hierarchy and typed graph contracts.
2//!
3//! Graphs are fully typed and monomorphized. Node/edge counts are `const` generics;
4//! there is no runtime-length collection in the hot path.
5//!
6//! Key traits:
7//! - [`GraphApi`] — top-level entry point; step + occupancy sampling.
8//! - `ScopedGraphApi` (`std`) — concurrent variant with scoped worker threads.
9//! - [`GraphNodeAccess<I>`] / [`GraphEdgeAccess<E>`] — compile-time indexed access.
10//! - [`GraphNodeTypes<I, IN, OUT>`] — per-node payload and queue type associations.
11//! - [`GraphNodeContextBuilder<I, IN, OUT>`] — factory for [`StepContext`].
12//!
13//! Submodules:
14//! - [`validate`] — [`GraphValidator`] and [`GraphDescBuf`] for descriptor validation.
//! - `bench` — test graphs (compiled under `cfg(test)` or with the `bench` feature).
16
17pub mod validate;
18
19#[cfg(any(test, feature = "bench"))]
20pub mod bench;
21
22use crate::node::Node;
23use crate::prelude::{MemoryManager, PlatformClock, Telemetry};
24use crate::{
25 edge::{link::EdgeDescriptor, Edge, EdgeOccupancy},
26 errors::{GraphError, NodeError},
27 graph::validate::{GraphDescBuf, GraphValidator},
28 message::payload::Payload,
29 node::{link::NodeDescriptor, StepContext, StepResult},
30 policy::{EdgePolicy, NodePolicy},
31};
32
33// pub mod builder;
34
/// Compile-time indexed accessor for a single node of a graph.
///
/// Graph container types (for example a generated graph struct) implement
/// this trait so that one specific node can be borrowed by its index, with
/// the index resolved at compile time.
///
/// # Type Parameters
/// - `I`: The node's compile-time index within the graph.
pub trait GraphNodeAccess<const I: usize> {
    /// Concrete type of the node stored at index `I`.
    type Node;

    /// Borrows the node at index `I` immutably.
    fn node_ref(&self) -> &Self::Node;

    /// Borrows the node at index `I` mutably.
    fn node_mut(&mut self) -> &mut Self::Node;
}
52
/// Compile-time indexed accessor for a single edge of a graph.
///
/// Graph container types implement this trait so that one specific edge can
/// be borrowed by its index, with the index resolved at compile time.
///
/// # Type Parameters
/// - `E`: The edge's compile-time index within the graph.
pub trait GraphEdgeAccess<const E: usize> {
    /// Concrete type of the edge stored at index `E`.
    type Edge;

    /// Borrows the edge at index `E` immutably.
    fn edge_ref(&self) -> &Self::Edge;

    /// Borrows the edge at index `E` mutably.
    fn edge_mut(&mut self) -> &mut Self::Edge;
}
70
71/// Defines per-node compile-time types and arity.
72///
73/// Implemented for each node in the graph by the graph-building macro.
74/// Associates payload types, queue types, and port counts with the node.
75///
76/// # Type Parameters
77/// - `I`: Compile-time index of the node within the graph.
78/// - `IN`: Number of input ports for the node.
79/// - `OUT`: Number of output ports for the node.
80pub trait GraphNodeTypes<const I: usize, const IN: usize, const OUT: usize> {
81 /// Payload type for messages consumed by the node.
82 type InP: Payload;
83
84 /// Payload type for messages produced by the node.
85 type OutP: Payload;
86
87 /// Queue type used for input ports.
88 type InQ: Edge;
89
90 /// Queue type used for output ports.
91 type OutQ: Edge;
92
93 /// Memory manager type for input ports.
94 type InM: MemoryManager<Self::InP>;
95
96 /// Memory manager type for output ports.
97 type OutM: MemoryManager<Self::OutP>;
98}
99
100/// Builder for per-node execution contexts.
101///
102/// This trait is implemented by the graph-building macro. It allows the runtime
103/// to create a [`StepContext`] for a given node using compile-time wiring of its
104/// ports, queues, and policies.
105///
106/// # Type Parameters
107/// - `I`: Compile-time index of the node within the graph.
108/// - `IN`: Number of input ports for the node.
109/// - `OUT`: Number of output ports for the node.
110pub trait GraphNodeContextBuilder<const I: usize, const IN: usize, const OUT: usize>:
111 GraphNodeTypes<I, IN, OUT>
112{
113 /// Construct a [`StepContext`] for node `I`.
114 ///
115 /// # Parameters
116 /// - `clock`: Reference to the runtime clock abstraction.
117 /// - `telemetry`: Mutable reference to the telemetry collector.
118 ///
119 /// # Returns
120 /// A fully wired [`StepContext`] for the node, ready for execution.
121 #[allow(clippy::type_complexity)]
122 fn make_step_context<'graph, 'telemetry, 'clock, C, T>(
123 &'graph mut self,
124 clock: &'clock C,
125 telemetry: &'telemetry mut T,
126 ) -> StepContext<
127 'graph,
128 'telemetry,
129 'clock,
130 IN,
131 OUT,
132 <Self as GraphNodeTypes<I, IN, OUT>>::InP,
133 <Self as GraphNodeTypes<I, IN, OUT>>::OutP,
134 <Self as GraphNodeTypes<I, IN, OUT>>::InQ,
135 <Self as GraphNodeTypes<I, IN, OUT>>::OutQ,
136 <Self as GraphNodeTypes<I, IN, OUT>>::InM,
137 <Self as GraphNodeTypes<I, IN, OUT>>::OutM,
138 C,
139 T,
140 >
141 where
142 EdgePolicy: Copy,
143 C: PlatformClock + Sized,
144 T: Telemetry + Sized;
145
146 /// Borrowed handoff: in one `&mut self` borrow, lend both
147 /// `&mut node(I)` and a fully wired `StepContext` to a closure.
148 /// This avoids overlapping `&mut` borrows escaping the call.
149 fn with_node_and_step_context<'telemetry, 'clock, C, T, R, E>(
150 &mut self,
151 clock: &'clock C,
152 telemetry: &'telemetry mut T,
153 f: impl FnOnce(
154 &mut <Self as GraphNodeAccess<I>>::Node,
155 &mut StepContext<
156 '_,
157 'telemetry,
158 'clock,
159 IN,
160 OUT,
161 <Self as GraphNodeTypes<I, IN, OUT>>::InP,
162 <Self as GraphNodeTypes<I, IN, OUT>>::OutP,
163 <Self as GraphNodeTypes<I, IN, OUT>>::InQ,
164 <Self as GraphNodeTypes<I, IN, OUT>>::OutQ,
165 <Self as GraphNodeTypes<I, IN, OUT>>::InM,
166 <Self as GraphNodeTypes<I, IN, OUT>>::OutM,
167 C,
168 T,
169 >,
170 ) -> Result<R, E>,
171 ) -> Result<R, E>
172 where
173 Self: GraphNodeAccess<I>,
174 EdgePolicy: Copy,
175 C: PlatformClock + Sized,
176 T: Telemetry + Sized;
177}
178
179/// Unified runtime-facing graph API.
180///
181/// Exposes the minimal surface shared by all runtimes (P0, P1, P2, P2Concurrent).
182/// The `NODE_COUNT` and `EDGE_COUNT` const generics define the compile-time
183/// sizes for node and edge descriptor arrays and occupancy snapshots.
184///
185/// ## Occupancy buffer semantics
186///
187/// - `write_all_edge_occupancies` writes **current** occupancy for **every** edge
188/// into the slot whose index equals that edge’s `EdgeIndex.0`.
189/// It must not depend on pre-existing contents of `out`.
190///
191/// - `refresh_occupancies_for_node` is a **partial, in-place** refresh:
192/// it MUST update only entries for edges incident to node `I` (either upstream
193/// or downstream) and MUST NOT modify any other slots.
194///
195/// If sampling a particular edge fails, implementations should return
196/// `Err(GraphError::OccupancySampleFailed(edge_idx))`.
197pub trait GraphApi<const NODE_COUNT: usize, const EDGE_COUNT: usize> {
198 // ----- Descriptors & validation -----
199
200 /// Returns the static descriptors for all nodes in the graph.
201 ///
202 /// The returned array length must equal `NODE_COUNT` and be consistent with
203 /// the edge descriptors exposed by [`get_edge_descriptors`](Self::get_edge_descriptors).
204 fn get_node_descriptors(&self) -> [NodeDescriptor; NODE_COUNT];
205
206 /// Returns the static descriptors for all edges in the graph.
207 ///
208 /// The returned array length must equal `EDGE_COUNT` and reference only valid
209 /// node indices described by [`get_node_descriptors`](Self::get_node_descriptors).
210 fn get_edge_descriptors(&self) -> [EdgeDescriptor; EDGE_COUNT];
211
212 /// Returns the static `NodePolicy` for every node in the graph.
213 ///
214 /// The returned array length must equal `NODE_COUNT`, and index `i`
215 /// corresponds to `NodeIndex::from(i)`.
216 fn get_node_policies(&self) -> [NodePolicy; NODE_COUNT];
217
218 /// Returns the static `EdgePolicy` for every edge in the graph.
219 ///
220 /// The returned array length must equal `EDGE_COUNT`, and index `e`
221 /// corresponds to `EdgeIndex::from(e)`.
222 fn get_edge_policies(&self) -> [EdgePolicy; EDGE_COUNT];
223
224 /// Validates the graph topology and policies derived from node and edge descriptors.
225 ///
226 /// This checks index bounds, arities, endpoint compatibility, and any static
227 /// policy invariants enforced by `GraphDescBuf::validate`.
228 ///
229 /// # Errors
230 ///
231 /// Returns a [`GraphError`] if the descriptors are inconsistent or violate
232 /// graph-level constraints.
233 #[inline]
234 fn validate_graph(&self) -> Result<(), GraphError> {
235 GraphDescBuf::new(self.get_node_descriptors(), self.get_edge_descriptors()).validate()
236 }
237
238 // ----- Occupancy snapshot helpers -----
239
240 /// Returns a one-shot occupancy snapshot for edge `E`.
241 ///
242 /// Useful for lightweight telemetry or scheduling decisions that only need
243 /// the latest observed queue depth for a specific edge.
244 ///
245 /// # Type Parameters
246 ///
247 /// * `E` — The compile-time edge index in `0..EDGE_COUNT`.
248 ///
249 /// # Errors
250 ///
251 /// Returns a [`GraphError`] if `E` is out of range or the occupancy cannot
252 /// be sampled.
253 fn edge_occupancy_for<const E: usize>(&self) -> Result<EdgeOccupancy, GraphError>;
254
255 /// Write **current** occupancy for **all** edges into `out`.
256 ///
257 /// Contract:
258 /// - Must populate every `out[k]` with the current occupancy for the edge
259 /// whose `EdgeIndex.0 == k`.
260 /// - Must not depend on prior `out` contents.
261 /// - On per-edge sampling failure, return
262 /// `Err(GraphError::OccupancySampleFailed(edge_idx))`.
263 fn write_all_edge_occupancies(
264 &self,
265 out: &mut [EdgeOccupancy; EDGE_COUNT],
266 ) -> Result<(), GraphError>;
267
268 /// **Partial refresh**: update only entries for edges incident to node `I`.
269 ///
270 /// Contract:
271 /// - MUST update `out[k]` iff edge `k` is upstream **or** downstream of node `I`.
272 /// - MUST NOT modify any other `out[k]`.
273 /// - If node `I` has no incident edges, this is a no-op that returns `Ok(())`.
274 /// - On sampling failure for any incident edge, return
275 /// `Err(GraphError::OccupancySampleFailed(edge_idx))`.
276 fn refresh_occupancies_for_node<const I: usize, const IN: usize, const OUT: usize>(
277 &self,
278 out: &mut [EdgeOccupancy; EDGE_COUNT],
279 ) -> Result<(), GraphError>;
280
281 // ----- Generic step-by-index (for P0/P1) -----
282
283 /// Drives a single scheduling step for the node at `index`.
284 ///
285 /// Runtimes P0/P1 use this to advance nodes generically over an abstract
286 /// clock and telemetry sink without requiring node-specific types.
287 ///
288 /// # Parameters
289 ///
290 /// * `index` — The dynamic node index to step.
291 /// * `clock` — A clock-like source used by the node during execution.
292 /// * `telemetry` — A sink for emitting per-step metrics or traces.
293 ///
294 /// # Returns
295 ///
296 /// A [`StepResult`] indicating whether work was performed or the node is idle.
297 ///
298 /// # Errors
299 ///
300 /// Returns a [`NodeError`] if the node fails to execute its step.
301 fn step_node_by_index<C, T>(
302 &mut self,
303 index: usize,
304 clock: &C,
305 telemetry: &mut T,
306 ) -> Result<StepResult, NodeError>
307 where
308 EdgePolicy: Copy,
309 C: PlatformClock + Sized,
310 T: Telemetry + Sized;
311
312 // ----- Optional: static node policy read -----
313
314 /// Returns the static [`NodePolicy`] for node `I` (compile-time index).
315 ///
316 /// This queries the node type directly, without requiring an instance step,
317 /// and is useful for planning or verifying scheduling constraints.
318 ///
319 /// # Type Parameters
320 ///
321 /// * `I` — The compile-time node index in `0..NODE_COUNT`.
322 /// * `IN` — The node’s input arity.
323 /// * `OUT` — The node’s output arity.
324 fn node_policy_for<const I: usize, const IN: usize, const OUT: usize>(&self) -> NodePolicy
325 where
326 Self: GraphNodeAccess<I> + GraphNodeTypes<I, IN, OUT>,
327 <Self as GraphNodeAccess<I>>::Node: Node<
328 IN,
329 OUT,
330 <Self as GraphNodeTypes<I, IN, OUT>>::InP,
331 <Self as GraphNodeTypes<I, IN, OUT>>::OutP,
332 >,
333 {
334 <<Self as GraphNodeAccess<I>>::Node as Node<
335 IN,
336 OUT,
337 <Self as GraphNodeTypes<I, IN, OUT>>::InP,
338 <Self as GraphNodeTypes<I, IN, OUT>>::OutP,
339 >>::policy(<Self as GraphNodeAccess<I>>::node_ref(self))
340 }
341}
342
343/// Opaque, runtime-owned buffer of edge occupancy snapshots.
344///
345/// # Semantics
346/// - This buffer is **owned by the runtime** and passed by mutable reference to
347/// graph APIs that *write into it* (see [`GraphApi::write_all_edge_occupancies`]
348/// and [`GraphApi::refresh_occupancies_for_node`]).
349/// - Each entry is a point-in-time [`EdgeOccupancy`] snapshot for the edge at the
350/// same index as returned by `GraphApi::get_edge_descriptors()`.
351/// - **Writers must not re-order** entries. Writers may update some or all slots,
352/// but any slot not explicitly written must be left untouched.
353///
354/// # Contracts
355/// - `GraphApi::write_all_edge_occupancies(&mut EdgeOccupancyBuf<E>)` **must write**
356/// a fresh value for **every** slot `[0..E)`.
357/// - `GraphApi::refresh_occupancies_for_node<I, IN, OUT>(&mut EdgeOccupancyBuf<E>)`
358/// is a **partial refresh**: it **must only** update slots corresponding to edges
359/// that are upstream or downstream of node `I` and **must not** modify any other
360/// slots.
361///
362/// # Usage
363/// - Runtimes typically allocate one `EdgeOccupancyBuf<E>` and reuse it across
364/// sampling intervals. After a full write, they may call partial refreshes to keep
365/// entries warm for the currently stepped node without touching unrelated edges.
366/// - Consumers should treat the contents as **snapshots** only; values may change
367/// immediately after sampling due to concurrent producers/consumers.
368///
369/// See also: [`EdgeOccupancy`], [`GraphApi`].
370pub type EdgeOccupancyBuf<const E: usize> = [EdgeOccupancy; E];
371
372// ---------------------------------------------------------------------------
373// Concurrent scoped execution
374// ---------------------------------------------------------------------------
375
/// Concurrent extension of [`GraphApi`] built on scoped threads.
///
/// Responsibilities are split in two. The runtime supplies a
/// [`WorkerScheduler`] that governs per-worker stepping, while the graph sets
/// up the scoped threads and the disjoint node borrows. The scheduler decides
/// when each worker steps, waits, or exits.
///
/// The trait is gated behind `#[cfg(feature = "std")]` because it requires
/// `std::thread::scope`; [`GraphApi`] itself remains `no_std`-compatible.
///
/// # Edge and manager handles
///
/// To produce per-worker handles, edges must implement
/// [`ScopedEdge`](crate::edge::ScopedEdge) and managers must implement
/// [`ScopedManager`](crate::memory::manager::ScopedManager). Arc-based types
/// (e.g. `ConcurrentEdge`, `ConcurrentMemoryManager`) return clones; future
/// lock-free types will return split producer/consumer handles via
/// [`EdgeHandleKind`](crate::edge::EdgeHandleKind).
///
/// # Scheduling model
///
/// The codegen-generated implementation:
/// 1. Obtains per-worker edge and manager handles via
///    `ScopedEdge::scoped_handle` and `ScopedManager::scoped_handle`
///    (before node borrows)
/// 2. Takes disjoint `&mut` borrows of each node (tuple field access)
/// 3. Spawns one scoped thread per node
/// 4. Each worker loop:
///    - Queries edge occupancy from concrete types (no dyn dispatch)
///    - Builds a [`WorkerState`] snapshot (readiness, backpressure, tick,
///      last result)
///    - Calls `scheduler.decide(&state)` (static dispatch via `S`)
///    - Acts on the [`WorkerDecision`]: step, wait, or exit
///
/// Execution policy is chosen by the runtime through the [`WorkerScheduler`]
/// it passes in. Different runtimes can implement different strategies
/// (backoff, EDF, throughput, criticality-aware) without changing the graph.
///
/// [`WorkerScheduler`]: crate::scheduling::WorkerScheduler
/// [`WorkerState`]: crate::scheduling::WorkerState
/// [`WorkerDecision`]: crate::scheduling::WorkerDecision
#[cfg(feature = "std")]
pub trait ScopedGraphApi<const NODE_COUNT: usize, const EDGE_COUNT: usize>:
    GraphApi<NODE_COUNT, EDGE_COUNT>
{
    /// Runs the graph concurrently with scheduler-controlled workers.
    ///
    /// One scoped thread is spawned per node, and each worker calls
    /// `scheduler.decide()` before every step. The threads join at scope
    /// exit, once all workers have returned.
    ///
    /// `clock` and `telemetry` are moved into the scope and handed out to the
    /// workers via `Clone`. By the time this method returns, every thread has
    /// joined.
    fn run_scoped<C, T, S>(&mut self, clock: C, telemetry: T, scheduler: S)
    where
        C: PlatformClock + Clone + Send + Sync + 'static,
        T: Telemetry + Clone + Send + 'static,
        S: crate::scheduling::WorkerScheduler + 'static;
}