saorsa_core/adaptive/gossip.rs

// Copyright 2024 Saorsa Labs Limited
//
// This software is dual-licensed under:
// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
// - Commercial License
//
// For AGPL-3.0 license, see LICENSE-AGPL-3.0
// For commercial licensing, contact: saorsalabs@gmail.com
//
// Unless required by applicable law or agreed to in writing, software
// distributed under these licenses is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

//! Adaptive GossipSub implementation
//!
//! Enhanced gossip protocol with adaptive mesh degree, peer scoring,
//! and priority message types
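//!
//! # Example
//!
//! A minimal usage sketch (illustrative only; it assumes a `TrustProvider`
//! implementation and `NodeId` values for the local node and the sender are
//! already available):
//!
//! ```ignore
//! // Create the gossip instance, subscribe to a topic, and publish a message.
//! let gossip = AdaptiveGossipSub::new(local_id, trust_provider);
//! gossip.subscribe("chat/general").await?;
//!
//! let message = GossipMessage {
//!     topic: "chat/general".to_string(),
//!     data: b"hello".to_vec(),
//!     from: sender_id,
//!     seqno: 1,
//!     timestamp: 0,
//! };
//! gossip.publish("chat/general", message).await?;
//! ```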

use super::*;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, mpsc};

// Type aliases to reduce type complexity for channels
type GossipMessageRx = mpsc::Receiver<(NodeId, GossipMessage)>;
type ControlMessageTx = mpsc::Sender<(NodeId, ControlMessage)>;

/// Topic identifier for gossip messages
pub type Topic = String;

/// Message identifier
pub type MessageId = [u8; 32];

/// Control messages for gossip protocol
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlMessage {
    /// Ask the receiving peer to add the sender to its mesh for a topic
    Graft {
        topic: Topic,
    },
    /// Remove the receiving peer from the sender's mesh, with a backoff before re-grafting
    Prune {
        topic: Topic,
        backoff: Duration,
    },
    /// Advertise message IDs the sender has seen for a topic
    IHave {
        topic: Topic,
        message_ids: Vec<MessageId>,
    },
    /// Request the full messages for the given IDs
    IWant {
        message_ids: Vec<MessageId>,
    },
}

/// Topic priority levels
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum TopicPriority {
    Low,
    Normal,
    High,
    Critical,
}

/// Message validation trait
#[async_trait::async_trait]
pub trait MessageValidator: Send + Sync {
    /// Validate a message before propagation
    async fn validate(&self, message: &GossipMessage) -> Result<bool>;
}

/// Gossip statistics
#[derive(Debug, Clone, Default)]
pub struct GossipStats {
    /// Total messages sent
    pub messages_sent: u64,

    /// Total messages received
    pub messages_received: u64,

    /// Total mesh peer count across all topics
    pub mesh_size: usize,

    /// Number of active topics
    pub topic_count: usize,

    /// Total peers
    pub peer_count: usize,

    /// Messages by topic
    pub messages_by_topic: HashMap<Topic, u64>,
}

/// Adaptive GossipSub implementation
pub struct AdaptiveGossipSub {
    /// Local node ID
    _local_id: NodeId,

    /// Mesh peers for each topic
    mesh: Arc<RwLock<HashMap<Topic, HashSet<NodeId>>>>,

    /// Fanout peers for topics we're not subscribed to
    fanout: Arc<RwLock<HashMap<Topic, HashSet<NodeId>>>>,

    /// Seen messages cache
    seen_messages: Arc<RwLock<HashMap<MessageId, Instant>>>,

    /// Message cache for IWANT requests
    message_cache: Arc<RwLock<HashMap<MessageId, GossipMessage>>>,

    /// Peer scores
    peer_scores: Arc<RwLock<HashMap<NodeId, PeerScore>>>,

    /// Topic parameters
    topics: Arc<RwLock<HashMap<Topic, TopicParams>>>,

    /// Topic priorities
    topic_priorities: Arc<RwLock<HashMap<Topic, TopicPriority>>>,

    /// Heartbeat interval
    _heartbeat_interval: Duration,

    /// Message validators by topic
    message_validators: Arc<RwLock<HashMap<Topic, Box<dyn MessageValidator + Send + Sync>>>>,

    /// Trust provider for peer scoring
    trust_provider: Arc<dyn TrustProvider>,

    /// Message receiver channel
    _message_rx: Arc<RwLock<Option<GossipMessageRx>>>,

    /// Control message sender
    control_tx: Arc<RwLock<Option<ControlMessageTx>>>,

    /// Churn detector
    churn_detector: Arc<RwLock<ChurnDetector>>,

    /// Statistics
    stats: Arc<RwLock<GossipStats>>,
}

/// Gossip message
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GossipMessage {
    pub topic: Topic,
    pub data: Vec<u8>,
    pub from: NodeId,
    pub seqno: u64,
    pub timestamp: u64,
}

/// Peer score tracking
#[derive(Debug, Clone)]
pub struct PeerScore {
    pub time_in_mesh: Duration,
    pub first_message_deliveries: u64,
    pub mesh_message_deliveries: u64,
    pub invalid_messages: u64,
    pub behavior_penalty: f64,
    pub app_specific_score: f64, // From trust system
}

impl PeerScore {
    #[allow(dead_code)]
    fn new() -> Self {
        Self {
            time_in_mesh: Duration::ZERO,
            first_message_deliveries: 0,
            mesh_message_deliveries: 0,
            invalid_messages: 0,
            behavior_penalty: 0.0,
            app_specific_score: 0.5,
        }
    }

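    /// Compute the composite peer score.
    ///
    /// Illustrative worked example: a peer that has spent 2 minutes in the mesh
    /// (time score `(120.0 / 60.0).min(10.0) * 0.5 = 1.0`), delivered 50 first
    /// messages (`0.5`) and 200 mesh messages (`0.2 * 0.2 = 0.04`), with no
    /// invalid messages, no behaviour penalty, and an app-specific trust of
    /// `0.5`, scores about `2.04`.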
    pub fn score(&self) -> f64 {
        let time_score = (self.time_in_mesh.as_secs() as f64 / 60.0).min(10.0) * 0.5;
        let delivery_score = (self.first_message_deliveries as f64).min(100.0) / 100.0;
        let mesh_score = (self.mesh_message_deliveries as f64).min(1000.0) / 1000.0 * 0.2;
        let invalid_penalty = self.invalid_messages as f64 * -10.0;

        time_score
            + delivery_score
            + mesh_score
            + invalid_penalty
            + self.behavior_penalty
            + self.app_specific_score
    }
}

/// Topic parameters
#[derive(Debug, Clone)]
pub struct TopicParams {
    pub d: usize,                // Target mesh degree
    pub d_low: usize,            // Lower bound
    pub d_high: usize,           // Upper bound
    pub d_out: usize,            // Outbound degree for neighbor exchange
    pub graylist_threshold: f64, // Score below which peers are graylisted
    pub mesh_message_deliveries_threshold: f64,
    pub gossip_factor: f64, // Fraction of peers to send IHAVE to
    pub priority: TopicPriority,
}

impl Default for TopicParams {
    fn default() -> Self {
        Self {
            d: 8,
            d_low: 6,
            d_high: 12,
            d_out: 2,
            graylist_threshold: -1.0,
            mesh_message_deliveries_threshold: 0.5,
            gossip_factor: 0.25,
            priority: TopicPriority::Normal,
        }
    }
}

/// Churn detection and tracking
#[derive(Debug, Clone)]
pub struct ChurnDetector {
    /// Recent peer join/leave events
    events: VecDeque<(Instant, ChurnEvent)>,
    /// Window size for churn calculation
    window: Duration,
    /// Current churn rate (join/leave events per second within the window)
    churn_rate: f64,
}

#[derive(Debug, Clone)]
#[allow(dead_code)]
enum ChurnEvent {
    PeerJoined(NodeId),
    PeerLeft(NodeId),
}

/// Churn statistics for a time window
#[derive(Debug)]
pub struct ChurnStats {
    /// Number of nodes that joined
    pub joins: usize,
    /// Number of nodes that left
    pub leaves: usize,
    /// Average session duration
    pub avg_session_duration: Duration,
    /// Node join times for uptime calculation
    node_join_times: HashMap<NodeId, Instant>,
}

impl ChurnStats {
    /// Get uptime for a specific node
    pub fn get_node_uptime(&self, node_id: &NodeId) -> Duration {
        self.node_join_times
            .get(node_id)
            .map(|join_time| Instant::now().duration_since(*join_time))
            .unwrap_or(Duration::from_secs(0))
    }
}

impl ChurnDetector {
    fn new() -> Self {
        Self {
            events: VecDeque::new(),
            window: Duration::from_secs(300), // 5 minute window
            churn_rate: 0.0,
        }
    }

    fn record_join(&mut self, peer: NodeId) {
        self.events
            .push_back((Instant::now(), ChurnEvent::PeerJoined(peer)));
        self.update_rate();
    }

    fn record_leave(&mut self, peer: NodeId) {
        self.events
            .push_back((Instant::now(), ChurnEvent::PeerLeft(peer)));
        self.update_rate();
    }

    fn update_rate(&mut self) {
        let cutoff = Instant::now() - self.window;
        self.events.retain(|(time, _)| *time > cutoff);

        let joins = self
            .events
            .iter()
            .filter(|(_, event)| matches!(event, ChurnEvent::PeerJoined(_)))
            .count();
        let leaves = self
            .events
            .iter()
            .filter(|(_, event)| matches!(event, ChurnEvent::PeerLeft(_)))
            .count();

        // Churn rate: join/leave events per second over the window
        self.churn_rate = (joins + leaves) as f64 / self.window.as_secs() as f64;
    }

    fn get_rate(&self) -> f64 {
        self.churn_rate
    }

    pub async fn get_hourly_rates(&self, hours: usize) -> Vec<f64> {
        let now = Instant::now();
        let mut hourly_rates = vec![0.0; hours];

        for (time, event) in &self.events {
            let age = now.duration_since(*time);
            let hour_index = (age.as_secs() / 3600) as usize;

            if hour_index < hours {
                match event {
                    ChurnEvent::PeerJoined(_) | ChurnEvent::PeerLeft(_) => {
                        hourly_rates[hour_index] += 1.0;
                    }
                }
            }
        }

        // Normalize to rates
        for rate in &mut hourly_rates {
            *rate /= 3600.0; // Events per second
        }

        hourly_rates
    }

    pub async fn get_recent_stats(&self, window: Duration) -> ChurnStats {
        let now = Instant::now();
        let mut joins = 0;
        let mut leaves = 0;
        let mut _session_durations = Vec::new();
        let mut _node_join_times = HashMap::new();

        for (time, event) in &self.events {
            if now.duration_since(*time) <= window {
                match event {
                    ChurnEvent::PeerJoined(node_id) => {
                        joins += 1;
                        _node_join_times.insert(node_id.clone(), *time);
                    }
                    ChurnEvent::PeerLeft(_) => leaves += 1,
                }
            }
        }

        // Session durations are not tracked yet, so the default below applies
        let avg_session_duration = if _session_durations.is_empty() {
            Duration::from_secs(3600) // Default 1 hour
        } else {
            Duration::from_secs(
                _session_durations
                    .iter()
                    .map(|d: &Duration| d.as_secs())
                    .sum::<u64>()
                    / _session_durations.len() as u64,
            )
        };

        ChurnStats {
            joins,
            leaves,
            avg_session_duration,
            node_join_times: _node_join_times,
        }
    }
}

impl AdaptiveGossipSub {
    /// Create a new adaptive gossipsub instance
    pub fn new(local_id: NodeId, trust_provider: Arc<dyn TrustProvider>) -> Self {
        let (control_tx, _control_rx) = mpsc::channel(1000);
        let (_message_tx, message_rx) = mpsc::channel(1000);

        Self {
            _local_id: local_id,
            mesh: Arc::new(RwLock::new(HashMap::new())),
            fanout: Arc::new(RwLock::new(HashMap::new())),
            seen_messages: Arc::new(RwLock::new(HashMap::new())),
            message_cache: Arc::new(RwLock::new(HashMap::new())),
            peer_scores: Arc::new(RwLock::new(HashMap::new())),
            topics: Arc::new(RwLock::new(HashMap::new())),
            topic_priorities: Arc::new(RwLock::new(HashMap::new())),
            _heartbeat_interval: Duration::from_secs(1),
            message_validators: Arc::new(RwLock::new(HashMap::new())),
            trust_provider,
            _message_rx: Arc::new(RwLock::new(Some(message_rx))),
            control_tx: Arc::new(RwLock::new(Some(control_tx))),
            churn_detector: Arc::new(RwLock::new(ChurnDetector::new())),
            stats: Arc::new(RwLock::new(GossipStats::default())),
        }
    }

    /// Subscribe to a topic
    pub async fn subscribe(&self, topic: &str) -> Result<()> {
        let mut topics = self.topics.write().await;
        topics
            .entry(topic.to_string())
            .or_insert_with(TopicParams::default);

        let mut mesh = self.mesh.write().await;
        mesh.insert(topic.to_string(), HashSet::new());

        Ok(())
    }

    /// Unsubscribe from a topic
    pub async fn unsubscribe(&self, topic: &str) -> Result<()> {
        let mut mesh = self.mesh.write().await;
        mesh.remove(topic);

        Ok(())
    }

    /// Publish a message to a topic
    pub async fn publish(&self, topic: &str, message: GossipMessage) -> Result<()> {
        // Validate message before publishing
        if !self.validate_message(&message).await? {
            return Err(AdaptiveNetworkError::Gossip(
                "Message validation failed".to_string(),
            ));
        }

        let msg_id = self.compute_message_id(&message);

        // Add to seen messages and cache
        {
            let mut seen = self.seen_messages.write().await;
            seen.insert(msg_id, Instant::now());

            let mut cache = self.message_cache.write().await;
            cache.insert(msg_id, message.clone());
        }

        // Send to mesh peers
        let mesh = self.mesh.read().await;
        if let Some(mesh_peers) = mesh.get(topic) {
            for peer in mesh_peers {
                // In real implementation, send via network
                self.send_message(peer, &message).await?;
            }
        } else {
            // Use fanout if not subscribed
            let fanout = self.fanout.read().await;
            let fanout_peers = fanout
                .get(topic)
                .cloned()
                .unwrap_or_else(|| self.get_fanout_peers(topic).unwrap_or_default());

            for peer in &fanout_peers {
                self.send_message(peer, &message).await?;
            }
        }

        Ok(())
    }

    /// Send GRAFT control message
    pub async fn send_graft(&self, peer: &NodeId, topic: &str) -> Result<()> {
        let control_tx = self.control_tx.read().await;
        if let Some(tx) = control_tx.as_ref() {
            let msg = ControlMessage::Graft {
                topic: topic.to_string(),
            };
            tx.send((peer.clone(), msg))
                .await
                .map_err(|_| AdaptiveNetworkError::Other("Failed to send GRAFT".to_string()))?;
        }
        Ok(())
    }

    /// Send PRUNE control message
    pub async fn send_prune(&self, peer: &NodeId, topic: &str, backoff: Duration) -> Result<()> {
        let control_tx = self.control_tx.read().await;
        if let Some(tx) = control_tx.as_ref() {
            let msg = ControlMessage::Prune {
                topic: topic.to_string(),
                backoff,
            };
            tx.send((peer.clone(), msg))
                .await
                .map_err(|_| AdaptiveNetworkError::Other("Failed to send PRUNE".to_string()))?;
        }
        Ok(())
    }

    /// Send IHAVE control message
    pub async fn send_ihave(
        &self,
        peer: &NodeId,
        topic: &str,
        message_ids: Vec<MessageId>,
    ) -> Result<()> {
        let control_tx = self.control_tx.read().await;
        if let Some(tx) = control_tx.as_ref() {
            let msg = ControlMessage::IHave {
                topic: topic.to_string(),
                message_ids,
            };
            tx.send((peer.clone(), msg))
                .await
                .map_err(|_| AdaptiveNetworkError::Other("Failed to send IHAVE".to_string()))?;
        }
        Ok(())
    }

    /// Send IWANT control message
    pub async fn send_iwant(&self, peer: &NodeId, message_ids: Vec<MessageId>) -> Result<()> {
        let control_tx = self.control_tx.read().await;
        if let Some(tx) = control_tx.as_ref() {
            let msg = ControlMessage::IWant { message_ids };
            tx.send((peer.clone(), msg))
                .await
                .map_err(|_| AdaptiveNetworkError::Other("Failed to send IWANT".to_string()))?;
        }
        Ok(())
    }

    /// Handle periodic heartbeat
    pub async fn heartbeat(&self) {
        let mesh = self.mesh.read().await.clone();

        for (topic, mesh_peers) in mesh {
            let params = {
                let topics = self.topics.read().await;
                topics.get(&topic).cloned().unwrap_or_default()
            };

            // Calculate adaptive mesh size based on churn
            let target_size = self.calculate_adaptive_mesh_size(&topic).await;

            // Remove low-scoring peers
            let mut peers_to_remove = Vec::new();
            {
                let scores = self.peer_scores.read().await;
                for peer in &mesh_peers {
                    if let Some(score) = scores.get(peer)
                        && score.score() < params.graylist_threshold
                    {
                        peers_to_remove.push(peer.clone());
                    }
                }
            }

            // Update mesh
            let mut mesh_write = self.mesh.write().await;
            if let Some(topic_mesh) = mesh_write.get_mut(&topic) {
                // Send PRUNE messages and update churn detector
                for peer in peers_to_remove {
                    topic_mesh.remove(&peer);
                    let _ = self
                        .send_prune(&peer, &topic, Duration::from_secs(60))
                        .await;

                    // Record peer leaving mesh
                    let mut churn = self.churn_detector.write().await;
                    churn.record_leave(peer);
                }

                // Add high-scoring peers if below target
                while topic_mesh.len() < target_size {
                    if let Some(peer) = self.select_peer_for_mesh(&topic, topic_mesh).await {
                        topic_mesh.insert(peer.clone());
                        let _ = self.send_graft(&peer, &topic).await;

                        // Record peer joining mesh
                        let mut churn = self.churn_detector.write().await;
                        churn.record_join(peer);
                    } else {
                        break;
                    }
                }
            }
        }

        // Update peer scores
        self.update_peer_scores().await;

        // Clean old seen messages
        self.clean_seen_messages().await;
    }

    /// Calculate adaptive mesh size based on network conditions
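    ///
    /// The target is `base_size * churn_factor * priority_factor`, rounded.
    /// Illustrative example: with a churn rate of 2.0 events/s the churn factor
    /// is `1.0 + (2.0 * 0.1).min(0.5) = 1.2`, so a `Critical` topic (factor 2.0)
    /// targets `(8.0 * 1.2 * 2.0).round() = 19` peers while a `Low` topic
    /// (factor 0.8) targets `(8.0 * 1.2 * 0.8).round() = 8`.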
    pub async fn calculate_adaptive_mesh_size(&self, topic: &str) -> usize {
        let base_size = 8;

        // Get churn rate from detector
        let churn_rate = {
            let churn = self.churn_detector.read().await;
            churn.get_rate()
        };

        // Get topic priority
        let priority_factor = {
            let priorities = self.topic_priorities.read().await;
            match priorities.get(topic) {
                Some(TopicPriority::Critical) => 2.0,
                Some(TopicPriority::High) => 1.5,
                Some(TopicPriority::Normal) => 1.0,
                Some(TopicPriority::Low) => 0.8,
                None => 1.0,
            }
        };

        // Increase mesh size based on churn and priority
        let churn_factor = 1.0 + (churn_rate * 0.1).min(0.5); // Max 50% increase

        (base_size as f64 * churn_factor * priority_factor).round() as usize
    }

    /// Select a peer to add to mesh
    async fn select_peer_for_mesh(
        &self,
        _topic: &str,
        current_mesh: &HashSet<NodeId>,
    ) -> Option<NodeId> {
        // Select from known peers not in mesh, sorted by score
        let scores = self.peer_scores.read().await;
        let mut candidates: Vec<_> = scores
            .iter()
            .filter(|(peer_id, _)| !current_mesh.contains(peer_id))
            .map(|(peer_id, score)| (peer_id.clone(), score.score()))
            .collect();

        candidates.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        candidates.first().map(|(peer, _)| peer.clone())
    }

    /// Update peer scores
    async fn update_peer_scores(&self) {
        let mut scores = self.peer_scores.write().await;
        for (peer_id, score) in scores.iter_mut() {
            // Update app-specific score from trust system
            score.app_specific_score = self.trust_provider.get_trust(peer_id);

            // Decay behavior penalty
            score.behavior_penalty *= 0.99;
        }
    }

    /// Clean old seen messages
    async fn clean_seen_messages(&self) {
        let cutoff = Instant::now() - Duration::from_secs(300); // 5 minutes
        let mut seen = self.seen_messages.write().await;
        seen.retain(|_, timestamp| *timestamp > cutoff);
    }

    /// Compute message ID
    pub fn compute_message_id(&self, message: &GossipMessage) -> MessageId {
        use sha2::{Digest, Sha256};
        let mut hasher = Sha256::new();
        hasher.update(message.topic.as_bytes());
        hasher.update(message.from.hash);
        hasher.update(message.seqno.to_le_bytes());
        hasher.update(&message.data);

        let result = hasher.finalize();
        let mut id = [0u8; 32];
        id.copy_from_slice(&result);
        id
    }

    /// Send message to a peer (placeholder)
    async fn send_message(&self, _peer: &NodeId, _message: &GossipMessage) -> Result<()> {
        // In real implementation, send via network layer
        Ok(())
    }

    /// Get fanout peers for a topic
    fn get_fanout_peers(&self, _topic: &str) -> Option<HashSet<NodeId>> {
        // In real implementation, select high-scoring peers
        None
    }

    /// Handle incoming control message
    pub async fn handle_control_message(
        &self,
        from: &NodeId,
        message: ControlMessage,
    ) -> Result<()> {
        match message {
            ControlMessage::Graft { topic } => {
                // Peer wants to join our mesh
                let mut mesh = self.mesh.write().await;
                if let Some(topic_mesh) = mesh.get_mut(&topic) {
                    // Check peer score before accepting
                    let score = {
                        let scores = self.peer_scores.read().await;
                        scores.get(from).map(|s| s.score()).unwrap_or(0.0)
                    };

                    // If we have no prior score, fall back to trust provider's score
                    let score = if score == 0.0 {
                        self.trust_provider.get_trust(from)
                    } else {
                        score
                    };

                    if score > 0.0 {
                        topic_mesh.insert(from.clone());
                    } else {
                        // Send PRUNE back if we don't want them
                        let _ = self.send_prune(from, &topic, Duration::from_secs(60)).await;
                    }
                }
            }
            ControlMessage::Prune { topic, backoff: _ } => {
                // Peer is removing us from their mesh
                let mut mesh = self.mesh.write().await;
                if let Some(topic_mesh) = mesh.get_mut(&topic) {
                    topic_mesh.remove(from);
                }
            }
            ControlMessage::IHave {
                topic: _,
                message_ids,
            } => {
                // Peer is announcing messages they have
                let seen = self.seen_messages.read().await;
                let mut want = Vec::new();

                for msg_id in message_ids {
                    if !seen.contains_key(&msg_id) {
                        want.push(msg_id);
                    }
                }

                if !want.is_empty() {
                    let _ = self.send_iwant(from, want).await;
                }
            }
            ControlMessage::IWant { message_ids } => {
                // Peer wants specific messages
                let cache = self.message_cache.read().await;
                for msg_id in message_ids {
                    if let Some(message) = cache.get(&msg_id) {
                        let _ = self.send_message(from, message).await;
                    }
                }
            }
        }

        Ok(())
    }

    /// Set topic priority
    pub async fn set_topic_priority(&self, topic: &str, priority: TopicPriority) {
        let mut priorities = self.topic_priorities.write().await;
        priorities.insert(topic.to_string(), priority);
    }

    /// Register a message validator for a topic
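    ///
    /// Illustrative sketch of a per-topic validator (the `BlockSizeValidator`
    /// name and size limit below are made up for the example):
    ///
    /// ```ignore
    /// struct BlockSizeValidator;
    ///
    /// #[async_trait::async_trait]
    /// impl MessageValidator for BlockSizeValidator {
    ///     async fn validate(&self, message: &GossipMessage) -> Result<bool> {
    ///         // Reject oversized payloads before they propagate.
    ///         Ok(message.data.len() <= 1024)
    ///     }
    /// }
    ///
    /// gossip.register_validator("blocks", Box::new(BlockSizeValidator)).await?;
    /// ```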
    pub async fn register_validator(
        &self,
        topic: &str,
        validator: Box<dyn MessageValidator + Send + Sync>,
    ) -> Result<()> {
        let mut validators = self.message_validators.write().await;
        validators.insert(topic.to_string(), validator);
        Ok(())
    }

    /// Validate a message before processing
    async fn validate_message(&self, message: &GossipMessage) -> Result<bool> {
        let validators = self.message_validators.read().await;

        if let Some(validator) = validators.get(&message.topic) {
            validator.validate(message).await
        } else {
            // No validator registered, accept by default
            Ok(true)
        }
    }

    /// Reduce gossip fanout during high churn
    pub async fn reduce_fanout(&self, factor: f64) {
        // In a real implementation, would reduce mesh degree based on factor
        // This would involve updating the target degree for mesh maintenance
        let _ = factor; // Suppress unused warning
    }

    /// Get gossip statistics
    pub async fn get_stats(&self) -> GossipStats {
        let mut stats = self.stats.read().await.clone();

        // Update current values
        let mesh = self.mesh.read().await;
        stats.mesh_size = mesh.values().map(|peers| peers.len()).sum();
        stats.topic_count = mesh.len();

        let peer_scores = self.peer_scores.read().await;
        stats.peer_count = peer_scores.len();

        stats
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_gossipsub_subscribe() {
        struct MockTrustProvider;
        impl TrustProvider for MockTrustProvider {
            fn get_trust(&self, _node: &NodeId) -> f64 {
                0.5
            }
            fn update_trust(&self, _from: &NodeId, _to: &NodeId, _success: bool) {}
            fn get_global_trust(&self) -> HashMap<NodeId, f64> {
                HashMap::new()
            }
            fn remove_node(&self, _node: &NodeId) {}
        }

        use crate::peer_record::UserId;
        use rand::RngCore;

        let mut hash = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut hash);
        let local_id = UserId::from_bytes(hash);

        let trust_provider = Arc::new(MockTrustProvider);
        let gossip = AdaptiveGossipSub::new(local_id, trust_provider);

        gossip.subscribe("test-topic").await.unwrap();

        let mesh = gossip.mesh.read().await;
        assert!(mesh.contains_key("test-topic"));
    }

    #[test]
    fn test_peer_score() {
        let mut score = PeerScore::new();
        assert!(score.score() > 0.0);

        score.invalid_messages = 5;
        assert!(score.score() < 0.0);
    }

    #[test]
    fn test_message_id() {
        struct MockTrustProvider;
        impl TrustProvider for MockTrustProvider {
            fn get_trust(&self, _node: &NodeId) -> f64 {
                0.5
            }
            fn update_trust(&self, _from: &NodeId, _to: &NodeId, _success: bool) {}
            fn get_global_trust(&self) -> HashMap<NodeId, f64> {
                HashMap::new()
            }
            fn remove_node(&self, _node: &NodeId) {}
        }

        use crate::peer_record::UserId;
        use rand::RngCore;

        let mut hash = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut hash);
        let local_id = UserId::from_bytes(hash);

        let trust_provider = Arc::new(MockTrustProvider);
        let gossip = AdaptiveGossipSub::new(local_id, trust_provider);

        let mut hash2 = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut hash2);

        let msg = GossipMessage {
            topic: "test".to_string(),
            data: vec![1, 2, 3],
            from: UserId::from_bytes(hash2),
            seqno: 1,
            timestamp: 12345,
        };

        let id1 = gossip.compute_message_id(&msg);
        let id2 = gossip.compute_message_id(&msg);

        assert_eq!(id1, id2);
    }

    #[tokio::test]
    async fn test_adaptive_mesh_size() {
        use crate::peer_record::UserId;
        use rand::RngCore;

        struct MockTrustProvider;
        impl TrustProvider for MockTrustProvider {
            fn get_trust(&self, _node: &NodeId) -> f64 {
                0.5
            }
            fn update_trust(&self, _from: &NodeId, _to: &NodeId, _success: bool) {}
            fn get_global_trust(&self) -> HashMap<NodeId, f64> {
                HashMap::new()
            }
            fn remove_node(&self, _node: &NodeId) {}
        }

        let mut hash = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut hash);
        let local_id = UserId::from_bytes(hash);

        let trust_provider = Arc::new(MockTrustProvider);
        let gossip = AdaptiveGossipSub::new(local_id, trust_provider);

        // Set topic priority
        gossip
            .set_topic_priority("critical-topic", TopicPriority::Critical)
            .await;
        gossip
            .set_topic_priority("low-topic", TopicPriority::Low)
            .await;

        // Test mesh size calculation
        let critical_size = gossip.calculate_adaptive_mesh_size("critical-topic").await;
        let normal_size = gossip.calculate_adaptive_mesh_size("normal-topic").await;
        let low_size = gossip.calculate_adaptive_mesh_size("low-topic").await;

        assert!(critical_size > normal_size);
        assert!(normal_size > low_size);
    }

    #[test]
    fn test_churn_detector() {
        use crate::peer_record::UserId;
        use rand::RngCore;

        let mut detector = ChurnDetector::new();

        // Add some join/leave events
        for i in 0..10 {
            let mut hash = [0u8; 32];
            rand::thread_rng().fill_bytes(&mut hash);
            hash[0] = i;
            let peer = UserId::from_bytes(hash);

            if i % 2 == 0 {
                detector.record_join(peer);
            } else {
                detector.record_leave(peer);
            }
        }

        let rate = detector.get_rate();
        assert!(rate > 0.0);
    }

    #[tokio::test]
    async fn test_control_messages() {
        use crate::peer_record::UserId;
        use rand::RngCore;

        struct MockTrustProvider;
        impl TrustProvider for MockTrustProvider {
            fn get_trust(&self, _node: &NodeId) -> f64 {
                0.8
            }
            fn update_trust(&self, _from: &NodeId, _to: &NodeId, _success: bool) {}
            fn get_global_trust(&self) -> HashMap<NodeId, f64> {
                HashMap::new()
            }
            fn remove_node(&self, _node: &NodeId) {}
        }

        let mut hash = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut hash);
        let local_id = UserId::from_bytes(hash);

        let trust_provider = Arc::new(MockTrustProvider);
        let gossip = AdaptiveGossipSub::new(local_id, trust_provider);

        // Subscribe to a topic
        gossip.subscribe("test-topic").await.unwrap();

        // Test GRAFT handling
        let mut peer_hash = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut peer_hash);
        let peer_id = UserId::from_bytes(peer_hash);

        let graft_msg = ControlMessage::Graft {
            topic: "test-topic".to_string(),
        };
        gossip
            .handle_control_message(&peer_id, graft_msg)
            .await
            .unwrap();

        // Peer should be in mesh due to good trust score
        let mesh = gossip.mesh.read().await;
        assert!(mesh.get("test-topic").unwrap().contains(&peer_id));
    }

    #[tokio::test]
    async fn test_message_validation() {
        use crate::peer_record::UserId;
        use rand::RngCore;

        struct MockTrustProvider;
        impl TrustProvider for MockTrustProvider {
            fn get_trust(&self, _node: &NodeId) -> f64 {
                0.8
            }
            fn update_trust(&self, _from: &NodeId, _to: &NodeId, _success: bool) {}
            fn get_global_trust(&self) -> HashMap<NodeId, f64> {
                HashMap::new()
            }
            fn remove_node(&self, _node: &NodeId) {}
        }

        // Custom validator that rejects messages with "bad" in the data
        struct TestValidator;
        #[async_trait::async_trait]
        impl MessageValidator for TestValidator {
            async fn validate(&self, message: &GossipMessage) -> Result<bool> {
                Ok(!message.data.windows(3).any(|w| w == b"bad"))
            }
        }

        let mut hash = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut hash);
        let local_id = UserId::from_bytes(hash);

        let trust_provider = Arc::new(MockTrustProvider);
        let gossip = AdaptiveGossipSub::new(local_id, trust_provider);

        // Register validator
        gossip
            .register_validator("test-topic", Box::new(TestValidator))
            .await
            .unwrap();

        // Test valid message
        let valid_message = GossipMessage {
            topic: "test-topic".to_string(),
            data: vec![1, 2, 3, 4], // No "bad" in data
            from: UserId::from_bytes([0; 32]),
            seqno: 1,
            timestamp: 12345,
        };

        // Should succeed
        assert!(gossip.publish("test-topic", valid_message).await.is_ok());

        // Test invalid message
        let invalid_message = GossipMessage {
            topic: "test-topic".to_string(),
            data: vec![b'b', b'a', b'd', b'!'], // Contains "bad"
            from: UserId::from_bytes([0; 32]),
            seqno: 2,
            timestamp: 12346,
        };

        // Should fail validation
        assert!(gossip.publish("test-topic", invalid_message).await.is_err());
    }

    #[tokio::test]
    async fn test_ihave_iwant_flow() {
        use crate::peer_record::UserId;
        use rand::RngCore;

        struct MockTrustProvider;
        impl TrustProvider for MockTrustProvider {
            fn get_trust(&self, _node: &NodeId) -> f64 {
                0.8
            }
            fn update_trust(&self, _from: &NodeId, _to: &NodeId, _success: bool) {}
            fn get_global_trust(&self) -> HashMap<NodeId, f64> {
                HashMap::new()
            }
            fn remove_node(&self, _node: &NodeId) {}
        }

        let mut hash = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut hash);
        let local_id = UserId::from_bytes(hash);

        let trust_provider = Arc::new(MockTrustProvider);
        let gossip = AdaptiveGossipSub::new(local_id, trust_provider);

        // Create a test message
        let mut peer_hash = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut peer_hash);
        let from_peer = UserId::from_bytes(peer_hash);

        let message = GossipMessage {
            topic: "test-topic".to_string(),
            data: vec![1, 2, 3, 4],
            from: from_peer.clone(),
            seqno: 1,
            timestamp: 12345,
        };

        // Publish message (adds to cache)
        gossip.publish("test-topic", message.clone()).await.unwrap();

        // Message should be in cache
        let msg_id = gossip.compute_message_id(&message);
        let cache = gossip.message_cache.read().await;
        assert!(cache.contains_key(&msg_id));
    }
}