1#[cfg(not(feature = "std"))]
39use alloc::{
40 collections::BTreeMap,
41 string::{String, ToString},
42 vec::Vec,
43};
44#[cfg(feature = "std")]
45use std::collections::BTreeMap;
46#[cfg(feature = "std")]
47use std::sync::RwLock;
48
49#[cfg(not(feature = "std"))]
50use spin::RwLock;
51
52use crate::observer::{DisconnectReason, HiveEvent};
53use crate::peer::{HivePeer, PeerManagerConfig};
54use crate::NodeId;
55
56pub struct PeerManager {
61 config: PeerManagerConfig,
63
64 node_id: NodeId,
66
67 #[cfg(feature = "std")]
69 peers: RwLock<BTreeMap<NodeId, HivePeer>>,
70 #[cfg(not(feature = "std"))]
71 peers: RwLock<BTreeMap<NodeId, HivePeer>>,
72
73 #[cfg(feature = "std")]
75 identifier_map: RwLock<BTreeMap<String, NodeId>>,
76 #[cfg(not(feature = "std"))]
77 identifier_map: RwLock<BTreeMap<String, NodeId>>,
78
79 #[cfg(feature = "std")]
81 sync_history: RwLock<BTreeMap<NodeId, u64>>,
82 #[cfg(not(feature = "std"))]
83 sync_history: RwLock<BTreeMap<NodeId, u64>>,
84}
85
86impl PeerManager {
87 pub fn new(node_id: NodeId, config: PeerManagerConfig) -> Self {
89 Self {
90 config,
91 node_id,
92 peers: RwLock::new(BTreeMap::new()),
93 identifier_map: RwLock::new(BTreeMap::new()),
94 sync_history: RwLock::new(BTreeMap::new()),
95 }
96 }
97
98 pub fn node_id(&self) -> NodeId {
100 self.node_id
101 }
102
103 pub fn mesh_id(&self) -> &str {
105 &self.config.mesh_id
106 }
107
108 pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
110 self.config.matches_mesh(device_mesh_id)
111 }
112
113 pub fn on_discovered(
122 &self,
123 identifier: &str,
124 name: Option<&str>,
125 rssi: i8,
126 mesh_id: Option<&str>,
127 now_ms: u64,
128 ) -> Option<(NodeId, bool)> {
129 if !self.matches_mesh(mesh_id) {
131 return None;
132 }
133
134 let node_id = parse_node_id_from_name(name)?;
136
137 if node_id == self.node_id {
139 return None;
140 }
141
142 let mut peers = self.peers.write().unwrap();
143 let mut id_map = self.identifier_map.write().unwrap();
144
145 if let Some(&existing_node_id) = id_map.get(identifier) {
147 if existing_node_id != node_id {
148 peers.remove(&existing_node_id);
150 }
151 }
152
153 if peers.len() >= self.config.max_peers && !peers.contains_key(&node_id) {
155 return None; }
157
158 let is_new = !peers.contains_key(&node_id);
159
160 let peer = peers.entry(node_id).or_insert_with(|| {
162 HivePeer::new(
163 node_id,
164 identifier.to_string(),
165 mesh_id.map(|s| s.to_string()),
166 name.map(|s| s.to_string()),
167 rssi,
168 )
169 });
170
171 peer.rssi = rssi;
173 peer.touch(now_ms);
174 if let Some(n) = name {
175 peer.name = Some(n.to_string());
176 }
177
178 id_map.insert(identifier.to_string(), node_id);
180
181 Some((node_id, is_new))
182 }
183
184 pub fn on_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
189 let id_map = self.identifier_map.read().unwrap();
190 let node_id = id_map.get(identifier).copied()?;
191 drop(id_map);
192
193 let mut peers = self.peers.write().unwrap();
194 if let Some(peer) = peers.get_mut(&node_id) {
195 peer.is_connected = true;
196 peer.touch(now_ms);
197 }
198
199 Some(node_id)
200 }
201
202 pub fn on_disconnected(
207 &self,
208 identifier: &str,
209 reason: DisconnectReason,
210 ) -> Option<(NodeId, DisconnectReason)> {
211 let id_map = self.identifier_map.read().unwrap();
212 let node_id = id_map.get(identifier).copied()?;
213 drop(id_map);
214
215 let mut peers = self.peers.write().unwrap();
216 if let Some(peer) = peers.get_mut(&node_id) {
217 peer.is_connected = false;
218 }
219
220 Some((node_id, reason))
221 }
222
223 pub fn on_disconnected_by_node_id(&self, node_id: NodeId, _reason: DisconnectReason) -> bool {
228 let mut peers = self.peers.write().unwrap();
229 if let Some(peer) = peers.get_mut(&node_id) {
230 peer.is_connected = false;
231 true
232 } else {
233 false
234 }
235 }
236
237 pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
242 if node_id == self.node_id {
244 return false;
245 }
246
247 let mut peers = self.peers.write().unwrap();
248 let mut id_map = self.identifier_map.write().unwrap();
249
250 if peers.len() >= self.config.max_peers && !peers.contains_key(&node_id) {
252 return false;
253 }
254
255 let is_new = !peers.contains_key(&node_id);
256
257 let peer = peers.entry(node_id).or_insert_with(|| {
258 HivePeer::new(
259 node_id,
260 identifier.to_string(),
261 Some(self.config.mesh_id.clone()),
262 None,
263 -70, )
265 });
266
267 peer.is_connected = true;
268 peer.touch(now_ms);
269
270 if peer.identifier != identifier {
272 id_map.remove(&peer.identifier);
273 peer.identifier = identifier.to_string();
274 }
275 id_map.insert(identifier.to_string(), node_id);
276
277 is_new
278 }
279
280 pub fn should_sync_with(&self, node_id: NodeId, now_ms: u64) -> bool {
284 let history = self.sync_history.read().unwrap();
285 match history.get(&node_id) {
286 Some(&last_sync) => now_ms.saturating_sub(last_sync) >= self.config.sync_cooldown_ms,
287 None => true, }
289 }
290
291 pub fn record_sync(&self, node_id: NodeId, now_ms: u64) {
293 let mut history = self.sync_history.write().unwrap();
294 history.insert(node_id, now_ms);
295 }
296
297 pub fn cleanup_stale(&self, now_ms: u64) -> Vec<NodeId> {
302 let mut peers = self.peers.write().unwrap();
303 let mut id_map = self.identifier_map.write().unwrap();
304 let mut history = self.sync_history.write().unwrap();
305
306 let mut removed = Vec::new();
307
308 let stale: Vec<NodeId> = peers
310 .iter()
311 .filter(|(_, peer)| peer.is_stale(now_ms, self.config.peer_timeout_ms))
312 .map(|(&node_id, _)| node_id)
313 .collect();
314
315 for node_id in stale {
317 if let Some(peer) = peers.remove(&node_id) {
318 id_map.remove(&peer.identifier);
319 history.remove(&node_id);
320 removed.push(node_id);
321 }
322 }
323
324 removed
325 }
326
327 pub fn get_peers(&self) -> Vec<HivePeer> {
329 let peers = self.peers.read().unwrap();
330 peers.values().cloned().collect()
331 }
332
333 pub fn get_connected_peers(&self) -> Vec<HivePeer> {
335 let peers = self.peers.read().unwrap();
336 peers.values().filter(|p| p.is_connected).cloned().collect()
337 }
338
339 pub fn get_peer(&self, node_id: NodeId) -> Option<HivePeer> {
341 let peers = self.peers.read().unwrap();
342 peers.get(&node_id).cloned()
343 }
344
345 pub fn get_peer_by_identifier(&self, identifier: &str) -> Option<HivePeer> {
347 let id_map = self.identifier_map.read().unwrap();
348 let node_id = id_map.get(identifier).copied()?;
349 drop(id_map);
350
351 let peers = self.peers.read().unwrap();
352 peers.get(&node_id).cloned()
353 }
354
355 pub fn get_node_id(&self, identifier: &str) -> Option<NodeId> {
357 let id_map = self.identifier_map.read().unwrap();
358 id_map.get(identifier).copied()
359 }
360
361 pub fn peer_count(&self) -> usize {
363 self.peers.read().unwrap().len()
364 }
365
366 pub fn connected_count(&self) -> usize {
368 self.peers
369 .read()
370 .unwrap()
371 .values()
372 .filter(|p| p.is_connected)
373 .count()
374 }
375
376 pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<HivePeer> {
378 let peers = self.peers.read().unwrap();
379 let history = self.sync_history.read().unwrap();
380
381 peers
382 .values()
383 .filter(|peer| {
384 if !peer.is_connected {
385 return false;
386 }
387 match history.get(&peer.node_id) {
388 Some(&last_sync) => {
389 now_ms.saturating_sub(last_sync) >= self.config.sync_cooldown_ms
390 }
391 None => true,
392 }
393 })
394 .cloned()
395 .collect()
396 }
397
398 pub fn generate_state_event(&self) -> HiveEvent {
402 HiveEvent::MeshStateChanged {
403 peer_count: self.peer_count(),
404 connected_count: self.connected_count(),
405 }
406 }
407}
408
409fn parse_node_id_from_name(name: Option<&str>) -> Option<NodeId> {
413 let name = name?;
414
415 let hyphen_pos = name.rfind('-')?;
417 let hex_part = &name[hyphen_pos + 1..];
418
419 if hex_part.len() != 8 {
421 return None;
422 }
423
424 u32::from_str_radix(hex_part, 16).ok().map(NodeId::new)
425}
426
427#[cfg(test)]
428mod tests {
429 use super::*;
430
431 #[test]
432 fn test_parse_node_id_from_name() {
433 assert_eq!(
434 parse_node_id_from_name(Some("HIVE_DEMO-12345678")),
435 Some(NodeId::new(0x12345678))
436 );
437 assert_eq!(
438 parse_node_id_from_name(Some("HIVE_ALPHA-AABBCCDD")),
439 Some(NodeId::new(0xAABBCCDD))
440 );
441 assert_eq!(parse_node_id_from_name(Some("Invalid")), None);
442 assert_eq!(parse_node_id_from_name(Some("HIVE_DEMO-123")), None); assert_eq!(parse_node_id_from_name(None), None);
444 }
445
446 #[test]
447 fn test_peer_discovery() {
448 let config = PeerManagerConfig::with_mesh_id("DEMO");
449 let manager = PeerManager::new(NodeId::new(0x11111111), config);
450
451 let result = manager.on_discovered(
453 "device-uuid-1",
454 Some("HIVE_DEMO-22222222"),
455 -65,
456 Some("DEMO"),
457 1000,
458 );
459 assert!(result.is_some());
460 let (node_id, is_new) = result.unwrap();
461 assert_eq!(node_id.as_u32(), 0x22222222);
462 assert!(is_new);
463
464 let result = manager.on_discovered(
466 "device-uuid-1",
467 Some("HIVE_DEMO-22222222"),
468 -60,
469 Some("DEMO"),
470 2000,
471 );
472 assert!(result.is_some());
473 let (_, is_new) = result.unwrap();
474 assert!(!is_new);
475
476 assert_eq!(manager.peer_count(), 1);
478 let peer = manager.get_peer(NodeId::new(0x22222222)).unwrap();
479 assert_eq!(peer.rssi, -60); }
481
482 #[test]
483 fn test_mesh_filtering() {
484 let config = PeerManagerConfig::with_mesh_id("ALPHA");
485 let manager = PeerManager::new(NodeId::new(0x11111111), config);
486
487 let result = manager.on_discovered(
489 "device-uuid-1",
490 Some("HIVE_BETA-22222222"),
491 -65,
492 Some("BETA"),
493 1000,
494 );
495 assert!(result.is_none());
496 assert_eq!(manager.peer_count(), 0);
497
498 let result = manager.on_discovered(
500 "device-uuid-2",
501 Some("HIVE_ALPHA-33333333"),
502 -65,
503 Some("ALPHA"),
504 1000,
505 );
506 assert!(result.is_some());
507 assert_eq!(manager.peer_count(), 1);
508 }
509
510 #[test]
511 fn test_self_filtering() {
512 let config = PeerManagerConfig::with_mesh_id("DEMO");
513 let manager = PeerManager::new(NodeId::new(0x12345678), config);
514
515 let result = manager.on_discovered(
517 "my-device-uuid",
518 Some("HIVE_DEMO-12345678"),
519 -30,
520 Some("DEMO"),
521 1000,
522 );
523 assert!(result.is_none());
524 assert_eq!(manager.peer_count(), 0);
525 }
526
527 #[test]
528 fn test_connection_lifecycle() {
529 let config = PeerManagerConfig::with_mesh_id("DEMO");
530 let manager = PeerManager::new(NodeId::new(0x11111111), config);
531
532 manager.on_discovered(
534 "device-uuid-1",
535 Some("HIVE_DEMO-22222222"),
536 -65,
537 Some("DEMO"),
538 1000,
539 );
540 assert_eq!(manager.connected_count(), 0);
541
542 let node_id = manager.on_connected("device-uuid-1", 2000);
544 assert_eq!(node_id, Some(NodeId::new(0x22222222)));
545 assert_eq!(manager.connected_count(), 1);
546
547 let result = manager.on_disconnected("device-uuid-1", DisconnectReason::RemoteRequest);
549 assert!(result.is_some());
550 assert_eq!(manager.connected_count(), 0);
551 assert_eq!(manager.peer_count(), 1); }
553
554 #[test]
555 fn test_stale_cleanup() {
556 let config = PeerManagerConfig::with_mesh_id("DEMO").peer_timeout(10_000);
557 let manager = PeerManager::new(NodeId::new(0x11111111), config);
558
559 manager.on_discovered(
561 "device-uuid-1",
562 Some("HIVE_DEMO-22222222"),
563 -65,
564 Some("DEMO"),
565 1000,
566 );
567 assert_eq!(manager.peer_count(), 1);
568
569 let removed = manager.cleanup_stale(5000);
571 assert!(removed.is_empty());
572 assert_eq!(manager.peer_count(), 1);
573
574 let removed = manager.cleanup_stale(20000);
576 assert_eq!(removed.len(), 1);
577 assert_eq!(removed[0].as_u32(), 0x22222222);
578 assert_eq!(manager.peer_count(), 0);
579 }
580
581 #[test]
582 fn test_sync_cooldown() {
583 let config = PeerManagerConfig::with_mesh_id("DEMO");
584 let manager = PeerManager::new(NodeId::new(0x11111111), config);
585 let peer_id = NodeId::new(0x22222222);
586
587 assert!(manager.should_sync_with(peer_id, 1000));
589
590 manager.record_sync(peer_id, 1000);
592
593 assert!(!manager.should_sync_with(peer_id, 5000));
595
596 assert!(manager.should_sync_with(peer_id, 35000));
598 }
599
600 #[test]
601 fn test_max_peers_limit() {
602 let config = PeerManagerConfig::with_mesh_id("DEMO").max_peers(2);
603 let manager = PeerManager::new(NodeId::new(0x11111111), config);
604
605 let result = manager.on_discovered(
607 "uuid-1",
608 Some("HIVE_DEMO-22222222"),
609 -65,
610 Some("DEMO"),
611 1000,
612 );
613 assert!(result.is_some());
614
615 let result = manager.on_discovered(
616 "uuid-2",
617 Some("HIVE_DEMO-33333333"),
618 -65,
619 Some("DEMO"),
620 1000,
621 );
622 assert!(result.is_some());
623
624 let result = manager.on_discovered(
626 "uuid-3",
627 Some("HIVE_DEMO-44444444"),
628 -65,
629 Some("DEMO"),
630 1000,
631 );
632 assert!(result.is_none());
633 assert_eq!(manager.peer_count(), 2);
634 }
635
636 #[test]
637 fn test_incoming_connection() {
638 let config = PeerManagerConfig::with_mesh_id("DEMO");
639 let manager = PeerManager::new(NodeId::new(0x11111111), config);
640
641 let is_new = manager.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);
643 assert!(is_new);
644 assert_eq!(manager.peer_count(), 1);
645 assert_eq!(manager.connected_count(), 1);
646
647 let is_new = manager.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 2000);
649 assert!(!is_new);
650 }
651}