1use super::*;
20use serde::{Deserialize, Serialize};
21use std::collections::{HashMap, HashSet, VecDeque};
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::sync::{RwLock, mpsc};
25
26type GossipMessageRx = mpsc::Receiver<(NodeId, GossipMessage)>;
28type ControlMessageTx = mpsc::Sender<(NodeId, ControlMessage)>;
29
30pub type Topic = String;
32
33pub type MessageId = [u8; 32];
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
38pub enum ControlMessage {
39 Graft {
40 topic: Topic,
41 },
42 Prune {
43 topic: Topic,
44 backoff: Duration,
45 },
46 IHave {
47 topic: Topic,
48 message_ids: Vec<MessageId>,
49 },
50 IWant {
51 message_ids: Vec<MessageId>,
52 },
53}
54
/// Relative importance of a topic.
///
/// Variants are declared in ascending order, so the derived `Ord` gives
/// `Low < Normal < High < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum TopicPriority {
    /// Lowest priority.
    Low,
    /// Baseline priority.
    Normal,
    /// Above-baseline priority.
    High,
    /// Highest priority.
    Critical,
}
63
64#[async_trait::async_trait]
66pub trait MessageValidator: Send + Sync {
67 async fn validate(&self, message: &GossipMessage) -> Result<bool>;
69}
70
71#[derive(Debug, Clone, Default)]
73pub struct GossipStats {
74 pub messages_sent: u64,
76
77 pub messages_received: u64,
79
80 pub mesh_size: usize,
82
83 pub topic_count: usize,
85
86 pub peer_count: usize,
88
89 pub messages_by_topic: HashMap<Topic, u64>,
91}
92
93pub struct AdaptiveGossipSub {
95 _local_id: NodeId,
97
98 mesh: Arc<RwLock<HashMap<Topic, HashSet<NodeId>>>>,
100
101 fanout: Arc<RwLock<HashMap<Topic, HashSet<NodeId>>>>,
103
104 seen_messages: Arc<RwLock<HashMap<MessageId, Instant>>>,
106
107 message_cache: Arc<RwLock<HashMap<MessageId, GossipMessage>>>,
109
110 peer_scores: Arc<RwLock<HashMap<NodeId, PeerScore>>>,
112
113 topics: Arc<RwLock<HashMap<Topic, TopicParams>>>,
115
116 topic_priorities: Arc<RwLock<HashMap<Topic, TopicPriority>>>,
118
119 _heartbeat_interval: Duration,
121
122 message_validators: Arc<RwLock<HashMap<Topic, Box<dyn MessageValidator + Send + Sync>>>>,
124
125 trust_provider: Arc<dyn TrustProvider>,
127
128 _message_rx: Arc<RwLock<Option<GossipMessageRx>>>,
130
131 control_tx: Arc<RwLock<Option<ControlMessageTx>>>,
133
134 churn_detector: Arc<RwLock<ChurnDetector>>,
136
137 stats: Arc<RwLock<GossipStats>>,
139}
140
141#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct GossipMessage {
144 pub topic: Topic,
145 pub data: Vec<u8>,
146 pub from: NodeId,
147 pub seqno: u64,
148 pub timestamp: u64,
149}
150
/// Raw inputs from which a peer's numeric score is derived.
#[derive(Debug, Clone)]
pub struct PeerScore {
    /// Time the peer has spent in the mesh.
    pub time_in_mesh: Duration,
    /// Count of first-time message deliveries credited to the peer.
    pub first_message_deliveries: u64,
    /// Count of mesh message deliveries credited to the peer.
    pub mesh_message_deliveries: u64,
    /// Count of invalid messages attributed to the peer.
    pub invalid_messages: u64,
    /// Additive behaviour term; added to the score as-is.
    pub behavior_penalty: f64,
    /// Application-specific term; added to the score as-is.
    pub app_specific_score: f64,
}

impl PeerScore {
    /// Creates a neutral score: all counters zero, no penalty, and an
    /// application-specific component of `0.5`.
    #[allow(dead_code)]
    fn new() -> Self {
        Self {
            time_in_mesh: Duration::ZERO,
            first_message_deliveries: 0,
            mesh_message_deliveries: 0,
            invalid_messages: 0,
            behavior_penalty: 0.0,
            app_specific_score: 0.5,
        }
    }

    /// Combines all components into a single score.
    ///
    /// Components, summed left to right:
    /// - minutes in mesh, capped at 10, weighted `0.5`;
    /// - first deliveries, capped at 100, scaled to `0..=1`;
    /// - mesh deliveries, capped at 1000, scaled to `0..=1`, weighted `0.2`;
    /// - `-10` per invalid message;
    /// - `behavior_penalty` and `app_specific_score` unchanged.
    pub fn score(&self) -> f64 {
        let minutes_in_mesh = self.time_in_mesh.as_secs() as f64 / 60.0;

        let [first, rest @ ..] = [
            minutes_in_mesh.min(10.0) * 0.5,
            (self.first_message_deliveries as f64).min(100.0) / 100.0,
            (self.mesh_message_deliveries as f64).min(1000.0) / 1000.0 * 0.2,
            self.invalid_messages as f64 * -10.0,
            self.behavior_penalty,
            self.app_specific_score,
        ];

        // Fold from the first component so the floating-point additions occur
        // in exactly the listed order.
        rest.iter().fold(first, |total, component| total + component)
    }
}
189
190#[derive(Debug, Clone)]
192pub struct TopicParams {
193 pub d: usize, pub d_low: usize, pub d_high: usize, pub d_out: usize, pub graylist_threshold: f64, pub mesh_message_deliveries_threshold: f64,
199 pub gossip_factor: f64, pub priority: TopicPriority,
201}
202
203impl Default for TopicParams {
204 fn default() -> Self {
205 Self {
206 d: 8,
207 d_low: 6,
208 d_high: 12,
209 d_out: 2,
210 graylist_threshold: -1.0,
211 mesh_message_deliveries_threshold: 0.5,
212 gossip_factor: 0.25,
213 priority: TopicPriority::Normal,
214 }
215 }
216}
217
218#[derive(Debug, Clone)]
220pub struct ChurnDetector {
221 events: VecDeque<(Instant, ChurnEvent)>,
223 window: Duration,
225 churn_rate: f64,
227}
228
229#[derive(Debug, Clone)]
230#[allow(dead_code)]
231enum ChurnEvent {
232 PeerJoined(NodeId),
233 PeerLeft(NodeId),
234}
235
236#[derive(Debug)]
238pub struct ChurnStats {
239 pub joins: usize,
241 pub leaves: usize,
243 pub avg_session_duration: Duration,
245 node_join_times: HashMap<NodeId, Instant>,
247}
248
249impl ChurnStats {
250 pub fn get_node_uptime(&self, node_id: &NodeId) -> Duration {
252 self.node_join_times
253 .get(node_id)
254 .map(|join_time| Instant::now().duration_since(*join_time))
255 .unwrap_or(Duration::from_secs(0))
256 }
257}
258
259impl ChurnDetector {
260 fn new() -> Self {
261 Self {
262 events: VecDeque::new(),
263 window: Duration::from_secs(300), churn_rate: 0.0,
265 }
266 }
267
268 fn record_join(&mut self, peer: NodeId) {
269 self.events
270 .push_back((Instant::now(), ChurnEvent::PeerJoined(peer)));
271 self.update_rate();
272 }
273
274 fn record_leave(&mut self, peer: NodeId) {
275 self.events
276 .push_back((Instant::now(), ChurnEvent::PeerLeft(peer)));
277 self.update_rate();
278 }
279
280 fn update_rate(&mut self) {
281 let cutoff = Instant::now() - self.window;
282 self.events.retain(|(time, _)| *time > cutoff);
283
284 let joins = self
285 .events
286 .iter()
287 .filter(|(_, event)| matches!(event, ChurnEvent::PeerJoined(_)))
288 .count();
289 let leaves = self
290 .events
291 .iter()
292 .filter(|(_, event)| matches!(event, ChurnEvent::PeerLeft(_)))
293 .count();
294
295 self.churn_rate = (joins + leaves) as f64 / self.window.as_secs() as f64;
297 }
298
299 fn get_rate(&self) -> f64 {
300 self.churn_rate
301 }
302
303 pub async fn get_hourly_rates(&self, hours: usize) -> Vec<f64> {
304 let now = Instant::now();
305 let mut hourly_rates = vec![0.0; hours];
306
307 for (time, event) in &self.events {
308 let age = now.duration_since(*time);
309 let hour_index = (age.as_secs() / 3600) as usize;
310
311 if hour_index < hours {
312 match event {
313 ChurnEvent::PeerJoined(_) | ChurnEvent::PeerLeft(_) => {
314 hourly_rates[hour_index] += 1.0;
315 }
316 }
317 }
318 }
319
320 for rate in &mut hourly_rates {
322 *rate /= 3600.0; }
324
325 hourly_rates
326 }
327
328 pub async fn get_recent_stats(&self, window: Duration) -> ChurnStats {
329 let now = Instant::now();
330 let mut joins = 0;
331 let mut leaves = 0;
332 let mut _session_durations = Vec::new();
333 let mut _node_join_times = HashMap::new();
334
335 for (time, event) in &self.events {
336 if now.duration_since(*time) <= window {
337 match event {
338 ChurnEvent::PeerJoined(node_id) => {
339 joins += 1;
340 _node_join_times.insert(node_id.clone(), *time);
341 }
342 ChurnEvent::PeerLeft(_) => leaves += 1,
343 }
344 }
345 }
346
347 let avg_session_duration = if _session_durations.is_empty() {
348 Duration::from_secs(3600) } else {
350 Duration::from_secs(
351 _session_durations
352 .iter()
353 .map(|d: &Duration| d.as_secs())
354 .sum::<u64>()
355 / _session_durations.len() as u64,
356 )
357 };
358
359 ChurnStats {
360 joins,
361 leaves,
362 avg_session_duration,
363 node_join_times: _node_join_times,
364 }
365 }
366}
367
368impl AdaptiveGossipSub {
369 pub fn new(local_id: NodeId, trust_provider: Arc<dyn TrustProvider>) -> Self {
371 let (control_tx, _control_rx) = mpsc::channel(1000);
372 let (_message_tx, message_rx) = mpsc::channel(1000);
373
374 Self {
375 _local_id: local_id,
376 mesh: Arc::new(RwLock::new(HashMap::new())),
377 fanout: Arc::new(RwLock::new(HashMap::new())),
378 seen_messages: Arc::new(RwLock::new(HashMap::new())),
379 message_cache: Arc::new(RwLock::new(HashMap::new())),
380 peer_scores: Arc::new(RwLock::new(HashMap::new())),
381 topics: Arc::new(RwLock::new(HashMap::new())),
382 topic_priorities: Arc::new(RwLock::new(HashMap::new())),
383 _heartbeat_interval: Duration::from_secs(1),
384 message_validators: Arc::new(RwLock::new(HashMap::new())),
385 trust_provider,
386 _message_rx: Arc::new(RwLock::new(Some(message_rx))),
387 control_tx: Arc::new(RwLock::new(Some(control_tx))),
388 churn_detector: Arc::new(RwLock::new(ChurnDetector::new())),
389 stats: Arc::new(RwLock::new(GossipStats::default())),
390 }
391 }
392
393 pub async fn subscribe(&self, topic: &str) -> Result<()> {
395 let mut topics = self.topics.write().await;
396 topics
397 .entry(topic.to_string())
398 .or_insert_with(TopicParams::default);
399
400 let mut mesh = self.mesh.write().await;
401 mesh.insert(topic.to_string(), HashSet::new());
402
403 Ok(())
404 }
405
406 pub async fn unsubscribe(&self, topic: &str) -> Result<()> {
408 let mut mesh = self.mesh.write().await;
409 mesh.remove(topic);
410
411 Ok(())
412 }
413
414 pub async fn publish(&self, topic: &str, message: GossipMessage) -> Result<()> {
416 if !self.validate_message(&message).await? {
418 return Err(AdaptiveNetworkError::Gossip(
419 "Message validation failed".to_string(),
420 ));
421 }
422
423 let msg_id = self.compute_message_id(&message);
424
425 {
427 let mut seen = self.seen_messages.write().await;
428 seen.insert(msg_id, Instant::now());
429
430 let mut cache = self.message_cache.write().await;
431 cache.insert(msg_id, message.clone());
432 }
433
434 let mesh = self.mesh.read().await;
436 if let Some(mesh_peers) = mesh.get(topic) {
437 for peer in mesh_peers {
438 self.send_message(peer, &message).await?;
440 }
441 } else {
442 let fanout = self.fanout.read().await;
444 let fanout_peers = fanout
445 .get(topic)
446 .cloned()
447 .unwrap_or_else(|| self.get_fanout_peers(topic).unwrap_or_default());
448
449 for peer in &fanout_peers {
450 self.send_message(peer, &message).await?;
451 }
452 }
453
454 Ok(())
455 }
456
457 pub async fn send_graft(&self, peer: &NodeId, topic: &str) -> Result<()> {
459 let control_tx = self.control_tx.read().await;
460 if let Some(tx) = control_tx.as_ref() {
461 let msg = ControlMessage::Graft {
462 topic: topic.to_string(),
463 };
464 tx.send((peer.clone(), msg))
465 .await
466 .map_err(|_| AdaptiveNetworkError::Other("Failed to send GRAFT".to_string()))?;
467 }
468 Ok(())
469 }
470
471 pub async fn send_prune(&self, peer: &NodeId, topic: &str, backoff: Duration) -> Result<()> {
473 let control_tx = self.control_tx.read().await;
474 if let Some(tx) = control_tx.as_ref() {
475 let msg = ControlMessage::Prune {
476 topic: topic.to_string(),
477 backoff,
478 };
479 tx.send((peer.clone(), msg))
480 .await
481 .map_err(|_| AdaptiveNetworkError::Other("Failed to send PRUNE".to_string()))?;
482 }
483 Ok(())
484 }
485
486 pub async fn send_ihave(
488 &self,
489 peer: &NodeId,
490 topic: &str,
491 message_ids: Vec<MessageId>,
492 ) -> Result<()> {
493 let control_tx = self.control_tx.read().await;
494 if let Some(tx) = control_tx.as_ref() {
495 let msg = ControlMessage::IHave {
496 topic: topic.to_string(),
497 message_ids,
498 };
499 tx.send((peer.clone(), msg))
500 .await
501 .map_err(|_| AdaptiveNetworkError::Other("Failed to send IHAVE".to_string()))?;
502 }
503 Ok(())
504 }
505
506 pub async fn send_iwant(&self, peer: &NodeId, message_ids: Vec<MessageId>) -> Result<()> {
508 let control_tx = self.control_tx.read().await;
509 if let Some(tx) = control_tx.as_ref() {
510 let msg = ControlMessage::IWant { message_ids };
511 tx.send((peer.clone(), msg))
512 .await
513 .map_err(|_| AdaptiveNetworkError::Other("Failed to send IWANT".to_string()))?;
514 }
515 Ok(())
516 }
517
518 pub async fn heartbeat(&self) {
520 let mesh = self.mesh.read().await.clone();
521
522 for (topic, mesh_peers) in mesh {
523 let params = {
524 let topics = self.topics.read().await;
525 topics.get(&topic).cloned().unwrap_or_default()
526 };
527
528 let target_size = self.calculate_adaptive_mesh_size(&topic).await;
530
531 let mut peers_to_remove = Vec::new();
533 {
534 let scores = self.peer_scores.read().await;
535 for peer in &mesh_peers {
536 if let Some(score) = scores.get(peer)
537 && score.score() < params.graylist_threshold
538 {
539 peers_to_remove.push(peer.clone());
540 }
541 }
542 }
543
544 let mut mesh_write = self.mesh.write().await;
546 if let Some(topic_mesh) = mesh_write.get_mut(&topic) {
547 for peer in peers_to_remove {
549 topic_mesh.remove(&peer);
550 let _ = self
551 .send_prune(&peer, &topic, Duration::from_secs(60))
552 .await;
553
554 let mut churn = self.churn_detector.write().await;
556 churn.record_leave(peer);
557 }
558
559 while topic_mesh.len() < target_size {
561 if let Some(peer) = self.select_peer_for_mesh(&topic, topic_mesh).await {
562 topic_mesh.insert(peer.clone());
563 let _ = self.send_graft(&peer, &topic).await;
564
565 let mut churn = self.churn_detector.write().await;
567 churn.record_join(peer);
568 } else {
569 break;
570 }
571 }
572 }
573 }
574
575 self.update_peer_scores().await;
577
578 self.clean_seen_messages().await;
580 }
581
582 pub async fn calculate_adaptive_mesh_size(&self, topic: &str) -> usize {
584 let base_size = 8;
585
586 let churn_rate = {
588 let churn = self.churn_detector.read().await;
589 churn.get_rate()
590 };
591
592 let priority_factor = {
594 let priorities = self.topic_priorities.read().await;
595 match priorities.get(topic) {
596 Some(TopicPriority::Critical) => 2.0,
597 Some(TopicPriority::High) => 1.5,
598 Some(TopicPriority::Normal) => 1.0,
599 Some(TopicPriority::Low) => 0.8,
600 None => 1.0,
601 }
602 };
603
604 let churn_factor = 1.0 + (churn_rate * 0.1).min(0.5); (base_size as f64 * churn_factor * priority_factor).round() as usize
608 }
609
610 async fn select_peer_for_mesh(
612 &self,
613 _topic: &str,
614 current_mesh: &HashSet<NodeId>,
615 ) -> Option<NodeId> {
616 let scores = self.peer_scores.read().await;
618 let mut candidates: Vec<_> = scores
619 .iter()
620 .filter(|(peer_id, _)| !current_mesh.contains(peer_id))
621 .map(|(peer_id, score)| (peer_id.clone(), score.score()))
622 .collect();
623
624 candidates.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
625 candidates.first().map(|(peer, _)| peer.clone())
626 }
627
628 async fn update_peer_scores(&self) {
630 let mut scores = self.peer_scores.write().await;
631 for (peer_id, score) in scores.iter_mut() {
632 score.app_specific_score = self.trust_provider.get_trust(peer_id);
634
635 score.behavior_penalty *= 0.99;
637 }
638 }
639
640 async fn clean_seen_messages(&self) {
642 let cutoff = Instant::now() - Duration::from_secs(300); let mut seen = self.seen_messages.write().await;
644 seen.retain(|_, timestamp| *timestamp > cutoff);
645 }
646
647 pub fn compute_message_id(&self, message: &GossipMessage) -> MessageId {
649 use sha2::{Digest, Sha256};
650 let mut hasher = Sha256::new();
651 hasher.update(message.topic.as_bytes());
652 hasher.update(message.from.hash);
653 hasher.update(message.seqno.to_le_bytes());
654 hasher.update(&message.data);
655
656 let result = hasher.finalize();
657 let mut id = [0u8; 32];
658 id.copy_from_slice(&result);
659 id
660 }
661
662 async fn send_message(&self, _peer: &NodeId, _message: &GossipMessage) -> Result<()> {
664 Ok(())
666 }
667
668 fn get_fanout_peers(&self, _topic: &str) -> Option<HashSet<NodeId>> {
670 None
672 }
673
674 pub async fn handle_control_message(
676 &self,
677 from: &NodeId,
678 message: ControlMessage,
679 ) -> Result<()> {
680 match message {
681 ControlMessage::Graft { topic } => {
682 let mut mesh = self.mesh.write().await;
684 if let Some(topic_mesh) = mesh.get_mut(&topic) {
685 let score = {
687 let scores = self.peer_scores.read().await;
688 scores.get(from).map(|s| s.score()).unwrap_or(0.0)
689 };
690
691 let score = if score == 0.0 {
693 self.trust_provider.get_trust(from)
694 } else {
695 score
696 };
697
698 if score > 0.0 {
699 topic_mesh.insert(from.clone());
700 } else {
701 let _ = self.send_prune(from, &topic, Duration::from_secs(60)).await;
703 }
704 }
705 }
706 ControlMessage::Prune { topic, backoff: _ } => {
707 let mut mesh = self.mesh.write().await;
709 if let Some(topic_mesh) = mesh.get_mut(&topic) {
710 topic_mesh.remove(from);
711 }
712 }
713 ControlMessage::IHave {
714 topic: _,
715 message_ids,
716 } => {
717 let seen = self.seen_messages.read().await;
719 let mut want = Vec::new();
720
721 for msg_id in message_ids {
722 if !seen.contains_key(&msg_id) {
723 want.push(msg_id);
724 }
725 }
726
727 if !want.is_empty() {
728 let _ = self.send_iwant(from, want).await;
729 }
730 }
731 ControlMessage::IWant { message_ids } => {
732 let cache = self.message_cache.read().await;
734 for msg_id in message_ids {
735 if let Some(message) = cache.get(&msg_id) {
736 let _ = self.send_message(from, message).await;
737 }
738 }
739 }
740 }
741
742 Ok(())
743 }
744
745 pub async fn set_topic_priority(&self, topic: &str, priority: TopicPriority) {
747 let mut priorities = self.topic_priorities.write().await;
748 priorities.insert(topic.to_string(), priority);
749 }
750
751 pub async fn register_validator(
753 &self,
754 topic: &str,
755 validator: Box<dyn MessageValidator + Send + Sync>,
756 ) -> Result<()> {
757 let mut validators = self.message_validators.write().await;
758 validators.insert(topic.to_string(), validator);
759 Ok(())
760 }
761
762 async fn validate_message(&self, message: &GossipMessage) -> Result<bool> {
764 let validators = self.message_validators.read().await;
765
766 if let Some(validator) = validators.get(&message.topic) {
767 validator.validate(message).await
768 } else {
769 Ok(true)
771 }
772 }
773
774 pub async fn reduce_fanout(&self, factor: f64) {
776 let _ = factor; }
780
781 pub async fn get_stats(&self) -> GossipStats {
783 let mut stats = self.stats.read().await.clone();
784
785 let mesh = self.mesh.read().await;
787 stats.mesh_size = mesh.values().map(|peers| peers.len()).sum();
788 stats.topic_count = mesh.len();
789
790 let peer_scores = self.peer_scores.read().await;
791 stats.peer_count = peer_scores.len();
792
793 stats
794 }
795}
796
#[cfg(test)]
mod tests {
    use super::*;
    use crate::peer_record::UserId;
    use rand::RngCore;

    /// Trust provider stub that reports the same fixed trust for every node.
    struct MockTrustProvider(f64);

    impl TrustProvider for MockTrustProvider {
        fn get_trust(&self, _node: &NodeId) -> f64 {
            self.0
        }
        fn update_trust(&self, _from: &NodeId, _to: &NodeId, _success: bool) {}
        fn get_global_trust(&self) -> HashMap<NodeId, f64> {
            HashMap::new()
        }
        fn remove_node(&self, _node: &NodeId) {}
    }

    /// Builds an id from 32 random bytes.
    fn random_id() -> UserId {
        let mut hash = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut hash);
        UserId::from_bytes(hash)
    }

    /// Builds a router with a random local id and a constant-trust provider.
    fn new_gossip(trust: f64) -> AdaptiveGossipSub {
        AdaptiveGossipSub::new(random_id(), Arc::new(MockTrustProvider(trust)))
    }

    #[tokio::test]
    async fn test_gossipsub_subscribe() {
        let gossip = new_gossip(0.5);

        gossip.subscribe("test-topic").await.unwrap();

        let mesh = gossip.mesh.read().await;
        assert!(mesh.contains_key("test-topic"));
    }

    #[test]
    fn test_peer_score() {
        let mut score = PeerScore::new();
        assert!(score.score() > 0.0);

        score.invalid_messages = 5;
        assert!(score.score() < 0.0);
    }

    #[test]
    fn test_message_id() {
        let gossip = new_gossip(0.5);

        let msg = GossipMessage {
            topic: "test".to_string(),
            data: vec![1, 2, 3],
            from: random_id(),
            seqno: 1,
            timestamp: 12345,
        };

        // The id must be deterministic for identical input.
        let id1 = gossip.compute_message_id(&msg);
        let id2 = gossip.compute_message_id(&msg);

        assert_eq!(id1, id2);
    }

    #[tokio::test]
    async fn test_adaptive_mesh_size() {
        let gossip = new_gossip(0.5);

        gossip
            .set_topic_priority("critical-topic", TopicPriority::Critical)
            .await;
        gossip
            .set_topic_priority("low-topic", TopicPriority::Low)
            .await;

        let critical_size = gossip.calculate_adaptive_mesh_size("critical-topic").await;
        // No priority set: treated as normal.
        let normal_size = gossip.calculate_adaptive_mesh_size("normal-topic").await;
        let low_size = gossip.calculate_adaptive_mesh_size("low-topic").await;

        assert!(critical_size > normal_size);
        assert!(normal_size > low_size);
    }

    #[test]
    fn test_churn_detector() {
        let mut detector = ChurnDetector::new();

        for i in 0..10u8 {
            let mut hash = [0u8; 32];
            rand::thread_rng().fill_bytes(&mut hash);
            // Force distinct ids regardless of the random bytes.
            hash[0] = i;
            let peer = UserId::from_bytes(hash);

            if i % 2 == 0 {
                detector.record_join(peer);
            } else {
                detector.record_leave(peer);
            }
        }

        let rate = detector.get_rate();
        assert!(rate > 0.0);
    }

    #[tokio::test]
    async fn test_control_messages() {
        let gossip = new_gossip(0.8);

        gossip.subscribe("test-topic").await.unwrap();

        let peer_id = random_id();

        let graft_msg = ControlMessage::Graft {
            topic: "test-topic".to_string(),
        };
        gossip
            .handle_control_message(&peer_id, graft_msg)
            .await
            .unwrap();

        // The peer has positive trust, so the graft must have been accepted.
        let mesh = gossip.mesh.read().await;
        assert!(mesh.get("test-topic").unwrap().contains(&peer_id));
    }

    #[tokio::test]
    async fn test_message_validation() {
        /// Rejects any payload containing the byte sequence `bad`.
        struct TestValidator;

        #[async_trait::async_trait]
        impl MessageValidator for TestValidator {
            async fn validate(&self, message: &GossipMessage) -> Result<bool> {
                Ok(!message.data.windows(3).any(|w| w == b"bad"))
            }
        }

        let gossip = new_gossip(0.8);

        gossip
            .register_validator("test-topic", Box::new(TestValidator))
            .await
            .unwrap();

        let valid_message = GossipMessage {
            topic: "test-topic".to_string(),
            data: vec![1, 2, 3, 4],
            from: UserId::from_bytes([0; 32]),
            seqno: 1,
            timestamp: 12345,
        };

        assert!(gossip.publish("test-topic", valid_message).await.is_ok());

        let invalid_message = GossipMessage {
            topic: "test-topic".to_string(),
            data: vec![b'b', b'a', b'd', b'!'],
            from: UserId::from_bytes([0; 32]),
            seqno: 2,
            timestamp: 12346,
        };

        assert!(gossip.publish("test-topic", invalid_message).await.is_err());
    }

    #[tokio::test]
    async fn test_ihave_iwant_flow() {
        let gossip = new_gossip(0.8);

        let from_peer = random_id();

        let message = GossipMessage {
            topic: "test-topic".to_string(),
            data: vec![1, 2, 3, 4],
            from: from_peer.clone(),
            seqno: 1,
            timestamp: 12345,
        };

        gossip.publish("test-topic", message.clone()).await.unwrap();

        // A published message must be cached so it can be served on IWANT.
        let msg_id = gossip.compute_message_id(&message);
        let cache = gossip.message_cache.read().await;
        assert!(cache.contains_key(&msg_id));
    }
}