1use std::marker::PhantomData;
77use std::net::SocketAddr;
78use std::path::PathBuf;
79use std::sync::Arc;
80use std::time::Duration;
81
82use bytes::Bytes;
83use memberlist_core::delegate::{EventDelegate, VoidDelegate};
84use memberlist_core::proto::NodeState;
85use memberlist_core::transport::Id;
86use nodecraft::CheapClone;
87
88use crate::{IdCodec, PlumtreeDelegate, PlumtreeMemberlist};
89
90#[cfg(feature = "quic")]
91use crate::MapPeerResolver;
92
/// Tunables for the bridge between cluster-membership events and Plumtree.
///
/// Build one with [`BridgeConfig::new`] (or [`Default`]) and refine it with the
/// chainable `with_*` methods.
#[derive(Debug, Clone)]
pub struct BridgeConfig {
    /// When `true`, the event delegate emits `tracing` events for joins,
    /// leaves and address updates.
    pub log_changes: bool,
    /// Auto-promotion switch.
    ///
    /// NOTE(review): no code visible in this file reads this flag; confirm
    /// where (or whether) it is consumed.
    pub auto_promote: bool,
    /// Seed addresses that are always part of the bootstrap set and that the
    /// Lazarus task watches for.
    pub static_seeds: Vec<SocketAddr>,
    /// Enables the Lazarus seed-recovery task (see [`Self::should_run_lazarus`]).
    pub lazarus_enabled: bool,
    /// Pause between two consecutive Lazarus checks.
    pub lazarus_interval: Duration,
    /// Optional file from which previously persisted peers are loaded.
    pub persistence_path: Option<PathBuf>,
}

impl Default for BridgeConfig {
    /// Logging and auto-promotion on, no seeds, Lazarus off with a 30 second
    /// interval, and no persistence file.
    fn default() -> Self {
        BridgeConfig {
            log_changes: true,
            auto_promote: true,
            static_seeds: Vec::new(),
            lazarus_enabled: false,
            lazarus_interval: Duration::from_secs(30),
            persistence_path: None,
        }
    }
}

impl BridgeConfig {
    /// Creates a configuration holding the default values.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets whether membership changes are logged.
    pub fn with_log_changes(self, log: bool) -> Self {
        Self {
            log_changes: log,
            ..self
        }
    }

    /// Sets the auto-promotion flag.
    pub fn with_auto_promote(self, auto: bool) -> Self {
        Self {
            auto_promote: auto,
            ..self
        }
    }

    /// Replaces the list of static seed addresses.
    pub fn with_static_seeds(self, seeds: Vec<SocketAddr>) -> Self {
        Self {
            static_seeds: seeds,
            ..self
        }
    }

    /// Enables or disables the Lazarus seed-recovery task.
    pub fn with_lazarus_enabled(self, enabled: bool) -> Self {
        Self {
            lazarus_enabled: enabled,
            ..self
        }
    }

    /// Sets the pause between Lazarus checks.
    pub fn with_lazarus_interval(self, interval: Duration) -> Self {
        Self {
            lazarus_interval: interval,
            ..self
        }
    }

    /// Sets the peer persistence file.
    pub fn with_persistence_path(self, path: PathBuf) -> Self {
        Self {
            persistence_path: Some(path),
            ..self
        }
    }

    /// Returns `true` only when Lazarus is enabled *and* at least one static
    /// seed is configured; without seeds there is nothing to recover.
    pub fn should_run_lazarus(&self) -> bool {
        let has_seeds = !self.static_seeds.is_empty();
        self.lazarus_enabled && has_seeds
    }
}
206
207pub struct PlumtreeBridge<I, PD>
218where
219 I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
220 PD: PlumtreeDelegate<I>,
221{
222 pub pm: Arc<PlumtreeMemberlist<I, PD>>,
224 pub config: BridgeConfig,
226 #[cfg(feature = "quic")]
228 pub resolver: Option<Arc<MapPeerResolver<I>>>,
229 #[cfg(not(feature = "quic"))]
230 _marker: PhantomData<I>,
231}
232
233impl<I, PD> PlumtreeBridge<I, PD>
234where
235 I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
236 PD: PlumtreeDelegate<I>,
237{
238 pub fn new(pm: Arc<PlumtreeMemberlist<I, PD>>) -> Self {
243 Self {
244 pm,
245 config: BridgeConfig::default(),
246 #[cfg(feature = "quic")]
247 resolver: None,
248 #[cfg(not(feature = "quic"))]
249 _marker: PhantomData,
250 }
251 }
252
253 pub fn with_config(pm: Arc<PlumtreeMemberlist<I, PD>>, config: BridgeConfig) -> Self {
255 Self {
256 pm,
257 config,
258 #[cfg(feature = "quic")]
259 resolver: None,
260 #[cfg(not(feature = "quic"))]
261 _marker: PhantomData,
262 }
263 }
264
265 #[cfg(feature = "quic")]
270 #[cfg_attr(docsrs, doc(cfg(feature = "quic")))]
271 pub fn with_resolver(pm: Arc<PlumtreeMemberlist<I, PD>>, resolver: Arc<MapPeerResolver<I>>) -> Self {
272 Self {
273 pm,
274 config: BridgeConfig::default(),
275 resolver: Some(resolver),
276 }
277 }
278
279 #[cfg(feature = "quic")]
281 #[cfg_attr(docsrs, doc(cfg(feature = "quic")))]
282 pub fn with_config_and_resolver(
283 pm: Arc<PlumtreeMemberlist<I, PD>>,
284 config: BridgeConfig,
285 resolver: Arc<MapPeerResolver<I>>,
286 ) -> Self {
287 Self {
288 pm,
289 config,
290 resolver: Some(resolver),
291 }
292 }
293
294 pub fn plumtree(&self) -> &Arc<PlumtreeMemberlist<I, PD>> {
296 &self.pm
297 }
298
299 #[cfg(feature = "quic")]
301 #[cfg_attr(docsrs, doc(cfg(feature = "quic")))]
302 pub fn resolver(&self) -> Option<&Arc<MapPeerResolver<I>>> {
303 self.resolver.as_ref()
304 }
305
306 pub fn peer_stats(&self) -> crate::PeerStats {
308 self.pm.peer_stats()
309 }
310
311 pub fn topology(&self) -> crate::PeerTopology<I> {
313 self.pm.peers().topology()
314 }
315
316 pub fn add_peer(&self, peer: I) {
321 self.pm.add_peer(peer);
322 }
323
324 pub fn remove_peer(&self, peer: &I) {
329 self.pm.remove_peer(peer);
330 }
331
332 pub async fn broadcast(&self, payload: impl Into<Bytes>) -> crate::Result<crate::MessageId> {
334 self.pm.broadcast(payload).await
335 }
336
337 pub fn incoming_sender(&self) -> async_channel::Sender<(I, crate::PlumtreeMessage)> {
341 self.pm.incoming_sender()
342 }
343
344 pub fn shutdown(&self) {
346 self.pm.shutdown();
347 }
348
349 pub fn is_shutdown(&self) -> bool {
351 self.pm.is_shutdown()
352 }
353}
354
355impl<I, PD> Clone for PlumtreeBridge<I, PD>
356where
357 I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
358 PD: PlumtreeDelegate<I>,
359{
360 fn clone(&self) -> Self {
361 Self {
362 pm: self.pm.clone(),
363 config: self.config.clone(),
364 #[cfg(feature = "quic")]
365 resolver: self.resolver.clone(),
366 #[cfg(not(feature = "quic"))]
367 _marker: PhantomData,
368 }
369 }
370}
371
/// Strategy for turning a transport address of type `A` into a [`SocketAddr`].
pub trait AddressExtractor<A> {
    /// Returns the socket address carried by `addr`, or `None` when the
    /// implementation cannot produce one.
    fn extract_addr(addr: &A) -> Option<SocketAddr>;
}

/// Default strategy: any address that can be viewed as a [`SocketAddr`] is
/// copied out directly, so this implementation never returns `None`.
impl<A: AsRef<std::net::SocketAddr>> AddressExtractor<A> for () {
    fn extract_addr(addr: &A) -> Option<SocketAddr> {
        let socket: &SocketAddr = addr.as_ref();
        Some(*socket)
    }
}
389
390pub struct BridgeEventDelegate<I, A, PD, D = VoidDelegate<I, A>>
402where
403 I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
404 A: CheapClone + Send + Sync + 'static,
405 PD: PlumtreeDelegate<I>,
406{
407 bridge: PlumtreeBridge<I, PD>,
409 inner: D,
411 _marker: PhantomData<A>,
413}
414
415impl<I, A, PD, D> BridgeEventDelegate<I, A, PD, D>
416where
417 I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
418 A: CheapClone + Send + Sync + 'static,
419 PD: PlumtreeDelegate<I>,
420{
421 pub fn new(bridge: PlumtreeBridge<I, PD>) -> BridgeEventDelegate<I, A, PD, VoidDelegate<I, A>> {
423 BridgeEventDelegate {
424 bridge,
425 inner: VoidDelegate::default(),
426 _marker: PhantomData,
427 }
428 }
429
430 pub fn with_inner(bridge: PlumtreeBridge<I, PD>, inner: D) -> Self {
432 Self {
433 bridge,
434 inner,
435 _marker: PhantomData,
436 }
437 }
438
439 pub fn bridge(&self) -> &PlumtreeBridge<I, PD> {
441 &self.bridge
442 }
443
444 pub fn inner(&self) -> &D {
446 &self.inner
447 }
448}
449
450impl<I, A, PD, D> Clone for BridgeEventDelegate<I, A, PD, D>
451where
452 I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
453 A: CheapClone + Send + Sync + 'static,
454 PD: PlumtreeDelegate<I>,
455 D: Clone,
456{
457 fn clone(&self) -> Self {
458 Self {
459 bridge: self.bridge.clone(),
460 inner: self.inner.clone(),
461 _marker: PhantomData,
462 }
463 }
464}
465
466impl<I, A, PD, D> EventDelegate for BridgeEventDelegate<I, A, PD, D>
468where
469 I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
470 A: CheapClone + Send + Sync + 'static,
471 PD: PlumtreeDelegate<I>,
472 D: EventDelegate<Id = I, Address = A>,
473{
474 type Id = I;
475 type Address = A;
476
477 async fn notify_join(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
478 let node_id = node.id().clone();
479
480 #[cfg(feature = "quic")]
482 if let Some(ref resolver) = self.bridge.resolver {
483 if let Some(addr) = extract_socket_addr(&node) {
486 resolver.add_peer(node_id.clone(), addr);
487 if self.bridge.config.log_changes {
488 tracing::info!(
489 peer = ?node_id,
490 addr = %addr,
491 "Bridge: Added peer address to resolver"
492 );
493 }
494 }
495 }
496
497 self.bridge.pm.add_peer(node_id.clone());
501
502 if self.bridge.config.log_changes {
503 let stats = self.bridge.pm.peer_stats();
504 tracing::info!(
505 peer = ?node_id,
506 eager_count = stats.eager_count,
507 lazy_count = stats.lazy_count,
508 "Bridge: Node joined, topology updated"
509 );
510 }
511
512 self.inner.notify_join(node).await;
514 }
515
516 async fn notify_leave(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
517 let node_id = node.id();
518
519 self.bridge.pm.remove_peer(node_id);
521
522 #[cfg(feature = "quic")]
524 if let Some(ref resolver) = self.bridge.resolver {
525 resolver.remove_peer(node_id);
526 }
527
528 if self.bridge.config.log_changes {
529 let stats = self.bridge.pm.peer_stats();
530 tracing::info!(
531 peer = ?node_id,
532 eager_count = stats.eager_count,
533 lazy_count = stats.lazy_count,
534 "Bridge: Node left, topology cleaned"
535 );
536 }
537
538 self.inner.notify_leave(node).await;
540 }
541
542 async fn notify_update(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
543 #[cfg(feature = "quic")]
545 if let Some(ref resolver) = self.bridge.resolver {
546 if let Some(addr) = extract_socket_addr(&node) {
547 let node_id = node.id().clone();
548 resolver.update_peer(node_id.clone(), addr);
549 if self.bridge.config.log_changes {
550 tracing::debug!(
551 peer = ?node_id,
552 addr = %addr,
553 "Bridge: Updated peer address"
554 );
555 }
556 }
557 }
558
559 self.inner.notify_update(node).await;
561 }
562}
563
/// Returns the node's address when its concrete type is exactly
/// [`SocketAddr`]; any other address type yields `None`.
///
/// The check is a runtime downcast through [`std::any::Any`], which is why the
/// address type has to be `'static`.
#[cfg(feature = "quic")]
fn extract_socket_addr<I, A>(node: &Arc<NodeState<I, A>>) -> Option<SocketAddr>
where
    I: Id,
    A: CheapClone + Send + Sync + 'static,
{
    use std::any::Any;

    let erased: &dyn Any = node.address();
    erased.downcast_ref::<SocketAddr>().copied()
}
592
593pub struct PlumtreeStackBuilder<I, PD>
598where
599 I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
600 PD: PlumtreeDelegate<I>,
601{
602 pm: Arc<PlumtreeMemberlist<I, PD>>,
603 config: BridgeConfig,
604 #[cfg(feature = "quic")]
605 resolver: Option<Arc<MapPeerResolver<I>>>,
606}
607
608impl<I, PD> PlumtreeStackBuilder<I, PD>
609where
610 I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
611 PD: PlumtreeDelegate<I>,
612{
613 pub fn new(pm: Arc<PlumtreeMemberlist<I, PD>>) -> Self {
615 Self {
616 pm,
617 config: BridgeConfig::default(),
618 #[cfg(feature = "quic")]
619 resolver: None,
620 }
621 }
622
623 pub fn with_config(mut self, config: BridgeConfig) -> Self {
625 self.config = config;
626 self
627 }
628
629 #[cfg(feature = "quic")]
631 #[cfg_attr(docsrs, doc(cfg(feature = "quic")))]
632 pub fn with_resolver(mut self, resolver: Arc<MapPeerResolver<I>>) -> Self {
633 self.resolver = Some(resolver);
634 self
635 }
636
637 pub fn build(self) -> PlumtreeBridge<I, PD> {
639 #[cfg(feature = "quic")]
640 {
641 match self.resolver {
642 Some(resolver) => PlumtreeBridge::with_config_and_resolver(self.pm, self.config, resolver),
643 None => PlumtreeBridge::with_config(self.pm, self.config),
644 }
645 }
646 #[cfg(not(feature = "quic"))]
647 {
648 PlumtreeBridge::with_config(self.pm, self.config)
649 }
650 }
651
652 pub fn build_delegate<A>(self) -> BridgeEventDelegate<I, A, PD, VoidDelegate<I, A>>
654 where
655 A: CheapClone + Send + Sync + 'static,
656 {
657 BridgeEventDelegate::<I, A, PD, VoidDelegate<I, A>>::new(self.build())
658 }
659
660 pub fn build_delegate_with<A, D>(self, inner: D) -> BridgeEventDelegate<I, A, PD, D>
662 where
663 A: CheapClone + Send + Sync + 'static,
664 D: EventDelegate<Id = I, Address = A>,
665 {
666 BridgeEventDelegate::with_inner(self.build(), inner)
667 }
668}
669
670pub struct MemberlistStack<I, PD, T, D>
712where
713 I: Id + IdCodec + memberlist_core::proto::Data + Clone + Ord + Send + Sync + 'static,
714 PD: PlumtreeDelegate<I>,
715 T: memberlist_core::transport::Transport<Id = I>,
716 D: memberlist_core::delegate::Delegate<Id = I, Address = T::ResolvedAddress>,
717{
718 pm: Arc<PlumtreeMemberlist<I, PD>>,
720 memberlist: memberlist_core::Memberlist<T, D>,
722 advertise_addr: SocketAddr,
724}
725
726impl<I, PD, T, D> MemberlistStack<I, PD, T, D>
727where
728 I: Id + IdCodec + memberlist_core::proto::Data + Clone + Ord + Send + Sync + 'static,
729 PD: PlumtreeDelegate<I>,
730 T: memberlist_core::transport::Transport<Id = I>,
731 D: memberlist_core::delegate::Delegate<Id = I, Address = T::ResolvedAddress>,
732{
733 pub fn new(
738 pm: Arc<PlumtreeMemberlist<I, PD>>,
739 memberlist: memberlist_core::Memberlist<T, D>,
740 advertise_addr: SocketAddr,
741 ) -> Self {
742 Self {
743 pm,
744 memberlist,
745 advertise_addr,
746 }
747 }
748
749 pub fn plumtree(&self) -> &Arc<PlumtreeMemberlist<I, PD>> {
751 &self.pm
752 }
753
754 pub fn memberlist(&self) -> &memberlist_core::Memberlist<T, D> {
756 &self.memberlist
757 }
758
759 pub fn advertise_address(&self) -> SocketAddr {
763 self.advertise_addr
764 }
765
766 pub fn peer_stats(&self) -> crate::peer_state::PeerStats {
768 self.pm.peer_stats()
769 }
770
771 pub async fn num_members(&self) -> usize {
773 self.memberlist.num_online_members().await
774 }
775
776 pub async fn broadcast(
781 &self,
782 payload: impl Into<bytes::Bytes>,
783 ) -> Result<crate::MessageId, crate::Error> {
784 self.pm.broadcast(payload).await
785 }
786
787 pub async fn join(
807 &self,
808 seed_addrs: &[SocketAddr],
809 ) -> Result<(), MemberlistStackError>
810 where
811 <T as memberlist_core::transport::Transport>::ResolvedAddress: From<SocketAddr>,
812 {
813 use memberlist_core::proto::MaybeResolvedAddress;
814
815 for &addr in seed_addrs {
816 let seed_node = nodecraft::Node::new(
819 self.pm.plumtree().local_id().clone(),
820 MaybeResolvedAddress::Resolved(addr.into()),
821 );
822
823 self.memberlist
824 .join(seed_node)
825 .await
826 .map_err(|e| MemberlistStackError::JoinFailed(format!("{}", e)))?;
827 }
828 Ok(())
829 }
830
831 pub async fn leave(&self, timeout: std::time::Duration) -> Result<bool, MemberlistStackError> {
836 self.memberlist
837 .leave(timeout)
838 .await
839 .map_err(|e| MemberlistStackError::LeaveFailed(format!("{}", e)))
840 }
841
842 pub async fn shutdown(&self) -> Result<(), MemberlistStackError> {
846 self.pm.shutdown();
847 self.memberlist
848 .shutdown()
849 .await
850 .map_err(|e| MemberlistStackError::ShutdownFailed(format!("{}", e)))
851 }
852
853 pub fn is_shutdown(&self) -> bool {
855 self.pm.is_shutdown()
856 }
857}
858
/// Failures reported by `MemberlistStack` operations.
///
/// Each variant carries the underlying error rendered as text.
#[derive(Debug, Clone)]
pub enum MemberlistStackError {
    /// Joining the cluster failed.
    JoinFailed(String),
    /// Leaving the cluster failed.
    LeaveFailed(String),
    /// Shutting the stack down failed.
    ShutdownFailed(String),
    /// Creating the stack failed.
    CreationFailed(String),
}

impl std::fmt::Display for MemberlistStackError {
    /// Renders `"<what failed>: <detail>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (what, detail) = match self {
            Self::JoinFailed(detail) => ("failed to join cluster", detail),
            Self::LeaveFailed(detail) => ("failed to leave cluster", detail),
            Self::ShutdownFailed(detail) => ("failed to shutdown", detail),
            Self::CreationFailed(detail) => ("failed to create stack", detail),
        };
        write!(f, "{}: {}", what, detail)
    }
}

impl std::error::Error for MemberlistStackError {}
884
885pub mod persistence {
894 use std::fs::{self, File};
895 use std::io::{BufRead, BufReader, Write};
896 use std::net::SocketAddr;
897 use std::path::Path;
898
899 #[derive(Debug)]
901 pub enum PersistenceError {
902 Io(std::io::Error),
904 InvalidAddress(String),
906 }
907
908 impl std::fmt::Display for PersistenceError {
909 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
910 match self {
911 Self::Io(e) => write!(f, "persistence I/O error: {}", e),
912 Self::InvalidAddress(s) => write!(f, "invalid address in persistence: {}", s),
913 }
914 }
915 }
916
917 impl std::error::Error for PersistenceError {}
918
919 impl From<std::io::Error> for PersistenceError {
920 fn from(e: std::io::Error) -> Self {
921 Self::Io(e)
922 }
923 }
924
925 pub fn save_peers(path: &Path, peers: &[SocketAddr]) -> Result<(), PersistenceError> {
929 if let Some(parent) = path.parent() {
931 fs::create_dir_all(parent)?;
932 }
933
934 let mut file = File::create(path)?;
935 for peer in peers {
936 writeln!(file, "{}", peer)?;
937 }
938 file.sync_all()?;
939 Ok(())
940 }
941
942 pub fn load_peers(path: &Path) -> Result<Vec<SocketAddr>, PersistenceError> {
947 if !path.exists() {
948 return Ok(Vec::new());
949 }
950
951 let file = File::open(path)?;
952 let reader = BufReader::new(file);
953 let mut peers = Vec::new();
954
955 for line in reader.lines() {
956 let line = line?;
957 let trimmed = line.trim();
958 if trimmed.is_empty() || trimmed.starts_with('#') {
959 continue;
960 }
961
962 match trimmed.parse::<SocketAddr>() {
963 Ok(addr) => peers.push(addr),
964 Err(_) => {
965 tracing::warn!(
966 line = trimmed,
967 "skipping invalid address in peer persistence file"
968 );
969 }
970 }
971 }
972
973 Ok(peers)
974 }
975
976 pub fn save_peers_atomic(path: &Path, peers: &[SocketAddr]) -> Result<(), PersistenceError> {
980 let temp_path = path.with_extension("tmp");
981 save_peers(&temp_path, peers)?;
982 fs::rename(&temp_path, path)?;
983 Ok(())
984 }
985}
986
987#[derive(Debug, Clone, Default)]
993pub struct LazarusStats {
994 pub probes_sent: u64,
996 pub reconnections: u64,
998 pub failures: u64,
1000 pub missing_seeds: usize,
1002}
1003
1004#[derive(Clone)]
1006pub struct LazarusHandle {
1007 shutdown: Arc<std::sync::atomic::AtomicBool>,
1008 stats: Arc<parking_lot::RwLock<LazarusStats>>,
1009}
1010
1011impl LazarusHandle {
1012 fn new() -> Self {
1014 Self {
1015 shutdown: Arc::new(std::sync::atomic::AtomicBool::new(false)),
1016 stats: Arc::new(parking_lot::RwLock::new(LazarusStats::default())),
1017 }
1018 }
1019
1020 pub fn shutdown(&self) {
1022 self.shutdown
1023 .store(true, std::sync::atomic::Ordering::Release);
1024 }
1025
1026 pub fn is_shutdown(&self) -> bool {
1028 self.shutdown.load(std::sync::atomic::Ordering::Acquire)
1029 }
1030
1031 pub fn stats(&self) -> LazarusStats {
1033 self.stats.read().clone()
1034 }
1035
1036 fn set_missing_seeds(&self, count: usize) {
1038 self.stats.write().missing_seeds = count;
1039 }
1040
1041 fn record_probe(&self) {
1043 self.stats.write().probes_sent += 1;
1044 }
1045
1046 fn record_reconnection(&self) {
1048 self.stats.write().reconnections += 1;
1049 }
1050
1051 fn record_failure(&self) {
1053 self.stats.write().failures += 1;
1054 }
1055}
1056
1057impl Default for LazarusHandle {
1058 fn default() -> Self {
1059 Self::new()
1060 }
1061}
1062
1063impl<I, PD, T, D> MemberlistStack<I, PD, T, D>
1068where
1069 I: Id + IdCodec + memberlist_core::proto::Data + Clone + Ord + Send + Sync + 'static,
1070 PD: PlumtreeDelegate<I> + 'static,
1071 T: memberlist_core::transport::Transport<Id = I> + 'static,
1072 D: memberlist_core::delegate::Delegate<Id = I, Address = T::ResolvedAddress> + 'static,
1073 T::ResolvedAddress: From<SocketAddr>,
1074{
1075 pub fn spawn_lazarus_task(&self, config: BridgeConfig) -> LazarusHandle
1111 where
1112 I: std::fmt::Debug,
1113 {
1114 let handle = LazarusHandle::new();
1115
1116 if !config.should_run_lazarus() {
1117 tracing::debug!("Lazarus task not started: disabled or no seeds configured");
1118 return handle;
1119 }
1120
1121 let handle_clone = handle.clone();
1122 let memberlist = self.memberlist.clone();
1123 let local_id = self.pm.plumtree().local_id().clone();
1124 let seeds = config.static_seeds.clone();
1125 let interval = config.lazarus_interval;
1126
1127 #[cfg(feature = "tokio")]
1130 tokio::spawn(async move {
1131 tracing::info!(
1132 seeds = ?seeds,
1133 interval = ?interval,
1134 "Lazarus task started: monitoring {} static seeds",
1135 seeds.len()
1136 );
1137
1138 loop {
1139 if handle_clone.is_shutdown() {
1141 tracing::info!("Lazarus task shutting down");
1142 break;
1143 }
1144
1145 futures_timer::Delay::new(interval).await;
1147
1148 if handle_clone.is_shutdown() {
1150 tracing::info!("Lazarus task shutting down");
1151 break;
1152 }
1153
1154 let alive_addrs = Self::get_alive_addresses(&memberlist).await;
1156
1157 let missing_seeds: Vec<_> = seeds
1159 .iter()
1160 .filter(|seed| !alive_addrs.contains(seed))
1161 .cloned()
1162 .collect();
1163
1164 handle_clone.set_missing_seeds(missing_seeds.len());
1165
1166 if missing_seeds.is_empty() {
1167 tracing::trace!("Lazarus: all static seeds are alive");
1168 continue;
1169 }
1170
1171 tracing::info!(
1172 missing = ?missing_seeds,
1173 "Lazarus: {} static seed(s) not in alive set, attempting rejoin",
1174 missing_seeds.len()
1175 );
1176
1177 for seed_addr in missing_seeds {
1179 handle_clone.record_probe();
1180
1181 use memberlist_core::proto::MaybeResolvedAddress;
1184 let seed_node = nodecraft::Node::new(
1185 local_id.clone(),
1186 MaybeResolvedAddress::Resolved(seed_addr.into()),
1187 );
1188
1189 match memberlist.join(seed_node).await {
1190 Ok(_) => {
1191 handle_clone.record_reconnection();
1192 tracing::info!(
1193 seed = %seed_addr,
1194 "Lazarus: successfully reconnected to seed"
1195 );
1196 }
1197 Err(e) => {
1198 handle_clone.record_failure();
1199 tracing::debug!(
1200 seed = %seed_addr,
1201 error = %e,
1202 "Lazarus: failed to reconnect to seed"
1203 );
1204 }
1205 }
1206 }
1207 }
1208 });
1209
1210 #[cfg(not(feature = "tokio"))]
1212 {
1213 tracing::warn!(
1214 "Lazarus task requires the 'tokio' feature. Seed recovery is disabled."
1215 );
1216 }
1217
1218 handle
1219 }
1220
1221 async fn get_alive_addresses(
1225 memberlist: &memberlist_core::Memberlist<T, D>,
1226 ) -> std::collections::HashSet<SocketAddr> {
1227 let mut addrs = std::collections::HashSet::new();
1228
1229 let members = memberlist.online_members().await;
1231 for member in members.iter() {
1232 if let Some(addr) = Self::extract_member_addr(member) {
1235 addrs.insert(addr);
1236 }
1237 }
1238
1239 addrs
1240 }
1241
1242 fn extract_member_addr(
1244 node: &std::sync::Arc<memberlist_core::proto::NodeState<I, T::ResolvedAddress>>,
1245 ) -> Option<SocketAddr> {
1246 use std::any::Any;
1247
1248 let addr = node.address();
1249 let addr_any = addr as &dyn Any;
1250
1251 if let Some(socket_addr) = addr_any.downcast_ref::<SocketAddr>() {
1253 return Some(*socket_addr);
1254 }
1255
1256 None
1257 }
1258
1259 pub async fn save_peers_to_file(
1268 &self,
1269 path: &std::path::Path,
1270 ) -> Result<(), persistence::PersistenceError> {
1271 let addrs: Vec<SocketAddr> = Self::get_alive_addresses(&self.memberlist)
1272 .await
1273 .into_iter()
1274 .collect();
1275
1276 persistence::save_peers_atomic(path, &addrs)?;
1277
1278 tracing::debug!(
1279 path = ?path,
1280 count = addrs.len(),
1281 "Saved {} peer addresses to persistence file",
1282 addrs.len()
1283 );
1284
1285 Ok(())
1286 }
1287
1288 pub fn load_bootstrap_addresses(config: &BridgeConfig) -> Vec<SocketAddr> {
1300 let mut addrs = std::collections::HashSet::new();
1301
1302 for seed in &config.static_seeds {
1304 addrs.insert(*seed);
1305 }
1306
1307 if let Some(ref path) = config.persistence_path {
1309 match persistence::load_peers(path) {
1310 Ok(persisted) => {
1311 tracing::info!(
1312 path = ?path,
1313 count = persisted.len(),
1314 "Loaded {} peers from persistence file",
1315 persisted.len()
1316 );
1317 for addr in persisted {
1318 addrs.insert(addr);
1319 }
1320 }
1321 Err(e) => {
1322 tracing::warn!(
1323 path = ?path,
1324 error = %e,
1325 "Failed to load persisted peers"
1326 );
1327 }
1328 }
1329 }
1330
1331 addrs.into_iter().collect()
1332 }
1333}
1334
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{NoopDelegate, PlumtreeConfig};

    /// Parses a literal socket address, panicking on malformed test input.
    fn addr(text: &str) -> SocketAddr {
        text.parse().unwrap()
    }

    // ----- bridge and builder -----

    #[test]
    fn test_bridge_creation() {
        let plumtree = PlumtreeMemberlist::new(1u64, PlumtreeConfig::default(), NoopDelegate);
        let bridge = PlumtreeBridge::new(Arc::new(plumtree));
        assert!(!bridge.is_shutdown());
    }

    #[test]
    fn test_bridge_config() {
        let cfg = BridgeConfig::new()
            .with_log_changes(false)
            .with_auto_promote(false);

        assert!(!cfg.log_changes);
        assert!(!cfg.auto_promote);
    }

    #[test]
    fn test_stack_builder() {
        let plumtree = PlumtreeMemberlist::new(1u64, PlumtreeConfig::default(), NoopDelegate);
        let bridge = PlumtreeStackBuilder::new(Arc::new(plumtree))
            .with_config(BridgeConfig::default())
            .build();

        assert!(!bridge.is_shutdown());
    }

    #[cfg(feature = "quic")]
    #[test]
    fn test_bridge_with_resolver() {
        let local_addr = addr("127.0.0.1:9000");
        let resolver = Arc::new(MapPeerResolver::new(local_addr));

        let plumtree = PlumtreeMemberlist::new(1u64, PlumtreeConfig::default(), NoopDelegate);
        let pm = Arc::new(plumtree);

        let bridge = PlumtreeBridge::with_resolver(pm, Arc::clone(&resolver));
        assert!(bridge.resolver().is_some());
    }

    // ----- configuration: seeds, Lazarus, persistence path -----

    #[test]
    fn test_bridge_config_static_seeds() {
        let seeds = vec![addr("192.168.1.100:7946"), addr("192.168.1.101:7946")];

        let cfg = BridgeConfig::new()
            .with_static_seeds(seeds.clone())
            .with_lazarus_enabled(true)
            .with_lazarus_interval(Duration::from_secs(60));

        assert_eq!(cfg.static_seeds.len(), 2);
        assert!(cfg.lazarus_enabled);
        assert_eq!(cfg.lazarus_interval, Duration::from_secs(60));
        assert!(cfg.should_run_lazarus());
    }

    #[test]
    fn test_bridge_config_should_run_lazarus() {
        // Defaults: disabled and no seeds.
        let cfg = BridgeConfig::new();
        assert!(!cfg.should_run_lazarus());

        // Enabled but without seeds.
        let cfg = BridgeConfig::new().with_lazarus_enabled(true);
        assert!(!cfg.should_run_lazarus());

        // Seeds but not enabled.
        let seeds = vec![addr("192.168.1.100:7946")];
        let cfg = BridgeConfig::new().with_static_seeds(seeds.clone());
        assert!(!cfg.should_run_lazarus());

        // Both enabled and seeded.
        let cfg = BridgeConfig::new()
            .with_static_seeds(seeds)
            .with_lazarus_enabled(true);
        assert!(cfg.should_run_lazarus());
    }

    #[test]
    fn test_bridge_config_persistence_path() {
        let expected = PathBuf::from("/tmp/peers.txt");
        let cfg = BridgeConfig::new().with_persistence_path(PathBuf::from("/tmp/peers.txt"));

        assert_eq!(cfg.persistence_path, Some(expected));
    }

    // ----- persistence module -----

    #[test]
    fn test_persistence_save_and_load() {
        use std::fs;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let path = dir.path().join("peers.txt");

        let peers = vec![
            addr("192.168.1.100:7946"),
            addr("10.0.0.1:7946"),
            addr("172.16.0.1:7946"),
        ];

        persistence::save_peers(&path, &peers).unwrap();

        let loaded = persistence::load_peers(&path).unwrap();

        assert_eq!(loaded.len(), 3);
        assert!(loaded.contains(&addr("192.168.1.100:7946")));
        assert!(loaded.contains(&addr("10.0.0.1:7946")));
        assert!(loaded.contains(&addr("172.16.0.1:7946")));

        fs::remove_file(&path).ok();
    }

    #[test]
    fn test_persistence_load_nonexistent() {
        let missing = PathBuf::from("/nonexistent/path/peers.txt");
        let loaded = persistence::load_peers(&missing).unwrap();
        assert!(loaded.is_empty());
    }

    #[test]
    fn test_persistence_save_atomic() {
        use std::fs;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let path = dir.path().join("peers.txt");

        let peers = vec![addr("192.168.1.100:7946"), addr("192.168.1.101:7946")];

        persistence::save_peers_atomic(&path, &peers).unwrap();

        let loaded = persistence::load_peers(&path).unwrap();
        assert_eq!(loaded.len(), 2);

        // No temporary file with a `.tmp` extension may be left behind.
        let leftover = path.with_extension("tmp");
        assert!(!leftover.exists());

        fs::remove_file(&path).ok();
    }

    #[test]
    fn test_persistence_handles_comments_and_empty_lines() {
        use std::fs::{self, File};
        use std::io::Write;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let path = dir.path().join("peers.txt");

        let mut file = File::create(&path).unwrap();
        writeln!(file, "# This is a comment").unwrap();
        writeln!(file, "192.168.1.100:7946").unwrap();
        writeln!(file).unwrap();
        writeln!(file, " ").unwrap();
        writeln!(file, "# Another comment").unwrap();
        writeln!(file, "192.168.1.101:7946").unwrap();

        let loaded = persistence::load_peers(&path).unwrap();
        assert_eq!(loaded.len(), 2);

        fs::remove_file(&path).ok();
    }

    // ----- Lazarus handle -----

    #[test]
    fn test_lazarus_handle_creation() {
        let handle = LazarusHandle::new();
        assert!(!handle.is_shutdown());

        let snapshot = handle.stats();
        assert_eq!(snapshot.probes_sent, 0);
        assert_eq!(snapshot.reconnections, 0);
        assert_eq!(snapshot.failures, 0);
        assert_eq!(snapshot.missing_seeds, 0);
    }

    #[test]
    fn test_lazarus_handle_shutdown() {
        let handle = LazarusHandle::new();
        assert!(!handle.is_shutdown());

        handle.shutdown();
        assert!(handle.is_shutdown());
    }

    #[test]
    fn test_lazarus_handle_stats_recording() {
        let handle = LazarusHandle::new();

        handle.record_probe();
        handle.record_probe();
        handle.record_reconnection();
        handle.record_failure();
        handle.set_missing_seeds(3);

        let snapshot = handle.stats();
        assert_eq!(snapshot.probes_sent, 2);
        assert_eq!(snapshot.reconnections, 1);
        assert_eq!(snapshot.failures, 1);
        assert_eq!(snapshot.missing_seeds, 3);
    }

    #[test]
    fn test_lazarus_handle_clone() {
        let first = LazarusHandle::new();
        let second = first.clone();

        // Clones share the statistics...
        first.record_probe();
        assert_eq!(second.stats().probes_sent, 1);

        // ...and the shutdown flag.
        second.shutdown();
        assert!(first.is_shutdown());
    }
}