protoflow_core/
input_port.rs

1// This is free and unencumbered software released into the public domain.
2
3use crate::{
4    prelude::{fmt, Arc, Cow, MaybeLabeled, MaybeNamed, PhantomData},
5    InputPortID, Message, MessageReceiver, Port, PortID, PortResult, PortState, System, Transport,
6};
7
8#[derive(Clone)] //, Debug, Eq, Ord, PartialEq, PartialOrd)]
9pub struct InputPort<T: Message> {
10    pub(crate) id: InputPortID,
11    pub(crate) transport: Arc<dyn Transport>,
12    _phantom: PhantomData<T>,
13}
14
15impl<T: Message> InputPort<T> {
16    pub fn new<X: Transport + Default>(system: &System<X>) -> Self {
17        let runtime = system.runtime.as_ref();
18        let transport = runtime.transport.clone();
19        Self {
20            _phantom: PhantomData,
21            id: transport.open_input().unwrap(),
22            transport,
23        }
24    }
25
26    pub fn close(&mut self) -> PortResult<bool> {
27        self.transport.close(PortID::Input(self.id))
28    }
29
30    pub fn recv(&self) -> PortResult<Option<T>> {
31        match self.transport.recv(self.id)? {
32            None => Ok(None), // EOS (port closed)
33            Some(encoded_message) => {
34                if encoded_message.len() == 0 {
35                    Ok(None) // EOS (port disconnected)
36                } else {
37                    match T::decode_length_delimited(encoded_message) {
38                        Ok(message) => Ok(Some(message)),
39                        Err(err) => Err(err.into()),
40                    }
41                }
42            }
43        }
44    }
45
46    pub fn try_recv(&self) -> PortResult<Option<T>> {
47        match self.transport.try_recv(self.id)? {
48            None => Ok(None), // EOS
49            Some(encoded_message) => match T::decode(encoded_message) {
50                Ok(message) => Ok(Some(message)),
51                Err(err) => Err(err.into()),
52            },
53        }
54    }
55}
56
57impl<T: Message> MaybeNamed for InputPort<T> {
58    fn name(&self) -> Option<Cow<str>> {
59        None // TODO
60    }
61}
62
63impl<T: Message> MaybeLabeled for InputPort<T> {
64    fn label(&self) -> Option<Cow<str>> {
65        None // TODO
66    }
67}
68
69impl<T: Message> Port for InputPort<T> {
70    fn id(&self) -> PortID {
71        PortID::Input(self.id)
72    }
73
74    fn state(&self) -> PortState {
75        self.transport
76            .state(PortID::Input(self.id))
77            .unwrap_or(PortState::Closed)
78    }
79
80    fn close(&mut self) -> PortResult<bool> {
81        InputPort::close(self)
82    }
83}
84
85impl<T: Message> MessageReceiver<T> for InputPort<T> {
86    fn recv(&self) -> PortResult<Option<T>> {
87        InputPort::recv(self)
88    }
89
90    fn try_recv(&self) -> PortResult<Option<T>> {
91        InputPort::try_recv(self)
92    }
93}
94
95impl<T: Message> fmt::Display for InputPort<T> {
96    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
97        write!(f, "→{}", self.id)
98    }
99}
100
101impl<T: Message> fmt::Debug for InputPort<T> {
102    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
103        f.debug_struct("InputPort").field("id", &self.id).finish()
104    }
105}