hive_btle/
observer.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Observer pattern for HIVE mesh events
17//!
18//! This module provides the event types and observer trait for receiving
19//! notifications about mesh state changes. Platform implementations register
20//! observers to receive callbacks when peers are discovered, connected,
21//! disconnected, or when documents are synced.
22//!
23//! ## Usage
24//!
25//! ```ignore
26//! use hive_btle::observer::{HiveEvent, HiveObserver};
27//!
28//! struct MyObserver;
29//!
30//! impl HiveObserver for MyObserver {
31//!     fn on_event(&self, event: HiveEvent) {
32//!         match event {
33//!             HiveEvent::PeerDiscovered { peer } => {
34//!                 println!("Discovered: {}", peer.display_name());
35//!             }
36//!             HiveEvent::EmergencyReceived { from_node } => {
37//!                 println!("EMERGENCY from {:08X}", from_node.as_u32());
38//!             }
39//!             _ => {}
40//!         }
41//!     }
42//! }
43//! ```
44
45#[cfg(not(feature = "std"))]
46use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
47#[cfg(feature = "std")]
48use std::sync::Arc;
49
// String and Vec are needed by HiveEvent variant payloads (std builds)
51#[cfg(feature = "std")]
52use std::string::String;
53#[cfg(feature = "std")]
54use std::vec::Vec;
55
56use crate::peer::HivePeer;
57use crate::sync::crdt::EventType;
58use crate::NodeId;
59
60/// Events emitted by the HIVE mesh
61///
62/// These events notify observers about changes in mesh state, peer lifecycle,
63/// and document synchronization.
64#[derive(Debug, Clone)]
65pub enum HiveEvent {
66    // ==================== Peer Lifecycle Events ====================
67    /// A new peer was discovered via BLE scanning
68    PeerDiscovered {
69        /// The discovered peer
70        peer: HivePeer,
71    },
72
73    /// A peer connected to us (either direction)
74    PeerConnected {
75        /// Node ID of the connected peer
76        node_id: NodeId,
77    },
78
79    /// A peer disconnected
80    PeerDisconnected {
81        /// Node ID of the disconnected peer
82        node_id: NodeId,
83        /// Reason for disconnection
84        reason: DisconnectReason,
85    },
86
87    /// A peer was removed due to timeout (stale)
88    PeerLost {
89        /// Node ID of the lost peer
90        node_id: NodeId,
91    },
92
93    // ==================== Mesh Events ====================
94    /// An emergency event was received from a peer
95    EmergencyReceived {
96        /// Node ID that sent the emergency
97        from_node: NodeId,
98    },
99
100    /// An ACK event was received from a peer
101    AckReceived {
102        /// Node ID that sent the ACK
103        from_node: NodeId,
104    },
105
106    /// A generic event was received from a peer
107    EventReceived {
108        /// Node ID that sent the event
109        from_node: NodeId,
110        /// Type of event
111        event_type: EventType,
112    },
113
114    /// A document was synced with a peer
115    DocumentSynced {
116        /// Node ID that we synced with
117        from_node: NodeId,
118        /// Updated total counter value
119        total_count: u64,
120    },
121
122    // ==================== Mesh State Events ====================
123    /// Mesh state changed (peer count, connected count)
124    MeshStateChanged {
125        /// Total number of known peers
126        peer_count: usize,
127        /// Number of connected peers
128        connected_count: usize,
129    },
130
131    /// All peers have acknowledged an emergency
132    AllPeersAcked {
133        /// Number of peers that acknowledged
134        ack_count: usize,
135    },
136
137    // ==================== Per-Peer E2EE Events ====================
138    /// E2EE session established with a peer
139    PeerE2eeEstablished {
140        /// Node ID of the peer we established E2EE with
141        peer_node_id: NodeId,
142    },
143
144    /// E2EE session closed with a peer
145    PeerE2eeClosed {
146        /// Node ID of the peer whose E2EE session closed
147        peer_node_id: NodeId,
148    },
149
150    /// Received an E2EE encrypted message from a peer
151    PeerE2eeMessageReceived {
152        /// Node ID of the sender
153        from_node: NodeId,
154        /// Decrypted message data
155        data: Vec<u8>,
156    },
157
158    /// E2EE session failed to establish
159    PeerE2eeFailed {
160        /// Node ID of the peer
161        peer_node_id: NodeId,
162        /// Error description
163        error: String,
164    },
165
166    // ==================== Security Events ====================
167    /// A security violation was detected
168    SecurityViolation {
169        /// Type of violation
170        kind: SecurityViolationKind,
171        /// Optional source identifier (node_id, BLE identifier, etc.)
172        source: Option<String>,
173    },
174
175    // ==================== Relay Events ====================
176    /// A message was relayed to other peers
177    MessageRelayed {
178        /// Original sender of the message
179        origin_node: NodeId,
180        /// Number of peers the message was relayed to
181        relay_count: usize,
182        /// Current hop count
183        hop_count: u8,
184    },
185
186    /// A duplicate message was detected and dropped
187    DuplicateMessageDropped {
188        /// Original sender of the message
189        origin_node: NodeId,
190        /// How many times we've seen this message
191        seen_count: u32,
192    },
193
194    /// A message was dropped due to TTL expiration
195    MessageTtlExpired {
196        /// Original sender of the message
197        origin_node: NodeId,
198        /// Hop count when dropped
199        hop_count: u8,
200    },
201}
202
203impl HiveEvent {
204    /// Create a peer discovered event
205    pub fn peer_discovered(peer: HivePeer) -> Self {
206        Self::PeerDiscovered { peer }
207    }
208
209    /// Create a peer connected event
210    pub fn peer_connected(node_id: NodeId) -> Self {
211        Self::PeerConnected { node_id }
212    }
213
214    /// Create a peer disconnected event
215    pub fn peer_disconnected(node_id: NodeId, reason: DisconnectReason) -> Self {
216        Self::PeerDisconnected { node_id, reason }
217    }
218
219    /// Create a peer lost event (timeout)
220    pub fn peer_lost(node_id: NodeId) -> Self {
221        Self::PeerLost { node_id }
222    }
223
224    /// Create an emergency received event
225    pub fn emergency_received(from_node: NodeId) -> Self {
226        Self::EmergencyReceived { from_node }
227    }
228
229    /// Create an ACK received event
230    pub fn ack_received(from_node: NodeId) -> Self {
231        Self::AckReceived { from_node }
232    }
233
234    /// Create a generic event received
235    pub fn event_received(from_node: NodeId, event_type: EventType) -> Self {
236        Self::EventReceived {
237            from_node,
238            event_type,
239        }
240    }
241
242    /// Create a document synced event
243    pub fn document_synced(from_node: NodeId, total_count: u64) -> Self {
244        Self::DocumentSynced {
245            from_node,
246            total_count,
247        }
248    }
249
250    /// Create a peer E2EE established event
251    pub fn peer_e2ee_established(peer_node_id: NodeId) -> Self {
252        Self::PeerE2eeEstablished { peer_node_id }
253    }
254
255    /// Create a peer E2EE closed event
256    pub fn peer_e2ee_closed(peer_node_id: NodeId) -> Self {
257        Self::PeerE2eeClosed { peer_node_id }
258    }
259
260    /// Create a peer E2EE message received event
261    pub fn peer_e2ee_message_received(from_node: NodeId, data: Vec<u8>) -> Self {
262        Self::PeerE2eeMessageReceived { from_node, data }
263    }
264
265    /// Create a peer E2EE failed event
266    pub fn peer_e2ee_failed(peer_node_id: NodeId, error: String) -> Self {
267        Self::PeerE2eeFailed {
268            peer_node_id,
269            error,
270        }
271    }
272
273    /// Create a security violation event
274    pub fn security_violation(kind: SecurityViolationKind, source: Option<String>) -> Self {
275        Self::SecurityViolation { kind, source }
276    }
277}
278
279/// Reason for peer disconnection
280#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
281pub enum DisconnectReason {
282    /// Local initiated disconnect
283    LocalRequest,
284    /// Remote peer initiated disconnect
285    RemoteRequest,
286    /// Connection timed out
287    Timeout,
288    /// BLE link lost
289    LinkLoss,
290    /// Connection failed
291    ConnectionFailed,
292    /// Unknown reason
293    #[default]
294    Unknown,
295}
296
297/// Types of security violations that can be detected
298#[derive(Debug, Clone, Copy, PartialEq, Eq)]
299pub enum SecurityViolationKind {
300    /// Received unencrypted document when strict encryption mode is enabled
301    UnencryptedInStrictMode,
302    /// Decryption failed (wrong key or corrupted data)
303    DecryptionFailed,
304    /// Replay attack detected (duplicate message counter)
305    ReplayDetected,
306    /// Message from unknown/unauthorized node
307    UnauthorizedNode,
308}
309
310/// Observer trait for receiving HIVE mesh events
311///
312/// Implement this trait to receive callbacks when mesh events occur.
313/// Observers must be thread-safe (Send + Sync) as they may be called
314/// from any thread.
315///
316/// ## Platform Notes
317///
318/// - **iOS/macOS**: Wrap in a Swift class that conforms to this protocol via UniFFI
319/// - **Android**: Implement via JNI callback interface
320/// - **ESP32**: Use direct Rust implementation with static callbacks
321pub trait HiveObserver: Send + Sync {
322    /// Called when a mesh event occurs
323    ///
324    /// This method should return quickly to avoid blocking the mesh.
325    /// If heavy processing is needed, dispatch to another thread.
326    fn on_event(&self, event: HiveEvent);
327}
328
329/// A simple observer that collects events into a vector (useful for testing)
330#[cfg(feature = "std")]
331#[derive(Debug, Default)]
332pub struct CollectingObserver {
333    events: std::sync::Mutex<Vec<HiveEvent>>,
334}
335
336#[cfg(feature = "std")]
337impl CollectingObserver {
338    /// Create a new collecting observer
339    pub fn new() -> Self {
340        Self {
341            events: std::sync::Mutex::new(Vec::new()),
342        }
343    }
344
345    /// Get all collected events
346    pub fn events(&self) -> Vec<HiveEvent> {
347        self.events.lock().unwrap().clone()
348    }
349
350    /// Clear collected events
351    pub fn clear(&self) {
352        self.events.lock().unwrap().clear();
353    }
354
355    /// Get count of collected events
356    pub fn count(&self) -> usize {
357        self.events.lock().unwrap().len()
358    }
359}
360
361#[cfg(feature = "std")]
362impl HiveObserver for CollectingObserver {
363    fn on_event(&self, event: HiveEvent) {
364        self.events.lock().unwrap().push(event);
365    }
366}
367
368/// Helper to manage multiple observers
369#[cfg(feature = "std")]
370pub struct ObserverManager {
371    observers: std::sync::RwLock<Vec<Arc<dyn HiveObserver>>>,
372}
373
374#[cfg(feature = "std")]
375impl Default for ObserverManager {
376    fn default() -> Self {
377        Self::new()
378    }
379}
380
381#[cfg(feature = "std")]
382impl ObserverManager {
383    /// Create a new observer manager
384    pub fn new() -> Self {
385        Self {
386            observers: std::sync::RwLock::new(Vec::new()),
387        }
388    }
389
390    /// Add an observer
391    pub fn add(&self, observer: Arc<dyn HiveObserver>) {
392        self.observers.write().unwrap().push(observer);
393    }
394
395    /// Remove an observer (by Arc pointer equality)
396    pub fn remove(&self, observer: &Arc<dyn HiveObserver>) {
397        self.observers
398            .write()
399            .unwrap()
400            .retain(|o| !Arc::ptr_eq(o, observer));
401    }
402
403    /// Notify all observers of an event
404    pub fn notify(&self, event: HiveEvent) {
405        // Use try_read to avoid panicking on poisoned locks
406        if let Ok(observers) = self.observers.try_read() {
407            for observer in observers.iter() {
408                observer.on_event(event.clone());
409            }
410        }
411    }
412
413    /// Get the number of registered observers
414    pub fn count(&self) -> usize {
415        self.observers.read().unwrap().len()
416    }
417}
418
419#[cfg(all(test, feature = "std"))]
420mod tests {
421    use super::*;
422
423    #[test]
424    fn test_collecting_observer() {
425        let observer = CollectingObserver::new();
426
427        observer.on_event(HiveEvent::peer_connected(NodeId::new(0x12345678)));
428        observer.on_event(HiveEvent::emergency_received(NodeId::new(0x87654321)));
429
430        assert_eq!(observer.count(), 2);
431
432        let events = observer.events();
433        assert!(matches!(events[0], HiveEvent::PeerConnected { .. }));
434        assert!(matches!(events[1], HiveEvent::EmergencyReceived { .. }));
435
436        observer.clear();
437        assert_eq!(observer.count(), 0);
438    }
439
440    #[test]
441    fn test_observer_manager() {
442        let manager = ObserverManager::new();
443
444        // Keep concrete references for count checks
445        let obs1_concrete = Arc::new(CollectingObserver::new());
446        let obs2_concrete = Arc::new(CollectingObserver::new());
447        let observer1: Arc<dyn HiveObserver> = obs1_concrete.clone();
448        let observer2: Arc<dyn HiveObserver> = obs2_concrete.clone();
449
450        manager.add(observer1.clone());
451        manager.add(observer2.clone());
452
453        assert_eq!(manager.count(), 2);
454
455        manager.notify(HiveEvent::peer_connected(NodeId::new(0x12345678)));
456
457        assert_eq!(obs1_concrete.count(), 1);
458        assert_eq!(obs2_concrete.count(), 1);
459
460        manager.remove(&observer1);
461        assert_eq!(manager.count(), 1);
462
463        manager.notify(HiveEvent::peer_lost(NodeId::new(0x12345678)));
464
465        assert_eq!(obs1_concrete.count(), 1); // Not notified
466        assert_eq!(obs2_concrete.count(), 2); // Got both events
467    }
468}