hive_btle/
gossip.rs

1//! Gossip protocol strategies for mesh synchronization
2//!
3//! This module provides configurable gossip strategies that determine how
4//! documents are propagated through the mesh. The key insight is that BLE
5//! mesh sync does NOT require full n² connectivity - epidemic gossip
6//! protocols achieve eventual consistency with O(log N) rounds.
7//!
8//! ## Gossip Protocol Fundamentals
9//!
10//! - **Push gossip**: Nodes proactively push updates to random peers
11//! - **Pull gossip**: Nodes periodically request updates from peers
12//! - **Push-pull**: Combines both for faster convergence
13//!
14//! For HIVE BLE mesh, we use push gossip with configurable fanout.
15//!
16//! ## Convergence Guarantees
17//!
18//! With fanout=2 and N nodes:
19//! - Expected rounds to reach all nodes: O(log N)
20//! - 10 nodes: ~4 rounds
21//! - 20 nodes: ~5 rounds
22//! - 50 nodes: ~6 rounds
23//!
24//! ## Usage
25//!
26//! ```rust
27//! use hive_btle::gossip::{GossipStrategy, RandomFanout};
28//! use hive_btle::peer::HivePeer;
29//!
30//! // Create a strategy with fanout of 2
31//! let strategy = RandomFanout::new(2);
32//!
33//! // Select peers to gossip to
34//! let peers: Vec<HivePeer> = vec![]; // your connected peers
35//! let selected = strategy.select_peers(&peers);
36//! ```
37
38#[cfg(not(feature = "std"))]
39use alloc::vec::Vec;
40
41use crate::document::MergeResult;
42use crate::peer::HivePeer;
43
44/// Trait for gossip peer selection strategies
45///
46/// Implementations determine which subset of connected peers receive
47/// each gossip message. The goal is efficient epidemic spread while
48/// minimizing bandwidth and battery usage.
49pub trait GossipStrategy: Send + Sync {
50    /// Select peers to send a gossip message to
51    ///
52    /// Given the list of connected peers, return those that should
53    /// receive the next gossip message. The selection should balance:
54    /// - Convergence speed (more peers = faster)
55    /// - Resource usage (fewer peers = less battery/bandwidth)
56    fn select_peers<'a>(&self, peers: &'a [HivePeer]) -> Vec<&'a HivePeer>;
57
58    /// Determine if an update should be forwarded after a merge
59    ///
60    /// Returns `true` if the merge result indicates new information
61    /// that should be propagated to other peers.
62    fn should_forward(&self, result: &MergeResult) -> bool {
63        // Default: forward if counter or emergency state changed
64        result.counter_changed || result.emergency_changed
65    }
66
67    /// Get the name of this strategy (for logging/debugging)
68    fn name(&self) -> &'static str;
69}
70
71/// Random fanout gossip strategy
72///
73/// Selects a random subset of peers for each gossip round.
74/// This is the classic epidemic gossip approach.
75///
76/// ## Fanout Selection
77///
78/// - **fanout=1**: Minimal, slow convergence, lowest overhead
79/// - **fanout=2**: Standard, O(log N) convergence, good balance
80/// - **fanout=3+**: Fast convergence, higher overhead
81///
82/// For most HIVE deployments, fanout=2 is recommended.
83#[derive(Debug, Clone)]
84pub struct RandomFanout {
85    /// Number of peers to select per round
86    fanout: usize,
87    /// Random seed for deterministic testing (None = use system random)
88    #[cfg(feature = "std")]
89    seed: Option<u64>,
90}
91
92impl RandomFanout {
93    /// Create a new random fanout strategy
94    ///
95    /// # Arguments
96    /// * `fanout` - Number of peers to select per gossip round
97    pub fn new(fanout: usize) -> Self {
98        Self {
99            fanout: fanout.max(1), // At least 1
100            #[cfg(feature = "std")]
101            seed: None,
102        }
103    }
104
105    /// Create with a fixed seed for deterministic testing
106    #[cfg(feature = "std")]
107    pub fn with_seed(fanout: usize, seed: u64) -> Self {
108        Self {
109            fanout: fanout.max(1),
110            seed: Some(seed),
111        }
112    }
113
114    /// Get a pseudo-random number
115    #[cfg(feature = "std")]
116    fn random_index(&self, max: usize, iteration: usize) -> usize {
117        use std::time::SystemTime;
118
119        let seed = self.seed.unwrap_or_else(|| {
120            SystemTime::now()
121                .duration_since(SystemTime::UNIX_EPOCH)
122                .map(|d| d.as_nanos() as u64)
123                .unwrap_or(12345)
124        });
125
126        // Simple LCG for lightweight randomness
127        let mixed = seed
128            .wrapping_mul(6364136223846793005)
129            .wrapping_add(iteration as u64);
130        (mixed as usize) % max
131    }
132}
133
134impl Default for RandomFanout {
135    fn default() -> Self {
136        Self::new(2) // Default fanout of 2
137    }
138}
139
140impl GossipStrategy for RandomFanout {
141    fn select_peers<'a>(&self, peers: &'a [HivePeer]) -> Vec<&'a HivePeer> {
142        if peers.is_empty() {
143            return Vec::new();
144        }
145
146        // If we have fewer peers than fanout, return all
147        if peers.len() <= self.fanout {
148            return peers.iter().collect();
149        }
150
151        // Select random subset
152        #[cfg(feature = "std")]
153        {
154            let mut selected = Vec::with_capacity(self.fanout);
155            let mut used = std::collections::HashSet::new();
156
157            for i in 0..self.fanout * 3 {
158                // Try up to 3x fanout to find unique peers
159                if selected.len() >= self.fanout {
160                    break;
161                }
162
163                let idx = self.random_index(peers.len(), i);
164                if !used.contains(&idx) {
165                    used.insert(idx);
166                    selected.push(&peers[idx]);
167                }
168            }
169
170            selected
171        }
172
173        #[cfg(not(feature = "std"))]
174        {
175            // No_std fallback: just take first N peers
176            peers.iter().take(self.fanout).collect()
177        }
178    }
179
180    fn name(&self) -> &'static str {
181        "random_fanout"
182    }
183}
184
185/// Broadcast-all strategy
186///
187/// Sends to all connected peers. Use only for:
188/// - Very small meshes (< 5 nodes)
189/// - Emergency situations requiring immediate propagation
190/// - Testing/debugging
191///
192/// **Warning**: This is O(N) per round - not suitable for large meshes.
193#[derive(Debug, Clone, Default)]
194pub struct BroadcastAll;
195
196impl BroadcastAll {
197    /// Create a new broadcast-all strategy
198    pub fn new() -> Self {
199        Self
200    }
201}
202
203impl GossipStrategy for BroadcastAll {
204    fn select_peers<'a>(&self, peers: &'a [HivePeer]) -> Vec<&'a HivePeer> {
205        peers.iter().collect()
206    }
207
208    fn name(&self) -> &'static str {
209        "broadcast_all"
210    }
211}
212
213/// Signal-strength based selection
214///
215/// Prefers peers with stronger signal (better reliability).
216/// Falls back to random selection for peers with similar signal.
217#[derive(Debug, Clone)]
218pub struct SignalBasedFanout {
219    /// Number of peers to select
220    fanout: usize,
221    /// Minimum RSSI difference to prefer one peer over another
222    rssi_threshold: i8,
223}
224
225impl SignalBasedFanout {
226    /// Create a new signal-based strategy
227    ///
228    /// # Arguments
229    /// * `fanout` - Number of peers to select
230    /// * `rssi_threshold` - RSSI difference (dB) to consider significant
231    pub fn new(fanout: usize, rssi_threshold: i8) -> Self {
232        Self {
233            fanout: fanout.max(1),
234            rssi_threshold,
235        }
236    }
237}
238
239impl Default for SignalBasedFanout {
240    fn default() -> Self {
241        Self::new(2, 10) // Default: 2 peers, 10dB threshold
242    }
243}
244
245impl GossipStrategy for SignalBasedFanout {
246    fn select_peers<'a>(&self, peers: &'a [HivePeer]) -> Vec<&'a HivePeer> {
247        if peers.is_empty() {
248            return Vec::new();
249        }
250
251        if peers.len() <= self.fanout {
252            return peers.iter().collect();
253        }
254
255        // Sort by signal strength (higher RSSI = better)
256        let mut sorted: Vec<_> = peers.iter().collect();
257        sorted.sort_by(|a, b| b.rssi.cmp(&a.rssi));
258
259        // Take the best ones, but add some randomness for diversity
260        let mut selected: Vec<&HivePeer> = Vec::with_capacity(self.fanout);
261
262        // Always include the strongest peer
263        if let Some(best) = sorted.first() {
264            selected.push(best);
265        }
266
267        // For remaining slots, prefer strong signals but allow some diversity
268        for peer in sorted.iter().skip(1) {
269            if selected.len() >= self.fanout {
270                break;
271            }
272
273            // Check if this peer is significantly weaker than the last selected
274            let last_rssi = selected.last().map(|p| p.rssi).unwrap_or(-100);
275            let this_rssi = peer.rssi;
276
277            // Include if within threshold or we need more peers
278            if this_rssi >= last_rssi - self.rssi_threshold || selected.len() < self.fanout / 2 + 1
279            {
280                selected.push(peer);
281            }
282        }
283
284        // Fill remaining slots if needed
285        for peer in sorted.iter() {
286            if selected.len() >= self.fanout {
287                break;
288            }
289            // Check by node_id to avoid requiring PartialEq on HivePeer
290            let already_selected = selected.iter().any(|p| p.node_id == peer.node_id);
291            if !already_selected {
292                selected.push(peer);
293            }
294        }
295
296        selected
297    }
298
299    fn name(&self) -> &'static str {
300        "signal_based"
301    }
302}
303
304/// Emergency broadcast strategy
305///
306/// For emergency events, use maximum fanout to ensure rapid propagation.
307/// Automatically switches between normal and emergency modes.
308#[derive(Debug)]
309pub struct EmergencyAware {
310    /// Normal operation strategy
311    normal_fanout: usize,
312    /// Emergency fanout (usually all peers)
313    emergency_fanout: usize,
314    /// Whether we're in emergency mode
315    #[cfg(feature = "std")]
316    emergency_mode: std::sync::atomic::AtomicBool,
317}
318
319impl Clone for EmergencyAware {
320    fn clone(&self) -> Self {
321        Self {
322            normal_fanout: self.normal_fanout,
323            emergency_fanout: self.emergency_fanout,
324            #[cfg(feature = "std")]
325            emergency_mode: std::sync::atomic::AtomicBool::new(self.is_emergency()),
326        }
327    }
328}
329
330impl EmergencyAware {
331    /// Create a new emergency-aware strategy
332    pub fn new(normal_fanout: usize) -> Self {
333        Self {
334            normal_fanout: normal_fanout.max(1),
335            emergency_fanout: usize::MAX, // All peers during emergency
336            #[cfg(feature = "std")]
337            emergency_mode: std::sync::atomic::AtomicBool::new(false),
338        }
339    }
340
341    /// Set emergency mode
342    #[cfg(feature = "std")]
343    pub fn set_emergency(&self, active: bool) {
344        self.emergency_mode
345            .store(active, std::sync::atomic::Ordering::SeqCst);
346    }
347
348    /// Check if in emergency mode
349    #[cfg(feature = "std")]
350    pub fn is_emergency(&self) -> bool {
351        self.emergency_mode
352            .load(std::sync::atomic::Ordering::SeqCst)
353    }
354
355    fn effective_fanout(&self) -> usize {
356        #[cfg(feature = "std")]
357        {
358            if self.is_emergency() {
359                self.emergency_fanout
360            } else {
361                self.normal_fanout
362            }
363        }
364        #[cfg(not(feature = "std"))]
365        {
366            self.normal_fanout
367        }
368    }
369}
370
371impl Default for EmergencyAware {
372    fn default() -> Self {
373        Self::new(2)
374    }
375}
376
377impl GossipStrategy for EmergencyAware {
378    fn select_peers<'a>(&self, peers: &'a [HivePeer]) -> Vec<&'a HivePeer> {
379        let fanout = self.effective_fanout();
380
381        if peers.len() <= fanout {
382            return peers.iter().collect();
383        }
384
385        // During emergency: all peers
386        // Normal: use random fanout behavior
387        peers.iter().take(fanout).collect()
388    }
389
390    fn should_forward(&self, result: &MergeResult) -> bool {
391        // Always forward during emergency mode
392        #[cfg(feature = "std")]
393        if self.is_emergency() {
394            return true;
395        }
396
397        // Switch to emergency mode if we received an emergency
398        #[cfg(feature = "std")]
399        if result.is_emergency() || result.emergency_changed {
400            self.set_emergency(true);
401        }
402
403        result.counter_changed || result.emergency_changed
404    }
405
406    fn name(&self) -> &'static str {
407        "emergency_aware"
408    }
409}
410
411#[cfg(test)]
412mod tests {
413    use super::*;
414    use crate::NodeId;
415
416    fn make_peer(id: u32, rssi: i8) -> HivePeer {
417        HivePeer {
418            node_id: NodeId::new(id),
419            identifier: format!("device-{}", id),
420            mesh_id: Some("TEST".to_string()),
421            name: Some(format!("HIVE-{:08X}", id)),
422            rssi,
423            is_connected: true,
424            last_seen_ms: 0,
425        }
426    }
427
428    #[test]
429    fn test_random_fanout_basic() {
430        let strategy = RandomFanout::new(2);
431
432        // Empty peers
433        let peers: Vec<HivePeer> = vec![];
434        assert!(strategy.select_peers(&peers).is_empty());
435
436        // Fewer peers than fanout
437        let peers = vec![make_peer(1, -50)];
438        let selected = strategy.select_peers(&peers);
439        assert_eq!(selected.len(), 1);
440
441        // More peers than fanout
442        let peers = vec![
443            make_peer(1, -50),
444            make_peer(2, -60),
445            make_peer(3, -70),
446            make_peer(4, -80),
447        ];
448        let selected = strategy.select_peers(&peers);
449        assert_eq!(selected.len(), 2);
450    }
451
452    #[test]
453    fn test_broadcast_all() {
454        let strategy = BroadcastAll::new();
455
456        let peers = vec![make_peer(1, -50), make_peer(2, -60), make_peer(3, -70)];
457
458        let selected = strategy.select_peers(&peers);
459        assert_eq!(selected.len(), 3);
460    }
461
462    #[test]
463    fn test_signal_based() {
464        let strategy = SignalBasedFanout::new(2, 10);
465
466        let peers = vec![
467            make_peer(1, -80), // Weak
468            make_peer(2, -50), // Strong
469            make_peer(3, -90), // Very weak
470            make_peer(4, -55), // Strong-ish
471        ];
472
473        let selected = strategy.select_peers(&peers);
474        assert_eq!(selected.len(), 2);
475
476        // Should prefer stronger signals
477        let node_ids: Vec<_> = selected.iter().map(|p| p.node_id.as_u32()).collect();
478        assert!(node_ids.contains(&2)); // Strongest should be included
479    }
480
481    #[test]
482    fn test_emergency_aware() {
483        let strategy = EmergencyAware::new(2);
484
485        let peers = vec![
486            make_peer(1, -50),
487            make_peer(2, -60),
488            make_peer(3, -70),
489            make_peer(4, -80),
490        ];
491
492        // Normal mode: limited fanout
493        assert!(!strategy.is_emergency());
494        let selected = strategy.select_peers(&peers);
495        assert_eq!(selected.len(), 2);
496
497        // Emergency mode: all peers
498        strategy.set_emergency(true);
499        assert!(strategy.is_emergency());
500        let selected = strategy.select_peers(&peers);
501        assert_eq!(selected.len(), 4);
502    }
503
504    #[test]
505    fn test_should_forward() {
506        let strategy = RandomFanout::default();
507
508        // Should forward if counter changed
509        let result = MergeResult {
510            source_node: NodeId::new(1),
511            event: None,
512            counter_changed: true,
513            emergency_changed: false,
514            total_count: 10,
515        };
516        assert!(strategy.should_forward(&result));
517
518        // Should forward if emergency changed
519        let result = MergeResult {
520            source_node: NodeId::new(1),
521            event: None,
522            counter_changed: false,
523            emergency_changed: true,
524            total_count: 10,
525        };
526        assert!(strategy.should_forward(&result));
527
528        // Should NOT forward if nothing changed
529        let result = MergeResult {
530            source_node: NodeId::new(1),
531            event: None,
532            counter_changed: false,
533            emergency_changed: false,
534            total_count: 10,
535        };
536        assert!(!strategy.should_forward(&result));
537    }
538}