telltale_runtime/topology/
transport.rs1use super::{Location, Topology, TopologyMode};
9use crate::identifiers::RoleName;
10use crate::mutex_lock;
11use crate::runtime::sync::{mpsc, Mutex};
12use async_trait::async_trait;
13use cfg_if::cfg_if;
14#[cfg(target_arch = "wasm32")]
15use futures::{SinkExt, StreamExt};
16use std::collections::BTreeMap;
17use std::sync::Arc;
18use thiserror::Error;
19
20#[derive(Debug, Error)]
22pub enum TransportError {
23 #[error("connection failed: {0}")]
24 ConnectionFailed(String),
25
26 #[error("send failed: {0}")]
27 SendFailed(String),
28
29 #[error("receive failed: {0}")]
30 ReceiveFailed(String),
31
32 #[error("timeout")]
33 Timeout,
34
35 #[error("channel closed")]
36 ChannelClosed,
37
38 #[error("unknown role: {0}")]
39 UnknownRole(RoleName),
40
41 #[error("transport not ready")]
42 NotReady,
43
44 #[error("IO error: {0}")]
45 IoError(#[from] std::io::Error),
46}
47
48pub type TransportResult<T> = Result<T, TransportError>;
50
51pub trait TransportMessage: Send + Sync + 'static {
53 fn to_bytes(&self) -> Vec<u8>;
55
56 fn from_bytes(bytes: &[u8]) -> Result<Self, String>
58 where
59 Self: Sized;
60}
61
62#[derive(Debug, Clone)]
64pub struct ByteMessage(pub Vec<u8>);
65
66impl TransportMessage for ByteMessage {
67 fn to_bytes(&self) -> Vec<u8> {
68 self.0.clone()
69 }
70
71 fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
72 Ok(ByteMessage(bytes.to_vec()))
73 }
74}
75
76#[async_trait]
78pub trait Transport: Send + Sync + 'static {
79 async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()>;
81
82 async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>>;
84
85 fn is_connected(&self, role: &RoleName) -> bool;
87
88 async fn close(&self) -> TransportResult<()>;
90}
91
92pub struct InMemoryChannelTransport {
97 role: RoleName,
99 senders: Arc<Mutex<BTreeMap<RoleName, mpsc::Sender<Vec<u8>>>>>,
101 receivers: Arc<Mutex<BTreeMap<RoleName, mpsc::Receiver<Vec<u8>>>>>,
103}
104
105impl InMemoryChannelTransport {
106 pub fn new(role: RoleName) -> Self {
108 Self {
109 role,
110 senders: Arc::new(Mutex::new(BTreeMap::new())),
111 receivers: Arc::new(Mutex::new(BTreeMap::new())),
112 }
113 }
114
115 pub async fn connect(&self, other: &InMemoryChannelTransport) {
117 let (tx1, rx1) = mpsc::channel(32);
118 let (tx2, rx2) = mpsc::channel(32);
119
120 mutex_lock!(self.senders).insert(other.role.clone(), tx1);
122 mutex_lock!(other.receivers).insert(self.role.clone(), rx1);
123
124 mutex_lock!(other.senders).insert(self.role.clone(), tx2);
126 mutex_lock!(self.receivers).insert(other.role.clone(), rx2);
127 }
128
129 pub fn role(&self) -> &RoleName {
131 &self.role
132 }
133}
134
135#[async_trait]
136impl Transport for InMemoryChannelTransport {
137 async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()> {
138 cfg_if! {
139 if #[cfg(target_arch = "wasm32")] {
140 let sender = {
142 let senders = mutex_lock!(self.senders);
143 senders
144 .get(to_role)
145 .cloned()
146 .ok_or_else(|| TransportError::UnknownRole(to_role.clone()))?
147 };
148
149 let mut sender = sender;
150 sender
151 .send(message)
152 .await
153 .map_err(|_| TransportError::ChannelClosed)
154 } else {
155 let senders = mutex_lock!(self.senders);
156 let sender = senders
157 .get(to_role)
158 .ok_or_else(|| TransportError::UnknownRole(to_role.clone()))?;
159
160 sender
161 .send(message)
162 .await
163 .map_err(|_| TransportError::ChannelClosed)
164 }
165 }
166 }
167
168 async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
169 cfg_if! {
170 if #[cfg(target_arch = "wasm32")] {
171 let mut receiver = {
173 let mut receivers = mutex_lock!(self.receivers);
174 receivers
175 .remove(from_role)
176 .ok_or_else(|| TransportError::UnknownRole(from_role.clone()))?
177 };
178
179 let result = receiver.next().await;
180
181 {
182 let mut receivers = mutex_lock!(self.receivers);
183 receivers.insert(from_role.clone(), receiver);
184 }
185
186 result.ok_or(TransportError::ChannelClosed)
187 } else {
188 let mut receivers = mutex_lock!(self.receivers);
189 let receiver = receivers
190 .get_mut(from_role)
191 .ok_or_else(|| TransportError::UnknownRole(from_role.clone()))?;
192 receiver.recv().await.ok_or(TransportError::ChannelClosed)
193 }
194 }
195 }
196
197 fn is_connected(&self, _role: &RoleName) -> bool {
198 true
201 }
202
203 async fn close(&self) -> TransportResult<()> {
204 mutex_lock!(self.senders).clear();
205 mutex_lock!(self.receivers).clear();
206 Ok(())
207 }
208}
209
210pub struct TransportFactory;
212
213impl TransportFactory {
214 pub fn create(topology: &Topology, role: &RoleName) -> Box<dyn Transport> {
216 match &topology.mode {
217 Some(TopologyMode::Local) | None => {
218 Box::new(InMemoryChannelTransport::new(role.clone()))
219 }
220 Some(TopologyMode::PerRole) => {
221 Box::new(InMemoryChannelTransport::new(role.clone()))
223 }
224 Some(TopologyMode::Kubernetes(_namespace)) => {
225 Box::new(InMemoryChannelTransport::new(role.clone()))
227 }
228 Some(TopologyMode::Consul(_datacenter)) => {
229 Box::new(InMemoryChannelTransport::new(role.clone()))
231 }
232 }
233 }
234
235 pub fn transport_for_location(
237 _from_role: &RoleName,
238 to_role: &RoleName,
239 topology: &Topology,
240 ) -> Result<TransportType, super::TopologyError> {
241 match topology.get_location(to_role)? {
242 Location::Local => Ok(TransportType::InMemory),
243 Location::Colocated(_) => Ok(TransportType::SharedMemory),
244 Location::Remote(_) => Ok(TransportType::Tcp),
245 }
246 }
247}
248
249#[derive(Debug, Clone, Copy, PartialEq, Eq)]
251pub enum TransportType {
252 InMemory,
254 SharedMemory,
256 Tcp,
258 WebSocket,
260}
261
262impl TransportType {
263 pub fn is_local(&self) -> bool {
265 matches!(self, TransportType::InMemory | TransportType::SharedMemory)
266 }
267}
268
269#[cfg(all(test, not(target_arch = "wasm32")))]
270mod tests {
271 use super::*;
272
273 #[tokio::test]
274 async fn test_in_memory_transport() {
275 let alice = InMemoryChannelTransport::new(RoleName::from_static("Alice"));
276 let bob = InMemoryChannelTransport::new(RoleName::from_static("Bob"));
277
278 alice.connect(&bob).await;
279
280 alice
282 .send(&RoleName::from_static("Bob"), b"Hello Bob".to_vec())
283 .await
284 .unwrap();
285
286 let msg = bob.recv(&RoleName::from_static("Alice")).await.unwrap();
288 assert_eq!(msg, b"Hello Bob".to_vec());
289
290 bob.send(&RoleName::from_static("Alice"), b"Hello Alice".to_vec())
292 .await
293 .unwrap();
294
295 let msg = alice.recv(&RoleName::from_static("Bob")).await.unwrap();
297 assert_eq!(msg, b"Hello Alice".to_vec());
298 }
299
300 #[test]
301 fn test_transport_type_for_location() {
302 let topology = Topology::builder()
303 .local_role(RoleName::from_static("Alice"))
304 .remote_role(
305 RoleName::from_static("Bob"),
306 crate::identifiers::Endpoint::new("localhost:8080").unwrap(),
307 )
308 .colocated_role(
309 RoleName::from_static("Carol"),
310 RoleName::from_static("Alice"),
311 )
312 .build();
313
314 assert_eq!(
315 TransportFactory::transport_for_location(
316 &RoleName::from_static("Alice"),
317 &RoleName::from_static("Alice"),
318 &topology
319 )
320 .unwrap(),
321 TransportType::InMemory
322 );
323 assert_eq!(
324 TransportFactory::transport_for_location(
325 &RoleName::from_static("Alice"),
326 &RoleName::from_static("Bob"),
327 &topology
328 )
329 .unwrap(),
330 TransportType::Tcp
331 );
332 assert_eq!(
333 TransportFactory::transport_for_location(
334 &RoleName::from_static("Alice"),
335 &RoleName::from_static("Carol"),
336 &topology
337 )
338 .unwrap(),
339 TransportType::SharedMemory
340 );
341 }
342
343 #[test]
344 fn test_transport_type_is_local() {
345 assert!(TransportType::InMemory.is_local());
346 assert!(TransportType::SharedMemory.is_local());
347 assert!(!TransportType::Tcp.is_local());
348 assert!(!TransportType::WebSocket.is_local());
349 }
350}