protoflow_core/
output_port.rs1use crate::{
4 prelude::{fmt, Arc, Bytes, Cow, MaybeLabeled, MaybeNamed, PhantomData},
5 Message, MessageSender, OutputPortID, Port, PortID, PortResult, PortState, System, Transport,
6};
7
8#[derive(Clone)] pub struct OutputPort<T: Message> {
10 pub(crate) id: OutputPortID,
11 pub(crate) transport: Arc<dyn Transport>,
12 _phantom: PhantomData<T>,
13}
14
15impl<T: Message> OutputPort<T> {
16 pub fn new<X: Transport + Default>(system: &System<X>) -> Self {
17 let runtime = system.runtime.as_ref();
18 let transport = runtime.transport.clone();
19 Self {
20 _phantom: PhantomData,
21 id: transport.open_output().unwrap(),
22 transport,
23 }
24 }
25
26 pub fn close(&mut self) -> PortResult<bool> {
27 self.transport.close(PortID::Output(self.id))
28 }
29
30 pub fn send<'a>(&self, message: impl Into<&'a T>) -> PortResult<()>
31 where
32 T: 'a,
33 {
34 let message: &T = message.into();
35 let bytes = Bytes::from(message.encode_length_delimited_to_vec());
36 self.transport.send(self.id, bytes)
37 }
38}
39
40impl<T: Message> MaybeNamed for OutputPort<T> {
41 fn name(&self) -> Option<Cow<str>> {
42 None }
44}
45
46impl<T: Message> MaybeLabeled for OutputPort<T> {
47 fn label(&self) -> Option<Cow<str>> {
48 None }
50}
51
52impl<T: Message> Port for OutputPort<T> {
53 fn id(&self) -> PortID {
54 PortID::Output(self.id)
55 }
56
57 fn state(&self) -> PortState {
58 self.transport
59 .state(PortID::Output(self.id))
60 .unwrap_or(PortState::Closed)
61 }
62
63 fn close(&mut self) -> PortResult<bool> {
64 OutputPort::close(self)
65 }
66}
67
68impl<T: Message> MessageSender<T> for OutputPort<T> {
69 fn send<'a>(&self, message: impl Into<&'a T>) -> PortResult<()>
70 where
71 T: 'a,
72 {
73 OutputPort::send(self, message)
74 }
75}
76
77impl<T: Message> fmt::Display for OutputPort<T> {
78 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
79 write!(f, "{}→", self.id)
80 }
81}
82
83impl<T: Message> fmt::Debug for OutputPort<T> {
84 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
85 f.debug_struct("OutputPort").field("id", &self.id).finish()
86 }
87}