1#[cfg(not(target_arch = "wasm32"))]
8use super::wire;
9use super::{
10 validate_transport_contract_profile, DocumentedTransportContract, Location, Topology,
11 TransportContractProfile, TransportContractTier, TransportOperationalContract,
12 TransportSemanticContract, TransportStartupMode,
13};
14use crate::identifiers::RoleName;
15use crate::mutex_lock;
16#[cfg(not(target_arch = "wasm32"))]
17use crate::util::spawn::spawn;
18use crate::util::sync::{mpsc, Mutex};
19use async_trait::async_trait;
20use cfg_if::cfg_if;
21#[cfg(target_arch = "wasm32")]
22use futures::{SinkExt, StreamExt};
23use std::collections::BTreeMap;
24#[cfg(not(target_arch = "wasm32"))]
25use std::collections::BTreeSet;
26#[cfg(not(target_arch = "wasm32"))]
27use std::net::IpAddr;
28use std::sync::Arc;
29#[cfg(not(target_arch = "wasm32"))]
30use std::sync::{Mutex as StdMutex, OnceLock};
31#[cfg(not(target_arch = "wasm32"))]
32use std::time::Instant;
33use thiserror::Error;
34
35#[cfg(not(target_arch = "wasm32"))]
36use tokio::io::AsyncWriteExt;
37#[cfg(not(target_arch = "wasm32"))]
38use tokio::net::{TcpListener, TcpStream};
39#[cfg(not(target_arch = "wasm32"))]
40use tokio::sync::{OwnedSemaphorePermit, Semaphore};
41#[cfg(not(target_arch = "wasm32"))]
42use tokio::time::{sleep, Duration};
43
/// Timeout handed to each `wire` read helper while handling an inbound TCP connection.
#[cfg(not(target_arch = "wasm32"))]
const TCP_READ_TIMEOUT: Duration = Duration::from_secs(30);
/// Timeout handed to each `wire` write helper while sending a message over TCP.
#[cfg(not(target_arch = "wasm32"))]
const TCP_WRITE_TIMEOUT: Duration = Duration::from_secs(10);
/// Upper bound on concurrently admitted inbound connections per role state.
#[cfg(not(target_arch = "wasm32"))]
const TCP_MAX_CONNECTIONS: usize = 1024;
/// Number of permits (one per payload byte) in the per-role in-flight payload budget.
#[cfg(not(target_arch = "wasm32"))]
const TCP_MAX_INFLIGHT_PAYLOAD_BYTES: usize = 16 * 1024 * 1024;
/// Upper bound on simultaneously live inbound connections from one source IP.
#[cfg(not(target_arch = "wasm32"))]
const TCP_PER_SOURCE_CONNECTION_LIMIT: usize = 64;
/// Upper bound on admitted connections from one source IP within `TCP_RECONNECT_WINDOW`.
#[cfg(not(target_arch = "wasm32"))]
const TCP_PER_SOURCE_RECONNECT_LIMIT: usize = 128;
/// Length of the per-source window after which the admitted-connection count resets.
#[cfg(not(target_arch = "wasm32"))]
const TCP_RECONNECT_WINDOW: Duration = Duration::from_secs(10);
58
/// Errors reported by the transports and the transport factory in this module.
#[derive(Debug, Error)]
pub enum TransportError {
    /// Establishing or configuring a connection failed (bind/connect failure,
    /// missing endpoint, region lookup failure, or contract validation failure).
    #[error("connection failed: {0}")]
    ConnectionFailed(String),

    /// An outbound message could not be prepared, e.g. its length does not fit
    /// the wire length type.
    #[error("send failed: {0}")]
    SendFailed(String),

    /// An inbound connection or message was rejected (bad header, unknown
    /// sender, connection limits, or payload budget exhausted).
    #[error("receive failed: {0}")]
    ReceiveFailed(String),

    /// An operation timed out.
    // NOTE(review): not constructed in this file; presumably produced by the
    // `wire` helpers — confirm.
    #[error("timeout")]
    Timeout,

    /// The underlying message channel has been closed.
    #[error("channel closed")]
    ChannelClosed,

    /// The named role is not known to the transport or topology.
    #[error("unknown role: {0}")]
    UnknownRole(RoleName),

    /// A second inbound connection claimed a sender role that is still being served.
    #[error("duplicate peer connection: {0}")]
    DuplicatePeer(RoleName),

    /// The requested protocol is not supported.
    // NOTE(review): not constructed in this file — confirm where it is used.
    #[error("unsupported protocol: {0}")]
    UnsupportedProtocol(String),

    /// The transport cannot be used; returned for remote topologies on `wasm32`.
    #[error("transport not ready")]
    NotReady,

    /// An I/O error, converted automatically from `std::io::Error`.
    #[error("IO error: {0}")]
    IoError(#[from] std::io::Error),
}
92
/// Result alias whose error type is [`TransportError`].
pub type TransportResult<T> = Result<T, TransportError>;
95
/// A message that can be converted to and from a raw byte representation.
pub trait TransportMessage: Send + Sync + 'static {
    /// Encodes the message into an owned byte vector.
    fn to_bytes(&self) -> Vec<u8>;

    /// Decodes a message from `bytes`, describing any failure as a `String`.
    fn from_bytes(bytes: &[u8]) -> Result<Self, String>
    where
        Self: Sized;
}

/// Opaque message whose encoded form is exactly the bytes it wraps.
#[derive(Debug, Clone)]
pub struct ByteMessage(pub Vec<u8>);

impl TransportMessage for ByteMessage {
    /// Returns a copy of the wrapped bytes.
    fn to_bytes(&self) -> Vec<u8> {
        let Self(payload) = self;
        payload.to_vec()
    }

    /// Wraps a copy of `bytes`; this conversion never fails.
    fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
        Ok(Self(Vec::from(bytes)))
    }
}
120
/// Role-addressed byte transport: messages are sent to and received from peers
/// identified by [`RoleName`].
#[async_trait]
pub trait Transport: Send + Sync + 'static {
    /// Sends `message` to the peer playing `to_role`.
    async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()>;

    /// Waits for and returns the next message sent by the peer playing `from_role`.
    async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>>;

    /// Reports whether the transport considers `role` reachable.
    fn is_connected(&self, role: &RoleName) -> bool;

    /// Closes the transport.
    async fn close(&self) -> TransportResult<()>;
}
136
/// In-process transport that exchanges messages with peers over `mpsc` channels.
pub struct InMemoryChannelTransport {
    /// Role played by the owner of this transport.
    role: RoleName,
    /// Outbound channel halves, keyed by the destination role.
    senders: Arc<Mutex<BTreeMap<RoleName, mpsc::Sender<Vec<u8>>>>>,
    /// Inbound channel halves, keyed by the originating role.
    receivers: Arc<Mutex<BTreeMap<RoleName, mpsc::Receiver<Vec<u8>>>>>,
}
149
150impl InMemoryChannelTransport {
151 pub fn new(role: RoleName) -> Self {
153 Self {
154 role,
155 senders: Arc::new(Mutex::new(BTreeMap::new())),
156 receivers: Arc::new(Mutex::new(BTreeMap::new())),
157 }
158 }
159
160 pub async fn connect(&self, other: &InMemoryChannelTransport) {
162 let (tx1, rx1) = mpsc::channel(32);
163 let (tx2, rx2) = mpsc::channel(32);
164
165 mutex_lock!(self.senders).insert(other.role.clone(), tx1);
167 mutex_lock!(other.receivers).insert(self.role.clone(), rx1);
168
169 mutex_lock!(other.senders).insert(self.role.clone(), tx2);
171 mutex_lock!(self.receivers).insert(other.role.clone(), rx2);
172 }
173
174 pub fn role(&self) -> &RoleName {
176 &self.role
177 }
178}
179
180impl DocumentedTransportContract for InMemoryChannelTransport {
181 fn contract_profile() -> TransportContractProfile {
182 TransportContractProfile {
183 transport_name: "InMemoryChannelTransport",
184 tier: TransportContractTier::FirstPartyRuntime,
185 semantics: TransportSemanticContract {
186 role_addressed_routing: true,
187 authenticated_peers: true,
188 per_peer_fifo_delivery: true,
189 fail_closed_unknown_role: true,
190 no_message_synthesis: true,
191 explicit_readiness_errors: false,
192 deterministic_for_regression: true,
193 },
194 operational: TransportOperationalContract {
195 transport_type: TransportType::InMemory,
196 startup_mode: TransportStartupMode::ReadyOnCreate,
197 environment_resolved: false,
198 },
199 notes: vec![
200 "In-process channel transport for first-party local execution.",
201 "Deterministic enough for strict regression suites.",
202 ],
203 }
204 }
205}
206
207#[async_trait]
208impl Transport for InMemoryChannelTransport {
209 async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()> {
210 cfg_if! {
211 if #[cfg(target_arch = "wasm32")] {
212 let sender = {
214 let senders = mutex_lock!(self.senders);
215 senders
216 .get(to_role)
217 .cloned()
218 .ok_or_else(|| TransportError::UnknownRole(to_role.clone()))?
219 };
220
221 let mut sender = sender;
222 sender
223 .send(message)
224 .await
225 .map_err(|_| TransportError::ChannelClosed)
226 } else {
227 let senders = mutex_lock!(self.senders);
228 let sender = senders
229 .get(to_role)
230 .ok_or_else(|| TransportError::UnknownRole(to_role.clone()))?;
231
232 sender
233 .send(message)
234 .await
235 .map_err(|_| TransportError::ChannelClosed)
236 }
237 }
238 }
239
240 async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
241 cfg_if! {
242 if #[cfg(target_arch = "wasm32")] {
243 let mut receiver = {
245 let mut receivers = mutex_lock!(self.receivers);
246 receivers
247 .remove(from_role)
248 .ok_or_else(|| TransportError::UnknownRole(from_role.clone()))?
249 };
250
251 let result = receiver.next().await;
252
253 {
254 let mut receivers = mutex_lock!(self.receivers);
255 receivers.insert(from_role.clone(), receiver);
256 }
257
258 result.ok_or(TransportError::ChannelClosed)
259 } else {
260 let mut receivers = mutex_lock!(self.receivers);
261 let receiver = receivers
262 .get_mut(from_role)
263 .ok_or_else(|| TransportError::UnknownRole(from_role.clone()))?;
264 receiver.recv().await.ok_or(TransportError::ChannelClosed)
265 }
266 }
267 }
268
269 fn is_connected(&self, _role: &RoleName) -> bool {
270 true
273 }
274
275 async fn close(&self) -> TransportResult<()> {
276 mutex_lock!(self.senders).clear();
277 mutex_lock!(self.receivers).clear();
278 Ok(())
279 }
280}
281
/// Startup state of a role's inbound TCP listener.
#[cfg(not(target_arch = "wasm32"))]
enum TcpListenerState {
    /// No start attempt has been made yet.
    NotStarted,
    /// Startup completed (listener bound, or no endpoint to bind).
    Started,
    /// Binding failed; carries the message reported on every later start attempt.
    Failed(String),
}
288
/// Per-source-IP bookkeeping used when admitting inbound TCP connections.
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Clone, Copy)]
struct TcpSourceRateState {
    /// Start of the current reconnect-counting window.
    window_start: Instant,
    /// Connections admitted from this source during the current window.
    attempts: usize,
    /// Connections from this source that are admitted and not yet released.
    live_connections: usize,
}
296
/// Shared inbound-side state for one role of a TCP topology.
#[cfg(not(target_arch = "wasm32"))]
struct TcpRoleState {
    /// Role this state belongs to.
    role: RoleName,
    /// Endpoint to listen on; `None` means no listener is bound.
    self_endpoint: Option<crate::identifiers::Endpoint>,
    /// Per-peer channel halves used by connection handlers to deliver payloads.
    inbound_senders: BTreeMap<RoleName, mpsc::Sender<Vec<u8>>>,
    /// Per-peer channel halves drained by `recv_from`.
    inbound_receivers: Arc<Mutex<BTreeMap<RoleName, mpsc::Receiver<Vec<u8>>>>>,
    /// Startup state of the listener.
    listener_state: Arc<Mutex<TcpListenerState>>,
    /// Sender roles that currently have a connection being served.
    claimed_inbound_roles: Arc<Mutex<BTreeSet<RoleName>>>,
    /// Number of currently admitted inbound connections.
    active_connections: Arc<Mutex<usize>>,
    /// Byte budget for payloads being read and delivered.
    payload_budget: Arc<Semaphore>,
    /// Per-source-IP admission bookkeeping.
    source_limits: Arc<Mutex<BTreeMap<IpAddr, TcpSourceRateState>>>,
}
309
310#[cfg(not(target_arch = "wasm32"))]
311impl TcpRoleState {
312 fn new(
313 role: RoleName,
314 self_endpoint: Option<crate::identifiers::Endpoint>,
315 peer_roles: impl IntoIterator<Item = RoleName>,
316 ) -> Self {
317 let mut inbound_senders = BTreeMap::new();
318 let mut inbound_receivers = BTreeMap::new();
319 for peer in peer_roles {
320 let (tx, rx) = mpsc::channel(32);
321 inbound_senders.insert(peer.clone(), tx);
322 inbound_receivers.insert(peer, rx);
323 }
324 Self {
325 role,
326 self_endpoint,
327 inbound_senders,
328 inbound_receivers: Arc::new(Mutex::new(inbound_receivers)),
329 listener_state: Arc::new(Mutex::new(TcpListenerState::NotStarted)),
330 claimed_inbound_roles: Arc::new(Mutex::new(BTreeSet::new())),
331 active_connections: Arc::new(Mutex::new(0)),
332 payload_budget: Arc::new(Semaphore::new(TCP_MAX_INFLIGHT_PAYLOAD_BYTES)),
333 source_limits: Arc::new(Mutex::new(BTreeMap::new())),
334 }
335 }
336
337 async fn ensure_started(self: &Arc<Self>) -> TransportResult<()> {
338 let mut state = mutex_lock!(self.listener_state);
339 match &*state {
340 TcpListenerState::Started => return Ok(()),
341 TcpListenerState::Failed(message) => {
342 return Err(TransportError::ConnectionFailed(message.clone()));
343 }
344 TcpListenerState::NotStarted => {}
345 }
346
347 let Some(endpoint) = self.self_endpoint.clone() else {
348 *state = TcpListenerState::Started;
349 return Ok(());
350 };
351
352 let listener = TcpListener::bind(endpoint.as_str()).await.map_err(|err| {
353 let message = format!(
354 "failed to bind {} for role {}: {}",
355 endpoint, self.role, err
356 );
357 *state = TcpListenerState::Failed(message.clone());
358 TransportError::ConnectionFailed(message)
359 })?;
360 let role_state = Arc::clone(self);
361 spawn(async move {
362 role_state.accept_loop(listener).await;
363 });
364 *state = TcpListenerState::Started;
365 Ok(())
366 }
367
368 async fn accept_loop(self: Arc<Self>, listener: TcpListener) {
369 loop {
370 let Ok((socket, addr)) = listener.accept().await else {
371 break;
372 };
373 if self.admit_connection(addr.ip()).await.is_err() {
374 continue;
375 }
376 let role_state = Arc::clone(&self);
377 spawn(async move {
378 let _ = role_state.handle_socket(socket).await;
379 role_state.release_connection(addr.ip()).await;
380 });
381 }
382 }
383
384 async fn handle_socket(&self, mut socket: TcpStream) -> TransportResult<()> {
385 wire::read_preamble(&mut socket, TCP_READ_TIMEOUT).await?;
386 let role_buf = wire::read_role_name_bytes(&mut socket, TCP_READ_TIMEOUT).await?;
387 let from_role = String::from_utf8(role_buf).map_err(|err| {
388 TransportError::ReceiveFailed(format!("invalid sender header: {err}"))
389 })?;
390 let sender_role = RoleName::new(from_role.clone()).map_err(|err| {
391 TransportError::ReceiveFailed(format!("invalid sender role `{from_role}`: {err}"))
392 })?;
393 let sender = self
394 .inbound_senders
395 .get(&sender_role)
396 .cloned()
397 .ok_or_else(|| {
398 TransportError::ReceiveFailed(format!(
399 "sender role `{sender_role}` is not configured for {}",
400 self.role
401 ))
402 })?;
403 self.claim_inbound_role(&sender_role).await?;
404 let result = async {
405 let payload_len = wire::read_payload_len(&mut socket, TCP_READ_TIMEOUT).await?;
406 let _payload_permit =
407 acquire_tcp_payload_budget(&self.payload_budget, payload_len.as_usize()).await?;
408 let mut payload = vec![0_u8; payload_len.as_usize()];
409 wire::read_exact_timeout(&mut socket, &mut payload, TCP_READ_TIMEOUT).await?;
410 sender
411 .send(payload)
412 .await
413 .map_err(|_| TransportError::ChannelClosed)
414 }
415 .await;
416 self.release_inbound_role(&sender_role).await;
417 result
418 }
419
420 async fn admit_connection(&self, source_ip: IpAddr) -> TransportResult<()> {
421 {
422 let mut active_connections = mutex_lock!(self.active_connections);
423 if *active_connections >= TCP_MAX_CONNECTIONS {
424 return Err(TransportError::ReceiveFailed(format!(
425 "max TCP connections exceeded: {TCP_MAX_CONNECTIONS}"
426 )));
427 }
428 *active_connections += 1;
429 }
430
431 let mut sources = mutex_lock!(self.source_limits);
432 let now = Instant::now();
433 let state = sources.entry(source_ip).or_insert(TcpSourceRateState {
434 window_start: now,
435 attempts: 0,
436 live_connections: 0,
437 });
438
439 if now.duration_since(state.window_start) > TCP_RECONNECT_WINDOW {
440 state.window_start = now;
441 state.attempts = 0;
442 }
443
444 if state.live_connections >= TCP_PER_SOURCE_CONNECTION_LIMIT {
445 drop(sources);
446 self.release_active_connection().await;
447 return Err(TransportError::ReceiveFailed(format!(
448 "source {source_ip} has too many live TCP connections"
449 )));
450 }
451 if state.attempts >= TCP_PER_SOURCE_RECONNECT_LIMIT {
452 drop(sources);
453 self.release_active_connection().await;
454 return Err(TransportError::ReceiveFailed(format!(
455 "source {source_ip} exceeded TCP reconnect limit"
456 )));
457 }
458
459 state.live_connections += 1;
460 state.attempts += 1;
461 Ok(())
462 }
463
464 async fn release_active_connection(&self) {
465 let mut active_connections = mutex_lock!(self.active_connections);
466 *active_connections = active_connections.saturating_sub(1);
467 }
468
469 async fn release_connection(&self, source_ip: IpAddr) {
470 self.release_active_connection().await;
471 let mut sources = mutex_lock!(self.source_limits);
472 if let Some(state) = sources.get_mut(&source_ip) {
473 state.live_connections = state.live_connections.saturating_sub(1);
474 }
475 }
476
477 async fn claim_inbound_role(&self, sender_role: &RoleName) -> TransportResult<()> {
478 let mut claimed = mutex_lock!(self.claimed_inbound_roles);
479 if !claimed.insert(sender_role.clone()) {
480 return Err(TransportError::DuplicatePeer(sender_role.clone()));
481 }
482 Ok(())
483 }
484
485 async fn release_inbound_role(&self, sender_role: &RoleName) {
486 mutex_lock!(self.claimed_inbound_roles).remove(sender_role);
487 }
488
489 async fn recv_from(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
490 let mut receivers = mutex_lock!(self.inbound_receivers);
491 let receiver = receivers
492 .get_mut(from_role)
493 .ok_or_else(|| TransportError::UnknownRole(from_role.clone()))?;
494 receiver.recv().await.ok_or(TransportError::ChannelClosed)
495 }
496}
497
498#[cfg(not(target_arch = "wasm32"))]
499async fn acquire_tcp_payload_budget(
500 payload_budget: &Arc<Semaphore>,
501 bytes: usize,
502) -> TransportResult<OwnedSemaphorePermit> {
503 let permits =
504 u32::try_from(bytes).map_err(|err| TransportError::ReceiveFailed(err.to_string()))?;
505 Arc::clone(payload_budget)
506 .try_acquire_many_owned(permits)
507 .map_err(|_| {
508 TransportError::ReceiveFailed(
509 "global in-flight TCP payload byte cap reached".to_string(),
510 )
511 })
512}
513
/// Process-wide map from a registry key string to the shared TCP state of a role.
#[cfg(not(target_arch = "wasm32"))]
type SharedTcpRegistry = BTreeMap<String, Arc<TcpRoleState>>;
516
517#[cfg(not(target_arch = "wasm32"))]
518fn shared_tcp_registry() -> &'static StdMutex<SharedTcpRegistry> {
519 static REGISTRY: OnceLock<StdMutex<SharedTcpRegistry>> = OnceLock::new();
520 REGISTRY.get_or_init(|| StdMutex::new(BTreeMap::new()))
521}
522
523#[cfg(not(target_arch = "wasm32"))]
524fn tcp_role_registry_key(topology_signature: &str, role: &RoleName) -> String {
525 format!("{topology_signature}|role:{role}")
526}
527
528#[cfg(not(target_arch = "wasm32"))]
529fn shared_tcp_role_state(
530 topology: &Topology,
531 topology_signature: &str,
532 role: &RoleName,
533) -> TransportResult<Arc<TcpRoleState>> {
534 let key = tcp_role_registry_key(topology_signature, role);
535 let mut registry = shared_tcp_registry()
536 .lock()
537 .unwrap_or_else(|poisoned| poisoned.into_inner());
538 if let Some(existing) = registry.get(&key) {
539 return Ok(Arc::clone(existing));
540 }
541
542 let self_endpoint = match topology.get_location(role) {
543 Ok(Location::Remote(endpoint)) => Some(endpoint),
544 Ok(Location::Local | Location::Colocated(_)) => None,
545 Err(_) => return Err(TransportError::UnknownRole(role.clone())),
546 };
547 let peer_roles = topology
548 .locations
549 .keys()
550 .filter(|peer| *peer != role)
551 .cloned();
552 let state = Arc::new(TcpRoleState::new(role.clone(), self_endpoint, peer_roles));
553 registry.insert(key, Arc::clone(&state));
554 Ok(state)
555}
556
557#[cfg(not(target_arch = "wasm32"))]
558async fn connect_with_retry(endpoint: &crate::identifiers::Endpoint) -> TransportResult<TcpStream> {
559 let mut attempts = 0_u8;
560 loop {
561 match TcpStream::connect(endpoint.as_str()).await {
562 Ok(stream) => return Ok(stream),
563 Err(err) if attempts < 10 => {
564 attempts = attempts.saturating_add(1);
565 if err.kind() != std::io::ErrorKind::ConnectionRefused {
566 return Err(TransportError::ConnectionFailed(err.to_string()));
567 }
568 sleep(Duration::from_millis(10)).await;
569 }
570 Err(err) => return Err(TransportError::ConnectionFailed(err.to_string())),
571 }
572 }
573}
574
/// TCP transport bound to exactly one peer role.
#[cfg(not(target_arch = "wasm32"))]
struct TcpPeerTransport {
    /// Shared inbound state of the local role.
    state: Arc<TcpRoleState>,
    /// The only role this transport sends to or receives from.
    peer_role: RoleName,
    /// Endpoint of the peer; `None` when the peer is not at a remote location.
    peer_endpoint: Option<crate::identifiers::Endpoint>,
}
581
582#[cfg(not(target_arch = "wasm32"))]
583impl DocumentedTransportContract for TcpPeerTransport {
584 fn contract_profile() -> TransportContractProfile {
585 TransportContractProfile {
586 transport_name: "TcpPeerTransport",
587 tier: TransportContractTier::FirstPartyRuntime,
588 semantics: TransportSemanticContract {
589 role_addressed_routing: true,
590 authenticated_peers: false,
591 per_peer_fifo_delivery: true,
592 fail_closed_unknown_role: true,
593 no_message_synthesis: true,
594 explicit_readiness_errors: true,
595 deterministic_for_regression: false,
596 },
597 operational: TransportOperationalContract {
598 transport_type: TransportType::Tcp,
599 startup_mode: TransportStartupMode::BackgroundWarmup,
600 environment_resolved: false,
601 },
602 notes: vec![
603 "Single-peer runtime TCP transport used for loopback remote topology execution.",
604 "trusted-network only: peers are not cryptographically authenticated.",
605 ],
606 }
607 }
608}
609
610#[cfg(not(target_arch = "wasm32"))]
611#[async_trait]
612impl Transport for TcpPeerTransport {
613 async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()> {
614 if to_role != &self.peer_role {
615 return Err(TransportError::UnknownRole(to_role.clone()));
616 }
617 let endpoint = self.peer_endpoint.clone().ok_or_else(|| {
618 TransportError::ConnectionFailed(format!(
619 "role {} has no remote endpoint configured for peer {}",
620 self.state.role, self.peer_role
621 ))
622 })?;
623 let mut stream = connect_with_retry(&endpoint).await?;
624 let role_bytes = self.state.role.to_string().into_bytes();
625 let message_len = telltale_types::MessageLenBytes::try_from(message.len())
626 .map_err(|err| TransportError::SendFailed(err.to_string()))?;
627 wire::write_preamble(&mut stream, TCP_WRITE_TIMEOUT).await?;
628 wire::write_role_name(&mut stream, &role_bytes, TCP_WRITE_TIMEOUT).await?;
629 wire::write_payload_len(&mut stream, message_len, TCP_WRITE_TIMEOUT).await?;
630 wire::write_all_timeout(&mut stream, &message, TCP_WRITE_TIMEOUT).await?;
631 stream.shutdown().await?;
632 Ok(())
633 }
634
635 async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
636 if from_role != &self.peer_role {
637 return Err(TransportError::UnknownRole(from_role.clone()));
638 }
639 self.state.recv_from(from_role).await
640 }
641
642 fn is_connected(&self, role: &RoleName) -> bool {
643 role == &self.peer_role
644 }
645
646 async fn close(&self) -> TransportResult<()> {
647 Ok(())
648 }
649}
650
/// TCP transport for one role that can address every other role of a topology.
#[cfg(not(target_arch = "wasm32"))]
struct TcpRoleTransport {
    /// Shared inbound state of the local role.
    state: Arc<TcpRoleState>,
    /// Endpoint of each peer role; `None` for peers that are not at a remote location.
    peer_endpoints: BTreeMap<RoleName, Option<crate::identifiers::Endpoint>>,
}
656
657#[cfg(not(target_arch = "wasm32"))]
658impl DocumentedTransportContract for TcpRoleTransport {
659 fn contract_profile() -> TransportContractProfile {
660 TransportContractProfile {
661 transport_name: "TcpRoleTransport",
662 tier: TransportContractTier::FirstPartyRuntime,
663 semantics: TransportSemanticContract {
664 role_addressed_routing: true,
665 authenticated_peers: false,
666 per_peer_fifo_delivery: true,
667 fail_closed_unknown_role: true,
668 no_message_synthesis: true,
669 explicit_readiness_errors: true,
670 deterministic_for_regression: false,
671 },
672 operational: TransportOperationalContract {
673 transport_type: TransportType::Tcp,
674 startup_mode: TransportStartupMode::BackgroundWarmup,
675 environment_resolved: false,
676 },
677 notes: vec![
678 "Role-addressed runtime TCP transport used by the first-party topology helper.",
679 "trusted-network only: peers are not cryptographically authenticated.",
680 ],
681 }
682 }
683}
684
685#[cfg(not(target_arch = "wasm32"))]
686#[async_trait]
687impl Transport for TcpRoleTransport {
688 async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()> {
689 self.state.ensure_started().await?;
690 let endpoint = self
691 .peer_endpoints
692 .get(to_role)
693 .cloned()
694 .flatten()
695 .ok_or_else(|| {
696 TransportError::ConnectionFailed(format!(
697 "role {} has no remote endpoint configured for peer {}",
698 self.state.role, to_role
699 ))
700 })?;
701 let mut stream = connect_with_retry(&endpoint).await?;
702 let role_bytes = self.state.role.to_string().into_bytes();
703 let message_len = telltale_types::MessageLenBytes::try_from(message.len())
704 .map_err(|err| TransportError::SendFailed(err.to_string()))?;
705 wire::write_preamble(&mut stream, TCP_WRITE_TIMEOUT).await?;
706 wire::write_role_name(&mut stream, &role_bytes, TCP_WRITE_TIMEOUT).await?;
707 wire::write_payload_len(&mut stream, message_len, TCP_WRITE_TIMEOUT).await?;
708 wire::write_all_timeout(&mut stream, &message, TCP_WRITE_TIMEOUT).await?;
709 stream.shutdown().await?;
710 Ok(())
711 }
712
713 async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
714 self.state.ensure_started().await?;
715 self.state.recv_from(from_role).await
716 }
717
718 fn is_connected(&self, role: &RoleName) -> bool {
719 self.peer_endpoints.contains_key(role)
720 }
721
722 async fn close(&self) -> TransportResult<()> {
723 Ok(())
724 }
725}
726
727#[cfg(not(target_arch = "wasm32"))]
728pub(crate) async fn create_peer_transport(
729 topology: &Topology,
730 topology_signature: &str,
731 role: &RoleName,
732 peer: &RoleName,
733) -> TransportResult<Box<dyn Transport>> {
734 topology
735 .region_for_role(role)
736 .map_err(TransportError::ConnectionFailed)?;
737 topology
738 .region_for_role(peer)
739 .map_err(TransportError::ConnectionFailed)?;
740 let state = shared_tcp_role_state(topology, topology_signature, role)?;
741 state.ensure_started().await?;
742 let peer_endpoint = match topology.get_location(peer) {
743 Ok(Location::Remote(endpoint)) => Some(endpoint),
744 Ok(Location::Local | Location::Colocated(_)) => None,
745 Err(_) => return Err(TransportError::UnknownRole(peer.clone())),
746 };
747 Ok(Box::new(TcpPeerTransport {
748 state,
749 peer_role: peer.clone(),
750 peer_endpoint,
751 }))
752}
753
/// Namespace for functions that pick and build a transport for a topology.
pub struct TransportFactory;
756
757impl TransportFactory {
758 fn validated_first_party_profile(
759 profile: TransportContractProfile,
760 ) -> TransportResult<TransportContractProfile> {
761 validate_transport_contract_profile(&profile)
762 .map_err(|err| TransportError::ConnectionFailed(err.to_string()))?;
763 Ok(profile)
764 }
765
766 pub fn contract_profile_for_topology(
768 topology: &Topology,
769 role: &RoleName,
770 ) -> TransportResult<TransportContractProfile> {
771 let has_remote_participants = topology
772 .locations
773 .values()
774 .any(|location| matches!(location, Location::Remote(_)));
775 if has_remote_participants {
776 #[cfg(target_arch = "wasm32")]
777 {
778 let _ = (topology, role);
779 Err(TransportError::NotReady)
780 }
781 #[cfg(not(target_arch = "wasm32"))]
782 {
783 topology
784 .region_for_role(role)
785 .map_err(TransportError::ConnectionFailed)?;
786 Self::validated_first_party_profile(TcpRoleTransport::contract_profile())
787 }
788 } else {
789 Self::validated_first_party_profile(InMemoryChannelTransport::contract_profile())
790 }
791 }
792
793 pub fn create(topology: &Topology, role: &RoleName) -> TransportResult<Box<dyn Transport>> {
795 let _profile = Self::contract_profile_for_topology(topology, role)?;
796 let has_remote_participants = topology
797 .locations
798 .values()
799 .any(|location| matches!(location, Location::Remote(_)));
800 if has_remote_participants {
801 #[cfg(target_arch = "wasm32")]
802 {
803 let _ = role;
804 Err(TransportError::NotReady)
805 }
806 #[cfg(not(target_arch = "wasm32"))]
807 {
808 topology
809 .region_for_role(role)
810 .map_err(TransportError::ConnectionFailed)?;
811 let state = shared_tcp_role_state(topology, "transport_factory", role)?;
812 let warm_state = Arc::clone(&state);
813 spawn(async move {
814 let _ = warm_state.ensure_started().await;
815 });
816 let peer_endpoints = topology
817 .locations
818 .iter()
819 .filter(|(peer, _)| *peer != role)
820 .map(|(peer, location)| {
821 let _ = topology
822 .region_for_role(peer)
823 .map_err(TransportError::ConnectionFailed)?;
824 let endpoint = match location {
825 Location::Remote(endpoint) => Some(endpoint.clone()),
826 Location::Local | Location::Colocated(_) => None,
827 };
828 Ok((peer.clone(), endpoint))
829 })
830 .collect::<TransportResult<BTreeMap<_, _>>>()?;
831 Ok(Box::new(TcpRoleTransport {
832 state,
833 peer_endpoints,
834 }))
835 }
836 } else {
837 Ok(Box::new(InMemoryChannelTransport::new(role.clone())))
838 }
839 }
840
841 pub fn transport_for_location(
843 _from_role: &RoleName,
844 to_role: &RoleName,
845 topology: &Topology,
846 ) -> Result<TransportType, super::TopologyError> {
847 match topology.get_location(to_role)? {
848 Location::Local => Ok(TransportType::InMemory),
849 Location::Colocated(_) => Ok(TransportType::SharedMemory),
850 Location::Remote(_) => Ok(TransportType::Tcp),
851 }
852 }
853}
854
/// Kinds of transport a role can be reached through.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransportType {
    /// In-process channels.
    InMemory,
    /// Shared memory between colocated roles.
    SharedMemory,
    /// TCP connection to a remote role.
    Tcp,
    /// WebSocket connection.
    WebSocket,
}

impl TransportType {
    /// Returns `true` for the transports that do not cross the network
    /// (`InMemory` and `SharedMemory`).
    pub fn is_local(&self) -> bool {
        match self {
            TransportType::InMemory | TransportType::SharedMemory => true,
            TransportType::Tcp | TransportType::WebSocket => false,
        }
    }
}
874
// Native-only tests for the in-memory transport, the transport factory, and
// the inbound role-claim handling of the TCP runtime.
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use super::*;
    use std::net::SocketAddr;

    // Two connected in-memory transports can exchange a message in each direction.
    #[tokio::test]
    async fn test_in_memory_transport() {
        let alice = InMemoryChannelTransport::new(RoleName::from_static("Alice"));
        let bob = InMemoryChannelTransport::new(RoleName::from_static("Bob"));

        alice.connect(&bob).await;

        // Alice -> Bob.
        alice
            .send(&RoleName::from_static("Bob"), b"Hello Bob".to_vec())
            .await
            .unwrap();

        let msg = bob.recv(&RoleName::from_static("Alice")).await.unwrap();
        assert_eq!(msg, b"Hello Bob".to_vec());

        // Bob -> Alice.
        bob.send(&RoleName::from_static("Alice"), b"Hello Alice".to_vec())
            .await
            .unwrap();

        let msg = alice.recv(&RoleName::from_static("Bob")).await.unwrap();
        assert_eq!(msg, b"Hello Alice".to_vec());
    }

    // Each location kind of the destination role maps to its transport type.
    #[test]
    fn test_transport_type_for_location() {
        let topology = Topology::builder()
            .local_role(RoleName::from_static("Alice"))
            .remote_role(
                RoleName::from_static("Bob"),
                crate::identifiers::Endpoint::new("localhost:8080").unwrap(),
            )
            .colocated_role(
                RoleName::from_static("Carol"),
                RoleName::from_static("Alice"),
            )
            .build();

        assert_eq!(
            TransportFactory::transport_for_location(
                &RoleName::from_static("Alice"),
                &RoleName::from_static("Alice"),
                &topology
            )
            .unwrap(),
            TransportType::InMemory
        );
        assert_eq!(
            TransportFactory::transport_for_location(
                &RoleName::from_static("Alice"),
                &RoleName::from_static("Bob"),
                &topology
            )
            .unwrap(),
            TransportType::Tcp
        );
        assert_eq!(
            TransportFactory::transport_for_location(
                &RoleName::from_static("Alice"),
                &RoleName::from_static("Carol"),
                &topology
            )
            .unwrap(),
            TransportType::SharedMemory
        );
    }

    // Only the in-memory and shared-memory transport types count as local.
    #[test]
    fn test_transport_type_is_local() {
        assert!(TransportType::InMemory.is_local());
        assert!(TransportType::SharedMemory.is_local());
        assert!(!TransportType::Tcp.is_local());
        assert!(!TransportType::WebSocket.is_local());
    }

    // The factory builds a transport for an all-local topology, and for a
    // loopback remote topology a message sent by Alice is received by Bob.
    // NOTE(review): uses the fixed loopback ports 19801 and 19802, so the test
    // depends on them being free.
    #[tokio::test]
    async fn test_transport_factory_create_supports_loopback_remote_topologies() {
        let local_topology = Topology::builder()
            .local_role(RoleName::from_static("Alice"))
            .local_role(RoleName::from_static("Bob"))
            .build();
        assert!(TransportFactory::create(&local_topology, &RoleName::from_static("Alice")).is_ok());

        let remote_topology = Topology::builder()
            .remote_role(
                RoleName::from_static("Alice"),
                crate::identifiers::Endpoint::new("127.0.0.1:19801").unwrap(),
            )
            .remote_role(
                RoleName::from_static("Bob"),
                crate::identifiers::Endpoint::new("127.0.0.1:19802").unwrap(),
            )
            .build();
        let alice = TransportFactory::create(&remote_topology, &RoleName::from_static("Alice"))
            .expect("remote transport for Alice");
        let bob = TransportFactory::create(&remote_topology, &RoleName::from_static("Bob"))
            .expect("remote transport for Bob");
        alice
            .send(&RoleName::from_static("Bob"), b"hello remote".to_vec())
            .await
            .expect("remote send");
        assert_eq!(
            bob.recv(&RoleName::from_static("Alice"))
                .await
                .expect("remote recv"),
            b"hello remote".to_vec()
        );
    }

    // Connects to `addr` and writes the wire preamble followed by `role` as
    // the sender name, returning the still-open stream with no payload sent.
    async fn write_runtime_role_claim(addr: SocketAddr, role: &str) -> TcpStream {
        let mut stream = TcpStream::connect(addr).await.expect("connect test client");
        wire::write_preamble(&mut stream, TCP_WRITE_TIMEOUT)
            .await
            .expect("write wire preamble");
        wire::write_role_name(&mut stream, role.as_bytes(), TCP_WRITE_TIMEOUT)
            .await
            .expect("write role name");
        stream
    }

    // While one connection holds the claim for "Bob", a second connection
    // claiming "Bob" is rejected with `DuplicatePeer`.
    #[tokio::test]
    async fn runtime_tcp_rejects_duplicate_live_role_claim() {
        let state = Arc::new(TcpRoleState::new(
            RoleName::from_static("Alice"),
            None,
            [RoleName::from_static("Bob")],
        ));
        let bob = RoleName::from_static("Bob");

        // First connection: claims "Bob" and then stalls waiting for a payload.
        let first_listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind first listener");
        let first_addr = first_listener.local_addr().expect("first listener address");
        let first_state = Arc::clone(&state);
        let first_task = tokio::spawn(async move {
            let (socket, _) = first_listener.accept().await.expect("accept first client");
            first_state.handle_socket(socket).await
        });
        let first_client = write_runtime_role_claim(first_addr, "Bob").await;

        // Poll until the first handler has registered its claim.
        tokio::time::timeout(Duration::from_secs(1), async {
            loop {
                if mutex_lock!(state.claimed_inbound_roles).contains(&bob) {
                    break;
                }
                sleep(Duration::from_millis(10)).await;
            }
        })
        .await
        .expect("first runtime role claim should become active");

        // Second connection: same role, handled against the same shared state.
        let second_listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind second listener");
        let second_addr = second_listener
            .local_addr()
            .expect("second listener address");
        let second_state = Arc::clone(&state);
        let second_task = tokio::spawn(async move {
            let (socket, _) = second_listener
                .accept()
                .await
                .expect("accept second client");
            second_state.handle_socket(socket).await
        });
        let _second_client = write_runtime_role_claim(second_addr, "Bob").await;

        let err = tokio::time::timeout(Duration::from_secs(1), second_task)
            .await
            .expect("duplicate runtime claim should finish promptly")
            .expect("duplicate runtime handler should not panic")
            .expect_err("duplicate runtime role claim must fail");
        assert!(matches!(err, TransportError::DuplicatePeer(role) if role == bob));

        // Closing the first client lets the first handler finish.
        drop(first_client);
        let _ = tokio::time::timeout(Duration::from_secs(1), first_task)
            .await
            .expect("first runtime connection should close promptly")
            .expect("first runtime handler should not panic");
    }

    // A connection claiming a role that is not a configured peer is rejected
    // with `ReceiveFailed`.
    #[tokio::test]
    async fn runtime_tcp_rejects_unknown_role_claim() {
        let state = Arc::new(TcpRoleState::new(
            RoleName::from_static("Alice"),
            None,
            [RoleName::from_static("Bob")],
        ));
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind test listener");
        let addr = listener.local_addr().expect("test listener address");
        let accept = tokio::spawn(async move {
            let (socket, _) = listener.accept().await.expect("accept client");
            state.handle_socket(socket).await
        });

        let _client = write_runtime_role_claim(addr, "Mallory").await;
        let err = tokio::time::timeout(Duration::from_secs(1), accept)
            .await
            .expect("unknown role claim should finish promptly")
            .expect("unknown role handler should not panic")
            .expect_err("unknown role claim must fail closed");
        assert!(matches!(err, TransportError::ReceiveFailed(_)));
    }
}