1use super::{
8 validate_transport_contract_profile, DocumentedTransportContract, Location, Topology,
9 TransportContractProfile, TransportContractTier, TransportOperationalContract,
10 TransportSemanticContract, TransportStartupMode,
11};
12use crate::identifiers::RoleName;
13use crate::mutex_lock;
14#[cfg(not(target_arch = "wasm32"))]
15use crate::util::spawn::spawn;
16use crate::util::sync::{mpsc, Mutex};
17use async_trait::async_trait;
18use cfg_if::cfg_if;
19#[cfg(target_arch = "wasm32")]
20use futures::{SinkExt, StreamExt};
21use std::collections::BTreeMap;
22use std::sync::Arc;
23#[cfg(not(target_arch = "wasm32"))]
24use std::sync::{Mutex as StdMutex, OnceLock};
25use thiserror::Error;
26
27#[cfg(not(target_arch = "wasm32"))]
28use tokio::io::{AsyncReadExt, AsyncWriteExt};
29#[cfg(not(target_arch = "wasm32"))]
30use tokio::net::{TcpListener, TcpStream};
31#[cfg(not(target_arch = "wasm32"))]
32use tokio::time::{sleep, Duration};
33
34#[derive(Debug, Error)]
36pub enum TransportError {
37 #[error("connection failed: {0}")]
38 ConnectionFailed(String),
39
40 #[error("send failed: {0}")]
41 SendFailed(String),
42
43 #[error("receive failed: {0}")]
44 ReceiveFailed(String),
45
46 #[error("timeout")]
47 Timeout,
48
49 #[error("channel closed")]
50 ChannelClosed,
51
52 #[error("unknown role: {0}")]
53 UnknownRole(RoleName),
54
55 #[error("transport not ready")]
56 NotReady,
57
58 #[error("IO error: {0}")]
59 IoError(#[from] std::io::Error),
60}
61
62pub type TransportResult<T> = Result<T, TransportError>;
64
65pub trait TransportMessage: Send + Sync + 'static {
67 fn to_bytes(&self) -> Vec<u8>;
69
70 fn from_bytes(bytes: &[u8]) -> Result<Self, String>
72 where
73 Self: Sized;
74}
75
76#[derive(Debug, Clone)]
78pub struct ByteMessage(pub Vec<u8>);
79
80impl TransportMessage for ByteMessage {
81 fn to_bytes(&self) -> Vec<u8> {
82 self.0.clone()
83 }
84
85 fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
86 Ok(ByteMessage(bytes.to_vec()))
87 }
88}
89
90#[async_trait]
92pub trait Transport: Send + Sync + 'static {
93 async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()>;
95
96 async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>>;
98
99 fn is_connected(&self, role: &RoleName) -> bool;
101
102 async fn close(&self) -> TransportResult<()>;
104}
105
106pub struct InMemoryChannelTransport {
111 role: RoleName,
113 senders: Arc<Mutex<BTreeMap<RoleName, mpsc::Sender<Vec<u8>>>>>,
115 receivers: Arc<Mutex<BTreeMap<RoleName, mpsc::Receiver<Vec<u8>>>>>,
117}
118
119impl InMemoryChannelTransport {
120 pub fn new(role: RoleName) -> Self {
122 Self {
123 role,
124 senders: Arc::new(Mutex::new(BTreeMap::new())),
125 receivers: Arc::new(Mutex::new(BTreeMap::new())),
126 }
127 }
128
129 pub async fn connect(&self, other: &InMemoryChannelTransport) {
131 let (tx1, rx1) = mpsc::channel(32);
132 let (tx2, rx2) = mpsc::channel(32);
133
134 mutex_lock!(self.senders).insert(other.role.clone(), tx1);
136 mutex_lock!(other.receivers).insert(self.role.clone(), rx1);
137
138 mutex_lock!(other.senders).insert(self.role.clone(), tx2);
140 mutex_lock!(self.receivers).insert(other.role.clone(), rx2);
141 }
142
143 pub fn role(&self) -> &RoleName {
145 &self.role
146 }
147}
148
149impl DocumentedTransportContract for InMemoryChannelTransport {
150 fn contract_profile() -> TransportContractProfile {
151 TransportContractProfile {
152 transport_name: "InMemoryChannelTransport",
153 tier: TransportContractTier::FirstPartyRuntime,
154 semantics: TransportSemanticContract {
155 role_addressed_routing: true,
156 per_peer_fifo_delivery: true,
157 fail_closed_unknown_role: true,
158 no_message_synthesis: true,
159 explicit_readiness_errors: false,
160 deterministic_for_regression: true,
161 },
162 operational: TransportOperationalContract {
163 transport_type: TransportType::InMemory,
164 startup_mode: TransportStartupMode::ReadyOnCreate,
165 environment_resolved: false,
166 },
167 notes: vec![
168 "In-process channel transport for first-party local execution.",
169 "Deterministic enough for strict regression suites.",
170 ],
171 }
172 }
173}
174
175#[async_trait]
176impl Transport for InMemoryChannelTransport {
177 async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()> {
178 cfg_if! {
179 if #[cfg(target_arch = "wasm32")] {
180 let sender = {
182 let senders = mutex_lock!(self.senders);
183 senders
184 .get(to_role)
185 .cloned()
186 .ok_or_else(|| TransportError::UnknownRole(to_role.clone()))?
187 };
188
189 let mut sender = sender;
190 sender
191 .send(message)
192 .await
193 .map_err(|_| TransportError::ChannelClosed)
194 } else {
195 let senders = mutex_lock!(self.senders);
196 let sender = senders
197 .get(to_role)
198 .ok_or_else(|| TransportError::UnknownRole(to_role.clone()))?;
199
200 sender
201 .send(message)
202 .await
203 .map_err(|_| TransportError::ChannelClosed)
204 }
205 }
206 }
207
208 async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
209 cfg_if! {
210 if #[cfg(target_arch = "wasm32")] {
211 let mut receiver = {
213 let mut receivers = mutex_lock!(self.receivers);
214 receivers
215 .remove(from_role)
216 .ok_or_else(|| TransportError::UnknownRole(from_role.clone()))?
217 };
218
219 let result = receiver.next().await;
220
221 {
222 let mut receivers = mutex_lock!(self.receivers);
223 receivers.insert(from_role.clone(), receiver);
224 }
225
226 result.ok_or(TransportError::ChannelClosed)
227 } else {
228 let mut receivers = mutex_lock!(self.receivers);
229 let receiver = receivers
230 .get_mut(from_role)
231 .ok_or_else(|| TransportError::UnknownRole(from_role.clone()))?;
232 receiver.recv().await.ok_or(TransportError::ChannelClosed)
233 }
234 }
235 }
236
237 fn is_connected(&self, _role: &RoleName) -> bool {
238 true
241 }
242
243 async fn close(&self) -> TransportResult<()> {
244 mutex_lock!(self.senders).clear();
245 mutex_lock!(self.receivers).clear();
246 Ok(())
247 }
248}
249
250#[cfg(not(target_arch = "wasm32"))]
251enum TcpListenerState {
252 NotStarted,
253 Started,
254 Failed(String),
255}
256
257#[cfg(not(target_arch = "wasm32"))]
258struct TcpRoleState {
259 role: RoleName,
260 self_endpoint: Option<crate::identifiers::Endpoint>,
261 inbound_senders: BTreeMap<RoleName, mpsc::Sender<Vec<u8>>>,
262 inbound_receivers: Arc<Mutex<BTreeMap<RoleName, mpsc::Receiver<Vec<u8>>>>>,
263 listener_state: Arc<Mutex<TcpListenerState>>,
264}
265
266#[cfg(not(target_arch = "wasm32"))]
267impl TcpRoleState {
268 fn new(
269 role: RoleName,
270 self_endpoint: Option<crate::identifiers::Endpoint>,
271 peer_roles: impl IntoIterator<Item = RoleName>,
272 ) -> Self {
273 let mut inbound_senders = BTreeMap::new();
274 let mut inbound_receivers = BTreeMap::new();
275 for peer in peer_roles {
276 let (tx, rx) = mpsc::channel(32);
277 inbound_senders.insert(peer.clone(), tx);
278 inbound_receivers.insert(peer, rx);
279 }
280 Self {
281 role,
282 self_endpoint,
283 inbound_senders,
284 inbound_receivers: Arc::new(Mutex::new(inbound_receivers)),
285 listener_state: Arc::new(Mutex::new(TcpListenerState::NotStarted)),
286 }
287 }
288
289 async fn ensure_started(self: &Arc<Self>) -> TransportResult<()> {
290 let mut state = mutex_lock!(self.listener_state);
291 match &*state {
292 TcpListenerState::Started => return Ok(()),
293 TcpListenerState::Failed(message) => {
294 return Err(TransportError::ConnectionFailed(message.clone()));
295 }
296 TcpListenerState::NotStarted => {}
297 }
298
299 let Some(endpoint) = self.self_endpoint.clone() else {
300 *state = TcpListenerState::Started;
301 return Ok(());
302 };
303
304 let listener = TcpListener::bind(endpoint.as_str()).await.map_err(|err| {
305 let message = format!(
306 "failed to bind {} for role {}: {}",
307 endpoint, self.role, err
308 );
309 *state = TcpListenerState::Failed(message.clone());
310 TransportError::ConnectionFailed(message)
311 })?;
312 let role_state = Arc::clone(self);
313 spawn(async move {
314 role_state.accept_loop(listener).await;
315 });
316 *state = TcpListenerState::Started;
317 Ok(())
318 }
319
320 async fn accept_loop(self: Arc<Self>, listener: TcpListener) {
321 loop {
322 let Ok((socket, _)) = listener.accept().await else {
323 break;
324 };
325 let role_state = Arc::clone(&self);
326 spawn(async move {
327 let _ = role_state.handle_socket(socket).await;
328 });
329 }
330 }
331
332 async fn handle_socket(&self, mut socket: TcpStream) -> TransportResult<()> {
333 let role_len = socket.read_u32().await? as usize;
334 let mut role_buf = vec![0_u8; role_len];
335 socket.read_exact(&mut role_buf).await?;
336 let from_role = String::from_utf8(role_buf).map_err(|err| {
337 TransportError::ReceiveFailed(format!("invalid sender header: {err}"))
338 })?;
339 let payload_len = socket.read_u32().await? as usize;
340 let mut payload = vec![0_u8; payload_len];
341 socket.read_exact(&mut payload).await?;
342 let sender_role = RoleName::new(from_role.clone()).map_err(|err| {
343 TransportError::ReceiveFailed(format!("invalid sender role `{from_role}`: {err}"))
344 })?;
345 let sender = self
346 .inbound_senders
347 .get(&sender_role)
348 .cloned()
349 .ok_or_else(|| {
350 TransportError::ReceiveFailed(format!(
351 "sender role `{sender_role}` is not configured for {}",
352 self.role
353 ))
354 })?;
355 sender
356 .send(payload)
357 .await
358 .map_err(|_| TransportError::ChannelClosed)
359 }
360
361 async fn recv_from(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
362 let mut receivers = mutex_lock!(self.inbound_receivers);
363 let receiver = receivers
364 .get_mut(from_role)
365 .ok_or_else(|| TransportError::UnknownRole(from_role.clone()))?;
366 receiver.recv().await.ok_or(TransportError::ChannelClosed)
367 }
368}
369
370#[cfg(not(target_arch = "wasm32"))]
371type SharedTcpRegistry = BTreeMap<String, Arc<TcpRoleState>>;
372
373#[cfg(not(target_arch = "wasm32"))]
374fn shared_tcp_registry() -> &'static StdMutex<SharedTcpRegistry> {
375 static REGISTRY: OnceLock<StdMutex<SharedTcpRegistry>> = OnceLock::new();
376 REGISTRY.get_or_init(|| StdMutex::new(BTreeMap::new()))
377}
378
379#[cfg(not(target_arch = "wasm32"))]
380fn tcp_role_registry_key(topology_signature: &str, role: &RoleName) -> String {
381 format!("{topology_signature}|role:{role}")
382}
383
384#[cfg(not(target_arch = "wasm32"))]
385fn shared_tcp_role_state(
386 topology: &Topology,
387 topology_signature: &str,
388 role: &RoleName,
389) -> TransportResult<Arc<TcpRoleState>> {
390 let key = tcp_role_registry_key(topology_signature, role);
391 let mut registry = shared_tcp_registry()
392 .lock()
393 .unwrap_or_else(|poisoned| poisoned.into_inner());
394 if let Some(existing) = registry.get(&key) {
395 return Ok(Arc::clone(existing));
396 }
397
398 let self_endpoint = match topology.get_location(role) {
399 Ok(Location::Remote(endpoint)) => Some(endpoint),
400 Ok(Location::Local | Location::Colocated(_)) => None,
401 Err(_) => return Err(TransportError::UnknownRole(role.clone())),
402 };
403 let peer_roles = topology
404 .locations
405 .keys()
406 .filter(|peer| *peer != role)
407 .cloned();
408 let state = Arc::new(TcpRoleState::new(role.clone(), self_endpoint, peer_roles));
409 registry.insert(key, Arc::clone(&state));
410 Ok(state)
411}
412
413#[cfg(not(target_arch = "wasm32"))]
414async fn connect_with_retry(endpoint: &crate::identifiers::Endpoint) -> TransportResult<TcpStream> {
415 let mut attempts = 0_u8;
416 loop {
417 match TcpStream::connect(endpoint.as_str()).await {
418 Ok(stream) => return Ok(stream),
419 Err(err) if attempts < 10 => {
420 attempts = attempts.saturating_add(1);
421 if err.kind() != std::io::ErrorKind::ConnectionRefused {
422 return Err(TransportError::ConnectionFailed(err.to_string()));
423 }
424 sleep(Duration::from_millis(10)).await;
425 }
426 Err(err) => return Err(TransportError::ConnectionFailed(err.to_string())),
427 }
428 }
429}
430
431#[cfg(not(target_arch = "wasm32"))]
432struct TcpPeerTransport {
433 state: Arc<TcpRoleState>,
434 peer_role: RoleName,
435 peer_endpoint: Option<crate::identifiers::Endpoint>,
436}
437
438#[cfg(not(target_arch = "wasm32"))]
439impl DocumentedTransportContract for TcpPeerTransport {
440 fn contract_profile() -> TransportContractProfile {
441 TransportContractProfile {
442 transport_name: "TcpPeerTransport",
443 tier: TransportContractTier::FirstPartyRuntime,
444 semantics: TransportSemanticContract {
445 role_addressed_routing: true,
446 per_peer_fifo_delivery: true,
447 fail_closed_unknown_role: true,
448 no_message_synthesis: true,
449 explicit_readiness_errors: true,
450 deterministic_for_regression: false,
451 },
452 operational: TransportOperationalContract {
453 transport_type: TransportType::Tcp,
454 startup_mode: TransportStartupMode::BackgroundWarmup,
455 environment_resolved: false,
456 },
457 notes: vec![
458 "Single-peer runtime TCP transport used for loopback remote topology execution.",
459 ],
460 }
461 }
462}
463
464#[cfg(not(target_arch = "wasm32"))]
465#[async_trait]
466impl Transport for TcpPeerTransport {
467 async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()> {
468 if to_role != &self.peer_role {
469 return Err(TransportError::UnknownRole(to_role.clone()));
470 }
471 let endpoint = self.peer_endpoint.clone().ok_or_else(|| {
472 TransportError::ConnectionFailed(format!(
473 "role {} has no remote endpoint configured for peer {}",
474 self.state.role, self.peer_role
475 ))
476 })?;
477 let mut stream = connect_with_retry(&endpoint).await?;
478 let role_bytes = self.state.role.to_string().into_bytes();
479 stream.write_u32(role_bytes.len() as u32).await?;
480 stream.write_all(&role_bytes).await?;
481 stream.write_u32(message.len() as u32).await?;
482 stream.write_all(&message).await?;
483 stream.shutdown().await?;
484 Ok(())
485 }
486
487 async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
488 if from_role != &self.peer_role {
489 return Err(TransportError::UnknownRole(from_role.clone()));
490 }
491 self.state.recv_from(from_role).await
492 }
493
494 fn is_connected(&self, role: &RoleName) -> bool {
495 role == &self.peer_role
496 }
497
498 async fn close(&self) -> TransportResult<()> {
499 Ok(())
500 }
501}
502
503#[cfg(not(target_arch = "wasm32"))]
504struct TcpRoleTransport {
505 state: Arc<TcpRoleState>,
506 peer_endpoints: BTreeMap<RoleName, Option<crate::identifiers::Endpoint>>,
507}
508
509#[cfg(not(target_arch = "wasm32"))]
510impl DocumentedTransportContract for TcpRoleTransport {
511 fn contract_profile() -> TransportContractProfile {
512 TransportContractProfile {
513 transport_name: "TcpRoleTransport",
514 tier: TransportContractTier::FirstPartyRuntime,
515 semantics: TransportSemanticContract {
516 role_addressed_routing: true,
517 per_peer_fifo_delivery: true,
518 fail_closed_unknown_role: true,
519 no_message_synthesis: true,
520 explicit_readiness_errors: true,
521 deterministic_for_regression: false,
522 },
523 operational: TransportOperationalContract {
524 transport_type: TransportType::Tcp,
525 startup_mode: TransportStartupMode::BackgroundWarmup,
526 environment_resolved: false,
527 },
528 notes: vec![
529 "Role-addressed runtime TCP transport used by the first-party topology helper.",
530 ],
531 }
532 }
533}
534
535#[cfg(not(target_arch = "wasm32"))]
536#[async_trait]
537impl Transport for TcpRoleTransport {
538 async fn send(&self, to_role: &RoleName, message: Vec<u8>) -> TransportResult<()> {
539 self.state.ensure_started().await?;
540 let endpoint = self
541 .peer_endpoints
542 .get(to_role)
543 .cloned()
544 .flatten()
545 .ok_or_else(|| {
546 TransportError::ConnectionFailed(format!(
547 "role {} has no remote endpoint configured for peer {}",
548 self.state.role, to_role
549 ))
550 })?;
551 let mut stream = connect_with_retry(&endpoint).await?;
552 let role_bytes = self.state.role.to_string().into_bytes();
553 stream.write_u32(role_bytes.len() as u32).await?;
554 stream.write_all(&role_bytes).await?;
555 stream.write_u32(message.len() as u32).await?;
556 stream.write_all(&message).await?;
557 stream.shutdown().await?;
558 Ok(())
559 }
560
561 async fn recv(&self, from_role: &RoleName) -> TransportResult<Vec<u8>> {
562 self.state.ensure_started().await?;
563 self.state.recv_from(from_role).await
564 }
565
566 fn is_connected(&self, role: &RoleName) -> bool {
567 self.peer_endpoints.contains_key(role)
568 }
569
570 async fn close(&self) -> TransportResult<()> {
571 Ok(())
572 }
573}
574
575#[cfg(not(target_arch = "wasm32"))]
576pub(crate) async fn create_peer_transport(
577 topology: &Topology,
578 topology_signature: &str,
579 role: &RoleName,
580 peer: &RoleName,
581) -> TransportResult<Box<dyn Transport>> {
582 topology
583 .region_for_role(role)
584 .map_err(TransportError::ConnectionFailed)?;
585 topology
586 .region_for_role(peer)
587 .map_err(TransportError::ConnectionFailed)?;
588 let state = shared_tcp_role_state(topology, topology_signature, role)?;
589 state.ensure_started().await?;
590 let peer_endpoint = match topology.get_location(peer) {
591 Ok(Location::Remote(endpoint)) => Some(endpoint),
592 Ok(Location::Local | Location::Colocated(_)) => None,
593 Err(_) => return Err(TransportError::UnknownRole(peer.clone())),
594 };
595 Ok(Box::new(TcpPeerTransport {
596 state,
597 peer_role: peer.clone(),
598 peer_endpoint,
599 }))
600}
601
602pub struct TransportFactory;
604
605impl TransportFactory {
606 fn validated_first_party_profile(
607 profile: TransportContractProfile,
608 ) -> TransportResult<TransportContractProfile> {
609 validate_transport_contract_profile(&profile)
610 .map_err(|err| TransportError::ConnectionFailed(err.to_string()))?;
611 Ok(profile)
612 }
613
614 pub fn contract_profile_for_topology(
616 topology: &Topology,
617 role: &RoleName,
618 ) -> TransportResult<TransportContractProfile> {
619 let has_remote_participants = topology
620 .locations
621 .values()
622 .any(|location| matches!(location, Location::Remote(_)));
623 if has_remote_participants {
624 #[cfg(target_arch = "wasm32")]
625 {
626 let _ = (topology, role);
627 Err(TransportError::NotReady)
628 }
629 #[cfg(not(target_arch = "wasm32"))]
630 {
631 topology
632 .region_for_role(role)
633 .map_err(TransportError::ConnectionFailed)?;
634 Self::validated_first_party_profile(TcpRoleTransport::contract_profile())
635 }
636 } else {
637 Self::validated_first_party_profile(InMemoryChannelTransport::contract_profile())
638 }
639 }
640
641 pub fn create(topology: &Topology, role: &RoleName) -> TransportResult<Box<dyn Transport>> {
643 let _profile = Self::contract_profile_for_topology(topology, role)?;
644 let has_remote_participants = topology
645 .locations
646 .values()
647 .any(|location| matches!(location, Location::Remote(_)));
648 if has_remote_participants {
649 #[cfg(target_arch = "wasm32")]
650 {
651 let _ = role;
652 Err(TransportError::NotReady)
653 }
654 #[cfg(not(target_arch = "wasm32"))]
655 {
656 topology
657 .region_for_role(role)
658 .map_err(TransportError::ConnectionFailed)?;
659 let state = shared_tcp_role_state(topology, "transport_factory", role)?;
660 let warm_state = Arc::clone(&state);
661 spawn(async move {
662 let _ = warm_state.ensure_started().await;
663 });
664 let peer_endpoints = topology
665 .locations
666 .iter()
667 .filter(|(peer, _)| *peer != role)
668 .map(|(peer, location)| {
669 let _ = topology
670 .region_for_role(peer)
671 .map_err(TransportError::ConnectionFailed)?;
672 let endpoint = match location {
673 Location::Remote(endpoint) => Some(endpoint.clone()),
674 Location::Local | Location::Colocated(_) => None,
675 };
676 Ok((peer.clone(), endpoint))
677 })
678 .collect::<TransportResult<BTreeMap<_, _>>>()?;
679 Ok(Box::new(TcpRoleTransport {
680 state,
681 peer_endpoints,
682 }))
683 }
684 } else {
685 Ok(Box::new(InMemoryChannelTransport::new(role.clone())))
686 }
687 }
688
689 pub fn transport_for_location(
691 _from_role: &RoleName,
692 to_role: &RoleName,
693 topology: &Topology,
694 ) -> Result<TransportType, super::TopologyError> {
695 match topology.get_location(to_role)? {
696 Location::Local => Ok(TransportType::InMemory),
697 Location::Colocated(_) => Ok(TransportType::SharedMemory),
698 Location::Remote(_) => Ok(TransportType::Tcp),
699 }
700 }
701}
702
703#[derive(Debug, Clone, Copy, PartialEq, Eq)]
705pub enum TransportType {
706 InMemory,
708 SharedMemory,
710 Tcp,
712 WebSocket,
714}
715
716impl TransportType {
717 pub fn is_local(&self) -> bool {
719 matches!(self, TransportType::InMemory | TransportType::SharedMemory)
720 }
721}
722
723#[cfg(all(test, not(target_arch = "wasm32")))]
724mod tests {
725 use super::*;
726
727 #[tokio::test]
728 async fn test_in_memory_transport() {
729 let alice = InMemoryChannelTransport::new(RoleName::from_static("Alice"));
730 let bob = InMemoryChannelTransport::new(RoleName::from_static("Bob"));
731
732 alice.connect(&bob).await;
733
734 alice
736 .send(&RoleName::from_static("Bob"), b"Hello Bob".to_vec())
737 .await
738 .unwrap();
739
740 let msg = bob.recv(&RoleName::from_static("Alice")).await.unwrap();
742 assert_eq!(msg, b"Hello Bob".to_vec());
743
744 bob.send(&RoleName::from_static("Alice"), b"Hello Alice".to_vec())
746 .await
747 .unwrap();
748
749 let msg = alice.recv(&RoleName::from_static("Bob")).await.unwrap();
751 assert_eq!(msg, b"Hello Alice".to_vec());
752 }
753
754 #[test]
755 fn test_transport_type_for_location() {
756 let topology = Topology::builder()
757 .local_role(RoleName::from_static("Alice"))
758 .remote_role(
759 RoleName::from_static("Bob"),
760 crate::identifiers::Endpoint::new("localhost:8080").unwrap(),
761 )
762 .colocated_role(
763 RoleName::from_static("Carol"),
764 RoleName::from_static("Alice"),
765 )
766 .build();
767
768 assert_eq!(
769 TransportFactory::transport_for_location(
770 &RoleName::from_static("Alice"),
771 &RoleName::from_static("Alice"),
772 &topology
773 )
774 .unwrap(),
775 TransportType::InMemory
776 );
777 assert_eq!(
778 TransportFactory::transport_for_location(
779 &RoleName::from_static("Alice"),
780 &RoleName::from_static("Bob"),
781 &topology
782 )
783 .unwrap(),
784 TransportType::Tcp
785 );
786 assert_eq!(
787 TransportFactory::transport_for_location(
788 &RoleName::from_static("Alice"),
789 &RoleName::from_static("Carol"),
790 &topology
791 )
792 .unwrap(),
793 TransportType::SharedMemory
794 );
795 }
796
797 #[test]
798 fn test_transport_type_is_local() {
799 assert!(TransportType::InMemory.is_local());
800 assert!(TransportType::SharedMemory.is_local());
801 assert!(!TransportType::Tcp.is_local());
802 assert!(!TransportType::WebSocket.is_local());
803 }
804
805 #[tokio::test]
806 async fn test_transport_factory_create_supports_loopback_remote_topologies() {
807 let local_topology = Topology::builder()
808 .local_role(RoleName::from_static("Alice"))
809 .local_role(RoleName::from_static("Bob"))
810 .build();
811 assert!(TransportFactory::create(&local_topology, &RoleName::from_static("Alice")).is_ok());
812
813 let remote_topology = Topology::builder()
814 .remote_role(
815 RoleName::from_static("Alice"),
816 crate::identifiers::Endpoint::new("127.0.0.1:19801").unwrap(),
817 )
818 .remote_role(
819 RoleName::from_static("Bob"),
820 crate::identifiers::Endpoint::new("127.0.0.1:19802").unwrap(),
821 )
822 .build();
823 let alice = TransportFactory::create(&remote_topology, &RoleName::from_static("Alice"))
824 .expect("remote transport for Alice");
825 let bob = TransportFactory::create(&remote_topology, &RoleName::from_static("Bob"))
826 .expect("remote transport for Bob");
827 alice
828 .send(&RoleName::from_static("Bob"), b"hello remote".to_vec())
829 .await
830 .expect("remote send");
831 assert_eq!(
832 bob.recv(&RoleName::from_static("Alice"))
833 .await
834 .expect("remote recv"),
835 b"hello remote".to_vec()
836 );
837 }
838}