Skip to main content

telltale_runtime/topology/
transport.rs

//! Transport trait and implementations for topology-aware communication.
//!
//! This module provides abstractions for different transport mechanisms:
//! - `InMemoryChannelTransport`: in-process communication over channels.
//! - TCP transports (`TcpRoleTransport`, `TcpPeerTransport`; non-WASM only): network
//!   communication over TCP, created by `TransportFactory::create` and `create_peer_transport`.
6
7#[cfg(not(target_arch = "wasm32"))]
8use super::wire;
9use super::{
10    validate_transport_contract_profile, DocumentedTransportContract, Location, Topology,
11    TransportContractProfile, TransportContractTier, TransportOperationalContract,
12    TransportSemanticContract, TransportStartupMode,
13};
14use crate::identifiers::RoleName;
15use crate::mutex_lock;
16#[cfg(not(target_arch = "wasm32"))]
17use crate::util::spawn::spawn;
18use crate::util::sync::{mpsc, Mutex};
19use async_trait::async_trait;
20use cfg_if::cfg_if;
21#[cfg(target_arch = "wasm32")]
22use futures::{SinkExt, StreamExt};
23use std::collections::BTreeMap;
24#[cfg(not(target_arch = "wasm32"))]
25use std::collections::BTreeSet;
26#[cfg(not(target_arch = "wasm32"))]
27use std::net::IpAddr;
28use std::sync::Arc;
29#[cfg(not(target_arch = "wasm32"))]
30use std::sync::{Mutex as StdMutex, OnceLock};
31#[cfg(not(target_arch = "wasm32"))]
32use std::time::Instant;
33use thiserror::Error;
34
35#[cfg(not(target_arch = "wasm32"))]
36use tokio::io::AsyncWriteExt;
37#[cfg(not(target_arch = "wasm32"))]
38use tokio::net::{TcpListener, TcpStream};
39#[cfg(not(target_arch = "wasm32"))]
40use tokio::sync::{OwnedSemaphorePermit, Semaphore};
41#[cfg(not(target_arch = "wasm32"))]
42use tokio::time::{sleep, Duration};
43
/// Deadline applied to each read while decoding an inbound TCP frame.
#[cfg(not(target_arch = "wasm32"))]
const TCP_READ_TIMEOUT: Duration = Duration::from_secs(30);
/// Deadline applied to each write while emitting an outbound TCP frame.
#[cfg(not(target_arch = "wasm32"))]
const TCP_WRITE_TIMEOUT: Duration = Duration::from_secs(10);
/// Cap on simultaneously admitted inbound TCP connections, across all sources.
#[cfg(not(target_arch = "wasm32"))]
const TCP_MAX_CONNECTIONS: usize = 1024;
/// Global budget (16 MiB) for payload bytes being read off inbound sockets at once.
#[cfg(not(target_arch = "wasm32"))]
const TCP_MAX_INFLIGHT_PAYLOAD_BYTES: usize = 16 << 20;
/// Cap on live inbound connections admitted from a single source IP.
#[cfg(not(target_arch = "wasm32"))]
const TCP_PER_SOURCE_CONNECTION_LIMIT: usize = 64;
/// Cap on admissions from a single source IP within one `TCP_RECONNECT_WINDOW`.
#[cfg(not(target_arch = "wasm32"))]
const TCP_PER_SOURCE_RECONNECT_LIMIT: usize = 128;
/// Length of a source's rate-limit window; its admission count resets once this
/// much time has passed since the window opened.
#[cfg(not(target_arch = "wasm32"))]
const TCP_RECONNECT_WINDOW: Duration = Duration::from_secs(10);
58
59/// Errors that can occur during transport operations.
60#[derive(Debug, Error)]
61pub enum TransportError {
62    #[error("connection failed: {0}")]
63    ConnectionFailed(String),
64
65    #[error("send failed: {0}")]
66    SendFailed(String),
67
68    #[error("receive failed: {0}")]
69    ReceiveFailed(String),
70
71    #[error("timeout")]
72    Timeout,
73
74    #[error("channel closed")]
75    ChannelClosed,
76
77    #[error("unknown role: {0}")]
78    UnknownRole(RoleName),
79
80    #[error("duplicate peer connection: {0}")]
81    DuplicatePeer(RoleName),
82
83    #[error("unsupported protocol: {0}")]
84    UnsupportedProtocol(String),
85
86    #[error("transport not ready")]
87    NotReady,
88
89    #[error("IO error: {0}")]
90    IoError(#[from] std::io::Error),
91}
92
93/// Result type for transport operations.
94pub type TransportResult<T> = Result<T, TransportError>;
95
/// A message that can be sent over a transport.
///
/// Implementors supply both directions of their byte encoding.
pub trait TransportMessage: Send + Sync + 'static {
    /// Encode this message into an owned byte vector.
    fn to_bytes(&self) -> Vec<u8>;

    /// Decode a message from `bytes`, reporting failures as a readable string.
    fn from_bytes(bytes: &[u8]) -> Result<Self, String>
    where
        Self: Sized;
}

/// Opaque byte payload whose encoding and decoding are plain copies.
#[derive(Debug, Clone)]
pub struct ByteMessage(pub Vec<u8>);

impl TransportMessage for ByteMessage {
    fn to_bytes(&self) -> Vec<u8> {
        self.0.to_vec()
    }

    fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
        let owned = bytes.to_owned();
        Ok(Self(owned))
    }
}
120
121/// Transport trait for sending and receiving messages between roles.
122#[async_trait]
123pub trait Transport: Send + Sync + 'static {
124    /// Send a message to a specific role.
125    async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()>;
126
127    /// Receive a message from a specific role.
128    async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>>;
129
130    /// Check if the transport is connected to a role.
131    fn is_connected(&self, role: &RoleName) -> bool;
132
133    /// Close the transport connection.
134    async fn close(&self) -> TransportResult<()>;
135}
136
137/// In-memory transport using channels.
138///
139/// This is the default transport for local testing where all roles
140/// run in the same process.
141pub struct InMemoryChannelTransport {
142    /// Role this transport belongs to.
143    role: RoleName,
144    /// Sender channels to other roles (role -> sender).
145    senders: Arc<Mutex<BTreeMap<RoleName, mpsc::Sender<Vec<u8>>>>>,
146    /// Receiver channels from other roles (role -> receiver).
147    receivers: Arc<Mutex<BTreeMap<RoleName, mpsc::Receiver<Vec<u8>>>>>,
148}
149
150impl InMemoryChannelTransport {
151    /// Create a new in-memory transport for a role.
152    pub fn new(role: RoleName) -> Self {
153        Self {
154            role,
155            senders: Arc::new(Mutex::new(BTreeMap::new())),
156            receivers: Arc::new(Mutex::new(BTreeMap::new())),
157        }
158    }
159
160    /// Connect this transport to another role's transport.
161    pub async fn connect(&self, other: &InMemoryChannelTransport) {
162        let (tx1, rx1) = mpsc::channel(32);
163        let (tx2, rx2) = mpsc::channel(32);
164
165        // Self -> Other
166        mutex_lock!(self.senders).insert(other.role.clone(), tx1);
167        mutex_lock!(other.receivers).insert(self.role.clone(), rx1);
168
169        // Other -> Self
170        mutex_lock!(other.senders).insert(self.role.clone(), tx2);
171        mutex_lock!(self.receivers).insert(other.role.clone(), rx2);
172    }
173
174    /// Get the role name.
175    pub fn role(&self) -> &RoleName {
176        &self.role
177    }
178}
179
180impl DocumentedTransportContract for InMemoryChannelTransport {
181    fn contract_profile() -> TransportContractProfile {
182        TransportContractProfile {
183            transport_name: "InMemoryChannelTransport",
184            tier: TransportContractTier::FirstPartyRuntime,
185            semantics: TransportSemanticContract {
186                role_addressed_routing: true,
187                authenticated_peers: true,
188                per_peer_fifo_delivery: true,
189                fail_closed_unknown_role: true,
190                no_message_synthesis: true,
191                explicit_readiness_errors: false,
192                deterministic_for_regression: true,
193            },
194            operational: TransportOperationalContract {
195                transport_type: TransportType::InMemory,
196                startup_mode: TransportStartupMode::ReadyOnCreate,
197                environment_resolved: false,
198            },
199            notes: vec![
200                "In-process channel transport for first-party local execution.",
201                "Deterministic enough for strict regression suites.",
202            ],
203        }
204    }
205}
206
207#[async_trait]
208impl Transport for InMemoryChannelTransport {
209    async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()> {
210        cfg_if! {
211            if #[cfg(target_arch = "wasm32")] {
212                // Clone the sender to release the lock before awaiting.
213                let sender = {
214                    let senders = mutex_lock!(self.senders);
215                    senders
216                        .get(to_role)
217                        .cloned()
218                        .ok_or_else(|| TransportError::UnknownRole(to_role.clone()))?
219                };
220
221                let mut sender = sender;
222                sender
223                    .send(message)
224                    .await
225                    .map_err(|_| TransportError::ChannelClosed)
226            } else {
227                let senders = mutex_lock!(self.senders);
228                let sender = senders
229                    .get(to_role)
230                    .ok_or_else(|| TransportError::UnknownRole(to_role.clone()))?;
231
232                sender
233                    .send(message)
234                    .await
235                    .map_err(|_| TransportError::ChannelClosed)
236            }
237        }
238    }
239
240    async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
241        cfg_if! {
242            if #[cfg(target_arch = "wasm32")] {
243                // For WASM, take the receiver out so the lock is not held across `.await`.
244                let mut receiver = {
245                    let mut receivers = mutex_lock!(self.receivers);
246                    receivers
247                        .remove(from_role)
248                        .ok_or_else(|| TransportError::UnknownRole(from_role.clone()))?
249                };
250
251                let result = receiver.next().await;
252
253                {
254                    let mut receivers = mutex_lock!(self.receivers);
255                    receivers.insert(from_role.clone(), receiver);
256                }
257
258                result.ok_or(TransportError::ChannelClosed)
259            } else {
260                let mut receivers = mutex_lock!(self.receivers);
261                let receiver = receivers
262                    .get_mut(from_role)
263                    .ok_or_else(|| TransportError::UnknownRole(from_role.clone()))?;
264                receiver.recv().await.ok_or(TransportError::ChannelClosed)
265            }
266        }
267    }
268
269    fn is_connected(&self, _role: &RoleName) -> bool {
270        // For in-memory, assume always connected after setup
271        // In production, this should check if we have a sender for this role
272        true
273    }
274
275    async fn close(&self) -> TransportResult<()> {
276        mutex_lock!(self.senders).clear();
277        mutex_lock!(self.receivers).clear();
278        Ok(())
279    }
280}
281
/// Memoised outcome of starting a role's TCP listener.
#[cfg(not(target_arch = "wasm32"))]
enum TcpListenerState {
    /// No start has been attempted yet.
    NotStarted,
    /// The listener is running, or there was no endpoint to bind.
    Started,
    /// Binding failed; the message is returned to every later caller.
    Failed(String),
}

/// Per-source-IP admission bookkeeping used for connection rate limiting.
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Clone, Copy)]
struct TcpSourceRateState {
    /// When the current rate-limit window opened.
    window_start: Instant,
    /// Admissions counted within the current window.
    attempts: usize,
    /// Admitted connections from this source not yet released.
    live_connections: usize,
}
296
297#[cfg(not(target_arch = "wasm32"))]
298struct TcpRoleState {
299    role: RoleName,
300    self_endpoint: Option<crate::identifiers::Endpoint>,
301    inbound_senders: BTreeMap<RoleName, mpsc::Sender<Vec<u8>>>,
302    inbound_receivers: Arc<Mutex<BTreeMap<RoleName, mpsc::Receiver<Vec<u8>>>>>,
303    listener_state: Arc<Mutex<TcpListenerState>>,
304    claimed_inbound_roles: Arc<Mutex<BTreeSet<RoleName>>>,
305    active_connections: Arc<Mutex<usize>>,
306    payload_budget: Arc<Semaphore>,
307    source_limits: Arc<Mutex<BTreeMap<IpAddr, TcpSourceRateState>>>,
308}
309
310#[cfg(not(target_arch = "wasm32"))]
311impl TcpRoleState {
312    fn new(
313        role: RoleName,
314        self_endpoint: Option<crate::identifiers::Endpoint>,
315        peer_roles: impl IntoIterator<Item = RoleName>,
316    ) -> Self {
317        let mut inbound_senders = BTreeMap::new();
318        let mut inbound_receivers = BTreeMap::new();
319        for peer in peer_roles {
320            let (tx, rx) = mpsc::channel(32);
321            inbound_senders.insert(peer.clone(), tx);
322            inbound_receivers.insert(peer, rx);
323        }
324        Self {
325            role,
326            self_endpoint,
327            inbound_senders,
328            inbound_receivers: Arc::new(Mutex::new(inbound_receivers)),
329            listener_state: Arc::new(Mutex::new(TcpListenerState::NotStarted)),
330            claimed_inbound_roles: Arc::new(Mutex::new(BTreeSet::new())),
331            active_connections: Arc::new(Mutex::new(0)),
332            payload_budget: Arc::new(Semaphore::new(TCP_MAX_INFLIGHT_PAYLOAD_BYTES)),
333            source_limits: Arc::new(Mutex::new(BTreeMap::new())),
334        }
335    }
336
337    async fn ensure_started(self: &Arc<Self>) -> TransportResult<()> {
338        let mut state = mutex_lock!(self.listener_state);
339        match &*state {
340            TcpListenerState::Started => return Ok(()),
341            TcpListenerState::Failed(message) => {
342                return Err(TransportError::ConnectionFailed(message.clone()));
343            }
344            TcpListenerState::NotStarted => {}
345        }
346
347        let Some(endpoint) = self.self_endpoint.clone() else {
348            *state = TcpListenerState::Started;
349            return Ok(());
350        };
351
352        let listener = TcpListener::bind(endpoint.as_str()).await.map_err(|err| {
353            let message = format!(
354                "failed to bind {} for role {}: {}",
355                endpoint, self.role, err
356            );
357            *state = TcpListenerState::Failed(message.clone());
358            TransportError::ConnectionFailed(message)
359        })?;
360        let role_state = Arc::clone(self);
361        spawn(async move {
362            role_state.accept_loop(listener).await;
363        });
364        *state = TcpListenerState::Started;
365        Ok(())
366    }
367
368    async fn accept_loop(self: Arc<Self>, listener: TcpListener) {
369        loop {
370            let Ok((socket, addr)) = listener.accept().await else {
371                break;
372            };
373            if self.admit_connection(addr.ip()).await.is_err() {
374                continue;
375            }
376            let role_state = Arc::clone(&self);
377            spawn(async move {
378                let _ = role_state.handle_socket(socket).await;
379                role_state.release_connection(addr.ip()).await;
380            });
381        }
382    }
383
384    async fn handle_socket(&self, mut socket: TcpStream) -> TransportResult<()> {
385        wire::read_preamble(&mut socket, TCP_READ_TIMEOUT).await?;
386        let role_buf = wire::read_role_name_bytes(&mut socket, TCP_READ_TIMEOUT).await?;
387        let from_role = String::from_utf8(role_buf).map_err(|err| {
388            TransportError::ReceiveFailed(format!("invalid sender header: {err}"))
389        })?;
390        let sender_role = RoleName::new(from_role.clone()).map_err(|err| {
391            TransportError::ReceiveFailed(format!("invalid sender role `{from_role}`: {err}"))
392        })?;
393        let sender = self
394            .inbound_senders
395            .get(&sender_role)
396            .cloned()
397            .ok_or_else(|| {
398                TransportError::ReceiveFailed(format!(
399                    "sender role `{sender_role}` is not configured for {}",
400                    self.role
401                ))
402            })?;
403        self.claim_inbound_role(&sender_role).await?;
404        let result = async {
405            let payload_len = wire::read_payload_len(&mut socket, TCP_READ_TIMEOUT).await?;
406            let _payload_permit =
407                acquire_tcp_payload_budget(&self.payload_budget, payload_len.as_usize()).await?;
408            let mut payload = vec![0_u8; payload_len.as_usize()];
409            wire::read_exact_timeout(&mut socket, &mut payload, TCP_READ_TIMEOUT).await?;
410            sender
411                .send(payload)
412                .await
413                .map_err(|_| TransportError::ChannelClosed)
414        }
415        .await;
416        self.release_inbound_role(&sender_role).await;
417        result
418    }
419
420    async fn admit_connection(&self, source_ip: IpAddr) -> TransportResult<()> {
421        {
422            let mut active_connections = mutex_lock!(self.active_connections);
423            if *active_connections >= TCP_MAX_CONNECTIONS {
424                return Err(TransportError::ReceiveFailed(format!(
425                    "max TCP connections exceeded: {TCP_MAX_CONNECTIONS}"
426                )));
427            }
428            *active_connections += 1;
429        }
430
431        let mut sources = mutex_lock!(self.source_limits);
432        let now = Instant::now();
433        let state = sources.entry(source_ip).or_insert(TcpSourceRateState {
434            window_start: now,
435            attempts: 0,
436            live_connections: 0,
437        });
438
439        if now.duration_since(state.window_start) > TCP_RECONNECT_WINDOW {
440            state.window_start = now;
441            state.attempts = 0;
442        }
443
444        if state.live_connections >= TCP_PER_SOURCE_CONNECTION_LIMIT {
445            drop(sources);
446            self.release_active_connection().await;
447            return Err(TransportError::ReceiveFailed(format!(
448                "source {source_ip} has too many live TCP connections"
449            )));
450        }
451        if state.attempts >= TCP_PER_SOURCE_RECONNECT_LIMIT {
452            drop(sources);
453            self.release_active_connection().await;
454            return Err(TransportError::ReceiveFailed(format!(
455                "source {source_ip} exceeded TCP reconnect limit"
456            )));
457        }
458
459        state.live_connections += 1;
460        state.attempts += 1;
461        Ok(())
462    }
463
464    async fn release_active_connection(&self) {
465        let mut active_connections = mutex_lock!(self.active_connections);
466        *active_connections = active_connections.saturating_sub(1);
467    }
468
469    async fn release_connection(&self, source_ip: IpAddr) {
470        self.release_active_connection().await;
471        let mut sources = mutex_lock!(self.source_limits);
472        if let Some(state) = sources.get_mut(&source_ip) {
473            state.live_connections = state.live_connections.saturating_sub(1);
474        }
475    }
476
477    async fn claim_inbound_role(&self, sender_role: &RoleName) -> TransportResult<()> {
478        let mut claimed = mutex_lock!(self.claimed_inbound_roles);
479        if !claimed.insert(sender_role.clone()) {
480            return Err(TransportError::DuplicatePeer(sender_role.clone()));
481        }
482        Ok(())
483    }
484
485    async fn release_inbound_role(&self, sender_role: &RoleName) {
486        mutex_lock!(self.claimed_inbound_roles).remove(sender_role);
487    }
488
489    async fn recv_from(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
490        let mut receivers = mutex_lock!(self.inbound_receivers);
491        let receiver = receivers
492            .get_mut(from_role)
493            .ok_or_else(|| TransportError::UnknownRole(from_role.clone()))?;
494        receiver.recv().await.ok_or(TransportError::ChannelClosed)
495    }
496}
497
498#[cfg(not(target_arch = "wasm32"))]
499async fn acquire_tcp_payload_budget(
500    payload_budget: &Arc<Semaphore>,
501    bytes: usize,
502) -> TransportResult<OwnedSemaphorePermit> {
503    let permits =
504        u32::try_from(bytes).map_err(|err| TransportError::ReceiveFailed(err.to_string()))?;
505    Arc::clone(payload_budget)
506        .try_acquire_many_owned(permits)
507        .map_err(|_| {
508            TransportError::ReceiveFailed(
509                "global in-flight TCP payload byte cap reached".to_string(),
510            )
511        })
512}
513
514#[cfg(not(target_arch = "wasm32"))]
515type SharedTcpRegistry = BTreeMap<String, Arc<TcpRoleState>>;
516
517#[cfg(not(target_arch = "wasm32"))]
518fn shared_tcp_registry() -> &'static StdMutex<SharedTcpRegistry> {
519    static REGISTRY: OnceLock<StdMutex<SharedTcpRegistry>> = OnceLock::new();
520    REGISTRY.get_or_init(|| StdMutex::new(BTreeMap::new()))
521}
522
523#[cfg(not(target_arch = "wasm32"))]
524fn tcp_role_registry_key(topology_signature: &str, role: &RoleName) -> String {
525    format!("{topology_signature}|role:{role}")
526}
527
528#[cfg(not(target_arch = "wasm32"))]
529fn shared_tcp_role_state(
530    topology: &Topology,
531    topology_signature: &str,
532    role: &RoleName,
533) -> TransportResult<Arc<TcpRoleState>> {
534    let key = tcp_role_registry_key(topology_signature, role);
535    let mut registry = shared_tcp_registry()
536        .lock()
537        .unwrap_or_else(|poisoned| poisoned.into_inner());
538    if let Some(existing) = registry.get(&key) {
539        return Ok(Arc::clone(existing));
540    }
541
542    let self_endpoint = match topology.get_location(role) {
543        Ok(Location::Remote(endpoint)) => Some(endpoint),
544        Ok(Location::Local | Location::Colocated(_)) => None,
545        Err(_) => return Err(TransportError::UnknownRole(role.clone())),
546    };
547    let peer_roles = topology
548        .locations
549        .keys()
550        .filter(|peer| *peer != role)
551        .cloned();
552    let state = Arc::new(TcpRoleState::new(role.clone(), self_endpoint, peer_roles));
553    registry.insert(key, Arc::clone(&state));
554    Ok(state)
555}
556
557#[cfg(not(target_arch = "wasm32"))]
558async fn connect_with_retry(endpoint: &crate::identifiers::Endpoint) -> TransportResult<TcpStream> {
559    let mut attempts = 0_u8;
560    loop {
561        match TcpStream::connect(endpoint.as_str()).await {
562            Ok(stream) => return Ok(stream),
563            Err(err) if attempts < 10 => {
564                attempts = attempts.saturating_add(1);
565                if err.kind() != std::io::ErrorKind::ConnectionRefused {
566                    return Err(TransportError::ConnectionFailed(err.to_string()));
567                }
568                sleep(Duration::from_millis(10)).await;
569            }
570            Err(err) => return Err(TransportError::ConnectionFailed(err.to_string())),
571        }
572    }
573}
574
575#[cfg(not(target_arch = "wasm32"))]
576struct TcpPeerTransport {
577    state: Arc<TcpRoleState>,
578    peer_role: RoleName,
579    peer_endpoint: Option<crate::identifiers::Endpoint>,
580}
581
582#[cfg(not(target_arch = "wasm32"))]
583impl DocumentedTransportContract for TcpPeerTransport {
584    fn contract_profile() -> TransportContractProfile {
585        TransportContractProfile {
586            transport_name: "TcpPeerTransport",
587            tier: TransportContractTier::FirstPartyRuntime,
588            semantics: TransportSemanticContract {
589                role_addressed_routing: true,
590                authenticated_peers: false,
591                per_peer_fifo_delivery: true,
592                fail_closed_unknown_role: true,
593                no_message_synthesis: true,
594                explicit_readiness_errors: true,
595                deterministic_for_regression: false,
596            },
597            operational: TransportOperationalContract {
598                transport_type: TransportType::Tcp,
599                startup_mode: TransportStartupMode::BackgroundWarmup,
600                environment_resolved: false,
601            },
602            notes: vec![
603                "Single-peer runtime TCP transport used for loopback remote topology execution.",
604                "trusted-network only: peers are not cryptographically authenticated.",
605            ],
606        }
607    }
608}
609
610#[cfg(not(target_arch = "wasm32"))]
611#[async_trait]
612impl Transport for TcpPeerTransport {
613    async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()> {
614        if to_role != &self.peer_role {
615            return Err(TransportError::UnknownRole(to_role.clone()));
616        }
617        let endpoint = self.peer_endpoint.clone().ok_or_else(|| {
618            TransportError::ConnectionFailed(format!(
619                "role {} has no remote endpoint configured for peer {}",
620                self.state.role, self.peer_role
621            ))
622        })?;
623        let mut stream = connect_with_retry(&endpoint).await?;
624        let role_bytes = self.state.role.to_string().into_bytes();
625        let message_len = telltale_types::MessageLenBytes::try_from(message.len())
626            .map_err(|err| TransportError::SendFailed(err.to_string()))?;
627        wire::write_preamble(&mut stream, TCP_WRITE_TIMEOUT).await?;
628        wire::write_role_name(&mut stream, &role_bytes, TCP_WRITE_TIMEOUT).await?;
629        wire::write_payload_len(&mut stream, message_len, TCP_WRITE_TIMEOUT).await?;
630        wire::write_all_timeout(&mut stream, &message, TCP_WRITE_TIMEOUT).await?;
631        stream.shutdown().await?;
632        Ok(())
633    }
634
635    async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
636        if from_role != &self.peer_role {
637            return Err(TransportError::UnknownRole(from_role.clone()));
638        }
639        self.state.recv_from(from_role).await
640    }
641
642    fn is_connected(&self, role: &RoleName) -> bool {
643        role == &self.peer_role
644    }
645
646    async fn close(&self) -> TransportResult<()> {
647        Ok(())
648    }
649}
650
651#[cfg(not(target_arch = "wasm32"))]
652struct TcpRoleTransport {
653    state: Arc<TcpRoleState>,
654    peer_endpoints: BTreeMap<RoleName, Option<crate::identifiers::Endpoint>>,
655}
656
657#[cfg(not(target_arch = "wasm32"))]
658impl DocumentedTransportContract for TcpRoleTransport {
659    fn contract_profile() -> TransportContractProfile {
660        TransportContractProfile {
661            transport_name: "TcpRoleTransport",
662            tier: TransportContractTier::FirstPartyRuntime,
663            semantics: TransportSemanticContract {
664                role_addressed_routing: true,
665                authenticated_peers: false,
666                per_peer_fifo_delivery: true,
667                fail_closed_unknown_role: true,
668                no_message_synthesis: true,
669                explicit_readiness_errors: true,
670                deterministic_for_regression: false,
671            },
672            operational: TransportOperationalContract {
673                transport_type: TransportType::Tcp,
674                startup_mode: TransportStartupMode::BackgroundWarmup,
675                environment_resolved: false,
676            },
677            notes: vec![
678                "Role-addressed runtime TCP transport used by the first-party topology helper.",
679                "trusted-network only: peers are not cryptographically authenticated.",
680            ],
681        }
682    }
683}
684
685#[cfg(not(target_arch = "wasm32"))]
686#[async_trait]
687impl Transport for TcpRoleTransport {
688    async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()> {
689        self.state.ensure_started().await?;
690        let endpoint = self
691            .peer_endpoints
692            .get(to_role)
693            .cloned()
694            .flatten()
695            .ok_or_else(|| {
696                TransportError::ConnectionFailed(format!(
697                    "role {} has no remote endpoint configured for peer {}",
698                    self.state.role, to_role
699                ))
700            })?;
701        let mut stream = connect_with_retry(&endpoint).await?;
702        let role_bytes = self.state.role.to_string().into_bytes();
703        let message_len = telltale_types::MessageLenBytes::try_from(message.len())
704            .map_err(|err| TransportError::SendFailed(err.to_string()))?;
705        wire::write_preamble(&mut stream, TCP_WRITE_TIMEOUT).await?;
706        wire::write_role_name(&mut stream, &role_bytes, TCP_WRITE_TIMEOUT).await?;
707        wire::write_payload_len(&mut stream, message_len, TCP_WRITE_TIMEOUT).await?;
708        wire::write_all_timeout(&mut stream, &message, TCP_WRITE_TIMEOUT).await?;
709        stream.shutdown().await?;
710        Ok(())
711    }
712
713    async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
714        self.state.ensure_started().await?;
715        self.state.recv_from(from_role).await
716    }
717
718    fn is_connected(&self, role: &RoleName) -> bool {
719        self.peer_endpoints.contains_key(role)
720    }
721
722    async fn close(&self) -> TransportResult<()> {
723        Ok(())
724    }
725}
726
727#[cfg(not(target_arch = "wasm32"))]
728pub(crate) async fn create_peer_transport(
729    topology: &Topology,
730    topology_signature: &str,
731    role: &RoleName,
732    peer: &RoleName,
733) -> TransportResult<Box<dyn Transport>> {
734    topology
735        .region_for_role(role)
736        .map_err(TransportError::ConnectionFailed)?;
737    topology
738        .region_for_role(peer)
739        .map_err(TransportError::ConnectionFailed)?;
740    let state = shared_tcp_role_state(topology, topology_signature, role)?;
741    state.ensure_started().await?;
742    let peer_endpoint = match topology.get_location(peer) {
743        Ok(Location::Remote(endpoint)) => Some(endpoint),
744        Ok(Location::Local | Location::Colocated(_)) => None,
745        Err(_) => return Err(TransportError::UnknownRole(peer.clone())),
746    };
747    Ok(Box::new(TcpPeerTransport {
748        state,
749        peer_role: peer.clone(),
750        peer_endpoint,
751    }))
752}
753
754/// Factory for creating transports based on topology.
755pub struct TransportFactory;
756
757impl TransportFactory {
758    fn validated_first_party_profile(
759        profile: TransportContractProfile,
760    ) -> TransportResult<TransportContractProfile> {
761        validate_transport_contract_profile(&profile)
762            .map_err(|err| TransportError::ConnectionFailed(err.to_string()))?;
763        Ok(profile)
764    }
765
766    /// Return the documented first-party transport contract selected for a role/topology pair.
767    pub fn contract_profile_for_topology(
768        topology: &Topology,
769        role: &RoleName,
770    ) -> TransportResult<TransportContractProfile> {
771        let has_remote_participants = topology
772            .locations
773            .values()
774            .any(|location| matches!(location, Location::Remote(_)));
775        if has_remote_participants {
776            #[cfg(target_arch = "wasm32")]
777            {
778                let _ = (topology, role);
779                Err(TransportError::NotReady)
780            }
781            #[cfg(not(target_arch = "wasm32"))]
782            {
783                topology
784                    .region_for_role(role)
785                    .map_err(TransportError::ConnectionFailed)?;
786                Self::validated_first_party_profile(TcpRoleTransport::contract_profile())
787            }
788        } else {
789            Self::validated_first_party_profile(InMemoryChannelTransport::contract_profile())
790        }
791    }
792
793    /// Create a transport for a role based on the topology.
794    pub fn create(topology: &Topology, role: &RoleName) -> TransportResult<Box<dyn Transport>> {
795        let _profile = Self::contract_profile_for_topology(topology, role)?;
796        let has_remote_participants = topology
797            .locations
798            .values()
799            .any(|location| matches!(location, Location::Remote(_)));
800        if has_remote_participants {
801            #[cfg(target_arch = "wasm32")]
802            {
803                let _ = role;
804                Err(TransportError::NotReady)
805            }
806            #[cfg(not(target_arch = "wasm32"))]
807            {
808                topology
809                    .region_for_role(role)
810                    .map_err(TransportError::ConnectionFailed)?;
811                let state = shared_tcp_role_state(topology, "transport_factory", role)?;
812                let warm_state = Arc::clone(&state);
813                spawn(async move {
814                    let _ = warm_state.ensure_started().await;
815                });
816                let peer_endpoints = topology
817                    .locations
818                    .iter()
819                    .filter(|(peer, _)| *peer != role)
820                    .map(|(peer, location)| {
821                        let _ = topology
822                            .region_for_role(peer)
823                            .map_err(TransportError::ConnectionFailed)?;
824                        let endpoint = match location {
825                            Location::Remote(endpoint) => Some(endpoint.clone()),
826                            Location::Local | Location::Colocated(_) => None,
827                        };
828                        Ok((peer.clone(), endpoint))
829                    })
830                    .collect::<TransportResult<BTreeMap<_, _>>>()?;
831                Ok(Box::new(TcpRoleTransport {
832                    state,
833                    peer_endpoints,
834                }))
835            }
836        } else {
837            Ok(Box::new(InMemoryChannelTransport::new(role.clone())))
838        }
839    }
840
841    /// Select transport type based on location.
842    pub fn transport_for_location(
843        _from_role: &RoleName,
844        to_role: &RoleName,
845        topology: &Topology,
846    ) -> Result<TransportType, super::TopologyError> {
847        match topology.get_location(to_role)? {
848            Location::Local => Ok(TransportType::InMemory),
849            Location::Colocated(_) => Ok(TransportType::SharedMemory),
850            Location::Remote(_) => Ok(TransportType::Tcp),
851        }
852    }
853}
854
/// Types of transport available.
///
/// `Hash` is derived alongside `Eq` so transport kinds can be used as keys in
/// hash-based collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TransportType {
    /// In-process channels.
    InMemory,
    /// Shared memory (for colocated roles).
    SharedMemory,
    /// TCP network transport.
    Tcp,
    /// WebSocket transport.
    WebSocket,
}

impl TransportType {
    /// Check if this transport type is local (no network).
    ///
    /// Returns `true` for `InMemory` and `SharedMemory`, and `false` for the
    /// network-backed `Tcp` and `WebSocket` variants. Usable in `const`
    /// contexts.
    #[must_use]
    pub const fn is_local(&self) -> bool {
        matches!(self, TransportType::InMemory | TransportType::SharedMemory)
    }
}
874
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    // Native-only tests for the in-memory transport, transport-type selection,
    // the transport factory, and the runtime TCP role-claim handshake.
    use super::*;
    use std::net::SocketAddr;

    /// Two connected in-memory transports round-trip one message in each direction.
    #[tokio::test]
    async fn test_in_memory_transport() {
        let alice = InMemoryChannelTransport::new(RoleName::from_static("Alice"));
        let bob = InMemoryChannelTransport::new(RoleName::from_static("Bob"));

        alice.connect(&bob).await;

        // Alice sends to Bob
        alice
            .send(&RoleName::from_static("Bob"), b"Hello Bob".to_vec())
            .await
            .unwrap();

        // Bob receives from Alice
        let msg = bob.recv(&RoleName::from_static("Alice")).await.unwrap();
        assert_eq!(msg, b"Hello Bob".to_vec());

        // Bob sends to Alice
        bob.send(&RoleName::from_static("Alice"), b"Hello Alice".to_vec())
            .await
            .unwrap();

        // Alice receives from Bob
        let msg = alice.recv(&RoleName::from_static("Bob")).await.unwrap();
        assert_eq!(msg, b"Hello Alice".to_vec());
    }

    /// Local, remote and colocated destinations map to `InMemory`, `Tcp` and
    /// `SharedMemory` respectively.
    #[test]
    fn test_transport_type_for_location() {
        let topology = Topology::builder()
            .local_role(RoleName::from_static("Alice"))
            .remote_role(
                RoleName::from_static("Bob"),
                crate::identifiers::Endpoint::new("localhost:8080").unwrap(),
            )
            .colocated_role(
                RoleName::from_static("Carol"),
                RoleName::from_static("Alice"),
            )
            .build();

        assert_eq!(
            TransportFactory::transport_for_location(
                &RoleName::from_static("Alice"),
                &RoleName::from_static("Alice"),
                &topology
            )
            .unwrap(),
            TransportType::InMemory
        );
        assert_eq!(
            TransportFactory::transport_for_location(
                &RoleName::from_static("Alice"),
                &RoleName::from_static("Bob"),
                &topology
            )
            .unwrap(),
            TransportType::Tcp
        );
        assert_eq!(
            TransportFactory::transport_for_location(
                &RoleName::from_static("Alice"),
                &RoleName::from_static("Carol"),
                &topology
            )
            .unwrap(),
            TransportType::SharedMemory
        );
    }

    /// Only the channel- and shared-memory-backed variants count as local.
    #[test]
    fn test_transport_type_is_local() {
        assert!(TransportType::InMemory.is_local());
        assert!(TransportType::SharedMemory.is_local());
        assert!(!TransportType::Tcp.is_local());
        assert!(!TransportType::WebSocket.is_local());
    }

    /// `TransportFactory::create` succeeds for an all-local topology. For an
    /// all-remote loopback topology it yields TCP transports that can exchange
    /// a message within one process.
    #[tokio::test]
    async fn test_transport_factory_create_supports_loopback_remote_topologies() {
        let local_topology = Topology::builder()
            .local_role(RoleName::from_static("Alice"))
            .local_role(RoleName::from_static("Bob"))
            .build();
        assert!(TransportFactory::create(&local_topology, &RoleName::from_static("Alice")).is_ok());

        // NOTE(review): fixed ports 19801/19802 may collide with other tests or
        // processes running concurrently; consider ephemeral ports if this flakes.
        let remote_topology = Topology::builder()
            .remote_role(
                RoleName::from_static("Alice"),
                crate::identifiers::Endpoint::new("127.0.0.1:19801").unwrap(),
            )
            .remote_role(
                RoleName::from_static("Bob"),
                crate::identifiers::Endpoint::new("127.0.0.1:19802").unwrap(),
            )
            .build();
        let alice = TransportFactory::create(&remote_topology, &RoleName::from_static("Alice"))
            .expect("remote transport for Alice");
        let bob = TransportFactory::create(&remote_topology, &RoleName::from_static("Bob"))
            .expect("remote transport for Bob");
        alice
            .send(&RoleName::from_static("Bob"), b"hello remote".to_vec())
            .await
            .expect("remote send");
        assert_eq!(
            bob.recv(&RoleName::from_static("Alice"))
                .await
                .expect("remote recv"),
            b"hello remote".to_vec()
        );
    }

    /// Connect to `addr`, then write the wire preamble followed by `role` as the
    /// claimed role name.
    ///
    /// Returns the still-open stream so the caller controls when the
    /// connection closes (by dropping it).
    async fn write_runtime_role_claim(addr: SocketAddr, role: &str) -> TcpStream {
        let mut stream = TcpStream::connect(addr).await.expect("connect test client");
        wire::write_preamble(&mut stream, TCP_WRITE_TIMEOUT)
            .await
            .expect("write wire preamble");
        wire::write_role_name(&mut stream, role.as_bytes(), TCP_WRITE_TIMEOUT)
            .await
            .expect("write role name");
        stream
    }

    /// A second inbound connection claiming a role that is already claimed by a
    /// live connection must fail with `TransportError::DuplicatePeer`.
    #[tokio::test]
    async fn runtime_tcp_rejects_duplicate_live_role_claim() {
        // Alice's state, expecting Bob as the only peer. The `None` argument's
        // meaning is defined in `TcpRoleState::new` (not visible here).
        let state = Arc::new(TcpRoleState::new(
            RoleName::from_static("Alice"),
            None,
            [RoleName::from_static("Bob")],
        ));
        let bob = RoleName::from_static("Bob");

        let first_listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind first listener");
        let first_addr = first_listener.local_addr().expect("first listener address");
        let first_state = Arc::clone(&state);
        let first_task = tokio::spawn(async move {
            let (socket, _) = first_listener.accept().await.expect("accept first client");
            first_state.handle_socket(socket).await
        });
        let first_client = write_runtime_role_claim(first_addr, "Bob").await;

        // Wait (up to 1s) until the first claim is registered, so the second
        // claim below is guaranteed to race a live one rather than precede it.
        tokio::time::timeout(Duration::from_secs(1), async {
            loop {
                if mutex_lock!(state.claimed_inbound_roles).contains(&bob) {
                    break;
                }
                sleep(Duration::from_millis(10)).await;
            }
        })
        .await
        .expect("first runtime role claim should become active");

        let second_listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind second listener");
        let second_addr = second_listener
            .local_addr()
            .expect("second listener address");
        let second_state = Arc::clone(&state);
        let second_task = tokio::spawn(async move {
            let (socket, _) = second_listener
                .accept()
                .await
                .expect("accept second client");
            second_state.handle_socket(socket).await
        });
        // Keep the second client alive so the handler's failure is caused by
        // the duplicate claim, not by the peer disconnecting.
        let _second_client = write_runtime_role_claim(second_addr, "Bob").await;

        let err = tokio::time::timeout(Duration::from_secs(1), second_task)
            .await
            .expect("duplicate runtime claim should finish promptly")
            .expect("duplicate runtime handler should not panic")
            .expect_err("duplicate runtime role claim must fail");
        assert!(matches!(err, TransportError::DuplicatePeer(role) if role == bob));

        // Closing the first client should let its handler return. Its result
        // is ignored; only prompt, panic-free completion is checked.
        drop(first_client);
        let _ = tokio::time::timeout(Duration::from_secs(1), first_task)
            .await
            .expect("first runtime connection should close promptly")
            .expect("first runtime handler should not panic");
    }

    /// A role claim for a name outside the expected peer set ("Mallory" when
    /// only "Bob" is expected) must fail with `TransportError::ReceiveFailed`.
    #[tokio::test]
    async fn runtime_tcp_rejects_unknown_role_claim() {
        let state = Arc::new(TcpRoleState::new(
            RoleName::from_static("Alice"),
            None,
            [RoleName::from_static("Bob")],
        ));
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind test listener");
        let addr = listener.local_addr().expect("test listener address");
        let accept = tokio::spawn(async move {
            let (socket, _) = listener.accept().await.expect("accept client");
            state.handle_socket(socket).await
        });

        let _client = write_runtime_role_claim(addr, "Mallory").await;
        let err = tokio::time::timeout(Duration::from_secs(1), accept)
            .await
            .expect("unknown role claim should finish promptly")
            .expect("unknown role handler should not panic")
            .expect_err("unknown role claim must fail closed");
        assert!(matches!(err, TransportError::ReceiveFailed(_)));
    }
}