protoflow_core/transports/
mpsc.rs

1// This is free and unencumbered software released into the public domain.
2
3mod event;
4use event::*;
5
6mod input;
7use input::*;
8
9mod output;
10use output::*;
11
12extern crate std;
13
14use crate::{
15    prelude::{Bytes, ToString},
16    transport::Transport,
17    InputPortID, OutputPortID, PortError, PortResult, PortState,
18};
19use parking_lot::{Mutex, RwLock};
20use sharded_slab::Slab;
21use std::sync::mpsc::sync_channel;
22
23pub(crate) const DEFAULT_CONNECTION_CAPACITY: usize = 1;
24
25#[derive(Debug, Default)]
26pub struct MpscTransport {
27    outputs: Slab<RwLock<MpscTransportOutputPortState>>,
28    inputs: Slab<RwLock<MpscTransportInputPortState>>,
29}
30
31impl MpscTransport {
32    pub fn new() -> Self {
33        Self::default()
34    }
35}
36
37impl Transport for MpscTransport {
38    fn input_state(&self, input: InputPortID) -> PortResult<PortState> {
39        match self.inputs.get(input.index()) {
40            None => Err(PortError::Invalid(input.into())),
41            Some(entry) => Ok(entry.read().state()),
42        }
43    }
44
45    fn output_state(&self, output: OutputPortID) -> PortResult<PortState> {
46        match self.outputs.get(output.index()) {
47            None => Err(PortError::Invalid(output.into())),
48            Some(entry) => Ok(entry.read().state()),
49        }
50    }
51
52    fn open_input(&self) -> PortResult<InputPortID> {
53        let input_index = self
54            .inputs
55            .insert(RwLock::new(MpscTransportInputPortState::Open))
56            .unwrap();
57
58        InputPortID::try_from(-(input_index as isize + 1))
59            .map_err(|s| PortError::Other(s.to_string()))
60    }
61
62    fn open_output(&self) -> PortResult<OutputPortID> {
63        let output_index = self
64            .outputs
65            .insert(RwLock::new(MpscTransportOutputPortState::Open))
66            .unwrap();
67
68        OutputPortID::try_from(output_index as isize + 1)
69            .map_err(|s| PortError::Other(s.to_string()))
70    }
71
72    fn close_input(&self, input: InputPortID) -> PortResult<bool> {
73        let Some(input_entry) = self.inputs.get(input.index()) else {
74            return Err(PortError::Invalid(input.into()));
75        };
76        let mut input_state = input_entry.write();
77
78        use MpscTransportInputPortState::*;
79        Ok(match *input_state {
80            Closed => false, // already closed
81            Open | Connected(_) => {
82                *input_state = MpscTransportInputPortState::Closed;
83                true
84            }
85        })
86    }
87
88    fn close_output(&self, output: OutputPortID) -> PortResult<bool> {
89        let Some(output_entry) = self.outputs.get(output.index()) else {
90            return Err(PortError::Invalid(output.into()));
91        };
92        let mut output_state = output_entry.write();
93
94        use MpscTransportOutputPortState::*;
95        Ok(match *output_state {
96            Closed => false, // already closed
97            Open => {
98                *output_state = MpscTransportOutputPortState::Closed;
99                true
100            }
101            Connected(ref sender) => {
102                let sender = sender.clone();
103                *output_state = MpscTransportOutputPortState::Closed;
104                drop(output_state);
105                sender.send(MpscTransportEvent::Disconnect).unwrap(); // blocking
106                true
107            }
108        })
109    }
110
111    fn connect(&self, source: OutputPortID, target: InputPortID) -> PortResult<bool> {
112        let Some(output_entry) = self.outputs.get(source.index()) else {
113            return Err(PortError::Invalid(source.into()));
114        };
115
116        let Some(input_entry) = self.inputs.get(target.index()) else {
117            return Err(PortError::Invalid(target.into()));
118        };
119
120        let mut output_state = output_entry.write();
121        let mut input_state = input_entry.write();
122        if !output_state.state().is_open() && !input_state.state().is_open() {
123            return Err(PortError::Other("connect".to_string())); // TODO: better errors
124        }
125
126        let (sender, receiver) = sync_channel(DEFAULT_CONNECTION_CAPACITY);
127        *output_state = MpscTransportOutputPortState::Connected(sender);
128        *input_state = MpscTransportInputPortState::Connected(Mutex::new(receiver));
129        Ok(true)
130    }
131
132    fn send(&self, output: OutputPortID, message: Bytes) -> PortResult<()> {
133        let Some(output_entry) = self.outputs.get(output.index()) else {
134            return Err(PortError::Invalid(output.into()));
135        };
136        let output_state = output_entry.read();
137
138        use MpscTransportOutputPortState::*;
139        match *output_state {
140            Closed => return Err(PortError::Closed),
141            Open => return Err(PortError::Disconnected),
142            Connected(ref sender) => {
143                let sender = sender.clone();
144                Ok(sender.send(MpscTransportEvent::Message(message)).unwrap()) // blocking (TODO: error handling)
145            }
146        }
147    }
148
149    fn recv(&self, input: InputPortID) -> PortResult<Option<Bytes>> {
150        let Some(input_entry) = self.inputs.get(input.index()) else {
151            return Err(PortError::Invalid(input.into()));
152        };
153        let input_state = input_entry.read();
154
155        use MpscTransportInputPortState::*;
156        match *input_state {
157            Closed => return Ok(None), // EOS
158            Open => return Ok(None),   // FIXME
159            Connected(ref receiver) => {
160                use MpscTransportEvent::*;
161                let receiver = receiver.lock();
162                match receiver
163                    .recv() // blocking
164                    .map_err(|_| PortError::Disconnected)?
165                {
166                    Connect => unreachable!(),
167                    Message(bytes) => Ok(Some(bytes)),
168                    Disconnect => {
169                        drop(receiver);
170                        drop(input_state);
171                        let mut input_state = input_entry.write();
172                        *input_state = Closed;
173                        Ok(None) // EOS
174                    }
175                }
176            }
177        }
178    }
179
180    fn try_recv(&self, _input: InputPortID) -> PortResult<Option<Bytes>> {
181        todo!() // TODO: implement try_recv()
182    }
183}