1#[cfg(not(feature = "std"))]
39use alloc::vec::Vec;
40
41use crate::document::MergeResult;
42use crate::peer::HivePeer;
43
44pub trait GossipStrategy: Send + Sync {
50 fn select_peers<'a>(&self, peers: &'a [HivePeer]) -> Vec<&'a HivePeer>;
57
58 fn should_forward(&self, result: &MergeResult) -> bool {
63 result.counter_changed || result.emergency_changed
65 }
66
67 fn name(&self) -> &'static str;
69}
70
/// Gossip strategy configuration that limits forwarding to a bounded number
/// of pseudo-randomly chosen peers.
#[derive(Debug, Clone)]
pub struct RandomFanout {
    /// Maximum number of peers selected per round; never below 1.
    fanout: usize,
    /// Fixed seed for reproducible draws. When `None`, a seed is derived from
    /// the system clock on every draw.
    #[cfg(feature = "std")]
    seed: Option<u64>,
}

impl RandomFanout {
    /// Creates a strategy with the given fanout; a fanout of 0 is raised to 1.
    pub fn new(fanout: usize) -> Self {
        let fanout = if fanout == 0 { 1 } else { fanout };
        Self {
            fanout,
            #[cfg(feature = "std")]
            seed: None,
        }
    }

    /// Creates a strategy whose draws are derived from a fixed `seed`
    /// instead of the clock. A fanout of 0 is raised to 1.
    #[cfg(feature = "std")]
    pub fn with_seed(fanout: usize, seed: u64) -> Self {
        let mut strategy = Self::new(fanout);
        strategy.seed = Some(seed);
        strategy
    }

    /// Produces an index in `0..max` for the given `iteration`.
    ///
    /// The seed (fixed, or the current time in nanoseconds since the Unix
    /// epoch) is multiplied by an LCG-style constant and offset by
    /// `iteration`, then reduced modulo `max`. `max` must be non-zero.
    #[cfg(feature = "std")]
    fn random_index(&self, max: usize, iteration: usize) -> usize {
        use std::time::{SystemTime, UNIX_EPOCH};

        const MULTIPLIER: u64 = 6364136223846793005;
        // Used only if the clock reports a time before the Unix epoch.
        const FALLBACK_SEED: u64 = 12345;

        let seed = match self.seed {
            Some(fixed) => fixed,
            None => match SystemTime::now().duration_since(UNIX_EPOCH) {
                Ok(elapsed) => elapsed.as_nanos() as u64,
                Err(_) => FALLBACK_SEED,
            },
        };

        let mixed = seed.wrapping_mul(MULTIPLIER).wrapping_add(iteration as u64);
        (mixed as usize) % max
    }
}

impl Default for RandomFanout {
    /// Defaults to a fanout of 2.
    fn default() -> Self {
        RandomFanout::new(2)
    }
}
139
140impl GossipStrategy for RandomFanout {
141 fn select_peers<'a>(&self, peers: &'a [HivePeer]) -> Vec<&'a HivePeer> {
142 if peers.is_empty() {
143 return Vec::new();
144 }
145
146 if peers.len() <= self.fanout {
148 return peers.iter().collect();
149 }
150
151 #[cfg(feature = "std")]
153 {
154 let mut selected = Vec::with_capacity(self.fanout);
155 let mut used = std::collections::HashSet::new();
156
157 for i in 0..self.fanout * 3 {
158 if selected.len() >= self.fanout {
160 break;
161 }
162
163 let idx = self.random_index(peers.len(), i);
164 if !used.contains(&idx) {
165 used.insert(idx);
166 selected.push(&peers[idx]);
167 }
168 }
169
170 selected
171 }
172
173 #[cfg(not(feature = "std"))]
174 {
175 peers.iter().take(self.fanout).collect()
177 }
178 }
179
180 fn name(&self) -> &'static str {
181 "random_fanout"
182 }
183}
184
/// Stateless strategy value; its `GossipStrategy` implementation selects
/// every peer.
#[derive(Debug, Clone, Default)]
pub struct BroadcastAll;

impl BroadcastAll {
    /// Creates the strategy. Equivalent to `BroadcastAll::default()`.
    pub fn new() -> Self {
        BroadcastAll
    }
}
202
203impl GossipStrategy for BroadcastAll {
204 fn select_peers<'a>(&self, peers: &'a [HivePeer]) -> Vec<&'a HivePeer> {
205 peers.iter().collect()
206 }
207
208 fn name(&self) -> &'static str {
209 "broadcast_all"
210 }
211}
212
/// Gossip strategy configuration that prefers peers with stronger signal.
#[derive(Debug, Clone)]
pub struct SignalBasedFanout {
    /// Maximum number of peers selected per round; never below 1.
    fanout: usize,
    /// RSSI drop, relative to the previously selected peer, that a candidate
    /// may have and still be accepted in the preference pass.
    rssi_threshold: i8,
}

impl SignalBasedFanout {
    /// Creates a strategy with the given fanout and RSSI threshold.
    ///
    /// A fanout of 0 is raised to 1.
    pub fn new(fanout: usize, rssi_threshold: i8) -> Self {
        let fanout = if fanout == 0 { 1 } else { fanout };
        SignalBasedFanout {
            fanout,
            rssi_threshold,
        }
    }
}

impl Default for SignalBasedFanout {
    /// Defaults to a fanout of 2 and an RSSI threshold of 10.
    fn default() -> Self {
        SignalBasedFanout::new(2, 10)
    }
}
244
245impl GossipStrategy for SignalBasedFanout {
246 fn select_peers<'a>(&self, peers: &'a [HivePeer]) -> Vec<&'a HivePeer> {
247 if peers.is_empty() {
248 return Vec::new();
249 }
250
251 if peers.len() <= self.fanout {
252 return peers.iter().collect();
253 }
254
255 let mut sorted: Vec<_> = peers.iter().collect();
257 sorted.sort_by(|a, b| b.rssi.cmp(&a.rssi));
258
259 let mut selected: Vec<&HivePeer> = Vec::with_capacity(self.fanout);
261
262 if let Some(best) = sorted.first() {
264 selected.push(best);
265 }
266
267 for peer in sorted.iter().skip(1) {
269 if selected.len() >= self.fanout {
270 break;
271 }
272
273 let last_rssi = selected.last().map(|p| p.rssi).unwrap_or(-100);
275 let this_rssi = peer.rssi;
276
277 if this_rssi >= last_rssi - self.rssi_threshold || selected.len() < self.fanout / 2 + 1
279 {
280 selected.push(peer);
281 }
282 }
283
284 for peer in sorted.iter() {
286 if selected.len() >= self.fanout {
287 break;
288 }
289 let already_selected = selected.iter().any(|p| p.node_id == peer.node_id);
291 if !already_selected {
292 selected.push(peer);
293 }
294 }
295
296 selected
297 }
298
299 fn name(&self) -> &'static str {
300 "signal_based"
301 }
302}
303
/// Gossip strategy configuration with two fanout levels: a normal one and a
/// wider one used while emergency mode is active.
#[derive(Debug)]
pub struct EmergencyAware {
    /// Fanout used outside emergency mode; never below 1.
    normal_fanout: usize,
    /// Fanout used while emergency mode is active.
    emergency_fanout: usize,
    /// Emergency flag, changeable through `&self`. Only present with `std`.
    #[cfg(feature = "std")]
    emergency_mode: std::sync::atomic::AtomicBool,
}

impl Clone for EmergencyAware {
    /// Copies both fanouts and, with `std`, snapshots the current emergency
    /// flag into a fresh atomic (the clone does not share the flag).
    fn clone(&self) -> Self {
        #[cfg(feature = "std")]
        let emergency_mode = std::sync::atomic::AtomicBool::new(self.is_emergency());

        Self {
            normal_fanout: self.normal_fanout,
            emergency_fanout: self.emergency_fanout,
            #[cfg(feature = "std")]
            emergency_mode,
        }
    }
}

impl EmergencyAware {
    /// Creates a strategy with the given normal fanout (0 is raised to 1),
    /// an emergency fanout of `usize::MAX`, and emergency mode off.
    pub fn new(normal_fanout: usize) -> Self {
        Self {
            normal_fanout: if normal_fanout == 0 { 1 } else { normal_fanout },
            emergency_fanout: usize::MAX,
            #[cfg(feature = "std")]
            emergency_mode: std::sync::atomic::AtomicBool::new(false),
        }
    }

    /// Turns emergency mode on or off.
    #[cfg(feature = "std")]
    pub fn set_emergency(&self, active: bool) {
        use std::sync::atomic::Ordering;

        self.emergency_mode.store(active, Ordering::SeqCst);
    }

    /// Returns whether emergency mode is currently on.
    #[cfg(feature = "std")]
    pub fn is_emergency(&self) -> bool {
        use std::sync::atomic::Ordering;

        self.emergency_mode.load(Ordering::SeqCst)
    }

    /// Fanout to apply right now: the emergency fanout while emergency mode
    /// is on (`std` only), otherwise the normal fanout.
    fn effective_fanout(&self) -> usize {
        #[cfg(feature = "std")]
        {
            if self.is_emergency() {
                return self.emergency_fanout;
            }
        }

        self.normal_fanout
    }
}

impl Default for EmergencyAware {
    /// Defaults to a normal fanout of 2.
    fn default() -> Self {
        EmergencyAware::new(2)
    }
}
376
377impl GossipStrategy for EmergencyAware {
378 fn select_peers<'a>(&self, peers: &'a [HivePeer]) -> Vec<&'a HivePeer> {
379 let fanout = self.effective_fanout();
380
381 if peers.len() <= fanout {
382 return peers.iter().collect();
383 }
384
385 peers.iter().take(fanout).collect()
388 }
389
390 fn should_forward(&self, result: &MergeResult) -> bool {
391 #[cfg(feature = "std")]
393 if self.is_emergency() {
394 return true;
395 }
396
397 #[cfg(feature = "std")]
399 if result.is_emergency() || result.emergency_changed {
400 self.set_emergency(true);
401 }
402
403 result.counter_changed || result.emergency_changed
404 }
405
406 fn name(&self) -> &'static str {
407 "emergency_aware"
408 }
409}
410
#[cfg(test)]
mod tests {
    use super::*;
    use crate::NodeId;

    /// Builds a connected peer with the given node id and RSSI.
    fn make_peer(id: u32, rssi: i8) -> HivePeer {
        let identifier = format!("device-{}", id);
        let name = format!("HIVE-{:08X}", id);

        HivePeer {
            node_id: NodeId::new(id),
            identifier,
            mesh_id: Some("TEST".to_string()),
            name: Some(name),
            rssi,
            is_connected: true,
            last_seen_ms: 0,
        }
    }

    /// Builds one peer per `(id, rssi)` pair, preserving order.
    fn make_peers(specs: &[(u32, i8)]) -> Vec<HivePeer> {
        specs.iter().map(|&(id, rssi)| make_peer(id, rssi)).collect()
    }

    /// Four peers with ids 1..=4 and progressively weaker signal.
    fn four_peers() -> Vec<HivePeer> {
        make_peers(&[(1, -50), (2, -60), (3, -70), (4, -80)])
    }

    #[test]
    fn test_random_fanout_basic() {
        let strategy = RandomFanout::new(2);

        // No peers: nothing to select.
        let none: Vec<HivePeer> = Vec::new();
        assert!(strategy.select_peers(&none).is_empty());

        // Fewer peers than the fanout: all of them are returned.
        let single = make_peers(&[(1, -50)]);
        assert_eq!(strategy.select_peers(&single).len(), 1);

        // More peers than the fanout: capped at the fanout.
        let many = four_peers();
        assert_eq!(strategy.select_peers(&many).len(), 2);
    }

    #[test]
    fn test_broadcast_all() {
        let strategy = BroadcastAll::new();
        let peers = make_peers(&[(1, -50), (2, -60), (3, -70)]);

        assert_eq!(strategy.select_peers(&peers).len(), 3);
    }

    #[test]
    fn test_signal_based() {
        let strategy = SignalBasedFanout::new(2, 10);
        let peers = make_peers(&[(1, -80), (2, -50), (3, -90), (4, -55)]);

        let selected = strategy.select_peers(&peers);
        assert_eq!(selected.len(), 2);

        // The strongest peer (id 2) must be among the selection.
        let has_strongest = selected.iter().any(|p| p.node_id.as_u32() == 2);
        assert!(has_strongest);
    }

    #[test]
    fn test_emergency_aware() {
        let strategy = EmergencyAware::new(2);
        let peers = four_peers();

        // Normal mode: limited to the normal fanout.
        assert!(!strategy.is_emergency());
        assert_eq!(strategy.select_peers(&peers).len(), 2);

        // Emergency mode: every peer is selected.
        strategy.set_emergency(true);
        assert!(strategy.is_emergency());
        assert_eq!(strategy.select_peers(&peers).len(), 4);
    }

    #[test]
    fn test_should_forward() {
        let strategy = RandomFanout::default();

        // (counter_changed, emergency_changed, expected should_forward)
        let cases = [
            (true, false, true),
            (false, true, true),
            (false, false, false),
        ];

        for &(counter_changed, emergency_changed, expected) in cases.iter() {
            let result = MergeResult {
                source_node: NodeId::new(1),
                event: None,
                counter_changed,
                emergency_changed,
                total_count: 10,
            };
            assert_eq!(strategy.should_forward(&result), expected);
        }
    }
}