#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
use crate::document::MergeResult;
use crate::peer::PeatPeer;
/// Policy that decides which peers receive a gossip message and whether a
/// merge outcome is worth passing on.
pub trait GossipStrategy: Send + Sync {
    /// Picks the peers to send to.
    ///
    /// The returned references borrow from `peers`.
    fn select_peers<'a>(&self, peers: &'a [PeatPeer]) -> Vec<&'a PeatPeer>;

    /// Returns `true` when `result` should be forwarded.
    ///
    /// The default implementation forwards whenever the counter, emergency
    /// or chat change flag of `result` is set.
    fn should_forward(&self, result: &MergeResult) -> bool {
        let state_changed = result.counter_changed || result.emergency_changed;
        state_changed || result.chat_changed
    }

    /// Short, static identifier of the strategy.
    fn name(&self) -> &'static str;
}
/// Gossip strategy that forwards to a bounded number of pseudo-randomly
/// chosen peers.
#[derive(Debug, Clone)]
pub struct RandomFanout {
    /// Upper bound on the number of peers chosen per selection; the
    /// constructors never store a value below 1.
    fanout: usize,
    /// Fixed seed for reproducible selection. `None` means the seed is taken
    /// from the system clock on every draw.
    #[cfg(feature = "std")]
    seed: Option<u64>,
}

impl RandomFanout {
    /// Creates an unseeded strategy. A `fanout` of 0 is raised to 1.
    pub fn new(fanout: usize) -> Self {
        let fanout = if fanout == 0 { 1 } else { fanout };
        Self {
            fanout,
            #[cfg(feature = "std")]
            seed: None,
        }
    }

    /// Creates a strategy whose draws are derived from the fixed `seed`.
    /// A `fanout` of 0 is raised to 1.
    #[cfg(feature = "std")]
    pub fn with_seed(fanout: usize, seed: u64) -> Self {
        let fanout = if fanout == 0 { 1 } else { fanout };
        Self {
            fanout,
            seed: Some(seed),
        }
    }

    /// Produces an index in `0..max` for the given draw number.
    ///
    /// The stored seed is used when present; otherwise the nanoseconds since
    /// the Unix epoch serve as seed, with `12345` as fallback when the clock
    /// reads earlier than the epoch.
    ///
    /// # Panics
    ///
    /// Panics when `max` is 0 (remainder by zero).
    #[cfg(feature = "std")]
    fn random_index(&self, max: usize, iteration: usize) -> usize {
        use std::time::SystemTime;

        const MULTIPLIER: u64 = 6364136223846793005;

        let seed = match self.seed {
            Some(fixed) => fixed,
            None => match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
                Ok(elapsed) => elapsed.as_nanos() as u64,
                Err(_) => 12345,
            },
        };
        let offset = iteration as u64;
        let mixed = seed.wrapping_mul(MULTIPLIER).wrapping_add(offset);
        (mixed as usize) % max
    }
}

impl Default for RandomFanout {
    /// Equivalent to `RandomFanout::new(2)`.
    fn default() -> Self {
        Self::new(2)
    }
}
impl GossipStrategy for RandomFanout {
    /// Selects up to `fanout` distinct peers.
    ///
    /// When `peers` holds no more entries than the fanout, all of them are
    /// returned. With the `std` feature the peers are drawn through
    /// `random_index`; without it the first `fanout` peers are taken.
    fn select_peers<'a>(&self, peers: &'a [PeatPeer]) -> Vec<&'a PeatPeer> {
        if peers.is_empty() {
            return Vec::new();
        }
        if peers.len() <= self.fanout {
            return peers.iter().collect();
        }

        #[cfg(feature = "std")]
        {
            let mut selected = Vec::with_capacity(self.fanout);
            let mut used = std::collections::HashSet::with_capacity(self.fanout);

            // Bounded number of random draws; repeated indices are skipped.
            for attempt in 0..self.fanout.saturating_mul(3) {
                if selected.len() >= self.fanout {
                    break;
                }
                let idx = self.random_index(peers.len(), attempt);
                // `insert` returns false when the index was already drawn.
                if used.insert(idx) {
                    selected.push(&peers[idx]);
                }
            }

            // The draws can collide often enough to leave the selection short
            // even though more peers are available. Top it up in slice order
            // with peers that were not drawn, so exactly `fanout` peers are
            // returned.
            for (idx, peer) in peers.iter().enumerate() {
                if selected.len() >= self.fanout {
                    break;
                }
                if used.insert(idx) {
                    selected.push(peer);
                }
            }

            selected
        }
        #[cfg(not(feature = "std"))]
        {
            peers.iter().take(self.fanout).collect()
        }
    }

    /// Returns `"random_fanout"`.
    fn name(&self) -> &'static str {
        "random_fanout"
    }
}
/// Gossip strategy that selects every known peer.
#[derive(Debug, Clone, Default)]
pub struct BroadcastAll;

impl BroadcastAll {
    /// Creates the strategy; it carries no state.
    pub fn new() -> Self {
        BroadcastAll
    }
}
impl GossipStrategy for BroadcastAll {
    /// Returns a reference to every entry of `peers`, in slice order.
    fn select_peers<'a>(&self, peers: &'a [PeatPeer]) -> Vec<&'a PeatPeer> {
        let mut everyone: Vec<&'a PeatPeer> = Vec::with_capacity(peers.len());
        everyone.extend(peers.iter());
        everyone
    }

    /// Returns `"broadcast_all"`.
    fn name(&self) -> &'static str {
        "broadcast_all"
    }
}
/// Gossip strategy that prefers peers with the highest RSSI.
#[derive(Debug, Clone)]
pub struct SignalBasedFanout {
    /// Upper bound on the number of peers chosen per selection; the
    /// constructor never stores a value below 1.
    fanout: usize,
    /// How far below the previously chosen peer's RSSI a candidate may fall
    /// and still be accepted outright (same unit as the peers' `rssi`).
    rssi_threshold: i8,
}

impl SignalBasedFanout {
    /// Creates the strategy. A `fanout` of 0 is raised to 1.
    pub fn new(fanout: usize, rssi_threshold: i8) -> Self {
        let fanout = if fanout == 0 { 1 } else { fanout };
        Self {
            fanout,
            rssi_threshold,
        }
    }
}

impl Default for SignalBasedFanout {
    /// Equivalent to `SignalBasedFanout::new(2, 10)`.
    fn default() -> Self {
        Self::new(2, 10)
    }
}
impl GossipStrategy for SignalBasedFanout {
    /// Selects up to `fanout` peers, strongest RSSI first.
    ///
    /// When `peers` holds no more entries than the fanout, all of them are
    /// returned. Otherwise the peers are ordered by descending RSSI and the
    /// strongest one is always taken. Each following candidate is accepted
    /// when its RSSI is no more than `rssi_threshold` below the most recently
    /// accepted peer, or while fewer than `fanout / 2 + 1` peers have been
    /// chosen. Remaining slots are then filled in RSSI order with peers whose
    /// `node_id` has not been selected yet.
    fn select_peers<'a>(&self, peers: &'a [PeatPeer]) -> Vec<&'a PeatPeer> {
        if peers.is_empty() {
            return Vec::new();
        }
        if peers.len() <= self.fanout {
            return peers.iter().collect();
        }

        // `core::cmp::Reverse` keeps this path buildable without `std`.
        let mut sorted: Vec<&'a PeatPeer> = peers.iter().collect();
        sorted.sort_by_key(|p| core::cmp::Reverse(p.rssi));

        let mut selected: Vec<&'a PeatPeer> = Vec::with_capacity(self.fanout);
        if let Some(&best) = sorted.first() {
            selected.push(best);
        }

        // Number of peers that are accepted regardless of the RSSI gap.
        let guaranteed = self.fanout / 2 + 1;

        for &peer in sorted.iter().skip(1) {
            if selected.len() >= self.fanout {
                break;
            }
            let last_rssi = selected.last().map(|p| p.rssi).unwrap_or(-100);
            // Saturating subtraction: plain `i8` arithmetic would overflow
            // for weak signals combined with a large threshold (panic in
            // debug builds, wrap-around in release builds).
            let floor = last_rssi.saturating_sub(self.rssi_threshold);
            if peer.rssi >= floor || selected.len() < guaranteed {
                selected.push(peer);
            }
        }

        // Fill any remaining slots with the strongest peers skipped above.
        for &peer in sorted.iter() {
            if selected.len() >= self.fanout {
                break;
            }
            let already_selected = selected.iter().any(|p| p.node_id == peer.node_id);
            if !already_selected {
                selected.push(peer);
            }
        }

        selected
    }

    /// Returns `"signal_based"`.
    fn name(&self) -> &'static str {
        "signal_based"
    }
}
/// Gossip strategy with two fanout levels: a normal one and a larger one
/// that applies while emergency mode is active.
#[derive(Debug)]
pub struct EmergencyAware {
    /// Fanout used outside emergency mode; never below 1.
    normal_fanout: usize,
    /// Fanout used in emergency mode; `new` sets it to `usize::MAX`.
    emergency_fanout: usize,
    /// Emergency-mode flag; only present with the `std` feature.
    #[cfg(feature = "std")]
    emergency_mode: std::sync::atomic::AtomicBool,
}

impl Clone for EmergencyAware {
    /// Copies both fanout values and, with the `std` feature, a snapshot of
    /// the current emergency flag into a fresh atomic.
    fn clone(&self) -> Self {
        #[cfg(feature = "std")]
        let emergency_mode = std::sync::atomic::AtomicBool::new(self.is_emergency());

        Self {
            normal_fanout: self.normal_fanout,
            emergency_fanout: self.emergency_fanout,
            #[cfg(feature = "std")]
            emergency_mode,
        }
    }
}

impl EmergencyAware {
    /// Creates the strategy with emergency mode off. A `normal_fanout` of 0
    /// is raised to 1.
    pub fn new(normal_fanout: usize) -> Self {
        let normal_fanout = if normal_fanout == 0 { 1 } else { normal_fanout };
        Self {
            normal_fanout,
            emergency_fanout: usize::MAX,
            #[cfg(feature = "std")]
            emergency_mode: std::sync::atomic::AtomicBool::new(false),
        }
    }

    /// Switches emergency mode on or off.
    #[cfg(feature = "std")]
    pub fn set_emergency(&self, active: bool) {
        use std::sync::atomic::Ordering;
        self.emergency_mode.store(active, Ordering::SeqCst);
    }

    /// Returns whether emergency mode is currently on.
    #[cfg(feature = "std")]
    pub fn is_emergency(&self) -> bool {
        use std::sync::atomic::Ordering;
        self.emergency_mode.load(Ordering::SeqCst)
    }

    /// Fanout that applies right now: the emergency fanout while emergency
    /// mode is on (`std` feature only), the normal fanout otherwise.
    fn effective_fanout(&self) -> usize {
        #[cfg(feature = "std")]
        {
            if self.is_emergency() {
                return self.emergency_fanout;
            }
        }
        self.normal_fanout
    }
}

impl Default for EmergencyAware {
    /// Equivalent to `EmergencyAware::new(2)`.
    fn default() -> Self {
        Self::new(2)
    }
}
impl GossipStrategy for EmergencyAware {
    /// Returns the leading peers of `peers`, at most as many as the
    /// currently effective fanout allows.
    fn select_peers<'a>(&self, peers: &'a [PeatPeer]) -> Vec<&'a PeatPeer> {
        let count = self.effective_fanout().min(peers.len());
        peers[..count].iter().collect()
    }

    /// Decides whether `result` should be forwarded.
    ///
    /// With the `std` feature: everything is forwarded while emergency mode
    /// is on, and a result that reports an emergency (or an emergency change)
    /// switches emergency mode on. This method never switches it off again.
    /// Otherwise the result is forwarded when its counter or emergency change
    /// flag is set.
    fn should_forward(&self, result: &MergeResult) -> bool {
        #[cfg(feature = "std")]
        {
            if self.is_emergency() {
                return true;
            }
            if result.is_emergency() || result.emergency_changed {
                self.set_emergency(true);
            }
        }

        // NOTE(review): unlike the trait's default implementation,
        // `chat_changed` is not considered here — confirm this is intended.
        let counter_moved = result.counter_changed;
        counter_moved || result.emergency_changed
    }

    /// Returns `"emergency_aware"`.
    fn name(&self) -> &'static str {
        "emergency_aware"
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::NodeId;

    /// Builds a connected test peer with the given node id and RSSI.
    fn make_peer(id: u32, rssi: i8) -> PeatPeer {
        let identifier = format!("device-{}", id);
        let name = format!("PEAT-{:08X}", id);
        PeatPeer {
            node_id: NodeId::new(id),
            identifier,
            mesh_id: Some("TEST".to_string()),
            name: Some(name),
            rssi,
            is_connected: true,
            last_seen_ms: 0,
        }
    }

    /// Four peers with ids 1..=4 and RSSI -50, -60, -70, -80.
    fn four_peers() -> Vec<PeatPeer> {
        vec![
            make_peer(1, -50),
            make_peer(2, -60),
            make_peer(3, -70),
            make_peer(4, -80),
        ]
    }

    /// Merge result from node 1 with the given counter/emergency flags and
    /// no chat change.
    fn merge_result(counter_changed: bool, emergency_changed: bool) -> MergeResult {
        MergeResult {
            source_node: NodeId::new(1),
            event: None,
            peer_peripheral: None,
            counter_changed,
            emergency_changed,
            chat_changed: false,
            total_count: 10,
        }
    }

    #[test]
    fn test_random_fanout_basic() {
        let strategy = RandomFanout::new(2);

        // No peers: nothing to select.
        let none: Vec<PeatPeer> = Vec::new();
        assert!(strategy.select_peers(&none).is_empty());

        // Fewer peers than the fanout: all are returned.
        let single = vec![make_peer(1, -50)];
        assert_eq!(strategy.select_peers(&single).len(), 1);

        // More peers than the fanout: selection is capped.
        let many = four_peers();
        assert_eq!(strategy.select_peers(&many).len(), 2);
    }

    #[test]
    fn test_broadcast_all() {
        let strategy = BroadcastAll::new();
        let peers = vec![make_peer(1, -50), make_peer(2, -60), make_peer(3, -70)];
        assert_eq!(strategy.select_peers(&peers).len(), 3);
    }

    #[test]
    fn test_signal_based() {
        let strategy = SignalBasedFanout::new(2, 10);
        let peers = vec![
            make_peer(1, -80),
            make_peer(2, -50),
            make_peer(3, -90),
            make_peer(4, -55),
        ];

        let selected = strategy.select_peers(&peers);
        assert_eq!(selected.len(), 2);

        // Peer 2 has the strongest signal and must be part of the selection.
        let strongest_present = selected.iter().any(|p| p.node_id.as_u32() == 2);
        assert!(strongest_present);
    }

    #[test]
    fn test_emergency_aware() {
        let strategy = EmergencyAware::new(2);
        let peers = four_peers();

        // Normal mode: limited to the normal fanout.
        assert!(!strategy.is_emergency());
        assert_eq!(strategy.select_peers(&peers).len(), 2);

        // Emergency mode: every peer is selected.
        strategy.set_emergency(true);
        assert!(strategy.is_emergency());
        assert_eq!(strategy.select_peers(&peers).len(), 4);
    }

    #[test]
    fn test_should_forward() {
        let strategy = RandomFanout::default();

        // A counter change alone is forwarded.
        assert!(strategy.should_forward(&merge_result(true, false)));
        // An emergency change alone is forwarded.
        assert!(strategy.should_forward(&merge_result(false, true)));
        // No change at all is not forwarded.
        assert!(!strategy.should_forward(&merge_result(false, false)));
    }
}