protoflow_core/transports/
mpsc.rs1mod event;
4use event::*;
5
6mod input;
7use input::*;
8
9mod output;
10use output::*;
11
12extern crate std;
13
14use crate::{
15 prelude::{Bytes, ToString},
16 transport::Transport,
17 InputPortID, OutputPortID, PortError, PortResult, PortState,
18};
19use parking_lot::{Mutex, RwLock};
20use sharded_slab::Slab;
21use std::sync::mpsc::sync_channel;
22
23pub(crate) const DEFAULT_CONNECTION_CAPACITY: usize = 1;
24
25#[derive(Debug, Default)]
26pub struct MpscTransport {
27 outputs: Slab<RwLock<MpscTransportOutputPortState>>,
28 inputs: Slab<RwLock<MpscTransportInputPortState>>,
29}
30
31impl MpscTransport {
32 pub fn new() -> Self {
33 Self::default()
34 }
35}
36
37impl Transport for MpscTransport {
38 fn input_state(&self, input: InputPortID) -> PortResult<PortState> {
39 match self.inputs.get(input.index()) {
40 None => Err(PortError::Invalid(input.into())),
41 Some(entry) => Ok(entry.read().state()),
42 }
43 }
44
45 fn output_state(&self, output: OutputPortID) -> PortResult<PortState> {
46 match self.outputs.get(output.index()) {
47 None => Err(PortError::Invalid(output.into())),
48 Some(entry) => Ok(entry.read().state()),
49 }
50 }
51
52 fn open_input(&self) -> PortResult<InputPortID> {
53 let input_index = self
54 .inputs
55 .insert(RwLock::new(MpscTransportInputPortState::Open))
56 .unwrap();
57
58 InputPortID::try_from(-(input_index as isize + 1))
59 .map_err(|s| PortError::Other(s.to_string()))
60 }
61
62 fn open_output(&self) -> PortResult<OutputPortID> {
63 let output_index = self
64 .outputs
65 .insert(RwLock::new(MpscTransportOutputPortState::Open))
66 .unwrap();
67
68 OutputPortID::try_from(output_index as isize + 1)
69 .map_err(|s| PortError::Other(s.to_string()))
70 }
71
72 fn close_input(&self, input: InputPortID) -> PortResult<bool> {
73 let Some(input_entry) = self.inputs.get(input.index()) else {
74 return Err(PortError::Invalid(input.into()));
75 };
76 let mut input_state = input_entry.write();
77
78 use MpscTransportInputPortState::*;
79 Ok(match *input_state {
80 Closed => false, Open | Connected(_) => {
82 *input_state = MpscTransportInputPortState::Closed;
83 true
84 }
85 })
86 }
87
88 fn close_output(&self, output: OutputPortID) -> PortResult<bool> {
89 let Some(output_entry) = self.outputs.get(output.index()) else {
90 return Err(PortError::Invalid(output.into()));
91 };
92 let mut output_state = output_entry.write();
93
94 use MpscTransportOutputPortState::*;
95 Ok(match *output_state {
96 Closed => false, Open => {
98 *output_state = MpscTransportOutputPortState::Closed;
99 true
100 }
101 Connected(ref sender) => {
102 let sender = sender.clone();
103 *output_state = MpscTransportOutputPortState::Closed;
104 drop(output_state);
105 sender.send(MpscTransportEvent::Disconnect).unwrap(); true
107 }
108 })
109 }
110
111 fn connect(&self, source: OutputPortID, target: InputPortID) -> PortResult<bool> {
112 let Some(output_entry) = self.outputs.get(source.index()) else {
113 return Err(PortError::Invalid(source.into()));
114 };
115
116 let Some(input_entry) = self.inputs.get(target.index()) else {
117 return Err(PortError::Invalid(target.into()));
118 };
119
120 let mut output_state = output_entry.write();
121 let mut input_state = input_entry.write();
122 if !output_state.state().is_open() && !input_state.state().is_open() {
123 return Err(PortError::Other("connect".to_string())); }
125
126 let (sender, receiver) = sync_channel(DEFAULT_CONNECTION_CAPACITY);
127 *output_state = MpscTransportOutputPortState::Connected(sender);
128 *input_state = MpscTransportInputPortState::Connected(Mutex::new(receiver));
129 Ok(true)
130 }
131
132 fn send(&self, output: OutputPortID, message: Bytes) -> PortResult<()> {
133 let Some(output_entry) = self.outputs.get(output.index()) else {
134 return Err(PortError::Invalid(output.into()));
135 };
136 let output_state = output_entry.read();
137
138 use MpscTransportOutputPortState::*;
139 match *output_state {
140 Closed => return Err(PortError::Closed),
141 Open => return Err(PortError::Disconnected),
142 Connected(ref sender) => {
143 let sender = sender.clone();
144 Ok(sender.send(MpscTransportEvent::Message(message)).unwrap()) }
146 }
147 }
148
149 fn recv(&self, input: InputPortID) -> PortResult<Option<Bytes>> {
150 let Some(input_entry) = self.inputs.get(input.index()) else {
151 return Err(PortError::Invalid(input.into()));
152 };
153 let input_state = input_entry.read();
154
155 use MpscTransportInputPortState::*;
156 match *input_state {
157 Closed => return Ok(None), Open => return Ok(None), Connected(ref receiver) => {
160 use MpscTransportEvent::*;
161 let receiver = receiver.lock();
162 match receiver
163 .recv() .map_err(|_| PortError::Disconnected)?
165 {
166 Connect => unreachable!(),
167 Message(bytes) => Ok(Some(bytes)),
168 Disconnect => {
169 drop(receiver);
170 drop(input_state);
171 let mut input_state = input_entry.write();
172 *input_state = Closed;
173 Ok(None) }
175 }
176 }
177 }
178 }
179
180 fn try_recv(&self, _input: InputPortID) -> PortResult<Option<Bytes>> {
181 todo!() }
183}