1pub mod link;
23pub mod model;
24pub mod sink;
25pub mod source;
26
27#[cfg(any(test, feature = "bench"))]
28pub mod bench;
29
30#[cfg(any(test, feature = "bench"))]
31pub mod contract_tests;
32#[cfg(any(test, feature = "bench"))]
33pub use contract_tests::*;
34
35use crate::edge::{Edge, EdgeOccupancy, EnqueueResult};
36use crate::errors::{NodeError, QueueError};
37use crate::memory::PlacementAcceptance;
38use crate::message::{payload::Payload, Message};
39use crate::policy::{
40 AdmissionDecision, BatchingPolicy, EdgePolicy, NodePolicy, SlidingWindow, WindowKind,
41};
42use crate::prelude::{BatchMessageIter, MemoryManager, PlatformClock, TelemetryKey, TelemetryKind};
43use crate::telemetry::Telemetry;
44use crate::types::Ticks;
45
/// The role a node plays in the processing graph.
///
/// `#[non_exhaustive]` so additional roles can be introduced without breaking
/// downstream `match`es.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeKind {
    /// Produces messages into the graph (see the `source` submodule).
    Source,
    /// General message-transformation stage.
    Process,
    /// Model-execution stage (see the `model` submodule).
    Model,
    /// Fan-out stage; presumably routes one input stream to several outputs.
    Split,
    /// Fan-in stage; presumably merges several input streams.
    Join,
    /// Consumes messages at the edge of the graph (see the `sink` submodule).
    Sink,
    /// Node whose progress depends on something outside the graph
    /// (cf. `StepResult::WaitingOnExternal`).
    External,
}
79
/// Static capability flags a node advertises via
/// `Node::describe_capabilities`.
///
/// `#[non_exhaustive]` so new capability flags can be added later; construct
/// via [`NodeCapabilities::new`] or [`Default`].
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct NodeCapabilities {
    // Whether the node can use dedicated device streams.
    device_streams: bool,
    // Whether the node supports degraded quality tiers.
    degrade_tiers: bool,
}

impl NodeCapabilities {
    /// Creates a capability set with the given flags.
    #[inline]
    pub const fn new(device_streams: bool, degrade_tiers: bool) -> Self {
        Self {
            device_streams,
            degrade_tiers,
        }
    }

    /// Whether device streams are supported.
    ///
    /// Made `const` for consistency with `new`; callers are unaffected.
    #[inline]
    pub const fn device_streams(&self) -> &bool {
        &self.device_streams
    }

    /// Whether degraded quality tiers are supported.
    #[inline]
    pub const fn degrade_tiers(&self) -> &bool {
        &self.degrade_tiers
    }
}
112
/// Outcome of a single scheduler step on a node.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepResult {
    /// The node did useful work (consumed input and/or produced output).
    MadeProgress,
    /// No input was available (or the only work was skipped).
    NoInput,
    /// A downstream edge refused the node's output.
    Backpressured,
    /// The node is blocked on something outside the graph.
    WaitingOnExternal,
    /// The node asks not to be scheduled again until the given tick.
    YieldUntil(Ticks),
    /// The node has finished and will make no further progress.
    Terminal,
}
130
/// Result of processing one input message (`Node::process_message`).
#[non_exhaustive]
#[derive(Debug)]
pub enum ProcessResult<P: Payload> {
    /// A message to emit; the step helpers push it to output port 0.
    Output(Message<P>),
    /// Input was consumed with no output; counts as progress.
    Consumed,
    /// Input was deliberately ignored; counts as no progress.
    Skip,
}
146
/// Borrowed view of everything a node needs for one scheduler step:
/// its input/output edges, their memory managers and policies, edge/node
/// identifiers for telemetry, a platform clock, and a telemetry sink.
///
/// All per-port arrays are parallel: index `i` of `inputs`, `in_managers`,
/// `in_policies` and `in_edge_ids` all describe the same input edge
/// (likewise for outputs).
pub struct StepContext<
    'graph,
    'telemetry,
    'clock,
    const IN: usize,
    const OUT: usize,
    InP,
    OutP,
    InQ,
    OutQ,
    InM,
    OutM,
    C,
    T,
> where
    InP: Payload,
    OutP: Payload,
    C: PlatformClock + Sized,
    T: Telemetry + Sized,
{
    // Input edges, one per input port.
    inputs: [&'graph mut InQ; IN],
    // Output edges, one per output port.
    outputs: [&'graph mut OutQ; OUT],
    // Memory managers backing each input edge.
    in_managers: [&'graph mut InM; IN],
    // Memory managers backing each output edge.
    out_managers: [&'graph mut OutM; OUT],
    // Capacity/admission policy per input edge.
    in_policies: [EdgePolicy; IN],
    // Capacity/admission policy per output edge.
    out_policies: [EdgePolicy; OUT],

    // Identifier of the node this context belongs to (telemetry keys).
    node_id: u32,
    // Edge identifiers, used for per-edge telemetry gauges.
    in_edge_ids: [u32; IN],
    out_edge_ids: [u32; OUT],

    // Platform clock; a shared reference that is Copy.
    clock: &'clock C,
    // Telemetry sink; counters/gauges are only touched when
    // `T::METRICS_ENABLED` is set.
    telemetry: &'telemetry mut T,
    // Ties the unused payload type parameters to the struct.
    _marker: core::marker::PhantomData<(InP, OutP)>,
}
199
impl<
        'graph,
        'telemetry,
        'clock,
        const IN: usize,
        const OUT: usize,
        InP,
        OutP,
        InQ,
        OutQ,
        InM,
        OutM,
        C,
        T,
    > StepContext<'graph, 'telemetry, 'clock, IN, OUT, InP, OutP, InQ, OutQ, InM, OutM, C, T>
where
    InP: Payload,
    OutP: Payload,
    C: PlatformClock + Sized,
    T: Telemetry + Sized,
{
    /// Assembles a `StepContext` from pre-borrowed graph resources.
    ///
    /// The arrays must be port-parallel: element `i` of `inputs`,
    /// `in_managers`, `in_policies` and `in_edge_ids` must all refer to the
    /// same input edge (and likewise for the output arrays).
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        inputs: [&'graph mut InQ; IN],
        outputs: [&'graph mut OutQ; OUT],
        in_managers: [&'graph mut InM; IN],
        out_managers: [&'graph mut OutM; OUT],
        in_policies: [EdgePolicy; IN],
        out_policies: [EdgePolicy; OUT],
        node_id: u32,
        in_edge_ids: [u32; IN],
        out_edge_ids: [u32; OUT],
        clock: &'clock C,
        telemetry: &'telemetry mut T,
    ) -> Self {
        Self {
            inputs,
            outputs,
            in_managers,
            out_managers,
            in_policies,
            out_policies,
            node_id,
            in_edge_ids,
            out_edge_ids,
            clock,
            telemetry,
            _marker: core::marker::PhantomData,
        }
    }
}
252
253impl<
254 'graph,
255 'telemetry,
256 'clock,
257 const IN: usize,
258 const OUT: usize,
259 InP,
260 OutP,
261 InQ,
262 OutQ,
263 InM,
264 OutM,
265 C,
266 T,
267 > StepContext<'graph, 'telemetry, 'clock, IN, OUT, InP, OutP, InQ, OutQ, InM, OutM, C, T>
268where
269 InP: Payload,
270 OutP: Payload,
271 InQ: Edge,
272 OutQ: Edge,
273 InM: MemoryManager<InP>,
274 OutM: MemoryManager<OutP>,
275 C: PlatformClock + Sized,
276 T: Telemetry + Sized,
277{
278 #[inline]
287 pub fn in_peek_header(&self, i: usize) -> Result<InM::HeaderGuard<'_>, QueueError> {
288 debug_assert!(i < IN);
289 let token = self.inputs[i].try_peek()?;
290 self.in_managers[i]
291 .peek_header(token)
292 .map_err(|_| QueueError::Empty)
293 }
294
295 pub fn pop_and_process<F>(&mut self, port: usize, f: F) -> Result<StepResult, NodeError>
302 where
303 F: FnOnce(&Message<InP>) -> Result<ProcessResult<OutP>, NodeError>,
304 {
305 debug_assert!(port < IN);
306
307 let token = match self.inputs[port].try_pop(&*self.in_managers[port]) {
308 Ok(t) => t,
309 Err(QueueError::Empty) => return Ok(StepResult::NoInput),
310 Err(QueueError::Backpressured) | Err(QueueError::AtOrAboveHardCap) => {
311 return Err(NodeError::backpressured());
312 }
313 Err(QueueError::Poisoned) | Err(QueueError::Unsupported) => {
314 return Err(NodeError::execution_failed());
315 }
316 };
317
318 let guard = self.in_managers[port]
319 .read(token)
320 .map_err(|_| NodeError::execution_failed())?;
321
322 let result = f(&*guard)?;
323
324 drop(guard);
325 let _ = self.in_managers[port].free(token);
326
327 if T::METRICS_ENABLED {
328 self.telemetry.incr_counter(
329 TelemetryKey::node(self.node_id, TelemetryKind::IngressMsgs),
330 1,
331 );
332 let _ = self.in_occupancy(port);
333 }
334
335 match result {
336 ProcessResult::Output(out_msg) => self.push_output(0, out_msg),
337 ProcessResult::Consumed => Ok(StepResult::MadeProgress),
338 ProcessResult::Skip => Ok(StepResult::NoInput),
339 }
340 }
341
    /// Pops a batch of up to `nmax` messages from input `port`, tags batch
    /// boundaries in the message headers, runs `f` over each message, and
    /// pushes any outputs to output port 0.
    ///
    /// The node's configured batching policy is clamped so that its fixed
    /// batch size (and a sliding window's stride) never exceed `nmax`. With a
    /// sliding window, only the first `stride` messages of the batch are
    /// consumed/freed; the remainder stay owned by the edge for the next
    /// window.
    ///
    /// # Errors
    /// - `NodeError::execution_failed` when `nmax == 0`, or with code 1/2 for
    ///   poisoned/unsupported queues.
    /// - `NodeError::backpressured` when the batch pop is refused.
    /// - Any error returned by `f`.
    ///   NOTE(review): an `Err` from `f` propagates via `?` before the free
    ///   loop below runs, so the popped tokens are not freed on that path —
    ///   confirm whether this leak is intentional.
    pub fn pop_batch_and_process<F>(
        &mut self,
        port: usize,
        nmax: usize,
        node_policy: &NodePolicy,
        mut f: F,
    ) -> Result<StepResult, NodeError>
    where
        F: FnMut(&Message<InP>) -> Result<ProcessResult<OutP>, NodeError>,
    {
        debug_assert!(port < IN);

        // A zero-sized batch request is a caller bug.
        if nmax == 0 {
            return Err(NodeError::execution_failed());
        }

        // Clamp the node's batching policy to at most `nmax` messages; for a
        // sliding window the stride is additionally clamped to the window
        // size.
        let requested_policy = {
            let nb = *node_policy.batching();
            BatchingPolicy::with_window(
                nb.fixed_n().map(|f_n| core::cmp::min(f_n, nmax)),
                *nb.max_delta_t(),
                match nb.window_kind() {
                    WindowKind::Disjoint => WindowKind::Disjoint,
                    WindowKind::Sliding(sw) => {
                        let size = nb
                            .fixed_n()
                            .map(|f_n| core::cmp::min(f_n, nmax))
                            .unwrap_or(1);
                        let stride = core::cmp::min(*sw.stride(), size);
                        WindowKind::Sliding(SlidingWindow::new(stride))
                    }
                },
            )
        };

        // Disjoint windows consume the whole batch; sliding windows consume
        // only `stride` messages per step.
        let stride = match requested_policy.window_kind() {
            WindowKind::Disjoint => usize::MAX,
            WindowKind::Sliding(sw) => *sw.stride(),
        };

        // Snapshot occupancy before popping so the gauge below can be derived
        // without re-querying the edge.
        let occ_before = self.inputs[port].occupancy(&self.in_policies[port]);

        let batch =
            match self.inputs[port].try_pop_batch(&requested_policy, &*self.in_managers[port]) {
                Ok(b) => b,
                Err(QueueError::Empty) => return Ok(StepResult::NoInput),
                Err(QueueError::Backpressured) | Err(QueueError::AtOrAboveHardCap) => {
                    return Err(NodeError::backpressured());
                }
                Err(QueueError::Poisoned) => {
                    return Err(NodeError::execution_failed().with_code(1));
                }
                Err(QueueError::Unsupported) => {
                    return Err(NodeError::execution_failed().with_code(2));
                }
            };

        let batch_len = batch.len();
        if batch_len == 0 {
            return Ok(StepResult::NoInput);
        }
        // Number of messages this step actually consumes (and later frees).
        let actual_stride = core::cmp::min(stride, batch_len);

        let in_mgr: &mut InM = &mut *self.in_managers[port];

        // Mark batch boundaries in the headers of the consumed prefix.
        // NOTE(review): last_in_batch is only set when
        // `batch_len - 1 < actual_stride`, i.e. never for a sliding window
        // that consumes a strict prefix — confirm that is intended.
        for (idx, &token) in batch.as_slice().iter().enumerate() {
            if idx < actual_stride {
                if let Ok(mut wg) = in_mgr.read_mut(token) {
                    if idx == 0 {
                        wg.header_mut().set_first_in_batch();
                    }
                    if idx == batch_len - 1 || batch_len == 1 {
                        wg.header_mut().set_last_in_batch();
                    }
                }
            }
        }

        let iter =
            BatchMessageIter::new(batch.as_slice().iter(), &*in_mgr, actual_stride, batch_len);

        // Split the borrows: inputs stay borrowed by `iter`/`in_mgr`, while
        // the output half of the context is repackaged so pushes can happen
        // inside the loop.
        let out_policies = self.out_policies;
        let out_edge_ids = self.out_edge_ids;
        let node_id = self.node_id;
        let clock = self.clock;
        let telemetry: &mut T = &mut *self.telemetry;
        let outputs = &mut self.outputs;
        let out_managers = &mut self.out_managers;

        let mut out = OutStepContext {
            outputs,
            out_managers,
            out_policies,
            out_edge_ids,
            node_id,
            clock,
            telemetry,
            _marker: core::marker::PhantomData,
        };

        let mut any_made = false;
        let mut backpressured = false;
        for guard in iter {
            // Once the output edge refused a push, drain the remaining guards
            // without processing (the tokens are still freed below).
            if backpressured {
                drop(guard);
                continue;
            }
            match f(&*guard)? {
                ProcessResult::Output(out_msg) => {
                    drop(guard);
                    match out.out_try_push(0, out_msg) {
                        EnqueueResult::Enqueued => {
                            any_made = true;
                        }
                        EnqueueResult::DroppedNewest | EnqueueResult::Rejected => {
                            backpressured = true;
                        }
                    }
                }
                ProcessResult::Consumed => {
                    drop(guard);
                    any_made = true;
                }
                ProcessResult::Skip => {
                    drop(guard);
                }
            }
        }

        // Free only the consumed prefix; a sliding window leaves the rest for
        // the next step.
        for (idx, &token) in batch.as_slice().iter().enumerate() {
            if idx < actual_stride {
                let _ = in_mgr.free(token);
            }
        }

        if T::METRICS_ENABLED {
            let telemetry = &mut *out.telemetry;
            telemetry.incr_counter(
                TelemetryKey::node(node_id, TelemetryKind::IngressMsgs),
                actual_stride as u64,
            );
            // Derive the post-pop depth from the pre-pop snapshot instead of
            // re-querying the edge.
            let after_items = occ_before.items().saturating_sub(actual_stride);
            telemetry.set_gauge(
                TelemetryKey::edge(self.in_edge_ids[port], TelemetryKind::QueueDepth),
                after_items as u64,
            );
        }

        // Backpressure dominates the result even if some messages made it.
        if backpressured {
            Ok(StepResult::Backpressured)
        } else if any_made {
            Ok(StepResult::MadeProgress)
        } else {
            Ok(StepResult::NoInput)
        }
    }
513
514 #[inline]
516 pub fn in_occupancy(&mut self, i: usize) -> EdgeOccupancy {
517 debug_assert!(i < IN);
518 let occ = self.inputs[i].occupancy(&self.in_policies[i]);
519 if T::METRICS_ENABLED {
520 self.telemetry.set_gauge(
521 TelemetryKey::edge(self.in_edge_ids[i], TelemetryKind::QueueDepth),
522 *occ.items() as u64,
523 );
524 }
525 occ
526 }
527
    /// Returns a copy of the policy configured for input edge `i`.
    #[inline]
    pub fn in_policy(&mut self, i: usize) -> EdgePolicy {
        debug_assert!(i < IN);
        self.in_policies[i]
    }
534
    /// Stores `m` and attempts to enqueue it on output edge `o`, applying the
    /// edge's admission policy first (possibly evicting older messages).
    ///
    /// On any non-enqueued outcome the freshly stored token is freed and a
    /// `Dropped` counter is bumped. Note the token is stored *before* the
    /// admission decision here (`get_admission_decision` inspects the stored
    /// token), unlike `push_output`, which decides from the message first.
    pub fn out_try_push(&mut self, o: usize, m: Message<OutP>) -> EnqueueResult {
        debug_assert!(o < OUT);

        // Reserve storage for the message; without storage nothing can be
        // queued.
        let token = match self.out_managers[o].store(m) {
            Ok(t) => t,
            Err(_) => return EnqueueResult::Rejected,
        };

        let decision = self.outputs[o].get_admission_decision(
            &self.out_policies[o],
            token,
            &*self.out_managers[o],
        );
        match decision {
            // Evict a fixed number of oldest messages to make room.
            AdmissionDecision::Evict(n) => {
                for _ in 0..n {
                    match self.outputs[o].try_pop(&*self.out_managers[o]) {
                        Ok(evicted) => {
                            let _ = self.out_managers[o].free(evicted);
                            if T::METRICS_ENABLED {
                                self.telemetry.incr_counter(
                                    TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
                                    1,
                                );
                            }
                        }
                        Err(_) => break,
                    }
                }
            }
            // Evict until occupancy drops below the hard cap.
            AdmissionDecision::EvictUntilBelowHard => loop {
                let occ = self.outputs[o].occupancy(&self.out_policies[o]);
                if !self.out_policies[o]
                    .caps
                    .at_or_above_hard(*occ.items(), *occ.bytes())
                {
                    break;
                }
                match self.outputs[o].try_pop(&*self.out_managers[o]) {
                    Ok(evicted) => {
                        let _ = self.out_managers[o].free(evicted);
                        if T::METRICS_ENABLED {
                            self.telemetry.incr_counter(
                                TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
                                1,
                            );
                        }
                    }
                    Err(_) => break,
                }
            },
            // The new message itself is dropped.
            AdmissionDecision::DropNewest => {
                let _ = self.out_managers[o].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                return EnqueueResult::DroppedNewest;
            }
            // Blocking is not supported on this path; treated as a rejection.
            AdmissionDecision::Reject | AdmissionDecision::Block => {
                let _ = self.out_managers[o].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                return EnqueueResult::Rejected;
            }
            AdmissionDecision::Admit => {}
        }

        match self.outputs[o].try_push(token, &self.out_policies[o], &*self.out_managers[o]) {
            EnqueueResult::Enqueued => {
                if T::METRICS_ENABLED {
                    self.telemetry.incr_counter(
                        TelemetryKey::node(self.node_id, TelemetryKind::EgressMsgs),
                        1,
                    );
                    // Refresh the queue-depth gauge for this output edge.
                    let _ = self.out_occupancy(o);
                }
                EnqueueResult::Enqueued
            }
            // Even a DroppedNewest from the raw push is reported as Rejected:
            // admission already ran above, so a refusal here is unexpected.
            EnqueueResult::DroppedNewest | EnqueueResult::Rejected => {
                let _ = self.out_managers[o].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                EnqueueResult::Rejected
            }
        }
    }
636
    /// Pushes `msg` onto output edge `port`, mapping the outcome into a
    /// `StepResult` (`MadeProgress` on success, `Backpressured` otherwise).
    ///
    /// Unlike `out_try_push`, the admission decision is computed from the
    /// message itself (`get_admission_decision_from_message`) *before* any
    /// storage is reserved, so a refused message costs no allocation.
    pub fn push_output(
        &mut self,
        port: usize,
        msg: Message<OutP>,
    ) -> Result<StepResult, NodeError> {
        debug_assert!(port < OUT);

        let admission_decision =
            self.outputs[port].get_admission_decision_from_message(&self.out_policies[port], &msg);

        match admission_decision {
            // Evict a fixed number of oldest messages to make room.
            AdmissionDecision::Evict(eviction_count) => {
                for _ in 0..eviction_count {
                    match self.outputs[port].try_pop(&*self.out_managers[port]) {
                        Ok(evicted_token) => {
                            let _ = self.out_managers[port].free(evicted_token);
                            if T::METRICS_ENABLED {
                                self.telemetry.incr_counter(
                                    TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
                                    1,
                                );
                            }
                        }
                        Err(_) => break,
                    }
                }
            }
            // Evict until occupancy drops below the hard cap.
            AdmissionDecision::EvictUntilBelowHard => loop {
                let occupancy = self.outputs[port].occupancy(&self.out_policies[port]);
                if !self.out_policies[port]
                    .caps
                    .at_or_above_hard(*occupancy.items(), *occupancy.bytes())
                {
                    break;
                }

                match self.outputs[port].try_pop(&*self.out_managers[port]) {
                    Ok(evicted_token) => {
                        let _ = self.out_managers[port].free(evicted_token);
                        if T::METRICS_ENABLED {
                            self.telemetry.incr_counter(
                                TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
                                1,
                            );
                        }
                    }
                    Err(_) => break,
                }
            },
            // All refusal variants collapse into backpressure for callers of
            // this Result-based API; nothing was stored, so nothing to free.
            AdmissionDecision::DropNewest
            | AdmissionDecision::Reject
            | AdmissionDecision::Block => {
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                return Ok(StepResult::Backpressured);
            }
            AdmissionDecision::Admit => {}
        }

        // Admission granted: reserve storage. A store failure is also
        // surfaced as backpressure.
        let token = match self.out_managers[port].store(msg) {
            Ok(token) => token,
            Err(_) => {
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                return Ok(StepResult::Backpressured);
            }
        };

        match self.outputs[port].try_push(
            token,
            &self.out_policies[port],
            &*self.out_managers[port],
        ) {
            EnqueueResult::Enqueued => {
                if T::METRICS_ENABLED {
                    self.telemetry.incr_counter(
                        TelemetryKey::node(self.node_id, TelemetryKind::EgressMsgs),
                        1,
                    );
                    // Refresh the queue-depth gauge for this output edge.
                    let _ = self.out_occupancy(port);
                }
                Ok(StepResult::MadeProgress)
            }
            // The push itself refused after admission: free the token and
            // report backpressure.
            EnqueueResult::DroppedNewest | EnqueueResult::Rejected => {
                let _ = self.out_managers[port].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                Ok(StepResult::Backpressured)
            }
        }
    }
742
743 #[inline]
745 pub fn out_occupancy(&mut self, o: usize) -> EdgeOccupancy {
746 debug_assert!(o < OUT);
747 let occ = self.outputs[o].occupancy(&self.out_policies[o]);
748 if T::METRICS_ENABLED {
749 self.telemetry.set_gauge(
750 TelemetryKey::edge(self.out_edge_ids[o], TelemetryKind::QueueDepth),
751 *occ.items() as u64,
752 );
753 }
754 occ
755 }
756
    /// Returns a copy of the policy configured for output edge `i`.
    #[inline]
    pub fn out_policy(&mut self, i: usize) -> EdgePolicy {
        debug_assert!(i < OUT);
        self.out_policies[i]
    }
763
    /// The platform clock this context was built with.
    #[inline]
    pub fn clock(&self) -> &C {
        self.clock
    }
773
    /// Mutable access to the telemetry sink.
    #[inline]
    pub fn telemetry_mut(&mut self) -> &mut T {
        self.telemetry
    }
779
    /// Current time in platform ticks.
    #[inline]
    pub fn now_ticks(&self) -> Ticks {
        self.clock.now_ticks()
    }
785
786 #[inline]
788 pub fn now_nanos(&self) -> u64 {
789 self.clock.ticks_to_nanos(self.clock.now_ticks())
790 }
791
    /// Converts a tick count to nanoseconds using the platform clock.
    #[inline]
    pub fn ticks_to_nanos(&self, t: Ticks) -> u64 {
        self.clock.ticks_to_nanos(t)
    }
797
    /// Converts nanoseconds to a tick count using the platform clock.
    #[inline]
    pub fn nanos_to_ticks(&self, ns: u64) -> Ticks {
        self.clock.nanos_to_ticks(ns)
    }
803
    /// Returns true when input edge `port` holds enough work to satisfy the
    /// node's batching policy.
    ///
    /// Rules, by (fixed_n, max_delta_t):
    /// - `(Some, None)`: at least `fixed_n` messages queued.
    /// - `(None, Some)` / `(None, None)`: any queued message suffices.
    /// - `(Some, Some)`: at least `fixed_n` messages, *and* the creation-tick
    ///   span between the first and the `fixed_n`-th message must be within
    ///   `max_delta_t`. Any peek/header failure conservatively reports false.
    ///
    /// Also refreshes the edge's queue-depth gauge via `in_occupancy`.
    #[inline]
    pub fn input_edge_has_batch(&mut self, port: usize, policy: &NodePolicy) -> bool {
        debug_assert!(port < IN);

        let occ = self.in_occupancy(port);
        if occ.items() == &0 {
            return false;
        }

        let fixed_opt = *policy.batching().fixed_n();
        let delta_opt = *policy.batching().max_delta_t();

        match (fixed_opt, delta_opt) {
            (Some(fixed_n), None) => *occ.items() >= fixed_n,
            (None, Some(_max_delta_t)) => true,
            (Some(fixed_n), Some(max_delta_t)) => {
                if *occ.items() < fixed_n {
                    return false;
                }
                // Probe the first and the fixed_n-th queued message and
                // compare their creation ticks against the time budget.
                let first_token = match self.inputs[port].try_peek_at(0) {
                    Ok(t) => t,
                    Err(_) => return false,
                };
                let last_token = match self.inputs[port].try_peek_at(fixed_n - 1) {
                    Ok(t) => t,
                    Err(_) => return false,
                };

                let first_ticks = match self.in_managers[port].peek_header(first_token) {
                    Ok(h) => *h.creation_tick(),
                    Err(_) => return false,
                };
                let last_ticks = match self.in_managers[port].peek_header(last_token) {
                    Ok(h) => *h.creation_tick(),
                    Err(_) => return false,
                };

                // saturating_sub guards against out-of-order creation ticks.
                let span = last_ticks.saturating_sub(first_ticks);
                span <= max_delta_t
            }
            (None, None) => true,
        }
    }
856
857 #[allow(dead_code)]
860 #[inline]
861 fn as_out_step_context<'ctx>(
862 &'ctx mut self,
863 ) -> OutStepContext<'graph, 'ctx, 'clock, OUT, OutP, OutQ, OutM, C, T>
864 where
865 EdgePolicy: Copy,
866 {
867 let out_policies = self.out_policies;
868 let out_edge_ids = self.out_edge_ids;
869 let node_id = self.node_id;
870 let clock = self.clock;
871 let telemetry = &mut *self.telemetry;
872 let outputs: &'ctx mut [&'graph mut OutQ; OUT] = &mut self.outputs;
873 let out_managers: &'ctx mut [&'graph mut OutM; OUT] = &mut self.out_managers;
874
875 OutStepContext {
876 outputs,
877 out_managers,
878 out_policies,
879 out_edge_ids,
880 node_id,
881 clock,
882 telemetry,
883 _marker: core::marker::PhantomData,
884 }
885 }
886}
887
/// The output-only half of a [`StepContext`]: everything needed to push
/// messages onto output edges (and record telemetry) while the input half is
/// borrowed elsewhere.
pub struct OutStepContext<'graph, 'ctx, 'clock, const OUT: usize, OutP, OutQ, OutM, C, T>
where
    OutP: Payload,
    C: PlatformClock + Sized,
    T: Telemetry + Sized,
{
    // Output edges, one per output port.
    outputs: &'ctx mut [&'graph mut OutQ; OUT],
    // Memory managers backing each output edge.
    out_managers: &'ctx mut [&'graph mut OutM; OUT],
    // Capacity/admission policy per output edge.
    out_policies: [EdgePolicy; OUT],
    // Edge identifiers for per-edge telemetry gauges.
    out_edge_ids: [u32; OUT],
    // Owning node's identifier for telemetry keys.
    node_id: u32,
    // Platform clock.
    clock: &'clock C,
    // Telemetry sink.
    telemetry: &'ctx mut T,
    // Ties the unused payload type parameter to the struct.
    _marker: core::marker::PhantomData<OutP>,
}
914
915impl<'graph, 'ctx, 'clock, const OUT: usize, OutP, OutQ, OutM, C, T>
916 OutStepContext<'graph, 'ctx, 'clock, OUT, OutP, OutQ, OutM, C, T>
917where
918 OutP: Payload,
919 OutQ: Edge,
920 OutM: MemoryManager<OutP>,
921 C: PlatformClock + Sized,
922 T: Telemetry + Sized,
923{
924 pub fn out_try_push(&mut self, o: usize, m: Message<OutP>) -> EnqueueResult {
929 debug_assert!(o < OUT);
930
931 let token = match self.out_managers[o].store(m) {
932 Ok(t) => t,
933 Err(_) => return EnqueueResult::Rejected,
934 };
935
936 let decision = self.outputs[o].get_admission_decision(
938 &self.out_policies[o],
939 token,
940 &*self.out_managers[o],
941 );
942 match decision {
943 AdmissionDecision::Evict(n) => {
944 for _ in 0..n {
945 match self.outputs[o].try_pop(&*self.out_managers[o]) {
946 Ok(evicted) => {
947 let _ = self.out_managers[o].free(evicted);
948 if T::METRICS_ENABLED {
949 self.telemetry.incr_counter(
950 TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
951 1,
952 );
953 }
954 }
955 Err(_) => break,
956 }
957 }
958 }
959 AdmissionDecision::EvictUntilBelowHard => loop {
960 let occ = self.outputs[o].occupancy(&self.out_policies[o]);
961 if !self.out_policies[o]
962 .caps
963 .at_or_above_hard(*occ.items(), *occ.bytes())
964 {
965 break;
966 }
967 match self.outputs[o].try_pop(&*self.out_managers[o]) {
968 Ok(evicted) => {
969 let _ = self.out_managers[o].free(evicted);
970 if T::METRICS_ENABLED {
971 self.telemetry.incr_counter(
972 TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
973 1,
974 );
975 }
976 }
977 Err(_) => break,
978 }
979 },
980 AdmissionDecision::DropNewest => {
981 let _ = self.out_managers[o].free(token);
982 if T::METRICS_ENABLED {
983 self.telemetry
984 .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
985 }
986 return EnqueueResult::DroppedNewest;
987 }
988 AdmissionDecision::Reject | AdmissionDecision::Block => {
989 let _ = self.out_managers[o].free(token);
990 if T::METRICS_ENABLED {
991 self.telemetry
992 .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
993 }
994 return EnqueueResult::Rejected;
995 }
996 AdmissionDecision::Admit => {}
997 }
998
999 match self.outputs[o].try_push(token, &self.out_policies[o], &*self.out_managers[o]) {
1000 EnqueueResult::Enqueued => {
1001 if T::METRICS_ENABLED {
1002 self.telemetry.incr_counter(
1003 TelemetryKey::node(self.node_id, TelemetryKind::EgressMsgs),
1004 1,
1005 );
1006 let occ = self.outputs[o].occupancy(&self.out_policies[o]);
1007 self.telemetry.set_gauge(
1008 TelemetryKey::edge(self.out_edge_ids[o], TelemetryKind::QueueDepth),
1009 *occ.items() as u64,
1010 );
1011 }
1012 EnqueueResult::Enqueued
1013 }
1014 EnqueueResult::DroppedNewest | EnqueueResult::Rejected => {
1015 let _ = self.out_managers[o].free(token);
1016 if T::METRICS_ENABLED {
1017 self.telemetry
1018 .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
1019 }
1020 EnqueueResult::Rejected
1021 }
1022 }
1023 }
1024
1025 #[inline]
1027 pub fn out_occupancy(&mut self, o: usize) -> EdgeOccupancy {
1028 debug_assert!(o < OUT);
1029 let occ = self.outputs[o].occupancy(&self.out_policies[o]);
1030 if T::METRICS_ENABLED {
1031 self.telemetry.set_gauge(
1032 TelemetryKey::edge(self.out_edge_ids[o], TelemetryKind::QueueDepth),
1033 *occ.items() as u64,
1034 );
1035 }
1036 occ
1037 }
1038
    /// Returns a copy of the policy configured for output edge `o`.
    #[inline]
    pub fn out_policy(&mut self, o: usize) -> EdgePolicy {
        debug_assert!(o < OUT);
        self.out_policies[o]
    }
1045
    /// The platform clock this context was built with.
    #[inline]
    pub fn clock(&self) -> &C {
        self.clock
    }
1051
    /// Mutable access to the telemetry sink.
    #[inline]
    pub fn telemetry_mut(&mut self) -> &mut T {
        self.telemetry
    }
1057
    /// Current time in platform ticks.
    #[inline]
    pub fn now_ticks(&self) -> Ticks {
        self.clock.now_ticks()
    }
1063
1064 #[inline]
1066 pub fn now_nanos(&self) -> u64 {
1067 self.clock.ticks_to_nanos(self.clock.now_ticks())
1068 }
1069
    /// Converts a tick count to nanoseconds using the platform clock.
    #[inline]
    pub fn ticks_to_nanos(&self, t: Ticks) -> u64 {
        self.clock.ticks_to_nanos(t)
    }
1075
    /// Converts nanoseconds to a tick count using the platform clock.
    #[inline]
    pub fn nanos_to_ticks(&self, ns: u64) -> Ticks {
        self.clock.nanos_to_ticks(ns)
    }
1081}
1082
1083pub trait Node<const IN: usize, const OUT: usize, InP, OutP>
1093where
1094 InP: Payload,
1095 OutP: Payload,
1096{
1097 fn describe_capabilities(&self) -> NodeCapabilities;
1099
1100 fn input_acceptance(&self) -> [PlacementAcceptance; IN];
1102
1103 fn output_acceptance(&self) -> [PlacementAcceptance; OUT];
1105
1106 fn policy(&self) -> NodePolicy;
1108
1109 #[cfg(any(test, feature = "bench"))]
1111 fn set_policy(&mut self, policy: NodePolicy);
1112
1113 fn node_kind(&self) -> NodeKind;
1115
1116 fn initialize<C, Tel>(&mut self, clock: &C, telemetry: &mut Tel) -> Result<(), NodeError>
1118 where
1119 Tel: Telemetry;
1120
1121 fn start<C, Tel>(&mut self, _clock: &C, _telemetry: &mut Tel) -> Result<(), NodeError>
1123 where
1124 Tel: Telemetry;
1125
1126 fn process_message<C>(
1133 &mut self,
1134 msg: &Message<InP>,
1135 sys_clock: &C,
1136 ) -> Result<ProcessResult<OutP>, NodeError>
1137 where
1138 C: PlatformClock + Sized;
1139
1140 fn step<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, Tel>(
1147 &mut self,
1148 ctx: &mut StepContext<
1149 'graph,
1150 'telemetry,
1151 'clock,
1152 IN,
1153 OUT,
1154 InP,
1155 OutP,
1156 InQ,
1157 OutQ,
1158 InM,
1159 OutM,
1160 C,
1161 Tel,
1162 >,
1163 ) -> Result<StepResult, NodeError>
1164 where
1165 InQ: Edge,
1166 OutQ: Edge,
1167 InM: MemoryManager<InP>,
1168 OutM: MemoryManager<OutP>,
1169 C: PlatformClock + Sized,
1170 Tel: Telemetry + Sized,
1171 {
1172 let node_policy = self.policy();
1173 let port = match (0..IN).find(|&p| ctx.input_edge_has_batch(p, &node_policy)) {
1174 Some(p) => p,
1175 None => return Ok(StepResult::NoInput),
1176 };
1177
1178 ctx.pop_and_process(port, |msg| self.process_message(msg, ctx.clock))
1179 }
1180
1181 fn step_batch<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, Tel>(
1185 &mut self,
1186 ctx: &mut StepContext<
1187 'graph,
1188 'telemetry,
1189 'clock,
1190 IN,
1191 OUT,
1192 InP,
1193 OutP,
1194 InQ,
1195 OutQ,
1196 InM,
1197 OutM,
1198 C,
1199 Tel,
1200 >,
1201 ) -> Result<StepResult, NodeError>
1202 where
1203 InQ: Edge,
1204 OutQ: Edge,
1205 InM: MemoryManager<InP>,
1206 OutM: MemoryManager<OutP>,
1207 C: PlatformClock + Sized,
1208 Tel: Telemetry + Sized,
1209 {
1210 let node_policy = self.policy();
1211 let port = match (0..IN).find(|&p| ctx.input_edge_has_batch(p, &node_policy)) {
1212 Some(p) => p,
1213 None => return Ok(StepResult::NoInput),
1214 };
1215 let nmax = node_policy.batching().fixed_n().unwrap_or(1);
1216
1217 ctx.pop_batch_and_process(port, nmax, &node_policy, |msg| {
1218 self.process_message(msg, ctx.clock)
1219 })
1220 }
1221
1222 fn on_watchdog_timeout<C, Tel>(
1224 &mut self,
1225 _clock: &C,
1226 _telemetry: &mut Tel,
1227 ) -> Result<StepResult, NodeError>
1228 where
1229 C: PlatformClock + Sized,
1230 Tel: Telemetry;
1231
1232 fn stop<C, Tel>(&mut self, _clock: &C, _telemetry: &mut Tel) -> Result<(), NodeError>
1234 where
1235 Tel: Telemetry;
1236}