1#[cfg(not(feature = "std"))]
39use alloc::{
40 collections::BTreeMap,
41 string::{String, ToString},
42 vec::Vec,
43};
44#[cfg(feature = "std")]
45use std::collections::BTreeMap;
46#[cfg(feature = "std")]
47use std::sync::RwLock;
48
49#[cfg(not(feature = "std"))]
50use spin::RwLock;
51
52use crate::observer::{DisconnectReason, HiveEvent};
53use crate::peer::{HivePeer, PeerManagerConfig};
54use crate::NodeId;
55
56pub struct PeerManager {
61 config: PeerManagerConfig,
63
64 node_id: NodeId,
66
67 #[cfg(feature = "std")]
69 peers: RwLock<BTreeMap<NodeId, HivePeer>>,
70 #[cfg(not(feature = "std"))]
71 peers: RwLock<BTreeMap<NodeId, HivePeer>>,
72
73 #[cfg(feature = "std")]
75 identifier_map: RwLock<BTreeMap<String, NodeId>>,
76 #[cfg(not(feature = "std"))]
77 identifier_map: RwLock<BTreeMap<String, NodeId>>,
78
79 #[cfg(feature = "std")]
81 sync_history: RwLock<BTreeMap<NodeId, u64>>,
82 #[cfg(not(feature = "std"))]
83 sync_history: RwLock<BTreeMap<NodeId, u64>>,
84}
85
86impl PeerManager {
87 pub fn new(node_id: NodeId, config: PeerManagerConfig) -> Self {
89 Self {
90 config,
91 node_id,
92 peers: RwLock::new(BTreeMap::new()),
93 identifier_map: RwLock::new(BTreeMap::new()),
94 sync_history: RwLock::new(BTreeMap::new()),
95 }
96 }
97
98 pub fn node_id(&self) -> NodeId {
100 self.node_id
101 }
102
103 pub fn mesh_id(&self) -> &str {
105 &self.config.mesh_id
106 }
107
108 pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
110 self.config.matches_mesh(device_mesh_id)
111 }
112
113 pub fn on_discovered(
122 &self,
123 identifier: &str,
124 name: Option<&str>,
125 rssi: i8,
126 mesh_id: Option<&str>,
127 now_ms: u64,
128 ) -> Option<(NodeId, bool)> {
129 if !self.matches_mesh(mesh_id) {
131 return None;
132 }
133
134 let node_id = parse_node_id_from_name(name)?;
136
137 if node_id == self.node_id {
139 return None;
140 }
141
142 let mut peers = self.peers.write().unwrap();
143 let mut id_map = self.identifier_map.write().unwrap();
144
145 if let Some(&existing_node_id) = id_map.get(identifier) {
147 if existing_node_id != node_id {
148 peers.remove(&existing_node_id);
150 }
151 }
152
153 if peers.len() >= self.config.max_peers && !peers.contains_key(&node_id) {
155 return None; }
157
158 let is_new = !peers.contains_key(&node_id);
159
160 let peer = peers.entry(node_id).or_insert_with(|| {
162 HivePeer::new(
163 node_id,
164 identifier.to_string(),
165 mesh_id.map(|s| s.to_string()),
166 name.map(|s| s.to_string()),
167 rssi,
168 )
169 });
170
171 peer.rssi = rssi;
173 peer.touch(now_ms);
174 if let Some(n) = name {
175 peer.name = Some(n.to_string());
176 }
177
178 id_map.insert(identifier.to_string(), node_id);
180
181 Some((node_id, is_new))
182 }
183
184 pub fn on_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
189 let id_map = self.identifier_map.read().unwrap();
190 let node_id = id_map.get(identifier).copied()?;
191 drop(id_map);
192
193 let mut peers = self.peers.write().unwrap();
194 if let Some(peer) = peers.get_mut(&node_id) {
195 peer.is_connected = true;
196 peer.touch(now_ms);
197 }
198
199 Some(node_id)
200 }
201
202 pub fn on_disconnected(
207 &self,
208 identifier: &str,
209 reason: DisconnectReason,
210 ) -> Option<(NodeId, DisconnectReason)> {
211 let id_map = self.identifier_map.read().unwrap();
212 let node_id = id_map.get(identifier).copied()?;
213 drop(id_map);
214
215 let mut peers = self.peers.write().unwrap();
216 if let Some(peer) = peers.get_mut(&node_id) {
217 peer.is_connected = false;
218 }
219
220 Some((node_id, reason))
221 }
222
223 pub fn on_disconnected_by_node_id(&self, node_id: NodeId, _reason: DisconnectReason) -> bool {
228 let mut peers = self.peers.write().unwrap();
229 if let Some(peer) = peers.get_mut(&node_id) {
230 peer.is_connected = false;
231 true
232 } else {
233 false
234 }
235 }
236
237 pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
242 if node_id == self.node_id {
244 return false;
245 }
246
247 let mut peers = self.peers.write().unwrap();
248 let mut id_map = self.identifier_map.write().unwrap();
249
250 if peers.len() >= self.config.max_peers && !peers.contains_key(&node_id) {
252 return false;
253 }
254
255 let is_new = !peers.contains_key(&node_id);
256
257 let peer = peers.entry(node_id).or_insert_with(|| {
258 HivePeer::new(
259 node_id,
260 identifier.to_string(),
261 Some(self.config.mesh_id.clone()),
262 None,
263 -70, )
265 });
266
267 peer.is_connected = true;
268 peer.touch(now_ms);
269
270 if peer.identifier != identifier {
272 id_map.remove(&peer.identifier);
273 peer.identifier = identifier.to_string();
274 }
275 id_map.insert(identifier.to_string(), node_id);
276
277 is_new
278 }
279
280 pub fn should_sync_with(&self, node_id: NodeId, now_ms: u64) -> bool {
284 let history = self.sync_history.read().unwrap();
285 match history.get(&node_id) {
286 Some(&last_sync) => now_ms.saturating_sub(last_sync) >= self.config.sync_cooldown_ms,
287 None => true, }
289 }
290
291 pub fn record_sync(&self, node_id: NodeId, now_ms: u64) {
293 let mut history = self.sync_history.write().unwrap();
294 history.insert(node_id, now_ms);
295 }
296
297 pub fn cleanup_stale(&self, now_ms: u64) -> Vec<NodeId> {
302 let mut peers = self.peers.write().unwrap();
303 let mut id_map = self.identifier_map.write().unwrap();
304 let mut history = self.sync_history.write().unwrap();
305
306 let mut removed = Vec::new();
307
308 let stale: Vec<NodeId> = peers
310 .iter()
311 .filter(|(_, peer)| peer.is_stale(now_ms, self.config.peer_timeout_ms))
312 .map(|(&node_id, _)| node_id)
313 .collect();
314
315 for node_id in stale {
317 if let Some(peer) = peers.remove(&node_id) {
318 id_map.remove(&peer.identifier);
319 history.remove(&node_id);
320 removed.push(node_id);
321 }
322 }
323
324 removed
325 }
326
327 pub fn get_peers(&self) -> Vec<HivePeer> {
329 let peers = self.peers.read().unwrap();
330 peers.values().cloned().collect()
331 }
332
333 pub fn get_connected_peers(&self) -> Vec<HivePeer> {
335 let peers = self.peers.read().unwrap();
336 peers.values().filter(|p| p.is_connected).cloned().collect()
337 }
338
339 pub fn get_peer(&self, node_id: NodeId) -> Option<HivePeer> {
341 let peers = self.peers.read().unwrap();
342 peers.get(&node_id).cloned()
343 }
344
345 pub fn get_peer_by_identifier(&self, identifier: &str) -> Option<HivePeer> {
347 let id_map = self.identifier_map.read().unwrap();
348 let node_id = id_map.get(identifier).copied()?;
349 drop(id_map);
350
351 let peers = self.peers.read().unwrap();
352 peers.get(&node_id).cloned()
353 }
354
355 pub fn get_node_id(&self, identifier: &str) -> Option<NodeId> {
357 let id_map = self.identifier_map.read().unwrap();
358 id_map.get(identifier).copied()
359 }
360
361 pub fn register_identifier(&self, identifier: &str, node_id: NodeId) {
367 let mut id_map = self.identifier_map.write().unwrap();
368 id_map.insert(identifier.to_string(), node_id);
369 log::debug!(
370 "Registered identifier {} -> {:08X}",
371 identifier,
372 node_id.as_u32()
373 );
374 }
375
376 pub fn peer_count(&self) -> usize {
378 self.peers.read().unwrap().len()
379 }
380
381 pub fn connected_count(&self) -> usize {
383 self.peers
384 .read()
385 .unwrap()
386 .values()
387 .filter(|p| p.is_connected)
388 .count()
389 }
390
391 pub fn get_connected_identifiers(&self) -> Vec<String> {
395 self.peers
396 .read()
397 .unwrap()
398 .values()
399 .filter(|p| p.is_connected)
400 .map(|p| p.identifier.clone())
401 .collect()
402 }
403
404 pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<HivePeer> {
406 let peers = self.peers.read().unwrap();
407 let history = self.sync_history.read().unwrap();
408
409 peers
410 .values()
411 .filter(|peer| {
412 if !peer.is_connected {
413 return false;
414 }
415 match history.get(&peer.node_id) {
416 Some(&last_sync) => {
417 now_ms.saturating_sub(last_sync) >= self.config.sync_cooldown_ms
418 }
419 None => true,
420 }
421 })
422 .cloned()
423 .collect()
424 }
425
426 pub fn generate_state_event(&self) -> HiveEvent {
430 HiveEvent::MeshStateChanged {
431 peer_count: self.peer_count(),
432 connected_count: self.connected_count(),
433 }
434 }
435}
436
437fn parse_node_id_from_name(name: Option<&str>) -> Option<NodeId> {
441 let name = name?;
442
443 let hyphen_pos = name.rfind('-')?;
445 let hex_part = &name[hyphen_pos + 1..];
446
447 if hex_part.len() != 8 {
449 return None;
450 }
451
452 u32::from_str_radix(hex_part, 16).ok().map(NodeId::new)
453}
454
455#[cfg(test)]
456mod tests {
457 use super::*;
458
459 #[test]
460 fn test_parse_node_id_from_name() {
461 assert_eq!(
462 parse_node_id_from_name(Some("HIVE_DEMO-12345678")),
463 Some(NodeId::new(0x12345678))
464 );
465 assert_eq!(
466 parse_node_id_from_name(Some("HIVE_ALPHA-AABBCCDD")),
467 Some(NodeId::new(0xAABBCCDD))
468 );
469 assert_eq!(parse_node_id_from_name(Some("Invalid")), None);
470 assert_eq!(parse_node_id_from_name(Some("HIVE_DEMO-123")), None); assert_eq!(parse_node_id_from_name(None), None);
472 }
473
474 #[test]
475 fn test_peer_discovery() {
476 let config = PeerManagerConfig::with_mesh_id("DEMO");
477 let manager = PeerManager::new(NodeId::new(0x11111111), config);
478
479 let result = manager.on_discovered(
481 "device-uuid-1",
482 Some("HIVE_DEMO-22222222"),
483 -65,
484 Some("DEMO"),
485 1000,
486 );
487 assert!(result.is_some());
488 let (node_id, is_new) = result.unwrap();
489 assert_eq!(node_id.as_u32(), 0x22222222);
490 assert!(is_new);
491
492 let result = manager.on_discovered(
494 "device-uuid-1",
495 Some("HIVE_DEMO-22222222"),
496 -60,
497 Some("DEMO"),
498 2000,
499 );
500 assert!(result.is_some());
501 let (_, is_new) = result.unwrap();
502 assert!(!is_new);
503
504 assert_eq!(manager.peer_count(), 1);
506 let peer = manager.get_peer(NodeId::new(0x22222222)).unwrap();
507 assert_eq!(peer.rssi, -60); }
509
510 #[test]
511 fn test_mesh_filtering() {
512 let config = PeerManagerConfig::with_mesh_id("ALPHA");
513 let manager = PeerManager::new(NodeId::new(0x11111111), config);
514
515 let result = manager.on_discovered(
517 "device-uuid-1",
518 Some("HIVE_BETA-22222222"),
519 -65,
520 Some("BETA"),
521 1000,
522 );
523 assert!(result.is_none());
524 assert_eq!(manager.peer_count(), 0);
525
526 let result = manager.on_discovered(
528 "device-uuid-2",
529 Some("HIVE_ALPHA-33333333"),
530 -65,
531 Some("ALPHA"),
532 1000,
533 );
534 assert!(result.is_some());
535 assert_eq!(manager.peer_count(), 1);
536 }
537
538 #[test]
539 fn test_self_filtering() {
540 let config = PeerManagerConfig::with_mesh_id("DEMO");
541 let manager = PeerManager::new(NodeId::new(0x12345678), config);
542
543 let result = manager.on_discovered(
545 "my-device-uuid",
546 Some("HIVE_DEMO-12345678"),
547 -30,
548 Some("DEMO"),
549 1000,
550 );
551 assert!(result.is_none());
552 assert_eq!(manager.peer_count(), 0);
553 }
554
555 #[test]
556 fn test_connection_lifecycle() {
557 let config = PeerManagerConfig::with_mesh_id("DEMO");
558 let manager = PeerManager::new(NodeId::new(0x11111111), config);
559
560 manager.on_discovered(
562 "device-uuid-1",
563 Some("HIVE_DEMO-22222222"),
564 -65,
565 Some("DEMO"),
566 1000,
567 );
568 assert_eq!(manager.connected_count(), 0);
569
570 let node_id = manager.on_connected("device-uuid-1", 2000);
572 assert_eq!(node_id, Some(NodeId::new(0x22222222)));
573 assert_eq!(manager.connected_count(), 1);
574
575 let result = manager.on_disconnected("device-uuid-1", DisconnectReason::RemoteRequest);
577 assert!(result.is_some());
578 assert_eq!(manager.connected_count(), 0);
579 assert_eq!(manager.peer_count(), 1); }
581
582 #[test]
583 fn test_stale_cleanup() {
584 let config = PeerManagerConfig::with_mesh_id("DEMO").peer_timeout(10_000);
585 let manager = PeerManager::new(NodeId::new(0x11111111), config);
586
587 manager.on_discovered(
589 "device-uuid-1",
590 Some("HIVE_DEMO-22222222"),
591 -65,
592 Some("DEMO"),
593 1000,
594 );
595 assert_eq!(manager.peer_count(), 1);
596
597 let removed = manager.cleanup_stale(5000);
599 assert!(removed.is_empty());
600 assert_eq!(manager.peer_count(), 1);
601
602 let removed = manager.cleanup_stale(20000);
604 assert_eq!(removed.len(), 1);
605 assert_eq!(removed[0].as_u32(), 0x22222222);
606 assert_eq!(manager.peer_count(), 0);
607 }
608
609 #[test]
610 fn test_sync_cooldown() {
611 let config = PeerManagerConfig::with_mesh_id("DEMO");
612 let manager = PeerManager::new(NodeId::new(0x11111111), config);
613 let peer_id = NodeId::new(0x22222222);
614
615 assert!(manager.should_sync_with(peer_id, 1000));
617
618 manager.record_sync(peer_id, 1000);
620
621 assert!(!manager.should_sync_with(peer_id, 5000));
623
624 assert!(manager.should_sync_with(peer_id, 35000));
626 }
627
628 #[test]
629 fn test_max_peers_limit() {
630 let config = PeerManagerConfig::with_mesh_id("DEMO").max_peers(2);
631 let manager = PeerManager::new(NodeId::new(0x11111111), config);
632
633 let result = manager.on_discovered(
635 "uuid-1",
636 Some("HIVE_DEMO-22222222"),
637 -65,
638 Some("DEMO"),
639 1000,
640 );
641 assert!(result.is_some());
642
643 let result = manager.on_discovered(
644 "uuid-2",
645 Some("HIVE_DEMO-33333333"),
646 -65,
647 Some("DEMO"),
648 1000,
649 );
650 assert!(result.is_some());
651
652 let result = manager.on_discovered(
654 "uuid-3",
655 Some("HIVE_DEMO-44444444"),
656 -65,
657 Some("DEMO"),
658 1000,
659 );
660 assert!(result.is_none());
661 assert_eq!(manager.peer_count(), 2);
662 }
663
664 #[test]
665 fn test_incoming_connection() {
666 let config = PeerManagerConfig::with_mesh_id("DEMO");
667 let manager = PeerManager::new(NodeId::new(0x11111111), config);
668
669 let is_new = manager.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);
671 assert!(is_new);
672 assert_eq!(manager.peer_count(), 1);
673 assert_eq!(manager.connected_count(), 1);
674
675 let is_new = manager.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 2000);
677 assert!(!is_new);
678 }
679
680 #[test]
681 fn test_address_rotation() {
682 let config = PeerManagerConfig::with_mesh_id("DEMO");
684 let manager = PeerManager::new(NodeId::new(0x11111111), config);
685
686 let result = manager.on_discovered(
688 "AA:BB:CC:DD:EE:01",
689 Some("HIVE_DEMO-22222222"),
690 -70,
691 Some("DEMO"),
692 1000,
693 );
694 assert!(result.is_some());
695 let (node_id, is_new) = result.unwrap();
696 assert_eq!(node_id, NodeId::new(0x22222222));
697 assert!(is_new);
698 assert_eq!(manager.peer_count(), 1);
699
700 let result = manager.on_discovered(
702 "AA:BB:CC:DD:EE:02", Some("HIVE_DEMO-22222222"), -65,
705 Some("DEMO"),
706 2000,
707 );
708 assert!(result.is_some());
709 let (node_id, is_new) = result.unwrap();
710 assert_eq!(node_id, NodeId::new(0x22222222));
711 assert!(!is_new); assert_eq!(manager.peer_count(), 1); assert_eq!(
716 manager.get_node_id("AA:BB:CC:DD:EE:01"),
717 Some(NodeId::new(0x22222222))
718 );
719 assert_eq!(
720 manager.get_node_id("AA:BB:CC:DD:EE:02"),
721 Some(NodeId::new(0x22222222))
722 );
723 }
724
725 #[test]
726 fn test_address_rotation_with_different_names() {
727 let config = PeerManagerConfig::with_mesh_id("DEMO");
729 let manager = PeerManager::new(NodeId::new(0x11111111), config);
730
731 let result = manager.on_discovered(
733 "AA:BB:CC:DD:EE:01",
734 Some("HIVE_DEMO-AABBCCDD"),
735 -70,
736 Some("DEMO"),
737 1000,
738 );
739 assert!(result.is_some());
740 assert!(result.unwrap().1); assert_eq!(manager.peer_count(), 1);
742
743 let result = manager.on_discovered(
745 "11:22:33:44:55:66", Some("HIVE_DEMO-AABBCCDD"), -75,
748 Some("DEMO"),
749 2000,
750 );
751 assert!(result.is_some());
752 assert!(!result.unwrap().1); assert_eq!(manager.peer_count(), 1); }
755}