use async_channel::{Receiver, Sender};
use async_lock::RwLock;
use bytes::Bytes;
use futures_timer::Delay;
use smallvec::SmallVec;
use std::{
    collections::HashMap,
    fmt::Debug,
    hash::Hash,
    sync::{
        atomic::{AtomicBool, AtomicU32, Ordering},
        Arc,
    },
};
use tracing::{debug, debug_span, info, instrument, trace, warn, Instrument};

use crate::{
    adaptive_batcher::{AdaptiveBatcher, BatcherConfig},
    cleanup_tuner::{CleanupConfig, CleanupTuner},
    config::PlumtreeConfig,
    error::{Error, Result},
    health::{HealthReport, HealthReportBuilder},
    message::{MessageCache, MessageId, PlumtreeMessage},
    peer_scoring::{PeerScoring, ScoringConfig},
    peer_state::SharedPeerState,
    rate_limiter::RateLimiter,
    scheduler::{GraftTimer, IHaveScheduler, PendingIHave},
    PeerStateBuilder,
};

/// Number of shards in the seen-message map; sharding reduces write-lock contention.
const SEEN_MAP_SHARDS: usize = 16;

/// Default maximum number of entries per shard before emergency eviction kicks in.
const DEFAULT_MAX_CAPACITY_PER_SHARD: usize = 10_000;

/// Tracks which message IDs this node has already seen, split across several
/// `RwLock`-protected shards.
struct ShardedSeenMap<I> {
    shards: Vec<RwLock<HashMap<MessageId, SeenEntry<I>>>>,
    max_capacity_per_shard: usize,
}

impl<I> ShardedSeenMap<I> {
    fn new() -> Self {
        Self::with_capacity(DEFAULT_MAX_CAPACITY_PER_SHARD)
    }

    fn with_capacity(max_capacity_per_shard: usize) -> Self {
        let shards = (0..SEEN_MAP_SHARDS)
            .map(|_| RwLock::new(HashMap::new()))
            .collect();
        Self {
            shards,
            max_capacity_per_shard,
        }
    }

    fn shard_index(&self, id: &MessageId) -> usize {
        let hash = id.timestamp() as usize ^ (id.random() as usize);
        hash % self.shards.len()
    }

    async fn read_shard(
        &self,
        id: &MessageId,
    ) -> async_lock::RwLockReadGuard<'_, HashMap<MessageId, SeenEntry<I>>> {
        let idx = self.shard_index(id);
        self.shards[idx].read().await
    }

    async fn write_shard(
        &self,
        id: &MessageId,
    ) -> async_lock::RwLockWriteGuard<'_, HashMap<MessageId, SeenEntry<I>>> {
        let idx = self.shard_index(id);
        self.shards[idx].write().await
    }
}

impl<I: Clone> ShardedSeenMap<I> {
    #[allow(dead_code)]
    async fn get(&self, id: &MessageId) -> Option<SeenEntry<I>> {
        let shard = self.read_shard(id).await;
        shard.get(id).cloned()
    }

    async fn contains(&self, id: &MessageId) -> bool {
        let shard = self.read_shard(id).await;
        shard.contains_key(id)
    }

    async fn insert(&self, id: MessageId, entry: SeenEntry<I>) -> bool {
        let mut shard = self.write_shard(&id).await;
        if shard.contains_key(&id) {
            false
        } else {
            shard.insert(id, entry);
            true
        }
    }

    #[allow(dead_code)]
    async fn get_or_insert_with<F>(&self, id: MessageId, f: F) -> (SeenEntry<I>, bool)
    where
        F: FnOnce() -> SeenEntry<I>,
    {
        let mut shard = self.write_shard(&id).await;
        if let Some(entry) = shard.get(&id) {
            (entry.clone(), false)
        } else {
            let entry = f();
            shard.insert(id, entry.clone());
            (entry, true)
        }
    }

    #[allow(dead_code)]
    async fn increment_receive_count(&self, id: &MessageId) -> Option<u32> {
        let mut shard = self.write_shard(id).await;
        if let Some(entry) = shard.get_mut(id) {
            entry.receive_count += 1;
            Some(entry.receive_count)
        } else {
            None
        }
    }

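    /// Checks whether `id` has been seen and marks it seen if not, holding the
    /// shard's write lock for the whole check-and-insert.
    ///
    /// Illustrative usage (a sketch; the values are hypothetical):
    ///
    /// ```ignore
    /// let (is_duplicate, receive_count, evicted) = seen
    ///     .check_and_mark_seen(msg_id, || SeenEntry {
    ///         round: 0,
    ///         receive_count: 1,
    ///         seen_at: std::time::Instant::now(),
    ///         parent: None,
    ///     })
    ///     .await;
    /// // `is_duplicate` is false on first sight, `receive_count` counts sightings,
    /// // and `evicted` reports how many old entries were dropped to make room.
    /// ```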
    async fn check_and_mark_seen(
        &self,
        id: MessageId,
        entry_fn: impl FnOnce() -> SeenEntry<I>,
    ) -> (bool, u32, usize) {
        let idx = self.shard_index(&id);
        let mut shard = self.shards[idx].write().await;

        if let Some(entry) = shard.get_mut(&id) {
            entry.receive_count += 1;
            (true, entry.receive_count, 0)
        } else {
            let evicted = if shard.len() >= self.max_capacity_per_shard {
                Self::emergency_evict(&mut shard, self.max_capacity_per_shard / 10)
            } else {
                0
            };

            let entry = entry_fn();
            let count = entry.receive_count;
            shard.insert(id, entry);
            (false, count, evicted)
        }
    }

    fn emergency_evict(shard: &mut HashMap<MessageId, SeenEntry<I>>, target_evict: usize) -> usize {
        if shard.is_empty() || target_evict == 0 {
            return 0;
        }

        let mut entries: Vec<(MessageId, std::time::Instant)> = shard
            .iter()
            .map(|(id, entry)| (*id, entry.seen_at))
            .collect();

        entries.sort_by_key(|(_, seen_at)| *seen_at);

        let to_remove = target_evict.min(entries.len());
        for (id, _) in entries.into_iter().take(to_remove) {
            shard.remove(&id);
        }

        to_remove
    }

    async fn cleanup_shard(
        &self,
        shard_idx: usize,
        now: std::time::Instant,
        ttl: std::time::Duration,
    ) -> usize {
        let mut shard = self.shards[shard_idx].write().await;
        let before = shard.len();
        shard.retain(|_, entry| now.duration_since(entry.seen_at) < ttl);
        before.saturating_sub(shard.len())
    }

    #[allow(dead_code)]
    async fn len(&self) -> usize {
        let mut total = 0;
        for shard in &self.shards {
            total += shard.read().await.len();
        }
        total
    }

    fn try_len(&self) -> Option<usize> {
        let mut total = 0;
        for shard in &self.shards {
            total += shard.try_read()?.len();
        }
        Some(total)
    }

    fn shard_count(&self) -> usize {
        self.shards.len()
    }

    fn max_capacity_per_shard(&self) -> usize {
        self.max_capacity_per_shard
    }

    fn total_capacity(&self) -> usize {
        self.shards.len() * self.max_capacity_per_shard
    }
}

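/// Application-facing callbacks invoked by the protocol for message delivery
/// and topology events.
///
/// A minimal delegate sketch (illustrative only; `MyApp` and the `u64` peer-id
/// type are assumptions). Only `on_deliver` is required; the remaining hooks
/// default to no-ops.
///
/// ```ignore
/// struct MyApp;
///
/// impl PlumtreeDelegate<u64> for MyApp {
///     fn on_deliver(&self, message_id: MessageId, payload: Bytes) {
///         println!("delivered {message_id}: {} bytes", payload.len());
///     }
///
///     fn on_lazy_demotion(&self, peer: &u64) {
///         println!("peer {peer} demoted to lazy");
///     }
/// }
/// ```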
pub trait PlumtreeDelegate<I = ()>: Send + Sync + 'static {
    fn on_deliver(&self, message_id: MessageId, payload: Bytes);

    fn on_eager_promotion(&self, _peer: &I) {}

    fn on_lazy_demotion(&self, _peer: &I) {}

    fn on_graft_sent(&self, _peer: &I, _message_id: &MessageId) {}

    fn on_prune_sent(&self, _peer: &I) {}

    fn on_graft_failed(&self, _message_id: &MessageId, _peer: &I) {}
}

impl<I, T: PlumtreeDelegate<I> + ?Sized> PlumtreeDelegate<I> for Arc<T> {
    fn on_deliver(&self, message_id: MessageId, payload: Bytes) {
        (**self).on_deliver(message_id, payload)
    }
    fn on_eager_promotion(&self, peer: &I) {
        (**self).on_eager_promotion(peer)
    }
    fn on_lazy_demotion(&self, peer: &I) {
        (**self).on_lazy_demotion(peer)
    }
    fn on_graft_sent(&self, peer: &I, message_id: &MessageId) {
        (**self).on_graft_sent(peer, message_id)
    }
    fn on_prune_sent(&self, peer: &I) {
        (**self).on_prune_sent(peer)
    }
    fn on_graft_failed(&self, message_id: &MessageId, peer: &I) {
        (**self).on_graft_failed(message_id, peer)
    }
}

impl<I, T: PlumtreeDelegate<I> + ?Sized> PlumtreeDelegate<I> for Box<T> {
    fn on_deliver(&self, message_id: MessageId, payload: Bytes) {
        (**self).on_deliver(message_id, payload)
    }
    fn on_eager_promotion(&self, peer: &I) {
        (**self).on_eager_promotion(peer)
    }
    fn on_lazy_demotion(&self, peer: &I) {
        (**self).on_lazy_demotion(peer)
    }
    fn on_graft_sent(&self, peer: &I, message_id: &MessageId) {
        (**self).on_graft_sent(peer, message_id)
    }
    fn on_prune_sent(&self, peer: &I) {
        (**self).on_prune_sent(peer)
    }
    fn on_graft_failed(&self, message_id: &MessageId, peer: &I) {
        (**self).on_graft_failed(message_id, peer)
    }
}

#[derive(Debug, Clone, Copy, Default)]
pub struct NoopDelegate;

impl<I> PlumtreeDelegate<I> for NoopDelegate {
    fn on_deliver(&self, _message_id: MessageId, _payload: Bytes) {}
}

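/// Plumtree broadcast protocol instance; cheap to clone, since it wraps an `Arc`.
///
/// A construction sketch (illustrative only; the `u64` peer ids are
/// placeholders, any `Clone + Eq + Hash + Ord + Debug` id works):
///
/// ```ignore
/// let (plumtree, handle) = Plumtree::new(
///     42u64,                      // local peer id
///     PlumtreeConfig::default(),
///     NoopDelegate,
/// );
///
/// plumtree.add_peer(7);
/// plumtree.add_peer(9);
///
/// let msg_id = plumtree.broadcast(Bytes::from_static(b"hello")).await?;
/// // `handle` is then driven by the transport layer; see `PlumtreeHandle`.
/// ```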
pub struct Plumtree<I, D = NoopDelegate> {
    inner: Arc<PlumtreeInner<I, D>>,
}

struct PlumtreeInner<I, D> {
    /// Eager/lazy peer topology shared with background tasks.
    peers: SharedPeerState<I>,

    /// Cache of recently seen payloads, used to answer Graft requests.
    cache: Arc<MessageCache>,

    /// Batches message IDs into periodic IHave announcements.
    scheduler: Arc<IHaveScheduler>,

    /// Tracks announced-but-missing messages and schedules Graft retries.
    graft_timer: Arc<GraftTimer<I>>,

    /// Per-peer rate limiter for incoming Graft requests.
    graft_rate_limiter: RateLimiter<I>,

    /// Per-peer RTT/failure scores used for topology decisions.
    peer_scoring: Arc<PeerScoring<I>>,

    /// Adapts the cadence of seen-map cleanup to load.
    cleanup_tuner: Arc<CleanupTuner>,

    /// Adapts IHave batch sizes to observed traffic.
    adaptive_batcher: Arc<AdaptiveBatcher>,

    /// Application callbacks.
    delegate: D,

    /// Protocol configuration.
    config: PlumtreeConfig,

    /// This node's identifier.
    local_id: I,

    /// Round counter for locally originated messages.
    round: AtomicU32,

    /// Set once shutdown has been requested.
    shutdown: AtomicBool,

    /// Messages queued for the transport to send.
    outgoing_tx: Sender<OutgoingMessage<I>>,

    /// Messages received from the transport, awaiting dispatch.
    incoming_tx: Sender<IncomingMessage<I>>,

    /// Sharded map of already-seen message IDs.
    seen: ShardedSeenMap<I>,
}

#[derive(Debug, Clone)]
struct SeenEntry<I> {
    #[allow(dead_code)]
    round: u32,
    receive_count: u32,
    seen_at: std::time::Instant,
    #[allow(dead_code)]
    parent: Option<I>,
}

#[derive(Debug, Clone, Copy)]
pub struct SeenMapStats {
    pub size: usize,
    pub capacity: usize,
    pub utilization: f64,
    pub shard_count: usize,
    pub max_per_shard: usize,
}

#[derive(Debug)]
pub struct OutgoingMessage<I> {
    pub target: Option<I>,
    pub message: PlumtreeMessage,
}

impl<I> OutgoingMessage<I> {
    pub fn unicast(target: I, message: PlumtreeMessage) -> Self {
        Self {
            target: Some(target),
            message,
        }
    }

    pub fn broadcast(message: PlumtreeMessage) -> Self {
        Self {
            target: None,
            message,
        }
    }

    pub fn is_unicast(&self) -> bool {
        self.target.is_some()
    }

    pub fn is_broadcast(&self) -> bool {
        self.target.is_none()
    }
}

#[derive(Debug)]
pub struct IncomingMessage<I> {
    pub from: I,
    pub message: PlumtreeMessage,
}

impl<I, D> Plumtree<I, D>
where
    I: Clone + Eq + Hash + Ord + Debug + Send + Sync + 'static,
    D: PlumtreeDelegate<I>,
{
    pub fn new(local_id: I, config: PlumtreeConfig, delegate: D) -> (Self, PlumtreeHandle<I>) {
        let (outgoing_tx, outgoing_rx) = async_channel::bounded(1024);
        let (incoming_tx, incoming_rx) = async_channel::bounded(1024);

        let graft_rate_limiter = RateLimiter::new(
            config.graft_rate_limit_burst,
            config.graft_rate_limit_per_second,
        );

        let peers = if config.use_hash_ring {
            Arc::new(
                PeerStateBuilder::new()
                    .use_hash_ring(true)
                    .with_local_id(local_id.clone())
                    .build(),
            )
        } else {
            Arc::new(
                PeerStateBuilder::new()
                    .use_hash_ring(false)
                    .with_local_id(local_id.clone())
                    .build(),
            )
        };

        let inner = Arc::new(PlumtreeInner {
            peers,
            cache: Arc::new(MessageCache::new(
                config.message_cache_ttl,
                config.message_cache_max_size,
            )),
            scheduler: Arc::new(IHaveScheduler::new(
                config.ihave_interval,
                config.ihave_batch_size,
                10000,
            )),
            graft_timer: Arc::new(GraftTimer::new(config.graft_timeout)),
            graft_rate_limiter,
            peer_scoring: Arc::new(PeerScoring::new(ScoringConfig::default())),
            cleanup_tuner: Arc::new(CleanupTuner::new(CleanupConfig::default())),
            adaptive_batcher: Arc::new(AdaptiveBatcher::new(BatcherConfig::default())),
            delegate,
            config,
            local_id,
            round: AtomicU32::new(0),
            shutdown: AtomicBool::new(false),
            outgoing_tx,
            incoming_tx,
            seen: ShardedSeenMap::new(),
        });

        let plumtree = Self {
            inner: inner.clone(),
        };

        let handle = PlumtreeHandle {
            outgoing_rx,
            incoming_rx,
            incoming_tx: plumtree.inner.incoming_tx.clone(),
        };

        (plumtree, handle)
    }

    pub fn local_id(&self) -> &I {
        &self.inner.local_id
    }

    pub fn config(&self) -> &PlumtreeConfig {
        &self.inner.config
    }

    pub fn peer_stats(&self) -> crate::peer_state::PeerStats {
        self.inner.peers.stats()
    }

    pub fn cache_stats(&self) -> crate::message::CacheStats {
        self.inner.cache.stats()
    }

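    /// A usage sketch (illustrative only): sample seen-map occupancy without
    /// blocking; `None` means a shard lock was contended at sampling time.
    ///
    /// ```ignore
    /// if let Some(stats) = plumtree.seen_map_stats() {
    ///     println!(
    ///         "seen map: {}/{} entries ({:.1}% full)",
    ///         stats.size,
    ///         stats.capacity,
    ///         stats.utilization * 100.0
    ///     );
    /// }
    /// ```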
    pub fn seen_map_stats(&self) -> Option<SeenMapStats> {
        let size = self.inner.seen.try_len()?;
        let capacity = self.inner.seen.total_capacity();
        let utilization = if capacity > 0 {
            size as f64 / capacity as f64
        } else {
            0.0
        };
        Some(SeenMapStats {
            size,
            capacity,
            utilization,
            shard_count: self.inner.seen.shard_count(),
            max_per_shard: self.inner.seen.max_capacity_per_shard(),
        })
    }

    pub fn scoring_stats(&self) -> crate::peer_scoring::ScoringStats {
        self.inner.peer_scoring.stats()
    }

    pub fn cleanup_stats(&self) -> crate::cleanup_tuner::CleanupStats {
        self.inner.cleanup_tuner.stats()
    }

    pub fn batcher_stats(&self) -> crate::adaptive_batcher::BatcherStats {
        self.inner.adaptive_batcher.stats()
    }

    pub fn record_peer_rtt(&self, peer: &I, rtt: std::time::Duration) {
        self.inner.peer_scoring.record_rtt(peer, rtt);
    }

    pub fn record_peer_failure(&self, peer: &I) {
        self.inner.peer_scoring.record_failure(peer);
    }

    pub fn get_peer_score(&self, peer: &I) -> Option<crate::peer_scoring::PeerScore> {
        self.inner.peer_scoring.get_score(peer)
    }

    pub fn best_peers_for_eager(&self, count: usize) -> Vec<I> {
        self.inner.peer_scoring.best_peers(count)
    }

    pub fn health(&self) -> HealthReport {
        let peer_stats = self.inner.peers.stats();
        let cache_stats = self.inner.cache.stats();
        let pending_grafts = self.inner.graft_timer.pending_count();

        // Graft success/failure counters are not tracked yet; report zeros.
        let successful_grafts = 0_u64;
        let failed_grafts = 0_u64;

        let total_peers = peer_stats.eager_count + peer_stats.lazy_count;

        HealthReportBuilder::new()
            .peers(
                total_peers,
                peer_stats.eager_count,
                peer_stats.lazy_count,
                self.inner.config.eager_fanout,
            )
            .grafts(pending_grafts, successful_grafts, failed_grafts)
            .cache(
                cache_stats.entries,
                self.inner.config.message_cache_max_size,
                self.inner.config.message_cache_ttl,
            )
            .shutdown(self.inner.shutdown.load(Ordering::Acquire))
            .build()
    }

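    /// A usage sketch (illustrative only; the peer ids are placeholders):
    /// `add_peer` lets the peer-state logic place the peer in an eager or
    /// lazy slot, while `add_peer_lazy` always starts the peer in the lazy set.
    ///
    /// ```ignore
    /// let placement = plumtree.add_peer(discovered_peer); // auto-placed
    /// plumtree.add_peer_lazy(backup_peer);                // always lazy
    /// ```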
    pub fn add_peer(&self, peer: I) -> crate::peer_state::AddPeerResult {
        use crate::peer_state::AddPeerResult;

        if peer == self.inner.local_id {
            return AddPeerResult::AlreadyExists;
        }

        self.inner.peers.add_peer_auto(
            peer,
            self.inner.config.max_peers,
            self.inner.config.eager_fanout,
        )
    }

    pub fn add_peer_lazy(&self, peer: I) {
        if peer != self.inner.local_id {
            self.inner.peers.add_peer(peer);
        }
    }

    pub fn remove_peer(&self, peer: &I) {
        self.inner.peers.remove_peer(peer);
        self.inner.peer_scoring.remove_peer(peer);
    }

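    /// A broadcast sketch (illustrative only): payloads larger than
    /// `max_message_size` are rejected with `Error::MessageTooLarge`.
    ///
    /// ```ignore
    /// match plumtree.broadcast(Bytes::from_static(b"block #42")).await {
    ///     Ok(msg_id) => tracing::info!(%msg_id, "gossip started"),
    ///     Err(Error::MessageTooLarge { size, max_size }) => {
    ///         tracing::warn!(size, max_size, "payload rejected");
    ///     }
    ///     Err(e) => tracing::error!(?e, "broadcast failed"),
    /// }
    /// ```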
    #[instrument(skip(self, payload), fields(payload_size))]
    pub async fn broadcast(&self, payload: impl Into<Bytes>) -> Result<MessageId> {
        let payload = payload.into();
        let payload_size = payload.len();
        tracing::Span::current().record("payload_size", payload_size);

        if payload_size > self.inner.config.max_message_size {
            warn!(
                payload_size,
                max_size = self.inner.config.max_message_size,
                "message too large"
            );
            return Err(Error::MessageTooLarge {
                size: payload_size,
                max_size: self.inner.config.max_message_size,
            });
        }

        let msg_id = MessageId::new();
        let round = self.inner.round.fetch_add(1, Ordering::Relaxed);

        debug!(
            message_id = %msg_id,
            round,
            payload_size,
            "broadcasting new message"
        );

        self.inner.cache.insert(msg_id, payload.clone());

        self.inner
            .seen
            .insert(
                msg_id,
                SeenEntry {
                    round,
                    receive_count: 1,
                    seen_at: std::time::Instant::now(),
                    parent: None,
                },
            )
            .await;

        let eager_peers = self.inner.peers.eager_peers();
        let eager_count = eager_peers.len();
        let mut dropped_count = 0;

        for peer in eager_peers {
            let msg = PlumtreeMessage::Gossip {
                id: msg_id,
                round,
                payload: payload.clone(),
            };
            if let Err(_e) = self
                .inner
                .outgoing_tx
                .try_send(OutgoingMessage::unicast(peer, msg))
            {
                dropped_count += 1;
                trace!(
                    message_id = %msg_id,
                    "outgoing queue full, message to eager peer dropped"
                );
            }
        }

        self.inner.scheduler.queue().push(msg_id, round);

        if dropped_count > 0 {
            warn!(
                message_id = %msg_id,
                dropped = dropped_count,
                total_eager = eager_count,
                "backpressure: some eager push messages were dropped"
            );
            if dropped_count == eager_count {
                return Err(Error::QueueFull {
                    dropped: dropped_count,
                    capacity: self.inner.outgoing_tx.capacity().unwrap_or(1024),
                });
            }
        }

        trace!(
            message_id = %msg_id,
            eager_peers = eager_count,
            sent = eager_count - dropped_count,
            "broadcast complete, IHave queued for lazy peers"
        );

        Ok(msg_id)
    }

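    /// A dispatch sketch (illustrative only; `sender_id` and `msg_id` are
    /// placeholders): the transport decodes a frame into a `PlumtreeMessage`
    /// and hands it to the protocol together with the sender's id.
    ///
    /// ```ignore
    /// let gossip = PlumtreeMessage::Gossip {
    ///     id: msg_id,
    ///     round: 0,
    ///     payload: Bytes::from_static(b"payload"),
    /// };
    /// plumtree.handle_message(sender_id, gossip).await?;
    /// ```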
    pub async fn handle_message(&self, from: I, message: PlumtreeMessage) -> Result<()> {
        if self.inner.shutdown.load(Ordering::Acquire) {
            return Err(Error::Shutdown);
        }

        let msg_type = message.type_name();
        let span = debug_span!("handle_message", msg_type, ?from);

        async {
            match message {
                PlumtreeMessage::Gossip { id, round, payload } => {
                    self.handle_gossip(from, id, round, payload).await
                }
                PlumtreeMessage::IHave { message_ids, round } => {
                    self.handle_ihave(from, message_ids, round).await
                }
                PlumtreeMessage::Graft { message_id, round } => {
                    self.handle_graft(from, message_id, round).await
                }
                PlumtreeMessage::Prune => self.handle_prune(from).await,
            }
        }
        .instrument(span)
        .await
    }

    async fn handle_gossip(
        &self,
        from: I,
        msg_id: MessageId,
        round: u32,
        payload: Bytes,
    ) -> Result<()> {
        let payload_size = payload.len();
        trace!(
            message_id = %msg_id,
            round,
            payload_size,
            "received gossip"
        );

        self.inner.cleanup_tuner.record_message();
        self.inner.adaptive_batcher.record_message();

        if self.inner.graft_timer.message_received(&msg_id) {
            self.inner.adaptive_batcher.record_graft_received();
        }

        let (is_duplicate, receive_count, evicted) = self
            .inner
            .seen
            .check_and_mark_seen(msg_id, || SeenEntry {
                round,
                receive_count: 1,
                seen_at: std::time::Instant::now(),
                parent: Some(from.clone()),
            })
            .await;

        if evicted > 0 {
            debug!(
                message_id = %msg_id,
                evicted,
                "emergency eviction in seen map due to capacity"
            );
            #[cfg(feature = "metrics")]
            crate::metrics::record_seen_map_evictions(evicted);
        }

        if is_duplicate {
            trace!(
                message_id = %msg_id,
                receive_count,
                "duplicate gossip received"
            );
            if receive_count > self.inner.config.optimization_threshold {
                if self.inner.peers.is_eager(&from) {
                    debug!(
                        message_id = %msg_id,
                        receive_count,
                        threshold = self.inner.config.optimization_threshold,
                        "pruning redundant path"
                    );
                    let _ = self
                        .inner
                        .outgoing_tx
                        .send(OutgoingMessage::unicast(
                            from.clone(),
                            PlumtreeMessage::Prune,
                        ))
                        .await;
                    self.inner.delegate.on_prune_sent(&from);

                    if self.inner.peers.demote_to_lazy(&from) {
                        self.inner.delegate.on_lazy_demotion(&from);
                    }
                }
            }
            return Ok(());
        }

        debug!(
            message_id = %msg_id,
            round,
            payload_size,
            "delivering new message"
        );

        self.inner.cache.insert(msg_id, payload.clone());

        self.inner.delegate.on_deliver(msg_id, payload.clone());

        let eager_peers = self.inner.peers.random_eager_except(&from, usize::MAX);
        let forward_count = eager_peers.len();
        let mut dropped = 0;

        for peer in eager_peers {
            let msg = PlumtreeMessage::Gossip {
                id: msg_id,
                round: round + 1,
                payload: payload.clone(),
            };
            if self
                .inner
                .outgoing_tx
                .try_send(OutgoingMessage::unicast(peer, msg))
                .is_err()
            {
                dropped += 1;
            }
        }

        self.inner.scheduler.queue().push(msg_id, round + 1);

        if dropped > 0 {
            debug!(
                message_id = %msg_id,
                dropped,
                total = forward_count,
                "backpressure: some gossip forwards dropped"
            );
        }

        trace!(
            message_id = %msg_id,
            forward_count,
            sent = forward_count - dropped,
            "forwarded gossip to eager peers"
        );

        Ok(())
    }

    async fn handle_ihave(
        &self,
        from: I,
        message_ids: SmallVec<[MessageId; 8]>,
        round: u32,
    ) -> Result<()> {
        let count = message_ids.len();
        trace!(count, round, "received IHave batch");

        let mut missing_count = 0;
        for msg_id in message_ids {
            let have_message = self.inner.seen.contains(&msg_id).await;

            if !have_message {
                missing_count += 1;
                trace!(message_id = %msg_id, "missing message, sending Graft");

                let alternatives: Vec<I> = self.inner.peers.random_eager_except(&from, 2);

                self.inner.graft_timer.expect_message_with_alternatives(
                    msg_id,
                    from.clone(),
                    alternatives,
                    round,
                );

                if self.inner.peers.promote_to_eager(&from) {
                    debug!("promoted peer to eager after IHave");
                    self.inner.delegate.on_eager_promotion(&from);
                }

                let _ = self
                    .inner
                    .outgoing_tx
                    .send(OutgoingMessage::unicast(
                        from.clone(),
                        PlumtreeMessage::Graft {
                            message_id: msg_id,
                            round,
                        },
                    ))
                    .await;

                self.inner.delegate.on_graft_sent(&from, &msg_id);
            }
        }

        if missing_count > 0 {
            debug!(
                total = count,
                missing = missing_count,
                "IHave processing complete"
            );
        }

        Ok(())
    }

    async fn handle_graft(&self, from: I, msg_id: MessageId, round: u32) -> Result<()> {
        trace!(message_id = %msg_id, round, "received Graft request");

        if !self.inner.graft_rate_limiter.check(&from) {
            warn!(message_id = %msg_id, "rate limiting Graft request");
            return Ok(());
        }

        if self.inner.peers.promote_to_eager(&from) {
            debug!("promoted peer to eager after Graft request");
            self.inner.delegate.on_eager_promotion(&from);
        }

        if let Some(payload) = self.inner.cache.get(&msg_id) {
            debug!(
                message_id = %msg_id,
                payload_size = payload.len(),
                "responding to Graft with cached message"
            );
            let msg = PlumtreeMessage::Gossip {
                id: msg_id,
                round,
                payload: (*payload).clone(),
            };
            let _ = self
                .inner
                .outgoing_tx
                .send(OutgoingMessage::unicast(from, msg))
                .await;
        } else {
            debug!(message_id = %msg_id, "Graft request for unknown message");
        }

        Ok(())
    }

    async fn handle_prune(&self, from: I) -> Result<()> {
        trace!("received Prune request");
        if self.inner.peers.demote_to_lazy(&from) {
            debug!("demoted peer to lazy after Prune");
            self.inner.delegate.on_lazy_demotion(&from);
        }
        Ok(())
    }

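    /// A task-spawning sketch (illustrative only, assuming a Tokio runtime):
    /// each background loop runs until `shutdown()` is called, and `Plumtree`
    /// is cheaply cloneable, so each task gets its own clone.
    ///
    /// ```ignore
    /// let pt = plumtree.clone();
    /// tokio::spawn(async move { pt.run_ihave_scheduler().await });
    ///
    /// let pt = plumtree.clone();
    /// tokio::spawn(async move { pt.run_graft_timer().await });
    ///
    /// let pt = plumtree.clone();
    /// tokio::spawn(async move { pt.run_seen_cleanup().await });
    ///
    /// let pt = plumtree.clone();
    /// tokio::spawn(async move { pt.run_maintenance_loop().await });
    /// ```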
    #[instrument(skip(self), name = "ihave_scheduler")]
    pub async fn run_ihave_scheduler(&self) {
        info!("IHave scheduler started with adaptive batching");
        let ihave_interval = self.inner.config.ihave_interval;
        let check_interval = std::time::Duration::from_millis(10);
        let mut last_flush = std::time::Instant::now();
        let mut last_batch_size_update = std::time::Instant::now();
        let batch_size_update_interval = std::time::Duration::from_secs(5);

        loop {
            if self.inner.shutdown.load(Ordering::Acquire) {
                info!("IHave scheduler shutting down");
                break;
            }

            if last_batch_size_update.elapsed() >= batch_size_update_interval {
                let recommended_size = self.inner.adaptive_batcher.recommended_batch_size();
                self.inner
                    .scheduler
                    .queue()
                    .set_flush_threshold(recommended_size);
                trace!(
                    batch_size = recommended_size,
                    "updated IHave batch size from adaptive batcher"
                );
                last_batch_size_update = std::time::Instant::now();
            }

            let should_flush = self.inner.scheduler.queue().should_flush();
            let interval_elapsed = last_flush.elapsed() >= ihave_interval;

            if should_flush || interval_elapsed {
                self.flush_ihave_batch().await;
                last_flush = std::time::Instant::now();
            } else {
                Delay::new(check_interval).await;
            }
        }
    }

    async fn flush_ihave_batch(&self) {
        let batch: SmallVec<[PendingIHave; 16]> = self.inner.scheduler.pop_batch();

        if batch.is_empty() {
            return;
        }

        let message_ids: SmallVec<[MessageId; 8]> = batch.iter().map(|p| p.message_id).collect();
        let round = batch.iter().map(|p| p.round).max().unwrap_or(0);

        let lazy_peers = self
            .inner
            .peers
            .random_lazy_except(&self.inner.local_id, self.inner.config.lazy_fanout);

        let peer_count = lazy_peers.len();
        let batch_size = message_ids.len();

        trace!(
            batch_size,
            peer_count,
            round,
            "flushing IHave batch to lazy peers"
        );

        let mut ihave_dropped = 0;
        for peer in lazy_peers {
            let msg = PlumtreeMessage::IHave {
                message_ids: message_ids.clone(),
                round,
            };
            if self
                .inner
                .outgoing_tx
                .try_send(OutgoingMessage::unicast(peer, msg))
                .is_err()
            {
                ihave_dropped += 1;
            }
        }

        if ihave_dropped > 0 {
            debug!(
                batch_size,
                dropped = ihave_dropped,
                total = peer_count,
                "backpressure: some IHave messages dropped"
            );
        }

        let ihaves_sent = (peer_count - ihave_dropped) * batch_size;
        self.inner.adaptive_batcher.record_ihave_sent(ihaves_sent);
    }

    #[instrument(skip(self), name = "graft_timer")]
    pub async fn run_graft_timer(&self) {
        info!("Graft timer started");
        let check_interval = self.inner.config.graft_timeout / 2;
        let mut interval = Delay::new(check_interval);

        loop {
            if self.inner.shutdown.load(Ordering::Acquire) {
                info!("Graft timer shutting down");
                break;
            }

            (&mut interval).await;
            interval.reset(check_interval);

            let (expired, failed) = self.inner.graft_timer.get_expired_with_failures();

            for failed_graft in &failed {
                warn!(
                    message_id = %failed_graft.message_id,
                    retries = failed_graft.total_retries,
                    peer = ?failed_graft.original_peer,
                    "Graft failed after max retries - penalizing peer"
                );
                self.inner
                    .peer_scoring
                    .record_failure(&failed_graft.original_peer);
                self.inner.adaptive_batcher.record_graft_timeout();
                self.inner
                    .delegate
                    .on_graft_failed(&failed_graft.message_id, &failed_graft.original_peer);
            }

            for expired_graft in expired {
                debug!(
                    message_id = %expired_graft.message_id,
                    attempt = expired_graft.retry_count,
                    is_retry = expired_graft.retry_count > 0,
                    "sending Graft request"
                );

                let _ = self
                    .inner
                    .outgoing_tx
                    .send(OutgoingMessage::unicast(
                        expired_graft.peer.clone(),
                        PlumtreeMessage::Graft {
                            message_id: expired_graft.message_id,
                            round: expired_graft.round,
                        },
                    ))
                    .await;

                self.inner
                    .delegate
                    .on_graft_sent(&expired_graft.peer, &expired_graft.message_id);
            }
        }
    }

    #[instrument(skip(self), name = "seen_cleanup")]
    pub async fn run_seen_cleanup(&self) {
        info!("seen map cleanup started with dynamic tuning");
        let ttl = self.inner.config.message_cache_ttl;
        let tuner = &self.inner.cleanup_tuner;

        let initial_params = tuner.get_parameters(0.0, ttl);
        let mut interval = Delay::new(initial_params.interval);
        let mut rate_window_reset = std::time::Instant::now();

        loop {
            if self.inner.shutdown.load(Ordering::Acquire) {
                info!("seen map cleanup shutting down");
                break;
            }

            (&mut interval).await;

            let utilization = self.seen_map_stats().map(|s| s.utilization).unwrap_or(0.0);

            let params = tuner.get_parameters(utilization, ttl);

            trace!(
                interval_ms = params.interval.as_millis(),
                batch_size = params.batch_size,
                aggressive = params.aggressive,
                utilization = format!("{:.2}", params.utilization),
                message_rate = format!("{:.1}", params.message_rate),
                "cleanup tuner parameters"
            );

            interval.reset(params.interval);

            let cleanup_start = std::time::Instant::now();
            let now = cleanup_start;
            let mut total_removed = 0;
            let shard_count = self.inner.seen.shard_count();

            for shard_idx in 0..shard_count {
                if self.inner.shutdown.load(Ordering::Acquire) {
                    break;
                }

                let removed = self.inner.seen.cleanup_shard(shard_idx, now, ttl).await;
                total_removed += removed;

                Delay::new(std::time::Duration::from_micros(1)).await;
            }

            let cleanup_duration = cleanup_start.elapsed();
            tuner.record_cleanup(cleanup_duration, total_removed, &params);

            if total_removed > 0 {
                debug!(
                    removed = total_removed,
                    shards = shard_count,
                    duration_ms = cleanup_duration.as_millis(),
                    aggressive = params.aggressive,
                    "cleaned up expired entries from seen map"
                );
            }

            if rate_window_reset.elapsed() >= tuner.config().rate_window {
                tuner.reset_rate_window();
                rate_window_reset = std::time::Instant::now();
            }

            #[cfg(feature = "metrics")]
            if let Some(stats) = self.seen_map_stats() {
                crate::metrics::set_seen_map_size(stats.size);
            }
        }
    }

    #[instrument(skip(self), name = "maintenance_loop")]
    pub async fn run_maintenance_loop(&self)
    where
        I: Clone + Eq + std::hash::Hash + std::fmt::Debug,
    {
        use rand::Rng;

        let interval = self.inner.config.maintenance_interval;
        let jitter = self.inner.config.maintenance_jitter;

        if interval.is_zero() {
            info!("maintenance loop disabled (interval=0)");
            return;
        }

        info!(
            interval_ms = interval.as_millis(),
            jitter_ms = jitter.as_millis(),
            "topology maintenance loop started"
        );

        loop {
            if self.inner.shutdown.load(Ordering::Acquire) {
                info!("maintenance loop shutting down");
                break;
            }

            let jitter_duration = if !jitter.is_zero() {
                let jitter_ms = rand::rng().random_range(0..jitter.as_millis() as u64);
                std::time::Duration::from_millis(jitter_ms)
            } else {
                std::time::Duration::ZERO
            };

            Delay::new(interval + jitter_duration).await;

            if self.inner.shutdown.load(Ordering::Acquire) {
                break;
            }

            let target_eager = self.inner.config.eager_fanout;
            if self.inner.peers.needs_repair(target_eager) {
                let stats_before = self.inner.peers.stats();

                let peer_scoring = &self.inner.peer_scoring;
                let scorer = |peer: &I| peer_scoring.normalized_score(peer, 0.5);

                if self
                    .inner
                    .peers
                    .try_rebalance_with_scorer(target_eager, scorer)
                {
                    let stats_after = self.inner.peers.stats();
                    let promoted = stats_after
                        .eager_count
                        .saturating_sub(stats_before.eager_count);

                    if promoted > 0 {
                        info!(
                            promoted = promoted,
                            eager_before = stats_before.eager_count,
                            eager_after = stats_after.eager_count,
                            lazy_count = stats_after.lazy_count,
                            target = target_eager,
                            "topology repair: promoted lazy peers to eager (network-aware)"
                        );

                        #[cfg(feature = "metrics")]
                        for _ in 0..promoted {
                            crate::metrics::record_peer_promotion();
                        }
                    }
                } else {
                    trace!("maintenance: lock contended, will retry");
                }
            }

            #[cfg(feature = "metrics")]
            {
                let stats = self.inner.peers.stats();
                crate::metrics::set_eager_peers(stats.eager_count);
                crate::metrics::set_lazy_peers(stats.lazy_count);
                crate::metrics::set_total_peers(stats.eager_count + stats.lazy_count);
            }
        }
    }

    pub fn shutdown(&self) {
        self.inner.shutdown.store(true, Ordering::Release);
        self.inner.scheduler.shutdown();
        self.inner.outgoing_tx.close();
        self.inner.incoming_tx.close();
    }

    pub fn is_shutdown(&self) -> bool {
        self.inner.shutdown.load(Ordering::Acquire)
    }

    pub fn rebalance_peers(&self) {
        let peer_scoring = &self.inner.peer_scoring;
        self.inner
            .peers
            .rebalance_with_scorer(self.inner.config.eager_fanout, |peer| {
                peer_scoring.normalized_score(peer, 0.5)
            });
    }

    pub fn peers(&self) -> &SharedPeerState<I> {
        &self.inner.peers
    }
}

impl<I, D> Clone for Plumtree<I, D> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
        }
    }
}

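/// Channel endpoints that connect a `Plumtree` instance to the transport layer.
///
/// A transport-pump sketch (illustrative only; `net`, `send_to`, and
/// `broadcast_to_all` are hypothetical transport functions): one task drains
/// outgoing protocol messages onto the wire, the transport feeds decoded
/// frames back in, and a driver task dispatches them to the protocol.
///
/// ```ignore
/// // Outgoing pump: protocol -> network.
/// while let Some(out) = handle.next_outgoing().await {
///     match out.target {
///         Some(peer) => net.send_to(&peer, &out.message).await?,
///         None => net.broadcast_to_all(&out.message).await?,
///     }
/// }
///
/// // Transport receive path: network -> protocol queue.
/// handle.submit_incoming(sender, decoded_message).await?;
///
/// // Driver task: drain the queue into the protocol.
/// while let Some(incoming) = handle.next_incoming().await {
///     plumtree.handle_message(incoming.from, incoming.message).await?;
/// }
/// ```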
pub struct PlumtreeHandle<I> {
    outgoing_rx: Receiver<OutgoingMessage<I>>,
    incoming_rx: Receiver<IncomingMessage<I>>,
    incoming_tx: Sender<IncomingMessage<I>>,
}

impl<I> PlumtreeHandle<I> {
    pub async fn next_outgoing(&self) -> Option<OutgoingMessage<I>> {
        self.outgoing_rx.recv().await.ok()
    }

    pub fn try_next_outgoing(&self) -> Option<OutgoingMessage<I>> {
        self.outgoing_rx.try_recv().ok()
    }

    pub async fn submit_incoming(&self, from: I, message: PlumtreeMessage) -> Result<()> {
        self.incoming_tx
            .send(IncomingMessage { from, message })
            .await
            .map_err(|e| Error::Channel(e.to_string()))
    }

    pub fn outgoing_stream(&self) -> impl futures::Stream<Item = OutgoingMessage<I>> + '_ {
        self.outgoing_rx.clone()
    }

    pub fn incoming_stream(&self) -> impl futures::Stream<Item = IncomingMessage<I>> + '_ {
        self.incoming_rx.clone()
    }

    pub async fn next_incoming(&self) -> Option<IncomingMessage<I>> {
        self.incoming_rx.recv().await.ok()
    }

    pub fn is_closed(&self) -> bool {
        self.outgoing_rx.is_closed()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
    struct TestNodeId(u64);

    struct TestDelegate {
        delivered: parking_lot::Mutex<Vec<(MessageId, Bytes)>>,
    }

    impl TestDelegate {
        fn new() -> Self {
            Self {
                delivered: parking_lot::Mutex::new(Vec::new()),
            }
        }

        fn delivered_count(&self) -> usize {
            self.delivered.lock().len()
        }
    }

    impl PlumtreeDelegate<TestNodeId> for TestDelegate {
        fn on_deliver(&self, message_id: MessageId, payload: Bytes) {
            self.delivered.lock().push((message_id, payload));
        }
    }

    #[tokio::test]
    async fn test_broadcast() {
        let delegate = Arc::new(TestDelegate::new());
        let (plumtree, _handle) =
            Plumtree::new(TestNodeId(1), PlumtreeConfig::default(), delegate.clone());

        plumtree.add_peer(TestNodeId(2));
        plumtree.add_peer(TestNodeId(3));
        plumtree.add_peer(TestNodeId(4));

        let msg_id = plumtree
            .broadcast(Bytes::from_static(b"hello"))
            .await
            .unwrap();

        assert!(plumtree.inner.cache.contains(&msg_id));
    }

    #[tokio::test]
    async fn test_handle_gossip() {
        let delegate = Arc::new(TestDelegate::new());
        let (plumtree, _handle) =
            Plumtree::new(TestNodeId(1), PlumtreeConfig::default(), delegate.clone());

        plumtree.add_peer(TestNodeId(2));

        let msg_id = MessageId::new();
        let payload = Bytes::from_static(b"test message");

        plumtree
            .handle_message(
                TestNodeId(2),
                PlumtreeMessage::Gossip {
                    id: msg_id,
                    round: 0,
                    payload: payload.clone(),
                },
            )
            .await
            .unwrap();

        assert_eq!(delegate.delivered_count(), 1);
    }

    #[tokio::test]
    async fn test_duplicate_detection() {
        let delegate = Arc::new(TestDelegate::new());
        let (plumtree, _handle) =
            Plumtree::new(TestNodeId(1), PlumtreeConfig::default(), delegate.clone());

        plumtree.add_peer(TestNodeId(2));

        let msg_id = MessageId::new();
        let payload = Bytes::from_static(b"test message");

        for _ in 0..2 {
            plumtree
                .handle_message(
                    TestNodeId(2),
                    PlumtreeMessage::Gossip {
                        id: msg_id,
                        round: 0,
                        payload: payload.clone(),
                    },
                )
                .await
                .unwrap();
        }

        assert_eq!(delegate.delivered_count(), 1);
    }

    #[tokio::test]
    async fn test_peer_promotion() {
        let delegate = Arc::new(TestDelegate::new());
        let (plumtree, _handle) =
            Plumtree::new(TestNodeId(1), PlumtreeConfig::default(), delegate.clone());

        plumtree.add_peer_lazy(TestNodeId(2));

        assert!(plumtree.inner.peers.is_lazy(&TestNodeId(2)));

        let msg_id = MessageId::new();
        plumtree
            .handle_message(
                TestNodeId(2),
                PlumtreeMessage::IHave {
                    message_ids: smallvec::smallvec![msg_id],
                    round: 0,
                },
            )
            .await
            .unwrap();

        assert!(plumtree.inner.peers.is_eager(&TestNodeId(2)));
    }

    #[tokio::test]
    async fn test_message_too_large() {
        let delegate = Arc::new(TestDelegate::new());
        let config = PlumtreeConfig::default().with_max_message_size(10);
        let (plumtree, _handle) = Plumtree::new(TestNodeId(1), config, delegate);

        let result = plumtree
            .broadcast(Bytes::from_static(b"this is too large"))
            .await;

        assert!(matches!(result, Err(Error::MessageTooLarge { .. })));
    }
}