1use std::collections::BTreeMap;
29use std::error::Error;
30use std::fmt;
31use std::num::NonZeroUsize;
32use std::sync::Arc;
33use std::task::{Context, Poll};
34
35use asupersync::{Cx, channel::mpsc, runtime::yield_now};
36use pureflow_types::PortId;
37
38use crate::message::{MessageEnvelope, PacketPayload};
39use crate::{
40 context::{CancellationToken, NodeContext},
41 metadata::{
42 MessageBoundaryKind, MessageBoundaryRecord, MetadataRecord, MetadataSink,
43 QueuePortDirection, QueuePressureBoundaryKind, QueuePressureRecord,
44 },
45};
46
/// Packet type carried by every port channel in this module: a [`MessageEnvelope`] whose
/// payload is a [`PacketPayload`].
pub type PortPacket = MessageEnvelope<PacketPayload>;
49
/// Failure modes reported when reserving capacity on, or sending a packet through, an output
/// port.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PortSendError {
    /// The requested port id did not match any output port known to the sender.
    UnknownPort {
        /// Identifier of the port that was looked up.
        port_id: PortId,
    },
    /// A downstream channel reported that it is disconnected.
    Disconnected {
        /// Identifier of the output port whose channel is disconnected.
        port_id: PortId,
    },
    /// A downstream channel had no free capacity for a non-blocking reservation.
    Full {
        /// Identifier of the output port that is full.
        port_id: PortId,
    },
    /// The operation was cancelled, either via the caller's cancellation token or by the
    /// underlying channel.
    Cancelled {
        /// Identifier of the output port whose send was cancelled.
        port_id: PortId,
    },
    /// The packet was refused before being enqueued, e.g. by an [`OutputPacketValidator`].
    Rejected {
        /// Identifier of the output port that refused the packet.
        port_id: PortId,
        /// Human-readable explanation; appended verbatim to the `Display` output.
        reason: String,
    },
}
81
82impl fmt::Display for PortSendError {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 match self {
85 Self::UnknownPort { port_id } => {
86 write!(f, "output port `{port_id}` is not declared")
87 }
88 Self::Disconnected { port_id } => {
89 write!(f, "output port `{port_id}` is disconnected")
90 }
91 Self::Full { port_id } => write!(f, "output port `{port_id}` is full"),
92 Self::Cancelled { port_id } => {
93 write!(f, "output port `{port_id}` send cancelled")
94 }
95 Self::Rejected { port_id, reason } => {
96 write!(f, "output port `{port_id}` rejected packet: {reason}")
97 }
98 }
99 }
100}
101
// Marker impl: no underlying source error is exposed, so the trait defaults are sufficient.
impl Error for PortSendError {}
103
/// Hook that inspects a packet before it is committed to an output port.
///
/// Implementations must be `Send + Sync` because validators are shared behind an `Arc`.
pub trait OutputPacketValidator: Send + Sync {
    /// Checks `packet`, which is about to be sent on `port_id`.
    ///
    /// # Errors
    ///
    /// Returns a [`PortSendError`] to refuse the packet. When validation fails during a send,
    /// the reserved channel capacity is aborted and this error is returned to the sender.
    fn validate(&self, port_id: &PortId, packet: &PortPacket) -> Result<(), PortSendError>;
}
113
/// Failure modes reported when receiving a packet from an input port.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PortRecvError {
    /// The requested port id did not match any input port known to the receiver.
    UnknownPort {
        /// Identifier of the port that was looked up.
        port_id: PortId,
    },
    /// Every channel feeding the port reported that it is disconnected.
    Disconnected {
        /// Identifier of the input port that is disconnected.
        port_id: PortId,
    },
    /// The operation was cancelled, either via the caller's cancellation token or by the
    /// underlying channel.
    Cancelled {
        /// Identifier of the input port whose receive was cancelled.
        port_id: PortId,
    },
}
133
134impl fmt::Display for PortRecvError {
135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136 match self {
137 Self::UnknownPort { port_id } => {
138 write!(f, "input port `{port_id}` is not declared")
139 }
140 Self::Disconnected { port_id } => {
141 write!(f, "input port `{port_id}` is disconnected")
142 }
143 Self::Cancelled { port_id } => {
144 write!(f, "input port `{port_id}` receive cancelled")
145 }
146 }
147 }
148}
149
// Marker impl: no underlying source error is exposed, so the trait defaults are sufficient.
impl Error for PortRecvError {}
151
/// Receiving side of a single input port.
///
/// Holds one channel receiver per connected upstream edge; an empty list means the port has
/// no edges attached.
#[derive(Debug)]
pub struct InputPortHandle {
    // Identifier of the input port this handle serves.
    port_id: PortId,
    // One receiver per connected edge, polled in insertion order.
    receivers: Vec<mpsc::Receiver<PortPacket>>,
}
158
/// Outcome of polling all receivers of one input port once (see `InputPortHandle::poll_any`).
enum InputPollResult {
    /// A packet was received; boxed before being handed back to the caller.
    Packet(Box<PortPacket>),
    /// No packet is available yet, and at least one receiver is not disconnected.
    Pending,
    /// The port has no receivers, or every receiver reported disconnection.
    Closed,
    /// A receiver reported cancellation.
    Cancelled,
}
165
166impl InputPortHandle {
167 #[must_use]
169 pub const fn disconnected(port_id: PortId) -> Self {
170 Self {
171 port_id,
172 receivers: Vec::new(),
173 }
174 }
175
176 fn connected(port_id: PortId, receiver: mpsc::Receiver<PortPacket>) -> Self {
177 Self {
178 port_id,
179 receivers: vec![receiver],
180 }
181 }
182
183 fn append(&mut self, mut other: Self) {
184 self.receivers.append(&mut other.receivers);
185 }
186
187 #[must_use]
189 pub const fn port_id(&self) -> &PortId {
190 &self.port_id
191 }
192
193 #[must_use]
195 pub const fn connected_edge_count(&self) -> usize {
196 self.receivers.len()
197 }
198
199 #[must_use]
201 pub fn capacity(&self) -> Option<usize> {
202 self.receivers.first().map(mpsc::Receiver::capacity)
203 }
204
205 fn total_capacity(&self) -> Option<usize> {
206 if self.receivers.is_empty() {
207 None
208 } else {
209 Some(self.receivers.iter().map(mpsc::Receiver::capacity).sum())
210 }
211 }
212
213 fn queued_count(&self) -> usize {
214 self.receivers.iter().map(mpsc::Receiver::len).sum()
215 }
216
217 fn try_recv(&mut self) -> Result<Option<PortPacket>, PortRecvError> {
218 let mut disconnected_count: usize = 0;
219
220 for receiver in &mut self.receivers {
221 match receiver.try_recv() {
222 Ok(packet) => return Ok(Some(packet)),
223 Err(mpsc::RecvError::Empty) => {}
224 Err(mpsc::RecvError::Disconnected) => {
225 disconnected_count += 1;
226 }
227 Err(mpsc::RecvError::Cancelled) => {
228 return Err(PortRecvError::Cancelled {
229 port_id: self.port_id.clone(),
230 });
231 }
232 }
233 }
234
235 if !self.receivers.is_empty() && disconnected_count == self.receivers.len() {
236 return Err(PortRecvError::Disconnected {
237 port_id: self.port_id.clone(),
238 });
239 }
240
241 Ok(None)
242 }
243
244 async fn recv(
245 &mut self,
246 cancellation: &CancellationToken,
247 ) -> Result<Option<PortPacket>, PortRecvError> {
248 if self.receivers.is_empty() {
249 return Ok(None);
250 }
251
252 let runtime_cx: Cx = Cx::current().unwrap_or_else(Cx::for_testing);
253 std::future::poll_fn(|task_cx: &mut Context<'_>| {
254 if cancellation.is_cancelled() {
255 return Poll::Ready(Err(PortRecvError::Cancelled {
256 port_id: self.port_id.clone(),
257 }));
258 }
259
260 let mut disconnected_count: usize = 0;
261 for receiver in &mut self.receivers {
262 match receiver.poll_recv(&runtime_cx, task_cx) {
263 Poll::Ready(Ok(packet)) => return Poll::Ready(Ok(Some(packet))),
264 Poll::Ready(Err(mpsc::RecvError::Disconnected)) => {
265 disconnected_count += 1;
266 }
267 Poll::Ready(Err(mpsc::RecvError::Cancelled)) => {
268 return Poll::Ready(Err(PortRecvError::Cancelled {
269 port_id: self.port_id.clone(),
270 }));
271 }
272 Poll::Ready(Err(mpsc::RecvError::Empty)) | Poll::Pending => {}
273 }
274 }
275
276 if disconnected_count == self.receivers.len() {
277 return Poll::Ready(Err(PortRecvError::Disconnected {
278 port_id: self.port_id.clone(),
279 }));
280 }
281
282 Poll::Pending
283 })
284 .await
285 }
286
287 fn poll_any(&mut self, runtime_cx: &Cx, task_cx: &mut Context<'_>) -> InputPollResult {
288 if self.receivers.is_empty() {
289 return InputPollResult::Closed;
290 }
291
292 let mut disconnected_count: usize = 0;
293 for receiver in &mut self.receivers {
294 let poll_result: Poll<Result<PortPacket, mpsc::RecvError>> =
295 receiver.poll_recv(runtime_cx, task_cx);
296 match poll_result {
297 Poll::Ready(Ok(packet)) => return InputPollResult::Packet(Box::new(packet)),
298 Poll::Ready(Err(mpsc::RecvError::Disconnected)) => {
299 disconnected_count += 1;
300 }
301 Poll::Ready(Err(mpsc::RecvError::Cancelled)) => return InputPollResult::Cancelled,
302 Poll::Ready(Err(mpsc::RecvError::Empty)) | Poll::Pending => {}
303 }
304 }
305
306 if disconnected_count == self.receivers.len() {
307 return InputPollResult::Closed;
308 }
309
310 InputPollResult::Pending
311 }
312}
313
/// Sending side of a single output port.
///
/// Holds one channel sender per connected downstream edge; an empty list means the port has
/// no edges attached.
#[derive(Debug, Clone)]
pub struct OutputPortHandle {
    // Identifier of the output port this handle serves.
    port_id: PortId,
    // One sender per connected edge; a send is fanned out to all of them.
    senders: Vec<mpsc::Sender<PortPacket>>,
}
320
321impl OutputPortHandle {
322 #[must_use]
324 pub const fn disconnected(port_id: PortId) -> Self {
325 Self {
326 port_id,
327 senders: Vec::new(),
328 }
329 }
330
331 fn connected(port_id: PortId, sender: mpsc::Sender<PortPacket>) -> Self {
332 Self {
333 port_id,
334 senders: vec![sender],
335 }
336 }
337
338 fn append(&mut self, mut other: Self) {
339 self.senders.append(&mut other.senders);
340 }
341
342 #[must_use]
344 pub const fn port_id(&self) -> &PortId {
345 &self.port_id
346 }
347
348 #[must_use]
350 pub const fn connected_edge_count(&self) -> usize {
351 self.senders.len()
352 }
353
354 #[must_use]
356 pub fn capacity(&self) -> Option<usize> {
357 self.senders.first().map(mpsc::Sender::capacity)
358 }
359
360 fn total_capacity(&self) -> Option<usize> {
361 if self.senders.is_empty() {
362 None
363 } else {
364 Some(self.senders.iter().map(mpsc::Sender::capacity).sum())
365 }
366 }
367
368 fn try_reserve(
369 &self,
370 metadata_sink: Option<Arc<dyn MetadataSink + Send + Sync>>,
371 validator: Option<Arc<dyn OutputPacketValidator>>,
372 context: Option<&NodeContext>,
373 ) -> Result<OutputPortSendPermit<'_>, PortSendError> {
374 record_output_queue_pressure(
375 metadata_sink.as_ref(),
376 context,
377 self,
378 QueuePressureBoundaryKind::ReserveAttempted,
379 );
380 let split_senders: Option<(&mpsc::Sender<PortPacket>, &[mpsc::Sender<PortPacket>])> =
381 self.senders.split_last();
382 let Some((last_sender, leading_senders)): Option<(
383 &mpsc::Sender<PortPacket>,
384 &[mpsc::Sender<PortPacket>],
385 )> = split_senders
386 else {
387 record_output_queue_pressure(
388 metadata_sink.as_ref(),
389 context,
390 self,
391 QueuePressureBoundaryKind::ReserveReady,
392 );
393 return Ok(OutputPortSendPermit {
394 port_id: self.port_id.clone(),
395 permits: Vec::new(),
396 metadata_sink,
397 validator,
398 context: context.cloned(),
399 connected_edge_count: self.connected_edge_count(),
400 capacity: self.total_capacity(),
401 });
402 };
403
404 let mut permits: Vec<mpsc::SendPermit<'_, PortPacket>> =
405 Vec::with_capacity(self.senders.len());
406
407 for sender in leading_senders {
408 match sender.try_reserve() {
409 Ok(permit) => permits.push(permit),
410 Err(err) => {
411 if matches!(err, mpsc::SendError::Full(())) {
412 record_output_queue_pressure(
413 metadata_sink.as_ref(),
414 context,
415 self,
416 QueuePressureBoundaryKind::ReserveFull,
417 );
418 }
419 return Err(self.map_send_error(err));
420 }
421 }
422 }
423
424 match last_sender.try_reserve() {
425 Ok(permit) => {
426 permits.push(permit);
427 record_output_queue_pressure(
428 metadata_sink.as_ref(),
429 context,
430 self,
431 QueuePressureBoundaryKind::ReserveReady,
432 );
433 Ok(OutputPortSendPermit {
434 port_id: self.port_id.clone(),
435 permits,
436 metadata_sink,
437 validator,
438 context: context.cloned(),
439 connected_edge_count: self.connected_edge_count(),
440 capacity: self.total_capacity(),
441 })
442 }
443 Err(err) => {
444 if matches!(err, mpsc::SendError::Full(())) {
445 record_output_queue_pressure(
446 metadata_sink.as_ref(),
447 context,
448 self,
449 QueuePressureBoundaryKind::ReserveFull,
450 );
451 }
452 Err(self.map_send_error(err))
453 }
454 }
455 }
456
457 fn map_send_error(&self, err: mpsc::SendError<()>) -> PortSendError {
458 match err {
459 mpsc::SendError::Disconnected(()) => PortSendError::Disconnected {
460 port_id: self.port_id.clone(),
461 },
462 mpsc::SendError::Cancelled(()) => PortSendError::Cancelled {
463 port_id: self.port_id.clone(),
464 },
465 mpsc::SendError::Full(()) => PortSendError::Full {
466 port_id: self.port_id.clone(),
467 },
468 }
469 }
470
471 async fn reserve(
472 &self,
473 cancellation: &CancellationToken,
474 metadata_sink: Option<Arc<dyn MetadataSink + Send + Sync>>,
475 validator: Option<Arc<dyn OutputPacketValidator>>,
476 context: Option<&NodeContext>,
477 ) -> Result<OutputPortSendPermit<'_>, PortSendError> {
478 loop {
479 if cancellation.is_cancelled() {
480 return Err(PortSendError::Cancelled {
481 port_id: self.port_id.clone(),
482 });
483 }
484
485 match self.try_reserve(metadata_sink.clone(), validator.clone(), context) {
486 Ok(permit) => return Ok(permit),
487 Err(PortSendError::Full { .. }) => yield_now().await,
488 Err(err) => return Err(err),
489 }
490 }
491 }
492}
493
/// Public handle to capacity reserved on an output port.
///
/// Consume it with [`PortSendPermit::send`] to deliver a packet or with
/// [`PortSendPermit::abort`] to give the capacity back.
#[must_use = "PortSendPermit must be committed with send() or explicitly aborted"]
pub struct PortSendPermit<'a> {
    // Crate-private permit that holds the per-edge reservations.
    inner: OutputPortSendPermit<'a>,
}
499
500impl PortSendPermit<'_> {
501 pub fn send(self, packet: PortPacket) -> Result<(), PortSendError> {
507 self.inner.send(packet)
508 }
509
510 pub fn abort(self) {
512 self.inner.abort();
513 }
514}
515
/// Crate-private permit holding one channel reservation per connected edge of an output
/// port, plus everything needed to validate and record the eventual send.
struct OutputPortSendPermit<'a> {
    // Port the reservations were made on.
    port_id: PortId,
    // One reservation per connected edge; empty when the port has no edges.
    permits: Vec<mpsc::SendPermit<'a, PortPacket>>,
    // Optional sink that receives queue-pressure and message-boundary records on send.
    metadata_sink: Option<Arc<dyn MetadataSink + Send + Sync>>,
    // Optional validator consulted before the packet is committed.
    validator: Option<Arc<dyn OutputPacketValidator>>,
    // Optional node context attached to queue-pressure records.
    context: Option<NodeContext>,
    // Edge count captured at reservation time, reported in queue-pressure records.
    connected_edge_count: usize,
    // Summed edge capacity captured at reservation time (`None` for a port without edges).
    capacity: Option<usize>,
}
525
526impl OutputPortSendPermit<'_> {
527 fn send(mut self, packet: PortPacket) -> Result<(), PortSendError> {
528 if let Some(validator) = self.validator.as_ref()
529 && let Err(err) = validator.validate(&self.port_id, &packet)
530 {
531 self.abort();
532 return Err(err);
533 }
534
535 let boundary_kind: MessageBoundaryKind = if self.permits.is_empty() {
536 MessageBoundaryKind::Dropped
537 } else {
538 MessageBoundaryKind::Enqueued
539 };
540 if let Some(metadata_sink) = self.metadata_sink.as_ref() {
541 let queue_kind: QueuePressureBoundaryKind = if self.permits.is_empty() {
542 QueuePressureBoundaryKind::SendDropped
543 } else {
544 QueuePressureBoundaryKind::SendCommitted
545 };
546 let record: MetadataRecord = MetadataRecord::QueuePressure(QueuePressureRecord::new(
547 self.context.clone(),
548 QueuePortDirection::Output,
549 self.port_id.clone(),
550 queue_kind,
551 self.connected_edge_count,
552 self.capacity,
553 None,
554 ));
555 let _ = metadata_sink.record(&record);
556 let record: MetadataRecord = MetadataRecord::Message(MessageBoundaryRecord::new(
557 boundary_kind,
558 packet.metadata().clone(),
559 ));
560 let _ = metadata_sink.record(&record);
561 }
562
563 let last_permit: Option<mpsc::SendPermit<'_, PortPacket>> = self.permits.pop();
564 let last_permit: mpsc::SendPermit<'_, PortPacket> = match last_permit {
565 Some(permit) => permit,
566 None => return Ok(()),
567 };
568 let leading_permits: Vec<mpsc::SendPermit<'_, PortPacket>> = self.permits;
569
570 for permit in leading_permits {
571 permit.send(packet.clone());
572 }
573 last_permit.send(packet);
574 Ok(())
575 }
576
577 fn abort(self) {
578 for permit in self.permits {
579 permit.abort();
580 }
581 }
582}
583
584#[must_use]
586pub fn bounded_edge_channel(
587 output_port_id: PortId,
588 input_port_id: PortId,
589 capacity: NonZeroUsize,
590) -> (OutputPortHandle, InputPortHandle) {
591 let (sender, receiver): (mpsc::Sender<PortPacket>, mpsc::Receiver<PortPacket>) =
592 mpsc::channel(capacity.get());
593 (
594 OutputPortHandle::connected(output_port_id, sender),
595 InputPortHandle::connected(input_port_id, receiver),
596 )
597}
598
/// The set of input ports belonging to one node.
#[derive(Default)]
pub struct PortsIn {
    // Declared port identifiers, in the order supplied by the caller.
    port_ids: Vec<PortId>,
    // One handle per declared port id, in the same order as `port_ids`.
    ports: Vec<InputPortHandle>,
    // Optional sink that receives queue-pressure and message-boundary records.
    metadata_sink: Option<Arc<dyn MetadataSink + Send + Sync>>,
    // Optional node context attached to queue-pressure records.
    context: Option<NodeContext>,
}
607
608impl PortsIn {
609 #[must_use]
611 pub fn new(port_ids: impl Into<Vec<PortId>>) -> Self {
612 let port_ids: Vec<PortId> = port_ids.into();
613 Self::from_handles(port_ids, Vec::new())
614 }
615
616 #[must_use]
618 pub fn from_handles(
619 port_ids: impl Into<Vec<PortId>>,
620 handles: impl Into<Vec<InputPortHandle>>,
621 ) -> Self {
622 let port_ids: Vec<PortId> = port_ids.into();
623 let mut by_port: BTreeMap<PortId, InputPortHandle> = BTreeMap::new();
624
625 for handle in handles.into() {
626 let port_id: PortId = handle.port_id().clone();
627 if let Some(existing) = by_port.get_mut(&port_id) {
628 existing.append(handle);
629 } else {
630 by_port.insert(port_id, handle);
631 }
632 }
633
634 let mut ports: Vec<InputPortHandle> = Vec::with_capacity(port_ids.len());
635 for port_id in &port_ids {
636 let handle: InputPortHandle = by_port
637 .remove(port_id)
638 .unwrap_or_else(|| InputPortHandle::disconnected(port_id.clone()));
639 ports.push(handle);
640 }
641
642 Self {
643 port_ids,
644 ports,
645 metadata_sink: None,
646 context: None,
647 }
648 }
649
650 #[must_use]
652 pub fn with_metadata_sink(
653 mut self,
654 metadata_sink: Arc<dyn MetadataSink + Send + Sync>,
655 ) -> Self {
656 self.metadata_sink = Some(metadata_sink);
657 self
658 }
659
660 #[must_use]
662 pub fn with_node_context(mut self, context: NodeContext) -> Self {
663 self.context = Some(context);
664 self
665 }
666
667 #[must_use]
669 pub fn port_ids(&self) -> &[PortId] {
670 &self.port_ids
671 }
672
673 #[must_use]
675 pub const fn is_empty(&self) -> bool {
676 self.port_ids.is_empty()
677 }
678
679 #[must_use]
681 pub fn connected_edge_count(&self, port_id: &PortId) -> Option<usize> {
682 self.ports
683 .iter()
684 .find(|port: &&InputPortHandle| port.port_id() == port_id)
685 .map(InputPortHandle::connected_edge_count)
686 }
687
688 #[must_use]
690 pub fn capacity(&self, port_id: &PortId) -> Option<usize> {
691 self.ports
692 .iter()
693 .find(|port: &&InputPortHandle| port.port_id() == port_id)
694 .and_then(InputPortHandle::capacity)
695 }
696
697 pub fn try_recv(&mut self, port_id: &PortId) -> Result<Option<PortPacket>, PortRecvError> {
707 let port: &mut InputPortHandle = self
708 .ports
709 .iter_mut()
710 .find(|port: &&mut InputPortHandle| port.port_id() == port_id)
711 .ok_or_else(|| PortRecvError::UnknownPort {
712 port_id: port_id.clone(),
713 })?;
714 Self::record_input_queue_pressure(
715 self.metadata_sink.as_ref(),
716 self.context.as_ref(),
717 port,
718 QueuePressureBoundaryKind::ReceiveAttempted,
719 );
720 let packet: Option<PortPacket> = match port.try_recv() {
721 Ok(packet) => packet,
722 Err(PortRecvError::Disconnected { .. }) => {
723 Self::record_input_queue_pressure(
724 self.metadata_sink.as_ref(),
725 self.context.as_ref(),
726 port,
727 QueuePressureBoundaryKind::ReceiveClosed,
728 );
729 return Err(PortRecvError::Disconnected {
730 port_id: port_id.clone(),
731 });
732 }
733 Err(err) => return Err(err),
734 };
735 match packet.as_ref() {
736 Some(packet) => {
737 Self::record_input_queue_pressure(
738 self.metadata_sink.as_ref(),
739 self.context.as_ref(),
740 port,
741 QueuePressureBoundaryKind::ReceiveReady,
742 );
743 Self::record_message_boundary(
744 self.metadata_sink.as_ref(),
745 MessageBoundaryKind::Dequeued,
746 packet.metadata(),
747 );
748 }
749 None => Self::record_input_queue_pressure(
750 self.metadata_sink.as_ref(),
751 self.context.as_ref(),
752 port,
753 QueuePressureBoundaryKind::ReceiveEmpty,
754 ),
755 }
756 Ok(packet)
757 }
758
759 pub async fn recv(
770 &mut self,
771 port_id: &PortId,
772 cancellation: &CancellationToken,
773 ) -> Result<Option<PortPacket>, PortRecvError> {
774 let port: &mut InputPortHandle = self
775 .ports
776 .iter_mut()
777 .find(|port: &&mut InputPortHandle| port.port_id() == port_id)
778 .ok_or_else(|| PortRecvError::UnknownPort {
779 port_id: port_id.clone(),
780 })?;
781 Self::record_input_queue_pressure(
782 self.metadata_sink.as_ref(),
783 self.context.as_ref(),
784 port,
785 QueuePressureBoundaryKind::ReceiveAttempted,
786 );
787 let packet: Option<PortPacket> = match port.recv(cancellation).await {
788 Ok(packet) => packet,
789 Err(PortRecvError::Disconnected { .. }) => {
790 Self::record_input_queue_pressure(
791 self.metadata_sink.as_ref(),
792 self.context.as_ref(),
793 port,
794 QueuePressureBoundaryKind::ReceiveClosed,
795 );
796 return Err(PortRecvError::Disconnected {
797 port_id: port_id.clone(),
798 });
799 }
800 Err(err) => return Err(err),
801 };
802 match packet.as_ref() {
803 Some(packet) => {
804 Self::record_input_queue_pressure(
805 self.metadata_sink.as_ref(),
806 self.context.as_ref(),
807 port,
808 QueuePressureBoundaryKind::ReceiveReady,
809 );
810 Self::record_message_boundary(
811 self.metadata_sink.as_ref(),
812 MessageBoundaryKind::Dequeued,
813 packet.metadata(),
814 );
815 }
816 None => Self::record_input_queue_pressure(
817 self.metadata_sink.as_ref(),
818 self.context.as_ref(),
819 port,
820 QueuePressureBoundaryKind::ReceiveEmpty,
821 ),
822 }
823 Ok(packet)
824 }
825
826 pub async fn recv_any(
836 &mut self,
837 cancellation: &CancellationToken,
838 ) -> Result<Option<(PortId, PortPacket)>, PortRecvError> {
839 if self.ports.is_empty() {
840 return Ok(None);
841 }
842
843 let metadata_sink: Option<&Arc<dyn MetadataSink + Send + Sync>> =
844 self.metadata_sink.as_ref();
845 let context: Option<&NodeContext> = self.context.as_ref();
846 let runtime_cx: Cx = Cx::current().unwrap_or_else(Cx::for_testing);
847 for port in &self.ports {
848 Self::record_input_queue_pressure(
849 metadata_sink,
850 context,
851 port,
852 QueuePressureBoundaryKind::ReceiveAttempted,
853 );
854 }
855 std::future::poll_fn(|task_cx: &mut Context<'_>| {
856 if cancellation.is_cancelled() {
857 return self.ports.first().map_or(
858 Poll::Ready(Ok(None)),
859 |first_port: &InputPortHandle| {
860 Poll::Ready(Err(PortRecvError::Cancelled {
861 port_id: first_port.port_id().clone(),
862 }))
863 },
864 );
865 }
866
867 let mut closed_port_count: usize = 0;
868 for port in &mut self.ports {
869 match port.poll_any(&runtime_cx, task_cx) {
870 InputPollResult::Packet(packet) => {
871 Self::record_input_queue_pressure(
872 metadata_sink,
873 context,
874 port,
875 QueuePressureBoundaryKind::ReceiveReady,
876 );
877 Self::record_message_boundary(
878 metadata_sink,
879 MessageBoundaryKind::Dequeued,
880 packet.metadata(),
881 );
882 return Poll::Ready(Ok(Some((port.port_id().clone(), *packet))));
883 }
884 InputPollResult::Closed => {
885 closed_port_count += 1;
886 Self::record_input_queue_pressure(
887 metadata_sink,
888 context,
889 port,
890 QueuePressureBoundaryKind::ReceiveClosed,
891 );
892 }
893 InputPollResult::Cancelled => {
894 return Poll::Ready(Err(PortRecvError::Cancelled {
895 port_id: port.port_id().clone(),
896 }));
897 }
898 InputPollResult::Pending => {}
899 }
900 }
901
902 if closed_port_count == self.ports.len() {
903 return Poll::Ready(Ok(None));
904 }
905
906 Poll::Pending
907 })
908 .await
909 }
910
911 fn record_message_boundary(
912 metadata_sink: Option<&Arc<dyn MetadataSink + Send + Sync>>,
913 kind: MessageBoundaryKind,
914 metadata: &crate::message::MessageMetadata,
915 ) {
916 let Some(metadata_sink): Option<&Arc<dyn MetadataSink + Send + Sync>> = metadata_sink
917 else {
918 return;
919 };
920
921 let record: MetadataRecord =
922 MetadataRecord::Message(MessageBoundaryRecord::new(kind, metadata.clone()));
923 let _ = metadata_sink.record(&record);
924 }
925
926 fn record_input_queue_pressure(
927 metadata_sink: Option<&Arc<dyn MetadataSink + Send + Sync>>,
928 context: Option<&NodeContext>,
929 port: &InputPortHandle,
930 kind: QueuePressureBoundaryKind,
931 ) {
932 let Some(metadata_sink): Option<&Arc<dyn MetadataSink + Send + Sync>> = metadata_sink
933 else {
934 return;
935 };
936
937 let record: MetadataRecord = MetadataRecord::QueuePressure(QueuePressureRecord::new(
938 context.cloned(),
939 QueuePortDirection::Input,
940 port.port_id().clone(),
941 kind,
942 port.connected_edge_count(),
943 port.total_capacity(),
944 Some(port.queued_count()),
945 ));
946 let _ = metadata_sink.record(&record);
947 }
948}
949
/// The set of output ports belonging to one node.
#[derive(Clone, Default)]
pub struct PortsOut {
    // Declared port identifiers, in the order supplied by the caller.
    port_ids: Vec<PortId>,
    // One handle per declared port id, in the same order as `port_ids`.
    ports: Vec<OutputPortHandle>,
    // Optional sink that receives queue-pressure and message-boundary records.
    metadata_sink: Option<Arc<dyn MetadataSink + Send + Sync>>,
    // Optional validator consulted before a packet is committed.
    validator: Option<Arc<dyn OutputPacketValidator>>,
    // Optional node context attached to queue-pressure records.
    context: Option<NodeContext>,
}
959
960impl PortsOut {
961 #[must_use]
963 pub fn new(port_ids: impl Into<Vec<PortId>>) -> Self {
964 let port_ids: Vec<PortId> = port_ids.into();
965 Self::from_handles(port_ids, Vec::new())
966 }
967
968 #[must_use]
970 pub fn from_handles(
971 port_ids: impl Into<Vec<PortId>>,
972 handles: impl Into<Vec<OutputPortHandle>>,
973 ) -> Self {
974 let port_ids: Vec<PortId> = port_ids.into();
975 let mut by_port: BTreeMap<PortId, OutputPortHandle> = BTreeMap::new();
976
977 for handle in handles.into() {
978 let port_id: PortId = handle.port_id().clone();
979 if let Some(existing) = by_port.get_mut(&port_id) {
980 existing.append(handle);
981 } else {
982 by_port.insert(port_id, handle);
983 }
984 }
985
986 let mut ports: Vec<OutputPortHandle> = Vec::with_capacity(port_ids.len());
987 for port_id in &port_ids {
988 let handle: OutputPortHandle = by_port
989 .remove(port_id)
990 .unwrap_or_else(|| OutputPortHandle::disconnected(port_id.clone()));
991 ports.push(handle);
992 }
993
994 Self {
995 port_ids,
996 ports,
997 metadata_sink: None,
998 validator: None,
999 context: None,
1000 }
1001 }
1002
1003 #[must_use]
1005 pub fn with_metadata_sink(
1006 mut self,
1007 metadata_sink: Arc<dyn MetadataSink + Send + Sync>,
1008 ) -> Self {
1009 self.metadata_sink = Some(metadata_sink);
1010 self
1011 }
1012
1013 #[must_use]
1015 pub fn with_node_context(mut self, context: NodeContext) -> Self {
1016 self.context = Some(context);
1017 self
1018 }
1019
1020 #[must_use]
1022 pub fn with_output_validator(mut self, validator: Arc<dyn OutputPacketValidator>) -> Self {
1023 self.validator = Some(validator);
1024 self
1025 }
1026
1027 #[must_use]
1029 pub fn port_ids(&self) -> &[PortId] {
1030 &self.port_ids
1031 }
1032
1033 #[must_use]
1035 pub const fn is_empty(&self) -> bool {
1036 self.port_ids.is_empty()
1037 }
1038
1039 #[must_use]
1041 pub fn connected_edge_count(&self, port_id: &PortId) -> Option<usize> {
1042 self.ports
1043 .iter()
1044 .find(|port: &&OutputPortHandle| port.port_id() == port_id)
1045 .map(OutputPortHandle::connected_edge_count)
1046 }
1047
1048 #[must_use]
1050 pub fn capacity(&self, port_id: &PortId) -> Option<usize> {
1051 self.ports
1052 .iter()
1053 .find(|port: &&OutputPortHandle| port.port_id() == port_id)
1054 .and_then(OutputPortHandle::capacity)
1055 }
1056
1057 pub fn try_send(&self, port_id: &PortId, packet: PortPacket) -> Result<(), PortSendError> {
1070 self.try_reserve(port_id)?.send(packet)
1071 }
1072
1073 pub async fn send(
1081 &self,
1082 port_id: &PortId,
1083 packet: PortPacket,
1084 cancellation: &CancellationToken,
1085 ) -> Result<(), PortSendError> {
1086 self.reserve(port_id, cancellation).await?.send(packet)
1087 }
1088
1089 pub fn try_reserve(&self, port_id: &PortId) -> Result<PortSendPermit<'_>, PortSendError> {
1098 let port: &OutputPortHandle = self
1099 .ports
1100 .iter()
1101 .find(|port: &&OutputPortHandle| port.port_id() == port_id)
1102 .ok_or_else(|| PortSendError::UnknownPort {
1103 port_id: port_id.clone(),
1104 })?;
1105 port.try_reserve(
1106 self.metadata_sink.clone(),
1107 self.validator.clone(),
1108 self.context.as_ref(),
1109 )
1110 .map(|inner: OutputPortSendPermit<'_>| PortSendPermit { inner })
1111 }
1112
1113 pub async fn reserve(
1122 &self,
1123 port_id: &PortId,
1124 cancellation: &CancellationToken,
1125 ) -> Result<PortSendPermit<'_>, PortSendError> {
1126 let port: &OutputPortHandle = self
1127 .ports
1128 .iter()
1129 .find(|port: &&OutputPortHandle| port.port_id() == port_id)
1130 .ok_or_else(|| PortSendError::UnknownPort {
1131 port_id: port_id.clone(),
1132 })?;
1133 port.reserve(
1134 cancellation,
1135 self.metadata_sink.clone(),
1136 self.validator.clone(),
1137 self.context.as_ref(),
1138 )
1139 .await
1140 .map(|inner: OutputPortSendPermit<'_>| PortSendPermit { inner })
1141 }
1142}
1143
1144fn record_output_queue_pressure(
1145 metadata_sink: Option<&Arc<dyn MetadataSink + Send + Sync>>,
1146 context: Option<&NodeContext>,
1147 port: &OutputPortHandle,
1148 kind: QueuePressureBoundaryKind,
1149) {
1150 let Some(metadata_sink): Option<&Arc<dyn MetadataSink + Send + Sync>> = metadata_sink else {
1151 return;
1152 };
1153
1154 let record: MetadataRecord = MetadataRecord::QueuePressure(QueuePressureRecord::new(
1155 context.cloned(),
1156 QueuePortDirection::Output,
1157 port.port_id().clone(),
1158 kind,
1159 port.connected_edge_count(),
1160 port.total_capacity(),
1161 None,
1162 ));
1163 let _ = metadata_sink.record(&record);
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168 use super::*;
1169 use std::future::Future;
1170 use std::sync::{Arc, Mutex};
1171
1172 use asupersync::runtime::{Runtime, RuntimeBuilder};
1173 use pureflow_types::{ExecutionId, MessageId, NodeId, WorkflowId};
1174
1175 use crate::{
1176 context::{CancellationRequest, ExecutionMetadata},
1177 message::{MessageEndpoint, MessageMetadata, MessageRoute},
1178 };
1179
1180 fn execution_id(value: &str) -> ExecutionId {
1181 ExecutionId::new(value).expect("valid execution id")
1182 }
1183
1184 fn message_id(value: &str) -> MessageId {
1185 MessageId::new(value).expect("valid message id")
1186 }
1187
1188 fn node_id(value: &str) -> NodeId {
1189 NodeId::new(value).expect("valid node id")
1190 }
1191
1192 fn port_id(value: &str) -> PortId {
1193 PortId::new(value).expect("valid port id")
1194 }
1195
1196 fn workflow_id(value: &str) -> WorkflowId {
1197 WorkflowId::new(value).expect("valid workflow id")
1198 }
1199
1200 fn packet(value: &[u8]) -> PortPacket {
1201 let source: MessageEndpoint = MessageEndpoint::new(node_id("source"), port_id("out"));
1202 let target: MessageEndpoint = MessageEndpoint::new(node_id("sink"), port_id("in"));
1203 let route: MessageRoute = MessageRoute::new(Some(source), target);
1204 let execution: ExecutionMetadata = ExecutionMetadata::first_attempt(execution_id("run-1"));
1205 let metadata: MessageMetadata =
1206 MessageMetadata::new(message_id("msg-1"), workflow_id("flow"), execution, route);
1207
1208 MessageEnvelope::new(metadata, PacketPayload::from(value.to_vec()))
1209 }
1210
1211 fn block_on_port<F: Future>(future: F) -> F::Output {
1212 let runtime: Runtime = RuntimeBuilder::current_thread()
1213 .build()
1214 .expect("test runtime should build");
1215 runtime.block_on(future)
1216 }
1217
    /// Test sink that stores a clone of every metadata record it receives.
    #[derive(Debug, Default)]
    struct RecordingMetadataSink {
        // Records in arrival order, guarded by a mutex because `record` takes `&self`.
        records: Mutex<Vec<MetadataRecord>>,
    }
1222
1223 impl RecordingMetadataSink {
1224 fn records(&self) -> Vec<MetadataRecord> {
1225 self.records
1226 .lock()
1227 .expect("metadata sink lock should not be poisoned")
1228 .clone()
1229 }
1230 }
1231
1232 impl MetadataSink for RecordingMetadataSink {
1233 fn record(&self, record: &MetadataRecord) -> crate::Result<()> {
1234 self.records
1235 .lock()
1236 .expect("metadata sink lock should not be poisoned")
1237 .push(record.clone());
1238 Ok(())
1239 }
1240 }
1241
    /// Test validator that refuses every packet.
    #[derive(Debug)]
    struct RejectingOutputValidator;
1244
1245 impl OutputPacketValidator for RejectingOutputValidator {
1246 fn validate(&self, port_id: &PortId, _packet: &PortPacket) -> Result<(), PortSendError> {
1247 Err(PortSendError::Rejected {
1248 port_id: port_id.clone(),
1249 reason: "contract mismatch".to_owned(),
1250 })
1251 }
1252 }
1253
1254 #[test]
1255 fn ports_preserve_declared_port_order() {
1256 let inputs: PortsIn = PortsIn::new(vec![port_id("left"), port_id("right")]);
1257 let outputs: PortsOut = PortsOut::new(vec![port_id("out")]);
1258
1259 assert_eq!(
1260 inputs
1261 .port_ids()
1262 .iter()
1263 .map(PortId::as_str)
1264 .collect::<Vec<_>>(),
1265 vec!["left", "right"]
1266 );
1267 assert_eq!(
1268 outputs
1269 .port_ids()
1270 .iter()
1271 .map(PortId::as_str)
1272 .collect::<Vec<_>>(),
1273 vec!["out"]
1274 );
1275 }
1276
1277 #[test]
1278 fn bounded_edge_channel_enforces_capacity() {
1279 let (output, input): (OutputPortHandle, InputPortHandle) =
1280 bounded_edge_channel(port_id("out"), port_id("in"), NonZeroUsize::MIN);
1281 let mut inputs: PortsIn = PortsIn::from_handles([port_id("in")], [input]);
1282 let outputs: PortsOut = PortsOut::from_handles([port_id("out")], [output]);
1283
1284 outputs
1285 .try_send(&port_id("out"), packet(b"first"))
1286 .expect("first packet should fit");
1287 let err: PortSendError = outputs
1288 .try_send(&port_id("out"), packet(b"second"))
1289 .expect_err("second packet should hit bounded capacity");
1290
1291 assert_eq!(
1292 err,
1293 PortSendError::Full {
1294 port_id: port_id("out")
1295 }
1296 );
1297 assert_eq!(outputs.capacity(&port_id("out")), Some(1));
1298 assert_eq!(inputs.capacity(&port_id("in")), Some(1));
1299
1300 let received: PortPacket = inputs
1301 .try_recv(&port_id("in"))
1302 .expect("receive should succeed")
1303 .expect("packet should be queued");
1304
1305 assert_eq!(
1306 received
1307 .payload()
1308 .as_bytes()
1309 .expect("received packet should contain bytes")
1310 .as_ref(),
1311 b"first"
1312 );
1313 assert!(
1314 inputs
1315 .try_recv(&port_id("in"))
1316 .expect("empty receive should not fail")
1317 .is_none()
1318 );
1319 }
1320
/// While a permit from `try_reserve` is held on a capacity-one edge, a plain
/// `try_send` reports `Full`; sending through the permit then delivers the
/// packet to the input side.
#[test]
fn reserved_output_capacity_commits_on_send() {
    let out_port = port_id("out");
    let in_port = port_id("in");
    let (tx, rx) = bounded_edge_channel(out_port.clone(), in_port.clone(), NonZeroUsize::MIN);
    let mut inputs = PortsIn::from_handles([in_port.clone()], [rx]);
    let outputs = PortsOut::from_handles([out_port.clone()], [tx]);

    let reservation = outputs
        .try_reserve(&out_port)
        .expect("reservation should succeed");

    // The outstanding reservation occupies the only slot.
    let blocked = outputs
        .try_send(&out_port, packet(b"blocked"))
        .expect_err("reserved capacity should block another send");
    assert_eq!(
        blocked,
        PortSendError::Full {
            port_id: out_port.clone()
        }
    );

    reservation
        .send(packet(b"committed"))
        .expect("reserved packet should pass validation");

    let committed = inputs
        .try_recv(&in_port)
        .expect("receive should succeed")
        .expect("committed packet should be queued");
    assert_eq!(
        committed
            .payload()
            .as_bytes()
            .expect("received packet should contain bytes")
            .as_ref(),
        b"committed"
    );
}
1359
/// Dropping a reserved permit without sending leaves the input empty (and
/// still connected) and frees the slot so a later `try_send` succeeds.
#[test]
fn dropped_output_permit_releases_capacity_without_message() {
    let out_port = port_id("out");
    let in_port = port_id("in");
    let (tx, rx) = bounded_edge_channel(out_port.clone(), in_port.clone(), NonZeroUsize::MIN);
    let mut inputs = PortsIn::from_handles([in_port.clone()], [rx]);
    let outputs = PortsOut::from_handles([out_port.clone()], [tx]);

    // Reserve the only slot and immediately give the permit up unused.
    drop(
        outputs
            .try_reserve(&out_port)
            .expect("reservation should succeed"),
    );

    let nothing_queued = inputs
        .try_recv(&in_port)
        .expect("dropped permit should not disconnect");
    assert!(nothing_queued.is_none());

    outputs
        .try_send(&out_port, packet(b"after-drop"))
        .expect("dropped permit should release capacity");
    let delivered = inputs
        .try_recv(&in_port)
        .expect("receive should succeed")
        .expect("new packet should be queued");

    assert_eq!(
        delivered
            .payload()
            .as_bytes()
            .expect("received packet should contain bytes")
            .as_ref(),
        b"after-drop"
    );
}
1396
/// Aborting a reserved permit enqueues nothing, keeps the edge connected, and
/// frees the slot so a later `try_send` succeeds and its packet is delivered.
#[test]
fn aborted_output_permit_releases_capacity_without_message() {
    let (output, input): (OutputPortHandle, InputPortHandle) =
        bounded_edge_channel(port_id("out"), port_id("in"), NonZeroUsize::MIN);
    let mut inputs: PortsIn = PortsIn::from_handles([port_id("in")], [input]);
    let outputs: PortsOut = PortsOut::from_handles([port_id("out")], [output]);

    // Reserve the only slot, then explicitly abort instead of sending.
    outputs
        .try_reserve(&port_id("out"))
        .expect("reservation should succeed")
        .abort();

    assert!(
        inputs
            .try_recv(&port_id("in"))
            .expect("aborted permit should not disconnect")
            .is_none()
    );
    outputs
        .try_send(&port_id("out"), packet(b"after-abort"))
        .expect("aborted permit should release capacity");

    // Mirror the dropped-permit test: the packet sent after the abort must be
    // the one that actually reaches the input side.
    let received: PortPacket = inputs
        .try_recv(&port_id("in"))
        .expect("receive should succeed")
        .expect("new packet should be queued");

    assert_eq!(
        received
            .payload()
            .as_bytes()
            .expect("received packet should contain bytes")
            .as_ref(),
        b"after-abort"
    );
}
1419
/// With a shared recording sink attached to both port sets, one `try_send`
/// followed by one `try_recv` records message boundaries `Enqueued` then
/// `Dequeued`, in that order.
#[test]
fn send_and_recv_emit_message_boundary_metadata() {
    let out_port = port_id("out");
    let in_port = port_id("in");
    let sink = Arc::new(RecordingMetadataSink::default());
    let (tx, rx) = bounded_edge_channel(out_port.clone(), in_port.clone(), NonZeroUsize::MIN);
    let mut inputs =
        PortsIn::from_handles([in_port.clone()], [rx]).with_metadata_sink(sink.clone());
    let outputs =
        PortsOut::from_handles([out_port.clone()], [tx]).with_metadata_sink(sink.clone());

    outputs
        .try_send(&out_port, packet(b"boundary"))
        .expect("send should succeed");
    let delivered = inputs
        .try_recv(&in_port)
        .expect("receive should succeed")
        .expect("packet should be queued");

    assert_eq!(
        delivered
            .payload()
            .as_bytes()
            .expect("received packet should contain bytes")
            .as_ref(),
        b"boundary"
    );

    // Keep only message-boundary records; other record kinds are ignored here.
    let message_kinds: Vec<MessageBoundaryKind> = sink
        .records()
        .into_iter()
        .filter_map(|record| {
            if let MetadataRecord::Message(message) = record {
                Some(message.kind())
            } else {
                None
            }
        })
        .collect();
    assert_eq!(
        message_kinds,
        vec![MessageBoundaryKind::Enqueued, MessageBoundaryKind::Dequeued]
    );
}
1457
/// One `try_send` followed by one `try_recv` records the queue-pressure
/// sequence reserve-attempted, reserve-ready, send-committed,
/// receive-attempted, receive-ready, and the records carry the expected
/// capacity and queued counts for a capacity-one edge.
#[test]
fn send_and_recv_emit_queue_pressure_metadata() {
    /// Returns the first record of the requested kind, if any.
    fn first_of_kind(
        records: &[QueuePressureRecord],
        kind: QueuePressureBoundaryKind,
    ) -> Option<&QueuePressureRecord> {
        records.iter().find(|record| record.kind() == kind)
    }

    let out_port = port_id("out");
    let in_port = port_id("in");
    let sink = Arc::new(RecordingMetadataSink::default());
    let (tx, rx) = bounded_edge_channel(out_port.clone(), in_port.clone(), NonZeroUsize::MIN);
    let mut inputs =
        PortsIn::from_handles([in_port.clone()], [rx]).with_metadata_sink(sink.clone());
    let outputs =
        PortsOut::from_handles([out_port.clone()], [tx]).with_metadata_sink(sink.clone());

    outputs
        .try_send(&out_port, packet(b"boundary"))
        .expect("send should succeed");
    let _delivered = inputs
        .try_recv(&in_port)
        .expect("receive should succeed")
        .expect("packet should be queued");

    // Keep only the queue-pressure records, in emission order.
    let pressure: Vec<QueuePressureRecord> = sink
        .records()
        .into_iter()
        .filter_map(|record| {
            if let MetadataRecord::QueuePressure(queue) = record {
                Some(queue)
            } else {
                None
            }
        })
        .collect();

    let observed_kinds: Vec<_> = pressure.iter().map(QueuePressureRecord::kind).collect();
    assert_eq!(
        observed_kinds,
        vec![
            QueuePressureBoundaryKind::ReserveAttempted,
            QueuePressureBoundaryKind::ReserveReady,
            QueuePressureBoundaryKind::SendCommitted,
            QueuePressureBoundaryKind::ReceiveAttempted,
            QueuePressureBoundaryKind::ReceiveReady,
        ]
    );

    let reserve_attempt = first_of_kind(&pressure, QueuePressureBoundaryKind::ReserveAttempted)
        .expect("reserve attempt should be recorded");
    let receive_attempt = first_of_kind(&pressure, QueuePressureBoundaryKind::ReceiveAttempted)
        .expect("receive attempt should be recorded");
    let receive_ready = first_of_kind(&pressure, QueuePressureBoundaryKind::ReceiveReady)
        .expect("receive ready should be recorded");

    assert_eq!(reserve_attempt.capacity(), Some(1));
    assert_eq!(receive_attempt.queued_count(), Some(1));
    assert_eq!(receive_ready.queued_count(), Some(0));
}
1520
/// A declared output with no attached handle accepts `try_send` and records a
/// single `Dropped` message boundary.
#[test]
fn unconnected_output_records_message_drop() {
    let out_port = port_id("out");
    let sink = Arc::new(RecordingMetadataSink::default());
    let outputs = PortsOut::new([out_port.clone()]).with_metadata_sink(sink.clone());

    outputs
        .try_send(&out_port, packet(b"dropped"))
        .expect("unconnected output should accept and drop packets");

    // Keep only message-boundary records; other record kinds are ignored here.
    let message_kinds: Vec<MessageBoundaryKind> = sink
        .records()
        .into_iter()
        .filter_map(|record| {
            if let MetadataRecord::Message(message) = record {
                Some(message.kind())
            } else {
                None
            }
        })
        .collect();
    assert_eq!(message_kinds, vec![MessageBoundaryKind::Dropped]);
}
1541
/// A packet sent with the async `send` is returned by the async `recv` when
/// both run under an active cancellation token.
#[test]
fn async_send_and_recv_round_trip() {
    let out_port = port_id("out");
    let in_port = port_id("in");
    let (tx, rx) = bounded_edge_channel(out_port.clone(), in_port.clone(), NonZeroUsize::MIN);
    let mut inputs = PortsIn::from_handles([in_port.clone()], [rx]);
    let outputs = PortsOut::from_handles([out_port.clone()], [tx]);
    let token = CancellationToken::active();

    // Send and receive inside one driven future so they share a single run.
    let round_tripped = block_on_port(async {
        outputs
            .send(&out_port, packet(b"async"), &token)
            .await
            .expect("async send should succeed");
        inputs
            .recv(&in_port, &token)
            .await
            .expect("async receive should succeed")
            .expect("packet should be available")
    });

    assert_eq!(
        round_tripped
            .payload()
            .as_bytes()
            .expect("received packet should contain bytes")
            .as_ref(),
        b"async"
    );
}
1571
/// Fills the capacity-one edge, drains it with `try_recv`, then awaits
/// `reserve` and commits a packet through the returned permit.
///
/// The queue is already empty when `reserve` is awaited, so this exercises the
/// ready path of `reserve` rather than a pending wait.
#[test]
fn async_reserve_commits_after_capacity_is_available() {
    let out_port = port_id("out");
    let in_port = port_id("in");
    let (tx, rx) = bounded_edge_channel(out_port.clone(), in_port.clone(), NonZeroUsize::MIN);
    let mut inputs = PortsIn::from_handles([in_port.clone()], [rx]);
    let outputs = PortsOut::from_handles([out_port.clone()], [tx]);
    let token = CancellationToken::active();

    outputs
        .try_send(&out_port, packet(b"queued"))
        .expect("first packet should fill the edge");
    let drained = inputs
        .try_recv(&in_port)
        .expect("receive should succeed")
        .expect("queued packet should be present");
    assert_eq!(
        drained
            .payload()
            .as_bytes()
            .expect("queued packet should contain bytes")
            .as_ref(),
        b"queued"
    );

    // Two-phase send: await the reservation, then commit through the permit.
    block_on_port(async {
        let reservation = outputs
            .reserve(&out_port, &token)
            .await
            .expect("capacity should be available");
        reservation
            .send(packet(b"reserved"))
            .expect("reserved packet should pass validation");
    });

    let committed = inputs
        .try_recv(&in_port)
        .expect("receive should succeed")
        .expect("reserved packet should be queued");
    assert_eq!(
        committed
            .payload()
            .as_bytes()
            .expect("received packet should contain bytes")
            .as_ref(),
        b"reserved"
    );
}
1618
/// After the output handle is dropped, the async `recv` on the paired input
/// fails with `Disconnected` for that input port.
#[test]
fn async_recv_reports_disconnected_after_sender_drop() {
    let in_port = port_id("in");
    let (tx, rx) = bounded_edge_channel(port_id("out"), in_port.clone(), NonZeroUsize::MIN);
    let mut inputs = PortsIn::from_handles([in_port.clone()], [rx]);
    let token = CancellationToken::active();

    // Close the sending side before the receiver ever polls.
    drop(tx);

    let failure = block_on_port(async {
        inputs
            .recv(&in_port, &token)
            .await
            .expect_err("disconnected input should fail")
    });

    assert_eq!(
        failure,
        PortRecvError::Disconnected {
            port_id: in_port.clone()
        }
    );
}
1641
/// With a token that is already cancelled, async `recv` and async `send` both
/// fail with their respective `Cancelled` error for the addressed port.
#[test]
fn async_port_operations_observe_pre_cancelled_tokens() {
    let out_port = port_id("out");
    let in_port = port_id("in");
    let (tx, rx) = bounded_edge_channel(out_port.clone(), in_port.clone(), NonZeroUsize::MIN);
    let mut inputs = PortsIn::from_handles([in_port.clone()], [rx]);
    let outputs = PortsOut::from_handles([out_port.clone()], [tx]);
    let token = CancellationToken::cancelled(CancellationRequest::new("test shutdown"));

    let recv_failure = block_on_port(async {
        inputs
            .recv(&in_port, &token)
            .await
            .expect_err("cancelled receive should fail")
    });
    let send_failure = block_on_port(async {
        outputs
            .send(&out_port, packet(b"cancelled"), &token)
            .await
            .expect_err("cancelled send should fail")
    });

    assert_eq!(
        recv_failure,
        PortRecvError::Cancelled {
            port_id: in_port.clone()
        }
    );
    assert_eq!(
        send_failure,
        PortSendError::Cancelled {
            port_id: out_port.clone()
        }
    );
}
1677
/// With `RejectingOutputValidator` installed, `try_send` returns `Rejected`
/// with reason "contract mismatch" and nothing is queued on the input side.
#[test]
fn output_validator_rejects_before_enqueueing_packet() {
    let out_port = port_id("out");
    let in_port = port_id("in");
    let (tx, rx) = bounded_edge_channel(out_port.clone(), in_port.clone(), NonZeroUsize::MIN);
    let mut inputs = PortsIn::from_handles([in_port.clone()], [rx]);
    let outputs = PortsOut::from_handles([out_port.clone()], [tx])
        .with_output_validator(Arc::new(RejectingOutputValidator));

    let rejection = outputs
        .try_send(&out_port, packet(b"rejected"))
        .expect_err("validator should reject the packet");

    assert_eq!(
        rejection,
        PortSendError::Rejected {
            port_id: out_port.clone(),
            reason: "contract mismatch".to_owned()
        }
    );

    // The rejected packet must never have reached the queue.
    let queued = inputs.try_recv(&in_port).expect("receive should succeed");
    assert!(queued.is_none());
}
1704
/// With two inputs where only the right edge has a queued packet (and the
/// left sender is still alive), `recv_any` yields the right port and its packet.
#[test]
fn recv_any_returns_first_ready_input_packet() {
    let left_port = port_id("left");
    let right_port = port_id("right");
    let right_out_port = port_id("right-out");

    let (left_tx, left_rx) =
        bounded_edge_channel(port_id("left-out"), left_port.clone(), NonZeroUsize::MIN);
    let (right_tx, right_rx) =
        bounded_edge_channel(right_out_port.clone(), right_port.clone(), NonZeroUsize::MIN);
    let mut inputs =
        PortsIn::from_handles([left_port.clone(), right_port.clone()], [left_rx, right_rx]);
    let right_outputs = PortsOut::from_handles([right_out_port.clone()], [right_tx]);
    let token = CancellationToken::active();
    // Hold the left sender for the rest of the test so the left input stays
    // open but empty.
    let _left_keepalive = left_tx;

    right_outputs
        .try_send(&right_out_port, packet(b"right-ready"))
        .expect("right packet should queue");

    let (ready_port, delivered) = block_on_port(async {
        inputs
            .recv_any(&token)
            .await
            .expect("recv_any should succeed")
            .expect("one input should be ready")
    });

    assert_eq!(ready_port, right_port);
    assert_eq!(
        delivered
            .payload()
            .as_bytes()
            .expect("received packet should contain bytes")
            .as_ref(),
        b"right-ready"
    );
}
1742
/// When every sender feeding the inputs has been dropped, `recv_any` resolves
/// to `Ok(None)` instead of an error.
#[test]
fn recv_any_returns_none_when_all_inputs_are_closed() {
    let left_port = port_id("left");
    let right_port = port_id("right");

    let (left_tx, left_rx) =
        bounded_edge_channel(port_id("left-out"), left_port.clone(), NonZeroUsize::MIN);
    let (right_tx, right_rx) =
        bounded_edge_channel(port_id("right-out"), right_port.clone(), NonZeroUsize::MIN);
    let mut inputs = PortsIn::from_handles([left_port, right_port], [left_rx, right_rx]);
    let token = CancellationToken::active();

    // Close both edges from the sending side, left first.
    drop(left_tx);
    drop(right_tx);

    let outcome = block_on_port(async {
        inputs
            .recv_any(&token)
            .await
            .expect("closed inputs should end cleanly")
    });

    assert!(outcome.is_none());
}
1766
/// With a token that is already cancelled, `recv_any` fails with `Cancelled`
/// naming the single declared input port.
#[test]
fn recv_any_observes_pre_cancelled_tokens() {
    let in_port = port_id("in");
    // The sender half stays bound so the edge remains connected for the test.
    let (_tx, rx) = bounded_edge_channel(port_id("out"), in_port.clone(), NonZeroUsize::MIN);
    let mut inputs = PortsIn::from_handles([in_port.clone()], [rx]);
    let token = CancellationToken::cancelled(CancellationRequest::new("test shutdown"));

    let failure = block_on_port(async {
        inputs
            .recv_any(&token)
            .await
            .expect_err("cancelled recv_any should fail")
    });

    assert_eq!(
        failure,
        PortRecvError::Cancelled {
            port_id: in_port.clone()
        }
    );
}
1789
/// Addressing a port id that was never declared yields `UnknownPort` from
/// both `try_send` and `try_recv`.
#[test]
fn undeclared_ports_are_rejected() {
    let missing = port_id("missing");
    let mut inputs = PortsIn::new([port_id("in")]);
    let outputs = PortsOut::new([port_id("out")]);

    let send_failure = outputs
        .try_send(&missing, packet(b"value"))
        .expect_err("unknown output must fail");
    assert_eq!(
        send_failure,
        PortSendError::UnknownPort {
            port_id: missing.clone()
        }
    );

    let recv_failure = inputs
        .try_recv(&missing)
        .expect_err("unknown input must fail");
    assert_eq!(
        recv_failure,
        PortRecvError::UnknownPort {
            port_id: missing.clone()
        }
    );
}
1812}