protoflow_core/
output_port.rs

1// This is free and unencumbered software released into the public domain.
2
3use crate::{
4    prelude::{fmt, Arc, Bytes, Cow, MaybeLabeled, MaybeNamed, PhantomData},
5    Message, MessageSender, OutputPortID, Port, PortID, PortResult, PortState, System, Transport,
6};
7
8#[derive(Clone)] //, Debug, Eq, Ord, PartialEq, PartialOrd)]
9pub struct OutputPort<T: Message> {
10    pub(crate) id: OutputPortID,
11    pub(crate) transport: Arc<dyn Transport>,
12    _phantom: PhantomData<T>,
13}
14
15impl<T: Message> OutputPort<T> {
16    pub fn new<X: Transport + Default>(system: &System<X>) -> Self {
17        let runtime = system.runtime.as_ref();
18        let transport = runtime.transport.clone();
19        Self {
20            _phantom: PhantomData,
21            id: transport.open_output().unwrap(),
22            transport,
23        }
24    }
25
26    pub fn close(&mut self) -> PortResult<bool> {
27        self.transport.close(PortID::Output(self.id))
28    }
29
30    pub fn send<'a>(&self, message: impl Into<&'a T>) -> PortResult<()>
31    where
32        T: 'a,
33    {
34        let message: &T = message.into();
35        let bytes = Bytes::from(message.encode_length_delimited_to_vec());
36        self.transport.send(self.id, bytes)
37    }
38}
39
40impl<T: Message> MaybeNamed for OutputPort<T> {
41    fn name(&self) -> Option<Cow<str>> {
42        None // TODO
43    }
44}
45
46impl<T: Message> MaybeLabeled for OutputPort<T> {
47    fn label(&self) -> Option<Cow<str>> {
48        None // TODO
49    }
50}
51
52impl<T: Message> Port for OutputPort<T> {
53    fn id(&self) -> PortID {
54        PortID::Output(self.id)
55    }
56
57    fn state(&self) -> PortState {
58        self.transport
59            .state(PortID::Output(self.id))
60            .unwrap_or(PortState::Closed)
61    }
62
63    fn close(&mut self) -> PortResult<bool> {
64        OutputPort::close(self)
65    }
66}
67
68impl<T: Message> MessageSender<T> for OutputPort<T> {
69    fn send<'a>(&self, message: impl Into<&'a T>) -> PortResult<()>
70    where
71        T: 'a,
72    {
73        OutputPort::send(self, message)
74    }
75}
76
77impl<T: Message> fmt::Display for OutputPort<T> {
78    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
79        write!(f, "{}→", self.id)
80    }
81}
82
83impl<T: Message> fmt::Debug for OutputPort<T> {
84    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
85        f.debug_struct("OutputPort").field("id", &self.id).finish()
86    }
87}