#[cfg(not(feature = "std"))]
use alloc::{
collections::BTreeMap,
string::{String, ToString},
vec::Vec,
};
#[cfg(feature = "std")]
use std::collections::BTreeMap;
#[cfg(feature = "std")]
use std::sync::RwLock;
#[cfg(not(feature = "std"))]
use spin::RwLock;
use crate::observer::{DisconnectReason, PeatEvent};
use crate::peer::{PeatPeer, PeerManagerConfig};
use crate::NodeId;
pub struct PeerManager {
config: PeerManagerConfig,
node_id: NodeId,
#[cfg(feature = "std")]
peers: RwLock<BTreeMap<NodeId, PeatPeer>>,
#[cfg(not(feature = "std"))]
peers: RwLock<BTreeMap<NodeId, PeatPeer>>,
#[cfg(feature = "std")]
identifier_map: RwLock<BTreeMap<String, NodeId>>,
#[cfg(not(feature = "std"))]
identifier_map: RwLock<BTreeMap<String, NodeId>>,
#[cfg(feature = "std")]
sync_history: RwLock<BTreeMap<NodeId, u64>>,
#[cfg(not(feature = "std"))]
sync_history: RwLock<BTreeMap<NodeId, u64>>,
}
impl PeerManager {
pub fn new(node_id: NodeId, config: PeerManagerConfig) -> Self {
Self {
config,
node_id,
peers: RwLock::new(BTreeMap::new()),
identifier_map: RwLock::new(BTreeMap::new()),
sync_history: RwLock::new(BTreeMap::new()),
}
}
pub fn node_id(&self) -> NodeId {
self.node_id
}
pub fn mesh_id(&self) -> &str {
&self.config.mesh_id
}
pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
self.config.matches_mesh(device_mesh_id)
}
pub fn on_discovered(
&self,
identifier: &str,
name: Option<&str>,
rssi: i8,
mesh_id: Option<&str>,
now_ms: u64,
) -> Option<(NodeId, bool)> {
if !self.matches_mesh(mesh_id) {
return None;
}
let node_id = parse_node_id_from_name(name)?;
if node_id == self.node_id {
return None;
}
let mut peers = self.peers.write().unwrap();
let mut id_map = self.identifier_map.write().unwrap();
if let Some(&existing_node_id) = id_map.get(identifier) {
if existing_node_id != node_id {
peers.remove(&existing_node_id);
}
}
if peers.len() >= self.config.max_peers && !peers.contains_key(&node_id) {
return None; }
let is_new = !peers.contains_key(&node_id);
let peer = peers.entry(node_id).or_insert_with(|| {
PeatPeer::new(
node_id,
identifier.to_string(),
mesh_id.map(|s| s.to_string()),
name.map(|s| s.to_string()),
rssi,
)
});
peer.rssi = rssi;
peer.touch(now_ms);
if let Some(n) = name {
peer.name = Some(n.to_string());
}
id_map.insert(identifier.to_string(), node_id);
Some((node_id, is_new))
}
pub fn on_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
let id_map = self.identifier_map.read().unwrap();
let node_id = id_map.get(identifier).copied()?;
drop(id_map);
let mut peers = self.peers.write().unwrap();
if let Some(peer) = peers.get_mut(&node_id) {
peer.is_connected = true;
peer.touch(now_ms);
}
Some(node_id)
}
pub fn on_disconnected(
&self,
identifier: &str,
reason: DisconnectReason,
) -> Option<(NodeId, DisconnectReason)> {
let id_map = self.identifier_map.read().unwrap();
let node_id = id_map.get(identifier).copied()?;
drop(id_map);
let mut peers = self.peers.write().unwrap();
if let Some(peer) = peers.get_mut(&node_id) {
peer.is_connected = false;
}
Some((node_id, reason))
}
pub fn on_disconnected_by_node_id(&self, node_id: NodeId, _reason: DisconnectReason) -> bool {
let mut peers = self.peers.write().unwrap();
if let Some(peer) = peers.get_mut(&node_id) {
peer.is_connected = false;
true
} else {
false
}
}
pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
if node_id == self.node_id {
return false;
}
let mut peers = self.peers.write().unwrap();
let mut id_map = self.identifier_map.write().unwrap();
if peers.len() >= self.config.max_peers && !peers.contains_key(&node_id) {
return false;
}
let is_new = !peers.contains_key(&node_id);
let peer = peers.entry(node_id).or_insert_with(|| {
PeatPeer::new(
node_id,
identifier.to_string(),
Some(self.config.mesh_id.clone()),
None,
-70, )
});
peer.is_connected = true;
peer.touch(now_ms);
if peer.identifier != identifier {
id_map.remove(&peer.identifier);
peer.identifier = identifier.to_string();
}
id_map.insert(identifier.to_string(), node_id);
is_new
}
pub fn should_sync_with(&self, node_id: NodeId, now_ms: u64) -> bool {
let history = self.sync_history.read().unwrap();
match history.get(&node_id) {
Some(&last_sync) => now_ms.saturating_sub(last_sync) >= self.config.sync_cooldown_ms,
None => true, }
}
pub fn record_sync(&self, node_id: NodeId, now_ms: u64) {
let mut history = self.sync_history.write().unwrap();
history.insert(node_id, now_ms);
}
pub fn cleanup_stale(&self, now_ms: u64) -> Vec<NodeId> {
let mut peers = self.peers.write().unwrap();
let mut id_map = self.identifier_map.write().unwrap();
let mut history = self.sync_history.write().unwrap();
let mut removed = Vec::new();
let stale: Vec<NodeId> = peers
.iter()
.filter(|(_, peer)| peer.is_stale(now_ms, self.config.peer_timeout_ms))
.map(|(&node_id, _)| node_id)
.collect();
for node_id in stale {
if let Some(peer) = peers.remove(&node_id) {
id_map.remove(&peer.identifier);
history.remove(&node_id);
removed.push(node_id);
}
}
removed
}
pub fn get_peers(&self) -> Vec<PeatPeer> {
let peers = self.peers.read().unwrap();
peers.values().cloned().collect()
}
pub fn get_connected_peers(&self) -> Vec<PeatPeer> {
let peers = self.peers.read().unwrap();
peers.values().filter(|p| p.is_connected).cloned().collect()
}
pub fn get_peer(&self, node_id: NodeId) -> Option<PeatPeer> {
let peers = self.peers.read().unwrap();
peers.get(&node_id).cloned()
}
pub fn get_peer_by_identifier(&self, identifier: &str) -> Option<PeatPeer> {
let id_map = self.identifier_map.read().unwrap();
let node_id = id_map.get(identifier).copied()?;
drop(id_map);
let peers = self.peers.read().unwrap();
peers.get(&node_id).cloned()
}
pub fn get_node_id(&self, identifier: &str) -> Option<NodeId> {
let id_map = self.identifier_map.read().unwrap();
id_map.get(identifier).copied()
}
pub fn register_identifier(&self, identifier: &str, node_id: NodeId) {
let mut id_map = self.identifier_map.write().unwrap();
id_map.insert(identifier.to_string(), node_id);
log::debug!(
"Registered identifier {} -> {:08X}",
identifier,
node_id.as_u32()
);
}
pub fn peer_count(&self) -> usize {
self.peers.read().unwrap().len()
}
pub fn connected_count(&self) -> usize {
self.peers
.read()
.unwrap()
.values()
.filter(|p| p.is_connected)
.count()
}
pub fn get_connected_identifiers(&self) -> Vec<String> {
self.peers
.read()
.unwrap()
.values()
.filter(|p| p.is_connected)
.map(|p| p.identifier.clone())
.collect()
}
pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<PeatPeer> {
let peers = self.peers.read().unwrap();
let history = self.sync_history.read().unwrap();
peers
.values()
.filter(|peer| {
if !peer.is_connected {
return false;
}
match history.get(&peer.node_id) {
Some(&last_sync) => {
now_ms.saturating_sub(last_sync) >= self.config.sync_cooldown_ms
}
None => true,
}
})
.cloned()
.collect()
}
pub fn generate_state_event(&self) -> PeatEvent {
PeatEvent::MeshStateChanged {
peer_count: self.peer_count(),
connected_count: self.connected_count(),
}
}
}
fn parse_node_id_from_name(name: Option<&str>) -> Option<NodeId> {
let name = name?;
let hyphen_pos = name.rfind('-')?;
let hex_part = &name[hyphen_pos + 1..];
if hex_part.len() != 8 {
return None;
}
u32::from_str_radix(hex_part, 16).ok().map(NodeId::new)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_node_id_from_name() {
assert_eq!(
parse_node_id_from_name(Some("PEAT_DEMO-12345678")),
Some(NodeId::new(0x12345678))
);
assert_eq!(
parse_node_id_from_name(Some("PEAT_ALPHA-AABBCCDD")),
Some(NodeId::new(0xAABBCCDD))
);
assert_eq!(parse_node_id_from_name(Some("Invalid")), None);
assert_eq!(parse_node_id_from_name(Some("PEAT_DEMO-123")), None); assert_eq!(parse_node_id_from_name(None), None);
}
#[test]
fn test_peer_discovery() {
let config = PeerManagerConfig::with_mesh_id("DEMO");
let manager = PeerManager::new(NodeId::new(0x11111111), config);
let result = manager.on_discovered(
"device-uuid-1",
Some("PEAT_DEMO-22222222"),
-65,
Some("DEMO"),
1000,
);
assert!(result.is_some());
let (node_id, is_new) = result.unwrap();
assert_eq!(node_id.as_u32(), 0x22222222);
assert!(is_new);
let result = manager.on_discovered(
"device-uuid-1",
Some("PEAT_DEMO-22222222"),
-60,
Some("DEMO"),
2000,
);
assert!(result.is_some());
let (_, is_new) = result.unwrap();
assert!(!is_new);
assert_eq!(manager.peer_count(), 1);
let peer = manager.get_peer(NodeId::new(0x22222222)).unwrap();
assert_eq!(peer.rssi, -60); }
#[test]
fn test_mesh_filtering() {
let config = PeerManagerConfig::with_mesh_id("ALPHA");
let manager = PeerManager::new(NodeId::new(0x11111111), config);
let result = manager.on_discovered(
"device-uuid-1",
Some("PEAT_BETA-22222222"),
-65,
Some("BETA"),
1000,
);
assert!(result.is_none());
assert_eq!(manager.peer_count(), 0);
let result = manager.on_discovered(
"device-uuid-2",
Some("PEAT_ALPHA-33333333"),
-65,
Some("ALPHA"),
1000,
);
assert!(result.is_some());
assert_eq!(manager.peer_count(), 1);
}
#[test]
fn test_self_filtering() {
let config = PeerManagerConfig::with_mesh_id("DEMO");
let manager = PeerManager::new(NodeId::new(0x12345678), config);
let result = manager.on_discovered(
"my-device-uuid",
Some("PEAT_DEMO-12345678"),
-30,
Some("DEMO"),
1000,
);
assert!(result.is_none());
assert_eq!(manager.peer_count(), 0);
}
#[test]
fn test_connection_lifecycle() {
let config = PeerManagerConfig::with_mesh_id("DEMO");
let manager = PeerManager::new(NodeId::new(0x11111111), config);
manager.on_discovered(
"device-uuid-1",
Some("PEAT_DEMO-22222222"),
-65,
Some("DEMO"),
1000,
);
assert_eq!(manager.connected_count(), 0);
let node_id = manager.on_connected("device-uuid-1", 2000);
assert_eq!(node_id, Some(NodeId::new(0x22222222)));
assert_eq!(manager.connected_count(), 1);
let result = manager.on_disconnected("device-uuid-1", DisconnectReason::RemoteRequest);
assert!(result.is_some());
assert_eq!(manager.connected_count(), 0);
assert_eq!(manager.peer_count(), 1); }
#[test]
fn test_stale_cleanup() {
let config = PeerManagerConfig::with_mesh_id("DEMO").peer_timeout(10_000);
let manager = PeerManager::new(NodeId::new(0x11111111), config);
manager.on_discovered(
"device-uuid-1",
Some("PEAT_DEMO-22222222"),
-65,
Some("DEMO"),
1000,
);
assert_eq!(manager.peer_count(), 1);
let removed = manager.cleanup_stale(5000);
assert!(removed.is_empty());
assert_eq!(manager.peer_count(), 1);
let removed = manager.cleanup_stale(20000);
assert_eq!(removed.len(), 1);
assert_eq!(removed[0].as_u32(), 0x22222222);
assert_eq!(manager.peer_count(), 0);
}
#[test]
fn test_sync_cooldown() {
let config = PeerManagerConfig::with_mesh_id("DEMO");
let manager = PeerManager::new(NodeId::new(0x11111111), config);
let peer_id = NodeId::new(0x22222222);
assert!(manager.should_sync_with(peer_id, 1000));
manager.record_sync(peer_id, 1000);
assert!(!manager.should_sync_with(peer_id, 5000));
assert!(manager.should_sync_with(peer_id, 35000));
}
#[test]
fn test_max_peers_limit() {
let config = PeerManagerConfig::with_mesh_id("DEMO").max_peers(2);
let manager = PeerManager::new(NodeId::new(0x11111111), config);
let result = manager.on_discovered(
"uuid-1",
Some("PEAT_DEMO-22222222"),
-65,
Some("DEMO"),
1000,
);
assert!(result.is_some());
let result = manager.on_discovered(
"uuid-2",
Some("PEAT_DEMO-33333333"),
-65,
Some("DEMO"),
1000,
);
assert!(result.is_some());
let result = manager.on_discovered(
"uuid-3",
Some("PEAT_DEMO-44444444"),
-65,
Some("DEMO"),
1000,
);
assert!(result.is_none());
assert_eq!(manager.peer_count(), 2);
}
#[test]
fn test_incoming_connection() {
let config = PeerManagerConfig::with_mesh_id("DEMO");
let manager = PeerManager::new(NodeId::new(0x11111111), config);
let is_new = manager.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);
assert!(is_new);
assert_eq!(manager.peer_count(), 1);
assert_eq!(manager.connected_count(), 1);
let is_new = manager.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 2000);
assert!(!is_new);
}
#[test]
fn test_address_rotation() {
let config = PeerManagerConfig::with_mesh_id("DEMO");
let manager = PeerManager::new(NodeId::new(0x11111111), config);
let result = manager.on_discovered(
"AA:BB:CC:DD:EE:01",
Some("PEAT_DEMO-22222222"),
-70,
Some("DEMO"),
1000,
);
assert!(result.is_some());
let (node_id, is_new) = result.unwrap();
assert_eq!(node_id, NodeId::new(0x22222222));
assert!(is_new);
assert_eq!(manager.peer_count(), 1);
let result = manager.on_discovered(
"AA:BB:CC:DD:EE:02", Some("PEAT_DEMO-22222222"), -65,
Some("DEMO"),
2000,
);
assert!(result.is_some());
let (node_id, is_new) = result.unwrap();
assert_eq!(node_id, NodeId::new(0x22222222));
assert!(!is_new); assert_eq!(manager.peer_count(), 1);
assert_eq!(
manager.get_node_id("AA:BB:CC:DD:EE:01"),
Some(NodeId::new(0x22222222))
);
assert_eq!(
manager.get_node_id("AA:BB:CC:DD:EE:02"),
Some(NodeId::new(0x22222222))
);
}
#[test]
fn test_address_rotation_with_different_names() {
let config = PeerManagerConfig::with_mesh_id("DEMO");
let manager = PeerManager::new(NodeId::new(0x11111111), config);
let result = manager.on_discovered(
"AA:BB:CC:DD:EE:01",
Some("PEAT_DEMO-AABBCCDD"),
-70,
Some("DEMO"),
1000,
);
assert!(result.is_some());
assert!(result.unwrap().1); assert_eq!(manager.peer_count(), 1);
let result = manager.on_discovered(
"11:22:33:44:55:66", Some("PEAT_DEMO-AABBCCDD"), -75,
Some("DEMO"),
2000,
);
assert!(result.is_some());
assert!(!result.unwrap().1); assert_eq!(manager.peer_count(), 1); }
}