Skip to main content

limen_core/node/
source.rs

1//! Source node traits and adapters.
2//!
3//! This module defines a minimal `Source` trait and a `SourceNode` adapter that
4//! plugs a source into the existing `Node` and `Edge` contracts without changing
5//! any runtime or graph APIs. It also includes:
//!
//! - `SourceIngressEdge`: a borrowing, no-alloc adapter that exposes **ingress
7//!   pressure** (items/bytes before the source) as an `Edge` so that the graph
8//!   and runtimes can uniformly sample it with their existing occupancy code.
//! - `IngressProbe` / `NoProbe` / `IngressProbeImpl`: platform-agnostic ingress
//!   pressure observer — zero-cost `NoProbe` by default; replace it with a real
//!   probe to report actual occupancy.
11//!
12//! ### Design notes
13//! * A `Source` has **no input ports** and one or more **output ports**. It can
14//!   produce at most one message per `step()` via `try_produce()`.
15//! * Ingress pressure is surfaced via `ingress_occupancy()`, which the graph
16//!   exposes as a synthetic "monitor edge" using `SourceIngressEdge` (no_std)
17//!   or `probe::SourceIngressProbeEdge` (std). Runtimes keep using
18//!   `GraphApi::(edge_)occupancy` without any special-case code.
19
20use crate::edge::{Edge, EdgeOccupancy, EnqueueResult};
21use crate::errors::NodeError;
22use crate::errors::QueueError;
23use crate::memory::PlacementAcceptance;
24use crate::message::{payload::Payload, Message};
25use crate::node::{Node, NodeCapabilities, NodeKind, ProcessResult, StepContext, StepResult};
26use crate::policy::{BatchingPolicy, EdgePolicy, NodePolicy, WatermarkState};
27use crate::prelude::{
28    BatchView, EdgeDescriptor, HeaderStore, MemoryManager, PlatformClock, Telemetry,
29};
30use crate::types::{EdgeIndex, MessageToken, NodeIndex, PortId};
31
32use core::marker::PhantomData;
33
/// Reserved node index used for virtual input nodes.
///
/// The value is `NodeIndex::new(usize::MAX)`, i.e. the largest index is set
/// aside as a sentinel.
/// NOTE(review): presumably no real graph node is ever assigned this index —
/// confirm against the graph builder.
pub const EXTERNAL_INGRESS_NODE: NodeIndex = NodeIndex::new(usize::MAX);
36
/// Uniform contract for source implementations (0 inputs / ≥1 outputs).
///
/// `Source` types produce messages for downstream nodes and report **ingress
/// pressure** (items/bytes *before* the source, e.g., device FIFO depth),
/// allowing schedulers to decide when to poll the source.
///
/// # Type Parameters
/// * `OutP` — Payload type for produced messages.
/// * `OUT`  — Number of output ports on the source node.
pub trait Source<OutP, const OUT: usize>
where
    OutP: Payload,
{
    /// Source-specific error type for `open()`.
    type Error;

    /// Prepare the source for production (e.g., open device, init driver).
    ///
    /// Called from `Node::initialize`. Must be idempotent or fail safely if
    /// called multiple times by a higher layer.
    fn open(&mut self) -> Result<(), Self::Error>;

    /// Attempt to produce **exactly one** `(port, message)` pair.
    ///
    /// Return `None` if there is nothing to produce *right now*.
    ///
    /// # Contract
    /// * Must be **non-blocking**.
    /// * The returned `port` must be `< OUT`.
    fn try_produce(&mut self) -> Option<(usize, Message<OutP>)>;

    /// Report **ingress pressure** (items/bytes before the source).
    ///
    /// Implementations should be **non-blocking** and may read hardware
    /// counters, driver FIFOs, ring buffer lengths, or cached snapshots.
    ///
    /// No policy is passed in: the returned `EdgeOccupancy`, including its
    /// watermark state, is computed entirely by the implementation.
    /// NOTE(review): presumably implementations derive the watermark from
    /// `ingress_policy()` so it matches the thresholds of real edges — confirm.
    fn ingress_occupancy(&self) -> EdgeOccupancy;

    /// Return the creation tick of the `item_index`'th ingress item (0-based)
    /// without dequeuing it.
    ///
    /// Implementations must be non-blocking and non-destructive.
    /// Return `None` if metadata is unavailable or `item_index` is out of range.
    fn peek_ingress_creation_tick(&self, item_index: usize) -> Option<u64>;

    /// Return output placement acceptances for zero-copy compatibility.
    ///
    /// One entry per output port (`OUT` entries in total).
    fn output_acceptance(&self) -> [PlacementAcceptance; OUT];

    /// Describe source capabilities (device streams, degrade tiers, etc.).
    fn capabilities(&self) -> NodeCapabilities;

    /// Convenience: wrap this source in a `SourceNode` with the provided policy.
    ///
    /// This is a zero-overhead helper so all `Source` implementations can be
    /// lifted into a node uniformly without each impl writing a custom helper.
    /// It consumes `self` and simply forwards to `SourceNode::new`.
    #[inline]
    fn into_sourcenode(self, policy: NodePolicy) -> SourceNode<Self, OutP, OUT>
    where
        Self: Sized,
    {
        SourceNode::new(self, policy)
    }

    /// Provide the node policy bundle (batching/budget/deadlines).
    fn policy(&self) -> NodePolicy;

    /// Provide the ingress edge policy for this source node.
    fn ingress_policy(&self) -> EdgePolicy;
}
106
/// A thin adapter that exposes a `Source` as a `Node<0, OUT, (), OutP>`.
///
/// This allows sources to participate in graphs and be scheduled by runtimes
/// without any special-case code. The node owns the source and forwards the
/// node lifecycle calls as needed.
///
/// # Type Parameters
/// * `S`    — Concrete source implementation being wrapped.
/// * `OutP` — Payload type of the messages the source produces.
/// * `OUT`  — Number of output ports.
pub struct SourceNode<S, OutP, const OUT: usize>
where
    S: Source<OutP, OUT>,
    OutP: Payload,
{
    /// The concrete source implementation.
    src: S,
    /// Static node policy (batching/budgets/deadlines).
    policy: NodePolicy,
    /// Phantom to bind the `OutP` generic (no `OutP` value is stored).
    _pd: PhantomData<OutP>,
}
124
125/// Allow graphs to accept any `Source` and convert implicitly.
126impl<S, OutP, const OUT: usize> From<S> for SourceNode<S, OutP, OUT>
127where
128    S: Source<OutP, OUT>,
129    OutP: Payload,
130{
131    #[inline]
132    fn from(src: S) -> Self {
133        let policy = src.policy();
134        SourceNode::new(src, policy)
135    }
136}
137
138impl<S, OutP, const OUT: usize> SourceNode<S, OutP, OUT>
139where
140    S: Source<OutP, OUT>,
141    OutP: Payload,
142{
143    /// Construct a `SourceNode` from a source and a static policy bundle.
144    #[inline]
145    pub const fn new(src: S, policy: NodePolicy) -> Self {
146        Self {
147            src,
148            policy,
149            _pd: PhantomData,
150        }
151    }
152
153    /// Borrow the underlying source immutably.
154    #[inline]
155    pub fn source_ref(&self) -> &S {
156        &self.src
157    }
158
159    /// Borrow the underlying source mutably.
160    #[inline]
161    pub fn source_mut(&mut self) -> &mut S {
162        &mut self.src
163    }
164
165    /// Return `true` if the ingress (external) edge for this source can
166    /// produce a batch *now* under the given batching `policy`.
167    ///
168    /// This mirrors `StepContext::input_edge_has_batch` semantics and the
169    /// `step_batch` checks: it is *observational* (no side effects) and
170    /// conservative on header-peek failures.
171    #[inline]
172    pub fn ingress_edge_has_batch(&self) -> bool {
173        // occupancy short-circuit
174        let ingress_occ = self.source_ref().ingress_occupancy();
175        if *ingress_occ.items() == 0 {
176            return false;
177        }
178
179        let policy = self.policy.batching();
180
181        let fixed_opt = *policy.fixed_n();
182        let delta_opt = *policy.max_delta_t();
183
184        match (fixed_opt, delta_opt) {
185            (Some(fixed_n), None) => *ingress_occ.items() >= fixed_n,
186            (None, Some(_max_delta_t)) => {
187                // Delta-only: any non-empty ingress can form a size-1 batch.
188                true
189            }
190            (Some(fixed_n), Some(max_delta_t)) => {
191                // Must be able to form a full fixed_n batch first.
192                if *ingress_occ.items() < fixed_n {
193                    return false;
194                }
195
196                // Non-destructive peeks at creation ticks for the first and
197                // the fixed_n-th ingress items.
198                let first_tick_opt = self.src.peek_ingress_creation_tick(0);
199                let last_tick_opt = self
200                    .src
201                    .peek_ingress_creation_tick(fixed_n.saturating_sub(1));
202
203                match (first_tick_opt, last_tick_opt) {
204                    (Some(first_ticks), Some(last_ticks)) => {
205                        let span = last_ticks.saturating_sub(first_ticks);
206                        span <= *max_delta_t.as_u64()
207                    }
208                    _ => false,
209                }
210            }
211            (None, None) => {
212                // No batching configured: treat as single-message readiness.
213                true
214            }
215        }
216    }
217}
218
219impl<S, OutP, const OUT: usize> Node<0, OUT, (), OutP> for SourceNode<S, OutP, OUT>
220where
221    S: Source<OutP, OUT>,
222    OutP: Payload + Copy,
223{
224    #[inline]
225    fn describe_capabilities(&self) -> NodeCapabilities {
226        self.src.capabilities()
227    }
228
229    #[inline]
230    fn input_acceptance(&self) -> [PlacementAcceptance; 0] {
231        []
232    }
233
234    #[inline]
235    fn output_acceptance(&self) -> [PlacementAcceptance; OUT] {
236        self.src.output_acceptance()
237    }
238
239    #[inline]
240    fn policy(&self) -> NodePolicy {
241        self.policy
242    }
243
244    /// **TEST ONLY** method used to override batching policies for node contract tests.
245    #[cfg(any(test, feature = "bench"))]
246    fn set_policy(&mut self, policy: NodePolicy) {
247        self.policy = policy;
248    }
249
250    #[inline]
251    fn node_kind(&self) -> NodeKind {
252        NodeKind::Source
253    }
254
255    #[inline]
256    fn initialize<C, T>(&mut self, _c: &C, _t: &mut T) -> Result<(), NodeError>
257    where
258        T: Telemetry,
259    {
260        self.src
261            .open()
262            .map_err(|_| NodeError::external_unavailable())
263    }
264
265    #[inline]
266    fn start<C, T>(&mut self, _c: &C, _t: &mut T) -> Result<(), NodeError>
267    where
268        T: Telemetry,
269    {
270        Ok(())
271    }
272
273    #[inline]
274    fn process_message<C>(
275        &mut self,
276        _msg: &Message<()>,
277        _sys_clock: &C,
278    ) -> Result<ProcessResult<OutP>, NodeError>
279    where
280        C: PlatformClock + Sized,
281    {
282        if let Some((_port, msg)) = self.src.try_produce() {
283            Ok(ProcessResult::Output(msg))
284        } else {
285            Err(NodeError::no_input())
286        }
287    }
288
289    #[inline]
290    fn step<'g, 't, 'ck, InQ, OutQ, InM, OutM, C, Tel>(
291        &mut self,
292        ctx: &mut StepContext<'g, 't, 'ck, 0, OUT, (), OutP, InQ, OutQ, InM, OutM, C, Tel>,
293    ) -> Result<StepResult, NodeError>
294    where
295        InQ: Edge,
296        OutQ: Edge,
297        InM: MemoryManager<()>,
298        OutM: MemoryManager<OutP>,
299        C: PlatformClock + Sized,
300        Tel: Telemetry + Sized,
301    {
302        if let Some((port, msg)) = self.src.try_produce() {
303            ctx.push_output(port, msg)
304        } else {
305            Ok(StepResult::NoInput)
306        }
307    }
308
309    fn step_batch<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, Tel>(
310        &mut self,
311        ctx: &mut StepContext<
312            'graph,
313            'telemetry,
314            'clock,
315            0,
316            OUT,
317            (),
318            OutP,
319            InQ,
320            OutQ,
321            InM,
322            OutM,
323            C,
324            Tel,
325        >,
326    ) -> Result<StepResult, NodeError>
327    where
328        InQ: Edge,
329        OutQ: Edge,
330        InM: MemoryManager<()>,
331        OutM: MemoryManager<OutP>,
332        C: PlatformClock + Sized,
333        Tel: Telemetry + Sized,
334    {
335        let ingress_occ = self.source_ref().ingress_occupancy();
336        if *ingress_occ.items() == 0 {
337            return Ok(StepResult::NoInput);
338        }
339
340        let policy = self.policy();
341
342        let fixed_opt = *policy.batching().fixed_n();
343        let delta_opt = *policy.batching().max_delta_t();
344
345        let has_batch = match (fixed_opt, delta_opt) {
346            (Some(fixed_n), None) => *ingress_occ.items() >= fixed_n,
347            (None, Some(_max_delta_t)) => {
348                // Span constraint only: a non-empty queue can always produce a
349                // span-valid batch of size 1.
350                true
351            }
352            (Some(fixed_n), Some(max_delta_t)) => {
353                // Must be able to form a full fixed_n batch first.
354                if *ingress_occ.items() < fixed_n {
355                    false
356                } else {
357                    let first_tick_opt = self.src.peek_ingress_creation_tick(0);
358                    let last_tick_opt = self
359                        .src
360                        .peek_ingress_creation_tick(fixed_n.saturating_sub(1));
361
362                    match (first_tick_opt, last_tick_opt) {
363                        (Some(first_ticks), Some(last_ticks)) => {
364                            let span = last_ticks.saturating_sub(first_ticks);
365                            span <= *max_delta_t.as_u64()
366                        }
367                        _ => false,
368                    }
369                }
370            }
371            (None, None) => {
372                // No batching configured: treat as single-message readiness.
373                true
374            }
375        };
376
377        if !has_batch {
378            return Ok(StepResult::NoInput);
379        }
380
381        let batch_n: usize = fixed_opt.unwrap_or(1);
382
383        let mut made_progress = false;
384
385        for _ in 0..batch_n {
386            match self.src.try_produce() {
387                Some((port, msg)) => match ctx.push_output(port, msg) {
388                    Ok(StepResult::MadeProgress) => {
389                        made_progress = true;
390                    }
391                    Ok(StepResult::Backpressured) | Err(_) => {
392                        return Ok(StepResult::Backpressured);
393                    }
394                    Ok(_) => {}
395                },
396                None => {
397                    break;
398                }
399            }
400        }
401
402        if made_progress {
403            Ok(StepResult::MadeProgress)
404        } else {
405            Ok(StepResult::NoInput)
406        }
407    }
408
409    #[inline]
410    fn on_watchdog_timeout<C, Tel>(&mut self, _c: &C, _t: &mut Tel) -> Result<StepResult, NodeError>
411    where
412        Tel: Telemetry,
413    {
414        Ok(StepResult::WaitingOnExternal)
415    }
416
417    #[inline]
418    fn stop<C, Tel>(&mut self, _c: &C, _t: &mut Tel) -> Result<(), NodeError>
419    where
420        Tel: Telemetry,
421    {
422        Ok(())
423    }
424}
425
/// Borrowing adapter that exposes a source’s **ingress pressure** as an `Edge`.
///
/// This is used by the graph/builder to wire a synthetic "monitor edge" whose
/// occupancy is returned by `Source::ingress_occupancy()`. It rejects all push
/// and pop operations (no buffering); only `occupancy()` and `is_empty()`
/// carry information.
///
/// This form is zero-allocation and suitable for `no_std`/single-threaded runs.
pub struct SourceIngressEdge<'src, OutP, S, const OUT: usize>
where
    OutP: Payload,
    S: Source<OutP, OUT> + ?Sized,
{
    /// Shared borrow of the underlying source; the edge never mutates it.
    src: &'src S,
    /// Phantom to bind the `OutP` generic.
    _pd: PhantomData<OutP>,
}
443
444impl<'src, OutP, S, const OUT: usize> SourceIngressEdge<'src, OutP, S, OUT>
445where
446    OutP: Payload,
447    S: Source<OutP, OUT> + ?Sized,
448{
449    /// Create a borrowing ingress-edge view over a source.
450    #[inline]
451    pub const fn new(src: &'src S) -> Self {
452        Self {
453            src,
454            _pd: PhantomData,
455        }
456    }
457}
458
459impl<'src, OutP, S, const OUT: usize> Edge for SourceIngressEdge<'src, OutP, S, OUT>
460where
461    OutP: Payload,
462    S: Source<OutP, OUT> + ?Sized,
463{
464    #[inline]
465    fn try_push<H: HeaderStore>(
466        &mut self,
467        _token: MessageToken,
468        _policy: &EdgePolicy,
469        _headers: &H,
470    ) -> EnqueueResult {
471        EnqueueResult::Rejected
472    }
473
474    #[inline]
475    fn try_pop<H: HeaderStore>(&mut self, _headers: &H) -> Result<MessageToken, QueueError> {
476        Err(QueueError::Empty)
477    }
478
479    #[inline]
480    fn try_peek(&self) -> Result<MessageToken, QueueError> {
481        Err(QueueError::Empty)
482    }
483
484    #[inline]
485    fn try_peek_at(&self, _index: usize) -> Result<MessageToken, QueueError> {
486        Err(QueueError::Empty)
487    }
488
489    #[inline]
490    fn occupancy(&self, _policy: &EdgePolicy) -> EdgeOccupancy {
491        self.src.ingress_occupancy()
492    }
493
494    #[inline]
495    fn is_empty(&self) -> bool {
496        *self.src.ingress_occupancy().items() == 0
497    }
498
499    #[inline]
500    fn try_pop_batch<H: HeaderStore>(
501        &mut self,
502        _policy: &BatchingPolicy,
503        _headers: &H,
504    ) -> Result<BatchView<'_, MessageToken>, QueueError> {
505        Err(QueueError::Empty)
506    }
507}
508
/// A tiny link wrapper for the synthetic ingress edge.
///
/// Wraps a borrowing `SourceIngressEdge<'s, OutP, S, OUT>` so the graph
/// can expose ingress pressure straight from the concrete source implementation.
/// Alongside the edge it carries the metadata needed to build an
/// `EdgeDescriptor` and the edge policy.
pub struct IngressEdgeLink<'src, OutP, S, const OUT: usize>
where
    OutP: Payload,
    S: Source<OutP, OUT> + ?Sized,
{
    /// Borrowing monitor edge over the source.
    edge: SourceIngressEdge<'src, OutP, S, OUT>,
    /// Index of this edge.
    id: EdgeIndex,
    /// Upstream endpoint recorded in the descriptor.
    upstream: PortId,
    /// Downstream endpoint recorded in the descriptor.
    downstream: PortId,
    /// Policy associated with this edge.
    policy: EdgePolicy,
    /// Optional static name recorded in the descriptor.
    name: Option<&'static str>,
}
525
526impl<'src, OutP, S, const OUT: usize> IngressEdgeLink<'src, OutP, S, OUT>
527where
528    OutP: Payload,
529    S: Source<OutP, OUT> + ?Sized,
530{
531    /// Construct from a borrowed source reference.
532    #[inline]
533    pub const fn from_source(
534        src: &'src S,
535        id: EdgeIndex,
536        upstream: PortId,
537        downstream: PortId,
538        policy: EdgePolicy,
539        name: Option<&'static str>,
540    ) -> Self {
541        Self {
542            edge: SourceIngressEdge::new(src),
543            id,
544            upstream,
545            downstream,
546            policy,
547            name,
548        }
549    }
550
551    /// Edge descriptor.
552    #[inline]
553    pub fn descriptor(&self) -> EdgeDescriptor {
554        EdgeDescriptor::new(self.id, self.upstream, self.downstream, self.name)
555    }
556
557    /// Policy accessor.
558    #[inline]
559    pub fn policy(&self) -> EdgePolicy {
560        self.policy
561    }
562
563    /// Borrow the inner borrowing edge.
564    #[inline]
565    pub fn inner(&self) -> &SourceIngressEdge<'src, OutP, S, OUT> {
566        &self.edge
567    }
568}
569
570impl<'s, OutP, S, const OUT: usize> Edge for IngressEdgeLink<'s, OutP, S, OUT>
571where
572    OutP: Payload,
573    S: Source<OutP, OUT> + ?Sized,
574{
575    #[inline]
576    fn try_push<H: HeaderStore>(
577        &mut self,
578        _token: MessageToken,
579        _policy: &EdgePolicy,
580        _headers: &H,
581    ) -> EnqueueResult {
582        EnqueueResult::Rejected
583    }
584    #[inline]
585    fn try_pop<H: HeaderStore>(&mut self, _headers: &H) -> Result<MessageToken, QueueError> {
586        Err(QueueError::Empty)
587    }
588    #[inline]
589    fn try_peek(&self) -> Result<MessageToken, QueueError> {
590        Err(QueueError::Empty)
591    }
592    #[inline]
593    fn try_peek_at(&self, _index: usize) -> Result<MessageToken, QueueError> {
594        Err(QueueError::Empty)
595    }
596    #[inline]
597    fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
598        // Delegate to the borrowing edge (reads Source::ingress_occupancy).
599        self.edge.occupancy(policy)
600    }
601    #[inline]
602    fn is_empty(&self) -> bool {
603        self.edge.is_empty()
604    }
605    #[inline]
606    fn try_pop_batch<H: HeaderStore>(
607        &mut self,
608        _policy: &BatchingPolicy,
609        _headers: &H,
610    ) -> Result<BatchView<'_, MessageToken>, QueueError> {
611        Err(QueueError::Empty)
612    }
613}
614
/// Platform-agnostic ingress pressure observer.
///
/// Implemented by `NoProbe` (zero-cost no_std stub) and by
/// `probe::SourceIngressProbe` (live atomic counters, std-only).
/// Source node logic uses `IngressProbeImpl` uniformly — no `#[cfg]` in node bodies.
///
/// Implementors must be `Send`.
pub trait IngressProbe: Send {
    /// Return the current ingress occupancy snapshot using `policy` to compute
    /// the watermark consistently with real edges.
    fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy;
}
625
626/// Zero-cost stub used on no_std targets where no real probe is wired up.
627pub struct NoProbe;
628
629impl IngressProbe for NoProbe {
630    #[inline]
631    fn occupancy(&self, _policy: &EdgePolicy) -> EdgeOccupancy {
632        EdgeOccupancy::new(0, 0, WatermarkState::BelowSoft)
633    }
634}
635
/// Concrete ingress probe type used on `std` targets.
///
/// This resolves to [`probe::SourceIngressProbe`], which tracks ingress
/// pressure using shared atomic counters.
/// Selected when the `std` feature is enabled.
#[cfg(feature = "std")]
pub type IngressProbeImpl = probe::SourceIngressProbe;

/// Concrete ingress probe type used on `no_std` targets.
///
/// This resolves to [`NoProbe`], a zero-cost stub that reports no ingress
/// pressure when no live probe implementation is available.
/// Selected when the `std` feature is disabled.
#[cfg(not(feature = "std"))]
pub type IngressProbeImpl = NoProbe;
649
650/// Std-only, lock-free ingress pressure probe for cross-thread sources.
651#[cfg(feature = "std")]
652pub mod probe {
653    use super::*;
654    use core::sync::atomic::{AtomicUsize, Ordering};
655    use std::sync::Arc;
656
    /// Shared atomic counters for ingress pressure (items/bytes).
    ///
    /// Both counters live behind `Arc`s, so the derived `Clone` yields another
    /// handle to the same counters rather than an independent copy.
    #[derive(Clone, Debug)]
    pub struct SourceIngressProbe {
        /// Most recently stored ingress item count.
        items: Arc<AtomicUsize>,
        /// Most recently stored ingress byte count.
        bytes: Arc<AtomicUsize>,
    }
663
664    impl SourceIngressProbe {
665        /// Create a new ingress probe with zeroed item and byte counters.
666        #[inline]
667        pub fn new() -> Self {
668            Self {
669                items: Arc::new(AtomicUsize::new(0)),
670                bytes: Arc::new(AtomicUsize::new(0)),
671            }
672        }
673
674        /// Set the current ingress item count snapshot.
675        #[inline]
676        pub fn set_items(&self, n: usize) {
677            self.items.store(n, Ordering::Relaxed);
678        }
679
680        /// Set the current ingress byte count snapshot.
681        #[inline]
682        pub fn set_bytes(&self, b: usize) {
683            self.bytes.store(b, Ordering::Relaxed);
684        }
685
686        /// Build an occupancy snapshot from the current probe counters using
687        /// `policy` to compute the watermark.
688        #[inline]
689        pub fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
690            let items = self.items.load(Ordering::Relaxed);
691            let bytes = self.bytes.load(Ordering::Relaxed);
692            EdgeOccupancy::new(items, bytes, policy.watermark(items, bytes))
693        }
694    }
695
696    impl Default for SourceIngressProbe {
697        fn default() -> Self {
698            Self {
699                items: Arc::new(AtomicUsize::new(0)),
700                bytes: Arc::new(AtomicUsize::new(0)),
701            }
702        }
703    }
704
705    impl super::IngressProbe for SourceIngressProbe {
706        #[inline]
707        fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
708            SourceIngressProbe::occupancy(self, policy)
709        }
710    }
711
    /// Payload-typed wrapper that exposes a probe as an `Edge`.
    ///
    /// `P` is only a type-level tag; no payload value is stored.
    #[derive(Debug, Clone)]
    pub struct SourceIngressProbeEdge<P: Payload> {
        /// Probe handle whose counters back this edge's occupancy.
        probe: SourceIngressProbe,
        /// Phantom to bind the `P` generic.
        _pd: PhantomData<P>,
    }
718
719    impl<P: Payload> SourceIngressProbeEdge<P> {
720        /// Wrap a probe as a payload-typed ingress monitor edge.
721        #[inline]
722        pub fn new(probe: SourceIngressProbe) -> Self {
723            Self {
724                probe,
725                _pd: PhantomData,
726            }
727        }
728
729        /// Borrow the underlying ingress probe.
730        #[inline]
731        pub fn inner(&self) -> &SourceIngressProbe {
732            &self.probe
733        }
734    }
735
736    impl<P: Payload> Edge for SourceIngressProbeEdge<P> {
737        #[inline]
738        fn try_push<H: HeaderStore>(
739            &mut self,
740            _token: MessageToken,
741            _policy: &EdgePolicy,
742            _headers: &H,
743        ) -> EnqueueResult {
744            EnqueueResult::Rejected
745        }
746
747        #[inline]
748        fn try_pop<H: HeaderStore>(&mut self, _headers: &H) -> Result<MessageToken, QueueError> {
749            Err(QueueError::Empty)
750        }
751
752        #[inline]
753        fn try_peek(&self) -> Result<MessageToken, QueueError> {
754            Err(QueueError::Empty)
755        }
756
757        #[inline]
758        fn try_peek_at(&self, _index: usize) -> Result<MessageToken, QueueError> {
759            Err(QueueError::Empty)
760        }
761
762        #[inline]
763        fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
764            self.probe.occupancy(policy)
765        }
766
767        #[inline]
768        fn is_empty(&self) -> bool {
769            self.probe.items.load(core::sync::atomic::Ordering::Relaxed) == 0
770        }
771
772        #[inline]
773        fn try_pop_batch<H: HeaderStore>(
774            &mut self,
775            _policy: &BatchingPolicy,
776            _headers: &H,
777        ) -> Result<BatchView<'_, MessageToken>, QueueError> {
778            Err(QueueError::Empty)
779        }
780    }
781
    /// Cross-thread updater for a `SourceIngressProbe`.
    ///
    /// Holds its own probe handle and writes new counter values through it.
    #[derive(Clone)]
    pub struct SourceIngressUpdater {
        /// Probe handle that receives the updates.
        probe: SourceIngressProbe,
    }
787
788    impl SourceIngressUpdater {
789        /// Create an updater for the given ingress probe.
790        #[inline]
791        pub fn new(probe: SourceIngressProbe) -> Self {
792            Self { probe }
793        }
794
795        /// Update both ingress counters atomically from the caller's perspective.
796        #[inline]
797        pub fn update(&self, items: usize, bytes: usize) {
798            self.probe.set_items(items);
799            self.probe.set_bytes(bytes);
800        }
801    }
802
803    /// Convenience: create a typed probe edge and its paired updater.
804    #[inline]
805    pub fn new_probe_edge_pair<P: Payload>() -> (SourceIngressProbeEdge<P>, SourceIngressUpdater) {
806        let probe = SourceIngressProbe::new();
807        let edge = SourceIngressProbeEdge::<P>::new(probe.clone());
808        let updater = SourceIngressUpdater::new(probe);
809        (edge, updater)
810    }
811
812    /// Convenience: create an untyped probe and updater.
813    #[inline]
814    pub fn new_probe_pair() -> (SourceIngressProbe, SourceIngressUpdater) {
815        let p = SourceIngressProbe::new();
816        (p.clone(), SourceIngressUpdater::new(p))
817    }
818
    /// Link wrapper for a concurrent ingress monitor edge (std-only).
    ///
    /// Pairs a probe-backed edge with the metadata needed to build an
    /// `EdgeDescriptor` and with the edge policy.
    #[derive(Debug)]
    pub struct ConcurrentIngressEdgeLink<OutP: Payload> {
        /// Probe-backed monitor edge.
        edge: SourceIngressProbeEdge<OutP>,
        /// Index of this edge.
        id: EdgeIndex,
        /// Upstream endpoint recorded in the descriptor.
        upstream: PortId,
        /// Downstream endpoint recorded in the descriptor.
        downstream: PortId,
        /// Policy associated with this edge.
        policy: EdgePolicy,
        /// Optional static name recorded in the descriptor.
        name: Option<&'static str>,
    }
829
830    impl<OutP: Payload> ConcurrentIngressEdgeLink<OutP> {
831        /// Construct a concurrent ingress edge link from a probe-backed edge and its
832        /// descriptor metadata.
833        #[inline]
834        pub fn from_probe(
835            probe_edge: SourceIngressProbeEdge<OutP>,
836            id: EdgeIndex,
837            upstream: PortId,
838            downstream: PortId,
839            policy: EdgePolicy,
840            name: Option<&'static str>,
841        ) -> Self {
842            Self {
843                edge: probe_edge,
844                id,
845                upstream,
846                downstream,
847                policy,
848                name,
849            }
850        }
851
852        /// Return the descriptor for this synthetic ingress edge.
853        #[inline]
854        pub fn descriptor(&self) -> EdgeDescriptor {
855            EdgeDescriptor::new(self.id, self.upstream, self.downstream, self.name)
856        }
857
858        /// Return the policy associated with this synthetic ingress edge.
859        #[inline]
860        pub fn policy(&self) -> EdgePolicy {
861            self.policy
862        }
863
864        /// Borrow the inner probe-backed edge immutably.
865        #[inline]
866        pub fn inner(&self) -> &SourceIngressProbeEdge<OutP> {
867            &self.edge
868        }
869
870        /// Borrow the inner probe-backed edge mutably.
871        #[inline]
872        pub fn inner_mut(&mut self) -> &mut SourceIngressProbeEdge<OutP> {
873            &mut self.edge
874        }
875    }
876
877    impl<OutP: Payload> Edge for ConcurrentIngressEdgeLink<OutP> {
878        #[inline]
879        fn try_push<H: HeaderStore>(
880            &mut self,
881            _token: MessageToken,
882            _policy: &EdgePolicy,
883            _headers: &H,
884        ) -> EnqueueResult {
885            EnqueueResult::Rejected
886        }
887        #[inline]
888        fn try_pop<H: HeaderStore>(&mut self, _headers: &H) -> Result<MessageToken, QueueError> {
889            Err(QueueError::Empty)
890        }
891        #[inline]
892        fn try_peek(&self) -> Result<MessageToken, QueueError> {
893            Err(QueueError::Empty)
894        }
895        #[inline]
896        fn try_peek_at(&self, _index: usize) -> Result<MessageToken, QueueError> {
897            Err(QueueError::Empty)
898        }
899        #[inline]
900        fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
901            self.edge.occupancy(policy)
902        }
903        #[inline]
904        fn is_empty(&self) -> bool {
905            self.edge.is_empty()
906        }
907        #[inline]
908        fn try_pop_batch<H: HeaderStore>(
909            &mut self,
910            _policy: &BatchingPolicy,
911            _headers: &H,
912        ) -> Result<BatchView<'_, MessageToken>, QueueError> {
913            Err(QueueError::Empty)
914        }
915    }
916}