// limen_core/runtime/bench.rs
1//! (Work)bench [test] Runtime implementation.
2
3use crate::edge::EdgeOccupancy;
4use crate::errors::{NodeErrorKind, RuntimeError, RuntimeInvariantError};
5use crate::event_message;
6use crate::graph::GraphApi;
7use crate::node::StepResult;
8use crate::policy::{BatchingPolicy, BudgetPolicy, DeadlinePolicy, NodePolicy, WatermarkState};
9use crate::prelude::{PlatformClock, Telemetry};
10
11use super::LimenRuntime;
12
/// A tiny, no_std test runtime:
/// - round-robin over nodes
/// - uses a single occupancy array
/// - no heap, no threads, no timers
pub struct TestNoStdRuntime<C, T, const NODE_COUNT: usize, const EDGE_COUNT: usize>
where
    C: PlatformClock + Sized,
    T: Telemetry + Sized,
{
    /// Set by `request_stop()`; once true, `step()` returns `Ok(false)`.
    stop: bool,
    /// Round-robin cursor: index of the next node to try.
    next: usize,
    /// Persistent per-edge occupancy snapshot, refreshed after each
    /// node that makes progress (see `step_inner`).
    occ: [EdgeOccupancy; EDGE_COUNT],
    /// Per-node policies cached from the graph during `init()`.
    node_policies: [NodePolicy; NODE_COUNT],
    /// Platform clock; `None` until `init()` installs it.
    clock: Option<C>,
    /// Telemetry sink; `None` until `init()` installs it.
    telemetry: Option<T>,
}
29
30impl<C, T, const NODE_COUNT: usize, const EDGE_COUNT: usize>
31    TestNoStdRuntime<C, T, NODE_COUNT, EDGE_COUNT>
32where
33    C: PlatformClock + Sized,
34    T: Telemetry + Sized,
35{
36    /// Construct with a pessimistic initial occupancy; `init()` will overwrite it.
37    pub const fn new() -> Self {
38        const INIT_OCC: EdgeOccupancy = EdgeOccupancy::new(0, 0, WatermarkState::AtOrAboveHard);
39        const INIT_POLICY: NodePolicy = NodePolicy::new(
40            BatchingPolicy::none(),
41            BudgetPolicy::new(None, None),
42            DeadlinePolicy::new(false, None, None),
43        );
44
45        Self {
46            stop: false,
47            next: 0,
48            occ: [INIT_OCC; EDGE_COUNT],
49            node_policies: [INIT_POLICY; NODE_COUNT],
50            clock: None,
51            telemetry: None,
52        }
53    }
54
55    /// Decide whether a node's `StepResult` constitutes "progress".
56    /// Currently conservative: treat any `Ok(_)` as progress to keep the runtime simple.
57    /// If/when `StepResult` exposes a richer API (e.g., `is_progress()`), update this.
58    #[inline]
59    fn made_progress(sr: &StepResult) -> bool {
60        match sr {
61            StepResult::MadeProgress => true,
62            StepResult::Terminal => true,
63            // TODO: Handle this.
64            StepResult::YieldUntil(_) => true,
65            StepResult::NoInput | StepResult::Backpressured | StepResult::WaitingOnExternal => {
66                false
67            }
68        }
69    }
70
71    /// Internal helper for a monotonic nanosecond timestamp.
72    #[inline]
73    fn now_nanos(clock: &C) -> u64 {
74        let ticks = clock.now_ticks();
75        clock.ticks_to_nanos(ticks)
76    }
77
78    /// Hot-path inner step: requires `&mut self`, `&C`, and `&mut T`.
79    #[inline]
80    fn step_inner<Graph>(
81        &mut self,
82        graph: &mut Graph,
83        clock: &C,
84        telemetry: &mut T,
85    ) -> Result<bool, RuntimeError>
86    where
87        Graph: GraphApi<NODE_COUNT, EDGE_COUNT>,
88    {
89        // Try each node once, starting from `self.next` (round-robin).
90        let start = self.next;
91        let mut tried = 0usize;
92
93        while tried < NODE_COUNT {
94            let node_index = (start + tried) % NODE_COUNT;
95
96            // Execute the node step. All node-level telemetry (latency, processed,
97            // deadline, NodeStep events) is now handled in NodeLink::step via
98            // the graph implementation, not here.
99            let result = graph.step_node_by_index(node_index, clock, telemetry);
100
101            // ---- Scheduler logic (unchanged) ----
102            match result {
103                Ok(step_result) => {
104                    if Self::made_progress(&step_result) {
105                        // Keep this as the canonical place where the runtime refreshes the
106                        // occupancy buffer for scheduling. This remains separate from
107                        // telemetry, which is handled in StepContext/NodeLink.
108                        graph
109                            .write_all_edge_occupancies(&mut self.occ)
110                            .map_err(RuntimeError::from)?;
111
112                        self.next = (node_index + 1) % NODE_COUNT;
113                        return Ok(true);
114                    } else {
115                        tried += 1;
116                        continue;
117                    }
118                }
119                Err(error) => match error.kind() {
120                    NodeErrorKind::NoInput | NodeErrorKind::Backpressured => {
121                        tried += 1;
122                        continue;
123                    }
124                    _ => return Err(RuntimeError::from(error)),
125                },
126            }
127        }
128
129        // We tried all nodes and none made progress.
130        Ok(false)
131    }
132
133    /// Safely access telemetry by mutable reference, if it is present.
134    ///
135    /// Returns:
136    /// - `Ok(Some(r))` if telemetry is present and `f` ran.
137    /// - `Ok(None)` if telemetry is currently absent (e.g. not yet initialized).
138    ///
139    /// This never panics and never moves telemetry out of the runtime.
140    #[inline]
141    pub fn with_telemetry<F, R>(&mut self, f: F) -> Result<Option<R>, RuntimeError>
142    where
143        F: FnOnce(&mut T) -> R,
144    {
145        if let Some(t) = self.telemetry.as_mut() {
146            let r = f(t);
147            Ok(Some(r))
148        } else {
149            Ok(None)
150        }
151    }
152}
153
154impl<Graph, C, T, const NODE_COUNT: usize, const EDGE_COUNT: usize>
155    LimenRuntime<Graph, NODE_COUNT, EDGE_COUNT> for TestNoStdRuntime<C, T, NODE_COUNT, EDGE_COUNT>
156where
157    Graph: GraphApi<NODE_COUNT, EDGE_COUNT>,
158    C: PlatformClock + Sized,
159    T: Telemetry + Sized,
160{
161    type Clock = C;
162    type Telemetry = T;
163    type Error = RuntimeError;
164
165    #[cfg(feature = "std")]
166    type StopHandle = crate::runtime::RuntimeStopHandle;
167
168    #[inline]
169    fn init(
170        &mut self,
171        graph: &mut Graph,
172        clock: Self::Clock,
173        mut telemetry: Self::Telemetry,
174    ) -> Result<(), Self::Error> {
175        const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;
176
177        // Validate (pure, read-only).
178        graph.validate_graph().map_err(RuntimeError::from)?;
179
180        // Snapshot occupancies into our persistent buffer.
181        graph
182            .write_all_edge_occupancies(&mut self.occ)
183            .map_err(RuntimeError::from)?;
184
185        // Cache NodePolicy for every node using the GraphApi hook.
186        self.node_policies = graph.get_node_policies();
187
188        if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
189            let timestamp_ns = Self::now_nanos(&clock);
190            let event = crate::telemetry::TelemetryEvent::runtime(
191                crate::telemetry::RuntimeTelemetryEvent::new(
192                    GRAPH_ID,
193                    timestamp_ns,
194                    crate::telemetry::RuntimeTelemetryEventKind::GraphStarted,
195                    None,
196                ),
197            );
198            telemetry.push_event(event);
199        }
200
201        self.clock = Some(clock);
202        self.telemetry = Some(telemetry);
203
204        self.stop = false;
205        self.next = 0;
206
207        Ok(())
208    }
209
210    #[inline]
211    fn reset(&mut self, graph: &Graph) -> Result<(), Self::Error> {
212        const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;
213
214        self.stop = false;
215        self.next = 0;
216        graph
217            .write_all_edge_occupancies(&mut self.occ)
218            .map_err(RuntimeError::from)?;
219
220        if let (Some(ref clock), Some(telemetry)) = (&self.clock, self.telemetry.as_mut()) {
221            if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
222                let timestamp_ns = Self::now_nanos(clock);
223                let event = crate::telemetry::TelemetryEvent::runtime(
224                    crate::telemetry::RuntimeTelemetryEvent::new(
225                        GRAPH_ID,
226                        timestamp_ns,
227                        crate::telemetry::RuntimeTelemetryEventKind::GraphStarted,
228                        Some(event_message!("graph reset")),
229                    ),
230                );
231                telemetry.push_event(event);
232            }
233        }
234
235        Ok(())
236    }
237
238    #[inline]
239    fn request_stop(&mut self) {
240        self.stop = true;
241
242        const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;
243
244        if let (Some(ref clock), Some(telemetry)) = (&self.clock, self.telemetry.as_mut()) {
245            if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
246                let timestamp_ns = Self::now_nanos(clock);
247                let event = crate::telemetry::TelemetryEvent::runtime(
248                    crate::telemetry::RuntimeTelemetryEvent::new(
249                        GRAPH_ID,
250                        timestamp_ns,
251                        crate::telemetry::RuntimeTelemetryEventKind::GraphStopped,
252                        None,
253                    ),
254                );
255                telemetry.push_event(event);
256            }
257        }
258    }
259
260    #[inline]
261    fn is_stopping(&self) -> bool {
262        self.stop
263    }
264
265    #[inline]
266    fn occupancies(&self) -> &[EdgeOccupancy; EDGE_COUNT] {
267        &self.occ
268    }
269
270    #[inline]
271    fn step(&mut self, graph: &mut Graph) -> Result<bool, Self::Error> {
272        if self.stop {
273            return Ok(false);
274        }
275
276        // Safely take the clock; error if missing.
277        let clock = match self.clock.take() {
278            Some(c) => c,
279            None => {
280                return Err(RuntimeError::RuntimeInvariant(
281                    RuntimeInvariantError::UninitializedClock,
282                ))
283            }
284        };
285
286        // Safely take telemetry; if missing, put clock back before returning.
287        let mut telemetry = match self.telemetry.take() {
288            Some(t) => t,
289            None => {
290                self.clock = Some(clock);
291                return Err(RuntimeError::RuntimeInvariant(
292                    RuntimeInvariantError::UninitializedTelemetry,
293                ));
294            }
295        };
296
297        let result = self.step_inner(graph, &clock, &mut telemetry);
298
299        // Put telemetry and clock back.
300        self.telemetry = Some(telemetry);
301        self.clock = Some(clock);
302
303        result
304    }
305}
306
impl<C, T, const NODE_COUNT: usize, const EDGE_COUNT: usize> Default
    for TestNoStdRuntime<C, T, NODE_COUNT, EDGE_COUNT>
where
    C: PlatformClock + Sized,
    T: Telemetry + Sized,
{
    /// Equivalent to [`TestNoStdRuntime::new`]: pessimistic occupancy,
    /// no clock or telemetry installed until `init()`.
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}
318
319/// ===== std test runtime: scheduler-driven concurrent execution =====
320#[cfg(feature = "std")]
321pub mod concurrent_runtime {
322    use crate::edge::EdgeOccupancy;
323    use crate::errors::{RuntimeError, RuntimeInvariantError};
324    use crate::event_message;
325    use crate::graph::{GraphApi, ScopedGraphApi};
326    use crate::node::StepResult;
327    use crate::policy::WatermarkState;
328    use crate::prelude::{PlatformClock, Readiness, Telemetry};
329    use crate::runtime::LimenRuntime;
330    use crate::scheduling::{WorkerDecision, WorkerScheduler, WorkerState};
331    use crate::types::NodeIndex;
332
333    use std::sync::atomic::{AtomicBool, Ordering};
334    use std::sync::Arc;
335
336    // ------------------------------------------------------------------
337    // SimpleBackoffScheduler
338    // ------------------------------------------------------------------
339
    /// Simple backoff scheduler for test/bench use.
    ///
    /// - Steps immediately when ready or when there's no prior result.
    /// - Backs off on backpressure (longer wait).
    /// - Idles on no-input/waiting (shorter wait).
    /// - Stops when the shared `AtomicBool` is set.
    pub struct SimpleBackoffScheduler {
        /// Shared stop flag; checked first on every `decide()` call.
        stop: Arc<AtomicBool>,
        /// Wait (µs) applied when a node is not ready / has no input.
        idle_micros: u64,
        /// Wait (µs) applied after a backpressured step; longer than idle.
        backpressure_micros: u64,
    }
351
352    impl SimpleBackoffScheduler {
353        /// Create a new scheduler with the given stop flag and backoff durations.
354        pub fn new(stop: Arc<AtomicBool>, idle_micros: u64, backpressure_micros: u64) -> Self {
355            Self {
356                stop,
357                idle_micros,
358                backpressure_micros,
359            }
360        }
361    }
362
363    impl WorkerScheduler for SimpleBackoffScheduler {
364        fn decide(&self, state: &WorkerState) -> WorkerDecision {
365            // Honor immediate stop.
366            if self.stop.load(Ordering::Relaxed) {
367                return WorkerDecision::Stop;
368            }
369
370            // If we have a last step result, handle the authoritative cases first.
371            if let Some(last) = state.last_step {
372                match last {
373                    StepResult::Terminal => return WorkerDecision::Stop,
374                    StepResult::Backpressured => {
375                        return WorkerDecision::WaitMicros(self.backpressure_micros)
376                    }
377                    StepResult::MadeProgress => {}
378                    // For NoInput / WaitingOnExternal / YieldUntil(_) we'll consult readiness below.
379                    StepResult::NoInput
380                    | StepResult::WaitingOnExternal
381                    | StepResult::YieldUntil(_) => {}
382                }
383            }
384
385            // Either last_step was None, or the last step did not make progress.
386            // Use the node's computed readiness (set by the worker loop) to decide.
387            match state.readiness {
388                Readiness::Ready | Readiness::ReadyUnderPressure => WorkerDecision::Step,
389                Readiness::NotReady => WorkerDecision::WaitMicros(self.idle_micros),
390            }
391        }
392    }
393
394    // ------------------------------------------------------------------
395    // TestScopedRuntime
396    // ------------------------------------------------------------------
397
    /// Concurrent runtime that delegates to `ScopedGraphApi::run_scoped`.
    ///
    /// Implements `LimenRuntime`. The runtime is the orchestrator:
    /// - `step()`: sequential round-robin via `GraphApi::step_node_by_index`.
    ///   For debug/test single-tick use.
    /// - `run()`: overrides default. Creates a `SimpleBackoffScheduler` and calls
    ///   `graph.run_scoped(clock, telemetry, scheduler)` — one scoped thread per
    ///   node, true concurrent execution, scheduler-controlled stepping.
    pub struct TestScopedRuntime<C, T, const NODE_COUNT: usize, const EDGE_COUNT: usize>
    where
        C: PlatformClock + Clone + Send + Sync + 'static,
        T: Telemetry + Clone + Send + 'static,
    {
        /// Shared stop flag; cloned into stop handles and schedulers.
        stop: Arc<AtomicBool>,
        /// Persistent per-edge occupancy snapshot used for readiness checks.
        occ: [EdgeOccupancy; EDGE_COUNT],
        /// Per-node last step result, used by `step()` to make scheduler decisions.
        node_last_step: [Option<StepResult>; NODE_COUNT],
        /// Platform clock; `None` until `init()` installs it.
        clock: Option<C>,
        /// Telemetry sink; `None` until `init()` installs it.
        telemetry: Option<T>,
    }
418
419    impl<C, T, const NODE_COUNT: usize, const EDGE_COUNT: usize>
420        TestScopedRuntime<C, T, NODE_COUNT, EDGE_COUNT>
421    where
422        C: PlatformClock + Clone + Send + Sync + 'static,
423        T: Telemetry + Clone + Send + 'static,
424    {
425        /// Construct with pessimistic initial occupancy; `init()` will overwrite.
426        pub fn new() -> Self {
427            const INIT_OCC: EdgeOccupancy = EdgeOccupancy::new(0, 0, WatermarkState::AtOrAboveHard);
428            Self {
429                stop: Arc::new(AtomicBool::new(false)),
430                occ: [INIT_OCC; EDGE_COUNT],
431                node_last_step: [None; NODE_COUNT],
432                clock: None,
433                telemetry: None,
434            }
435        }
436
437        /// Internal helper for a monotonic nanosecond timestamp.
438        #[inline]
439        fn now_nanos(clock: &C) -> u64 {
440            let ticks = clock.now_ticks();
441            clock.ticks_to_nanos(ticks)
442        }
443
444        /// Safely access telemetry by mutable reference, if present.
445        #[inline]
446        pub fn with_telemetry<F, R>(&mut self, f: F) -> Result<Option<R>, RuntimeError>
447        where
448            F: FnOnce(&mut T) -> R,
449        {
450            if let Some(t) = self.telemetry.as_mut() {
451                let r = f(t);
452                Ok(Some(r))
453            } else {
454                Ok(None)
455            }
456        }
457    }
458
    impl<Graph, C, T, const NODE_COUNT: usize, const EDGE_COUNT: usize>
        LimenRuntime<Graph, NODE_COUNT, EDGE_COUNT>
        for TestScopedRuntime<C, T, NODE_COUNT, EDGE_COUNT>
    where
        Graph: GraphApi<NODE_COUNT, EDGE_COUNT> + ScopedGraphApi<NODE_COUNT, EDGE_COUNT>,
        C: PlatformClock + Clone + Send + Sync + 'static,
        T: Telemetry + Clone + Send + 'static,
    {
        type Clock = C;
        type Telemetry = T;
        type Error = RuntimeError;

        #[cfg(feature = "std")]
        type StopHandle = crate::runtime::RuntimeStopHandle;

        /// Validate the graph, seed the occupancy snapshot, install clock and
        /// telemetry, and clear the stop flag and per-node step history.
        /// Emits a `GraphStarted` event when telemetry events are enabled.
        fn init(
            &mut self,
            graph: &mut Graph,
            clock: Self::Clock,
            mut telemetry: Self::Telemetry,
        ) -> Result<(), Self::Error> {
            const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;

            // Pure, read-only structural validation first.
            graph.validate_graph().map_err(RuntimeError::from)?;

            // Seed the persistent occupancy snapshot from the live graph.
            graph
                .write_all_edge_occupancies(&mut self.occ)
                .map_err(RuntimeError::from)?;

            if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
                let timestamp_ns = Self::now_nanos(&clock);
                let event = crate::telemetry::TelemetryEvent::runtime(
                    crate::telemetry::RuntimeTelemetryEvent::new(
                        GRAPH_ID,
                        timestamp_ns,
                        crate::telemetry::RuntimeTelemetryEventKind::GraphStarted,
                        None,
                    ),
                );
                telemetry.push_event(event);
            }

            self.clock = Some(clock);
            self.telemetry = Some(telemetry);
            self.stop.store(false, Ordering::Relaxed);
            self.node_last_step = [None; NODE_COUNT];

            Ok(())
        }

        /// Clear the stop flag and step history, then re-snapshot
        /// occupancies. Emits a `GraphStarted` event tagged "graph reset"
        /// when clock and telemetry are installed and events are enabled.
        fn reset(&mut self, graph: &Graph) -> Result<(), Self::Error> {
            const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;

            self.stop.store(false, Ordering::Relaxed);
            self.node_last_step = [None; NODE_COUNT];
            graph
                .write_all_edge_occupancies(&mut self.occ)
                .map_err(RuntimeError::from)?;

            if let (Some(ref clock), Some(telemetry)) = (&self.clock, self.telemetry.as_mut()) {
                if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
                    let timestamp_ns = Self::now_nanos(clock);
                    let event = crate::telemetry::TelemetryEvent::runtime(
                        crate::telemetry::RuntimeTelemetryEvent::new(
                            GRAPH_ID,
                            timestamp_ns,
                            crate::telemetry::RuntimeTelemetryEventKind::GraphStarted,
                            Some(event_message!("graph reset")),
                        ),
                    );
                    telemetry.push_event(event);
                }
            }

            Ok(())
        }

        /// Raise the shared stop flag (observed by worker schedulers) and
        /// emit a `GraphStopped` event when telemetry is available.
        fn request_stop(&mut self) {
            const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;

            self.stop.store(true, Ordering::Relaxed);

            if let (Some(ref clock), Some(telemetry)) = (&self.clock, self.telemetry.as_mut()) {
                if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
                    let timestamp_ns = Self::now_nanos(clock);
                    let event = crate::telemetry::TelemetryEvent::runtime(
                        crate::telemetry::RuntimeTelemetryEvent::new(
                            GRAPH_ID,
                            timestamp_ns,
                            crate::telemetry::RuntimeTelemetryEventKind::GraphStopped,
                            None,
                        ),
                    );
                    telemetry.push_event(event);
                }
            }
        }

        /// A handle wrapping the shared stop flag, usable from other threads.
        #[cfg(feature = "std")]
        fn stop_handle(&self) -> Option<Self::StopHandle> {
            Some(crate::runtime::RuntimeStopHandle::new(self.stop.clone()))
        }

        /// Whether a stop has been requested (by any holder of the flag).
        #[inline]
        fn is_stopping(&self) -> bool {
            self.stop.load(Ordering::Relaxed)
        }

        /// The most recent per-edge occupancy snapshot.
        #[inline]
        fn occupancies(&self) -> &[EdgeOccupancy; EDGE_COUNT] {
            &self.occ
        }

        /// Sequential scheduler-driven step for debug/test single-tick use.
        ///
        /// For each node, consults `SimpleBackoffScheduler` using the stored
        /// per-node `last_step` to decide whether to step. This ensures the
        /// same scheduling policy governs both `step()` and `run()`.
        ///
        /// `WaitMicros` decisions are treated as "skip this tick" — the
        /// `last_step` is then cleared so the scheduler returns `Step` on
        /// the next `step()` call. This avoids permanent starvation in
        /// sequential mode where there is no real time passage between ticks.
        fn step(&mut self, graph: &mut Graph) -> Result<bool, Self::Error> {
            // Fully-qualified call to disambiguate the trait method from any
            // inherent one.
            if <TestScopedRuntime<C, T, NODE_COUNT, EDGE_COUNT> as LimenRuntime<
                Graph,
                NODE_COUNT,
                EDGE_COUNT,
            >>::is_stopping(self)
            {
                return Ok(false);
            }

            // Move the clock and telemetry out for the duration of the tick;
            // error if either is missing (clock is restored on the telemetry
            // error path so state stays consistent).
            let clock = match self.clock.take() {
                Some(c) => c,
                None => {
                    return Err(RuntimeError::RuntimeInvariant(
                        RuntimeInvariantError::UninitializedClock,
                    ))
                }
            };
            let mut telemetry = match self.telemetry.take() {
                Some(t) => t,
                None => {
                    self.clock = Some(clock);
                    return Err(RuntimeError::RuntimeInvariant(
                        RuntimeInvariantError::UninitializedTelemetry,
                    ));
                }
            };

            // Same policy as `run()`: 50µs idle backoff, 200µs backpressure.
            let scheduler = SimpleBackoffScheduler::new(self.stop.clone(), 50, 200);
            let mut any_progress = false;

            // Refresh a cheap occupancy snapshot for this tick so we can compute
            // inexpensive readiness for each node (same idea as run_scoped workers).
            graph
                .write_all_edge_occupancies(&mut self.occ)
                .map_err(RuntimeError::from)?;

            for i in 0..NODE_COUNT {
                let mut state = WorkerState::new(i, NODE_COUNT, clock.now_ticks());
                state.last_step = self.node_last_step[i];

                // Compute max output backpressure and whether any input has items
                // by scanning the graph's edge descriptors and the occupancy
                // snapshot `self.occ`.
                let mut _max_wm = WatermarkState::BelowSoft;
                let mut any_input_has_items = false;
                let node_idx = NodeIndex::from(i);

                for ed in graph.get_edge_descriptors().iter() {
                    let eid = *ed.id().as_usize();
                    // Upstream edges contribute to output backpressure.
                    if ed.upstream().node() == &node_idx {
                        let occ = &self.occ[eid];
                        if *occ.watermark() > _max_wm {
                            _max_wm = *occ.watermark();
                        }
                    }
                    // Downstream edges determine whether this node has any input items.
                    if ed.downstream().node() == &node_idx {
                        let occ = &self.occ[eid];
                        if *occ.items() > 0 {
                            any_input_has_items = true;
                        }
                    }
                }
                state.backpressure = _max_wm;

                // Compatibility: if we have no prior step (first probe) let the
                // scheduler see Ready so the node gets probed at least once.
                if state.last_step.is_none() {
                    state.readiness = Readiness::Ready;
                } else {
                    // Otherwise use the cheap pre-check: if there are items on any
                    // input edge (ingress included) we are Ready; otherwise NotReady.
                    state.readiness = if any_input_has_items {
                        if _max_wm >= WatermarkState::BetweenSoftAndHard {
                            Readiness::ReadyUnderPressure
                        } else {
                            Readiness::Ready
                        }
                    } else {
                        Readiness::NotReady
                    };
                }

                // Ask the scheduler and log the decision for debugging.
                let decision = scheduler.decide(&state);
                // ::std::eprintln!(
                //     "sched-debug: node={} last_step={:?} readiness={:?} => decision={:?}",
                //     i,
                //     state.last_step,
                //     state.readiness,
                //     decision
                // );

                match decision {
                    WorkerDecision::Step => {
                        match graph.step_node_by_index(i, &clock, &mut telemetry) {
                            Ok(sr) => {
                                // Record last step result and note progress.
                                self.node_last_step[i] = Some(sr);
                                if matches!(sr, StepResult::MadeProgress | StepResult::Terminal) {
                                    any_progress = true;
                                }

                                // **Crucial:** update occupancy snapshot immediately so
                                // downstream nodes in this same step() iteration see the
                                // newly-produced items and compute readiness correctly.
                                graph
                                    .write_all_edge_occupancies(&mut self.occ)
                                    .map_err(RuntimeError::from)?;
                            }
                            Err(e) => {
                                // NOTE(review): debug-level stderr logging left in;
                                // consider routing through telemetry instead.
                                ::std::eprintln!("sched-debug: node={} step error: {:?}", i, e);
                                // NodeError is not fatal; clear state so we retry.
                                self.node_last_step[i] = None;
                            }
                        }
                    }
                    WorkerDecision::WaitMicros(_) => {
                        // In sequential mode, skip this node for this tick.
                        // Clear last_step so the scheduler returns Step next tick,
                        // preventing permanent starvation.
                        self.node_last_step[i] = None;
                    }
                    WorkerDecision::Stop => {
                        // Node terminated or runtime stopping; do not step.
                    }
                }
            }

            // No final write_all_edge_occupancies here: we update occupancies
            // right after each successful step above, so self.occ is up-to-date.

            self.telemetry = Some(telemetry);
            self.clock = Some(clock);

            Ok(any_progress)
        }

        /// Concurrent execution via `ScopedGraphApi::run_scoped`.
        ///
        /// Creates a `SimpleBackoffScheduler` and delegates to the graph's
        /// scoped thread infrastructure. Each node gets its own thread;
        /// the scheduler controls per-worker stepping.
        ///
        /// After all threads join, refreshes the occupancy snapshot.
        fn run(&mut self, graph: &mut Graph) -> Result<(), Self::Error> {
            const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;

            // Workers receive clones; `self` keeps the originals for the
            // post-run telemetry event below.
            let clock = self.clock.clone().ok_or(RuntimeError::RuntimeInvariant(
                RuntimeInvariantError::UninitializedClock,
            ))?;
            let telemetry = self
                .telemetry
                .clone()
                .ok_or(RuntimeError::RuntimeInvariant(
                    RuntimeInvariantError::UninitializedTelemetry,
                ))?;

            let scheduler = SimpleBackoffScheduler::new(
                self.stop.clone(),
                50,  // idle_micros
                200, // backpressure_micros
            );

            graph.run_scoped(clock, telemetry, scheduler);

            // After scope exits (all threads joined): refresh occupancies.
            graph
                .write_all_edge_occupancies(&mut self.occ)
                .map_err(RuntimeError::from)?;

            if let (Some(ref clock), Some(telemetry)) = (&self.clock, self.telemetry.as_mut()) {
                if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
                    let timestamp_ns = Self::now_nanos(clock);
                    let event = crate::telemetry::TelemetryEvent::runtime(
                        crate::telemetry::RuntimeTelemetryEvent::new(
                            GRAPH_ID,
                            timestamp_ns,
                            crate::telemetry::RuntimeTelemetryEventKind::GraphStopped,
                            None,
                        ),
                    );
                    telemetry.push_event(event);
                }
            }

            Ok(())
        }
    }
773
    impl<C, T, const NODE_COUNT: usize, const EDGE_COUNT: usize> Default
        for TestScopedRuntime<C, T, NODE_COUNT, EDGE_COUNT>
    where
        C: PlatformClock + Clone + Send + Sync + 'static,
        T: Telemetry + Clone + Send + 'static,
    {
        /// Equivalent to [`TestScopedRuntime::new`]: pessimistic occupancy,
        /// no clock or telemetry installed until `init()`.
        #[inline]
        fn default() -> Self {
            Self::new()
        }
    }
785}