moonpool_transport/rpc/
net_transport.rs

1//! NetTransport: Central transport coordinator (FDB pattern).
2//!
3//! Manages peer connections and dispatches incoming packets to endpoints.
4//! Provides synchronous send API (FDB pattern: never await on send).
5//!
6//! # FDB Reference
7//! From NetTransport.actor.cpp:300-600, NetTransport.h:195-314
8//!
9//! # Usage
10//!
11//! Use [`NetTransportBuilder`] to create a properly configured transport:
12//!
13//! ```rust,ignore
14//! // For servers and clients that need RPC responses:
15//! let transport = NetTransportBuilder::new(network, time, task)
16//!     .local_address(addr)
17//!     .build_listening()
18//!     .await?;
19//!
20//! // For fire-and-forget senders only (no listening):
21//! let transport = NetTransportBuilder::new(network, time, task)
22//!     .local_address(addr)
23//!     .build();
24//! ```
25//!
26//! The builder automatically handles `Rc` wrapping and `set_weak_self()`,
27//! eliminating the most common footgun in NetTransport usage.
28
29use std::cell::RefCell;
30use std::collections::HashMap;
31use std::rc::{Rc, Weak};
32
33use crate::{
34    Endpoint, NetworkAddress, NetworkProvider, Peer, PeerConfig, TaskProvider, TcpListenerTrait,
35    TimeProvider, UID, WellKnownToken,
36};
37use moonpool_sim::sometimes_assert;
38use tokio::sync::watch;
39
40use super::endpoint_map::{EndpointMap, MessageReceiver};
41use super::request_stream::RequestStream;
42use crate::MessageCodec;
43use crate::error::MessagingError;
44use serde::de::DeserializeOwned;
45
46/// Type alias for shared peer reference.
47type SharedPeer<N, T, TP> = Rc<RefCell<Peer<N, T, TP>>>;
48
49/// Internal transport data (FDB TransportData equivalent).
50///
51/// Separates internal mutable state from public API, matching FDB's pattern.
52/// See NetTransport.actor.cpp:300-350 for the original TransportData struct.
53///
54/// # FDB Reference
55/// ```cpp
56/// struct TransportData {
57///     std::unordered_map<NetworkAddress, Reference<struct Peer>> peers;
58///     std::unordered_map<NetworkAddress, std::pair<double, double>> closedPeers;
59///     // ... endpoints, stats, etc.
60/// };
61/// ```
62struct TransportData<N, T, TP>
63where
64    N: NetworkProvider + 'static,
65    T: TimeProvider + 'static,
66    TP: TaskProvider + 'static,
67{
68    /// Endpoint map for routing incoming packets.
69    endpoints: EndpointMap,
70
71    /// Peer connections keyed by destination address (outgoing).
72    /// FDB: std::unordered_map<NetworkAddress, Reference<struct Peer>> peers;
73    peers: HashMap<String, SharedPeer<N, T, TP>>,
74
75    /// Incoming peer connections (from accepted connections).
76    /// Separate from outgoing peers to avoid conflicts.
77    incoming_peers: HashMap<String, SharedPeer<N, T, TP>>,
78
79    /// Statistics.
80    stats: TransportStats,
81}
82
83impl<N, T, TP> Default for TransportData<N, T, TP>
84where
85    N: NetworkProvider + 'static,
86    T: TimeProvider + 'static,
87    TP: TaskProvider + 'static,
88{
89    fn default() -> Self {
90        Self {
91            endpoints: EndpointMap::new(),
92            peers: HashMap::new(),
93            incoming_peers: HashMap::new(),
94            stats: TransportStats::default(),
95        }
96    }
97}
98
99/// Central transport coordinator (FDB NetTransport equivalent).
100///
101/// # Design
102///
103/// - Manages peer connections lazily (created on first send)
104/// - Routes incoming packets to registered endpoints
105/// - Synchronous send API - queues immediately, returns (FDB pattern)
106/// - Explicit `Rc<NetTransport>` passing (testable, simulation-friendly)
107/// - Internal state in `TransportData` (matches FDB: `TransportData* self`)
108///
109/// # FDB Reference
110/// From NetTransport.h:195-314
111///
112/// # Multi-Node Support (Phase 12 Step 7d)
113///
114/// For multi-node operation, wrap in `Rc` and call `set_weak_self()`:
115/// ```ignore
116/// let transport = Rc::new(NetTransport::new(...));
117/// transport.set_weak_self(Rc::downgrade(&transport));
118/// transport.listen().await?; // Start accepting connections
119/// ```
120pub struct NetTransport<N, T, TP>
121where
122    N: NetworkProvider + 'static,
123    T: TimeProvider + 'static,
124    TP: TaskProvider + 'static,
125{
126    /// Internal transport data (FDB: TransportData* self).
127    data: RefCell<TransportData<N, T, TP>>,
128
129    /// Local address for this transport.
130    local_address: NetworkAddress,
131
132    /// Network provider for creating connections.
133    network: N,
134
135    /// Time provider for delays and timing.
136    time: T,
137
138    /// Task provider for spawning background tasks.
139    task_provider: TP,
140
141    /// Peer configuration.
142    peer_config: PeerConfig,
143
144    /// Weak self-reference for spawning background tasks.
145    /// Required for `connection_reader` tasks to dispatch back to this transport.
146    /// Set via `set_weak_self()` after wrapping in `Rc`.
147    weak_self: RefCell<Option<Weak<Self>>>,
148
149    /// Shutdown signal sender. When set to `true`, background tasks (listen_task) should exit.
150    /// Uses watch channel so multiple receivers can observe the same signal.
151    shutdown_tx: watch::Sender<bool>,
152}
153
/// Counters maintained by the transport; all start at zero.
#[derive(Default, Debug)]
struct TransportStats {
    /// Packets handed to a remote peer for sending.
    packets_sent: u64,
    /// Packets delivered to a registered endpoint.
    packets_dispatched: u64,
    /// Packets dropped because no endpoint was registered for their token.
    packets_undelivered: u64,
    /// Outgoing peers created so far.
    peers_created: u64,
}
166
167impl<N, T, TP> NetTransport<N, T, TP>
168where
169    N: NetworkProvider + Clone + 'static,
170    T: TimeProvider + Clone + 'static,
171    TP: TaskProvider + Clone + 'static,
172{
173    /// Create a new NetTransport.
174    ///
175    /// # Arguments
176    ///
177    /// * `local_address` - Address of this transport (for local delivery checks)
178    /// * `network` - Network provider for connections
179    /// * `time` - Time provider for timing
180    /// * `task_provider` - Task provider for background tasks
181    ///
182    /// # Multi-Node Usage
183    ///
184    /// For multi-node operation, wrap in `Rc` and call `set_weak_self()`:
185    /// ```ignore
186    /// let transport = Rc::new(NetTransport::new(...));
187    /// transport.set_weak_self(Rc::downgrade(&transport));
188    /// ```
189    pub fn new(local_address: NetworkAddress, network: N, time: T, task_provider: TP) -> Self {
190        let (shutdown_tx, _) = watch::channel(false);
191        Self {
192            data: RefCell::new(TransportData::default()),
193            local_address,
194            network,
195            time,
196            task_provider,
197            peer_config: PeerConfig::default(),
198            weak_self: RefCell::new(None),
199            shutdown_tx,
200        }
201    }
202
203    /// Set the weak self-reference for background task spawning.
204    ///
205    /// Required for multi-node operation where `connection_reader` tasks need
206    /// to dispatch incoming packets back to this transport.
207    ///
208    /// # FDB Pattern
209    /// Similar to how FDB actors reference `TransportData* self`.
210    ///
211    /// # Example
212    /// ```ignore
213    /// let transport = Rc::new(NetTransport::new(...));
214    /// transport.set_weak_self(Rc::downgrade(&transport));
215    /// ```
216    pub fn set_weak_self(&self, weak: Weak<Self>) {
217        *self.weak_self.borrow_mut() = Some(weak);
218    }
219
220    /// Get weak self-reference, panics if not set.
221    fn weak_self(&self) -> Weak<Self> {
222        self.weak_self
223            .borrow()
224            .clone()
225            .expect("weak_self not set - call set_weak_self() after wrapping in Rc")
226    }
227
228    /// Create with custom peer configuration.
229    pub fn with_peer_config(mut self, config: PeerConfig) -> Self {
230        self.peer_config = config;
231        self
232    }
233
234    /// Get the local address.
235    pub fn local_address(&self) -> &NetworkAddress {
236        &self.local_address
237    }
238
239    /// Register a well-known endpoint.
240    ///
241    /// Well-known endpoints have deterministic tokens for O(1) lookup.
242    pub fn register_well_known(
243        &self,
244        token: WellKnownToken,
245        receiver: Rc<dyn MessageReceiver>,
246    ) -> Result<(), MessagingError> {
247        self.data
248            .borrow_mut()
249            .endpoints
250            .insert_well_known(token, receiver)
251    }
252
253    /// Register a dynamic endpoint with the given UID.
254    ///
255    /// Returns the endpoint that senders should use to address this receiver.
256    pub fn register(&self, token: UID, receiver: Rc<dyn MessageReceiver>) -> Endpoint {
257        self.data.borrow_mut().endpoints.insert(token, receiver);
258        Endpoint::new(self.local_address.clone(), token)
259    }
260
261    /// Unregister a dynamic endpoint.
262    pub fn unregister(&self, token: &UID) -> Option<Rc<dyn MessageReceiver>> {
263        self.data.borrow_mut().endpoints.remove(token)
264    }
265
266    /// Register a typed request handler in a single step.
267    ///
268    /// This is the preferred method for registering RPC handlers. It combines:
269    /// - Creating an endpoint from the local address and token
270    /// - Creating a `RequestStream` for the request type
271    /// - Registering the stream's queue with the transport
272    ///
273    /// # Example
274    ///
275    /// ```rust,ignore
276    /// // Before (verbose):
277    /// let endpoint = Endpoint::new(local_addr.clone(), ping_token());
278    /// let stream: RequestStream<PingRequest, JsonCodec> =
279    ///     RequestStream::new(endpoint, JsonCodec);
280    /// transport.register(ping_token(), stream.queue() as Rc<dyn MessageReceiver>);
281    ///
282    /// // After (single call):
283    /// let stream = transport.register_handler::<PingRequest>(ping_token(), JsonCodec);
284    /// ```
285    ///
286    /// # Arguments
287    ///
288    /// * `token` - Unique identifier for this handler
289    /// * `codec` - Codec for serializing/deserializing messages
290    ///
291    /// # Type Parameters
292    ///
293    /// * `Req` - The request type this handler will receive
294    /// * `C` - The codec type (e.g., `JsonCodec`)
295    pub fn register_handler<Req, C>(&self, token: UID, codec: C) -> RequestStream<Req, C>
296    where
297        Req: DeserializeOwned + 'static,
298        C: MessageCodec,
299    {
300        let endpoint = Endpoint::new(self.local_address.clone(), token);
301        let stream = RequestStream::new(endpoint, codec);
302        self.data
303            .borrow_mut()
304            .endpoints
305            .insert(token, stream.queue() as Rc<dyn MessageReceiver>);
306        stream
307    }
308
309    /// Register a handler for a multi-method interface.
310    ///
311    /// Use this when an interface has multiple methods (like a Calculator with
312    /// add, subtract, multiply, divide). Each method gets its own handler.
313    ///
314    /// The token is computed deterministically from `interface_id` and `method_index`,
315    /// making it easy to create matching client endpoints.
316    ///
317    /// # Example
318    ///
319    /// ```rust,ignore
320    /// // Calculator interface with multiple methods
321    /// const CALC_INTERFACE: u64 = 0xCA1C;
322    /// const METHOD_ADD: u64 = 0;
323    /// const METHOD_SUB: u64 = 1;
324    /// const METHOD_MUL: u64 = 2;
325    /// const METHOD_DIV: u64 = 3;
326    ///
327    /// let add_stream = transport.register_handler_at::<AddRequest>(
328    ///     CALC_INTERFACE, METHOD_ADD, JsonCodec
329    /// );
330    /// let sub_stream = transport.register_handler_at::<SubRequest>(
331    ///     CALC_INTERFACE, METHOD_SUB, JsonCodec
332    /// );
333    ///
334    /// // Handle requests with tokio::select!
335    /// loop {
336    ///     tokio::select! {
337    ///         Some((req, reply)) = add_stream.recv_with_transport(&transport) => {
338    ///             reply.send(AddResponse { result: req.a + req.b });
339    ///         }
340    ///         Some((req, reply)) = sub_stream.recv_with_transport(&transport) => {
341    ///             reply.send(SubResponse { result: req.a - req.b });
342    ///         }
343    ///     }
344    /// }
345    /// ```
346    ///
347    /// # Arguments
348    ///
349    /// * `interface_id` - Unique identifier for the interface
350    /// * `method_index` - Index of the method within the interface (0, 1, 2, ...)
351    /// * `codec` - Codec for serializing/deserializing messages
352    ///
353    /// # Returns
354    ///
355    /// A tuple containing:
356    /// - The `RequestStream` for receiving requests
357    /// - The token (`UID`) that clients should use to send requests to this handler
358    pub fn register_handler_at<Req, C>(
359        &self,
360        interface_id: u64,
361        method_index: u64,
362        codec: C,
363    ) -> (RequestStream<Req, C>, UID)
364    where
365        Req: DeserializeOwned + 'static,
366        C: MessageCodec,
367    {
368        let token = UID::new(interface_id, method_index);
369        let stream = self.register_handler(token, codec);
370        (stream, token)
371    }
372
373    /// Send packet unreliably (best-effort, dropped on failure).
374    ///
375    /// This is a synchronous operation - it queues the packet and returns immediately.
376    /// The packet may be dropped if the connection fails (FDB pattern).
377    ///
378    /// # Arguments
379    ///
380    /// * `endpoint` - Destination endpoint
381    /// * `payload` - Message bytes (already serialized)
382    pub fn send_unreliable(
383        &self,
384        endpoint: &Endpoint,
385        payload: &[u8],
386    ) -> Result<(), MessagingError> {
387        // Check for local delivery
388        if self.is_local_address(&endpoint.address) {
389            sometimes_assert!(
390                local_delivery_path,
391                true,
392                "Unreliable send uses local delivery path"
393            );
394            return self.deliver_local(&endpoint.token, payload);
395        }
396
397        // Get or create peer for remote address
398        sometimes_assert!(
399            remote_peer_path,
400            true,
401            "Unreliable send uses remote peer path"
402        );
403        let peer = self.get_or_open_peer(&endpoint.address);
404        peer.borrow_mut()
405            .send_unreliable(endpoint.token, payload)
406            .map_err(|e| MessagingError::PeerError {
407                message: e.to_string(),
408            })?;
409
410        self.data.borrow_mut().stats.packets_sent += 1;
411        Ok(())
412    }
413
414    /// Send packet reliably (queued, will retry on reconnect).
415    ///
416    /// This is a synchronous operation - it queues the packet and returns immediately.
417    /// The packet will be retried on connection failure (FDB pattern).
418    ///
419    /// # Arguments
420    ///
421    /// * `endpoint` - Destination endpoint
422    /// * `payload` - Message bytes (already serialized)
423    pub fn send_reliable(&self, endpoint: &Endpoint, payload: &[u8]) -> Result<(), MessagingError> {
424        // Check for local delivery
425        if self.is_local_address(&endpoint.address) {
426            return self.deliver_local(&endpoint.token, payload);
427        }
428
429        // Get or create peer for remote address
430        let peer = self.get_or_open_peer(&endpoint.address);
431        peer.borrow_mut()
432            .send_reliable(endpoint.token, payload)
433            .map_err(|e| MessagingError::PeerError {
434                message: e.to_string(),
435            })?;
436
437        self.data.borrow_mut().stats.packets_sent += 1;
438        Ok(())
439    }
440
441    /// Check if address is local (same as this transport).
442    fn is_local_address(&self, address: &NetworkAddress) -> bool {
443        self.local_address == *address
444    }
445
446    /// Deliver packet locally (same process, no network).
447    fn deliver_local(&self, token: &UID, payload: &[u8]) -> Result<(), MessagingError> {
448        let data = self.data.borrow();
449        if let Some(receiver) = data.endpoints.get(token) {
450            receiver.receive(payload);
451            drop(data); // Release borrow before mutating stats
452            self.data.borrow_mut().stats.packets_dispatched += 1;
453            sometimes_assert!(
454                endpoint_found_local,
455                true,
456                "Local delivery found registered endpoint"
457            );
458            Ok(())
459        } else {
460            drop(data); // Release borrow before mutating stats
461            self.data.borrow_mut().stats.packets_undelivered += 1;
462            sometimes_assert!(
463                endpoint_not_found_local,
464                true,
465                "Local delivery to unregistered endpoint"
466            );
467            Err(MessagingError::EndpointNotFound { token: *token })
468        }
469    }
470
471    /// Get or create a peer for the given address.
472    ///
473    /// Peers are created lazily on first send (FDB connectionKeeper pattern).
474    /// When a new peer is created, a `connection_reader` task is spawned to
475    /// handle incoming packets (FDB: connectionKeeper spawns connectionReader at line 843).
476    ///
477    /// # FDB Reference
478    /// `Reference<struct Peer> getOrOpenPeer(NetworkAddress const& address);`
479    fn get_or_open_peer(&self, address: &NetworkAddress) -> SharedPeer<N, T, TP> {
480        let addr_str = address.to_string();
481
482        // Check if peer already exists
483        if let Some(peer) = self.data.borrow().peers.get(&addr_str) {
484            sometimes_assert!(peer_reused, true, "Existing peer reused for address");
485            return Rc::clone(peer);
486        }
487
488        // Create new peer
489        let peer = Peer::new(
490            self.network.clone(),
491            self.time.clone(),
492            self.task_provider.clone(),
493            addr_str.clone(),
494            self.peer_config.clone(),
495        );
496        let peer = Rc::new(RefCell::new(peer));
497
498        // Store in peers map
499        {
500            let mut data = self.data.borrow_mut();
501            data.peers.insert(addr_str.clone(), Rc::clone(&peer));
502            data.stats.peers_created += 1;
503        }
504
505        // Spawn connection_reader for incoming packets (FDB pattern: connectionKeeper spawns connectionReader)
506        // This handles responses for outgoing requests
507        self.spawn_connection_reader(Rc::clone(&peer), addr_str);
508
509        sometimes_assert!(peer_created, true, "New peer created for address");
510        peer
511    }
512
513    /// Dispatch an incoming packet to the appropriate endpoint.
514    ///
515    /// Called by the transport loop when a packet is received.
516    ///
517    /// # Returns
518    ///
519    /// Ok(()) if delivered, Err if endpoint not found.
520    pub fn dispatch(&self, token: &UID, payload: &[u8]) -> Result<(), MessagingError> {
521        let data = self.data.borrow();
522        if let Some(receiver) = data.endpoints.get(token) {
523            receiver.receive(payload);
524            drop(data); // Release borrow before mutating stats
525            self.data.borrow_mut().stats.packets_dispatched += 1;
526            sometimes_assert!(dispatch_success, true, "Message dispatched to endpoint");
527            Ok(())
528        } else {
529            drop(data); // Release borrow before mutating stats
530            self.data.borrow_mut().stats.packets_undelivered += 1;
531            sometimes_assert!(
532                dispatch_undelivered,
533                true,
534                "Dispatch to unregistered endpoint"
535            );
536            Err(MessagingError::EndpointNotFound { token: *token })
537        }
538    }
539
540    /// Get statistics.
541    pub fn packets_sent(&self) -> u64 {
542        self.data.borrow().stats.packets_sent
543    }
544
545    /// Get the number of packets dispatched to endpoints.
546    pub fn packets_dispatched(&self) -> u64 {
547        self.data.borrow().stats.packets_dispatched
548    }
549
550    /// Get the number of packets that couldn't be delivered.
551    pub fn packets_undelivered(&self) -> u64 {
552        self.data.borrow().stats.packets_undelivered
553    }
554
555    /// Get the number of peers created.
556    pub fn peers_created(&self) -> u64 {
557        self.data.borrow().stats.peers_created
558    }
559
560    /// Get number of active peers.
561    pub fn peer_count(&self) -> usize {
562        self.data.borrow().peers.len()
563    }
564
565    /// Get number of registered endpoints.
566    pub fn endpoint_count(&self) -> usize {
567        let data = self.data.borrow();
568        data.endpoints.well_known_count() + data.endpoints.dynamic_count()
569    }
570
571    /// Get number of incoming peers (from accepted connections).
572    pub fn incoming_peer_count(&self) -> usize {
573        self.data.borrow().incoming_peers.len()
574    }
575
576    /// Spawn a connection_reader for a peer.
577    ///
578    /// FDB Pattern: connectionKeeper spawns connectionReader (line 843).
579    /// The connection_reader reads from the peer and dispatches to endpoints.
580    ///
581    /// # Arguments
582    ///
583    /// * `peer` - The peer to read from
584    /// * `peer_addr` - Address string for logging
585    fn spawn_connection_reader(&self, peer: SharedPeer<N, T, TP>, peer_addr: String) {
586        // Only spawn if weak_self is set (multi-node mode)
587        if self.weak_self.borrow().is_none() {
588            return;
589        }
590
591        let transport_weak = self.weak_self();
592        self.task_provider.spawn_task(
593            "connection_reader",
594            connection_reader(transport_weak, peer, peer_addr),
595        );
596    }
597
598    // =========================================================================
599    // Server Listener Support (FDB: listen + connectionIncoming)
600    // =========================================================================
601
602    /// Start listening for incoming connections.
603    ///
604    /// FDB Pattern: `listen()` (NetTransport.actor.cpp:1646-1676)
605    /// Binds to the local address and spawns an accept loop that handles
606    /// incoming connections via `connection_incoming()`.
607    ///
608    /// # Requirements
609    ///
610    /// Must call `set_weak_self()` before calling this method.
611    ///
612    /// # Example
613    ///
614    /// ```ignore
615    /// let transport = Rc::new(NetTransport::new(...));
616    /// transport.set_weak_self(Rc::downgrade(&transport));
617    /// transport.listen().await?;
618    /// ```
619    pub async fn listen(&self) -> Result<(), MessagingError> {
620        // Verify weak_self is set
621        if self.weak_self.borrow().is_none() {
622            return Err(MessagingError::InvalidState {
623                message: "weak_self not set - call set_weak_self() before listen()".to_string(),
624            });
625        }
626
627        // Bind to local address
628        let addr_str = self.local_address.to_string();
629        let listener =
630            self.network
631                .bind(&addr_str)
632                .await
633                .map_err(|e| MessagingError::NetworkError {
634                    message: format!("Failed to bind to {}: {}", addr_str, e),
635                })?;
636
637        tracing::info!("NetTransport: listening on {}", addr_str);
638
639        // Spawn listen task (FDB: listen() actor)
640        // Pass shutdown receiver so the task can exit when transport is dropped
641        let transport_weak = self.weak_self();
642        let shutdown_rx = self.shutdown_tx.subscribe();
643        self.task_provider.spawn_task(
644            "listen",
645            listen_task(transport_weak, listener, addr_str, shutdown_rx),
646        );
647
648        Ok(())
649    }
650}
651
652/// Implement Drop to signal shutdown to background tasks.
653///
654/// When NetTransport is dropped, we signal all background tasks (listen_task)
655/// to exit gracefully. This prevents tasks from being stuck on accept() forever.
656impl<N, T, TP> Drop for NetTransport<N, T, TP>
657where
658    N: NetworkProvider + 'static,
659    T: TimeProvider + 'static,
660    TP: TaskProvider + 'static,
661{
662    fn drop(&mut self) {
663        tracing::debug!("NetTransport: signaling shutdown to background tasks");
664        // Signal shutdown - ignore error if no receivers
665        let _ = self.shutdown_tx.send(true);
666    }
667}
668
669// =============================================================================
670// NetTransportBuilder
671// =============================================================================
672
673/// Builder for NetTransport that eliminates common footguns.
674///
675/// The manual `Rc` wrapping and `set_weak_self()` pattern is error-prone:
676/// forgetting `set_weak_self()` causes a runtime panic. This builder handles
677/// both automatically.
678///
679/// # Examples
680///
681/// ```rust,ignore
682/// // Standard usage - server or client that needs RPC responses:
683/// let transport = NetTransportBuilder::new(network, time, task)
684///     .local_address(addr)
685///     .build_listening()
686///     .await?;
687///
688/// // Fire-and-forget sender (no listening needed):
689/// let transport = NetTransportBuilder::new(network, time, task)
690///     .local_address(addr)
691///     .build();
692///
693/// // With custom peer config:
694/// let transport = NetTransportBuilder::new(network, time, task)
695///     .local_address(addr)
696///     .peer_config(config)
697///     .build_listening()
698///     .await?;
699/// ```
700///
701/// # Why Build vs Build Listening?
702///
703/// - **`build_listening()`**: For most RPC use cases. Both servers AND clients
704///   need this because responses are sent to the client's listening address.
705///
706/// - **`build()`**: For fire-and-forget messaging where you don't expect responses.
707///   Also useful for testing where you control message flow manually.
708pub struct NetTransportBuilder<N, T, TP>
709where
710    N: NetworkProvider + Clone + 'static,
711    T: TimeProvider + Clone + 'static,
712    TP: TaskProvider + Clone + 'static,
713{
714    network: N,
715    time: T,
716    task_provider: TP,
717    local_address: Option<NetworkAddress>,
718    peer_config: Option<PeerConfig>,
719}
720
721impl<N, T, TP> NetTransportBuilder<N, T, TP>
722where
723    N: NetworkProvider + Clone + 'static,
724    T: TimeProvider + Clone + 'static,
725    TP: TaskProvider + Clone + 'static,
726{
727    /// Create a new builder with the required providers.
728    ///
729    /// # Arguments
730    ///
731    /// * `network` - Network provider for TCP connections
732    /// * `time` - Time provider for timing operations
733    /// * `task_provider` - Task provider for spawning background tasks
734    pub fn new(network: N, time: T, task_provider: TP) -> Self {
735        Self {
736            network,
737            time,
738            task_provider,
739            local_address: None,
740            peer_config: None,
741        }
742    }
743
744    /// Set the local address for this transport.
745    ///
746    /// This is required before calling `build()` or `build_listening()`.
747    ///
748    /// # Arguments
749    ///
750    /// * `address` - The network address to bind to
751    pub fn local_address(mut self, address: NetworkAddress) -> Self {
752        self.local_address = Some(address);
753        self
754    }
755
756    /// Set custom peer configuration.
757    ///
758    /// If not set, uses `PeerConfig::default()`.
759    pub fn peer_config(mut self, config: PeerConfig) -> Self {
760        self.peer_config = Some(config);
761        self
762    }
763
764    /// Build the transport without starting the listener.
765    ///
766    /// Returns `Rc<NetTransport>` with `set_weak_self()` already called.
767    /// Use this for fire-and-forget messaging or testing.
768    ///
769    /// For RPC (request/response), use `build_listening()` instead.
770    ///
771    /// # Panics
772    ///
773    /// Panics if `local_address()` was not called.
774    pub fn build(self) -> Rc<NetTransport<N, T, TP>> {
775        let address = self
776            .local_address
777            .expect("local_address is required - call .local_address(addr) before .build()");
778
779        let mut transport = NetTransport::new(address, self.network, self.time, self.task_provider);
780
781        if let Some(config) = self.peer_config {
782            transport = transport.with_peer_config(config);
783        }
784
785        let transport = Rc::new(transport);
786        transport.set_weak_self(Rc::downgrade(&transport));
787        transport
788    }
789
790    /// Build the transport and start listening for incoming connections.
791    ///
792    /// Returns `Rc<NetTransport>` with `set_weak_self()` already called
793    /// and the listener started.
794    ///
795    /// Use this for typical RPC usage where you need to receive responses.
796    /// Both servers AND clients need this in a request/response pattern.
797    ///
798    /// # Errors
799    ///
800    /// Returns an error if binding to the local address fails.
801    ///
802    /// # Panics
803    ///
804    /// Panics if `local_address()` was not called.
805    pub async fn build_listening(self) -> Result<Rc<NetTransport<N, T, TP>>, MessagingError> {
806        let transport = self.build();
807        transport.listen().await?;
808        Ok(transport)
809    }
810}
811
812/// FDB: connectionReader() - reads from connection and dispatches to endpoints.
813///
814/// This is a background task that:
815/// 1. Takes ownership of the peer's receiver channel at startup
816/// 2. Reads incoming packets from the channel (no RefCell borrow held during await)
817/// 3. Dispatches them to the appropriate endpoint via `transport.dispatch()`
818///
819/// # FDB Reference
820/// From NetTransport.actor.cpp:1401-1602 connectionReader
821///
822/// The FDB version is more complex (handles ConnectPacket, protocol negotiation),
823/// but the core loop is: read packets → scanPackets → deliver.
824async fn connection_reader<N, T, TP>(
825    transport: Weak<NetTransport<N, T, TP>>,
826    peer: SharedPeer<N, T, TP>,
827    peer_addr: String,
828) where
829    N: NetworkProvider + Clone + 'static,
830    T: TimeProvider + Clone + 'static,
831    TP: TaskProvider + Clone + 'static,
832{
833    tracing::debug!("connection_reader: started for peer {}", peer_addr);
834
835    // Take ownership of the receiver at startup.
836    // This avoids holding RefCell borrows across await points (critical safety fix).
837    let mut receiver = {
838        match peer.borrow_mut().take_receiver() {
839            Some(rx) => rx,
840            None => {
841                tracing::error!(
842                    "connection_reader: receiver already taken for peer {}",
843                    peer_addr
844                );
845                return;
846            }
847        }
848    }; // RefCell borrow released here
849
850    loop {
851        // Await directly on the owned receiver - safe, no RefCell involved!
852        match receiver.recv().await {
853            Some((token, payload)) => {
854                // Try to get transport reference
855                let Some(transport) = transport.upgrade() else {
856                    tracing::debug!(
857                        "connection_reader: transport dropped, exiting for peer {}",
858                        peer_addr
859                    );
860                    break;
861                };
862
863                // FDB: deliver() - looks up endpoint and dispatches
864                if let Err(e) = transport.dispatch(&token, &payload) {
865                    tracing::debug!(
866                        "connection_reader: dispatch failed for token {}: {:?}",
867                        token,
868                        e
869                    );
870                }
871
872                sometimes_assert!(
873                    connection_reader_dispatch,
874                    true,
875                    "connectionReader dispatched incoming message"
876                );
877            }
878            None => {
879                // Channel closed - peer disconnected or shutdown
880                tracing::debug!("connection_reader: peer {} receiver closed", peer_addr);
881                break;
882            }
883        }
884    }
885
886    tracing::debug!("connection_reader: exiting for peer {}", peer_addr);
887}
888
889/// FDB: listen() - accept loop spawning connectionIncoming per connection.
890///
891/// This is a background task that:
892/// 1. Accepts incoming connections
893/// 2. Spawns `connection_incoming` for each accepted connection
894/// 3. Exits gracefully when shutdown signal is received
895///
896/// # FDB Reference
897/// From NetTransport.actor.cpp:1646-1676 listen
898async fn listen_task<N, T, TP>(
899    transport: Weak<NetTransport<N, T, TP>>,
900    listener: N::TcpListener,
901    listen_addr: String,
902    mut shutdown_rx: watch::Receiver<bool>,
903) where
904    N: NetworkProvider + Clone + 'static,
905    T: TimeProvider + Clone + 'static,
906    TP: TaskProvider + Clone + 'static,
907{
908    tracing::debug!("listen_task: started on {}", listen_addr);
909
910    loop {
911        // Use select! to race between accept and shutdown signal
912        tokio::select! {
913            // Bias toward shutdown to ensure timely exit
914            biased;
915
916            // Check shutdown signal first
917            result = shutdown_rx.changed() => {
918                match result {
919                    Ok(()) if *shutdown_rx.borrow() => {
920                        tracing::debug!("listen_task: shutdown signal received, exiting for {}", listen_addr);
921                        break;
922                    }
923                    Err(_) => {
924                        // Channel closed means transport was dropped
925                        tracing::debug!("listen_task: shutdown channel closed, exiting for {}", listen_addr);
926                        break;
927                    }
928                    _ => {
929                        // Value changed but not to true - continue
930                    }
931                }
932            }
933
934            // Accept next connection
935            accept_result = listener.accept() => {
936                match accept_result {
937                    Ok((stream, peer_addr)) => {
938                        tracing::debug!(
939                            "listen_task: accepted connection from {} on {}",
940                            peer_addr,
941                            listen_addr
942                        );
943
944                        // Get transport reference
945                        let Some(transport_rc) = transport.upgrade() else {
946                            tracing::debug!("listen_task: transport dropped, exiting");
947                            break;
948                        };
949
950                        // Handle the incoming connection (FDB: connectionIncoming)
951                        connection_incoming(
952                            Rc::downgrade(&transport_rc),
953                            stream,
954                            peer_addr,
955                            &transport_rc,
956                        );
957
958                        sometimes_assert!(
959                            connection_incoming_accepted,
960                            true,
961                            "listen() accepted incoming connection"
962                        );
963                    }
964                    Err(e) => {
965                        tracing::warn!("listen_task: accept error on {}: {:?}", listen_addr, e);
966                        // Continue accepting - transient errors are expected
967                    }
968                }
969            }
970        }
971    }
972
973    tracing::debug!("listen_task: exiting for {}", listen_addr);
974}
975
976/// FDB: connectionIncoming() - handles accepted connection.
977///
978/// This function:
979/// 1. Creates a peer for the incoming connection
980/// 2. Spawns a connection_reader for the peer
981///
982/// # FDB Reference
983/// From NetTransport.actor.cpp:1604-1644 connectionIncoming
984///
985/// Note: FDB's connectionIncoming waits for ConnectPacket to identify the peer.
986/// We simplify by using the peer address directly since SimNetworkProvider
987/// already provides the peer address.
988///
989/// FDB Pattern: Use Peer::new_incoming() with the accepted stream, not Peer::new().
990/// This uses the already-established connection rather than trying to connect back.
991fn connection_incoming<N, T, TP>(
992    transport_weak: Weak<NetTransport<N, T, TP>>,
993    stream: N::TcpStream,
994    peer_addr: String,
995    transport: &NetTransport<N, T, TP>,
996) where
997    N: NetworkProvider + Clone + 'static,
998    T: TimeProvider + Clone + 'static,
999    TP: TaskProvider + Clone + 'static,
1000{
1001    tracing::debug!(
1002        "connection_incoming: handling connection from {}",
1003        peer_addr
1004    );
1005
1006    // Check if we already have a peer for this address (FDB: getOrOpenPeer in connectionReader:1555)
1007    // For incoming connections, we store in incoming_peers to avoid conflicts with outgoing peers
1008    //
1009    // Note: Unlike outgoing peers which can be reused, incoming peers with new streams
1010    // should replace the old one since the old connection is stale.
1011    let peer = {
1012        let data = transport.data.borrow();
1013        if data.incoming_peers.contains_key(&peer_addr) {
1014            tracing::debug!(
1015                "connection_incoming: replacing stale incoming peer for {}",
1016                peer_addr
1017            );
1018        }
1019        drop(data); // Release borrow
1020
1021        // FDB Pattern: Use Peer::new_incoming() with the accepted stream
1022        // (NetTransport.actor.cpp:1123 Peer::onIncomingConnection)
1023        // This uses the already-established connection rather than trying to connect back.
1024        let peer = Peer::new_incoming(
1025            transport.network.clone(),
1026            transport.time.clone(),
1027            transport.task_provider.clone(),
1028            peer_addr.clone(),
1029            stream,
1030            transport.peer_config.clone(),
1031        );
1032        let peer = Rc::new(RefCell::new(peer));
1033
1034        // Store in incoming_peers (replaces any existing stale peer)
1035        transport
1036            .data
1037            .borrow_mut()
1038            .incoming_peers
1039            .insert(peer_addr.clone(), Rc::clone(&peer));
1040
1041        tracing::debug!(
1042            "connection_incoming: created new incoming peer for {}",
1043            peer_addr
1044        );
1045        peer
1046    };
1047
1048    // Spawn connection_reader to handle incoming packets
1049    transport.task_provider.spawn_task(
1050        "connection_reader",
1051        connection_reader(transport_weak, peer, peer_addr),
1052    );
1053}
1054
#[cfg(test)]
mod tests {
    use std::io;
    use std::net::{IpAddr, Ipv4Addr};
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

    use super::*;
    use crate::{JsonCodec, NetNotifiedQueue, TokioTaskProvider, TokioTimeProvider};

    /// Queue type shared by the delivery tests: JSON-encoded `String` messages.
    type StringQueue = NetNotifiedQueue<String, JsonCodec>;

    /// Network provider whose `bind` and `connect` always fail.
    ///
    /// The tests only exercise local delivery and builder wiring, so no real
    /// connection is ever needed.
    #[derive(Clone)]
    struct MockNetworkProvider;

    /// Stream placeholder for the mock; every I/O operation reports an error.
    struct DummyStream;

    /// The error returned by every `DummyStream` operation.
    fn dummy_stream_error() -> io::Error {
        io::Error::other("dummy stream")
    }

    impl AsyncRead for DummyStream {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut ReadBuf<'_>,
        ) -> Poll<io::Result<()>> {
            Poll::Ready(Err(dummy_stream_error()))
        }
    }

    impl AsyncWrite for DummyStream {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            Poll::Ready(Err(dummy_stream_error()))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Err(dummy_stream_error()))
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Err(dummy_stream_error()))
        }
    }

    impl Unpin for DummyStream {}

    /// Listener placeholder for the mock; it never accepts anything.
    struct DummyListener;

    #[async_trait::async_trait(?Send)]
    impl crate::TcpListenerTrait for DummyListener {
        type TcpStream = DummyStream;

        async fn accept(&self) -> io::Result<(Self::TcpStream, String)> {
            Err(io::Error::other("dummy listener"))
        }

        fn local_addr(&self) -> io::Result<String> {
            Err(io::Error::other("dummy listener"))
        }
    }

    #[async_trait::async_trait(?Send)]
    impl NetworkProvider for MockNetworkProvider {
        type TcpStream = DummyStream;
        type TcpListener = DummyListener;

        async fn bind(&self, _addr: &str) -> io::Result<Self::TcpListener> {
            Err(io::Error::other("mock bind"))
        }

        async fn connect(&self, _addr: &str) -> io::Result<Self::TcpStream> {
            Err(io::Error::other("mock connection"))
        }
    }

    /// Local address used by every test transport: 127.0.0.1:4500.
    fn test_address() -> NetworkAddress {
        let localhost = IpAddr::V4(Ipv4Addr::LOCALHOST);
        NetworkAddress::new(localhost, 4500)
    }

    /// Transport built directly through `NetTransport::new` (no builder).
    fn create_test_transport()
    -> NetTransport<MockNetworkProvider, TokioTimeProvider, TokioTaskProvider> {
        let time = TokioTimeProvider::new();
        NetTransport::new(test_address(), MockNetworkProvider, time, TokioTaskProvider)
    }

    /// String queue whose endpoint is `token` at the test address.
    fn string_queue(token: UID) -> Rc<StringQueue> {
        let endpoint = Endpoint::new(test_address(), token);
        Rc::new(NetNotifiedQueue::new(endpoint, JsonCodec))
    }

    #[test]
    fn test_new_transport() {
        let fresh = create_test_transport();

        // A new transport has no peers, no endpoints and has sent nothing.
        assert_eq!(fresh.peer_count(), 0);
        assert_eq!(fresh.endpoint_count(), 0);
        assert_eq!(fresh.packets_sent(), 0);
    }

    #[test]
    fn test_register_well_known() {
        let transport = create_test_transport();
        let queue = string_queue(UID::new(1, 1));

        transport
            .register_well_known(WellKnownToken::Ping, queue)
            .expect("register should succeed");

        assert_eq!(transport.endpoint_count(), 1);
    }

    #[test]
    fn test_register_dynamic() {
        let transport = create_test_transport();
        let token = UID::new(0x1234, 0x5678);
        let queue = string_queue(token);

        let registered = transport.register(token, queue);

        assert_eq!(registered.token, token);
        assert_eq!(transport.endpoint_count(), 1);
    }

    #[test]
    fn test_local_delivery() {
        let transport = create_test_transport();
        let token = UID::new(0x1234, 0x5678);
        let queue = string_queue(token);

        let receiver = Rc::clone(&queue) as Rc<dyn MessageReceiver>;
        let endpoint = transport.register(token, receiver);

        // The endpoint is local, so the send is dispatched to the queue.
        let payload = br#""hello local""#;
        transport
            .send_unreliable(&endpoint, payload)
            .expect("send should succeed");

        assert_eq!(transport.packets_dispatched(), 1);
        assert_eq!(queue.try_recv(), Some("hello local".to_string()));
    }

    #[test]
    fn test_endpoint_not_found() {
        let transport = create_test_transport();

        // Nothing is registered under this token.
        let unknown = Endpoint::new(test_address(), UID::new(999, 999));
        let payload = br#""test""#;

        let outcome = transport.send_unreliable(&unknown, payload);
        assert!(matches!(
            outcome,
            Err(MessagingError::EndpointNotFound { .. })
        ));
        assert_eq!(transport.packets_undelivered(), 1);
    }

    #[test]
    fn test_unregister() {
        let transport = create_test_transport();
        let token = UID::new(0x1234, 0x5678);
        let queue = string_queue(token);

        transport.register(token, queue as Rc<dyn MessageReceiver>);
        assert_eq!(transport.endpoint_count(), 1);

        assert!(transport.unregister(&token).is_some());
        assert_eq!(transport.endpoint_count(), 0);
    }

    #[test]
    fn test_dispatch() {
        let transport = create_test_transport();
        let token = UID::new(0x1234, 0x5678);
        let queue = string_queue(token);

        transport.register(token, Rc::clone(&queue) as Rc<dyn MessageReceiver>);

        // Call dispatch() directly instead of going through a send.
        let payload = br#""dispatched""#;
        transport
            .dispatch(&token, payload)
            .expect("dispatch should succeed");

        assert_eq!(queue.try_recv(), Some("dispatched".to_string()));
    }

    // =========================================================================
    // NetTransportBuilder tests
    // =========================================================================

    #[test]
    fn test_builder_build() {
        let builder = NetTransportBuilder::new(
            MockNetworkProvider,
            TokioTimeProvider::new(),
            TokioTaskProvider,
        );
        let transport = builder.local_address(test_address()).build();

        // The built transport starts empty and reports the configured address.
        assert_eq!(transport.peer_count(), 0);
        assert_eq!(transport.endpoint_count(), 0);
        assert_eq!(transport.packets_sent(), 0);
        assert_eq!(*transport.local_address(), test_address());
    }

    #[test]
    fn test_builder_weak_self_set() {
        let builder = NetTransportBuilder::new(
            MockNetworkProvider,
            TokioTimeProvider::new(),
            TokioTaskProvider,
        );
        let transport = builder.local_address(test_address()).build();

        // build() must have populated weak_self.
        assert!(transport.weak_self.borrow().is_some());
    }

    #[test]
    fn test_builder_with_peer_config() {
        let builder = NetTransportBuilder::new(
            MockNetworkProvider,
            TokioTimeProvider::new(),
            TokioTaskProvider,
        );
        let transport = builder
            .local_address(test_address())
            .peer_config(PeerConfig::default())
            .build();

        // peer_config is internal; this only checks that construction succeeds.
        assert_eq!(transport.peer_count(), 0);
    }

    #[test]
    #[should_panic(expected = "local_address is required")]
    fn test_builder_missing_address_panics() {
        // No local_address() call before build().
        let builder = NetTransportBuilder::new(
            MockNetworkProvider,
            TokioTimeProvider::new(),
            TokioTaskProvider,
        );
        let _ = builder.build();
    }

    #[test]
    fn test_builder_local_delivery_works() {
        // A builder-produced transport must deliver locally like any other.
        let builder = NetTransportBuilder::new(
            MockNetworkProvider,
            TokioTimeProvider::new(),
            TokioTaskProvider,
        );
        let transport = builder.local_address(test_address()).build();

        let token = UID::new(0x1234, 0x5678);
        let queue = string_queue(token);

        let receiver = Rc::clone(&queue) as Rc<dyn MessageReceiver>;
        let endpoint = transport.register(token, receiver);

        let payload = br#""builder test""#;
        transport
            .send_unreliable(&endpoint, payload)
            .expect("send should succeed");

        assert_eq!(transport.packets_dispatched(), 1);
        assert_eq!(queue.try_recv(), Some("builder test".to_string()));
    }

    #[test]
    fn test_register_handler() {
        use serde::{Deserialize, Serialize};

        #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
        struct TestRequest {
            value: i32,
        }

        let builder = NetTransportBuilder::new(
            MockNetworkProvider,
            TokioTimeProvider::new(),
            TokioTaskProvider,
        );
        let transport = builder.local_address(test_address()).build();

        let token = UID::new(0xDEAD, 0xBEEF);
        let stream = transport.register_handler::<TestRequest, _>(token, JsonCodec);

        // Exactly one endpoint now exists...
        assert_eq!(transport.endpoint_count(), 1);

        // ...and the stream points at it.
        assert_eq!(stream.endpoint().token, token);
        assert_eq!(stream.endpoint().address, test_address());
    }

    #[test]
    fn test_register_handler_at_multi_method() {
        use serde::{Deserialize, Serialize};

        #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
        struct AddRequest {
            a: i32,
            b: i32,
        }

        #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
        struct SubRequest {
            a: i32,
            b: i32,
        }

        const CALC_INTERFACE: u64 = 0xCA1C;
        const METHOD_ADD: u64 = 0;
        const METHOD_SUB: u64 = 1;

        let builder = NetTransportBuilder::new(
            MockNetworkProvider,
            TokioTimeProvider::new(),
            TokioTaskProvider,
        );
        let transport = builder.local_address(test_address()).build();

        // Two methods registered on one interface id.
        let (add_stream, add_token) =
            transport.register_handler_at::<AddRequest, _>(CALC_INTERFACE, METHOD_ADD, JsonCodec);
        let (sub_stream, sub_token) =
            transport.register_handler_at::<SubRequest, _>(CALC_INTERFACE, METHOD_SUB, JsonCodec);

        assert_eq!(transport.endpoint_count(), 2);

        // Tokens are derived from (interface, method), so they are predictable.
        assert_eq!(add_token, UID::new(CALC_INTERFACE, METHOD_ADD));
        assert_eq!(sub_token, UID::new(CALC_INTERFACE, METHOD_SUB));

        // Each stream reports the token it was registered under.
        assert_eq!(add_stream.endpoint().token, add_token);
        assert_eq!(sub_stream.endpoint().token, sub_token);
    }

    // =========================================================================
    // Builder error path tests (Phase 12D Step 18)
    // =========================================================================

    #[tokio::test]
    async fn test_build_listening_bind_error() {
        // MockNetworkProvider::bind() always fails, standing in for a port
        // that is already in use.
        let builder = NetTransportBuilder::new(
            MockNetworkProvider,
            TokioTimeProvider::new(),
            TokioTaskProvider,
        );
        let outcome = builder
            .local_address(test_address())
            .build_listening()
            .await;

        // The bind failure must surface as a NetworkError.
        assert!(matches!(
            outcome,
            Err(crate::error::MessagingError::NetworkError { .. })
        ));
    }
}