1use crate::edge::EdgeOccupancy;
4use crate::errors::{NodeErrorKind, RuntimeError, RuntimeInvariantError};
5use crate::event_message;
6use crate::graph::GraphApi;
7use crate::node::StepResult;
8use crate::policy::{BatchingPolicy, BudgetPolicy, DeadlinePolicy, NodePolicy, WatermarkState};
9use crate::prelude::{PlatformClock, Telemetry};
10
11use super::LimenRuntime;
12
/// Single-threaded, `no_std`-friendly test runtime that drives a graph by
/// round-robin stepping its nodes one at a time.
pub struct TestNoStdRuntime<C, T, const NODE_COUNT: usize, const EDGE_COUNT: usize>
where
    C: PlatformClock + Sized,
    T: Telemetry + Sized,
{
    // Set by `request_stop`; once true, `step` returns `Ok(false)` untouched.
    stop: bool,
    // Round-robin cursor: index of the node the next scheduling pass starts at.
    next: usize,
    // Latest per-edge occupancy snapshot, refreshed after each productive step.
    occ: [EdgeOccupancy; EDGE_COUNT],
    // Per-node policies copied from the graph during `init`.
    node_policies: [NodePolicy; NODE_COUNT],
    // Platform clock; `None` until `init` installs one.
    clock: Option<C>,
    // Telemetry sink; `None` until `init` installs one.
    telemetry: Option<T>,
}
29
30impl<C, T, const NODE_COUNT: usize, const EDGE_COUNT: usize>
31 TestNoStdRuntime<C, T, NODE_COUNT, EDGE_COUNT>
32where
33 C: PlatformClock + Sized,
34 T: Telemetry + Sized,
35{
36 pub const fn new() -> Self {
38 const INIT_OCC: EdgeOccupancy = EdgeOccupancy::new(0, 0, WatermarkState::AtOrAboveHard);
39 const INIT_POLICY: NodePolicy = NodePolicy::new(
40 BatchingPolicy::none(),
41 BudgetPolicy::new(None, None),
42 DeadlinePolicy::new(false, None, None),
43 );
44
45 Self {
46 stop: false,
47 next: 0,
48 occ: [INIT_OCC; EDGE_COUNT],
49 node_policies: [INIT_POLICY; NODE_COUNT],
50 clock: None,
51 telemetry: None,
52 }
53 }
54
55 #[inline]
59 fn made_progress(sr: &StepResult) -> bool {
60 match sr {
61 StepResult::MadeProgress => true,
62 StepResult::Terminal => true,
63 StepResult::YieldUntil(_) => true,
65 StepResult::NoInput | StepResult::Backpressured | StepResult::WaitingOnExternal => {
66 false
67 }
68 }
69 }
70
71 #[inline]
73 fn now_nanos(clock: &C) -> u64 {
74 let ticks = clock.now_ticks();
75 clock.ticks_to_nanos(ticks)
76 }
77
78 #[inline]
80 fn step_inner<Graph>(
81 &mut self,
82 graph: &mut Graph,
83 clock: &C,
84 telemetry: &mut T,
85 ) -> Result<bool, RuntimeError>
86 where
87 Graph: GraphApi<NODE_COUNT, EDGE_COUNT>,
88 {
89 let start = self.next;
91 let mut tried = 0usize;
92
93 while tried < NODE_COUNT {
94 let node_index = (start + tried) % NODE_COUNT;
95
96 let result = graph.step_node_by_index(node_index, clock, telemetry);
100
101 match result {
103 Ok(step_result) => {
104 if Self::made_progress(&step_result) {
105 graph
109 .write_all_edge_occupancies(&mut self.occ)
110 .map_err(RuntimeError::from)?;
111
112 self.next = (node_index + 1) % NODE_COUNT;
113 return Ok(true);
114 } else {
115 tried += 1;
116 continue;
117 }
118 }
119 Err(error) => match error.kind() {
120 NodeErrorKind::NoInput | NodeErrorKind::Backpressured => {
121 tried += 1;
122 continue;
123 }
124 _ => return Err(RuntimeError::from(error)),
125 },
126 }
127 }
128
129 Ok(false)
131 }
132
133 #[inline]
141 pub fn with_telemetry<F, R>(&mut self, f: F) -> Result<Option<R>, RuntimeError>
142 where
143 F: FnOnce(&mut T) -> R,
144 {
145 if let Some(t) = self.telemetry.as_mut() {
146 let r = f(t);
147 Ok(Some(r))
148 } else {
149 Ok(None)
150 }
151 }
152}
153
impl<Graph, C, T, const NODE_COUNT: usize, const EDGE_COUNT: usize>
    LimenRuntime<Graph, NODE_COUNT, EDGE_COUNT> for TestNoStdRuntime<C, T, NODE_COUNT, EDGE_COUNT>
where
    Graph: GraphApi<NODE_COUNT, EDGE_COUNT>,
    C: PlatformClock + Sized,
    T: Telemetry + Sized,
{
    type Clock = C;
    type Telemetry = T;
    type Error = RuntimeError;

    #[cfg(feature = "std")]
    type StopHandle = crate::runtime::RuntimeStopHandle;

    /// Validates the graph, snapshots edge occupancies and node policies,
    /// emits a `GraphStarted` telemetry event, then installs the clock and
    /// telemetry and clears the stop flag and round-robin cursor.
    #[inline]
    fn init(
        &mut self,
        graph: &mut Graph,
        clock: Self::Clock,
        mut telemetry: Self::Telemetry,
    ) -> Result<(), Self::Error> {
        // Single-graph runtime: the telemetry instance id is fixed at 0.
        const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;

        graph.validate_graph().map_err(RuntimeError::from)?;

        graph
            .write_all_edge_occupancies(&mut self.occ)
            .map_err(RuntimeError::from)?;

        self.node_policies = graph.get_node_policies();

        // `EVENTS_STATICALLY_ENABLED` lets this whole branch compile away for
        // telemetry types that never produce events.
        if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
            let timestamp_ns = Self::now_nanos(&clock);
            let event = crate::telemetry::TelemetryEvent::runtime(
                crate::telemetry::RuntimeTelemetryEvent::new(
                    GRAPH_ID,
                    timestamp_ns,
                    crate::telemetry::RuntimeTelemetryEventKind::GraphStarted,
                    None,
                ),
            );
            telemetry.push_event(event);
        }

        self.clock = Some(clock);
        self.telemetry = Some(telemetry);

        self.stop = false;
        self.next = 0;

        Ok(())
    }

    /// Clears the stop flag and cursor, refreshes the occupancy snapshot,
    /// and — when a clock and telemetry are installed — emits a
    /// `GraphStarted` event tagged with a "graph reset" message.
    #[inline]
    fn reset(&mut self, graph: &Graph) -> Result<(), Self::Error> {
        const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;

        self.stop = false;
        self.next = 0;
        graph
            .write_all_edge_occupancies(&mut self.occ)
            .map_err(RuntimeError::from)?;

        if let (Some(ref clock), Some(telemetry)) = (&self.clock, self.telemetry.as_mut()) {
            if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
                let timestamp_ns = Self::now_nanos(clock);
                let event = crate::telemetry::TelemetryEvent::runtime(
                    crate::telemetry::RuntimeTelemetryEvent::new(
                        GRAPH_ID,
                        timestamp_ns,
                        crate::telemetry::RuntimeTelemetryEventKind::GraphStarted,
                        Some(event_message!("graph reset")),
                    ),
                );
                telemetry.push_event(event);
            }
        }

        Ok(())
    }

    /// Sets the stop flag (making further `step` calls no-ops) and emits a
    /// `GraphStopped` event when a clock and telemetry are installed.
    #[inline]
    fn request_stop(&mut self) {
        self.stop = true;

        const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;

        if let (Some(ref clock), Some(telemetry)) = (&self.clock, self.telemetry.as_mut()) {
            if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
                let timestamp_ns = Self::now_nanos(clock);
                let event = crate::telemetry::TelemetryEvent::runtime(
                    crate::telemetry::RuntimeTelemetryEvent::new(
                        GRAPH_ID,
                        timestamp_ns,
                        crate::telemetry::RuntimeTelemetryEventKind::GraphStopped,
                        None,
                    ),
                );
                telemetry.push_event(event);
            }
        }
    }

    #[inline]
    fn is_stopping(&self) -> bool {
        self.stop
    }

    #[inline]
    fn occupancies(&self) -> &[EdgeOccupancy; EDGE_COUNT] {
        &self.occ
    }

    /// Runs one round-robin pass via `step_inner`. Returns `Ok(false)`
    /// without touching the graph once a stop has been requested, and errors
    /// if called before `init`.
    #[inline]
    fn step(&mut self, graph: &mut Graph) -> Result<bool, Self::Error> {
        if self.stop {
            return Ok(false);
        }

        // Move the clock and telemetry out of `self` so `step_inner` can
        // borrow `self` mutably alongside them; both are reinstalled below on
        // every path, including when `step_inner` errors.
        let clock = match self.clock.take() {
            Some(c) => c,
            None => {
                return Err(RuntimeError::RuntimeInvariant(
                    RuntimeInvariantError::UninitializedClock,
                ))
            }
        };

        let mut telemetry = match self.telemetry.take() {
            Some(t) => t,
            None => {
                // Put the clock back before bailing out.
                self.clock = Some(clock);
                return Err(RuntimeError::RuntimeInvariant(
                    RuntimeInvariantError::UninitializedTelemetry,
                ));
            }
        };

        let result = self.step_inner(graph, &clock, &mut telemetry);

        self.telemetry = Some(telemetry);
        self.clock = Some(clock);

        result
    }
}
306
impl<C, T, const NODE_COUNT: usize, const EDGE_COUNT: usize> Default
    for TestNoStdRuntime<C, T, NODE_COUNT, EDGE_COUNT>
where
    C: PlatformClock + Sized,
    T: Telemetry + Sized,
{
    /// Equivalent to [`TestNoStdRuntime::new`].
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}
318
319#[cfg(feature = "std")]
321pub mod concurrent_runtime {
322 use crate::edge::EdgeOccupancy;
323 use crate::errors::{RuntimeError, RuntimeInvariantError};
324 use crate::event_message;
325 use crate::graph::{GraphApi, ScopedGraphApi};
326 use crate::node::StepResult;
327 use crate::policy::WatermarkState;
328 use crate::prelude::{PlatformClock, Readiness, Telemetry};
329 use crate::runtime::LimenRuntime;
330 use crate::scheduling::{WorkerDecision, WorkerScheduler, WorkerState};
331 use crate::types::NodeIndex;
332
333 use std::sync::atomic::{AtomicBool, Ordering};
334 use std::sync::Arc;
335
    /// Minimal [`WorkerScheduler`]: stops when a shared flag is raised and
    /// backs off with fixed waits when a worker is idle or backpressured.
    pub struct SimpleBackoffScheduler {
        // Shared stop flag; when set, `decide` returns `WorkerDecision::Stop`.
        stop: Arc<AtomicBool>,
        // Wait (microseconds) when a worker has no ready input.
        idle_micros: u64,
        // Wait (microseconds) after a worker reported backpressure.
        backpressure_micros: u64,
    }
351
    impl SimpleBackoffScheduler {
        /// Creates a scheduler driven by the shared `stop` flag, with the
        /// given idle and backpressure wait times (both in microseconds).
        pub fn new(stop: Arc<AtomicBool>, idle_micros: u64, backpressure_micros: u64) -> Self {
            Self {
                stop,
                idle_micros,
                backpressure_micros,
            }
        }
    }
362
363 impl WorkerScheduler for SimpleBackoffScheduler {
364 fn decide(&self, state: &WorkerState) -> WorkerDecision {
365 if self.stop.load(Ordering::Relaxed) {
367 return WorkerDecision::Stop;
368 }
369
370 if let Some(last) = state.last_step {
372 match last {
373 StepResult::Terminal => return WorkerDecision::Stop,
374 StepResult::Backpressured => {
375 return WorkerDecision::WaitMicros(self.backpressure_micros)
376 }
377 StepResult::MadeProgress => {}
378 StepResult::NoInput
380 | StepResult::WaitingOnExternal
381 | StepResult::YieldUntil(_) => {}
382 }
383 }
384
385 match state.readiness {
388 Readiness::Ready | Readiness::ReadyUnderPressure => WorkerDecision::Step,
389 Readiness::NotReady => WorkerDecision::WaitMicros(self.idle_micros),
390 }
391 }
392 }
393
    /// Test runtime for scoped/concurrent graph execution (std-only); shares
    /// its stop flag with worker schedulers through an `Arc<AtomicBool>`.
    pub struct TestScopedRuntime<C, T, const NODE_COUNT: usize, const EDGE_COUNT: usize>
    where
        C: PlatformClock + Clone + Send + Sync + 'static,
        T: Telemetry + Clone + Send + 'static,
    {
        // Stop flag shared with `SimpleBackoffScheduler` and stop handles.
        stop: Arc<AtomicBool>,
        // Latest per-edge occupancy snapshot.
        occ: [EdgeOccupancy; EDGE_COUNT],
        // Most recent `StepResult` per node, fed back into scheduling decisions.
        node_last_step: [Option<StepResult>; NODE_COUNT],
        // Platform clock; `None` until `init` installs one.
        clock: Option<C>,
        // Telemetry sink; `None` until `init` installs one.
        telemetry: Option<T>,
    }
418
419 impl<C, T, const NODE_COUNT: usize, const EDGE_COUNT: usize>
420 TestScopedRuntime<C, T, NODE_COUNT, EDGE_COUNT>
421 where
422 C: PlatformClock + Clone + Send + Sync + 'static,
423 T: Telemetry + Clone + Send + 'static,
424 {
425 pub fn new() -> Self {
427 const INIT_OCC: EdgeOccupancy = EdgeOccupancy::new(0, 0, WatermarkState::AtOrAboveHard);
428 Self {
429 stop: Arc::new(AtomicBool::new(false)),
430 occ: [INIT_OCC; EDGE_COUNT],
431 node_last_step: [None; NODE_COUNT],
432 clock: None,
433 telemetry: None,
434 }
435 }
436
437 #[inline]
439 fn now_nanos(clock: &C) -> u64 {
440 let ticks = clock.now_ticks();
441 clock.ticks_to_nanos(ticks)
442 }
443
444 #[inline]
446 pub fn with_telemetry<F, R>(&mut self, f: F) -> Result<Option<R>, RuntimeError>
447 where
448 F: FnOnce(&mut T) -> R,
449 {
450 if let Some(t) = self.telemetry.as_mut() {
451 let r = f(t);
452 Ok(Some(r))
453 } else {
454 Ok(None)
455 }
456 }
457 }
458
459 impl<Graph, C, T, const NODE_COUNT: usize, const EDGE_COUNT: usize>
460 LimenRuntime<Graph, NODE_COUNT, EDGE_COUNT>
461 for TestScopedRuntime<C, T, NODE_COUNT, EDGE_COUNT>
462 where
463 Graph: GraphApi<NODE_COUNT, EDGE_COUNT> + ScopedGraphApi<NODE_COUNT, EDGE_COUNT>,
464 C: PlatformClock + Clone + Send + Sync + 'static,
465 T: Telemetry + Clone + Send + 'static,
466 {
467 type Clock = C;
468 type Telemetry = T;
469 type Error = RuntimeError;
470
471 #[cfg(feature = "std")]
472 type StopHandle = crate::runtime::RuntimeStopHandle;
473
474 fn init(
475 &mut self,
476 graph: &mut Graph,
477 clock: Self::Clock,
478 mut telemetry: Self::Telemetry,
479 ) -> Result<(), Self::Error> {
480 const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;
481
482 graph.validate_graph().map_err(RuntimeError::from)?;
483
484 graph
485 .write_all_edge_occupancies(&mut self.occ)
486 .map_err(RuntimeError::from)?;
487
488 if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
489 let timestamp_ns = Self::now_nanos(&clock);
490 let event = crate::telemetry::TelemetryEvent::runtime(
491 crate::telemetry::RuntimeTelemetryEvent::new(
492 GRAPH_ID,
493 timestamp_ns,
494 crate::telemetry::RuntimeTelemetryEventKind::GraphStarted,
495 None,
496 ),
497 );
498 telemetry.push_event(event);
499 }
500
501 self.clock = Some(clock);
502 self.telemetry = Some(telemetry);
503 self.stop.store(false, Ordering::Relaxed);
504 self.node_last_step = [None; NODE_COUNT];
505
506 Ok(())
507 }
508
509 fn reset(&mut self, graph: &Graph) -> Result<(), Self::Error> {
510 const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;
511
512 self.stop.store(false, Ordering::Relaxed);
513 self.node_last_step = [None; NODE_COUNT];
514 graph
515 .write_all_edge_occupancies(&mut self.occ)
516 .map_err(RuntimeError::from)?;
517
518 if let (Some(ref clock), Some(telemetry)) = (&self.clock, self.telemetry.as_mut()) {
519 if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
520 let timestamp_ns = Self::now_nanos(clock);
521 let event = crate::telemetry::TelemetryEvent::runtime(
522 crate::telemetry::RuntimeTelemetryEvent::new(
523 GRAPH_ID,
524 timestamp_ns,
525 crate::telemetry::RuntimeTelemetryEventKind::GraphStarted,
526 Some(event_message!("graph reset")),
527 ),
528 );
529 telemetry.push_event(event);
530 }
531 }
532
533 Ok(())
534 }
535
536 fn request_stop(&mut self) {
537 const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;
538
539 self.stop.store(true, Ordering::Relaxed);
540
541 if let (Some(ref clock), Some(telemetry)) = (&self.clock, self.telemetry.as_mut()) {
542 if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
543 let timestamp_ns = Self::now_nanos(clock);
544 let event = crate::telemetry::TelemetryEvent::runtime(
545 crate::telemetry::RuntimeTelemetryEvent::new(
546 GRAPH_ID,
547 timestamp_ns,
548 crate::telemetry::RuntimeTelemetryEventKind::GraphStopped,
549 None,
550 ),
551 );
552 telemetry.push_event(event);
553 }
554 }
555 }
556
557 #[cfg(feature = "std")]
558 fn stop_handle(&self) -> Option<Self::StopHandle> {
559 Some(crate::runtime::RuntimeStopHandle::new(self.stop.clone()))
560 }
561
562 #[inline]
563 fn is_stopping(&self) -> bool {
564 self.stop.load(Ordering::Relaxed)
565 }
566
567 #[inline]
568 fn occupancies(&self) -> &[EdgeOccupancy; EDGE_COUNT] {
569 &self.occ
570 }
571
572 fn step(&mut self, graph: &mut Graph) -> Result<bool, Self::Error> {
583 if <TestScopedRuntime<C, T, NODE_COUNT, EDGE_COUNT> as LimenRuntime<
584 Graph,
585 NODE_COUNT,
586 EDGE_COUNT,
587 >>::is_stopping(self)
588 {
589 return Ok(false);
590 }
591
592 let clock = match self.clock.take() {
593 Some(c) => c,
594 None => {
595 return Err(RuntimeError::RuntimeInvariant(
596 RuntimeInvariantError::UninitializedClock,
597 ))
598 }
599 };
600 let mut telemetry = match self.telemetry.take() {
601 Some(t) => t,
602 None => {
603 self.clock = Some(clock);
604 return Err(RuntimeError::RuntimeInvariant(
605 RuntimeInvariantError::UninitializedTelemetry,
606 ));
607 }
608 };
609
610 let scheduler = SimpleBackoffScheduler::new(self.stop.clone(), 50, 200);
611 let mut any_progress = false;
612
613 graph
616 .write_all_edge_occupancies(&mut self.occ)
617 .map_err(RuntimeError::from)?;
618
619 for i in 0..NODE_COUNT {
620 let mut state = WorkerState::new(i, NODE_COUNT, clock.now_ticks());
621 state.last_step = self.node_last_step[i];
622
623 let mut _max_wm = WatermarkState::BelowSoft;
627 let mut any_input_has_items = false;
628 let node_idx = NodeIndex::from(i);
629
630 for ed in graph.get_edge_descriptors().iter() {
631 let eid = *ed.id().as_usize();
632 if ed.upstream().node() == &node_idx {
634 let occ = &self.occ[eid];
635 if *occ.watermark() > _max_wm {
636 _max_wm = *occ.watermark();
637 }
638 }
639 if ed.downstream().node() == &node_idx {
641 let occ = &self.occ[eid];
642 if *occ.items() > 0 {
643 any_input_has_items = true;
644 }
645 }
646 }
647 state.backpressure = _max_wm;
648
649 if state.last_step.is_none() {
652 state.readiness = Readiness::Ready;
653 } else {
654 state.readiness = if any_input_has_items {
657 if _max_wm >= WatermarkState::BetweenSoftAndHard {
658 Readiness::ReadyUnderPressure
659 } else {
660 Readiness::Ready
661 }
662 } else {
663 Readiness::NotReady
664 };
665 }
666
667 let decision = scheduler.decide(&state);
669 match decision {
678 WorkerDecision::Step => {
679 match graph.step_node_by_index(i, &clock, &mut telemetry) {
680 Ok(sr) => {
681 self.node_last_step[i] = Some(sr);
683 if matches!(sr, StepResult::MadeProgress | StepResult::Terminal) {
684 any_progress = true;
685 }
686
687 graph
691 .write_all_edge_occupancies(&mut self.occ)
692 .map_err(RuntimeError::from)?;
693 }
694 Err(e) => {
695 ::std::eprintln!("sched-debug: node={} step error: {:?}", i, e);
696 self.node_last_step[i] = None;
698 }
699 }
700 }
701 WorkerDecision::WaitMicros(_) => {
702 self.node_last_step[i] = None;
706 }
707 WorkerDecision::Stop => {
708 }
710 }
711 }
712
713 self.telemetry = Some(telemetry);
717 self.clock = Some(clock);
718
719 Ok(any_progress)
720 }
721
722 fn run(&mut self, graph: &mut Graph) -> Result<(), Self::Error> {
730 const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;
731
732 let clock = self.clock.clone().ok_or(RuntimeError::RuntimeInvariant(
733 RuntimeInvariantError::UninitializedClock,
734 ))?;
735 let telemetry = self
736 .telemetry
737 .clone()
738 .ok_or(RuntimeError::RuntimeInvariant(
739 RuntimeInvariantError::UninitializedTelemetry,
740 ))?;
741
742 let scheduler = SimpleBackoffScheduler::new(
743 self.stop.clone(),
744 50, 200, );
747
748 graph.run_scoped(clock, telemetry, scheduler);
749
750 graph
752 .write_all_edge_occupancies(&mut self.occ)
753 .map_err(RuntimeError::from)?;
754
755 if let (Some(ref clock), Some(telemetry)) = (&self.clock, self.telemetry.as_mut()) {
756 if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
757 let timestamp_ns = Self::now_nanos(clock);
758 let event = crate::telemetry::TelemetryEvent::runtime(
759 crate::telemetry::RuntimeTelemetryEvent::new(
760 GRAPH_ID,
761 timestamp_ns,
762 crate::telemetry::RuntimeTelemetryEventKind::GraphStopped,
763 None,
764 ),
765 );
766 telemetry.push_event(event);
767 }
768 }
769
770 Ok(())
771 }
772 }
773
    impl<C, T, const NODE_COUNT: usize, const EDGE_COUNT: usize> Default
        for TestScopedRuntime<C, T, NODE_COUNT, EDGE_COUNT>
    where
        C: PlatformClock + Clone + Send + Sync + 'static,
        T: Telemetry + Clone + Send + 'static,
    {
        /// Equivalent to [`TestScopedRuntime::new`].
        #[inline]
        fn default() -> Self {
            Self::new()
        }
    }
785}