Skip to main content

slim_datapath/sync/
forwarder.rs

1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4//! Peer synchronization component.
5//!
6//! Handles:
7//! - Peer lifecycle (discovery, connect/disconnect, state tracking)
8//! - Subscription forwarding to peers (full mesh broadcast or hub relay)
9//! - Subscription forwarding to the controller (forward connection)
10//! - In-flight ACK tracking and retry
11//! - Loop prevention via seen subscription IDs
12
13use std::collections::hash_map::Entry;
14use std::collections::{HashMap, HashSet};
15use std::sync::Arc;
16use std::time::Duration;
17
18use display_error_chain::ErrorChainExt;
19use parking_lot::RwLock;
20use tokio::sync::oneshot;
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, error, info, warn};
23
24use crate::api::ProtoName;
25use crate::api::proto::dataplane::v1::Message;
26use crate::errors::DataPathError;
27use crate::message_processing::MessageProcessor;
28use crate::messages::utils::DEFAULT_TTL;
29use crate::peer_discovery::config::PeerTopology;
30use crate::peer_discovery::{PeerDiscovery, PeerEvent, PeerInfo};
31use crate::sync::state::{PeerEntry, PeerState};
32
33use super::peer;
34
/// How long a single ACK wait lasts before the sender retries (or, for peers,
/// gives up).
pub(crate) const ACK_TIMEOUT: Duration = Duration::from_millis(5_000);
/// Upper bound on how many times a remote subscription is re-sent while
/// waiting for its ACK.
pub(crate) const ACK_MAX_RETRIES: u32 = 3;
39
/// Specifies which peers to target for subscription forwarding.
///
/// Equality is derived so callers and tests can compare targets directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PeerTarget {
    /// Forward to ALL connected peers (normal subscription transition).
    All,
    /// Forward to all peers EXCEPT the specified connection (hub relay from spoke).
    ExcludeConn(u64),
}
48
/// Specifies where to forward a subscription.
///
/// The two destinations are independent: either, both, or neither may be set.
/// `ForwardTargets::none()` selects neither, and `has_any()` reports whether at
/// least one is set.
#[derive(Debug, Clone)]
pub struct ForwardTargets {
    /// Forward to peer connections. None = skip peer forwarding.
    pub peers: Option<PeerTarget>,
    /// Forward to a specific connection (controller), preserving the original message.
    /// None = skip controller forwarding.
    pub forward_conn: Option<u64>,
}
57
58impl ForwardTargets {
59    /// Returns true if there are any targets to forward to.
60    pub fn has_any(&self) -> bool {
61        self.peers.is_some() || self.forward_conn.is_some()
62    }
63
64    /// No forwarding targets.
65    pub fn none() -> Self {
66        Self {
67            peers: None,
68            forward_conn: None,
69        }
70    }
71}
72
/// Configuration for peer synchronization, consumed by `PeerSync::run_discovery`.
#[derive(Debug, Clone)]
pub struct PeerSyncConfig {
    /// This replica's unique identifier.
    /// Used to skip self in discovery and, in FullMesh, as the tie-breaker:
    /// the node with the lexicographically smaller id dials out.
    pub self_id: String,
    /// Shared group identifier for peer authentication.
    /// NOTE(review): in this file it is only logged at startup.
    pub deployment_name: String,
    /// Topology for peer connections.
    pub topology: PeerTopology,
    /// Whether this node is the hub (smallest ID). Only meaningful for HubAndSpoke,
    /// where only the hub dials out and performs a full sync to new peers.
    pub is_hub: bool,
}
85
/// Peer synchronization and subscription forwarding.
///
/// This is the single component responsible for:
/// - Tracking peer connections and state
/// - Forwarding subscriptions to peers and the controller
/// - Managing the peer discovery lifecycle (via `run_discovery`)
/// - Handling incoming peer registration
/// - ACK tracking for forwarded subscriptions
///
/// Cloning is cheap: every clone shares the same inner state through an `Arc`.
/// Shared via `Arc` between the discovery task and `MessageProcessor`.
#[derive(Debug, Clone)]
pub struct PeerSync {
    inner: Arc<PeerSyncInner>,
}
100
/// State for a pending multi-peer ACK.
/// Tracks how many ACKs are expected and resolves once all arrive or on timeout.
///
/// Entries live in `PeerSyncInner::pending_acks` and are removed by
/// `resolve_ack` once `remaining` reaches zero, or by `remove_ack`.
struct PendingAck {
    /// Number of ACKs still expected.
    remaining: usize,
    /// Sender to signal completion to the waiting task.
    /// Dropping it (e.g. when the entry is removed or replaced) makes the
    /// waiting receiver observe a closed channel.
    tx: Option<oneshot::Sender<Result<(), DataPathError>>>,
    /// Collects all errors received from peers.
    /// On completion they are joined with "; " into a single
    /// `RemoteSubscriptionAckError`.
    errors: Vec<DataPathError>,
}
111
112impl std::fmt::Debug for PendingAck {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        f.debug_struct("PendingAck")
115            .field("remaining", &self.remaining)
116            .field("tx", &self.tx.is_some())
117            .field("errors", &self.errors.len())
118            .finish()
119    }
120}
121
/// Shared state behind every `PeerSync` clone.
#[derive(Debug)]
struct PeerSyncInner {
    /// Current peer connection IDs (updated on join/leave).
    /// Snapshotted by `forward_to_peers` to pick forwarding targets.
    peer_conns: RwLock<HashSet<u64>>,
    /// Set of subscription IDs this node has already forwarded or processed.
    /// Used for loop prevention: if an incoming subscription has an ID in this set,
    /// it means the subscription has looped back and should be dropped.
    seen_sub_ids: RwLock<HashSet<u64>>,
    /// Tracks which sub_id was forwarded to peers for each name.
    /// Needed for the connection-drop path: when a connection drops and a name
    /// becomes unreachable, we only have the name — this map gives us the sub_id
    /// to send in the unsubscribe message.
    forwarded_sub_for_name: RwLock<HashMap<ProtoName, u64>>,
    /// In-flight pending ACK state: sub_id → pending ack tracker.
    /// NOTE(review): peer forwarding and controller forwarding both register here
    /// under the same sub_id, so a second registration replaces the first.
    pending_acks: RwLock<HashMap<u64, PendingAck>>,
    /// TTL to set on subscription messages forwarded to peers.
    /// 2 for FullMesh, 3 for HubAndSpoke, DEFAULT_TTL for standalone.
    subscription_ttl: u32,
    /// Filter to apply when syncing subscriptions with a new peer.
    /// FullMesh → EXCLUDE_PEER (peers get subs from each other directly).
    /// HubAndSpoke / standalone → ALL.
    sync_filter: crate::tables::MatchFilter,
    /// Shared peer state (if discovery is active).
    /// Used to register incoming peers directly without a channel.
    /// None for `PeerSync::new` and `PeerSync::standalone`.
    peer_state: Option<Arc<RwLock<PeerState>>>,
}
147
148impl PeerSync {
149    /// Create a new PeerSync.
150    pub fn new(topology: &PeerTopology) -> Self {
151        let (subscription_ttl, sync_filter) = match topology {
152            PeerTopology::FullMesh => (2, crate::tables::MatchFilter::EXCLUDE_PEER),
153            PeerTopology::HubAndSpoke => (3, crate::tables::MatchFilter::ALL),
154        };
155        Self {
156            inner: Arc::new(PeerSyncInner {
157                peer_conns: RwLock::new(HashSet::new()),
158                seen_sub_ids: RwLock::new(HashSet::new()),
159                forwarded_sub_for_name: RwLock::new(HashMap::new()),
160                pending_acks: RwLock::new(HashMap::new()),
161                subscription_ttl,
162                sync_filter,
163                peer_state: None,
164            }),
165        }
166    }
167
168    /// Create a PeerSync with shared peer state (for discovery mode).
169    pub fn with_peer_state(topology: &PeerTopology, peer_state: Arc<RwLock<PeerState>>) -> Self {
170        let (subscription_ttl, sync_filter) = match topology {
171            PeerTopology::FullMesh => (2, crate::tables::MatchFilter::EXCLUDE_PEER),
172            PeerTopology::HubAndSpoke => (3, crate::tables::MatchFilter::ALL),
173        };
174        Self {
175            inner: Arc::new(PeerSyncInner {
176                peer_conns: RwLock::new(HashSet::new()),
177                seen_sub_ids: RwLock::new(HashSet::new()),
178                forwarded_sub_for_name: RwLock::new(HashMap::new()),
179                pending_acks: RwLock::new(HashMap::new()),
180                subscription_ttl,
181                sync_filter,
182                peer_state: Some(peer_state),
183            }),
184        }
185    }
186
187    /// Create a standalone PeerSync (no discovery, no peer state).
188    /// Uses DEFAULT_TTL for generic multi-hop topologies.
189    /// Peer connections are auto-registered during link negotiation.
190    pub fn standalone() -> Self {
191        Self {
192            inner: Arc::new(PeerSyncInner {
193                peer_conns: RwLock::new(HashSet::new()),
194                seen_sub_ids: RwLock::new(HashSet::new()),
195                forwarded_sub_for_name: RwLock::new(HashMap::new()),
196                pending_acks: RwLock::new(HashMap::new()),
197                subscription_ttl: DEFAULT_TTL,
198                sync_filter: crate::tables::MatchFilter::ALL,
199                peer_state: None,
200            }),
201        }
202    }
203
204    /// Update the peer connection set (called by PeerSyncManager on join/leave).
205    pub fn set_peer_conns(&self, conns: HashSet<u64>) {
206        *self.inner.peer_conns.write() = conns;
207    }
208
209    /// Add a peer connection ID.
210    pub fn add_peer_conn(&self, conn_id: u64) {
211        self.inner.peer_conns.write().insert(conn_id);
212    }
213
214    /// Remove a peer connection ID.
215    pub fn remove_peer_conn(&self, conn_id: u64) {
216        self.inner.peer_conns.write().remove(&conn_id);
217    }
218
219    /// Get a snapshot of the current peer connection set.
220    pub fn peer_conns(&self) -> HashSet<u64> {
221        self.inner.peer_conns.read().clone()
222    }
223
224    /// Resolve a connection ID to the remote peer's node name (for logging).
225    /// Returns the node_id if available, otherwise the conn_id as a string.
226    fn peer_label(&self, mp: &MessageProcessor, conn_id: u64) -> String {
227        mp.forwarder()
228            .get_connection(conn_id)
229            .and_then(|c| c.peer_node_id().map(|s| s.to_string()))
230            .unwrap_or_else(|| conn_id.to_string())
231    }
232
233    /// The initial TTL set on subscription messages forwarded to peers.
234    pub fn subscription_ttl(&self) -> u32 {
235        self.inner.subscription_ttl
236    }
237
238    /// Whether a PeerSyncManager is active (peer state is shared).
239    pub fn has_peer_state(&self) -> bool {
240        self.inner.peer_state.is_some()
241    }
242
243    /// Handle an incoming peer connection: register in state, add to peer conns,
244    /// and perform subscription sync.
245    ///
246    /// This is the single entry point for incoming peer registration from message processing.
247    pub fn on_incoming_peer(&self, mp: &MessageProcessor, node_id: String, conn_id: u64) {
248        // Register in peer state (dedup, reconnect awareness).
249        if let Some(ref state) = self.inner.peer_state {
250            if state.read().contains(&node_id) {
251                debug!(
252                    %node_id,
253                    %conn_id,
254                    "incoming peer already registered, skipping"
255                );
256                return;
257            }
258            info!(
259                %node_id,
260                %conn_id,
261                "registering incoming peer in state table"
262            );
263            state.write().insert(
264                node_id,
265                PeerEntry {
266                    conn_id,
267                    endpoint: String::new(),
268                    is_outgoing: false,
269                },
270            );
271        }
272
273        self.add_peer_conn_and_sync(mp, conn_id);
274    }
275
276    /// Register a peer connection and perform full sync (send local subscriptions).
277    /// Used by PeerSyncManager for outgoing connections (state already tracked by manager).
278    pub fn add_peer_conn_and_sync(&self, mp: &MessageProcessor, conn_id: u64) {
279        self.add_peer_conn(conn_id);
280        let forwarder = self.clone();
281        let mp = mp.clone();
282        tokio::spawn(async move {
283            let ttl = forwarder.inner.subscription_ttl;
284            let filter = forwarder.inner.sync_filter;
285            let subscriptions = peer::collect_subscriptions(&mp, conn_id, filter);
286            match peer::send_subscriptions(&mp, conn_id, &subscriptions, ttl).await {
287                Ok(count) => {
288                    info!(
289                        %conn_id,
290                        count,
291                        "completed full sync for peer"
292                    );
293                    for (name, sub_id) in &subscriptions {
294                        forwarder.register_forwarded_sub(name, *sub_id);
295                    }
296                }
297                Err(e) => {
298                    warn!(%conn_id, error = %e, "full sync failed for peer");
299                }
300            }
301        });
302    }
303
304    // ── Peer Discovery Lifecycle ─────────────────────────────────────────────
305
306    /// Start peer discovery and run the event loop until cancellation.
307    ///
308    /// This spawns nothing — it runs as an async loop. The caller is expected
309    /// to spawn this (e.g., via `tokio::spawn`).
310    pub async fn run_discovery<D: PeerDiscovery + Send>(
311        &self,
312        mp: &MessageProcessor,
313        config: PeerSyncConfig,
314        mut discovery: D,
315        cancel: CancellationToken,
316    ) {
317        info!(
318            self_id = %config.self_id,
319            deployment_name = %config.deployment_name,
320            "peer sync starting"
321        );
322
323        if let Err(e) = discovery.start().await {
324            error!(error = %e, "failed to start peer discovery");
325            return;
326        }
327
328        loop {
329            tokio::select! {
330                _ = cancel.cancelled() => {
331                    info!("peer sync shutting down");
332                    break;
333                }
334                event = discovery.recv() => {
335                    match event {
336                        Ok(PeerEvent::Joined(peer)) => {
337                            self.handle_peer_joined(mp, &config, peer).await;
338                        }
339                        Ok(PeerEvent::Left(peer)) => {
340                            self.handle_peer_left(mp, peer).await;
341                        }
342                        Err(e) => {
343                            error!(error = %e, "peer discovery error, shutting down");
344                            break;
345                        }
346                    }
347                }
348            }
349        }
350    }
351
352    /// Handle a newly discovered peer.
353    ///
354    /// Connection behavior depends on topology:
355    /// - **FullMesh**: only the node with the lexicographically smaller self_id
356    ///   initiates the outbound connection (tie-breaking for deduplication).
357    /// - **HubAndSpoke**: only the hub (smallest ID) initiates connections.
358    ///   Spokes never dial out.
359    async fn handle_peer_joined(
360        &self,
361        mp: &MessageProcessor,
362        config: &PeerSyncConfig,
363        peer: PeerInfo,
364    ) {
365        // Skip self
366        if peer.id == config.self_id {
367            debug!(peer_id = %peer.id, "skipping self in peer discovery");
368            return;
369        }
370
371        // Determine whether to dial based on topology.
372        let should_dial = match config.topology {
373            PeerTopology::FullMesh => config.self_id < peer.id,
374            PeerTopology::HubAndSpoke => config.is_hub,
375        };
376
377        if !should_dial {
378            debug!(
379                peer_id = %peer.id,
380                self_id = %config.self_id,
381                topology = ?config.topology,
382                "skipping outbound connection (waiting for incoming)"
383            );
384            return;
385        }
386
387        // Skip if already connected
388        if let Some(ref state) = self.inner.peer_state
389            && state.read().contains(&peer.id)
390        {
391            debug!(peer_id = %peer.id, "peer already connected, skipping");
392            return;
393        }
394
395        info!(peer_id = %peer.id, endpoint = %peer.config.endpoint, "connecting to peer");
396
397        match mp.connect(peer.config.clone(), None, None).await {
398            Ok((_handle, conn_id)) => {
399                info!(peer_id = %peer.id, %conn_id, "connected to peer");
400
401                if let Some(ref state) = self.inner.peer_state {
402                    state.write().insert(
403                        peer.id.clone(),
404                        PeerEntry {
405                            conn_id,
406                            endpoint: peer.config.endpoint.clone(),
407                            is_outgoing: true,
408                        },
409                    );
410                }
411
412                self.add_peer_conn(conn_id);
413
414                // Perform full sync: send subscriptions to the new peer.
415                let ttl = self.inner.subscription_ttl;
416                let sync_result = if config.is_hub && config.topology == PeerTopology::HubAndSpoke {
417                    peer::send_full_sync(mp, conn_id, ttl).await
418                } else {
419                    peer::send_local_remote_sync(mp, conn_id, ttl).await
420                };
421                if let Err(e) = sync_result {
422                    warn!(
423                        peer_id = %peer.id,
424                        error = %e,
425                        "full sync failed after connecting to peer"
426                    );
427                }
428            }
429            Err(e) => {
430                error!(
431                    peer_id = %peer.id,
432                    endpoint = %peer.config.endpoint,
433                    error = %e.chain(),
434                    "failed to connect to peer"
435                );
436            }
437        }
438    }
439
440    /// Handle a peer leaving the deployment.
441    async fn handle_peer_left(&self, mp: &MessageProcessor, peer: PeerInfo) {
442        let entry = self
443            .inner
444            .peer_state
445            .as_ref()
446            .and_then(|s| s.write().remove(&peer.id));
447
448        if let Some(entry) = entry {
449            info!(
450                peer_id = %peer.id,
451                conn_id = entry.conn_id,
452                "peer left, disconnecting"
453            );
454
455            self.remove_peer_conn(entry.conn_id);
456            if entry.is_outgoing
457                && let Err(e) = mp.disconnect(entry.conn_id)
458            {
459                warn!(
460                    peer_id = %peer.id,
461                    error = %e,
462                    "error disconnecting from peer"
463                );
464            }
465        }
466    }
467
468    /// Notify that a connection was dropped. Cleans up peer state if it was a peer.
469    pub fn on_connection_drop(&self, conn_id: u64) {
470        if let Some(ref state) = self.inner.peer_state
471            && let Some((peer_id, _entry)) = state.write().remove_by_conn(conn_id)
472        {
473            info!(
474                %peer_id,
475                %conn_id,
476                "peer connection dropped, cleaned up state"
477            );
478            self.remove_peer_conn(conn_id);
479        }
480    }
481
482    /// Get the current peer state (for testing/inspection).
483    pub fn peer_state(&self) -> Option<Arc<RwLock<PeerState>>> {
484        self.inner.peer_state.clone()
485    }
486
487    // ── Subscription Tracking ────────────────────────────────────────────────
488
489    /// Register a subscription that was forwarded (tracks sub_id for unsubscribe
490    /// and adds to seen set for loop prevention).
491    /// Mark a subscription as forwarded: adds to seen set (loop prevention)
492    /// and records the name→sub_id mapping (for connection-drop unsubscribe).
493    pub fn register_forwarded_sub(&self, name: &ProtoName, sub_id: u64) {
494        self.inner.seen_sub_ids.write().insert(sub_id);
495        self.inner
496            .forwarded_sub_for_name
497            .write()
498            .insert(name.clone(), sub_id);
499    }
500
501    /// Remove a subscription from the seen set (on unsubscribe).
502    pub fn remove_forwarded_sub(&self, name: &ProtoName, sub_id: u64) {
503        self.inner.seen_sub_ids.write().remove(&sub_id);
504        self.inner.forwarded_sub_for_name.write().remove(name);
505    }
506
507    /// Check if a subscription_id has already been seen/forwarded by this node.
508    /// Used for loop prevention.
509    pub fn has_seen_sub_id(&self, sub_id: u64) -> bool {
510        self.inner.seen_sub_ids.read().contains(&sub_id)
511    }
512
513    // ── ACK tracking ─────────────────────────────────────────────────────────
514
515    /// Register a multi-peer ACK; returns the result receiver.
516    /// `expected_count` is the number of ACKs expected before resolving.
517    pub(crate) fn register_ack(
518        &self,
519        ack_id: u64,
520        expected_count: usize,
521    ) -> oneshot::Receiver<Result<(), DataPathError>> {
522        let (tx, rx) = oneshot::channel();
523        self.inner.pending_acks.write().insert(
524            ack_id,
525            PendingAck {
526                remaining: expected_count,
527                tx: Some(tx),
528                errors: Vec::new(),
529            },
530        );
531        rx
532    }
533
534    /// Deliver a result for one ACK. When all expected ACKs have arrived,
535    /// the waiting task is unblocked with the aggregate result.
536    /// Called from the message processing path when a SubscriptionAck arrives.
537    pub(crate) fn resolve_ack(&self, ack_id: u64, result: Result<(), DataPathError>) {
538        let mut acks = self.inner.pending_acks.write();
539        let Entry::Occupied(mut entry) = acks.entry(ack_id) else {
540            return;
541        };
542
543        let pending = entry.get_mut();
544        debug!(%ack_id, remaining = pending.remaining, "subscription: remote ack received");
545
546        if let Err(e) = result {
547            pending.errors.push(e);
548        }
549
550        pending.remaining = pending.remaining.saturating_sub(1);
551        if pending.remaining == 0 {
552            let pending = entry.remove();
553            if let Some(tx) = pending.tx {
554                let final_result = if pending.errors.is_empty() {
555                    Ok(())
556                } else {
557                    let msg = pending
558                        .errors
559                        .iter()
560                        .map(|e| e.to_string())
561                        .collect::<Vec<_>>()
562                        .join("; ");
563                    Err(DataPathError::RemoteSubscriptionAckError(msg))
564                };
565                let _ = tx.send(final_result);
566            }
567        }
568    }
569
570    /// Remove a pending ACK entry (e.g. all retries exhausted, or cleanup).
571    pub(crate) fn remove_ack(&self, ack_id: u64) {
572        self.inner.pending_acks.write().remove(&ack_id);
573    }
574
575    /// Spawn a task that forwards the subscription to targets, waits for ACKs,
576    /// then sends the upstream ACK to the client. Does NOT block the caller.
577    ///
578    /// The spawned task:
579    /// 1. Sends to target peers (same sub_id) + forward_conn (original msg)
580    /// 2. Waits for ACKs with timeout
581    /// 3. ACKs the upstream client based on aggregate result
582    ///
583    /// The task is wrapped with a drain watch so it stops promptly on shutdown.
584    ///
585    /// `peer_ttl` is the TTL set on outgoing subscription messages to peers.
586    /// For initial forwarding (local sub), use `subscription_ttl()`.
587    /// For relay (peer sub), use the remaining TTL from the incoming message.
588    #[allow(clippy::too_many_arguments)]
589    pub fn spawn_forward_and_ack(
590        &self,
591        mp: MessageProcessor,
592        msg: Message,
593        name: ProtoName,
594        sub_id: u64,
595        add: bool,
596        targets: ForwardTargets,
597        in_connection: u64,
598        upstream_subscription_id: Option<u64>,
599        peer_ttl: u32,
600        drain: drain::Watch,
601    ) {
602        let forwarder = self.clone();
603        tokio::spawn(async move {
604            tokio::select! {
605                _ = forwarder.forward_and_ack(
606                    &mp,
607                    msg,
608                    name,
609                    sub_id,
610                    add,
611                    targets,
612                    in_connection,
613                    upstream_subscription_id,
614                    peer_ttl,
615                ) => {}
616                _ = drain.signaled() => {
617                    debug!(%in_connection, %sub_id, "subscription forwarder stopped by drain");
618                }
619            }
620        });
621    }
622
623    /// The actual forwarding + ACK lifecycle (runs in spawned task).
624    #[allow(clippy::too_many_arguments)]
625    async fn forward_and_ack(
626        &self,
627        mp: &MessageProcessor,
628        msg: Message,
629        name: ProtoName,
630        sub_id: u64,
631        add: bool,
632        targets: ForwardTargets,
633        in_connection: u64,
634        upstream_subscription_id: Option<u64>,
635        peer_ttl: u32,
636    ) {
637        // Run peer forwarding and forward-conn forwarding concurrently.
638        let (peer_result, forward_result) = tokio::join!(
639            self.forward_to_peers(mp, &name, sub_id, add, &targets, peer_ttl),
640            self.forward_to_conn(mp, &msg, sub_id, add, &targets),
641        );
642
643        // Aggregate results:
644        // - Forward (controller) failure is critical → propagate to upstream
645        // - Peer failure is non-fatal → log and treat as OK
646        let final_result = match (&peer_result, &forward_result) {
647            (_, Err(e)) => {
648                // Controller ACK failure is fatal
649                Err(DataPathError::RemoteSubscriptionAckError(e.to_string()))
650            }
651            (Err(e), _) => {
652                // Peer failure is non-fatal, log warning
653                warn!(error = %e, %name, "peer subscription forwarding failed (non-fatal)");
654                Ok(())
655            }
656            _ => Ok(()),
657        };
658
659        // Send upstream ACK to client
660        if let Some(id) = upstream_subscription_id {
661            debug!(
662                %in_connection,
663                subscription_id = id,
664                ok = final_result.is_ok(),
665                "sending subscription ack after forwarding"
666            );
667            mp.send_subscription_ack(in_connection, id, &final_result)
668                .await;
669        }
670    }
671
672    /// Forward subscription/unsubscription to peer connections.
673    ///
674    /// For subscribe: uses the subscription's stable sub_id, sends to all target peers,
675    /// waits for ACKs from all peers with timeout.
676    ///
677    /// For unsubscribe: looks up the previously stored sub_id for the name and
678    /// sends unsubscribe best-effort (no ACK wait).
679    ///
680    /// `sub_id` is the stable subscription identifier (same across all hops).
681    /// `ttl` is set on the outgoing subscription messages (controls propagation depth).
682    async fn forward_to_peers(
683        &self,
684        mp: &MessageProcessor,
685        name: &ProtoName,
686        sub_id: u64,
687        add: bool,
688        targets: &ForwardTargets,
689        ttl: u32,
690    ) -> Result<(), DataPathError> {
691        let peer_target = match &targets.peers {
692            Some(t) => t,
693            None => return Ok(()),
694        };
695
696        // Snapshot peer conn IDs (don't hold lock across await)
697        let peer_conns: Vec<u64> = {
698            let conns = self.inner.peer_conns.read();
699            match peer_target {
700                PeerTarget::All => conns.iter().copied().collect(),
701                PeerTarget::ExcludeConn(exclude) => {
702                    conns.iter().copied().filter(|c| c != exclude).collect()
703                }
704            }
705        };
706
707        if peer_conns.is_empty() {
708            debug!(%name, "no peer connections, skipping peer forwarding");
709            return Ok(());
710        }
711
712        let action = if add { "subscribe" } else { "unsubscribe" };
713        debug!(%name, %sub_id, %action, ?peer_conns, "forwarding to peers");
714
715        // Build the appropriate message.
716        let build_result = if add {
717            self.register_forwarded_sub(name, sub_id);
718            super::build_subscribe_msg(name, sub_id, ttl)
719        } else {
720            self.remove_forwarded_sub(name, sub_id);
721            super::build_unsubscribe_msg(name, sub_id, ttl)
722        };
723
724        let peer_msg = match build_result {
725            Ok(m) => m,
726            Err(e) => {
727                warn!(%action, error = %e, %name, "failed to build peer message");
728                return Err(e.into());
729            }
730        };
731
732        // Send to all target peers concurrently.
733        let send_results = futures::future::join_all(peer_conns.iter().map(|&conn_id| {
734            let msg = peer_msg.clone();
735            async move { (conn_id, mp.send_msg(msg, conn_id).await) }
736        }))
737        .await;
738
739        // Count successes; log failures.
740        let mut sent_count = 0usize;
741        for (conn_id, result) in &send_results {
742            if let Err(e) = result {
743                let peer = self.peer_label(mp, *conn_id);
744                warn!(%conn_id, %peer, error = %e, "failed to send to peer");
745            } else {
746                sent_count += 1;
747            }
748        }
749
750        if sent_count == 0 {
751            return Ok(());
752        }
753
754        // Wait for ACKs with timeout.
755        let rx = self.register_ack(sub_id, sent_count);
756        match tokio::time::timeout(ACK_TIMEOUT, rx).await {
757            Ok(Ok(result)) => {
758                if let Err(e) = &result {
759                    warn!(%name, %sub_id, error = %e, "peer ACK aggregated failure");
760                }
761            }
762            Ok(Err(_)) => {
763                debug!(%name, %sub_id, "peer ACK sender dropped");
764            }
765            Err(_) => {
766                warn!(%name, %sub_id, "peer ACK timeout");
767                self.remove_ack(sub_id);
768            }
769        }
770
771        Ok(())
772    }
773
774    /// Forward subscription/unsubscription to the forward connection (controller).
775    ///
776    /// Sends the original message, tracks in remote subscription table,
777    /// and waits for ACK with retry.
778    async fn forward_to_conn(
779        &self,
780        mp: &MessageProcessor,
781        msg: &Message,
782        sub_id: u64,
783        add: bool,
784        targets: &ForwardTargets,
785    ) -> Result<(), DataPathError> {
786        let out_conn = match targets.forward_conn {
787            Some(c) => c,
788            None => return Ok(()),
789        };
790
791        debug!(%out_conn, %add, "forwarding subscription to forward connection");
792
793        // Register ACK and send
794        let rx = self.register_ack(sub_id, 1);
795        if let Err(e) = mp.send_msg(msg.clone(), out_conn).await {
796            self.remove_ack(sub_id);
797            return Err(e);
798        }
799
800        // Track in remote subscription table (for reconnection replay) — both add and remove.
801        let source = msg.get_source();
802        let dst = msg.get_dst();
803        let identity = msg.get_identity();
804        mp.remote_sync()
805            .on_forwarded_subscription(source, dst, identity, out_conn, add, sub_id);
806
807        // Wait for ACK with retry
808        let result = self
809            .wait_for_ack_with_retry(mp, sub_id, msg.clone(), out_conn, rx)
810            .await;
811
812        self.remove_ack(sub_id);
813        result
814    }
815
816    /// Wait for a remote ACK with retries.
817    async fn wait_for_ack_with_retry(
818        &self,
819        mp: &MessageProcessor,
820        _sub_id: u64,
821        msg: Message,
822        out_conn: u64,
823        mut rx: oneshot::Receiver<Result<(), DataPathError>>,
824    ) -> Result<(), DataPathError> {
825        for attempt in 0..=ACK_MAX_RETRIES {
826            tokio::select! {
827                result = &mut rx => {
828                    return match result {
829                        Ok(r) => r,
830                        Err(_) => Err(DataPathError::RemoteSubscriptionAckTimeout(attempt)),
831                    };
832                }
833                _ = tokio::time::sleep(ACK_TIMEOUT) => {
834                    if attempt < ACK_MAX_RETRIES {
835                        debug!(attempt = attempt + 1, "remote sub ack timeout, retrying");
836                        mp.send_msg(msg.clone(), out_conn).await?;
837                    }
838                }
839            }
840        }
841
842        Err(DataPathError::RemoteSubscriptionAckTimeout(ACK_MAX_RETRIES))
843    }
844
845    /// Best-effort unsubscribe to peers only (used by connection-drop path).
846    /// Does not wait for ACKs. Looks up the forwarded sub_id for the name.
847    pub async fn notify_peers_unsubscribe(&self, mp: &MessageProcessor, name: &ProtoName) {
848        // Look up the sub_id that was actually forwarded for this name.
849        let sub_id = match self.inner.forwarded_sub_for_name.read().get(name).copied() {
850            Some(id) => id,
851            None => return, // Never forwarded — nothing to unsubscribe
852        };
853
854        let targets = ForwardTargets {
855            peers: Some(PeerTarget::All),
856            forward_conn: None,
857        };
858        if let Err(e) = self
859            .forward_to_peers(
860                mp,
861                name,
862                sub_id,
863                false,
864                &targets,
865                self.inner.subscription_ttl,
866            )
867            .await
868        {
869            warn!(%name, %sub_id, error = %e, "failed to notify peers of unsubscription");
870        }
871    }
872}
873
#[cfg(test)]
mod tests {
    // Unit tests for `PeerSync`: ACK bookkeeping, forwarded-sub tracking, the
    // forwarding helpers, peer lifecycle handlers, the discovery loop, and constructors.
    use super::*;

    use slim_config::client::ClientConfig;
    use tokio_util::sync::CancellationToken;

    use crate::message_processing::MessageProcessor;
    use crate::peer_discovery::{PeerDiscovery, PeerDiscoveryError, PeerEvent, PeerInfo};

    /// `PeerSync` taken from a fresh `MessageProcessor`.
    fn make_forwarder() -> PeerSync {
        let mp = MessageProcessor::new();
        mp.peer_sync()
    }

    /// Full-mesh `PeerSync` backed by an empty, shared `PeerState`.
    fn make_forwarder_with_state() -> PeerSync {
        PeerSync::with_peer_state(
            &PeerTopology::FullMesh,
            Arc::new(RwLock::new(PeerState::new())),
        )
    }

    fn make_name() -> ProtoName {
        ProtoName::from_strings(["org", "example", "svc"])
    }

    fn make_peer_info(id: &str) -> PeerInfo {
        PeerInfo {
            id: id.to_string(),
            config: ClientConfig {
                endpoint: "http://127.0.0.1:9999".to_string(),
                ..Default::default()
            },
        }
    }

    // ── ACK tests ───────────────────────────────────────────────────────────

    #[tokio::test]
    async fn test_register_and_resolve_delivers_ok() {
        let fwd = make_forwarder();
        let rx = fwd.register_ack(1, 1);
        fwd.resolve_ack(1, Ok(()));
        let result = rx.await.expect("sender dropped unexpectedly");
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_register_and_resolve_delivers_err() {
        let fwd = make_forwarder();
        let rx = fwd.register_ack(2, 1);
        fwd.resolve_ack(
            2,
            Err(DataPathError::RemoteSubscriptionAckError("boom".into())),
        );
        let result = rx.await.expect("sender dropped unexpectedly");
        assert!(result.is_err());
    }

    #[test]
    fn test_resolve_unknown_id_is_noop() {
        let fwd = make_forwarder();
        let mut rx = fwd.register_ack(3, 1);
        // Resolve a different (unknown) id — must not affect the registered one.
        fwd.resolve_ack(4, Ok(()));
        // `Empty` (rather than `Closed`) shows that the sender for id 3 is still
        // registered: it was neither resolved nor dropped.
        assert!(
            matches!(rx.try_recv(), Err(oneshot::error::TryRecvError::Empty)),
            "registered channel must still be pending"
        );
    }

    #[test]
    fn test_remove_cleans_up() {
        let fwd = make_forwarder();
        fwd.register_ack(5, 1);
        assert!(fwd.inner.pending_acks.read().contains_key(&5));
        fwd.remove_ack(5);
        assert!(!fwd.inner.pending_acks.read().contains_key(&5));
    }

    #[test]
    fn test_peer_conns_management() {
        let fwd = make_forwarder();
        assert!(fwd.peer_conns().is_empty());

        fwd.add_peer_conn(10);
        fwd.add_peer_conn(20);
        assert_eq!(fwd.peer_conns(), HashSet::from([10, 20]));

        // Duplicate add is idempotent
        fwd.add_peer_conn(10);
        assert_eq!(fwd.peer_conns(), HashSet::from([10, 20]));

        fwd.remove_peer_conn(10);
        assert_eq!(fwd.peer_conns(), HashSet::from([20]));
    }

    // ── Multi-peer ACK aggregation ──────────────────────────────────────────

    #[tokio::test]
    async fn test_multi_ack_all_ok() {
        let fwd = make_forwarder();
        let rx = fwd.register_ack(10, 3);
        fwd.resolve_ack(10, Ok(()));
        fwd.resolve_ack(10, Ok(()));
        fwd.resolve_ack(10, Ok(()));
        let result = rx.await.unwrap();
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_multi_ack_partial_error() {
        let fwd = make_forwarder();
        let rx = fwd.register_ack(11, 2);
        fwd.resolve_ack(11, Ok(()));
        fwd.resolve_ack(
            11,
            Err(DataPathError::RemoteSubscriptionAckError(
                "peer-fail".into(),
            )),
        );
        let result = rx.await.unwrap();
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_multi_ack_all_errors() {
        let fwd = make_forwarder();
        let rx = fwd.register_ack(12, 2);
        fwd.resolve_ack(
            12,
            Err(DataPathError::RemoteSubscriptionAckError("e1".into())),
        );
        fwd.resolve_ack(
            12,
            Err(DataPathError::RemoteSubscriptionAckError("e2".into())),
        );
        let result = rx.await.unwrap();
        assert!(result.is_err());
    }

    // ── Forwarded sub tracking ──────────────────────────────────────────────

    #[test]
    fn test_forwarded_sub_tracking() {
        let fwd = make_forwarder();
        let name = make_name();

        assert!(!fwd.has_seen_sub_id(42));
        fwd.register_forwarded_sub(&name, 42);
        assert!(fwd.has_seen_sub_id(42));

        // name → sub_id lookup
        assert_eq!(
            fwd.inner.forwarded_sub_for_name.read().get(&name).copied(),
            Some(42)
        );

        fwd.remove_forwarded_sub(&name, 42);
        assert!(!fwd.has_seen_sub_id(42));
        assert!(fwd.inner.forwarded_sub_for_name.read().get(&name).is_none());
    }

    // ── forward_to_peers ────────────────────────────────────────────────────

    #[tokio::test]
    async fn test_forward_to_peers_no_target() {
        let fwd = make_forwarder();
        let mp = MessageProcessor::new();
        let name = make_name();
        let targets = ForwardTargets {
            peers: None,
            forward_conn: None,
        };
        // No peer target → early return Ok
        let result = fwd.forward_to_peers(&mp, &name, 1, true, &targets, 2).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_forward_to_peers_no_conns() {
        let fwd = make_forwarder();
        let mp = MessageProcessor::new();
        let name = make_name();
        let targets = ForwardTargets {
            peers: Some(PeerTarget::All),
            forward_conn: None,
        };
        // Peer target set but no peer conns → early return Ok
        let result = fwd.forward_to_peers(&mp, &name, 1, true, &targets, 2).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_forward_to_peers_send_failure_still_ok() {
        let fwd = make_forwarder();
        let mp = MessageProcessor::new();
        let name = make_name();

        // Add peer conns that don't actually exist in the forwarder table
        fwd.add_peer_conn(100);
        fwd.add_peer_conn(200);

        let targets = ForwardTargets {
            peers: Some(PeerTarget::All),
            forward_conn: None,
        };

        // send_msg will fail since conns 100/200 don't exist → sends fail,
        // sent_count == 0, returns Ok (no ACK wait needed)
        let result = fwd
            .forward_to_peers(&mp, &name, 50, true, &targets, 2)
            .await;
        assert!(result.is_ok());
        // Despite failure, the sub was registered in seen set
        assert!(fwd.has_seen_sub_id(50));
    }

    #[tokio::test]
    async fn test_forward_to_peers_exclude_conn() {
        let fwd = make_forwarder();
        let mp = MessageProcessor::new();
        let name = make_name();

        fwd.add_peer_conn(100);
        fwd.add_peer_conn(200);

        let targets = ForwardTargets {
            peers: Some(PeerTarget::ExcludeConn(100)),
            forward_conn: None,
        };

        // Only conn 200 will be targeted (and fail since it doesn't exist)
        let result = fwd
            .forward_to_peers(&mp, &name, 51, true, &targets, 2)
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_forward_to_peers_unsubscribe() {
        let fwd = make_forwarder();
        let mp = MessageProcessor::new();
        let name = make_name();

        // Pre-register to test removal path
        fwd.register_forwarded_sub(&name, 60);
        assert!(fwd.has_seen_sub_id(60));

        fwd.add_peer_conn(100);
        let targets = ForwardTargets {
            peers: Some(PeerTarget::All),
            forward_conn: None,
        };

        let result = fwd
            .forward_to_peers(&mp, &name, 60, false, &targets, 2)
            .await;
        assert!(result.is_ok());
        // Unsubscribe removes from seen set
        assert!(!fwd.has_seen_sub_id(60));
    }

    // ── forward_to_conn ─────────────────────────────────────────────────────

    #[tokio::test]
    async fn test_forward_to_conn_no_target() {
        let fwd = make_forwarder();
        let mp = MessageProcessor::new();
        let name = make_name();
        let msg = super::super::build_subscribe_msg(&name, 70, 2).unwrap();
        let targets = ForwardTargets {
            peers: None,
            forward_conn: None,
        };
        let result = fwd.forward_to_conn(&mp, &msg, 70, true, &targets).await;
        assert!(result.is_ok());
        // The early return happens before any ACK slot is registered.
        assert!(!fwd.inner.pending_acks.read().contains_key(&70));
    }

    #[tokio::test]
    async fn test_forward_to_conn_send_failure() {
        let fwd = make_forwarder();
        let mp = MessageProcessor::new();
        let name = make_name();
        let msg = super::super::build_subscribe_msg(&name, 71, 2).unwrap();
        let targets = ForwardTargets {
            peers: None,
            forward_conn: Some(999), // non-existent connection
        };
        let result = fwd.forward_to_conn(&mp, &msg, 71, true, &targets).await;
        assert!(result.is_err());
        // ACK should have been cleaned up
        assert!(!fwd.inner.pending_acks.read().contains_key(&71));
    }

    // ── forward_and_ack ─────────────────────────────────────────────────────

    #[tokio::test]
    async fn test_forward_and_ack_no_targets() {
        let fwd = make_forwarder();
        let mp = MessageProcessor::new();
        let name = make_name();
        let msg = super::super::build_subscribe_msg(&name, 80, 2).unwrap();
        let targets = ForwardTargets::none();
        // No targets → both return Ok, no upstream ACK sent (None)
        fwd.forward_and_ack(&mp, msg, name, 80, true, targets, 1, None, 2)
            .await;
    }

    #[tokio::test]
    async fn test_forward_and_ack_peer_failure_nonfatal() {
        let fwd = make_forwarder();
        let mp = MessageProcessor::new();
        let name = make_name();
        let msg = super::super::build_subscribe_msg(&name, 81, 2).unwrap();

        // peer target with fake conns → send fails → peer error is non-fatal
        fwd.add_peer_conn(300);
        let targets = ForwardTargets {
            peers: Some(PeerTarget::All),
            forward_conn: None,
        };
        fwd.forward_and_ack(&mp, msg, name, 81, true, targets, 1, None, 2)
            .await;
        // Should not panic; peer failure is logged but not propagated
    }

    #[tokio::test]
    async fn test_forward_and_ack_forward_conn_failure() {
        let fwd = make_forwarder();
        let mp = MessageProcessor::new();
        let name = make_name();
        let msg = super::super::build_subscribe_msg(&name, 82, 2).unwrap();

        let targets = ForwardTargets {
            peers: None,
            forward_conn: Some(999), // non-existent
        };
        // forward_conn failure → upstream ACK should NOT be sent (no upstream_subscription_id)
        fwd.forward_and_ack(&mp, msg, name, 82, true, targets, 1, None, 2)
            .await;
    }

    // ── notify_peers_unsubscribe ────────────────────────────────────────────

    #[tokio::test]
    async fn test_notify_peers_unsubscribe_unknown_name() {
        let fwd = make_forwarder();
        let mp = MessageProcessor::new();
        let name = make_name();
        // Never forwarded → should return immediately
        fwd.notify_peers_unsubscribe(&mp, &name).await;
    }

    #[tokio::test]
    async fn test_notify_peers_unsubscribe_known_name() {
        let fwd = make_forwarder();
        let mp = MessageProcessor::new();
        let name = make_name();

        // Register the name first
        fwd.register_forwarded_sub(&name, 90);
        fwd.add_peer_conn(400); // fake conn → send will fail but we just check it doesn't panic

        fwd.notify_peers_unsubscribe(&mp, &name).await;
        // After unsubscribe, the seen set no longer contains this ID
        assert!(!fwd.has_seen_sub_id(90));
    }

    // ── on_connection_drop ──────────────────────────────────────────────────

    #[test]
    fn test_on_connection_drop_removes_peer_state() {
        let fwd = make_forwarder_with_state();
        let state = fwd.peer_state().unwrap();
        state.write().insert(
            "peer-a".to_string(),
            PeerEntry {
                conn_id: 50,
                endpoint: "http://a".to_string(),
                is_outgoing: true,
            },
        );
        fwd.add_peer_conn(50);

        fwd.on_connection_drop(50);
        assert!(!fwd.peer_conns().contains(&50));
        assert!(!state.read().contains("peer-a"));
    }

    #[test]
    fn test_on_connection_drop_unknown_conn() {
        let fwd = make_forwarder_with_state();
        // Should not panic on unknown conn
        fwd.on_connection_drop(999);
    }

    // ── on_incoming_peer ────────────────────────────────────────────────────

    #[tokio::test]
    async fn test_on_incoming_peer_dedup() {
        let fwd = make_forwarder_with_state();
        let mp = MessageProcessor::new();

        let state = fwd.peer_state().unwrap();
        state.write().insert(
            "peer-a".to_string(),
            PeerEntry {
                conn_id: 10,
                endpoint: "http://a".to_string(),
                is_outgoing: false,
            },
        );

        // Second registration of same node_id should be a no-op
        fwd.on_incoming_peer(&mp, "peer-a".to_string(), 20);
        // conn_id should still be the original
        assert_eq!(state.read().conn_id("peer-a"), Some(10));
    }

    #[tokio::test]
    async fn test_on_incoming_peer_new() {
        let fwd = make_forwarder_with_state();
        let mp = MessageProcessor::new();

        fwd.on_incoming_peer(&mp, "peer-b".to_string(), 30);
        let state = fwd.peer_state().unwrap();
        assert!(state.read().contains("peer-b"));
        assert!(fwd.peer_conns().contains(&30));
    }

    // ── ForwardTargets ──────────────────────────────────────────────────────

    #[test]
    fn test_forward_targets_has_any() {
        assert!(!ForwardTargets::none().has_any());
        assert!(
            ForwardTargets {
                peers: Some(PeerTarget::All),
                forward_conn: None,
            }
            .has_any()
        );
        assert!(
            ForwardTargets {
                peers: None,
                forward_conn: Some(1),
            }
            .has_any()
        );
    }

    // ── PeerSync constructors ───────────────────────────────────────────────

    #[test]
    fn test_standalone_constructor() {
        let fwd = PeerSync::standalone();
        assert_eq!(fwd.subscription_ttl(), DEFAULT_TTL);
        assert!(!fwd.has_peer_state());
    }

    #[test]
    fn test_fullmesh_constructor() {
        let fwd = PeerSync::new(&PeerTopology::FullMesh);
        assert_eq!(fwd.subscription_ttl(), 2);
        assert!(!fwd.has_peer_state());
    }

    #[test]
    fn test_hub_and_spoke_constructor() {
        let fwd = PeerSync::new(&PeerTopology::HubAndSpoke);
        assert_eq!(fwd.subscription_ttl(), 3);
    }

    #[test]
    fn test_with_peer_state_constructor() {
        let fwd = make_forwarder_with_state();
        assert!(fwd.has_peer_state());
    }

    // ── run_discovery ───────────────────────────────────────────────────────

    /// Mock discovery that yields a sequence of events then closes.
    struct MockDiscovery {
        events: Vec<Result<PeerEvent, PeerDiscoveryError>>,
        start_error: Option<PeerDiscoveryError>,
    }

    impl MockDiscovery {
        fn new(events: Vec<Result<PeerEvent, PeerDiscoveryError>>) -> Self {
            Self {
                events,
                start_error: None,
            }
        }

        fn with_start_error(err: PeerDiscoveryError) -> Self {
            Self {
                events: vec![],
                start_error: Some(err),
            }
        }
    }

    impl PeerDiscovery for MockDiscovery {
        async fn start(&mut self) -> Result<(), PeerDiscoveryError> {
            if let Some(e) = self.start_error.take() {
                Err(e)
            } else {
                Ok(())
            }
        }

        async fn recv(&mut self) -> Result<PeerEvent, PeerDiscoveryError> {
            if self.events.is_empty() {
                // Block forever (will be cancelled)
                std::future::pending().await
            } else {
                self.events.remove(0)
            }
        }
    }

    #[tokio::test]
    async fn test_run_discovery_start_error() {
        let fwd = make_forwarder_with_state();
        let mp = MessageProcessor::new();
        let cancel = CancellationToken::new();
        let config = PeerSyncConfig {
            self_id: "self".to_string(),
            deployment_name: "deploy".to_string(),
            topology: PeerTopology::FullMesh,
            is_hub: false,
        };

        let discovery =
            MockDiscovery::with_start_error(PeerDiscoveryError::Backend("cannot start".into()));
        // Should return without panicking
        fwd.run_discovery(&mp, config, discovery, cancel).await;
    }

    #[tokio::test]
    async fn test_run_discovery_cancellation() {
        let fwd = make_forwarder_with_state();
        let mp = MessageProcessor::new();
        let cancel = CancellationToken::new();
        let config = PeerSyncConfig {
            self_id: "self".to_string(),
            deployment_name: "deploy".to_string(),
            topology: PeerTopology::FullMesh,
            is_hub: false,
        };

        let discovery = MockDiscovery::new(vec![]);
        cancel.cancel();
        fwd.run_discovery(&mp, config, discovery, cancel).await;
    }

    #[tokio::test]
    async fn test_run_discovery_error_event() {
        let fwd = make_forwarder_with_state();
        let mp = MessageProcessor::new();
        let cancel = CancellationToken::new();
        let config = PeerSyncConfig {
            self_id: "self".to_string(),
            deployment_name: "deploy".to_string(),
            topology: PeerTopology::FullMesh,
            is_hub: false,
        };

        let discovery = MockDiscovery::new(vec![Err(PeerDiscoveryError::Backend(
            "stream error".into(),
        ))]);
        // Should break on error event
        fwd.run_discovery(&mp, config, discovery, cancel).await;
    }

    // ── handle_peer_joined ──────────────────────────────────────────────────

    #[tokio::test]
    async fn test_handle_peer_joined_skip_self() {
        let fwd = make_forwarder_with_state();
        let mp = MessageProcessor::new();
        let config = PeerSyncConfig {
            self_id: "self-node".to_string(),
            deployment_name: "deploy".to_string(),
            topology: PeerTopology::FullMesh,
            is_hub: false,
        };

        // Peer with same id as self → skip
        let peer = make_peer_info("self-node");
        fwd.handle_peer_joined(&mp, &config, peer).await;
        assert!(fwd.peer_conns().is_empty());
    }

    #[tokio::test]
    async fn test_handle_peer_joined_skip_no_dial_fullmesh() {
        let fwd = make_forwarder_with_state();
        let mp = MessageProcessor::new();
        let config = PeerSyncConfig {
            self_id: "z-node".to_string(), // lexicographically larger
            deployment_name: "deploy".to_string(),
            topology: PeerTopology::FullMesh,
            is_hub: false,
        };

        // In FullMesh, only smaller ID dials → "z-node" > "a-peer", so no dial
        let peer = make_peer_info("a-peer");
        fwd.handle_peer_joined(&mp, &config, peer).await;
        assert!(fwd.peer_conns().is_empty());
    }

    #[tokio::test]
    async fn test_handle_peer_joined_skip_no_dial_spoke() {
        let fwd = make_forwarder_with_state();
        let mp = MessageProcessor::new();
        let config = PeerSyncConfig {
            self_id: "spoke-node".to_string(),
            deployment_name: "deploy".to_string(),
            topology: PeerTopology::HubAndSpoke,
            is_hub: false, // spoke never dials
        };

        let peer = make_peer_info("other");
        fwd.handle_peer_joined(&mp, &config, peer).await;
        assert!(fwd.peer_conns().is_empty());
    }

    #[tokio::test]
    async fn test_handle_peer_joined_already_connected() {
        let fwd = make_forwarder_with_state();
        let mp = MessageProcessor::new();
        let config = PeerSyncConfig {
            self_id: "a-node".to_string(), // smaller → should_dial
            deployment_name: "deploy".to_string(),
            topology: PeerTopology::FullMesh,
            is_hub: false,
        };

        // Pre-register the peer as already connected
        let state = fwd.peer_state().unwrap();
        state.write().insert(
            "b-peer".to_string(),
            PeerEntry {
                conn_id: 100,
                endpoint: "http://b".to_string(),
                is_outgoing: true,
            },
        );

        let peer = make_peer_info("b-peer");
        fwd.handle_peer_joined(&mp, &config, peer).await;
        // Should skip — no new peer_conn added
        assert!(fwd.peer_conns().is_empty());
    }

    // ── handle_peer_left ────────────────────────────────────────────────────

    #[tokio::test]
    async fn test_handle_peer_left_known_peer() {
        let fwd = make_forwarder_with_state();
        let mp = MessageProcessor::new();

        let state = fwd.peer_state().unwrap();
        state.write().insert(
            "peer-x".to_string(),
            PeerEntry {
                conn_id: 55,
                endpoint: "http://x".to_string(),
                is_outgoing: true,
            },
        );
        fwd.add_peer_conn(55);

        let peer = make_peer_info("peer-x");
        fwd.handle_peer_left(&mp, peer).await;
        assert!(!fwd.peer_conns().contains(&55));
        assert!(!state.read().contains("peer-x"));
    }

    #[tokio::test]
    async fn test_handle_peer_left_unknown_peer() {
        let fwd = make_forwarder_with_state();
        let mp = MessageProcessor::new();

        let peer = make_peer_info("unknown-peer");
        // Should not panic
        fwd.handle_peer_left(&mp, peer).await;
    }

    #[tokio::test]
    async fn test_handle_peer_left_incoming_peer_no_disconnect() {
        let fwd = make_forwarder_with_state();
        let mp = MessageProcessor::new();

        let state = fwd.peer_state().unwrap();
        state.write().insert(
            "peer-y".to_string(),
            PeerEntry {
                conn_id: 66,
                endpoint: "http://y".to_string(),
                is_outgoing: false, // incoming → no disconnect call
            },
        );
        fwd.add_peer_conn(66);

        let peer = make_peer_info("peer-y");
        fwd.handle_peer_left(&mp, peer).await;
        assert!(!fwd.peer_conns().contains(&66));
    }

    // ── wait_for_ack_with_retry ─────────────────────────────────────────────

    #[tokio::test]
    async fn test_wait_for_ack_immediate_ok() {
        let fwd = make_forwarder();
        let mp = MessageProcessor::new();
        let name = make_name();
        let msg = super::super::build_subscribe_msg(&name, 100, 2).unwrap();

        let (tx, rx) = oneshot::channel();
        tx.send(Ok(())).unwrap();

        let result = fwd.wait_for_ack_with_retry(&mp, 100, msg, 999, rx).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_wait_for_ack_immediate_err() {
        let fwd = make_forwarder();
        let mp = MessageProcessor::new();
        let name = make_name();
        let msg = super::super::build_subscribe_msg(&name, 101, 2).unwrap();

        let (tx, rx) = oneshot::channel();
        tx.send(Err(DataPathError::RemoteSubscriptionAckError(
            "nope".into(),
        )))
        .unwrap();

        let result = fwd.wait_for_ack_with_retry(&mp, 101, msg, 999, rx).await;
        // The remote's error must be passed through unchanged, not turned into a timeout.
        assert!(
            matches!(result, Err(DataPathError::RemoteSubscriptionAckError(_))),
            "unexpected result: {result:?}"
        );
    }

    #[tokio::test]
    async fn test_wait_for_ack_sender_dropped() {
        let fwd = make_forwarder();
        let mp = MessageProcessor::new();
        let name = make_name();
        let msg = super::super::build_subscribe_msg(&name, 102, 2).unwrap();

        let (tx, rx) = oneshot::channel::<Result<(), DataPathError>>();
        // Dropping the sender closes the channel before any ACK is delivered.
        drop(tx);

        let result = fwd.wait_for_ack_with_retry(&mp, 102, msg, 999, rx).await;
        // A closed channel on the first wait cycle is reported as a timeout at attempt 0.
        assert!(
            matches!(result, Err(DataPathError::RemoteSubscriptionAckTimeout(0))),
            "unexpected result: {result:?}"
        );
    }

    // ── set_peer_conns ──────────────────────────────────────────────────────

    #[test]
    fn test_set_peer_conns() {
        let fwd = make_forwarder();
        fwd.set_peer_conns(HashSet::from([1, 2, 3]));
        assert_eq!(fwd.peer_conns(), HashSet::from([1, 2, 3]));
    }

    // ── peer_label ──────────────────────────────────────────────────────────

    #[test]
    fn test_peer_label_unknown_conn() {
        let fwd = make_forwarder();
        let mp = MessageProcessor::new();
        // Unknown conn → falls back to conn_id as string
        assert_eq!(fwd.peer_label(&mp, 12345), "12345");
    }

    // ── PendingAck Debug ────────────────────────────────────────────────────

    #[test]
    fn test_pending_ack_debug() {
        let (tx, _rx) = oneshot::channel();
        let ack = PendingAck {
            remaining: 2,
            tx: Some(tx),
            errors: vec![DataPathError::RemoteSubscriptionAckError("x".into())],
        };
        let dbg = format!("{:?}", ack);
        assert!(dbg.contains("remaining: 2"));
        assert!(dbg.contains("tx: true"));
        assert!(dbg.contains("errors: 1"));
    }
}