hive_btle/
gossip.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Gossip protocol strategies for mesh synchronization
17//!
18//! This module provides configurable gossip strategies that determine how
19//! documents are propagated through the mesh. The key insight is that BLE
20//! mesh sync does NOT require full n² connectivity - epidemic gossip
21//! protocols achieve eventual consistency with O(log N) rounds.
22//!
23//! ## Gossip Protocol Fundamentals
24//!
25//! - **Push gossip**: Nodes proactively push updates to random peers
26//! - **Pull gossip**: Nodes periodically request updates from peers
27//! - **Push-pull**: Combines both for faster convergence
28//!
29//! For HIVE BLE mesh, we use push gossip with configurable fanout.
30//!
31//! ## Convergence Guarantees
32//!
33//! With fanout=2 and N nodes:
34//! - Expected rounds to reach all nodes: O(log N)
35//! - 10 nodes: ~4 rounds
36//! - 20 nodes: ~5 rounds
37//! - 50 nodes: ~6 rounds
38//!
39//! ## Usage
40//!
41//! ```rust
42//! use hive_btle::gossip::{GossipStrategy, RandomFanout};
43//! use hive_btle::peer::HivePeer;
44//!
45//! // Create a strategy with fanout of 2
46//! let strategy = RandomFanout::new(2);
47//!
48//! // Select peers to gossip to
49//! let peers: Vec<HivePeer> = vec![]; // your connected peers
50//! let selected = strategy.select_peers(&peers);
51//! ```
52
53#[cfg(not(feature = "std"))]
54use alloc::vec::Vec;
55
56use crate::document::MergeResult;
57use crate::peer::HivePeer;
58
59/// Trait for gossip peer selection strategies
60///
61/// Implementations determine which subset of connected peers receive
62/// each gossip message. The goal is efficient epidemic spread while
63/// minimizing bandwidth and battery usage.
64pub trait GossipStrategy: Send + Sync {
65    /// Select peers to send a gossip message to
66    ///
67    /// Given the list of connected peers, return those that should
68    /// receive the next gossip message. The selection should balance:
69    /// - Convergence speed (more peers = faster)
70    /// - Resource usage (fewer peers = less battery/bandwidth)
71    fn select_peers<'a>(&self, peers: &'a [HivePeer]) -> Vec<&'a HivePeer>;
72
73    /// Determine if an update should be forwarded after a merge
74    ///
75    /// Returns `true` if the merge result indicates new information
76    /// that should be propagated to other peers.
77    fn should_forward(&self, result: &MergeResult) -> bool {
78        // Default: forward if counter, emergency, or chat state changed
79        result.counter_changed || result.emergency_changed || result.chat_changed
80    }
81
82    /// Get the name of this strategy (for logging/debugging)
83    fn name(&self) -> &'static str;
84}
85
/// Gossip strategy that picks a random subset of peers each round.
///
/// This is the classic epidemic-gossip scheme: every round a small number
/// of peers (the *fanout*) is chosen to receive the update.
///
/// ## Choosing a fanout
///
/// - **fanout=1**: lowest overhead, slowest convergence
/// - **fanout=2**: the standard choice; O(log N) convergence, good balance
/// - **fanout=3+**: faster convergence at the cost of more traffic
///
/// fanout=2 is the recommended setting for most HIVE deployments.
#[derive(Clone, Debug)]
pub struct RandomFanout {
    /// How many peers are selected per gossip round.
    fanout: usize,
    /// Fixed seed for reproducible selection in tests; `None` means the
    /// seed is derived from the system clock.
    #[cfg(feature = "std")]
    seed: Option<u64>,
}
106
107impl RandomFanout {
108    /// Create a new random fanout strategy
109    ///
110    /// # Arguments
111    /// * `fanout` - Number of peers to select per gossip round
112    pub fn new(fanout: usize) -> Self {
113        Self {
114            fanout: fanout.max(1), // At least 1
115            #[cfg(feature = "std")]
116            seed: None,
117        }
118    }
119
120    /// Create with a fixed seed for deterministic testing
121    #[cfg(feature = "std")]
122    pub fn with_seed(fanout: usize, seed: u64) -> Self {
123        Self {
124            fanout: fanout.max(1),
125            seed: Some(seed),
126        }
127    }
128
129    /// Get a pseudo-random number
130    #[cfg(feature = "std")]
131    fn random_index(&self, max: usize, iteration: usize) -> usize {
132        use std::time::SystemTime;
133
134        let seed = self.seed.unwrap_or_else(|| {
135            SystemTime::now()
136                .duration_since(SystemTime::UNIX_EPOCH)
137                .map(|d| d.as_nanos() as u64)
138                .unwrap_or(12345)
139        });
140
141        // Simple LCG for lightweight randomness
142        let mixed = seed
143            .wrapping_mul(6364136223846793005)
144            .wrapping_add(iteration as u64);
145        (mixed as usize) % max
146    }
147}
148
149impl Default for RandomFanout {
150    fn default() -> Self {
151        Self::new(2) // Default fanout of 2
152    }
153}
154
155impl GossipStrategy for RandomFanout {
156    fn select_peers<'a>(&self, peers: &'a [HivePeer]) -> Vec<&'a HivePeer> {
157        if peers.is_empty() {
158            return Vec::new();
159        }
160
161        // If we have fewer peers than fanout, return all
162        if peers.len() <= self.fanout {
163            return peers.iter().collect();
164        }
165
166        // Select random subset
167        #[cfg(feature = "std")]
168        {
169            let mut selected = Vec::with_capacity(self.fanout);
170            let mut used = std::collections::HashSet::new();
171
172            for i in 0..self.fanout * 3 {
173                // Try up to 3x fanout to find unique peers
174                if selected.len() >= self.fanout {
175                    break;
176                }
177
178                let idx = self.random_index(peers.len(), i);
179                if !used.contains(&idx) {
180                    used.insert(idx);
181                    selected.push(&peers[idx]);
182                }
183            }
184
185            selected
186        }
187
188        #[cfg(not(feature = "std"))]
189        {
190            // No_std fallback: just take first N peers
191            peers.iter().take(self.fanout).collect()
192        }
193    }
194
195    fn name(&self) -> &'static str {
196        "random_fanout"
197    }
198}
199
/// Gossip strategy that sends to every connected peer.
///
/// Appropriate only for:
/// - very small meshes (fewer than 5 nodes)
/// - emergency situations that need immediate propagation
/// - testing and debugging
///
/// **Warning**: the cost is O(N) per round, so this is not suitable for
/// large meshes.
#[derive(Clone, Debug, Default)]
pub struct BroadcastAll;
210
211impl BroadcastAll {
212    /// Create a new broadcast-all strategy
213    pub fn new() -> Self {
214        Self
215    }
216}
217
218impl GossipStrategy for BroadcastAll {
219    fn select_peers<'a>(&self, peers: &'a [HivePeer]) -> Vec<&'a HivePeer> {
220        peers.iter().collect()
221    }
222
223    fn name(&self) -> &'static str {
224        "broadcast_all"
225    }
226}
227
/// Signal-strength based selection
///
/// Prefers peers with stronger signal (better reliability): candidates are
/// ranked by RSSI, strongest first, and chosen from the top of that ranking.
/// The selection is deterministic; no randomness is involved.
#[derive(Debug, Clone)]
pub struct SignalBasedFanout {
    /// Number of peers to select
    fanout: usize,
    /// RSSI difference (dB) considered significant: a candidate that is
    /// weaker than the previously selected peer by more than this amount is
    /// not preferred during selection
    rssi_threshold: i8,
}
239
240impl SignalBasedFanout {
241    /// Create a new signal-based strategy
242    ///
243    /// # Arguments
244    /// * `fanout` - Number of peers to select
245    /// * `rssi_threshold` - RSSI difference (dB) to consider significant
246    pub fn new(fanout: usize, rssi_threshold: i8) -> Self {
247        Self {
248            fanout: fanout.max(1),
249            rssi_threshold,
250        }
251    }
252}
253
254impl Default for SignalBasedFanout {
255    fn default() -> Self {
256        Self::new(2, 10) // Default: 2 peers, 10dB threshold
257    }
258}
259
260impl GossipStrategy for SignalBasedFanout {
261    fn select_peers<'a>(&self, peers: &'a [HivePeer]) -> Vec<&'a HivePeer> {
262        if peers.is_empty() {
263            return Vec::new();
264        }
265
266        if peers.len() <= self.fanout {
267            return peers.iter().collect();
268        }
269
270        // Sort by signal strength (higher RSSI = better)
271        let mut sorted: Vec<_> = peers.iter().collect();
272        sorted.sort_by(|a, b| b.rssi.cmp(&a.rssi));
273
274        // Take the best ones, but add some randomness for diversity
275        let mut selected: Vec<&HivePeer> = Vec::with_capacity(self.fanout);
276
277        // Always include the strongest peer
278        if let Some(best) = sorted.first() {
279            selected.push(best);
280        }
281
282        // For remaining slots, prefer strong signals but allow some diversity
283        for peer in sorted.iter().skip(1) {
284            if selected.len() >= self.fanout {
285                break;
286            }
287
288            // Check if this peer is significantly weaker than the last selected
289            let last_rssi = selected.last().map(|p| p.rssi).unwrap_or(-100);
290            let this_rssi = peer.rssi;
291
292            // Include if within threshold or we need more peers
293            if this_rssi >= last_rssi - self.rssi_threshold || selected.len() < self.fanout / 2 + 1
294            {
295                selected.push(peer);
296            }
297        }
298
299        // Fill remaining slots if needed
300        for peer in sorted.iter() {
301            if selected.len() >= self.fanout {
302                break;
303            }
304            // Check by node_id to avoid requiring PartialEq on HivePeer
305            let already_selected = selected.iter().any(|p| p.node_id == peer.node_id);
306            if !already_selected {
307                selected.push(peer);
308            }
309        }
310
311        selected
312    }
313
314    fn name(&self) -> &'static str {
315        "signal_based"
316    }
317}
318
/// Gossip strategy with a separate fanout for emergencies.
///
/// A limited fanout applies during normal operation; while emergency mode
/// is active the emergency fanout is used instead so that emergency events
/// propagate as rapidly as possible. The emergency flag only exists when
/// the `std` feature is enabled.
#[derive(Debug)]
pub struct EmergencyAware {
    /// Fanout used during normal operation.
    normal_fanout: usize,
    /// Fanout used while in emergency mode (usually covers all peers).
    emergency_fanout: usize,
    /// `true` while emergency mode is active.
    #[cfg(feature = "std")]
    emergency_mode: std::sync::atomic::AtomicBool,
}
333
334impl Clone for EmergencyAware {
335    fn clone(&self) -> Self {
336        Self {
337            normal_fanout: self.normal_fanout,
338            emergency_fanout: self.emergency_fanout,
339            #[cfg(feature = "std")]
340            emergency_mode: std::sync::atomic::AtomicBool::new(self.is_emergency()),
341        }
342    }
343}
344
345impl EmergencyAware {
346    /// Create a new emergency-aware strategy
347    pub fn new(normal_fanout: usize) -> Self {
348        Self {
349            normal_fanout: normal_fanout.max(1),
350            emergency_fanout: usize::MAX, // All peers during emergency
351            #[cfg(feature = "std")]
352            emergency_mode: std::sync::atomic::AtomicBool::new(false),
353        }
354    }
355
356    /// Set emergency mode
357    #[cfg(feature = "std")]
358    pub fn set_emergency(&self, active: bool) {
359        self.emergency_mode
360            .store(active, std::sync::atomic::Ordering::SeqCst);
361    }
362
363    /// Check if in emergency mode
364    #[cfg(feature = "std")]
365    pub fn is_emergency(&self) -> bool {
366        self.emergency_mode
367            .load(std::sync::atomic::Ordering::SeqCst)
368    }
369
370    fn effective_fanout(&self) -> usize {
371        #[cfg(feature = "std")]
372        {
373            if self.is_emergency() {
374                self.emergency_fanout
375            } else {
376                self.normal_fanout
377            }
378        }
379        #[cfg(not(feature = "std"))]
380        {
381            self.normal_fanout
382        }
383    }
384}
385
386impl Default for EmergencyAware {
387    fn default() -> Self {
388        Self::new(2)
389    }
390}
391
392impl GossipStrategy for EmergencyAware {
393    fn select_peers<'a>(&self, peers: &'a [HivePeer]) -> Vec<&'a HivePeer> {
394        let fanout = self.effective_fanout();
395
396        if peers.len() <= fanout {
397            return peers.iter().collect();
398        }
399
400        // During emergency: all peers
401        // Normal: use random fanout behavior
402        peers.iter().take(fanout).collect()
403    }
404
405    fn should_forward(&self, result: &MergeResult) -> bool {
406        // Always forward during emergency mode
407        #[cfg(feature = "std")]
408        if self.is_emergency() {
409            return true;
410        }
411
412        // Switch to emergency mode if we received an emergency
413        #[cfg(feature = "std")]
414        if result.is_emergency() || result.emergency_changed {
415            self.set_emergency(true);
416        }
417
418        result.counter_changed || result.emergency_changed
419    }
420
421    fn name(&self) -> &'static str {
422        "emergency_aware"
423    }
424}
425
#[cfg(test)]
mod tests {
    use super::*;
    use crate::NodeId;

    /// Build a connected test peer with the given node id and RSSI.
    fn make_peer(id: u32, rssi: i8) -> HivePeer {
        let node_id = NodeId::new(id);
        let identifier = format!("device-{}", id);
        let name = format!("HIVE-{:08X}", id);

        HivePeer {
            node_id,
            identifier,
            mesh_id: Some("TEST".to_string()),
            name: Some(name),
            rssi,
            is_connected: true,
            last_seen_ms: 0,
        }
    }

    /// Four peers with ids 1..=4 and RSSI -50, -60, -70, -80.
    fn four_peers() -> Vec<HivePeer> {
        vec![
            make_peer(1, -50),
            make_peer(2, -60),
            make_peer(3, -70),
            make_peer(4, -80),
        ]
    }

    /// Merge result from node 1 with the given counter/emergency flags and
    /// no chat change.
    fn merge_result(counter_changed: bool, emergency_changed: bool) -> MergeResult {
        MergeResult {
            source_node: NodeId::new(1),
            event: None,
            counter_changed,
            emergency_changed,
            chat_changed: false,
            total_count: 10,
        }
    }

    #[test]
    fn test_random_fanout_basic() {
        let strategy = RandomFanout::new(2);

        // No peers at all: nothing to select
        let none: Vec<HivePeer> = Vec::new();
        assert!(strategy.select_peers(&none).is_empty());

        // Fewer peers than the fanout: the single peer is returned
        let single = vec![make_peer(1, -50)];
        assert_eq!(strategy.select_peers(&single).len(), 1);

        // More peers than the fanout: exactly `fanout` are returned
        let many = four_peers();
        assert_eq!(strategy.select_peers(&many).len(), 2);
    }

    #[test]
    fn test_broadcast_all() {
        let strategy = BroadcastAll::new();
        let peers = vec![make_peer(1, -50), make_peer(2, -60), make_peer(3, -70)];

        assert_eq!(strategy.select_peers(&peers).len(), 3);
    }

    #[test]
    fn test_signal_based() {
        let strategy = SignalBasedFanout::new(2, 10);

        let peers = vec![
            make_peer(1, -80), // Weak
            make_peer(2, -50), // Strong
            make_peer(3, -90), // Very weak
            make_peer(4, -55), // Strong-ish
        ];

        let selected = strategy.select_peers(&peers);
        assert_eq!(selected.len(), 2);

        // The strongest peer (node 2) must be part of the selection
        let strongest_included = selected.iter().any(|p| p.node_id.as_u32() == 2);
        assert!(strongest_included);
    }

    #[test]
    fn test_emergency_aware() {
        let strategy = EmergencyAware::new(2);
        let peers = four_peers();

        // Normal mode: limited fanout
        assert!(!strategy.is_emergency());
        assert_eq!(strategy.select_peers(&peers).len(), 2);

        // Emergency mode: all peers
        strategy.set_emergency(true);
        assert!(strategy.is_emergency());
        assert_eq!(strategy.select_peers(&peers).len(), 4);
    }

    #[test]
    fn test_should_forward() {
        let strategy = RandomFanout::default();

        // Counter changed: forward
        assert!(strategy.should_forward(&merge_result(true, false)));

        // Emergency state changed: forward
        assert!(strategy.should_forward(&merge_result(false, true)));

        // Nothing changed: do not forward
        assert!(!strategy.should_forward(&merge_result(false, false)));
    }
}