hive_btle/
peer_manager.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Peer management for HIVE BLE mesh
17//!
18//! This module provides centralized peer tracking, connection management,
19//! and sync scheduling. It replaces the duplicated peer management logic
20//! that was previously in iOS, Android, and ESP32 implementations.
21//!
22//! ## Usage
23//!
24//! ```ignore
25//! use hive_btle::peer_manager::PeerManager;
26//! use hive_btle::peer::PeerManagerConfig;
27//! use hive_btle::NodeId;
28//!
29//! let config = PeerManagerConfig::with_mesh_id("DEMO");
30//! let manager = PeerManager::new(NodeId::new(0x12345678), config);
31//!
32//! // Called by platform BLE adapter on discovery
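//! // `now_ms` is the caller-supplied current time in milliseconds.
//! if let Some((node_id, is_new)) =
//!     manager.on_discovered("device-uuid", Some("HIVE_DEMO-AABBCCDD"), -70, Some("DEMO"), 1_000)
//! {
//!     println!("Discovered peer: {:08X} (new: {})", node_id.as_u32(), is_new);
//! }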
36//! ```
37
38#[cfg(not(feature = "std"))]
39use alloc::{
40    collections::BTreeMap,
41    string::{String, ToString},
42    vec::Vec,
43};
44#[cfg(feature = "std")]
45use std::collections::BTreeMap;
46#[cfg(feature = "std")]
47use std::sync::RwLock;
48
49#[cfg(not(feature = "std"))]
50use spin::RwLock;
51
52use crate::observer::{DisconnectReason, HiveEvent};
53use crate::peer::{HivePeer, PeerManagerConfig};
54use crate::NodeId;
55
56/// Centralized peer manager for HIVE mesh
57///
58/// Tracks discovered peers, their connection state, and sync history.
59/// Thread-safe and designed for use from platform BLE callbacks.
60pub struct PeerManager {
61    /// Configuration
62    config: PeerManagerConfig,
63
64    /// Our node ID
65    node_id: NodeId,
66
67    /// Peers indexed by NodeId
68    #[cfg(feature = "std")]
69    peers: RwLock<BTreeMap<NodeId, HivePeer>>,
70    #[cfg(not(feature = "std"))]
71    peers: RwLock<BTreeMap<NodeId, HivePeer>>,
72
73    /// Map from platform identifier to NodeId for quick lookup
74    #[cfg(feature = "std")]
75    identifier_map: RwLock<BTreeMap<String, NodeId>>,
76    #[cfg(not(feature = "std"))]
77    identifier_map: RwLock<BTreeMap<String, NodeId>>,
78
79    /// Last sync timestamp per peer (for cooldown)
80    #[cfg(feature = "std")]
81    sync_history: RwLock<BTreeMap<NodeId, u64>>,
82    #[cfg(not(feature = "std"))]
83    sync_history: RwLock<BTreeMap<NodeId, u64>>,
84}
85
86impl PeerManager {
87    /// Create a new peer manager
88    pub fn new(node_id: NodeId, config: PeerManagerConfig) -> Self {
89        Self {
90            config,
91            node_id,
92            peers: RwLock::new(BTreeMap::new()),
93            identifier_map: RwLock::new(BTreeMap::new()),
94            sync_history: RwLock::new(BTreeMap::new()),
95        }
96    }
97
98    /// Get our node ID
99    pub fn node_id(&self) -> NodeId {
100        self.node_id
101    }
102
103    /// Get the mesh ID
104    pub fn mesh_id(&self) -> &str {
105        &self.config.mesh_id
106    }
107
108    /// Check if a device mesh ID matches our mesh
109    pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
110        self.config.matches_mesh(device_mesh_id)
111    }
112
113    /// Handle a discovered BLE device
114    ///
115    /// Called by the platform BLE adapter when a device is discovered during scanning.
116    /// Parses the device name to extract NodeId and mesh ID.
117    ///
118    /// Returns `Some((node_id, is_new))` if this is a HIVE device on our mesh,
119    /// where `is_new` indicates if this is a newly discovered peer.
120    /// Returns `None` if the device should be ignored.
121    pub fn on_discovered(
122        &self,
123        identifier: &str,
124        name: Option<&str>,
125        rssi: i8,
126        mesh_id: Option<&str>,
127        now_ms: u64,
128    ) -> Option<(NodeId, bool)> {
129        // Check mesh ID match
130        if !self.matches_mesh(mesh_id) {
131            return None;
132        }
133
134        // Parse node ID from name (format: "HIVE_MESH-XXXXXXXX")
135        let node_id = parse_node_id_from_name(name)?;
136
137        // Don't track ourselves
138        if node_id == self.node_id {
139            return None;
140        }
141
142        let mut peers = self.peers.write().unwrap();
143        let mut id_map = self.identifier_map.write().unwrap();
144
145        // Check if we already have this peer by identifier (different device, same node)
146        if let Some(&existing_node_id) = id_map.get(identifier) {
147            if existing_node_id != node_id {
148                // Identifier changed node IDs - remove old mapping
149                peers.remove(&existing_node_id);
150            }
151        }
152
153        // Check max peers limit
154        if peers.len() >= self.config.max_peers && !peers.contains_key(&node_id) {
155            return None; // At capacity
156        }
157
158        let is_new = !peers.contains_key(&node_id);
159
160        // Update or insert peer
161        let peer = peers.entry(node_id).or_insert_with(|| {
162            HivePeer::new(
163                node_id,
164                identifier.to_string(),
165                mesh_id.map(|s| s.to_string()),
166                name.map(|s| s.to_string()),
167                rssi,
168            )
169        });
170
171        // Update existing peer
172        peer.rssi = rssi;
173        peer.touch(now_ms);
174        if let Some(n) = name {
175            peer.name = Some(n.to_string());
176        }
177
178        // Update identifier map
179        id_map.insert(identifier.to_string(), node_id);
180
181        Some((node_id, is_new))
182    }
183
184    /// Handle a peer connection
185    ///
186    /// Called by the platform BLE adapter when a connection is established.
187    /// Returns the NodeId if found, or None if this identifier is unknown.
188    pub fn on_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
189        let id_map = self.identifier_map.read().unwrap();
190        let node_id = id_map.get(identifier).copied()?;
191        drop(id_map);
192
193        let mut peers = self.peers.write().unwrap();
194        if let Some(peer) = peers.get_mut(&node_id) {
195            peer.is_connected = true;
196            peer.touch(now_ms);
197        }
198
199        Some(node_id)
200    }
201
202    /// Handle a peer disconnection
203    ///
204    /// Called by the platform BLE adapter when a connection is lost.
205    /// Returns the NodeId and disconnect reason if found.
206    pub fn on_disconnected(
207        &self,
208        identifier: &str,
209        reason: DisconnectReason,
210    ) -> Option<(NodeId, DisconnectReason)> {
211        let id_map = self.identifier_map.read().unwrap();
212        let node_id = id_map.get(identifier).copied()?;
213        drop(id_map);
214
215        let mut peers = self.peers.write().unwrap();
216        if let Some(peer) = peers.get_mut(&node_id) {
217            peer.is_connected = false;
218        }
219
220        Some((node_id, reason))
221    }
222
223    /// Handle a peer disconnection by NodeId
224    ///
225    /// Alternative to on_disconnected() when only NodeId is known (e.g., ESP32).
226    /// Returns true if the peer was found and marked disconnected.
227    pub fn on_disconnected_by_node_id(&self, node_id: NodeId, _reason: DisconnectReason) -> bool {
228        let mut peers = self.peers.write().unwrap();
229        if let Some(peer) = peers.get_mut(&node_id) {
230            peer.is_connected = false;
231            true
232        } else {
233            false
234        }
235    }
236
237    /// Register a peer from an incoming BLE connection
238    ///
239    /// Called when a remote device connects to us as a peripheral.
240    /// Creates a peer entry if one doesn't exist for this identifier.
241    pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
242        // Don't track ourselves
243        if node_id == self.node_id {
244            return false;
245        }
246
247        let mut peers = self.peers.write().unwrap();
248        let mut id_map = self.identifier_map.write().unwrap();
249
250        // Check max peers limit
251        if peers.len() >= self.config.max_peers && !peers.contains_key(&node_id) {
252            return false;
253        }
254
255        let is_new = !peers.contains_key(&node_id);
256
257        let peer = peers.entry(node_id).or_insert_with(|| {
258            HivePeer::new(
259                node_id,
260                identifier.to_string(),
261                Some(self.config.mesh_id.clone()),
262                None,
263                -70, // Default RSSI for incoming connections
264            )
265        });
266
267        peer.is_connected = true;
268        peer.touch(now_ms);
269
270        // Update identifier if changed
271        if peer.identifier != identifier {
272            id_map.remove(&peer.identifier);
273            peer.identifier = identifier.to_string();
274        }
275        id_map.insert(identifier.to_string(), node_id);
276
277        is_new
278    }
279
280    /// Check if we should sync with a peer
281    ///
282    /// Returns true if enough time has passed since the last sync (cooldown).
283    pub fn should_sync_with(&self, node_id: NodeId, now_ms: u64) -> bool {
284        let history = self.sync_history.read().unwrap();
285        match history.get(&node_id) {
286            Some(&last_sync) => now_ms.saturating_sub(last_sync) >= self.config.sync_cooldown_ms,
287            None => true, // Never synced
288        }
289    }
290
291    /// Record that we synced with a peer
292    pub fn record_sync(&self, node_id: NodeId, now_ms: u64) {
293        let mut history = self.sync_history.write().unwrap();
294        history.insert(node_id, now_ms);
295    }
296
297    /// Clean up stale peers
298    ///
299    /// Removes peers that haven't been seen within the timeout period.
300    /// Returns list of removed NodeIds for generating PeerLost events.
301    pub fn cleanup_stale(&self, now_ms: u64) -> Vec<NodeId> {
302        let mut peers = self.peers.write().unwrap();
303        let mut id_map = self.identifier_map.write().unwrap();
304        let mut history = self.sync_history.write().unwrap();
305
306        let mut removed = Vec::new();
307
308        // Find stale peers
309        let stale: Vec<NodeId> = peers
310            .iter()
311            .filter(|(_, peer)| peer.is_stale(now_ms, self.config.peer_timeout_ms))
312            .map(|(&node_id, _)| node_id)
313            .collect();
314
315        // Remove them
316        for node_id in stale {
317            if let Some(peer) = peers.remove(&node_id) {
318                id_map.remove(&peer.identifier);
319                history.remove(&node_id);
320                removed.push(node_id);
321            }
322        }
323
324        removed
325    }
326
327    /// Get all known peers
328    pub fn get_peers(&self) -> Vec<HivePeer> {
329        let peers = self.peers.read().unwrap();
330        peers.values().cloned().collect()
331    }
332
333    /// Get connected peers only
334    pub fn get_connected_peers(&self) -> Vec<HivePeer> {
335        let peers = self.peers.read().unwrap();
336        peers.values().filter(|p| p.is_connected).cloned().collect()
337    }
338
339    /// Get a specific peer by NodeId
340    pub fn get_peer(&self, node_id: NodeId) -> Option<HivePeer> {
341        let peers = self.peers.read().unwrap();
342        peers.get(&node_id).cloned()
343    }
344
345    /// Get a peer by platform identifier
346    pub fn get_peer_by_identifier(&self, identifier: &str) -> Option<HivePeer> {
347        let id_map = self.identifier_map.read().unwrap();
348        let node_id = id_map.get(identifier).copied()?;
349        drop(id_map);
350
351        let peers = self.peers.read().unwrap();
352        peers.get(&node_id).cloned()
353    }
354
355    /// Get NodeId for a platform identifier
356    pub fn get_node_id(&self, identifier: &str) -> Option<NodeId> {
357        let id_map = self.identifier_map.read().unwrap();
358        id_map.get(identifier).copied()
359    }
360
361    /// Get peer count
362    pub fn peer_count(&self) -> usize {
363        self.peers.read().unwrap().len()
364    }
365
366    /// Get connected peer count
367    pub fn connected_count(&self) -> usize {
368        self.peers
369            .read()
370            .unwrap()
371            .values()
372            .filter(|p| p.is_connected)
373            .count()
374    }
375
376    /// Get peers that need sync (connected and past cooldown)
377    pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<HivePeer> {
378        let peers = self.peers.read().unwrap();
379        let history = self.sync_history.read().unwrap();
380
381        peers
382            .values()
383            .filter(|peer| {
384                if !peer.is_connected {
385                    return false;
386                }
387                match history.get(&peer.node_id) {
388                    Some(&last_sync) => {
389                        now_ms.saturating_sub(last_sync) >= self.config.sync_cooldown_ms
390                    }
391                    None => true,
392                }
393            })
394            .cloned()
395            .collect()
396    }
397
398    /// Generate events for current mesh state
399    ///
400    /// Useful for notifying observers of the current state after initialization.
401    pub fn generate_state_event(&self) -> HiveEvent {
402        HiveEvent::MeshStateChanged {
403            peer_count: self.peer_count(),
404            connected_count: self.connected_count(),
405        }
406    }
407}
408
409/// Parse a NodeId from a HIVE device name
410///
411/// Expected format: "HIVE_MESH-XXXXXXXX" where XXXXXXXX is the hex node ID
412fn parse_node_id_from_name(name: Option<&str>) -> Option<NodeId> {
413    let name = name?;
414
415    // Find the last hyphen and parse hex after it
416    let hyphen_pos = name.rfind('-')?;
417    let hex_part = &name[hyphen_pos + 1..];
418
419    // Parse as hex (should be 8 characters)
420    if hex_part.len() != 8 {
421        return None;
422    }
423
424    u32::from_str_radix(hex_part, 16).ok().map(NodeId::new)
425}
426
#[cfg(test)]
mod tests {
    use super::*;

    /// Manager on the "DEMO" mesh whose own node ID is `own_id`, default limits.
    fn demo_manager(own_id: u32) -> PeerManager {
        PeerManager::new(NodeId::new(own_id), PeerManagerConfig::with_mesh_id("DEMO"))
    }

    /// Report a "DEMO"-mesh advertisement at RSSI -65.
    fn discover_demo(
        manager: &PeerManager,
        identifier: &str,
        name: &str,
        now_ms: u64,
    ) -> Option<(NodeId, bool)> {
        manager.on_discovered(identifier, Some(name), -65, Some("DEMO"), now_ms)
    }

    #[test]
    fn test_parse_node_id_from_name() {
        let cases = [
            (Some("HIVE_DEMO-12345678"), Some(NodeId::new(0x12345678))),
            (Some("HIVE_ALPHA-AABBCCDD"), Some(NodeId::new(0xAABBCCDD))),
            (Some("Invalid"), None),
            (Some("HIVE_DEMO-123"), None), // Too short
            (None, None),
        ];

        for &(input, expected) in cases.iter() {
            assert_eq!(parse_node_id_from_name(input), expected);
        }
    }

    #[test]
    fn test_peer_discovery() {
        let manager = demo_manager(0x11111111);

        // First sighting: accepted and reported as new.
        let first = discover_demo(&manager, "device-uuid-1", "HIVE_DEMO-22222222", 1000);
        let (node_id, is_new) = first.expect("peer on our mesh should be accepted");
        assert_eq!(node_id.as_u32(), 0x22222222);
        assert!(is_new);

        // Second sighting with a different RSSI: accepted, no longer new.
        let second = manager.on_discovered(
            "device-uuid-1",
            Some("HIVE_DEMO-22222222"),
            -60,
            Some("DEMO"),
            2000,
        );
        let (_, is_new_again) = second.expect("known peer should still be accepted");
        assert!(!is_new_again);

        // Exactly one peer is tracked and it carries the latest RSSI.
        assert_eq!(manager.peer_count(), 1);
        let tracked = manager.get_peer(NodeId::new(0x22222222)).unwrap();
        assert_eq!(tracked.rssi, -60);
    }

    #[test]
    fn test_mesh_filtering() {
        let manager = PeerManager::new(
            NodeId::new(0x11111111),
            PeerManagerConfig::with_mesh_id("ALPHA"),
        );

        // A device from mesh BETA is ignored.
        let foreign = manager.on_discovered(
            "device-uuid-1",
            Some("HIVE_BETA-22222222"),
            -65,
            Some("BETA"),
            1000,
        );
        assert!(foreign.is_none());
        assert_eq!(manager.peer_count(), 0);

        // A device from our own mesh is tracked.
        let local = manager.on_discovered(
            "device-uuid-2",
            Some("HIVE_ALPHA-33333333"),
            -65,
            Some("ALPHA"),
            1000,
        );
        assert!(local.is_some());
        assert_eq!(manager.peer_count(), 1);
    }

    #[test]
    fn test_self_filtering() {
        let manager = demo_manager(0x12345678);

        // An advertisement carrying our own node ID must not be tracked.
        let own = manager.on_discovered(
            "my-device-uuid",
            Some("HIVE_DEMO-12345678"),
            -30,
            Some("DEMO"),
            1000,
        );
        assert!(own.is_none());
        assert_eq!(manager.peer_count(), 0);
    }

    #[test]
    fn test_connection_lifecycle() {
        let manager = demo_manager(0x11111111);

        // Discovered but not yet connected.
        discover_demo(&manager, "device-uuid-1", "HIVE_DEMO-22222222", 1000);
        assert_eq!(manager.connected_count(), 0);

        // Connecting resolves the identifier and bumps the connected count.
        let connected = manager.on_connected("device-uuid-1", 2000);
        assert_eq!(connected, Some(NodeId::new(0x22222222)));
        assert_eq!(manager.connected_count(), 1);

        // Disconnecting clears the flag but keeps the peer in the table.
        let disconnected =
            manager.on_disconnected("device-uuid-1", DisconnectReason::RemoteRequest);
        assert!(disconnected.is_some());
        assert_eq!(manager.connected_count(), 0);
        assert_eq!(manager.peer_count(), 1);
    }

    #[test]
    fn test_stale_cleanup() {
        let manager = PeerManager::new(
            NodeId::new(0x11111111),
            PeerManagerConfig::with_mesh_id("DEMO").peer_timeout(10_000),
        );

        // Seen at t=1000.
        discover_demo(&manager, "device-uuid-1", "HIVE_DEMO-22222222", 1000);
        assert_eq!(manager.peer_count(), 1);

        // t=5000 is inside the 10s timeout: nothing is removed.
        let removed_early = manager.cleanup_stale(5000);
        assert!(removed_early.is_empty());
        assert_eq!(manager.peer_count(), 1);

        // t=20000 is past the timeout: the peer is reported and dropped.
        let removed_late = manager.cleanup_stale(20000);
        assert_eq!(removed_late.len(), 1);
        assert_eq!(removed_late[0].as_u32(), 0x22222222);
        assert_eq!(manager.peer_count(), 0);
    }

    #[test]
    fn test_sync_cooldown() {
        let manager = demo_manager(0x11111111);
        let peer_id = NodeId::new(0x22222222);

        // No sync recorded yet.
        assert!(manager.should_sync_with(peer_id, 1000));

        manager.record_sync(peer_id, 1000);

        // Relies on the default cooldown of the config (30s per the original test comment).
        assert!(!manager.should_sync_with(peer_id, 5000));
        assert!(manager.should_sync_with(peer_id, 35000));
    }

    #[test]
    fn test_max_peers_limit() {
        let manager = PeerManager::new(
            NodeId::new(0x11111111),
            PeerManagerConfig::with_mesh_id("DEMO").max_peers(2),
        );

        // The first two peers fit within the limit.
        assert!(discover_demo(&manager, "uuid-1", "HIVE_DEMO-22222222", 1000).is_some());
        assert!(discover_demo(&manager, "uuid-2", "HIVE_DEMO-33333333", 1000).is_some());

        // The third is turned away and the table stays at two entries.
        assert!(discover_demo(&manager, "uuid-3", "HIVE_DEMO-44444444", 1000).is_none());
        assert_eq!(manager.peer_count(), 2);
    }

    #[test]
    fn test_incoming_connection() {
        let manager = demo_manager(0x11111111);
        let remote = NodeId::new(0x22222222);

        // First incoming connection from an unknown peer creates a connected entry.
        assert!(manager.on_incoming_connection("central-uuid", remote, 1000));
        assert_eq!(manager.peer_count(), 1);
        assert_eq!(manager.connected_count(), 1);

        // The same peer connecting again is not reported as new.
        assert!(!manager.on_incoming_connection("central-uuid", remote, 2000));
    }
}