saorsa_gossip_pubsub/lib.rs

#![warn(missing_docs)]

//! Plumtree-based pub/sub dissemination
//!
//! Implements:
//! - EAGER push along spanning tree
//! - IHAVE lazy digests to non-tree links
//! - IWANT pull on demand
//! - PRUNE/GRAFT for tree optimization
//! - Anti-entropy reconciliation for partition recovery
//!
//! # Architecture
//!
//! Each topic maintains two sets of peers:
//! - **Eager peers** (tree): Forward full messages immediately
//! - **Lazy peers** (gossip): Send only message IDs (IHAVE)
//!
//! The tree self-optimizes via duplicate detection (PRUNE) and pull requests (GRAFT).
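//!
//! # Example
//!
//! A minimal usage sketch mirroring the unit tests at the bottom of this
//! file (it assumes the `UdpTransportAdapter` from `saorsa_gossip_transport`):
//!
//! ```no_run
//! use std::sync::Arc;
//! use bytes::Bytes;
//! use saorsa_gossip_pubsub::{PlumtreePubSub, PubSub};
//! use saorsa_gossip_transport::UdpTransportAdapter;
//! use saorsa_gossip_types::{PeerId, TopicId};
//!
//! # async fn example() -> anyhow::Result<()> {
//! let bind: std::net::SocketAddr = "127.0.0.1:0".parse()?;
//! let transport = Arc::new(UdpTransportAdapter::new(bind, vec![]).await?);
//! let signing_key = saorsa_gossip_identity::MlDsaKeyPair::generate()?;
//! let pubsub = PlumtreePubSub::new(PeerId::new([1u8; 32]), transport, signing_key);
//!
//! let topic = TopicId::new([7u8; 32]);
//! let mut rx = pubsub.subscribe(topic);
//! // Subscriber registration runs on a spawned task; give it a moment before
//! // publishing (the unit tests below sleep ~10ms for the same reason).
//! tokio::time::sleep(std::time::Duration::from_millis(10)).await;
//! pubsub.publish_local(topic, Bytes::from("hello")).await?;
//! let (_sender, payload) = rx.recv().await.expect("local delivery");
//! assert_eq!(payload, Bytes::from("hello"));
//! # Ok(())
//! # }
//! ```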

use anyhow::{anyhow, Result};
use bytes::Bytes;
use lru::LruCache;
use saorsa_gossip_transport::{GossipStreamType, GossipTransport};
use saorsa_gossip_types::{MessageHeader, MessageKind, PeerId, TopicId};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, RwLock};
use tokio::time;
use tracing::{debug, error, trace, warn};

/// Maximum message cache size per topic (10,000 messages)
const MAX_CACHE_SIZE: usize = 10_000;

/// Message cache TTL (5 minutes)
const CACHE_TTL_SECS: u64 = 300;

/// Maximum payload replay cache size per topic.
const REPLAY_CACHE_MAX_ENTRIES: usize = 10_000;

/// Payload replay cache TTL (5 minutes).
const REPLAY_CACHE_TTL_SECS: u64 = 300;

/// Maximum IHAVE batch size (per SPEC.md)
const MAX_IHAVE_BATCH_SIZE: usize = 1024;

/// IHAVE flush interval (100ms)
const IHAVE_FLUSH_INTERVAL_MS: u64 = 100;

/// Anti-entropy reconciliation interval (30 seconds)
const ANTI_ENTROPY_INTERVAL_SECS: u64 = 30;

/// Minimum eager peer degree
const MIN_EAGER_DEGREE: usize = 6;
/// Maximum eager peer degree
const MAX_EAGER_DEGREE: usize = 12;

/// Message ID type alias
type MessageIdType = [u8; 32];

const fn message_cache_capacity() -> NonZeroUsize {
    // SAFETY: MAX_CACHE_SIZE is a positive constant (10,000)
    unsafe { NonZeroUsize::new_unchecked(MAX_CACHE_SIZE) }
}

const fn replay_cache_capacity() -> NonZeroUsize {
    // SAFETY: REPLAY_CACHE_MAX_ENTRIES is a positive constant (10,000)
    unsafe { NonZeroUsize::new_unchecked(REPLAY_CACHE_MAX_ENTRIES) }
}

/// Gossip message wrapper
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GossipMessage {
    /// Message header
    pub header: MessageHeader,
    /// Optional payload (None for IHAVE)
    pub payload: Option<Bytes>,
    /// ML-DSA signature over the header
    pub signature: Vec<u8>,
    /// Sender's ML-DSA public key for verification
    pub public_key: Vec<u8>,
}

/// Anti-entropy reconciliation payload
///
/// Used for periodic set reconciliation between peers to recover
/// messages missed during network partitions.
#[derive(Clone, Debug, Serialize, Deserialize)]
enum AntiEntropyPayload {
    /// "Here are my message IDs, send me anything I'm missing"
    Digest {
        /// Message IDs the sender currently has cached
        msg_ids: Vec<MessageIdType>,
    },
    /// "Here are the IDs you're missing" (actual messages follow as EAGER)
    Response {
        /// Message IDs the receiver is missing
        missing_ids: Vec<MessageIdType>,
    },
}

/// Per-peer quality score for tree optimization
///
/// Tracks delivery metrics to enable score-based promotion/demotion
/// decisions in the Plumtree spanning tree.
struct PeerScore {
    /// Count of EAGER messages received from this peer
    messages_delivered: u64,
    /// Count of IWANTs we sent to this peer
    iwant_requests: u64,
    /// Count of responses received after sending IWANT
    iwant_responses: u64,
    /// Last time we received any message from this peer
    last_seen: Instant,
}

impl PeerScore {
    /// Create a new peer score with default values
    fn new() -> Self {
        Self {
            messages_delivered: 0,
            iwant_requests: 0,
            iwant_responses: 0,
            last_seen: Instant::now(),
        }
    }

    /// Calculate peer quality score (0.0 to 1.0)
    ///
    /// Score = (iwant_response_rate * 0.6) + (recency_factor * 0.4)
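    ///
    /// For example, 6 of 10 IWANTs answered and a peer last seen 60 s ago
    /// yields (0.6 * 0.6) + ((1.0 - 60.0/300.0) * 0.4) = 0.36 + 0.32 = 0.68.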
    fn score(&self) -> f64 {
        let response_rate = if self.iwant_requests > 0 {
            self.iwant_responses as f64 / self.iwant_requests as f64
        } else {
            // No IWANT requests means peer has been responsive enough via EAGER
            // Give benefit of the doubt with a moderate score
            if self.messages_delivered > 0 {
                0.8
            } else {
                0.5
            }
        };

        let secs_since_seen = Instant::now()
            .saturating_duration_since(self.last_seen)
            .as_secs_f64();
        let recency = (1.0 - (secs_since_seen / 300.0)).max(0.0);

        (response_rate.min(1.0) * 0.6) + (recency * 0.4)
    }

    /// Record a message delivery from this peer
    fn record_delivery(&mut self) {
        self.messages_delivered += 1;
        self.last_seen = Instant::now();
    }

    /// Record that we sent an IWANT request to this peer
    fn record_iwant_request(&mut self) {
        self.iwant_requests += 1;
    }

    /// Record that this peer responded to an IWANT request
    fn record_iwant_response(&mut self) {
        self.iwant_responses += 1;
        self.last_seen = Instant::now();
    }
}

/// Cached message entry
#[derive(Clone)]
struct CachedMessage {
    /// Message payload
    payload: Bytes,
    /// Timestamp when cached
    timestamp: Instant,
    /// Message header
    header: MessageHeader,
}

/// Per-topic state
struct TopicState {
    /// Spanning tree peers (forward EAGER)
    eager_peers: HashSet<PeerId>,
    /// Non-tree peers (send IHAVE only)
    lazy_peers: HashSet<PeerId>,
    /// Message cache: msg_id -> cached message
    message_cache: LruCache<MessageIdType, CachedMessage>,
    /// Pending IHAVE batch (≤1024 message IDs)
    pending_ihave: Vec<MessageIdType>,
    /// Outstanding IWANT requests: msg_id -> (peer, timestamp)
    outstanding_iwants: HashMap<MessageIdType, (PeerId, Instant)>,
    /// Per-peer quality scores for tree optimization
    peer_scores: HashMap<PeerId, PeerScore>,
    /// Local subscribers
    subscribers: Vec<mpsc::UnboundedSender<(PeerId, Bytes)>>,
    /// Payload-level replay cache: BLAKE3(payload) -> insertion time.
    ///
    /// Catches replays where the same application payload is wrapped in a
    /// different gossip envelope (different epoch, sender, msg_id).
    replay_cache: LruCache<[u8; 32], Instant>,
    /// TTL for replay cache entries.
    replay_ttl: Duration,
}

impl TopicState {
    fn new() -> Self {
        Self {
            eager_peers: HashSet::new(),
            lazy_peers: HashSet::new(),
            message_cache: LruCache::new(message_cache_capacity()),
            pending_ihave: Vec::new(),
            outstanding_iwants: HashMap::new(),
            peer_scores: HashMap::new(),
            subscribers: Vec::new(),
            replay_cache: LruCache::new(replay_cache_capacity()),
            replay_ttl: Duration::from_secs(REPLAY_CACHE_TTL_SECS),
        }
    }

    /// Check if a payload has been seen before (replay detection).
    ///
    /// Returns `true` if this is a replay (payload hash already in cache
    /// and not expired). Returns `false` if this is a new payload (and
    /// inserts the hash into the cache).
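    ///
    /// Calling this twice with the same bytes within `replay_ttl` therefore
    /// returns `false` then `true`, regardless of the gossip envelope the
    /// payload arrived in.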
    fn is_payload_replay(&mut self, payload: &[u8]) -> bool {
        let key: [u8; 32] = *blake3::hash(payload).as_bytes();
        if let Some(ts) = self.replay_cache.get(&key) {
            if ts.elapsed() < self.replay_ttl {
                return true;
            }
        }
        self.replay_cache.put(key, Instant::now());
        false
    }

    /// Get all cached message IDs for anti-entropy digest
    fn cached_message_ids(&self) -> Vec<MessageIdType> {
        self.message_cache.iter().map(|(id, _)| *id).collect()
    }

    /// Check if message is in cache
    fn has_message(&self, msg_id: &MessageIdType) -> bool {
        self.message_cache.contains(msg_id)
    }

    /// Add message to cache
    fn cache_message(&mut self, msg_id: MessageIdType, payload: Bytes, header: MessageHeader) {
        let cached = CachedMessage {
            payload,
            timestamp: Instant::now(),
            header,
        };
        self.message_cache.put(msg_id, cached);
    }

    /// Get cached message
    fn get_message(&mut self, msg_id: &MessageIdType) -> Option<CachedMessage> {
        self.message_cache.get(msg_id).cloned()
    }

    /// Clean expired cache entries
    fn clean_cache(&mut self) {
        let now = Instant::now();
        let ttl = Duration::from_secs(CACHE_TTL_SECS);

        // Collect expired keys
        let mut expired = Vec::new();
        for (msg_id, cached) in self.message_cache.iter() {
            if now.duration_since(cached.timestamp) > ttl {
                expired.push(*msg_id);
            }
        }

        // Remove expired entries
        for msg_id in expired {
            self.message_cache.pop(&msg_id);
        }

        // Clean expired replay cache entries
        let replay_ttl = self.replay_ttl;
        let mut expired_replay = Vec::new();
        for (hash, ts) in self.replay_cache.iter() {
            if now.saturating_duration_since(*ts) > replay_ttl {
                expired_replay.push(*hash);
            }
        }
        for hash in expired_replay {
            self.replay_cache.pop(&hash);
        }

        // Clean stale peer scores (10 minute expiry)
        // Use saturating_duration_since to avoid panic on Windows (coarse timer)
        let score_expiry = Duration::from_secs(600);
        let now = Instant::now();
        self.peer_scores
            .retain(|_, score| now.saturating_duration_since(score.last_seen) < score_expiry);
    }

    /// Move peer from eager to lazy
    fn prune_peer(&mut self, peer: PeerId) {
        if self.eager_peers.remove(&peer) {
            self.lazy_peers.insert(peer);
            debug!(peer_id = %peer, "PRUNE: moved peer from eager to lazy");
        }
    }

    /// Move peer from lazy to eager
    fn graft_peer(&mut self, peer: PeerId) {
        if self.lazy_peers.remove(&peer) {
            self.eager_peers.insert(peer);
            debug!(peer_id = %peer, "GRAFT: moved peer from lazy to eager");
        }
    }

    /// Maintain eager peer degree (6-12) using score-based selection
    ///
    /// Promotes the highest-scoring lazy peers when below minimum degree,
    /// and demotes the lowest-scoring eager peers when above maximum degree.
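    ///
    /// For example, with 4 eager and 10 lazy peers, the two highest-scoring
    /// lazy peers are grafted to reach `MIN_EAGER_DEGREE` (6).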
    fn maintain_degree(&mut self) {
        let eager_count = self.eager_peers.len();

        if eager_count < MIN_EAGER_DEGREE && !self.lazy_peers.is_empty() {
            // Promote highest-scoring lazy peers
            let to_promote = MIN_EAGER_DEGREE - eager_count;
            let mut scored_lazy: Vec<(PeerId, f64)> = self
                .lazy_peers
                .iter()
                .map(|&p| {
                    let score = self.peer_scores.get(&p).map_or(0.5, |s| s.score());
                    (p, score)
                })
                .collect();
            scored_lazy.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
            let peers: Vec<PeerId> = scored_lazy
                .iter()
                .take(to_promote)
                .map(|(p, _)| *p)
                .collect();
            for peer in peers {
                self.graft_peer(peer);
            }
        } else if eager_count > MAX_EAGER_DEGREE {
            // Demote lowest-scoring eager peers
            let to_demote = eager_count - MAX_EAGER_DEGREE;
            let mut scored_eager: Vec<(PeerId, f64)> = self
                .eager_peers
                .iter()
                .map(|&p| {
                    let score = self.peer_scores.get(&p).map_or(0.5, |s| s.score());
                    (p, score)
                })
                .collect();
            scored_eager.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
            let peers: Vec<PeerId> = scored_eager
                .iter()
                .take(to_demote)
                .map(|(p, _)| *p)
                .collect();
            for peer in peers {
                self.prune_peer(peer);
            }
        }
    }
}

/// Pub/sub trait for message dissemination
#[async_trait::async_trait]
pub trait PubSub: Send + Sync {
    /// Publish a message to a topic
    async fn publish(&self, topic: TopicId, data: Bytes) -> Result<()>;

    /// Subscribe to a topic and receive messages
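    ///
    /// The receiver yields `(sender, payload)` pairs. Dropping it effectively
    /// unsubscribes: closed channels are pruned on the next delivery attempt.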
    fn subscribe(&self, topic: TopicId) -> mpsc::UnboundedReceiver<(PeerId, Bytes)>;

    /// Unsubscribe from a topic
    async fn unsubscribe(&self, topic: TopicId) -> Result<()>;

    /// Initialize peers for a topic
    ///
    /// Called when subscribing to a topic to populate the eager peers list
    /// with currently connected peers for message dissemination.
    async fn initialize_topic_peers(&self, topic: TopicId, peers: Vec<PeerId>);

    /// Replace topic peers with exactly the given set of connected peers.
    ///
    /// Removes stale/disconnected peers and adds newly connected ones.
    ///
    /// The default implementation falls back to [`Self::initialize_topic_peers`]
    /// (add-only). Override this method to get full prune-and-replace semantics.
    async fn set_topic_peers(&self, topic: TopicId, connected: Vec<PeerId>) {
        // Default: fall back to initialize (add-only)
        self.initialize_topic_peers(topic, connected).await;
    }

    /// Handle an incoming pubsub message from a peer
    ///
    /// Routes the message to the appropriate handler based on MessageKind
    /// (Eager, IHave, IWant, AntiEntropy).
    /// Called by the transport layer when receiving PubSub messages.
    async fn handle_message(&self, from: PeerId, data: Bytes) -> Result<()>;

    /// Trigger an anti-entropy round for a specific topic
    ///
    /// This is primarily for testing. In production, anti-entropy runs
    /// automatically via the background task.
    async fn trigger_anti_entropy(&self, _topic: TopicId) -> Result<()> {
        Ok(()) // Default no-op
    }
}

/// Plumtree pub/sub implementation
pub struct PlumtreePubSub<T: GossipTransport + 'static> {
    /// Per-topic state
    topics: Arc<RwLock<HashMap<TopicId, TopicState>>>,
    /// Local peer ID
    peer_id: PeerId,
    /// Epoch for message IDs (system time in seconds)
    epoch_start: std::time::SystemTime,
    /// Transport layer for sending messages
    transport: Arc<T>,
    /// ML-DSA key pair for signing messages
    signing_key: Arc<saorsa_gossip_identity::MlDsaKeyPair>,
}

impl<T: GossipTransport + 'static> PlumtreePubSub<T> {
    /// Create a new Plumtree pub/sub instance
    ///
    /// # Arguments
    /// * `peer_id` - Local peer identifier
    /// * `transport` - Transport layer for network communication
    /// * `signing_key` - ML-DSA key pair for message signing
    pub fn new(
        peer_id: PeerId,
        transport: Arc<T>,
        signing_key: saorsa_gossip_identity::MlDsaKeyPair,
    ) -> Self {
        let pubsub = Self {
            topics: Arc::new(RwLock::new(HashMap::new())),
            peer_id,
            epoch_start: std::time::SystemTime::UNIX_EPOCH,
            transport,
            signing_key: Arc::new(signing_key),
        };

        // Start background tasks
        pubsub.spawn_ihave_flusher();
        pubsub.spawn_cache_cleaner();
        pubsub.spawn_degree_maintainer();
        pubsub.spawn_anti_entropy_task();

        pubsub
    }

    /// Get current epoch (seconds since UNIX_EPOCH)
    fn current_epoch(&self) -> u64 {
        std::time::SystemTime::now()
            .duration_since(self.epoch_start)
            .map(|d| d.as_secs())
            .unwrap_or(0)
    }

    /// Calculate message ID
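    ///
    /// Derived from (topic, current epoch, local peer ID, BLAKE3(payload))
    /// via `MessageHeader::calculate_msg_id`.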
    fn calculate_msg_id(&self, topic: &TopicId, payload: &Bytes) -> MessageIdType {
        let epoch = self.current_epoch();
        let payload_hash = blake3::hash(payload.as_ref());
        MessageHeader::calculate_msg_id(topic, epoch, &self.peer_id, payload_hash.as_bytes())
    }

    /// Sign message header using ML-DSA-65
    ///
    /// Serializes the header and signs it with the node's ML-DSA key pair.
    /// Per SPEC2 §2, all gossip messages MUST be signed for authenticity.
    fn sign_message(&self, header: &MessageHeader) -> Vec<u8> {
        // Serialize header for signing
        let header_bytes = match postcard::to_stdvec(header) {
            Ok(bytes) => bytes,
            Err(e) => {
                error!("Failed to serialize header for signing: {}", e);
                return Vec::new();
            }
        };

        // Sign with ML-DSA-65
        match self.signing_key.sign(&header_bytes) {
            Ok(signature) => signature,
            Err(e) => {
                error!("Failed to sign message: {}", e);
                Vec::new()
            }
        }
    }

    /// Verify message signature using ML-DSA-65
    ///
    /// # Arguments
    /// * `header` - Message header to verify
    /// * `signature` - ML-DSA signature bytes
    /// * `public_key` - Sender's public key bytes
    ///
    /// # Returns
    /// `true` if signature is valid, `false` otherwise
    fn verify_signature(
        &self,
        header: &MessageHeader,
        signature: &[u8],
        public_key: &[u8],
    ) -> bool {
        // Serialize header
        let header_bytes = match postcard::to_stdvec(header) {
            Ok(bytes) => bytes,
            Err(e) => {
                warn!("Failed to serialize header for verification: {}", e);
                return false;
            }
        };

        // Verify signature
        match saorsa_gossip_identity::MlDsaKeyPair::verify(public_key, &header_bytes, signature) {
            Ok(valid) => valid,
            Err(e) => {
                warn!("Failed to verify signature: {}", e);
                false
            }
        }
    }

    /// Publish a message (local origin)
    pub async fn publish_local(&self, topic: TopicId, payload: Bytes) -> Result<()> {
        let msg_id = self.calculate_msg_id(&topic, &payload);

        let header = MessageHeader {
            version: 1,
            topic,
            msg_id,
            kind: MessageKind::Eager,
            hop: 0,
            ttl: 10,
        };

        let signature = self.sign_message(&header);

        let message = GossipMessage {
            header: header.clone(),
            payload: Some(payload.clone()),
            signature,
            public_key: self.signing_key.public_key().to_vec(),
        };

        let mut topics = self.topics.write().await;
        let state = topics.entry(topic).or_insert_with(TopicState::new);

        // Add to cache
        state.cache_message(msg_id, payload.clone(), header);

        // Seed the replay cache so network echoes of our own publish are
        // detected as replays (defense-in-depth alongside msg_id dedup).
        let _ = state.is_payload_replay(&payload);

        // Send EAGER to eager_peers
        let eager_peers: Vec<PeerId> = state.eager_peers.iter().copied().collect();
        drop(topics); // Release lock before network I/O

        for peer in eager_peers {
            let transport = self.transport.clone();
            let message = message.clone();
            tokio::spawn(async move {
                trace!(peer_id = %peer, msg_id = ?msg_id, "Sending EAGER");
                let bytes = match postcard::to_stdvec(&message) {
                    Ok(bytes) => bytes,
                    Err(e) => {
                        warn!(peer_id = %peer, msg_id = ?msg_id, "EAGER serialize failed: {e}");
                        return;
                    }
                };
                match transport
                    .send_to_peer(peer, GossipStreamType::PubSub, bytes.into())
                    .await
                {
                    Ok(()) => {}
                    Err(err) => {
                        warn!(peer_id = %peer, msg_id = ?msg_id, "EAGER send failed: {err}");
                    }
                }
            });
        }

        // Batch msg_id to pending_ihave
        let mut topics = self.topics.write().await;
        if let Some(state) = topics.get_mut(&topic) {
            state.pending_ihave.push(msg_id);

            // Deliver to local subscribers
            let data = (self.peer_id, payload);
            state.subscribers.retain(|tx| tx.send(data.clone()).is_ok());
        }

        Ok(())
    }

    /// Handle incoming EAGER message
    pub async fn handle_eager(
        &self,
        from: PeerId,
        topic: TopicId,
        message: GossipMessage,
    ) -> Result<()> {
        let msg_id = message.header.msg_id;

        // Verify signature
        if !self.verify_signature(&message.header, &message.signature, &message.public_key) {
            warn!(peer_id = %from, msg_id = ?msg_id, "Invalid signature, dropping");
            return Err(anyhow!("Invalid signature"));
        }

        let mut topics = self.topics.write().await;
        let state = topics.entry(topic).or_insert_with(TopicState::new);

        // Check for duplicate
        if state.has_message(&msg_id) {
            // PRUNE: move sender from eager to lazy
            state.prune_peer(from);
            return Ok(());
        }

        // New message - add to cache
        let payload = message
            .payload
            .clone()
            .ok_or_else(|| anyhow!("EAGER missing payload"))?;
        state.cache_message(msg_id, payload.clone(), message.header.clone());

        // Update peer score for the sender
        state
            .peer_scores
            .entry(from)
            .or_insert_with(PeerScore::new)
            .record_delivery();

        // Check if this message was requested via IWANT (anti-entropy or IHAVE recovery)
        if state.outstanding_iwants.remove(&msg_id).is_some() {
            state
                .peer_scores
                .entry(from)
                .or_insert_with(PeerScore::new)
                .record_iwant_response();
        }

        // Payload-level replay detection: catches re-wrapped payloads where
        // the gossip envelope (msg_id) is new but the application payload is identical.
        // We keep the msg_id cache entry (already done above) so PlumTree's
        // PRUNE/GRAFT still works, but skip subscriber delivery and forwarding.
        if state.is_payload_replay(&payload) {
            debug!(
                topic = ?topic,
                msg_id = ?msg_id,
                "Payload replay detected: msg_id new but payload hash seen before"
            );
            return Ok(());
        }

        // Add sender to eager_peers if not already present.
        // This ensures bidirectional message flow: if a peer sends us messages
        // on a topic, they've subscribed and should receive our messages too.
        if !state.eager_peers.contains(&from) && !state.lazy_peers.contains(&from) {
            state.eager_peers.insert(from);
            debug!(peer_id = %from, topic = ?topic, "Added sender to eager_peers");
        }

        // Deliver to local subscribers
        let sub_count = state.subscribers.len();
        let data = (from, payload.clone());
        state.subscribers.retain(|tx| tx.send(data.clone()).is_ok());
        let delivered = state.subscribers.len();
        debug!(
            topic = ?topic,
            subscribers = sub_count,
            delivered = delivered,
            "plumtree handle_eager: delivered to local subscribers"
        );

        // Forward to eager_peers (except sender)
        let eager_peers: Vec<PeerId> = state
            .eager_peers
            .iter()
            .filter(|&&p| p != from)
            .copied()
            .collect();

        // Batch msg_id to pending_ihave for lazy_peers
        state.pending_ihave.push(msg_id);

        drop(topics); // Release lock

        // Serialize once; the payload is the same for all peers
        let bytes: Bytes = postcard::to_stdvec(&message)
            .map_err(|e| anyhow!("EAGER forward serialize failed: {e}"))?
            .into();

        // Forward EAGER (best-effort: log failures, don't abort the loop)
        for peer in eager_peers {
            trace!(peer_id = %peer, msg_id = ?msg_id, "Forwarding EAGER");
            if let Err(e) = self
                .transport
                .send_to_peer(peer, GossipStreamType::PubSub, bytes.clone())
                .await
            {
                warn!(peer_id = %peer, msg_id = ?msg_id, "EAGER forward failed: {e}");
            }
        }

        Ok(())
    }

    /// Handle incoming IHAVE message
    pub async fn handle_ihave(
        &self,
        from: PeerId,
        topic: TopicId,
        msg_ids: Vec<MessageIdType>,
    ) -> Result<()> {
        let mut topics = self.topics.write().await;
        let state = topics.entry(topic).or_insert_with(TopicState::new);

        let mut requested = Vec::new();

        for msg_id in msg_ids {
            // Skip if we have it
            if state.has_message(&msg_id) {
                continue;
            }

            // Skip if already requested
            if state.outstanding_iwants.contains_key(&msg_id) {
                continue;
            }

            // Request it
            requested.push(msg_id);
            state
                .outstanding_iwants
                .insert(msg_id, (from, Instant::now()));

            // Track IWANT request for scoring
            state
                .peer_scores
                .entry(from)
                .or_insert_with(PeerScore::new)
                .record_iwant_request();
        }

        drop(topics); // Release lock

        if !requested.is_empty() {
            debug!(peer_id = %from, count = requested.len(), "Sending IWANT");
            // Create IWANT message
            let iwant_header = MessageHeader {
                version: 1,
                topic,
                msg_id: requested[0], // Use first ID as header
                kind: MessageKind::IWant,
                hop: 0,
                ttl: 10,
            };
            let iwant_header_clone = iwant_header.clone();
            let iwant_msg = GossipMessage {
                header: iwant_header,
                payload: Some(
                    postcard::to_stdvec(&requested)
                        .map_err(|e| anyhow!("Serialization failed: {}", e))?
                        .into(),
                ),
                signature: self.sign_message(&iwant_header_clone),
                public_key: self.signing_key.public_key().to_vec(),
            };
            let bytes = postcard::to_stdvec(&iwant_msg)
                .map_err(|e| anyhow!("Serialization failed: {}", e))?;
            self.transport
                .send_to_peer(from, GossipStreamType::PubSub, bytes.into())
                .await?;
        }

        Ok(())
    }

    /// Handle incoming IWANT message
    pub async fn handle_iwant(
        &self,
        from: PeerId,
        topic: TopicId,
        msg_ids: Vec<MessageIdType>,
    ) -> Result<()> {
        let mut topics = self.topics.write().await;
        let state = topics.entry(topic).or_insert_with(TopicState::new);

        let mut to_send = Vec::new();

        for msg_id in msg_ids {
            if let Some(cached) = state.get_message(&msg_id) {
                to_send.push((msg_id, cached));
                // GRAFT: move peer from lazy to eager
                state.graft_peer(from);
            } else {
                warn!(msg_id = ?msg_id, "IWANT for unknown message");
            }
        }

        drop(topics); // Release lock

        // Send EAGER with payloads
        for (msg_id, cached) in to_send {
            debug!(peer_id = %from, msg_id = ?msg_id, "Sending EAGER in response to IWANT");

            let message = GossipMessage {
                header: cached.header.clone(),
                payload: Some(cached.payload.clone()),
                signature: self.sign_message(&cached.header),
                public_key: self.signing_key.public_key().to_vec(),
            };

            let bytes = postcard::to_stdvec(&message)
                .map_err(|e| anyhow!("Serialization failed: {}", e))?;
            self.transport
                .send_to_peer(from, GossipStreamType::PubSub, bytes.into())
                .await?;
        }

        Ok(())
    }

    /// Handle incoming anti-entropy message
    ///
    /// Processes `AntiEntropyPayload::Digest` and `AntiEntropyPayload::Response`
    /// messages for set reconciliation after network partitions.
    async fn handle_anti_entropy(
        &self,
        from: PeerId,
        topic: TopicId,
        message: GossipMessage,
    ) -> Result<()> {
        // Verify signature
        if !self.verify_signature(&message.header, &message.signature, &message.public_key) {
            warn!(peer_id = %from, "Anti-entropy: invalid signature, dropping");
            return Err(anyhow!("Invalid signature on anti-entropy message"));
        }

        let payload_bytes = message
            .payload
            .ok_or_else(|| anyhow!("Anti-entropy message missing payload"))?;

        let ae_payload: AntiEntropyPayload = postcard::from_bytes(&payload_bytes)
            .map_err(|e| anyhow!("Failed to deserialize anti-entropy payload: {}", e))?;

        match ae_payload {
            AntiEntropyPayload::Digest { msg_ids } => {
                debug!(
                    peer_id = %from,
                    topic = ?topic,
                    their_count = msg_ids.len(),
                    "Received anti-entropy digest"
                );

                let their_ids: HashSet<MessageIdType> = msg_ids.into_iter().collect();

                let mut topics = self.topics.write().await;
                let state = topics.entry(topic).or_insert_with(TopicState::new);

                let our_ids: HashSet<MessageIdType> =
                    state.cached_message_ids().into_iter().collect();

                // IDs we have that they don't - send cached messages as EAGER
                let mut messages_to_send = Vec::new();
                for id in our_ids.difference(&their_ids) {
                    if let Some(cached) = state.get_message(id) {
                        messages_to_send.push(cached);
                    }
                }

                // IDs they have that we don't - we need these
                let ids_we_need: Vec<MessageIdType> =
                    their_ids.difference(&our_ids).copied().collect();

                drop(topics);

                // Send cached messages the peer is missing as EAGER
                for cached in &messages_to_send {
                    let eager_msg = GossipMessage {
                        header: cached.header.clone(),
                        payload: Some(cached.payload.clone()),
                        signature: self.sign_message(&cached.header),
                        public_key: self.signing_key.public_key().to_vec(),
                    };
                    if let Ok(bytes) = postcard::to_stdvec(&eager_msg) {
                        let _ = self
                            .transport
                            .send_to_peer(from, GossipStreamType::PubSub, bytes.into())
                            .await;
                    }
                }

                // Send IWANT for IDs they have that we don't
                if !ids_we_need.is_empty() {
                    debug!(
                        peer_id = %from,
                        count = ids_we_need.len(),
                        "Anti-entropy: requesting missing messages via IWANT"
                    );
                    let iwant_header = MessageHeader {
                        version: 1,
                        topic,
                        msg_id: ids_we_need[0],
                        kind: MessageKind::IWant,
                        hop: 0,
                        ttl: 10,
                    };
                    let iwant_header_clone = iwant_header.clone();
                    let iwant_msg = GossipMessage {
                        header: iwant_header,
                        payload: Some(
                            postcard::to_stdvec(&ids_we_need)
                                .map_err(|e| anyhow!("Serialization failed: {}", e))?
                                .into(),
                        ),
                        signature: self.sign_message(&iwant_header_clone),
                        public_key: self.signing_key.public_key().to_vec(),
                    };
                    if let Ok(bytes) = postcard::to_stdvec(&iwant_msg) {
                        let _ = self
                            .transport
                            .send_to_peer(from, GossipStreamType::PubSub, bytes.into())
                            .await;
                    }
                }

                debug!(
                    peer_id = %from,
                    sent = messages_to_send.len(),
                    requested = ids_we_need.len(),
                    "Anti-entropy digest processed"
                );
            }
            AntiEntropyPayload::Response { missing_ids } => {
                debug!(
                    peer_id = %from,
                    topic = ?topic,
                    count = missing_ids.len(),
                    "Received anti-entropy response"
                );

                // Filter out IDs we already have
                let topics = self.topics.read().await;
                let ids_to_request: Vec<MessageIdType> = if let Some(state) = topics.get(&topic) {
                    missing_ids
                        .into_iter()
                        .filter(|id| !state.has_message(id))
                        .collect()
                } else {
                    missing_ids
                };
                drop(topics);

                // Send IWANT for each ID we don't have
                if !ids_to_request.is_empty() {
                    debug!(
                        peer_id = %from,
                        count = ids_to_request.len(),
                        "Anti-entropy response: sending IWANT for missing IDs"
                    );
                    let iwant_header = MessageHeader {
                        version: 1,
                        topic,
                        msg_id: ids_to_request[0],
                        kind: MessageKind::IWant,
                        hop: 0,
                        ttl: 10,
                    };
                    let iwant_header_clone = iwant_header.clone();
                    let iwant_msg = GossipMessage {
                        header: iwant_header,
                        payload: Some(
                            postcard::to_stdvec(&ids_to_request)
                                .map_err(|e| anyhow!("Serialization failed: {}", e))?
                                .into(),
                        ),
                        signature: self.sign_message(&iwant_header_clone),
                        public_key: self.signing_key.public_key().to_vec(),
                    };
                    if let Ok(bytes) = postcard::to_stdvec(&iwant_msg) {
                        let _ = self
                            .transport
                            .send_to_peer(from, GossipStreamType::PubSub, bytes.into())
                            .await;
                    }
                }
            }
        }

        Ok(())
    }

    /// Send an anti-entropy digest for a specific topic to a specific peer
    ///
    /// Collects cached message IDs and sends them as an `AntiEntropyPayload::Digest`.
    async fn send_anti_entropy_digest(&self, topic: TopicId, peer: PeerId) -> Result<()> {
        let topics = self.topics.read().await;
        let msg_ids = if let Some(state) = topics.get(&topic) {
            state.cached_message_ids()
        } else {
            return Ok(());
        };
        drop(topics);

        if msg_ids.is_empty() {
            return Ok(());
        }

        let ae_payload = AntiEntropyPayload::Digest { msg_ids };
        let payload_bytes = postcard::to_stdvec(&ae_payload)
            .map_err(|e| anyhow!("Failed to serialize anti-entropy payload: {}", e))?;

        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: [0u8; 32],
            kind: MessageKind::AntiEntropy,
            hop: 0,
            ttl: 1,
        };

        let signature = self.sign_message(&header);

        let message = GossipMessage {
            header,
            payload: Some(payload_bytes.into()),
            signature,
            public_key: self.signing_key.public_key().to_vec(),
        };

        let bytes =
            postcard::to_stdvec(&message).map_err(|e| anyhow!("Serialization failed: {}", e))?;
        self.transport
            .send_to_peer(peer, GossipStreamType::PubSub, bytes.into())
            .await?;

        debug!(
            peer_id = %peer,
            topic = ?topic,
            "Sent anti-entropy digest"
        );

        Ok(())
    }

    /// Spawn background task to flush IHAVE batches
    fn spawn_ihave_flusher(&self) {
        let topics = self.topics.clone();
        let transport = self.transport.clone();
        let signing_key = self.signing_key.clone();

        tokio::spawn(async move {
            let mut interval = time::interval(Duration::from_millis(IHAVE_FLUSH_INTERVAL_MS));

            loop {
                interval.tick().await;

                let mut topics_guard = topics.write().await;

                for (topic_id, state) in topics_guard.iter_mut() {
                    if state.pending_ihave.is_empty() {
                        continue;
                    }

                    // Take up to MAX_IHAVE_BATCH_SIZE
                    let batch: Vec<MessageIdType> = state
                        .pending_ihave
                        .drain(..state.pending_ihave.len().min(MAX_IHAVE_BATCH_SIZE))
                        .collect();

                    let lazy_peers: Vec<PeerId> = state.lazy_peers.iter().copied().collect();

                    trace!(topic = ?topic_id, batch_size = batch.len(), peer_count = lazy_peers.len(), "Flushing IHAVE batch");

                    // Send IHAVE to each lazy peer
                    for peer in lazy_peers {
                        let ihave_header = MessageHeader {
                            version: 1,
                            topic: *topic_id,
                            msg_id: batch[0], // Use first ID as header
                            kind: MessageKind::IHave,
                            hop: 0,
                            ttl: 10,
                        };
                        let ihave_header_clone = ihave_header.clone();

                        // Sign the header
                        let signature = match postcard::to_stdvec(&ihave_header_clone) {
                            Ok(bytes) => signing_key.sign(&bytes).unwrap_or_default(),
                            Err(_) => Vec::new(),
                        };

                        let ihave_msg = GossipMessage {
                            header: ihave_header,
                            payload: Some(postcard::to_stdvec(&batch).unwrap_or_default().into()),
                            signature,
                            public_key: signing_key.public_key().to_vec(),
                        };
                        if let Ok(bytes) = postcard::to_stdvec(&ihave_msg) {
                            let _ = transport
                                .send_to_peer(peer, GossipStreamType::PubSub, bytes.into())
                                .await;
                        }
                    }
                }
            }
        });
    }

    /// Spawn background task to clean expired cache entries
    fn spawn_cache_cleaner(&self) {
        let topics = self.topics.clone();

        tokio::spawn(async move {
            let mut interval = time::interval(Duration::from_secs(60));

            loop {
                interval.tick().await;

                let mut topics_guard = topics.write().await;

                for state in topics_guard.values_mut() {
                    state.clean_cache();
                }
            }
        });
    }

    /// Spawn background task to maintain eager peer degree
    fn spawn_degree_maintainer(&self) {
        let topics = self.topics.clone();

        tokio::spawn(async move {
            let mut interval = time::interval(Duration::from_secs(30));

            loop {
                interval.tick().await;

                let mut topics_guard = topics.write().await;

                for state in topics_guard.values_mut() {
                    state.maintain_degree();
                }
            }
        });
    }

    /// Spawn background task for anti-entropy reconciliation
    ///
    /// Every `ANTI_ENTROPY_INTERVAL_SECS` seconds, for each topic with cached messages,
    /// picks one random peer and sends an anti-entropy digest containing our cached message IDs.
    fn spawn_anti_entropy_task(&self) {
        let topics = self.topics.clone();
        let transport = self.transport.clone();
        let signing_key = self.signing_key.clone();

        tokio::spawn(async move {
            let mut interval = time::interval(Duration::from_secs(ANTI_ENTROPY_INTERVAL_SECS));

            loop {
                interval.tick().await;

                let topics_guard = topics.read().await;

                // Collect work to do (topic, peer, msg_ids) while holding the read lock
                let mut work: Vec<(TopicId, PeerId, Vec<MessageIdType>)> = Vec::new();

                for (topic_id, state) in topics_guard.iter() {
                    let msg_ids = state.cached_message_ids();
                    if msg_ids.is_empty() {
                        continue;
                    }

                    // Collect all peers (eager + lazy) for random selection
                    let all_peers: Vec<PeerId> = state
                        .eager_peers
                        .iter()
                        .chain(state.lazy_peers.iter())
                        .copied()
                        .collect();

                    if all_peers.is_empty() {
                        continue;
                    }

                    // Pick a deterministic-random peer using hash of topic + current time
                    let now = std::time::SystemTime::now()
                        .duration_since(std::time::SystemTime::UNIX_EPOCH)
                        .map(|d| d.as_secs())
                        .unwrap_or(0);
                    let hash_input = blake3::hash(
                        &[topic_id.to_bytes().as_slice(), &now.to_le_bytes()].concat(),
                    );
                    let hash_bytes = hash_input.as_bytes();
                    let index = (hash_bytes[0] as usize) % all_peers.len();
                    let selected_peer = all_peers[index];

                    work.push((*topic_id, selected_peer, msg_ids));
                }

                drop(topics_guard);

                // Send digests without holding the lock
                for (topic_id, peer, msg_ids) in work {
                    let ae_payload = AntiEntropyPayload::Digest { msg_ids };
                    let payload_bytes = match postcard::to_stdvec(&ae_payload) {
                        Ok(bytes) => bytes,
                        Err(e) => {
                            warn!("Anti-entropy: failed to serialize payload: {}", e);
                            continue;
                        }
                    };

                    let header = MessageHeader {
                        version: 1,
                        topic: topic_id,
                        msg_id: [0u8; 32],
                        kind: MessageKind::AntiEntropy,
                        hop: 0,
                        ttl: 1,
                    };

                    let signature = match postcard::to_stdvec(&header) {
                        Ok(bytes) => signing_key.sign(&bytes).unwrap_or_default(),
                        Err(_) => Vec::new(),
                    };

                    let message = GossipMessage {
                        header,
                        payload: Some(payload_bytes.into()),
                        signature,
                        public_key: signing_key.public_key().to_vec(),
                    };

                    if let Ok(bytes) = postcard::to_stdvec(&message) {
                        let _ = transport
                            .send_to_peer(peer, GossipStreamType::PubSub, bytes.into())
                            .await;
                    }

                    trace!(
                        peer_id = %peer,
                        topic = ?topic_id,
                        "Anti-entropy: sent digest"
                    );
                }
            }
        });
    }

    /// Initialize peers for a topic from the membership layer
    pub async fn initialize_topic_peers(&self, topic: TopicId, peers: Vec<PeerId>) {
        let mut topics = self.topics.write().await;
        let state = topics.entry(topic).or_insert_with(TopicState::new);

        // Start with all peers as eager (tree will optimize via PRUNE)
        for peer in peers {
            state.eager_peers.insert(peer);
        }

        debug!(topic = ?topic, peer_count = state.eager_peers.len(), "Initialized topic peers");
    }

    /// Replace topic peers with exactly the given set of connected peers.
    ///
    /// Removes stale peers that are no longer connected, promotes any
    /// still-connected lazy peers back to eager (see the comment in the body),
    /// and adds newly connected peers as eager.
    pub async fn set_topic_peers(&self, topic: TopicId, connected: Vec<PeerId>) {
        let mut topics = self.topics.write().await;
        let state = topics.entry(topic).or_insert_with(TopicState::new);

        let connected_set: HashSet<PeerId> = connected.iter().copied().collect();

        // Remove stale peers (no longer connected) from both sets.
        state.eager_peers.retain(|p| connected_set.contains(p));
        state.lazy_peers.retain(|p| connected_set.contains(p));

        // Promote all connected lazy peers back to eager. PlumTree's PRUNE
        // optimization moves peers to lazy when duplicate messages are detected,
        // but the periodic peer refresh should restore them. Without this,
        // peers pruned during a message burst stay lazy permanently, breaking
        // gossip routing after the burst ends.
        let to_promote: Vec<PeerId> = state.lazy_peers.iter().copied().collect();
        for peer in to_promote {
            state.lazy_peers.remove(&peer);
            state.eager_peers.insert(peer);
        }

        // Add any remaining connected peers not in either set as eager.
        for peer in connected {
            if !state.eager_peers.contains(&peer) {
                state.eager_peers.insert(peer);
            }
        }

        debug!(
            topic = ?topic,
            eager = state.eager_peers.len(),
            lazy = state.lazy_peers.len(),
            "Set topic peers"
        );
    }

    /// Return all topic IDs known to PlumTree (subscribed or pass-through).
    ///
    /// This includes topics that have local subscribers AND topics that only
    /// exist because an EAGER message was received and forwarded. The caller
    /// should use this to refresh peer sets for all topics, not just locally
    /// subscribed ones; otherwise pass-through topics lose their forwarding
    /// peers and gossip messages cannot propagate through relay nodes.
    pub async fn all_topic_ids(&self) -> Vec<TopicId> {
        self.topics.read().await.keys().copied().collect()
    }
}

#[async_trait::async_trait]
impl<T: GossipTransport + 'static> PubSub for PlumtreePubSub<T> {
    async fn publish(&self, topic: TopicId, data: Bytes) -> Result<()> {
        self.publish_local(topic, data).await
    }

    fn subscribe(&self, topic: TopicId) -> mpsc::UnboundedReceiver<(PeerId, Bytes)> {
        let (tx, rx) = mpsc::unbounded_channel();
        let topics = self.topics.clone();

        tokio::spawn(async move {
            let mut topics_guard = topics.write().await;
            let state = topics_guard.entry(topic).or_insert_with(TopicState::new);
            state.subscribers.push(tx);
        });

        rx
    }

    async fn unsubscribe(&self, topic: TopicId) -> Result<()> {
        let mut topics = self.topics.write().await;
        topics.remove(&topic);
        Ok(())
    }

    async fn initialize_topic_peers(&self, topic: TopicId, peers: Vec<PeerId>) {
        PlumtreePubSub::initialize_topic_peers(self, topic, peers).await
    }

    async fn set_topic_peers(&self, topic: TopicId, connected: Vec<PeerId>) {
        PlumtreePubSub::set_topic_peers(self, topic, connected).await
    }

    async fn handle_message(&self, from: PeerId, data: Bytes) -> Result<()> {
        // Deserialize the GossipMessage
        let message: GossipMessage = postcard::from_bytes(&data)
            .map_err(|e| anyhow!("Failed to deserialize PubSub message: {}", e))?;

        let topic_id = message.header.topic;
        let msg_kind = message.header.kind;

        debug!(
            msg_kind = ?msg_kind,
            peer_id = %from,
            topic = ?topic_id,
            "Handling incoming PubSub message"
        );

        // Route to the appropriate handler based on message kind.
        // Only pubsub-specific kinds (Eager, IHave, IWant, AntiEntropy) are handled.
1375        match msg_kind {
1376            MessageKind::Eager => self.handle_eager(from, topic_id, message).await,
1377            MessageKind::IHave => {
1378                // IHAVE payload contains Vec<MessageIdType>
1379                if let Some(payload) = &message.payload {
1380                    let msg_ids: Vec<MessageIdType> = postcard::from_bytes(payload)
1381                        .map_err(|e| anyhow!("Failed to deserialize IHAVE payload: {}", e))?;
1382                    self.handle_ihave(from, topic_id, msg_ids).await
1383                } else {
1384                    Err(anyhow!("IHAVE message missing payload"))
1385                }
1386            }
1387            MessageKind::IWant => {
1388                // IWANT payload contains Vec<MessageIdType>
1389                if let Some(payload) = &message.payload {
1390                    let msg_ids: Vec<MessageIdType> = postcard::from_bytes(payload)
1391                        .map_err(|e| anyhow!("Failed to deserialize IWANT payload: {}", e))?;
1392                    self.handle_iwant(from, topic_id, msg_ids).await
1393                } else {
1394                    Err(anyhow!("IWANT message missing payload"))
1395                }
1396            }
1397            MessageKind::AntiEntropy => self.handle_anti_entropy(from, topic_id, message).await,
1398            // Other message kinds (Ping, Ack, Find, Presence, Shuffle) are not handled by PubSub
1399            _ => {
1400                warn!(
1401                    "PubSub received non-pubsub message kind {:?}, ignoring",
1402                    msg_kind
1403                );
1404                Ok(())
1405            }
1406        }
1407    }
1408
1409    async fn trigger_anti_entropy(&self, topic: TopicId) -> Result<()> {
1410        let topics = self.topics.read().await;
1411
1412        let peer = if let Some(state) = topics.get(&topic) {
1413            // Pick a peer (any eager or lazy)
1414            state
1415                .eager_peers
1416                .iter()
1417                .chain(state.lazy_peers.iter())
1418                .next()
1419                .copied()
1420        } else {
1421            None
1422        };
1423
1424        drop(topics);
1425
1426        if let Some(peer) = peer {
1427            self.send_anti_entropy_digest(topic, peer).await
1428        } else {
1429            Ok(()) // No peers available
1430        }
1431    }
1432}

#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use saorsa_gossip_transport::UdpTransportAdapter;
    use std::net::SocketAddr;

    fn test_peer_id(id: u8) -> PeerId {
        let mut bytes = [0u8; 32];
        bytes[0] = id;
        PeerId::new(bytes)
    }

    async fn test_transport() -> Arc<UdpTransportAdapter> {
        let bind: SocketAddr = "127.0.0.1:0".parse().expect("valid addr");
        Arc::new(
            UdpTransportAdapter::new(bind, vec![])
                .await
                .expect("transport"),
        )
    }

    fn test_signing_key() -> saorsa_gossip_identity::MlDsaKeyPair {
        saorsa_gossip_identity::MlDsaKeyPair::generate().expect("Failed to generate test key pair")
    }
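
    /// Builds a signed `GossipMessage` the way the tests below do by hand:
    /// postcard-serialize the header, sign the bytes with ML-DSA, and attach
    /// the sender's public key. Test-local convenience only (used by the
    /// replay-cache tests at the bottom of this module), not crate API.
    fn signed_message(
        signing_key: &saorsa_gossip_identity::MlDsaKeyPair,
        header: MessageHeader,
        payload: Option<Bytes>,
    ) -> GossipMessage {
        let header_bytes = postcard::to_stdvec(&header).expect("serialize header");
        let signature = signing_key.sign(&header_bytes).expect("sign header");
        GossipMessage {
            header,
            payload,
            signature,
            public_key: signing_key.public_key().to_vec(),
        }
    }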

    #[tokio::test]
    async fn test_pubsub_creation() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let _pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
    }

    #[tokio::test]
    async fn test_publish_and_subscribe() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
        let topic = TopicId::new([1u8; 32]);

        let mut rx = pubsub.subscribe(topic);
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

        let data = Bytes::from("test message");
        pubsub.publish(topic, data.clone()).await.ok();

        let received =
            tokio::time::timeout(tokio::time::Duration::from_millis(100), rx.recv()).await;

        assert!(received.is_ok());
        let (_, payload) = received.unwrap().unwrap();
        assert_eq!(payload, data);
    }

    #[tokio::test]
    async fn test_message_caching() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, test_signing_key());
        let topic = TopicId::new([1u8; 32]);

        let payload = Bytes::from("test");
        let msg_id = pubsub.calculate_msg_id(&topic, &payload);

        pubsub.publish(topic, payload.clone()).await.ok();

        // Check cache
        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        assert!(state.has_message(&msg_id));
    }

    #[tokio::test]
    async fn test_duplicate_detection_prune() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        // Initialize peer as eager
        pubsub.initialize_topic_peers(topic, vec![from_peer]).await;

        let payload = Bytes::from("test");
        let msg_id = pubsub.calculate_msg_id(&topic, &payload);

        let header = MessageHeader {
            version: 1,
            topic,
            msg_id,
            kind: MessageKind::Eager,
            hop: 0,
            ttl: 10,
        };

        // Create properly signed message
        let header_bytes = postcard::to_stdvec(&header).expect("serialize");
        let signature = signing_key.sign(&header_bytes).expect("sign");

        let message = GossipMessage {
            header,
            payload: Some(payload.clone()),
            signature,
            public_key: signing_key.public_key().to_vec(),
        };

        // First EAGER - should be accepted
        pubsub
            .handle_eager(from_peer, topic, message.clone())
            .await
            .ok();

        // Second EAGER - should trigger PRUNE
        pubsub.handle_eager(from_peer, topic, message).await.ok();

        // Verify peer was moved to lazy
        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        assert!(!state.eager_peers.contains(&from_peer));
        assert!(state.lazy_peers.contains(&from_peer));
    }

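    // Lazy-path recovery: an IHAVE advertising an unknown msg_id should
    // register an outstanding IWANT so the payload can be pulled later, and
    // an IWANT for a message we have cached should GRAFT the requester back
    // into the eager set. The next two tests cover each direction.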
    #[tokio::test]
    async fn test_ihave_handling() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, test_signing_key());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        let unknown_msg_id = [42u8; 32];

        pubsub
            .handle_ihave(from_peer, topic, vec![unknown_msg_id])
            .await
            .ok();

        // Verify IWANT was tracked
        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        assert!(state.outstanding_iwants.contains_key(&unknown_msg_id));
    }

    #[tokio::test]
    async fn test_iwant_graft() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, test_signing_key());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        // Initialize peer as lazy
        {
            let mut topics = pubsub.topics.write().await;
            let state = topics.entry(topic).or_insert_with(TopicState::new);
            state.lazy_peers.insert(from_peer);
        }

        // Publish a message to cache it
        let payload = Bytes::from("test");
        pubsub.publish(topic, payload.clone()).await.ok();

        // Get the actual cached msg_id (don't recalculate - epoch may have changed)
        let msg_id = {
            let topics = pubsub.topics.read().await;
            let state = topics.get(&topic).unwrap();
            // Get the first (and only) cached message ID
            state
                .message_cache
                .peek_lru()
                .map(|(id, _)| *id)
                .expect("message should be cached")
        };

        // Handle IWANT from lazy peer
        pubsub
            .handle_iwant(from_peer, topic, vec![msg_id])
            .await
            .ok();

        // Verify peer was grafted to eager
        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        assert!(state.eager_peers.contains(&from_peer));
        assert!(!state.lazy_peers.contains(&from_peer));
    }

    #[tokio::test]
    async fn test_degree_maintenance() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, test_signing_key());
        let topic = TopicId::new([1u8; 32]);

        // Add many peers to lazy
        let mut peers = Vec::new();
        for i in 2..20 {
            peers.push(test_peer_id(i));
        }

        {
            let mut topics = pubsub.topics.write().await;
            let state = topics.entry(topic).or_insert_with(TopicState::new);
            for peer in &peers {
                state.lazy_peers.insert(*peer);
            }

            // Maintain degree (should promote to reach MIN_EAGER_DEGREE)
            state.maintain_degree();

            assert!(state.eager_peers.len() >= MIN_EAGER_DEGREE);
        }
    }

    #[tokio::test]
    async fn test_cache_expiration() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, test_signing_key());
        let topic = TopicId::new([1u8; 32]);

        let payload = Bytes::from("test");
        pubsub.publish(topic, payload).await.ok();

        // Manually expire cache entry
        {
            let mut topics = pubsub.topics.write().await;
            let state = topics.get_mut(&topic).unwrap();

            // Backdate timestamps to simulate expiry. checked_sub guards
            // platforms where the monotonic clock lacks headroom for the
            // subtraction (see test_stale_peer_scores_cleaned below).
            let Some(expired) =
                Instant::now().checked_sub(Duration::from_secs(CACHE_TTL_SECS + 10))
            else {
                return;
            };
            for (_, cached) in state.message_cache.iter_mut() {
                cached.timestamp = expired;
            }

            state.clean_cache();

            assert_eq!(state.message_cache.len(), 0);
        }
    }

    // TDD: ML-DSA signing tests. Written RED-first; they now pass (GREEN)
    // against the real ML-DSA implementation.

    #[tokio::test]
    async fn test_message_signing_with_real_mldsa() {
        // GREEN: exercises real ML-DSA signing end-to-end
        use saorsa_gossip_identity::MlDsaKeyPair;

        let keypair = MlDsaKeyPair::generate().expect("keypair");
        let peer_id = PeerId::new([1u8; 32]);
        let transport = test_transport().await;

        // Create PlumtreePubSub with signing key
        let _pubsub = PlumtreePubSub::new(peer_id, transport, keypair.clone());

        // Create a message header
        let topic = TopicId::new([1u8; 32]);
        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: [0u8; 32],
            kind: MessageKind::Eager,
            hop: 0,
            ttl: 10,
        };

        // Serialize header for signing
        let header_bytes = postcard::to_stdvec(&header).expect("serialize");

        // Sign with ML-DSA
        let signature = keypair.sign(&header_bytes).expect("sign");

        // Signature should NOT be empty
        assert!(
            !signature.is_empty(),
            "ML-DSA signature should not be empty"
        );

        // Signature should be valid
        let valid =
            MlDsaKeyPair::verify(keypair.public_key(), &header_bytes, &signature).expect("verify");
        assert!(valid, "Signature should be valid");
    }

    #[tokio::test]
    async fn test_message_signature_verification() {
        // GREEN: real verification accepts valid signatures and rejects
        // tampered ones (this was RED while verification was stubbed out)
        use saorsa_gossip_identity::MlDsaKeyPair;

        let keypair = MlDsaKeyPair::generate().expect("keypair");

        let topic = TopicId::new([1u8; 32]);
        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: [1u8; 32],
            kind: MessageKind::Eager,
            hop: 0,
            ttl: 10,
        };

        let header_bytes = postcard::to_stdvec(&header).expect("serialize");
        let signature = keypair.sign(&header_bytes).expect("sign");

        // Valid signature should verify
        let valid =
            MlDsaKeyPair::verify(keypair.public_key(), &header_bytes, &signature).expect("verify");
        assert!(valid, "Valid signature should verify");

        // Tampered signature should NOT verify
        let mut bad_signature = signature.clone();
        bad_signature[0] ^= 0xFF; // Flip bits

        let invalid = MlDsaKeyPair::verify(keypair.public_key(), &header_bytes, &bad_signature)
            .expect("verify");
        assert!(!invalid, "Tampered signature should not verify");
    }

    #[tokio::test]
    async fn test_published_message_has_valid_signature() {
        // GREEN: Now verifying that published messages have valid signatures
        use saorsa_gossip_identity::MlDsaKeyPair;

        let keypair = MlDsaKeyPair::generate().expect("keypair");
        let peer_id = PeerId::new([1u8; 32]);
        let transport = test_transport().await;

        // Create pubsub with signing key
        let pubsub = PlumtreePubSub::new(peer_id, transport, keypair.clone());

        let topic = TopicId::new([1u8; 32]);
        let payload = Bytes::from("test message");

        // Publish a message
        pubsub.publish(topic, payload.clone()).await.ok();

        // The message should be signed internally
        // Verify by checking that sign_message produces non-empty signatures
        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: [0u8; 32],
            kind: MessageKind::Eager,
            hop: 0,
            ttl: 10,
        };

        let signature = pubsub.sign_message(&header);
        assert!(
            !signature.is_empty(),
            "Published messages should have non-empty signatures"
        );

        // Verify the signature is valid
        let header_bytes = postcard::to_stdvec(&header).expect("serialize");
        let valid =
            MlDsaKeyPair::verify(keypair.public_key(), &header_bytes, &signature).expect("verify");
        assert!(valid, "Signature should be valid");
    }

    // Anti-entropy tests

    #[test]
    fn test_anti_entropy_payload_serialization() {
        // Test Digest variant round-trips through postcard
        let digest = AntiEntropyPayload::Digest {
            msg_ids: vec![[1u8; 32], [2u8; 32], [3u8; 32]],
        };
        let bytes = postcard::to_stdvec(&digest).expect("serialize digest");
        let deserialized: AntiEntropyPayload =
            postcard::from_bytes(&bytes).expect("deserialize digest");

        match deserialized {
            AntiEntropyPayload::Digest { msg_ids } => {
                assert_eq!(msg_ids.len(), 3);
                assert_eq!(msg_ids[0], [1u8; 32]);
                assert_eq!(msg_ids[1], [2u8; 32]);
                assert_eq!(msg_ids[2], [3u8; 32]);
            }
            AntiEntropyPayload::Response { .. } => {
                panic!("Expected Digest, got Response");
            }
        }

        // Test Response variant round-trips through postcard
        let response = AntiEntropyPayload::Response {
            missing_ids: vec![[4u8; 32], [5u8; 32]],
        };
        let bytes = postcard::to_stdvec(&response).expect("serialize response");
        let deserialized: AntiEntropyPayload =
            postcard::from_bytes(&bytes).expect("deserialize response");

        match deserialized {
            AntiEntropyPayload::Response { missing_ids } => {
                assert_eq!(missing_ids.len(), 2);
                assert_eq!(missing_ids[0], [4u8; 32]);
                assert_eq!(missing_ids[1], [5u8; 32]);
            }
            AntiEntropyPayload::Digest { .. } => {
                panic!("Expected Response, got Digest");
            }
        }
    }

    #[test]
    fn test_anti_entropy_payload_empty_serialization() {
        // Empty digest should also round-trip
        let digest = AntiEntropyPayload::Digest {
            msg_ids: Vec::new(),
        };
        let bytes = postcard::to_stdvec(&digest).expect("serialize empty digest");
        let deserialized: AntiEntropyPayload =
            postcard::from_bytes(&bytes).expect("deserialize empty digest");

        match deserialized {
            AntiEntropyPayload::Digest { msg_ids } => {
                assert!(msg_ids.is_empty());
            }
            AntiEntropyPayload::Response { .. } => {
                panic!("Expected Digest, got Response");
            }
        }
    }

    #[tokio::test]
    async fn test_cached_message_ids() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, test_signing_key());
        let topic = TopicId::new([1u8; 32]);

        // Publish 3 messages
        pubsub
            .publish(topic, Bytes::from("msg1"))
            .await
            .expect("publish 1");
        pubsub
            .publish(topic, Bytes::from("msg2"))
            .await
            .expect("publish 2");
        pubsub
            .publish(topic, Bytes::from("msg3"))
            .await
            .expect("publish 3");

        // Verify cached_message_ids returns all 3
        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        let ids = state.cached_message_ids();
        assert_eq!(ids.len(), 3, "Should have 3 cached message IDs");
    }

    #[tokio::test]
    async fn test_handle_anti_entropy_digest_sends_missing() {
        let signing_key = test_signing_key();
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        // Publish a message so we have it cached
        pubsub
            .publish(topic, Bytes::from("cached message"))
            .await
            .expect("publish");

        // Get the cached message ID
        let our_msg_id = {
            let topics = pubsub.topics.read().await;
            let state = topics.get(&topic).unwrap();
            let ids = state.cached_message_ids();
            assert_eq!(ids.len(), 1);
            ids[0]
        };

        // Create a digest from the "remote" peer that has NO messages (empty)
        let ae_payload = AntiEntropyPayload::Digest {
            msg_ids: Vec::new(),
        };
        let payload_bytes = postcard::to_stdvec(&ae_payload).expect("serialize");

        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: [0u8; 32],
            kind: MessageKind::AntiEntropy,
            hop: 0,
            ttl: 1,
        };

        let header_bytes = postcard::to_stdvec(&header).expect("serialize header");
        let signature = signing_key.sign(&header_bytes).expect("sign");

        let message = GossipMessage {
            header,
            payload: Some(payload_bytes.into()),
            signature,
            public_key: signing_key.public_key().to_vec(),
        };

        // Handle the digest - should attempt to send our cached message to the peer
        let result = pubsub.handle_anti_entropy(from_peer, topic, message).await;
        // The send may fail (no actual connection) but the method should not error
        // on the logic itself
        assert!(result.is_ok());

        // Our message should still be in cache
        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        assert!(state.has_message(&our_msg_id));
    }

    #[tokio::test]
    async fn test_handle_anti_entropy_digest_requests_missing() {
        let signing_key = test_signing_key();
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        // We have NO messages cached. The remote peer claims to have some.
        let remote_msg_id = [99u8; 32];
        let ae_payload = AntiEntropyPayload::Digest {
            msg_ids: vec![remote_msg_id],
        };
        let payload_bytes = postcard::to_stdvec(&ae_payload).expect("serialize");

        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: [0u8; 32],
            kind: MessageKind::AntiEntropy,
            hop: 0,
            ttl: 1,
        };

        let header_bytes = postcard::to_stdvec(&header).expect("serialize header");
        let signature = signing_key.sign(&header_bytes).expect("sign");

        let message = GossipMessage {
            header,
            payload: Some(payload_bytes.into()),
            signature,
            public_key: signing_key.public_key().to_vec(),
        };

        // Handle the digest - should try to send IWANT for the missing message
        let result = pubsub.handle_anti_entropy(from_peer, topic, message).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_handle_anti_entropy_response() {
        let signing_key = test_signing_key();
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        // Create a Response saying we're missing some IDs
        let missing_id = [77u8; 32];
        let ae_payload = AntiEntropyPayload::Response {
            missing_ids: vec![missing_id],
        };
        let payload_bytes = postcard::to_stdvec(&ae_payload).expect("serialize");

        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: [0u8; 32],
            kind: MessageKind::AntiEntropy,
            hop: 0,
            ttl: 1,
        };

        let header_bytes = postcard::to_stdvec(&header).expect("serialize header");
        let signature = signing_key.sign(&header_bytes).expect("sign");

        let message = GossipMessage {
            header,
            payload: Some(payload_bytes.into()),
            signature,
            public_key: signing_key.public_key().to_vec(),
        };

        // Handle the response - should try to send IWANT
        let result = pubsub.handle_anti_entropy(from_peer, topic, message).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_anti_entropy_invalid_signature_rejected() {
        let signing_key = test_signing_key();
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        let ae_payload = AntiEntropyPayload::Digest {
            msg_ids: Vec::new(),
        };
        let payload_bytes = postcard::to_stdvec(&ae_payload).expect("serialize");

        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: [0u8; 32],
            kind: MessageKind::AntiEntropy,
            hop: 0,
            ttl: 1,
        };

        // Use a BAD signature
        let message = GossipMessage {
            header,
            payload: Some(payload_bytes.into()),
            signature: vec![0u8; 100], // invalid signature
            public_key: signing_key.public_key().to_vec(),
        };

        let result = pubsub.handle_anti_entropy(from_peer, topic, message).await;
        assert!(result.is_err(), "Invalid signature should be rejected");
    }

    #[tokio::test]
    async fn test_anti_entropy_message_routing() {
        // Test that AntiEntropy messages are correctly routed via handle_message
        let signing_key = test_signing_key();
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        let ae_payload = AntiEntropyPayload::Digest {
            msg_ids: Vec::new(),
        };
        let payload_bytes = postcard::to_stdvec(&ae_payload).expect("serialize");

        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: [0u8; 32],
            kind: MessageKind::AntiEntropy,
            hop: 0,
            ttl: 1,
        };

        let header_bytes = postcard::to_stdvec(&header).expect("serialize header");
        let signature = signing_key.sign(&header_bytes).expect("sign");

        let message = GossipMessage {
            header,
            payload: Some(payload_bytes.into()),
            signature,
            public_key: signing_key.public_key().to_vec(),
        };

        // Serialize the full message as it would come over the wire
        let wire_bytes = postcard::to_stdvec(&message).expect("serialize wire message");

        // Route through handle_message (the PubSub trait method)
        let result = pubsub.handle_message(from_peer, wire_bytes.into()).await;
        assert!(
            result.is_ok(),
            "AntiEntropy message should be routed correctly"
        );
    }

    #[tokio::test]
    async fn test_trigger_anti_entropy() {
        let signing_key = test_signing_key();
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
        let topic = TopicId::new([1u8; 32]);
        let peer = test_peer_id(2);

        // Initialize with a peer
        pubsub.initialize_topic_peers(topic, vec![peer]).await;

        // Publish a message so there's something to reconcile
        pubsub
            .publish(topic, Bytes::from("test data"))
            .await
            .expect("publish");

        // Trigger anti-entropy manually (send may fail on transport, but logic is correct)
        let result = pubsub.trigger_anti_entropy(topic).await;
        // The result may be Ok or Err depending on transport - we're testing the logic path
        // If transport fails, the error is from send_to_peer, not from our logic
        let _ = result;
    }

    #[tokio::test]
    async fn test_trigger_anti_entropy_no_peers() {
        let signing_key = test_signing_key();
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
        let topic = TopicId::new([1u8; 32]);

        // No peers initialized - should return Ok without doing anything
        let result = pubsub.trigger_anti_entropy(topic).await;
        assert!(result.is_ok(), "No peers should result in no-op Ok");
    }
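
    // Hedged sketch: one way a caller might drive periodic reconciliation with
    // `trigger_anti_entropy`, reusing the ANTI_ENTROPY_INTERVAL_SECS constant
    // defined above. Illustrative only (hence dead_code); the production node
    // may already schedule this loop internally. Types mirror the tests in
    // this module.
    #[allow(dead_code)]
    async fn anti_entropy_driver_sketch() {
        let pubsub =
            PlumtreePubSub::new(test_peer_id(1), test_transport().await, test_signing_key());
        let topic = TopicId::new([1u8; 32]);
        let mut ticker = time::interval(Duration::from_secs(ANTI_ENTROPY_INTERVAL_SECS));
        loop {
            ticker.tick().await;
            // A failed tick is transport-level noise; log it and keep reconciling.
            if let Err(e) = pubsub.trigger_anti_entropy(topic).await {
                warn!("anti-entropy tick failed: {}", e);
            }
        }
    }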

    #[tokio::test]
    async fn test_send_anti_entropy_digest() {
        let signing_key = test_signing_key();
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
        let topic = TopicId::new([1u8; 32]);
        let peer = test_peer_id(2);

        // Publish a message so there's a cached message
        pubsub
            .publish(topic, Bytes::from("digest test"))
            .await
            .expect("publish");

        // Send digest (transport send may fail, but serialization and logic should work)
        let _ = pubsub.send_anti_entropy_digest(topic, peer).await;
    }

    #[tokio::test]
    async fn test_send_anti_entropy_digest_empty_cache() {
        let signing_key = test_signing_key();
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
        let topic = TopicId::new([1u8; 32]);
        let peer = test_peer_id(2);

        // Initialize topic but don't publish anything
        pubsub.initialize_topic_peers(topic, vec![peer]).await;

        // Should return Ok since there's nothing to send
        let result = pubsub.send_anti_entropy_digest(topic, peer).await;
        assert!(result.is_ok(), "Empty cache should result in no-op Ok");
    }

    // Peer scoring tests
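    //
    // Scoring model as inferred from the assertions below (the canonical
    // definition is `PeerScore::score`):
    //
    //   response_rate = iwant_responses / iwant_requests
    //                   (defaults: 0.5 with no activity; 0.8 with deliveries
    //                    but no IWANT requests)
    //   recency       = max(0.0, 1.0 - secs_since_last_seen / 300.0)
    //   score         = 0.6 * response_rate + 0.4 * recency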

    #[test]
    fn test_peer_score_no_requests_no_deliveries() {
        // A brand-new peer with no activity should get a moderate score
        let score = PeerScore::new();
        let s = score.score();
        // No deliveries, no IWANT requests => response_rate = 0.5
        // Recency should be ~1.0 (just created)
        // Score = 0.5 * 0.6 + ~1.0 * 0.4 = 0.3 + 0.4 = ~0.7
        assert!(
            s > 0.6,
            "New peer with no activity should have moderate score, got {s}"
        );
        assert!(s < 1.0, "Score should be below 1.0, got {s}");
    }

    #[test]
    fn test_peer_score_with_deliveries_no_iwant() {
        // Peer that has delivered messages but no IWANT requests
        let mut score = PeerScore::new();
        score.record_delivery();
        score.record_delivery();
        score.record_delivery();
        let s = score.score();
        // deliveries > 0, no IWANT => response_rate = 0.8
        // Recency ~1.0
        // Score = 0.8 * 0.6 + ~1.0 * 0.4 = 0.48 + 0.4 = ~0.88
        assert!(
            s > 0.8,
            "Peer with deliveries should have high score, got {s}"
        );
        assert!(s <= 1.0, "Score should be at most 1.0, got {s}");
    }

    #[test]
    fn test_peer_score_perfect_iwant_response_rate() {
        // Peer with perfect IWANT response rate
        let mut score = PeerScore::new();
        score.record_iwant_request();
        score.record_iwant_response();
        score.record_iwant_request();
        score.record_iwant_response();
        let s = score.score();
        // response_rate = 2/2 = 1.0
        // Recency ~1.0
        // Score = 1.0 * 0.6 + ~1.0 * 0.4 = ~1.0
        assert!(
            s > 0.9,
            "Perfect IWANT response rate should give high score, got {s}"
        );
    }

    #[test]
    fn test_peer_score_50_percent_iwant_response_rate() {
        // Peer with 50% IWANT response rate
        let mut score = PeerScore::new();
        score.record_iwant_request();
        score.record_iwant_response();
        score.record_iwant_request();
        // 1 response out of 2 requests = 50%
        let s = score.score();
        // response_rate = 1/2 = 0.5
        // Recency ~1.0
        // Score = 0.5 * 0.6 + ~1.0 * 0.4 = 0.3 + 0.4 = ~0.7
        assert!(
            s > 0.6,
            "50% IWANT response rate should give moderate score, got {s}"
        );
        assert!(s < 0.85, "50% rate should be below perfect, got {s}");
    }

    #[test]
    fn test_peer_score_recency_decay() {
        // Test that a peer unseen for a long time has lower score
        let mut score = PeerScore::new();
        score.record_delivery();
        // Simulate the peer being unseen for 5+ minutes. checked_sub mirrors
        // test_stale_peer_scores_cleaned below: on some platforms (Windows CI)
        // the monotonic clock may lack headroom for the subtraction.
        let Some(past) = Instant::now().checked_sub(Duration::from_secs(350)) else {
            return;
        };
        score.last_seen = past;
        let s = score.score();
        // deliveries > 0, no IWANT => response_rate = 0.8
        // secs_since_seen = 350, recency = max(0, 1 - 350/300) = 0.0
        // Score = 0.8 * 0.6 + 0.0 * 0.4 = 0.48
        assert!(
            s < 0.55,
            "Stale peer should have low score due to recency decay, got {s}"
        );
        assert!(
            s > 0.4,
            "Stale peer should still have some score from response rate, got {s}"
        );
    }

    #[tokio::test]
    async fn test_eager_records_delivery_score() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        // Initialize peer as eager
        pubsub.initialize_topic_peers(topic, vec![from_peer]).await;

        let payload = Bytes::from("test delivery");
        let msg_id = pubsub.calculate_msg_id(&topic, &payload);

        let header = MessageHeader {
            version: 1,
            topic,
            msg_id,
            kind: MessageKind::Eager,
            hop: 0,
            ttl: 10,
        };

        let header_bytes = postcard::to_stdvec(&header).expect("serialize");
        let signature = signing_key.sign(&header_bytes).expect("sign");

        let message = GossipMessage {
            header,
            payload: Some(payload),
            signature,
            public_key: signing_key.public_key().to_vec(),
        };

        pubsub
            .handle_eager(from_peer, topic, message)
            .await
            .expect("handle_eager");

        // Verify peer score has messages_delivered == 1
        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        let peer_score = state
            .peer_scores
            .get(&from_peer)
            .expect("peer score should exist");
        assert_eq!(
            peer_score.messages_delivered, 1,
            "Should have 1 delivery recorded"
        );
    }

    #[tokio::test]
    async fn test_ihave_iwant_eager_flow_updates_scores() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);
        let from_peer = test_peer_id(2);

        let unknown_msg_id = [42u8; 32];

        // Step 1: IHAVE from peer triggers IWANT
        pubsub
            .handle_ihave(from_peer, topic, vec![unknown_msg_id])
            .await
            .ok();

        // Verify IWANT request was tracked in score
        {
            let topics = pubsub.topics.read().await;
            let state = topics.get(&topic).unwrap();
            let peer_score = state
                .peer_scores
                .get(&from_peer)
                .expect("peer score should exist");
            assert_eq!(
                peer_score.iwant_requests, 1,
                "Should have 1 IWANT request recorded"
            );
            assert_eq!(
                peer_score.iwant_responses, 0,
                "Should have 0 IWANT responses yet"
            );
        }

        // Step 2: EAGER arrives with the requested message - should record IWANT response
        let payload = Bytes::from("requested message");
        let header = MessageHeader {
            version: 1,
            topic,
            msg_id: unknown_msg_id,
            kind: MessageKind::Eager,
            hop: 0,
            ttl: 10,
        };

        let header_bytes = postcard::to_stdvec(&header).expect("serialize");
        let signature = signing_key.sign(&header_bytes).expect("sign");

        let message = GossipMessage {
            header,
            payload: Some(payload),
            signature,
            public_key: signing_key.public_key().to_vec(),
        };

        pubsub
            .handle_eager(from_peer, topic, message)
            .await
            .expect("handle_eager");

        // Verify IWANT response was recorded
        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        let peer_score = state
            .peer_scores
            .get(&from_peer)
            .expect("peer score should exist");
        assert_eq!(
            peer_score.iwant_responses, 1,
            "Should have 1 IWANT response recorded"
        );
        assert_eq!(
            peer_score.messages_delivered, 1,
            "Should have 1 delivery recorded"
        );
    }

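    // Tree maintenance: `maintain_degree` promotes the highest-scoring lazy
    // peers while the eager set is below MIN_EAGER_DEGREE and demotes the
    // lowest-scoring eager peers while it exceeds MAX_EAGER_DEGREE. The next
    // two tests pin down both directions.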
    #[test]
    fn test_score_based_promotion_highest_first() {
        // Create lazy peers with different scores - highest should be promoted first
        let mut state = TopicState::new();

        let peer_high = test_peer_id(10);
        let peer_low = test_peer_id(11);
        let peer_mid = test_peer_id(12);

        state.lazy_peers.insert(peer_high);
        state.lazy_peers.insert(peer_low);
        state.lazy_peers.insert(peer_mid);

        // Give peer_high the best score (many deliveries)
        let mut high_score = PeerScore::new();
        high_score.messages_delivered = 100;
        state.peer_scores.insert(peer_high, high_score);

        // Give peer_low a poor score (no deliveries, stale). checked_sub
        // guards platforms where the monotonic clock lacks headroom.
        let mut low_score = PeerScore::new();
        if let Some(past) = Instant::now().checked_sub(Duration::from_secs(250)) {
            low_score.last_seen = past;
        }
        state.peer_scores.insert(peer_low, low_score);

        // Give peer_mid a moderate score
        let mut mid_score = PeerScore::new();
        mid_score.messages_delivered = 10;
        state.peer_scores.insert(peer_mid, mid_score);

        // Eager is empty, so maintain_degree should promote up to MIN_EAGER_DEGREE
        // But we only have 3 lazy peers, so all 3 get promoted
        state.maintain_degree();

        // All should be promoted since we're below MIN_EAGER_DEGREE
        assert!(
            state.eager_peers.contains(&peer_high),
            "High-scoring peer should be promoted"
        );
        assert!(
            state.eager_peers.contains(&peer_mid),
            "Mid-scoring peer should be promoted"
        );
        assert!(
            state.eager_peers.contains(&peer_low),
            "Low-scoring peer should be promoted (not enough peers)"
        );
    }

    #[test]
    fn test_score_based_demotion_lowest_first() {
        // Create too many eager peers with different scores using IWANT response
        // rates, which create a continuous gradient (unlike messages_delivered,
        // which is effectively binary).
        let mut state = TopicState::new();

        // Add MAX_EAGER_DEGREE + 2 eager peers
        let mut peers = Vec::new();
        for i in 0..(MAX_EAGER_DEGREE + 2) {
            let peer = test_peer_id(i as u8 + 10);
            peers.push(peer);
            state.eager_peers.insert(peer);

            // Use IWANT response rates to create clearly different scores.
            // All peers have 10 IWANT requests; peer i responds to i of them,
            // giving peer i a response rate of i/10 so the lowest-indexed
            // peers score worst.
            let mut score = PeerScore::new();
            score.iwant_requests = 10;
            score.iwant_responses = i as u64;
            state.peer_scores.insert(peer, score);
        }

        // The first peer (i=0) has the worst score (0% IWANT response rate)
        let worst_peer = peers[0];
        let second_worst = peers[1];

        state.maintain_degree();

        // Should have demoted 2 peers (down to MAX_EAGER_DEGREE)
        assert_eq!(
            state.eager_peers.len(),
            MAX_EAGER_DEGREE,
            "Should have MAX_EAGER_DEGREE eager peers"
        );

        // The worst-scoring peers should have been demoted
        assert!(
            state.lazy_peers.contains(&worst_peer),
            "Worst-scoring peer should be demoted"
        );
        assert!(
            state.lazy_peers.contains(&second_worst),
            "Second-worst peer should be demoted"
        );

        // The best-scoring peer should still be eager
        let best_peer = peers[MAX_EAGER_DEGREE + 1];
        assert!(
            state.eager_peers.contains(&best_peer),
            "Best-scoring peer should remain eager"
        );
    }

    #[test]
    fn test_stale_peer_scores_cleaned() {
        let mut state = TopicState::new();

        let fresh_peer = test_peer_id(20);
        let stale_peer = test_peer_id(21);

        // Fresh peer score (just created)
        state.peer_scores.insert(fresh_peer, PeerScore::new());

        // Stale peer score (last seen > 10 minutes ago)
        // Use checked_sub because on some platforms (Windows CI) the
        // monotonic clock epoch may be too recent for a 700s subtraction.
        let mut stale_score = PeerScore::new();
        let Some(past) = Instant::now().checked_sub(Duration::from_secs(700)) else {
            // Platform doesn't have enough headroom — skip test gracefully
            return;
        };
        stale_score.last_seen = past;
        state.peer_scores.insert(stale_peer, stale_score);

        // Clean cache (which also cleans peer scores)
        state.clean_cache();

        assert!(
            state.peer_scores.contains_key(&fresh_peer),
            "Fresh peer score should be retained"
        );
        assert!(
            !state.peer_scores.contains_key(&stale_peer),
            "Stale peer score should be cleaned up"
        );
    }
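
    // Connectivity refresh: `set_topic_peers` reconciles both peer sets against
    // the currently connected list. Disconnected peers are pruned, still-connected
    // lazy peers are promoted back to eager, and newcomers join as eager. The
    // tests below pin down each case.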

    #[tokio::test]
    async fn test_set_topic_peers_prunes_stale_eager() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
        let topic = TopicId::new([1u8; 32]);

        let peer_a = test_peer_id(2);
        let peer_b = test_peer_id(3);

        // Initialize with two eager peers
        pubsub
            .initialize_topic_peers(topic, vec![peer_a, peer_b])
            .await;

        // Only peer_a is still connected
        pubsub.set_topic_peers(topic, vec![peer_a]).await;

        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        assert!(state.eager_peers.contains(&peer_a));
        assert!(!state.eager_peers.contains(&peer_b));
    }

    #[tokio::test]
    async fn test_set_topic_peers_prunes_stale_lazy() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
        let topic = TopicId::new([1u8; 32]);

        let peer_a = test_peer_id(2);
        let peer_b = test_peer_id(3);

        // Manually set up: peer_a eager, peer_b lazy
        {
            let mut topics = pubsub.topics.write().await;
            let state = topics.entry(topic).or_insert_with(TopicState::new);
            state.eager_peers.insert(peer_a);
            state.lazy_peers.insert(peer_b);
        }

        // Only peer_a is still connected — peer_b should be pruned from lazy
        pubsub.set_topic_peers(topic, vec![peer_a]).await;

        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        assert!(state.eager_peers.contains(&peer_a));
        assert!(!state.lazy_peers.contains(&peer_b));
    }

    #[tokio::test]
    async fn test_set_topic_peers_adds_new_as_eager() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
        let topic = TopicId::new([1u8; 32]);

        let peer_a = test_peer_id(2);
        let peer_b = test_peer_id(3);

        // Initialize with only peer_a
        pubsub.initialize_topic_peers(topic, vec![peer_a]).await;

        // Now peer_b has connected too
        pubsub.set_topic_peers(topic, vec![peer_a, peer_b]).await;

        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        assert!(state.eager_peers.contains(&peer_a));
        assert!(state.eager_peers.contains(&peer_b));
    }

    #[tokio::test]
    async fn test_set_topic_peers_retains_lazy_if_connected() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
        let topic = TopicId::new([1u8; 32]);

        let peer_a = test_peer_id(2);
        let peer_b = test_peer_id(3);

        // peer_a eager, peer_b lazy (simulating a prior PRUNE)
        {
            let mut topics = pubsub.topics.write().await;
            let state = topics.entry(topic).or_insert_with(TopicState::new);
            state.eager_peers.insert(peer_a);
            state.lazy_peers.insert(peer_b);
        }

        // Both still connected — peer_b should be promoted back to eager
        // during the periodic refresh so that PRUNE optimizations don't
        // permanently break gossip routing.
        pubsub.set_topic_peers(topic, vec![peer_a, peer_b]).await;

        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        assert!(state.eager_peers.contains(&peer_a));
        assert!(
            state.eager_peers.contains(&peer_b),
            "Lazy peer should be promoted to eager during refresh"
        );
        assert!(
            !state.lazy_peers.contains(&peer_b),
            "Promoted peer should no longer be in lazy set"
        );
    }

    #[tokio::test]
    async fn test_set_topic_peers_combined_prune_and_add() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key);
        let topic = TopicId::new([1u8; 32]);

        let peer_a = test_peer_id(2);
        let peer_b = test_peer_id(3);
        let peer_c = test_peer_id(4);

        // Start with peer_a eager, peer_b lazy
        {
            let mut topics = pubsub.topics.write().await;
            let state = topics.entry(topic).or_insert_with(TopicState::new);
            state.eager_peers.insert(peer_a);
            state.lazy_peers.insert(peer_b);
        }

        // peer_a disconnected, peer_b still connected, peer_c is new
        pubsub.set_topic_peers(topic, vec![peer_b, peer_c]).await;

        let topics = pubsub.topics.read().await;
        let state = topics.get(&topic).unwrap();
        assert!(
            !state.eager_peers.contains(&peer_a),
            "Disconnected eager peer should be removed"
        );
        assert!(
            state.eager_peers.contains(&peer_b),
            "Connected lazy peer should be promoted to eager"
        );
        assert!(
            !state.lazy_peers.contains(&peer_b),
            "Promoted peer should no longer be in lazy set"
        );
        assert!(
            state.eager_peers.contains(&peer_c),
            "New peer should be added as eager"
        );
    }

    // -----------------------------------------------------------------------
    // Payload replay cache tests
    // -----------------------------------------------------------------------
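
    /// Derives a msg_id for the replay tests below: blake3 over
    /// (topic, epoch, sender, blake3(payload)). The explicit epoch and sender
    /// inputs let two wrappings of the same payload produce distinct IDs,
    /// which is exactly the case the payload replay cache must catch.
    fn derive_msg_id(
        topic: &TopicId,
        epoch: u64,
        sender: &PeerId,
        payload: &[u8],
    ) -> MessageIdType {
        let mut hasher = blake3::Hasher::new();
        hasher.update(topic.as_bytes());
        hasher.update(&epoch.to_le_bytes());
        hasher.update(sender.as_bytes());
        hasher.update(blake3::hash(payload).as_bytes());
        let mut id = [0u8; 32];
        id.copy_from_slice(&hasher.finalize().as_bytes()[..32]);
        id
    }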

    #[test]
    fn test_payload_replay_detected() {
        let mut state = TopicState::new();
        let payload = b"hello world";

        assert!(
            !state.is_payload_replay(payload),
            "First insert should be new"
        );
        assert!(
            state.is_payload_replay(payload),
            "Second insert should be replay"
        );
    }

    #[test]
    fn test_payload_replay_different_payloads_pass() {
        let mut state = TopicState::new();

        assert!(!state.is_payload_replay(b"message 1"));
        assert!(!state.is_payload_replay(b"message 2"));
        assert!(!state.is_payload_replay(b"message 3"));
    }

    #[test]
    fn test_payload_replay_lru_eviction() {
        let mut state = TopicState::new();

        // Fill the cache beyond capacity
        for i in 0..REPLAY_CACHE_MAX_ENTRIES + 100 {
            let payload = format!("payload-{i}");
            assert!(!state.is_payload_replay(payload.as_bytes()));
        }

        // Cache should not exceed max entries
        assert!(state.replay_cache.len() <= REPLAY_CACHE_MAX_ENTRIES);

        // The very first entry should have been evicted
        assert!(
            !state.is_payload_replay(b"payload-0"),
            "Evicted entry should be accepted as new again"
        );
    }

    #[tokio::test]
    async fn test_handle_eager_drops_replayed_payload() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);

        // Subscribe to receive messages
        let mut rx = pubsub.subscribe(topic);
        tokio::task::yield_now().await;

        let payload = Bytes::from("important data");

        // First EAGER with one msg_id (epoch 1, sender 2)
        let msg_id_1 = derive_msg_id(&topic, 1, &test_peer_id(2), &payload);
        let message1 = signed_message(
            &signing_key,
            MessageHeader {
                version: 1,
                topic,
                msg_id: msg_id_1,
                kind: MessageKind::Eager,
                hop: 0,
                ttl: 10,
            },
            Some(payload.clone()),
        );

        // Second EAGER: same payload but a different msg_id (different epoch
        // and sender), simulating a re-wrapped replay
        let msg_id_2 = derive_msg_id(&topic, 2, &test_peer_id(3), &payload);
        let message2 = signed_message(
            &signing_key,
            MessageHeader {
                version: 1,
                topic,
                msg_id: msg_id_2,
                kind: MessageKind::Eager,
                hop: 0,
                ttl: 10,
            },
            Some(payload.clone()),
        );

        let from_peer = test_peer_id(2);

        // Handle first EAGER — should deliver to subscriber
        pubsub
            .handle_eager(from_peer, topic, message1)
            .await
            .expect("first handle_eager");

        // Handle second EAGER (replay) — should NOT deliver
        let from_peer_2 = test_peer_id(3);
        pubsub
            .handle_eager(from_peer_2, topic, message2)
            .await
            .expect("second handle_eager");

        // Subscriber should receive exactly one message
        let msg = tokio::time::timeout(Duration::from_millis(100), rx.recv())
            .await
            .expect("should receive first message")
            .expect("channel should not be closed");
        assert_eq!(msg.1, payload);

        // No second message should arrive
        let replay = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await;
        assert!(
            replay.is_err(),
            "Replayed payload should NOT be delivered to subscriber"
        );
    }

    #[tokio::test]
    async fn test_publish_local_seeds_replay_cache() {
        let peer_id = test_peer_id(1);
        let transport = test_transport().await;
        let signing_key = test_signing_key();
        let pubsub = PlumtreePubSub::new(peer_id, transport, signing_key.clone());
        let topic = TopicId::new([1u8; 32]);

        // Subscribe
        let mut rx = pubsub.subscribe(topic);
        tokio::task::yield_now().await;

        let payload = Bytes::from("local message");

        // Publish locally — should deliver to subscriber AND seed replay cache
        pubsub
            .publish(topic, payload.clone())
            .await
            .expect("publish");

        // Receive the local publish
        let msg = tokio::time::timeout(Duration::from_millis(100), rx.recv())
            .await
            .expect("should receive local publish")
            .expect("channel open");
        assert_eq!(msg.1, payload);

        // Now simulate an EAGER from the network with the same payload but a
        // fresh msg_id (epoch 99, sender 5)
        let msg_id = derive_msg_id(&topic, 99, &test_peer_id(5), &payload);
        let message = signed_message(
            &signing_key,
            MessageHeader {
                version: 1,
                topic,
                msg_id,
                kind: MessageKind::Eager,
                hop: 0,
                ttl: 10,
            },
            Some(payload.clone()),
        );

        let from_peer = test_peer_id(5);
        pubsub
            .handle_eager(from_peer, topic, message)
            .await
            .expect("handle_eager echo");

        // The echo should be caught by the replay cache — no second delivery
        let echo = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await;
        assert!(
            echo.is_err(),
            "Network echo of locally published payload should be dropped by replay cache"
        );
    }
}