1use std::collections::hash_map::Entry;
14use std::collections::{HashMap, HashSet};
15use std::sync::Arc;
16use std::time::Duration;
17
18use display_error_chain::ErrorChainExt;
19use parking_lot::RwLock;
20use tokio::sync::oneshot;
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, error, info, warn};
23
24use crate::api::ProtoName;
25use crate::api::proto::dataplane::v1::Message;
26use crate::errors::DataPathError;
27use crate::message_processing::MessageProcessor;
28use crate::messages::utils::DEFAULT_TTL;
29use crate::peer_discovery::config::PeerTopology;
30use crate::peer_discovery::{PeerDiscovery, PeerEvent, PeerInfo};
31use crate::sync::state::{PeerEntry, PeerState};
32
33use super::peer;
34
/// How long a single wait for a subscription ACK may last.
///
/// Used as the overall deadline when waiting for peer ACKs and as the
/// per-attempt wait before a retransmission on the forward connection.
pub(crate) const ACK_TIMEOUT: Duration = Duration::from_secs(5);
/// Number of retransmissions attempted on the forward connection after the
/// initial send before the ACK wait gives up.
pub(crate) const ACK_MAX_RETRIES: u32 = 3;
39
/// Selects which registered peer connections a subscription update is sent to.
#[derive(Debug, Clone)]
pub enum PeerTarget {
    /// Every registered peer connection.
    All,
    /// Every registered peer connection except the one with this connection id.
    ExcludeConn(u64),
}
48
/// Destinations for a subscription/unsubscription that has to be forwarded.
#[derive(Debug, Clone)]
pub struct ForwardTargets {
    /// Peer selection; `None` means peer forwarding is skipped.
    pub peers: Option<PeerTarget>,
    /// Connection id the original message is forwarded to; `None` means no
    /// forward-connection send is performed.
    pub forward_conn: Option<u64>,
}
57
58impl ForwardTargets {
59 pub fn has_any(&self) -> bool {
61 self.peers.is_some() || self.forward_conn.is_some()
62 }
63
64 pub fn none() -> Self {
66 Self {
67 peers: None,
68 forward_conn: None,
69 }
70 }
71}
72
/// Settings consumed by the discovery loop when deciding which peers to dial.
#[derive(Debug, Clone)]
pub struct PeerSyncConfig {
    /// Identifier of this node; discovered peers with the same id are ignored.
    pub self_id: String,
    /// Deployment name; in this file it is only used for logging.
    pub deployment_name: String,
    /// Peer topology, which determines who dials whom.
    pub topology: PeerTopology,
    /// Whether this node acts as the hub (relevant for hub-and-spoke).
    pub is_hub: bool,
}
85
/// Handle to the peer synchronisation state.
///
/// Cloning produces another handle to the same shared inner state.
#[derive(Debug, Clone)]
pub struct PeerSync {
    // Shared state; every clone of the handle points at the same allocation.
    inner: Arc<PeerSyncInner>,
}
100
/// Bookkeeping for an acknowledgement that is still being waited on.
struct PendingAck {
    // Number of ACKs still expected before the waiter is notified.
    remaining: usize,
    // Channel used to deliver the aggregated outcome to the waiter.
    tx: Option<oneshot::Sender<Result<(), DataPathError>>>,
    // Errors reported by individual ACKs so far.
    errors: Vec<DataPathError>,
}
111
112impl std::fmt::Debug for PendingAck {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 f.debug_struct("PendingAck")
115 .field("remaining", &self.remaining)
116 .field("tx", &self.tx.is_some())
117 .field("errors", &self.errors.len())
118 .finish()
119 }
120}
121
/// State shared by all clones of a `PeerSync` handle.
#[derive(Debug)]
struct PeerSyncInner {
    // Connection ids currently treated as peers.
    peer_conns: RwLock<HashSet<u64>>,
    // Subscription ids that were registered as forwarded.
    seen_sub_ids: RwLock<HashSet<u64>>,
    // Most recently registered forwarded subscription id per name.
    forwarded_sub_for_name: RwLock<HashMap<ProtoName, u64>>,
    // ACKs still awaited, keyed by ack id.
    pending_acks: RwLock<HashMap<u64, PendingAck>>,
    // TTL handed to the peer sync/forwarding helpers.
    subscription_ttl: u32,
    // Filter passed to `peer::collect_subscriptions` during a sync.
    sync_filter: crate::tables::MatchFilter,
    // Optional table of known peers; `None` disables state tracking.
    peer_state: Option<Arc<RwLock<PeerState>>>,
}
147
148impl PeerSync {
149 pub fn new(topology: &PeerTopology) -> Self {
151 let (subscription_ttl, sync_filter) = match topology {
152 PeerTopology::FullMesh => (2, crate::tables::MatchFilter::EXCLUDE_PEER),
153 PeerTopology::HubAndSpoke => (3, crate::tables::MatchFilter::ALL),
154 };
155 Self {
156 inner: Arc::new(PeerSyncInner {
157 peer_conns: RwLock::new(HashSet::new()),
158 seen_sub_ids: RwLock::new(HashSet::new()),
159 forwarded_sub_for_name: RwLock::new(HashMap::new()),
160 pending_acks: RwLock::new(HashMap::new()),
161 subscription_ttl,
162 sync_filter,
163 peer_state: None,
164 }),
165 }
166 }
167
168 pub fn with_peer_state(topology: &PeerTopology, peer_state: Arc<RwLock<PeerState>>) -> Self {
170 let (subscription_ttl, sync_filter) = match topology {
171 PeerTopology::FullMesh => (2, crate::tables::MatchFilter::EXCLUDE_PEER),
172 PeerTopology::HubAndSpoke => (3, crate::tables::MatchFilter::ALL),
173 };
174 Self {
175 inner: Arc::new(PeerSyncInner {
176 peer_conns: RwLock::new(HashSet::new()),
177 seen_sub_ids: RwLock::new(HashSet::new()),
178 forwarded_sub_for_name: RwLock::new(HashMap::new()),
179 pending_acks: RwLock::new(HashMap::new()),
180 subscription_ttl,
181 sync_filter,
182 peer_state: Some(peer_state),
183 }),
184 }
185 }
186
187 pub fn standalone() -> Self {
191 Self {
192 inner: Arc::new(PeerSyncInner {
193 peer_conns: RwLock::new(HashSet::new()),
194 seen_sub_ids: RwLock::new(HashSet::new()),
195 forwarded_sub_for_name: RwLock::new(HashMap::new()),
196 pending_acks: RwLock::new(HashMap::new()),
197 subscription_ttl: DEFAULT_TTL,
198 sync_filter: crate::tables::MatchFilter::ALL,
199 peer_state: None,
200 }),
201 }
202 }
203
204 pub fn set_peer_conns(&self, conns: HashSet<u64>) {
206 *self.inner.peer_conns.write() = conns;
207 }
208
209 pub fn add_peer_conn(&self, conn_id: u64) {
211 self.inner.peer_conns.write().insert(conn_id);
212 }
213
214 pub fn remove_peer_conn(&self, conn_id: u64) {
216 self.inner.peer_conns.write().remove(&conn_id);
217 }
218
219 pub fn peer_conns(&self) -> HashSet<u64> {
221 self.inner.peer_conns.read().clone()
222 }
223
224 fn peer_label(&self, mp: &MessageProcessor, conn_id: u64) -> String {
227 mp.forwarder()
228 .get_connection(conn_id)
229 .and_then(|c| c.peer_node_id().map(|s| s.to_string()))
230 .unwrap_or_else(|| conn_id.to_string())
231 }
232
233 pub fn subscription_ttl(&self) -> u32 {
235 self.inner.subscription_ttl
236 }
237
238 pub fn has_peer_state(&self) -> bool {
240 self.inner.peer_state.is_some()
241 }
242
243 pub fn on_incoming_peer(&self, mp: &MessageProcessor, node_id: String, conn_id: u64) {
248 if let Some(ref state) = self.inner.peer_state {
250 if state.read().contains(&node_id) {
251 debug!(
252 %node_id,
253 %conn_id,
254 "incoming peer already registered, skipping"
255 );
256 return;
257 }
258 info!(
259 %node_id,
260 %conn_id,
261 "registering incoming peer in state table"
262 );
263 state.write().insert(
264 node_id,
265 PeerEntry {
266 conn_id,
267 endpoint: String::new(),
268 is_outgoing: false,
269 },
270 );
271 }
272
273 self.add_peer_conn_and_sync(mp, conn_id);
274 }
275
276 pub fn add_peer_conn_and_sync(&self, mp: &MessageProcessor, conn_id: u64) {
279 self.add_peer_conn(conn_id);
280 let forwarder = self.clone();
281 let mp = mp.clone();
282 tokio::spawn(async move {
283 let ttl = forwarder.inner.subscription_ttl;
284 let filter = forwarder.inner.sync_filter;
285 let subscriptions = peer::collect_subscriptions(&mp, conn_id, filter);
286 match peer::send_subscriptions(&mp, conn_id, &subscriptions, ttl).await {
287 Ok(count) => {
288 info!(
289 %conn_id,
290 count,
291 "completed full sync for peer"
292 );
293 for (name, sub_id) in &subscriptions {
294 forwarder.register_forwarded_sub(name, *sub_id);
295 }
296 }
297 Err(e) => {
298 warn!(%conn_id, error = %e, "full sync failed for peer");
299 }
300 }
301 });
302 }
303
304 pub async fn run_discovery<D: PeerDiscovery + Send>(
311 &self,
312 mp: &MessageProcessor,
313 config: PeerSyncConfig,
314 mut discovery: D,
315 cancel: CancellationToken,
316 ) {
317 info!(
318 self_id = %config.self_id,
319 deployment_name = %config.deployment_name,
320 "peer sync starting"
321 );
322
323 if let Err(e) = discovery.start().await {
324 error!(error = %e, "failed to start peer discovery");
325 return;
326 }
327
328 loop {
329 tokio::select! {
330 _ = cancel.cancelled() => {
331 info!("peer sync shutting down");
332 break;
333 }
334 event = discovery.recv() => {
335 match event {
336 Ok(PeerEvent::Joined(peer)) => {
337 self.handle_peer_joined(mp, &config, peer).await;
338 }
339 Ok(PeerEvent::Left(peer)) => {
340 self.handle_peer_left(mp, peer).await;
341 }
342 Err(e) => {
343 error!(error = %e, "peer discovery error, shutting down");
344 break;
345 }
346 }
347 }
348 }
349 }
350 }
351
352 async fn handle_peer_joined(
360 &self,
361 mp: &MessageProcessor,
362 config: &PeerSyncConfig,
363 peer: PeerInfo,
364 ) {
365 if peer.id == config.self_id {
367 debug!(peer_id = %peer.id, "skipping self in peer discovery");
368 return;
369 }
370
371 let should_dial = match config.topology {
373 PeerTopology::FullMesh => config.self_id < peer.id,
374 PeerTopology::HubAndSpoke => config.is_hub,
375 };
376
377 if !should_dial {
378 debug!(
379 peer_id = %peer.id,
380 self_id = %config.self_id,
381 topology = ?config.topology,
382 "skipping outbound connection (waiting for incoming)"
383 );
384 return;
385 }
386
387 if let Some(ref state) = self.inner.peer_state
389 && state.read().contains(&peer.id)
390 {
391 debug!(peer_id = %peer.id, "peer already connected, skipping");
392 return;
393 }
394
395 info!(peer_id = %peer.id, endpoint = %peer.config.endpoint, "connecting to peer");
396
397 match mp.connect(peer.config.clone(), None, None).await {
398 Ok((_handle, conn_id)) => {
399 info!(peer_id = %peer.id, %conn_id, "connected to peer");
400
401 if let Some(ref state) = self.inner.peer_state {
402 state.write().insert(
403 peer.id.clone(),
404 PeerEntry {
405 conn_id,
406 endpoint: peer.config.endpoint.clone(),
407 is_outgoing: true,
408 },
409 );
410 }
411
412 self.add_peer_conn(conn_id);
413
414 let ttl = self.inner.subscription_ttl;
416 let sync_result = if config.is_hub && config.topology == PeerTopology::HubAndSpoke {
417 peer::send_full_sync(mp, conn_id, ttl).await
418 } else {
419 peer::send_local_remote_sync(mp, conn_id, ttl).await
420 };
421 if let Err(e) = sync_result {
422 warn!(
423 peer_id = %peer.id,
424 error = %e,
425 "full sync failed after connecting to peer"
426 );
427 }
428 }
429 Err(e) => {
430 error!(
431 peer_id = %peer.id,
432 endpoint = %peer.config.endpoint,
433 error = %e.chain(),
434 "failed to connect to peer"
435 );
436 }
437 }
438 }
439
440 async fn handle_peer_left(&self, mp: &MessageProcessor, peer: PeerInfo) {
442 let entry = self
443 .inner
444 .peer_state
445 .as_ref()
446 .and_then(|s| s.write().remove(&peer.id));
447
448 if let Some(entry) = entry {
449 info!(
450 peer_id = %peer.id,
451 conn_id = entry.conn_id,
452 "peer left, disconnecting"
453 );
454
455 self.remove_peer_conn(entry.conn_id);
456 if entry.is_outgoing
457 && let Err(e) = mp.disconnect(entry.conn_id)
458 {
459 warn!(
460 peer_id = %peer.id,
461 error = %e,
462 "error disconnecting from peer"
463 );
464 }
465 }
466 }
467
468 pub fn on_connection_drop(&self, conn_id: u64) {
470 if let Some(ref state) = self.inner.peer_state
471 && let Some((peer_id, _entry)) = state.write().remove_by_conn(conn_id)
472 {
473 info!(
474 %peer_id,
475 %conn_id,
476 "peer connection dropped, cleaned up state"
477 );
478 self.remove_peer_conn(conn_id);
479 }
480 }
481
482 pub fn peer_state(&self) -> Option<Arc<RwLock<PeerState>>> {
484 self.inner.peer_state.clone()
485 }
486
487 pub fn register_forwarded_sub(&self, name: &ProtoName, sub_id: u64) {
494 self.inner.seen_sub_ids.write().insert(sub_id);
495 self.inner
496 .forwarded_sub_for_name
497 .write()
498 .insert(name.clone(), sub_id);
499 }
500
501 pub fn remove_forwarded_sub(&self, name: &ProtoName, sub_id: u64) {
503 self.inner.seen_sub_ids.write().remove(&sub_id);
504 self.inner.forwarded_sub_for_name.write().remove(name);
505 }
506
507 pub fn has_seen_sub_id(&self, sub_id: u64) -> bool {
510 self.inner.seen_sub_ids.read().contains(&sub_id)
511 }
512
513 pub(crate) fn register_ack(
518 &self,
519 ack_id: u64,
520 expected_count: usize,
521 ) -> oneshot::Receiver<Result<(), DataPathError>> {
522 let (tx, rx) = oneshot::channel();
523 self.inner.pending_acks.write().insert(
524 ack_id,
525 PendingAck {
526 remaining: expected_count,
527 tx: Some(tx),
528 errors: Vec::new(),
529 },
530 );
531 rx
532 }
533
534 pub(crate) fn resolve_ack(&self, ack_id: u64, result: Result<(), DataPathError>) {
538 let mut acks = self.inner.pending_acks.write();
539 let Entry::Occupied(mut entry) = acks.entry(ack_id) else {
540 return;
541 };
542
543 let pending = entry.get_mut();
544 debug!(%ack_id, remaining = pending.remaining, "subscription: remote ack received");
545
546 if let Err(e) = result {
547 pending.errors.push(e);
548 }
549
550 pending.remaining = pending.remaining.saturating_sub(1);
551 if pending.remaining == 0 {
552 let pending = entry.remove();
553 if let Some(tx) = pending.tx {
554 let final_result = if pending.errors.is_empty() {
555 Ok(())
556 } else {
557 let msg = pending
558 .errors
559 .iter()
560 .map(|e| e.to_string())
561 .collect::<Vec<_>>()
562 .join("; ");
563 Err(DataPathError::RemoteSubscriptionAckError(msg))
564 };
565 let _ = tx.send(final_result);
566 }
567 }
568 }
569
570 pub(crate) fn remove_ack(&self, ack_id: u64) {
572 self.inner.pending_acks.write().remove(&ack_id);
573 }
574
575 #[allow(clippy::too_many_arguments)]
589 pub fn spawn_forward_and_ack(
590 &self,
591 mp: MessageProcessor,
592 msg: Message,
593 name: ProtoName,
594 sub_id: u64,
595 add: bool,
596 targets: ForwardTargets,
597 in_connection: u64,
598 upstream_subscription_id: Option<u64>,
599 peer_ttl: u32,
600 drain: drain::Watch,
601 ) {
602 let forwarder = self.clone();
603 tokio::spawn(async move {
604 tokio::select! {
605 _ = forwarder.forward_and_ack(
606 &mp,
607 msg,
608 name,
609 sub_id,
610 add,
611 targets,
612 in_connection,
613 upstream_subscription_id,
614 peer_ttl,
615 ) => {}
616 _ = drain.signaled() => {
617 debug!(%in_connection, %sub_id, "subscription forwarder stopped by drain");
618 }
619 }
620 });
621 }
622
623 #[allow(clippy::too_many_arguments)]
625 async fn forward_and_ack(
626 &self,
627 mp: &MessageProcessor,
628 msg: Message,
629 name: ProtoName,
630 sub_id: u64,
631 add: bool,
632 targets: ForwardTargets,
633 in_connection: u64,
634 upstream_subscription_id: Option<u64>,
635 peer_ttl: u32,
636 ) {
637 let (peer_result, forward_result) = tokio::join!(
639 self.forward_to_peers(mp, &name, sub_id, add, &targets, peer_ttl),
640 self.forward_to_conn(mp, &msg, sub_id, add, &targets),
641 );
642
643 let final_result = match (&peer_result, &forward_result) {
647 (_, Err(e)) => {
648 Err(DataPathError::RemoteSubscriptionAckError(e.to_string()))
650 }
651 (Err(e), _) => {
652 warn!(error = %e, %name, "peer subscription forwarding failed (non-fatal)");
654 Ok(())
655 }
656 _ => Ok(()),
657 };
658
659 if let Some(id) = upstream_subscription_id {
661 debug!(
662 %in_connection,
663 subscription_id = id,
664 ok = final_result.is_ok(),
665 "sending subscription ack after forwarding"
666 );
667 mp.send_subscription_ack(in_connection, id, &final_result)
668 .await;
669 }
670 }
671
672 async fn forward_to_peers(
683 &self,
684 mp: &MessageProcessor,
685 name: &ProtoName,
686 sub_id: u64,
687 add: bool,
688 targets: &ForwardTargets,
689 ttl: u32,
690 ) -> Result<(), DataPathError> {
691 let peer_target = match &targets.peers {
692 Some(t) => t,
693 None => return Ok(()),
694 };
695
696 let peer_conns: Vec<u64> = {
698 let conns = self.inner.peer_conns.read();
699 match peer_target {
700 PeerTarget::All => conns.iter().copied().collect(),
701 PeerTarget::ExcludeConn(exclude) => {
702 conns.iter().copied().filter(|c| c != exclude).collect()
703 }
704 }
705 };
706
707 if peer_conns.is_empty() {
708 debug!(%name, "no peer connections, skipping peer forwarding");
709 return Ok(());
710 }
711
712 let action = if add { "subscribe" } else { "unsubscribe" };
713 debug!(%name, %sub_id, %action, ?peer_conns, "forwarding to peers");
714
715 let build_result = if add {
717 self.register_forwarded_sub(name, sub_id);
718 super::build_subscribe_msg(name, sub_id, ttl)
719 } else {
720 self.remove_forwarded_sub(name, sub_id);
721 super::build_unsubscribe_msg(name, sub_id, ttl)
722 };
723
724 let peer_msg = match build_result {
725 Ok(m) => m,
726 Err(e) => {
727 warn!(%action, error = %e, %name, "failed to build peer message");
728 return Err(e.into());
729 }
730 };
731
732 let send_results = futures::future::join_all(peer_conns.iter().map(|&conn_id| {
734 let msg = peer_msg.clone();
735 async move { (conn_id, mp.send_msg(msg, conn_id).await) }
736 }))
737 .await;
738
739 let mut sent_count = 0usize;
741 for (conn_id, result) in &send_results {
742 if let Err(e) = result {
743 let peer = self.peer_label(mp, *conn_id);
744 warn!(%conn_id, %peer, error = %e, "failed to send to peer");
745 } else {
746 sent_count += 1;
747 }
748 }
749
750 if sent_count == 0 {
751 return Ok(());
752 }
753
754 let rx = self.register_ack(sub_id, sent_count);
756 match tokio::time::timeout(ACK_TIMEOUT, rx).await {
757 Ok(Ok(result)) => {
758 if let Err(e) = &result {
759 warn!(%name, %sub_id, error = %e, "peer ACK aggregated failure");
760 }
761 }
762 Ok(Err(_)) => {
763 debug!(%name, %sub_id, "peer ACK sender dropped");
764 }
765 Err(_) => {
766 warn!(%name, %sub_id, "peer ACK timeout");
767 self.remove_ack(sub_id);
768 }
769 }
770
771 Ok(())
772 }
773
774 async fn forward_to_conn(
779 &self,
780 mp: &MessageProcessor,
781 msg: &Message,
782 sub_id: u64,
783 add: bool,
784 targets: &ForwardTargets,
785 ) -> Result<(), DataPathError> {
786 let out_conn = match targets.forward_conn {
787 Some(c) => c,
788 None => return Ok(()),
789 };
790
791 debug!(%out_conn, %add, "forwarding subscription to forward connection");
792
793 let rx = self.register_ack(sub_id, 1);
795 if let Err(e) = mp.send_msg(msg.clone(), out_conn).await {
796 self.remove_ack(sub_id);
797 return Err(e);
798 }
799
800 let source = msg.get_source();
802 let dst = msg.get_dst();
803 let identity = msg.get_identity();
804 mp.remote_sync()
805 .on_forwarded_subscription(source, dst, identity, out_conn, add, sub_id);
806
807 let result = self
809 .wait_for_ack_with_retry(mp, sub_id, msg.clone(), out_conn, rx)
810 .await;
811
812 self.remove_ack(sub_id);
813 result
814 }
815
816 async fn wait_for_ack_with_retry(
818 &self,
819 mp: &MessageProcessor,
820 _sub_id: u64,
821 msg: Message,
822 out_conn: u64,
823 mut rx: oneshot::Receiver<Result<(), DataPathError>>,
824 ) -> Result<(), DataPathError> {
825 for attempt in 0..=ACK_MAX_RETRIES {
826 tokio::select! {
827 result = &mut rx => {
828 return match result {
829 Ok(r) => r,
830 Err(_) => Err(DataPathError::RemoteSubscriptionAckTimeout(attempt)),
831 };
832 }
833 _ = tokio::time::sleep(ACK_TIMEOUT) => {
834 if attempt < ACK_MAX_RETRIES {
835 debug!(attempt = attempt + 1, "remote sub ack timeout, retrying");
836 mp.send_msg(msg.clone(), out_conn).await?;
837 }
838 }
839 }
840 }
841
842 Err(DataPathError::RemoteSubscriptionAckTimeout(ACK_MAX_RETRIES))
843 }
844
845 pub async fn notify_peers_unsubscribe(&self, mp: &MessageProcessor, name: &ProtoName) {
848 let sub_id = match self.inner.forwarded_sub_for_name.read().get(name).copied() {
850 Some(id) => id,
851 None => return, };
853
854 let targets = ForwardTargets {
855 peers: Some(PeerTarget::All),
856 forward_conn: None,
857 };
858 if let Err(e) = self
859 .forward_to_peers(
860 mp,
861 name,
862 sub_id,
863 false,
864 &targets,
865 self.inner.subscription_ttl,
866 )
867 .await
868 {
869 warn!(%name, %sub_id, error = %e, "failed to notify peers of unsubscription");
870 }
871 }
872}
873
874#[cfg(test)]
875mod tests {
876 use super::*;
877
878 use slim_config::client::ClientConfig;
879 use tokio_util::sync::CancellationToken;
880
881 use crate::message_processing::MessageProcessor;
882 use crate::peer_discovery::{PeerDiscovery, PeerDiscoveryError, PeerEvent, PeerInfo};
883
884 fn make_forwarder() -> PeerSync {
885 let mp = MessageProcessor::new();
886 mp.peer_sync()
887 }
888
889 fn make_forwarder_with_state() -> PeerSync {
890 PeerSync::with_peer_state(
891 &PeerTopology::FullMesh,
892 Arc::new(RwLock::new(PeerState::new())),
893 )
894 }
895
896 fn make_name() -> ProtoName {
897 ProtoName::from_strings(["org", "example", "svc"])
898 }
899
900 fn make_peer_info(id: &str) -> PeerInfo {
901 PeerInfo {
902 id: id.to_string(),
903 config: ClientConfig {
904 endpoint: "http://127.0.0.1:9999".to_string(),
905 ..Default::default()
906 },
907 }
908 }
909
910 #[tokio::test]
913 async fn test_register_and_resolve_delivers_ok() {
914 let fwd = make_forwarder();
915 let rx = fwd.register_ack(1, 1);
916 fwd.resolve_ack(1, Ok(()));
917 let result = rx.await.expect("sender dropped unexpectedly");
918 assert!(result.is_ok());
919 }
920
921 #[tokio::test]
922 async fn test_register_and_resolve_delivers_err() {
923 let fwd = make_forwarder();
924 let rx = fwd.register_ack(2, 1);
925 fwd.resolve_ack(
926 2,
927 Err(DataPathError::RemoteSubscriptionAckError("boom".into())),
928 );
929 let result = rx.await.expect("sender dropped unexpectedly");
930 assert!(result.is_err());
931 }
932
933 #[test]
934 fn test_resolve_unknown_id_is_noop() {
935 let fwd = make_forwarder();
936 let mut rx = fwd.register_ack(3, 1);
937 fwd.resolve_ack(4, Ok(()));
939 assert!(
940 rx.try_recv().is_err(),
941 "registered channel must not have received anything"
942 );
943 }
944
945 #[test]
946 fn test_remove_cleans_up() {
947 let fwd = make_forwarder();
948 fwd.register_ack(5, 1);
949 assert!(fwd.inner.pending_acks.read().contains_key(&5));
950 fwd.remove_ack(5);
951 assert!(!fwd.inner.pending_acks.read().contains_key(&5));
952 }
953
954 #[test]
955 fn test_peer_conns_management() {
956 let fwd = make_forwarder();
957 assert!(fwd.peer_conns().is_empty());
958
959 fwd.add_peer_conn(10);
960 fwd.add_peer_conn(20);
961 assert_eq!(fwd.peer_conns(), HashSet::from([10, 20]));
962
963 fwd.add_peer_conn(10);
965 assert_eq!(fwd.peer_conns(), HashSet::from([10, 20]));
966
967 fwd.remove_peer_conn(10);
968 assert_eq!(fwd.peer_conns(), HashSet::from([20]));
969 }
970
971 #[tokio::test]
974 async fn test_multi_ack_all_ok() {
975 let fwd = make_forwarder();
976 let rx = fwd.register_ack(10, 3);
977 fwd.resolve_ack(10, Ok(()));
978 fwd.resolve_ack(10, Ok(()));
979 fwd.resolve_ack(10, Ok(()));
980 let result = rx.await.unwrap();
981 assert!(result.is_ok());
982 }
983
984 #[tokio::test]
985 async fn test_multi_ack_partial_error() {
986 let fwd = make_forwarder();
987 let rx = fwd.register_ack(11, 2);
988 fwd.resolve_ack(11, Ok(()));
989 fwd.resolve_ack(
990 11,
991 Err(DataPathError::RemoteSubscriptionAckError(
992 "peer-fail".into(),
993 )),
994 );
995 let result = rx.await.unwrap();
996 assert!(result.is_err());
997 }
998
999 #[tokio::test]
1000 async fn test_multi_ack_all_errors() {
1001 let fwd = make_forwarder();
1002 let rx = fwd.register_ack(12, 2);
1003 fwd.resolve_ack(
1004 12,
1005 Err(DataPathError::RemoteSubscriptionAckError("e1".into())),
1006 );
1007 fwd.resolve_ack(
1008 12,
1009 Err(DataPathError::RemoteSubscriptionAckError("e2".into())),
1010 );
1011 let result = rx.await.unwrap();
1012 assert!(result.is_err());
1013 }
1014
1015 #[test]
1018 fn test_forwarded_sub_tracking() {
1019 let fwd = make_forwarder();
1020 let name = make_name();
1021
1022 assert!(!fwd.has_seen_sub_id(42));
1023 fwd.register_forwarded_sub(&name, 42);
1024 assert!(fwd.has_seen_sub_id(42));
1025
1026 assert_eq!(
1028 fwd.inner.forwarded_sub_for_name.read().get(&name).copied(),
1029 Some(42)
1030 );
1031
1032 fwd.remove_forwarded_sub(&name, 42);
1033 assert!(!fwd.has_seen_sub_id(42));
1034 assert!(fwd.inner.forwarded_sub_for_name.read().get(&name).is_none());
1035 }
1036
1037 #[tokio::test]
1040 async fn test_forward_to_peers_no_target() {
1041 let fwd = make_forwarder();
1042 let mp = MessageProcessor::new();
1043 let name = make_name();
1044 let targets = ForwardTargets {
1045 peers: None,
1046 forward_conn: None,
1047 };
1048 let result = fwd.forward_to_peers(&mp, &name, 1, true, &targets, 2).await;
1050 assert!(result.is_ok());
1051 }
1052
1053 #[tokio::test]
1054 async fn test_forward_to_peers_no_conns() {
1055 let fwd = make_forwarder();
1056 let mp = MessageProcessor::new();
1057 let name = make_name();
1058 let targets = ForwardTargets {
1059 peers: Some(PeerTarget::All),
1060 forward_conn: None,
1061 };
1062 let result = fwd.forward_to_peers(&mp, &name, 1, true, &targets, 2).await;
1064 assert!(result.is_ok());
1065 }
1066
1067 #[tokio::test]
1068 async fn test_forward_to_peers_send_failure_still_ok() {
1069 let fwd = make_forwarder();
1070 let mp = MessageProcessor::new();
1071 let name = make_name();
1072
1073 fwd.add_peer_conn(100);
1075 fwd.add_peer_conn(200);
1076
1077 let targets = ForwardTargets {
1078 peers: Some(PeerTarget::All),
1079 forward_conn: None,
1080 };
1081
1082 let result = fwd
1085 .forward_to_peers(&mp, &name, 50, true, &targets, 2)
1086 .await;
1087 assert!(result.is_ok());
1088 assert!(fwd.has_seen_sub_id(50));
1090 }
1091
1092 #[tokio::test]
1093 async fn test_forward_to_peers_exclude_conn() {
1094 let fwd = make_forwarder();
1095 let mp = MessageProcessor::new();
1096 let name = make_name();
1097
1098 fwd.add_peer_conn(100);
1099 fwd.add_peer_conn(200);
1100
1101 let targets = ForwardTargets {
1102 peers: Some(PeerTarget::ExcludeConn(100)),
1103 forward_conn: None,
1104 };
1105
1106 let result = fwd
1108 .forward_to_peers(&mp, &name, 51, true, &targets, 2)
1109 .await;
1110 assert!(result.is_ok());
1111 }
1112
1113 #[tokio::test]
1114 async fn test_forward_to_peers_unsubscribe() {
1115 let fwd = make_forwarder();
1116 let mp = MessageProcessor::new();
1117 let name = make_name();
1118
1119 fwd.register_forwarded_sub(&name, 60);
1121 assert!(fwd.has_seen_sub_id(60));
1122
1123 fwd.add_peer_conn(100);
1124 let targets = ForwardTargets {
1125 peers: Some(PeerTarget::All),
1126 forward_conn: None,
1127 };
1128
1129 let result = fwd
1130 .forward_to_peers(&mp, &name, 60, false, &targets, 2)
1131 .await;
1132 assert!(result.is_ok());
1133 assert!(!fwd.has_seen_sub_id(60));
1135 }
1136
1137 #[tokio::test]
1140 async fn test_forward_to_conn_no_target() {
1141 let fwd = make_forwarder();
1142 let mp = MessageProcessor::new();
1143 let name = make_name();
1144 let msg = super::super::build_subscribe_msg(&name, 70, 2).unwrap();
1145 let targets = ForwardTargets {
1146 peers: None,
1147 forward_conn: None,
1148 };
1149 let result = fwd.forward_to_conn(&mp, &msg, 70, true, &targets).await;
1150 assert!(result.is_ok());
1151 }
1152
1153 #[tokio::test]
1154 async fn test_forward_to_conn_send_failure() {
1155 let fwd = make_forwarder();
1156 let mp = MessageProcessor::new();
1157 let name = make_name();
1158 let msg = super::super::build_subscribe_msg(&name, 71, 2).unwrap();
1159 let targets = ForwardTargets {
1160 peers: None,
1161 forward_conn: Some(999), };
1163 let result = fwd.forward_to_conn(&mp, &msg, 71, true, &targets).await;
1164 assert!(result.is_err());
1165 assert!(!fwd.inner.pending_acks.read().contains_key(&71));
1167 }
1168
1169 #[tokio::test]
1172 async fn test_forward_and_ack_no_targets() {
1173 let fwd = make_forwarder();
1174 let mp = MessageProcessor::new();
1175 let name = make_name();
1176 let msg = super::super::build_subscribe_msg(&name, 80, 2).unwrap();
1177 let targets = ForwardTargets::none();
1178 fwd.forward_and_ack(&mp, msg, name, 80, true, targets, 1, None, 2)
1180 .await;
1181 }
1182
1183 #[tokio::test]
1184 async fn test_forward_and_ack_peer_failure_nonfatal() {
1185 let fwd = make_forwarder();
1186 let mp = MessageProcessor::new();
1187 let name = make_name();
1188 let msg = super::super::build_subscribe_msg(&name, 81, 2).unwrap();
1189
1190 fwd.add_peer_conn(300);
1192 let targets = ForwardTargets {
1193 peers: Some(PeerTarget::All),
1194 forward_conn: None,
1195 };
1196 fwd.forward_and_ack(&mp, msg, name, 81, true, targets, 1, None, 2)
1197 .await;
1198 }
1200
1201 #[tokio::test]
1202 async fn test_forward_and_ack_forward_conn_failure() {
1203 let fwd = make_forwarder();
1204 let mp = MessageProcessor::new();
1205 let name = make_name();
1206 let msg = super::super::build_subscribe_msg(&name, 82, 2).unwrap();
1207
1208 let targets = ForwardTargets {
1209 peers: None,
1210 forward_conn: Some(999), };
1212 fwd.forward_and_ack(&mp, msg, name, 82, true, targets, 1, None, 2)
1214 .await;
1215 }
1216
1217 #[tokio::test]
1220 async fn test_notify_peers_unsubscribe_unknown_name() {
1221 let fwd = make_forwarder();
1222 let mp = MessageProcessor::new();
1223 let name = make_name();
1224 fwd.notify_peers_unsubscribe(&mp, &name).await;
1226 }
1227
1228 #[tokio::test]
1229 async fn test_notify_peers_unsubscribe_known_name() {
1230 let fwd = make_forwarder();
1231 let mp = MessageProcessor::new();
1232 let name = make_name();
1233
1234 fwd.register_forwarded_sub(&name, 90);
1236 fwd.add_peer_conn(400); fwd.notify_peers_unsubscribe(&mp, &name).await;
1239 assert!(!fwd.has_seen_sub_id(90));
1241 }
1242
1243 #[test]
1246 fn test_on_connection_drop_removes_peer_state() {
1247 let fwd = make_forwarder_with_state();
1248 let state = fwd.peer_state().unwrap();
1249 state.write().insert(
1250 "peer-a".to_string(),
1251 PeerEntry {
1252 conn_id: 50,
1253 endpoint: "http://a".to_string(),
1254 is_outgoing: true,
1255 },
1256 );
1257 fwd.add_peer_conn(50);
1258
1259 fwd.on_connection_drop(50);
1260 assert!(!fwd.peer_conns().contains(&50));
1261 assert!(!state.read().contains("peer-a"));
1262 }
1263
1264 #[test]
1265 fn test_on_connection_drop_unknown_conn() {
1266 let fwd = make_forwarder_with_state();
1267 fwd.on_connection_drop(999);
1269 }
1270
1271 #[tokio::test]
1274 async fn test_on_incoming_peer_dedup() {
1275 let fwd = make_forwarder_with_state();
1276 let mp = MessageProcessor::new();
1277
1278 let state = fwd.peer_state().unwrap();
1279 state.write().insert(
1280 "peer-a".to_string(),
1281 PeerEntry {
1282 conn_id: 10,
1283 endpoint: "http://a".to_string(),
1284 is_outgoing: false,
1285 },
1286 );
1287
1288 fwd.on_incoming_peer(&mp, "peer-a".to_string(), 20);
1290 assert_eq!(state.read().conn_id("peer-a"), Some(10));
1292 }
1293
1294 #[tokio::test]
1295 async fn test_on_incoming_peer_new() {
1296 let fwd = make_forwarder_with_state();
1297 let mp = MessageProcessor::new();
1298
1299 fwd.on_incoming_peer(&mp, "peer-b".to_string(), 30);
1300 let state = fwd.peer_state().unwrap();
1301 assert!(state.read().contains("peer-b"));
1302 assert!(fwd.peer_conns().contains(&30));
1303 }
1304
1305 #[test]
1308 fn test_forward_targets_has_any() {
1309 assert!(!ForwardTargets::none().has_any());
1310 assert!(
1311 ForwardTargets {
1312 peers: Some(PeerTarget::All),
1313 forward_conn: None,
1314 }
1315 .has_any()
1316 );
1317 assert!(
1318 ForwardTargets {
1319 peers: None,
1320 forward_conn: Some(1),
1321 }
1322 .has_any()
1323 );
1324 }
1325
1326 #[test]
1329 fn test_standalone_constructor() {
1330 let fwd = PeerSync::standalone();
1331 assert_eq!(fwd.subscription_ttl(), DEFAULT_TTL);
1332 assert!(!fwd.has_peer_state());
1333 }
1334
1335 #[test]
1336 fn test_fullmesh_constructor() {
1337 let fwd = PeerSync::new(&PeerTopology::FullMesh);
1338 assert_eq!(fwd.subscription_ttl(), 2);
1339 assert!(!fwd.has_peer_state());
1340 }
1341
1342 #[test]
1343 fn test_hub_and_spoke_constructor() {
1344 let fwd = PeerSync::new(&PeerTopology::HubAndSpoke);
1345 assert_eq!(fwd.subscription_ttl(), 3);
1346 }
1347
1348 #[test]
1349 fn test_with_peer_state_constructor() {
1350 let fwd = make_forwarder_with_state();
1351 assert!(fwd.has_peer_state());
1352 }
1353
    /// Scripted `PeerDiscovery` used by the tests below.
    struct MockDiscovery {
        // Results handed out by `recv`, front first.
        events: Vec<Result<PeerEvent, PeerDiscoveryError>>,
        // When set, `start` fails once with this error.
        start_error: Option<PeerDiscoveryError>,
    }
1361
1362 impl MockDiscovery {
1363 fn new(events: Vec<Result<PeerEvent, PeerDiscoveryError>>) -> Self {
1364 Self {
1365 events,
1366 start_error: None,
1367 }
1368 }
1369
1370 fn with_start_error(err: PeerDiscoveryError) -> Self {
1371 Self {
1372 events: vec![],
1373 start_error: Some(err),
1374 }
1375 }
1376 }
1377
1378 impl PeerDiscovery for MockDiscovery {
1379 async fn start(&mut self) -> Result<(), PeerDiscoveryError> {
1380 if let Some(e) = self.start_error.take() {
1381 Err(e)
1382 } else {
1383 Ok(())
1384 }
1385 }
1386
1387 async fn recv(&mut self) -> Result<PeerEvent, PeerDiscoveryError> {
1388 if self.events.is_empty() {
1389 std::future::pending().await
1391 } else {
1392 self.events.remove(0)
1393 }
1394 }
1395 }
1396
1397 #[tokio::test]
1398 async fn test_run_discovery_start_error() {
1399 let fwd = make_forwarder_with_state();
1400 let mp = MessageProcessor::new();
1401 let cancel = CancellationToken::new();
1402 let config = PeerSyncConfig {
1403 self_id: "self".to_string(),
1404 deployment_name: "deploy".to_string(),
1405 topology: PeerTopology::FullMesh,
1406 is_hub: false,
1407 };
1408
1409 let discovery =
1410 MockDiscovery::with_start_error(PeerDiscoveryError::Backend("cannot start".into()));
1411 fwd.run_discovery(&mp, config, discovery, cancel).await;
1413 }
1414
1415 #[tokio::test]
1416 async fn test_run_discovery_cancellation() {
1417 let fwd = make_forwarder_with_state();
1418 let mp = MessageProcessor::new();
1419 let cancel = CancellationToken::new();
1420 let config = PeerSyncConfig {
1421 self_id: "self".to_string(),
1422 deployment_name: "deploy".to_string(),
1423 topology: PeerTopology::FullMesh,
1424 is_hub: false,
1425 };
1426
1427 let discovery = MockDiscovery::new(vec![]);
1428 cancel.cancel();
1429 fwd.run_discovery(&mp, config, discovery, cancel).await;
1430 }
1431
1432 #[tokio::test]
1433 async fn test_run_discovery_error_event() {
1434 let fwd = make_forwarder_with_state();
1435 let mp = MessageProcessor::new();
1436 let cancel = CancellationToken::new();
1437 let config = PeerSyncConfig {
1438 self_id: "self".to_string(),
1439 deployment_name: "deploy".to_string(),
1440 topology: PeerTopology::FullMesh,
1441 is_hub: false,
1442 };
1443
1444 let discovery = MockDiscovery::new(vec![Err(PeerDiscoveryError::Backend(
1445 "stream error".into(),
1446 ))]);
1447 fwd.run_discovery(&mp, config, discovery, cancel).await;
1449 }
1450
1451 #[tokio::test]
1454 async fn test_handle_peer_joined_skip_self() {
1455 let fwd = make_forwarder_with_state();
1456 let mp = MessageProcessor::new();
1457 let config = PeerSyncConfig {
1458 self_id: "self-node".to_string(),
1459 deployment_name: "deploy".to_string(),
1460 topology: PeerTopology::FullMesh,
1461 is_hub: false,
1462 };
1463
1464 let peer = make_peer_info("self-node");
1466 fwd.handle_peer_joined(&mp, &config, peer).await;
1467 assert!(fwd.peer_conns().is_empty());
1468 }
1469
1470 #[tokio::test]
1471 async fn test_handle_peer_joined_skip_no_dial_fullmesh() {
1472 let fwd = make_forwarder_with_state();
1473 let mp = MessageProcessor::new();
1474 let config = PeerSyncConfig {
1475 self_id: "z-node".to_string(), deployment_name: "deploy".to_string(),
1477 topology: PeerTopology::FullMesh,
1478 is_hub: false,
1479 };
1480
1481 let peer = make_peer_info("a-peer");
1483 fwd.handle_peer_joined(&mp, &config, peer).await;
1484 assert!(fwd.peer_conns().is_empty());
1485 }
1486
1487 #[tokio::test]
1488 async fn test_handle_peer_joined_skip_no_dial_spoke() {
1489 let fwd = make_forwarder_with_state();
1490 let mp = MessageProcessor::new();
1491 let config = PeerSyncConfig {
1492 self_id: "spoke-node".to_string(),
1493 deployment_name: "deploy".to_string(),
1494 topology: PeerTopology::HubAndSpoke,
1495 is_hub: false, };
1497
1498 let peer = make_peer_info("other");
1499 fwd.handle_peer_joined(&mp, &config, peer).await;
1500 assert!(fwd.peer_conns().is_empty());
1501 }
1502
1503 #[tokio::test]
1504 async fn test_handle_peer_joined_already_connected() {
1505 let fwd = make_forwarder_with_state();
1506 let mp = MessageProcessor::new();
1507 let config = PeerSyncConfig {
1508 self_id: "a-node".to_string(), deployment_name: "deploy".to_string(),
1510 topology: PeerTopology::FullMesh,
1511 is_hub: false,
1512 };
1513
1514 let state = fwd.peer_state().unwrap();
1516 state.write().insert(
1517 "b-peer".to_string(),
1518 PeerEntry {
1519 conn_id: 100,
1520 endpoint: "http://b".to_string(),
1521 is_outgoing: true,
1522 },
1523 );
1524
1525 let peer = make_peer_info("b-peer");
1526 fwd.handle_peer_joined(&mp, &config, peer).await;
1527 assert!(fwd.peer_conns().is_empty());
1529 }
1530
1531 #[tokio::test]
1534 async fn test_handle_peer_left_known_peer() {
1535 let fwd = make_forwarder_with_state();
1536 let mp = MessageProcessor::new();
1537
1538 let state = fwd.peer_state().unwrap();
1539 state.write().insert(
1540 "peer-x".to_string(),
1541 PeerEntry {
1542 conn_id: 55,
1543 endpoint: "http://x".to_string(),
1544 is_outgoing: true,
1545 },
1546 );
1547 fwd.add_peer_conn(55);
1548
1549 let peer = make_peer_info("peer-x");
1550 fwd.handle_peer_left(&mp, peer).await;
1551 assert!(!fwd.peer_conns().contains(&55));
1552 assert!(!state.read().contains("peer-x"));
1553 }
1554
1555 #[tokio::test]
1556 async fn test_handle_peer_left_unknown_peer() {
1557 let fwd = make_forwarder_with_state();
1558 let mp = MessageProcessor::new();
1559
1560 let peer = make_peer_info("unknown-peer");
1561 fwd.handle_peer_left(&mp, peer).await;
1563 }
1564
1565 #[tokio::test]
1566 async fn test_handle_peer_left_incoming_peer_no_disconnect() {
1567 let fwd = make_forwarder_with_state();
1568 let mp = MessageProcessor::new();
1569
1570 let state = fwd.peer_state().unwrap();
1571 state.write().insert(
1572 "peer-y".to_string(),
1573 PeerEntry {
1574 conn_id: 66,
1575 endpoint: "http://y".to_string(),
1576 is_outgoing: false, },
1578 );
1579 fwd.add_peer_conn(66);
1580
1581 let peer = make_peer_info("peer-y");
1582 fwd.handle_peer_left(&mp, peer).await;
1583 assert!(!fwd.peer_conns().contains(&66));
1584 }
1585
1586 #[tokio::test]
1589 async fn test_wait_for_ack_immediate_ok() {
1590 let fwd = make_forwarder();
1591 let mp = MessageProcessor::new();
1592 let name = make_name();
1593 let msg = super::super::build_subscribe_msg(&name, 100, 2).unwrap();
1594
1595 let (tx, rx) = oneshot::channel();
1596 tx.send(Ok(())).unwrap();
1597
1598 let result = fwd.wait_for_ack_with_retry(&mp, 100, msg, 999, rx).await;
1599 assert!(result.is_ok());
1600 }
1601
1602 #[tokio::test]
1603 async fn test_wait_for_ack_immediate_err() {
1604 let fwd = make_forwarder();
1605 let mp = MessageProcessor::new();
1606 let name = make_name();
1607 let msg = super::super::build_subscribe_msg(&name, 101, 2).unwrap();
1608
1609 let (tx, rx) = oneshot::channel();
1610 tx.send(Err(DataPathError::RemoteSubscriptionAckError(
1611 "nope".into(),
1612 )))
1613 .unwrap();
1614
1615 let result = fwd.wait_for_ack_with_retry(&mp, 101, msg, 999, rx).await;
1616 assert!(result.is_err());
1617 }
1618
1619 #[tokio::test]
1620 async fn test_wait_for_ack_sender_dropped() {
1621 let fwd = make_forwarder();
1622 let mp = MessageProcessor::new();
1623 let name = make_name();
1624 let msg = super::super::build_subscribe_msg(&name, 102, 2).unwrap();
1625
1626 let (_tx, rx) = oneshot::channel::<Result<(), DataPathError>>();
1627 drop(_tx);
1629
1630 let result = fwd.wait_for_ack_with_retry(&mp, 102, msg, 999, rx).await;
1631 assert!(result.is_err());
1632 }
1633
/// `set_peer_conns` followed by `peer_conns` must round-trip the same set of
/// connection ids.
#[test]
fn test_set_peer_conns() {
    let forwarder = make_forwarder();
    let expected = HashSet::from([1, 2, 3]);
    forwarder.set_peer_conns(expected.clone());
    assert_eq!(forwarder.peer_conns(), expected);
}
1642
/// For a connection id with no known peer, `peer_label` must return the id
/// rendered as a decimal string.
#[test]
fn test_peer_label_unknown_conn() {
    let forwarder = make_forwarder();
    let processor = MessageProcessor::new();
    let label = forwarder.peer_label(&processor, 12345);
    assert_eq!(label, "12345");
}
1652
/// The hand-written `Debug` impl of `PendingAck` must print the remaining
/// count, whether a sender is present, and the number of collected errors.
#[test]
fn test_pending_ack_debug() {
    // The receiver is kept alive only so the channel stays open.
    let (sender, _receiver) = oneshot::channel();
    let pending = PendingAck {
        remaining: 2,
        tx: Some(sender),
        errors: vec![DataPathError::RemoteSubscriptionAckError("x".into())],
    };

    let rendered = format!("{pending:?}");
    assert!(rendered.contains("remaining: 2"));
    assert!(rendered.contains("tx: true"));
    assert!(rendered.contains("errors: 1"));
}
1668}