1#[cfg(not(feature = "std"))]
24use alloc::{
25 collections::BTreeMap,
26 string::{String, ToString},
27 vec::Vec,
28};
29#[cfg(feature = "std")]
30use std::collections::BTreeMap;
31#[cfg(feature = "std")]
32use std::sync::RwLock;
33
34#[cfg(not(feature = "std"))]
35use spin::RwLock;
36
37use crate::observer::{DisconnectReason, HiveEvent};
38use crate::peer::{HivePeer, PeerManagerConfig};
39use crate::NodeId;
40
41pub struct PeerManager {
46 config: PeerManagerConfig,
48
49 node_id: NodeId,
51
52 #[cfg(feature = "std")]
54 peers: RwLock<BTreeMap<NodeId, HivePeer>>,
55 #[cfg(not(feature = "std"))]
56 peers: RwLock<BTreeMap<NodeId, HivePeer>>,
57
58 #[cfg(feature = "std")]
60 identifier_map: RwLock<BTreeMap<String, NodeId>>,
61 #[cfg(not(feature = "std"))]
62 identifier_map: RwLock<BTreeMap<String, NodeId>>,
63
64 #[cfg(feature = "std")]
66 sync_history: RwLock<BTreeMap<NodeId, u64>>,
67 #[cfg(not(feature = "std"))]
68 sync_history: RwLock<BTreeMap<NodeId, u64>>,
69}
70
71impl PeerManager {
72 pub fn new(node_id: NodeId, config: PeerManagerConfig) -> Self {
74 Self {
75 config,
76 node_id,
77 peers: RwLock::new(BTreeMap::new()),
78 identifier_map: RwLock::new(BTreeMap::new()),
79 sync_history: RwLock::new(BTreeMap::new()),
80 }
81 }
82
83 pub fn node_id(&self) -> NodeId {
85 self.node_id
86 }
87
88 pub fn mesh_id(&self) -> &str {
90 &self.config.mesh_id
91 }
92
93 pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
95 self.config.matches_mesh(device_mesh_id)
96 }
97
98 pub fn on_discovered(
107 &self,
108 identifier: &str,
109 name: Option<&str>,
110 rssi: i8,
111 mesh_id: Option<&str>,
112 now_ms: u64,
113 ) -> Option<(NodeId, bool)> {
114 if !self.matches_mesh(mesh_id) {
116 return None;
117 }
118
119 let node_id = parse_node_id_from_name(name)?;
121
122 if node_id == self.node_id {
124 return None;
125 }
126
127 let mut peers = self.peers.write().unwrap();
128 let mut id_map = self.identifier_map.write().unwrap();
129
130 if let Some(&existing_node_id) = id_map.get(identifier) {
132 if existing_node_id != node_id {
133 peers.remove(&existing_node_id);
135 }
136 }
137
138 if peers.len() >= self.config.max_peers && !peers.contains_key(&node_id) {
140 return None; }
142
143 let is_new = !peers.contains_key(&node_id);
144
145 let peer = peers.entry(node_id).or_insert_with(|| {
147 HivePeer::new(
148 node_id,
149 identifier.to_string(),
150 mesh_id.map(|s| s.to_string()),
151 name.map(|s| s.to_string()),
152 rssi,
153 )
154 });
155
156 peer.rssi = rssi;
158 peer.touch(now_ms);
159 if let Some(n) = name {
160 peer.name = Some(n.to_string());
161 }
162
163 id_map.insert(identifier.to_string(), node_id);
165
166 Some((node_id, is_new))
167 }
168
169 pub fn on_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
174 let id_map = self.identifier_map.read().unwrap();
175 let node_id = id_map.get(identifier).copied()?;
176 drop(id_map);
177
178 let mut peers = self.peers.write().unwrap();
179 if let Some(peer) = peers.get_mut(&node_id) {
180 peer.is_connected = true;
181 peer.touch(now_ms);
182 }
183
184 Some(node_id)
185 }
186
187 pub fn on_disconnected(
192 &self,
193 identifier: &str,
194 reason: DisconnectReason,
195 ) -> Option<(NodeId, DisconnectReason)> {
196 let id_map = self.identifier_map.read().unwrap();
197 let node_id = id_map.get(identifier).copied()?;
198 drop(id_map);
199
200 let mut peers = self.peers.write().unwrap();
201 if let Some(peer) = peers.get_mut(&node_id) {
202 peer.is_connected = false;
203 }
204
205 Some((node_id, reason))
206 }
207
208 pub fn on_disconnected_by_node_id(&self, node_id: NodeId, _reason: DisconnectReason) -> bool {
213 let mut peers = self.peers.write().unwrap();
214 if let Some(peer) = peers.get_mut(&node_id) {
215 peer.is_connected = false;
216 true
217 } else {
218 false
219 }
220 }
221
222 pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
227 if node_id == self.node_id {
229 return false;
230 }
231
232 let mut peers = self.peers.write().unwrap();
233 let mut id_map = self.identifier_map.write().unwrap();
234
235 if peers.len() >= self.config.max_peers && !peers.contains_key(&node_id) {
237 return false;
238 }
239
240 let is_new = !peers.contains_key(&node_id);
241
242 let peer = peers.entry(node_id).or_insert_with(|| {
243 HivePeer::new(
244 node_id,
245 identifier.to_string(),
246 Some(self.config.mesh_id.clone()),
247 None,
248 -70, )
250 });
251
252 peer.is_connected = true;
253 peer.touch(now_ms);
254
255 if peer.identifier != identifier {
257 id_map.remove(&peer.identifier);
258 peer.identifier = identifier.to_string();
259 }
260 id_map.insert(identifier.to_string(), node_id);
261
262 is_new
263 }
264
265 pub fn should_sync_with(&self, node_id: NodeId, now_ms: u64) -> bool {
269 let history = self.sync_history.read().unwrap();
270 match history.get(&node_id) {
271 Some(&last_sync) => now_ms.saturating_sub(last_sync) >= self.config.sync_cooldown_ms,
272 None => true, }
274 }
275
276 pub fn record_sync(&self, node_id: NodeId, now_ms: u64) {
278 let mut history = self.sync_history.write().unwrap();
279 history.insert(node_id, now_ms);
280 }
281
282 pub fn cleanup_stale(&self, now_ms: u64) -> Vec<NodeId> {
287 let mut peers = self.peers.write().unwrap();
288 let mut id_map = self.identifier_map.write().unwrap();
289 let mut history = self.sync_history.write().unwrap();
290
291 let mut removed = Vec::new();
292
293 let stale: Vec<NodeId> = peers
295 .iter()
296 .filter(|(_, peer)| peer.is_stale(now_ms, self.config.peer_timeout_ms))
297 .map(|(&node_id, _)| node_id)
298 .collect();
299
300 for node_id in stale {
302 if let Some(peer) = peers.remove(&node_id) {
303 id_map.remove(&peer.identifier);
304 history.remove(&node_id);
305 removed.push(node_id);
306 }
307 }
308
309 removed
310 }
311
312 pub fn get_peers(&self) -> Vec<HivePeer> {
314 let peers = self.peers.read().unwrap();
315 peers.values().cloned().collect()
316 }
317
318 pub fn get_connected_peers(&self) -> Vec<HivePeer> {
320 let peers = self.peers.read().unwrap();
321 peers.values().filter(|p| p.is_connected).cloned().collect()
322 }
323
324 pub fn get_peer(&self, node_id: NodeId) -> Option<HivePeer> {
326 let peers = self.peers.read().unwrap();
327 peers.get(&node_id).cloned()
328 }
329
330 pub fn get_peer_by_identifier(&self, identifier: &str) -> Option<HivePeer> {
332 let id_map = self.identifier_map.read().unwrap();
333 let node_id = id_map.get(identifier).copied()?;
334 drop(id_map);
335
336 let peers = self.peers.read().unwrap();
337 peers.get(&node_id).cloned()
338 }
339
340 pub fn get_node_id(&self, identifier: &str) -> Option<NodeId> {
342 let id_map = self.identifier_map.read().unwrap();
343 id_map.get(identifier).copied()
344 }
345
346 pub fn peer_count(&self) -> usize {
348 self.peers.read().unwrap().len()
349 }
350
351 pub fn connected_count(&self) -> usize {
353 self.peers
354 .read()
355 .unwrap()
356 .values()
357 .filter(|p| p.is_connected)
358 .count()
359 }
360
361 pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<HivePeer> {
363 let peers = self.peers.read().unwrap();
364 let history = self.sync_history.read().unwrap();
365
366 peers
367 .values()
368 .filter(|peer| {
369 if !peer.is_connected {
370 return false;
371 }
372 match history.get(&peer.node_id) {
373 Some(&last_sync) => {
374 now_ms.saturating_sub(last_sync) >= self.config.sync_cooldown_ms
375 }
376 None => true,
377 }
378 })
379 .cloned()
380 .collect()
381 }
382
383 pub fn generate_state_event(&self) -> HiveEvent {
387 HiveEvent::MeshStateChanged {
388 peer_count: self.peer_count(),
389 connected_count: self.connected_count(),
390 }
391 }
392}
393
394fn parse_node_id_from_name(name: Option<&str>) -> Option<NodeId> {
398 let name = name?;
399
400 let hyphen_pos = name.rfind('-')?;
402 let hex_part = &name[hyphen_pos + 1..];
403
404 if hex_part.len() != 8 {
406 return None;
407 }
408
409 u32::from_str_radix(hex_part, 16).ok().map(NodeId::new)
410}
411
#[cfg(test)]
mod tests {
    use super::*;

    /// Local node ID used by most tests.
    const LOCAL: u32 = 0x11111111;
    /// Node ID advertised by the usual remote peer.
    const REMOTE: u32 = 0x22222222;

    /// Builds a manager for the local node `LOCAL` on the given mesh configuration.
    fn manager_with(config: PeerManagerConfig) -> PeerManager {
        PeerManager::new(NodeId::new(LOCAL), config)
    }

    /// Reports an advertisement whose mesh ID is "DEMO".
    fn discover_demo(
        manager: &PeerManager,
        identifier: &str,
        name: &str,
        rssi: i8,
        now_ms: u64,
    ) -> Option<(NodeId, bool)> {
        manager.on_discovered(identifier, Some(name), rssi, Some("DEMO"), now_ms)
    }

    #[test]
    fn test_parse_node_id_from_name() {
        let cases = [
            (Some("HIVE_DEMO-12345678"), Some(NodeId::new(0x12345678))),
            (Some("HIVE_ALPHA-AABBCCDD"), Some(NodeId::new(0xAABBCCDD))),
            (Some("Invalid"), None),
            (Some("HIVE_DEMO-123"), None),
            (None, None),
        ];

        for &(name, expected) in cases.iter() {
            assert_eq!(parse_node_id_from_name(name), expected);
        }
    }

    #[test]
    fn test_peer_discovery() {
        let manager = manager_with(PeerManagerConfig::with_mesh_id("DEMO"));

        // First sighting creates the peer.
        let first = discover_demo(&manager, "device-uuid-1", "HIVE_DEMO-22222222", -65, 1000);
        assert!(first.is_some());
        let (node_id, is_new) = first.unwrap();
        assert_eq!(node_id.as_u32(), REMOTE);
        assert!(is_new);

        // Second sighting only refreshes it.
        let second = discover_demo(&manager, "device-uuid-1", "HIVE_DEMO-22222222", -60, 2000);
        assert!(second.is_some());
        let (_, is_new) = second.unwrap();
        assert!(!is_new);

        assert_eq!(manager.peer_count(), 1);
        let peer = manager.get_peer(NodeId::new(REMOTE)).unwrap();
        assert_eq!(peer.rssi, -60);
    }

    #[test]
    fn test_mesh_filtering() {
        let manager = manager_with(PeerManagerConfig::with_mesh_id("ALPHA"));

        // A device from another mesh is ignored.
        let foreign = manager.on_discovered(
            "device-uuid-1",
            Some("HIVE_BETA-22222222"),
            -65,
            Some("BETA"),
            1000,
        );
        assert!(foreign.is_none());
        assert_eq!(manager.peer_count(), 0);

        // A device from our mesh is accepted.
        let own = manager.on_discovered(
            "device-uuid-2",
            Some("HIVE_ALPHA-33333333"),
            -65,
            Some("ALPHA"),
            1000,
        );
        assert!(own.is_some());
        assert_eq!(manager.peer_count(), 1);
    }

    #[test]
    fn test_self_filtering() {
        let config = PeerManagerConfig::with_mesh_id("DEMO");
        let manager = PeerManager::new(NodeId::new(0x12345678), config);

        // An advertisement carrying our own node ID must not become a peer.
        let own_advert = discover_demo(&manager, "my-device-uuid", "HIVE_DEMO-12345678", -30, 1000);
        assert!(own_advert.is_none());
        assert_eq!(manager.peer_count(), 0);
    }

    #[test]
    fn test_connection_lifecycle() {
        let manager = manager_with(PeerManagerConfig::with_mesh_id("DEMO"));

        discover_demo(&manager, "device-uuid-1", "HIVE_DEMO-22222222", -65, 1000);
        assert_eq!(manager.connected_count(), 0);

        let connected = manager.on_connected("device-uuid-1", 2000);
        assert_eq!(connected, Some(NodeId::new(REMOTE)));
        assert_eq!(manager.connected_count(), 1);

        let disconnected =
            manager.on_disconnected("device-uuid-1", DisconnectReason::RemoteRequest);
        assert!(disconnected.is_some());
        assert_eq!(manager.connected_count(), 0);
        // Disconnecting does not forget the peer.
        assert_eq!(manager.peer_count(), 1);
    }

    #[test]
    fn test_stale_cleanup() {
        let manager = manager_with(PeerManagerConfig::with_mesh_id("DEMO").peer_timeout(10_000));

        discover_demo(&manager, "device-uuid-1", "HIVE_DEMO-22222222", -65, 1000);
        assert_eq!(manager.peer_count(), 1);

        // Within the timeout nothing is removed.
        let early = manager.cleanup_stale(5000);
        assert!(early.is_empty());
        assert_eq!(manager.peer_count(), 1);

        // Past the timeout the peer is dropped.
        let late = manager.cleanup_stale(20000);
        assert_eq!(late.len(), 1);
        assert_eq!(late[0].as_u32(), REMOTE);
        assert_eq!(manager.peer_count(), 0);
    }

    #[test]
    fn test_sync_cooldown() {
        let manager = manager_with(PeerManagerConfig::with_mesh_id("DEMO"));
        let peer_id = NodeId::new(REMOTE);

        // Never synced: eligible.
        assert!(manager.should_sync_with(peer_id, 1000));

        manager.record_sync(peer_id, 1000);

        // Shortly after a sync: not eligible.
        assert!(!manager.should_sync_with(peer_id, 5000));

        // Much later: eligible again.
        assert!(manager.should_sync_with(peer_id, 35000));
    }

    #[test]
    fn test_max_peers_limit() {
        let manager = manager_with(PeerManagerConfig::with_mesh_id("DEMO").max_peers(2));

        let first = discover_demo(&manager, "uuid-1", "HIVE_DEMO-22222222", -65, 1000);
        assert!(first.is_some());

        let second = discover_demo(&manager, "uuid-2", "HIVE_DEMO-33333333", -65, 1000);
        assert!(second.is_some());

        // The cap is two, so a third distinct peer is rejected.
        let third = discover_demo(&manager, "uuid-3", "HIVE_DEMO-44444444", -65, 1000);
        assert!(third.is_none());
        assert_eq!(manager.peer_count(), 2);
    }

    #[test]
    fn test_incoming_connection() {
        let manager = manager_with(PeerManagerConfig::with_mesh_id("DEMO"));
        let remote = NodeId::new(REMOTE);

        let is_new = manager.on_incoming_connection("central-uuid", remote, 1000);
        assert!(is_new);
        assert_eq!(manager.peer_count(), 1);
        assert_eq!(manager.connected_count(), 1);

        // The same node connecting again is no longer new.
        let is_new = manager.on_incoming_connection("central-uuid", remote, 2000);
        assert!(!is_new);
    }
}
636}