1use crate::edge::{Edge, EdgeOccupancy, EnqueueResult};
21use crate::errors::NodeError;
22use crate::errors::QueueError;
23use crate::memory::PlacementAcceptance;
24use crate::message::{payload::Payload, Message};
25use crate::node::{Node, NodeCapabilities, NodeKind, ProcessResult, StepContext, StepResult};
26use crate::policy::{BatchingPolicy, EdgePolicy, NodePolicy, WatermarkState};
27use crate::prelude::{
28 BatchView, EdgeDescriptor, HeaderStore, MemoryManager, PlatformClock, Telemetry,
29};
30use crate::types::{EdgeIndex, MessageToken, NodeIndex, PortId};
31
32use core::marker::PhantomData;
33
34pub const EXTERNAL_INGRESS_NODE: NodeIndex = NodeIndex::new(usize::MAX);
36
37pub trait Source<OutP, const OUT: usize>
47where
48 OutP: Payload,
49{
50 type Error;
52
53 fn open(&mut self) -> Result<(), Self::Error>;
58
59 fn try_produce(&mut self) -> Option<(usize, Message<OutP>)>;
67
68 fn ingress_occupancy(&self) -> EdgeOccupancy;
76
77 fn peek_ingress_creation_tick(&self, item_index: usize) -> Option<u64>;
81
82 fn output_acceptance(&self) -> [PlacementAcceptance; OUT];
84
85 fn capabilities(&self) -> NodeCapabilities;
87
88 #[inline]
93 fn into_sourcenode(self, policy: NodePolicy) -> SourceNode<Self, OutP, OUT>
94 where
95 Self: Sized,
96 {
97 SourceNode::new(self, policy)
98 }
99
100 fn policy(&self) -> NodePolicy;
102
103 fn ingress_policy(&self) -> EdgePolicy;
105}
106
107pub struct SourceNode<S, OutP, const OUT: usize>
113where
114 S: Source<OutP, OUT>,
115 OutP: Payload,
116{
117 src: S,
119 policy: NodePolicy,
121 _pd: PhantomData<OutP>,
123}
124
125impl<S, OutP, const OUT: usize> From<S> for SourceNode<S, OutP, OUT>
127where
128 S: Source<OutP, OUT>,
129 OutP: Payload,
130{
131 #[inline]
132 fn from(src: S) -> Self {
133 let policy = src.policy();
134 SourceNode::new(src, policy)
135 }
136}
137
138impl<S, OutP, const OUT: usize> SourceNode<S, OutP, OUT>
139where
140 S: Source<OutP, OUT>,
141 OutP: Payload,
142{
143 #[inline]
145 pub const fn new(src: S, policy: NodePolicy) -> Self {
146 Self {
147 src,
148 policy,
149 _pd: PhantomData,
150 }
151 }
152
153 #[inline]
155 pub fn source_ref(&self) -> &S {
156 &self.src
157 }
158
159 #[inline]
161 pub fn source_mut(&mut self) -> &mut S {
162 &mut self.src
163 }
164
165 #[inline]
172 pub fn ingress_edge_has_batch(&self) -> bool {
173 let ingress_occ = self.source_ref().ingress_occupancy();
175 if *ingress_occ.items() == 0 {
176 return false;
177 }
178
179 let policy = self.policy.batching();
180
181 let fixed_opt = *policy.fixed_n();
182 let delta_opt = *policy.max_delta_t();
183
184 match (fixed_opt, delta_opt) {
185 (Some(fixed_n), None) => *ingress_occ.items() >= fixed_n,
186 (None, Some(_max_delta_t)) => {
187 true
189 }
190 (Some(fixed_n), Some(max_delta_t)) => {
191 if *ingress_occ.items() < fixed_n {
193 return false;
194 }
195
196 let first_tick_opt = self.src.peek_ingress_creation_tick(0);
199 let last_tick_opt = self
200 .src
201 .peek_ingress_creation_tick(fixed_n.saturating_sub(1));
202
203 match (first_tick_opt, last_tick_opt) {
204 (Some(first_ticks), Some(last_ticks)) => {
205 let span = last_ticks.saturating_sub(first_ticks);
206 span <= *max_delta_t.as_u64()
207 }
208 _ => false,
209 }
210 }
211 (None, None) => {
212 true
214 }
215 }
216 }
217}
218
219impl<S, OutP, const OUT: usize> Node<0, OUT, (), OutP> for SourceNode<S, OutP, OUT>
220where
221 S: Source<OutP, OUT>,
222 OutP: Payload + Copy,
223{
224 #[inline]
225 fn describe_capabilities(&self) -> NodeCapabilities {
226 self.src.capabilities()
227 }
228
229 #[inline]
230 fn input_acceptance(&self) -> [PlacementAcceptance; 0] {
231 []
232 }
233
234 #[inline]
235 fn output_acceptance(&self) -> [PlacementAcceptance; OUT] {
236 self.src.output_acceptance()
237 }
238
239 #[inline]
240 fn policy(&self) -> NodePolicy {
241 self.policy
242 }
243
244 #[cfg(any(test, feature = "bench"))]
246 fn set_policy(&mut self, policy: NodePolicy) {
247 self.policy = policy;
248 }
249
250 #[inline]
251 fn node_kind(&self) -> NodeKind {
252 NodeKind::Source
253 }
254
255 #[inline]
256 fn initialize<C, T>(&mut self, _c: &C, _t: &mut T) -> Result<(), NodeError>
257 where
258 T: Telemetry,
259 {
260 self.src
261 .open()
262 .map_err(|_| NodeError::external_unavailable())
263 }
264
265 #[inline]
266 fn start<C, T>(&mut self, _c: &C, _t: &mut T) -> Result<(), NodeError>
267 where
268 T: Telemetry,
269 {
270 Ok(())
271 }
272
273 #[inline]
274 fn process_message<C>(
275 &mut self,
276 _msg: &Message<()>,
277 _sys_clock: &C,
278 ) -> Result<ProcessResult<OutP>, NodeError>
279 where
280 C: PlatformClock + Sized,
281 {
282 if let Some((_port, msg)) = self.src.try_produce() {
283 Ok(ProcessResult::Output(msg))
284 } else {
285 Err(NodeError::no_input())
286 }
287 }
288
289 #[inline]
290 fn step<'g, 't, 'ck, InQ, OutQ, InM, OutM, C, Tel>(
291 &mut self,
292 ctx: &mut StepContext<'g, 't, 'ck, 0, OUT, (), OutP, InQ, OutQ, InM, OutM, C, Tel>,
293 ) -> Result<StepResult, NodeError>
294 where
295 InQ: Edge,
296 OutQ: Edge,
297 InM: MemoryManager<()>,
298 OutM: MemoryManager<OutP>,
299 C: PlatformClock + Sized,
300 Tel: Telemetry + Sized,
301 {
302 if let Some((port, msg)) = self.src.try_produce() {
303 ctx.push_output(port, msg)
304 } else {
305 Ok(StepResult::NoInput)
306 }
307 }
308
309 fn step_batch<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, Tel>(
310 &mut self,
311 ctx: &mut StepContext<
312 'graph,
313 'telemetry,
314 'clock,
315 0,
316 OUT,
317 (),
318 OutP,
319 InQ,
320 OutQ,
321 InM,
322 OutM,
323 C,
324 Tel,
325 >,
326 ) -> Result<StepResult, NodeError>
327 where
328 InQ: Edge,
329 OutQ: Edge,
330 InM: MemoryManager<()>,
331 OutM: MemoryManager<OutP>,
332 C: PlatformClock + Sized,
333 Tel: Telemetry + Sized,
334 {
335 let ingress_occ = self.source_ref().ingress_occupancy();
336 if *ingress_occ.items() == 0 {
337 return Ok(StepResult::NoInput);
338 }
339
340 let policy = self.policy();
341
342 let fixed_opt = *policy.batching().fixed_n();
343 let delta_opt = *policy.batching().max_delta_t();
344
345 let has_batch = match (fixed_opt, delta_opt) {
346 (Some(fixed_n), None) => *ingress_occ.items() >= fixed_n,
347 (None, Some(_max_delta_t)) => {
348 true
351 }
352 (Some(fixed_n), Some(max_delta_t)) => {
353 if *ingress_occ.items() < fixed_n {
355 false
356 } else {
357 let first_tick_opt = self.src.peek_ingress_creation_tick(0);
358 let last_tick_opt = self
359 .src
360 .peek_ingress_creation_tick(fixed_n.saturating_sub(1));
361
362 match (first_tick_opt, last_tick_opt) {
363 (Some(first_ticks), Some(last_ticks)) => {
364 let span = last_ticks.saturating_sub(first_ticks);
365 span <= *max_delta_t.as_u64()
366 }
367 _ => false,
368 }
369 }
370 }
371 (None, None) => {
372 true
374 }
375 };
376
377 if !has_batch {
378 return Ok(StepResult::NoInput);
379 }
380
381 let batch_n: usize = fixed_opt.unwrap_or(1);
382
383 let mut made_progress = false;
384
385 for _ in 0..batch_n {
386 match self.src.try_produce() {
387 Some((port, msg)) => match ctx.push_output(port, msg) {
388 Ok(StepResult::MadeProgress) => {
389 made_progress = true;
390 }
391 Ok(StepResult::Backpressured) | Err(_) => {
392 return Ok(StepResult::Backpressured);
393 }
394 Ok(_) => {}
395 },
396 None => {
397 break;
398 }
399 }
400 }
401
402 if made_progress {
403 Ok(StepResult::MadeProgress)
404 } else {
405 Ok(StepResult::NoInput)
406 }
407 }
408
409 #[inline]
410 fn on_watchdog_timeout<C, Tel>(&mut self, _c: &C, _t: &mut Tel) -> Result<StepResult, NodeError>
411 where
412 Tel: Telemetry,
413 {
414 Ok(StepResult::WaitingOnExternal)
415 }
416
417 #[inline]
418 fn stop<C, Tel>(&mut self, _c: &C, _t: &mut Tel) -> Result<(), NodeError>
419 where
420 Tel: Telemetry,
421 {
422 Ok(())
423 }
424}
425
426pub struct SourceIngressEdge<'src, OutP, S, const OUT: usize>
434where
435 OutP: Payload,
436 S: Source<OutP, OUT> + ?Sized,
437{
438 src: &'src S,
440 _pd: PhantomData<OutP>,
442}
443
444impl<'src, OutP, S, const OUT: usize> SourceIngressEdge<'src, OutP, S, OUT>
445where
446 OutP: Payload,
447 S: Source<OutP, OUT> + ?Sized,
448{
449 #[inline]
451 pub const fn new(src: &'src S) -> Self {
452 Self {
453 src,
454 _pd: PhantomData,
455 }
456 }
457}
458
459impl<'src, OutP, S, const OUT: usize> Edge for SourceIngressEdge<'src, OutP, S, OUT>
460where
461 OutP: Payload,
462 S: Source<OutP, OUT> + ?Sized,
463{
464 #[inline]
465 fn try_push<H: HeaderStore>(
466 &mut self,
467 _token: MessageToken,
468 _policy: &EdgePolicy,
469 _headers: &H,
470 ) -> EnqueueResult {
471 EnqueueResult::Rejected
472 }
473
474 #[inline]
475 fn try_pop<H: HeaderStore>(&mut self, _headers: &H) -> Result<MessageToken, QueueError> {
476 Err(QueueError::Empty)
477 }
478
479 #[inline]
480 fn try_peek(&self) -> Result<MessageToken, QueueError> {
481 Err(QueueError::Empty)
482 }
483
484 #[inline]
485 fn try_peek_at(&self, _index: usize) -> Result<MessageToken, QueueError> {
486 Err(QueueError::Empty)
487 }
488
489 #[inline]
490 fn occupancy(&self, _policy: &EdgePolicy) -> EdgeOccupancy {
491 self.src.ingress_occupancy()
492 }
493
494 #[inline]
495 fn is_empty(&self) -> bool {
496 *self.src.ingress_occupancy().items() == 0
497 }
498
499 #[inline]
500 fn try_pop_batch<H: HeaderStore>(
501 &mut self,
502 _policy: &BatchingPolicy,
503 _headers: &H,
504 ) -> Result<BatchView<'_, MessageToken>, QueueError> {
505 Err(QueueError::Empty)
506 }
507}
508
509pub struct IngressEdgeLink<'src, OutP, S, const OUT: usize>
514where
515 OutP: Payload,
516 S: Source<OutP, OUT> + ?Sized,
517{
518 edge: SourceIngressEdge<'src, OutP, S, OUT>,
519 id: EdgeIndex,
520 upstream: PortId,
521 downstream: PortId,
522 policy: EdgePolicy,
523 name: Option<&'static str>,
524}
525
526impl<'src, OutP, S, const OUT: usize> IngressEdgeLink<'src, OutP, S, OUT>
527where
528 OutP: Payload,
529 S: Source<OutP, OUT> + ?Sized,
530{
531 #[inline]
533 pub const fn from_source(
534 src: &'src S,
535 id: EdgeIndex,
536 upstream: PortId,
537 downstream: PortId,
538 policy: EdgePolicy,
539 name: Option<&'static str>,
540 ) -> Self {
541 Self {
542 edge: SourceIngressEdge::new(src),
543 id,
544 upstream,
545 downstream,
546 policy,
547 name,
548 }
549 }
550
551 #[inline]
553 pub fn descriptor(&self) -> EdgeDescriptor {
554 EdgeDescriptor::new(self.id, self.upstream, self.downstream, self.name)
555 }
556
557 #[inline]
559 pub fn policy(&self) -> EdgePolicy {
560 self.policy
561 }
562
563 #[inline]
565 pub fn inner(&self) -> &SourceIngressEdge<'src, OutP, S, OUT> {
566 &self.edge
567 }
568}
569
570impl<'s, OutP, S, const OUT: usize> Edge for IngressEdgeLink<'s, OutP, S, OUT>
571where
572 OutP: Payload,
573 S: Source<OutP, OUT> + ?Sized,
574{
575 #[inline]
576 fn try_push<H: HeaderStore>(
577 &mut self,
578 _token: MessageToken,
579 _policy: &EdgePolicy,
580 _headers: &H,
581 ) -> EnqueueResult {
582 EnqueueResult::Rejected
583 }
584 #[inline]
585 fn try_pop<H: HeaderStore>(&mut self, _headers: &H) -> Result<MessageToken, QueueError> {
586 Err(QueueError::Empty)
587 }
588 #[inline]
589 fn try_peek(&self) -> Result<MessageToken, QueueError> {
590 Err(QueueError::Empty)
591 }
592 #[inline]
593 fn try_peek_at(&self, _index: usize) -> Result<MessageToken, QueueError> {
594 Err(QueueError::Empty)
595 }
596 #[inline]
597 fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
598 self.edge.occupancy(policy)
600 }
601 #[inline]
602 fn is_empty(&self) -> bool {
603 self.edge.is_empty()
604 }
605 #[inline]
606 fn try_pop_batch<H: HeaderStore>(
607 &mut self,
608 _policy: &BatchingPolicy,
609 _headers: &H,
610 ) -> Result<BatchView<'_, MessageToken>, QueueError> {
611 Err(QueueError::Empty)
612 }
613}
614
615pub trait IngressProbe: Send {
621 fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy;
624}
625
626pub struct NoProbe;
628
629impl IngressProbe for NoProbe {
630 #[inline]
631 fn occupancy(&self, _policy: &EdgePolicy) -> EdgeOccupancy {
632 EdgeOccupancy::new(0, 0, WatermarkState::BelowSoft)
633 }
634}
635
636#[cfg(feature = "std")]
641pub type IngressProbeImpl = probe::SourceIngressProbe;
642
643#[cfg(not(feature = "std"))]
648pub type IngressProbeImpl = NoProbe;
649
650#[cfg(feature = "std")]
652pub mod probe {
653 use super::*;
654 use core::sync::atomic::{AtomicUsize, Ordering};
655 use std::sync::Arc;
656
657 #[derive(Clone, Debug)]
659 pub struct SourceIngressProbe {
660 items: Arc<AtomicUsize>,
661 bytes: Arc<AtomicUsize>,
662 }
663
664 impl SourceIngressProbe {
665 #[inline]
667 pub fn new() -> Self {
668 Self {
669 items: Arc::new(AtomicUsize::new(0)),
670 bytes: Arc::new(AtomicUsize::new(0)),
671 }
672 }
673
674 #[inline]
676 pub fn set_items(&self, n: usize) {
677 self.items.store(n, Ordering::Relaxed);
678 }
679
680 #[inline]
682 pub fn set_bytes(&self, b: usize) {
683 self.bytes.store(b, Ordering::Relaxed);
684 }
685
686 #[inline]
689 pub fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
690 let items = self.items.load(Ordering::Relaxed);
691 let bytes = self.bytes.load(Ordering::Relaxed);
692 EdgeOccupancy::new(items, bytes, policy.watermark(items, bytes))
693 }
694 }
695
696 impl Default for SourceIngressProbe {
697 fn default() -> Self {
698 Self {
699 items: Arc::new(AtomicUsize::new(0)),
700 bytes: Arc::new(AtomicUsize::new(0)),
701 }
702 }
703 }
704
705 impl super::IngressProbe for SourceIngressProbe {
706 #[inline]
707 fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
708 SourceIngressProbe::occupancy(self, policy)
709 }
710 }
711
712 #[derive(Debug, Clone)]
714 pub struct SourceIngressProbeEdge<P: Payload> {
715 probe: SourceIngressProbe,
716 _pd: PhantomData<P>,
717 }
718
719 impl<P: Payload> SourceIngressProbeEdge<P> {
720 #[inline]
722 pub fn new(probe: SourceIngressProbe) -> Self {
723 Self {
724 probe,
725 _pd: PhantomData,
726 }
727 }
728
729 #[inline]
731 pub fn inner(&self) -> &SourceIngressProbe {
732 &self.probe
733 }
734 }
735
736 impl<P: Payload> Edge for SourceIngressProbeEdge<P> {
737 #[inline]
738 fn try_push<H: HeaderStore>(
739 &mut self,
740 _token: MessageToken,
741 _policy: &EdgePolicy,
742 _headers: &H,
743 ) -> EnqueueResult {
744 EnqueueResult::Rejected
745 }
746
747 #[inline]
748 fn try_pop<H: HeaderStore>(&mut self, _headers: &H) -> Result<MessageToken, QueueError> {
749 Err(QueueError::Empty)
750 }
751
752 #[inline]
753 fn try_peek(&self) -> Result<MessageToken, QueueError> {
754 Err(QueueError::Empty)
755 }
756
757 #[inline]
758 fn try_peek_at(&self, _index: usize) -> Result<MessageToken, QueueError> {
759 Err(QueueError::Empty)
760 }
761
762 #[inline]
763 fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
764 self.probe.occupancy(policy)
765 }
766
767 #[inline]
768 fn is_empty(&self) -> bool {
769 self.probe.items.load(core::sync::atomic::Ordering::Relaxed) == 0
770 }
771
772 #[inline]
773 fn try_pop_batch<H: HeaderStore>(
774 &mut self,
775 _policy: &BatchingPolicy,
776 _headers: &H,
777 ) -> Result<BatchView<'_, MessageToken>, QueueError> {
778 Err(QueueError::Empty)
779 }
780 }
781
782 #[derive(Clone)]
784 pub struct SourceIngressUpdater {
785 probe: SourceIngressProbe,
786 }
787
788 impl SourceIngressUpdater {
789 #[inline]
791 pub fn new(probe: SourceIngressProbe) -> Self {
792 Self { probe }
793 }
794
795 #[inline]
797 pub fn update(&self, items: usize, bytes: usize) {
798 self.probe.set_items(items);
799 self.probe.set_bytes(bytes);
800 }
801 }
802
803 #[inline]
805 pub fn new_probe_edge_pair<P: Payload>() -> (SourceIngressProbeEdge<P>, SourceIngressUpdater) {
806 let probe = SourceIngressProbe::new();
807 let edge = SourceIngressProbeEdge::<P>::new(probe.clone());
808 let updater = SourceIngressUpdater::new(probe);
809 (edge, updater)
810 }
811
812 #[inline]
814 pub fn new_probe_pair() -> (SourceIngressProbe, SourceIngressUpdater) {
815 let p = SourceIngressProbe::new();
816 (p.clone(), SourceIngressUpdater::new(p))
817 }
818
819 #[derive(Debug)]
821 pub struct ConcurrentIngressEdgeLink<OutP: Payload> {
822 edge: SourceIngressProbeEdge<OutP>,
823 id: EdgeIndex,
824 upstream: PortId,
825 downstream: PortId,
826 policy: EdgePolicy,
827 name: Option<&'static str>,
828 }
829
830 impl<OutP: Payload> ConcurrentIngressEdgeLink<OutP> {
831 #[inline]
834 pub fn from_probe(
835 probe_edge: SourceIngressProbeEdge<OutP>,
836 id: EdgeIndex,
837 upstream: PortId,
838 downstream: PortId,
839 policy: EdgePolicy,
840 name: Option<&'static str>,
841 ) -> Self {
842 Self {
843 edge: probe_edge,
844 id,
845 upstream,
846 downstream,
847 policy,
848 name,
849 }
850 }
851
852 #[inline]
854 pub fn descriptor(&self) -> EdgeDescriptor {
855 EdgeDescriptor::new(self.id, self.upstream, self.downstream, self.name)
856 }
857
858 #[inline]
860 pub fn policy(&self) -> EdgePolicy {
861 self.policy
862 }
863
864 #[inline]
866 pub fn inner(&self) -> &SourceIngressProbeEdge<OutP> {
867 &self.edge
868 }
869
870 #[inline]
872 pub fn inner_mut(&mut self) -> &mut SourceIngressProbeEdge<OutP> {
873 &mut self.edge
874 }
875 }
876
877 impl<OutP: Payload> Edge for ConcurrentIngressEdgeLink<OutP> {
878 #[inline]
879 fn try_push<H: HeaderStore>(
880 &mut self,
881 _token: MessageToken,
882 _policy: &EdgePolicy,
883 _headers: &H,
884 ) -> EnqueueResult {
885 EnqueueResult::Rejected
886 }
887 #[inline]
888 fn try_pop<H: HeaderStore>(&mut self, _headers: &H) -> Result<MessageToken, QueueError> {
889 Err(QueueError::Empty)
890 }
891 #[inline]
892 fn try_peek(&self) -> Result<MessageToken, QueueError> {
893 Err(QueueError::Empty)
894 }
895 #[inline]
896 fn try_peek_at(&self, _index: usize) -> Result<MessageToken, QueueError> {
897 Err(QueueError::Empty)
898 }
899 #[inline]
900 fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
901 self.edge.occupancy(policy)
902 }
903 #[inline]
904 fn is_empty(&self) -> bool {
905 self.edge.is_empty()
906 }
907 #[inline]
908 fn try_pop_batch<H: HeaderStore>(
909 &mut self,
910 _policy: &BatchingPolicy,
911 _headers: &H,
912 ) -> Result<BatchView<'_, MessageToken>, QueueError> {
913 Err(QueueError::Empty)
914 }
915 }
916}