#[cfg(not(feature = "std"))]
use alloc::{string::String, vec::Vec};
use crate::NodeId;
#[derive(Debug, Clone)]
pub struct PeatPeer {
pub node_id: NodeId,
pub identifier: String,
pub mesh_id: Option<String>,
pub name: Option<String>,
pub rssi: i8,
pub is_connected: bool,
pub last_seen_ms: u64,
}
impl PeatPeer {
pub fn new(
node_id: NodeId,
identifier: String,
mesh_id: Option<String>,
name: Option<String>,
rssi: i8,
) -> Self {
Self {
node_id,
identifier,
mesh_id,
name,
rssi,
is_connected: false,
last_seen_ms: 0,
}
}
pub fn touch(&mut self, now_ms: u64) {
self.last_seen_ms = now_ms;
}
pub fn is_stale(&self, now_ms: u64, timeout_ms: u64) -> bool {
if self.last_seen_ms == 0 {
return false; }
now_ms.saturating_sub(self.last_seen_ms) > timeout_ms
}
pub fn display_name(&self) -> &str {
self.name.as_deref().unwrap_or(self.identifier.as_str())
}
pub fn signal_strength(&self) -> SignalStrength {
match self.rssi {
r if r >= -50 => SignalStrength::Excellent,
r if r >= -70 => SignalStrength::Good,
r if r >= -85 => SignalStrength::Fair,
_ => SignalStrength::Weak,
}
}
}
impl Default for PeatPeer {
fn default() -> Self {
Self {
node_id: NodeId::default(),
identifier: String::new(),
mesh_id: None,
name: None,
rssi: -100,
is_connected: false,
last_seen_ms: 0,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SignalStrength {
Excellent,
Good,
Fair,
Weak,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ConnectionState {
#[default]
Discovered,
Connecting,
Connected,
Degraded,
Disconnecting,
Disconnected,
Lost,
}
impl ConnectionState {
pub fn is_connected(&self) -> bool {
matches!(self, Self::Connected | Self::Degraded)
}
pub fn was_connected(&self) -> bool {
matches!(
self,
Self::Connected
| Self::Degraded
| Self::Disconnecting
| Self::Disconnected
| Self::Lost
)
}
pub fn is_degraded_or_worse(&self) -> bool {
matches!(
self,
Self::Degraded | Self::Disconnecting | Self::Disconnected | Self::Lost
)
}
}
pub use crate::platform::DisconnectReason;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlePeerLinkInfo {
pub state: ConnectionState,
pub last_rssi: Option<i8>,
}
#[derive(Debug, Clone)]
pub struct PeerConnectionState {
pub node_id: NodeId,
pub identifier: String,
pub state: ConnectionState,
pub discovered_at: u64,
pub connected_at: Option<u64>,
pub disconnected_at: Option<u64>,
pub disconnect_reason: Option<DisconnectReason>,
pub last_rssi: Option<i8>,
pub connection_count: u32,
pub documents_synced: u32,
pub bytes_received: u64,
pub bytes_sent: u64,
pub last_seen_ms: u64,
pub name: Option<String>,
pub mesh_id: Option<String>,
}
impl PeerConnectionState {
pub fn new_discovered(node_id: NodeId, identifier: String, now_ms: u64) -> Self {
Self {
node_id,
identifier,
state: ConnectionState::Discovered,
discovered_at: now_ms,
connected_at: None,
disconnected_at: None,
disconnect_reason: None,
last_rssi: None,
connection_count: 0,
documents_synced: 0,
bytes_received: 0,
bytes_sent: 0,
last_seen_ms: now_ms,
name: None,
mesh_id: None,
}
}
pub fn from_peer(peer: &PeatPeer, now_ms: u64) -> Self {
let state = if peer.is_connected {
ConnectionState::Connected
} else {
ConnectionState::Discovered
};
Self {
node_id: peer.node_id,
identifier: peer.identifier.clone(),
state,
discovered_at: now_ms,
connected_at: if peer.is_connected {
Some(now_ms)
} else {
None
},
disconnected_at: None,
disconnect_reason: None,
last_rssi: Some(peer.rssi),
connection_count: if peer.is_connected { 1 } else { 0 },
documents_synced: 0,
bytes_received: 0,
bytes_sent: 0,
last_seen_ms: peer.last_seen_ms,
name: peer.name.clone(),
mesh_id: peer.mesh_id.clone(),
}
}
pub fn set_connecting(&mut self, now_ms: u64) {
self.state = ConnectionState::Connecting;
self.last_seen_ms = now_ms;
}
pub fn set_connected(&mut self, now_ms: u64) {
self.state = ConnectionState::Connected;
self.connected_at = Some(now_ms);
self.connection_count += 1;
self.last_seen_ms = now_ms;
self.disconnect_reason = None;
}
pub fn set_degraded(&mut self, now_ms: u64) {
if self.state == ConnectionState::Connected {
self.state = ConnectionState::Degraded;
self.last_seen_ms = now_ms;
}
}
pub fn set_disconnected(&mut self, now_ms: u64, reason: DisconnectReason) {
self.state = ConnectionState::Disconnected;
self.disconnected_at = Some(now_ms);
self.disconnect_reason = Some(reason);
self.last_seen_ms = now_ms;
}
pub fn set_lost(&mut self, now_ms: u64) {
if self.state == ConnectionState::Disconnected {
self.state = ConnectionState::Lost;
self.last_seen_ms = now_ms;
}
}
pub fn update_rssi(&mut self, rssi: i8, now_ms: u64, degraded_threshold: i8) -> bool {
self.last_rssi = Some(rssi);
self.last_seen_ms = now_ms;
if self.state == ConnectionState::Connected && rssi < degraded_threshold {
self.state = ConnectionState::Degraded;
return true;
} else if self.state == ConnectionState::Degraded && rssi >= degraded_threshold {
self.state = ConnectionState::Connected;
}
false
}
pub fn record_transfer(&mut self, bytes_received: u64, bytes_sent: u64) {
self.bytes_received += bytes_received;
self.bytes_sent += bytes_sent;
}
pub fn record_sync(&mut self) {
self.documents_synced += 1;
}
pub fn time_since_connected(&self, now_ms: u64) -> Option<u64> {
self.connected_at.map(|t| now_ms.saturating_sub(t))
}
pub fn time_since_disconnected(&self, now_ms: u64) -> Option<u64> {
self.disconnected_at.map(|t| now_ms.saturating_sub(t))
}
pub fn connection_duration(&self, now_ms: u64) -> Option<u64> {
if self.state.is_connected() {
self.connected_at.map(|t| now_ms.saturating_sub(t))
} else {
None
}
}
pub fn signal_strength(&self) -> Option<SignalStrength> {
self.last_rssi.map(|rssi| match rssi {
r if r >= -50 => SignalStrength::Excellent,
r if r >= -70 => SignalStrength::Good,
r if r >= -85 => SignalStrength::Fair,
_ => SignalStrength::Weak,
})
}
}
#[cfg(feature = "std")]
use std::collections::BTreeMap;
#[cfg(not(feature = "std"))]
use alloc::collections::BTreeMap;
#[derive(Debug, Clone, Default)]
pub struct ConnectionStateGraph {
peers: BTreeMap<NodeId, PeerConnectionState>,
indirect_peers: BTreeMap<NodeId, IndirectPeer>,
rssi_degraded_threshold: i8,
lost_timeout_ms: u64,
indirect_peer_timeout_ms: u64,
}
impl ConnectionStateGraph {
pub fn new() -> Self {
Self {
peers: BTreeMap::new(),
indirect_peers: BTreeMap::new(),
rssi_degraded_threshold: -80,
lost_timeout_ms: 30_000,
indirect_peer_timeout_ms: 120_000, }
}
pub fn with_config(rssi_degraded_threshold: i8, lost_timeout_ms: u64) -> Self {
Self {
peers: BTreeMap::new(),
indirect_peers: BTreeMap::new(),
rssi_degraded_threshold,
lost_timeout_ms,
indirect_peer_timeout_ms: 120_000,
}
}
pub fn get_all(&self) -> Vec<&PeerConnectionState> {
self.peers.values().collect()
}
pub fn get_all_owned(&self) -> Vec<PeerConnectionState> {
self.peers.values().cloned().collect()
}
pub fn get_peer(&self, node_id: NodeId) -> Option<&PeerConnectionState> {
self.peers.get(&node_id)
}
pub fn get_peer_mut(&mut self, node_id: NodeId) -> Option<&mut PeerConnectionState> {
self.peers.get_mut(&node_id)
}
pub fn get_connected(&self) -> Vec<&PeerConnectionState> {
self.peers
.values()
.filter(|p| p.state.is_connected())
.collect()
}
pub fn get_degraded(&self) -> Vec<&PeerConnectionState> {
self.peers
.values()
.filter(|p| p.state == ConnectionState::Degraded)
.collect()
}
pub fn get_recently_disconnected(
&self,
within_ms: u64,
now_ms: u64,
) -> Vec<&PeerConnectionState> {
self.peers
.values()
.filter(|p| {
p.state == ConnectionState::Disconnected
&& p.disconnected_at
.map(|t| now_ms.saturating_sub(t) <= within_ms)
.unwrap_or(false)
})
.collect()
}
pub fn get_lost(&self) -> Vec<&PeerConnectionState> {
self.peers
.values()
.filter(|p| p.state == ConnectionState::Lost)
.collect()
}
pub fn get_with_history(&self) -> Vec<&PeerConnectionState> {
self.peers
.values()
.filter(|p| p.state.was_connected())
.collect()
}
pub fn state_counts(&self) -> StateCountSummary {
let mut summary = StateCountSummary::default();
for peer in self.peers.values() {
match peer.state {
ConnectionState::Discovered => summary.discovered += 1,
ConnectionState::Connecting => summary.connecting += 1,
ConnectionState::Connected => summary.connected += 1,
ConnectionState::Degraded => summary.degraded += 1,
ConnectionState::Disconnecting => summary.disconnecting += 1,
ConnectionState::Disconnected => summary.disconnected += 1,
ConnectionState::Lost => summary.lost += 1,
}
}
summary
}
pub fn len(&self) -> usize {
self.peers.len()
}
pub fn is_empty(&self) -> bool {
self.peers.is_empty()
}
pub fn on_discovered(
&mut self,
node_id: NodeId,
identifier: String,
name: Option<String>,
mesh_id: Option<String>,
rssi: i8,
now_ms: u64,
) -> &PeerConnectionState {
let entry = self.peers.entry(node_id).or_insert_with(|| {
PeerConnectionState::new_discovered(node_id, identifier.clone(), now_ms)
});
entry.last_rssi = Some(rssi);
entry.last_seen_ms = now_ms;
if name.is_some() {
entry.name = name;
}
if mesh_id.is_some() {
entry.mesh_id = mesh_id;
}
if entry.state == ConnectionState::Lost {
entry.state = ConnectionState::Disconnected;
}
entry
}
pub fn on_connecting(&mut self, node_id: NodeId, now_ms: u64) {
if let Some(peer) = self.peers.get_mut(&node_id) {
peer.set_connecting(now_ms);
}
}
pub fn on_connected(&mut self, node_id: NodeId, now_ms: u64) {
if let Some(peer) = self.peers.get_mut(&node_id) {
peer.set_connected(now_ms);
}
}
pub fn on_disconnected(&mut self, node_id: NodeId, reason: DisconnectReason, now_ms: u64) {
if let Some(peer) = self.peers.get_mut(&node_id) {
peer.set_disconnected(now_ms, reason);
}
}
pub fn update_rssi(&mut self, node_id: NodeId, rssi: i8, now_ms: u64) -> bool {
if let Some(peer) = self.peers.get_mut(&node_id) {
return peer.update_rssi(rssi, now_ms, self.rssi_degraded_threshold);
}
false
}
pub fn record_transfer(&mut self, node_id: NodeId, bytes_received: u64, bytes_sent: u64) {
if let Some(peer) = self.peers.get_mut(&node_id) {
peer.record_transfer(bytes_received, bytes_sent);
}
}
pub fn record_sync(&mut self, node_id: NodeId) {
if let Some(peer) = self.peers.get_mut(&node_id) {
peer.record_sync();
}
}
pub fn tick(&mut self, now_ms: u64) -> Vec<NodeId> {
let mut newly_lost = Vec::new();
for (node_id, peer) in self.peers.iter_mut() {
if peer.state == ConnectionState::Disconnected {
if let Some(disconnected_at) = peer.disconnected_at {
if now_ms.saturating_sub(disconnected_at) > self.lost_timeout_ms {
peer.set_lost(now_ms);
newly_lost.push(*node_id);
}
}
}
}
newly_lost
}
pub fn cleanup_lost(&mut self, older_than_ms: u64, now_ms: u64) -> Vec<NodeId> {
let to_remove: Vec<NodeId> = self
.peers
.iter()
.filter(|(_, p)| {
p.state == ConnectionState::Lost
&& now_ms.saturating_sub(p.last_seen_ms) > older_than_ms
})
.map(|(id, _)| *id)
.collect();
for id in &to_remove {
self.peers.remove(id);
}
to_remove
}
pub fn import_peer(&mut self, peer: &PeatPeer, now_ms: u64) {
let state = PeerConnectionState::from_peer(peer, now_ms);
self.peers.insert(peer.node_id, state);
}
pub fn on_relay_received(
&mut self,
source_peer: NodeId,
origin_node: NodeId,
hop_count: u8,
now_ms: u64,
) -> bool {
if hop_count > MAX_TRACKED_DEGREE {
return false;
}
if self.peers.contains_key(&origin_node) {
return false;
}
if let Some(existing) = self.indirect_peers.get_mut(&origin_node) {
existing.update_path(source_peer, hop_count, now_ms);
false
} else {
self.indirect_peers.insert(
origin_node,
IndirectPeer::new(origin_node, source_peer, hop_count, now_ms),
);
true
}
}
pub fn get_indirect_peers(&self) -> Vec<&IndirectPeer> {
self.indirect_peers.values().collect()
}
pub fn get_indirect_peers_owned(&self) -> Vec<IndirectPeer> {
self.indirect_peers.values().cloned().collect()
}
pub fn get_indirect_peer(&self, node_id: NodeId) -> Option<&IndirectPeer> {
self.indirect_peers.get(&node_id)
}
pub fn get_peers_by_degree(&self, degree: PeerDegree) -> Vec<NodeId> {
match degree {
PeerDegree::Direct => self.peers.keys().copied().collect(),
_ => self
.indirect_peers
.iter()
.filter(|(_, p)| p.degree() == Some(degree))
.map(|(id, _)| *id)
.collect(),
}
}
pub fn peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
if self.peers.contains_key(&node_id) {
Some(PeerDegree::Direct)
} else {
self.indirect_peers.get(&node_id).and_then(|p| p.degree())
}
}
pub fn get_paths_to(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
self.indirect_peers
.get(&node_id)
.map(|p| p.paths())
.unwrap_or_default()
}
pub fn is_known(&self, node_id: NodeId) -> bool {
self.peers.contains_key(&node_id) || self.indirect_peers.contains_key(&node_id)
}
pub fn cleanup_indirect(&mut self, now_ms: u64) -> Vec<NodeId> {
let to_remove: Vec<NodeId> = self
.indirect_peers
.iter()
.filter(|(_, p)| p.is_stale(now_ms, self.indirect_peer_timeout_ms))
.map(|(id, _)| *id)
.collect();
for id in &to_remove {
self.indirect_peers.remove(id);
}
to_remove
}
pub fn remove_via_peer(&mut self, via_peer: NodeId) {
let mut to_remove = Vec::new();
for (node_id, indirect) in self.indirect_peers.iter_mut() {
indirect.via_peers.remove(&via_peer);
if indirect.via_peers.is_empty() {
to_remove.push(*node_id);
} else {
indirect.min_hops = indirect.via_peers.values().copied().min().unwrap_or(255);
}
}
for id in to_remove {
self.indirect_peers.remove(&id);
}
}
pub fn full_state_counts(&self) -> FullStateCountSummary {
let direct = self.state_counts();
let mut one_hop = 0;
let mut two_hop = 0;
let mut three_hop = 0;
for peer in self.indirect_peers.values() {
match peer.min_hops {
1 => one_hop += 1,
2 => two_hop += 1,
3 => three_hop += 1,
_ => {}
}
}
FullStateCountSummary {
direct,
one_hop,
two_hop,
three_hop,
}
}
pub fn indirect_peer_count(&self) -> usize {
self.indirect_peers.len()
}
pub fn set_indirect_callsign(&mut self, node_id: NodeId, callsign: String) {
if let Some(peer) = self.indirect_peers.get_mut(&node_id) {
peer.callsign = Some(callsign);
}
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct StateCountSummary {
pub discovered: usize,
pub connecting: usize,
pub connected: usize,
pub degraded: usize,
pub disconnecting: usize,
pub disconnected: usize,
pub lost: usize,
}
impl StateCountSummary {
pub fn active_connections(&self) -> usize {
self.connected + self.degraded
}
pub fn total(&self) -> usize {
self.discovered
+ self.connecting
+ self.connected
+ self.degraded
+ self.disconnecting
+ self.disconnected
+ self.lost
}
}
pub const MAX_TRACKED_DEGREE: u8 = 3;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerDegree {
Direct,
OneHop,
TwoHop,
ThreeHop,
}
impl PeerDegree {
pub fn from_hops(hops: u8) -> Option<Self> {
match hops {
0 => Some(Self::Direct),
1 => Some(Self::OneHop),
2 => Some(Self::TwoHop),
3 => Some(Self::ThreeHop),
_ => None, }
}
pub fn hops(&self) -> u8 {
match self {
Self::Direct => 0,
Self::OneHop => 1,
Self::TwoHop => 2,
Self::ThreeHop => 3,
}
}
}
#[derive(Debug, Clone)]
pub struct IndirectPeer {
pub node_id: NodeId,
pub min_hops: u8,
pub via_peers: BTreeMap<NodeId, u8>,
pub discovered_at: u64,
pub last_seen_ms: u64,
pub messages_received: u32,
pub callsign: Option<String>,
}
impl IndirectPeer {
pub fn new(node_id: NodeId, via_peer: NodeId, hop_count: u8, now_ms: u64) -> Self {
let mut via_peers = BTreeMap::new();
via_peers.insert(via_peer, hop_count);
Self {
node_id,
min_hops: hop_count,
via_peers,
discovered_at: now_ms,
last_seen_ms: now_ms,
messages_received: 1,
callsign: None,
}
}
pub fn update_path(&mut self, via_peer: NodeId, hop_count: u8, now_ms: u64) -> bool {
self.last_seen_ms = now_ms;
self.messages_received += 1;
let is_better = hop_count < self.min_hops;
self.via_peers.insert(via_peer, hop_count);
if is_better {
self.min_hops = hop_count;
} else {
self.min_hops = self.via_peers.values().copied().min().unwrap_or(hop_count);
}
is_better
}
pub fn degree(&self) -> Option<PeerDegree> {
PeerDegree::from_hops(self.min_hops)
}
pub fn is_stale(&self, now_ms: u64, timeout_ms: u64) -> bool {
now_ms.saturating_sub(self.last_seen_ms) > timeout_ms
}
pub fn paths(&self) -> Vec<(NodeId, u8)> {
self.via_peers.iter().map(|(&k, &v)| (k, v)).collect()
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct FullStateCountSummary {
pub direct: StateCountSummary,
pub one_hop: usize,
pub two_hop: usize,
pub three_hop: usize,
}
impl FullStateCountSummary {
pub fn total(&self) -> usize {
self.direct.total() + self.one_hop + self.two_hop + self.three_hop
}
pub fn total_indirect(&self) -> usize {
self.one_hop + self.two_hop + self.three_hop
}
}
#[derive(Debug, Clone)]
pub struct PeerManagerConfig {
pub peer_timeout_ms: u64,
pub cleanup_interval_ms: u64,
pub sync_interval_ms: u64,
pub sync_cooldown_ms: u64,
pub auto_connect: bool,
pub mesh_id: String,
pub max_peers: usize,
pub rssi_degraded_threshold: i8,
pub lost_timeout_ms: u64,
}
impl Default for PeerManagerConfig {
fn default() -> Self {
Self {
peer_timeout_ms: 45_000, cleanup_interval_ms: 10_000, sync_interval_ms: 5_000, sync_cooldown_ms: 30_000, auto_connect: true,
mesh_id: String::from("DEMO"),
max_peers: 8,
rssi_degraded_threshold: -80, lost_timeout_ms: 30_000, }
}
}
impl PeerManagerConfig {
pub fn with_mesh_id(mesh_id: impl Into<String>) -> Self {
Self {
mesh_id: mesh_id.into(),
..Default::default()
}
}
pub fn peer_timeout(mut self, timeout_ms: u64) -> Self {
self.peer_timeout_ms = timeout_ms;
self
}
pub fn sync_interval(mut self, interval_ms: u64) -> Self {
self.sync_interval_ms = interval_ms;
self
}
pub fn auto_connect(mut self, enabled: bool) -> Self {
self.auto_connect = enabled;
self
}
pub fn max_peers(mut self, max: usize) -> Self {
self.max_peers = max;
self
}
pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
match device_mesh_id {
Some(id) => id == self.mesh_id,
None => true, }
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_peer_stale_detection() {
let mut peer = PeatPeer::new(
NodeId::new(0x12345678),
"test-id".into(),
Some("DEMO".into()),
Some("PEAT_DEMO-12345678".into()),
-70,
);
peer.touch(1000);
assert!(!peer.is_stale(2000, 45_000));
assert!(peer.is_stale(50_000, 45_000));
}
#[test]
fn test_signal_strength() {
let peer_excellent = PeatPeer {
rssi: -45,
..Default::default()
};
assert_eq!(peer_excellent.signal_strength(), SignalStrength::Excellent);
let peer_good = PeatPeer {
rssi: -65,
..Default::default()
};
assert_eq!(peer_good.signal_strength(), SignalStrength::Good);
let peer_fair = PeatPeer {
rssi: -80,
..Default::default()
};
assert_eq!(peer_fair.signal_strength(), SignalStrength::Fair);
let peer_weak = PeatPeer {
rssi: -95,
..Default::default()
};
assert_eq!(peer_weak.signal_strength(), SignalStrength::Weak);
}
#[test]
fn test_mesh_matching() {
let config = PeerManagerConfig::with_mesh_id("ALPHA");
assert!(config.matches_mesh(Some("ALPHA")));
assert!(!config.matches_mesh(Some("BETA")));
assert!(config.matches_mesh(None));
}
}