1#![warn(missing_docs)]
2
3use anyhow::{anyhow, Result};
21use bytes::Bytes;
22use lru::LruCache;
23use saorsa_gossip_transport::{GossipStreamType, GossipTransport};
24use saorsa_gossip_types::{MessageHeader, MessageKind, PeerId, TopicId};
25use serde::{Deserialize, Serialize};
26use std::collections::{HashMap, HashSet};
27use std::num::NonZeroUsize;
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30use tokio::sync::{mpsc, RwLock};
31use tokio::time;
32use tracing::{debug, error, trace, warn};
33
/// Maximum number of messages retained per topic in the LRU message cache.
const MAX_CACHE_SIZE: usize = 10_000;

/// Time-to-live for cached messages before the periodic cleaner evicts them.
const CACHE_TTL_SECS: u64 = 300;

/// Maximum number of payload hashes tracked for replay detection.
const REPLAY_CACHE_MAX_ENTRIES: usize = 10_000;

/// Time-to-live for replay-cache entries.
const REPLAY_CACHE_TTL_SECS: u64 = 300;

/// Maximum number of message IDs carried in a single IHAVE batch.
const MAX_IHAVE_BATCH_SIZE: usize = 1024;

/// Interval between IHAVE batch flushes to lazy peers.
const IHAVE_FLUSH_INTERVAL_MS: u64 = 100;

/// Interval between periodic anti-entropy digest exchanges.
const ANTI_ENTROPY_INTERVAL_SECS: u64 = 30;

/// Lower bound on the number of eager-push peers per topic.
const MIN_EAGER_DEGREE: usize = 6;
/// Upper bound on the number of eager-push peers per topic.
const MAX_EAGER_DEGREE: usize = 12;

/// 32-byte message identifier (BLAKE3-derived).
type MessageIdType = [u8; 32];

/// LRU capacity for the per-topic message cache.
///
/// Uses a const `match` instead of `new_unchecked` so the non-zero
/// invariant is proven at compile time without any `unsafe`.
const fn message_cache_capacity() -> NonZeroUsize {
    match NonZeroUsize::new(MAX_CACHE_SIZE) {
        Some(n) => n,
        None => panic!("MAX_CACHE_SIZE must be non-zero"),
    }
}

/// LRU capacity for the per-topic replay cache (checked at compile time,
/// no `unsafe` needed).
const fn replay_cache_capacity() -> NonZeroUsize {
    match NonZeroUsize::new(REPLAY_CACHE_MAX_ENTRIES) {
        Some(n) => n,
        None => panic!("REPLAY_CACHE_MAX_ENTRIES must be non-zero"),
    }
}
71
/// A signed gossip message exchanged on the pub/sub overlay.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GossipMessage {
    /// Message header (topic, message ID, kind, hop count, TTL).
    pub header: MessageHeader,
    /// Message payload; `None` for control messages that carry no body.
    pub payload: Option<Bytes>,
    /// ML-DSA signature over the postcard-serialized header.
    pub signature: Vec<u8>,
    /// ML-DSA public key bytes used to verify `signature`.
    pub public_key: Vec<u8>,
}
84
/// Payload carried by `MessageKind::AntiEntropy` messages.
#[derive(Clone, Debug, Serialize, Deserialize)]
enum AntiEntropyPayload {
    /// Digest advertising every message ID currently cached by the sender.
    Digest {
        /// Cached message IDs.
        msg_ids: Vec<MessageIdType>,
    },
    /// Reply listing IDs the sender believes the receiver is missing.
    Response {
        /// Message IDs the receiver should fetch via IWANT.
        missing_ids: Vec<MessageIdType>,
    },
}
102
/// Per-peer quality score used when choosing eager vs lazy peers.
///
/// Tracks delivery and IWANT request/response counters plus the last time
/// the peer was heard from; [`PeerScore::score`] folds these into `[0, 1]`.
struct PeerScore {
    /// Number of EAGER messages this peer delivered to us.
    messages_delivered: u64,
    /// Number of IWANT requests we have issued to this peer.
    iwant_requests: u64,
    /// Number of those IWANT requests the peer answered.
    iwant_responses: u64,
    /// Last time we received anything from this peer.
    last_seen: Instant,
}

impl PeerScore {
    /// Fresh score: zeroed counters, `last_seen` = now.
    fn new() -> Self {
        Self {
            messages_delivered: 0,
            iwant_requests: 0,
            iwant_responses: 0,
            last_seen: Instant::now(),
        }
    }

    /// Combined score in `[0, 1]`: 60% IWANT response rate, 40% recency.
    ///
    /// Peers we never queried get 0.8 if they delivered anything, else a
    /// neutral 0.5. Recency decays linearly to zero over 5 minutes.
    fn score(&self) -> f64 {
        // `else if` instead of nested `else { if … }` (clippy:
        // collapsible_else_if); behavior unchanged.
        let response_rate = if self.iwant_requests > 0 {
            self.iwant_responses as f64 / self.iwant_requests as f64
        } else if self.messages_delivered > 0 {
            0.8
        } else {
            0.5
        };

        let secs_since_seen = Instant::now()
            .saturating_duration_since(self.last_seen)
            .as_secs_f64();
        let recency = (1.0 - (secs_since_seen / 300.0)).max(0.0);

        (response_rate.min(1.0) * 0.6) + (recency * 0.4)
    }

    /// Records an EAGER delivery and refreshes `last_seen`.
    fn record_delivery(&mut self) {
        self.messages_delivered += 1;
        self.last_seen = Instant::now();
    }

    /// Records that we sent this peer an IWANT request.
    fn record_iwant_request(&mut self) {
        self.iwant_requests += 1;
    }

    /// Records a fulfilled IWANT and refreshes `last_seen`.
    fn record_iwant_response(&mut self) {
        self.iwant_responses += 1;
        self.last_seen = Instant::now();
    }
}
170
/// A message retained in the per-topic cache, served back for IWANT and
/// anti-entropy requests.
#[derive(Clone)]
struct CachedMessage {
    /// Payload bytes as originally published/received.
    payload: Bytes,
    /// Insertion time, used for TTL-based eviction.
    timestamp: Instant,
    /// Original header, re-used when re-sending the message.
    header: MessageHeader,
}
181
/// Per-topic Plumtree state: peer sets, caches, scores and subscribers.
struct TopicState {
    /// Peers that receive full messages immediately (spanning-tree edges).
    eager_peers: HashSet<PeerId>,
    /// Peers that only receive IHAVE digests (lazy backup links).
    lazy_peers: HashSet<PeerId>,
    /// Recently seen messages, for dedup and for serving IWANTs.
    message_cache: LruCache<MessageIdType, CachedMessage>,
    /// Message IDs waiting for the next batched IHAVE flush.
    pending_ihave: Vec<MessageIdType>,
    /// IWANTs we have issued: message ID -> (peer asked, time asked).
    outstanding_iwants: HashMap<MessageIdType, (PeerId, Instant)>,
    /// Quality scores for peers seen on this topic.
    peer_scores: HashMap<PeerId, PeerScore>,
    /// Local subscriber channels; closed channels are pruned on send.
    subscribers: Vec<mpsc::UnboundedSender<(PeerId, Bytes)>>,
    /// BLAKE3(payload) -> first-seen time, for payload replay detection.
    replay_cache: LruCache<[u8; 32], Instant>,
    /// Time-to-live for replay-cache entries.
    replay_ttl: Duration,
}
206
207impl TopicState {
208 fn new() -> Self {
209 Self {
210 eager_peers: HashSet::new(),
211 lazy_peers: HashSet::new(),
212 message_cache: LruCache::new(message_cache_capacity()),
213 pending_ihave: Vec::new(),
214 outstanding_iwants: HashMap::new(),
215 peer_scores: HashMap::new(),
216 subscribers: Vec::new(),
217 replay_cache: LruCache::new(replay_cache_capacity()),
218 replay_ttl: Duration::from_secs(REPLAY_CACHE_TTL_SECS),
219 }
220 }
221
222 fn is_payload_replay(&mut self, payload: &[u8]) -> bool {
228 let key: [u8; 32] = *blake3::hash(payload).as_bytes();
229 if let Some(ts) = self.replay_cache.get(&key) {
230 if ts.elapsed() < self.replay_ttl {
231 return true;
232 }
233 }
234 self.replay_cache.put(key, Instant::now());
235 false
236 }
237
238 fn cached_message_ids(&self) -> Vec<MessageIdType> {
240 self.message_cache.iter().map(|(id, _)| *id).collect()
241 }
242
243 fn has_message(&self, msg_id: &MessageIdType) -> bool {
245 self.message_cache.contains(msg_id)
246 }
247
248 fn cache_message(&mut self, msg_id: MessageIdType, payload: Bytes, header: MessageHeader) {
250 let cached = CachedMessage {
251 payload,
252 timestamp: Instant::now(),
253 header,
254 };
255 self.message_cache.put(msg_id, cached);
256 }
257
258 fn get_message(&mut self, msg_id: &MessageIdType) -> Option<CachedMessage> {
260 self.message_cache.get(msg_id).cloned()
261 }
262
263 fn clean_cache(&mut self) {
265 let now = Instant::now();
266 let ttl = Duration::from_secs(CACHE_TTL_SECS);
267
268 let mut expired = Vec::new();
270 for (msg_id, cached) in self.message_cache.iter() {
271 if now.duration_since(cached.timestamp) > ttl {
272 expired.push(*msg_id);
273 }
274 }
275
276 for msg_id in expired {
278 self.message_cache.pop(&msg_id);
279 }
280
281 let replay_ttl = self.replay_ttl;
283 let mut expired_replay = Vec::new();
284 for (hash, ts) in self.replay_cache.iter() {
285 if now.saturating_duration_since(*ts) > replay_ttl {
286 expired_replay.push(*hash);
287 }
288 }
289 for hash in expired_replay {
290 self.replay_cache.pop(&hash);
291 }
292
293 let score_expiry = Duration::from_secs(600);
296 let now = Instant::now();
297 self.peer_scores
298 .retain(|_, score| now.saturating_duration_since(score.last_seen) < score_expiry);
299 }
300
301 fn prune_peer(&mut self, peer: PeerId) {
303 if self.eager_peers.remove(&peer) {
304 self.lazy_peers.insert(peer);
305 debug!(peer_id = %peer, "PRUNE: moved peer from eager to lazy");
306 }
307 }
308
309 fn graft_peer(&mut self, peer: PeerId) {
311 if self.lazy_peers.remove(&peer) {
312 self.eager_peers.insert(peer);
313 debug!(peer_id = %peer, "GRAFT: moved peer from lazy to eager");
314 }
315 }
316
317 fn maintain_degree(&mut self) {
322 let eager_count = self.eager_peers.len();
323
324 if eager_count < MIN_EAGER_DEGREE && !self.lazy_peers.is_empty() {
325 let to_promote = MIN_EAGER_DEGREE - eager_count;
327 let mut scored_lazy: Vec<(PeerId, f64)> = self
328 .lazy_peers
329 .iter()
330 .map(|&p| {
331 let score = self.peer_scores.get(&p).map_or(0.5, |s| s.score());
332 (p, score)
333 })
334 .collect();
335 scored_lazy.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
336 let peers: Vec<PeerId> = scored_lazy
337 .iter()
338 .take(to_promote)
339 .map(|(p, _)| *p)
340 .collect();
341 for peer in peers {
342 self.graft_peer(peer);
343 }
344 } else if eager_count > MAX_EAGER_DEGREE {
345 let to_demote = eager_count - MAX_EAGER_DEGREE;
347 let mut scored_eager: Vec<(PeerId, f64)> = self
348 .eager_peers
349 .iter()
350 .map(|&p| {
351 let score = self.peer_scores.get(&p).map_or(0.5, |s| s.score());
352 (p, score)
353 })
354 .collect();
355 scored_eager.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
356 let peers: Vec<PeerId> = scored_eager
357 .iter()
358 .take(to_demote)
359 .map(|(p, _)| *p)
360 .collect();
361 for peer in peers {
362 self.prune_peer(peer);
363 }
364 }
365 }
366}
367
#[async_trait::async_trait]
/// Publish/subscribe interface implemented by the Plumtree broadcaster.
pub trait PubSub: Send + Sync {
    /// Publishes `data` to `topic`.
    async fn publish(&self, topic: TopicId, data: Bytes) -> Result<()>;

    /// Subscribes to `topic`; the receiver yields `(sender, payload)` pairs.
    fn subscribe(&self, topic: TopicId) -> mpsc::UnboundedReceiver<(PeerId, Bytes)>;

    /// Unsubscribes from `topic`.
    async fn unsubscribe(&self, topic: TopicId) -> Result<()>;

    /// Seeds the peer set for `topic`.
    async fn initialize_topic_peers(&self, topic: TopicId, peers: Vec<PeerId>);

    /// Updates the peer set for `topic` to the currently connected peers.
    ///
    /// Default implementation delegates to
    /// [`PubSub::initialize_topic_peers`].
    async fn set_topic_peers(&self, topic: TopicId, connected: Vec<PeerId>) {
        self.initialize_topic_peers(topic, connected).await;
    }

    /// Handles a raw inbound pub/sub message received from `from`.
    async fn handle_message(&self, from: PeerId, data: Bytes) -> Result<()>;

    /// Triggers one anti-entropy round for `topic`.
    ///
    /// Default implementation is a no-op.
    async fn trigger_anti_entropy(&self, _topic: TopicId) -> Result<()> {
        Ok(())
    }
}
411
/// Plumtree-based epidemic broadcast pub/sub over a gossip transport.
pub struct PlumtreePubSub<T: GossipTransport + 'static> {
    /// Per-topic state (peers, caches, subscribers), behind an async lock.
    topics: Arc<RwLock<HashMap<TopicId, TopicState>>>,
    /// Our own peer identity (used as the sender for local deliveries).
    peer_id: PeerId,
    /// Reference point for epoch computation (set to the UNIX epoch).
    epoch_start: std::time::SystemTime,
    /// Transport used to reach remote peers.
    transport: Arc<T>,
    /// ML-DSA key pair used to sign outgoing message headers.
    signing_key: Arc<saorsa_gossip_identity::MlDsaKeyPair>,
}
425
426impl<T: GossipTransport + 'static> PlumtreePubSub<T> {
427 pub fn new(
434 peer_id: PeerId,
435 transport: Arc<T>,
436 signing_key: saorsa_gossip_identity::MlDsaKeyPair,
437 ) -> Self {
438 let pubsub = Self {
439 topics: Arc::new(RwLock::new(HashMap::new())),
440 peer_id,
441 epoch_start: std::time::SystemTime::UNIX_EPOCH,
442 transport,
443 signing_key: Arc::new(signing_key),
444 };
445
446 pubsub.spawn_ihave_flusher();
448 pubsub.spawn_cache_cleaner();
449 pubsub.spawn_degree_maintainer();
450 pubsub.spawn_anti_entropy_task();
451
452 pubsub
453 }
454
455 fn current_epoch(&self) -> u64 {
457 std::time::SystemTime::now()
458 .duration_since(self.epoch_start)
459 .map(|d| d.as_secs())
460 .unwrap_or(0)
461 }
462
463 fn calculate_msg_id(&self, topic: &TopicId, payload: &Bytes) -> MessageIdType {
465 let epoch = self.current_epoch();
466 let payload_hash = blake3::hash(payload.as_ref());
467 MessageHeader::calculate_msg_id(topic, epoch, &self.peer_id, payload_hash.as_bytes())
468 }
469
470 fn sign_message(&self, header: &MessageHeader) -> Vec<u8> {
475 let header_bytes = match postcard::to_stdvec(header) {
477 Ok(bytes) => bytes,
478 Err(e) => {
479 error!("Failed to serialize header for signing: {}", e);
480 return Vec::new();
481 }
482 };
483
484 match self.signing_key.sign(&header_bytes) {
486 Ok(signature) => signature,
487 Err(e) => {
488 error!("Failed to sign message: {}", e);
489 Vec::new()
490 }
491 }
492 }
493
494 fn verify_signature(
504 &self,
505 header: &MessageHeader,
506 signature: &[u8],
507 public_key: &[u8],
508 ) -> bool {
509 let header_bytes = match postcard::to_stdvec(header) {
511 Ok(bytes) => bytes,
512 Err(e) => {
513 warn!("Failed to serialize header for verification: {}", e);
514 return false;
515 }
516 };
517
518 match saorsa_gossip_identity::MlDsaKeyPair::verify(public_key, &header_bytes, signature) {
520 Ok(valid) => valid,
521 Err(e) => {
522 warn!("Failed to verify signature: {}", e);
523 false
524 }
525 }
526 }
527
528 pub async fn publish_local(&self, topic: TopicId, payload: Bytes) -> Result<()> {
530 let msg_id = self.calculate_msg_id(&topic, &payload);
531
532 let header = MessageHeader {
533 version: 1,
534 topic,
535 msg_id,
536 kind: MessageKind::Eager,
537 hop: 0,
538 ttl: 10,
539 };
540
541 let signature = self.sign_message(&header);
542
543 let _message = GossipMessage {
544 header: header.clone(),
545 payload: Some(payload.clone()),
546 signature,
547 public_key: self.signing_key.public_key().to_vec(),
548 };
549
550 let mut topics = self.topics.write().await;
551 let state = topics.entry(topic).or_insert_with(TopicState::new);
552
553 state.cache_message(msg_id, payload.clone(), header);
555
556 let _ = state.is_payload_replay(&payload);
559
560 let eager_peers: Vec<PeerId> = state.eager_peers.iter().copied().collect();
562 drop(topics); for peer in eager_peers {
565 let transport = self.transport.clone();
566 let message = _message.clone();
567 tokio::spawn(async move {
568 trace!(peer_id = %peer, msg_id = ?msg_id, "Sending EAGER");
569 let bytes = match postcard::to_stdvec(&message) {
570 Ok(bytes) => bytes,
571 Err(e) => {
572 warn!(peer_id = %peer, msg_id = ?msg_id, "EAGER serialize failed: {e}");
573 return;
574 }
575 };
576 match transport
577 .send_to_peer(peer, GossipStreamType::PubSub, bytes.into())
578 .await
579 {
580 Ok(()) => {}
581 Err(err) => {
582 warn!(peer_id = %peer, msg_id = ?msg_id, "EAGER send failed: {err}");
583 }
584 }
585 });
586 }
587
588 let mut topics = self.topics.write().await;
590 if let Some(state) = topics.get_mut(&topic) {
591 state.pending_ihave.push(msg_id);
592
593 let data = (self.peer_id, payload);
595 state.subscribers.retain(|tx| tx.send(data.clone()).is_ok());
596 }
597
598 Ok(())
599 }
600
601 pub async fn handle_eager(
603 &self,
604 from: PeerId,
605 topic: TopicId,
606 message: GossipMessage,
607 ) -> Result<()> {
608 let msg_id = message.header.msg_id;
609
610 if !self.verify_signature(&message.header, &message.signature, &message.public_key) {
612 warn!(peer_id = %from, msg_id = ?msg_id, "Invalid signature, dropping");
613 return Err(anyhow!("Invalid signature"));
614 }
615
616 let mut topics = self.topics.write().await;
617 let state = topics.entry(topic).or_insert_with(TopicState::new);
618
619 if state.has_message(&msg_id) {
621 state.prune_peer(from);
623 return Ok(());
624 }
625
626 let payload = message
628 .payload
629 .clone()
630 .ok_or_else(|| anyhow!("EAGER missing payload"))?;
631 state.cache_message(msg_id, payload.clone(), message.header.clone());
632
633 state
635 .peer_scores
636 .entry(from)
637 .or_insert_with(PeerScore::new)
638 .record_delivery();
639
640 if state.outstanding_iwants.remove(&msg_id).is_some() {
642 state
643 .peer_scores
644 .entry(from)
645 .or_insert_with(PeerScore::new)
646 .record_iwant_response();
647 }
648
649 if state.is_payload_replay(&payload) {
654 debug!(
655 topic = ?topic,
656 msg_id = ?msg_id,
657 "Payload replay detected — msg_id new but payload hash seen before"
658 );
659 return Ok(());
660 }
661
662 if !state.eager_peers.contains(&from) && !state.lazy_peers.contains(&from) {
666 state.eager_peers.insert(from);
667 debug!(peer_id = %from, topic = ?topic, "Added sender to eager_peers");
668 }
669
670 let sub_count = state.subscribers.len();
672 let data = (from, payload.clone());
673 state.subscribers.retain(|tx| tx.send(data.clone()).is_ok());
674 let delivered = state.subscribers.len();
675 debug!(
676 topic = ?topic,
677 subscribers = sub_count,
678 delivered = delivered,
679 "plumtree handle_eager: delivered to local subscribers"
680 );
681
682 let eager_peers: Vec<PeerId> = state
684 .eager_peers
685 .iter()
686 .filter(|&&p| p != from)
687 .copied()
688 .collect();
689
690 state.pending_ihave.push(msg_id);
692
693 drop(topics); let bytes: Bytes = postcard::to_stdvec(&message)
697 .map_err(|e| anyhow!("EAGER forward serialize failed: {e}"))?
698 .into();
699
700 for peer in eager_peers {
702 trace!(peer_id = %peer, msg_id = ?msg_id, "Forwarding EAGER");
703 if let Err(e) = self
704 .transport
705 .send_to_peer(peer, GossipStreamType::PubSub, bytes.clone())
706 .await
707 {
708 warn!(peer_id = %peer, msg_id = ?msg_id, "EAGER forward failed: {e}");
709 }
710 }
711
712 Ok(())
713 }
714
715 pub async fn handle_ihave(
717 &self,
718 from: PeerId,
719 topic: TopicId,
720 msg_ids: Vec<MessageIdType>,
721 ) -> Result<()> {
722 let mut topics = self.topics.write().await;
723 let state = topics.entry(topic).or_insert_with(TopicState::new);
724
725 let mut requested = Vec::new();
726
727 for msg_id in msg_ids {
728 if state.has_message(&msg_id) {
730 continue;
731 }
732
733 if state.outstanding_iwants.contains_key(&msg_id) {
735 continue;
736 }
737
738 requested.push(msg_id);
740 state
741 .outstanding_iwants
742 .insert(msg_id, (from, Instant::now()));
743
744 state
746 .peer_scores
747 .entry(from)
748 .or_insert_with(PeerScore::new)
749 .record_iwant_request();
750 }
751
752 drop(topics); if !requested.is_empty() {
755 debug!(peer_id = %from, count = requested.len(), "Sending IWANT");
756 let iwant_header = MessageHeader {
758 version: 1,
759 topic,
760 msg_id: requested[0], kind: MessageKind::IWant,
762 hop: 0,
763 ttl: 10,
764 };
765 let iwant_header_clone = iwant_header.clone();
766 let iwant_msg = GossipMessage {
767 header: iwant_header,
768 payload: Some(
769 postcard::to_stdvec(&requested)
770 .map_err(|e| anyhow!("Serialization failed: {}", e))?
771 .into(),
772 ),
773 signature: self.sign_message(&iwant_header_clone),
774 public_key: self.signing_key.public_key().to_vec(),
775 };
776 let bytes = postcard::to_stdvec(&iwant_msg)
777 .map_err(|e| anyhow!("Serialization failed: {}", e))?;
778 self.transport
779 .send_to_peer(from, GossipStreamType::PubSub, bytes.into())
780 .await?;
781 }
782
783 Ok(())
784 }
785
786 pub async fn handle_iwant(
788 &self,
789 from: PeerId,
790 topic: TopicId,
791 msg_ids: Vec<MessageIdType>,
792 ) -> Result<()> {
793 let mut topics = self.topics.write().await;
794 let state = topics.entry(topic).or_insert_with(TopicState::new);
795
796 let mut to_send = Vec::new();
797
798 for msg_id in msg_ids {
799 if let Some(cached) = state.get_message(&msg_id) {
800 to_send.push((msg_id, cached));
801 state.graft_peer(from);
803 } else {
804 warn!(msg_id = ?msg_id, "IWANT for unknown message");
805 }
806 }
807
808 drop(topics); for (msg_id, cached) in to_send {
812 debug!(peer_id = %from, msg_id = ?msg_id, "Sending EAGER in response to IWANT");
813
814 let _message = GossipMessage {
815 header: cached.header.clone(),
816 payload: Some(cached.payload.clone()),
817 signature: self.sign_message(&cached.header),
818 public_key: self.signing_key.public_key().to_vec(),
819 };
820
821 let bytes = postcard::to_stdvec(&_message)
822 .map_err(|e| anyhow!("Serialization failed: {}", e))?;
823 self.transport
824 .send_to_peer(from, GossipStreamType::PubSub, bytes.into())
825 .await?;
826 }
827
828 Ok(())
829 }
830
831 async fn handle_anti_entropy(
836 &self,
837 from: PeerId,
838 topic: TopicId,
839 message: GossipMessage,
840 ) -> Result<()> {
841 if !self.verify_signature(&message.header, &message.signature, &message.public_key) {
843 warn!(peer_id = %from, "Anti-entropy: invalid signature, dropping");
844 return Err(anyhow!("Invalid signature on anti-entropy message"));
845 }
846
847 let payload_bytes = message
848 .payload
849 .ok_or_else(|| anyhow!("Anti-entropy message missing payload"))?;
850
851 let ae_payload: AntiEntropyPayload = postcard::from_bytes(&payload_bytes)
852 .map_err(|e| anyhow!("Failed to deserialize anti-entropy payload: {}", e))?;
853
854 match ae_payload {
855 AntiEntropyPayload::Digest { msg_ids } => {
856 debug!(
857 peer_id = %from,
858 topic = ?topic,
859 their_count = msg_ids.len(),
860 "Received anti-entropy digest"
861 );
862
863 let their_ids: HashSet<MessageIdType> = msg_ids.into_iter().collect();
864
865 let mut topics = self.topics.write().await;
866 let state = topics.entry(topic).or_insert_with(TopicState::new);
867
868 let our_ids: HashSet<MessageIdType> =
869 state.cached_message_ids().into_iter().collect();
870
871 let mut messages_to_send = Vec::new();
873 for id in our_ids.difference(&their_ids) {
874 if let Some(cached) = state.get_message(id) {
875 messages_to_send.push(cached);
876 }
877 }
878
879 let ids_we_need: Vec<MessageIdType> =
881 their_ids.difference(&our_ids).copied().collect();
882
883 drop(topics);
884
885 for cached in &messages_to_send {
887 let eager_msg = GossipMessage {
888 header: cached.header.clone(),
889 payload: Some(cached.payload.clone()),
890 signature: self.sign_message(&cached.header),
891 public_key: self.signing_key.public_key().to_vec(),
892 };
893 if let Ok(bytes) = postcard::to_stdvec(&eager_msg) {
894 let _ = self
895 .transport
896 .send_to_peer(from, GossipStreamType::PubSub, bytes.into())
897 .await;
898 }
899 }
900
901 if !ids_we_need.is_empty() {
903 debug!(
904 peer_id = %from,
905 count = ids_we_need.len(),
906 "Anti-entropy: requesting missing messages via IWANT"
907 );
908 let iwant_header = MessageHeader {
909 version: 1,
910 topic,
911 msg_id: ids_we_need[0],
912 kind: MessageKind::IWant,
913 hop: 0,
914 ttl: 10,
915 };
916 let iwant_header_clone = iwant_header.clone();
917 let iwant_msg = GossipMessage {
918 header: iwant_header,
919 payload: Some(
920 postcard::to_stdvec(&ids_we_need)
921 .map_err(|e| anyhow!("Serialization failed: {}", e))?
922 .into(),
923 ),
924 signature: self.sign_message(&iwant_header_clone),
925 public_key: self.signing_key.public_key().to_vec(),
926 };
927 if let Ok(bytes) = postcard::to_stdvec(&iwant_msg) {
928 let _ = self
929 .transport
930 .send_to_peer(from, GossipStreamType::PubSub, bytes.into())
931 .await;
932 }
933 }
934
935 debug!(
936 peer_id = %from,
937 sent = messages_to_send.len(),
938 requested = ids_we_need.len(),
939 "Anti-entropy digest processed"
940 );
941 }
942 AntiEntropyPayload::Response { missing_ids } => {
943 debug!(
944 peer_id = %from,
945 topic = ?topic,
946 count = missing_ids.len(),
947 "Received anti-entropy response"
948 );
949
950 let topics = self.topics.read().await;
952 let ids_to_request: Vec<MessageIdType> = if let Some(state) = topics.get(&topic) {
953 missing_ids
954 .into_iter()
955 .filter(|id| !state.has_message(id))
956 .collect()
957 } else {
958 missing_ids
959 };
960 drop(topics);
961
962 if !ids_to_request.is_empty() {
964 debug!(
965 peer_id = %from,
966 count = ids_to_request.len(),
967 "Anti-entropy response: sending IWANT for missing IDs"
968 );
969 let iwant_header = MessageHeader {
970 version: 1,
971 topic,
972 msg_id: ids_to_request[0],
973 kind: MessageKind::IWant,
974 hop: 0,
975 ttl: 10,
976 };
977 let iwant_header_clone = iwant_header.clone();
978 let iwant_msg = GossipMessage {
979 header: iwant_header,
980 payload: Some(
981 postcard::to_stdvec(&ids_to_request)
982 .map_err(|e| anyhow!("Serialization failed: {}", e))?
983 .into(),
984 ),
985 signature: self.sign_message(&iwant_header_clone),
986 public_key: self.signing_key.public_key().to_vec(),
987 };
988 if let Ok(bytes) = postcard::to_stdvec(&iwant_msg) {
989 let _ = self
990 .transport
991 .send_to_peer(from, GossipStreamType::PubSub, bytes.into())
992 .await;
993 }
994 }
995 }
996 }
997
998 Ok(())
999 }
1000
1001 async fn send_anti_entropy_digest(&self, topic: TopicId, peer: PeerId) -> Result<()> {
1005 let topics = self.topics.read().await;
1006 let msg_ids = if let Some(state) = topics.get(&topic) {
1007 state.cached_message_ids()
1008 } else {
1009 return Ok(());
1010 };
1011 drop(topics);
1012
1013 if msg_ids.is_empty() {
1014 return Ok(());
1015 }
1016
1017 let ae_payload = AntiEntropyPayload::Digest { msg_ids };
1018 let payload_bytes = postcard::to_stdvec(&ae_payload)
1019 .map_err(|e| anyhow!("Failed to serialize anti-entropy payload: {}", e))?;
1020
1021 let header = MessageHeader {
1022 version: 1,
1023 topic,
1024 msg_id: [0u8; 32],
1025 kind: MessageKind::AntiEntropy,
1026 hop: 0,
1027 ttl: 1,
1028 };
1029
1030 let signature = self.sign_message(&header);
1031
1032 let message = GossipMessage {
1033 header,
1034 payload: Some(payload_bytes.into()),
1035 signature,
1036 public_key: self.signing_key.public_key().to_vec(),
1037 };
1038
1039 let bytes =
1040 postcard::to_stdvec(&message).map_err(|e| anyhow!("Serialization failed: {}", e))?;
1041 self.transport
1042 .send_to_peer(peer, GossipStreamType::PubSub, bytes.into())
1043 .await?;
1044
1045 debug!(
1046 peer_id = %peer,
1047 topic = ?topic,
1048 "Sent anti-entropy digest"
1049 );
1050
1051 Ok(())
1052 }
1053
1054 fn spawn_ihave_flusher(&self) {
1056 let topics = self.topics.clone();
1057 let transport = self.transport.clone();
1058 let signing_key = self.signing_key.clone();
1059
1060 tokio::spawn(async move {
1061 let mut interval = time::interval(Duration::from_millis(IHAVE_FLUSH_INTERVAL_MS));
1062
1063 loop {
1064 interval.tick().await;
1065
1066 let mut topics_guard = topics.write().await;
1067
1068 for (topic_id, state) in topics_guard.iter_mut() {
1069 if state.pending_ihave.is_empty() {
1070 continue;
1071 }
1072
1073 let batch: Vec<MessageIdType> = state
1075 .pending_ihave
1076 .drain(..state.pending_ihave.len().min(MAX_IHAVE_BATCH_SIZE))
1077 .collect();
1078
1079 let lazy_peers: Vec<PeerId> = state.lazy_peers.iter().copied().collect();
1080
1081 trace!(topic = ?topic_id, batch_size = batch.len(), peer_count = lazy_peers.len(), "Flushing IHAVE batch");
1082
1083 for peer in lazy_peers {
1085 let ihave_header = MessageHeader {
1086 version: 1,
1087 topic: *topic_id,
1088 msg_id: batch[0], kind: MessageKind::IHave,
1090 hop: 0,
1091 ttl: 10,
1092 };
1093 let ihave_header_clone = ihave_header.clone();
1094
1095 let signature = match postcard::to_stdvec(&ihave_header_clone) {
1097 Ok(bytes) => signing_key.sign(&bytes).unwrap_or_default(),
1098 Err(_) => Vec::new(),
1099 };
1100
1101 let ihave_msg = GossipMessage {
1102 header: ihave_header,
1103 payload: Some(postcard::to_stdvec(&batch).unwrap_or_default().into()),
1104 signature,
1105 public_key: signing_key.public_key().to_vec(),
1106 };
1107 if let Ok(bytes) = postcard::to_stdvec(&ihave_msg) {
1108 let _ = transport
1109 .send_to_peer(peer, GossipStreamType::PubSub, bytes.into())
1110 .await;
1111 }
1112 }
1113 }
1114 }
1115 });
1116 }
1117
1118 fn spawn_cache_cleaner(&self) {
1120 let topics = self.topics.clone();
1121
1122 tokio::spawn(async move {
1123 let mut interval = time::interval(Duration::from_secs(60));
1124
1125 loop {
1126 interval.tick().await;
1127
1128 let mut topics_guard = topics.write().await;
1129
1130 for state in topics_guard.values_mut() {
1131 state.clean_cache();
1132 }
1133 }
1134 });
1135 }
1136
1137 fn spawn_degree_maintainer(&self) {
1139 let topics = self.topics.clone();
1140
1141 tokio::spawn(async move {
1142 let mut interval = time::interval(Duration::from_secs(30));
1143
1144 loop {
1145 interval.tick().await;
1146
1147 let mut topics_guard = topics.write().await;
1148
1149 for state in topics_guard.values_mut() {
1150 state.maintain_degree();
1151 }
1152 }
1153 });
1154 }
1155
1156 fn spawn_anti_entropy_task(&self) {
1161 let topics = self.topics.clone();
1162 let transport = self.transport.clone();
1163 let signing_key = self.signing_key.clone();
1164
1165 tokio::spawn(async move {
1166 let mut interval = time::interval(Duration::from_secs(ANTI_ENTROPY_INTERVAL_SECS));
1167
1168 loop {
1169 interval.tick().await;
1170
1171 let topics_guard = topics.read().await;
1172
1173 let mut work: Vec<(TopicId, PeerId, Vec<MessageIdType>)> = Vec::new();
1175
1176 for (topic_id, state) in topics_guard.iter() {
1177 let msg_ids = state.cached_message_ids();
1178 if msg_ids.is_empty() {
1179 continue;
1180 }
1181
1182 let all_peers: Vec<PeerId> = state
1184 .eager_peers
1185 .iter()
1186 .chain(state.lazy_peers.iter())
1187 .copied()
1188 .collect();
1189
1190 if all_peers.is_empty() {
1191 continue;
1192 }
1193
1194 let now = std::time::SystemTime::now()
1196 .duration_since(std::time::SystemTime::UNIX_EPOCH)
1197 .map(|d| d.as_secs())
1198 .unwrap_or(0);
1199 let hash_input = blake3::hash(
1200 &[topic_id.to_bytes().as_slice(), &now.to_le_bytes()].concat(),
1201 );
1202 let hash_bytes = hash_input.as_bytes();
1203 let index = (hash_bytes[0] as usize) % all_peers.len();
1204 let selected_peer = all_peers[index];
1205
1206 work.push((*topic_id, selected_peer, msg_ids));
1207 }
1208
1209 drop(topics_guard);
1210
1211 for (topic_id, peer, msg_ids) in work {
1213 let ae_payload = AntiEntropyPayload::Digest { msg_ids };
1214 let payload_bytes = match postcard::to_stdvec(&ae_payload) {
1215 Ok(bytes) => bytes,
1216 Err(e) => {
1217 warn!("Anti-entropy: failed to serialize payload: {}", e);
1218 continue;
1219 }
1220 };
1221
1222 let header = MessageHeader {
1223 version: 1,
1224 topic: topic_id,
1225 msg_id: [0u8; 32],
1226 kind: MessageKind::AntiEntropy,
1227 hop: 0,
1228 ttl: 1,
1229 };
1230
1231 let signature = match postcard::to_stdvec(&header) {
1232 Ok(bytes) => signing_key.sign(&bytes).unwrap_or_default(),
1233 Err(_) => Vec::new(),
1234 };
1235
1236 let message = GossipMessage {
1237 header,
1238 payload: Some(payload_bytes.into()),
1239 signature,
1240 public_key: signing_key.public_key().to_vec(),
1241 };
1242
1243 if let Ok(bytes) = postcard::to_stdvec(&message) {
1244 let _ = transport
1245 .send_to_peer(peer, GossipStreamType::PubSub, bytes.into())
1246 .await;
1247 }
1248
1249 trace!(
1250 peer_id = %peer,
1251 topic = ?topic_id,
1252 "Anti-entropy: sent digest"
1253 );
1254 }
1255 }
1256 });
1257 }
1258
1259 pub async fn initialize_topic_peers(&self, topic: TopicId, peers: Vec<PeerId>) {
1261 let mut topics = self.topics.write().await;
1262 let state = topics.entry(topic).or_insert_with(TopicState::new);
1263
1264 for peer in peers {
1266 state.eager_peers.insert(peer);
1267 }
1268
1269 debug!(topic = ?topic, peer_count = state.eager_peers.len(), "Initialized topic peers");
1270 }
1271
1272 pub async fn set_topic_peers(&self, topic: TopicId, connected: Vec<PeerId>) {
1278 let mut topics = self.topics.write().await;
1279 let state = topics.entry(topic).or_insert_with(TopicState::new);
1280
1281 let connected_set: HashSet<PeerId> = connected.iter().copied().collect();
1282
1283 state.eager_peers.retain(|p| connected_set.contains(p));
1285 state.lazy_peers.retain(|p| connected_set.contains(p));
1286
1287 let to_promote: Vec<PeerId> = state.lazy_peers.iter().copied().collect();
1293 for peer in to_promote {
1294 state.lazy_peers.remove(&peer);
1295 state.eager_peers.insert(peer);
1296 }
1297
1298 for peer in connected {
1300 if !state.eager_peers.contains(&peer) {
1301 state.eager_peers.insert(peer);
1302 }
1303 }
1304
1305 debug!(
1306 topic = ?topic,
1307 eager = state.eager_peers.len(),
1308 lazy = state.lazy_peers.len(),
1309 "Set topic peers"
1310 );
1311 }
1312
1313 pub async fn all_topic_ids(&self) -> Vec<TopicId> {
1321 self.topics.read().await.keys().copied().collect()
1322 }
1323}
1324
#[async_trait::async_trait]
impl<T: GossipTransport + 'static> PubSub for PlumtreePubSub<T> {
    async fn publish(&self, topic: TopicId, data: Bytes) -> Result<()> {
        // Delegates to the inherent implementation.
        self.publish_local(topic, data).await
    }

    fn subscribe(&self, topic: TopicId) -> mpsc::UnboundedReceiver<(PeerId, Bytes)> {
        let (tx, rx) = mpsc::unbounded_channel();
        let topics = self.topics.clone();

        // This method is synchronous, so registration happens in a spawned
        // task. NOTE(review): messages published before that task runs will
        // not reach this subscriber.
        tokio::spawn(async move {
            let mut topics_guard = topics.write().await;
            let state = topics_guard.entry(topic).or_insert_with(TopicState::new);
            state.subscribers.push(tx);
        });

        rx
    }

    async fn unsubscribe(&self, topic: TopicId) -> Result<()> {
        // Drops the entire topic state: peers, caches and ALL subscribers,
        // not just the caller's.
        let mut topics = self.topics.write().await;
        topics.remove(&topic);
        Ok(())
    }

    async fn initialize_topic_peers(&self, topic: TopicId, peers: Vec<PeerId>) {
        // Forward to the inherent method (fully-qualified to avoid recursing
        // into this trait method).
        PlumtreePubSub::initialize_topic_peers(self, topic, peers).await
    }

    async fn set_topic_peers(&self, topic: TopicId, connected: Vec<PeerId>) {
        PlumtreePubSub::set_topic_peers(self, topic, connected).await
    }

    async fn handle_message(&self, from: PeerId, data: Bytes) -> Result<()> {
        let message: GossipMessage = postcard::from_bytes(&data)
            .map_err(|e| anyhow!("Failed to deserialize PubSub message: {}", e))?;

        let topic_id = message.header.topic;
        let msg_kind = message.header.kind;

        debug!(
            msg_kind = ?msg_kind,
            peer_id = %from,
            topic = ?topic_id,
            "Handling incoming PubSub message"
        );

        // Dispatch on the message kind; IHAVE/IWANT carry their ID batch in
        // the payload.
        match msg_kind {
            MessageKind::Eager => self.handle_eager(from, topic_id, message).await,
            MessageKind::IHave => {
                if let Some(payload) = &message.payload {
                    let msg_ids: Vec<MessageIdType> = postcard::from_bytes(payload)
                        .map_err(|e| anyhow!("Failed to deserialize IHAVE payload: {}", e))?;
                    self.handle_ihave(from, topic_id, msg_ids).await
                } else {
                    Err(anyhow!("IHAVE message missing payload"))
                }
            }
            MessageKind::IWant => {
                if let Some(payload) = &message.payload {
                    let msg_ids: Vec<MessageIdType> = postcard::from_bytes(payload)
                        .map_err(|e| anyhow!("Failed to deserialize IWANT payload: {}", e))?;
                    self.handle_iwant(from, topic_id, msg_ids).await
                } else {
                    Err(anyhow!("IWANT message missing payload"))
                }
            }
            MessageKind::AntiEntropy => self.handle_anti_entropy(from, topic_id, message).await,
            _ => {
                // Non-pubsub kinds are ignored rather than treated as errors.
                warn!(
                    "PubSub received non-pubsub message kind {:?}, ignoring",
                    msg_kind
                );
                Ok(())
            }
        }
    }

    async fn trigger_anti_entropy(&self, topic: TopicId) -> Result<()> {
        let topics = self.topics.read().await;

        // Pick the first known peer for this topic (eager preferred, since
        // the eager iterator is chained first).
        let peer = if let Some(state) = topics.get(&topic) {
            state
                .eager_peers
                .iter()
                .chain(state.lazy_peers.iter())
                .next()
                .copied()
        } else {
            None
        };

        drop(topics);

        if let Some(peer) = peer {
            self.send_anti_entropy_digest(topic, peer).await
        } else {
            // No peers known: nothing to reconcile with.
            Ok(())
        }
    }
}
1433
1434#[cfg(test)]
1435#[allow(clippy::expect_used, clippy::unwrap_used)]
1436mod tests {
1437 use super::*;
1438 use saorsa_gossip_transport::UdpTransportAdapter;
1439 use std::net::SocketAddr;
1440
1441 fn test_peer_id(id: u8) -> PeerId {
1442 let mut bytes = [0u8; 32];
1443 bytes[0] = id;
1444 PeerId::new(bytes)
1445 }
1446
1447 async fn test_transport() -> Arc<UdpTransportAdapter> {
1448 let bind: SocketAddr = "127.0.0.1:0".parse().expect("valid addr");
1449 Arc::new(
1450 UdpTransportAdapter::new(bind, vec![])
1451 .await
1452 .expect("transport"),
1453 )
1454 }
1455
    /// Generates a fresh ML-DSA key pair for signing test messages.
    fn test_signing_key() -> saorsa_gossip_identity::MlDsaKeyPair {
        saorsa_gossip_identity::MlDsaKeyPair::generate().expect("Failed to generate test key pair")
    }
1459
1460 #[tokio::test]
1461 async fn test_pubsub_creation() {
1462 let peer_id = test_peer_id(1);
1463 let transport = test_transport().await;
1464 let signing_key = test_signing_key();
1465 let _pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
1466 }
1467
1468 #[tokio::test]
1469 async fn test_publish_and_subscribe() {
1470 let peer_id = test_peer_id(1);
1471 let transport = test_transport().await;
1472 let signing_key = test_signing_key();
1473 let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
1474 let topic = TopicId::new([1u8; 32]);
1475
1476 let mut rx = pubsub.subscribe(topic);
1477 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1478
1479 let data = Bytes::from("test message");
1480 pubsub.publish(topic, data.clone()).await.ok();
1481
1482 let received =
1483 tokio::time::timeout(tokio::time::Duration::from_millis(100), rx.recv()).await;
1484
1485 assert!(received.is_ok());
1486 let (_, payload) = received.unwrap().unwrap();
1487 assert_eq!(payload, data);
1488 }
1489
1490 #[tokio::test]
1491 async fn test_message_caching() {
1492 let peer_id = test_peer_id(1);
1493 let transport = test_transport().await;
1494 let pubsub = PlumtreePubSub::new(peer_id, transport, test_signing_key());
1495 let topic = TopicId::new([1u8; 32]);
1496
1497 let payload = Bytes::from("test");
1498 let msg_id = pubsub.calculate_msg_id(&topic, &payload);
1499
1500 pubsub.publish(topic, payload.clone()).await.ok();
1501
1502 let topics = pubsub.topics.read().await;
1504 let state = topics.get(&topic).unwrap();
1505 assert!(state.has_message(&msg_id));
1506 }
1507
    /// Receiving the same EAGER message twice from one peer must trigger a
    /// PRUNE: the sender is demoted from the eager set into the lazy set.
    #[tokio::test]
    async fn test_duplicate_detection_prune() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        pubsub.initialize_topic_peers(topic, vec![from_peer]).await;

        let payload = Bytes::from("test");
        let msg_id = pubsub.calculate_msg_id(&topic, &payload);

        let header = MessageHeader {
            version: 1,
            topic,
            msg_id,
            kind: MessageKind::Eager,
            hop: 0,
            ttl: 10,
        };

        // Sign the header so the message passes signature verification.
        let header_bytes = postcard::to_stdvec(&header).expect("serialize");
        let signature = signing_key.sign(&header_bytes).expect("sign");

        let message = GossipMessage {
            header,
            payload: Some(payload.clone()),
            signature,
            public_key: signing_key.public_key().to_vec(),
        };

        // Deliver the identical message twice. Results are deliberately
        // ignored (the duplicate path may return Err); the assertions below
        // check the resulting peer topology directly.
        pubsub
            .handle_eager(from_peer, topic, message.clone())
            .await
            .ok();

        pubsub.handle_eager(from_peer, topic, message).await.ok();

        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        assert!(!state.eager_peers.contains(&from_peer));
        assert!(state.lazy_peers.contains(&from_peer));
    }
1558
    /// An IHAVE advertising an unknown message id must record an outstanding
    /// IWANT for that id.
    #[tokio::test]
    async fn test_ihave_handling() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, test_signing_key());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        // An id we have certainly never cached.
        let unknown_msg_id = [42u8; 32];

        pubsub
            .handle_ihave(from_peer, topic, vec![unknown_msg_id])
            .await
            .ok();

        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        assert!(state.outstanding_iwants.contains_key(&unknown_msg_id));
    }
1579
    /// Serving an IWANT from a lazy peer must GRAFT that peer: it moves from
    /// the lazy set into the eager set.
    #[tokio::test]
    async fn test_iwant_graft() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, test_signing_key());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        // Seed the topic with the requester as a lazy peer; the scope drops
        // the write guard before publish takes the lock again.
        {
            let mut topics = pubsub.topics.write().await;
            let state = topics.entry(topic).or_insert_with(TopicState::new);
            state.lazy_peers.insert(from_peer);
        }

        let payload = Bytes::from("test");
        pubsub.publish(topic, payload.clone()).await.ok();

        // Pull the cached message's id back out of the LRU cache.
        let msg_id = {
            let topics = pubsub.topics.read().await;
            let state = topics.get(&topic).unwrap();
            state
                .message_cache
                .peek_lru()
                .map(|(id, _)| *id)
                .expect("message should be cached")
        };

        pubsub
            .handle_iwant(from_peer, topic, vec![msg_id])
            .await
            .ok();

        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        assert!(state.eager_peers.contains(&from_peer));
        assert!(!state.lazy_peers.contains(&from_peer));
    }
1623
1624 #[tokio::test]
1625 async fn test_degree_maintenance() {
1626 let peer_id = test_peer_id(1);
1627 let transport = test_transport().await;
1628 let pubsub = PlumtreePubSub::new(peer_id, transport, test_signing_key());
1629 let topic = TopicId::new([1u8; 32]);
1630
1631 let mut peers = Vec::new();
1633 for i in 2..20 {
1634 peers.push(test_peer_id(i));
1635 }
1636
1637 {
1638 let mut topics = pubsub.topics.write().await;
1639 let state = topics.entry(topic).or_insert_with(TopicState::new);
1640 for peer in &peers {
1641 state.lazy_peers.insert(*peer);
1642 }
1643
1644 state.maintain_degree();
1646
1647 assert!(state.eager_peers.len() >= MIN_EAGER_DEGREE);
1648 }
1649 }
1650
1651 #[tokio::test]
1652 async fn test_cache_expiration() {
1653 let peer_id = test_peer_id(1);
1654 let transport = test_transport().await;
1655 let pubsub = PlumtreePubSub::new(peer_id, transport, test_signing_key());
1656 let topic = TopicId::new([1u8; 32]);
1657
1658 let payload = Bytes::from("test");
1659 pubsub.publish(topic, payload).await.ok();
1660
1661 {
1663 let mut topics = pubsub.topics.write().await;
1664 let state = topics.get_mut(&topic).unwrap();
1665
1666 for (_, cached) in state.message_cache.iter_mut() {
1668 cached.timestamp = Instant::now() - Duration::from_secs(CACHE_TTL_SECS + 10);
1669 }
1670
1671 state.clean_cache();
1672
1673 assert_eq!(state.message_cache.len(), 0);
1674 }
1675 }
1676
    /// End-to-end check that a real ML-DSA key pair signs a serialized header
    /// and that the signature verifies against the same bytes.
    #[tokio::test]
    async fn test_message_signing_with_real_mldsa() {
        use saorsa_gossip_identity::MlDsaKeyPair;

        let keypair = MlDsaKeyPair::generate().expect("keypair");
        let peer_id = PeerId::new([1u8; 32]);
        let transport = test_transport().await;

        // Constructed only to prove the pubsub accepts the key pair.
        let _pubsub = PlumtreePubSub::new(peer_id, transport, keypair.clone());

        let topic = TopicId::new([1u8; 32]);
        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: [0u8; 32],
            kind: MessageKind::Eager,
            hop: 0,
            ttl: 10,
        };

        let header_bytes = postcard::to_stdvec(&header).expect("serialize");

        let signature = keypair.sign(&header_bytes).expect("sign");

        assert!(
            !signature.is_empty(),
            "ML-DSA signature should not be empty"
        );

        let valid =
            MlDsaKeyPair::verify(keypair.public_key(), &header_bytes, &signature).expect("verify");
        assert!(valid, "Signature should be valid");
    }
1719
1720 #[tokio::test]
1721 async fn test_message_signature_verification() {
1722 use saorsa_gossip_identity::MlDsaKeyPair;
1724
1725 let keypair = MlDsaKeyPair::generate().expect("keypair");
1726
1727 let topic = TopicId::new([1u8; 32]);
1728 let header = MessageHeader {
1729 version: 1,
1730 topic,
1731 msg_id: [1u8; 32],
1732 kind: MessageKind::Eager,
1733 hop: 0,
1734 ttl: 10,
1735 };
1736
1737 let header_bytes = postcard::to_stdvec(&header).expect("serialize");
1738 let signature = keypair.sign(&header_bytes).expect("sign");
1739
1740 let valid =
1742 MlDsaKeyPair::verify(keypair.public_key(), &header_bytes, &signature).expect("verify");
1743 assert!(valid, "Valid signature should verify");
1744
1745 let mut bad_signature = signature.clone();
1747 bad_signature[0] ^= 0xFF; let invalid = MlDsaKeyPair::verify(keypair.public_key(), &header_bytes, &bad_signature)
1750 .expect("verify");
1751 assert!(!invalid, "Tampered signature should not verify");
1752 }
1753
    /// Checks that `sign_message` produces a non-empty signature that
    /// verifies against the publisher's public key.
    ///
    /// NOTE(review): this signs a freshly constructed header; it does not
    /// inspect the signature actually attached to the published wire message
    /// — confirm whether that stronger check is intended.
    #[tokio::test]
    async fn test_published_message_has_valid_signature() {
        use saorsa_gossip_identity::MlDsaKeyPair;

        let keypair = MlDsaKeyPair::generate().expect("keypair");
        let peer_id = PeerId::new([1u8; 32]);
        let transport = test_transport().await;

        let pubsub = PlumtreePubSub::new(peer_id, transport, keypair.clone());

        let topic = TopicId::new([1u8; 32]);
        let payload = Bytes::from("test message");

        pubsub.publish(topic, payload.clone()).await.ok();

        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: [0u8; 32],
            kind: MessageKind::Eager,
            hop: 0,
            ttl: 10,
        };

        let signature = pubsub.sign_message(&header);
        assert!(
            !signature.is_empty(),
            "Published messages should have non-empty signatures"
        );

        let header_bytes = postcard::to_stdvec(&header).expect("serialize");
        let valid =
            MlDsaKeyPair::verify(keypair.public_key(), &header_bytes, &signature).expect("verify");
        assert!(valid, "Signature should be valid");
    }
1795
1796 #[test]
1799 fn test_anti_entropy_payload_serialization() {
1800 let digest = AntiEntropyPayload::Digest {
1802 msg_ids: vec![[1u8; 32], [2u8; 32], [3u8; 32]],
1803 };
1804 let bytes = postcard::to_stdvec(&digest).expect("serialize digest");
1805 let deserialized: AntiEntropyPayload =
1806 postcard::from_bytes(&bytes).expect("deserialize digest");
1807
1808 match deserialized {
1809 AntiEntropyPayload::Digest { msg_ids } => {
1810 assert_eq!(msg_ids.len(), 3);
1811 assert_eq!(msg_ids[0], [1u8; 32]);
1812 assert_eq!(msg_ids[1], [2u8; 32]);
1813 assert_eq!(msg_ids[2], [3u8; 32]);
1814 }
1815 AntiEntropyPayload::Response { .. } => {
1816 panic!("Expected Digest, got Response");
1817 }
1818 }
1819
1820 let response = AntiEntropyPayload::Response {
1822 missing_ids: vec![[4u8; 32], [5u8; 32]],
1823 };
1824 let bytes = postcard::to_stdvec(&response).expect("serialize response");
1825 let deserialized: AntiEntropyPayload =
1826 postcard::from_bytes(&bytes).expect("deserialize response");
1827
1828 match deserialized {
1829 AntiEntropyPayload::Response { missing_ids } => {
1830 assert_eq!(missing_ids.len(), 2);
1831 assert_eq!(missing_ids[0], [4u8; 32]);
1832 assert_eq!(missing_ids[1], [5u8; 32]);
1833 }
1834 AntiEntropyPayload::Digest { .. } => {
1835 panic!("Expected Response, got Digest");
1836 }
1837 }
1838 }
1839
1840 #[test]
1841 fn test_anti_entropy_payload_empty_serialization() {
1842 let digest = AntiEntropyPayload::Digest {
1844 msg_ids: Vec::new(),
1845 };
1846 let bytes = postcard::to_stdvec(&digest).expect("serialize empty digest");
1847 let deserialized: AntiEntropyPayload =
1848 postcard::from_bytes(&bytes).expect("deserialize empty digest");
1849
1850 match deserialized {
1851 AntiEntropyPayload::Digest { msg_ids } => {
1852 assert!(msg_ids.is_empty());
1853 }
1854 AntiEntropyPayload::Response { .. } => {
1855 panic!("Expected Digest, got Response");
1856 }
1857 }
1858 }
1859
1860 #[tokio::test]
1861 async fn test_cached_message_ids() {
1862 let peer_id = test_peer_id(1);
1863 let transport = test_transport().await;
1864 let pubsub = PlumtreePubSub::new(peer_id, transport, test_signing_key());
1865 let topic = TopicId::new([1u8; 32]);
1866
1867 pubsub
1869 .publish(topic, Bytes::from("msg1"))
1870 .await
1871 .expect("publish 1");
1872 pubsub
1873 .publish(topic, Bytes::from("msg2"))
1874 .await
1875 .expect("publish 2");
1876 pubsub
1877 .publish(topic, Bytes::from("msg3"))
1878 .await
1879 .expect("publish 3");
1880
1881 let topics = pubsub.topics.read().await;
1883 let state = topics.get(&topic).unwrap();
1884 let ids = state.cached_message_ids();
1885 assert_eq!(ids.len(), 3, "Should have 3 cached message IDs");
1886 }
1887
    /// Handling a digest that lacks a message we cache must keep our copy
    /// intact (the handler is expected to offer/send the missing message to
    /// the remote, not drop it locally).
    #[tokio::test]
    async fn test_handle_anti_entropy_digest_sends_missing() {
        let signing_key = test_signing_key();
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        pubsub
            .publish(topic, Bytes::from("cached message"))
            .await
            .expect("publish");

        // Capture the id of the one message we just cached.
        let our_msg_id = {
            let topics = pubsub.topics.read().await;
            let state = topics.get(&topic).unwrap();
            let ids = state.cached_message_ids();
            assert_eq!(ids.len(), 1);
            ids[0]
        };

        // Remote digest is empty: the remote claims to have nothing.
        let ae_payload = AntiEntropyPayload::Digest {
            msg_ids: Vec::new(),
        };
        let payload_bytes = postcard::to_stdvec(&ae_payload).expect("serialize");

        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: [0u8; 32],
            kind: MessageKind::AntiEntropy,
            hop: 0,
            ttl: 1,
        };

        // Sign so the message passes verification in the handler.
        let header_bytes = postcard::to_stdvec(&header).expect("serialize header");
        let signature = signing_key.sign(&header_bytes).expect("sign");

        let message = GossipMessage {
            header,
            payload: Some(payload_bytes.into()),
            signature,
            public_key: signing_key.public_key().to_vec(),
        };

        let result = pubsub.handle_anti_entropy(from_peer, topic, message).await;
        assert!(result.is_ok());

        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        assert!(state.has_message(&our_msg_id));
    }
1948
    /// A digest advertising an id we don't have must be handled cleanly (the
    /// handler requests the missing message); only the Ok outcome is asserted.
    #[tokio::test]
    async fn test_handle_anti_entropy_digest_requests_missing() {
        let signing_key = test_signing_key();
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        // An id only the remote claims to have.
        let remote_msg_id = [99u8; 32];
        let ae_payload = AntiEntropyPayload::Digest {
            msg_ids: vec![remote_msg_id],
        };
        let payload_bytes = postcard::to_stdvec(&ae_payload).expect("serialize");

        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: [0u8; 32],
            kind: MessageKind::AntiEntropy,
            hop: 0,
            ttl: 1,
        };

        // Sign so the message passes verification in the handler.
        let header_bytes = postcard::to_stdvec(&header).expect("serialize header");
        let signature = signing_key.sign(&header_bytes).expect("sign");

        let message = GossipMessage {
            header,
            payload: Some(payload_bytes.into()),
            signature,
            public_key: signing_key.public_key().to_vec(),
        };

        let result = pubsub.handle_anti_entropy(from_peer, topic, message).await;
        assert!(result.is_ok());
    }
1988
    /// A Response listing ids the remote is missing must be handled cleanly;
    /// only the Ok outcome is asserted.
    #[tokio::test]
    async fn test_handle_anti_entropy_response() {
        let signing_key = test_signing_key();
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        let missing_id = [77u8; 32];
        let ae_payload = AntiEntropyPayload::Response {
            missing_ids: vec![missing_id],
        };
        let payload_bytes = postcard::to_stdvec(&ae_payload).expect("serialize");

        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: [0u8; 32],
            kind: MessageKind::AntiEntropy,
            hop: 0,
            ttl: 1,
        };

        // Sign so the message passes verification in the handler.
        let header_bytes = postcard::to_stdvec(&header).expect("serialize header");
        let signature = signing_key.sign(&header_bytes).expect("sign");

        let message = GossipMessage {
            header,
            payload: Some(payload_bytes.into()),
            signature,
            public_key: signing_key.public_key().to_vec(),
        };

        let result = pubsub.handle_anti_entropy(from_peer, topic, message).await;
        assert!(result.is_ok());
    }
2028
    /// An anti-entropy message carrying garbage signature bytes must be
    /// rejected with an error.
    #[tokio::test]
    async fn test_anti_entropy_invalid_signature_rejected() {
        let signing_key = test_signing_key();
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        let ae_payload = AntiEntropyPayload::Digest {
            msg_ids: Vec::new(),
        };
        let payload_bytes = postcard::to_stdvec(&ae_payload).expect("serialize");

        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: [0u8; 32],
            kind: MessageKind::AntiEntropy,
            hop: 0,
            ttl: 1,
        };

        let message = GossipMessage {
            header,
            // Deliberately bogus signature bytes paired with a real key.
            signature: vec![0u8; 100],
            payload: Some(payload_bytes.into()),
            public_key: signing_key.public_key().to_vec(),
        };

        let result = pubsub.handle_anti_entropy(from_peer, topic, message).await;
        assert!(result.is_err(), "Invalid signature should be rejected");
    }
2063
    /// A fully wire-encoded AntiEntropy message fed through `handle_message`
    /// must be routed to the anti-entropy handler and succeed.
    #[tokio::test]
    async fn test_anti_entropy_message_routing() {
        let signing_key = test_signing_key();
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        let ae_payload = AntiEntropyPayload::Digest {
            msg_ids: Vec::new(),
        };
        let payload_bytes = postcard::to_stdvec(&ae_payload).expect("serialize");

        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: [0u8; 32],
            kind: MessageKind::AntiEntropy,
            hop: 0,
            ttl: 1,
        };

        // Sign so the message passes verification after routing.
        let header_bytes = postcard::to_stdvec(&header).expect("serialize header");
        let signature = signing_key.sign(&header_bytes).expect("sign");

        let message = GossipMessage {
            header,
            payload: Some(payload_bytes.into()),
            signature,
            public_key: signing_key.public_key().to_vec(),
        };

        // Encode the whole message exactly as it would travel on the wire.
        let wire_bytes = postcard::to_stdvec(&message).expect("serialize wire message");

        let result = pubsub.handle_message(from_peer, wire_bytes.into()).await;
        assert!(
            result.is_ok(),
            "AntiEntropy message should be routed correctly"
        );
    }
2108
    /// Exercises `trigger_anti_entropy` with one known peer and a cached
    /// message. The result is deliberately discarded: sending to a peer that
    /// is not actually reachable over the test transport may fail, so this
    /// test only checks that the path runs without panicking.
    #[tokio::test]
    async fn test_trigger_anti_entropy() {
        let signing_key = test_signing_key();
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
        let topic = TopicId::new([1u8; 32]);
        let peer = test_peer_id(2);

        pubsub.initialize_topic_peers(topic, vec![peer]).await;

        pubsub
            .publish(topic, Bytes::from("test data"))
            .await
            .expect("publish");

        let result = pubsub.trigger_anti_entropy(topic).await;
        // Intentionally ignored — see doc comment above.
        let _ = result;
    }
2133
2134 #[tokio::test]
2135 async fn test_trigger_anti_entropy_no_peers() {
2136 let signing_key = test_signing_key();
2137 let peer_id = test_peer_id(1);
2138 let transport = test_transport().await;
2139 let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
2140 let topic = TopicId::new([1u8; 32]);
2141
2142 let result = pubsub.trigger_anti_entropy(topic).await;
2144 assert!(result.is_ok(), "No peers should result in no-op Ok");
2145 }
2146
    /// Exercises `send_anti_entropy_digest` with a populated cache. The send
    /// result is discarded: the target peer is not reachable over the test
    /// transport, so only the absence of panics is checked.
    #[tokio::test]
    async fn test_send_anti_entropy_digest() {
        let signing_key = test_signing_key();
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
        let topic = TopicId::new([1u8; 32]);
        let peer = test_peer_id(2);

        pubsub
            .publish(topic, Bytes::from("digest test"))
            .await
            .expect("publish");

        let _ = pubsub.send_anti_entropy_digest(topic, peer).await;
    }
2165
2166 #[tokio::test]
2167 async fn test_send_anti_entropy_digest_empty_cache() {
2168 let signing_key = test_signing_key();
2169 let peer_id = test_peer_id(1);
2170 let transport = test_transport().await;
2171 let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
2172 let topic = TopicId::new([1u8; 32]);
2173 let peer = test_peer_id(2);
2174
2175 pubsub.initialize_topic_peers(topic, vec![peer]).await;
2177
2178 let result = pubsub.send_anti_entropy_digest(topic, peer).await;
2180 assert!(result.is_ok(), "Empty cache should result in no-op Ok");
2181 }
2182
2183 #[test]
2186 fn test_peer_score_no_requests_no_deliveries() {
2187 let score = PeerScore::new();
2189 let s = score.score();
2190 assert!(
2194 s > 0.6,
2195 "New peer with no activity should have moderate score, got {s}"
2196 );
2197 assert!(s < 1.0, "Score should be below 1.0, got {s}");
2198 }
2199
2200 #[test]
2201 fn test_peer_score_with_deliveries_no_iwant() {
2202 let mut score = PeerScore::new();
2204 score.record_delivery();
2205 score.record_delivery();
2206 score.record_delivery();
2207 let s = score.score();
2208 assert!(
2212 s > 0.8,
2213 "Peer with deliveries should have high score, got {s}"
2214 );
2215 assert!(s <= 1.0, "Score should be at most 1.0, got {s}");
2216 }
2217
2218 #[test]
2219 fn test_peer_score_perfect_iwant_response_rate() {
2220 let mut score = PeerScore::new();
2222 score.record_iwant_request();
2223 score.record_iwant_response();
2224 score.record_iwant_request();
2225 score.record_iwant_response();
2226 let s = score.score();
2227 assert!(
2231 s > 0.9,
2232 "Perfect IWANT response rate should give high score, got {s}"
2233 );
2234 }
2235
    /// Two IWANT requests with only one response (50% rate) should land the
    /// score in a moderate band: above 0.6 but below a perfect responder.
    #[test]
    fn test_peer_score_50_percent_iwant_response_rate() {
        let mut score = PeerScore::new();
        score.record_iwant_request();
        score.record_iwant_response();
        // Second request deliberately left unanswered.
        score.record_iwant_request();
        let s = score.score();
        assert!(
            s > 0.6,
            "50% IWANT response rate should give moderate score, got {s}"
        );
        assert!(s < 0.85, "50% rate should be below perfect, got {s}");
    }
2254
2255 #[test]
2256 fn test_peer_score_recency_decay() {
2257 let mut score = PeerScore::new();
2259 score.record_delivery();
2260 score.last_seen = Instant::now() - Duration::from_secs(350);
2262 let s = score.score();
2263 assert!(
2267 s < 0.55,
2268 "Stale peer should have low score due to recency decay, got {s}"
2269 );
2270 assert!(
2271 s > 0.4,
2272 "Stale peer should still have some score from response rate, got {s}"
2273 );
2274 }
2275
    /// A successfully handled EAGER message must bump the sending peer's
    /// `messages_delivered` score counter.
    #[tokio::test]
    async fn test_eager_records_delivery_score() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        pubsub.initialize_topic_peers(topic, vec![from_peer]).await;

        let payload = Bytes::from("test delivery");
        let msg_id = pubsub.calculate_msg_id(&topic, &payload);

        let header = MessageHeader {
            version: 1,
            topic,
            msg_id,
            kind: MessageKind::Eager,
            hop: 0,
            ttl: 10,
        };

        // Sign so the message passes verification in handle_eager.
        let header_bytes = postcard::to_stdvec(&header).expect("serialize");
        let signature = signing_key.sign(&header_bytes).expect("sign");

        let message = GossipMessage {
            header,
            payload: Some(payload),
            signature,
            public_key: signing_key.public_key().to_vec(),
        };

        pubsub
            .handle_eager(from_peer, topic, message)
            .await
            .expect("handle_eager");

        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        let peer_score = state
            .peer_scores
            .get(&from_peer)
            .expect("peer score should exist");
        assert_eq!(
            peer_score.messages_delivered, 1,
            "Should have 1 delivery recorded"
        );
    }
2327
    /// Full IHAVE → IWANT → EAGER flow: the IHAVE records an outstanding
    /// IWANT request against the advertising peer, and the eventual EAGER
    /// delivery of that id counts as both an IWANT response and a delivery.
    #[tokio::test]
    async fn test_ihave_iwant_eager_flow_updates_scores() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        let unknown_msg_id = [42u8; 32];

        // Step 1: peer advertises an id we don't have.
        pubsub
            .handle_ihave(from_peer, topic, vec![unknown_msg_id])
            .await
            .ok();

        // The advertisement should have produced exactly one pending request.
        {
            let topics = pubsub.topics.read().await;
            let state = topics.get(&topic).unwrap();
            let peer_score = state
                .peer_scores
                .get(&from_peer)
                .expect("peer score should exist");
            assert_eq!(
                peer_score.iwant_requests, 1,
                "Should have 1 IWANT request recorded"
            );
            assert_eq!(
                peer_score.iwant_responses, 0,
                "Should have 0 IWANT responses yet"
            );
        }

        // Step 2: the peer delivers the advertised message via EAGER.
        let payload = Bytes::from("requested message");
        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: unknown_msg_id,
            kind: MessageKind::Eager,
            hop: 0,
            ttl: 10,
        };

        let header_bytes = postcard::to_stdvec(&header).expect("serialize");
        let signature = signing_key.sign(&header_bytes).expect("sign");

        let message = GossipMessage {
            header,
            payload: Some(payload),
            signature,
            public_key: signing_key.public_key().to_vec(),
        };

        pubsub
            .handle_eager(from_peer, topic, message)
            .await
            .expect("handle_eager");

        // Step 3: the delivery should count as both response and delivery.
        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        let peer_score = state
            .peer_scores
            .get(&from_peer)
            .expect("peer score should exist");
        assert_eq!(
            peer_score.iwant_responses, 1,
            "Should have 1 IWANT response recorded"
        );
        assert_eq!(
            peer_score.messages_delivered, 1,
            "Should have 1 delivery recorded"
        );
    }
2405
2406 #[test]
2407 fn test_score_based_promotion_highest_first() {
2408 let mut state = TopicState::new();
2410
2411 let peer_high = test_peer_id(10);
2412 let peer_low = test_peer_id(11);
2413 let peer_mid = test_peer_id(12);
2414
2415 state.lazy_peers.insert(peer_high);
2416 state.lazy_peers.insert(peer_low);
2417 state.lazy_peers.insert(peer_mid);
2418
2419 let mut high_score = PeerScore::new();
2421 high_score.messages_delivered = 100;
2422 state.peer_scores.insert(peer_high, high_score);
2423
2424 let mut low_score = PeerScore::new();
2426 low_score.last_seen = Instant::now() - Duration::from_secs(250);
2427 state.peer_scores.insert(peer_low, low_score);
2428
2429 let mut mid_score = PeerScore::new();
2431 mid_score.messages_delivered = 10;
2432 state.peer_scores.insert(peer_mid, mid_score);
2433
2434 state.maintain_degree();
2437
2438 assert!(
2440 state.eager_peers.contains(&peer_high),
2441 "High-scoring peer should be promoted"
2442 );
2443 assert!(
2444 state.eager_peers.contains(&peer_mid),
2445 "Mid-scoring peer should be promoted"
2446 );
2447 assert!(
2448 state.eager_peers.contains(&peer_low),
2449 "Low-scoring peer should be promoted (not enough peers)"
2450 );
2451 }
2452
    /// When the eager set exceeds MAX_EAGER_DEGREE, degree maintenance must
    /// demote the lowest-scoring peers first and keep the best ones eager.
    #[test]
    fn test_score_based_demotion_lowest_first() {
        let mut state = TopicState::new();

        // MAX_EAGER_DEGREE + 2 eager peers, each with a monotonically better
        // IWANT response rate than the previous one (i/10 responses).
        let mut peers = Vec::new();
        for i in 0..(MAX_EAGER_DEGREE + 2) {
            let peer = test_peer_id(i as u8 + 10);
            peers.push(peer);
            state.eager_peers.insert(peer);

            let mut score = PeerScore::new();
            score.iwant_requests = 10;
            score.iwant_responses = i as u64;
            state.peer_scores.insert(peer, score);
        }

        // peers[0] and peers[1] have the two worst response rates.
        let worst_peer = peers[0];
        let second_worst = peers[1];

        state.maintain_degree();

        assert_eq!(
            state.eager_peers.len(),
            MAX_EAGER_DEGREE,
            "Should have MAX_EAGER_DEGREE eager peers"
        );

        assert!(
            state.lazy_peers.contains(&worst_peer),
            "Worst-scoring peer should be demoted"
        );
        assert!(
            state.lazy_peers.contains(&second_worst),
            "Second-worst peer should be demoted"
        );

        let best_peer = peers[MAX_EAGER_DEGREE + 1];
        assert!(
            state.eager_peers.contains(&best_peer),
            "Best-scoring peer should remain eager"
        );
    }
2505
    /// `clean_cache` must drop score entries for peers not seen within the
    /// staleness window while retaining fresh ones.
    #[test]
    fn test_stale_peer_scores_cleaned() {
        let mut state = TopicState::new();

        let fresh_peer = test_peer_id(20);
        let stale_peer = test_peer_id(21);

        state.peer_scores.insert(fresh_peer, PeerScore::new());

        let mut stale_score = PeerScore::new();
        // checked_sub: bare `Instant - Duration` panics when the platform's
        // Instant cannot represent 700s before process start; in that rare
        // case the test silently opts out.
        let Some(past) = Instant::now().checked_sub(Duration::from_secs(700)) else {
            return;
        };
        stale_score.last_seen = past;
        state.peer_scores.insert(stale_peer, stale_score);

        state.clean_cache();

        assert!(
            state.peer_scores.contains_key(&fresh_peer),
            "Fresh peer score should be retained"
        );
        assert!(
            !state.peer_scores.contains_key(&stale_peer),
            "Stale peer score should be cleaned up"
        );
    }
2539
    /// Refreshing the topic peer set must evict eager peers that are no
    /// longer in the connected list.
    #[tokio::test]
    async fn test_set_topic_peers_prunes_stale_eager() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
        let topic = TopicId::new([1u8; 32]);

        let peer_a = test_peer_id(2);
        let peer_b = test_peer_id(3);

        pubsub
            .initialize_topic_peers(topic, vec![peer_a, peer_b])
            .await;

        // peer_b has disconnected: refresh with only peer_a.
        pubsub.set_topic_peers(topic, vec![peer_a]).await;

        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        assert!(state.eager_peers.contains(&peer_a));
        assert!(!state.eager_peers.contains(&peer_b));
    }
2564
    /// Refreshing the topic peer set must also evict lazy peers that are no
    /// longer in the connected list.
    #[tokio::test]
    async fn test_set_topic_peers_prunes_stale_lazy() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
        let topic = TopicId::new([1u8; 32]);

        let peer_a = test_peer_id(2);
        let peer_b = test_peer_id(3);

        // Seed one eager and one lazy peer directly.
        {
            let mut topics = pubsub.topics.write().await;
            let state = topics.entry(topic).or_insert_with(TopicState::new);
            state.eager_peers.insert(peer_a);
            state.lazy_peers.insert(peer_b);
        }

        // peer_b has disconnected: refresh with only peer_a.
        pubsub.set_topic_peers(topic, vec![peer_a]).await;

        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        assert!(state.eager_peers.contains(&peer_a));
        assert!(!state.lazy_peers.contains(&peer_b));
    }
2592
    /// Newly connected peers supplied via `set_topic_peers` must join the
    /// eager set.
    #[tokio::test]
    async fn test_set_topic_peers_adds_new_as_eager() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
        let topic = TopicId::new([1u8; 32]);

        let peer_a = test_peer_id(2);
        let peer_b = test_peer_id(3);

        pubsub.initialize_topic_peers(topic, vec![peer_a]).await;

        // peer_b connects: refresh with both peers.
        pubsub.set_topic_peers(topic, vec![peer_a, peer_b]).await;

        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        assert!(state.eager_peers.contains(&peer_a));
        assert!(state.eager_peers.contains(&peer_b));
    }
2615
2616 #[tokio::test]
2617 async fn test_set_topic_peers_retains_lazy_if_connected() {
2618 let peer_id = test_peer_id(1);
2619 let transport = test_transport().await;
2620 let signing_key = test_signing_key();
2621 let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
2622 let topic = TopicId::new([1u8; 32]);
2623
2624 let peer_a = test_peer_id(2);
2625 let peer_b = test_peer_id(3);
2626
2627 {
2629 let mut topics = pubsub.topics.write().await;
2630 let state = topics.entry(topic).or_insert_with(TopicState::new);
2631 state.eager_peers.insert(peer_a);
2632 state.lazy_peers.insert(peer_b);
2633 }
2634
2635 pubsub.set_topic_peers(topic, vec![peer_a, peer_b]).await;
2639
2640 let topics = pubsub.topics.read().await;
2641 let state = topics.get(&topic).unwrap();
2642 assert!(state.eager_peers.contains(&peer_a));
2643 assert!(
2644 state.eager_peers.contains(&peer_b),
2645 "Lazy peer should be promoted to eager during refresh"
2646 );
2647 assert!(
2648 !state.lazy_peers.contains(&peer_b),
2649 "Promoted peer should no longer be in lazy set"
2650 );
2651 }
2652
2653 #[tokio::test]
2654 async fn test_set_topic_peers_combined_prune_and_add() {
2655 let peer_id = test_peer_id(1);
2656 let transport = test_transport().await;
2657 let signing_key = test_signing_key();
2658 let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
2659 let topic = TopicId::new([1u8; 32]);
2660
2661 let peer_a = test_peer_id(2);
2662 let peer_b = test_peer_id(3);
2663 let peer_c = test_peer_id(4);
2664
2665 {
2667 let mut topics = pubsub.topics.write().await;
2668 let state = topics.entry(topic).or_insert_with(TopicState::new);
2669 state.eager_peers.insert(peer_a);
2670 state.lazy_peers.insert(peer_b);
2671 }
2672
2673 pubsub.set_topic_peers(topic, vec![peer_b, peer_c]).await;
2675
2676 let topics = pubsub.topics.read().await;
2677 let state = topics.get(&topic).unwrap();
2678 assert!(
2679 !state.eager_peers.contains(&peer_a),
2680 "Disconnected eager peer should be removed"
2681 );
2682 assert!(
2683 state.eager_peers.contains(&peer_b),
2684 "Connected lazy peer should be promoted to eager"
2685 );
2686 assert!(
2687 !state.lazy_peers.contains(&peer_b),
2688 "Promoted peer should no longer be in lazy set"
2689 );
2690 assert!(
2691 state.eager_peers.contains(&peer_c),
2692 "New peer should be added as eager"
2693 );
2694 }
2695
2696 #[test]
2701 fn test_payload_replay_detected() {
2702 let mut state = TopicState::new();
2703 let payload = b"hello world";
2704
2705 assert!(
2706 !state.is_payload_replay(payload),
2707 "First insert should be new"
2708 );
2709 assert!(
2710 state.is_payload_replay(payload),
2711 "Second insert should be replay"
2712 );
2713 }
2714
2715 #[test]
2716 fn test_payload_replay_different_payloads_pass() {
2717 let mut state = TopicState::new();
2718
2719 assert!(!state.is_payload_replay(b"message 1"));
2720 assert!(!state.is_payload_replay(b"message 2"));
2721 assert!(!state.is_payload_replay(b"message 3"));
2722 }
2723
2724 #[test]
2725 fn test_payload_replay_lru_eviction() {
2726 let mut state = TopicState::new();
2727
2728 for i in 0..REPLAY_CACHE_MAX_ENTRIES + 100 {
2730 let payload = format!("payload-{i}");
2731 assert!(!state.is_payload_replay(payload.as_bytes()));
2732 }
2733
2734 assert!(state.replay_cache.len() <= REPLAY_CACHE_MAX_ENTRIES);
2736
2737 assert!(
2739 !state.is_payload_replay(b"payload-0"),
2740 "Evicted entry should be accepted as new again"
2741 );
2742 }
2743
2744 #[tokio::test]
2745 async fn test_handle_eager_drops_replayed_payload() {
2746 let peer_id = test_peer_id(1);
2747 let transport = test_transport().await;
2748 let signing_key = test_signing_key();
2749 let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
2750 let topic = TopicId::new([1u8; 32]);
2751
2752 let mut rx = pubsub.subscribe(topic);
2754 tokio::task::yield_now().await;
2755
2756 let payload = Bytes::from("important data");
2757
2758 let msg_id_1 = {
2760 let mut hasher = blake3::Hasher::new();
2761 hasher.update(topic.as_bytes());
2762 hasher.update(&1u64.to_le_bytes()); hasher.update(test_peer_id(2).as_bytes());
2764 hasher.update(blake3::hash(&payload).as_bytes());
2765 let hash = hasher.finalize();
2766 let mut id = [0u8; 32];
2767 id.copy_from_slice(&hash.as_bytes()[..32]);
2768 id
2769 };
2770
2771 let header1 = MessageHeader {
2772 version: 1,
2773 topic,
2774 msg_id: msg_id_1,
2775 kind: MessageKind::Eager,
2776 hop: 0,
2777 ttl: 10,
2778 };
2779 let header_bytes1 = postcard::to_stdvec(&header1).expect("serialize");
2780 let signature1 = signing_key.sign(&header_bytes1).expect("sign");
2781 let message1 = GossipMessage {
2782 header: header1,
2783 payload: Some(payload.clone()),
2784 signature: signature1,
2785 public_key: signing_key.public_key().to_vec(),
2786 };
2787
2788 let msg_id_2 = {
2790 let mut hasher = blake3::Hasher::new();
2791 hasher.update(topic.as_bytes());
2792 hasher.update(&2u64.to_le_bytes()); hasher.update(test_peer_id(3).as_bytes()); hasher.update(blake3::hash(&payload).as_bytes());
2795 let hash = hasher.finalize();
2796 let mut id = [0u8; 32];
2797 id.copy_from_slice(&hash.as_bytes()[..32]);
2798 id
2799 };
2800
2801 let header2 = MessageHeader {
2802 version: 1,
2803 topic,
2804 msg_id: msg_id_2,
2805 kind: MessageKind::Eager,
2806 hop: 0,
2807 ttl: 10,
2808 };
2809 let header_bytes2 = postcard::to_stdvec(&header2).expect("serialize");
2810 let signature2 = signing_key.sign(&header_bytes2).expect("sign");
2811 let message2 = GossipMessage {
2812 header: header2,
2813 payload: Some(payload.clone()),
2814 signature: signature2,
2815 public_key: signing_key.public_key().to_vec(),
2816 };
2817
2818 let from_peer = test_peer_id(2);
2819
2820 pubsub
2822 .handle_eager(from_peer, topic, message1)
2823 .await
2824 .expect("first handle_eager");
2825
2826 let from_peer_2 = test_peer_id(3);
2828 pubsub
2829 .handle_eager(from_peer_2, topic, message2)
2830 .await
2831 .expect("second handle_eager");
2832
2833 let msg = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2835 .await
2836 .expect("should receive first message")
2837 .expect("channel should not be closed");
2838 assert_eq!(msg.1, payload);
2839
2840 let replay = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await;
2842 assert!(
2843 replay.is_err(),
2844 "Replayed payload should NOT be delivered to subscriber"
2845 );
2846 }
2847
2848 #[tokio::test]
2849 async fn test_publish_local_seeds_replay_cache() {
2850 let peer_id = test_peer_id(1);
2851 let transport = test_transport().await;
2852 let signing_key = test_signing_key();
2853 let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
2854 let topic = TopicId::new([1u8; 32]);
2855
2856 let mut rx = pubsub.subscribe(topic);
2858 tokio::task::yield_now().await;
2859
2860 let payload = Bytes::from("local message");
2861
2862 pubsub
2864 .publish(topic, payload.clone())
2865 .await
2866 .expect("publish");
2867
2868 let msg = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2870 .await
2871 .expect("should receive local publish")
2872 .expect("channel open");
2873 assert_eq!(msg.1, payload);
2874
2875 let msg_id = {
2877 let mut hasher = blake3::Hasher::new();
2878 hasher.update(topic.as_bytes());
2879 hasher.update(&99u64.to_le_bytes());
2880 hasher.update(test_peer_id(5).as_bytes());
2881 hasher.update(blake3::hash(&payload).as_bytes());
2882 let hash = hasher.finalize();
2883 let mut id = [0u8; 32];
2884 id.copy_from_slice(&hash.as_bytes()[..32]);
2885 id
2886 };
2887
2888 let header = MessageHeader {
2889 version: 1,
2890 topic,
2891 msg_id,
2892 kind: MessageKind::Eager,
2893 hop: 0,
2894 ttl: 10,
2895 };
2896 let header_bytes = postcard::to_stdvec(&header).expect("serialize");
2897 let signature = signing_key.sign(&header_bytes).expect("sign");
2898 let message = GossipMessage {
2899 header,
2900 payload: Some(payload.clone()),
2901 signature,
2902 public_key: signing_key.public_key().to_vec(),
2903 };
2904
2905 let from_peer = test_peer_id(5);
2906 pubsub
2907 .handle_eager(from_peer, topic, message)
2908 .await
2909 .expect("handle_eager echo");
2910
2911 let echo = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await;
2913 assert!(
2914 echo.is_err(),
2915 "Network echo of locally published payload should be dropped by replay cache"
2916 );
2917 }
2918}