Skip to main content

pureflow_core/
ports.rs

1//! Pureflow-owned port handles for the executor boundary.
2//!
3//! ## Fragment: executor-port-staging
4//!
5//! The executor contract is explicit about engine-owned inputs and outputs.
6//! These handles now preserve declared port identity and can carry bounded
7//! edge channels, but the node-facing surface remains Pureflow-owned. Runtime
8//! code may use `asupersync` internally without making node implementations
9//! depend on raw runtime channels or task context.
10//!
11//! ## Fragment: port-adapter-boundary
12//!
13//! The handles expose both non-blocking probes and async operations while
14//! callers still depend on Pureflow port semantics rather than on the concrete
15//! async runtime. In practice that means `asupersync` concepts such as task
16//! context, send permits, and channel errors belong behind `PortsIn` and
17//! `PortsOut`, with explicit Pureflow error and cancellation mapping at the
18//! boundary.
19//!
20//! ## Fragment: output-reserve-commit
21//!
22//! Output sends use a two-phase reserve/commit shape even before the fully
23//! async `Cx`-based API lands. Reserving capacity produces a Pureflow-owned
24//! permit; committing enqueues the packet; dropping or aborting the permit
25//! releases capacity without creating a ghost message. This mirrors the
26//! `asupersync` channel contract while keeping runtime details hidden.
27
28use std::collections::BTreeMap;
29use std::error::Error;
30use std::fmt;
31use std::num::NonZeroUsize;
32use std::sync::Arc;
33use std::task::{Context, Poll};
34
35use asupersync::{Cx, channel::mpsc, runtime::yield_now};
36use pureflow_types::PortId;
37
38use crate::message::{MessageEnvelope, PacketPayload};
39use crate::{
40    context::{CancellationToken, NodeContext},
41    metadata::{
42        MessageBoundaryKind, MessageBoundaryRecord, MetadataRecord, MetadataSink,
43        QueuePortDirection, QueuePressureBoundaryKind, QueuePressureRecord,
44    },
45};
46
47/// Default packet payload for the first channel-backed port surface.
48pub type PortPacket = MessageEnvelope<PacketPayload>;
49
50/// Error returned when an output port cannot accept a packet.
51#[derive(Debug, Clone, PartialEq, Eq)]
52pub enum PortSendError {
53    /// The node does not declare the requested output port.
54    UnknownPort {
55        /// Port requested by the caller.
56        port_id: PortId,
57    },
58    /// A downstream input has disconnected.
59    Disconnected {
60        /// Output port being sent through.
61        port_id: PortId,
62    },
63    /// At least one bounded downstream edge is full.
64    Full {
65        /// Output port being sent through.
66        port_id: PortId,
67    },
68    /// The send was cancelled before it could complete.
69    Cancelled {
70        /// Output port being sent through.
71        port_id: PortId,
72    },
73    /// A validation policy rejected the packet before it entered the graph.
74    Rejected {
75        /// Output port being sent through.
76        port_id: PortId,
77        /// Human-readable rejection reason.
78        reason: String,
79    },
80}
81
82impl fmt::Display for PortSendError {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        match self {
85            Self::UnknownPort { port_id } => {
86                write!(f, "output port `{port_id}` is not declared")
87            }
88            Self::Disconnected { port_id } => {
89                write!(f, "output port `{port_id}` is disconnected")
90            }
91            Self::Full { port_id } => write!(f, "output port `{port_id}` is full"),
92            Self::Cancelled { port_id } => {
93                write!(f, "output port `{port_id}` send cancelled")
94            }
95            Self::Rejected { port_id, reason } => {
96                write!(f, "output port `{port_id}` rejected packet: {reason}")
97            }
98        }
99    }
100}
101
102impl Error for PortSendError {}
103
104/// Validator invoked before output packets enter downstream graph edges.
105pub trait OutputPacketValidator: Send + Sync {
106    /// Validate one output packet for the requested output port.
107    ///
108    /// # Errors
109    ///
110    /// Returns an error when the packet must not be sent.
111    fn validate(&self, port_id: &PortId, packet: &PortPacket) -> Result<(), PortSendError>;
112}
113
114/// Error returned when an input port cannot provide a packet.
115#[derive(Debug, Clone, PartialEq, Eq)]
116pub enum PortRecvError {
117    /// The node does not declare the requested input port.
118    UnknownPort {
119        /// Port requested by the caller.
120        port_id: PortId,
121    },
122    /// All upstream senders for this input have disconnected.
123    Disconnected {
124        /// Input port being received from.
125        port_id: PortId,
126    },
127    /// The receive was cancelled before it could complete.
128    Cancelled {
129        /// Input port being received from.
130        port_id: PortId,
131    },
132}
133
134impl fmt::Display for PortRecvError {
135    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136        match self {
137            Self::UnknownPort { port_id } => {
138                write!(f, "input port `{port_id}` is not declared")
139            }
140            Self::Disconnected { port_id } => {
141                write!(f, "input port `{port_id}` is disconnected")
142            }
143            Self::Cancelled { port_id } => {
144                write!(f, "input port `{port_id}` receive cancelled")
145            }
146        }
147    }
148}
149
150impl Error for PortRecvError {}
151
152/// Channel-backed input endpoint for one declared input port.
153#[derive(Debug)]
154pub struct InputPortHandle {
155    port_id: PortId,
156    receivers: Vec<mpsc::Receiver<PortPacket>>,
157}
158
159enum InputPollResult {
160    Packet(Box<PortPacket>),
161    Pending,
162    Closed,
163    Cancelled,
164}
165
166impl InputPortHandle {
167    /// Create an input handle with no connected upstream edge.
168    #[must_use]
169    pub const fn disconnected(port_id: PortId) -> Self {
170        Self {
171            port_id,
172            receivers: Vec::new(),
173        }
174    }
175
176    fn connected(port_id: PortId, receiver: mpsc::Receiver<PortPacket>) -> Self {
177        Self {
178            port_id,
179            receivers: vec![receiver],
180        }
181    }
182
183    fn append(&mut self, mut other: Self) {
184        self.receivers.append(&mut other.receivers);
185    }
186
187    /// Declared input port identifier.
188    #[must_use]
189    pub const fn port_id(&self) -> &PortId {
190        &self.port_id
191    }
192
193    /// Number of upstream bounded edges connected to this input port.
194    #[must_use]
195    pub const fn connected_edge_count(&self) -> usize {
196        self.receivers.len()
197    }
198
199    /// Capacity of the first connected upstream edge, if one exists.
200    #[must_use]
201    pub fn capacity(&self) -> Option<usize> {
202        self.receivers.first().map(mpsc::Receiver::capacity)
203    }
204
205    fn total_capacity(&self) -> Option<usize> {
206        if self.receivers.is_empty() {
207            None
208        } else {
209            Some(self.receivers.iter().map(mpsc::Receiver::capacity).sum())
210        }
211    }
212
213    fn queued_count(&self) -> usize {
214        self.receivers.iter().map(mpsc::Receiver::len).sum()
215    }
216
217    fn try_recv(&mut self) -> Result<Option<PortPacket>, PortRecvError> {
218        let mut disconnected_count: usize = 0;
219
220        for receiver in &mut self.receivers {
221            match receiver.try_recv() {
222                Ok(packet) => return Ok(Some(packet)),
223                Err(mpsc::RecvError::Empty) => {}
224                Err(mpsc::RecvError::Disconnected) => {
225                    disconnected_count += 1;
226                }
227                Err(mpsc::RecvError::Cancelled) => {
228                    return Err(PortRecvError::Cancelled {
229                        port_id: self.port_id.clone(),
230                    });
231                }
232            }
233        }
234
235        if !self.receivers.is_empty() && disconnected_count == self.receivers.len() {
236            return Err(PortRecvError::Disconnected {
237                port_id: self.port_id.clone(),
238            });
239        }
240
241        Ok(None)
242    }
243
244    async fn recv(
245        &mut self,
246        cancellation: &CancellationToken,
247    ) -> Result<Option<PortPacket>, PortRecvError> {
248        if self.receivers.is_empty() {
249            return Ok(None);
250        }
251
252        let runtime_cx: Cx = Cx::current().unwrap_or_else(Cx::for_testing);
253        std::future::poll_fn(|task_cx: &mut Context<'_>| {
254            if cancellation.is_cancelled() {
255                return Poll::Ready(Err(PortRecvError::Cancelled {
256                    port_id: self.port_id.clone(),
257                }));
258            }
259
260            let mut disconnected_count: usize = 0;
261            for receiver in &mut self.receivers {
262                match receiver.poll_recv(&runtime_cx, task_cx) {
263                    Poll::Ready(Ok(packet)) => return Poll::Ready(Ok(Some(packet))),
264                    Poll::Ready(Err(mpsc::RecvError::Disconnected)) => {
265                        disconnected_count += 1;
266                    }
267                    Poll::Ready(Err(mpsc::RecvError::Cancelled)) => {
268                        return Poll::Ready(Err(PortRecvError::Cancelled {
269                            port_id: self.port_id.clone(),
270                        }));
271                    }
272                    Poll::Ready(Err(mpsc::RecvError::Empty)) | Poll::Pending => {}
273                }
274            }
275
276            if disconnected_count == self.receivers.len() {
277                return Poll::Ready(Err(PortRecvError::Disconnected {
278                    port_id: self.port_id.clone(),
279                }));
280            }
281
282            Poll::Pending
283        })
284        .await
285    }
286
287    fn poll_any(&mut self, runtime_cx: &Cx, task_cx: &mut Context<'_>) -> InputPollResult {
288        if self.receivers.is_empty() {
289            return InputPollResult::Closed;
290        }
291
292        let mut disconnected_count: usize = 0;
293        for receiver in &mut self.receivers {
294            let poll_result: Poll<Result<PortPacket, mpsc::RecvError>> =
295                receiver.poll_recv(runtime_cx, task_cx);
296            match poll_result {
297                Poll::Ready(Ok(packet)) => return InputPollResult::Packet(Box::new(packet)),
298                Poll::Ready(Err(mpsc::RecvError::Disconnected)) => {
299                    disconnected_count += 1;
300                }
301                Poll::Ready(Err(mpsc::RecvError::Cancelled)) => return InputPollResult::Cancelled,
302                Poll::Ready(Err(mpsc::RecvError::Empty)) | Poll::Pending => {}
303            }
304        }
305
306        if disconnected_count == self.receivers.len() {
307            return InputPollResult::Closed;
308        }
309
310        InputPollResult::Pending
311    }
312}
313
314/// Channel-backed output endpoint for one declared output port.
315#[derive(Debug, Clone)]
316pub struct OutputPortHandle {
317    port_id: PortId,
318    senders: Vec<mpsc::Sender<PortPacket>>,
319}
320
321impl OutputPortHandle {
322    /// Create an output handle with no connected downstream edge.
323    #[must_use]
324    pub const fn disconnected(port_id: PortId) -> Self {
325        Self {
326            port_id,
327            senders: Vec::new(),
328        }
329    }
330
331    fn connected(port_id: PortId, sender: mpsc::Sender<PortPacket>) -> Self {
332        Self {
333            port_id,
334            senders: vec![sender],
335        }
336    }
337
338    fn append(&mut self, mut other: Self) {
339        self.senders.append(&mut other.senders);
340    }
341
342    /// Declared output port identifier.
343    #[must_use]
344    pub const fn port_id(&self) -> &PortId {
345        &self.port_id
346    }
347
348    /// Number of downstream bounded edges connected to this output port.
349    #[must_use]
350    pub const fn connected_edge_count(&self) -> usize {
351        self.senders.len()
352    }
353
354    /// Capacity of the first connected downstream edge, if one exists.
355    #[must_use]
356    pub fn capacity(&self) -> Option<usize> {
357        self.senders.first().map(mpsc::Sender::capacity)
358    }
359
360    fn total_capacity(&self) -> Option<usize> {
361        if self.senders.is_empty() {
362            None
363        } else {
364            Some(self.senders.iter().map(mpsc::Sender::capacity).sum())
365        }
366    }
367
368    fn try_reserve(
369        &self,
370        metadata_sink: Option<Arc<dyn MetadataSink + Send + Sync>>,
371        validator: Option<Arc<dyn OutputPacketValidator>>,
372        context: Option<&NodeContext>,
373    ) -> Result<OutputPortSendPermit<'_>, PortSendError> {
374        record_output_queue_pressure(
375            metadata_sink.as_ref(),
376            context,
377            self,
378            QueuePressureBoundaryKind::ReserveAttempted,
379        );
380        let split_senders: Option<(&mpsc::Sender<PortPacket>, &[mpsc::Sender<PortPacket>])> =
381            self.senders.split_last();
382        let Some((last_sender, leading_senders)): Option<(
383            &mpsc::Sender<PortPacket>,
384            &[mpsc::Sender<PortPacket>],
385        )> = split_senders
386        else {
387            record_output_queue_pressure(
388                metadata_sink.as_ref(),
389                context,
390                self,
391                QueuePressureBoundaryKind::ReserveReady,
392            );
393            return Ok(OutputPortSendPermit {
394                port_id: self.port_id.clone(),
395                permits: Vec::new(),
396                metadata_sink,
397                validator,
398                context: context.cloned(),
399                connected_edge_count: self.connected_edge_count(),
400                capacity: self.total_capacity(),
401            });
402        };
403
404        let mut permits: Vec<mpsc::SendPermit<'_, PortPacket>> =
405            Vec::with_capacity(self.senders.len());
406
407        for sender in leading_senders {
408            match sender.try_reserve() {
409                Ok(permit) => permits.push(permit),
410                Err(err) => {
411                    if matches!(err, mpsc::SendError::Full(())) {
412                        record_output_queue_pressure(
413                            metadata_sink.as_ref(),
414                            context,
415                            self,
416                            QueuePressureBoundaryKind::ReserveFull,
417                        );
418                    }
419                    return Err(self.map_send_error(err));
420                }
421            }
422        }
423
424        match last_sender.try_reserve() {
425            Ok(permit) => {
426                permits.push(permit);
427                record_output_queue_pressure(
428                    metadata_sink.as_ref(),
429                    context,
430                    self,
431                    QueuePressureBoundaryKind::ReserveReady,
432                );
433                Ok(OutputPortSendPermit {
434                    port_id: self.port_id.clone(),
435                    permits,
436                    metadata_sink,
437                    validator,
438                    context: context.cloned(),
439                    connected_edge_count: self.connected_edge_count(),
440                    capacity: self.total_capacity(),
441                })
442            }
443            Err(err) => {
444                if matches!(err, mpsc::SendError::Full(())) {
445                    record_output_queue_pressure(
446                        metadata_sink.as_ref(),
447                        context,
448                        self,
449                        QueuePressureBoundaryKind::ReserveFull,
450                    );
451                }
452                Err(self.map_send_error(err))
453            }
454        }
455    }
456
457    fn map_send_error(&self, err: mpsc::SendError<()>) -> PortSendError {
458        match err {
459            mpsc::SendError::Disconnected(()) => PortSendError::Disconnected {
460                port_id: self.port_id.clone(),
461            },
462            mpsc::SendError::Cancelled(()) => PortSendError::Cancelled {
463                port_id: self.port_id.clone(),
464            },
465            mpsc::SendError::Full(()) => PortSendError::Full {
466                port_id: self.port_id.clone(),
467            },
468        }
469    }
470
471    async fn reserve(
472        &self,
473        cancellation: &CancellationToken,
474        metadata_sink: Option<Arc<dyn MetadataSink + Send + Sync>>,
475        validator: Option<Arc<dyn OutputPacketValidator>>,
476        context: Option<&NodeContext>,
477    ) -> Result<OutputPortSendPermit<'_>, PortSendError> {
478        loop {
479            if cancellation.is_cancelled() {
480                return Err(PortSendError::Cancelled {
481                    port_id: self.port_id.clone(),
482                });
483            }
484
485            match self.try_reserve(metadata_sink.clone(), validator.clone(), context) {
486                Ok(permit) => return Ok(permit),
487                Err(PortSendError::Full { .. }) => yield_now().await,
488                Err(err) => return Err(err),
489            }
490        }
491    }
492}
493
494/// Reserved output capacity for one declared output port.
495#[must_use = "PortSendPermit must be committed with send() or explicitly aborted"]
496pub struct PortSendPermit<'a> {
497    inner: OutputPortSendPermit<'a>,
498}
499
500impl PortSendPermit<'_> {
501    /// Commit the reserved capacity and enqueue the packet.
502    ///
503    /// # Errors
504    ///
505    /// Returns an error if output validation rejects the packet.
506    pub fn send(self, packet: PortPacket) -> Result<(), PortSendError> {
507        self.inner.send(packet)
508    }
509
510    /// Release the reserved capacity without enqueueing a packet.
511    pub fn abort(self) {
512        self.inner.abort();
513    }
514}
515
516struct OutputPortSendPermit<'a> {
517    port_id: PortId,
518    permits: Vec<mpsc::SendPermit<'a, PortPacket>>,
519    metadata_sink: Option<Arc<dyn MetadataSink + Send + Sync>>,
520    validator: Option<Arc<dyn OutputPacketValidator>>,
521    context: Option<NodeContext>,
522    connected_edge_count: usize,
523    capacity: Option<usize>,
524}
525
526impl OutputPortSendPermit<'_> {
527    fn send(mut self, packet: PortPacket) -> Result<(), PortSendError> {
528        if let Some(validator) = self.validator.as_ref()
529            && let Err(err) = validator.validate(&self.port_id, &packet)
530        {
531            self.abort();
532            return Err(err);
533        }
534
535        let boundary_kind: MessageBoundaryKind = if self.permits.is_empty() {
536            MessageBoundaryKind::Dropped
537        } else {
538            MessageBoundaryKind::Enqueued
539        };
540        if let Some(metadata_sink) = self.metadata_sink.as_ref() {
541            let queue_kind: QueuePressureBoundaryKind = if self.permits.is_empty() {
542                QueuePressureBoundaryKind::SendDropped
543            } else {
544                QueuePressureBoundaryKind::SendCommitted
545            };
546            let record: MetadataRecord = MetadataRecord::QueuePressure(QueuePressureRecord::new(
547                self.context.clone(),
548                QueuePortDirection::Output,
549                self.port_id.clone(),
550                queue_kind,
551                self.connected_edge_count,
552                self.capacity,
553                None,
554            ));
555            let _ = metadata_sink.record(&record);
556            let record: MetadataRecord = MetadataRecord::Message(MessageBoundaryRecord::new(
557                boundary_kind,
558                packet.metadata().clone(),
559            ));
560            let _ = metadata_sink.record(&record);
561        }
562
563        let last_permit: Option<mpsc::SendPermit<'_, PortPacket>> = self.permits.pop();
564        let last_permit: mpsc::SendPermit<'_, PortPacket> = match last_permit {
565            Some(permit) => permit,
566            None => return Ok(()),
567        };
568        let leading_permits: Vec<mpsc::SendPermit<'_, PortPacket>> = self.permits;
569
570        for permit in leading_permits {
571            permit.send(packet.clone());
572        }
573        last_permit.send(packet);
574        Ok(())
575    }
576
577    fn abort(self) {
578        for permit in self.permits {
579            permit.abort();
580        }
581    }
582}
583
584/// Create one bounded edge channel between an output port and an input port.
585#[must_use]
586pub fn bounded_edge_channel(
587    output_port_id: PortId,
588    input_port_id: PortId,
589    capacity: NonZeroUsize,
590) -> (OutputPortHandle, InputPortHandle) {
591    let (sender, receiver): (mpsc::Sender<PortPacket>, mpsc::Receiver<PortPacket>) =
592        mpsc::channel(capacity.get());
593    (
594        OutputPortHandle::connected(output_port_id, sender),
595        InputPortHandle::connected(input_port_id, receiver),
596    )
597}
598
599/// Declared input ports available to a node execution boundary.
600#[derive(Default)]
601pub struct PortsIn {
602    port_ids: Vec<PortId>,
603    ports: Vec<InputPortHandle>,
604    metadata_sink: Option<Arc<dyn MetadataSink + Send + Sync>>,
605    context: Option<NodeContext>,
606}
607
608impl PortsIn {
609    /// Create input handles with declared port identifiers and no channels.
610    #[must_use]
611    pub fn new(port_ids: impl Into<Vec<PortId>>) -> Self {
612        let port_ids: Vec<PortId> = port_ids.into();
613        Self::from_handles(port_ids, Vec::new())
614    }
615
616    /// Create input handles from declared ports and connected channel handles.
617    #[must_use]
618    pub fn from_handles(
619        port_ids: impl Into<Vec<PortId>>,
620        handles: impl Into<Vec<InputPortHandle>>,
621    ) -> Self {
622        let port_ids: Vec<PortId> = port_ids.into();
623        let mut by_port: BTreeMap<PortId, InputPortHandle> = BTreeMap::new();
624
625        for handle in handles.into() {
626            let port_id: PortId = handle.port_id().clone();
627            if let Some(existing) = by_port.get_mut(&port_id) {
628                existing.append(handle);
629            } else {
630                by_port.insert(port_id, handle);
631            }
632        }
633
634        let mut ports: Vec<InputPortHandle> = Vec::with_capacity(port_ids.len());
635        for port_id in &port_ids {
636            let handle: InputPortHandle = by_port
637                .remove(port_id)
638                .unwrap_or_else(|| InputPortHandle::disconnected(port_id.clone()));
639            ports.push(handle);
640        }
641
642        Self {
643            port_ids,
644            ports,
645            metadata_sink: None,
646            context: None,
647        }
648    }
649
650    /// Attach a metadata sink for receive-side observations.
651    #[must_use]
652    pub fn with_metadata_sink(
653        mut self,
654        metadata_sink: Arc<dyn MetadataSink + Send + Sync>,
655    ) -> Self {
656        self.metadata_sink = Some(metadata_sink);
657        self
658    }
659
660    /// Attach node context for receive-side queue observations.
661    #[must_use]
662    pub fn with_node_context(mut self, context: NodeContext) -> Self {
663        self.context = Some(context);
664        self
665    }
666
667    /// Declared input port identifiers for this node.
668    #[must_use]
669    pub fn port_ids(&self) -> &[PortId] {
670        &self.port_ids
671    }
672
673    /// Return whether this node currently has no declared inputs.
674    #[must_use]
675    pub const fn is_empty(&self) -> bool {
676        self.port_ids.is_empty()
677    }
678
679    /// Number of connected upstream edges for a declared input port.
680    #[must_use]
681    pub fn connected_edge_count(&self, port_id: &PortId) -> Option<usize> {
682        self.ports
683            .iter()
684            .find(|port: &&InputPortHandle| port.port_id() == port_id)
685            .map(InputPortHandle::connected_edge_count)
686    }
687
688    /// Capacity of the first connected upstream edge for a declared input port.
689    #[must_use]
690    pub fn capacity(&self, port_id: &PortId) -> Option<usize> {
691        self.ports
692            .iter()
693            .find(|port: &&InputPortHandle| port.port_id() == port_id)
694            .and_then(InputPortHandle::capacity)
695    }
696
697    /// Try to receive one packet from a declared input port without blocking.
698    ///
699    /// Returns `Ok(None)` when the port is declared but no packet is currently
700    /// queued. Use [`Self::recv`] to wait asynchronously.
701    ///
702    /// # Errors
703    ///
704    /// Returns an error if the port is undeclared or all upstream senders have
705    /// disconnected.
706    pub fn try_recv(&mut self, port_id: &PortId) -> Result<Option<PortPacket>, PortRecvError> {
707        let port: &mut InputPortHandle = self
708            .ports
709            .iter_mut()
710            .find(|port: &&mut InputPortHandle| port.port_id() == port_id)
711            .ok_or_else(|| PortRecvError::UnknownPort {
712                port_id: port_id.clone(),
713            })?;
714        Self::record_input_queue_pressure(
715            self.metadata_sink.as_ref(),
716            self.context.as_ref(),
717            port,
718            QueuePressureBoundaryKind::ReceiveAttempted,
719        );
720        let packet: Option<PortPacket> = match port.try_recv() {
721            Ok(packet) => packet,
722            Err(PortRecvError::Disconnected { .. }) => {
723                Self::record_input_queue_pressure(
724                    self.metadata_sink.as_ref(),
725                    self.context.as_ref(),
726                    port,
727                    QueuePressureBoundaryKind::ReceiveClosed,
728                );
729                return Err(PortRecvError::Disconnected {
730                    port_id: port_id.clone(),
731                });
732            }
733            Err(err) => return Err(err),
734        };
735        match packet.as_ref() {
736            Some(packet) => {
737                Self::record_input_queue_pressure(
738                    self.metadata_sink.as_ref(),
739                    self.context.as_ref(),
740                    port,
741                    QueuePressureBoundaryKind::ReceiveReady,
742                );
743                Self::record_message_boundary(
744                    self.metadata_sink.as_ref(),
745                    MessageBoundaryKind::Dequeued,
746                    packet.metadata(),
747                );
748            }
749            None => Self::record_input_queue_pressure(
750                self.metadata_sink.as_ref(),
751                self.context.as_ref(),
752                port,
753                QueuePressureBoundaryKind::ReceiveEmpty,
754            ),
755        }
756        Ok(packet)
757    }
758
759    /// Receive one packet from a declared input port, waiting asynchronously.
760    ///
761    /// Returns `Ok(None)` when the port is declared but has no connected
762    /// upstream edges. Connected ports wait until a packet arrives, every
763    /// upstream edge disconnects, or cancellation is observed.
764    ///
765    /// # Errors
766    ///
767    /// Returns an error if the port is undeclared, all upstream senders have
768    /// disconnected, or cancellation is observed.
769    pub async fn recv(
770        &mut self,
771        port_id: &PortId,
772        cancellation: &CancellationToken,
773    ) -> Result<Option<PortPacket>, PortRecvError> {
774        let port: &mut InputPortHandle = self
775            .ports
776            .iter_mut()
777            .find(|port: &&mut InputPortHandle| port.port_id() == port_id)
778            .ok_or_else(|| PortRecvError::UnknownPort {
779                port_id: port_id.clone(),
780            })?;
781        Self::record_input_queue_pressure(
782            self.metadata_sink.as_ref(),
783            self.context.as_ref(),
784            port,
785            QueuePressureBoundaryKind::ReceiveAttempted,
786        );
787        let packet: Option<PortPacket> = match port.recv(cancellation).await {
788            Ok(packet) => packet,
789            Err(PortRecvError::Disconnected { .. }) => {
790                Self::record_input_queue_pressure(
791                    self.metadata_sink.as_ref(),
792                    self.context.as_ref(),
793                    port,
794                    QueuePressureBoundaryKind::ReceiveClosed,
795                );
796                return Err(PortRecvError::Disconnected {
797                    port_id: port_id.clone(),
798                });
799            }
800            Err(err) => return Err(err),
801        };
802        match packet.as_ref() {
803            Some(packet) => {
804                Self::record_input_queue_pressure(
805                    self.metadata_sink.as_ref(),
806                    self.context.as_ref(),
807                    port,
808                    QueuePressureBoundaryKind::ReceiveReady,
809                );
810                Self::record_message_boundary(
811                    self.metadata_sink.as_ref(),
812                    MessageBoundaryKind::Dequeued,
813                    packet.metadata(),
814                );
815            }
816            None => Self::record_input_queue_pressure(
817                self.metadata_sink.as_ref(),
818                self.context.as_ref(),
819                port,
820                QueuePressureBoundaryKind::ReceiveEmpty,
821            ),
822        }
823        Ok(packet)
824    }
825
826    /// Receive one packet from any declared input port, waiting asynchronously.
827    ///
828    /// Returns the port that produced the packet with the packet itself.
829    /// Returns `Ok(None)` when the node has no declared inputs or every input
830    /// is closed or disconnected.
831    ///
832    /// # Errors
833    ///
834    /// Returns an error if cancellation is observed while waiting.
835    pub async fn recv_any(
836        &mut self,
837        cancellation: &CancellationToken,
838    ) -> Result<Option<(PortId, PortPacket)>, PortRecvError> {
839        if self.ports.is_empty() {
840            return Ok(None);
841        }
842
843        let metadata_sink: Option<&Arc<dyn MetadataSink + Send + Sync>> =
844            self.metadata_sink.as_ref();
845        let context: Option<&NodeContext> = self.context.as_ref();
846        let runtime_cx: Cx = Cx::current().unwrap_or_else(Cx::for_testing);
847        for port in &self.ports {
848            Self::record_input_queue_pressure(
849                metadata_sink,
850                context,
851                port,
852                QueuePressureBoundaryKind::ReceiveAttempted,
853            );
854        }
855        std::future::poll_fn(|task_cx: &mut Context<'_>| {
856            if cancellation.is_cancelled() {
857                return self.ports.first().map_or(
858                    Poll::Ready(Ok(None)),
859                    |first_port: &InputPortHandle| {
860                        Poll::Ready(Err(PortRecvError::Cancelled {
861                            port_id: first_port.port_id().clone(),
862                        }))
863                    },
864                );
865            }
866
867            let mut closed_port_count: usize = 0;
868            for port in &mut self.ports {
869                match port.poll_any(&runtime_cx, task_cx) {
870                    InputPollResult::Packet(packet) => {
871                        Self::record_input_queue_pressure(
872                            metadata_sink,
873                            context,
874                            port,
875                            QueuePressureBoundaryKind::ReceiveReady,
876                        );
877                        Self::record_message_boundary(
878                            metadata_sink,
879                            MessageBoundaryKind::Dequeued,
880                            packet.metadata(),
881                        );
882                        return Poll::Ready(Ok(Some((port.port_id().clone(), *packet))));
883                    }
884                    InputPollResult::Closed => {
885                        closed_port_count += 1;
886                        Self::record_input_queue_pressure(
887                            metadata_sink,
888                            context,
889                            port,
890                            QueuePressureBoundaryKind::ReceiveClosed,
891                        );
892                    }
893                    InputPollResult::Cancelled => {
894                        return Poll::Ready(Err(PortRecvError::Cancelled {
895                            port_id: port.port_id().clone(),
896                        }));
897                    }
898                    InputPollResult::Pending => {}
899                }
900            }
901
902            if closed_port_count == self.ports.len() {
903                return Poll::Ready(Ok(None));
904            }
905
906            Poll::Pending
907        })
908        .await
909    }
910
911    fn record_message_boundary(
912        metadata_sink: Option<&Arc<dyn MetadataSink + Send + Sync>>,
913        kind: MessageBoundaryKind,
914        metadata: &crate::message::MessageMetadata,
915    ) {
916        let Some(metadata_sink): Option<&Arc<dyn MetadataSink + Send + Sync>> = metadata_sink
917        else {
918            return;
919        };
920
921        let record: MetadataRecord =
922            MetadataRecord::Message(MessageBoundaryRecord::new(kind, metadata.clone()));
923        let _ = metadata_sink.record(&record);
924    }
925
926    fn record_input_queue_pressure(
927        metadata_sink: Option<&Arc<dyn MetadataSink + Send + Sync>>,
928        context: Option<&NodeContext>,
929        port: &InputPortHandle,
930        kind: QueuePressureBoundaryKind,
931    ) {
932        let Some(metadata_sink): Option<&Arc<dyn MetadataSink + Send + Sync>> = metadata_sink
933        else {
934            return;
935        };
936
937        let record: MetadataRecord = MetadataRecord::QueuePressure(QueuePressureRecord::new(
938            context.cloned(),
939            QueuePortDirection::Input,
940            port.port_id().clone(),
941            kind,
942            port.connected_edge_count(),
943            port.total_capacity(),
944            Some(port.queued_count()),
945        ));
946        let _ = metadata_sink.record(&record);
947    }
948}
949
950/// Declared output ports available to a node execution boundary.
951#[derive(Clone, Default)]
952pub struct PortsOut {
953    port_ids: Vec<PortId>,
954    ports: Vec<OutputPortHandle>,
955    metadata_sink: Option<Arc<dyn MetadataSink + Send + Sync>>,
956    validator: Option<Arc<dyn OutputPacketValidator>>,
957    context: Option<NodeContext>,
958}
959
960impl PortsOut {
961    /// Create output handles with declared port identifiers and no channels.
962    #[must_use]
963    pub fn new(port_ids: impl Into<Vec<PortId>>) -> Self {
964        let port_ids: Vec<PortId> = port_ids.into();
965        Self::from_handles(port_ids, Vec::new())
966    }
967
968    /// Create output handles from declared ports and connected channel handles.
969    #[must_use]
970    pub fn from_handles(
971        port_ids: impl Into<Vec<PortId>>,
972        handles: impl Into<Vec<OutputPortHandle>>,
973    ) -> Self {
974        let port_ids: Vec<PortId> = port_ids.into();
975        let mut by_port: BTreeMap<PortId, OutputPortHandle> = BTreeMap::new();
976
977        for handle in handles.into() {
978            let port_id: PortId = handle.port_id().clone();
979            if let Some(existing) = by_port.get_mut(&port_id) {
980                existing.append(handle);
981            } else {
982                by_port.insert(port_id, handle);
983            }
984        }
985
986        let mut ports: Vec<OutputPortHandle> = Vec::with_capacity(port_ids.len());
987        for port_id in &port_ids {
988            let handle: OutputPortHandle = by_port
989                .remove(port_id)
990                .unwrap_or_else(|| OutputPortHandle::disconnected(port_id.clone()));
991            ports.push(handle);
992        }
993
994        Self {
995            port_ids,
996            ports,
997            metadata_sink: None,
998            validator: None,
999            context: None,
1000        }
1001    }
1002
1003    /// Attach a metadata sink for send-side observations.
1004    #[must_use]
1005    pub fn with_metadata_sink(
1006        mut self,
1007        metadata_sink: Arc<dyn MetadataSink + Send + Sync>,
1008    ) -> Self {
1009        self.metadata_sink = Some(metadata_sink);
1010        self
1011    }
1012
1013    /// Attach node context for send-side queue observations.
1014    #[must_use]
1015    pub fn with_node_context(mut self, context: NodeContext) -> Self {
1016        self.context = Some(context);
1017        self
1018    }
1019
1020    /// Attach a validator that runs before output packets enter graph edges.
1021    #[must_use]
1022    pub fn with_output_validator(mut self, validator: Arc<dyn OutputPacketValidator>) -> Self {
1023        self.validator = Some(validator);
1024        self
1025    }
1026
1027    /// Declared output port identifiers for this node.
1028    #[must_use]
1029    pub fn port_ids(&self) -> &[PortId] {
1030        &self.port_ids
1031    }
1032
1033    /// Return whether this node currently has no declared outputs.
1034    #[must_use]
1035    pub const fn is_empty(&self) -> bool {
1036        self.port_ids.is_empty()
1037    }
1038
1039    /// Number of connected downstream edges for a declared output port.
1040    #[must_use]
1041    pub fn connected_edge_count(&self, port_id: &PortId) -> Option<usize> {
1042        self.ports
1043            .iter()
1044            .find(|port: &&OutputPortHandle| port.port_id() == port_id)
1045            .map(OutputPortHandle::connected_edge_count)
1046    }
1047
1048    /// Capacity of the first connected downstream edge for a declared output port.
1049    #[must_use]
1050    pub fn capacity(&self, port_id: &PortId) -> Option<usize> {
1051        self.ports
1052            .iter()
1053            .find(|port: &&OutputPortHandle| port.port_id() == port_id)
1054            .and_then(OutputPortHandle::capacity)
1055    }
1056
1057    /// Try to send one packet through a declared output port without blocking.
1058    ///
1059    /// Unconnected declared output ports accept and drop packets. That keeps
1060    /// early scaffold nodes simple while later beads define explicit fan-out
1061    /// and disconnected-edge policy. Connected sends reserve capacity before
1062    /// committing the packet, so cancellation or drop between those phases
1063    /// releases the reserved slots instead of creating partial messages.
1064    ///
1065    /// # Errors
1066    ///
1067    /// Returns an error if the port is undeclared, a downstream receiver has
1068    /// disconnected, or a bounded downstream edge is full.
1069    pub fn try_send(&self, port_id: &PortId, packet: PortPacket) -> Result<(), PortSendError> {
1070        self.try_reserve(port_id)?.send(packet)
1071    }
1072
1073    /// Send one packet through a declared output port, waiting asynchronously
1074    /// for bounded downstream capacity.
1075    ///
1076    /// # Errors
1077    ///
1078    /// Returns an error if the port is undeclared, a downstream receiver has
1079    /// disconnected, or cancellation is observed.
1080    pub async fn send(
1081        &self,
1082        port_id: &PortId,
1083        packet: PortPacket,
1084        cancellation: &CancellationToken,
1085    ) -> Result<(), PortSendError> {
1086        self.reserve(port_id, cancellation).await?.send(packet)
1087    }
1088
1089    /// Try to reserve output capacity without committing a packet.
1090    ///
1091    /// Dropping the returned permit releases all reserved downstream slots.
1092    ///
1093    /// # Errors
1094    ///
1095    /// Returns an error if the port is undeclared, a downstream receiver has
1096    /// disconnected, or a bounded downstream edge is full.
1097    pub fn try_reserve(&self, port_id: &PortId) -> Result<PortSendPermit<'_>, PortSendError> {
1098        let port: &OutputPortHandle = self
1099            .ports
1100            .iter()
1101            .find(|port: &&OutputPortHandle| port.port_id() == port_id)
1102            .ok_or_else(|| PortSendError::UnknownPort {
1103                port_id: port_id.clone(),
1104            })?;
1105        port.try_reserve(
1106            self.metadata_sink.clone(),
1107            self.validator.clone(),
1108            self.context.as_ref(),
1109        )
1110        .map(|inner: OutputPortSendPermit<'_>| PortSendPermit { inner })
1111    }
1112
1113    /// Reserve output capacity asynchronously without committing a packet.
1114    ///
1115    /// Dropping the returned permit releases all reserved downstream slots.
1116    ///
1117    /// # Errors
1118    ///
1119    /// Returns an error if the port is undeclared, a downstream receiver has
1120    /// disconnected, or cancellation is observed.
1121    pub async fn reserve(
1122        &self,
1123        port_id: &PortId,
1124        cancellation: &CancellationToken,
1125    ) -> Result<PortSendPermit<'_>, PortSendError> {
1126        let port: &OutputPortHandle = self
1127            .ports
1128            .iter()
1129            .find(|port: &&OutputPortHandle| port.port_id() == port_id)
1130            .ok_or_else(|| PortSendError::UnknownPort {
1131                port_id: port_id.clone(),
1132            })?;
1133        port.reserve(
1134            cancellation,
1135            self.metadata_sink.clone(),
1136            self.validator.clone(),
1137            self.context.as_ref(),
1138        )
1139        .await
1140        .map(|inner: OutputPortSendPermit<'_>| PortSendPermit { inner })
1141    }
1142}
1143
1144fn record_output_queue_pressure(
1145    metadata_sink: Option<&Arc<dyn MetadataSink + Send + Sync>>,
1146    context: Option<&NodeContext>,
1147    port: &OutputPortHandle,
1148    kind: QueuePressureBoundaryKind,
1149) {
1150    let Some(metadata_sink): Option<&Arc<dyn MetadataSink + Send + Sync>> = metadata_sink else {
1151        return;
1152    };
1153
1154    let record: MetadataRecord = MetadataRecord::QueuePressure(QueuePressureRecord::new(
1155        context.cloned(),
1156        QueuePortDirection::Output,
1157        port.port_id().clone(),
1158        kind,
1159        port.connected_edge_count(),
1160        port.total_capacity(),
1161        None,
1162    ));
1163    let _ = metadata_sink.record(&record);
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168    use super::*;
1169    use std::future::Future;
1170    use std::sync::{Arc, Mutex};
1171
1172    use asupersync::runtime::{Runtime, RuntimeBuilder};
1173    use pureflow_types::{ExecutionId, MessageId, NodeId, WorkflowId};
1174
1175    use crate::{
1176        context::{CancellationRequest, ExecutionMetadata},
1177        message::{MessageEndpoint, MessageMetadata, MessageRoute},
1178    };
1179
1180    fn execution_id(value: &str) -> ExecutionId {
1181        ExecutionId::new(value).expect("valid execution id")
1182    }
1183
1184    fn message_id(value: &str) -> MessageId {
1185        MessageId::new(value).expect("valid message id")
1186    }
1187
1188    fn node_id(value: &str) -> NodeId {
1189        NodeId::new(value).expect("valid node id")
1190    }
1191
1192    fn port_id(value: &str) -> PortId {
1193        PortId::new(value).expect("valid port id")
1194    }
1195
1196    fn workflow_id(value: &str) -> WorkflowId {
1197        WorkflowId::new(value).expect("valid workflow id")
1198    }
1199
1200    fn packet(value: &[u8]) -> PortPacket {
1201        let source: MessageEndpoint = MessageEndpoint::new(node_id("source"), port_id("out"));
1202        let target: MessageEndpoint = MessageEndpoint::new(node_id("sink"), port_id("in"));
1203        let route: MessageRoute = MessageRoute::new(Some(source), target);
1204        let execution: ExecutionMetadata = ExecutionMetadata::first_attempt(execution_id("run-1"));
1205        let metadata: MessageMetadata =
1206            MessageMetadata::new(message_id("msg-1"), workflow_id("flow"), execution, route);
1207
1208        MessageEnvelope::new(metadata, PacketPayload::from(value.to_vec()))
1209    }
1210
1211    fn block_on_port<F: Future>(future: F) -> F::Output {
1212        let runtime: Runtime = RuntimeBuilder::current_thread()
1213            .build()
1214            .expect("test runtime should build");
1215        runtime.block_on(future)
1216    }
1217
1218    #[derive(Debug, Default)]
1219    struct RecordingMetadataSink {
1220        records: Mutex<Vec<MetadataRecord>>,
1221    }
1222
1223    impl RecordingMetadataSink {
1224        fn records(&self) -> Vec<MetadataRecord> {
1225            self.records
1226                .lock()
1227                .expect("metadata sink lock should not be poisoned")
1228                .clone()
1229        }
1230    }
1231
1232    impl MetadataSink for RecordingMetadataSink {
1233        fn record(&self, record: &MetadataRecord) -> crate::Result<()> {
1234            self.records
1235                .lock()
1236                .expect("metadata sink lock should not be poisoned")
1237                .push(record.clone());
1238            Ok(())
1239        }
1240    }
1241
1242    #[derive(Debug)]
1243    struct RejectingOutputValidator;
1244
1245    impl OutputPacketValidator for RejectingOutputValidator {
1246        fn validate(&self, port_id: &PortId, _packet: &PortPacket) -> Result<(), PortSendError> {
1247            Err(PortSendError::Rejected {
1248                port_id: port_id.clone(),
1249                reason: "contract mismatch".to_owned(),
1250            })
1251        }
1252    }
1253
1254    #[test]
1255    fn ports_preserve_declared_port_order() {
1256        let inputs: PortsIn = PortsIn::new(vec![port_id("left"), port_id("right")]);
1257        let outputs: PortsOut = PortsOut::new(vec![port_id("out")]);
1258
1259        assert_eq!(
1260            inputs
1261                .port_ids()
1262                .iter()
1263                .map(PortId::as_str)
1264                .collect::<Vec<_>>(),
1265            vec!["left", "right"]
1266        );
1267        assert_eq!(
1268            outputs
1269                .port_ids()
1270                .iter()
1271                .map(PortId::as_str)
1272                .collect::<Vec<_>>(),
1273            vec!["out"]
1274        );
1275    }
1276
1277    #[test]
1278    fn bounded_edge_channel_enforces_capacity() {
1279        let (output, input): (OutputPortHandle, InputPortHandle) =
1280            bounded_edge_channel(port_id("out"), port_id("in"), NonZeroUsize::MIN);
1281        let mut inputs: PortsIn = PortsIn::from_handles([port_id("in")], [input]);
1282        let outputs: PortsOut = PortsOut::from_handles([port_id("out")], [output]);
1283
1284        outputs
1285            .try_send(&port_id("out"), packet(b"first"))
1286            .expect("first packet should fit");
1287        let err: PortSendError = outputs
1288            .try_send(&port_id("out"), packet(b"second"))
1289            .expect_err("second packet should hit bounded capacity");
1290
1291        assert_eq!(
1292            err,
1293            PortSendError::Full {
1294                port_id: port_id("out")
1295            }
1296        );
1297        assert_eq!(outputs.capacity(&port_id("out")), Some(1));
1298        assert_eq!(inputs.capacity(&port_id("in")), Some(1));
1299
1300        let received: PortPacket = inputs
1301            .try_recv(&port_id("in"))
1302            .expect("receive should succeed")
1303            .expect("packet should be queued");
1304
1305        assert_eq!(
1306            received
1307                .payload()
1308                .as_bytes()
1309                .expect("received packet should contain bytes")
1310                .as_ref(),
1311            b"first"
1312        );
1313        assert!(
1314            inputs
1315                .try_recv(&port_id("in"))
1316                .expect("empty receive should not fail")
1317                .is_none()
1318        );
1319    }
1320
1321    #[test]
1322    fn reserved_output_capacity_commits_on_send() {
1323        let (output, input): (OutputPortHandle, InputPortHandle) =
1324            bounded_edge_channel(port_id("out"), port_id("in"), NonZeroUsize::MIN);
1325        let mut inputs: PortsIn = PortsIn::from_handles([port_id("in")], [input]);
1326        let outputs: PortsOut = PortsOut::from_handles([port_id("out")], [output]);
1327
1328        let permit: PortSendPermit<'_> = outputs
1329            .try_reserve(&port_id("out"))
1330            .expect("reservation should succeed");
1331        let err: PortSendError = outputs
1332            .try_send(&port_id("out"), packet(b"blocked"))
1333            .expect_err("reserved capacity should block another send");
1334
1335        assert_eq!(
1336            err,
1337            PortSendError::Full {
1338                port_id: port_id("out")
1339            }
1340        );
1341
1342        permit
1343            .send(packet(b"committed"))
1344            .expect("reserved packet should pass validation");
1345
1346        let received: PortPacket = inputs
1347            .try_recv(&port_id("in"))
1348            .expect("receive should succeed")
1349            .expect("committed packet should be queued");
1350        assert_eq!(
1351            received
1352                .payload()
1353                .as_bytes()
1354                .expect("received packet should contain bytes")
1355                .as_ref(),
1356            b"committed"
1357        );
1358    }
1359
1360    #[test]
1361    fn dropped_output_permit_releases_capacity_without_message() {
1362        let (output, input): (OutputPortHandle, InputPortHandle) =
1363            bounded_edge_channel(port_id("out"), port_id("in"), NonZeroUsize::MIN);
1364        let mut inputs: PortsIn = PortsIn::from_handles([port_id("in")], [input]);
1365        let outputs: PortsOut = PortsOut::from_handles([port_id("out")], [output]);
1366
1367        let permit: PortSendPermit<'_> = outputs
1368            .try_reserve(&port_id("out"))
1369            .expect("reservation should succeed");
1370        drop(permit);
1371
1372        assert!(
1373            inputs
1374                .try_recv(&port_id("in"))
1375                .expect("dropped permit should not disconnect")
1376                .is_none()
1377        );
1378
1379        outputs
1380            .try_send(&port_id("out"), packet(b"after-drop"))
1381            .expect("dropped permit should release capacity");
1382        let received: PortPacket = inputs
1383            .try_recv(&port_id("in"))
1384            .expect("receive should succeed")
1385            .expect("new packet should be queued");
1386
1387        assert_eq!(
1388            received
1389                .payload()
1390                .as_bytes()
1391                .expect("received packet should contain bytes")
1392                .as_ref(),
1393            b"after-drop"
1394        );
1395    }
1396
1397    #[test]
1398    fn aborted_output_permit_releases_capacity_without_message() {
1399        let (output, input): (OutputPortHandle, InputPortHandle) =
1400            bounded_edge_channel(port_id("out"), port_id("in"), NonZeroUsize::MIN);
1401        let mut inputs: PortsIn = PortsIn::from_handles([port_id("in")], [input]);
1402        let outputs: PortsOut = PortsOut::from_handles([port_id("out")], [output]);
1403
1404        outputs
1405            .try_reserve(&port_id("out"))
1406            .expect("reservation should succeed")
1407            .abort();
1408
1409        assert!(
1410            inputs
1411                .try_recv(&port_id("in"))
1412                .expect("aborted permit should not disconnect")
1413                .is_none()
1414        );
1415        outputs
1416            .try_send(&port_id("out"), packet(b"after-abort"))
1417            .expect("aborted permit should release capacity");
1418    }
1419
1420    #[test]
1421    fn send_and_recv_emit_message_boundary_metadata() {
1422        let sink: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
1423        let (output, input): (OutputPortHandle, InputPortHandle) =
1424            bounded_edge_channel(port_id("out"), port_id("in"), NonZeroUsize::MIN);
1425        let mut inputs: PortsIn =
1426            PortsIn::from_handles([port_id("in")], [input]).with_metadata_sink(sink.clone());
1427        let outputs: PortsOut =
1428            PortsOut::from_handles([port_id("out")], [output]).with_metadata_sink(sink.clone());
1429
1430        outputs
1431            .try_send(&port_id("out"), packet(b"boundary"))
1432            .expect("send should succeed");
1433        let received: PortPacket = inputs
1434            .try_recv(&port_id("in"))
1435            .expect("receive should succeed")
1436            .expect("packet should be queued");
1437
1438        assert_eq!(
1439            received
1440                .payload()
1441                .as_bytes()
1442                .expect("received packet should contain bytes")
1443                .as_ref(),
1444            b"boundary"
1445        );
1446        assert_eq!(
1447            sink.records()
1448                .into_iter()
1449                .filter_map(|record| match record {
1450                    MetadataRecord::Message(message) => Some(message.kind()),
1451                    _ => None,
1452                })
1453                .collect::<Vec<_>>(),
1454            vec![MessageBoundaryKind::Enqueued, MessageBoundaryKind::Dequeued]
1455        );
1456    }
1457
1458    #[test]
1459    fn send_and_recv_emit_queue_pressure_metadata() {
1460        let sink: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
1461        let (output, input): (OutputPortHandle, InputPortHandle) =
1462            bounded_edge_channel(port_id("out"), port_id("in"), NonZeroUsize::MIN);
1463        let mut inputs: PortsIn =
1464            PortsIn::from_handles([port_id("in")], [input]).with_metadata_sink(sink.clone());
1465        let outputs: PortsOut =
1466            PortsOut::from_handles([port_id("out")], [output]).with_metadata_sink(sink.clone());
1467
1468        outputs
1469            .try_send(&port_id("out"), packet(b"boundary"))
1470            .expect("send should succeed");
1471        let _received: PortPacket = inputs
1472            .try_recv(&port_id("in"))
1473            .expect("receive should succeed")
1474            .expect("packet should be queued");
1475
1476        let queue_records: Vec<QueuePressureRecord> = sink
1477            .records()
1478            .into_iter()
1479            .filter_map(|record: MetadataRecord| match record {
1480                MetadataRecord::QueuePressure(queue) => Some(queue),
1481                _ => None,
1482            })
1483            .collect();
1484        assert_eq!(
1485            queue_records
1486                .iter()
1487                .map(QueuePressureRecord::kind)
1488                .collect::<Vec<_>>(),
1489            vec![
1490                QueuePressureBoundaryKind::ReserveAttempted,
1491                QueuePressureBoundaryKind::ReserveReady,
1492                QueuePressureBoundaryKind::SendCommitted,
1493                QueuePressureBoundaryKind::ReceiveAttempted,
1494                QueuePressureBoundaryKind::ReceiveReady,
1495            ]
1496        );
1497        let reserve_attempt: &QueuePressureRecord = queue_records
1498            .iter()
1499            .find(|record: &&QueuePressureRecord| {
1500                record.kind() == QueuePressureBoundaryKind::ReserveAttempted
1501            })
1502            .expect("reserve attempt should be recorded");
1503        let receive_attempt: &QueuePressureRecord = queue_records
1504            .iter()
1505            .find(|record: &&QueuePressureRecord| {
1506                record.kind() == QueuePressureBoundaryKind::ReceiveAttempted
1507            })
1508            .expect("receive attempt should be recorded");
1509        let receive_ready: &QueuePressureRecord = queue_records
1510            .iter()
1511            .find(|record: &&QueuePressureRecord| {
1512                record.kind() == QueuePressureBoundaryKind::ReceiveReady
1513            })
1514            .expect("receive ready should be recorded");
1515
1516        assert_eq!(reserve_attempt.capacity(), Some(1));
1517        assert_eq!(receive_attempt.queued_count(), Some(1));
1518        assert_eq!(receive_ready.queued_count(), Some(0));
1519    }
1520
1521    #[test]
1522    fn unconnected_output_records_message_drop() {
1523        let sink: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
1524        let outputs: PortsOut = PortsOut::new([port_id("out")]).with_metadata_sink(sink.clone());
1525
1526        outputs
1527            .try_send(&port_id("out"), packet(b"dropped"))
1528            .expect("unconnected output should accept and drop packets");
1529
1530        assert_eq!(
1531            sink.records()
1532                .into_iter()
1533                .filter_map(|record| match record {
1534                    MetadataRecord::Message(message) => Some(message.kind()),
1535                    _ => None,
1536                })
1537                .collect::<Vec<_>>(),
1538            vec![MessageBoundaryKind::Dropped]
1539        );
1540    }
1541
1542    #[test]
1543    fn async_send_and_recv_round_trip() {
1544        let (output, input): (OutputPortHandle, InputPortHandle) =
1545            bounded_edge_channel(port_id("out"), port_id("in"), NonZeroUsize::MIN);
1546        let mut inputs: PortsIn = PortsIn::from_handles([port_id("in")], [input]);
1547        let outputs: PortsOut = PortsOut::from_handles([port_id("out")], [output]);
1548        let cancellation: CancellationToken = CancellationToken::active();
1549
1550        let received: PortPacket = block_on_port(async {
1551            outputs
1552                .send(&port_id("out"), packet(b"async"), &cancellation)
1553                .await
1554                .expect("async send should succeed");
1555            inputs
1556                .recv(&port_id("in"), &cancellation)
1557                .await
1558                .expect("async receive should succeed")
1559                .expect("packet should be available")
1560        });
1561
1562        assert_eq!(
1563            received
1564                .payload()
1565                .as_bytes()
1566                .expect("received packet should contain bytes")
1567                .as_ref(),
1568            b"async"
1569        );
1570    }
1571
1572    #[test]
1573    fn async_reserve_commits_after_capacity_is_available() {
1574        let (output, input): (OutputPortHandle, InputPortHandle) =
1575            bounded_edge_channel(port_id("out"), port_id("in"), NonZeroUsize::MIN);
1576        let mut inputs: PortsIn = PortsIn::from_handles([port_id("in")], [input]);
1577        let outputs: PortsOut = PortsOut::from_handles([port_id("out")], [output]);
1578        let cancellation: CancellationToken = CancellationToken::active();
1579
1580        outputs
1581            .try_send(&port_id("out"), packet(b"queued"))
1582            .expect("first packet should fill the edge");
1583        let queued: PortPacket = inputs
1584            .try_recv(&port_id("in"))
1585            .expect("receive should succeed")
1586            .expect("queued packet should be present");
1587        assert_eq!(
1588            queued
1589                .payload()
1590                .as_bytes()
1591                .expect("queued packet should contain bytes")
1592                .as_ref(),
1593            b"queued"
1594        );
1595
1596        block_on_port(async {
1597            outputs
1598                .reserve(&port_id("out"), &cancellation)
1599                .await
1600                .expect("capacity should be available")
1601                .send(packet(b"reserved"))
1602                .expect("reserved packet should pass validation");
1603        });
1604
1605        let received: PortPacket = inputs
1606            .try_recv(&port_id("in"))
1607            .expect("receive should succeed")
1608            .expect("reserved packet should be queued");
1609        assert_eq!(
1610            received
1611                .payload()
1612                .as_bytes()
1613                .expect("received packet should contain bytes")
1614                .as_ref(),
1615            b"reserved"
1616        );
1617    }
1618
1619    #[test]
1620    fn async_recv_reports_disconnected_after_sender_drop() {
1621        let (output, input): (OutputPortHandle, InputPortHandle) =
1622            bounded_edge_channel(port_id("out"), port_id("in"), NonZeroUsize::MIN);
1623        let mut inputs: PortsIn = PortsIn::from_handles([port_id("in")], [input]);
1624        let cancellation: CancellationToken = CancellationToken::active();
1625        drop(output);
1626
1627        let err: PortRecvError = block_on_port(async {
1628            inputs
1629                .recv(&port_id("in"), &cancellation)
1630                .await
1631                .expect_err("disconnected input should fail")
1632        });
1633
1634        assert_eq!(
1635            err,
1636            PortRecvError::Disconnected {
1637                port_id: port_id("in")
1638            }
1639        );
1640    }
1641
1642    #[test]
1643    fn async_port_operations_observe_pre_cancelled_tokens() {
1644        let (output, input): (OutputPortHandle, InputPortHandle) =
1645            bounded_edge_channel(port_id("out"), port_id("in"), NonZeroUsize::MIN);
1646        let mut inputs: PortsIn = PortsIn::from_handles([port_id("in")], [input]);
1647        let outputs: PortsOut = PortsOut::from_handles([port_id("out")], [output]);
1648        let cancellation: CancellationToken =
1649            CancellationToken::cancelled(CancellationRequest::new("test shutdown"));
1650
1651        let recv_err: PortRecvError = block_on_port(async {
1652            inputs
1653                .recv(&port_id("in"), &cancellation)
1654                .await
1655                .expect_err("cancelled receive should fail")
1656        });
1657        let send_err: PortSendError = block_on_port(async {
1658            outputs
1659                .send(&port_id("out"), packet(b"cancelled"), &cancellation)
1660                .await
1661                .expect_err("cancelled send should fail")
1662        });
1663
1664        assert_eq!(
1665            recv_err,
1666            PortRecvError::Cancelled {
1667                port_id: port_id("in")
1668            }
1669        );
1670        assert_eq!(
1671            send_err,
1672            PortSendError::Cancelled {
1673                port_id: port_id("out")
1674            }
1675        );
1676    }
1677
1678    #[test]
1679    fn output_validator_rejects_before_enqueueing_packet() {
1680        let (output, input): (OutputPortHandle, InputPortHandle) =
1681            bounded_edge_channel(port_id("out"), port_id("in"), NonZeroUsize::MIN);
1682        let mut inputs: PortsIn = PortsIn::from_handles([port_id("in")], [input]);
1683        let outputs: PortsOut = PortsOut::from_handles([port_id("out")], [output])
1684            .with_output_validator(Arc::new(RejectingOutputValidator));
1685
1686        let err: PortSendError = outputs
1687            .try_send(&port_id("out"), packet(b"rejected"))
1688            .expect_err("validator should reject the packet");
1689
1690        assert_eq!(
1691            err,
1692            PortSendError::Rejected {
1693                port_id: port_id("out"),
1694                reason: "contract mismatch".to_owned()
1695            }
1696        );
1697        assert!(
1698            inputs
1699                .try_recv(&port_id("in"))
1700                .expect("receive should succeed")
1701                .is_none()
1702        );
1703    }
1704
1705    #[test]
1706    fn recv_any_returns_first_ready_input_packet() {
1707        let (left_output, left_input): (OutputPortHandle, InputPortHandle) =
1708            bounded_edge_channel(port_id("left-out"), port_id("left"), NonZeroUsize::MIN);
1709        let (right_output, right_input): (OutputPortHandle, InputPortHandle) =
1710            bounded_edge_channel(port_id("right-out"), port_id("right"), NonZeroUsize::MIN);
1711        let mut inputs: PortsIn = PortsIn::from_handles(
1712            [port_id("left"), port_id("right")],
1713            [left_input, right_input],
1714        );
1715        let right_outputs: PortsOut =
1716            PortsOut::from_handles([port_id("right-out")], [right_output]);
1717        let cancellation: CancellationToken = CancellationToken::active();
1718        let _left_output: OutputPortHandle = left_output;
1719
1720        right_outputs
1721            .try_send(&port_id("right-out"), packet(b"right-ready"))
1722            .expect("right packet should queue");
1723
1724        let (ready_port, received): (PortId, PortPacket) = block_on_port(async {
1725            inputs
1726                .recv_any(&cancellation)
1727                .await
1728                .expect("recv_any should succeed")
1729                .expect("one input should be ready")
1730        });
1731
1732        assert_eq!(ready_port, port_id("right"));
1733        assert_eq!(
1734            received
1735                .payload()
1736                .as_bytes()
1737                .expect("received packet should contain bytes")
1738                .as_ref(),
1739            b"right-ready"
1740        );
1741    }
1742
1743    #[test]
1744    fn recv_any_returns_none_when_all_inputs_are_closed() {
1745        let (left_output, left_input): (OutputPortHandle, InputPortHandle) =
1746            bounded_edge_channel(port_id("left-out"), port_id("left"), NonZeroUsize::MIN);
1747        let (right_output, right_input): (OutputPortHandle, InputPortHandle) =
1748            bounded_edge_channel(port_id("right-out"), port_id("right"), NonZeroUsize::MIN);
1749        let mut inputs: PortsIn = PortsIn::from_handles(
1750            [port_id("left"), port_id("right")],
1751            [left_input, right_input],
1752        );
1753        let cancellation: CancellationToken = CancellationToken::active();
1754        drop(left_output);
1755        drop(right_output);
1756
1757        let received: Option<(PortId, PortPacket)> = block_on_port(async {
1758            inputs
1759                .recv_any(&cancellation)
1760                .await
1761                .expect("closed inputs should end cleanly")
1762        });
1763
1764        assert!(received.is_none());
1765    }
1766
1767    #[test]
1768    fn recv_any_observes_pre_cancelled_tokens() {
1769        let (_output, input): (OutputPortHandle, InputPortHandle) =
1770            bounded_edge_channel(port_id("out"), port_id("in"), NonZeroUsize::MIN);
1771        let mut inputs: PortsIn = PortsIn::from_handles([port_id("in")], [input]);
1772        let cancellation: CancellationToken =
1773            CancellationToken::cancelled(CancellationRequest::new("test shutdown"));
1774
1775        let err: PortRecvError = block_on_port(async {
1776            inputs
1777                .recv_any(&cancellation)
1778                .await
1779                .expect_err("cancelled recv_any should fail")
1780        });
1781
1782        assert_eq!(
1783            err,
1784            PortRecvError::Cancelled {
1785                port_id: port_id("in")
1786            }
1787        );
1788    }
1789
1790    #[test]
1791    fn undeclared_ports_are_rejected() {
1792        let mut inputs: PortsIn = PortsIn::new([port_id("in")]);
1793        let outputs: PortsOut = PortsOut::new([port_id("out")]);
1794
1795        assert_eq!(
1796            outputs
1797                .try_send(&port_id("missing"), packet(b"value"))
1798                .expect_err("unknown output must fail"),
1799            PortSendError::UnknownPort {
1800                port_id: port_id("missing")
1801            }
1802        );
1803        assert_eq!(
1804            inputs
1805                .try_recv(&port_id("missing"))
1806                .expect_err("unknown input must fail"),
1807            PortRecvError::UnknownPort {
1808                port_id: port_id("missing")
1809            }
1810        );
1811    }
1812}