1use dashmap::DashMap;
24use libp2p::PeerId;
25use parking_lot::RwLock;
26use serde::{Deserialize, Serialize};
27use std::collections::{HashMap, HashSet};
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30use thiserror::Error;
31
32#[derive(Error, Debug)]
34pub enum GossipSubError {
35 #[error("Topic not found: {0}")]
36 TopicNotFound(String),
37
38 #[error("Already subscribed to topic: {0}")]
39 AlreadySubscribed(String),
40
41 #[error("Not subscribed to topic: {0}")]
42 NotSubscribed(String),
43
44 #[error("Message too large: {size} bytes (max: {max})")]
45 MessageTooLarge { size: usize, max: usize },
46
47 #[error("Invalid topic name: {0}")]
48 InvalidTopicName(String),
49
50 #[error("Peer scoring error: {0}")]
51 ScoringError(String),
52}
53
54#[derive(Debug, Clone)]
56pub struct GossipSubConfig {
57 pub mesh_n_low: usize,
59
60 pub mesh_n: usize,
62
63 pub mesh_n_high: usize,
65
66 pub gossip_n: usize,
68
69 pub heartbeat_interval: Duration,
71
72 pub max_message_size: usize,
74
75 pub enable_scoring: bool,
77
78 pub duplicate_cache_time: Duration,
80
81 pub max_duplicate_cache_size: usize,
83
84 pub enable_validation: bool,
86}
87
88impl Default for GossipSubConfig {
89 fn default() -> Self {
90 Self {
91 mesh_n_low: 4,
92 mesh_n: 6,
93 mesh_n_high: 12,
94 gossip_n: 3,
95 heartbeat_interval: Duration::from_secs(1),
96 max_message_size: 1024 * 1024, enable_scoring: true,
98 duplicate_cache_time: Duration::from_secs(120),
99 max_duplicate_cache_size: 10000,
100 enable_validation: true,
101 }
102 }
103}
104
105#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
107pub struct TopicId(pub String);
108
109impl TopicId {
110 pub fn new(name: impl Into<String>) -> Self {
111 Self(name.into())
112 }
113
114 pub fn content_announce() -> Self {
116 Self("/ipfrs/content/announce/1.0.0".to_string())
117 }
118
119 pub fn peer_announce() -> Self {
121 Self("/ipfrs/peer/announce/1.0.0".to_string())
122 }
123
124 pub fn dht_events() -> Self {
126 Self("/ipfrs/dht/events/1.0.0".to_string())
127 }
128}
129
130#[derive(Debug, Clone)]
132pub struct GossipSubMessage {
133 pub id: MessageId,
135
136 pub source: PeerId,
138
139 pub topic: TopicId,
141
142 pub data: Vec<u8>,
144
145 pub sequence: u64,
147
148 pub timestamp: Instant,
150}
151
152#[derive(Debug, Clone, PartialEq, Eq, Hash)]
154pub struct MessageId(pub Vec<u8>);
155
156impl MessageId {
157 pub fn new(source: &PeerId, sequence: u64) -> Self {
159 let mut data = source.to_bytes();
160 data.extend_from_slice(&sequence.to_le_bytes());
161 Self(data)
162 }
163}
164
165#[derive(Debug, Clone, Default)]
167pub struct PeerScore {
168 pub topic_scores: HashMap<TopicId, f64>,
170
171 pub total_score: f64,
173
174 pub invalid_messages: u64,
176
177 pub valid_messages: u64,
179
180 pub last_update: Option<Instant>,
182}
183
184impl PeerScore {
185 pub fn calculate_total(&mut self) {
187 if self.topic_scores.is_empty() {
188 self.total_score = 0.0;
189 return;
190 }
191
192 let sum: f64 = self.topic_scores.values().sum();
194 self.total_score = sum / self.topic_scores.len() as f64;
195
196 let total_messages = self.invalid_messages + self.valid_messages;
198 if total_messages > 0 {
199 let invalid_ratio = self.invalid_messages as f64 / total_messages as f64;
200 self.total_score *= 1.0 - invalid_ratio;
201 }
202
203 self.last_update = Some(Instant::now());
204 }
205
206 pub fn update_topic_score(&mut self, topic: TopicId, score: f64) {
208 self.topic_scores.insert(topic, score);
209 self.calculate_total();
210 }
211
212 pub fn record_message(&mut self, valid: bool) {
214 if valid {
215 self.valid_messages += 1;
216 } else {
217 self.invalid_messages += 1;
218 }
219 self.calculate_total();
220 }
221}
222
223#[derive(Debug, Clone)]
225pub struct TopicSubscription {
226 pub topic: TopicId,
228
229 pub subscribed_at: Instant,
231
232 pub mesh_peers: HashSet<PeerId>,
234
235 pub messages_received: u64,
237
238 pub messages_published: u64,
240}
241
242#[derive(Debug, Clone, Default, Serialize, Deserialize)]
244pub struct GossipSubStats {
245 pub subscribed_topics: usize,
247
248 pub messages_published: u64,
250
251 pub messages_received: u64,
253
254 pub duplicate_messages: u64,
256
257 pub invalid_messages: u64,
259
260 pub active_mesh_peers: usize,
262
263 pub mesh_prune_count: u64,
265
266 pub mesh_graft_count: u64,
268
269 pub messages_per_topic: HashMap<String, u64>,
271}
272
273#[derive(Debug, Clone)]
275struct SeenCacheEntry {
276 timestamp: Instant,
277}
278
279pub struct GossipSubManager {
281 config: GossipSubConfig,
283
284 subscriptions: Arc<DashMap<TopicId, TopicSubscription>>,
286
287 peer_scores: Arc<DashMap<PeerId, PeerScore>>,
289
290 seen_messages: Arc<DashMap<MessageId, SeenCacheEntry>>,
292
293 sequence_counter: Arc<RwLock<u64>>,
295
296 stats: Arc<RwLock<GossipSubStats>>,
298}
299
300impl GossipSubManager {
301 pub fn new(config: GossipSubConfig) -> Self {
303 Self {
304 config,
305 subscriptions: Arc::new(DashMap::new()),
306 peer_scores: Arc::new(DashMap::new()),
307 seen_messages: Arc::new(DashMap::new()),
308 sequence_counter: Arc::new(RwLock::new(0)),
309 stats: Arc::new(RwLock::new(GossipSubStats::default())),
310 }
311 }
312
313 pub fn subscribe(&self, topic: TopicId) -> Result<(), GossipSubError> {
315 if self.subscriptions.contains_key(&topic) {
316 return Err(GossipSubError::AlreadySubscribed(topic.0.clone()));
317 }
318
319 let subscription = TopicSubscription {
320 topic: topic.clone(),
321 subscribed_at: Instant::now(),
322 mesh_peers: HashSet::new(),
323 messages_received: 0,
324 messages_published: 0,
325 };
326
327 self.subscriptions.insert(topic.clone(), subscription);
328
329 let mut stats = self.stats.write();
330 stats.subscribed_topics = self.subscriptions.len();
331
332 Ok(())
333 }
334
335 pub fn unsubscribe(&self, topic: &TopicId) -> Result<(), GossipSubError> {
337 self.subscriptions
338 .remove(topic)
339 .ok_or_else(|| GossipSubError::NotSubscribed(topic.0.clone()))?;
340
341 let mut stats = self.stats.write();
342 stats.subscribed_topics = self.subscriptions.len();
343
344 Ok(())
345 }
346
347 pub fn publish(
349 &self,
350 topic: TopicId,
351 data: Vec<u8>,
352 source: PeerId,
353 ) -> Result<MessageId, GossipSubError> {
354 if !self.subscriptions.contains_key(&topic) {
356 return Err(GossipSubError::NotSubscribed(topic.0.clone()));
357 }
358
359 if data.len() > self.config.max_message_size {
361 return Err(GossipSubError::MessageTooLarge {
362 size: data.len(),
363 max: self.config.max_message_size,
364 });
365 }
366
367 let sequence = {
369 let mut counter = self.sequence_counter.write();
370 *counter += 1;
371 *counter
372 };
373
374 let message_id = MessageId::new(&source, sequence);
376
377 if let Some(mut subscription) = self.subscriptions.get_mut(&topic) {
379 subscription.messages_published += 1;
380 }
381
382 let mut stats = self.stats.write();
383 stats.messages_published += 1;
384 *stats.messages_per_topic.entry(topic.0.clone()).or_insert(0) += 1;
385
386 Ok(message_id)
387 }
388
389 pub fn handle_message(&self, message: GossipSubMessage) -> Result<bool, GossipSubError> {
391 if self.is_duplicate(&message.id) {
393 let mut stats = self.stats.write();
394 stats.duplicate_messages += 1;
395 return Ok(false); }
397
398 self.add_to_seen_cache(message.id.clone());
400
401 if self.config.enable_validation && !self.validate_message(&message) {
403 let mut stats = self.stats.write();
404 stats.invalid_messages += 1;
405
406 if self.config.enable_scoring {
408 if let Some(mut score) = self.peer_scores.get_mut(&message.source) {
409 score.record_message(false);
410 }
411 }
412
413 return Ok(false);
414 }
415
416 if let Some(mut subscription) = self.subscriptions.get_mut(&message.topic) {
418 subscription.messages_received += 1;
419 }
420
421 let mut stats = self.stats.write();
422 stats.messages_received += 1;
423
424 if self.config.enable_scoring {
426 self.peer_scores
427 .entry(message.source)
428 .or_default()
429 .record_message(true);
430 }
431
432 Ok(true) }
434
435 fn is_duplicate(&self, message_id: &MessageId) -> bool {
437 if let Some(entry) = self.seen_messages.get(message_id) {
438 let age = Instant::now().duration_since(entry.timestamp);
439 return age < self.config.duplicate_cache_time;
440 }
441 false
442 }
443
444 fn add_to_seen_cache(&self, message_id: MessageId) {
446 let entry = SeenCacheEntry {
447 timestamp: Instant::now(),
448 };
449
450 self.seen_messages.insert(message_id, entry);
451
452 if self.seen_messages.len() > self.config.max_duplicate_cache_size {
454 self.cleanup_seen_cache();
455 }
456 }
457
458 fn cleanup_seen_cache(&self) {
460 let now = Instant::now();
461 let ttl = self.config.duplicate_cache_time;
462
463 self.seen_messages
464 .retain(|_, entry| now.duration_since(entry.timestamp) < ttl);
465 }
466
467 fn validate_message(&self, _message: &GossipSubMessage) -> bool {
469 true
472 }
473
474 pub fn add_peer_to_mesh(&self, topic: &TopicId, peer: PeerId) -> Result<(), GossipSubError> {
476 let inserted = {
477 let mut subscription = self
478 .subscriptions
479 .get_mut(topic)
480 .ok_or_else(|| GossipSubError::NotSubscribed(topic.0.clone()))?;
481 subscription.mesh_peers.insert(peer)
482 }; if inserted {
485 let mut stats = self.stats.write();
486 stats.mesh_graft_count += 1;
487 stats.active_mesh_peers = self.count_mesh_peers();
488 }
489
490 Ok(())
491 }
492
493 pub fn remove_peer_from_mesh(
495 &self,
496 topic: &TopicId,
497 peer: &PeerId,
498 ) -> Result<(), GossipSubError> {
499 let removed = {
500 let mut subscription = self
501 .subscriptions
502 .get_mut(topic)
503 .ok_or_else(|| GossipSubError::NotSubscribed(topic.0.clone()))?;
504 subscription.mesh_peers.remove(peer)
505 }; if removed {
508 let mut stats = self.stats.write();
509 stats.mesh_prune_count += 1;
510 stats.active_mesh_peers = self.count_mesh_peers();
511 }
512
513 Ok(())
514 }
515
516 pub fn get_mesh_peers(&self, topic: &TopicId) -> Result<Vec<PeerId>, GossipSubError> {
518 let subscription = self
519 .subscriptions
520 .get(topic)
521 .ok_or_else(|| GossipSubError::NotSubscribed(topic.0.clone()))?;
522
523 Ok(subscription.mesh_peers.iter().cloned().collect())
524 }
525
526 fn count_mesh_peers(&self) -> usize {
528 self.subscriptions
529 .iter()
530 .map(|entry| entry.mesh_peers.len())
531 .sum()
532 }
533
534 pub fn get_peer_score(&self, peer: &PeerId) -> Option<PeerScore> {
536 self.peer_scores.get(peer).map(|s| s.clone())
537 }
538
539 pub fn update_peer_score(&self, peer: &PeerId, topic: TopicId, score: f64) {
541 self.peer_scores
542 .entry(*peer)
543 .or_default()
544 .update_topic_score(topic, score);
545 }
546
547 pub fn get_peers_to_prune(&self, topic: &TopicId, threshold: f64) -> Vec<PeerId> {
549 let subscription = match self.subscriptions.get(topic) {
550 Some(sub) => sub,
551 None => return Vec::new(),
552 };
553
554 subscription
555 .mesh_peers
556 .iter()
557 .filter(|peer| {
558 if let Some(score) = self.peer_scores.get(peer) {
559 score.total_score < threshold
560 } else {
561 false
562 }
563 })
564 .cloned()
565 .collect()
566 }
567
568 pub fn stats(&self) -> GossipSubStats {
570 self.stats.read().clone()
571 }
572
573 pub fn list_topics(&self) -> Vec<TopicId> {
575 self.subscriptions
576 .iter()
577 .map(|entry| entry.key().clone())
578 .collect()
579 }
580
581 pub fn is_subscribed(&self, topic: &TopicId) -> bool {
583 self.subscriptions.contains_key(topic)
584 }
585}
586
587#[cfg(test)]
588mod tests {
589 use super::*;
590 use libp2p::identity::Keypair;
591
592 fn test_peer_id(index: u8) -> PeerId {
594 let mut seed = [0u8; 32];
596 seed[0] = index;
597 let keypair = Keypair::ed25519_from_bytes(seed).expect("valid seed");
598 keypair.public().to_peer_id()
599 }
600
601 #[test]
602 fn test_gossipsub_manager_creation() {
603 let config = GossipSubConfig::default();
604 let manager = GossipSubManager::new(config);
605 assert_eq!(manager.list_topics().len(), 0);
606 }
607
608 #[test]
609 fn test_topic_subscription() {
610 let manager = GossipSubManager::new(GossipSubConfig::default());
611 let topic = TopicId::content_announce();
612
613 manager.subscribe(topic.clone()).unwrap();
614 assert!(manager.is_subscribed(&topic));
615 assert_eq!(manager.list_topics().len(), 1);
616 }
617
618 #[test]
619 fn test_duplicate_subscription() {
620 let manager = GossipSubManager::new(GossipSubConfig::default());
621 let topic = TopicId::content_announce();
622
623 manager.subscribe(topic.clone()).unwrap();
624 let result = manager.subscribe(topic);
625 assert!(matches!(result, Err(GossipSubError::AlreadySubscribed(_))));
626 }
627
628 #[test]
629 fn test_unsubscribe() {
630 let manager = GossipSubManager::new(GossipSubConfig::default());
631 let topic = TopicId::content_announce();
632
633 manager.subscribe(topic.clone()).unwrap();
634 manager.unsubscribe(&topic).unwrap();
635 assert!(!manager.is_subscribed(&topic));
636 assert_eq!(manager.list_topics().len(), 0);
637 }
638
639 #[test]
640 fn test_publish_message() {
641 let manager = GossipSubManager::new(GossipSubConfig::default());
642 let topic = TopicId::content_announce();
643 let peer = test_peer_id(1);
644
645 manager.subscribe(topic.clone()).unwrap();
646
647 let data = b"Hello, GossipSub!".to_vec();
648 let message_id = manager.publish(topic, data, peer).unwrap();
649
650 let stats = manager.stats();
651 assert_eq!(stats.messages_published, 1);
652 assert!(!message_id.0.is_empty());
653 }
654
655 #[test]
656 fn test_publish_without_subscription() {
657 let manager = GossipSubManager::new(GossipSubConfig::default());
658 let topic = TopicId::content_announce();
659 let peer = test_peer_id(1);
660
661 let data = b"Hello".to_vec();
662 let result = manager.publish(topic, data, peer);
663 assert!(matches!(result, Err(GossipSubError::NotSubscribed(_))));
664 }
665
666 #[test]
667 fn test_message_too_large() {
668 let config = GossipSubConfig {
669 max_message_size: 100,
670 ..Default::default()
671 };
672 let manager = GossipSubManager::new(config);
673 let topic = TopicId::content_announce();
674 let peer = test_peer_id(1);
675
676 manager.subscribe(topic.clone()).unwrap();
677
678 let data = vec![0u8; 200]; let result = manager.publish(topic, data, peer);
680 assert!(matches!(
681 result,
682 Err(GossipSubError::MessageTooLarge { .. })
683 ));
684 }
685
686 #[test]
687 fn test_message_deduplication() {
688 let manager = GossipSubManager::new(GossipSubConfig::default());
689 let topic = TopicId::content_announce();
690 let peer = test_peer_id(1);
691
692 manager.subscribe(topic.clone()).unwrap();
693
694 let message = GossipSubMessage {
695 id: MessageId::new(&peer, 1),
696 source: peer,
697 topic: topic.clone(),
698 data: b"Test".to_vec(),
699 sequence: 1,
700 timestamp: Instant::now(),
701 };
702
703 let result1 = manager.handle_message(message.clone()).unwrap();
705 assert!(result1);
706
707 let result2 = manager.handle_message(message).unwrap();
709 assert!(!result2);
710
711 let stats = manager.stats();
712 assert_eq!(stats.duplicate_messages, 1);
713 }
714
715 #[test]
716 fn test_peer_scoring() {
717 let manager = GossipSubManager::new(GossipSubConfig::default());
718 let peer = test_peer_id(1);
719 let topic = TopicId::content_announce();
720
721 manager.update_peer_score(&peer, topic.clone(), 0.8);
722
723 let score = manager.get_peer_score(&peer).unwrap();
724 assert_eq!(score.topic_scores.get(&topic), Some(&0.8));
725 assert!(score.total_score > 0.0);
726 }
727
728 #[test]
729 fn test_mesh_management() {
730 let manager = GossipSubManager::new(GossipSubConfig::default());
731 let topic = TopicId::content_announce();
732 let peer1 = test_peer_id(1);
733 let peer2 = test_peer_id(2);
734
735 manager.subscribe(topic.clone()).unwrap();
736
737 manager.add_peer_to_mesh(&topic, peer1).unwrap();
739 manager.add_peer_to_mesh(&topic, peer2).unwrap();
740
741 let mesh_peers = manager.get_mesh_peers(&topic).unwrap();
742 assert_eq!(mesh_peers.len(), 2);
743 assert!(mesh_peers.contains(&peer1));
744 assert!(mesh_peers.contains(&peer2));
745
746 manager.remove_peer_from_mesh(&topic, &peer1).unwrap();
748 let mesh_peers = manager.get_mesh_peers(&topic).unwrap();
749 assert_eq!(mesh_peers.len(), 1);
750 assert!(!mesh_peers.contains(&peer1));
751 }
752
753 #[test]
754 fn test_peers_to_prune() {
755 let manager = GossipSubManager::new(GossipSubConfig::default());
756 let topic = TopicId::content_announce();
757 let peer1 = test_peer_id(1);
758 let peer2 = test_peer_id(2);
759
760 manager.subscribe(topic.clone()).unwrap();
761 manager.add_peer_to_mesh(&topic, peer1).unwrap();
762 manager.add_peer_to_mesh(&topic, peer2).unwrap();
763
764 manager.update_peer_score(&peer1, topic.clone(), 0.9);
766 manager.update_peer_score(&peer2, topic.clone(), 0.3);
767
768 let to_prune = manager.get_peers_to_prune(&topic, 0.5);
770 assert_eq!(to_prune.len(), 1);
771 assert!(to_prune.contains(&peer2));
772 }
773
774 #[test]
775 fn test_topic_ids() {
776 assert_eq!(
777 TopicId::content_announce().0,
778 "/ipfrs/content/announce/1.0.0"
779 );
780 assert_eq!(TopicId::peer_announce().0, "/ipfrs/peer/announce/1.0.0");
781 assert_eq!(TopicId::dht_events().0, "/ipfrs/dht/events/1.0.0");
782 }
783
784 #[test]
785 fn test_message_id_generation() {
786 let peer = test_peer_id(1);
787 let id1 = MessageId::new(&peer, 1);
788 let id2 = MessageId::new(&peer, 1);
789 let id3 = MessageId::new(&peer, 2);
790
791 assert_eq!(id1, id2); assert_ne!(id1, id3); }
794
795 #[test]
796 fn test_peer_score_calculation() {
797 let mut score = PeerScore::default();
798
799 score.update_topic_score(TopicId::content_announce(), 0.8);
800 score.update_topic_score(TopicId::peer_announce(), 0.6);
801
802 assert_eq!(score.topic_scores.len(), 2);
803 assert_eq!(score.total_score, 0.7); score.record_message(false);
807 assert!(score.total_score < 0.7); }
809}