protoflow_core/
input_port.rs1use crate::{
4 prelude::{fmt, Arc, Cow, MaybeLabeled, MaybeNamed, PhantomData},
5 InputPortID, Message, MessageReceiver, Port, PortID, PortResult, PortState, System, Transport,
6};
7
8#[derive(Clone)] pub struct InputPort<T: Message> {
10 pub(crate) id: InputPortID,
11 pub(crate) transport: Arc<dyn Transport>,
12 _phantom: PhantomData<T>,
13}
14
15impl<T: Message> InputPort<T> {
16 pub fn new<X: Transport + Default>(system: &System<X>) -> Self {
17 let runtime = system.runtime.as_ref();
18 let transport = runtime.transport.clone();
19 Self {
20 _phantom: PhantomData,
21 id: transport.open_input().unwrap(),
22 transport,
23 }
24 }
25
26 pub fn close(&mut self) -> PortResult<bool> {
27 self.transport.close(PortID::Input(self.id))
28 }
29
30 pub fn recv(&self) -> PortResult<Option<T>> {
31 match self.transport.recv(self.id)? {
32 None => Ok(None), Some(encoded_message) => {
34 if encoded_message.len() == 0 {
35 Ok(None) } else {
37 match T::decode_length_delimited(encoded_message) {
38 Ok(message) => Ok(Some(message)),
39 Err(err) => Err(err.into()),
40 }
41 }
42 }
43 }
44 }
45
46 pub fn try_recv(&self) -> PortResult<Option<T>> {
47 match self.transport.try_recv(self.id)? {
48 None => Ok(None), Some(encoded_message) => match T::decode(encoded_message) {
50 Ok(message) => Ok(Some(message)),
51 Err(err) => Err(err.into()),
52 },
53 }
54 }
55}
56
57impl<T: Message> MaybeNamed for InputPort<T> {
58 fn name(&self) -> Option<Cow<str>> {
59 None }
61}
62
63impl<T: Message> MaybeLabeled for InputPort<T> {
64 fn label(&self) -> Option<Cow<str>> {
65 None }
67}
68
69impl<T: Message> Port for InputPort<T> {
70 fn id(&self) -> PortID {
71 PortID::Input(self.id)
72 }
73
74 fn state(&self) -> PortState {
75 self.transport
76 .state(PortID::Input(self.id))
77 .unwrap_or(PortState::Closed)
78 }
79
80 fn close(&mut self) -> PortResult<bool> {
81 InputPort::close(self)
82 }
83}
84
85impl<T: Message> MessageReceiver<T> for InputPort<T> {
86 fn recv(&self) -> PortResult<Option<T>> {
87 InputPort::recv(self)
88 }
89
90 fn try_recv(&self) -> PortResult<Option<T>> {
91 InputPort::try_recv(self)
92 }
93}
94
95impl<T: Message> fmt::Display for InputPort<T> {
96 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
97 write!(f, "→{}", self.id)
98 }
99}
100
101impl<T: Message> fmt::Debug for InputPort<T> {
102 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
103 f.debug_struct("InputPort").field("id", &self.id).finish()
104 }
105}