memberlist_plumtree/
bridge.rs

1//! Fully automated Plumtree-Memberlist integration bridge.
2//!
3//! This module provides automatic synchronization between Memberlist's membership
4//! events and Plumtree's peer topology. When a node joins or leaves the cluster
5//! via Memberlist, the bridge automatically updates Plumtree's neighbor table
6//! and (optionally) the QUIC address resolver.
7//!
8//! # Architecture
9//!
10//! The bridge acts as an intermediary that:
11//! 1. Implements Memberlist's `EventDelegate` to receive membership events
12//! 2. Automatically adds/removes peers from Plumtree's topology
13//! 3. Updates the address resolver for QUIC transport (when enabled)
14//! 4. Provides a clean API for starting the full integration stack
15//!
16//! # Seed Recovery ("Lazarus" Feature)
17//!
18//! The bridge supports automatic recovery of seed nodes that have failed and restarted.
19//! When enabled, a background task periodically probes configured static seeds and
20//! attempts to rejoin them if they're not in the cluster's alive set.
21//!
22//! This solves the "Ghost Seed" problem where a restarted seed node remains isolated
23//! because the cluster stopped pinging it after marking it dead.
24//!
25//! # Example
26//!
27//! ```ignore
28//! use memberlist_plumtree::{
29//!     PlumtreeBridge, BridgeConfig, PlumtreeConfig,
30//!     PlumtreeMemberlist, NoopDelegate,
31//! };
32//! use std::sync::Arc;
33//! use std::net::SocketAddr;
34//!
35//! // Create the Plumtree instance
36//! let local_id: u64 = 1;
37//! let pm = Arc::new(PlumtreeMemberlist::new(
38//!     local_id,
39//!     PlumtreeConfig::lan(),
40//!     NoopDelegate,
41//! ));
42//!
43//! // Create the bridge with static seeds for automatic recovery
44//! let config = BridgeConfig::new()
45//!     .with_static_seeds(vec![
46//!         "192.168.1.100:7946".parse().unwrap(),
47//!         "192.168.1.101:7946".parse().unwrap(),
48//!     ])
49//!     .with_lazarus_enabled(true);
50//!
51//! let bridge = PlumtreeBridge::with_config(pm.clone(), config);
52//! ```
53//!
54//! # With QUIC Transport
55//!
56//! ```ignore
57//! use memberlist_plumtree::{
58//!     PlumtreeBridge, PlumtreeConfig, PlumtreeMemberlist,
59//!     QuicTransport, QuicConfig, PooledTransport, PoolConfig,
60//!     MapPeerResolver, NoopDelegate,
61//! };
62//! use std::sync::Arc;
63//!
64//! let local_addr: std::net::SocketAddr = "127.0.0.1:9000".parse().unwrap();
65//! let resolver = Arc::new(MapPeerResolver::new(local_addr));
66//!
67//! let pm = Arc::new(PlumtreeMemberlist::new(1u64, PlumtreeConfig::lan(), NoopDelegate));
68//!
69//! // Create bridge with resolver for automatic address updates
70//! let bridge = PlumtreeBridge::with_resolver(pm.clone(), resolver.clone());
71//!
72//! // Start the full stack
73//! // bridge.start_stack(transport, memberlist).await;
74//! ```
75
76use std::marker::PhantomData;
77use std::net::SocketAddr;
78use std::path::PathBuf;
79use std::sync::Arc;
80use std::time::Duration;
81
82use bytes::Bytes;
83use memberlist_core::delegate::{EventDelegate, VoidDelegate};
84use memberlist_core::proto::NodeState;
85use memberlist_core::transport::Id;
86use nodecraft::CheapClone;
87
88use crate::{IdCodec, PlumtreeDelegate, PlumtreeMemberlist};
89
90#[cfg(feature = "quic")]
91use crate::MapPeerResolver;
92
/// Configuration for the Plumtree bridge.
#[derive(Debug, Clone)]
pub struct BridgeConfig {
    /// Whether to log topology changes.
    pub log_changes: bool,
    /// Whether to automatically promote new peers to eager based on fanout settings.
    pub auto_promote: bool,
    /// Static seed addresses for the cluster.
    ///
    /// These seeds will be periodically probed by the Lazarus task to ensure
    /// that restarted seeds are automatically rediscovered.
    pub static_seeds: Vec<SocketAddr>,
    /// Enable the Lazarus background task for seed recovery.
    ///
    /// When enabled, the bridge will periodically check if static seeds are
    /// alive and attempt to rejoin them if they're missing from the cluster.
    pub lazarus_enabled: bool,
    /// Interval between Lazarus probes for dead seeds.
    ///
    /// Default: 30 seconds.
    pub lazarus_interval: Duration,
    /// Path to persist known peers for recovery after restart.
    ///
    /// If set, the bridge will save known peer addresses to this file
    /// on shutdown and load them on startup.
    pub persistence_path: Option<PathBuf>,
}

impl Default for BridgeConfig {
    /// Defaults: logging and auto-promotion on, no seeds, Lazarus disabled
    /// with a 30 s probe interval, no persistence.
    fn default() -> Self {
        Self {
            log_changes: true,
            auto_promote: true,
            static_seeds: Vec::new(),
            lazarus_enabled: false,
            lazarus_interval: Duration::from_secs(30),
            persistence_path: None,
        }
    }
}

impl BridgeConfig {
    /// Create a new bridge configuration with default settings.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Enable or disable logging of topology changes.
    // `#[must_use]` on all builders: dropping the returned `Self` silently
    // discards the requested setting.
    #[must_use]
    pub fn with_log_changes(mut self, log: bool) -> Self {
        self.log_changes = log;
        self
    }

    /// Enable or disable automatic promotion of new peers.
    #[must_use]
    pub fn with_auto_promote(mut self, auto: bool) -> Self {
        self.auto_promote = auto;
        self
    }

    /// Set static seed addresses for the cluster.
    ///
    /// These seeds will be periodically probed by the Lazarus task (if enabled)
    /// to ensure restarted seeds are automatically rediscovered.
    ///
    /// # Example
    ///
    /// ```
    /// use memberlist_plumtree::BridgeConfig;
    ///
    /// let config = BridgeConfig::new()
    ///     .with_static_seeds(vec![
    ///         "192.168.1.100:7946".parse().unwrap(),
    ///         "192.168.1.101:7946".parse().unwrap(),
    ///     ]);
    /// ```
    #[must_use]
    pub fn with_static_seeds(mut self, seeds: Vec<SocketAddr>) -> Self {
        self.static_seeds = seeds;
        self
    }

    /// Enable or disable the Lazarus background task for seed recovery.
    ///
    /// When enabled, the bridge will periodically probe static seeds and
    /// attempt to rejoin any that are not in the cluster's alive set.
    #[must_use]
    pub fn with_lazarus_enabled(mut self, enabled: bool) -> Self {
        self.lazarus_enabled = enabled;
        self
    }

    /// Set the interval between Lazarus probes for dead seeds.
    ///
    /// Default: 30 seconds. Lower values provide faster recovery but
    /// increase network overhead.
    #[must_use]
    pub fn with_lazarus_interval(mut self, interval: Duration) -> Self {
        self.lazarus_interval = interval;
        self
    }

    /// Set the path for persisting known peers.
    ///
    /// When set, the bridge will save known peer addresses to this file
    /// on shutdown and load them on startup, providing additional bootstrap
    /// options beyond static seeds.
    #[must_use]
    pub fn with_persistence_path(mut self, path: PathBuf) -> Self {
        self.persistence_path = Some(path);
        self
    }

    /// Check if Lazarus probing is enabled and has seeds configured.
    ///
    /// Both conditions must hold: the flag alone is a no-op without seeds,
    /// and seeds alone do not start the background task.
    #[must_use]
    pub fn should_run_lazarus(&self) -> bool {
        self.lazarus_enabled && !self.static_seeds.is_empty()
    }
}
206
/// Plumtree-Memberlist integration bridge.
///
/// This struct holds the Plumtree instance and provides automatic synchronization
/// with Memberlist's membership events. When used as an `EventDelegate`, it
/// automatically updates Plumtree's topology when nodes join or leave.
///
/// # Type Parameters
///
/// - `I`: The node identifier type (must implement `Id`, `IdCodec`, etc.)
/// - `PD`: The Plumtree delegate type for message delivery
pub struct PlumtreeBridge<I, PD>
where
    I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
    PD: PlumtreeDelegate<I>,
{
    /// The Plumtree instance.
    pub pm: Arc<PlumtreeMemberlist<I, PD>>,
    /// Configuration for the bridge.
    pub config: BridgeConfig,
    /// Address resolver for QUIC transport (optional).
    #[cfg(feature = "quic")]
    pub resolver: Option<Arc<MapPeerResolver<I>>>,
    // Zero-sized placeholder compiled in when the `quic` resolver field is
    // compiled out. `I` is already anchored by `pm`, so this field exists only
    // to keep both cfg variants structurally parallel.
    #[cfg(not(feature = "quic"))]
    _marker: PhantomData<I>,
}
232
233impl<I, PD> PlumtreeBridge<I, PD>
234where
235    I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
236    PD: PlumtreeDelegate<I>,
237{
238    /// Create a new bridge without an address resolver.
239    ///
240    /// Use this when not using QUIC transport or when handling address
241    /// resolution separately.
242    pub fn new(pm: Arc<PlumtreeMemberlist<I, PD>>) -> Self {
243        Self {
244            pm,
245            config: BridgeConfig::default(),
246            #[cfg(feature = "quic")]
247            resolver: None,
248            #[cfg(not(feature = "quic"))]
249            _marker: PhantomData,
250        }
251    }
252
253    /// Create a new bridge with custom configuration.
254    pub fn with_config(pm: Arc<PlumtreeMemberlist<I, PD>>, config: BridgeConfig) -> Self {
255        Self {
256            pm,
257            config,
258            #[cfg(feature = "quic")]
259            resolver: None,
260            #[cfg(not(feature = "quic"))]
261            _marker: PhantomData,
262        }
263    }
264
265    /// Create a new bridge with an address resolver for QUIC transport.
266    ///
267    /// When nodes join or leave, the resolver is automatically updated
268    /// with their addresses.
269    #[cfg(feature = "quic")]
270    #[cfg_attr(docsrs, doc(cfg(feature = "quic")))]
271    pub fn with_resolver(pm: Arc<PlumtreeMemberlist<I, PD>>, resolver: Arc<MapPeerResolver<I>>) -> Self {
272        Self {
273            pm,
274            config: BridgeConfig::default(),
275            resolver: Some(resolver),
276        }
277    }
278
279    /// Create a new bridge with both custom configuration and address resolver.
280    #[cfg(feature = "quic")]
281    #[cfg_attr(docsrs, doc(cfg(feature = "quic")))]
282    pub fn with_config_and_resolver(
283        pm: Arc<PlumtreeMemberlist<I, PD>>,
284        config: BridgeConfig,
285        resolver: Arc<MapPeerResolver<I>>,
286    ) -> Self {
287        Self {
288            pm,
289            config,
290            resolver: Some(resolver),
291        }
292    }
293
294    /// Get a reference to the underlying PlumtreeMemberlist.
295    pub fn plumtree(&self) -> &Arc<PlumtreeMemberlist<I, PD>> {
296        &self.pm
297    }
298
299    /// Get the address resolver (if configured).
300    #[cfg(feature = "quic")]
301    #[cfg_attr(docsrs, doc(cfg(feature = "quic")))]
302    pub fn resolver(&self) -> Option<&Arc<MapPeerResolver<I>>> {
303        self.resolver.as_ref()
304    }
305
306    /// Get the current peer statistics from Plumtree.
307    pub fn peer_stats(&self) -> crate::PeerStats {
308        self.pm.peer_stats()
309    }
310
311    /// Get the current peer topology from Plumtree.
312    pub fn topology(&self) -> crate::PeerTopology<I> {
313        self.pm.peers().topology()
314    }
315
316    /// Manually add a peer to the topology.
317    ///
318    /// This is normally not needed when using the bridge as an EventDelegate,
319    /// as peers are added automatically on join events.
320    pub fn add_peer(&self, peer: I) {
321        self.pm.add_peer(peer);
322    }
323
324    /// Manually remove a peer from the topology.
325    ///
326    /// This is normally not needed when using the bridge as an EventDelegate,
327    /// as peers are removed automatically on leave events.
328    pub fn remove_peer(&self, peer: &I) {
329        self.pm.remove_peer(peer);
330    }
331
332    /// Broadcast a message to all nodes in the cluster.
333    pub async fn broadcast(&self, payload: impl Into<Bytes>) -> crate::Result<crate::MessageId> {
334        self.pm.broadcast(payload).await
335    }
336
337    /// Get the sender for incoming messages.
338    ///
339    /// Use this to inject messages received from Memberlist into Plumtree.
340    pub fn incoming_sender(&self) -> async_channel::Sender<(I, crate::PlumtreeMessage)> {
341        self.pm.incoming_sender()
342    }
343
344    /// Shutdown the bridge and underlying Plumtree instance.
345    pub fn shutdown(&self) {
346        self.pm.shutdown();
347    }
348
349    /// Check if the bridge is shutdown.
350    pub fn is_shutdown(&self) -> bool {
351        self.pm.is_shutdown()
352    }
353}
354
355impl<I, PD> Clone for PlumtreeBridge<I, PD>
356where
357    I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
358    PD: PlumtreeDelegate<I>,
359{
360    fn clone(&self) -> Self {
361        Self {
362            pm: self.pm.clone(),
363            config: self.config.clone(),
364            #[cfg(feature = "quic")]
365            resolver: self.resolver.clone(),
366            #[cfg(not(feature = "quic"))]
367            _marker: PhantomData,
368        }
369    }
370}
371
/// Trait for extracting a socket address from memberlist node addresses.
///
/// Memberlist is generic over its address type; implementors describe how to
/// obtain a plain `SocketAddr` from that type, returning `None` when the
/// address cannot be expressed as one.
pub trait AddressExtractor<A> {
    /// Extract the socket address from the address type.
    fn extract_addr(addr: &A) -> Option<SocketAddr>;
}

// Blanket implementation (on the unit type) for any address that can be
// borrowed as a `SocketAddr`.
impl<A> AddressExtractor<A> for ()
where
    A: AsRef<std::net::SocketAddr>,
{
    fn extract_addr(addr: &A) -> Option<SocketAddr> {
        let sa: &SocketAddr = addr.as_ref();
        Some(*sa)
    }
}
389
/// Bridge event delegate that implements Memberlist's EventDelegate.
///
/// This is a wrapper around `PlumtreeBridge` that implements the full
/// `EventDelegate` trait for integration with Memberlist. Membership events
/// first update Plumtree (and the QUIC resolver, when enabled), then are
/// forwarded unchanged to the inner delegate.
///
/// # Type Parameters
///
/// - `I`: Node identifier type
/// - `A`: Node address type
/// - `PD`: Plumtree delegate type
/// - `D`: Inner delegate type (for forwarding events); defaults to the
///   no-op `VoidDelegate`
pub struct BridgeEventDelegate<I, A, PD, D = VoidDelegate<I, A>>
where
    I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
    A: CheapClone + Send + Sync + 'static,
    PD: PlumtreeDelegate<I>,
{
    /// The bridge instance.
    bridge: PlumtreeBridge<I, PD>,
    /// Inner delegate that receives every event after the bridge has
    /// processed it.
    inner: D,
    /// Phantom data anchoring the otherwise-unused address type `A`.
    _marker: PhantomData<A>,
}
414
415impl<I, A, PD, D> BridgeEventDelegate<I, A, PD, D>
416where
417    I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
418    A: CheapClone + Send + Sync + 'static,
419    PD: PlumtreeDelegate<I>,
420{
421    /// Create a new bridge event delegate with a void inner delegate.
422    pub fn new(bridge: PlumtreeBridge<I, PD>) -> BridgeEventDelegate<I, A, PD, VoidDelegate<I, A>> {
423        BridgeEventDelegate {
424            bridge,
425            inner: VoidDelegate::default(),
426            _marker: PhantomData,
427        }
428    }
429
430    /// Create a new bridge event delegate with a custom inner delegate.
431    pub fn with_inner(bridge: PlumtreeBridge<I, PD>, inner: D) -> Self {
432        Self {
433            bridge,
434            inner,
435            _marker: PhantomData,
436        }
437    }
438
439    /// Get a reference to the bridge.
440    pub fn bridge(&self) -> &PlumtreeBridge<I, PD> {
441        &self.bridge
442    }
443
444    /// Get a reference to the inner delegate.
445    pub fn inner(&self) -> &D {
446        &self.inner
447    }
448}
449
450impl<I, A, PD, D> Clone for BridgeEventDelegate<I, A, PD, D>
451where
452    I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
453    A: CheapClone + Send + Sync + 'static,
454    PD: PlumtreeDelegate<I>,
455    D: Clone,
456{
457    fn clone(&self) -> Self {
458        Self {
459            bridge: self.bridge.clone(),
460            inner: self.inner.clone(),
461            _marker: PhantomData,
462        }
463    }
464}
465
// Implement EventDelegate for BridgeEventDelegate.
//
// Each handler updates Plumtree state (and the QUIC resolver, when enabled)
// first, then forwards the event unchanged to the inner delegate.
impl<I, A, PD, D> EventDelegate for BridgeEventDelegate<I, A, PD, D>
where
    I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
    A: CheapClone + Send + Sync + 'static,
    PD: PlumtreeDelegate<I>,
    D: EventDelegate<Id = I, Address = A>,
{
    type Id = I;
    type Address = A;

    // A node joined the cluster: record its address (QUIC only), add it to
    // Plumtree's topology, then forward the event.
    async fn notify_join(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
        let node_id = node.id().clone();

        // Step 1: Update address resolver (if configured with QUIC).
        #[cfg(feature = "quic")]
        if let Some(ref resolver) = self.bridge.resolver {
            // extract_socket_addr only succeeds when the concrete address
            // type is exactly `SocketAddr`; otherwise the resolver is left
            // untouched and must be updated by the caller.
            if let Some(addr) = extract_socket_addr(&node) {
                resolver.add_peer(node_id.clone(), addr);
                if self.bridge.config.log_changes {
                    tracing::info!(
                        peer = ?node_id,
                        addr = %addr,
                        "Bridge: Added peer address to resolver"
                    );
                }
            }
        }

        // Step 2: Add peer to Plumtree topology.
        // This will automatically classify the peer as eager or lazy based on
        // the hash ring topology and fanout settings.
        self.bridge.pm.add_peer(node_id.clone());

        if self.bridge.config.log_changes {
            let stats = self.bridge.pm.peer_stats();
            tracing::info!(
                peer = ?node_id,
                eager_count = stats.eager_count,
                lazy_count = stats.lazy_count,
                "Bridge: Node joined, topology updated"
            );
        }

        // Forward to inner delegate.
        self.inner.notify_join(node).await;
    }

    // A node left (or was declared dead): remove it from Plumtree and the
    // resolver, then forward the event.
    async fn notify_leave(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
        let node_id = node.id();

        // Step 1: Remove from Plumtree topology.
        self.bridge.pm.remove_peer(node_id);

        // Step 2: Remove from address resolver (if configured).
        // NOTE(review): removal order is the mirror image of notify_join
        // (topology first here, resolver first there) — presumed benign since
        // the two structures are independent, but confirm if in-flight sends
        // consult the resolver after topology removal.
        #[cfg(feature = "quic")]
        if let Some(ref resolver) = self.bridge.resolver {
            resolver.remove_peer(node_id);
        }

        if self.bridge.config.log_changes {
            let stats = self.bridge.pm.peer_stats();
            tracing::info!(
                peer = ?node_id,
                eager_count = stats.eager_count,
                lazy_count = stats.lazy_count,
                "Bridge: Node left, topology cleaned"
            );
        }

        // Forward to inner delegate.
        self.inner.notify_leave(node).await;
    }

    // A node's metadata/address changed: refresh the resolver entry (QUIC
    // only; Plumtree topology is keyed by id and needs no update), then
    // forward the event.
    async fn notify_update(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
        // Update address resolver if the address changed.
        #[cfg(feature = "quic")]
        if let Some(ref resolver) = self.bridge.resolver {
            if let Some(addr) = extract_socket_addr(&node) {
                let node_id = node.id().clone();
                resolver.update_peer(node_id.clone(), addr);
                if self.bridge.config.log_changes {
                    tracing::debug!(
                        peer = ?node_id,
                        addr = %addr,
                        "Bridge: Updated peer address"
                    );
                }
            }
        }

        // Forward to inner delegate.
        self.inner.notify_update(node).await;
    }
}
563
// Helper to pull a `SocketAddr` out of a `NodeState` whose address type is
// only known generically. Relies on `Any`-based runtime type inspection.
#[cfg(feature = "quic")]
fn extract_socket_addr<I, A>(node: &Arc<NodeState<I, A>>) -> Option<SocketAddr>
where
    I: Id,
    A: CheapClone + Send + Sync + 'static,
{
    use std::any::Any;

    // The downcast succeeds only when the concrete address type is exactly
    // `SocketAddr`, which covers the common memberlist configuration.
    //
    // For other custom address types, users should:
    // 1. Implement a custom BridgeEventDelegate that extracts addresses, or
    // 2. Manually update the resolver after notify_join.
    (node.address() as &dyn Any)
        .downcast_ref::<SocketAddr>()
        .copied()
}
592
/// Builder for creating a fully configured Plumtree stack.
///
/// This builder helps wire together all the components needed for a
/// production-ready Plumtree-Memberlist integration: the Plumtree instance,
/// the bridge configuration, and (with the `quic` feature) an address resolver.
pub struct PlumtreeStackBuilder<I, PD>
where
    I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
    PD: PlumtreeDelegate<I>,
{
    /// The Plumtree instance the built bridge will drive.
    pm: Arc<PlumtreeMemberlist<I, PD>>,
    /// Bridge configuration; starts from `BridgeConfig::default()`.
    config: BridgeConfig,
    /// Optional QUIC address resolver, attached via `with_resolver`.
    #[cfg(feature = "quic")]
    resolver: Option<Arc<MapPeerResolver<I>>>,
}
607
608impl<I, PD> PlumtreeStackBuilder<I, PD>
609where
610    I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
611    PD: PlumtreeDelegate<I>,
612{
613    /// Create a new stack builder with the given Plumtree instance.
614    pub fn new(pm: Arc<PlumtreeMemberlist<I, PD>>) -> Self {
615        Self {
616            pm,
617            config: BridgeConfig::default(),
618            #[cfg(feature = "quic")]
619            resolver: None,
620        }
621    }
622
623    /// Set the bridge configuration.
624    pub fn with_config(mut self, config: BridgeConfig) -> Self {
625        self.config = config;
626        self
627    }
628
629    /// Set the address resolver for QUIC transport.
630    #[cfg(feature = "quic")]
631    #[cfg_attr(docsrs, doc(cfg(feature = "quic")))]
632    pub fn with_resolver(mut self, resolver: Arc<MapPeerResolver<I>>) -> Self {
633        self.resolver = Some(resolver);
634        self
635    }
636
637    /// Build the PlumtreeBridge.
638    pub fn build(self) -> PlumtreeBridge<I, PD> {
639        #[cfg(feature = "quic")]
640        {
641            match self.resolver {
642                Some(resolver) => PlumtreeBridge::with_config_and_resolver(self.pm, self.config, resolver),
643                None => PlumtreeBridge::with_config(self.pm, self.config),
644            }
645        }
646        #[cfg(not(feature = "quic"))]
647        {
648            PlumtreeBridge::with_config(self.pm, self.config)
649        }
650    }
651
652    /// Build a BridgeEventDelegate with a void inner delegate.
653    pub fn build_delegate<A>(self) -> BridgeEventDelegate<I, A, PD, VoidDelegate<I, A>>
654    where
655        A: CheapClone + Send + Sync + 'static,
656    {
657        BridgeEventDelegate::<I, A, PD, VoidDelegate<I, A>>::new(self.build())
658    }
659
660    /// Build a BridgeEventDelegate with a custom inner delegate.
661    pub fn build_delegate_with<A, D>(self, inner: D) -> BridgeEventDelegate<I, A, PD, D>
662    where
663        A: CheapClone + Send + Sync + 'static,
664        D: EventDelegate<Id = I, Address = A>,
665    {
666        BridgeEventDelegate::with_inner(self.build(), inner)
667    }
668}
669
// ============================================================================
// MemberlistStack - Complete Integration Stack
// ============================================================================

/// A complete Plumtree + Memberlist integration stack.
///
/// This struct combines:
/// - `PlumtreeMemberlist` for epidemic broadcast
/// - Real `Memberlist` instance for SWIM gossip discovery
/// - Automatic peer synchronization via `PlumtreeNodeDelegate`
///
/// Use this when you want a fully integrated stack without manually
/// wiring together the components.
///
/// # Type Parameters
///
/// - `I`: Node identifier type (must implement `Id`, `IdCodec`, etc.)
/// - `PD`: Plumtree delegate for message delivery
/// - `T`: Memberlist transport (e.g., `NetTransport`)
/// - `D`: The wrapped delegate type (created by `wrap_delegate`)
///
/// # Example
///
/// ```ignore
/// use memberlist_plumtree::{MemberlistStack, PlumtreeConfig, NoopDelegate};
/// use memberlist::{Memberlist, Options as MemberlistOptions};
/// use std::net::SocketAddr;
///
/// // Create the stack
/// let stack = MemberlistStack::builder(node_id, PlumtreeConfig::lan(), NoopDelegate)
///     .with_bind_address("127.0.0.1:0".parse().unwrap())
///     .build()
///     .await?;
///
/// // Join the cluster
/// let seed: SocketAddr = "192.168.1.100:7946".parse().unwrap();
/// stack.join(&[seed]).await?;
///
/// // Broadcast messages
/// stack.broadcast(b"hello").await?;
/// ```
pub struct MemberlistStack<I, PD, T, D>
where
    I: Id + IdCodec + memberlist_core::proto::Data + Clone + Ord + Send + Sync + 'static,
    PD: PlumtreeDelegate<I>,
    T: memberlist_core::transport::Transport<Id = I>,
    D: memberlist_core::delegate::Delegate<Id = I, Address = T::ResolvedAddress>,
{
    /// The PlumtreeMemberlist instance (epidemic broadcast layer).
    pm: Arc<PlumtreeMemberlist<I, PD>>,
    /// The Memberlist instance for SWIM gossip (membership layer).
    memberlist: memberlist_core::Memberlist<T, D>,
    /// The advertise address other nodes use to reach this one.
    advertise_addr: SocketAddr,
}
725
726impl<I, PD, T, D> MemberlistStack<I, PD, T, D>
727where
728    I: Id + IdCodec + memberlist_core::proto::Data + Clone + Ord + Send + Sync + 'static,
729    PD: PlumtreeDelegate<I>,
730    T: memberlist_core::transport::Transport<Id = I>,
731    D: memberlist_core::delegate::Delegate<Id = I, Address = T::ResolvedAddress>,
732{
733    /// Create a new MemberlistStack from pre-built components.
734    ///
735    /// This is a low-level constructor. Prefer using `MemberlistStackBuilder` for
736    /// a more ergonomic API.
737    pub fn new(
738        pm: Arc<PlumtreeMemberlist<I, PD>>,
739        memberlist: memberlist_core::Memberlist<T, D>,
740        advertise_addr: SocketAddr,
741    ) -> Self {
742        Self {
743            pm,
744            memberlist,
745            advertise_addr,
746        }
747    }
748
749    /// Get a reference to the PlumtreeMemberlist.
750    pub fn plumtree(&self) -> &Arc<PlumtreeMemberlist<I, PD>> {
751        &self.pm
752    }
753
754    /// Get a reference to the Memberlist.
755    pub fn memberlist(&self) -> &memberlist_core::Memberlist<T, D> {
756        &self.memberlist
757    }
758
759    /// Get the advertise address for this node.
760    ///
761    /// Other nodes can use this address to join the cluster.
762    pub fn advertise_address(&self) -> SocketAddr {
763        self.advertise_addr
764    }
765
766    /// Get Plumtree peer statistics.
767    pub fn peer_stats(&self) -> crate::peer_state::PeerStats {
768        self.pm.peer_stats()
769    }
770
771    /// Get the number of online memberlist members.
772    pub async fn num_members(&self) -> usize {
773        self.memberlist.num_online_members().await
774    }
775
776    /// Broadcast a message through Plumtree.
777    ///
778    /// The message will be delivered to all nodes in the cluster via the
779    /// epidemic broadcast tree.
780    pub async fn broadcast(
781        &self,
782        payload: impl Into<bytes::Bytes>,
783    ) -> Result<crate::MessageId, crate::Error> {
784        self.pm.broadcast(payload).await
785    }
786
787    /// Join the cluster via seed nodes.
788    ///
789    /// This triggers automatic peer discovery via SWIM gossip.
790    /// The PlumtreeNodeDelegate will automatically update Plumtree's topology
791    /// as nodes are discovered.
792    ///
793    /// # Arguments
794    ///
795    /// * `seed_addrs` - Socket addresses of seed nodes to join through
796    ///
797    /// # Example
798    ///
799    /// ```ignore
800    /// let seeds = vec![
801    ///     "192.168.1.100:7946".parse().unwrap(),
802    ///     "192.168.1.101:7946".parse().unwrap(),
803    /// ];
804    /// stack.join(&seeds).await?;
805    /// ```
806    pub async fn join(
807        &self,
808        seed_addrs: &[SocketAddr],
809    ) -> Result<(), MemberlistStackError>
810    where
811        <T as memberlist_core::transport::Transport>::ResolvedAddress: From<SocketAddr>,
812    {
813        use memberlist_core::proto::MaybeResolvedAddress;
814
815        for &addr in seed_addrs {
816            // Create a placeholder ID - memberlist will resolve the actual ID
817            // We use a minimal ID representation that will be replaced during handshake
818            let seed_node = nodecraft::Node::new(
819                self.pm.plumtree().local_id().clone(),
820                MaybeResolvedAddress::Resolved(addr.into()),
821            );
822
823            self.memberlist
824                .join(seed_node)
825                .await
826                .map_err(|e| MemberlistStackError::JoinFailed(format!("{}", e)))?;
827        }
828        Ok(())
829    }
830
831    /// Leave the cluster gracefully.
832    ///
833    /// This notifies other nodes of the departure and waits for the
834    /// specified timeout for the leave message to propagate.
835    pub async fn leave(&self, timeout: std::time::Duration) -> Result<bool, MemberlistStackError> {
836        self.memberlist
837            .leave(timeout)
838            .await
839            .map_err(|e| MemberlistStackError::LeaveFailed(format!("{}", e)))
840    }
841
842    /// Shutdown the entire stack.
843    ///
844    /// This shuts down both Plumtree and Memberlist.
845    pub async fn shutdown(&self) -> Result<(), MemberlistStackError> {
846        self.pm.shutdown();
847        self.memberlist
848            .shutdown()
849            .await
850            .map_err(|e| MemberlistStackError::ShutdownFailed(format!("{}", e)))
851    }
852
853    /// Check if the stack has been shut down.
854    pub fn is_shutdown(&self) -> bool {
855        self.pm.is_shutdown()
856    }
857}
858
/// Errors that can occur when using `MemberlistStack`.
#[derive(Debug, Clone)]
pub enum MemberlistStackError {
    /// Failed to join the cluster.
    JoinFailed(String),
    /// Failed to leave the cluster.
    LeaveFailed(String),
    /// Failed to shutdown.
    ShutdownFailed(String),
    /// Failed to create the stack.
    CreationFailed(String),
}

impl std::fmt::Display for MemberlistStackError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every variant carries a detail string, so pick the prefix once and
        // format in a single place.
        let (prefix, detail) = match self {
            Self::JoinFailed(e) => ("failed to join cluster", e),
            Self::LeaveFailed(e) => ("failed to leave cluster", e),
            Self::ShutdownFailed(e) => ("failed to shutdown", e),
            Self::CreationFailed(e) => ("failed to create stack", e),
        };
        write!(f, "{prefix}: {detail}")
    }
}

impl std::error::Error for MemberlistStackError {}
884
885// ============================================================================
886// Peer Persistence
887// ============================================================================
888
889/// Persistence module for saving and loading known peer addresses.
890///
891/// This provides crash recovery by allowing nodes to remember peers they've
892/// seen, reducing dependency on static seeds after initial bootstrap.
893pub mod persistence {
894    use std::fs::{self, File};
895    use std::io::{BufRead, BufReader, Write};
896    use std::net::SocketAddr;
897    use std::path::Path;
898
899    /// Error type for persistence operations.
900    #[derive(Debug)]
901    pub enum PersistenceError {
902        /// I/O error during file operations.
903        Io(std::io::Error),
904        /// Invalid address format in persisted data.
905        InvalidAddress(String),
906    }
907
908    impl std::fmt::Display for PersistenceError {
909        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
910            match self {
911                Self::Io(e) => write!(f, "persistence I/O error: {}", e),
912                Self::InvalidAddress(s) => write!(f, "invalid address in persistence: {}", s),
913            }
914        }
915    }
916
917    impl std::error::Error for PersistenceError {}
918
919    impl From<std::io::Error> for PersistenceError {
920        fn from(e: std::io::Error) -> Self {
921            Self::Io(e)
922        }
923    }
924
925    /// Save peer addresses to a file.
926    ///
927    /// Addresses are stored one per line in `ip:port` format.
928    pub fn save_peers(path: &Path, peers: &[SocketAddr]) -> Result<(), PersistenceError> {
929        // Create parent directories if needed
930        if let Some(parent) = path.parent() {
931            fs::create_dir_all(parent)?;
932        }
933
934        let mut file = File::create(path)?;
935        for peer in peers {
936            writeln!(file, "{}", peer)?;
937        }
938        file.sync_all()?;
939        Ok(())
940    }
941
942    /// Load peer addresses from a file.
943    ///
944    /// Returns an empty vector if the file doesn't exist.
945    /// Invalid lines are logged and skipped.
946    pub fn load_peers(path: &Path) -> Result<Vec<SocketAddr>, PersistenceError> {
947        if !path.exists() {
948            return Ok(Vec::new());
949        }
950
951        let file = File::open(path)?;
952        let reader = BufReader::new(file);
953        let mut peers = Vec::new();
954
955        for line in reader.lines() {
956            let line = line?;
957            let trimmed = line.trim();
958            if trimmed.is_empty() || trimmed.starts_with('#') {
959                continue;
960            }
961
962            match trimmed.parse::<SocketAddr>() {
963                Ok(addr) => peers.push(addr),
964                Err(_) => {
965                    tracing::warn!(
966                        line = trimmed,
967                        "skipping invalid address in peer persistence file"
968                    );
969                }
970            }
971        }
972
973        Ok(peers)
974    }
975
976    /// Atomically update the peer persistence file.
977    ///
978    /// Writes to a temp file first, then renames to avoid corruption.
979    pub fn save_peers_atomic(path: &Path, peers: &[SocketAddr]) -> Result<(), PersistenceError> {
980        let temp_path = path.with_extension("tmp");
981        save_peers(&temp_path, peers)?;
982        fs::rename(&temp_path, path)?;
983        Ok(())
984    }
985}
986
987// ============================================================================
988// Lazarus Task (Seed Recovery)
989// ============================================================================
990
/// Statistics for the Lazarus seed recovery task.
///
/// The counter fields are cumulative over the lifetime of the task;
/// `missing_seeds` is a point-in-time gauge refreshed on each probe cycle.
#[derive(Debug, Clone, Default)]
pub struct LazarusStats {
    /// Number of probes sent to dead seeds.
    pub probes_sent: u64,
    /// Number of successful reconnections.
    pub reconnections: u64,
    /// Number of failed reconnection attempts.
    pub failures: u64,
    /// Number of seeds currently missing from the cluster.
    pub missing_seeds: usize,
}
1003
/// Handle for controlling the Lazarus background task.
///
/// Cloning is cheap: all clones share the same shutdown flag and stats
/// (both behind `Arc`), so any clone can stop the task or read statistics.
#[derive(Clone)]
pub struct LazarusHandle {
    // Set once by `shutdown()`; polled by the background task between cycles.
    shutdown: Arc<std::sync::atomic::AtomicBool>,
    // Shared counters written by the task and snapshotted via `stats()`.
    stats: Arc<parking_lot::RwLock<LazarusStats>>,
}
1010
1011impl LazarusHandle {
1012    /// Create a new Lazarus handle.
1013    fn new() -> Self {
1014        Self {
1015            shutdown: Arc::new(std::sync::atomic::AtomicBool::new(false)),
1016            stats: Arc::new(parking_lot::RwLock::new(LazarusStats::default())),
1017        }
1018    }
1019
1020    /// Signal the Lazarus task to shutdown.
1021    pub fn shutdown(&self) {
1022        self.shutdown
1023            .store(true, std::sync::atomic::Ordering::Release);
1024    }
1025
1026    /// Check if shutdown has been requested.
1027    pub fn is_shutdown(&self) -> bool {
1028        self.shutdown.load(std::sync::atomic::Ordering::Acquire)
1029    }
1030
1031    /// Get current Lazarus statistics.
1032    pub fn stats(&self) -> LazarusStats {
1033        self.stats.read().clone()
1034    }
1035
1036    /// Update missing seeds count.
1037    fn set_missing_seeds(&self, count: usize) {
1038        self.stats.write().missing_seeds = count;
1039    }
1040
1041    /// Record a probe sent.
1042    fn record_probe(&self) {
1043        self.stats.write().probes_sent += 1;
1044    }
1045
1046    /// Record a successful reconnection.
1047    fn record_reconnection(&self) {
1048        self.stats.write().reconnections += 1;
1049    }
1050
1051    /// Record a failed reconnection attempt.
1052    fn record_failure(&self) {
1053        self.stats.write().failures += 1;
1054    }
1055}
1056
impl Default for LazarusHandle {
    /// Equivalent to [`LazarusHandle::new`]: running state, zeroed stats.
    fn default() -> Self {
        Self::new()
    }
}
1062
1063// ============================================================================
1064// MemberlistStack Lazarus Integration
1065// ============================================================================
1066
impl<I, PD, T, D> MemberlistStack<I, PD, T, D>
where
    I: Id + IdCodec + memberlist_core::proto::Data + Clone + Ord + Send + Sync + 'static,
    PD: PlumtreeDelegate<I> + 'static,
    T: memberlist_core::transport::Transport<Id = I> + 'static,
    D: memberlist_core::delegate::Delegate<Id = I, Address = T::ResolvedAddress> + 'static,
    T::ResolvedAddress: From<SocketAddr>,
{
    /// Spawn the Lazarus background task for automatic seed recovery.
    ///
    /// This task periodically checks if any configured static seeds are missing
    /// from the cluster and attempts to rejoin them. This handles the "Ghost Seed"
    /// problem where a restarted seed node remains isolated because other nodes
    /// stopped probing it after marking it dead.
    ///
    /// The task only runs when `config.should_run_lazarus()` is true and the
    /// `tokio` feature is enabled; otherwise the returned handle is inert
    /// (its stats never change). A `shutdown()` call is only observed between
    /// probe cycles, so it may take up to `lazarus_interval` to be honored.
    ///
    /// # Arguments
    ///
    /// * `config` - The bridge configuration containing static seeds and interval
    ///
    /// # Returns
    ///
    /// A `LazarusHandle` that can be used to check stats and shutdown the task.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let config = BridgeConfig::new()
    ///     .with_static_seeds(vec![
    ///         "192.168.1.100:7946".parse().unwrap(),
    ///         "192.168.1.101:7946".parse().unwrap(),
    ///     ])
    ///     .with_lazarus_enabled(true)
    ///     .with_lazarus_interval(Duration::from_secs(30));
    ///
    /// let handle = stack.spawn_lazarus_task(config);
    ///
    /// // Later, check stats
    /// let stats = handle.stats();
    /// println!("Reconnections: {}", stats.reconnections);
    ///
    /// // Shutdown when done
    /// handle.shutdown();
    /// ```
    pub fn spawn_lazarus_task(&self, config: BridgeConfig) -> LazarusHandle
    where
        I: std::fmt::Debug,
    {
        let handle = LazarusHandle::new();

        // Fast exit: no seeds configured or feature disabled means nothing
        // to monitor; the caller still gets a valid (inert) handle.
        if !config.should_run_lazarus() {
            tracing::debug!("Lazarus task not started: disabled or no seeds configured");
            return handle;
        }

        // Clone everything the detached task needs to own.
        let handle_clone = handle.clone();
        let memberlist = self.memberlist.clone();
        let local_id = self.pm.plumtree().local_id().clone();
        let seeds = config.static_seeds.clone();
        let interval = config.lazarus_interval;

        // Spawn the background task using the runtime
        // Note: This requires a tokio runtime to be available
        #[cfg(feature = "tokio")]
        tokio::spawn(async move {
            tracing::info!(
                seeds = ?seeds,
                interval = ?interval,
                "Lazarus task started: monitoring {} static seeds",
                seeds.len()
            );

            loop {
                // Check for shutdown
                if handle_clone.is_shutdown() {
                    tracing::info!("Lazarus task shutting down");
                    break;
                }

                // Sleep for the configured interval.
                // The sleep itself is not cancellable, so a shutdown request
                // during this wait is only seen once the delay elapses.
                futures_timer::Delay::new(interval).await;

                // Check for shutdown again after sleep
                if handle_clone.is_shutdown() {
                    tracing::info!("Lazarus task shutting down");
                    break;
                }

                // Get alive member addresses
                let alive_addrs = Self::get_alive_addresses(&memberlist).await;

                // Find missing seeds: any configured seed whose address is
                // not among the currently-alive members.
                let missing_seeds: Vec<_> = seeds
                    .iter()
                    .filter(|seed| !alive_addrs.contains(seed))
                    .cloned()
                    .collect();

                // Publish the gauge before attempting any rejoins.
                handle_clone.set_missing_seeds(missing_seeds.len());

                if missing_seeds.is_empty() {
                    tracing::trace!("Lazarus: all static seeds are alive");
                    continue;
                }

                tracing::info!(
                    missing = ?missing_seeds,
                    "Lazarus: {} static seed(s) not in alive set, attempting rejoin",
                    missing_seeds.len()
                );

                // Attempt to rejoin each missing seed
                for seed_addr in missing_seeds {
                    handle_clone.record_probe();

                    // Create a node for the join attempt
                    // We use the local_id as a placeholder - memberlist will resolve the actual ID
                    // NOTE(review): this assumes the join path tolerates a
                    // placeholder ID that matches our own — confirm against
                    // memberlist's join semantics.
                    use memberlist_core::proto::MaybeResolvedAddress;
                    let seed_node = nodecraft::Node::new(
                        local_id.clone(),
                        MaybeResolvedAddress::Resolved(seed_addr.into()),
                    );

                    match memberlist.join(seed_node).await {
                        Ok(_) => {
                            handle_clone.record_reconnection();
                            tracing::info!(
                                seed = %seed_addr,
                                "Lazarus: successfully reconnected to seed"
                            );
                        }
                        Err(e) => {
                            // Failures are expected while a seed is down;
                            // log at debug to avoid noisy output.
                            handle_clone.record_failure();
                            tracing::debug!(
                                seed = %seed_addr,
                                error = %e,
                                "Lazarus: failed to reconnect to seed"
                            );
                        }
                    }
                }
            }
        });

        // When tokio is not available, log a warning
        #[cfg(not(feature = "tokio"))]
        {
            tracing::warn!(
                "Lazarus task requires the 'tokio' feature. Seed recovery is disabled."
            );
        }

        handle
    }

    /// Get socket addresses of all alive members.
    ///
    /// This is used by the Lazarus task to check which seeds are currently alive.
    /// Only members whose resolved address can be downcast to a plain
    /// `SocketAddr` are included (see [`Self::extract_member_addr`]).
    async fn get_alive_addresses(
        memberlist: &memberlist_core::Memberlist<T, D>,
    ) -> std::collections::HashSet<SocketAddr> {
        let mut addrs = std::collections::HashSet::new();

        // Get online members and extract their addresses
        let members = memberlist.online_members().await;
        for member in members.iter() {
            // Try to extract SocketAddr from the node's address
            // This handles the common case where the address is a SocketAddr
            if let Some(addr) = Self::extract_member_addr(member) {
                addrs.insert(addr);
            }
        }

        addrs
    }

    /// Extract socket address from a member node.
    ///
    /// Uses a runtime downcast, so this succeeds only when the transport's
    /// `ResolvedAddress` type is exactly `SocketAddr`. For any other address
    /// type this returns `None` — which means `get_alive_addresses` comes
    /// back empty and the Lazarus task will treat every static seed as
    /// missing on each cycle.
    fn extract_member_addr(
        node: &std::sync::Arc<memberlist_core::proto::NodeState<I, T::ResolvedAddress>>,
    ) -> Option<SocketAddr> {
        use std::any::Any;

        let addr = node.address();
        let addr_any = addr as &dyn Any;

        // Try direct SocketAddr
        if let Some(socket_addr) = addr_any.downcast_ref::<SocketAddr>() {
            return Some(*socket_addr);
        }

        None
    }

    /// Save current cluster members to persistence file.
    ///
    /// This should be called periodically or on graceful shutdown to save
    /// known peers for recovery after restart.
    ///
    /// # Arguments
    ///
    /// * `path` - Path to save the peer list
    ///
    /// # Errors
    ///
    /// Returns a [`persistence::PersistenceError`] if writing the file fails.
    pub async fn save_peers_to_file(
        &self,
        path: &std::path::Path,
    ) -> Result<(), persistence::PersistenceError> {
        // Snapshot the alive set; ordering of the resulting Vec is
        // unspecified since it comes from a HashSet.
        let addrs: Vec<SocketAddr> = Self::get_alive_addresses(&self.memberlist)
            .await
            .into_iter()
            .collect();

        persistence::save_peers_atomic(path, &addrs)?;

        tracing::debug!(
            path = ?path,
            count = addrs.len(),
            "Saved {} peer addresses to persistence file",
            addrs.len()
        );

        Ok(())
    }

    /// Load persisted peers and combine with static seeds.
    ///
    /// This provides a comprehensive list of potential bootstrap addresses.
    /// Persistence-load failures are logged and ignored, so the static seeds
    /// are always returned even when the file is unreadable.
    ///
    /// # Arguments
    ///
    /// * `config` - Bridge configuration with persistence path and static seeds
    ///
    /// # Returns
    ///
    /// Combined list of unique addresses from both persistence and static seeds.
    pub fn load_bootstrap_addresses(config: &BridgeConfig) -> Vec<SocketAddr> {
        // HashSet deduplicates overlap between static seeds and the
        // persisted peer list.
        let mut addrs = std::collections::HashSet::new();

        // Add static seeds
        for seed in &config.static_seeds {
            addrs.insert(*seed);
        }

        // Load persisted peers if path is configured
        if let Some(ref path) = config.persistence_path {
            match persistence::load_peers(path) {
                Ok(persisted) => {
                    tracing::info!(
                        path = ?path,
                        count = persisted.len(),
                        "Loaded {} peers from persistence file",
                        persisted.len()
                    );
                    for addr in persisted {
                        addrs.insert(addr);
                    }
                }
                Err(e) => {
                    tracing::warn!(
                        path = ?path,
                        error = %e,
                        "Failed to load persisted peers"
                    );
                }
            }
        }

        addrs.into_iter().collect()
    }
}
1334
/// Unit tests covering bridge construction, `BridgeConfig` builders, the
/// `persistence` module's file round-trips, and `LazarusHandle` state sharing.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{NoopDelegate, PlumtreeConfig};

    #[test]
    fn test_bridge_creation() {
        let pm = Arc::new(PlumtreeMemberlist::new(
            1u64,
            PlumtreeConfig::default(),
            NoopDelegate,
        ));
        let bridge = PlumtreeBridge::new(pm);
        // A freshly-created bridge must start in the running state.
        assert!(!bridge.is_shutdown());
    }

    #[test]
    fn test_bridge_config() {
        let config = BridgeConfig::new()
            .with_log_changes(false)
            .with_auto_promote(false);

        assert!(!config.log_changes);
        assert!(!config.auto_promote);
    }

    #[test]
    fn test_stack_builder() {
        let pm = Arc::new(PlumtreeMemberlist::new(
            1u64,
            PlumtreeConfig::default(),
            NoopDelegate,
        ));
        let bridge = PlumtreeStackBuilder::new(pm)
            .with_config(BridgeConfig::default())
            .build();

        assert!(!bridge.is_shutdown());
    }

    #[cfg(feature = "quic")]
    #[test]
    fn test_bridge_with_resolver() {
        use std::net::SocketAddr;

        let local_addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let resolver = Arc::new(MapPeerResolver::new(local_addr));

        let pm = Arc::new(PlumtreeMemberlist::new(
            1u64,
            PlumtreeConfig::default(),
            NoopDelegate,
        ));

        let bridge = PlumtreeBridge::with_resolver(pm, resolver.clone());
        assert!(bridge.resolver().is_some());
    }

    // ========================================================================
    // BridgeConfig Tests
    // ========================================================================

    #[test]
    fn test_bridge_config_static_seeds() {
        let seeds: Vec<SocketAddr> = vec![
            "192.168.1.100:7946".parse().unwrap(),
            "192.168.1.101:7946".parse().unwrap(),
        ];

        let config = BridgeConfig::new()
            .with_static_seeds(seeds.clone())
            .with_lazarus_enabled(true)
            .with_lazarus_interval(Duration::from_secs(60));

        assert_eq!(config.static_seeds.len(), 2);
        assert!(config.lazarus_enabled);
        assert_eq!(config.lazarus_interval, Duration::from_secs(60));
        assert!(config.should_run_lazarus());
    }

    #[test]
    fn test_bridge_config_should_run_lazarus() {
        // Lazarus should run only when BOTH enabled AND seeds are present;
        // all four combinations are exercised below.

        // Disabled, no seeds
        let config = BridgeConfig::new();
        assert!(!config.should_run_lazarus());

        // Enabled, no seeds
        let config = BridgeConfig::new().with_lazarus_enabled(true);
        assert!(!config.should_run_lazarus());

        // Disabled, has seeds
        let seeds: Vec<SocketAddr> = vec!["192.168.1.100:7946".parse().unwrap()];
        let config = BridgeConfig::new().with_static_seeds(seeds.clone());
        assert!(!config.should_run_lazarus());

        // Enabled, has seeds
        let config = BridgeConfig::new()
            .with_static_seeds(seeds)
            .with_lazarus_enabled(true);
        assert!(config.should_run_lazarus());
    }

    #[test]
    fn test_bridge_config_persistence_path() {
        let config = BridgeConfig::new()
            .with_persistence_path(PathBuf::from("/tmp/peers.txt"));

        assert_eq!(
            config.persistence_path,
            Some(PathBuf::from("/tmp/peers.txt"))
        );
    }

    // ========================================================================
    // Persistence Tests
    // ========================================================================

    #[test]
    fn test_persistence_save_and_load() {
        use std::fs;
        use tempfile::tempdir;

        // Create a temp directory for the test
        let dir = tempdir().unwrap();
        let path = dir.path().join("peers.txt");

        let peers: Vec<SocketAddr> = vec![
            "192.168.1.100:7946".parse().unwrap(),
            "10.0.0.1:7946".parse().unwrap(),
            "172.16.0.1:7946".parse().unwrap(),
        ];

        // Save peers
        persistence::save_peers(&path, &peers).unwrap();

        // Load peers
        let loaded = persistence::load_peers(&path).unwrap();

        assert_eq!(loaded.len(), 3);
        assert!(loaded.contains(&"192.168.1.100:7946".parse().unwrap()));
        assert!(loaded.contains(&"10.0.0.1:7946".parse().unwrap()));
        assert!(loaded.contains(&"172.16.0.1:7946".parse().unwrap()));

        // Cleanup (tempdir also cleans up on drop; this is belt-and-braces)
        fs::remove_file(&path).ok();
    }

    #[test]
    fn test_persistence_load_nonexistent() {
        // A missing file is not an error: it yields an empty peer list.
        let path = PathBuf::from("/nonexistent/path/peers.txt");
        let loaded = persistence::load_peers(&path).unwrap();
        assert!(loaded.is_empty());
    }

    #[test]
    fn test_persistence_save_atomic() {
        use std::fs;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let path = dir.path().join("peers.txt");

        let peers: Vec<SocketAddr> = vec![
            "192.168.1.100:7946".parse().unwrap(),
            "192.168.1.101:7946".parse().unwrap(),
        ];

        // Save atomically
        persistence::save_peers_atomic(&path, &peers).unwrap();

        // Verify file exists and can be loaded
        let loaded = persistence::load_peers(&path).unwrap();
        assert_eq!(loaded.len(), 2);

        // Temp file should not exist (it must have been renamed into place)
        let temp_path = path.with_extension("tmp");
        assert!(!temp_path.exists());

        // Cleanup
        fs::remove_file(&path).ok();
    }

    #[test]
    fn test_persistence_handles_comments_and_empty_lines() {
        use std::fs::{self, File};
        use std::io::Write;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let path = dir.path().join("peers.txt");

        // Write a file with comments and empty lines
        let mut file = File::create(&path).unwrap();
        writeln!(file, "# This is a comment").unwrap();
        writeln!(file, "192.168.1.100:7946").unwrap();
        writeln!(file).unwrap(); // Empty line
        writeln!(file, "   ").unwrap(); // Whitespace only
        writeln!(file, "# Another comment").unwrap();
        writeln!(file, "192.168.1.101:7946").unwrap();

        // Only the two real addresses should survive the filtering.
        let loaded = persistence::load_peers(&path).unwrap();
        assert_eq!(loaded.len(), 2);

        // Cleanup
        fs::remove_file(&path).ok();
    }

    // ========================================================================
    // LazarusHandle Tests
    // ========================================================================

    #[test]
    fn test_lazarus_handle_creation() {
        let handle = LazarusHandle::new();
        assert!(!handle.is_shutdown());

        // All counters start at zero.
        let stats = handle.stats();
        assert_eq!(stats.probes_sent, 0);
        assert_eq!(stats.reconnections, 0);
        assert_eq!(stats.failures, 0);
        assert_eq!(stats.missing_seeds, 0);
    }

    #[test]
    fn test_lazarus_handle_shutdown() {
        let handle = LazarusHandle::new();
        assert!(!handle.is_shutdown());

        handle.shutdown();
        assert!(handle.is_shutdown());
    }

    #[test]
    fn test_lazarus_handle_stats_recording() {
        let handle = LazarusHandle::new();

        handle.record_probe();
        handle.record_probe();
        handle.record_reconnection();
        handle.record_failure();
        handle.set_missing_seeds(3);

        let stats = handle.stats();
        assert_eq!(stats.probes_sent, 2);
        assert_eq!(stats.reconnections, 1);
        assert_eq!(stats.failures, 1);
        assert_eq!(stats.missing_seeds, 3);
    }

    #[test]
    fn test_lazarus_handle_clone() {
        let handle = LazarusHandle::new();
        let handle2 = handle.clone();

        // Both handles share the same state (Arc-backed), so writes via one
        // are visible through the other.
        handle.record_probe();
        assert_eq!(handle2.stats().probes_sent, 1);

        handle2.shutdown();
        assert!(handle.is_shutdown());
    }
}