Skip to main content

telltale_runtime/topology/
transport.rs

1//! Transport trait and implementations for topology-aware communication.
2//!
3//! This module provides abstractions for different transport mechanisms:
4//! - `InMemoryTransport`: In-process communication using channels
5//! - `TcpTransport`: Network communication over TCP
6
7use super::{
8    validate_transport_contract_profile, DocumentedTransportContract, Location, Topology,
9    TransportContractProfile, TransportContractTier, TransportOperationalContract,
10    TransportSemanticContract, TransportStartupMode,
11};
12use crate::identifiers::RoleName;
13use crate::mutex_lock;
14#[cfg(not(target_arch = "wasm32"))]
15use crate::runtime::spawn::spawn;
16use crate::runtime::sync::{mpsc, Mutex};
17use async_trait::async_trait;
18use cfg_if::cfg_if;
19#[cfg(target_arch = "wasm32")]
20use futures::{SinkExt, StreamExt};
21use std::collections::BTreeMap;
22use std::sync::Arc;
23#[cfg(not(target_arch = "wasm32"))]
24use std::sync::{Mutex as StdMutex, OnceLock};
25use thiserror::Error;
26
27#[cfg(not(target_arch = "wasm32"))]
28use tokio::io::{AsyncReadExt, AsyncWriteExt};
29#[cfg(not(target_arch = "wasm32"))]
30use tokio::net::{TcpListener, TcpStream};
31#[cfg(not(target_arch = "wasm32"))]
32use tokio::time::{sleep, Duration};
33
34/// Errors that can occur during transport operations.
35#[derive(Debug, Error)]
36pub enum TransportError {
37    #[error("connection failed: {0}")]
38    ConnectionFailed(String),
39
40    #[error("send failed: {0}")]
41    SendFailed(String),
42
43    #[error("receive failed: {0}")]
44    ReceiveFailed(String),
45
46    #[error("timeout")]
47    Timeout,
48
49    #[error("channel closed")]
50    ChannelClosed,
51
52    #[error("unknown role: {0}")]
53    UnknownRole(RoleName),
54
55    #[error("transport not ready")]
56    NotReady,
57
58    #[error("IO error: {0}")]
59    IoError(#[from] std::io::Error),
60}
61
62/// Result type for transport operations.
63pub type TransportResult<T> = Result<T, TransportError>;
64
65/// A message that can be sent over a transport.
66pub trait TransportMessage: Send + Sync + 'static {
67    /// Serialize the message to bytes.
68    fn to_bytes(&self) -> Vec<u8>;
69
70    /// Deserialize from bytes.
71    fn from_bytes(bytes: &[u8]) -> Result<Self, String>
72    where
73        Self: Sized;
74}
75
76/// Simple byte message for basic transport.
77#[derive(Debug, Clone)]
78pub struct ByteMessage(pub Vec<u8>);
79
80impl TransportMessage for ByteMessage {
81    fn to_bytes(&self) -> Vec<u8> {
82        self.0.clone()
83    }
84
85    fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
86        Ok(ByteMessage(bytes.to_vec()))
87    }
88}
89
90/// Transport trait for sending and receiving messages between roles.
91#[async_trait]
92pub trait Transport: Send + Sync + 'static {
93    /// Send a message to a specific role.
94    async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()>;
95
96    /// Receive a message from a specific role.
97    async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>>;
98
99    /// Check if the transport is connected to a role.
100    fn is_connected(&self, role: &RoleName) -> bool;
101
102    /// Close the transport connection.
103    async fn close(&self) -> TransportResult<()>;
104}
105
106/// In-memory transport using channels.
107///
108/// This is the default transport for local testing where all roles
109/// run in the same process.
110pub struct InMemoryChannelTransport {
111    /// Role this transport belongs to.
112    role: RoleName,
113    /// Sender channels to other roles (role -> sender).
114    senders: Arc<Mutex<BTreeMap<RoleName, mpsc::Sender<Vec<u8>>>>>,
115    /// Receiver channels from other roles (role -> receiver).
116    receivers: Arc<Mutex<BTreeMap<RoleName, mpsc::Receiver<Vec<u8>>>>>,
117}
118
119impl InMemoryChannelTransport {
120    /// Create a new in-memory transport for a role.
121    pub fn new(role: RoleName) -> Self {
122        Self {
123            role,
124            senders: Arc::new(Mutex::new(BTreeMap::new())),
125            receivers: Arc::new(Mutex::new(BTreeMap::new())),
126        }
127    }
128
129    /// Connect this transport to another role's transport.
130    pub async fn connect(&self, other: &InMemoryChannelTransport) {
131        let (tx1, rx1) = mpsc::channel(32);
132        let (tx2, rx2) = mpsc::channel(32);
133
134        // Self -> Other
135        mutex_lock!(self.senders).insert(other.role.clone(), tx1);
136        mutex_lock!(other.receivers).insert(self.role.clone(), rx1);
137
138        // Other -> Self
139        mutex_lock!(other.senders).insert(self.role.clone(), tx2);
140        mutex_lock!(self.receivers).insert(other.role.clone(), rx2);
141    }
142
143    /// Get the role name.
144    pub fn role(&self) -> &RoleName {
145        &self.role
146    }
147}
148
149impl DocumentedTransportContract for InMemoryChannelTransport {
150    fn contract_profile() -> TransportContractProfile {
151        TransportContractProfile {
152            transport_name: "InMemoryChannelTransport",
153            tier: TransportContractTier::FirstPartyRuntime,
154            semantics: TransportSemanticContract {
155                role_addressed_routing: true,
156                per_peer_fifo_delivery: true,
157                fail_closed_unknown_role: true,
158                no_message_synthesis: true,
159                explicit_readiness_errors: false,
160                deterministic_for_regression: true,
161            },
162            operational: TransportOperationalContract {
163                transport_type: TransportType::InMemory,
164                startup_mode: TransportStartupMode::ReadyOnCreate,
165                environment_resolved: false,
166            },
167            notes: vec![
168                "In-process channel transport for first-party local execution.",
169                "Deterministic enough for strict regression suites.",
170            ],
171        }
172    }
173}
174
175#[async_trait]
176impl Transport for InMemoryChannelTransport {
177    async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()> {
178        cfg_if! {
179            if #[cfg(target_arch = "wasm32")] {
180                // Clone the sender to release the lock before awaiting.
181                let sender = {
182                    let senders = mutex_lock!(self.senders);
183                    senders
184                        .get(to_role)
185                        .cloned()
186                        .ok_or_else(|| TransportError::UnknownRole(to_role.clone()))?
187                };
188
189                let mut sender = sender;
190                sender
191                    .send(message)
192                    .await
193                    .map_err(|_| TransportError::ChannelClosed)
194            } else {
195                let senders = mutex_lock!(self.senders);
196                let sender = senders
197                    .get(to_role)
198                    .ok_or_else(|| TransportError::UnknownRole(to_role.clone()))?;
199
200                sender
201                    .send(message)
202                    .await
203                    .map_err(|_| TransportError::ChannelClosed)
204            }
205        }
206    }
207
208    async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
209        cfg_if! {
210            if #[cfg(target_arch = "wasm32")] {
211                // For WASM, take the receiver out so the lock is not held across `.await`.
212                let mut receiver = {
213                    let mut receivers = mutex_lock!(self.receivers);
214                    receivers
215                        .remove(from_role)
216                        .ok_or_else(|| TransportError::UnknownRole(from_role.clone()))?
217                };
218
219                let result = receiver.next().await;
220
221                {
222                    let mut receivers = mutex_lock!(self.receivers);
223                    receivers.insert(from_role.clone(), receiver);
224                }
225
226                result.ok_or(TransportError::ChannelClosed)
227            } else {
228                let mut receivers = mutex_lock!(self.receivers);
229                let receiver = receivers
230                    .get_mut(from_role)
231                    .ok_or_else(|| TransportError::UnknownRole(from_role.clone()))?;
232                receiver.recv().await.ok_or(TransportError::ChannelClosed)
233            }
234        }
235    }
236
237    fn is_connected(&self, _role: &RoleName) -> bool {
238        // For in-memory, assume always connected after setup
239        // In production, this should check if we have a sender for this role
240        true
241    }
242
243    async fn close(&self) -> TransportResult<()> {
244        mutex_lock!(self.senders).clear();
245        mutex_lock!(self.receivers).clear();
246        Ok(())
247    }
248}
249
250#[cfg(not(target_arch = "wasm32"))]
251enum TcpListenerState {
252    NotStarted,
253    Started,
254    Failed(String),
255}
256
257#[cfg(not(target_arch = "wasm32"))]
258struct TcpRoleState {
259    role: RoleName,
260    self_endpoint: Option<crate::identifiers::Endpoint>,
261    inbound_senders: BTreeMap<RoleName, mpsc::Sender<Vec<u8>>>,
262    inbound_receivers: Arc<Mutex<BTreeMap<RoleName, mpsc::Receiver<Vec<u8>>>>>,
263    listener_state: Arc<Mutex<TcpListenerState>>,
264}
265
266#[cfg(not(target_arch = "wasm32"))]
267impl TcpRoleState {
268    fn new(
269        role: RoleName,
270        self_endpoint: Option<crate::identifiers::Endpoint>,
271        peer_roles: impl IntoIterator<Item = RoleName>,
272    ) -> Self {
273        let mut inbound_senders = BTreeMap::new();
274        let mut inbound_receivers = BTreeMap::new();
275        for peer in peer_roles {
276            let (tx, rx) = mpsc::channel(32);
277            inbound_senders.insert(peer.clone(), tx);
278            inbound_receivers.insert(peer, rx);
279        }
280        Self {
281            role,
282            self_endpoint,
283            inbound_senders,
284            inbound_receivers: Arc::new(Mutex::new(inbound_receivers)),
285            listener_state: Arc::new(Mutex::new(TcpListenerState::NotStarted)),
286        }
287    }
288
289    async fn ensure_started(self: &Arc<Self>) -> TransportResult<()> {
290        let mut state = mutex_lock!(self.listener_state);
291        match &*state {
292            TcpListenerState::Started => return Ok(()),
293            TcpListenerState::Failed(message) => {
294                return Err(TransportError::ConnectionFailed(message.clone()));
295            }
296            TcpListenerState::NotStarted => {}
297        }
298
299        let Some(endpoint) = self.self_endpoint.clone() else {
300            *state = TcpListenerState::Started;
301            return Ok(());
302        };
303
304        let listener = TcpListener::bind(endpoint.as_str()).await.map_err(|err| {
305            let message = format!(
306                "failed to bind {} for role {}: {}",
307                endpoint, self.role, err
308            );
309            *state = TcpListenerState::Failed(message.clone());
310            TransportError::ConnectionFailed(message)
311        })?;
312        let role_state = Arc::clone(self);
313        spawn(async move {
314            role_state.accept_loop(listener).await;
315        });
316        *state = TcpListenerState::Started;
317        Ok(())
318    }
319
320    async fn accept_loop(self: Arc<Self>, listener: TcpListener) {
321        loop {
322            let Ok((socket, _)) = listener.accept().await else {
323                break;
324            };
325            let role_state = Arc::clone(&self);
326            spawn(async move {
327                let _ = role_state.handle_socket(socket).await;
328            });
329        }
330    }
331
332    async fn handle_socket(&self, mut socket: TcpStream) -> TransportResult<()> {
333        let role_len = socket.read_u32().await? as usize;
334        let mut role_buf = vec![0_u8; role_len];
335        socket.read_exact(&mut role_buf).await?;
336        let from_role = String::from_utf8(role_buf).map_err(|err| {
337            TransportError::ReceiveFailed(format!("invalid sender header: {err}"))
338        })?;
339        let payload_len = socket.read_u32().await? as usize;
340        let mut payload = vec![0_u8; payload_len];
341        socket.read_exact(&mut payload).await?;
342        let sender_role = RoleName::new(from_role.clone()).map_err(|err| {
343            TransportError::ReceiveFailed(format!("invalid sender role `{from_role}`: {err}"))
344        })?;
345        let sender = self
346            .inbound_senders
347            .get(&sender_role)
348            .cloned()
349            .ok_or_else(|| {
350                TransportError::ReceiveFailed(format!(
351                    "sender role `{sender_role}` is not configured for {}",
352                    self.role
353                ))
354            })?;
355        sender
356            .send(payload)
357            .await
358            .map_err(|_| TransportError::ChannelClosed)
359    }
360
361    async fn recv_from(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
362        let mut receivers = mutex_lock!(self.inbound_receivers);
363        let receiver = receivers
364            .get_mut(from_role)
365            .ok_or_else(|| TransportError::UnknownRole(from_role.clone()))?;
366        receiver.recv().await.ok_or(TransportError::ChannelClosed)
367    }
368}
369
370#[cfg(not(target_arch = "wasm32"))]
371type SharedTcpRegistry = BTreeMap<String, Arc<TcpRoleState>>;
372
373#[cfg(not(target_arch = "wasm32"))]
374fn shared_tcp_registry() -> &'static StdMutex<SharedTcpRegistry> {
375    static REGISTRY: OnceLock<StdMutex<SharedTcpRegistry>> = OnceLock::new();
376    REGISTRY.get_or_init(|| StdMutex::new(BTreeMap::new()))
377}
378
379#[cfg(not(target_arch = "wasm32"))]
380fn tcp_role_registry_key(topology_signature: &str, role: &RoleName) -> String {
381    format!("{topology_signature}|role:{role}")
382}
383
384#[cfg(not(target_arch = "wasm32"))]
385fn shared_tcp_role_state(
386    topology: &Topology,
387    topology_signature: &str,
388    role: &RoleName,
389) -> TransportResult<Arc<TcpRoleState>> {
390    let key = tcp_role_registry_key(topology_signature, role);
391    let mut registry = shared_tcp_registry()
392        .lock()
393        .unwrap_or_else(|poisoned| poisoned.into_inner());
394    if let Some(existing) = registry.get(&key) {
395        return Ok(Arc::clone(existing));
396    }
397
398    let self_endpoint = match topology.get_location(role) {
399        Ok(Location::Remote(endpoint)) => Some(endpoint),
400        Ok(Location::Local | Location::Colocated(_)) => None,
401        Err(_) => return Err(TransportError::UnknownRole(role.clone())),
402    };
403    let peer_roles = topology
404        .locations
405        .keys()
406        .filter(|peer| *peer != role)
407        .cloned();
408    let state = Arc::new(TcpRoleState::new(role.clone(), self_endpoint, peer_roles));
409    registry.insert(key, Arc::clone(&state));
410    Ok(state)
411}
412
413#[cfg(not(target_arch = "wasm32"))]
414async fn connect_with_retry(endpoint: &crate::identifiers::Endpoint) -> TransportResult<TcpStream> {
415    let mut attempts = 0_u8;
416    loop {
417        match TcpStream::connect(endpoint.as_str()).await {
418            Ok(stream) => return Ok(stream),
419            Err(err) if attempts < 10 => {
420                attempts = attempts.saturating_add(1);
421                if err.kind() != std::io::ErrorKind::ConnectionRefused {
422                    return Err(TransportError::ConnectionFailed(err.to_string()));
423                }
424                sleep(Duration::from_millis(10)).await;
425            }
426            Err(err) => return Err(TransportError::ConnectionFailed(err.to_string())),
427        }
428    }
429}
430
431#[cfg(not(target_arch = "wasm32"))]
432struct TcpPeerTransport {
433    state: Arc<TcpRoleState>,
434    peer_role: RoleName,
435    peer_endpoint: Option<crate::identifiers::Endpoint>,
436}
437
438#[cfg(not(target_arch = "wasm32"))]
439impl DocumentedTransportContract for TcpPeerTransport {
440    fn contract_profile() -> TransportContractProfile {
441        TransportContractProfile {
442            transport_name: "TcpPeerTransport",
443            tier: TransportContractTier::FirstPartyRuntime,
444            semantics: TransportSemanticContract {
445                role_addressed_routing: true,
446                per_peer_fifo_delivery: true,
447                fail_closed_unknown_role: true,
448                no_message_synthesis: true,
449                explicit_readiness_errors: true,
450                deterministic_for_regression: false,
451            },
452            operational: TransportOperationalContract {
453                transport_type: TransportType::Tcp,
454                startup_mode: TransportStartupMode::BackgroundWarmup,
455                environment_resolved: false,
456            },
457            notes: vec![
458                "Single-peer runtime TCP transport used for loopback remote topology execution.",
459            ],
460        }
461    }
462}
463
464#[cfg(not(target_arch = "wasm32"))]
465#[async_trait]
466impl Transport for TcpPeerTransport {
467    async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()> {
468        if to_role != &self.peer_role {
469            return Err(TransportError::UnknownRole(to_role.clone()));
470        }
471        let endpoint = self.peer_endpoint.clone().ok_or_else(|| {
472            TransportError::ConnectionFailed(format!(
473                "role {} has no remote endpoint configured for peer {}",
474                self.state.role, self.peer_role
475            ))
476        })?;
477        let mut stream = connect_with_retry(&endpoint).await?;
478        let role_bytes = self.state.role.to_string().into_bytes();
479        stream.write_u32(role_bytes.len() as u32).await?;
480        stream.write_all(&role_bytes).await?;
481        stream.write_u32(message.len() as u32).await?;
482        stream.write_all(&message).await?;
483        stream.shutdown().await?;
484        Ok(())
485    }
486
487    async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
488        if from_role != &self.peer_role {
489            return Err(TransportError::UnknownRole(from_role.clone()));
490        }
491        self.state.recv_from(from_role).await
492    }
493
494    fn is_connected(&self, role: &RoleName) -> bool {
495        role == &self.peer_role
496    }
497
498    async fn close(&self) -> TransportResult<()> {
499        Ok(())
500    }
501}
502
503#[cfg(not(target_arch = "wasm32"))]
504struct TcpRoleTransport {
505    state: Arc<TcpRoleState>,
506    peer_endpoints: BTreeMap<RoleName, Option<crate::identifiers::Endpoint>>,
507}
508
509#[cfg(not(target_arch = "wasm32"))]
510impl DocumentedTransportContract for TcpRoleTransport {
511    fn contract_profile() -> TransportContractProfile {
512        TransportContractProfile {
513            transport_name: "TcpRoleTransport",
514            tier: TransportContractTier::FirstPartyRuntime,
515            semantics: TransportSemanticContract {
516                role_addressed_routing: true,
517                per_peer_fifo_delivery: true,
518                fail_closed_unknown_role: true,
519                no_message_synthesis: true,
520                explicit_readiness_errors: true,
521                deterministic_for_regression: false,
522            },
523            operational: TransportOperationalContract {
524                transport_type: TransportType::Tcp,
525                startup_mode: TransportStartupMode::BackgroundWarmup,
526                environment_resolved: false,
527            },
528            notes: vec![
529                "Role-addressed runtime TCP transport used by the first-party topology helper.",
530            ],
531        }
532    }
533}
534
535#[cfg(not(target_arch = "wasm32"))]
536#[async_trait]
537impl Transport for TcpRoleTransport {
538    async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()> {
539        self.state.ensure_started().await?;
540        let endpoint = self
541            .peer_endpoints
542            .get(to_role)
543            .cloned()
544            .flatten()
545            .ok_or_else(|| {
546                TransportError::ConnectionFailed(format!(
547                    "role {} has no remote endpoint configured for peer {}",
548                    self.state.role, to_role
549                ))
550            })?;
551        let mut stream = connect_with_retry(&endpoint).await?;
552        let role_bytes = self.state.role.to_string().into_bytes();
553        stream.write_u32(role_bytes.len() as u32).await?;
554        stream.write_all(&role_bytes).await?;
555        stream.write_u32(message.len() as u32).await?;
556        stream.write_all(&message).await?;
557        stream.shutdown().await?;
558        Ok(())
559    }
560
561    async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
562        self.state.ensure_started().await?;
563        self.state.recv_from(from_role).await
564    }
565
566    fn is_connected(&self, role: &RoleName) -> bool {
567        self.peer_endpoints.contains_key(role)
568    }
569
570    async fn close(&self) -> TransportResult<()> {
571        Ok(())
572    }
573}
574
575#[cfg(not(target_arch = "wasm32"))]
576pub(crate) async fn create_peer_transport(
577    topology: &Topology,
578    topology_signature: &str,
579    role: &RoleName,
580    peer: &RoleName,
581) -> TransportResult<Box<dyn Transport>> {
582    topology
583        .region_for_role(role)
584        .map_err(TransportError::ConnectionFailed)?;
585    topology
586        .region_for_role(peer)
587        .map_err(TransportError::ConnectionFailed)?;
588    let state = shared_tcp_role_state(topology, topology_signature, role)?;
589    state.ensure_started().await?;
590    let peer_endpoint = match topology.get_location(peer) {
591        Ok(Location::Remote(endpoint)) => Some(endpoint),
592        Ok(Location::Local | Location::Colocated(_)) => None,
593        Err(_) => return Err(TransportError::UnknownRole(peer.clone())),
594    };
595    Ok(Box::new(TcpPeerTransport {
596        state,
597        peer_role: peer.clone(),
598        peer_endpoint,
599    }))
600}
601
602/// Factory for creating transports based on topology.
603pub struct TransportFactory;
604
605impl TransportFactory {
606    fn validated_first_party_profile(
607        profile: TransportContractProfile,
608    ) -> TransportResult<TransportContractProfile> {
609        validate_transport_contract_profile(&profile)
610            .map_err(|err| TransportError::ConnectionFailed(err.to_string()))?;
611        Ok(profile)
612    }
613
614    /// Return the documented first-party transport contract selected for a role/topology pair.
615    pub fn contract_profile_for_topology(
616        topology: &Topology,
617        role: &RoleName,
618    ) -> TransportResult<TransportContractProfile> {
619        let has_remote_participants = topology
620            .locations
621            .values()
622            .any(|location| matches!(location, Location::Remote(_)));
623        if has_remote_participants {
624            #[cfg(target_arch = "wasm32")]
625            {
626                let _ = (topology, role);
627                Err(TransportError::NotReady)
628            }
629            #[cfg(not(target_arch = "wasm32"))]
630            {
631                topology
632                    .region_for_role(role)
633                    .map_err(TransportError::ConnectionFailed)?;
634                Self::validated_first_party_profile(TcpRoleTransport::contract_profile())
635            }
636        } else {
637            Self::validated_first_party_profile(InMemoryChannelTransport::contract_profile())
638        }
639    }
640
641    /// Create a transport for a role based on the topology.
642    pub fn create(topology: &Topology, role: &RoleName) -> TransportResult<Box<dyn Transport>> {
643        let _profile = Self::contract_profile_for_topology(topology, role)?;
644        let has_remote_participants = topology
645            .locations
646            .values()
647            .any(|location| matches!(location, Location::Remote(_)));
648        if has_remote_participants {
649            #[cfg(target_arch = "wasm32")]
650            {
651                let _ = role;
652                Err(TransportError::NotReady)
653            }
654            #[cfg(not(target_arch = "wasm32"))]
655            {
656                topology
657                    .region_for_role(role)
658                    .map_err(TransportError::ConnectionFailed)?;
659                let state = shared_tcp_role_state(topology, "transport_factory", role)?;
660                let warm_state = Arc::clone(&state);
661                spawn(async move {
662                    let _ = warm_state.ensure_started().await;
663                });
664                let peer_endpoints = topology
665                    .locations
666                    .iter()
667                    .filter(|(peer, _)| *peer != role)
668                    .map(|(peer, location)| {
669                        let _ = topology
670                            .region_for_role(peer)
671                            .map_err(TransportError::ConnectionFailed)?;
672                        let endpoint = match location {
673                            Location::Remote(endpoint) => Some(endpoint.clone()),
674                            Location::Local | Location::Colocated(_) => None,
675                        };
676                        Ok((peer.clone(), endpoint))
677                    })
678                    .collect::<TransportResult<BTreeMap<_, _>>>()?;
679                Ok(Box::new(TcpRoleTransport {
680                    state,
681                    peer_endpoints,
682                }))
683            }
684        } else {
685            Ok(Box::new(InMemoryChannelTransport::new(role.clone())))
686        }
687    }
688
689    /// Select transport type based on location.
690    pub fn transport_for_location(
691        _from_role: &RoleName,
692        to_role: &RoleName,
693        topology: &Topology,
694    ) -> Result<TransportType, super::TopologyError> {
695        match topology.get_location(to_role)? {
696            Location::Local => Ok(TransportType::InMemory),
697            Location::Colocated(_) => Ok(TransportType::SharedMemory),
698            Location::Remote(_) => Ok(TransportType::Tcp),
699        }
700    }
701}
702
703/// Types of transport available.
704#[derive(Debug, Clone, Copy, PartialEq, Eq)]
705pub enum TransportType {
706    /// In-process channels.
707    InMemory,
708    /// Shared memory (for colocated roles).
709    SharedMemory,
710    /// TCP network transport.
711    Tcp,
712    /// WebSocket transport.
713    WebSocket,
714}
715
716impl TransportType {
717    /// Check if this transport type is local (no network).
718    pub fn is_local(&self) -> bool {
719        matches!(self, TransportType::InMemory | TransportType::SharedMemory)
720    }
721}
722
723#[cfg(all(test, not(target_arch = "wasm32")))]
724mod tests {
725    use super::*;
726
727    #[tokio::test]
728    async fn test_in_memory_transport() {
729        let alice = InMemoryChannelTransport::new(RoleName::from_static("Alice"));
730        let bob = InMemoryChannelTransport::new(RoleName::from_static("Bob"));
731
732        alice.connect(&bob).await;
733
734        // Alice sends to Bob
735        alice
736            .send(&RoleName::from_static("Bob"), b"Hello Bob".to_vec())
737            .await
738            .unwrap();
739
740        // Bob receives from Alice
741        let msg = bob.recv(&RoleName::from_static("Alice")).await.unwrap();
742        assert_eq!(msg, b"Hello Bob".to_vec());
743
744        // Bob sends to Alice
745        bob.send(&RoleName::from_static("Alice"), b"Hello Alice".to_vec())
746            .await
747            .unwrap();
748
749        // Alice receives from Bob
750        let msg = alice.recv(&RoleName::from_static("Bob")).await.unwrap();
751        assert_eq!(msg, b"Hello Alice".to_vec());
752    }
753
754    #[test]
755    fn test_transport_type_for_location() {
756        let topology = Topology::builder()
757            .local_role(RoleName::from_static("Alice"))
758            .remote_role(
759                RoleName::from_static("Bob"),
760                crate::identifiers::Endpoint::new("localhost:8080").unwrap(),
761            )
762            .colocated_role(
763                RoleName::from_static("Carol"),
764                RoleName::from_static("Alice"),
765            )
766            .build();
767
768        assert_eq!(
769            TransportFactory::transport_for_location(
770                &RoleName::from_static("Alice"),
771                &RoleName::from_static("Alice"),
772                &topology
773            )
774            .unwrap(),
775            TransportType::InMemory
776        );
777        assert_eq!(
778            TransportFactory::transport_for_location(
779                &RoleName::from_static("Alice"),
780                &RoleName::from_static("Bob"),
781                &topology
782            )
783            .unwrap(),
784            TransportType::Tcp
785        );
786        assert_eq!(
787            TransportFactory::transport_for_location(
788                &RoleName::from_static("Alice"),
789                &RoleName::from_static("Carol"),
790                &topology
791            )
792            .unwrap(),
793            TransportType::SharedMemory
794        );
795    }
796
797    #[test]
798    fn test_transport_type_is_local() {
799        assert!(TransportType::InMemory.is_local());
800        assert!(TransportType::SharedMemory.is_local());
801        assert!(!TransportType::Tcp.is_local());
802        assert!(!TransportType::WebSocket.is_local());
803    }
804
805    #[tokio::test]
806    async fn test_transport_factory_create_supports_loopback_remote_topologies() {
807        let local_topology = Topology::builder()
808            .local_role(RoleName::from_static("Alice"))
809            .local_role(RoleName::from_static("Bob"))
810            .build();
811        assert!(TransportFactory::create(&local_topology, &RoleName::from_static("Alice")).is_ok());
812
813        let remote_topology = Topology::builder()
814            .remote_role(
815                RoleName::from_static("Alice"),
816                crate::identifiers::Endpoint::new("127.0.0.1:19801").unwrap(),
817            )
818            .remote_role(
819                RoleName::from_static("Bob"),
820                crate::identifiers::Endpoint::new("127.0.0.1:19802").unwrap(),
821            )
822            .build();
823        let alice = TransportFactory::create(&remote_topology, &RoleName::from_static("Alice"))
824            .expect("remote transport for Alice");
825        let bob = TransportFactory::create(&remote_topology, &RoleName::from_static("Bob"))
826            .expect("remote transport for Bob");
827        alice
828            .send(&RoleName::from_static("Bob"), b"hello remote".to_vec())
829            .await
830            .expect("remote send");
831        assert_eq!(
832            bob.recv(&RoleName::from_static("Alice"))
833                .await
834                .expect("remote recv"),
835            b"hello remote".to_vec()
836        );
837    }
838}