peat_btle/observer.rs
1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Observer pattern for Peat mesh events
17//!
18//! This module provides the event types and observer trait for receiving
19//! notifications about mesh state changes. Platform implementations register
20//! observers to receive callbacks when peers are discovered, connected,
21//! disconnected, or when documents are synced.
22//!
23//! ## Usage
24//!
25//! ```ignore
26//! use peat_btle::observer::{PeatEvent, PeatObserver};
27//!
28//! struct MyObserver;
29//!
30//! impl PeatObserver for MyObserver {
31//! fn on_event(&self, event: PeatEvent) {
32//! match event {
33//! PeatEvent::PeerDiscovered { peer } => {
34//! println!("Discovered: {}", peer.display_name());
35//! }
36//! PeatEvent::EmergencyReceived { from_node } => {
37//! println!("EMERGENCY from {:08X}", from_node.as_u32());
38//! }
39//! _ => {}
40//! }
41//! }
42//! }
43//! ```
44
45#[cfg(not(feature = "std"))]
46use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
47#[cfg(feature = "std")]
48use std::sync::Arc;
49
50// Re-import Vec for PeatEvent variants
51#[cfg(feature = "std")]
52use std::string::String;
53#[cfg(feature = "std")]
54use std::vec::Vec;
55
56use crate::peer::PeatPeer;
57use crate::sync::crdt::EventType;
58use crate::NodeId;
59
60/// Events emitted by the Peat mesh
61///
62/// These events notify observers about changes in mesh state, peer lifecycle,
63/// and document synchronization.
64#[derive(Debug, Clone)]
65pub enum PeatEvent {
66 // ==================== Peer Lifecycle Events ====================
67 /// A new peer was discovered via BLE scanning
68 PeerDiscovered {
69 /// The discovered peer
70 peer: PeatPeer,
71 },
72
73 /// A peer connected to us (either direction)
74 PeerConnected {
75 /// Node ID of the connected peer
76 node_id: NodeId,
77 },
78
79 /// A peer disconnected
80 PeerDisconnected {
81 /// Node ID of the disconnected peer
82 node_id: NodeId,
83 /// Reason for disconnection
84 reason: DisconnectReason,
85 },
86
87 /// A peer was removed due to timeout (stale)
88 PeerLost {
89 /// Node ID of the lost peer
90 node_id: NodeId,
91 },
92
93 // ==================== Mesh Events ====================
94 /// An emergency event was received from a peer
95 EmergencyReceived {
96 /// Node ID that sent the emergency
97 from_node: NodeId,
98 },
99
100 /// An ACK event was received from a peer
101 AckReceived {
102 /// Node ID that sent the ACK
103 from_node: NodeId,
104 },
105
106 /// A generic event was received from a peer
107 EventReceived {
108 /// Node ID that sent the event
109 from_node: NodeId,
110 /// Type of event
111 event_type: EventType,
112 },
113
114 /// A document was synced with a peer
115 DocumentSynced {
116 /// Node ID that we synced with
117 from_node: NodeId,
118 /// Updated total counter value
119 total_count: u64,
120 },
121
122 /// An app-layer document was received and stored/merged
123 ///
124 /// Emitted when a registered app document type (0xC0-0xCF) is received
125 /// and successfully processed through the document registry.
126 AppDocumentReceived {
127 /// Document type ID (0xC0-0xCF)
128 type_id: u8,
129 /// Source node that created the document
130 source_node: NodeId,
131 /// Document creation timestamp
132 timestamp: u64,
133 /// True if the document was new or changed after merge
134 changed: bool,
135 },
136
137 // ==================== Mesh State Events ====================
138 /// Mesh state changed (peer count, connected count)
139 MeshStateChanged {
140 /// Total number of known peers
141 peer_count: usize,
142 /// Number of connected peers
143 connected_count: usize,
144 },
145
146 /// All peers have acknowledged an emergency
147 AllPeersAcked {
148 /// Number of peers that acknowledged
149 ack_count: usize,
150 },
151
152 // ==================== Per-Peer E2EE Events ====================
153 /// E2EE session established with a peer
154 PeerE2eeEstablished {
155 /// Node ID of the peer we established E2EE with
156 peer_node_id: NodeId,
157 },
158
159 /// E2EE session closed with a peer
160 PeerE2eeClosed {
161 /// Node ID of the peer whose E2EE session closed
162 peer_node_id: NodeId,
163 },
164
165 /// Received an E2EE encrypted message from a peer
166 PeerE2eeMessageReceived {
167 /// Node ID of the sender
168 from_node: NodeId,
169 /// Decrypted message data
170 data: Vec<u8>,
171 },
172
173 /// E2EE session failed to establish
174 PeerE2eeFailed {
175 /// Node ID of the peer
176 peer_node_id: NodeId,
177 /// Error description
178 error: String,
179 },
180
181 // ==================== Security Events ====================
182 /// A security violation was detected
183 SecurityViolation {
184 /// Type of violation
185 kind: SecurityViolationKind,
186 /// Optional source identifier (node_id, BLE identifier, etc.)
187 source: Option<String>,
188 },
189
190 // ==================== Relay Events ====================
191 /// A message was relayed to other peers
192 MessageRelayed {
193 /// Original sender of the message
194 origin_node: NodeId,
195 /// Number of peers the message was relayed to
196 relay_count: usize,
197 /// Current hop count
198 hop_count: u8,
199 },
200
201 /// A duplicate message was detected and dropped
202 DuplicateMessageDropped {
203 /// Original sender of the message
204 origin_node: NodeId,
205 /// How many times we've seen this message
206 seen_count: u32,
207 },
208
209 /// A message was dropped due to TTL expiration
210 MessageTtlExpired {
211 /// Original sender of the message
212 origin_node: NodeId,
213 /// Hop count when dropped
214 hop_count: u8,
215 },
216
217 // ==================== ADR-059 Translator-Frame Events ====================
218 /// A 0xB6 translator frame was successfully decoded but no
219 /// `DecodedDocumentCallback` is installed (the Slice 1.b.3-ships-
220 /// before-1.b.4 release-skew window). Emit one event per frame so
221 /// operators staging the rollout can see, in real time, exactly how
222 /// many translator-frame payloads the bridge is decoding into the
223 /// void. ADR-059 Amendment 1's no-op-drop spec requires this be
224 /// operator-observable, not just `log::debug!` (which is disabled
225 /// at default log levels).
226 #[cfg(feature = "translator-codec")]
227 TranslatorNoCallback {
228 /// BleTranslator collection name (e.g. `"tracks"`).
229 collection: String,
230 /// BLE peer identifier from the receive context, when known.
231 peer: Option<String>,
232 },
233}
234
235impl PeatEvent {
236 /// Create a peer discovered event
237 pub fn peer_discovered(peer: PeatPeer) -> Self {
238 Self::PeerDiscovered { peer }
239 }
240
241 /// Create a peer connected event
242 pub fn peer_connected(node_id: NodeId) -> Self {
243 Self::PeerConnected { node_id }
244 }
245
246 /// Create a peer disconnected event
247 pub fn peer_disconnected(node_id: NodeId, reason: DisconnectReason) -> Self {
248 Self::PeerDisconnected { node_id, reason }
249 }
250
251 /// Create a peer lost event (timeout)
252 pub fn peer_lost(node_id: NodeId) -> Self {
253 Self::PeerLost { node_id }
254 }
255
256 /// Create an emergency received event
257 pub fn emergency_received(from_node: NodeId) -> Self {
258 Self::EmergencyReceived { from_node }
259 }
260
261 /// Create an ACK received event
262 pub fn ack_received(from_node: NodeId) -> Self {
263 Self::AckReceived { from_node }
264 }
265
266 /// Create a generic event received
267 pub fn event_received(from_node: NodeId, event_type: EventType) -> Self {
268 Self::EventReceived {
269 from_node,
270 event_type,
271 }
272 }
273
274 /// Create a document synced event
275 pub fn document_synced(from_node: NodeId, total_count: u64) -> Self {
276 Self::DocumentSynced {
277 from_node,
278 total_count,
279 }
280 }
281
282 /// Create an app document received event
283 pub fn app_document_received(
284 type_id: u8,
285 source_node: NodeId,
286 timestamp: u64,
287 changed: bool,
288 ) -> Self {
289 Self::AppDocumentReceived {
290 type_id,
291 source_node,
292 timestamp,
293 changed,
294 }
295 }
296
297 /// Create a peer E2EE established event
298 pub fn peer_e2ee_established(peer_node_id: NodeId) -> Self {
299 Self::PeerE2eeEstablished { peer_node_id }
300 }
301
302 /// Create a peer E2EE closed event
303 pub fn peer_e2ee_closed(peer_node_id: NodeId) -> Self {
304 Self::PeerE2eeClosed { peer_node_id }
305 }
306
307 /// Create a peer E2EE message received event
308 pub fn peer_e2ee_message_received(from_node: NodeId, data: Vec<u8>) -> Self {
309 Self::PeerE2eeMessageReceived { from_node, data }
310 }
311
312 /// Create a peer E2EE failed event
313 pub fn peer_e2ee_failed(peer_node_id: NodeId, error: String) -> Self {
314 Self::PeerE2eeFailed {
315 peer_node_id,
316 error,
317 }
318 }
319
320 /// Create a security violation event
321 pub fn security_violation(kind: SecurityViolationKind, source: Option<String>) -> Self {
322 Self::SecurityViolation { kind, source }
323 }
324}
325
326/// Reason for peer disconnection
327#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
328pub enum DisconnectReason {
329 /// Local initiated disconnect
330 LocalRequest,
331 /// Remote peer initiated disconnect
332 RemoteRequest,
333 /// Connection timed out
334 Timeout,
335 /// BLE link lost
336 LinkLoss,
337 /// Connection failed
338 ConnectionFailed,
339 /// Unknown reason
340 #[default]
341 Unknown,
342}
343
344/// Types of security violations that can be detected
345#[derive(Debug, Clone, Copy, PartialEq, Eq)]
346pub enum SecurityViolationKind {
347 /// Received unencrypted document when strict encryption mode is enabled
348 UnencryptedInStrictMode,
349 /// Decryption failed (wrong key or corrupted data)
350 DecryptionFailed,
351 /// Replay attack detected (duplicate message counter)
352 ReplayDetected,
353 /// Message from unknown/unauthorized node
354 UnauthorizedNode,
355}
356
357/// Observer trait for receiving Peat mesh events
358///
359/// Implement this trait to receive callbacks when mesh events occur.
360/// Observers must be thread-safe (Send + Sync) as they may be called
361/// from any thread.
362///
363/// ## Platform Notes
364///
365/// - **iOS/macOS**: Wrap in a Swift class that conforms to this protocol via UniFFI
366/// - **Android**: Implement via JNI callback interface
367/// - **ESP32**: Use direct Rust implementation with static callbacks
368pub trait PeatObserver: Send + Sync {
369 /// Called when a mesh event occurs
370 ///
371 /// This method should return quickly to avoid blocking the mesh.
372 /// If heavy processing is needed, dispatch to another thread.
373 fn on_event(&self, event: PeatEvent);
374}
375
376/// A simple observer that collects events into a vector (useful for testing)
377#[cfg(feature = "std")]
378#[derive(Debug, Default)]
379pub struct CollectingObserver {
380 events: std::sync::Mutex<Vec<PeatEvent>>,
381}
382
383#[cfg(feature = "std")]
384impl CollectingObserver {
385 /// Create a new collecting observer
386 pub fn new() -> Self {
387 Self {
388 events: std::sync::Mutex::new(Vec::new()),
389 }
390 }
391
392 /// Get all collected events
393 pub fn events(&self) -> Vec<PeatEvent> {
394 self.events.lock().unwrap().clone()
395 }
396
397 /// Clear collected events
398 pub fn clear(&self) {
399 self.events.lock().unwrap().clear();
400 }
401
402 /// Get count of collected events
403 pub fn count(&self) -> usize {
404 self.events.lock().unwrap().len()
405 }
406}
407
408#[cfg(feature = "std")]
409impl PeatObserver for CollectingObserver {
410 fn on_event(&self, event: PeatEvent) {
411 self.events.lock().unwrap().push(event);
412 }
413}
414
415/// Helper to manage multiple observers
416#[cfg(feature = "std")]
417pub struct ObserverManager {
418 observers: std::sync::RwLock<Vec<Arc<dyn PeatObserver>>>,
419}
420
421#[cfg(feature = "std")]
422impl Default for ObserverManager {
423 fn default() -> Self {
424 Self::new()
425 }
426}
427
428#[cfg(feature = "std")]
429impl ObserverManager {
430 /// Create a new observer manager
431 pub fn new() -> Self {
432 Self {
433 observers: std::sync::RwLock::new(Vec::new()),
434 }
435 }
436
437 /// Add an observer
438 pub fn add(&self, observer: Arc<dyn PeatObserver>) {
439 self.observers.write().unwrap().push(observer);
440 }
441
442 /// Remove an observer (by Arc pointer equality)
443 pub fn remove(&self, observer: &Arc<dyn PeatObserver>) {
444 self.observers
445 .write()
446 .unwrap()
447 .retain(|o| !Arc::ptr_eq(o, observer));
448 }
449
450 /// Notify all observers of an event
451 pub fn notify(&self, event: PeatEvent) {
452 // Use try_read to avoid panicking on poisoned locks
453 if let Ok(observers) = self.observers.try_read() {
454 for observer in observers.iter() {
455 observer.on_event(event.clone());
456 }
457 }
458 }
459
460 /// Get the number of registered observers
461 pub fn count(&self) -> usize {
462 self.observers.read().unwrap().len()
463 }
464}
465
466#[cfg(all(test, feature = "std"))]
467mod tests {
468 use super::*;
469
470 #[test]
471 fn test_collecting_observer() {
472 let observer = CollectingObserver::new();
473
474 observer.on_event(PeatEvent::peer_connected(NodeId::new(0x12345678)));
475 observer.on_event(PeatEvent::emergency_received(NodeId::new(0x87654321)));
476
477 assert_eq!(observer.count(), 2);
478
479 let events = observer.events();
480 assert!(matches!(events[0], PeatEvent::PeerConnected { .. }));
481 assert!(matches!(events[1], PeatEvent::EmergencyReceived { .. }));
482
483 observer.clear();
484 assert_eq!(observer.count(), 0);
485 }
486
487 #[test]
488 fn test_observer_manager() {
489 let manager = ObserverManager::new();
490
491 // Keep concrete references for count checks
492 let obs1_concrete = Arc::new(CollectingObserver::new());
493 let obs2_concrete = Arc::new(CollectingObserver::new());
494 let observer1: Arc<dyn PeatObserver> = obs1_concrete.clone();
495 let observer2: Arc<dyn PeatObserver> = obs2_concrete.clone();
496
497 manager.add(observer1.clone());
498 manager.add(observer2.clone());
499
500 assert_eq!(manager.count(), 2);
501
502 manager.notify(PeatEvent::peer_connected(NodeId::new(0x12345678)));
503
504 assert_eq!(obs1_concrete.count(), 1);
505 assert_eq!(obs2_concrete.count(), 1);
506
507 manager.remove(&observer1);
508 assert_eq!(manager.count(), 1);
509
510 manager.notify(PeatEvent::peer_lost(NodeId::new(0x12345678)));
511
512 assert_eq!(obs1_concrete.count(), 1); // Not notified
513 assert_eq!(obs2_concrete.count(), 2); // Got both events
514 }
515}