// limen_core/graph.rs
//! Graph trait hierarchy and typed graph contracts.
//!
//! Graphs are fully typed and monomorphized. Node/edge counts are `const` generics;
//! there is no runtime-length collection in the hot path.
//!
//! Key traits:
//! - [`GraphApi`] — top-level entry point; step + occupancy sampling.
//! - `ScopedGraphApi` (`std`) — concurrent variant with scoped worker threads.
//! - [`GraphNodeAccess<I>`] / [`GraphEdgeAccess<E>`] — compile-time indexed access.
//! - [`GraphNodeTypes<I, IN, OUT>`] — per-node payload and queue type associations.
//! - [`GraphNodeContextBuilder<I, IN, OUT>`] — factory for [`StepContext`].
//!
//! Submodules:
//! - [`validate`] — [`GraphValidator`] and [`GraphDescBuf`] for descriptor validation.
//! - `bench` — test graphs (`bench` / `test` feature).
17pub mod validate;
18
19#[cfg(any(test, feature = "bench"))]
20pub mod bench;
21
22use crate::node::Node;
23use crate::prelude::{MemoryManager, PlatformClock, Telemetry};
24use crate::{
25    edge::{link::EdgeDescriptor, Edge, EdgeOccupancy},
26    errors::{GraphError, NodeError},
27    graph::validate::{GraphDescBuf, GraphValidator},
28    message::payload::Payload,
29    node::{link::NodeDescriptor, StepContext, StepResult},
30    policy::{EdgePolicy, NodePolicy},
31};
32
33// pub mod builder;
34
/// Provides indexed access to a graph node.
///
/// This trait is implemented by graph container types (e.g. a generated
/// graph struct) to allow compile-time access to a specific node by index.
/// One implementation exists per node index, so lookups monomorphize to
/// direct field access with no runtime indirection.
///
/// # Type Parameters
/// - `I`: The compile-time index of the node within the graph.
pub trait GraphNodeAccess<const I: usize> {
    /// The concrete node type at index `I`.
    type Node;

    /// Immutable access to the node at index `I`.
    fn node_ref(&self) -> &Self::Node;

    /// Mutable access to the node at index `I`.
    fn node_mut(&mut self) -> &mut Self::Node;
}
52
/// Provides indexed access to a graph edge.
///
/// This trait is implemented by graph container types to allow compile-time
/// access to a specific edge by index. As with [`GraphNodeAccess`], one
/// implementation exists per edge index, so access is fully monomorphized.
///
/// # Type Parameters
/// - `E`: The compile-time index of the edge within the graph.
pub trait GraphEdgeAccess<const E: usize> {
    /// The concrete edge type at index `E`.
    type Edge;

    /// Immutable access to the edge at index `E`.
    fn edge_ref(&self) -> &Self::Edge;

    /// Mutable access to the edge at index `E`.
    fn edge_mut(&mut self) -> &mut Self::Edge;
}
70
71/// Defines per-node compile-time types and arity.
72///
73/// Implemented for each node in the graph by the graph-building macro.  
74/// Associates payload types, queue types, and port counts with the node.
75///
76/// # Type Parameters
77/// - `I`:   Compile-time index of the node within the graph.
78/// - `IN`:  Number of input ports for the node.
79/// - `OUT`: Number of output ports for the node.
80pub trait GraphNodeTypes<const I: usize, const IN: usize, const OUT: usize> {
81    /// Payload type for messages consumed by the node.
82    type InP: Payload;
83
84    /// Payload type for messages produced by the node.
85    type OutP: Payload;
86
87    /// Queue type used for input ports.
88    type InQ: Edge;
89
90    /// Queue type used for output ports.
91    type OutQ: Edge;
92
93    /// Memory manager type for input ports.
94    type InM: MemoryManager<Self::InP>;
95
96    /// Memory manager type for output ports.
97    type OutM: MemoryManager<Self::OutP>;
98}
99
100/// Builder for per-node execution contexts.
101///
102/// This trait is implemented by the graph-building macro. It allows the runtime
103/// to create a [`StepContext`] for a given node using compile-time wiring of its
104/// ports, queues, and policies.
105///
106/// # Type Parameters
107/// - `I`:   Compile-time index of the node within the graph.
108/// - `IN`:  Number of input ports for the node.
109/// - `OUT`: Number of output ports for the node.
110pub trait GraphNodeContextBuilder<const I: usize, const IN: usize, const OUT: usize>:
111    GraphNodeTypes<I, IN, OUT>
112{
113    /// Construct a [`StepContext`] for node `I`.
114    ///
115    /// # Parameters
116    /// - `clock`:     Reference to the runtime clock abstraction.
117    /// - `telemetry`: Mutable reference to the telemetry collector.
118    ///
119    /// # Returns
120    /// A fully wired [`StepContext`] for the node, ready for execution.
121    #[allow(clippy::type_complexity)]
122    fn make_step_context<'graph, 'telemetry, 'clock, C, T>(
123        &'graph mut self,
124        clock: &'clock C,
125        telemetry: &'telemetry mut T,
126    ) -> StepContext<
127        'graph,
128        'telemetry,
129        'clock,
130        IN,
131        OUT,
132        <Self as GraphNodeTypes<I, IN, OUT>>::InP,
133        <Self as GraphNodeTypes<I, IN, OUT>>::OutP,
134        <Self as GraphNodeTypes<I, IN, OUT>>::InQ,
135        <Self as GraphNodeTypes<I, IN, OUT>>::OutQ,
136        <Self as GraphNodeTypes<I, IN, OUT>>::InM,
137        <Self as GraphNodeTypes<I, IN, OUT>>::OutM,
138        C,
139        T,
140    >
141    where
142        EdgePolicy: Copy,
143        C: PlatformClock + Sized,
144        T: Telemetry + Sized;
145
146    /// Borrowed handoff: in one `&mut self` borrow, lend both
147    /// `&mut node(I)` and a fully wired `StepContext` to a closure.
148    /// This avoids overlapping `&mut` borrows escaping the call.
149    fn with_node_and_step_context<'telemetry, 'clock, C, T, R, E>(
150        &mut self,
151        clock: &'clock C,
152        telemetry: &'telemetry mut T,
153        f: impl FnOnce(
154            &mut <Self as GraphNodeAccess<I>>::Node,
155            &mut StepContext<
156                '_,
157                'telemetry,
158                'clock,
159                IN,
160                OUT,
161                <Self as GraphNodeTypes<I, IN, OUT>>::InP,
162                <Self as GraphNodeTypes<I, IN, OUT>>::OutP,
163                <Self as GraphNodeTypes<I, IN, OUT>>::InQ,
164                <Self as GraphNodeTypes<I, IN, OUT>>::OutQ,
165                <Self as GraphNodeTypes<I, IN, OUT>>::InM,
166                <Self as GraphNodeTypes<I, IN, OUT>>::OutM,
167                C,
168                T,
169            >,
170        ) -> Result<R, E>,
171    ) -> Result<R, E>
172    where
173        Self: GraphNodeAccess<I>,
174        EdgePolicy: Copy,
175        C: PlatformClock + Sized,
176        T: Telemetry + Sized;
177}
178
179/// Unified runtime-facing graph API.
180///
181/// Exposes the minimal surface shared by all runtimes (P0, P1, P2, P2Concurrent).
182/// The `NODE_COUNT` and `EDGE_COUNT` const generics define the compile-time
183/// sizes for node and edge descriptor arrays and occupancy snapshots.
184///
185/// ## Occupancy buffer semantics
186///
187/// - `write_all_edge_occupancies` writes **current** occupancy for **every** edge
188///   into the slot whose index equals that edge’s `EdgeIndex.0`.
189///   It must not depend on pre-existing contents of `out`.
190///
191/// - `refresh_occupancies_for_node` is a **partial, in-place** refresh:
192///   it MUST update only entries for edges incident to node `I` (either upstream
193///   or downstream) and MUST NOT modify any other slots.
194///
195/// If sampling a particular edge fails, implementations should return
196/// `Err(GraphError::OccupancySampleFailed(edge_idx))`.
197pub trait GraphApi<const NODE_COUNT: usize, const EDGE_COUNT: usize> {
198    // ----- Descriptors & validation -----
199
200    /// Returns the static descriptors for all nodes in the graph.
201    ///
202    /// The returned array length must equal `NODE_COUNT` and be consistent with
203    /// the edge descriptors exposed by [`get_edge_descriptors`](Self::get_edge_descriptors).
204    fn get_node_descriptors(&self) -> [NodeDescriptor; NODE_COUNT];
205
206    /// Returns the static descriptors for all edges in the graph.
207    ///
208    /// The returned array length must equal `EDGE_COUNT` and reference only valid
209    /// node indices described by [`get_node_descriptors`](Self::get_node_descriptors).
210    fn get_edge_descriptors(&self) -> [EdgeDescriptor; EDGE_COUNT];
211
212    /// Returns the static `NodePolicy` for every node in the graph.
213    ///
214    /// The returned array length must equal `NODE_COUNT`, and index `i`
215    /// corresponds to `NodeIndex::from(i)`.
216    fn get_node_policies(&self) -> [NodePolicy; NODE_COUNT];
217
218    /// Returns the static `EdgePolicy` for every edge in the graph.
219    ///
220    /// The returned array length must equal `EDGE_COUNT`, and index `e`
221    /// corresponds to `EdgeIndex::from(e)`.
222    fn get_edge_policies(&self) -> [EdgePolicy; EDGE_COUNT];
223
224    /// Validates the graph topology and policies derived from node and edge descriptors.
225    ///
226    /// This checks index bounds, arities, endpoint compatibility, and any static
227    /// policy invariants enforced by `GraphDescBuf::validate`.
228    ///
229    /// # Errors
230    ///
231    /// Returns a [`GraphError`] if the descriptors are inconsistent or violate
232    /// graph-level constraints.
233    #[inline]
234    fn validate_graph(&self) -> Result<(), GraphError> {
235        GraphDescBuf::new(self.get_node_descriptors(), self.get_edge_descriptors()).validate()
236    }
237
238    // ----- Occupancy snapshot helpers -----
239
240    /// Returns a one-shot occupancy snapshot for edge `E`.
241    ///
242    /// Useful for lightweight telemetry or scheduling decisions that only need
243    /// the latest observed queue depth for a specific edge.
244    ///
245    /// # Type Parameters
246    ///
247    /// * `E` — The compile-time edge index in `0..EDGE_COUNT`.
248    ///
249    /// # Errors
250    ///
251    /// Returns a [`GraphError`] if `E` is out of range or the occupancy cannot
252    /// be sampled.
253    fn edge_occupancy_for<const E: usize>(&self) -> Result<EdgeOccupancy, GraphError>;
254
255    /// Write **current** occupancy for **all** edges into `out`.
256    ///
257    /// Contract:
258    /// - Must populate every `out[k]` with the current occupancy for the edge
259    ///   whose `EdgeIndex.0 == k`.
260    /// - Must not depend on prior `out` contents.
261    /// - On per-edge sampling failure, return
262    ///   `Err(GraphError::OccupancySampleFailed(edge_idx))`.
263    fn write_all_edge_occupancies(
264        &self,
265        out: &mut [EdgeOccupancy; EDGE_COUNT],
266    ) -> Result<(), GraphError>;
267
268    /// **Partial refresh**: update only entries for edges incident to node `I`.
269    ///
270    /// Contract:
271    /// - MUST update `out[k]` iff edge `k` is upstream **or** downstream of node `I`.
272    /// - MUST NOT modify any other `out[k]`.
273    /// - If node `I` has no incident edges, this is a no-op that returns `Ok(())`.
274    /// - On sampling failure for any incident edge, return
275    ///   `Err(GraphError::OccupancySampleFailed(edge_idx))`.
276    fn refresh_occupancies_for_node<const I: usize, const IN: usize, const OUT: usize>(
277        &self,
278        out: &mut [EdgeOccupancy; EDGE_COUNT],
279    ) -> Result<(), GraphError>;
280
281    // ----- Generic step-by-index (for P0/P1) -----
282
283    /// Drives a single scheduling step for the node at `index`.
284    ///
285    /// Runtimes P0/P1 use this to advance nodes generically over an abstract
286    /// clock and telemetry sink without requiring node-specific types.
287    ///
288    /// # Parameters
289    ///
290    /// * `index` — The dynamic node index to step.
291    /// * `clock` — A clock-like source used by the node during execution.
292    /// * `telemetry` — A sink for emitting per-step metrics or traces.
293    ///
294    /// # Returns
295    ///
296    /// A [`StepResult`] indicating whether work was performed or the node is idle.
297    ///
298    /// # Errors
299    ///
300    /// Returns a [`NodeError`] if the node fails to execute its step.
301    fn step_node_by_index<C, T>(
302        &mut self,
303        index: usize,
304        clock: &C,
305        telemetry: &mut T,
306    ) -> Result<StepResult, NodeError>
307    where
308        EdgePolicy: Copy,
309        C: PlatformClock + Sized,
310        T: Telemetry + Sized;
311
312    // ----- Optional: static node policy read -----
313
314    /// Returns the static [`NodePolicy`] for node `I` (compile-time index).
315    ///
316    /// This queries the node type directly, without requiring an instance step,
317    /// and is useful for planning or verifying scheduling constraints.
318    ///
319    /// # Type Parameters
320    ///
321    /// * `I` — The compile-time node index in `0..NODE_COUNT`.
322    /// * `IN` — The node’s input arity.
323    /// * `OUT` — The node’s output arity.
324    fn node_policy_for<const I: usize, const IN: usize, const OUT: usize>(&self) -> NodePolicy
325    where
326        Self: GraphNodeAccess<I> + GraphNodeTypes<I, IN, OUT>,
327        <Self as GraphNodeAccess<I>>::Node: Node<
328            IN,
329            OUT,
330            <Self as GraphNodeTypes<I, IN, OUT>>::InP,
331            <Self as GraphNodeTypes<I, IN, OUT>>::OutP,
332        >,
333    {
334        <<Self as GraphNodeAccess<I>>::Node as Node<
335            IN,
336            OUT,
337            <Self as GraphNodeTypes<I, IN, OUT>>::InP,
338            <Self as GraphNodeTypes<I, IN, OUT>>::OutP,
339        >>::policy(<Self as GraphNodeAccess<I>>::node_ref(self))
340    }
341}
342
343/// Opaque, runtime-owned buffer of edge occupancy snapshots.
344///
345/// # Semantics
346/// - This buffer is **owned by the runtime** and passed by mutable reference to
347///   graph APIs that *write into it* (see [`GraphApi::write_all_edge_occupancies`]
348///   and [`GraphApi::refresh_occupancies_for_node`]).
349/// - Each entry is a point-in-time [`EdgeOccupancy`] snapshot for the edge at the
350///   same index as returned by `GraphApi::get_edge_descriptors()`.
351/// - **Writers must not re-order** entries. Writers may update some or all slots,
352///   but any slot not explicitly written must be left untouched.
353///
354/// # Contracts
355/// - `GraphApi::write_all_edge_occupancies(&mut EdgeOccupancyBuf<E>)` **must write**
356///   a fresh value for **every** slot `[0..E)`.
357/// - `GraphApi::refresh_occupancies_for_node<I, IN, OUT>(&mut EdgeOccupancyBuf<E>)`
358///   is a **partial refresh**: it **must only** update slots corresponding to edges
359///   that are upstream or downstream of node `I` and **must not** modify any other
360///   slots.
361///
362/// # Usage
363/// - Runtimes typically allocate one `EdgeOccupancyBuf<E>` and reuse it across
364///   sampling intervals. After a full write, they may call partial refreshes to keep
365///   entries warm for the currently stepped node without touching unrelated edges.
366/// - Consumers should treat the contents as **snapshots** only; values may change
367///   immediately after sampling due to concurrent producers/consumers.
368///
369/// See also: [`EdgeOccupancy`], [`GraphApi`].
370pub type EdgeOccupancyBuf<const E: usize> = [EdgeOccupancy; E];
371
// ---------------------------------------------------------------------------
// Concurrent scoped execution
// ---------------------------------------------------------------------------

/// Extension trait for graphs that support concurrent execution via scoped threads.
///
/// The runtime provides a [`WorkerScheduler`] that controls per-worker stepping.
/// The graph sets up scoped threads and disjoint node borrows; the scheduler
/// decides when each worker steps, waits, or exits.
///
/// This trait is `#[cfg(feature = "std")]` because it requires `std::thread::scope`.
/// [`GraphApi`] remains `no_std`-compatible.
///
/// # Edge and manager handles
///
/// Edges must implement [`ScopedEdge`](crate::edge::ScopedEdge) and managers
/// must implement [`ScopedManager`](crate::memory::manager::ScopedManager) to produce
/// per-worker handles. Arc-based types (e.g. `ConcurrentEdge`,
/// `ConcurrentMemoryManager`) return clones; future lock-free types will
/// return split producer/consumer handles via
/// [`EdgeHandleKind`](crate::edge::EdgeHandleKind).
///
/// # Scheduling model
///
/// The codegen-generated implementation:
/// 1. Obtains per-worker edge and manager handles via `ScopedEdge::scoped_handle`
///    and `ScopedManager::scoped_handle` (before node borrows)
/// 2. Takes disjoint `&mut` borrows of each node (tuple field access)
/// 3. Spawns one scoped thread per node
/// 4. Each worker loop:
///    - Queries edge occupancy from concrete types (no dyn dispatch)
///    - Builds a [`WorkerState`] snapshot (readiness, backpressure, tick, last result)
///    - Calls `scheduler.decide(&state)` (static dispatch via `S`)
///    - Acts on the [`WorkerDecision`]: step, wait, or exit
///
/// The runtime controls execution policy by choosing which [`WorkerScheduler`]
/// to pass. Different runtimes can implement different strategies (backoff,
/// EDF, throughput, criticality-aware) without changing the graph.
///
/// [`WorkerScheduler`]: crate::scheduling::WorkerScheduler
/// [`WorkerState`]: crate::scheduling::WorkerState
/// [`WorkerDecision`]: crate::scheduling::WorkerDecision
#[cfg(feature = "std")]
pub trait ScopedGraphApi<const NODE_COUNT: usize, const EDGE_COUNT: usize>:
    GraphApi<NODE_COUNT, EDGE_COUNT>
{
    /// Run the graph concurrently with scheduler-controlled workers.
    ///
    /// Spawns one scoped thread per node. Each worker calls
    /// `scheduler.decide()` before every step. All threads join when
    /// all workers return (scope exit).
    ///
    /// Clock and telemetry are moved into the scope and distributed to workers
    /// via `Clone`. After this method returns, all threads have joined.
    fn run_scoped<C, T, S>(&mut self, clock: C, telemetry: T, scheduler: S)
    where
        C: PlatformClock + Clone + Send + Sync + 'static,
        T: Telemetry + Clone + Send + 'static,
        S: crate::scheduling::WorkerScheduler + 'static;
}