ipfrs_network/
gossipsub.rs

1//! GossipSub - Topic-based pub/sub messaging
2//!
3//! This module provides efficient topic-based publish/subscribe messaging
4//! using the GossipSub protocol from libp2p.
5//!
6//! ## Features
7//!
8//! - **Topic Subscription**: Subscribe to topics of interest
9//! - **Message Publishing**: Publish messages to topics
10//! - **Mesh Formation**: Automatic peer mesh formation for topic propagation
11//! - **Message Deduplication**: Seen message tracking to prevent duplicates
12//! - **Peer Scoring**: Score-based peer selection for mesh quality
13//! - **Content Announcements**: Broadcast new content availability
14//!
15//! ## Design
16//!
17//! GossipSub maintains a mesh of peers for each topic, ensuring:
18//! - Low latency message delivery
19//! - High reliability through redundancy
20//! - Efficient bandwidth usage through mesh optimization
21//! - Resistance to spam and malicious peers through scoring
22
23use dashmap::DashMap;
24use libp2p::PeerId;
25use parking_lot::RwLock;
26use serde::{Deserialize, Serialize};
27use std::collections::{HashMap, HashSet};
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30use thiserror::Error;
31
32/// Errors that can occur in GossipSub operations
33#[derive(Error, Debug)]
34pub enum GossipSubError {
35    #[error("Topic not found: {0}")]
36    TopicNotFound(String),
37
38    #[error("Already subscribed to topic: {0}")]
39    AlreadySubscribed(String),
40
41    #[error("Not subscribed to topic: {0}")]
42    NotSubscribed(String),
43
44    #[error("Message too large: {size} bytes (max: {max})")]
45    MessageTooLarge { size: usize, max: usize },
46
47    #[error("Invalid topic name: {0}")]
48    InvalidTopicName(String),
49
50    #[error("Peer scoring error: {0}")]
51    ScoringError(String),
52}
53
/// Tunable parameters for the GossipSub manager.
///
/// Start from [`Default::default`] and override individual fields with
/// struct-update syntax, e.g.
/// `GossipSubConfig { max_message_size: 100, ..Default::default() }`.
#[derive(Clone, Debug)]
pub struct GossipSubConfig {
    /// Lower bound on the number of mesh peers per topic (`D_low`).
    pub mesh_n_low: usize,

    /// Desired number of mesh peers per topic (`D`).
    pub mesh_n: usize,

    /// Upper bound on the number of mesh peers per topic (`D_high`).
    pub mesh_n_high: usize,

    /// How many peers receive gossip (`D_lazy`).
    pub gossip_n: usize,

    /// Period between mesh-maintenance heartbeats.
    pub heartbeat_interval: Duration,

    /// Largest payload, in bytes, that may be published.
    pub max_message_size: usize,

    /// Whether per-peer scores are maintained when messages are handled.
    pub enable_scoring: bool,

    /// How long a message ID is remembered for deduplication.
    pub duplicate_cache_time: Duration,

    /// Size of the deduplication cache above which a cleanup pass is run.
    pub max_duplicate_cache_size: usize,

    /// Whether incoming messages are passed through validation.
    pub enable_validation: bool,
}

impl Default for GossipSubConfig {
    /// Returns the stock configuration: a 4/6/12 mesh, gossip to 3 peers,
    /// a one-second heartbeat, 1 MiB messages, a 120-second / 10 000-entry
    /// deduplication cache, and both scoring and validation switched on.
    fn default() -> Self {
        const ONE_MIB: usize = 1 << 20;

        GossipSubConfig {
            // Mesh degree bounds.
            mesh_n_low: 4,
            mesh_n: 6,
            mesh_n_high: 12,
            gossip_n: 3,

            // Timing and size limits.
            heartbeat_interval: Duration::from_millis(1_000),
            max_message_size: ONE_MIB,
            duplicate_cache_time: Duration::from_secs(2 * 60),
            max_duplicate_cache_size: 10_000,

            // Feature switches.
            enable_scoring: true,
            enable_validation: true,
        }
    }
}
104
105/// Topic identifier
106#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
107pub struct TopicId(pub String);
108
109impl TopicId {
110    pub fn new(name: impl Into<String>) -> Self {
111        Self(name.into())
112    }
113
114    /// Topic for content announcements
115    pub fn content_announce() -> Self {
116        Self("/ipfrs/content/announce/1.0.0".to_string())
117    }
118
119    /// Topic for peer announcements
120    pub fn peer_announce() -> Self {
121        Self("/ipfrs/peer/announce/1.0.0".to_string())
122    }
123
124    /// Topic for DHT events
125    pub fn dht_events() -> Self {
126        Self("/ipfrs/dht/events/1.0.0".to_string())
127    }
128}
129
130/// GossipSub message
131#[derive(Debug, Clone)]
132pub struct GossipSubMessage {
133    /// Message ID
134    pub id: MessageId,
135
136    /// Source peer
137    pub source: PeerId,
138
139    /// Topic this message belongs to
140    pub topic: TopicId,
141
142    /// Message payload
143    pub data: Vec<u8>,
144
145    /// Sequence number
146    pub sequence: u64,
147
148    /// Timestamp
149    pub timestamp: Instant,
150}
151
152/// Message identifier
153#[derive(Debug, Clone, PartialEq, Eq, Hash)]
154pub struct MessageId(pub Vec<u8>);
155
156impl MessageId {
157    /// Create a message ID from source peer and sequence number
158    pub fn new(source: &PeerId, sequence: u64) -> Self {
159        let mut data = source.to_bytes();
160        data.extend_from_slice(&sequence.to_le_bytes());
161        Self(data)
162    }
163}
164
165/// Peer score for mesh quality
166#[derive(Debug, Clone, Default)]
167pub struct PeerScore {
168    /// Topic-specific scores
169    pub topic_scores: HashMap<TopicId, f64>,
170
171    /// Overall score
172    pub total_score: f64,
173
174    /// Number of invalid messages
175    pub invalid_messages: u64,
176
177    /// Number of valid messages
178    pub valid_messages: u64,
179
180    /// Last update time
181    pub last_update: Option<Instant>,
182}
183
184impl PeerScore {
185    /// Calculate overall score
186    pub fn calculate_total(&mut self) {
187        if self.topic_scores.is_empty() {
188            self.total_score = 0.0;
189            return;
190        }
191
192        // Average of topic scores
193        let sum: f64 = self.topic_scores.values().sum();
194        self.total_score = sum / self.topic_scores.len() as f64;
195
196        // Penalize for invalid messages
197        let total_messages = self.invalid_messages + self.valid_messages;
198        if total_messages > 0 {
199            let invalid_ratio = self.invalid_messages as f64 / total_messages as f64;
200            self.total_score *= 1.0 - invalid_ratio;
201        }
202
203        self.last_update = Some(Instant::now());
204    }
205
206    /// Update topic score
207    pub fn update_topic_score(&mut self, topic: TopicId, score: f64) {
208        self.topic_scores.insert(topic, score);
209        self.calculate_total();
210    }
211
212    /// Record message validation result
213    pub fn record_message(&mut self, valid: bool) {
214        if valid {
215            self.valid_messages += 1;
216        } else {
217            self.invalid_messages += 1;
218        }
219        self.calculate_total();
220    }
221}
222
223/// Topic subscription information
224#[derive(Debug, Clone)]
225pub struct TopicSubscription {
226    /// Topic ID
227    pub topic: TopicId,
228
229    /// Subscribed since
230    pub subscribed_at: Instant,
231
232    /// Mesh peers for this topic
233    pub mesh_peers: HashSet<PeerId>,
234
235    /// Number of messages received
236    pub messages_received: u64,
237
238    /// Number of messages published
239    pub messages_published: u64,
240}
241
242/// GossipSub statistics
243#[derive(Debug, Clone, Default, Serialize, Deserialize)]
244pub struct GossipSubStats {
245    /// Total topics subscribed
246    pub subscribed_topics: usize,
247
248    /// Total messages published
249    pub messages_published: u64,
250
251    /// Total messages received
252    pub messages_received: u64,
253
254    /// Total duplicate messages seen
255    pub duplicate_messages: u64,
256
257    /// Total invalid messages
258    pub invalid_messages: u64,
259
260    /// Active mesh peers
261    pub active_mesh_peers: usize,
262
263    /// Mesh prune events
264    pub mesh_prune_count: u64,
265
266    /// Mesh graft events
267    pub mesh_graft_count: u64,
268
269    /// Messages per topic
270    pub messages_per_topic: HashMap<String, u64>,
271}
272
/// Value stored in the deduplication cache for each message ID.
#[derive(Clone, Debug)]
struct SeenCacheEntry {
    /// When the message ID was recorded; compared against the configured
    /// cache lifetime to decide whether the entry still counts.
    timestamp: Instant,
}
278
279/// GossipSub manager
280pub struct GossipSubManager {
281    /// Configuration
282    config: GossipSubConfig,
283
284    /// Subscribed topics
285    subscriptions: Arc<DashMap<TopicId, TopicSubscription>>,
286
287    /// Peer scores
288    peer_scores: Arc<DashMap<PeerId, PeerScore>>,
289
290    /// Seen message cache (deduplication)
291    seen_messages: Arc<DashMap<MessageId, SeenCacheEntry>>,
292
293    /// Message sequence number counter
294    sequence_counter: Arc<RwLock<u64>>,
295
296    /// Statistics
297    stats: Arc<RwLock<GossipSubStats>>,
298}
299
300impl GossipSubManager {
301    /// Create a new GossipSub manager
302    pub fn new(config: GossipSubConfig) -> Self {
303        Self {
304            config,
305            subscriptions: Arc::new(DashMap::new()),
306            peer_scores: Arc::new(DashMap::new()),
307            seen_messages: Arc::new(DashMap::new()),
308            sequence_counter: Arc::new(RwLock::new(0)),
309            stats: Arc::new(RwLock::new(GossipSubStats::default())),
310        }
311    }
312
313    /// Subscribe to a topic
314    pub fn subscribe(&self, topic: TopicId) -> Result<(), GossipSubError> {
315        if self.subscriptions.contains_key(&topic) {
316            return Err(GossipSubError::AlreadySubscribed(topic.0.clone()));
317        }
318
319        let subscription = TopicSubscription {
320            topic: topic.clone(),
321            subscribed_at: Instant::now(),
322            mesh_peers: HashSet::new(),
323            messages_received: 0,
324            messages_published: 0,
325        };
326
327        self.subscriptions.insert(topic.clone(), subscription);
328
329        let mut stats = self.stats.write();
330        stats.subscribed_topics = self.subscriptions.len();
331
332        Ok(())
333    }
334
335    /// Unsubscribe from a topic
336    pub fn unsubscribe(&self, topic: &TopicId) -> Result<(), GossipSubError> {
337        self.subscriptions
338            .remove(topic)
339            .ok_or_else(|| GossipSubError::NotSubscribed(topic.0.clone()))?;
340
341        let mut stats = self.stats.write();
342        stats.subscribed_topics = self.subscriptions.len();
343
344        Ok(())
345    }
346
347    /// Publish a message to a topic
348    pub fn publish(
349        &self,
350        topic: TopicId,
351        data: Vec<u8>,
352        source: PeerId,
353    ) -> Result<MessageId, GossipSubError> {
354        // Check if subscribed
355        if !self.subscriptions.contains_key(&topic) {
356            return Err(GossipSubError::NotSubscribed(topic.0.clone()));
357        }
358
359        // Check message size
360        if data.len() > self.config.max_message_size {
361            return Err(GossipSubError::MessageTooLarge {
362                size: data.len(),
363                max: self.config.max_message_size,
364            });
365        }
366
367        // Generate sequence number
368        let sequence = {
369            let mut counter = self.sequence_counter.write();
370            *counter += 1;
371            *counter
372        };
373
374        // Create message ID
375        let message_id = MessageId::new(&source, sequence);
376
377        // Update statistics
378        if let Some(mut subscription) = self.subscriptions.get_mut(&topic) {
379            subscription.messages_published += 1;
380        }
381
382        let mut stats = self.stats.write();
383        stats.messages_published += 1;
384        *stats.messages_per_topic.entry(topic.0.clone()).or_insert(0) += 1;
385
386        Ok(message_id)
387    }
388
389    /// Handle received message
390    pub fn handle_message(&self, message: GossipSubMessage) -> Result<bool, GossipSubError> {
391        // Check for duplicate
392        if self.is_duplicate(&message.id) {
393            let mut stats = self.stats.write();
394            stats.duplicate_messages += 1;
395            return Ok(false); // Message already seen
396        }
397
398        // Add to seen cache
399        self.add_to_seen_cache(message.id.clone());
400
401        // Validate message if enabled
402        if self.config.enable_validation && !self.validate_message(&message) {
403            let mut stats = self.stats.write();
404            stats.invalid_messages += 1;
405
406            // Update peer score
407            if self.config.enable_scoring {
408                if let Some(mut score) = self.peer_scores.get_mut(&message.source) {
409                    score.record_message(false);
410                }
411            }
412
413            return Ok(false);
414        }
415
416        // Update statistics
417        if let Some(mut subscription) = self.subscriptions.get_mut(&message.topic) {
418            subscription.messages_received += 1;
419        }
420
421        let mut stats = self.stats.write();
422        stats.messages_received += 1;
423
424        // Update peer score
425        if self.config.enable_scoring {
426            self.peer_scores
427                .entry(message.source)
428                .or_default()
429                .record_message(true);
430        }
431
432        Ok(true) // Message is new and valid
433    }
434
435    /// Check if message is a duplicate
436    fn is_duplicate(&self, message_id: &MessageId) -> bool {
437        if let Some(entry) = self.seen_messages.get(message_id) {
438            let age = Instant::now().duration_since(entry.timestamp);
439            return age < self.config.duplicate_cache_time;
440        }
441        false
442    }
443
444    /// Add message to seen cache
445    fn add_to_seen_cache(&self, message_id: MessageId) {
446        let entry = SeenCacheEntry {
447            timestamp: Instant::now(),
448        };
449
450        self.seen_messages.insert(message_id, entry);
451
452        // Cleanup old entries if cache is too large
453        if self.seen_messages.len() > self.config.max_duplicate_cache_size {
454            self.cleanup_seen_cache();
455        }
456    }
457
458    /// Cleanup old entries from seen cache
459    fn cleanup_seen_cache(&self) {
460        let now = Instant::now();
461        let ttl = self.config.duplicate_cache_time;
462
463        self.seen_messages
464            .retain(|_, entry| now.duration_since(entry.timestamp) < ttl);
465    }
466
467    /// Validate message
468    fn validate_message(&self, _message: &GossipSubMessage) -> bool {
469        // Basic validation - can be extended
470        // Check if source peer is not banned, message format is correct, etc.
471        true
472    }
473
474    /// Add peer to topic mesh
475    pub fn add_peer_to_mesh(&self, topic: &TopicId, peer: PeerId) -> Result<(), GossipSubError> {
476        let inserted = {
477            let mut subscription = self
478                .subscriptions
479                .get_mut(topic)
480                .ok_or_else(|| GossipSubError::NotSubscribed(topic.0.clone()))?;
481            subscription.mesh_peers.insert(peer)
482        }; // Guard dropped here before count_mesh_peers()
483
484        if inserted {
485            let mut stats = self.stats.write();
486            stats.mesh_graft_count += 1;
487            stats.active_mesh_peers = self.count_mesh_peers();
488        }
489
490        Ok(())
491    }
492
493    /// Remove peer from topic mesh
494    pub fn remove_peer_from_mesh(
495        &self,
496        topic: &TopicId,
497        peer: &PeerId,
498    ) -> Result<(), GossipSubError> {
499        let removed = {
500            let mut subscription = self
501                .subscriptions
502                .get_mut(topic)
503                .ok_or_else(|| GossipSubError::NotSubscribed(topic.0.clone()))?;
504            subscription.mesh_peers.remove(peer)
505        }; // Guard dropped here before count_mesh_peers()
506
507        if removed {
508            let mut stats = self.stats.write();
509            stats.mesh_prune_count += 1;
510            stats.active_mesh_peers = self.count_mesh_peers();
511        }
512
513        Ok(())
514    }
515
516    /// Get peers in topic mesh
517    pub fn get_mesh_peers(&self, topic: &TopicId) -> Result<Vec<PeerId>, GossipSubError> {
518        let subscription = self
519            .subscriptions
520            .get(topic)
521            .ok_or_else(|| GossipSubError::NotSubscribed(topic.0.clone()))?;
522
523        Ok(subscription.mesh_peers.iter().cloned().collect())
524    }
525
526    /// Count total mesh peers across all topics
527    fn count_mesh_peers(&self) -> usize {
528        self.subscriptions
529            .iter()
530            .map(|entry| entry.mesh_peers.len())
531            .sum()
532    }
533
534    /// Get peer score
535    pub fn get_peer_score(&self, peer: &PeerId) -> Option<PeerScore> {
536        self.peer_scores.get(peer).map(|s| s.clone())
537    }
538
539    /// Update peer score for a topic
540    pub fn update_peer_score(&self, peer: &PeerId, topic: TopicId, score: f64) {
541        self.peer_scores
542            .entry(*peer)
543            .or_default()
544            .update_topic_score(topic, score);
545    }
546
547    /// Get low-scoring peers that should be pruned
548    pub fn get_peers_to_prune(&self, topic: &TopicId, threshold: f64) -> Vec<PeerId> {
549        let subscription = match self.subscriptions.get(topic) {
550            Some(sub) => sub,
551            None => return Vec::new(),
552        };
553
554        subscription
555            .mesh_peers
556            .iter()
557            .filter(|peer| {
558                if let Some(score) = self.peer_scores.get(peer) {
559                    score.total_score < threshold
560                } else {
561                    false
562                }
563            })
564            .cloned()
565            .collect()
566    }
567
568    /// Get statistics
569    pub fn stats(&self) -> GossipSubStats {
570        self.stats.read().clone()
571    }
572
573    /// List subscribed topics
574    pub fn list_topics(&self) -> Vec<TopicId> {
575        self.subscriptions
576            .iter()
577            .map(|entry| entry.key().clone())
578            .collect()
579    }
580
581    /// Check if subscribed to a topic
582    pub fn is_subscribed(&self, topic: &TopicId) -> bool {
583        self.subscriptions.contains_key(topic)
584    }
585}
586
#[cfg(test)]
mod tests {
    use super::*;
    use libp2p::identity::Keypair;

    /// Create a deterministic PeerId from an index (avoids slow random key generation)
    fn test_peer_id(index: u8) -> PeerId {
        // Use a deterministic seed based on index
        let mut seed = [0u8; 32];
        seed[0] = index;
        let keypair = Keypair::ed25519_from_bytes(seed).expect("valid seed");
        keypair.public().to_peer_id()
    }

    /// A fresh manager has no subscriptions.
    #[test]
    fn test_gossipsub_manager_creation() {
        let config = GossipSubConfig::default();
        let manager = GossipSubManager::new(config);
        assert_eq!(manager.list_topics().len(), 0);
    }

    /// Subscribing makes the topic visible through both query methods.
    #[test]
    fn test_topic_subscription() {
        let manager = GossipSubManager::new(GossipSubConfig::default());
        let topic = TopicId::content_announce();

        manager.subscribe(topic.clone()).unwrap();
        assert!(manager.is_subscribed(&topic));
        assert_eq!(manager.list_topics().len(), 1);
    }

    /// A second subscribe to the same topic is rejected.
    #[test]
    fn test_duplicate_subscription() {
        let manager = GossipSubManager::new(GossipSubConfig::default());
        let topic = TopicId::content_announce();

        manager.subscribe(topic.clone()).unwrap();
        let result = manager.subscribe(topic);
        assert!(matches!(result, Err(GossipSubError::AlreadySubscribed(_))));
    }

    /// Unsubscribing removes the topic again.
    #[test]
    fn test_unsubscribe() {
        let manager = GossipSubManager::new(GossipSubConfig::default());
        let topic = TopicId::content_announce();

        manager.subscribe(topic.clone()).unwrap();
        manager.unsubscribe(&topic).unwrap();
        assert!(!manager.is_subscribed(&topic));
        assert_eq!(manager.list_topics().len(), 0);
    }

    /// Publishing on a subscribed topic yields an ID and bumps the counter.
    #[test]
    fn test_publish_message() {
        let manager = GossipSubManager::new(GossipSubConfig::default());
        let topic = TopicId::content_announce();
        let peer = test_peer_id(1);

        manager.subscribe(topic.clone()).unwrap();

        let data = b"Hello, GossipSub!".to_vec();
        let message_id = manager.publish(topic, data, peer).unwrap();

        let stats = manager.stats();
        assert_eq!(stats.messages_published, 1);
        assert!(!message_id.0.is_empty());
    }

    /// Publishing without a subscription is an error.
    #[test]
    fn test_publish_without_subscription() {
        let manager = GossipSubManager::new(GossipSubConfig::default());
        let topic = TopicId::content_announce();
        let peer = test_peer_id(1);

        let data = b"Hello".to_vec();
        let result = manager.publish(topic, data, peer);
        assert!(matches!(result, Err(GossipSubError::NotSubscribed(_))));
    }

    /// Payloads above `max_message_size` are rejected.
    #[test]
    fn test_message_too_large() {
        let config = GossipSubConfig {
            max_message_size: 100,
            ..Default::default()
        };
        let manager = GossipSubManager::new(config);
        let topic = TopicId::content_announce();
        let peer = test_peer_id(1);

        manager.subscribe(topic.clone()).unwrap();

        let data = vec![0u8; 200]; // Larger than max
        let result = manager.publish(topic, data, peer);
        assert!(matches!(
            result,
            Err(GossipSubError::MessageTooLarge { .. })
        ));
    }

    /// The same message handled twice is accepted once and counted as a
    /// duplicate the second time.
    #[test]
    fn test_message_deduplication() {
        let manager = GossipSubManager::new(GossipSubConfig::default());
        let topic = TopicId::content_announce();
        let peer = test_peer_id(1);

        manager.subscribe(topic.clone()).unwrap();

        let message = GossipSubMessage {
            id: MessageId::new(&peer, 1),
            source: peer,
            topic: topic.clone(),
            data: b"Test".to_vec(),
            sequence: 1,
            timestamp: Instant::now(),
        };

        // First message should be accepted
        let result1 = manager.handle_message(message.clone()).unwrap();
        assert!(result1);

        // Duplicate should be rejected
        let result2 = manager.handle_message(message).unwrap();
        assert!(!result2);

        let stats = manager.stats();
        assert_eq!(stats.duplicate_messages, 1);
    }

    /// Updating a topic score creates the peer's entry and a positive total.
    #[test]
    fn test_peer_scoring() {
        let manager = GossipSubManager::new(GossipSubConfig::default());
        let peer = test_peer_id(1);
        let topic = TopicId::content_announce();

        manager.update_peer_score(&peer, topic.clone(), 0.8);

        let score = manager.get_peer_score(&peer).unwrap();
        // The stored value is the exact literal that was passed in, so an
        // exact comparison is safe here.
        assert_eq!(score.topic_scores.get(&topic), Some(&0.8));
        assert!(score.total_score > 0.0);
    }

    /// Peers can be added to and removed from a topic mesh.
    #[test]
    fn test_mesh_management() {
        let manager = GossipSubManager::new(GossipSubConfig::default());
        let topic = TopicId::content_announce();
        let peer1 = test_peer_id(1);
        let peer2 = test_peer_id(2);

        manager.subscribe(topic.clone()).unwrap();

        // Add peers to mesh
        manager.add_peer_to_mesh(&topic, peer1).unwrap();
        manager.add_peer_to_mesh(&topic, peer2).unwrap();

        let mesh_peers = manager.get_mesh_peers(&topic).unwrap();
        assert_eq!(mesh_peers.len(), 2);
        assert!(mesh_peers.contains(&peer1));
        assert!(mesh_peers.contains(&peer2));

        // Remove peer from mesh
        manager.remove_peer_from_mesh(&topic, &peer1).unwrap();
        let mesh_peers = manager.get_mesh_peers(&topic).unwrap();
        assert_eq!(mesh_peers.len(), 1);
        assert!(!mesh_peers.contains(&peer1));
    }

    /// Only mesh peers scoring below the threshold are proposed for pruning.
    #[test]
    fn test_peers_to_prune() {
        let manager = GossipSubManager::new(GossipSubConfig::default());
        let topic = TopicId::content_announce();
        let peer1 = test_peer_id(1);
        let peer2 = test_peer_id(2);

        manager.subscribe(topic.clone()).unwrap();
        manager.add_peer_to_mesh(&topic, peer1).unwrap();
        manager.add_peer_to_mesh(&topic, peer2).unwrap();

        // Set scores
        manager.update_peer_score(&peer1, topic.clone(), 0.9);
        manager.update_peer_score(&peer2, topic.clone(), 0.3);

        // Get peers below threshold
        let to_prune = manager.get_peers_to_prune(&topic, 0.5);
        assert_eq!(to_prune.len(), 1);
        assert!(to_prune.contains(&peer2));
    }

    /// The well-known topic constructors return the expected names.
    #[test]
    fn test_topic_ids() {
        assert_eq!(
            TopicId::content_announce().0,
            "/ipfrs/content/announce/1.0.0"
        );
        assert_eq!(TopicId::peer_announce().0, "/ipfrs/peer/announce/1.0.0");
        assert_eq!(TopicId::dht_events().0, "/ipfrs/dht/events/1.0.0");
    }

    /// Message IDs are a pure function of peer and sequence number.
    #[test]
    fn test_message_id_generation() {
        let peer = test_peer_id(1);
        let id1 = MessageId::new(&peer, 1);
        let id2 = MessageId::new(&peer, 1);
        let id3 = MessageId::new(&peer, 2);

        assert_eq!(id1, id2); // Same peer and sequence
        assert_ne!(id1, id3); // Different sequence
    }

    /// The total is the mean of the topic scores and drops once an invalid
    /// message is recorded.
    #[test]
    fn test_peer_score_calculation() {
        let mut score = PeerScore::default();

        score.update_topic_score(TopicId::content_announce(), 0.8);
        score.update_topic_score(TopicId::peer_announce(), 0.6);

        assert_eq!(score.topic_scores.len(), 2);
        // Average: (0.8 + 0.6) / 2. Compared with a tolerance because the
        // result of floating-point arithmetic should not be tested for
        // exact equality with a decimal literal.
        assert!((score.total_score - 0.7).abs() < 1e-9);

        // Record invalid message
        score.record_message(false);
        assert!(score.total_score < 0.7); // Score should decrease
    }
}
809}