moonpool_transport/rpc/net_transport.rs
1//! NetTransport: Central transport coordinator (FDB pattern).
2//!
3//! Manages peer connections and dispatches incoming packets to endpoints.
4//! Provides synchronous send API (FDB pattern: never await on send).
5//!
6//! # FDB Reference
7//! From NetTransport.actor.cpp:300-600, NetTransport.h:195-314
8//!
9//! # Usage
10//!
11//! Use [`NetTransportBuilder`] to create a properly configured transport:
12//!
13//! ```rust,ignore
14//! // For servers and clients that need RPC responses:
15//! let transport = NetTransportBuilder::new(network, time, task)
16//! .local_address(addr)
17//! .build_listening()
18//! .await?;
19//!
20//! // For fire-and-forget senders only (no listening):
21//! let transport = NetTransportBuilder::new(network, time, task)
22//! .local_address(addr)
23//! .build();
24//! ```
25//!
26//! The builder automatically handles `Rc` wrapping and `set_weak_self()`,
27//! eliminating the most common footgun in NetTransport usage.
28
29use std::cell::RefCell;
30use std::collections::HashMap;
31use std::rc::{Rc, Weak};
32
33use crate::{
34 Endpoint, NetworkAddress, NetworkProvider, Peer, PeerConfig, TaskProvider, TcpListenerTrait,
35 TimeProvider, UID, WellKnownToken,
36};
37use moonpool_sim::sometimes_assert;
38use tokio::sync::watch;
39
40use super::endpoint_map::{EndpointMap, MessageReceiver};
41use super::request_stream::RequestStream;
42use crate::MessageCodec;
43use crate::error::MessagingError;
44use serde::de::DeserializeOwned;
45
46/// Type alias for shared peer reference.
47type SharedPeer<N, T, TP> = Rc<RefCell<Peer<N, T, TP>>>;
48
49/// Internal transport data (FDB TransportData equivalent).
50///
51/// Separates internal mutable state from public API, matching FDB's pattern.
52/// See NetTransport.actor.cpp:300-350 for the original TransportData struct.
53///
54/// # FDB Reference
55/// ```cpp
56/// struct TransportData {
57/// std::unordered_map<NetworkAddress, Reference<struct Peer>> peers;
58/// std::unordered_map<NetworkAddress, std::pair<double, double>> closedPeers;
59/// // ... endpoints, stats, etc.
60/// };
61/// ```
62struct TransportData<N, T, TP>
63where
64 N: NetworkProvider + 'static,
65 T: TimeProvider + 'static,
66 TP: TaskProvider + 'static,
67{
68 /// Endpoint map for routing incoming packets.
69 endpoints: EndpointMap,
70
71 /// Peer connections keyed by destination address (outgoing).
72 /// FDB: std::unordered_map<NetworkAddress, Reference<struct Peer>> peers;
73 peers: HashMap<String, SharedPeer<N, T, TP>>,
74
75 /// Incoming peer connections (from accepted connections).
76 /// Separate from outgoing peers to avoid conflicts.
77 incoming_peers: HashMap<String, SharedPeer<N, T, TP>>,
78
79 /// Statistics.
80 stats: TransportStats,
81}
82
83impl<N, T, TP> Default for TransportData<N, T, TP>
84where
85 N: NetworkProvider + 'static,
86 T: TimeProvider + 'static,
87 TP: TaskProvider + 'static,
88{
89 fn default() -> Self {
90 Self {
91 endpoints: EndpointMap::new(),
92 peers: HashMap::new(),
93 incoming_peers: HashMap::new(),
94 stats: TransportStats::default(),
95 }
96 }
97}
98
99/// Central transport coordinator (FDB NetTransport equivalent).
100///
101/// # Design
102///
103/// - Manages peer connections lazily (created on first send)
104/// - Routes incoming packets to registered endpoints
105/// - Synchronous send API - queues immediately, returns (FDB pattern)
106/// - Explicit `Rc<NetTransport>` passing (testable, simulation-friendly)
107/// - Internal state in `TransportData` (matches FDB: `TransportData* self`)
108///
109/// # FDB Reference
110/// From NetTransport.h:195-314
111///
112/// # Multi-Node Support (Phase 12 Step 7d)
113///
114/// For multi-node operation, wrap in `Rc` and call `set_weak_self()`:
115/// ```ignore
116/// let transport = Rc::new(NetTransport::new(...));
117/// transport.set_weak_self(Rc::downgrade(&transport));
118/// transport.listen().await?; // Start accepting connections
119/// ```
120pub struct NetTransport<N, T, TP>
121where
122 N: NetworkProvider + 'static,
123 T: TimeProvider + 'static,
124 TP: TaskProvider + 'static,
125{
126 /// Internal transport data (FDB: TransportData* self).
127 data: RefCell<TransportData<N, T, TP>>,
128
129 /// Local address for this transport.
130 local_address: NetworkAddress,
131
132 /// Network provider for creating connections.
133 network: N,
134
135 /// Time provider for delays and timing.
136 time: T,
137
138 /// Task provider for spawning background tasks.
139 task_provider: TP,
140
141 /// Peer configuration.
142 peer_config: PeerConfig,
143
144 /// Weak self-reference for spawning background tasks.
145 /// Required for `connection_reader` tasks to dispatch back to this transport.
146 /// Set via `set_weak_self()` after wrapping in `Rc`.
147 weak_self: RefCell<Option<Weak<Self>>>,
148
149 /// Shutdown signal sender. When set to `true`, background tasks (listen_task) should exit.
150 /// Uses watch channel so multiple receivers can observe the same signal.
151 shutdown_tx: watch::Sender<bool>,
152}
153
154/// Statistics for the transport.
155#[derive(Debug, Default)]
156struct TransportStats {
157 /// Number of packets sent.
158 packets_sent: u64,
159 /// Number of packets dispatched to endpoints.
160 packets_dispatched: u64,
161 /// Number of packets that couldn't be delivered (endpoint not found).
162 packets_undelivered: u64,
163 /// Number of peers created.
164 peers_created: u64,
165}
166
167impl<N, T, TP> NetTransport<N, T, TP>
168where
169 N: NetworkProvider + Clone + 'static,
170 T: TimeProvider + Clone + 'static,
171 TP: TaskProvider + Clone + 'static,
172{
173 /// Create a new NetTransport.
174 ///
175 /// # Arguments
176 ///
177 /// * `local_address` - Address of this transport (for local delivery checks)
178 /// * `network` - Network provider for connections
179 /// * `time` - Time provider for timing
180 /// * `task_provider` - Task provider for background tasks
181 ///
182 /// # Multi-Node Usage
183 ///
184 /// For multi-node operation, wrap in `Rc` and call `set_weak_self()`:
185 /// ```ignore
186 /// let transport = Rc::new(NetTransport::new(...));
187 /// transport.set_weak_self(Rc::downgrade(&transport));
188 /// ```
189 pub fn new(local_address: NetworkAddress, network: N, time: T, task_provider: TP) -> Self {
190 let (shutdown_tx, _) = watch::channel(false);
191 Self {
192 data: RefCell::new(TransportData::default()),
193 local_address,
194 network,
195 time,
196 task_provider,
197 peer_config: PeerConfig::default(),
198 weak_self: RefCell::new(None),
199 shutdown_tx,
200 }
201 }
202
203 /// Set the weak self-reference for background task spawning.
204 ///
205 /// Required for multi-node operation where `connection_reader` tasks need
206 /// to dispatch incoming packets back to this transport.
207 ///
208 /// # FDB Pattern
209 /// Similar to how FDB actors reference `TransportData* self`.
210 ///
211 /// # Example
212 /// ```ignore
213 /// let transport = Rc::new(NetTransport::new(...));
214 /// transport.set_weak_self(Rc::downgrade(&transport));
215 /// ```
216 pub fn set_weak_self(&self, weak: Weak<Self>) {
217 *self.weak_self.borrow_mut() = Some(weak);
218 }
219
220 /// Get weak self-reference, panics if not set.
221 fn weak_self(&self) -> Weak<Self> {
222 self.weak_self
223 .borrow()
224 .clone()
225 .expect("weak_self not set - call set_weak_self() after wrapping in Rc")
226 }
227
228 /// Create with custom peer configuration.
229 pub fn with_peer_config(mut self, config: PeerConfig) -> Self {
230 self.peer_config = config;
231 self
232 }
233
234 /// Get the local address.
235 pub fn local_address(&self) -> &NetworkAddress {
236 &self.local_address
237 }
238
239 /// Register a well-known endpoint.
240 ///
241 /// Well-known endpoints have deterministic tokens for O(1) lookup.
242 pub fn register_well_known(
243 &self,
244 token: WellKnownToken,
245 receiver: Rc<dyn MessageReceiver>,
246 ) -> Result<(), MessagingError> {
247 self.data
248 .borrow_mut()
249 .endpoints
250 .insert_well_known(token, receiver)
251 }
252
253 /// Register a dynamic endpoint with the given UID.
254 ///
255 /// Returns the endpoint that senders should use to address this receiver.
256 pub fn register(&self, token: UID, receiver: Rc<dyn MessageReceiver>) -> Endpoint {
257 self.data.borrow_mut().endpoints.insert(token, receiver);
258 Endpoint::new(self.local_address.clone(), token)
259 }
260
261 /// Unregister a dynamic endpoint.
262 pub fn unregister(&self, token: &UID) -> Option<Rc<dyn MessageReceiver>> {
263 self.data.borrow_mut().endpoints.remove(token)
264 }
265
266 /// Register a typed request handler in a single step.
267 ///
268 /// This is the preferred method for registering RPC handlers. It combines:
269 /// - Creating an endpoint from the local address and token
270 /// - Creating a `RequestStream` for the request type
271 /// - Registering the stream's queue with the transport
272 ///
273 /// # Example
274 ///
275 /// ```rust,ignore
276 /// // Before (verbose):
277 /// let endpoint = Endpoint::new(local_addr.clone(), ping_token());
278 /// let stream: RequestStream<PingRequest, JsonCodec> =
279 /// RequestStream::new(endpoint, JsonCodec);
280 /// transport.register(ping_token(), stream.queue() as Rc<dyn MessageReceiver>);
281 ///
282 /// // After (single call):
283 /// let stream = transport.register_handler::<PingRequest>(ping_token(), JsonCodec);
284 /// ```
285 ///
286 /// # Arguments
287 ///
288 /// * `token` - Unique identifier for this handler
289 /// * `codec` - Codec for serializing/deserializing messages
290 ///
291 /// # Type Parameters
292 ///
293 /// * `Req` - The request type this handler will receive
294 /// * `C` - The codec type (e.g., `JsonCodec`)
295 pub fn register_handler<Req, C>(&self, token: UID, codec: C) -> RequestStream<Req, C>
296 where
297 Req: DeserializeOwned + 'static,
298 C: MessageCodec,
299 {
300 let endpoint = Endpoint::new(self.local_address.clone(), token);
301 let stream = RequestStream::new(endpoint, codec);
302 self.data
303 .borrow_mut()
304 .endpoints
305 .insert(token, stream.queue() as Rc<dyn MessageReceiver>);
306 stream
307 }
308
309 /// Register a handler for a multi-method interface.
310 ///
311 /// Use this when an interface has multiple methods (like a Calculator with
312 /// add, subtract, multiply, divide). Each method gets its own handler.
313 ///
314 /// The token is computed deterministically from `interface_id` and `method_index`,
315 /// making it easy to create matching client endpoints.
316 ///
317 /// # Example
318 ///
319 /// ```rust,ignore
320 /// // Calculator interface with multiple methods
321 /// const CALC_INTERFACE: u64 = 0xCA1C;
322 /// const METHOD_ADD: u64 = 0;
323 /// const METHOD_SUB: u64 = 1;
324 /// const METHOD_MUL: u64 = 2;
325 /// const METHOD_DIV: u64 = 3;
326 ///
327 /// let add_stream = transport.register_handler_at::<AddRequest>(
328 /// CALC_INTERFACE, METHOD_ADD, JsonCodec
329 /// );
330 /// let sub_stream = transport.register_handler_at::<SubRequest>(
331 /// CALC_INTERFACE, METHOD_SUB, JsonCodec
332 /// );
333 ///
334 /// // Handle requests with tokio::select!
335 /// loop {
336 /// tokio::select! {
337 /// Some((req, reply)) = add_stream.recv_with_transport(&transport) => {
338 /// reply.send(AddResponse { result: req.a + req.b });
339 /// }
340 /// Some((req, reply)) = sub_stream.recv_with_transport(&transport) => {
341 /// reply.send(SubResponse { result: req.a - req.b });
342 /// }
343 /// }
344 /// }
345 /// ```
346 ///
347 /// # Arguments
348 ///
349 /// * `interface_id` - Unique identifier for the interface
350 /// * `method_index` - Index of the method within the interface (0, 1, 2, ...)
351 /// * `codec` - Codec for serializing/deserializing messages
352 ///
353 /// # Returns
354 ///
355 /// A tuple containing:
356 /// - The `RequestStream` for receiving requests
357 /// - The token (`UID`) that clients should use to send requests to this handler
358 pub fn register_handler_at<Req, C>(
359 &self,
360 interface_id: u64,
361 method_index: u64,
362 codec: C,
363 ) -> (RequestStream<Req, C>, UID)
364 where
365 Req: DeserializeOwned + 'static,
366 C: MessageCodec,
367 {
368 let token = UID::new(interface_id, method_index);
369 let stream = self.register_handler(token, codec);
370 (stream, token)
371 }
372
373 /// Send packet unreliably (best-effort, dropped on failure).
374 ///
375 /// This is a synchronous operation - it queues the packet and returns immediately.
376 /// The packet may be dropped if the connection fails (FDB pattern).
377 ///
378 /// # Arguments
379 ///
380 /// * `endpoint` - Destination endpoint
381 /// * `payload` - Message bytes (already serialized)
382 pub fn send_unreliable(
383 &self,
384 endpoint: &Endpoint,
385 payload: &[u8],
386 ) -> Result<(), MessagingError> {
387 // Check for local delivery
388 if self.is_local_address(&endpoint.address) {
389 sometimes_assert!(
390 local_delivery_path,
391 true,
392 "Unreliable send uses local delivery path"
393 );
394 return self.deliver_local(&endpoint.token, payload);
395 }
396
397 // Get or create peer for remote address
398 sometimes_assert!(
399 remote_peer_path,
400 true,
401 "Unreliable send uses remote peer path"
402 );
403 let peer = self.get_or_open_peer(&endpoint.address);
404 peer.borrow_mut()
405 .send_unreliable(endpoint.token, payload)
406 .map_err(|e| MessagingError::PeerError {
407 message: e.to_string(),
408 })?;
409
410 self.data.borrow_mut().stats.packets_sent += 1;
411 Ok(())
412 }
413
414 /// Send packet reliably (queued, will retry on reconnect).
415 ///
416 /// This is a synchronous operation - it queues the packet and returns immediately.
417 /// The packet will be retried on connection failure (FDB pattern).
418 ///
419 /// # Arguments
420 ///
421 /// * `endpoint` - Destination endpoint
422 /// * `payload` - Message bytes (already serialized)
423 pub fn send_reliable(&self, endpoint: &Endpoint, payload: &[u8]) -> Result<(), MessagingError> {
424 // Check for local delivery
425 if self.is_local_address(&endpoint.address) {
426 return self.deliver_local(&endpoint.token, payload);
427 }
428
429 // Get or create peer for remote address
430 let peer = self.get_or_open_peer(&endpoint.address);
431 peer.borrow_mut()
432 .send_reliable(endpoint.token, payload)
433 .map_err(|e| MessagingError::PeerError {
434 message: e.to_string(),
435 })?;
436
437 self.data.borrow_mut().stats.packets_sent += 1;
438 Ok(())
439 }
440
441 /// Check if address is local (same as this transport).
442 fn is_local_address(&self, address: &NetworkAddress) -> bool {
443 self.local_address == *address
444 }
445
446 /// Deliver packet locally (same process, no network).
447 fn deliver_local(&self, token: &UID, payload: &[u8]) -> Result<(), MessagingError> {
448 let data = self.data.borrow();
449 if let Some(receiver) = data.endpoints.get(token) {
450 receiver.receive(payload);
451 drop(data); // Release borrow before mutating stats
452 self.data.borrow_mut().stats.packets_dispatched += 1;
453 sometimes_assert!(
454 endpoint_found_local,
455 true,
456 "Local delivery found registered endpoint"
457 );
458 Ok(())
459 } else {
460 drop(data); // Release borrow before mutating stats
461 self.data.borrow_mut().stats.packets_undelivered += 1;
462 sometimes_assert!(
463 endpoint_not_found_local,
464 true,
465 "Local delivery to unregistered endpoint"
466 );
467 Err(MessagingError::EndpointNotFound { token: *token })
468 }
469 }
470
471 /// Get or create a peer for the given address.
472 ///
473 /// Peers are created lazily on first send (FDB connectionKeeper pattern).
474 /// When a new peer is created, a `connection_reader` task is spawned to
475 /// handle incoming packets (FDB: connectionKeeper spawns connectionReader at line 843).
476 ///
477 /// # FDB Reference
478 /// `Reference<struct Peer> getOrOpenPeer(NetworkAddress const& address);`
479 fn get_or_open_peer(&self, address: &NetworkAddress) -> SharedPeer<N, T, TP> {
480 let addr_str = address.to_string();
481
482 // Check if peer already exists
483 if let Some(peer) = self.data.borrow().peers.get(&addr_str) {
484 sometimes_assert!(peer_reused, true, "Existing peer reused for address");
485 return Rc::clone(peer);
486 }
487
488 // Create new peer
489 let peer = Peer::new(
490 self.network.clone(),
491 self.time.clone(),
492 self.task_provider.clone(),
493 addr_str.clone(),
494 self.peer_config.clone(),
495 );
496 let peer = Rc::new(RefCell::new(peer));
497
498 // Store in peers map
499 {
500 let mut data = self.data.borrow_mut();
501 data.peers.insert(addr_str.clone(), Rc::clone(&peer));
502 data.stats.peers_created += 1;
503 }
504
505 // Spawn connection_reader for incoming packets (FDB pattern: connectionKeeper spawns connectionReader)
506 // This handles responses for outgoing requests
507 self.spawn_connection_reader(Rc::clone(&peer), addr_str);
508
509 sometimes_assert!(peer_created, true, "New peer created for address");
510 peer
511 }
512
513 /// Dispatch an incoming packet to the appropriate endpoint.
514 ///
515 /// Called by the transport loop when a packet is received.
516 ///
517 /// # Returns
518 ///
519 /// Ok(()) if delivered, Err if endpoint not found.
520 pub fn dispatch(&self, token: &UID, payload: &[u8]) -> Result<(), MessagingError> {
521 let data = self.data.borrow();
522 if let Some(receiver) = data.endpoints.get(token) {
523 receiver.receive(payload);
524 drop(data); // Release borrow before mutating stats
525 self.data.borrow_mut().stats.packets_dispatched += 1;
526 sometimes_assert!(dispatch_success, true, "Message dispatched to endpoint");
527 Ok(())
528 } else {
529 drop(data); // Release borrow before mutating stats
530 self.data.borrow_mut().stats.packets_undelivered += 1;
531 sometimes_assert!(
532 dispatch_undelivered,
533 true,
534 "Dispatch to unregistered endpoint"
535 );
536 Err(MessagingError::EndpointNotFound { token: *token })
537 }
538 }
539
540 /// Get statistics.
541 pub fn packets_sent(&self) -> u64 {
542 self.data.borrow().stats.packets_sent
543 }
544
545 /// Get the number of packets dispatched to endpoints.
546 pub fn packets_dispatched(&self) -> u64 {
547 self.data.borrow().stats.packets_dispatched
548 }
549
550 /// Get the number of packets that couldn't be delivered.
551 pub fn packets_undelivered(&self) -> u64 {
552 self.data.borrow().stats.packets_undelivered
553 }
554
555 /// Get the number of peers created.
556 pub fn peers_created(&self) -> u64 {
557 self.data.borrow().stats.peers_created
558 }
559
560 /// Get number of active peers.
561 pub fn peer_count(&self) -> usize {
562 self.data.borrow().peers.len()
563 }
564
565 /// Get number of registered endpoints.
566 pub fn endpoint_count(&self) -> usize {
567 let data = self.data.borrow();
568 data.endpoints.well_known_count() + data.endpoints.dynamic_count()
569 }
570
571 /// Get number of incoming peers (from accepted connections).
572 pub fn incoming_peer_count(&self) -> usize {
573 self.data.borrow().incoming_peers.len()
574 }
575
576 /// Spawn a connection_reader for a peer.
577 ///
578 /// FDB Pattern: connectionKeeper spawns connectionReader (line 843).
579 /// The connection_reader reads from the peer and dispatches to endpoints.
580 ///
581 /// # Arguments
582 ///
583 /// * `peer` - The peer to read from
584 /// * `peer_addr` - Address string for logging
585 fn spawn_connection_reader(&self, peer: SharedPeer<N, T, TP>, peer_addr: String) {
586 // Only spawn if weak_self is set (multi-node mode)
587 if self.weak_self.borrow().is_none() {
588 return;
589 }
590
591 let transport_weak = self.weak_self();
592 self.task_provider.spawn_task(
593 "connection_reader",
594 connection_reader(transport_weak, peer, peer_addr),
595 );
596 }
597
598 // =========================================================================
599 // Server Listener Support (FDB: listen + connectionIncoming)
600 // =========================================================================
601
602 /// Start listening for incoming connections.
603 ///
604 /// FDB Pattern: `listen()` (NetTransport.actor.cpp:1646-1676)
605 /// Binds to the local address and spawns an accept loop that handles
606 /// incoming connections via `connection_incoming()`.
607 ///
608 /// # Requirements
609 ///
610 /// Must call `set_weak_self()` before calling this method.
611 ///
612 /// # Example
613 ///
614 /// ```ignore
615 /// let transport = Rc::new(NetTransport::new(...));
616 /// transport.set_weak_self(Rc::downgrade(&transport));
617 /// transport.listen().await?;
618 /// ```
619 pub async fn listen(&self) -> Result<(), MessagingError> {
620 // Verify weak_self is set
621 if self.weak_self.borrow().is_none() {
622 return Err(MessagingError::InvalidState {
623 message: "weak_self not set - call set_weak_self() before listen()".to_string(),
624 });
625 }
626
627 // Bind to local address
628 let addr_str = self.local_address.to_string();
629 let listener =
630 self.network
631 .bind(&addr_str)
632 .await
633 .map_err(|e| MessagingError::NetworkError {
634 message: format!("Failed to bind to {}: {}", addr_str, e),
635 })?;
636
637 tracing::info!("NetTransport: listening on {}", addr_str);
638
639 // Spawn listen task (FDB: listen() actor)
640 // Pass shutdown receiver so the task can exit when transport is dropped
641 let transport_weak = self.weak_self();
642 let shutdown_rx = self.shutdown_tx.subscribe();
643 self.task_provider.spawn_task(
644 "listen",
645 listen_task(transport_weak, listener, addr_str, shutdown_rx),
646 );
647
648 Ok(())
649 }
650}
651
652/// Implement Drop to signal shutdown to background tasks.
653///
654/// When NetTransport is dropped, we signal all background tasks (listen_task)
655/// to exit gracefully. This prevents tasks from being stuck on accept() forever.
656impl<N, T, TP> Drop for NetTransport<N, T, TP>
657where
658 N: NetworkProvider + 'static,
659 T: TimeProvider + 'static,
660 TP: TaskProvider + 'static,
661{
662 fn drop(&mut self) {
663 tracing::debug!("NetTransport: signaling shutdown to background tasks");
664 // Signal shutdown - ignore error if no receivers
665 let _ = self.shutdown_tx.send(true);
666 }
667}
668
669// =============================================================================
670// NetTransportBuilder
671// =============================================================================
672
673/// Builder for NetTransport that eliminates common footguns.
674///
675/// The manual `Rc` wrapping and `set_weak_self()` pattern is error-prone:
676/// forgetting `set_weak_self()` causes a runtime panic. This builder handles
677/// both automatically.
678///
679/// # Examples
680///
681/// ```rust,ignore
682/// // Standard usage - server or client that needs RPC responses:
683/// let transport = NetTransportBuilder::new(network, time, task)
684/// .local_address(addr)
685/// .build_listening()
686/// .await?;
687///
688/// // Fire-and-forget sender (no listening needed):
689/// let transport = NetTransportBuilder::new(network, time, task)
690/// .local_address(addr)
691/// .build();
692///
693/// // With custom peer config:
694/// let transport = NetTransportBuilder::new(network, time, task)
695/// .local_address(addr)
696/// .peer_config(config)
697/// .build_listening()
698/// .await?;
699/// ```
700///
701/// # Why Build vs Build Listening?
702///
703/// - **`build_listening()`**: For most RPC use cases. Both servers AND clients
704/// need this because responses are sent to the client's listening address.
705///
706/// - **`build()`**: For fire-and-forget messaging where you don't expect responses.
707/// Also useful for testing where you control message flow manually.
708pub struct NetTransportBuilder<N, T, TP>
709where
710 N: NetworkProvider + Clone + 'static,
711 T: TimeProvider + Clone + 'static,
712 TP: TaskProvider + Clone + 'static,
713{
714 network: N,
715 time: T,
716 task_provider: TP,
717 local_address: Option<NetworkAddress>,
718 peer_config: Option<PeerConfig>,
719}
720
721impl<N, T, TP> NetTransportBuilder<N, T, TP>
722where
723 N: NetworkProvider + Clone + 'static,
724 T: TimeProvider + Clone + 'static,
725 TP: TaskProvider + Clone + 'static,
726{
727 /// Create a new builder with the required providers.
728 ///
729 /// # Arguments
730 ///
731 /// * `network` - Network provider for TCP connections
732 /// * `time` - Time provider for timing operations
733 /// * `task_provider` - Task provider for spawning background tasks
734 pub fn new(network: N, time: T, task_provider: TP) -> Self {
735 Self {
736 network,
737 time,
738 task_provider,
739 local_address: None,
740 peer_config: None,
741 }
742 }
743
744 /// Set the local address for this transport.
745 ///
746 /// This is required before calling `build()` or `build_listening()`.
747 ///
748 /// # Arguments
749 ///
750 /// * `address` - The network address to bind to
751 pub fn local_address(mut self, address: NetworkAddress) -> Self {
752 self.local_address = Some(address);
753 self
754 }
755
756 /// Set custom peer configuration.
757 ///
758 /// If not set, uses `PeerConfig::default()`.
759 pub fn peer_config(mut self, config: PeerConfig) -> Self {
760 self.peer_config = Some(config);
761 self
762 }
763
764 /// Build the transport without starting the listener.
765 ///
766 /// Returns `Rc<NetTransport>` with `set_weak_self()` already called.
767 /// Use this for fire-and-forget messaging or testing.
768 ///
769 /// For RPC (request/response), use `build_listening()` instead.
770 ///
771 /// # Panics
772 ///
773 /// Panics if `local_address()` was not called.
774 pub fn build(self) -> Rc<NetTransport<N, T, TP>> {
775 let address = self
776 .local_address
777 .expect("local_address is required - call .local_address(addr) before .build()");
778
779 let mut transport = NetTransport::new(address, self.network, self.time, self.task_provider);
780
781 if let Some(config) = self.peer_config {
782 transport = transport.with_peer_config(config);
783 }
784
785 let transport = Rc::new(transport);
786 transport.set_weak_self(Rc::downgrade(&transport));
787 transport
788 }
789
790 /// Build the transport and start listening for incoming connections.
791 ///
792 /// Returns `Rc<NetTransport>` with `set_weak_self()` already called
793 /// and the listener started.
794 ///
795 /// Use this for typical RPC usage where you need to receive responses.
796 /// Both servers AND clients need this in a request/response pattern.
797 ///
798 /// # Errors
799 ///
800 /// Returns an error if binding to the local address fails.
801 ///
802 /// # Panics
803 ///
804 /// Panics if `local_address()` was not called.
805 pub async fn build_listening(self) -> Result<Rc<NetTransport<N, T, TP>>, MessagingError> {
806 let transport = self.build();
807 transport.listen().await?;
808 Ok(transport)
809 }
810}
811
812/// FDB: connectionReader() - reads from connection and dispatches to endpoints.
813///
814/// This is a background task that:
815/// 1. Takes ownership of the peer's receiver channel at startup
816/// 2. Reads incoming packets from the channel (no RefCell borrow held during await)
817/// 3. Dispatches them to the appropriate endpoint via `transport.dispatch()`
818///
819/// # FDB Reference
820/// From NetTransport.actor.cpp:1401-1602 connectionReader
821///
822/// The FDB version is more complex (handles ConnectPacket, protocol negotiation),
823/// but the core loop is: read packets → scanPackets → deliver.
824async fn connection_reader<N, T, TP>(
825 transport: Weak<NetTransport<N, T, TP>>,
826 peer: SharedPeer<N, T, TP>,
827 peer_addr: String,
828) where
829 N: NetworkProvider + Clone + 'static,
830 T: TimeProvider + Clone + 'static,
831 TP: TaskProvider + Clone + 'static,
832{
833 tracing::debug!("connection_reader: started for peer {}", peer_addr);
834
835 // Take ownership of the receiver at startup.
836 // This avoids holding RefCell borrows across await points (critical safety fix).
837 let mut receiver = {
838 match peer.borrow_mut().take_receiver() {
839 Some(rx) => rx,
840 None => {
841 tracing::error!(
842 "connection_reader: receiver already taken for peer {}",
843 peer_addr
844 );
845 return;
846 }
847 }
848 }; // RefCell borrow released here
849
850 loop {
851 // Await directly on the owned receiver - safe, no RefCell involved!
852 match receiver.recv().await {
853 Some((token, payload)) => {
854 // Try to get transport reference
855 let Some(transport) = transport.upgrade() else {
856 tracing::debug!(
857 "connection_reader: transport dropped, exiting for peer {}",
858 peer_addr
859 );
860 break;
861 };
862
863 // FDB: deliver() - looks up endpoint and dispatches
864 if let Err(e) = transport.dispatch(&token, &payload) {
865 tracing::debug!(
866 "connection_reader: dispatch failed for token {}: {:?}",
867 token,
868 e
869 );
870 }
871
872 sometimes_assert!(
873 connection_reader_dispatch,
874 true,
875 "connectionReader dispatched incoming message"
876 );
877 }
878 None => {
879 // Channel closed - peer disconnected or shutdown
880 tracing::debug!("connection_reader: peer {} receiver closed", peer_addr);
881 break;
882 }
883 }
884 }
885
886 tracing::debug!("connection_reader: exiting for peer {}", peer_addr);
887}
888
889/// FDB: listen() - accept loop spawning connectionIncoming per connection.
890///
891/// This is a background task that:
892/// 1. Accepts incoming connections
893/// 2. Spawns `connection_incoming` for each accepted connection
894/// 3. Exits gracefully when shutdown signal is received
895///
896/// # FDB Reference
897/// From NetTransport.actor.cpp:1646-1676 listen
898async fn listen_task<N, T, TP>(
899 transport: Weak<NetTransport<N, T, TP>>,
900 listener: N::TcpListener,
901 listen_addr: String,
902 mut shutdown_rx: watch::Receiver<bool>,
903) where
904 N: NetworkProvider + Clone + 'static,
905 T: TimeProvider + Clone + 'static,
906 TP: TaskProvider + Clone + 'static,
907{
908 tracing::debug!("listen_task: started on {}", listen_addr);
909
910 loop {
911 // Use select! to race between accept and shutdown signal
912 tokio::select! {
913 // Bias toward shutdown to ensure timely exit
914 biased;
915
916 // Check shutdown signal first
917 result = shutdown_rx.changed() => {
918 match result {
919 Ok(()) if *shutdown_rx.borrow() => {
920 tracing::debug!("listen_task: shutdown signal received, exiting for {}", listen_addr);
921 break;
922 }
923 Err(_) => {
924 // Channel closed means transport was dropped
925 tracing::debug!("listen_task: shutdown channel closed, exiting for {}", listen_addr);
926 break;
927 }
928 _ => {
929 // Value changed but not to true - continue
930 }
931 }
932 }
933
934 // Accept next connection
935 accept_result = listener.accept() => {
936 match accept_result {
937 Ok((stream, peer_addr)) => {
938 tracing::debug!(
939 "listen_task: accepted connection from {} on {}",
940 peer_addr,
941 listen_addr
942 );
943
944 // Get transport reference
945 let Some(transport_rc) = transport.upgrade() else {
946 tracing::debug!("listen_task: transport dropped, exiting");
947 break;
948 };
949
950 // Handle the incoming connection (FDB: connectionIncoming)
951 connection_incoming(
952 Rc::downgrade(&transport_rc),
953 stream,
954 peer_addr,
955 &transport_rc,
956 );
957
958 sometimes_assert!(
959 connection_incoming_accepted,
960 true,
961 "listen() accepted incoming connection"
962 );
963 }
964 Err(e) => {
965 tracing::warn!("listen_task: accept error on {}: {:?}", listen_addr, e);
966 // Continue accepting - transient errors are expected
967 }
968 }
969 }
970 }
971 }
972
973 tracing::debug!("listen_task: exiting for {}", listen_addr);
974}
975
976/// FDB: connectionIncoming() - handles accepted connection.
977///
978/// This function:
979/// 1. Creates a peer for the incoming connection
980/// 2. Spawns a connection_reader for the peer
981///
982/// # FDB Reference
983/// From NetTransport.actor.cpp:1604-1644 connectionIncoming
984///
985/// Note: FDB's connectionIncoming waits for ConnectPacket to identify the peer.
986/// We simplify by using the peer address directly since SimNetworkProvider
987/// already provides the peer address.
988///
989/// FDB Pattern: Use Peer::new_incoming() with the accepted stream, not Peer::new().
990/// This uses the already-established connection rather than trying to connect back.
991fn connection_incoming<N, T, TP>(
992 transport_weak: Weak<NetTransport<N, T, TP>>,
993 stream: N::TcpStream,
994 peer_addr: String,
995 transport: &NetTransport<N, T, TP>,
996) where
997 N: NetworkProvider + Clone + 'static,
998 T: TimeProvider + Clone + 'static,
999 TP: TaskProvider + Clone + 'static,
1000{
1001 tracing::debug!(
1002 "connection_incoming: handling connection from {}",
1003 peer_addr
1004 );
1005
1006 // Check if we already have a peer for this address (FDB: getOrOpenPeer in connectionReader:1555)
1007 // For incoming connections, we store in incoming_peers to avoid conflicts with outgoing peers
1008 //
1009 // Note: Unlike outgoing peers which can be reused, incoming peers with new streams
1010 // should replace the old one since the old connection is stale.
1011 let peer = {
1012 let data = transport.data.borrow();
1013 if data.incoming_peers.contains_key(&peer_addr) {
1014 tracing::debug!(
1015 "connection_incoming: replacing stale incoming peer for {}",
1016 peer_addr
1017 );
1018 }
1019 drop(data); // Release borrow
1020
1021 // FDB Pattern: Use Peer::new_incoming() with the accepted stream
1022 // (NetTransport.actor.cpp:1123 Peer::onIncomingConnection)
1023 // This uses the already-established connection rather than trying to connect back.
1024 let peer = Peer::new_incoming(
1025 transport.network.clone(),
1026 transport.time.clone(),
1027 transport.task_provider.clone(),
1028 peer_addr.clone(),
1029 stream,
1030 transport.peer_config.clone(),
1031 );
1032 let peer = Rc::new(RefCell::new(peer));
1033
1034 // Store in incoming_peers (replaces any existing stale peer)
1035 transport
1036 .data
1037 .borrow_mut()
1038 .incoming_peers
1039 .insert(peer_addr.clone(), Rc::clone(&peer));
1040
1041 tracing::debug!(
1042 "connection_incoming: created new incoming peer for {}",
1043 peer_addr
1044 );
1045 peer
1046 };
1047
1048 // Spawn connection_reader to handle incoming packets
1049 transport.task_provider.spawn_task(
1050 "connection_reader",
1051 connection_reader(transport_weak, peer, peer_addr),
1052 );
1053}
1054
1055#[cfg(test)]
1056mod tests {
1057 use std::net::{IpAddr, Ipv4Addr};
1058
1059 use super::*;
1060 use crate::{JsonCodec, NetNotifiedQueue, TokioTaskProvider, TokioTimeProvider};
1061
1062 // Simple mock network provider that fails all connections
1063 // (we only test local delivery, so connections are never actually made)
1064 #[derive(Clone)]
1065 struct MockNetworkProvider;
1066
1067 // Dummy stream type for the mock
1068 struct DummyStream;
1069
1070 impl tokio::io::AsyncRead for DummyStream {
1071 fn poll_read(
1072 self: std::pin::Pin<&mut Self>,
1073 _cx: &mut std::task::Context<'_>,
1074 _buf: &mut tokio::io::ReadBuf<'_>,
1075 ) -> std::task::Poll<std::io::Result<()>> {
1076 std::task::Poll::Ready(Err(std::io::Error::other("dummy stream")))
1077 }
1078 }
1079
1080 impl tokio::io::AsyncWrite for DummyStream {
1081 fn poll_write(
1082 self: std::pin::Pin<&mut Self>,
1083 _cx: &mut std::task::Context<'_>,
1084 _buf: &[u8],
1085 ) -> std::task::Poll<std::io::Result<usize>> {
1086 std::task::Poll::Ready(Err(std::io::Error::other("dummy stream")))
1087 }
1088
1089 fn poll_flush(
1090 self: std::pin::Pin<&mut Self>,
1091 _cx: &mut std::task::Context<'_>,
1092 ) -> std::task::Poll<std::io::Result<()>> {
1093 std::task::Poll::Ready(Err(std::io::Error::other("dummy stream")))
1094 }
1095
1096 fn poll_shutdown(
1097 self: std::pin::Pin<&mut Self>,
1098 _cx: &mut std::task::Context<'_>,
1099 ) -> std::task::Poll<std::io::Result<()>> {
1100 std::task::Poll::Ready(Err(std::io::Error::other("dummy stream")))
1101 }
1102 }
1103
1104 impl std::marker::Unpin for DummyStream {}
1105
1106 // Dummy listener type for the mock
1107 struct DummyListener;
1108
1109 #[async_trait::async_trait(?Send)]
1110 impl crate::TcpListenerTrait for DummyListener {
1111 type TcpStream = DummyStream;
1112
1113 async fn accept(&self) -> std::io::Result<(Self::TcpStream, String)> {
1114 Err(std::io::Error::other("dummy listener"))
1115 }
1116
1117 fn local_addr(&self) -> std::io::Result<String> {
1118 Err(std::io::Error::other("dummy listener"))
1119 }
1120 }
1121
1122 #[async_trait::async_trait(?Send)]
1123 impl NetworkProvider for MockNetworkProvider {
1124 type TcpStream = DummyStream;
1125 type TcpListener = DummyListener;
1126
1127 async fn bind(&self, _addr: &str) -> std::io::Result<Self::TcpListener> {
1128 Err(std::io::Error::other("mock bind"))
1129 }
1130
1131 async fn connect(&self, _addr: &str) -> std::io::Result<Self::TcpStream> {
1132 Err(std::io::Error::other("mock connection"))
1133 }
1134 }
1135
1136 fn test_address() -> NetworkAddress {
1137 NetworkAddress::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4500)
1138 }
1139
1140 fn create_test_transport()
1141 -> NetTransport<MockNetworkProvider, TokioTimeProvider, TokioTaskProvider> {
1142 NetTransport::new(
1143 test_address(),
1144 MockNetworkProvider,
1145 TokioTimeProvider::new(),
1146 TokioTaskProvider,
1147 )
1148 }
1149
1150 #[test]
1151 fn test_new_transport() {
1152 let transport = create_test_transport();
1153
1154 assert_eq!(transport.peer_count(), 0);
1155 assert_eq!(transport.endpoint_count(), 0);
1156 assert_eq!(transport.packets_sent(), 0);
1157 }
1158
1159 #[test]
1160 fn test_register_well_known() {
1161 let transport = create_test_transport();
1162
1163 let queue: Rc<NetNotifiedQueue<String, JsonCodec>> = Rc::new(NetNotifiedQueue::new(
1164 Endpoint::new(test_address(), UID::new(1, 1)),
1165 JsonCodec,
1166 ));
1167
1168 transport
1169 .register_well_known(WellKnownToken::Ping, queue)
1170 .expect("register should succeed");
1171
1172 assert_eq!(transport.endpoint_count(), 1);
1173 }
1174
1175 #[test]
1176 fn test_register_dynamic() {
1177 let transport = create_test_transport();
1178
1179 let token = UID::new(0x1234, 0x5678);
1180 let queue: Rc<NetNotifiedQueue<String, JsonCodec>> = Rc::new(NetNotifiedQueue::new(
1181 Endpoint::new(test_address(), token),
1182 JsonCodec,
1183 ));
1184
1185 let endpoint = transport.register(token, queue);
1186
1187 assert_eq!(endpoint.token, token);
1188 assert_eq!(transport.endpoint_count(), 1);
1189 }
1190
1191 #[test]
1192 fn test_local_delivery() {
1193 let transport = create_test_transport();
1194
1195 let token = UID::new(0x1234, 0x5678);
1196 let queue: Rc<NetNotifiedQueue<String, JsonCodec>> = Rc::new(NetNotifiedQueue::new(
1197 Endpoint::new(test_address(), token),
1198 JsonCodec,
1199 ));
1200
1201 let endpoint = transport.register(token, Rc::clone(&queue) as Rc<dyn MessageReceiver>);
1202
1203 // Send to local endpoint
1204 let payload = br#""hello local""#;
1205 transport
1206 .send_unreliable(&endpoint, payload)
1207 .expect("send should succeed");
1208
1209 assert_eq!(transport.packets_dispatched(), 1);
1210 assert_eq!(queue.try_recv(), Some("hello local".to_string()));
1211 }
1212
1213 #[test]
1214 fn test_endpoint_not_found() {
1215 let transport = create_test_transport();
1216
1217 let endpoint = Endpoint::new(test_address(), UID::new(999, 999));
1218 let payload = br#""test""#;
1219
1220 let result = transport.send_unreliable(&endpoint, payload);
1221 assert!(matches!(
1222 result,
1223 Err(MessagingError::EndpointNotFound { .. })
1224 ));
1225 assert_eq!(transport.packets_undelivered(), 1);
1226 }
1227
1228 #[test]
1229 fn test_unregister() {
1230 let transport = create_test_transport();
1231
1232 let token = UID::new(0x1234, 0x5678);
1233 let queue: Rc<NetNotifiedQueue<String, JsonCodec>> = Rc::new(NetNotifiedQueue::new(
1234 Endpoint::new(test_address(), token),
1235 JsonCodec,
1236 ));
1237
1238 transport.register(token, queue as Rc<dyn MessageReceiver>);
1239 assert_eq!(transport.endpoint_count(), 1);
1240
1241 let removed = transport.unregister(&token);
1242 assert!(removed.is_some());
1243 assert_eq!(transport.endpoint_count(), 0);
1244 }
1245
1246 #[test]
1247 fn test_dispatch() {
1248 let transport = create_test_transport();
1249
1250 let token = UID::new(0x1234, 0x5678);
1251 let queue: Rc<NetNotifiedQueue<String, JsonCodec>> = Rc::new(NetNotifiedQueue::new(
1252 Endpoint::new(test_address(), token),
1253 JsonCodec,
1254 ));
1255
1256 transport.register(token, Rc::clone(&queue) as Rc<dyn MessageReceiver>);
1257
1258 // Dispatch directly
1259 let payload = br#""dispatched""#;
1260 transport
1261 .dispatch(&token, payload)
1262 .expect("dispatch should succeed");
1263
1264 assert_eq!(queue.try_recv(), Some("dispatched".to_string()));
1265 }
1266
1267 // =========================================================================
1268 // NetTransportBuilder tests
1269 // =========================================================================
1270
1271 #[test]
1272 fn test_builder_build() {
1273 let transport = NetTransportBuilder::new(
1274 MockNetworkProvider,
1275 TokioTimeProvider::new(),
1276 TokioTaskProvider,
1277 )
1278 .local_address(test_address())
1279 .build();
1280
1281 // Should be properly initialized
1282 assert_eq!(transport.peer_count(), 0);
1283 assert_eq!(transport.endpoint_count(), 0);
1284 assert_eq!(transport.packets_sent(), 0);
1285 assert_eq!(*transport.local_address(), test_address());
1286 }
1287
1288 #[test]
1289 fn test_builder_weak_self_set() {
1290 let transport = NetTransportBuilder::new(
1291 MockNetworkProvider,
1292 TokioTimeProvider::new(),
1293 TokioTaskProvider,
1294 )
1295 .local_address(test_address())
1296 .build();
1297
1298 // weak_self should be set - verify by checking it doesn't panic
1299 // This verifies the builder correctly calls set_weak_self()
1300 assert!(transport.weak_self.borrow().is_some());
1301 }
1302
1303 #[test]
1304 fn test_builder_with_peer_config() {
1305 let config = PeerConfig::default();
1306 let transport = NetTransportBuilder::new(
1307 MockNetworkProvider,
1308 TokioTimeProvider::new(),
1309 TokioTaskProvider,
1310 )
1311 .local_address(test_address())
1312 .peer_config(config)
1313 .build();
1314
1315 // Transport should be created (peer_config is internal, but creation succeeds)
1316 assert_eq!(transport.peer_count(), 0);
1317 }
1318
1319 #[test]
1320 #[should_panic(expected = "local_address is required")]
1321 fn test_builder_missing_address_panics() {
1322 let _ = NetTransportBuilder::new(
1323 MockNetworkProvider,
1324 TokioTimeProvider::new(),
1325 TokioTaskProvider,
1326 )
1327 .build();
1328 }
1329
1330 #[test]
1331 fn test_builder_local_delivery_works() {
1332 // Verify the built transport functions correctly
1333 let transport = NetTransportBuilder::new(
1334 MockNetworkProvider,
1335 TokioTimeProvider::new(),
1336 TokioTaskProvider,
1337 )
1338 .local_address(test_address())
1339 .build();
1340
1341 let token = UID::new(0x1234, 0x5678);
1342 let queue: Rc<NetNotifiedQueue<String, JsonCodec>> = Rc::new(NetNotifiedQueue::new(
1343 Endpoint::new(test_address(), token),
1344 JsonCodec,
1345 ));
1346
1347 let endpoint = transport.register(token, Rc::clone(&queue) as Rc<dyn MessageReceiver>);
1348
1349 // Send to local endpoint
1350 let payload = br#""builder test""#;
1351 transport
1352 .send_unreliable(&endpoint, payload)
1353 .expect("send should succeed");
1354
1355 assert_eq!(transport.packets_dispatched(), 1);
1356 assert_eq!(queue.try_recv(), Some("builder test".to_string()));
1357 }
1358
1359 #[test]
1360 fn test_register_handler() {
1361 use serde::{Deserialize, Serialize};
1362
1363 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1364 struct TestRequest {
1365 value: i32,
1366 }
1367
1368 let transport = NetTransportBuilder::new(
1369 MockNetworkProvider,
1370 TokioTimeProvider::new(),
1371 TokioTaskProvider,
1372 )
1373 .local_address(test_address())
1374 .build();
1375
1376 let token = UID::new(0xDEAD, 0xBEEF);
1377 let stream = transport.register_handler::<TestRequest, _>(token, JsonCodec);
1378
1379 // Handler should be registered
1380 assert_eq!(transport.endpoint_count(), 1);
1381
1382 // Endpoint should match
1383 assert_eq!(stream.endpoint().token, token);
1384 assert_eq!(stream.endpoint().address, test_address());
1385 }
1386
1387 #[test]
1388 fn test_register_handler_at_multi_method() {
1389 use serde::{Deserialize, Serialize};
1390
1391 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1392 struct AddRequest {
1393 a: i32,
1394 b: i32,
1395 }
1396
1397 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1398 struct SubRequest {
1399 a: i32,
1400 b: i32,
1401 }
1402
1403 const CALC_INTERFACE: u64 = 0xCA1C;
1404 const METHOD_ADD: u64 = 0;
1405 const METHOD_SUB: u64 = 1;
1406
1407 let transport = NetTransportBuilder::new(
1408 MockNetworkProvider,
1409 TokioTimeProvider::new(),
1410 TokioTaskProvider,
1411 )
1412 .local_address(test_address())
1413 .build();
1414
1415 // Register multiple handlers for the same interface
1416 let (add_stream, add_token) =
1417 transport.register_handler_at::<AddRequest, _>(CALC_INTERFACE, METHOD_ADD, JsonCodec);
1418 let (sub_stream, sub_token) =
1419 transport.register_handler_at::<SubRequest, _>(CALC_INTERFACE, METHOD_SUB, JsonCodec);
1420
1421 // Both handlers should be registered
1422 assert_eq!(transport.endpoint_count(), 2);
1423
1424 // Tokens should be deterministic
1425 assert_eq!(add_token, UID::new(CALC_INTERFACE, METHOD_ADD));
1426 assert_eq!(sub_token, UID::new(CALC_INTERFACE, METHOD_SUB));
1427
1428 // Streams should have correct endpoints
1429 assert_eq!(add_stream.endpoint().token, add_token);
1430 assert_eq!(sub_stream.endpoint().token, sub_token);
1431 }
1432
1433 // =========================================================================
1434 // Builder error path tests (Phase 12D Step 18)
1435 // =========================================================================
1436
1437 #[tokio::test]
1438 async fn test_build_listening_bind_error() {
1439 // MockNetworkProvider returns error on bind(), simulating port already in use
1440 let result = NetTransportBuilder::new(
1441 MockNetworkProvider,
1442 TokioTimeProvider::new(),
1443 TokioTaskProvider,
1444 )
1445 .local_address(test_address())
1446 .build_listening()
1447 .await;
1448
1449 // Should return NetworkError when bind fails
1450 assert!(matches!(
1451 result,
1452 Err(crate::error::MessagingError::NetworkError { .. })
1453 ));
1454 }
1455}