// File: hive_btle/observer.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Observer pattern for HIVE mesh events
17//!
18//! This module provides the event types and observer trait for receiving
19//! notifications about mesh state changes. Platform implementations register
20//! observers to receive callbacks when peers are discovered, connected,
21//! disconnected, or when documents are synced.
22//!
23//! ## Usage
24//!
25//! ```ignore
26//! use hive_btle::observer::{HiveEvent, HiveObserver};
27//!
28//! struct MyObserver;
29//!
30//! impl HiveObserver for MyObserver {
31//!     fn on_event(&self, event: HiveEvent) {
32//!         match event {
33//!             HiveEvent::PeerDiscovered { peer } => {
34//!                 println!("Discovered: {}", peer.display_name());
35//!             }
36//!             HiveEvent::EmergencyReceived { from_node } => {
37//!                 println!("EMERGENCY from {:08X}", from_node.as_u32());
38//!             }
39//!             _ => {}
40//!         }
41//!     }
42//! }
43//! ```
44
45#[cfg(not(feature = "std"))]
46use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
47#[cfg(feature = "std")]
48use std::sync::Arc;
49
// String and Vec are used in HiveEvent variant payloads
51#[cfg(feature = "std")]
52use std::string::String;
53#[cfg(feature = "std")]
54use std::vec::Vec;
55
56use crate::peer::HivePeer;
57use crate::sync::crdt::EventType;
58use crate::NodeId;
59
/// Events emitted by the HIVE mesh
///
/// These events notify observers about changes in mesh state, peer lifecycle,
/// and document synchronization. Observers receive each event by value through
/// [`HiveObserver::on_event`].
///
/// The enum derives `Clone`. Cloning `PeerE2eeMessageReceived`,
/// `PeerE2eeFailed`, or `SecurityViolation` copies their heap-allocated
/// payloads (`Vec<u8>` / `String`).
#[derive(Debug, Clone)]
pub enum HiveEvent {
    // ==================== Peer Lifecycle Events ====================
    /// A new peer was discovered via BLE scanning
    PeerDiscovered {
        /// The discovered peer
        peer: HivePeer,
    },

    /// A peer connected to us (either direction)
    PeerConnected {
        /// Node ID of the connected peer
        node_id: NodeId,
    },

    /// A peer disconnected
    PeerDisconnected {
        /// Node ID of the disconnected peer
        node_id: NodeId,
        /// Reason for disconnection
        reason: DisconnectReason,
    },

    /// A peer was removed due to timeout (stale)
    PeerLost {
        /// Node ID of the lost peer
        node_id: NodeId,
    },

    // ==================== Mesh Events ====================
    /// An emergency event was received from a peer
    EmergencyReceived {
        /// Node ID that sent the emergency
        from_node: NodeId,
    },

    /// An ACK event was received from a peer
    AckReceived {
        /// Node ID that sent the ACK
        from_node: NodeId,
    },

    /// A generic event was received from a peer
    EventReceived {
        /// Node ID that sent the event
        from_node: NodeId,
        /// Type of event
        event_type: EventType,
    },

    /// A document was synced with a peer
    DocumentSynced {
        /// Node ID that we synced with
        from_node: NodeId,
        /// Updated total counter value
        total_count: u64,
    },

    /// An app-layer document was received and stored/merged
    ///
    /// Emitted when a registered app document type (0xC0-0xCF) is received
    /// and successfully processed through the document registry.
    AppDocumentReceived {
        /// Document type ID (0xC0-0xCF)
        type_id: u8,
        /// Source node that created the document
        source_node: NodeId,
        /// Document creation timestamp
        // NOTE(review): the unit/epoch of this timestamp is not visible in this file; confirm at the producer.
        timestamp: u64,
        /// True if the document was new or changed after merge
        changed: bool,
    },

    // ==================== Mesh State Events ====================
    /// Mesh state changed (peer count, connected count)
    MeshStateChanged {
        /// Total number of known peers
        peer_count: usize,
        /// Number of connected peers
        connected_count: usize,
    },

    /// All peers have acknowledged an emergency
    AllPeersAcked {
        /// Number of peers that acknowledged
        ack_count: usize,
    },

    // ==================== Per-Peer E2EE Events ====================
    /// E2EE session established with a peer
    PeerE2eeEstablished {
        /// Node ID of the peer we established E2EE with
        peer_node_id: NodeId,
    },

    /// E2EE session closed with a peer
    PeerE2eeClosed {
        /// Node ID of the peer whose E2EE session closed
        peer_node_id: NodeId,
    },

    /// Received an E2EE encrypted message from a peer
    PeerE2eeMessageReceived {
        /// Node ID of the sender
        from_node: NodeId,
        /// Decrypted message data (owned bytes; copied whenever the event is cloned)
        data: Vec<u8>,
    },

    /// E2EE session failed to establish
    PeerE2eeFailed {
        /// Node ID of the peer
        peer_node_id: NodeId,
        /// Error description (free-form text)
        error: String,
    },

    // ==================== Security Events ====================
    /// A security violation was detected
    SecurityViolation {
        /// Type of violation
        kind: SecurityViolationKind,
        /// Optional source identifier (node_id, BLE identifier, etc.);
        /// `None` when no source is available
        source: Option<String>,
    },

    // ==================== Relay Events ====================
    /// A message was relayed to other peers
    MessageRelayed {
        /// Original sender of the message
        origin_node: NodeId,
        /// Number of peers the message was relayed to
        relay_count: usize,
        /// Current hop count
        hop_count: u8,
    },

    /// A duplicate message was detected and dropped
    DuplicateMessageDropped {
        /// Original sender of the message
        origin_node: NodeId,
        /// How many times we've seen this message
        seen_count: u32,
    },

    /// A message was dropped due to TTL expiration
    MessageTtlExpired {
        /// Original sender of the message
        origin_node: NodeId,
        /// Hop count when dropped
        hop_count: u8,
    },
}
217
218impl HiveEvent {
219    /// Create a peer discovered event
220    pub fn peer_discovered(peer: HivePeer) -> Self {
221        Self::PeerDiscovered { peer }
222    }
223
224    /// Create a peer connected event
225    pub fn peer_connected(node_id: NodeId) -> Self {
226        Self::PeerConnected { node_id }
227    }
228
229    /// Create a peer disconnected event
230    pub fn peer_disconnected(node_id: NodeId, reason: DisconnectReason) -> Self {
231        Self::PeerDisconnected { node_id, reason }
232    }
233
234    /// Create a peer lost event (timeout)
235    pub fn peer_lost(node_id: NodeId) -> Self {
236        Self::PeerLost { node_id }
237    }
238
239    /// Create an emergency received event
240    pub fn emergency_received(from_node: NodeId) -> Self {
241        Self::EmergencyReceived { from_node }
242    }
243
244    /// Create an ACK received event
245    pub fn ack_received(from_node: NodeId) -> Self {
246        Self::AckReceived { from_node }
247    }
248
249    /// Create a generic event received
250    pub fn event_received(from_node: NodeId, event_type: EventType) -> Self {
251        Self::EventReceived {
252            from_node,
253            event_type,
254        }
255    }
256
257    /// Create a document synced event
258    pub fn document_synced(from_node: NodeId, total_count: u64) -> Self {
259        Self::DocumentSynced {
260            from_node,
261            total_count,
262        }
263    }
264
265    /// Create an app document received event
266    pub fn app_document_received(
267        type_id: u8,
268        source_node: NodeId,
269        timestamp: u64,
270        changed: bool,
271    ) -> Self {
272        Self::AppDocumentReceived {
273            type_id,
274            source_node,
275            timestamp,
276            changed,
277        }
278    }
279
280    /// Create a peer E2EE established event
281    pub fn peer_e2ee_established(peer_node_id: NodeId) -> Self {
282        Self::PeerE2eeEstablished { peer_node_id }
283    }
284
285    /// Create a peer E2EE closed event
286    pub fn peer_e2ee_closed(peer_node_id: NodeId) -> Self {
287        Self::PeerE2eeClosed { peer_node_id }
288    }
289
290    /// Create a peer E2EE message received event
291    pub fn peer_e2ee_message_received(from_node: NodeId, data: Vec<u8>) -> Self {
292        Self::PeerE2eeMessageReceived { from_node, data }
293    }
294
295    /// Create a peer E2EE failed event
296    pub fn peer_e2ee_failed(peer_node_id: NodeId, error: String) -> Self {
297        Self::PeerE2eeFailed {
298            peer_node_id,
299            error,
300        }
301    }
302
303    /// Create a security violation event
304    pub fn security_violation(kind: SecurityViolationKind, source: Option<String>) -> Self {
305        Self::SecurityViolation { kind, source }
306    }
307}
308
/// Reason for peer disconnection
///
/// Carried by [`HiveEvent::PeerDisconnected`]. The `Default` value is
/// [`DisconnectReason::Unknown`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DisconnectReason {
    /// Local initiated disconnect
    LocalRequest,
    /// Remote peer initiated disconnect
    RemoteRequest,
    /// Connection timed out
    Timeout,
    /// BLE link lost
    LinkLoss,
    /// Connection failed
    ConnectionFailed,
    /// Unknown reason (this is the `Default` variant)
    #[default]
    Unknown,
}
326
/// Types of security violations that can be detected
///
/// Carried by [`HiveEvent::SecurityViolation`] together with an optional
/// source identifier.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SecurityViolationKind {
    /// Received unencrypted document when strict encryption mode is enabled
    UnencryptedInStrictMode,
    /// Decryption failed (wrong key or corrupted data)
    DecryptionFailed,
    /// Replay attack detected (duplicate message counter)
    ReplayDetected,
    /// Message from unknown/unauthorized node
    UnauthorizedNode,
}
339
/// Observer trait for receiving HIVE mesh events
///
/// Implement this trait to receive callbacks when mesh events occur.
/// Observers must be thread-safe (Send + Sync) as they may be called
/// from any thread.
///
/// Because [`on_event`](HiveObserver::on_event) takes `&self`, an
/// implementation that records state needs interior mutability
/// (for example a `Mutex`).
///
/// ## Platform Notes
///
/// - **iOS/macOS**: Wrap in a Swift class that conforms to this protocol via UniFFI
/// - **Android**: Implement via JNI callback interface
/// - **ESP32**: Use direct Rust implementation with static callbacks
pub trait HiveObserver: Send + Sync {
    /// Called when a mesh event occurs
    ///
    /// The event is passed by value, so the observer owns it and may store
    /// or move it without cloning.
    ///
    /// This method should return quickly to avoid blocking the mesh.
    /// If heavy processing is needed, dispatch to another thread.
    fn on_event(&self, event: HiveEvent);
}
358
/// A simple observer that collects events into a vector (useful for testing)
///
/// Only available with the `std` feature. Events are appended in the order
/// `on_event` is called.
#[cfg(feature = "std")]
#[derive(Debug, Default)]
pub struct CollectingObserver {
    /// Collected events; behind a mutex because `HiveObserver::on_event`
    /// only receives `&self`.
    events: std::sync::Mutex<Vec<HiveEvent>>,
}
365
366#[cfg(feature = "std")]
367impl CollectingObserver {
368    /// Create a new collecting observer
369    pub fn new() -> Self {
370        Self {
371            events: std::sync::Mutex::new(Vec::new()),
372        }
373    }
374
375    /// Get all collected events
376    pub fn events(&self) -> Vec<HiveEvent> {
377        self.events.lock().unwrap().clone()
378    }
379
380    /// Clear collected events
381    pub fn clear(&self) {
382        self.events.lock().unwrap().clear();
383    }
384
385    /// Get count of collected events
386    pub fn count(&self) -> usize {
387        self.events.lock().unwrap().len()
388    }
389}
390
391#[cfg(feature = "std")]
392impl HiveObserver for CollectingObserver {
393    fn on_event(&self, event: HiveEvent) {
394        self.events.lock().unwrap().push(event);
395    }
396}
397
/// Helper to manage multiple observers
///
/// Only available with the `std` feature. Observers are held as
/// `Arc<dyn HiveObserver>`, so a caller can keep its own handle and later
/// pass it to `remove`, which matches by pointer identity.
#[cfg(feature = "std")]
pub struct ObserverManager {
    /// Registered observers, in registration order.
    observers: std::sync::RwLock<Vec<Arc<dyn HiveObserver>>>,
}
403
404#[cfg(feature = "std")]
405impl Default for ObserverManager {
406    fn default() -> Self {
407        Self::new()
408    }
409}
410
411#[cfg(feature = "std")]
412impl ObserverManager {
413    /// Create a new observer manager
414    pub fn new() -> Self {
415        Self {
416            observers: std::sync::RwLock::new(Vec::new()),
417        }
418    }
419
420    /// Add an observer
421    pub fn add(&self, observer: Arc<dyn HiveObserver>) {
422        self.observers.write().unwrap().push(observer);
423    }
424
425    /// Remove an observer (by Arc pointer equality)
426    pub fn remove(&self, observer: &Arc<dyn HiveObserver>) {
427        self.observers
428            .write()
429            .unwrap()
430            .retain(|o| !Arc::ptr_eq(o, observer));
431    }
432
433    /// Notify all observers of an event
434    pub fn notify(&self, event: HiveEvent) {
435        // Use try_read to avoid panicking on poisoned locks
436        if let Ok(observers) = self.observers.try_read() {
437            for observer in observers.iter() {
438                observer.on_event(event.clone());
439            }
440        }
441    }
442
443    /// Get the number of registered observers
444    pub fn count(&self) -> usize {
445        self.observers.read().unwrap().len()
446    }
447}
448
#[cfg(all(test, feature = "std"))]
mod tests {
    use super::*;

    /// `CollectingObserver` records events in delivery order and can be reset.
    #[test]
    fn test_collecting_observer() {
        let collector = CollectingObserver::new();

        let connected = HiveEvent::peer_connected(NodeId::new(0x12345678));
        let emergency = HiveEvent::emergency_received(NodeId::new(0x87654321));
        collector.on_event(connected);
        collector.on_event(emergency);

        assert_eq!(collector.count(), 2);

        let recorded = collector.events();
        assert!(matches!(recorded[0], HiveEvent::PeerConnected { .. }));
        assert!(matches!(recorded[1], HiveEvent::EmergencyReceived { .. }));

        collector.clear();
        assert_eq!(collector.count(), 0);
    }

    /// `ObserverManager` fans events out to every registered observer and
    /// stops notifying an observer once it has been removed.
    #[test]
    fn test_observer_manager() {
        let manager = ObserverManager::new();

        // Concrete handles let us inspect what each observer received;
        // the trait-object handles are what the manager stores and matches on.
        let first = Arc::new(CollectingObserver::new());
        let second = Arc::new(CollectingObserver::new());
        let first_dyn: Arc<dyn HiveObserver> = first.clone();
        let second_dyn: Arc<dyn HiveObserver> = second.clone();

        manager.add(first_dyn.clone());
        manager.add(second_dyn.clone());
        assert_eq!(manager.count(), 2);

        manager.notify(HiveEvent::peer_connected(NodeId::new(0x12345678)));
        assert_eq!(first.count(), 1);
        assert_eq!(second.count(), 1);

        manager.remove(&first_dyn);
        assert_eq!(manager.count(), 1);

        manager.notify(HiveEvent::peer_lost(NodeId::new(0x12345678)));
        assert_eq!(first.count(), 1); // removed before the second event
        assert_eq!(second.count(), 2); // received both events
    }
}