memberlist_plumtree/
plumtree.rs

1//! Core Plumtree protocol implementation.
2//!
3//! This module provides the main `Plumtree` struct that combines
4//! eager/lazy push mechanisms for efficient O(n) broadcast.
5
6use async_channel::{Receiver, Sender};
7use async_lock::RwLock;
8use bytes::Bytes;
9use futures_timer::Delay;
10use smallvec::SmallVec;
11use std::{
12    collections::HashMap,
13    fmt::Debug,
14    hash::Hash,
15    sync::{
16        atomic::{AtomicBool, AtomicU32, Ordering},
17        Arc,
18    },
19};
20use tracing::{debug, debug_span, info, instrument, trace, warn, Instrument};
21
22use crate::{
23    adaptive_batcher::{AdaptiveBatcher, BatcherConfig},
24    cleanup_tuner::{CleanupConfig, CleanupTuner},
25    config::PlumtreeConfig,
26    error::{Error, Result},
27    health::{HealthReport, HealthReportBuilder},
28    message::{MessageCache, MessageId, PlumtreeMessage},
29    peer_scoring::{PeerScoring, ScoringConfig},
30    peer_state::SharedPeerState,
31    rate_limiter::RateLimiter,
32    scheduler::{GraftTimer, IHaveScheduler, PendingIHave},
33    PeerStateBuilder,
34};
35
36/// Number of shards for the seen map.
37/// 16 shards provides good concurrency while keeping memory overhead low.
38const SEEN_MAP_SHARDS: usize = 16;
39
40/// Default max capacity per shard for the seen map.
41/// With 16 shards, this gives a total capacity of ~160,000 entries.
42const DEFAULT_MAX_CAPACITY_PER_SHARD: usize = 10_000;
43
44/// Sharded map for tracking seen messages.
45///
46/// Uses multiple shards to reduce lock contention:
47/// - Each operation only locks one shard (based on message ID hash)
48/// - Cleanup iterates through shards one at a time, yielding between shards
49/// - Emergency eviction occurs when a shard exceeds its capacity
50///
51/// This prevents the cleanup task from blocking all message processing
52/// and ensures bounded memory usage even under high message velocity.
53struct ShardedSeenMap<I> {
54    shards: Vec<RwLock<HashMap<MessageId, SeenEntry<I>>>>,
55    /// Maximum entries per shard before emergency eviction.
56    max_capacity_per_shard: usize,
57}
58
59impl<I> ShardedSeenMap<I> {
60    /// Create a new sharded seen map with the default number of shards and capacity.
61    fn new() -> Self {
62        Self::with_capacity(DEFAULT_MAX_CAPACITY_PER_SHARD)
63    }
64
65    /// Create a new sharded seen map with custom max capacity per shard.
66    fn with_capacity(max_capacity_per_shard: usize) -> Self {
67        let shards = (0..SEEN_MAP_SHARDS)
68            .map(|_| RwLock::new(HashMap::new()))
69            .collect();
70        Self {
71            shards,
72            max_capacity_per_shard,
73        }
74    }
75
76    /// Get the shard index for a message ID.
77    fn shard_index(&self, id: &MessageId) -> usize {
78        // Use timestamp and random bytes for distribution
79        let hash = id.timestamp() as usize ^ (id.random() as usize);
80        hash % self.shards.len()
81    }
82
83    /// Get read access to the shard containing the given message ID.
84    async fn read_shard(
85        &self,
86        id: &MessageId,
87    ) -> async_lock::RwLockReadGuard<'_, HashMap<MessageId, SeenEntry<I>>> {
88        let idx = self.shard_index(id);
89        self.shards[idx].read().await
90    }
91
92    /// Get write access to the shard containing the given message ID.
93    async fn write_shard(
94        &self,
95        id: &MessageId,
96    ) -> async_lock::RwLockWriteGuard<'_, HashMap<MessageId, SeenEntry<I>>> {
97        let idx = self.shard_index(id);
98        self.shards[idx].write().await
99    }
100}
101
102impl<I: Clone> ShardedSeenMap<I> {
103    /// Check if a message has been seen and get its entry.
104    #[allow(dead_code)]
105    async fn get(&self, id: &MessageId) -> Option<SeenEntry<I>> {
106        let shard = self.read_shard(id).await;
107        shard.get(id).cloned()
108    }
109
110    /// Check if a message has been seen.
111    async fn contains(&self, id: &MessageId) -> bool {
112        let shard = self.read_shard(id).await;
113        shard.contains_key(id)
114    }
115
116    /// Insert a seen entry for a message.
117    /// Returns true if this is a new entry, false if it already existed.
118    async fn insert(&self, id: MessageId, entry: SeenEntry<I>) -> bool {
119        let mut shard = self.write_shard(&id).await;
120        if shard.contains_key(&id) {
121            false
122        } else {
123            shard.insert(id, entry);
124            true
125        }
126    }
127
128    /// Update an existing entry or insert a new one.
129    #[allow(dead_code)]
130    async fn get_or_insert_with<F>(&self, id: MessageId, f: F) -> (SeenEntry<I>, bool)
131    where
132        F: FnOnce() -> SeenEntry<I>,
133    {
134        let mut shard = self.write_shard(&id).await;
135        if let Some(entry) = shard.get(&id) {
136            (entry.clone(), false)
137        } else {
138            let entry = f();
139            shard.insert(id, entry.clone());
140            (entry, true)
141        }
142    }
143
144    /// Increment receive count for an existing entry.
145    /// Returns the new count, or None if the entry doesn't exist.
146    #[allow(dead_code)]
147    async fn increment_receive_count(&self, id: &MessageId) -> Option<u32> {
148        let mut shard = self.write_shard(id).await;
149        if let Some(entry) = shard.get_mut(id) {
150            entry.receive_count += 1;
151            Some(entry.receive_count)
152        } else {
153            None
154        }
155    }
156
157    /// Check if message is seen; if so, increment count. Otherwise insert new entry.
158    /// Returns (is_duplicate, receive_count, evicted_count).
159    ///
160    /// If the shard exceeds capacity, emergency eviction removes the oldest entries.
161    async fn check_and_mark_seen(
162        &self,
163        id: MessageId,
164        entry_fn: impl FnOnce() -> SeenEntry<I>,
165    ) -> (bool, u32, usize) {
166        let idx = self.shard_index(&id);
167        let mut shard = self.shards[idx].write().await;
168
169        if let Some(entry) = shard.get_mut(&id) {
170            entry.receive_count += 1;
171            (true, entry.receive_count, 0)
172        } else {
173            // Check if we need emergency eviction before inserting
174            let evicted = if shard.len() >= self.max_capacity_per_shard {
175                Self::emergency_evict(&mut shard, self.max_capacity_per_shard / 10)
176            } else {
177                0
178            };
179
180            let entry = entry_fn();
181            let count = entry.receive_count;
182            shard.insert(id, entry);
183            (false, count, evicted)
184        }
185    }
186
187    /// Perform emergency eviction of oldest entries from a shard.
188    /// Removes at least `target_evict` entries (more if needed to make room).
189    /// Returns the number of entries evicted.
190    fn emergency_evict(shard: &mut HashMap<MessageId, SeenEntry<I>>, target_evict: usize) -> usize {
191        if shard.is_empty() || target_evict == 0 {
192            return 0;
193        }
194
195        // Find the oldest entries by seen_at timestamp
196        let mut entries: Vec<(MessageId, std::time::Instant)> = shard
197            .iter()
198            .map(|(id, entry)| (*id, entry.seen_at))
199            .collect();
200
201        // Sort by seen_at (oldest first)
202        entries.sort_by_key(|(_, seen_at)| *seen_at);
203
204        // Remove the oldest entries
205        let to_remove = target_evict.min(entries.len());
206        for (id, _) in entries.into_iter().take(to_remove) {
207            shard.remove(&id);
208        }
209
210        to_remove
211    }
212
213    /// Clean up expired entries from one shard.
214    /// Returns the number of entries removed.
215    async fn cleanup_shard(
216        &self,
217        shard_idx: usize,
218        now: std::time::Instant,
219        ttl: std::time::Duration,
220    ) -> usize {
221        let mut shard = self.shards[shard_idx].write().await;
222        let before = shard.len();
223        shard.retain(|_, entry| now.duration_since(entry.seen_at) < ttl);
224        before.saturating_sub(shard.len())
225    }
226
227    /// Get the total number of entries across all shards.
228    #[allow(dead_code)]
229    async fn len(&self) -> usize {
230        let mut total = 0;
231        for shard in &self.shards {
232            total += shard.read().await.len();
233        }
234        total
235    }
236
237    /// Try to get the approximate total number of entries (non-blocking).
238    /// Returns None if any shard lock cannot be acquired immediately.
239    fn try_len(&self) -> Option<usize> {
240        let mut total = 0;
241        for shard in &self.shards {
242            total += shard.try_read()?.len();
243        }
244        Some(total)
245    }
246
247    /// Get the number of shards.
248    fn shard_count(&self) -> usize {
249        self.shards.len()
250    }
251
252    /// Get the maximum capacity per shard.
253    fn max_capacity_per_shard(&self) -> usize {
254        self.max_capacity_per_shard
255    }
256
257    /// Get the total maximum capacity across all shards.
258    fn total_capacity(&self) -> usize {
259        self.shards.len() * self.max_capacity_per_shard
260    }
261}
262
263/// Delegate trait for receiving Plumtree events.
264///
265/// Implement this trait to handle delivered messages and other events.
266///
267/// # Type Parameters
268///
269/// - `I`: The peer identifier type (same as used with `Plumtree<I, D>`)
270///
271/// # Important Notes
272///
273/// - **Non-blocking**: All methods should return quickly. If your implementation
274///   needs to perform heavy computation or blocking I/O (e.g., writing to a database),
275///   spawn the work in a separate task to avoid blocking the Plumtree message loop.
276/// - **Thread-safe**: Methods may be called concurrently from multiple threads.
277///
278/// # Example (Non-blocking delivery)
279///
280/// ```ignore
281/// impl PlumtreeDelegate<NodeId> for MyApp {
282///     fn on_deliver(&self, message_id: MessageId, payload: Bytes) {
283///         let tx = self.delivery_tx.clone();
284///         tokio::spawn(async move {
285///             // Heavy processing in background
286///             tx.send((message_id, payload)).await.ok();
287///         });
288///     }
289///
290///     fn on_graft_failed(&self, message_id: &MessageId, peer: &NodeId) {
291///         tracing::warn!("Graft failed for {:?} from peer {:?}", message_id, peer);
292///     }
293/// }
294/// ```
295pub trait PlumtreeDelegate<I = ()>: Send + Sync + 'static {
296    /// Called when a message is delivered (first time received).
297    ///
298    /// **Warning**: This is called synchronously in the message processing path.
299    /// Long-running operations here will block message forwarding and tree repair.
300    /// For heavy processing, spawn a background task instead.
301    fn on_deliver(&self, message_id: MessageId, payload: Bytes);
302
303    /// Called when a peer is promoted to eager.
304    fn on_eager_promotion(&self, _peer: &I) {}
305
306    /// Called when a peer is demoted to lazy.
307    fn on_lazy_demotion(&self, _peer: &I) {}
308
309    /// Called when a Graft is sent (tree repair).
310    fn on_graft_sent(&self, _peer: &I, _message_id: &MessageId) {}
311
312    /// Called when a Prune is sent (tree optimization).
313    fn on_prune_sent(&self, _peer: &I) {}
314
315    /// Called when Graft retries for a message are exhausted (zombie peer detection).
316    ///
317    /// This indicates that the message could not be retrieved from any peer after
318    /// maximum retry attempts. The peer that originally sent the IHave may be dead
319    /// or unreachable.
320    ///
321    /// Implementations may want to:
322    /// - Log a warning about the potentially dead peer
323    /// - Track failed peers for health monitoring
324    /// - Trigger peer removal if failures persist
325    fn on_graft_failed(&self, _message_id: &MessageId, _peer: &I) {}
326}
327
328// Manual implementations for Arc<T> and Box<T> since auto_impl doesn't handle generic traits well
329impl<I, T: PlumtreeDelegate<I> + ?Sized> PlumtreeDelegate<I> for Arc<T> {
330    fn on_deliver(&self, message_id: MessageId, payload: Bytes) {
331        (**self).on_deliver(message_id, payload)
332    }
333    fn on_eager_promotion(&self, peer: &I) {
334        (**self).on_eager_promotion(peer)
335    }
336    fn on_lazy_demotion(&self, peer: &I) {
337        (**self).on_lazy_demotion(peer)
338    }
339    fn on_graft_sent(&self, peer: &I, message_id: &MessageId) {
340        (**self).on_graft_sent(peer, message_id)
341    }
342    fn on_prune_sent(&self, peer: &I) {
343        (**self).on_prune_sent(peer)
344    }
345    fn on_graft_failed(&self, message_id: &MessageId, peer: &I) {
346        (**self).on_graft_failed(message_id, peer)
347    }
348}
349
350impl<I, T: PlumtreeDelegate<I> + ?Sized> PlumtreeDelegate<I> for Box<T> {
351    fn on_deliver(&self, message_id: MessageId, payload: Bytes) {
352        (**self).on_deliver(message_id, payload)
353    }
354    fn on_eager_promotion(&self, peer: &I) {
355        (**self).on_eager_promotion(peer)
356    }
357    fn on_lazy_demotion(&self, peer: &I) {
358        (**self).on_lazy_demotion(peer)
359    }
360    fn on_graft_sent(&self, peer: &I, message_id: &MessageId) {
361        (**self).on_graft_sent(peer, message_id)
362    }
363    fn on_prune_sent(&self, peer: &I) {
364        (**self).on_prune_sent(peer)
365    }
366    fn on_graft_failed(&self, message_id: &MessageId, peer: &I) {
367        (**self).on_graft_failed(message_id, peer)
368    }
369}
370
371/// No-op delegate for when no handler is needed.
372#[derive(Debug, Clone, Copy, Default)]
373pub struct NoopDelegate;
374
375impl<I> PlumtreeDelegate<I> for NoopDelegate {
376    fn on_deliver(&self, _message_id: MessageId, _payload: Bytes) {}
377}
378
379/// Core Plumtree broadcast implementation.
380///
381/// Combines SWIM-based membership (via memberlist) with Plumtree's
382/// epidemic broadcast trees for efficient O(n) message dissemination.
383///
384/// # Type Parameters
385///
386/// - `I`: Node identifier type (must be clonable, hashable, and serializable)
387/// - `D`: Delegate type for receiving events
388pub struct Plumtree<I, D = NoopDelegate> {
389    /// Inner state.
390    inner: Arc<PlumtreeInner<I, D>>,
391}
392
393struct PlumtreeInner<I, D> {
394    /// Peer state (eager and lazy sets).
395    peers: SharedPeerState<I>,
396
397    /// Message cache for Graft requests.
398    cache: Arc<MessageCache>,
399
400    /// IHave scheduler.
401    scheduler: Arc<IHaveScheduler>,
402
403    /// Graft timer for tracking pending messages.
404    graft_timer: Arc<GraftTimer<I>>,
405
406    /// Rate limiter for Graft requests (per-peer).
407    graft_rate_limiter: RateLimiter<I>,
408
409    /// Peer scoring for RTT-based optimization and zombie peer detection.
410    peer_scoring: Arc<PeerScoring<I>>,
411
412    /// Dynamic cleanup tuning based on system load.
413    cleanup_tuner: Arc<CleanupTuner>,
414
415    /// Adaptive IHave batching based on network conditions.
416    adaptive_batcher: Arc<AdaptiveBatcher>,
417
418    /// Event delegate.
419    delegate: D,
420
421    /// Configuration.
422    config: PlumtreeConfig,
423
424    /// Local node ID.
425    local_id: I,
426
427    /// Current broadcast round.
428    round: AtomicU32,
429
430    /// Shutdown flag.
431    shutdown: AtomicBool,
432
433    /// Channel for outgoing messages.
434    outgoing_tx: Sender<OutgoingMessage<I>>,
435
436    /// Channel for incoming messages.
437    incoming_tx: Sender<IncomingMessage<I>>,
438
439    /// Track messages seen for deduplication and parent tracking.
440    ///
441    /// Uses sharded locking to reduce contention:
442    /// - Each message operation only locks one shard (based on message ID hash)
443    /// - Cleanup iterates through shards one at a time, yielding between shards
444    /// - This prevents the cleanup task from blocking all message processing
445    seen: ShardedSeenMap<I>,
446}
447
448/// Entry for tracking seen messages and their delivery parent.
449#[derive(Debug, Clone)]
450struct SeenEntry<I> {
451    /// Round when first seen.
452    /// Reserved for future use in protocol diagnostics.
453    #[allow(dead_code)]
454    round: u32,
455    /// Number of times received.
456    receive_count: u32,
457    /// When this entry was first seen (for TTL-based cleanup).
458    seen_at: std::time::Instant,
459    /// Parent peer who delivered this message first.
460    /// Reserved for future use in tree repair when a better path is found.
461    #[allow(dead_code)]
462    parent: Option<I>,
463}
464
465/// Statistics about the seen map (deduplication map).
466#[derive(Debug, Clone, Copy)]
467pub struct SeenMapStats {
468    /// Current number of entries in the map.
469    pub size: usize,
470    /// Total maximum capacity across all shards.
471    pub capacity: usize,
472    /// Current utilization (size / capacity).
473    pub utilization: f64,
474    /// Number of shards.
475    pub shard_count: usize,
476    /// Maximum entries per shard.
477    pub max_per_shard: usize,
478}
479
480/// Outgoing message to be sent.
481#[derive(Debug)]
482pub struct OutgoingMessage<I> {
483    /// Target peer for unicast messages, None for broadcast.
484    pub target: Option<I>,
485    /// Message to send.
486    pub message: PlumtreeMessage,
487}
488
489impl<I> OutgoingMessage<I> {
490    /// Create a unicast message to a specific peer.
491    pub fn unicast(target: I, message: PlumtreeMessage) -> Self {
492        Self {
493            target: Some(target),
494            message,
495        }
496    }
497
498    /// Create a broadcast message (for initial broadcast only).
499    pub fn broadcast(message: PlumtreeMessage) -> Self {
500        Self {
501            target: None,
502            message,
503        }
504    }
505
506    /// Check if this is a unicast message.
507    pub fn is_unicast(&self) -> bool {
508        self.target.is_some()
509    }
510
511    /// Check if this is a broadcast message.
512    pub fn is_broadcast(&self) -> bool {
513        self.target.is_none()
514    }
515}
516
517/// Incoming message received from a peer.
518#[derive(Debug)]
519pub struct IncomingMessage<I> {
520    /// Source peer.
521    pub from: I,
522    /// Received message.
523    pub message: PlumtreeMessage,
524}
525
526impl<I, D> Plumtree<I, D>
527where
528    I: Clone + Eq + Hash + Ord + Debug + Send + Sync + 'static,
529    D: PlumtreeDelegate<I>,
530{
531    /// Create a new Plumtree instance.
532    ///
533    /// # Arguments
534    ///
535    /// - `local_id`: This node's identifier
536    /// - `config`: Plumtree configuration
537    /// - `delegate`: Event handler
538    pub fn new(local_id: I, config: PlumtreeConfig, delegate: D) -> (Self, PlumtreeHandle<I>) {
539        let (outgoing_tx, outgoing_rx) = async_channel::bounded(1024);
540        let (incoming_tx, incoming_rx) = async_channel::bounded(1024);
541
542        // Rate limiter: allow 10 Graft requests per peer per second, burst of 20
543        let graft_rate_limiter = RateLimiter::new(
544            config.graft_rate_limit_burst,
545            config.graft_rate_limit_per_second,
546        );
547
548        // Create peer state with or without hash ring topology
549        // Always set local_id for diverse peer selection even without hash ring
550        let peers = if config.use_hash_ring {
551            Arc::new(
552                PeerStateBuilder::new()
553                    .use_hash_ring(true)
554                    .with_local_id(local_id.clone())
555                    .build(),
556            )
557        } else {
558            Arc::new(
559                PeerStateBuilder::new()
560                    .use_hash_ring(false)
561                    .with_local_id(local_id.clone())
562                    .build(),
563            )
564        };
565
566        let inner = Arc::new(PlumtreeInner {
567            peers,
568            cache: Arc::new(MessageCache::new(
569                config.message_cache_ttl,
570                config.message_cache_max_size,
571            )),
572            scheduler: Arc::new(IHaveScheduler::new(
573                config.ihave_interval,
574                config.ihave_batch_size,
575                10000,
576            )),
577            graft_timer: Arc::new(GraftTimer::new(config.graft_timeout)),
578            graft_rate_limiter,
579            peer_scoring: Arc::new(PeerScoring::new(ScoringConfig::default())),
580            cleanup_tuner: Arc::new(CleanupTuner::new(CleanupConfig::default())),
581            adaptive_batcher: Arc::new(AdaptiveBatcher::new(BatcherConfig::default())),
582            delegate,
583            config,
584            local_id,
585            round: AtomicU32::new(0),
586            shutdown: AtomicBool::new(false),
587            outgoing_tx,
588            incoming_tx,
589            seen: ShardedSeenMap::new(),
590        });
591
592        let plumtree = Self {
593            inner: inner.clone(),
594        };
595
596        let handle = PlumtreeHandle {
597            outgoing_rx,
598            incoming_rx,
599            incoming_tx: plumtree.inner.incoming_tx.clone(),
600        };
601
602        (plumtree, handle)
603    }
604
605    /// Get the local node ID.
606    pub fn local_id(&self) -> &I {
607        &self.inner.local_id
608    }
609
610    /// Get the configuration.
611    pub fn config(&self) -> &PlumtreeConfig {
612        &self.inner.config
613    }
614
615    /// Get peer statistics.
616    pub fn peer_stats(&self) -> crate::peer_state::PeerStats {
617        self.inner.peers.stats()
618    }
619
620    /// Get cache statistics.
621    pub fn cache_stats(&self) -> crate::message::CacheStats {
622        self.inner.cache.stats()
623    }
624
625    /// Get seen map statistics (deduplication map).
626    ///
627    /// Returns None if the statistics cannot be computed without blocking.
628    pub fn seen_map_stats(&self) -> Option<SeenMapStats> {
629        let size = self.inner.seen.try_len()?;
630        let capacity = self.inner.seen.total_capacity();
631        let utilization = if capacity > 0 {
632            size as f64 / capacity as f64
633        } else {
634            0.0
635        };
636        Some(SeenMapStats {
637            size,
638            capacity,
639            utilization,
640            shard_count: self.inner.seen.shard_count(),
641            max_per_shard: self.inner.seen.max_capacity_per_shard(),
642        })
643    }
644
645    /// Get peer scoring statistics.
646    pub fn scoring_stats(&self) -> crate::peer_scoring::ScoringStats {
647        self.inner.peer_scoring.stats()
648    }
649
650    /// Get cleanup tuner statistics.
651    pub fn cleanup_stats(&self) -> crate::cleanup_tuner::CleanupStats {
652        self.inner.cleanup_tuner.stats()
653    }
654
655    /// Get adaptive batcher statistics.
656    pub fn batcher_stats(&self) -> crate::adaptive_batcher::BatcherStats {
657        self.inner.adaptive_batcher.stats()
658    }
659
660    /// Record an RTT sample for a peer.
661    ///
662    /// Call this when receiving a response from a peer to update their score.
663    /// Lower RTT results in higher peer scores.
664    pub fn record_peer_rtt(&self, peer: &I, rtt: std::time::Duration) {
665        self.inner.peer_scoring.record_rtt(peer, rtt);
666    }
667
668    /// Record a failure for a peer (e.g., timeout, connection error).
669    ///
670    /// This penalizes the peer's score, making them less likely to be
671    /// selected for eager promotion.
672    pub fn record_peer_failure(&self, peer: &I) {
673        self.inner.peer_scoring.record_failure(peer);
674    }
675
676    /// Get the score for a specific peer.
677    pub fn get_peer_score(&self, peer: &I) -> Option<crate::peer_scoring::PeerScore> {
678        self.inner.peer_scoring.get_score(peer)
679    }
680
681    /// Get the best peers by score for potential eager promotion.
682    pub fn best_peers_for_eager(&self, count: usize) -> Vec<I> {
683        self.inner.peer_scoring.best_peers(count)
684    }
685
686    /// Get a health report for the protocol.
687    ///
688    /// The health report provides information about peer connectivity,
689    /// message delivery status, and cache utilization.
690    ///
691    /// # Example
692    ///
693    /// ```ignore
694    /// let health = plumtree.health();
695    /// if health.status.is_unhealthy() {
696    ///     eprintln!("Warning: {}", health.message);
697    /// }
698    /// ```
699    pub fn health(&self) -> HealthReport {
700        let peer_stats = self.inner.peers.stats();
701        let cache_stats = self.inner.cache.stats();
702        let pending_grafts = self.inner.graft_timer.pending_count();
703
704        // For now, we don't track successful/failed grafts in the struct
705        // This would need additional atomic counters to track properly
706        // For the initial implementation, we'll use 0/0 which results in 0% failure rate
707        let successful_grafts = 0_u64;
708        let failed_grafts = 0_u64;
709
710        let total_peers = peer_stats.eager_count + peer_stats.lazy_count;
711
712        HealthReportBuilder::new()
713            .peers(
714                total_peers,
715                peer_stats.eager_count,
716                peer_stats.lazy_count,
717                self.inner.config.eager_fanout,
718            )
719            .grafts(pending_grafts, successful_grafts, failed_grafts)
720            .cache(
721                cache_stats.entries,
722                self.inner.config.message_cache_max_size,
723                self.inner.config.message_cache_ttl,
724            )
725            .shutdown(self.inner.shutdown.load(Ordering::Acquire))
726            .build()
727    }
728
729    /// Add a peer with automatic classification.
730    ///
731    /// The peer is automatically classified as eager or lazy based on
732    /// the current state and configuration:
733    /// - If `max_peers` is set and the limit is reached, the peer is rejected
734    /// - If the eager set is below `eager_fanout`, the peer joins as eager
735    /// - Otherwise, the peer joins as lazy
736    ///
737    /// Returns the result of the add operation.
738    pub fn add_peer(&self, peer: I) -> crate::peer_state::AddPeerResult {
739        use crate::peer_state::AddPeerResult;
740
741        if peer == self.inner.local_id {
742            return AddPeerResult::AlreadyExists;
743        }
744
745        self.inner.peers.add_peer_auto(
746            peer,
747            self.inner.config.max_peers,
748            self.inner.config.eager_fanout,
749        )
750    }
751
752    /// Add a peer to the lazy set only (traditional behavior).
753    ///
754    /// New peers always start as lazy. They are promoted to eager
755    /// via the Graft mechanism if needed.
756    ///
757    /// This bypasses the `max_peers` limit check and auto-classification.
758    pub fn add_peer_lazy(&self, peer: I) {
759        if peer != self.inner.local_id {
760            self.inner.peers.add_peer(peer);
761        }
762    }
763
764    /// Remove a peer.
765    ///
766    /// Removes the peer from both the topology state (eager/lazy sets) and
767    /// the scoring state (RTT, failure counts) to prevent memory leaks.
768    pub fn remove_peer(&self, peer: &I) {
769        self.inner.peers.remove_peer(peer);
770        // Clean up scoring data to free memory for departed peers
771        self.inner.peer_scoring.remove_peer(peer);
772    }
773
774    /// Broadcast a message to all nodes.
775    ///
776    /// The message is sent immediately to eager peers and queued
777    /// as IHave announcements for lazy peers.
778    ///
779    /// Returns the unique message ID assigned to this broadcast.
780    #[instrument(skip(self, payload), fields(payload_size))]
781    pub async fn broadcast(&self, payload: impl Into<Bytes>) -> Result<MessageId> {
782        let payload = payload.into();
783        let payload_size = payload.len();
784        tracing::Span::current().record("payload_size", payload_size);
785
786        // Check size limit
787        if payload_size > self.inner.config.max_message_size {
788            warn!(
789                payload_size,
790                max_size = self.inner.config.max_message_size,
791                "message too large"
792            );
793            return Err(Error::MessageTooLarge {
794                size: payload_size,
795                max_size: self.inner.config.max_message_size,
796            });
797        }
798
799        let msg_id = MessageId::new();
800        let round = self.inner.round.fetch_add(1, Ordering::Relaxed);
801
802        debug!(
803            message_id = %msg_id,
804            round,
805            payload_size,
806            "broadcasting new message"
807        );
808
809        // Cache the message
810        self.inner.cache.insert(msg_id, payload.clone());
811
812        // Mark as seen (no parent since we originated this message)
813        self.inner
814            .seen
815            .insert(
816                msg_id,
817                SeenEntry {
818                    round,
819                    receive_count: 1,
820                    seen_at: std::time::Instant::now(),
821                    parent: None, // We originated this message
822                },
823            )
824            .await;
825
826        // Send to eager peers with backpressure handling
827        let eager_peers = self.inner.peers.eager_peers();
828        let eager_count = eager_peers.len();
829        let mut dropped_count = 0;
830
831        for peer in eager_peers {
832            let msg = PlumtreeMessage::Gossip {
833                id: msg_id,
834                round,
835                payload: payload.clone(),
836            };
837            // Use try_send to avoid blocking under backpressure
838            if let Err(_e) = self
839                .inner
840                .outgoing_tx
841                .try_send(OutgoingMessage::unicast(peer, msg))
842            {
843                dropped_count += 1;
844                trace!(
845                    message_id = %msg_id,
846                    "outgoing queue full, message to eager peer dropped"
847                );
848            }
849        }
850
851        // Queue IHave for lazy peers
852        self.inner.scheduler.queue().push(msg_id, round);
853
854        // Check for backpressure condition
855        if dropped_count > 0 {
856            warn!(
857                message_id = %msg_id,
858                dropped = dropped_count,
859                total_eager = eager_count,
860                "backpressure: some eager push messages were dropped"
861            );
862            // Still return the message ID since it was cached and IHave was queued.
863            // The message can still reach peers via lazy push (IHave/Graft).
864            // But warn the caller about backpressure so they can throttle.
865            if dropped_count == eager_count {
866                // All messages dropped - this is severe backpressure
867                return Err(Error::QueueFull {
868                    dropped: dropped_count,
869                    capacity: self.inner.outgoing_tx.capacity().unwrap_or(1024),
870                });
871            }
872        }
873
874        trace!(
875            message_id = %msg_id,
876            eager_peers = eager_count,
877            sent = eager_count - dropped_count,
878            "broadcast complete, IHave queued for lazy peers"
879        );
880
881        Ok(msg_id)
882    }
883
884    /// Handle an incoming Plumtree message.
885    ///
886    /// This should be called when a message is received from the network.
887    pub async fn handle_message(&self, from: I, message: PlumtreeMessage) -> Result<()> {
888        if self.inner.shutdown.load(Ordering::Acquire) {
889            return Err(Error::Shutdown);
890        }
891
892        let msg_type = message.type_name();
893        let span = debug_span!("handle_message", msg_type, ?from);
894
895        async {
896            match message {
897                PlumtreeMessage::Gossip { id, round, payload } => {
898                    self.handle_gossip(from, id, round, payload).await
899                }
900                PlumtreeMessage::IHave { message_ids, round } => {
901                    self.handle_ihave(from, message_ids, round).await
902                }
903                PlumtreeMessage::Graft { message_id, round } => {
904                    self.handle_graft(from, message_id, round).await
905                }
906                PlumtreeMessage::Prune => self.handle_prune(from).await,
907            }
908        }
909        .instrument(span)
910        .await
911    }
912
913    /// Handle a Gossip message (eager push).
914    async fn handle_gossip(
915        &self,
916        from: I,
917        msg_id: MessageId,
918        round: u32,
919        payload: Bytes,
920    ) -> Result<()> {
921        let payload_size = payload.len();
922        trace!(
923            message_id = %msg_id,
924            round,
925            payload_size,
926            "received gossip"
927        );
928
929        // Record message for cleanup tuner's message rate tracking
930        self.inner.cleanup_tuner.record_message();
931        // Record message for adaptive batcher's throughput tracking
932        self.inner.adaptive_batcher.record_message();
933
934        // Cancel any pending Graft timer for this message
935        // Returns true if a graft was actually sent and satisfied
936        if self.inner.graft_timer.message_received(&msg_id) {
937            // Record graft success for adaptive batcher to adjust batch sizes
938            self.inner.adaptive_batcher.record_graft_received();
939        }
940
941        // Check if already seen and track parent atomically (single shard lock)
942        let (is_duplicate, receive_count, evicted) = self
943            .inner
944            .seen
945            .check_and_mark_seen(msg_id, || SeenEntry {
946                round,
947                receive_count: 1,
948                seen_at: std::time::Instant::now(),
949                parent: Some(from.clone()), // Track who delivered this message
950            })
951            .await;
952
953        if evicted > 0 {
954            debug!(
955                message_id = %msg_id,
956                evicted,
957                "emergency eviction in seen map due to capacity"
958            );
959            #[cfg(feature = "metrics")]
960            crate::metrics::record_seen_map_evictions(evicted);
961        }
962
963        if is_duplicate {
964            trace!(
965                message_id = %msg_id,
966                receive_count,
967                "duplicate gossip received"
968            );
969            // Optimization: prune redundant path after threshold
970            if receive_count > self.inner.config.optimization_threshold {
971                // Only send Prune if the peer is in the eager set
972                // Pruning a peer that is already lazy is redundant traffic
973                if self.inner.peers.is_eager(&from) {
974                    debug!(
975                        message_id = %msg_id,
976                        receive_count,
977                        threshold = self.inner.config.optimization_threshold,
978                        "pruning redundant path"
979                    );
980                    // Send Prune unicast to the duplicate sender
981                    let _ = self
982                        .inner
983                        .outgoing_tx
984                        .send(OutgoingMessage::unicast(
985                            from.clone(),
986                            PlumtreeMessage::Prune,
987                        ))
988                        .await;
989                    self.inner.delegate.on_prune_sent(&from);
990
991                    // Demote to lazy
992                    if self.inner.peers.demote_to_lazy(&from) {
993                        self.inner.delegate.on_lazy_demotion(&from);
994                    }
995                }
996            }
997            return Ok(());
998        }
999
1000        // First time seeing this message (parent already tracked above)
1001        debug!(
1002            message_id = %msg_id,
1003            round,
1004            payload_size,
1005            "delivering new message"
1006        );
1007
1008        // Cache for potential Graft requests
1009        self.inner.cache.insert(msg_id, payload.clone());
1010
1011        // Deliver to application
1012        self.inner.delegate.on_deliver(msg_id, payload.clone());
1013
1014        // Forward to eager peers (except sender) via unicast
1015        let eager_peers = self.inner.peers.random_eager_except(&from, usize::MAX);
1016        let forward_count = eager_peers.len();
1017        let mut dropped = 0;
1018
1019        for peer in eager_peers {
1020            let msg = PlumtreeMessage::Gossip {
1021                id: msg_id,
1022                round: round + 1,
1023                payload: payload.clone(),
1024            };
1025            // Use try_send to avoid blocking under backpressure
1026            if self
1027                .inner
1028                .outgoing_tx
1029                .try_send(OutgoingMessage::unicast(peer, msg))
1030                .is_err()
1031            {
1032                dropped += 1;
1033            }
1034        }
1035
1036        // Queue IHave for lazy peers (except sender)
1037        self.inner.scheduler.queue().push(msg_id, round + 1);
1038
1039        if dropped > 0 {
1040            debug!(
1041                message_id = %msg_id,
1042                dropped,
1043                total = forward_count,
1044                "backpressure: some gossip forwards dropped"
1045            );
1046        }
1047
1048        trace!(
1049            message_id = %msg_id,
1050            forward_count,
1051            sent = forward_count - dropped,
1052            "forwarded gossip to eager peers"
1053        );
1054
1055        Ok(())
1056    }
1057
1058    /// Handle an IHave message (lazy push announcement).
1059    async fn handle_ihave(
1060        &self,
1061        from: I,
1062        message_ids: SmallVec<[MessageId; 8]>,
1063        round: u32,
1064    ) -> Result<()> {
1065        let count = message_ids.len();
1066        trace!(count, round, "received IHave batch");
1067
1068        let mut missing_count = 0;
1069        for msg_id in message_ids {
1070            // Check if we already have this message (only locks one shard)
1071            let have_message = self.inner.seen.contains(&msg_id).await;
1072
1073            if !have_message {
1074                missing_count += 1;
1075                trace!(message_id = %msg_id, "missing message, sending Graft");
1076
1077                // We don't have this message - start Graft timer
1078                // Get alternative peers to try if the primary fails
1079                let alternatives: Vec<I> = self.inner.peers.random_eager_except(&from, 2);
1080
1081                self.inner.graft_timer.expect_message_with_alternatives(
1082                    msg_id,
1083                    from.clone(),
1084                    alternatives,
1085                    round,
1086                );
1087
1088                // Note: We do NOT demote any existing parent here. This avoids a race condition
1089                // where we might demote a peer that is about to (or just did) deliver the message.
1090                //
1091                // Example race without this fix:
1092                // 1. IHave(msg) arrives from C, we check seen -> not seen
1093                // 2. Meanwhile Gossip(msg) from B arrives and sets message_parents[msg] = B
1094                // 3. We demote B even though B just delivered successfully!
1095                //
1096                // Instead, tree optimization happens naturally via handle_gossip:
1097                // - If we receive duplicate Gossip, we send Prune to the redundant sender
1098                // - This is the correct place to demote, as it's based on actual delivery
1099
1100                // Promote sender to eager to get this and future messages
1101                if self.inner.peers.promote_to_eager(&from) {
1102                    debug!("promoted peer to eager after IHave");
1103                    self.inner.delegate.on_eager_promotion(&from);
1104                }
1105
1106                // Send Graft to get the missing message
1107                let _ = self
1108                    .inner
1109                    .outgoing_tx
1110                    .send(OutgoingMessage::unicast(
1111                        from.clone(),
1112                        PlumtreeMessage::Graft {
1113                            message_id: msg_id,
1114                            round,
1115                        },
1116                    ))
1117                    .await;
1118
1119                self.inner.delegate.on_graft_sent(&from, &msg_id);
1120            }
1121        }
1122
1123        if missing_count > 0 {
1124            debug!(
1125                total = count,
1126                missing = missing_count,
1127                "IHave processing complete"
1128            );
1129        }
1130
1131        Ok(())
1132    }
1133
1134    /// Handle a Graft message (request to establish eager link).
1135    async fn handle_graft(&self, from: I, msg_id: MessageId, round: u32) -> Result<()> {
1136        trace!(message_id = %msg_id, round, "received Graft request");
1137
1138        // Rate limit Graft requests per peer
1139        if !self.inner.graft_rate_limiter.check(&from) {
1140            warn!(message_id = %msg_id, "rate limiting Graft request");
1141            return Ok(());
1142        }
1143
1144        // Promote requester to eager
1145        if self.inner.peers.promote_to_eager(&from) {
1146            debug!("promoted peer to eager after Graft request");
1147            self.inner.delegate.on_eager_promotion(&from);
1148        }
1149
1150        // Send the requested message unicast to the requester
1151        if let Some(payload) = self.inner.cache.get(&msg_id) {
1152            debug!(
1153                message_id = %msg_id,
1154                payload_size = payload.len(),
1155                "responding to Graft with cached message"
1156            );
1157            let msg = PlumtreeMessage::Gossip {
1158                id: msg_id,
1159                round,
1160                payload: (*payload).clone(),
1161            };
1162            let _ = self
1163                .inner
1164                .outgoing_tx
1165                .send(OutgoingMessage::unicast(from, msg))
1166                .await;
1167        } else {
1168            debug!(message_id = %msg_id, "Graft request for unknown message");
1169        }
1170
1171        Ok(())
1172    }
1173
1174    /// Handle a Prune message (demote to lazy).
1175    async fn handle_prune(&self, from: I) -> Result<()> {
1176        trace!("received Prune request");
1177        if self.inner.peers.demote_to_lazy(&from) {
1178            debug!("demoted peer to lazy after Prune");
1179            self.inner.delegate.on_lazy_demotion(&from);
1180        }
1181        Ok(())
1182    }
1183
1184    /// Run the IHave scheduler background task.
1185    ///
1186    /// This should be spawned as a background task.
1187    /// Uses a "linger" strategy: flushes immediately if batch is full,
1188    /// otherwise waits for the configured interval.
1189    ///
1190    /// Uses the AdaptiveBatcher for dynamic batch size adjustment based on:
1191    /// - Network latency (smaller batches for low-latency)
1192    /// - Graft success rate (reduce batches if many Grafts fail)
1193    /// - Message throughput (larger batches under high load)
1194    #[instrument(skip(self), name = "ihave_scheduler")]
1195    pub async fn run_ihave_scheduler(&self) {
1196        info!("IHave scheduler started with adaptive batching");
1197        let ihave_interval = self.inner.config.ihave_interval;
1198        // Check for early flush more frequently (every 10ms)
1199        let check_interval = std::time::Duration::from_millis(10);
1200        let mut last_flush = std::time::Instant::now();
1201        let mut last_batch_size_update = std::time::Instant::now();
1202        let batch_size_update_interval = std::time::Duration::from_secs(5);
1203
1204        loop {
1205            if self.inner.shutdown.load(Ordering::Acquire) {
1206                info!("IHave scheduler shutting down");
1207                break;
1208            }
1209
1210            // Periodically update batch size from adaptive batcher
1211            if last_batch_size_update.elapsed() >= batch_size_update_interval {
1212                let recommended_size = self.inner.adaptive_batcher.recommended_batch_size();
1213                self.inner
1214                    .scheduler
1215                    .queue()
1216                    .set_flush_threshold(recommended_size);
1217                trace!(
1218                    batch_size = recommended_size,
1219                    "updated IHave batch size from adaptive batcher"
1220                );
1221                last_batch_size_update = std::time::Instant::now();
1222            }
1223
1224            // Check if we should flush early (queue reached batch size)
1225            let should_flush = self.inner.scheduler.queue().should_flush();
1226            let interval_elapsed = last_flush.elapsed() >= ihave_interval;
1227
1228            if should_flush || interval_elapsed {
1229                // Flush: either queue is full or interval elapsed
1230                self.flush_ihave_batch().await;
1231                last_flush = std::time::Instant::now();
1232            } else {
1233                // Wait for a short check period before checking again
1234                Delay::new(check_interval).await;
1235            }
1236        }
1237    }
1238
1239    /// Flush pending IHave messages to lazy peers.
1240    async fn flush_ihave_batch(&self) {
1241        // Get batch of IHaves to send
1242        let batch: SmallVec<[PendingIHave; 16]> = self.inner.scheduler.pop_batch();
1243
1244        if batch.is_empty() {
1245            return;
1246        }
1247
1248        // Collect message IDs
1249        let message_ids: SmallVec<[MessageId; 8]> = batch.iter().map(|p| p.message_id).collect();
1250        let round = batch.iter().map(|p| p.round).max().unwrap_or(0);
1251
1252        // Get lazy peers to send to
1253        let lazy_peers = self
1254            .inner
1255            .peers
1256            .random_lazy_except(&self.inner.local_id, self.inner.config.lazy_fanout);
1257
1258        let peer_count = lazy_peers.len();
1259        let batch_size = message_ids.len();
1260
1261        trace!(
1262            batch_size,
1263            peer_count,
1264            round,
1265            "flushing IHave batch to lazy peers"
1266        );
1267
1268        // Send IHave to each lazy peer with backpressure handling
1269        let mut ihave_dropped = 0;
1270        for peer in lazy_peers {
1271            let msg = PlumtreeMessage::IHave {
1272                message_ids: message_ids.clone(),
1273                round,
1274            };
1275            // Use try_send to avoid blocking under backpressure
1276            if self
1277                .inner
1278                .outgoing_tx
1279                .try_send(OutgoingMessage::unicast(peer, msg))
1280                .is_err()
1281            {
1282                ihave_dropped += 1;
1283            }
1284        }
1285
1286        if ihave_dropped > 0 {
1287            debug!(
1288                batch_size,
1289                dropped = ihave_dropped,
1290                total = peer_count,
1291                "backpressure: some IHave messages dropped"
1292            );
1293        }
1294
1295        // Record IHaves sent for adaptive batcher feedback
1296        let ihaves_sent = (peer_count - ihave_dropped) * batch_size;
1297        self.inner.adaptive_batcher.record_ihave_sent(ihaves_sent);
1298    }
1299
1300    /// Run the Graft timer checker background task.
1301    ///
1302    /// This should be spawned as a background task.
1303    #[instrument(skip(self), name = "graft_timer")]
1304    pub async fn run_graft_timer(&self) {
1305        info!("Graft timer started");
1306        let check_interval = self.inner.config.graft_timeout / 2;
1307        let mut interval = Delay::new(check_interval);
1308
1309        loop {
1310            if self.inner.shutdown.load(Ordering::Acquire) {
1311                info!("Graft timer shutting down");
1312                break;
1313            }
1314
1315            // Wait for interval
1316            (&mut interval).await;
1317            interval.reset(check_interval);
1318
1319            // Check for expired Graft timers and failures
1320            let (expired, failed) = self.inner.graft_timer.get_expired_with_failures();
1321
1322            // Handle failed grafts (zombie peer detection)
1323            for failed_graft in &failed {
1324                warn!(
1325                    message_id = %failed_graft.message_id,
1326                    retries = failed_graft.total_retries,
1327                    peer = ?failed_graft.original_peer,
1328                    "Graft failed after max retries - penalizing peer"
1329                );
1330                // Record failure in peer scoring to penalize unresponsive peers
1331                self.inner
1332                    .peer_scoring
1333                    .record_failure(&failed_graft.original_peer);
1334                // Record failure for adaptive batcher to adjust batch sizes
1335                self.inner.adaptive_batcher.record_graft_timeout();
1336                self.inner
1337                    .delegate
1338                    .on_graft_failed(&failed_graft.message_id, &failed_graft.original_peer);
1339            }
1340
1341            for expired_graft in expired {
1342                // Send Graft to the peer determined by the timer (primary or alternative)
1343                debug!(
1344                    message_id = %expired_graft.message_id,
1345                    attempt = expired_graft.retry_count,
1346                    is_retry = expired_graft.retry_count > 0,
1347                    "sending Graft request"
1348                );
1349
1350                let _ = self
1351                    .inner
1352                    .outgoing_tx
1353                    .send(OutgoingMessage::unicast(
1354                        expired_graft.peer.clone(),
1355                        PlumtreeMessage::Graft {
1356                            message_id: expired_graft.message_id,
1357                            round: expired_graft.round,
1358                        },
1359                    ))
1360                    .await;
1361
1362                self.inner
1363                    .delegate
1364                    .on_graft_sent(&expired_graft.peer, &expired_graft.message_id);
1365            }
1366        }
1367    }
1368
1369    /// Run the seen map cleanup background task.
1370    ///
1371    /// This removes old entries from the deduplication map to prevent
1372    /// unbounded memory growth. Entries older than message_cache_ttl are removed.
1373    ///
1374    /// Uses the CleanupTuner for dynamic cleanup intervals based on:
1375    /// - Cache utilization (more aggressive when near capacity)
1376    /// - Message rate (more conservative under high load)
1377    /// - Cleanup duration (adjusts batch size for responsiveness)
1378    ///
1379    /// This should be spawned as a background task.
1380    #[instrument(skip(self), name = "seen_cleanup")]
1381    pub async fn run_seen_cleanup(&self) {
1382        info!("seen map cleanup started with dynamic tuning");
1383        let ttl = self.inner.config.message_cache_ttl;
1384        let tuner = &self.inner.cleanup_tuner;
1385
1386        // Start with initial interval from tuner
1387        let initial_params = tuner.get_parameters(0.0, ttl);
1388        let mut interval = Delay::new(initial_params.interval);
1389        let mut rate_window_reset = std::time::Instant::now();
1390
1391        loop {
1392            if self.inner.shutdown.load(Ordering::Acquire) {
1393                info!("seen map cleanup shutting down");
1394                break;
1395            }
1396
1397            // Wait for dynamically-tuned interval
1398            (&mut interval).await;
1399
1400            // Get current utilization for tuning
1401            let utilization = self.seen_map_stats().map(|s| s.utilization).unwrap_or(0.0);
1402
1403            // Get tuned parameters based on current state
1404            let params = tuner.get_parameters(utilization, ttl);
1405
1406            trace!(
1407                interval_ms = params.interval.as_millis(),
1408                batch_size = params.batch_size,
1409                aggressive = params.aggressive,
1410                utilization = format!("{:.2}", params.utilization),
1411                message_rate = format!("{:.1}", params.message_rate),
1412                "cleanup tuner parameters"
1413            );
1414
1415            // Reset for next interval using tuned duration
1416            interval.reset(params.interval);
1417
1418            // Clean up expired entries from seen map, one shard at a time
1419            let cleanup_start = std::time::Instant::now();
1420            let now = cleanup_start;
1421            let mut total_removed = 0;
1422            let shard_count = self.inner.seen.shard_count();
1423
1424            for shard_idx in 0..shard_count {
1425                // Check for shutdown between shards
1426                if self.inner.shutdown.load(Ordering::Acquire) {
1427                    break;
1428                }
1429
1430                // Clean up one shard (only locks that shard)
1431                let removed = self.inner.seen.cleanup_shard(shard_idx, now, ttl).await;
1432                total_removed += removed;
1433
1434                // Brief yield to allow other tasks to make progress
1435                Delay::new(std::time::Duration::from_micros(1)).await;
1436            }
1437
1438            // Record cleanup metrics for tuner feedback
1439            let cleanup_duration = cleanup_start.elapsed();
1440            tuner.record_cleanup(cleanup_duration, total_removed, &params);
1441
1442            if total_removed > 0 {
1443                debug!(
1444                    removed = total_removed,
1445                    shards = shard_count,
1446                    duration_ms = cleanup_duration.as_millis(),
1447                    aggressive = params.aggressive,
1448                    "cleaned up expired entries from seen map"
1449                );
1450            }
1451
1452            // Periodically reset the rate window for responsive tuning
1453            if rate_window_reset.elapsed() >= tuner.config().rate_window {
1454                tuner.reset_rate_window();
1455                rate_window_reset = std::time::Instant::now();
1456            }
1457
1458            // Update seen map size metric after cleanup
1459            #[cfg(feature = "metrics")]
1460            if let Some(stats) = self.seen_map_stats() {
1461                crate::metrics::set_seen_map_size(stats.size);
1462            }
1463        }
1464    }
1465
1466    /// Run the periodic topology maintenance loop.
1467    ///
1468    /// This background task periodically checks if the eager peer count has
1469    /// dropped below `eager_fanout` and promotes lazy peers to restore the
1470    /// spanning tree. This is critical for automatic recovery from node failures.
1471    ///
1472    /// The loop includes random jitter to prevent "thundering herd" effects
1473    /// when multiple nodes detect topology degradation simultaneously.
1474    ///
1475    /// This should be spawned as a background task.
1476    #[instrument(skip(self), name = "maintenance_loop")]
1477    pub async fn run_maintenance_loop(&self)
1478    where
1479        I: Clone + Eq + std::hash::Hash + std::fmt::Debug,
1480    {
1481        use rand::Rng;
1482
1483        let interval = self.inner.config.maintenance_interval;
1484        let jitter = self.inner.config.maintenance_jitter;
1485
1486        // Skip if maintenance is disabled
1487        if interval.is_zero() {
1488            info!("maintenance loop disabled (interval=0)");
1489            return;
1490        }
1491
1492        info!(
1493            interval_ms = interval.as_millis(),
1494            jitter_ms = jitter.as_millis(),
1495            "topology maintenance loop started"
1496        );
1497
1498        loop {
1499            if self.inner.shutdown.load(Ordering::Acquire) {
1500                info!("maintenance loop shutting down");
1501                break;
1502            }
1503
1504            // Add random jitter to prevent thundering herd
1505            let jitter_duration = if !jitter.is_zero() {
1506                let jitter_ms = rand::rng().random_range(0..jitter.as_millis() as u64);
1507                std::time::Duration::from_millis(jitter_ms)
1508            } else {
1509                std::time::Duration::ZERO
1510            };
1511
1512            // Wait for interval + jitter
1513            Delay::new(interval + jitter_duration).await;
1514
1515            // Check for shutdown after waking
1516            if self.inner.shutdown.load(Ordering::Acquire) {
1517                break;
1518            }
1519
1520            // Check if repair is needed
1521            let target_eager = self.inner.config.eager_fanout;
1522            if self.inner.peers.needs_repair(target_eager) {
1523                let stats_before = self.inner.peers.stats();
1524
1525                // Use network-aware rebalancing with PeerScoring
1526                // This prefers peers with lower RTT and higher reliability for eager set
1527                let peer_scoring = &self.inner.peer_scoring;
1528                let scorer = |peer: &I| peer_scoring.normalized_score(peer, 0.5);
1529
1530                // Try non-blocking rebalance first to avoid contention
1531                if self
1532                    .inner
1533                    .peers
1534                    .try_rebalance_with_scorer(target_eager, scorer)
1535                {
1536                    let stats_after = self.inner.peers.stats();
1537                    let promoted = stats_after
1538                        .eager_count
1539                        .saturating_sub(stats_before.eager_count);
1540
1541                    if promoted > 0 {
1542                        info!(
1543                            promoted = promoted,
1544                            eager_before = stats_before.eager_count,
1545                            eager_after = stats_after.eager_count,
1546                            lazy_count = stats_after.lazy_count,
1547                            target = target_eager,
1548                            "topology repair: promoted lazy peers to eager (network-aware)"
1549                        );
1550
1551                        #[cfg(feature = "metrics")]
1552                        for _ in 0..promoted {
1553                            crate::metrics::record_peer_promotion();
1554                        }
1555                    }
1556                } else {
1557                    // Lock was contended, will retry next cycle
1558                    trace!("maintenance: lock contended, will retry");
1559                }
1560            }
1561
1562            // Update metrics
1563            #[cfg(feature = "metrics")]
1564            {
1565                let stats = self.inner.peers.stats();
1566                crate::metrics::set_eager_peers(stats.eager_count);
1567                crate::metrics::set_lazy_peers(stats.lazy_count);
1568                crate::metrics::set_total_peers(stats.eager_count + stats.lazy_count);
1569            }
1570        }
1571    }
1572
1573    /// Shutdown the Plumtree instance.
1574    pub fn shutdown(&self) {
1575        self.inner.shutdown.store(true, Ordering::Release);
1576        self.inner.scheduler.shutdown();
1577        self.inner.outgoing_tx.close();
1578        self.inner.incoming_tx.close();
1579    }
1580
1581    /// Check if shutdown has been requested.
1582    pub fn is_shutdown(&self) -> bool {
1583        self.inner.shutdown.load(Ordering::Acquire)
1584    }
1585
1586    /// Rebalance peers to match target fanout using network-aware scoring.
1587    ///
1588    /// Uses hybrid scoring that combines:
1589    /// - Topology proximity (hash ring distance)
1590    /// - Network performance (RTT and reliability from PeerScoring)
1591    ///
1592    /// Ring neighbors are protected and will not be demoted.
1593    pub fn rebalance_peers(&self) {
1594        let peer_scoring = &self.inner.peer_scoring;
1595        self.inner
1596            .peers
1597            .rebalance_with_scorer(self.inner.config.eager_fanout, |peer| {
1598                peer_scoring.normalized_score(peer, 0.5)
1599            });
1600    }
1601
1602    /// Get access to the peer state for testing/debugging.
1603    pub fn peers(&self) -> &SharedPeerState<I> {
1604        &self.inner.peers
1605    }
1606}
1607
1608impl<I, D> Clone for Plumtree<I, D> {
1609    fn clone(&self) -> Self {
1610        Self {
1611            inner: self.inner.clone(),
1612        }
1613    }
1614}
1615
1616/// Handle for interacting with Plumtree from the network layer.
1617///
1618/// Provides channels for sending and receiving messages.
1619pub struct PlumtreeHandle<I> {
1620    /// Channel for receiving outgoing messages to send.
1621    outgoing_rx: Receiver<OutgoingMessage<I>>,
1622    /// Channel for receiving incoming messages (for internal processing).
1623    incoming_rx: Receiver<IncomingMessage<I>>,
1624    /// Channel for submitting incoming messages.
1625    incoming_tx: Sender<IncomingMessage<I>>,
1626}
1627
1628impl<I> PlumtreeHandle<I> {
1629    /// Get the next outgoing message to send.
1630    pub async fn next_outgoing(&self) -> Option<OutgoingMessage<I>> {
1631        self.outgoing_rx.recv().await.ok()
1632    }
1633
1634    /// Try to get the next outgoing message without blocking.
1635    ///
1636    /// Returns `Some(message)` if a message is available, `None` otherwise.
1637    /// This is useful for non-blocking polling in tests and simulations.
1638    pub fn try_next_outgoing(&self) -> Option<OutgoingMessage<I>> {
1639        self.outgoing_rx.try_recv().ok()
1640    }
1641
1642    /// Submit an incoming message for processing.
1643    pub async fn submit_incoming(&self, from: I, message: PlumtreeMessage) -> Result<()> {
1644        self.incoming_tx
1645            .send(IncomingMessage { from, message })
1646            .await
1647            .map_err(|e| Error::Channel(e.to_string()))
1648    }
1649
1650    /// Get a stream of outgoing messages.
1651    pub fn outgoing_stream(&self) -> impl futures::Stream<Item = OutgoingMessage<I>> + '_ {
1652        self.outgoing_rx.clone()
1653    }
1654
1655    /// Get a stream of incoming messages for processing.
1656    ///
1657    /// Use this to receive messages that were submitted via `submit_incoming`.
1658    /// This is useful for custom message processing pipelines.
1659    pub fn incoming_stream(&self) -> impl futures::Stream<Item = IncomingMessage<I>> + '_ {
1660        self.incoming_rx.clone()
1661    }
1662
1663    /// Get the next incoming message for processing.
1664    ///
1665    /// Use this to receive messages that were submitted via `submit_incoming`.
1666    pub async fn next_incoming(&self) -> Option<IncomingMessage<I>> {
1667        self.incoming_rx.recv().await.ok()
1668    }
1669
1670    /// Check if the handle is closed.
1671    pub fn is_closed(&self) -> bool {
1672        self.outgoing_rx.is_closed()
1673    }
1674}
1675
1676#[cfg(test)]
1677mod tests {
1678    use super::*;
1679
1680    #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1681    struct TestNodeId(u64);
1682
1683    struct TestDelegate {
1684        delivered: parking_lot::Mutex<Vec<(MessageId, Bytes)>>,
1685    }
1686
1687    impl TestDelegate {
1688        fn new() -> Self {
1689            Self {
1690                delivered: parking_lot::Mutex::new(Vec::new()),
1691            }
1692        }
1693
1694        fn delivered_count(&self) -> usize {
1695            self.delivered.lock().len()
1696        }
1697    }
1698
1699    impl PlumtreeDelegate<TestNodeId> for TestDelegate {
1700        fn on_deliver(&self, message_id: MessageId, payload: Bytes) {
1701            self.delivered.lock().push((message_id, payload));
1702        }
1703    }
1704
1705    #[tokio::test]
1706    async fn test_broadcast() {
1707        let delegate = Arc::new(TestDelegate::new());
1708        let (plumtree, _handle) =
1709            Plumtree::new(TestNodeId(1), PlumtreeConfig::default(), delegate.clone());
1710
1711        // Add some peers
1712        plumtree.add_peer(TestNodeId(2));
1713        plumtree.add_peer(TestNodeId(3));
1714        plumtree.add_peer(TestNodeId(4));
1715
1716        // Broadcast a message
1717        let msg_id = plumtree
1718            .broadcast(Bytes::from_static(b"hello"))
1719            .await
1720            .unwrap();
1721
1722        // Message should be in cache
1723        assert!(plumtree.inner.cache.contains(&msg_id));
1724    }
1725
1726    #[tokio::test]
1727    async fn test_handle_gossip() {
1728        let delegate = Arc::new(TestDelegate::new());
1729        let (plumtree, _handle) =
1730            Plumtree::new(TestNodeId(1), PlumtreeConfig::default(), delegate.clone());
1731
1732        plumtree.add_peer(TestNodeId(2));
1733
1734        let msg_id = MessageId::new();
1735        let payload = Bytes::from_static(b"test message");
1736
1737        // Handle incoming gossip
1738        plumtree
1739            .handle_message(
1740                TestNodeId(2),
1741                PlumtreeMessage::Gossip {
1742                    id: msg_id,
1743                    round: 0,
1744                    payload: payload.clone(),
1745                },
1746            )
1747            .await
1748            .unwrap();
1749
1750        // Message should be delivered
1751        assert_eq!(delegate.delivered_count(), 1);
1752    }
1753
1754    #[tokio::test]
1755    async fn test_duplicate_detection() {
1756        let delegate = Arc::new(TestDelegate::new());
1757        let (plumtree, _handle) =
1758            Plumtree::new(TestNodeId(1), PlumtreeConfig::default(), delegate.clone());
1759
1760        plumtree.add_peer(TestNodeId(2));
1761
1762        let msg_id = MessageId::new();
1763        let payload = Bytes::from_static(b"test message");
1764
1765        // Handle same message twice
1766        for _ in 0..2 {
1767            plumtree
1768                .handle_message(
1769                    TestNodeId(2),
1770                    PlumtreeMessage::Gossip {
1771                        id: msg_id,
1772                        round: 0,
1773                        payload: payload.clone(),
1774                    },
1775                )
1776                .await
1777                .unwrap();
1778        }
1779
1780        // Should only be delivered once
1781        assert_eq!(delegate.delivered_count(), 1);
1782    }
1783
1784    #[tokio::test]
1785    async fn test_peer_promotion() {
1786        let delegate = Arc::new(TestDelegate::new());
1787        let (plumtree, _handle) =
1788            Plumtree::new(TestNodeId(1), PlumtreeConfig::default(), delegate.clone());
1789
1790        // Use add_peer_lazy to test promotion from lazy to eager
1791        plumtree.add_peer_lazy(TestNodeId(2));
1792
1793        // Peer starts as lazy
1794        assert!(plumtree.inner.peers.is_lazy(&TestNodeId(2)));
1795
1796        // Send IHave for unknown message - should trigger promotion
1797        let msg_id = MessageId::new();
1798        plumtree
1799            .handle_message(
1800                TestNodeId(2),
1801                PlumtreeMessage::IHave {
1802                    message_ids: smallvec::smallvec![msg_id],
1803                    round: 0,
1804                },
1805            )
1806            .await
1807            .unwrap();
1808
1809        // Peer should now be eager
1810        assert!(plumtree.inner.peers.is_eager(&TestNodeId(2)));
1811    }
1812
1813    #[tokio::test]
1814    async fn test_message_too_large() {
1815        let delegate = Arc::new(TestDelegate::new());
1816        let config = PlumtreeConfig::default().with_max_message_size(10);
1817        let (plumtree, _handle) = Plumtree::new(TestNodeId(1), config, delegate);
1818
1819        let result = plumtree
1820            .broadcast(Bytes::from_static(b"this is too large"))
1821            .await;
1822
1823        assert!(matches!(result, Err(Error::MessageTooLarge { .. })));
1824    }
1825}