Skip to main content

telltale_runtime/topology/
transport.rs

1//! Transport trait and implementations for topology-aware communication.
2//!
3//! This module provides abstractions for different transport mechanisms:
4//! - `InMemoryTransport`: In-process communication using channels
5//! - `TcpTransport`: Network communication over TCP
6//! - Discovery-based transports (Kubernetes, Consul)
7
8use super::{Location, Topology, TopologyMode};
9use crate::identifiers::RoleName;
10use crate::mutex_lock;
11use crate::runtime::sync::{mpsc, Mutex};
12use async_trait::async_trait;
13use cfg_if::cfg_if;
14#[cfg(target_arch = "wasm32")]
15use futures::{SinkExt, StreamExt};
16use std::collections::BTreeMap;
17use std::sync::Arc;
18use thiserror::Error;
19
20/// Errors that can occur during transport operations.
21#[derive(Debug, Error)]
22pub enum TransportError {
23    #[error("connection failed: {0}")]
24    ConnectionFailed(String),
25
26    #[error("send failed: {0}")]
27    SendFailed(String),
28
29    #[error("receive failed: {0}")]
30    ReceiveFailed(String),
31
32    #[error("timeout")]
33    Timeout,
34
35    #[error("channel closed")]
36    ChannelClosed,
37
38    #[error("unknown role: {0}")]
39    UnknownRole(RoleName),
40
41    #[error("transport not ready")]
42    NotReady,
43
44    #[error("IO error: {0}")]
45    IoError(#[from] std::io::Error),
46}
47
48/// Result type for transport operations.
49pub type TransportResult<T> = Result<T, TransportError>;
50
51/// A message that can be sent over a transport.
52pub trait TransportMessage: Send + Sync + 'static {
53    /// Serialize the message to bytes.
54    fn to_bytes(&self) -> Vec<u8>;
55
56    /// Deserialize from bytes.
57    fn from_bytes(bytes: &[u8]) -> Result<Self, String>
58    where
59        Self: Sized;
60}
61
62/// Simple byte message for basic transport.
63#[derive(Debug, Clone)]
64pub struct ByteMessage(pub Vec<u8>);
65
66impl TransportMessage for ByteMessage {
67    fn to_bytes(&self) -> Vec<u8> {
68        self.0.clone()
69    }
70
71    fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
72        Ok(ByteMessage(bytes.to_vec()))
73    }
74}
75
76/// Transport trait for sending and receiving messages between roles.
77#[async_trait]
78pub trait Transport: Send + Sync + 'static {
79    /// Send a message to a specific role.
80    async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()>;
81
82    /// Receive a message from a specific role.
83    async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>>;
84
85    /// Check if the transport is connected to a role.
86    fn is_connected(&self, role: &RoleName) -> bool;
87
88    /// Close the transport connection.
89    async fn close(&self) -> TransportResult<()>;
90}
91
92/// In-memory transport using channels.
93///
94/// This is the default transport for local testing where all roles
95/// run in the same process.
96pub struct InMemoryChannelTransport {
97    /// Role this transport belongs to.
98    role: RoleName,
99    /// Sender channels to other roles (role -> sender).
100    senders: Arc<Mutex<BTreeMap<RoleName, mpsc::Sender<Vec<u8>>>>>,
101    /// Receiver channels from other roles (role -> receiver).
102    receivers: Arc<Mutex<BTreeMap<RoleName, mpsc::Receiver<Vec<u8>>>>>,
103}
104
105impl InMemoryChannelTransport {
106    /// Create a new in-memory transport for a role.
107    pub fn new(role: RoleName) -> Self {
108        Self {
109            role,
110            senders: Arc::new(Mutex::new(BTreeMap::new())),
111            receivers: Arc::new(Mutex::new(BTreeMap::new())),
112        }
113    }
114
115    /// Connect this transport to another role's transport.
116    pub async fn connect(&self, other: &InMemoryChannelTransport) {
117        let (tx1, rx1) = mpsc::channel(32);
118        let (tx2, rx2) = mpsc::channel(32);
119
120        // Self -> Other
121        mutex_lock!(self.senders).insert(other.role.clone(), tx1);
122        mutex_lock!(other.receivers).insert(self.role.clone(), rx1);
123
124        // Other -> Self
125        mutex_lock!(other.senders).insert(self.role.clone(), tx2);
126        mutex_lock!(self.receivers).insert(other.role.clone(), rx2);
127    }
128
129    /// Get the role name.
130    pub fn role(&self) -> &RoleName {
131        &self.role
132    }
133}
134
135#[async_trait]
136impl Transport for InMemoryChannelTransport {
137    async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()> {
138        cfg_if! {
139            if #[cfg(target_arch = "wasm32")] {
140                // Clone the sender to release the lock before awaiting.
141                let sender = {
142                    let senders = mutex_lock!(self.senders);
143                    senders
144                        .get(to_role)
145                        .cloned()
146                        .ok_or_else(|| TransportError::UnknownRole(to_role.clone()))?
147                };
148
149                let mut sender = sender;
150                sender
151                    .send(message)
152                    .await
153                    .map_err(|_| TransportError::ChannelClosed)
154            } else {
155                let senders = mutex_lock!(self.senders);
156                let sender = senders
157                    .get(to_role)
158                    .ok_or_else(|| TransportError::UnknownRole(to_role.clone()))?;
159
160                sender
161                    .send(message)
162                    .await
163                    .map_err(|_| TransportError::ChannelClosed)
164            }
165        }
166    }
167
168    async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
169        cfg_if! {
170            if #[cfg(target_arch = "wasm32")] {
171                // For WASM, take the receiver out so the lock is not held across `.await`.
172                let mut receiver = {
173                    let mut receivers = mutex_lock!(self.receivers);
174                    receivers
175                        .remove(from_role)
176                        .ok_or_else(|| TransportError::UnknownRole(from_role.clone()))?
177                };
178
179                let result = receiver.next().await;
180
181                {
182                    let mut receivers = mutex_lock!(self.receivers);
183                    receivers.insert(from_role.clone(), receiver);
184                }
185
186                result.ok_or(TransportError::ChannelClosed)
187            } else {
188                let mut receivers = mutex_lock!(self.receivers);
189                let receiver = receivers
190                    .get_mut(from_role)
191                    .ok_or_else(|| TransportError::UnknownRole(from_role.clone()))?;
192                receiver.recv().await.ok_or(TransportError::ChannelClosed)
193            }
194        }
195    }
196
197    fn is_connected(&self, _role: &RoleName) -> bool {
198        // For in-memory, assume always connected after setup
199        // In production, this should check if we have a sender for this role
200        true
201    }
202
203    async fn close(&self) -> TransportResult<()> {
204        mutex_lock!(self.senders).clear();
205        mutex_lock!(self.receivers).clear();
206        Ok(())
207    }
208}
209
210/// Factory for creating transports based on topology.
211pub struct TransportFactory;
212
213impl TransportFactory {
214    /// Create a transport for a role based on the topology.
215    pub fn create(topology: &Topology, role: &RoleName) -> Box<dyn Transport> {
216        match &topology.mode {
217            Some(TopologyMode::Local) | None => {
218                Box::new(InMemoryChannelTransport::new(role.clone()))
219            }
220            Some(TopologyMode::PerRole) => {
221                // PerRole mode falls back to in-memory (TCP transport unsupported)
222                Box::new(InMemoryChannelTransport::new(role.clone()))
223            }
224            Some(TopologyMode::Kubernetes(_namespace)) => {
225                // Kubernetes mode falls back to in-memory (K8s discovery unsupported)
226                Box::new(InMemoryChannelTransport::new(role.clone()))
227            }
228            Some(TopologyMode::Consul(_datacenter)) => {
229                // Consul mode falls back to in-memory (Consul discovery unsupported)
230                Box::new(InMemoryChannelTransport::new(role.clone()))
231            }
232        }
233    }
234
235    /// Select transport type based on location.
236    pub fn transport_for_location(
237        _from_role: &RoleName,
238        to_role: &RoleName,
239        topology: &Topology,
240    ) -> Result<TransportType, super::TopologyError> {
241        match topology.get_location(to_role)? {
242            Location::Local => Ok(TransportType::InMemory),
243            Location::Colocated(_) => Ok(TransportType::SharedMemory),
244            Location::Remote(_) => Ok(TransportType::Tcp),
245        }
246    }
247}
248
249/// Types of transport available.
250#[derive(Debug, Clone, Copy, PartialEq, Eq)]
251pub enum TransportType {
252    /// In-process channels.
253    InMemory,
254    /// Shared memory (for colocated roles).
255    SharedMemory,
256    /// TCP network transport.
257    Tcp,
258    /// WebSocket transport.
259    WebSocket,
260}
261
262impl TransportType {
263    /// Check if this transport type is local (no network).
264    pub fn is_local(&self) -> bool {
265        matches!(self, TransportType::InMemory | TransportType::SharedMemory)
266    }
267}
268
269#[cfg(all(test, not(target_arch = "wasm32")))]
270mod tests {
271    use super::*;
272
273    #[tokio::test]
274    async fn test_in_memory_transport() {
275        let alice = InMemoryChannelTransport::new(RoleName::from_static("Alice"));
276        let bob = InMemoryChannelTransport::new(RoleName::from_static("Bob"));
277
278        alice.connect(&bob).await;
279
280        // Alice sends to Bob
281        alice
282            .send(&RoleName::from_static("Bob"), b"Hello Bob".to_vec())
283            .await
284            .unwrap();
285
286        // Bob receives from Alice
287        let msg = bob.recv(&RoleName::from_static("Alice")).await.unwrap();
288        assert_eq!(msg, b"Hello Bob".to_vec());
289
290        // Bob sends to Alice
291        bob.send(&RoleName::from_static("Alice"), b"Hello Alice".to_vec())
292            .await
293            .unwrap();
294
295        // Alice receives from Bob
296        let msg = alice.recv(&RoleName::from_static("Bob")).await.unwrap();
297        assert_eq!(msg, b"Hello Alice".to_vec());
298    }
299
300    #[test]
301    fn test_transport_type_for_location() {
302        let topology = Topology::builder()
303            .local_role(RoleName::from_static("Alice"))
304            .remote_role(
305                RoleName::from_static("Bob"),
306                crate::identifiers::Endpoint::new("localhost:8080").unwrap(),
307            )
308            .colocated_role(
309                RoleName::from_static("Carol"),
310                RoleName::from_static("Alice"),
311            )
312            .build();
313
314        assert_eq!(
315            TransportFactory::transport_for_location(
316                &RoleName::from_static("Alice"),
317                &RoleName::from_static("Alice"),
318                &topology
319            )
320            .unwrap(),
321            TransportType::InMemory
322        );
323        assert_eq!(
324            TransportFactory::transport_for_location(
325                &RoleName::from_static("Alice"),
326                &RoleName::from_static("Bob"),
327                &topology
328            )
329            .unwrap(),
330            TransportType::Tcp
331        );
332        assert_eq!(
333            TransportFactory::transport_for_location(
334                &RoleName::from_static("Alice"),
335                &RoleName::from_static("Carol"),
336                &topology
337            )
338            .unwrap(),
339            TransportType::SharedMemory
340        );
341    }
342
343    #[test]
344    fn test_transport_type_is_local() {
345        assert!(TransportType::InMemory.is_local());
346        assert!(TransportType::SharedMemory.is_local());
347        assert!(!TransportType::Tcp.is_local());
348        assert!(!TransportType::WebSocket.is_local());
349    }
350}