Skip to main content

peat_btle/
peat_mesh.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! PeatMesh - Unified mesh management facade
17//!
18//! This module provides the main entry point for Peat BLE mesh operations.
19//! It composes peer management, document sync, and observer notifications
20//! into a single interface that platform implementations can use.
21//!
22//! ## Usage
23//!
24//! ```ignore
25//! use peat_btle::peat_mesh::{PeatMesh, PeatMeshConfig};
26//! use peat_btle::observer::{PeatEvent, PeatObserver};
27//! use peat_btle::NodeId;
28//! use std::sync::Arc;
29//!
30//! // Create mesh configuration
31//! let config = PeatMeshConfig::new(NodeId::new(0x12345678), "ALPHA-1", "DEMO");
32//!
33//! // Create mesh instance
34//! let mesh = PeatMesh::new(config);
35//!
36//! // Add observer for events
37//! struct MyObserver;
38//! impl PeatObserver for MyObserver {
39//!     fn on_event(&self, event: PeatEvent) {
40//!         println!("Event: {:?}", event);
41//!     }
42//! }
43//! mesh.add_observer(Arc::new(MyObserver));
44//!
45//! // Platform BLE callbacks
46//! mesh.on_ble_discovered("device-uuid", Some("PEAT_DEMO-AABBCCDD"), -65, Some("DEMO"), now_ms);
47//! mesh.on_ble_connected("device-uuid", now_ms);
48//! mesh.on_ble_data_received("device-uuid", &data, now_ms);
49//!
50//! // Periodic maintenance
51//! if let Some(sync_data) = mesh.tick(now_ms) {
52//!     // Broadcast sync_data to connected peers
53//! }
54//! ```
55
56#[cfg(not(feature = "std"))]
57use alloc::{string::String, sync::Arc, vec::Vec};
58#[cfg(feature = "std")]
59use std::collections::HashMap;
60#[cfg(feature = "std")]
61use std::sync::Arc;
62
63use crate::document::{ENCRYPTED_MARKER, KEY_EXCHANGE_MARKER, PEER_E2EE_MARKER};
64#[cfg(feature = "mesh-translator")]
65use crate::document::{
66    TRANSLATOR_FRAME_MARKER, TRANSLATOR_RESERVED_MARKER_END, TRANSLATOR_RESERVED_MARKER_START,
67};
68use crate::document_sync::DocumentSync;
69use crate::gossip::{GossipStrategy, RandomFanout};
70use crate::observer::{DisconnectReason, PeatEvent, PeatObserver, SecurityViolationKind};
71use crate::peer::{
72    ConnectionStateGraph, FullStateCountSummary, IndirectPeer, PeatPeer, PeerConnectionState,
73    PeerDegree, PeerManagerConfig, StateCountSummary,
74};
75use crate::peer_manager::PeerManager;
76use crate::relay::{
77    MessageId, RelayEnvelope, SeenMessageCache, DEFAULT_MAX_HOPS, DEFAULT_SEEN_TTL_MS,
78    RELAY_ENVELOPE_MARKER,
79};
80use crate::security::{
81    DeviceIdentity, IdentityAttestation, IdentityRegistry, KeyExchangeMessage, MeshEncryptionKey,
82    PeerEncryptedMessage, PeerSessionManager, RegistryResult, SessionState,
83};
84use crate::sync::crdt::{EventType, Peripheral, PeripheralType};
85use crate::sync::delta::{DeltaEncoder, DeltaStats};
86use crate::sync::delta_document::{DeltaDocument, Operation};
87use crate::NodeId;
88
89#[cfg(feature = "std")]
90use crate::observer::ObserverManager;
91
92use crate::registry::DocumentRegistry;
93
94/// Configuration for PeatMesh
95#[derive(Debug, Clone)]
96pub struct PeatMeshConfig {
97    /// Our node ID
98    pub node_id: NodeId,
99
100    /// Our callsign (e.g., "ALPHA-1")
101    pub callsign: String,
102
103    /// Mesh ID to filter peers (e.g., "DEMO")
104    pub mesh_id: String,
105
106    /// Peripheral type for this device
107    pub peripheral_type: PeripheralType,
108
109    /// Peer management configuration
110    pub peer_config: PeerManagerConfig,
111
112    /// Sync interval in milliseconds (how often to broadcast state)
113    pub sync_interval_ms: u64,
114
115    /// Whether to auto-broadcast on emergency/ack
116    pub auto_broadcast_events: bool,
117
118    /// Optional shared secret for mesh-wide encryption (32 bytes)
119    ///
120    /// When set, all documents are encrypted using ChaCha20-Poly1305 before
121    /// transmission and decrypted upon receipt. All nodes in the mesh must
122    /// share the same secret to communicate.
123    pub encryption_secret: Option<[u8; 32]>,
124
125    /// Strict encryption mode - reject unencrypted documents when encryption is enabled
126    ///
127    /// When true and encryption is enabled, any unencrypted documents received
128    /// will be rejected and trigger a SecurityViolation event. This prevents
129    /// downgrade attacks where an adversary sends unencrypted malicious documents.
130    ///
131    /// Default: false (backward compatible - accepts unencrypted for gradual rollout)
132    pub strict_encryption: bool,
133
134    /// Enable multi-hop relay
135    ///
136    /// When enabled, received messages will be forwarded to other peers based
137    /// on the gossip strategy. Requires message deduplication to prevent loops.
138    ///
139    /// Default: false
140    pub enable_relay: bool,
141
142    /// Maximum hops for relay messages (TTL)
143    ///
144    /// Messages will not be relayed beyond this many hops from the origin.
145    /// Default: 7
146    pub max_relay_hops: u8,
147
148    /// Gossip fanout for relay
149    ///
150    /// Number of peers to forward each message to. Higher values increase
151    /// convergence speed but also bandwidth usage.
152    /// Default: 2
153    pub relay_fanout: usize,
154
155    /// TTL for seen message cache (milliseconds)
156    ///
157    /// How long to remember message IDs for deduplication.
158    /// Default: 300_000 (5 minutes)
159    pub seen_cache_ttl_ms: u64,
160}
161
162impl PeatMeshConfig {
163    /// Create a new configuration with required fields
164    pub fn new(node_id: NodeId, callsign: &str, mesh_id: &str) -> Self {
165        Self {
166            node_id,
167            callsign: callsign.into(),
168            mesh_id: mesh_id.into(),
169            peripheral_type: PeripheralType::SoldierSensor,
170            peer_config: PeerManagerConfig::with_mesh_id(mesh_id),
171            sync_interval_ms: 5000,
172            auto_broadcast_events: true,
173            encryption_secret: None,
174            strict_encryption: false,
175            enable_relay: false,
176            max_relay_hops: DEFAULT_MAX_HOPS,
177            relay_fanout: 2,
178            seen_cache_ttl_ms: DEFAULT_SEEN_TTL_MS,
179        }
180    }
181
182    /// Enable mesh-wide encryption with a shared secret
183    ///
184    /// All documents will be encrypted using ChaCha20-Poly1305 before
185    /// transmission. All mesh participants must use the same secret.
186    pub fn with_encryption(mut self, secret: [u8; 32]) -> Self {
187        self.encryption_secret = Some(secret);
188        self
189    }
190
191    /// Set peripheral type
192    pub fn with_peripheral_type(mut self, ptype: PeripheralType) -> Self {
193        self.peripheral_type = ptype;
194        self
195    }
196
197    /// Set sync interval
198    pub fn with_sync_interval(mut self, interval_ms: u64) -> Self {
199        self.sync_interval_ms = interval_ms;
200        self
201    }
202
203    /// Set peer timeout
204    pub fn with_peer_timeout(mut self, timeout_ms: u64) -> Self {
205        self.peer_config.peer_timeout_ms = timeout_ms;
206        self
207    }
208
209    /// Set max peers (for embedded systems)
210    pub fn with_max_peers(mut self, max: usize) -> Self {
211        self.peer_config.max_peers = max;
212        self
213    }
214
215    /// Enable strict encryption mode
216    ///
217    /// When enabled (and encryption is also enabled), any unencrypted documents
218    /// received will be rejected and trigger a `SecurityViolation` event.
219    /// This prevents downgrade attacks.
220    ///
221    /// Note: This only has effect when encryption is enabled via `with_encryption()`.
222    pub fn with_strict_encryption(mut self) -> Self {
223        self.strict_encryption = true;
224        self
225    }
226
227    /// Enable multi-hop relay
228    ///
229    /// When enabled, received messages will be forwarded to other connected peers
230    /// based on the gossip strategy. This enables mesh-wide message propagation.
231    pub fn with_relay(mut self) -> Self {
232        self.enable_relay = true;
233        self
234    }
235
236    /// Set maximum relay hops (TTL)
237    ///
238    /// Messages will not be relayed beyond this many hops from the origin.
239    pub fn with_max_relay_hops(mut self, max_hops: u8) -> Self {
240        self.max_relay_hops = max_hops;
241        self
242    }
243
244    /// Set gossip fanout for relay
245    ///
246    /// Number of peers to forward each message to.
247    pub fn with_relay_fanout(mut self, fanout: usize) -> Self {
248        self.relay_fanout = fanout.max(1);
249        self
250    }
251
252    /// Set TTL for seen message cache
253    ///
254    /// How long to remember message IDs for deduplication (milliseconds).
255    pub fn with_seen_cache_ttl(mut self, ttl_ms: u64) -> Self {
256        self.seen_cache_ttl_ms = ttl_ms;
257        self
258    }
259}
260
261/// Type alias for app document storage to reduce type complexity
262#[cfg(feature = "std")]
263type AppDocumentStore =
264    std::sync::RwLock<HashMap<(u8, u32, u64), Box<dyn core::any::Any + Send + Sync>>>;
265
266/// Main facade for Peat BLE mesh operations
267///
268/// Composes peer management, document sync, and observer notifications.
269/// Platform implementations call into this from their BLE callbacks.
270#[cfg(feature = "std")]
271pub struct PeatMesh {
272    /// Configuration
273    config: PeatMeshConfig,
274
275    /// Peer manager
276    peer_manager: PeerManager,
277
278    /// Document sync
279    document_sync: DocumentSync,
280
281    /// Observer manager
282    observers: ObserverManager,
283
284    /// Last sync broadcast time (u32 wraps every ~49 days, sufficient for intervals)
285    last_sync_ms: std::sync::atomic::AtomicU32,
286
287    /// Last cleanup time
288    last_cleanup_ms: std::sync::atomic::AtomicU32,
289
290    /// Optional mesh-wide encryption key (derived from shared secret)
291    encryption_key: Option<MeshEncryptionKey>,
292
293    /// Optional per-peer E2EE session manager
294    peer_sessions: std::sync::Mutex<Option<PeerSessionManager>>,
295
296    /// Connection state graph for tracking peer connection lifecycle
297    connection_graph: std::sync::Mutex<ConnectionStateGraph>,
298
299    /// Seen message cache for relay deduplication
300    seen_cache: std::sync::Mutex<SeenMessageCache>,
301
302    /// Gossip strategy for relay peer selection
303    gossip_strategy: Box<dyn GossipStrategy>,
304
305    /// Delta encoder for per-peer sync state tracking
306    ///
307    /// Tracks what data has been sent to each peer to enable delta sync
308    /// (sending only changes instead of full documents).
309    delta_encoder: std::sync::Mutex<DeltaEncoder>,
310
311    /// This node's cryptographic identity (Ed25519 keypair)
312    ///
313    /// When set, the node_id is derived from the public key and documents
314    /// can be signed for authenticity verification.
315    identity: Option<DeviceIdentity>,
316
317    /// TOFU identity registry for tracking peer identities
318    ///
319    /// Maps node_id to public key on first contact, rejects mismatches.
320    identity_registry: std::sync::Mutex<IdentityRegistry>,
321
322    /// Peripheral state received from peers
323    ///
324    /// Stores the most recent peripheral data (callsign, location, etc.)
325    /// received from each peer via document sync.
326    peer_peripherals: std::sync::RwLock<HashMap<NodeId, Peripheral>>,
327
328    /// Document registry for app-layer CRDT types.
329    ///
330    /// Enables external crates to register custom document types that sync
331    /// through the mesh using the extensible registry pattern.
332    document_registry: DocumentRegistry,
333
334    /// Storage for app-layer documents.
335    ///
336    /// Keyed by (type_id, source_node, timestamp) to uniquely identify each
337    /// document instance. Values are type-erased boxes that can be downcast
338    /// using the document registry handlers.
339    app_documents: AppDocumentStore,
340
341    /// ADR-059 BleTranslator instance for inbound translator-frame decoding.
342    ///
343    /// One translator per PeatMesh, constructed with `TranslationConfig::default()`.
344    /// Operators wanting custom collection names can swap via
345    /// `set_translator_config`. Always present when the `mesh-translator`
346    /// feature is enabled — the receive-dispatch branch in
347    /// `on_ble_data_received_anonymous` (and siblings) calls
348    /// `decode_inbound_sync` on it for every 0xB6 frame.
349    #[cfg(feature = "mesh-translator")]
350    ble_translator: std::sync::RwLock<crate::translator::BleTranslator>,
351
352    /// Optional `DecodedDocumentCallback` set by Rust-native consumers
353    /// (peat-sim integration tests, future Rust hosts). Originally
354    /// specified as the peat-ffi-side wiring in Amendment 1; Amendment 2
355    /// re-aimed Slice 1.b.4 at the host (peat-atak-plugin et al.) via
356    /// the UniFFI [`decoded_document_json_callback`](Self::decoded_document_json_callback)
357    /// path because peat-ffi never owns a `PeatMesh` handle. This Rust
358    /// trait callback is preserved for in-process Rust consumers.
359    ///
360    /// `None` until installed. Both this and the JSON callback fire
361    /// independently when both are set; `PeatEvent::TranslatorNoCallback`
362    /// fires on the observer channel only when **both** are absent so
363    /// the release-skew window stays operator-observable in real time
364    /// (see `crate::observer::PeatEvent`).
365    #[cfg(feature = "mesh-translator")]
366    decoded_document_callback: std::sync::RwLock<Option<Arc<dyn crate::DecodedDocumentCallback>>>,
367
368    /// Optional UniFFI-exported `DecodedDocumentJsonCallback` set by
369    /// host bindings (Kotlin / Swift via peat-btle's UniFFI surface).
370    /// ADR-059 Amendment 2 specifies this as the production wiring
371    /// point for cross-transport receive — peat-atak-plugin's
372    /// `BleDecodedDocumentBridge` implements the trait and forwards
373    /// each decoded doc into peat-ffi's `publishDocument` with
374    /// `origin="ble"`.
375    #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
376    decoded_document_json_callback:
377        std::sync::RwLock<Option<Arc<dyn crate::DecodedDocumentJsonCallback>>>,
378}
379
380#[cfg(feature = "std")]
381impl PeatMesh {
382    /// Create a new PeatMesh instance
383    pub fn new(config: PeatMeshConfig) -> Self {
384        let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
385        let document_sync = DocumentSync::with_peripheral_type(
386            config.node_id,
387            &config.callsign,
388            config.peripheral_type,
389        );
390
391        // Derive encryption key from shared secret if configured
392        let encryption_key = config
393            .encryption_secret
394            .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
395
396        // Create connection state graph with config thresholds
397        let connection_graph = ConnectionStateGraph::with_config(
398            config.peer_config.rssi_degraded_threshold,
399            config.peer_config.lost_timeout_ms,
400        );
401
402        // Create seen message cache for relay deduplication
403        let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
404
405        // Create gossip strategy for relay
406        let gossip_strategy: Box<dyn GossipStrategy> =
407            Box::new(RandomFanout::new(config.relay_fanout));
408
409        // Create delta encoder for per-peer sync state tracking
410        let delta_encoder = DeltaEncoder::new(config.node_id);
411
412        let document_registry = DocumentRegistry::new();
413
414        Self {
415            config,
416            peer_manager,
417            document_sync,
418            observers: ObserverManager::new(),
419            last_sync_ms: std::sync::atomic::AtomicU32::new(0),
420            last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
421            encryption_key,
422            peer_sessions: std::sync::Mutex::new(None),
423            connection_graph: std::sync::Mutex::new(connection_graph),
424            seen_cache: std::sync::Mutex::new(seen_cache),
425            gossip_strategy,
426            delta_encoder: std::sync::Mutex::new(delta_encoder),
427            identity: None,
428            identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
429            peer_peripherals: std::sync::RwLock::new(HashMap::new()),
430            document_registry,
431            app_documents: std::sync::RwLock::new(HashMap::new()),
432            #[cfg(feature = "mesh-translator")]
433            ble_translator: std::sync::RwLock::new(
434                crate::translator::BleTranslator::with_defaults(),
435            ),
436            #[cfg(feature = "mesh-translator")]
437            decoded_document_callback: std::sync::RwLock::new(None),
438            #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
439            decoded_document_json_callback: std::sync::RwLock::new(None),
440        }
441    }
442
443    /// Create a new PeatMesh with a cryptographic identity
444    ///
445    /// The node_id will be derived from the identity's public key, overriding
446    /// any node_id specified in the config. This ensures cryptographic binding
447    /// between node_id and identity.
448    pub fn with_identity(config: PeatMeshConfig, identity: DeviceIdentity) -> Self {
449        // Override node_id with identity-derived value
450        let mut config = config;
451        config.node_id = identity.node_id();
452
453        let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
454        let document_sync = DocumentSync::with_peripheral_type(
455            config.node_id,
456            &config.callsign,
457            config.peripheral_type,
458        );
459
460        let encryption_key = config
461            .encryption_secret
462            .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
463
464        let connection_graph = ConnectionStateGraph::with_config(
465            config.peer_config.rssi_degraded_threshold,
466            config.peer_config.lost_timeout_ms,
467        );
468
469        let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
470        let gossip_strategy: Box<dyn GossipStrategy> =
471            Box::new(RandomFanout::new(config.relay_fanout));
472        let delta_encoder = DeltaEncoder::new(config.node_id);
473
474        let document_registry = DocumentRegistry::new();
475
476        Self {
477            config,
478            peer_manager,
479            document_sync,
480            observers: ObserverManager::new(),
481            last_sync_ms: std::sync::atomic::AtomicU32::new(0),
482            last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
483            encryption_key,
484            peer_sessions: std::sync::Mutex::new(None),
485            connection_graph: std::sync::Mutex::new(connection_graph),
486            seen_cache: std::sync::Mutex::new(seen_cache),
487            gossip_strategy,
488            delta_encoder: std::sync::Mutex::new(delta_encoder),
489            identity: Some(identity),
490            identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
491            peer_peripherals: std::sync::RwLock::new(HashMap::new()),
492            document_registry,
493            app_documents: std::sync::RwLock::new(HashMap::new()),
494            #[cfg(feature = "mesh-translator")]
495            ble_translator: std::sync::RwLock::new(
496                crate::translator::BleTranslator::with_defaults(),
497            ),
498            #[cfg(feature = "mesh-translator")]
499            decoded_document_callback: std::sync::RwLock::new(None),
500            #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
501            decoded_document_json_callback: std::sync::RwLock::new(None),
502        }
503    }
504
505    /// Create a new PeatMesh from genesis data
506    ///
507    /// This is the recommended way to create a mesh for production use.
508    /// The mesh will be configured with:
509    /// - node_id derived from identity
510    /// - mesh_id from genesis
511    /// - encryption enabled using genesis-derived secret
512    pub fn from_genesis(
513        genesis: &crate::security::MeshGenesis,
514        identity: DeviceIdentity,
515        callsign: &str,
516    ) -> Self {
517        let config = PeatMeshConfig::new(identity.node_id(), callsign, &genesis.mesh_id())
518            .with_encryption(genesis.encryption_secret());
519
520        Self::with_identity(config, identity)
521    }
522
523    /// Create a PeatMesh from persisted state.
524    ///
525    /// Restores mesh configuration from previously saved state, including:
526    /// - Device identity (Ed25519 keypair)
527    /// - Mesh genesis (if present)
528    /// - Identity registry (TOFU cache)
529    ///
530    /// Use this on device boot to restore mesh membership without re-provisioning.
531    ///
532    /// # Arguments
533    ///
534    /// * `state` - Previously persisted state
535    /// * `callsign` - Human-readable identifier (may differ from original)
536    ///
537    /// # Errors
538    ///
539    /// Returns `PersistenceError` if the identity cannot be restored.
540    ///
541    /// # Example
542    ///
543    /// ```ignore
544    /// // On boot, restore from secure storage
545    /// let state = PersistedState::load(&storage)?;
546    /// let mesh = PeatMesh::from_persisted(state, "SENSOR-01")?;
547    /// ```
548    #[cfg(feature = "std")]
549    pub fn from_persisted(
550        state: crate::security::PersistedState,
551        callsign: &str,
552    ) -> Result<Self, crate::security::PersistenceError> {
553        // Restore identity
554        let identity = state.restore_identity()?;
555
556        // Restore genesis (if present)
557        let genesis = state.restore_genesis();
558
559        // Create mesh with or without genesis
560        let mesh = if let Some(ref gen) = genesis {
561            Self::from_genesis(gen, identity, callsign)
562        } else {
563            let config = PeatMeshConfig::new(identity.node_id(), callsign, "RESTORED");
564            Self::with_identity(config, identity)
565        };
566
567        // Restore identity registry
568        let restored_registry = state.restore_registry();
569        if let Ok(mut registry) = mesh.identity_registry.lock() {
570            *registry = restored_registry;
571        }
572
573        log::info!(
574            "PeatMesh restored from persisted state: node_id={:08X}, known_peers={}",
575            mesh.config.node_id.as_u32(),
576            mesh.known_identity_count()
577        );
578
579        Ok(mesh)
580    }
581
582    /// Create persisted state from current mesh state.
583    ///
584    /// Captures the current identity, genesis, and registry for persistence.
585    /// Call this periodically or before shutdown to save state.
586    ///
587    /// # Arguments
588    ///
589    /// * `genesis` - Optional genesis to include (if mesh was created from genesis)
590    ///
591    /// # Returns
592    ///
593    /// `None` if the mesh has no identity bound.
594    #[cfg(feature = "std")]
595    pub fn to_persisted_state(
596        &self,
597        genesis: Option<&crate::security::MeshGenesis>,
598    ) -> Option<crate::security::PersistedState> {
599        let identity = self.identity.as_ref()?;
600        let registry = self.identity_registry.lock().ok()?;
601
602        Some(crate::security::PersistedState::with_registry(
603            identity, genesis, &registry,
604        ))
605    }
606
607    // ==================== ADR-059 translator-frame receive ====================
608
609    /// Install the [`DecodedDocumentCallback`](crate::DecodedDocumentCallback)
610    /// invoked when a 0xB6 translator frame decodes successfully.
611    ///
612    /// Idempotent — calling again replaces the previous callback. Pass an
613    /// `Arc<dyn DecodedDocumentCallback>` so the receive dispatch can clone
614    /// the handle without holding the lock during invocation.
615    ///
616    /// Released-skew note: Slice 1.b.3 ships peat-btle with this setter and
617    /// the receive-dispatch branch; Slice 1.b.4 wires the callback from
618    /// peat-ffi in a *later* release. Until peat-ffi installs one, decoded
619    /// frames are silently dropped (logged at debug level).
620    #[cfg(feature = "mesh-translator")]
621    pub fn set_decoded_document_callback(&self, cb: Arc<dyn crate::DecodedDocumentCallback>) {
622        if let Ok(mut slot) = self.decoded_document_callback.write() {
623            *slot = Some(cb);
624        }
625    }
626
627    /// Install the UniFFI-exported
628    /// [`DecodedDocumentJsonCallback`](crate::DecodedDocumentJsonCallback)
629    /// invoked when a 0xB6 translator frame decodes successfully (ADR-059
630    /// Amendment 2 — host-side wiring point for cross-transport receive).
631    ///
632    /// Idempotent — calling again replaces the previous callback. Fires
633    /// independently of the Rust-trait
634    /// [`DecodedDocumentCallback`](crate::DecodedDocumentCallback): when
635    /// both are installed, both are invoked once per decoded frame; when
636    /// neither is installed, `PeatEvent::TranslatorNoCallback` fires on
637    /// the observer channel so the gap is operator-observable.
638    #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
639    pub fn set_decoded_document_json_callback(
640        &self,
641        cb: Box<dyn crate::DecodedDocumentJsonCallback>,
642    ) {
643        // UniFFI 0.31 callback_interface generates a `Box<dyn Trait>` on
644        // the FFI surface. We store as `Arc<dyn Trait>` internally so the
645        // receive dispatch can snapshot-clone the handle out of the
646        // RwLock without holding the lock during user-supplied dispatch
647        // — same pattern as `decoded_document_callback`.
648        let cb_arc: Arc<dyn crate::DecodedDocumentJsonCallback> = Arc::from(cb);
649        if let Ok(mut slot) = self.decoded_document_json_callback.write() {
650            *slot = Some(cb_arc);
651        }
652    }
653
654    /// Replace the active `BleTranslator` configuration. Optional; the
655    /// default (operator collection names = "tracks"/"platforms"/
656    /// "alerts"/"canned_messages") covers most deployments.
657    #[cfg(feature = "mesh-translator")]
658    pub fn set_translator_config(&self, config: crate::translator::TranslationConfig) {
659        if let Ok(mut t) = self.ble_translator.write() {
660            *t = crate::translator::BleTranslator::new(config);
661        }
662    }
663
664    /// Translator-frame / reserved-marker dispatcher. Returns `true` if the
665    /// first byte of `decrypted` fell in `0xB6..=0xBF` (handled or silently
666    /// dropped); the caller MUST stop processing and return `None`. Returns
667    /// `false` if the byte was something else; the caller continues with
668    /// the existing legacy / delta-document flow.
669    ///
670    /// `peer` is the BLE peer's identifier (typically a peripheral address).
671    /// `source_node` is the peer's `NodeId` when known — used to populate
672    /// `TranslationContext.local_wire_id` (the hex-u32 form), which the
673    /// `tracks` collection's decoder requires per the trait contract.
674    /// Anonymous receive paths pass `None`; tracks decoding will then return
675    /// an `Err` (logged + dropped) but other collections succeed.
676    #[cfg(feature = "mesh-translator")]
677    fn try_handle_translator_marker(
678        &self,
679        decrypted: &[u8],
680        peer: Option<&str>,
681        source_node: Option<NodeId>,
682    ) -> bool {
683        if decrypted.is_empty() {
684            return false;
685        }
686        let marker = decrypted[0];
687
688        // Reserved range — silent drop for forward-compat. Receivers running
689        // *this* peat-btle release MUST drop unknown future-translator
690        // markers without falling through to merge_document, otherwise a
691        // 0xB7..=0xBF frame would corrupt the GCounter (ADR-059 Amendment 1
692        // §"Backwards compatibility… 2").
693        if (TRANSLATOR_RESERVED_MARKER_START..=TRANSLATOR_RESERVED_MARKER_END).contains(&marker) {
694            log::warn!(
695                "ble: dropping reserved translator-marker frame (marker=0x{marker:02X}, len={})",
696                decrypted.len()
697            );
698            return true;
699        }
700
701        if marker != TRANSLATOR_FRAME_MARKER {
702            return false;
703        }
704
705        // 0xB6 translator dispatch.
706        if decrypted.len() < 2 {
707            log::warn!(
708                "ble: dropping truncated translator frame (len={}, missing collection code)",
709                decrypted.len()
710            );
711            return true;
712        }
713
714        let code = decrypted[1];
715        let payload = &decrypted[2..];
716
717        // Resolve the collection name + build a TranslationContext under
718        // the read-lock, drop the lock before invoking decode_inbound_sync.
719        let (collection, decode_result) = {
720            let translator = match self.ble_translator.read() {
721                Ok(g) => g,
722                Err(_) => {
723                    log::warn!("ble: translator RwLock poisoned; dropping frame");
724                    return true;
725                }
726            };
727            let collection = match translator.code_to_collection(code) {
728                Some(c) => c.to_string(),
729                None => {
730                    log::warn!(
731                        "ble: dropping translator frame with unknown collection code 0x{code:02X}"
732                    );
733                    return true;
734                }
735            };
736
737            let mut ctx =
738                peat_mesh::transport::TranslationContext::inbound(peer.unwrap_or("unknown"))
739                    .with_collection(collection.clone());
740            if let Some(node) = source_node {
741                ctx = ctx.with_local_wire_id(format!("{:08X}", node.as_u32()));
742            }
743
744            let result = translator.decode_inbound_sync(payload, &ctx);
745            (collection, result)
746        };
747
748        match decode_result {
749            Ok(Some(doc)) => {
750                // Snapshot both callback Arcs so we don't hold the read
751                // lock during user-supplied dispatch. Per ADR-059
752                // Amendment 2, both callbacks fire independently when
753                // both are installed; `PeatEvent::TranslatorNoCallback`
754                // fires only when **both** slots are empty.
755                let rust_cb = self
756                    .decoded_document_callback
757                    .read()
758                    .ok()
759                    .and_then(|g| g.as_ref().cloned());
760
761                #[cfg(feature = "uniffi")]
762                let json_cb = self
763                    .decoded_document_json_callback
764                    .read()
765                    .ok()
766                    .and_then(|g| g.as_ref().cloned());
767                #[cfg(not(feature = "uniffi"))]
768                let json_cb: Option<()> = None;
769
770                let any_callback = rust_cb.is_some() || json_cb.is_some();
771
772                // Fire the Rust-trait callback (Rust-native consumers,
773                // peat-sim integration tests, future Rust-native hosts).
774                if let Some(cb) = &rust_cb {
775                    cb.on_document(&collection, doc.clone(), peer);
776                }
777
778                // Fire the UniFFI-exported JSON callback (host bindings —
779                // peat-atak-plugin's BleDecodedDocumentBridge and
780                // equivalents). Serialization to JSON is intentional per
781                // Amendment 2 §"Why JSON-string instead of typed Document":
782                // peat_mesh::sync::Document has no UniFFI binding, and
783                // every host that consumes this callback already
784                // round-trips docs as JSON via peat-ffi's
785                // `publishDocument`-shaped FFI. A serde failure here is
786                // a wire-format-drift signal worth surfacing — log and
787                // skip the JSON callback for this frame; the Rust-trait
788                // callback (if installed) still fired above.
789                #[cfg(feature = "uniffi")]
790                if let Some(json_cb) = json_cb {
791                    match serde_json::to_string(&doc) {
792                        Ok(doc_json) => {
793                            json_cb.on_document(
794                                collection.clone(),
795                                doc_json,
796                                peer.map(str::to_string),
797                            );
798                        }
799                        Err(e) => {
800                            log::warn!(
801                                "ble: failed to serialize decoded {} doc to JSON \
802                                 for UniFFI callback: {}",
803                                collection,
804                                e
805                            );
806                        }
807                    }
808                }
809
810                if !any_callback {
811                    // Slice 1.b.3-ships-before-1.b.4 release-skew window:
812                    // decoded successfully but no consumer is wired yet.
813                    // Per ADR-059 Amendment 1 / Amendment 2, this branch
814                    // must surface through the operator-observable
815                    // PeatEvent channel so deployments staging the
816                    // rollout see, in real time, how many translator-
817                    // frame payloads the bridge is decoding into the
818                    // void. `log::debug!` alone is not a monitoring
819                    // signal — it is disabled at default log levels.
820                    log::debug!(
821                        "ble: decoded {} frame but no DecodedDocument*Callback installed",
822                        collection
823                    );
824                    self.notify(crate::observer::PeatEvent::TranslatorNoCallback {
825                        collection: collection.clone(),
826                        peer: peer.map(str::to_string),
827                    });
828                }
829            }
830            Ok(None) => {
831                log::debug!(
832                    "ble: codec declined translator frame for collection {}",
833                    collection
834                );
835            }
836            Err(e) => {
837                log::warn!(
838                    "ble: translator frame decode error (collection={}): {:#}",
839                    collection,
840                    e
841                );
842            }
843        }
844
845        true
846    }
847
848    // ==================== Encryption ====================
849
850    /// Check if mesh-wide encryption is enabled
851    pub fn is_encryption_enabled(&self) -> bool {
852        self.encryption_key.is_some()
853    }
854
855    /// Check if strict encryption mode is enabled
856    ///
857    /// Returns true only if both encryption and strict_encryption are enabled.
858    pub fn is_strict_encryption_enabled(&self) -> bool {
859        self.config.strict_encryption && self.encryption_key.is_some()
860    }
861
862    /// Enable mesh-wide encryption with a shared secret
863    ///
864    /// Derives a ChaCha20-Poly1305 key from the secret using HKDF-SHA256.
865    /// All mesh participants must use the same secret to communicate.
866    pub fn enable_encryption(&mut self, secret: &[u8; 32]) {
867        self.encryption_key = Some(MeshEncryptionKey::from_shared_secret(
868            &self.config.mesh_id,
869            secret,
870        ));
871    }
872
873    /// Disable mesh-wide encryption
874    pub fn disable_encryption(&mut self) {
875        self.encryption_key = None;
876    }
877
878    /// Encrypt document bytes for transmission
879    ///
880    /// Returns the encrypted bytes with ENCRYPTED_MARKER prefix, or the
881    /// original bytes if encryption is disabled.
882    fn encrypt_document(&self, plaintext: &[u8]) -> Vec<u8> {
883        match &self.encryption_key {
884            Some(key) => {
885                // Encrypt and prepend marker
886                match key.encrypt_to_bytes(plaintext) {
887                    Ok(ciphertext) => {
888                        let mut buf = Vec::with_capacity(2 + ciphertext.len());
889                        buf.push(ENCRYPTED_MARKER);
890                        buf.push(0x00); // reserved
891                        buf.extend_from_slice(&ciphertext);
892                        buf
893                    }
894                    Err(e) => {
895                        log::error!("Encryption failed: {}", e);
896                        // Fall back to unencrypted on error (shouldn't happen)
897                        plaintext.to_vec()
898                    }
899                }
900            }
901            None => plaintext.to_vec(),
902        }
903    }
904
905    /// Decrypt document bytes received from peer
906    ///
907    /// Returns the decrypted bytes if encrypted and valid, or the original
908    /// bytes if not encrypted. Returns None if decryption fails.
909    ///
910    /// In strict encryption mode (when both encryption and strict_encryption are enabled),
911    /// unencrypted documents are rejected and trigger a SecurityViolation event.
912    fn decrypt_document<'a>(
913        &self,
914        data: &'a [u8],
915        source_hint: Option<&str>,
916    ) -> Option<std::borrow::Cow<'a, [u8]>> {
917        log::debug!(
918            "decrypt_document: len={}, first_byte=0x{:02X}, source={:?}",
919            data.len(),
920            data.first().copied().unwrap_or(0),
921            source_hint
922        );
923
924        // Check for encrypted marker
925        if data.len() >= 2 && data[0] == ENCRYPTED_MARKER {
926            // Encrypted document
927            let _reserved = data[1];
928            let encrypted_payload = &data[2..];
929
930            log::debug!(
931                "decrypt_document: encrypted payload len={}, nonce+ciphertext",
932                encrypted_payload.len()
933            );
934
935            match &self.encryption_key {
936                Some(key) => match key.decrypt_from_bytes(encrypted_payload) {
937                    Ok(plaintext) => {
938                        log::debug!(
939                            "decrypt_document: SUCCESS, plaintext len={}",
940                            plaintext.len()
941                        );
942                        Some(std::borrow::Cow::Owned(plaintext))
943                    }
944                    Err(e) => {
945                        log::warn!(
946                            "decrypt_document: FAILED (wrong key or corrupted): {} [payload_len={}, source={:?}]",
947                            e,
948                            encrypted_payload.len(),
949                            source_hint
950                        );
951                        self.notify(PeatEvent::SecurityViolation {
952                            kind: SecurityViolationKind::DecryptionFailed,
953                            source: source_hint.map(String::from),
954                        });
955                        None
956                    }
957                },
958                None => {
959                    log::warn!(
960                        "decrypt_document: encryption not enabled but received encrypted doc"
961                    );
962                    None
963                }
964            }
965        } else {
966            // Unencrypted document
967            // Check strict encryption mode
968            if self.config.strict_encryption && self.encryption_key.is_some() {
969                log::warn!(
970                    "Rejected unencrypted document in strict encryption mode (source: {:?})",
971                    source_hint
972                );
973                self.notify(PeatEvent::SecurityViolation {
974                    kind: SecurityViolationKind::UnencryptedInStrictMode,
975                    source: source_hint.map(String::from),
976                });
977                None
978            } else {
979                // Permissive mode: accept unencrypted for backward compatibility
980                Some(std::borrow::Cow::Borrowed(data))
981            }
982        }
983    }
984
985    // ==================== Transport Layer API ====================
986
987    /// Decrypt data without parsing (transport-only operation)
988    ///
989    /// This method provides raw decrypted bytes for apps that want to handle
990    /// message parsing themselves (using peat-lite or other libraries).
991    ///
992    /// # Arguments
993    /// * `data` - Potentially encrypted data (0xAE marker indicates encryption)
994    ///
995    /// # Returns
996    /// * `Some(plaintext)` - Decrypted bytes if successful, or original bytes if unencrypted
997    /// * `None` - If decryption failed (wrong key, corrupted, or strict mode violation)
998    pub fn decrypt_only(&self, data: &[u8]) -> Option<Vec<u8>> {
999        self.decrypt_document(data, None)
1000            .map(|cow| cow.into_owned())
1001    }
1002
1003    // ==================== Identity ====================
1004
1005    /// Check if this mesh has a cryptographic identity
1006    pub fn has_identity(&self) -> bool {
1007        self.identity.is_some()
1008    }
1009
1010    /// Get this node's public key (if identity is configured)
1011    pub fn public_key(&self) -> Option<[u8; 32]> {
1012        self.identity.as_ref().map(|id| id.public_key())
1013    }
1014
1015    /// Create an identity attestation for this node
1016    ///
1017    /// Returns None if no identity is configured.
1018    pub fn create_attestation(&self, now_ms: u64) -> Option<IdentityAttestation> {
1019        self.identity
1020            .as_ref()
1021            .map(|id| id.create_attestation(now_ms))
1022    }
1023
1024    /// Verify and register a peer's identity attestation
1025    ///
1026    /// Implements TOFU (Trust On First Use):
1027    /// - On first contact, registers the node_id → public_key binding
1028    /// - On subsequent contacts, verifies the public key matches
1029    ///
1030    /// Returns the verification result. Security violations should be handled
1031    /// by the caller (e.g., disconnect, alert).
1032    pub fn verify_peer_identity(&self, attestation: &IdentityAttestation) -> RegistryResult {
1033        self.identity_registry
1034            .lock()
1035            .unwrap()
1036            .verify_or_register(attestation)
1037    }
1038
1039    /// Check if a peer's identity is known (has been registered)
1040    pub fn is_peer_identity_known(&self, node_id: NodeId) -> bool {
1041        self.identity_registry.lock().unwrap().is_known(node_id)
1042    }
1043
1044    /// Get a peer's public key if known
1045    pub fn peer_public_key(&self, node_id: NodeId) -> Option<[u8; 32]> {
1046        self.identity_registry
1047            .lock()
1048            .unwrap()
1049            .get_public_key(node_id)
1050            .copied()
1051    }
1052
1053    /// Get the number of known peer identities
1054    pub fn known_identity_count(&self) -> usize {
1055        self.identity_registry.lock().unwrap().len()
1056    }
1057
1058    /// Pre-register a peer's identity (for out-of-band key exchange)
1059    ///
1060    /// Use this when keys are exchanged through a secure side channel
1061    /// (e.g., QR code, NFC tap, or provisioning server).
1062    pub fn pre_register_peer_identity(&self, node_id: NodeId, public_key: [u8; 32], now_ms: u64) {
1063        self.identity_registry
1064            .lock()
1065            .unwrap()
1066            .pre_register(node_id, public_key, now_ms);
1067    }
1068
1069    /// Remove a peer's identity from the registry
1070    ///
1071    /// Use with caution - this allows re-registration with a different key.
1072    pub fn forget_peer_identity(&self, node_id: NodeId) {
1073        self.identity_registry.lock().unwrap().remove(node_id);
1074    }
1075
1076    /// Sign arbitrary data with this node's identity
1077    ///
1078    /// Returns None if no identity is configured.
1079    pub fn sign(&self, data: &[u8]) -> Option<[u8; 64]> {
1080        self.identity.as_ref().map(|id| id.sign(data))
1081    }
1082
1083    /// Verify a signature from a peer
1084    ///
1085    /// Uses the peer's public key from the identity registry.
1086    /// Returns false if peer is unknown or signature is invalid.
1087    pub fn verify_peer_signature(
1088        &self,
1089        node_id: NodeId,
1090        data: &[u8],
1091        signature: &[u8; 64],
1092    ) -> bool {
1093        if let Some(public_key) = self.peer_public_key(node_id) {
1094            crate::security::verify_signature(&public_key, data, signature)
1095        } else {
1096            false
1097        }
1098    }
1099
1100    // ==================== Multi-Hop Relay ====================
1101
1102    /// Check if multi-hop relay is enabled
1103    pub fn is_relay_enabled(&self) -> bool {
1104        self.config.enable_relay
1105    }
1106
1107    /// Enable multi-hop relay
1108    pub fn enable_relay(&mut self) {
1109        self.config.enable_relay = true;
1110    }
1111
1112    /// Disable multi-hop relay
1113    pub fn disable_relay(&mut self) {
1114        self.config.enable_relay = false;
1115    }
1116
1117    /// Check if a message has been seen before (for deduplication)
1118    ///
1119    /// Returns true if the message was already seen (duplicate).
1120    pub fn has_seen_message(&self, message_id: &MessageId) -> bool {
1121        self.seen_cache.lock().unwrap().has_seen(message_id)
1122    }
1123
1124    /// Mark a message as seen
1125    ///
1126    /// Returns true if this is a new message (first time seen).
1127    pub fn mark_message_seen(&self, message_id: MessageId, origin: NodeId, now_ms: u64) -> bool {
1128        self.seen_cache
1129            .lock()
1130            .unwrap()
1131            .check_and_mark(message_id, origin, now_ms)
1132    }
1133
1134    /// Get the number of entries in the seen message cache
1135    pub fn seen_cache_size(&self) -> usize {
1136        self.seen_cache.lock().unwrap().len()
1137    }
1138
1139    /// Clear the seen message cache
1140    pub fn clear_seen_cache(&self) {
1141        self.seen_cache.lock().unwrap().clear();
1142    }
1143
1144    /// Wrap a document in a relay envelope for multi-hop transmission
1145    ///
1146    /// The returned bytes can be sent to peers and will be automatically
1147    /// relayed through the mesh if relay is enabled on receiving nodes.
1148    pub fn wrap_for_relay(&self, payload: Vec<u8>) -> Vec<u8> {
1149        let envelope = RelayEnvelope::broadcast(self.config.node_id, payload)
1150            .with_max_hops(self.config.max_relay_hops);
1151        envelope.encode()
1152    }
1153
1154    /// Get peers to relay a message to
1155    ///
1156    /// Uses the configured gossip strategy to select relay targets.
1157    /// Excludes the source peer (if provided) to avoid sending back to sender.
1158    pub fn get_relay_targets(&self, exclude_peer: Option<NodeId>) -> Vec<PeatPeer> {
1159        let connected = self.peer_manager.get_connected_peers();
1160        let filtered: Vec<_> = if let Some(exclude) = exclude_peer {
1161            connected
1162                .into_iter()
1163                .filter(|p| p.node_id != exclude)
1164                .collect()
1165        } else {
1166            connected
1167        };
1168
1169        self.gossip_strategy
1170            .select_peers(&filtered)
1171            .into_iter()
1172            .cloned()
1173            .collect()
1174    }
1175
1176    /// Process an incoming relay envelope
1177    ///
1178    /// Handles deduplication, TTL checking, and determines if the message
1179    /// should be processed and/or relayed.
1180    ///
1181    /// Returns:
1182    /// - `Ok(Some(RelayDecision))` if message should be processed/relayed
1183    /// - `Ok(None)` if message was a duplicate or TTL expired
1184    /// - `Err` if parsing failed
1185    pub fn process_relay_envelope(
1186        &self,
1187        data: &[u8],
1188        source_peer: NodeId,
1189        now_ms: u64,
1190    ) -> Option<RelayDecision> {
1191        // Parse envelope
1192        let envelope = RelayEnvelope::decode(data)?;
1193
1194        // Update indirect peer graph if origin differs from source
1195        // This means the message was relayed through source_peer from origin_node
1196        if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
1197            let is_new = self.connection_graph.lock().unwrap().on_relay_received(
1198                source_peer,
1199                envelope.origin_node,
1200                envelope.hop_count,
1201                now_ms,
1202            );
1203
1204            if is_new {
1205                log::debug!(
1206                    "Discovered indirect peer {:08X} via {:08X} ({} hops)",
1207                    envelope.origin_node.as_u32(),
1208                    source_peer.as_u32(),
1209                    envelope.hop_count
1210                );
1211            }
1212        }
1213
1214        // Check deduplication
1215        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
1216            // Duplicate message
1217            let stats = self
1218                .seen_cache
1219                .lock()
1220                .unwrap()
1221                .get_stats(&envelope.message_id);
1222            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
1223
1224            self.notify(PeatEvent::DuplicateMessageDropped {
1225                origin_node: envelope.origin_node,
1226                seen_count,
1227            });
1228
1229            log::debug!(
1230                "Dropped duplicate message {} from {:08X} (seen {} times)",
1231                envelope.message_id,
1232                envelope.origin_node.as_u32(),
1233                seen_count
1234            );
1235            return None;
1236        }
1237
1238        // Check TTL
1239        if !envelope.can_relay() {
1240            self.notify(PeatEvent::MessageTtlExpired {
1241                origin_node: envelope.origin_node,
1242                hop_count: envelope.hop_count,
1243            });
1244
1245            log::debug!(
1246                "Message {} from {:08X} TTL expired at hop {}",
1247                envelope.message_id,
1248                envelope.origin_node.as_u32(),
1249                envelope.hop_count
1250            );
1251
1252            // Still process locally even if TTL expired
1253            return Some(RelayDecision {
1254                payload: envelope.payload,
1255                origin_node: envelope.origin_node,
1256                hop_count: envelope.hop_count,
1257                should_relay: false,
1258                relay_envelope: None,
1259            });
1260        }
1261
1262        // Determine if we should relay
1263        let should_relay = self.config.enable_relay;
1264        let relay_envelope = if should_relay {
1265            envelope.relay() // Increments hop count
1266        } else {
1267            None
1268        };
1269
1270        Some(RelayDecision {
1271            payload: envelope.payload,
1272            origin_node: envelope.origin_node,
1273            hop_count: envelope.hop_count,
1274            should_relay,
1275            relay_envelope,
1276        })
1277    }
1278
1279    /// Build a document wrapped in a relay envelope
1280    ///
1281    /// Convenience method that builds the document, encrypts it (if enabled),
1282    /// and wraps it in a relay envelope for multi-hop transmission.
1283    pub fn build_relay_document(&self) -> Vec<u8> {
1284        let doc = self.build_document(); // Already encrypted if encryption enabled
1285        self.wrap_for_relay(doc)
1286    }
1287
1288    // ==================== Delta Sync ====================
1289
1290    /// Register a peer for delta sync tracking
1291    ///
1292    /// Call this when a peer connects to start tracking what data has been
1293    /// sent to them. This enables future delta sync (sending only changes).
1294    pub fn register_peer_for_delta(&self, peer_id: &NodeId) {
1295        let mut encoder = self.delta_encoder.lock().unwrap();
1296        encoder.add_peer(peer_id);
1297        log::debug!(
1298            "Registered peer {:08X} for delta sync tracking",
1299            peer_id.as_u32()
1300        );
1301    }
1302
1303    /// Unregister a peer from delta sync tracking
1304    ///
1305    /// Call this when a peer disconnects to clean up tracking state.
1306    pub fn unregister_peer_for_delta(&self, peer_id: &NodeId) {
1307        let mut encoder = self.delta_encoder.lock().unwrap();
1308        encoder.remove_peer(peer_id);
1309        log::debug!(
1310            "Unregistered peer {:08X} from delta sync tracking",
1311            peer_id.as_u32()
1312        );
1313    }
1314
1315    /// Reset delta sync state for a peer
1316    ///
1317    /// Call this when a peer reconnects to force a full sync on next
1318    /// communication. This clears the record of what was previously sent.
1319    pub fn reset_peer_delta_state(&self, peer_id: &NodeId) {
1320        let mut encoder = self.delta_encoder.lock().unwrap();
1321        encoder.reset_peer(peer_id);
1322        log::debug!("Reset delta sync state for peer {:08X}", peer_id.as_u32());
1323    }
1324
1325    /// Record bytes sent to a peer (for delta statistics)
1326    pub fn record_delta_sent(&self, peer_id: &NodeId, bytes: usize) {
1327        let mut encoder = self.delta_encoder.lock().unwrap();
1328        encoder.record_sent(peer_id, bytes);
1329    }
1330
1331    /// Record bytes received from a peer (for delta statistics)
1332    pub fn record_delta_received(&self, peer_id: &NodeId, bytes: usize, timestamp: u64) {
1333        let mut encoder = self.delta_encoder.lock().unwrap();
1334        encoder.record_received(peer_id, bytes, timestamp);
1335    }
1336
1337    /// Get delta sync statistics
1338    ///
1339    /// Returns aggregate statistics about delta sync across all peers,
1340    /// including bytes sent/received and sync counts.
1341    pub fn delta_stats(&self) -> DeltaStats {
1342        self.delta_encoder.lock().unwrap().stats()
1343    }
1344
1345    /// Get delta sync statistics for a specific peer
1346    ///
1347    /// Returns the bytes sent/received and sync count for a single peer.
1348    pub fn peer_delta_stats(&self, peer_id: &NodeId) -> Option<(u64, u64, u32)> {
1349        let encoder = self.delta_encoder.lock().unwrap();
1350        encoder
1351            .get_peer_state(peer_id)
1352            .map(|state| (state.bytes_sent, state.bytes_received, state.sync_count))
1353    }
1354
1355    /// Build a delta document for a specific peer
1356    ///
1357    /// This only includes operations that have changed since the last sync
1358    /// with this peer. Uses the delta encoder to track per-peer state.
1359    ///
1360    /// Returns the encoded delta document bytes, or None if there's nothing
1361    /// new to send to this peer.
1362    pub fn build_delta_document_for_peer(&self, peer_id: &NodeId, now_ms: u64) -> Option<Vec<u8>> {
1363        // Collect all current operations
1364        let mut all_operations: Vec<Operation> = Vec::new();
1365
1366        // Add counter operations (one per node that has contributed)
1367        // Use the count value as the "timestamp" for tracking - only send if count increased
1368        for (node_id_u32, count) in self.document_sync.counter_entries() {
1369            all_operations.push(Operation::IncrementCounter {
1370                counter_id: 0, // Default mesh counter
1371                node_id: NodeId::new(node_id_u32),
1372                amount: count,
1373                timestamp: count, // Use count as timestamp for delta tracking
1374            });
1375        }
1376
1377        // Add peripheral update
1378        // Use event timestamp if available, otherwise use 1 for initial send
1379        let peripheral = self.document_sync.peripheral_snapshot();
1380        let peripheral_timestamp = peripheral
1381            .last_event
1382            .as_ref()
1383            .map(|e| e.timestamp)
1384            .unwrap_or(1); // Use 1 (not 0) so it's sent initially
1385        all_operations.push(Operation::UpdatePeripheral {
1386            peripheral,
1387            timestamp: peripheral_timestamp,
1388        });
1389
1390        // Add emergency operations if active
1391        if let Some(emergency) = self.document_sync.emergency_snapshot() {
1392            let source_node = NodeId::new(emergency.source_node());
1393            let timestamp = emergency.timestamp();
1394
1395            // Add SetEmergency operation
1396            all_operations.push(Operation::SetEmergency {
1397                source_node,
1398                timestamp,
1399                known_peers: emergency.all_nodes(),
1400            });
1401
1402            // Add AckEmergency for each node that has acked
1403            for acked_node in emergency.acked_nodes() {
1404                all_operations.push(Operation::AckEmergency {
1405                    node_id: NodeId::new(acked_node),
1406                    emergency_timestamp: timestamp,
1407                });
1408            }
1409        }
1410
1411        // Add app document operations
1412        for app_op in self.app_document_delta_ops() {
1413            all_operations.push(Operation::App(app_op));
1414        }
1415
1416        // Filter operations for this peer (only send what's new)
1417        let filtered_operations: Vec<Operation> = {
1418            let encoder = self.delta_encoder.lock().unwrap();
1419            if let Some(peer_state) = encoder.get_peer_state(peer_id) {
1420                all_operations
1421                    .into_iter()
1422                    .filter(|op| peer_state.needs_send(&op.key(), op.timestamp()))
1423                    .collect()
1424            } else {
1425                // Unknown peer, send all operations
1426                all_operations
1427            }
1428        };
1429
1430        // If nothing new to send, return None
1431        if filtered_operations.is_empty() {
1432            return None;
1433        }
1434
1435        // Mark operations as sent
1436        {
1437            let mut encoder = self.delta_encoder.lock().unwrap();
1438            if let Some(peer_state) = encoder.get_peer_state_mut(peer_id) {
1439                for op in &filtered_operations {
1440                    peer_state.mark_sent(&op.key(), op.timestamp());
1441                }
1442            }
1443        }
1444
1445        // Build the delta document
1446        let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1447        for op in filtered_operations {
1448            delta.add_operation(op);
1449        }
1450
1451        // Encode and optionally encrypt
1452        let encoded = delta.encode();
1453        let result = self.encrypt_document(&encoded);
1454
1455        // Record stats
1456        {
1457            let mut encoder = self.delta_encoder.lock().unwrap();
1458            encoder.record_sent(peer_id, result.len());
1459        }
1460
1461        Some(result)
1462    }
1463
1464    /// Build a full delta document (for broadcast or new peers)
1465    ///
1466    /// Unlike `build_delta_document_for_peer`, this includes all state
1467    /// regardless of what has been sent before. Use this for broadcasts.
1468    pub fn build_full_delta_document(&self, now_ms: u64) -> Vec<u8> {
1469        let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1470
1471        // Add all counter operations
1472        for (node_id_u32, count) in self.document_sync.counter_entries() {
1473            delta.add_operation(Operation::IncrementCounter {
1474                counter_id: 0,
1475                node_id: NodeId::new(node_id_u32),
1476                amount: count,
1477                timestamp: now_ms,
1478            });
1479        }
1480
1481        // Add peripheral
1482        let peripheral = self.document_sync.peripheral_snapshot();
1483        let peripheral_timestamp = peripheral
1484            .last_event
1485            .as_ref()
1486            .map(|e| e.timestamp)
1487            .unwrap_or(now_ms);
1488        delta.add_operation(Operation::UpdatePeripheral {
1489            peripheral,
1490            timestamp: peripheral_timestamp,
1491        });
1492
1493        // Add emergency if active
1494        if let Some(emergency) = self.document_sync.emergency_snapshot() {
1495            let source_node = NodeId::new(emergency.source_node());
1496            let timestamp = emergency.timestamp();
1497
1498            delta.add_operation(Operation::SetEmergency {
1499                source_node,
1500                timestamp,
1501                known_peers: emergency.all_nodes(),
1502            });
1503
1504            for acked_node in emergency.acked_nodes() {
1505                delta.add_operation(Operation::AckEmergency {
1506                    node_id: NodeId::new(acked_node),
1507                    emergency_timestamp: timestamp,
1508                });
1509            }
1510        }
1511
1512        // Add app document operations
1513        for app_op in self.app_document_delta_ops() {
1514            delta.add_operation(Operation::App(app_op));
1515        }
1516
1517        let encoded = delta.encode();
1518        self.encrypt_document(&encoded)
1519    }
1520
1521    /// Internal: Process a received delta document
1522    ///
1523    /// Applies operations from a delta document to local state.
1524    fn process_delta_document_internal(
1525        &self,
1526        source_node: NodeId,
1527        data: &[u8],
1528        now_ms: u64,
1529        relay_data: Option<Vec<u8>>,
1530        origin_node: Option<NodeId>,
1531        hop_count: u8,
1532    ) -> Option<DataReceivedResult> {
1533        // Decode the delta document
1534        let delta = DeltaDocument::decode(data)?;
1535
1536        // Don't process our own documents
1537        if delta.origin_node == self.config.node_id {
1538            return None;
1539        }
1540
1541        // Apply operations to local state
1542        let mut counter_changed = false;
1543        let mut emergency_changed = false;
1544        let mut is_emergency = false;
1545        let mut is_ack = false;
1546        let mut event_timestamp = 0u64;
1547        let mut peer_peripheral: Option<crate::sync::crdt::Peripheral> = None;
1548
1549        log::info!(
1550            "Delta document from {:08X}: {} operations, data_len={}",
1551            delta.origin_node.as_u32(),
1552            delta.operations.len(),
1553            data.len()
1554        );
1555        for op in &delta.operations {
1556            log::info!("  Operation: {}", op.key());
1557            match op {
1558                Operation::IncrementCounter {
1559                    node_id, amount, ..
1560                } => {
1561                    // Merge counter value (take max)
1562                    let current = self.document_sync.counter_entries();
1563                    let current_value = current
1564                        .iter()
1565                        .find(|(id, _)| *id == node_id.as_u32())
1566                        .map(|(_, v)| *v)
1567                        .unwrap_or(0);
1568
1569                    if *amount > current_value {
1570                        // Need to merge - this is handled by the counter merge logic
1571                        // For now, we record that counter changed
1572                        counter_changed = true;
1573                    }
1574                }
1575                Operation::UpdatePeripheral {
1576                    peripheral,
1577                    timestamp,
1578                } => {
1579                    // Store peer peripheral for callsign lookup
1580                    if let Ok(mut peripherals) = self.peer_peripherals.write() {
1581                        peripherals.insert(delta.origin_node, peripheral.clone());
1582                    }
1583                    // Track the peripheral for the result
1584                    peer_peripheral = Some(peripheral.clone());
1585                    // Track the timestamp for the result
1586                    if *timestamp > event_timestamp {
1587                        event_timestamp = *timestamp;
1588                    }
1589                }
1590                Operation::SetEmergency { timestamp, .. } => {
1591                    is_emergency = true;
1592                    emergency_changed = true;
1593                    event_timestamp = *timestamp;
1594                }
1595                Operation::AckEmergency {
1596                    emergency_timestamp,
1597                    ..
1598                } => {
1599                    is_ack = true;
1600                    emergency_changed = true;
1601                    if *emergency_timestamp > event_timestamp {
1602                        event_timestamp = *emergency_timestamp;
1603                    }
1604                }
1605                Operation::ClearEmergency {
1606                    emergency_timestamp,
1607                } => {
1608                    emergency_changed = true;
1609                    if *emergency_timestamp > event_timestamp {
1610                        event_timestamp = *emergency_timestamp;
1611                    }
1612                }
1613                Operation::App(app_op) => {
1614                    // Handle app-layer document operation
1615                    //
1616                    // The timestamp field may contain version info in the upper bits
1617                    // (used by delta encoder for change detection). Extract the
1618                    // original document timestamp from the lower 48 bits.
1619                    let doc_timestamp = app_op.timestamp & 0x0000_FFFF_FFFF_FFFF;
1620
1621                    log::info!(
1622                        "App operation received: type={:02X} op_code={:02X} from {:08X} ts={} payload_len={}",
1623                        app_op.type_id,
1624                        app_op.op_code,
1625                        app_op.source_node,
1626                        doc_timestamp,
1627                        app_op.payload.len()
1628                    );
1629
1630                    // Try to find/create document and apply the delta operation
1631                    let doc_key = (app_op.type_id, app_op.source_node, doc_timestamp);
1632                    let changed = {
1633                        let mut docs = self.app_documents.write().unwrap();
1634
1635                        if let Some(existing) = docs.get_mut(&doc_key) {
1636                            // Apply delta to existing document (merge via CRDT)
1637                            self.document_registry.apply_delta_op(
1638                                app_op.type_id,
1639                                existing.as_mut(),
1640                                app_op,
1641                            )
1642                        } else {
1643                            // Try to decode from payload (for FULL_STATE operations)
1644                            if let Some(decoded) = self
1645                                .document_registry
1646                                .decode(app_op.type_id, &app_op.payload)
1647                            {
1648                                docs.insert(doc_key, decoded);
1649                                true
1650                            } else {
1651                                // For delta ops on unknown documents, we can't apply
1652                                // The full state will be sent later
1653                                log::debug!(
1654                                    "Received delta for unknown doc {:?}, waiting for full state",
1655                                    doc_key
1656                                );
1657                                false
1658                            }
1659                        }
1660                    };
1661
1662                    // Emit observer event with the real document timestamp
1663                    self.observers.notify(PeatEvent::app_document_received(
1664                        app_op.type_id,
1665                        NodeId::new(app_op.source_node),
1666                        doc_timestamp,
1667                        changed,
1668                    ));
1669                }
1670            }
1671        }
1672
1673        // Record sync
1674        self.peer_manager.record_sync(source_node, now_ms);
1675
1676        // Record delta received
1677        {
1678            let mut encoder = self.delta_encoder.lock().unwrap();
1679            encoder.record_received(&source_node, data.len(), now_ms);
1680        }
1681
1682        // Generate events based on what was received
1683        if is_emergency {
1684            self.notify(PeatEvent::EmergencyReceived {
1685                from_node: delta.origin_node,
1686            });
1687        } else if is_ack {
1688            self.notify(PeatEvent::AckReceived {
1689                from_node: delta.origin_node,
1690            });
1691        }
1692
1693        if counter_changed {
1694            let total_count = self.document_sync.total_count();
1695            self.notify(PeatEvent::DocumentSynced {
1696                from_node: delta.origin_node,
1697                total_count,
1698            });
1699        }
1700
1701        // Emit relay event if we're relaying
1702        if relay_data.is_some() {
1703            let relay_targets = self.get_relay_targets(Some(source_node));
1704            self.notify(PeatEvent::MessageRelayed {
1705                origin_node: origin_node.unwrap_or(delta.origin_node),
1706                relay_count: relay_targets.len(),
1707                hop_count,
1708            });
1709        }
1710
1711        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
1712            DataReceivedResult::peripheral_fields(&peer_peripheral);
1713
1714        Some(DataReceivedResult {
1715            source_node: delta.origin_node,
1716            is_emergency,
1717            is_ack,
1718            counter_changed,
1719            emergency_changed,
1720            total_count: self.document_sync.total_count(),
1721            event_timestamp,
1722            relay_data,
1723            origin_node,
1724            hop_count,
1725            callsign,
1726            battery_percent,
1727            heart_rate,
1728            event_type,
1729            latitude,
1730            longitude,
1731            altitude,
1732        })
1733    }
1734
1735    // ==================== Per-Peer E2EE ====================
1736
1737    /// Enable per-peer E2EE capability
1738    ///
1739    /// Creates a new identity key for this node. This allows establishing
1740    /// encrypted sessions with specific peers where only the sender and
1741    /// recipient can read messages (other mesh members cannot).
1742    pub fn enable_peer_e2ee(&self) {
1743        let mut sessions = self.peer_sessions.lock().unwrap();
1744        if sessions.is_none() {
1745            *sessions = Some(PeerSessionManager::new(self.config.node_id));
1746            log::info!(
1747                "Per-peer E2EE enabled for node {:08X}",
1748                self.config.node_id.as_u32()
1749            );
1750        }
1751    }
1752
1753    /// Disable per-peer E2EE capability
1754    ///
1755    /// Clears all peer sessions and disables E2EE.
1756    pub fn disable_peer_e2ee(&self) {
1757        let mut sessions = self.peer_sessions.lock().unwrap();
1758        *sessions = None;
1759        log::info!("Per-peer E2EE disabled");
1760    }
1761
1762    /// Check if per-peer E2EE is enabled
1763    pub fn is_peer_e2ee_enabled(&self) -> bool {
1764        self.peer_sessions.lock().unwrap().is_some()
1765    }
1766
1767    /// Get our E2EE public key (for sharing with peers)
1768    ///
1769    /// Returns None if per-peer E2EE is not enabled.
1770    pub fn peer_e2ee_public_key(&self) -> Option<[u8; 32]> {
1771        self.peer_sessions
1772            .lock()
1773            .unwrap()
1774            .as_ref()
1775            .map(|s| s.our_public_key())
1776    }
1777
1778    /// Initiate E2EE session with a specific peer
1779    ///
1780    /// Returns the key exchange message bytes to send to the peer.
1781    /// The message should be broadcast/sent to the peer.
1782    /// Returns None if per-peer E2EE is not enabled.
1783    pub fn initiate_peer_e2ee(&self, peer_node_id: NodeId, now_ms: u64) -> Option<Vec<u8>> {
1784        let mut sessions = self.peer_sessions.lock().unwrap();
1785        let session_mgr = sessions.as_mut()?;
1786
1787        let key_exchange = session_mgr.initiate_session(peer_node_id, now_ms);
1788        let mut buf = Vec::with_capacity(2 + 37);
1789        buf.push(KEY_EXCHANGE_MARKER);
1790        buf.push(0x00); // reserved
1791        buf.extend_from_slice(&key_exchange.encode());
1792
1793        log::info!(
1794            "Initiated E2EE session with peer {:08X}",
1795            peer_node_id.as_u32()
1796        );
1797        Some(buf)
1798    }
1799
1800    /// Check if we have an established E2EE session with a peer
1801    pub fn has_peer_e2ee_session(&self, peer_node_id: NodeId) -> bool {
1802        self.peer_sessions
1803            .lock()
1804            .unwrap()
1805            .as_ref()
1806            .is_some_and(|s| s.has_session(peer_node_id))
1807    }
1808
1809    /// Get E2EE session state with a peer
1810    pub fn peer_e2ee_session_state(&self, peer_node_id: NodeId) -> Option<SessionState> {
1811        self.peer_sessions
1812            .lock()
1813            .unwrap()
1814            .as_ref()
1815            .and_then(|s| s.session_state(peer_node_id))
1816    }
1817
1818    /// Send an E2EE encrypted message to a specific peer
1819    ///
1820    /// Returns the encrypted message bytes to send, or None if no session exists.
1821    /// The message should be sent directly to the peer (not broadcast).
1822    pub fn send_peer_e2ee(
1823        &self,
1824        peer_node_id: NodeId,
1825        plaintext: &[u8],
1826        now_ms: u64,
1827    ) -> Option<Vec<u8>> {
1828        let mut sessions = self.peer_sessions.lock().unwrap();
1829        let session_mgr = sessions.as_mut()?;
1830
1831        match session_mgr.encrypt_for_peer(peer_node_id, plaintext, now_ms) {
1832            Ok(encrypted) => {
1833                let mut buf = Vec::with_capacity(2 + encrypted.encode().len());
1834                buf.push(PEER_E2EE_MARKER);
1835                buf.push(0x00); // reserved
1836                buf.extend_from_slice(&encrypted.encode());
1837                Some(buf)
1838            }
1839            Err(e) => {
1840                log::warn!(
1841                    "Failed to encrypt for peer {:08X}: {:?}",
1842                    peer_node_id.as_u32(),
1843                    e
1844                );
1845                None
1846            }
1847        }
1848    }
1849
1850    /// Close E2EE session with a peer
1851    pub fn close_peer_e2ee(&self, peer_node_id: NodeId) {
1852        let mut sessions = self.peer_sessions.lock().unwrap();
1853        if let Some(session_mgr) = sessions.as_mut() {
1854            session_mgr.close_session(peer_node_id);
1855            self.notify(PeatEvent::PeerE2eeClosed { peer_node_id });
1856            log::info!(
1857                "Closed E2EE session with peer {:08X}",
1858                peer_node_id.as_u32()
1859            );
1860        }
1861    }
1862
1863    /// Get count of active E2EE sessions
1864    pub fn peer_e2ee_session_count(&self) -> usize {
1865        self.peer_sessions
1866            .lock()
1867            .unwrap()
1868            .as_ref()
1869            .map(|s| s.session_count())
1870            .unwrap_or(0)
1871    }
1872
1873    /// Get count of established E2EE sessions
1874    pub fn peer_e2ee_established_count(&self) -> usize {
1875        self.peer_sessions
1876            .lock()
1877            .unwrap()
1878            .as_ref()
1879            .map(|s| s.established_count())
1880            .unwrap_or(0)
1881    }
1882
1883    /// Handle incoming key exchange message
1884    ///
1885    /// Called internally when we receive a KEY_EXCHANGE_MARKER message.
1886    /// Returns the response key exchange bytes to send back, or None if invalid.
1887    fn handle_key_exchange(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1888        if data.len() < 2 || data[0] != KEY_EXCHANGE_MARKER {
1889            return None;
1890        }
1891
1892        let payload = &data[2..];
1893        let msg = KeyExchangeMessage::decode(payload)?;
1894
1895        let mut sessions = self.peer_sessions.lock().unwrap();
1896        let session_mgr = sessions.as_mut()?;
1897
1898        let (response, established) = session_mgr.handle_key_exchange(&msg, now_ms)?;
1899
1900        if established {
1901            self.notify(PeatEvent::PeerE2eeEstablished {
1902                peer_node_id: msg.sender_node_id,
1903            });
1904            log::info!(
1905                "E2EE session established with peer {:08X}",
1906                msg.sender_node_id.as_u32()
1907            );
1908        }
1909
1910        // Return response key exchange
1911        let mut buf = Vec::with_capacity(2 + 37);
1912        buf.push(KEY_EXCHANGE_MARKER);
1913        buf.push(0x00);
1914        buf.extend_from_slice(&response.encode());
1915        Some(buf)
1916    }
1917
1918    /// Handle incoming E2EE encrypted message
1919    ///
1920    /// Called internally when we receive a PEER_E2EE_MARKER message.
1921    /// Decrypts and notifies observers of the received message.
1922    fn handle_peer_e2ee_message(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1923        if data.len() < 2 || data[0] != PEER_E2EE_MARKER {
1924            return None;
1925        }
1926
1927        let payload = &data[2..];
1928        let msg = PeerEncryptedMessage::decode(payload)?;
1929
1930        let mut sessions = self.peer_sessions.lock().unwrap();
1931        let session_mgr = sessions.as_mut()?;
1932
1933        match session_mgr.decrypt_from_peer(&msg, now_ms) {
1934            Ok(plaintext) => {
1935                // Notify observers of the decrypted message
1936                self.notify(PeatEvent::PeerE2eeMessageReceived {
1937                    from_node: msg.sender_node_id,
1938                    data: plaintext.clone(),
1939                });
1940                Some(plaintext)
1941            }
1942            Err(e) => {
1943                log::warn!(
1944                    "Failed to decrypt E2EE message from {:08X}: {:?}",
1945                    msg.sender_node_id.as_u32(),
1946                    e
1947                );
1948                None
1949            }
1950        }
1951    }
1952
1953    // ==================== Configuration ====================
1954
1955    /// Get our node ID
1956    pub fn node_id(&self) -> NodeId {
1957        self.config.node_id
1958    }
1959
1960    /// Get our callsign
1961    pub fn callsign(&self) -> &str {
1962        &self.config.callsign
1963    }
1964
1965    /// Get the mesh ID
1966    pub fn mesh_id(&self) -> &str {
1967        &self.config.mesh_id
1968    }
1969
1970    /// Get the device name for BLE advertising
1971    pub fn device_name(&self) -> String {
1972        format!(
1973            "PEAT_{}-{:08X}",
1974            self.config.mesh_id,
1975            self.config.node_id.as_u32()
1976        )
1977    }
1978
1979    /// Get a peer's callsign by node ID
1980    ///
1981    /// Returns the callsign from the peer's most recently received peripheral data,
1982    /// or None if no peripheral data has been received from this peer.
1983    pub fn get_peer_callsign(&self, node_id: NodeId) -> Option<String> {
1984        self.peer_peripherals.read().ok().and_then(|peripherals| {
1985            peripherals
1986                .get(&node_id)
1987                .map(|p| p.callsign_str().to_string())
1988        })
1989    }
1990
1991    /// Get a peer's full peripheral data by node ID
1992    ///
1993    /// Returns a clone of the peripheral data from the peer's most recently received
1994    /// document, or None if no peripheral data has been received from this peer.
1995    pub fn get_peer_peripheral(&self, node_id: NodeId) -> Option<Peripheral> {
1996        self.peer_peripherals
1997            .read()
1998            .ok()
1999            .and_then(|peripherals| peripherals.get(&node_id).cloned())
2000    }
2001
2002    // ==================== Document Registry ====================
2003
2004    /// Get a reference to the document registry.
2005    ///
2006    /// Use this to register custom document types for mesh synchronization.
2007    ///
2008    /// # Example
2009    ///
2010    /// ```ignore
2011    /// use peat_btle::registry::DocumentType;
2012    ///
2013    /// // Register a custom document type
2014    /// mesh.document_registry().register::<MyCustomDocument>();
2015    /// ```
2016    pub fn document_registry(&self) -> &DocumentRegistry {
2017        &self.document_registry
2018    }
2019
2020    /// Store an app-layer document.
2021    ///
2022    /// If a document with the same identity (type_id, source_node, timestamp)
2023    /// already exists, it will be merged using CRDT semantics.
2024    ///
2025    /// Returns true if the document was newly added or changed via merge.
2026    pub fn store_app_document<T: crate::registry::DocumentType>(&self, doc: T) -> bool {
2027        let type_id = T::TYPE_ID;
2028        let (source_node, timestamp) = doc.identity();
2029        let key = (type_id, source_node, timestamp);
2030
2031        let mut docs = self.app_documents.write().unwrap();
2032
2033        if let Some(existing) = docs.get_mut(&key) {
2034            // Merge with existing document
2035            self.document_registry
2036                .merge(type_id, existing.as_mut(), &doc)
2037        } else {
2038            // Insert new document
2039            docs.insert(key, Box::new(doc));
2040            true
2041        }
2042    }
2043
2044    /// Store a type-erased app-layer document.
2045    ///
2046    /// Used when receiving documents from the network where the type is
2047    /// determined at runtime.
2048    ///
2049    /// Returns true if the document was newly added or changed via merge.
2050    pub fn store_app_document_boxed(
2051        &self,
2052        type_id: u8,
2053        source_node: u32,
2054        timestamp: u64,
2055        doc: Box<dyn core::any::Any + Send + Sync>,
2056    ) -> bool {
2057        let key = (type_id, source_node, timestamp);
2058
2059        let mut docs = self.app_documents.write().unwrap();
2060
2061        if let Some(existing) = docs.get_mut(&key) {
2062            // Merge with existing document
2063            self.document_registry
2064                .merge(type_id, existing.as_mut(), doc.as_ref())
2065        } else {
2066            // Insert new document
2067            docs.insert(key, doc);
2068            true
2069        }
2070    }
2071
2072    /// Get a stored app-layer document by identity.
2073    ///
2074    /// Returns None if not found or if downcast to T fails.
2075    pub fn get_app_document<T: crate::registry::DocumentType>(
2076        &self,
2077        source_node: u32,
2078        timestamp: u64,
2079    ) -> Option<T> {
2080        let key = (T::TYPE_ID, source_node, timestamp);
2081
2082        let docs = self.app_documents.read().unwrap();
2083        docs.get(&key).and_then(|d| d.downcast_ref::<T>()).cloned()
2084    }
2085
2086    /// Get all stored app-layer documents of a specific type.
2087    ///
2088    /// Returns a vector of all documents of type T currently stored.
2089    pub fn get_all_app_documents_of_type<T: crate::registry::DocumentType>(&self) -> Vec<T> {
2090        let docs = self.app_documents.read().unwrap();
2091        docs.iter()
2092            .filter(|((type_id, _, _), _)| *type_id == T::TYPE_ID)
2093            .filter_map(|(_, doc)| doc.downcast_ref::<T>().cloned())
2094            .collect()
2095    }
2096
2097    /// Get delta operations for all stored app documents.
2098    ///
2099    /// Used during sync to include app documents in the delta stream.
2100    pub fn app_document_delta_ops(&self) -> Vec<crate::registry::AppOperation> {
2101        let docs = self.app_documents.read().unwrap();
2102        let mut ops = Vec::new();
2103
2104        for ((type_id, _source, _ts), doc) in docs.iter() {
2105            if let Some(op) = self.document_registry.to_delta_op(*type_id, doc.as_ref()) {
2106                ops.push(op);
2107            }
2108        }
2109
2110        ops
2111    }
2112
2113    /// Get all document keys for a given type.
2114    ///
2115    /// Returns (source_node, timestamp) pairs for all documents of the specified type.
2116    pub fn app_document_keys(&self, type_id: u8) -> Vec<(u32, u64)> {
2117        let docs = self.app_documents.read().unwrap();
2118        docs.keys()
2119            .filter(|(tid, _, _)| *tid == type_id)
2120            .map(|(_, source, ts)| (*source, *ts))
2121            .collect()
2122    }
2123
2124    /// Get the number of stored app documents.
2125    pub fn app_document_count(&self) -> usize {
2126        self.app_documents.read().unwrap().len()
2127    }
2128
2129    // ==================== Observer Management ====================
2130
2131    /// Add an observer for mesh events
2132    pub fn add_observer(&self, observer: Arc<dyn PeatObserver>) {
2133        self.observers.add(observer);
2134    }
2135
2136    /// Remove an observer
2137    pub fn remove_observer(&self, observer: &Arc<dyn PeatObserver>) {
2138        self.observers.remove(observer);
2139    }
2140
2141    // ==================== User Actions ====================
2142
2143    /// Send an emergency alert
2144    ///
2145    /// Returns the document bytes to broadcast to all peers.
2146    /// If encryption is enabled, the document is encrypted.
2147    pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
2148        let data = self.document_sync.send_emergency(timestamp);
2149        self.notify(PeatEvent::MeshStateChanged {
2150            peer_count: self.peer_manager.peer_count(),
2151            connected_count: self.peer_manager.connected_count(),
2152        });
2153        self.encrypt_document(&data)
2154    }
2155
2156    /// Send an ACK response
2157    ///
2158    /// Returns the document bytes to broadcast to all peers.
2159    /// If encryption is enabled, the document is encrypted.
2160    pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
2161        let data = self.document_sync.send_ack(timestamp);
2162        self.notify(PeatEvent::MeshStateChanged {
2163            peer_count: self.peer_manager.peer_count(),
2164            connected_count: self.peer_manager.connected_count(),
2165        });
2166        self.encrypt_document(&data)
2167    }
2168
2169    /// Broadcast arbitrary bytes over the mesh.
2170    ///
2171    /// Takes raw payload bytes, encrypts them (if encryption is enabled),
2172    /// and returns bytes ready to send to all connected peers.
2173    ///
2174    /// This is useful for sending extension data like CannedMessages from peat-lite.
2175    pub fn broadcast_bytes(&self, payload: &[u8]) -> Vec<u8> {
2176        self.encrypt_document(payload)
2177    }
2178
2179    /// Clear the current event (emergency or ack)
2180    pub fn clear_event(&self) {
2181        self.document_sync.clear_event();
2182    }
2183
2184    /// Check if emergency is active
2185    pub fn is_emergency_active(&self) -> bool {
2186        self.document_sync.is_emergency_active()
2187    }
2188
2189    /// Check if ACK is active
2190    pub fn is_ack_active(&self) -> bool {
2191        self.document_sync.is_ack_active()
2192    }
2193
2194    /// Get current event type
2195    pub fn current_event(&self) -> Option<EventType> {
2196        self.document_sync.current_event()
2197    }
2198
2199    // ==================== Emergency Management (Document-Based) ====================
2200
2201    /// Start a new emergency event with ACK tracking
2202    ///
2203    /// Creates an emergency event that tracks ACKs from all known peers.
2204    /// Pass the list of known peer node IDs to track.
2205    /// Returns the document bytes to broadcast.
2206    /// If encryption is enabled, the document is encrypted.
2207    pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
2208        let data = self.document_sync.start_emergency(timestamp, known_peers);
2209        self.notify(PeatEvent::MeshStateChanged {
2210            peer_count: self.peer_manager.peer_count(),
2211            connected_count: self.peer_manager.connected_count(),
2212        });
2213        self.encrypt_document(&data)
2214    }
2215
2216    /// Start a new emergency using all currently known peers
2217    ///
2218    /// Convenience method that automatically includes all discovered peers.
2219    pub fn start_emergency_with_known_peers(&self, timestamp: u64) -> Vec<u8> {
2220        let peers: Vec<u32> = self
2221            .peer_manager
2222            .get_peers()
2223            .iter()
2224            .map(|p| p.node_id.as_u32())
2225            .collect();
2226        self.start_emergency(timestamp, &peers)
2227    }
2228
2229    /// Record our ACK for the current emergency
2230    ///
2231    /// Returns the document bytes to broadcast, or None if no emergency is active.
2232    /// If encryption is enabled, the document is encrypted.
2233    pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
2234        let result = self.document_sync.ack_emergency(timestamp);
2235        if result.is_some() {
2236            self.notify(PeatEvent::MeshStateChanged {
2237                peer_count: self.peer_manager.peer_count(),
2238                connected_count: self.peer_manager.connected_count(),
2239            });
2240        }
2241        result.map(|data| self.encrypt_document(&data))
2242    }
2243
2244    /// Clear the current emergency event
2245    pub fn clear_emergency(&self) {
2246        self.document_sync.clear_emergency();
2247    }
2248
2249    /// Check if there's an active emergency
2250    pub fn has_active_emergency(&self) -> bool {
2251        self.document_sync.has_active_emergency()
2252    }
2253
2254    /// Get emergency status info
2255    ///
2256    /// Returns (source_node, timestamp, acked_count, pending_count) if emergency is active.
2257    pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
2258        self.document_sync.get_emergency_status()
2259    }
2260
2261    /// Check if a specific peer has ACKed the current emergency
2262    pub fn has_peer_acked(&self, peer_id: u32) -> bool {
2263        self.document_sync.has_peer_acked(peer_id)
2264    }
2265
2266    /// Check if all peers have ACKed the current emergency
2267    pub fn all_peers_acked(&self) -> bool {
2268        self.document_sync.all_peers_acked()
2269    }
2270
2271    // ==================== Chat Methods (requires `legacy-chat` feature) ====================
2272
2273    /// Send a chat message
2274    ///
2275    /// Adds the message to the local CRDT and returns the document bytes
2276    /// to broadcast to all peers. If encryption is enabled, the document is encrypted.
2277    ///
2278    /// Returns the encrypted document bytes if the message was new,
2279    /// or None if it was a duplicate.
2280    #[cfg(feature = "legacy-chat")]
2281    pub fn send_chat(&self, sender: &str, text: &str, timestamp: u64) -> Option<Vec<u8>> {
2282        if self.document_sync.add_chat_message(sender, text, timestamp) {
2283            Some(self.encrypt_document(&self.build_document()))
2284        } else {
2285            None
2286        }
2287    }
2288
2289    /// Send a chat reply
2290    ///
2291    /// Adds the reply to the local CRDT with reply-to information and returns
2292    /// the document bytes to broadcast. If encryption is enabled, the document is encrypted.
2293    ///
2294    /// Returns the encrypted document bytes if the message was new,
2295    /// or None if it was a duplicate.
2296    #[cfg(feature = "legacy-chat")]
2297    pub fn send_chat_reply(
2298        &self,
2299        sender: &str,
2300        text: &str,
2301        reply_to_node: u32,
2302        reply_to_timestamp: u64,
2303        timestamp: u64,
2304    ) -> Option<Vec<u8>> {
2305        if self.document_sync.add_chat_reply(
2306            sender,
2307            text,
2308            reply_to_node,
2309            reply_to_timestamp,
2310            timestamp,
2311        ) {
2312            Some(self.encrypt_document(&self.build_document()))
2313        } else {
2314            None
2315        }
2316    }
2317
2318    /// Get the number of chat messages in the local CRDT
2319    #[cfg(feature = "legacy-chat")]
2320    pub fn chat_count(&self) -> usize {
2321        self.document_sync.chat_count()
2322    }
2323
2324    /// Get chat messages newer than a timestamp
2325    ///
2326    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
2327    #[cfg(feature = "legacy-chat")]
2328    pub fn chat_messages_since(
2329        &self,
2330        since_timestamp: u64,
2331    ) -> Vec<(u32, u64, String, String, u32, u64)> {
2332        self.document_sync.chat_messages_since(since_timestamp)
2333    }
2334
2335    /// Get all chat messages
2336    ///
2337    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
2338    #[cfg(feature = "legacy-chat")]
2339    pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
2340        self.document_sync.all_chat_messages()
2341    }
2342
2343    // ==================== BLE Callbacks (Platform -> Mesh) ====================
2344
2345    /// Called when a BLE device is discovered
2346    ///
2347    /// Returns `Some(PeatPeer)` if this is a new Peat peer on our mesh.
2348    pub fn on_ble_discovered(
2349        &self,
2350        identifier: &str,
2351        name: Option<&str>,
2352        rssi: i8,
2353        mesh_id: Option<&str>,
2354        now_ms: u64,
2355    ) -> Option<PeatPeer> {
2356        let (node_id, is_new) = self
2357            .peer_manager
2358            .on_discovered(identifier, name, rssi, mesh_id, now_ms)?;
2359
2360        let peer = self.peer_manager.get_peer(node_id)?;
2361
2362        // Update connection graph
2363        {
2364            let mut graph = self.connection_graph.lock().unwrap();
2365            graph.on_discovered(
2366                node_id,
2367                identifier.to_string(),
2368                name.map(|s| s.to_string()),
2369                mesh_id.map(|s| s.to_string()),
2370                rssi,
2371                now_ms,
2372            );
2373        }
2374
2375        if is_new {
2376            self.notify(PeatEvent::PeerDiscovered { peer: peer.clone() });
2377            self.notify_mesh_state_changed();
2378        }
2379
2380        Some(peer)
2381    }
2382
2383    /// Called when a BLE connection is established (outgoing)
2384    ///
2385    /// Returns the NodeId if this identifier is known.
2386    pub fn on_ble_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
2387        let node_id = match self.peer_manager.on_connected(identifier, now_ms) {
2388            Some(id) => id,
2389            None => {
2390                log::warn!(
2391                    "on_ble_connected: identifier {:?} not in peer map — \
2392                     use on_incoming_connection() for peripheral connections",
2393                    identifier
2394                );
2395                return None;
2396            }
2397        };
2398
2399        // Update connection graph
2400        {
2401            let mut graph = self.connection_graph.lock().unwrap();
2402            graph.on_connected(node_id, now_ms);
2403        }
2404
2405        // Register peer for delta sync tracking
2406        self.register_peer_for_delta(&node_id);
2407
2408        self.notify(PeatEvent::PeerConnected { node_id });
2409        self.notify_mesh_state_changed();
2410        Some(node_id)
2411    }
2412
2413    /// Called when a BLE connection is lost
2414    pub fn on_ble_disconnected(
2415        &self,
2416        identifier: &str,
2417        reason: DisconnectReason,
2418    ) -> Option<NodeId> {
2419        let (node_id, observer_reason) = self.peer_manager.on_disconnected(identifier, reason)?;
2420
2421        // Update connection graph (convert observer reason to platform reason)
2422        {
2423            let mut graph = self.connection_graph.lock().unwrap();
2424            let platform_reason = match observer_reason {
2425                DisconnectReason::LocalRequest => crate::platform::DisconnectReason::LocalRequest,
2426                DisconnectReason::RemoteRequest => crate::platform::DisconnectReason::RemoteRequest,
2427                DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2428                DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2429                DisconnectReason::ConnectionFailed => {
2430                    crate::platform::DisconnectReason::ConnectionFailed
2431                }
2432                DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2433            };
2434            let now_ms = std::time::SystemTime::now()
2435                .duration_since(std::time::UNIX_EPOCH)
2436                .map(|d| d.as_millis() as u64)
2437                .unwrap_or(0);
2438            graph.on_disconnected(node_id, platform_reason, now_ms);
2439
2440            // Remove indirect peer paths that went through this peer
2441            // These paths may no longer be valid since the direct connection is lost
2442            graph.remove_via_peer(node_id);
2443        }
2444
2445        // Unregister peer from delta sync tracking
2446        self.unregister_peer_for_delta(&node_id);
2447
2448        self.notify(PeatEvent::PeerDisconnected {
2449            node_id,
2450            reason: observer_reason,
2451        });
2452        self.notify_mesh_state_changed();
2453        Some(node_id)
2454    }
2455
2456    /// Called when a BLE connection is lost, using NodeId directly
2457    ///
2458    /// Alternative to on_ble_disconnected() when only NodeId is known (e.g., ESP32).
2459    pub fn on_peer_disconnected(&self, node_id: NodeId, reason: DisconnectReason) {
2460        if self
2461            .peer_manager
2462            .on_disconnected_by_node_id(node_id, reason)
2463        {
2464            // Update connection graph
2465            {
2466                let mut graph = self.connection_graph.lock().unwrap();
2467                let platform_reason = match reason {
2468                    DisconnectReason::LocalRequest => {
2469                        crate::platform::DisconnectReason::LocalRequest
2470                    }
2471                    DisconnectReason::RemoteRequest => {
2472                        crate::platform::DisconnectReason::RemoteRequest
2473                    }
2474                    DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2475                    DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2476                    DisconnectReason::ConnectionFailed => {
2477                        crate::platform::DisconnectReason::ConnectionFailed
2478                    }
2479                    DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2480                };
2481                let now_ms = std::time::SystemTime::now()
2482                    .duration_since(std::time::UNIX_EPOCH)
2483                    .map(|d| d.as_millis() as u64)
2484                    .unwrap_or(0);
2485                graph.on_disconnected(node_id, platform_reason, now_ms);
2486
2487                // Remove indirect peer paths that went through this peer
2488                graph.remove_via_peer(node_id);
2489            }
2490
2491            // Unregister peer from delta sync tracking
2492            self.unregister_peer_for_delta(&node_id);
2493
2494            self.notify(PeatEvent::PeerDisconnected { node_id, reason });
2495            self.notify_mesh_state_changed();
2496        }
2497    }
2498
2499    /// Called when a remote device connects to us (incoming connection)
2500    ///
2501    /// Use this when we're acting as a peripheral and a central connects to us.
2502    pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
2503        let is_new = self
2504            .peer_manager
2505            .on_incoming_connection(identifier, node_id, now_ms);
2506
2507        // Update connection graph
2508        {
2509            let mut graph = self.connection_graph.lock().unwrap();
2510            if is_new {
2511                graph.on_discovered(
2512                    node_id,
2513                    identifier.to_string(),
2514                    None,
2515                    Some(self.config.mesh_id.clone()),
2516                    -50, // Default good RSSI for incoming connections
2517                    now_ms,
2518                );
2519            }
2520            graph.on_connected(node_id, now_ms);
2521        }
2522
2523        // Register peer for delta sync tracking
2524        self.register_peer_for_delta(&node_id);
2525
2526        if is_new {
2527            if let Some(peer) = self.peer_manager.get_peer(node_id) {
2528                self.notify(PeatEvent::PeerDiscovered { peer });
2529            }
2530        }
2531
2532        self.notify(PeatEvent::PeerConnected { node_id });
2533        self.notify_mesh_state_changed();
2534
2535        is_new
2536    }
2537
2538    /// Called when data is received from a peer
2539    ///
2540    /// Parses the document, merges it, and generates appropriate events.
2541    /// If encryption is enabled, decrypts the document first.
2542    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2543    /// Returns the source NodeId and whether the document contained an event.
2544    pub fn on_ble_data_received(
2545        &self,
2546        identifier: &str,
2547        data: &[u8],
2548        now_ms: u64,
2549    ) -> Option<DataReceivedResult> {
2550        // Get node ID from identifier
2551        let node_id = self.peer_manager.get_node_id(identifier)?;
2552
2553        // Check for special message types first
2554        if data.len() >= 2 {
2555            match data[0] {
2556                KEY_EXCHANGE_MARKER => {
2557                    // Handle key exchange - returns response to send back
2558                    let _response = self.handle_key_exchange(data, now_ms);
2559                    // Return None as this isn't a document sync
2560                    return None;
2561                }
2562                PEER_E2EE_MARKER => {
2563                    // Handle encrypted peer message
2564                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2565                    // Return None as this isn't a document sync
2566                    return None;
2567                }
2568                RELAY_ENVELOPE_MARKER => {
2569                    // Handle relay envelope for multi-hop
2570                    return self
2571                        .handle_relay_envelope_with_identifier(node_id, identifier, data, now_ms);
2572                }
2573                _ => {}
2574            }
2575        }
2576
2577        // Direct document (not relay envelope)
2578        self.process_document_data_with_identifier(node_id, identifier, data, now_ms, None, None, 0)
2579    }
2580
2581    /// Internal: Process document data with identifier as source hint
2582    #[allow(clippy::too_many_arguments)]
2583    fn process_document_data_with_identifier(
2584        &self,
2585        source_node: NodeId,
2586        identifier: &str,
2587        data: &[u8],
2588        now_ms: u64,
2589        relay_data: Option<Vec<u8>>,
2590        origin_node: Option<NodeId>,
2591        hop_count: u8,
2592    ) -> Option<DataReceivedResult> {
2593        // Decrypt if encrypted (mesh-wide encryption) - use identifier as source hint
2594        let decrypted = self.decrypt_document(data, Some(identifier))?;
2595
2596        // ADR-059 Amendment 1 §"Receive-side dispatch": route 0xB6 frames
2597        // through BleTranslator + silent-drop reserved 0xB7..=0xBF.
2598        #[cfg(feature = "mesh-translator")]
2599        if self.try_handle_translator_marker(&decrypted, Some(identifier), Some(source_node)) {
2600            return None;
2601        }
2602
2603        // Check if this is a delta document (wire format v2)
2604        if DeltaDocument::is_delta_document(&decrypted) {
2605            return self.process_delta_document_internal(
2606                source_node,
2607                &decrypted,
2608                now_ms,
2609                relay_data,
2610                origin_node,
2611                hop_count,
2612            );
2613        }
2614
2615        // Merge the document (legacy wire format v1)
2616        let result = self.document_sync.merge_document(&decrypted)?;
2617
2618        // Store peer peripheral if present (for callsign lookup)
2619        if let Some(ref peripheral) = result.peer_peripheral {
2620            if let Ok(mut peripherals) = self.peer_peripherals.write() {
2621                peripherals.insert(result.source_node, peripheral.clone());
2622            }
2623        }
2624
2625        // Record sync
2626        self.peer_manager.record_sync(source_node, now_ms);
2627
2628        // Generate events based on what was received
2629        if result.is_emergency() {
2630            self.notify(PeatEvent::EmergencyReceived {
2631                from_node: result.source_node,
2632            });
2633        } else if result.is_ack() {
2634            self.notify(PeatEvent::AckReceived {
2635                from_node: result.source_node,
2636            });
2637        }
2638
2639        if result.counter_changed {
2640            self.notify(PeatEvent::DocumentSynced {
2641                from_node: result.source_node,
2642                total_count: result.total_count,
2643            });
2644        }
2645
2646        // Emit relay event if we're relaying
2647        if relay_data.is_some() {
2648            let relay_targets = self.get_relay_targets(Some(source_node));
2649            self.notify(PeatEvent::MessageRelayed {
2650                origin_node: origin_node.unwrap_or(result.source_node),
2651                relay_count: relay_targets.len(),
2652                hop_count,
2653            });
2654        }
2655
2656        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2657            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2658
2659        Some(DataReceivedResult {
2660            source_node: result.source_node,
2661            is_emergency: result.is_emergency(),
2662            is_ack: result.is_ack(),
2663            counter_changed: result.counter_changed,
2664            emergency_changed: result.emergency_changed,
2665            total_count: result.total_count,
2666            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2667            relay_data,
2668            origin_node,
2669            hop_count,
2670            callsign,
2671            battery_percent,
2672            heart_rate,
2673            event_type,
2674            latitude,
2675            longitude,
2676            altitude,
2677        })
2678    }
2679
2680    /// Internal: Handle relay envelope with identifier as source hint
2681    fn handle_relay_envelope_with_identifier(
2682        &self,
2683        source_node: NodeId,
2684        identifier: &str,
2685        data: &[u8],
2686        now_ms: u64,
2687    ) -> Option<DataReceivedResult> {
2688        // Process the relay envelope
2689        let envelope = RelayEnvelope::decode(data)?;
2690
2691        // Check deduplication
2692        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2693            let stats = self
2694                .seen_cache
2695                .lock()
2696                .unwrap()
2697                .get_stats(&envelope.message_id);
2698            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2699
2700            self.notify(PeatEvent::DuplicateMessageDropped {
2701                origin_node: envelope.origin_node,
2702                seen_count,
2703            });
2704            return None;
2705        }
2706
2707        // Check TTL and get relay data
2708        let relay_data = if envelope.can_relay() && self.config.enable_relay {
2709            envelope.relay().map(|e| e.encode())
2710        } else {
2711            if !envelope.can_relay() {
2712                self.notify(PeatEvent::MessageTtlExpired {
2713                    origin_node: envelope.origin_node,
2714                    hop_count: envelope.hop_count,
2715                });
2716            }
2717            None
2718        };
2719
2720        // Process the inner payload
2721        self.process_document_data_with_identifier(
2722            source_node,
2723            identifier,
2724            &envelope.payload,
2725            now_ms,
2726            relay_data,
2727            Some(envelope.origin_node),
2728            envelope.hop_count,
2729        )
2730    }
2731
2732    /// Called when data is received but we don't have the identifier mapped
2733    ///
2734    /// Use this when receiving data from a peripheral we discovered.
2735    /// If encryption is enabled, decrypts the document first.
2736    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2737    /// Handles relay envelopes for multi-hop mesh operation.
2738    pub fn on_ble_data_received_from_node(
2739        &self,
2740        node_id: NodeId,
2741        data: &[u8],
2742        now_ms: u64,
2743    ) -> Option<DataReceivedResult> {
2744        // Check for special message types first
2745        if data.len() >= 2 {
2746            match data[0] {
2747                KEY_EXCHANGE_MARKER => {
2748                    let _response = self.handle_key_exchange(data, now_ms);
2749                    return None;
2750                }
2751                PEER_E2EE_MARKER => {
2752                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2753                    return None;
2754                }
2755                RELAY_ENVELOPE_MARKER => {
2756                    // Handle relay envelope for multi-hop
2757                    return self.handle_relay_envelope(node_id, data, now_ms);
2758                }
2759                _ => {}
2760            }
2761        }
2762
2763        // Direct document (not relay envelope)
2764        self.process_document_data(node_id, data, now_ms, None, None, 0)
2765    }
2766
2767    /// Called when encrypted data is received from an unknown peer
2768    ///
2769    /// This handles the case where we receive an encrypted document from a BLE address
2770    /// that isn't registered in our peer manager (e.g., due to BLE address rotation).
2771    /// The function decrypts first using the mesh key, then extracts the source_node
2772    /// from the decrypted document header and registers the peer.
2773    ///
2774    /// Returns `Some(DataReceivedResult)` if decryption and processing succeed.
2775    /// Returns `None` if decryption fails or the document is invalid.
2776    pub fn on_ble_data_received_anonymous(
2777        &self,
2778        identifier: &str,
2779        data: &[u8],
2780        now_ms: u64,
2781    ) -> Option<DataReceivedResult> {
2782        log::debug!(
2783            "on_ble_data_received_anonymous: identifier={}, len={}, marker=0x{:02X}",
2784            identifier,
2785            data.len(),
2786            data.first().copied().unwrap_or(0)
2787        );
2788
2789        // Try to decrypt (handles both encrypted and unencrypted documents)
2790        let decrypted = match self.decrypt_document(data, Some(identifier)) {
2791            Some(d) => d,
2792            None => {
2793                log::warn!(
2794                    "on_ble_data_received_anonymous: decrypt/parse FAILED for {} byte doc from {}",
2795                    data.len(),
2796                    identifier
2797                );
2798                return None;
2799            }
2800        };
2801
2802        // ADR-059 Amendment 1 §"Receive-side dispatch": route 0xB6
2803        // translator frames through BleTranslator::decode_inbound, and
2804        // silent-drop reserved 0xB7..=0xBF frames before they reach
2805        // PeatDocument::decode (where a 0xB6+ payload could hit the
2806        // GCounter-pollution hazard for inputs whose bytes 8..11 fall
2807        // <= MAX_COUNTER_ENTRIES). Anonymous receive: source_node is
2808        // unknown until the legacy path extracts it; for translator
2809        // frames there's no PeatDocument header to extract from, so we
2810        // pass None and accept that `tracks` decoding will Err for lack
2811        // of `local_wire_id` (other collections succeed).
2812        #[cfg(feature = "mesh-translator")]
2813        if self.try_handle_translator_marker(&decrypted, Some(identifier), None) {
2814            return None;
2815        }
2816
2817        // Extract source_node from decrypted document header
2818        // Header format: [version: 4 bytes (LE)][node_id: 4 bytes (LE)]
2819        if decrypted.len() < 8 {
2820            log::warn!("Decrypted document too short to extract source_node");
2821            return None;
2822        }
2823
2824        let source_node_u32 =
2825            u32::from_le_bytes([decrypted[4], decrypted[5], decrypted[6], decrypted[7]]);
2826        let source_node = NodeId::new(source_node_u32);
2827
2828        log::info!(
2829            "Anonymous document from {}: source_node={:08X}, len={}",
2830            identifier,
2831            source_node_u32,
2832            decrypted.len()
2833        );
2834
2835        // Register the peer with this identifier so future lookups work
2836        // This handles BLE address rotation
2837        self.peer_manager
2838            .register_identifier(identifier, source_node);
2839
2840        // Check if this is a delta document
2841        let is_delta = DeltaDocument::is_delta_document(&decrypted);
2842        log::info!(
2843            "Document format: delta={}, first_byte=0x{:02X}, len={}",
2844            is_delta,
2845            decrypted.first().copied().unwrap_or(0),
2846            decrypted.len()
2847        );
2848
2849        if is_delta {
2850            return self.process_delta_document_internal(
2851                source_node,
2852                &decrypted,
2853                now_ms,
2854                None,
2855                None,
2856                0,
2857            );
2858        }
2859
2860        // Handle app-layer message (0xAF marker from peat-lite CannedMessages)
2861        // These are handled by consumers who register DocumentType implementations
2862        // via the DocumentRegistry. Pass through as relay_data for platform layer.
2863        const APP_LAYER_MARKER: u8 = 0xAF;
2864        if decrypted.first().copied() == Some(APP_LAYER_MARKER) {
2865            log::debug!(
2866                "App-layer message (0xAF) from {:08X}, {} bytes - passing to relay",
2867                source_node.as_u32(),
2868                decrypted.len()
2869            );
2870            return Some(DataReceivedResult {
2871                source_node,
2872                is_emergency: false,
2873                is_ack: false,
2874                counter_changed: false,
2875                emergency_changed: false,
2876                total_count: 0,
2877                event_timestamp: now_ms,
2878                relay_data: Some(decrypted.to_vec()),
2879                origin_node: None,
2880                hop_count: 0,
2881                callsign: None,
2882                battery_percent: None,
2883                heart_rate: None,
2884                event_type: None,
2885                latitude: None,
2886                longitude: None,
2887                altitude: None,
2888            });
2889        }
2890
2891        // Merge the document (legacy wire format v1)
2892        log::info!(
2893            "Processing legacy document from {:08X}",
2894            source_node.as_u32()
2895        );
2896        let result = self.document_sync.merge_document(&decrypted)?;
2897
2898        // Log what we got from the merge
2899        log::info!(
2900            "Merge result: peer_peripheral={}, counter_changed={}",
2901            result.peer_peripheral.is_some(),
2902            result.counter_changed
2903        );
2904        if let Some(ref p) = result.peer_peripheral {
2905            log::info!("Peripheral callsign: '{}'", p.callsign_str());
2906        }
2907
2908        // Record sync
2909        self.peer_manager.record_sync(source_node, now_ms);
2910
2911        // Generate events
2912        if result.is_emergency() {
2913            self.notify(PeatEvent::EmergencyReceived {
2914                from_node: result.source_node,
2915            });
2916        } else if result.is_ack() {
2917            self.notify(PeatEvent::AckReceived {
2918                from_node: result.source_node,
2919            });
2920        }
2921
2922        if result.counter_changed {
2923            self.notify(PeatEvent::DocumentSynced {
2924                from_node: result.source_node,
2925                total_count: result.total_count,
2926            });
2927        }
2928
2929        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2930            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2931
2932        Some(DataReceivedResult {
2933            source_node: result.source_node,
2934            is_emergency: result.is_emergency(),
2935            is_ack: result.is_ack(),
2936            counter_changed: result.counter_changed,
2937            emergency_changed: result.emergency_changed,
2938            total_count: result.total_count,
2939            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2940            relay_data: None,
2941            origin_node: None,
2942            hop_count: 0,
2943            callsign,
2944            battery_percent,
2945            heart_rate,
2946            event_type,
2947            latitude,
2948            longitude,
2949            altitude,
2950        })
2951    }
2952
2953    /// Internal: Process document data (shared by direct and relay paths)
2954    fn process_document_data(
2955        &self,
2956        source_node: NodeId,
2957        data: &[u8],
2958        now_ms: u64,
2959        relay_data: Option<Vec<u8>>,
2960        origin_node: Option<NodeId>,
2961        hop_count: u8,
2962    ) -> Option<DataReceivedResult> {
2963        // Decrypt if encrypted (mesh-wide encryption)
2964        let source_hint = format!("node:{:08X}", source_node.as_u32());
2965        let decrypted = self.decrypt_document(data, Some(&source_hint))?;
2966
2967        // ADR-059 Amendment 1 §"Receive-side dispatch": route 0xB6 frames
2968        // through BleTranslator + silent-drop reserved 0xB7..=0xBF.
2969        #[cfg(feature = "mesh-translator")]
2970        if self.try_handle_translator_marker(&decrypted, None, Some(source_node)) {
2971            return None;
2972        }
2973
2974        // Check if this is a delta document (wire format v2)
2975        if DeltaDocument::is_delta_document(&decrypted) {
2976            return self.process_delta_document_internal(
2977                source_node,
2978                &decrypted,
2979                now_ms,
2980                relay_data,
2981                origin_node,
2982                hop_count,
2983            );
2984        }
2985
2986        // Merge the document (legacy wire format v1)
2987        let result = self.document_sync.merge_document(&decrypted)?;
2988
2989        // Store peer peripheral if present (for callsign lookup)
2990        if let Some(ref peripheral) = result.peer_peripheral {
2991            if let Ok(mut peripherals) = self.peer_peripherals.write() {
2992                peripherals.insert(result.source_node, peripheral.clone());
2993            }
2994        }
2995
2996        // Record sync
2997        self.peer_manager.record_sync(source_node, now_ms);
2998
2999        // Generate events based on what was received
3000        if result.is_emergency() {
3001            self.notify(PeatEvent::EmergencyReceived {
3002                from_node: result.source_node,
3003            });
3004        } else if result.is_ack() {
3005            self.notify(PeatEvent::AckReceived {
3006                from_node: result.source_node,
3007            });
3008        }
3009
3010        if result.counter_changed {
3011            self.notify(PeatEvent::DocumentSynced {
3012                from_node: result.source_node,
3013                total_count: result.total_count,
3014            });
3015        }
3016
3017        // Emit relay event if we're relaying
3018        if relay_data.is_some() {
3019            let relay_targets = self.get_relay_targets(Some(source_node));
3020            self.notify(PeatEvent::MessageRelayed {
3021                origin_node: origin_node.unwrap_or(result.source_node),
3022                relay_count: relay_targets.len(),
3023                hop_count,
3024            });
3025        }
3026
3027        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
3028            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3029
3030        Some(DataReceivedResult {
3031            source_node: result.source_node,
3032            is_emergency: result.is_emergency(),
3033            is_ack: result.is_ack(),
3034            counter_changed: result.counter_changed,
3035            emergency_changed: result.emergency_changed,
3036            total_count: result.total_count,
3037            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3038            relay_data,
3039            origin_node,
3040            hop_count,
3041            callsign,
3042            battery_percent,
3043            heart_rate,
3044            event_type,
3045            latitude,
3046            longitude,
3047            altitude,
3048        })
3049    }
3050
3051    /// Internal: Handle relay envelope
3052    fn handle_relay_envelope(
3053        &self,
3054        source_node: NodeId,
3055        data: &[u8],
3056        now_ms: u64,
3057    ) -> Option<DataReceivedResult> {
3058        // Process the relay envelope
3059        let decision = self.process_relay_envelope(data, source_node, now_ms)?;
3060
3061        // Get relay data if we should relay
3062        let relay_data = if decision.should_relay {
3063            decision.relay_data()
3064        } else {
3065            None
3066        };
3067
3068        // Process the inner payload
3069        self.process_document_data(
3070            source_node,
3071            &decision.payload,
3072            now_ms,
3073            relay_data,
3074            Some(decision.origin_node),
3075            decision.hop_count,
3076        )
3077    }
3078
3079    /// Called when data is received without a known identifier
3080    ///
3081    /// This is the simplest data receive method - it extracts the source node_id
3082    /// from the document itself. Use this when you don't track identifiers
3083    /// (e.g., ESP32 NimBLE).
3084    /// If encryption is enabled, decrypts the document first.
3085    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
3086    /// Handles relay envelopes for multi-hop mesh operation.
3087    pub fn on_ble_data(
3088        &self,
3089        identifier: &str,
3090        data: &[u8],
3091        now_ms: u64,
3092    ) -> Option<DataReceivedResult> {
3093        // Check for special message types first
3094        if data.len() >= 2 {
3095            match data[0] {
3096                KEY_EXCHANGE_MARKER => {
3097                    let _response = self.handle_key_exchange(data, now_ms);
3098                    return None;
3099                }
3100                PEER_E2EE_MARKER => {
3101                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
3102                    return None;
3103                }
3104                RELAY_ENVELOPE_MARKER => {
3105                    // Handle relay envelope - extract origin from envelope
3106                    return self.handle_relay_envelope_with_incoming(identifier, data, now_ms);
3107                }
3108                _ => {}
3109            }
3110        }
3111
3112        // Direct document - process normally
3113        self.process_incoming_document(identifier, data, now_ms, None, None, 0)
3114    }
3115
3116    /// Internal: Process incoming document (handles peer registration)
3117    fn process_incoming_document(
3118        &self,
3119        identifier: &str,
3120        data: &[u8],
3121        now_ms: u64,
3122        relay_data: Option<Vec<u8>>,
3123        origin_node: Option<NodeId>,
3124        hop_count: u8,
3125    ) -> Option<DataReceivedResult> {
3126        // Decrypt if encrypted (mesh-wide encryption)
3127        let decrypted = self.decrypt_document(data, Some(identifier))?;
3128
3129        // Merge the document (extracts node_id internally)
3130        let result = self.document_sync.merge_document(&decrypted)?;
3131
3132        // Record sync using the source_node from the merged document
3133        self.peer_manager.record_sync(result.source_node, now_ms);
3134
3135        // Only register the identifier mapping for direct messages (not relayed)
3136        // For relayed messages, the identifier belongs to the relay source (who forwarded it),
3137        // not the origin node (who created the document). The relay source is already registered
3138        // via handle_relay_envelope_with_incoming when the relay envelope is first processed.
3139        if origin_node.is_none() {
3140            // Direct message - register the peer with this identifier
3141            let is_new =
3142                self.peer_manager
3143                    .on_incoming_connection(identifier, result.source_node, now_ms);
3144
3145            // Update connection graph to track connection state
3146            {
3147                let mut graph = self.connection_graph.lock().unwrap();
3148                if is_new {
3149                    graph.on_discovered(
3150                        result.source_node,
3151                        identifier.to_string(),
3152                        None,
3153                        Some(self.config.mesh_id.clone()),
3154                        -50, // Default RSSI for data-based discovery
3155                        now_ms,
3156                    );
3157                }
3158                graph.on_connected(result.source_node, now_ms);
3159            }
3160        }
3161
3162        // Generate events based on what was received
3163        if result.is_emergency() {
3164            self.notify(PeatEvent::EmergencyReceived {
3165                from_node: result.source_node,
3166            });
3167        } else if result.is_ack() {
3168            self.notify(PeatEvent::AckReceived {
3169                from_node: result.source_node,
3170            });
3171        }
3172
3173        if result.counter_changed {
3174            self.notify(PeatEvent::DocumentSynced {
3175                from_node: result.source_node,
3176                total_count: result.total_count,
3177            });
3178        }
3179
3180        // Emit relay event if we're relaying
3181        if relay_data.is_some() {
3182            let relay_targets = self.get_relay_targets(Some(result.source_node));
3183            self.notify(PeatEvent::MessageRelayed {
3184                origin_node: origin_node.unwrap_or(result.source_node),
3185                relay_count: relay_targets.len(),
3186                hop_count,
3187            });
3188        }
3189
3190        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
3191            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3192
3193        Some(DataReceivedResult {
3194            source_node: result.source_node,
3195            is_emergency: result.is_emergency(),
3196            is_ack: result.is_ack(),
3197            counter_changed: result.counter_changed,
3198            emergency_changed: result.emergency_changed,
3199            total_count: result.total_count,
3200            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3201            relay_data,
3202            origin_node,
3203            hop_count,
3204            callsign,
3205            battery_percent,
3206            heart_rate,
3207            event_type,
3208            latitude,
3209            longitude,
3210            altitude,
3211        })
3212    }
3213
3214    /// Internal: Handle relay envelope with incoming connection registration
3215    fn handle_relay_envelope_with_incoming(
3216        &self,
3217        identifier: &str,
3218        data: &[u8],
3219        now_ms: u64,
3220    ) -> Option<DataReceivedResult> {
3221        // Parse envelope to get origin
3222        let envelope = RelayEnvelope::decode(data)?;
3223
3224        // Try to look up the source peer from identifier to register indirect path
3225        // If we know who sent this relay, we can track indirect peers via them
3226        if let Some(source_peer) = self.peer_manager.get_node_id(identifier) {
3227            if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
3228                let is_new = self.connection_graph.lock().unwrap().on_relay_received(
3229                    source_peer,
3230                    envelope.origin_node,
3231                    envelope.hop_count,
3232                    now_ms,
3233                );
3234
3235                if is_new {
3236                    log::debug!(
3237                        "Discovered indirect peer {:08X} via {:08X} ({} hops)",
3238                        envelope.origin_node.as_u32(),
3239                        source_peer.as_u32(),
3240                        envelope.hop_count
3241                    );
3242                }
3243            }
3244        }
3245
3246        // Check deduplication
3247        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
3248            // Duplicate - get stats for event
3249            let stats = self
3250                .seen_cache
3251                .lock()
3252                .unwrap()
3253                .get_stats(&envelope.message_id);
3254            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
3255
3256            self.notify(PeatEvent::DuplicateMessageDropped {
3257                origin_node: envelope.origin_node,
3258                seen_count,
3259            });
3260            return None;
3261        }
3262
3263        // Check TTL
3264        let (should_relay, relay_data) = if envelope.can_relay() && self.config.enable_relay {
3265            let relay_env = envelope.relay();
3266            (true, relay_env.map(|e| e.encode()))
3267        } else {
3268            if !envelope.can_relay() {
3269                self.notify(PeatEvent::MessageTtlExpired {
3270                    origin_node: envelope.origin_node,
3271                    hop_count: envelope.hop_count,
3272                });
3273            }
3274            (false, None)
3275        };
3276
3277        // Process the inner payload
3278        self.process_incoming_document(
3279            identifier,
3280            &envelope.payload,
3281            now_ms,
3282            if should_relay { relay_data } else { None },
3283            Some(envelope.origin_node),
3284            envelope.hop_count,
3285        )
3286    }
3287
3288    // ==================== Periodic Maintenance ====================
3289
3290    /// Periodic tick - call this regularly (e.g., every second)
3291    ///
3292    /// Performs:
3293    /// - Stale peer cleanup
3294    /// - Periodic sync broadcast (if interval elapsed)
3295    ///
3296    /// Returns `Some(data)` if a sync broadcast is needed.
3297    pub fn tick(&self, now_ms: u64) -> Option<Vec<u8>> {
3298        use std::sync::atomic::Ordering;
3299
3300        // Use u32 for atomic storage (wraps every ~49 days, intervals still work)
3301        let now_ms_32 = now_ms as u32;
3302
3303        // Cleanup stale peers
3304        let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3305        let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3306        if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3307            self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3308            let removed = self.peer_manager.cleanup_stale(now_ms);
3309            for node_id in &removed {
3310                self.notify(PeatEvent::PeerLost { node_id: *node_id });
3311            }
3312            if !removed.is_empty() {
3313                self.notify_mesh_state_changed();
3314            }
3315
3316            // Run connection graph maintenance (transition Disconnected -> Lost)
3317            {
3318                let mut graph = self.connection_graph.lock().unwrap();
3319                let newly_lost = graph.tick(now_ms);
3320                // Also cleanup peers lost for more than peer_timeout
3321                graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3322                drop(graph);
3323
3324                // Emit PeerLost events for newly lost peers from graph
3325                // (these may differ from peer_manager removals)
3326                for node_id in newly_lost {
3327                    // Only notify if not already notified by peer_manager
3328                    if !removed.contains(&node_id) {
3329                        self.notify(PeatEvent::PeerLost { node_id });
3330                    }
3331                }
3332            }
3333        }
3334
3335        // Check if sync broadcast is needed
3336        let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3337        let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3338        if sync_elapsed >= self.config.sync_interval_ms as u32 {
3339            self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3340            // Only broadcast if we have connected peers
3341            if self.peer_manager.connected_count() > 0 {
3342                let doc = self.document_sync.build_document();
3343                return Some(self.encrypt_document(&doc));
3344            }
3345        }
3346
3347        None
3348    }
3349
3350    /// Periodic tick returning per-peer delta documents
3351    ///
3352    /// Unlike `tick()` which broadcasts a single document to all peers,
3353    /// this returns targeted deltas that only include changes each peer
3354    /// hasn't seen. Use this for platforms that support per-peer transmission.
3355    ///
3356    /// Returns a list of (NodeId, encrypted_delta) tuples, one per connected peer.
3357    /// Empty vector if no sync is needed (interval not elapsed or no connected peers).
3358    pub fn tick_with_peer_deltas(&self, now_ms: u64) -> Vec<(NodeId, Vec<u8>)> {
3359        use std::sync::atomic::Ordering;
3360        let now_ms_32 = now_ms as u32;
3361
3362        // Cleanup stale peers (same as tick())
3363        let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3364        let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3365        if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3366            self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3367            let removed = self.peer_manager.cleanup_stale(now_ms);
3368            for node_id in &removed {
3369                self.notify(PeatEvent::PeerLost { node_id: *node_id });
3370            }
3371            if !removed.is_empty() {
3372                self.notify_mesh_state_changed();
3373            }
3374
3375            // Run connection graph maintenance
3376            {
3377                let mut graph = self.connection_graph.lock().unwrap();
3378                let newly_lost = graph.tick(now_ms);
3379                graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3380                drop(graph);
3381
3382                for node_id in newly_lost {
3383                    if !removed.contains(&node_id) {
3384                        self.notify(PeatEvent::PeerLost { node_id });
3385                    }
3386                }
3387            }
3388        }
3389
3390        // Check if sync is needed
3391        let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3392        let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3393        if sync_elapsed >= self.config.sync_interval_ms as u32 {
3394            self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3395
3396            // Build document for each connected peer
3397            let doc = self.document_sync.build_document();
3398            let encrypted = self.encrypt_document(&doc);
3399            let mut results = Vec::new();
3400            for peer in self.get_connected_peers() {
3401                results.push((peer.node_id, encrypted.clone()));
3402            }
3403            return results;
3404        }
3405
3406        Vec::new()
3407    }
3408
3409    // ==================== State Queries ====================
3410
3411    /// Get all known peers
3412    pub fn get_peers(&self) -> Vec<PeatPeer> {
3413        self.peer_manager.get_peers()
3414    }
3415
3416    /// Get connected peers only
3417    pub fn get_connected_peers(&self) -> Vec<PeatPeer> {
3418        self.peer_manager.get_connected_peers()
3419    }
3420
3421    /// Get a specific peer by NodeId
3422    pub fn get_peer(&self, node_id: NodeId) -> Option<PeatPeer> {
3423        self.peer_manager.get_peer(node_id)
3424    }
3425
3426    /// Get peer count
3427    pub fn peer_count(&self) -> usize {
3428        self.peer_manager.peer_count()
3429    }
3430
3431    /// Get connected peer count
3432    pub fn connected_count(&self) -> usize {
3433        self.peer_manager.connected_count()
3434    }
3435
3436    /// Check if a device mesh ID matches our mesh
3437    pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
3438        self.peer_manager.matches_mesh(device_mesh_id)
3439    }
3440
3441    // ==================== Connection State Graph ====================
3442
3443    /// Get the connection state graph with all peer states
3444    ///
3445    /// Returns a snapshot of all tracked peers and their connection lifecycle state.
3446    /// Apps can use this to display appropriate UI indicators:
3447    /// - Green for Connected peers
3448    /// - Yellow for Degraded or RecentlyDisconnected peers
3449    /// - Gray for Lost peers
3450    ///
3451    /// # Example
3452    /// ```ignore
3453    /// let states = mesh.get_connection_graph();
3454    /// for peer in states {
3455    ///     match peer.state {
3456    ///         ConnectionState::Connected => show_green_indicator(&peer),
3457    ///         ConnectionState::Degraded => show_yellow_indicator(&peer),
3458    ///         ConnectionState::Disconnected => show_stale_indicator(&peer),
3459    ///         ConnectionState::Lost => show_gray_indicator(&peer),
3460    ///         _ => {}
3461    ///     }
3462    /// }
3463    /// ```
3464    pub fn get_connection_graph(&self) -> Vec<PeerConnectionState> {
3465        self.connection_graph.lock().unwrap().get_all_owned()
3466    }
3467
3468    /// Get a specific peer's connection state
3469    pub fn get_peer_connection_state(&self, node_id: NodeId) -> Option<PeerConnectionState> {
3470        self.connection_graph
3471            .lock()
3472            .unwrap()
3473            .get_peer(node_id)
3474            .cloned()
3475    }
3476
3477    /// Get all currently connected peers from the connection graph
3478    pub fn get_connected_states(&self) -> Vec<PeerConnectionState> {
3479        self.connection_graph
3480            .lock()
3481            .unwrap()
3482            .get_connected()
3483            .into_iter()
3484            .cloned()
3485            .collect()
3486    }
3487
3488    /// Get peers in degraded state (connected but poor signal quality)
3489    pub fn get_degraded_peers(&self) -> Vec<PeerConnectionState> {
3490        self.connection_graph
3491            .lock()
3492            .unwrap()
3493            .get_degraded()
3494            .into_iter()
3495            .cloned()
3496            .collect()
3497    }
3498
3499    /// Get peers that disconnected within the specified time window
3500    ///
3501    /// Useful for showing "stale" peers that were recently connected.
3502    pub fn get_recently_disconnected(
3503        &self,
3504        within_ms: u64,
3505        now_ms: u64,
3506    ) -> Vec<PeerConnectionState> {
3507        self.connection_graph
3508            .lock()
3509            .unwrap()
3510            .get_recently_disconnected(within_ms, now_ms)
3511            .into_iter()
3512            .cloned()
3513            .collect()
3514    }
3515
3516    /// Get peers in Lost state (disconnected and no longer advertising)
3517    pub fn get_lost_peers(&self) -> Vec<PeerConnectionState> {
3518        self.connection_graph
3519            .lock()
3520            .unwrap()
3521            .get_lost()
3522            .into_iter()
3523            .cloned()
3524            .collect()
3525    }
3526
3527    /// Get summary counts of peers in each connection state
3528    pub fn get_connection_state_counts(&self) -> StateCountSummary {
3529        self.connection_graph.lock().unwrap().state_counts()
3530    }
3531
3532    // ==================== Indirect Peer Methods ====================
3533
3534    /// Get all indirect (multi-hop) peers
3535    ///
3536    /// Returns peers discovered via relay messages that are not directly
3537    /// connected via BLE. Each indirect peer includes the minimum hop count
3538    /// and the direct peers through which they can be reached.
3539    pub fn get_indirect_peers(&self) -> Vec<IndirectPeer> {
3540        self.connection_graph
3541            .lock()
3542            .unwrap()
3543            .get_indirect_peers_owned()
3544    }
3545
3546    /// Get the degree (hop count) for a specific peer
3547    ///
3548    /// Returns:
3549    /// - `Some(PeerDegree::Direct)` for directly connected BLE peers
3550    /// - `Some(PeerDegree::OneHop/TwoHop/ThreeHop)` for indirect peers
3551    /// - `None` if peer is not known
3552    pub fn get_peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
3553        self.connection_graph.lock().unwrap().peer_degree(node_id)
3554    }
3555
3556    /// Get full state counts including indirect peers
3557    ///
3558    /// Returns counts of direct peers by connection state plus counts
3559    /// of indirect peers by hop count (1-hop, 2-hop, 3-hop).
3560    pub fn get_full_state_counts(&self) -> FullStateCountSummary {
3561        self.connection_graph.lock().unwrap().full_state_counts()
3562    }
3563
3564    /// Get all paths to reach an indirect peer
3565    ///
3566    /// Returns a list of (via_peer_id, hop_count) pairs showing all
3567    /// known routes to the specified peer.
3568    pub fn get_paths_to_peer(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
3569        self.connection_graph.lock().unwrap().get_paths_to(node_id)
3570    }
3571
3572    /// Check if a node is known (either direct or indirect)
3573    pub fn is_peer_known(&self, node_id: NodeId) -> bool {
3574        self.connection_graph.lock().unwrap().is_known(node_id)
3575    }
3576
3577    /// Get number of indirect peers
3578    pub fn indirect_peer_count(&self) -> usize {
3579        self.connection_graph.lock().unwrap().indirect_peer_count()
3580    }
3581
3582    /// Cleanup stale indirect peers
3583    ///
3584    /// Removes indirect peers that haven't been seen within the timeout.
3585    /// Returns the list of removed peer IDs.
3586    pub fn cleanup_indirect_peers(&self, now_ms: u64) -> Vec<NodeId> {
3587        self.connection_graph
3588            .lock()
3589            .unwrap()
3590            .cleanup_indirect(now_ms)
3591    }
3592
3593    /// Get total counter value
3594    pub fn total_count(&self) -> u64 {
3595        self.document_sync.total_count()
3596    }
3597
3598    /// Get document version
3599    pub fn document_version(&self) -> u32 {
3600        self.document_sync.version()
3601    }
3602
3603    /// Get document version (alias)
3604    pub fn version(&self) -> u32 {
3605        self.document_sync.version()
3606    }
3607
3608    /// Update health status (battery percentage)
3609    pub fn update_health(&self, battery_percent: u8) {
3610        self.document_sync.update_health(battery_percent);
3611    }
3612
3613    /// Update activity level (0=still, 1=walking, 2=running, 3=fall)
3614    pub fn update_activity(&self, activity: u8) {
3615        self.document_sync.update_activity(activity);
3616    }
3617
3618    /// Update full health status (battery and activity)
3619    pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
3620        self.document_sync
3621            .update_health_full(battery_percent, activity);
3622    }
3623
3624    /// Update heart rate
3625    pub fn update_heart_rate(&self, heart_rate: u8) {
3626        self.document_sync.update_heart_rate(heart_rate);
3627    }
3628
3629    /// Update location
3630    pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
3631        self.document_sync
3632            .update_location(latitude, longitude, altitude);
3633    }
3634
3635    /// Clear location
3636    pub fn clear_location(&self) {
3637        self.document_sync.clear_location();
3638    }
3639
3640    /// Update callsign
3641    pub fn update_callsign(&self, callsign: &str) {
3642        self.document_sync.update_callsign(callsign);
3643    }
3644
3645    /// Set peripheral event type
3646    pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
3647        self.document_sync
3648            .set_peripheral_event(event_type, timestamp);
3649    }
3650
3651    /// Clear peripheral event
3652    pub fn clear_peripheral_event(&self) {
3653        self.document_sync.clear_peripheral_event();
3654    }
3655
3656    /// Update full peripheral state in one call
3657    ///
3658    /// This is the most efficient way to update all peripheral data before
3659    /// calling `build_document()` for encrypted transmission.
3660    #[allow(clippy::too_many_arguments)]
3661    pub fn update_peripheral_state(
3662        &self,
3663        callsign: &str,
3664        battery_percent: u8,
3665        heart_rate: Option<u8>,
3666        latitude: Option<f32>,
3667        longitude: Option<f32>,
3668        altitude: Option<f32>,
3669        event_type: Option<EventType>,
3670        timestamp: u64,
3671    ) {
3672        self.document_sync.update_peripheral_state(
3673            callsign,
3674            battery_percent,
3675            heart_rate,
3676            latitude,
3677            longitude,
3678            altitude,
3679            event_type,
3680            timestamp,
3681        );
3682    }
3683
3684    /// Build current document for transmission
3685    ///
3686    /// If encryption is enabled, the document is encrypted.
3687    pub fn build_document(&self) -> Vec<u8> {
3688        let doc = self.document_sync.build_document();
3689        self.encrypt_document(&doc)
3690    }
3691
3692    /// Get peers that should be synced with
3693    pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<PeatPeer> {
3694        self.peer_manager.peers_needing_sync(now_ms)
3695    }
3696
3697    // ==================== Internal Helpers ====================
3698
3699    fn notify(&self, event: PeatEvent) {
3700        self.observers.notify(event);
3701    }
3702
3703    fn notify_mesh_state_changed(&self) {
3704        self.notify(PeatEvent::MeshStateChanged {
3705            peer_count: self.peer_manager.peer_count(),
3706            connected_count: self.peer_manager.connected_count(),
3707        });
3708    }
3709
3710    // ==================== CannedMessage Integration ====================
3711    //
3712    // These methods provide deduplication support for peat-lite CannedMessages.
3713    // They use document identity (source_node + timestamp) instead of content hash,
3714    // because CRDT merge can change byte ordering.
3715
3716    /// Check if a CannedMessage should be processed.
3717    ///
3718    /// Uses document identity (source_node + timestamp) for deduplication.
3719    /// This prevents broadcast storms when relaying CannedMessages across the mesh.
3720    ///
3721    /// # Arguments
3722    /// * `source_node` - The source node ID from the CannedMessage
3723    /// * `timestamp` - The timestamp from the CannedMessage
3724    /// * `_ttl_ms` - TTL parameter (currently unused, uses cache's default TTL)
3725    ///
3726    /// # Returns
3727    /// `true` if this message is new and should be processed,
3728    /// `false` if it was seen recently and should be skipped.
3729    pub fn check_canned_message(&self, source_node: u32, timestamp: u64, _ttl_ms: u64) -> bool {
3730        // Create a unique key from source_node and timestamp
3731        // MessageId is 16 bytes: [source_node: 4B][timestamp: 8B][padding: 4B]
3732        let mut id_bytes = [0u8; 16];
3733        id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3734        id_bytes[4..12].copy_from_slice(&timestamp.to_le_bytes());
3735        let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3736
3737        // Check the seen cache
3738        let seen = self.seen_cache.lock().unwrap();
3739        !seen.has_seen(&message_id)
3740    }
3741
3742    /// Mark a CannedMessage as seen (for deduplication).
3743    ///
3744    /// Call this after processing a CannedMessage to prevent reprocessing
3745    /// the same message from other relay paths.
3746    pub fn mark_canned_message_seen(&self, source_node: u32, timestamp: u64) {
3747        let now = std::time::SystemTime::now()
3748            .duration_since(std::time::UNIX_EPOCH)
3749            .map(|d| d.as_millis() as u64)
3750            .unwrap_or(0);
3751
3752        // MessageId is 16 bytes: [source_node: 4B][timestamp: 8B][padding: 4B]
3753        let mut id_bytes = [0u8; 16];
3754        id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3755        id_bytes[4..12].copy_from_slice(&timestamp.to_le_bytes());
3756        let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3757        let origin = NodeId::new(source_node);
3758
3759        let mut seen = self.seen_cache.lock().unwrap();
3760        seen.mark_seen(message_id, origin, now);
3761    }
3762
3763    /// Get list of connected peer identifiers for relay.
3764    ///
3765    /// Used by the platform layer (Kotlin/Swift) to relay CannedMessages
3766    /// to other peers after deduplication check.
3767    pub fn get_connected_peer_identifiers(&self) -> Vec<String> {
3768        self.peer_manager.get_connected_identifiers()
3769    }
3770}
3771
3772/// Result from receiving BLE data
3773#[derive(Debug, Clone)]
3774pub struct DataReceivedResult {
3775    /// Node that sent this data
3776    pub source_node: NodeId,
3777
3778    /// Whether this contained an emergency event
3779    pub is_emergency: bool,
3780
3781    /// Whether this contained an ACK event
3782    pub is_ack: bool,
3783
3784    /// Whether the counter changed (new data)
3785    pub counter_changed: bool,
3786
3787    /// Whether emergency state changed (new emergency or ACK updates)
3788    pub emergency_changed: bool,
3789
3790    /// Updated total count
3791    pub total_count: u64,
3792
3793    /// Event timestamp (if event present) - use to detect duplicate events
3794    pub event_timestamp: u64,
3795
3796    /// Data to relay to other peers (if multi-hop relay is enabled)
3797    ///
3798    /// When present, the platform adapter should send this data to peers
3799    /// returned by `get_relay_targets(Some(source_node))`.
3800    pub relay_data: Option<Vec<u8>>,
3801
3802    /// Origin node for relay (may differ from source_node for relayed messages)
3803    pub origin_node: Option<NodeId>,
3804
3805    /// Current hop count (for relayed messages)
3806    pub hop_count: u8,
3807
3808    // ========== Peripheral data from sender ==========
3809    /// Sender's callsign (up to 12 chars)
3810    pub callsign: Option<String>,
3811
3812    /// Sender's battery percentage (0-100)
3813    pub battery_percent: Option<u8>,
3814
3815    /// Sender's heart rate (BPM)
3816    pub heart_rate: Option<u8>,
3817
3818    /// Sender's event type (from PeripheralEvent)
3819    pub event_type: Option<u8>,
3820
3821    /// Sender's latitude
3822    pub latitude: Option<f32>,
3823
3824    /// Sender's longitude
3825    pub longitude: Option<f32>,
3826
3827    /// Sender's altitude (meters)
3828    pub altitude: Option<f32>,
3829}
3830
3831impl DataReceivedResult {
3832    /// Extract peripheral fields from an Option<Peripheral>
3833    #[allow(clippy::type_complexity)]
3834    fn peripheral_fields(
3835        peripheral: &Option<crate::sync::crdt::Peripheral>,
3836    ) -> (
3837        Option<String>,
3838        Option<u8>,
3839        Option<u8>,
3840        Option<u8>,
3841        Option<f32>,
3842        Option<f32>,
3843        Option<f32>,
3844    ) {
3845        match peripheral {
3846            Some(p) => {
3847                let callsign = {
3848                    let s = p.callsign_str();
3849                    if s.is_empty() {
3850                        None
3851                    } else {
3852                        Some(s.to_string())
3853                    }
3854                };
3855                let battery = if p.health.battery_percent > 0 {
3856                    Some(p.health.battery_percent)
3857                } else {
3858                    None
3859                };
3860                let heart_rate = p.health.heart_rate;
3861                let event_type = p.last_event.as_ref().map(|e| e.event_type as u8);
3862                let (lat, lon, alt) = match &p.location {
3863                    Some(loc) => (Some(loc.latitude), Some(loc.longitude), loc.altitude),
3864                    None => (None, None, None),
3865                };
3866                (callsign, battery, heart_rate, event_type, lat, lon, alt)
3867            }
3868            None => (None, None, None, None, None, None, None),
3869        }
3870    }
3871}
3872
3873/// Decision from processing a relay envelope
3874#[derive(Debug, Clone)]
3875pub struct RelayDecision {
3876    /// The payload (document) to process locally
3877    pub payload: Vec<u8>,
3878
3879    /// Original sender of the message
3880    pub origin_node: NodeId,
3881
3882    /// Current hop count
3883    pub hop_count: u8,
3884
3885    /// Whether this message should be relayed to other peers
3886    pub should_relay: bool,
3887
3888    /// The relay envelope to forward (with incremented hop count)
3889    ///
3890    /// Only present if `should_relay` is true and TTL not expired.
3891    pub relay_envelope: Option<RelayEnvelope>,
3892}
3893
3894impl RelayDecision {
3895    /// Get the relay data to send to peers
3896    ///
3897    /// Returns None if relay is not needed.
3898    pub fn relay_data(&self) -> Option<Vec<u8>> {
3899        self.relay_envelope.as_ref().map(|e| e.encode())
3900    }
3901}
3902
3903#[cfg(all(test, feature = "std"))]
3904mod tests {
3905    use super::*;
3906    use crate::observer::CollectingObserver;
3907
3908    // Valid timestamp for testing (2024-01-15 00:00:00 UTC)
3909    const TEST_TIMESTAMP: u64 = 1705276800000;
3910
3911    fn create_mesh(node_id: u32, callsign: &str) -> PeatMesh {
3912        let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST");
3913        PeatMesh::new(config)
3914    }
3915
3916    #[test]
3917    fn test_mesh_creation() {
3918        let mesh = create_mesh(0x12345678, "ALPHA-1");
3919
3920        assert_eq!(mesh.node_id().as_u32(), 0x12345678);
3921        assert_eq!(mesh.callsign(), "ALPHA-1");
3922        assert_eq!(mesh.mesh_id(), "TEST");
3923        assert_eq!(mesh.device_name(), "PEAT_TEST-12345678");
3924    }
3925
3926    #[test]
3927    fn test_peer_discovery() {
3928        let mesh = create_mesh(0x11111111, "ALPHA-1");
3929        let observer = Arc::new(CollectingObserver::new());
3930        mesh.add_observer(observer.clone());
3931
3932        // Discover a peer
3933        let peer = mesh.on_ble_discovered(
3934            "device-uuid",
3935            Some("PEAT_TEST-22222222"),
3936            -65,
3937            Some("TEST"),
3938            1000,
3939        );
3940
3941        assert!(peer.is_some());
3942        let peer = peer.unwrap();
3943        assert_eq!(peer.node_id.as_u32(), 0x22222222);
3944
3945        // Check events were generated
3946        let events = observer.events();
3947        assert!(events
3948            .iter()
3949            .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
3950        assert!(events
3951            .iter()
3952            .any(|e| matches!(e, PeatEvent::MeshStateChanged { .. })));
3953    }
3954
3955    #[test]
3956    fn test_connection_lifecycle() {
3957        let mesh = create_mesh(0x11111111, "ALPHA-1");
3958        let observer = Arc::new(CollectingObserver::new());
3959        mesh.add_observer(observer.clone());
3960
3961        // Discover and connect
3962        mesh.on_ble_discovered(
3963            "device-uuid",
3964            Some("PEAT_TEST-22222222"),
3965            -65,
3966            Some("TEST"),
3967            1000,
3968        );
3969
3970        let node_id = mesh.on_ble_connected("device-uuid", 2000);
3971        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
3972        assert_eq!(mesh.connected_count(), 1);
3973
3974        // Disconnect
3975        let node_id = mesh.on_ble_disconnected("device-uuid", DisconnectReason::RemoteRequest);
3976        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
3977        assert_eq!(mesh.connected_count(), 0);
3978
3979        // Check events
3980        let events = observer.events();
3981        assert!(events
3982            .iter()
3983            .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
3984        assert!(events
3985            .iter()
3986            .any(|e| matches!(e, PeatEvent::PeerDisconnected { .. })));
3987    }
3988
3989    #[test]
3990    fn test_emergency_flow() {
3991        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3992        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3993
3994        let observer2 = Arc::new(CollectingObserver::new());
3995        mesh2.add_observer(observer2.clone());
3996
3997        // mesh1 sends emergency
3998        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
3999        assert!(mesh1.is_emergency_active());
4000
4001        // mesh2 receives it
4002        let result =
4003            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4004
4005        assert!(result.is_some());
4006        let result = result.unwrap();
4007        assert!(result.is_emergency);
4008        assert_eq!(result.source_node.as_u32(), 0x11111111);
4009
4010        // Check events on mesh2
4011        let events = observer2.events();
4012        assert!(events
4013            .iter()
4014            .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
4015    }
4016
4017    #[test]
4018    fn test_ack_flow() {
4019        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4020        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4021
4022        let observer2 = Arc::new(CollectingObserver::new());
4023        mesh2.add_observer(observer2.clone());
4024
4025        // mesh1 sends ACK
4026        let doc = mesh1.send_ack(TEST_TIMESTAMP);
4027        assert!(mesh1.is_ack_active());
4028
4029        // mesh2 receives it
4030        let result =
4031            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4032
4033        assert!(result.is_some());
4034        let result = result.unwrap();
4035        assert!(result.is_ack);
4036
4037        // Check events on mesh2
4038        let events = observer2.events();
4039        assert!(events
4040            .iter()
4041            .any(|e| matches!(e, PeatEvent::AckReceived { .. })));
4042    }
4043
4044    #[test]
4045    fn test_tick_cleanup() {
4046        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4047            .with_peer_timeout(10_000);
4048        let mesh = PeatMesh::new(config);
4049
4050        let observer = Arc::new(CollectingObserver::new());
4051        mesh.add_observer(observer.clone());
4052
4053        // Discover a peer
4054        mesh.on_ble_discovered(
4055            "device-uuid",
4056            Some("PEAT_TEST-22222222"),
4057            -65,
4058            Some("TEST"),
4059            1000,
4060        );
4061        assert_eq!(mesh.peer_count(), 1);
4062
4063        // Tick at t=5000 - not stale yet
4064        mesh.tick(5000);
4065        assert_eq!(mesh.peer_count(), 1);
4066
4067        // Tick at t=20000 - peer is stale (10s timeout exceeded)
4068        mesh.tick(20000);
4069        assert_eq!(mesh.peer_count(), 0);
4070
4071        // Check PeerLost event
4072        let events = observer.events();
4073        assert!(events
4074            .iter()
4075            .any(|e| matches!(e, PeatEvent::PeerLost { .. })));
4076    }
4077
4078    #[test]
4079    fn test_tick_sync_broadcast() {
4080        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4081            .with_sync_interval(5000);
4082        let mesh = PeatMesh::new(config);
4083
4084        // Discover and connect a peer first
4085        mesh.on_ble_discovered(
4086            "device-uuid",
4087            Some("PEAT_TEST-22222222"),
4088            -65,
4089            Some("TEST"),
4090            1000,
4091        );
4092        mesh.on_ble_connected("device-uuid", 1000);
4093
4094        // First tick at t=0 sets last_sync
4095        let _result = mesh.tick(0);
4096        // May or may not broadcast depending on initial state
4097
4098        // Tick before interval - no broadcast
4099        let result = mesh.tick(3000);
4100        assert!(result.is_none());
4101
4102        // After interval - should broadcast
4103        let result = mesh.tick(6000);
4104        assert!(result.is_some());
4105
4106        // Immediate second tick - no broadcast (interval not elapsed)
4107        let result = mesh.tick(6100);
4108        assert!(result.is_none());
4109
4110        // After another interval - should broadcast again
4111        let result = mesh.tick(12000);
4112        assert!(result.is_some());
4113    }
4114
4115    #[test]
4116    fn test_incoming_connection() {
4117        let mesh = create_mesh(0x11111111, "ALPHA-1");
4118        let observer = Arc::new(CollectingObserver::new());
4119        mesh.add_observer(observer.clone());
4120
4121        // Incoming connection from unknown peer
4122        let is_new = mesh.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);
4123
4124        assert!(is_new);
4125        assert_eq!(mesh.peer_count(), 1);
4126        assert_eq!(mesh.connected_count(), 1);
4127
4128        // Check events
4129        let events = observer.events();
4130        assert!(events
4131            .iter()
4132            .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
4133        assert!(events
4134            .iter()
4135            .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
4136    }
4137
4138    #[test]
4139    fn test_mesh_filtering() {
4140        let mesh = create_mesh(0x11111111, "ALPHA-1");
4141
4142        // Wrong mesh - ignored
4143        let peer = mesh.on_ble_discovered(
4144            "device-uuid-1",
4145            Some("PEAT_OTHER-22222222"),
4146            -65,
4147            Some("OTHER"),
4148            1000,
4149        );
4150        assert!(peer.is_none());
4151        assert_eq!(mesh.peer_count(), 0);
4152
4153        // Correct mesh - accepted
4154        let peer = mesh.on_ble_discovered(
4155            "device-uuid-2",
4156            Some("PEAT_TEST-33333333"),
4157            -65,
4158            Some("TEST"),
4159            1000,
4160        );
4161        assert!(peer.is_some());
4162        assert_eq!(mesh.peer_count(), 1);
4163    }
4164
4165    // ==================== Encryption Tests ====================
4166
4167    fn create_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4168        let config =
4169            PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_encryption(secret);
4170        PeatMesh::new(config)
4171    }
4172
4173    #[test]
4174    fn test_encryption_enabled() {
4175        let secret = [0x42u8; 32];
4176        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4177
4178        assert!(mesh.is_encryption_enabled());
4179    }
4180
4181    #[test]
4182    fn test_encryption_disabled_by_default() {
4183        let mesh = create_mesh(0x11111111, "ALPHA-1");
4184
4185        assert!(!mesh.is_encryption_enabled());
4186    }
4187
4188    #[test]
4189    fn test_encrypted_document_exchange() {
4190        let secret = [0x42u8; 32];
4191        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4192        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4193
4194        // mesh1 sends document
4195        let doc = mesh1.build_document();
4196
4197        // Document should be encrypted (starts with ENCRYPTED_MARKER)
4198        assert!(doc.len() >= 2);
4199        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4200
4201        // mesh2 receives and decrypts
4202        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4203
4204        assert!(result.is_some());
4205        let result = result.unwrap();
4206        assert_eq!(result.source_node.as_u32(), 0x11111111);
4207    }
4208
4209    #[test]
4210    fn test_encrypted_emergency_exchange() {
4211        let secret = [0x42u8; 32];
4212        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4213        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4214
4215        let observer = Arc::new(CollectingObserver::new());
4216        mesh2.add_observer(observer.clone());
4217
4218        // mesh1 sends emergency
4219        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
4220
4221        // mesh2 receives and decrypts
4222        let result =
4223            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4224
4225        assert!(result.is_some());
4226        let result = result.unwrap();
4227        assert!(result.is_emergency);
4228
4229        // Check EmergencyReceived event was fired
4230        let events = observer.events();
4231        assert!(events
4232            .iter()
4233            .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
4234    }
4235
4236    #[test]
4237    fn test_wrong_key_fails_decrypt() {
4238        let secret1 = [0x42u8; 32];
4239        let secret2 = [0x43u8; 32]; // Different key
4240        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
4241        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
4242
4243        // mesh1 sends document
4244        let doc = mesh1.build_document();
4245
4246        // mesh2 cannot decrypt (wrong key)
4247        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4248
4249        assert!(result.is_none());
4250    }
4251
4252    #[test]
4253    fn test_unencrypted_mesh_can_read_unencrypted() {
4254        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4255        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4256
4257        // mesh1 sends document (unencrypted)
4258        let doc = mesh1.build_document();
4259
4260        // mesh2 receives (also unencrypted)
4261        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4262
4263        assert!(result.is_some());
4264    }
4265
4266    #[test]
4267    fn test_encrypted_mesh_can_receive_unencrypted() {
4268        // Backward compatibility: encrypted mesh can receive unencrypted docs
4269        let secret = [0x42u8; 32];
4270        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // unencrypted
4271        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); // encrypted
4272
4273        // mesh1 sends unencrypted document
4274        let doc = mesh1.build_document();
4275
4276        // mesh2 can receive unencrypted (backward compat)
4277        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4278
4279        assert!(result.is_some());
4280    }
4281
4282    #[test]
4283    fn test_unencrypted_mesh_cannot_receive_encrypted() {
4284        let secret = [0x42u8; 32];
4285        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret); // encrypted
4286        let mesh2 = create_mesh(0x22222222, "BRAVO-1"); // unencrypted
4287
4288        // mesh1 sends encrypted document
4289        let doc = mesh1.build_document();
4290
4291        // mesh2 cannot decrypt (no key)
4292        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4293
4294        assert!(result.is_none());
4295    }
4296
4297    #[test]
4298    fn test_enable_disable_encryption() {
4299        let mut mesh = create_mesh(0x11111111, "ALPHA-1");
4300
4301        assert!(!mesh.is_encryption_enabled());
4302
4303        // Enable encryption
4304        let secret = [0x42u8; 32];
4305        mesh.enable_encryption(&secret);
4306        assert!(mesh.is_encryption_enabled());
4307
4308        // Build document should now be encrypted
4309        let doc = mesh.build_document();
4310        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4311
4312        // Disable encryption
4313        mesh.disable_encryption();
4314        assert!(!mesh.is_encryption_enabled());
4315
4316        // Build document should now be unencrypted
4317        let doc = mesh.build_document();
4318        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4319    }
4320
4321    #[test]
4322    fn test_encryption_overhead() {
4323        let secret = [0x42u8; 32];
4324        let mesh_encrypted = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4325        let mesh_unencrypted = create_mesh(0x22222222, "BRAVO-1");
4326
4327        let doc_encrypted = mesh_encrypted.build_document();
4328        let doc_unencrypted = mesh_unencrypted.build_document();
4329
4330        // Encrypted doc should be larger by:
4331        // - 2 bytes marker header (0xAE + reserved)
4332        // - 12 bytes nonce
4333        // - 16 bytes auth tag
4334        // Total: 30 bytes overhead
4335        let overhead = doc_encrypted.len() - doc_unencrypted.len();
4336        assert_eq!(overhead, 30); // 2 (marker) + 12 (nonce) + 16 (tag)
4337    }
4338
4339    // ==================== Per-Peer E2EE Tests ====================
4340
4341    #[test]
4342    fn test_peer_e2ee_enable_disable() {
4343        let mesh = create_mesh(0x11111111, "ALPHA-1");
4344
4345        assert!(!mesh.is_peer_e2ee_enabled());
4346        assert!(mesh.peer_e2ee_public_key().is_none());
4347
4348        mesh.enable_peer_e2ee();
4349        assert!(mesh.is_peer_e2ee_enabled());
4350        assert!(mesh.peer_e2ee_public_key().is_some());
4351
4352        mesh.disable_peer_e2ee();
4353        assert!(!mesh.is_peer_e2ee_enabled());
4354    }
4355
4356    #[test]
4357    fn test_peer_e2ee_initiate_session() {
4358        let mesh = create_mesh(0x11111111, "ALPHA-1");
4359        mesh.enable_peer_e2ee();
4360
4361        let key_exchange = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4362        assert!(key_exchange.is_some());
4363
4364        let key_exchange = key_exchange.unwrap();
4365        // Should start with KEY_EXCHANGE_MARKER
4366        assert_eq!(key_exchange[0], crate::document::KEY_EXCHANGE_MARKER);
4367
4368        // Should have a pending session
4369        assert_eq!(mesh.peer_e2ee_session_count(), 1);
4370        assert_eq!(mesh.peer_e2ee_established_count(), 0);
4371    }
4372
4373    #[test]
4374    fn test_peer_e2ee_full_handshake() {
4375        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4376        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4377
4378        mesh1.enable_peer_e2ee();
4379        mesh2.enable_peer_e2ee();
4380
4381        let observer1 = Arc::new(CollectingObserver::new());
4382        let observer2 = Arc::new(CollectingObserver::new());
4383        mesh1.add_observer(observer1.clone());
4384        mesh2.add_observer(observer2.clone());
4385
4386        // mesh1 initiates to mesh2
4387        let key_exchange1 = mesh1
4388            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4389            .unwrap();
4390
4391        // mesh2 receives and responds
4392        let response = mesh2.handle_key_exchange(&key_exchange1, 1000);
4393        assert!(response.is_some());
4394
4395        // Check mesh2 has established session
4396        assert!(mesh2.has_peer_e2ee_session(NodeId::new(0x11111111)));
4397
4398        // mesh1 receives mesh2's response
4399        let key_exchange2 = response.unwrap();
4400        let _ = mesh1.handle_key_exchange(&key_exchange2, 1000);
4401
4402        // Check mesh1 has established session
4403        assert!(mesh1.has_peer_e2ee_session(NodeId::new(0x22222222)));
4404
4405        // Both should have E2EE established events
4406        let events1 = observer1.events();
4407        assert!(events1
4408            .iter()
4409            .any(|e| matches!(e, PeatEvent::PeerE2eeEstablished { .. })));
4410
4411        let events2 = observer2.events();
4412        assert!(events2
4413            .iter()
4414            .any(|e| matches!(e, PeatEvent::PeerE2eeEstablished { .. })));
4415    }
4416
4417    #[test]
4418    fn test_peer_e2ee_encrypt_decrypt() {
4419        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4420        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4421
4422        mesh1.enable_peer_e2ee();
4423        mesh2.enable_peer_e2ee();
4424
4425        // Establish session via key exchange
4426        let key_exchange1 = mesh1
4427            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4428            .unwrap();
4429        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4430        mesh1.handle_key_exchange(&key_exchange2, 1000);
4431
4432        // mesh1 sends encrypted message to mesh2
4433        let plaintext = b"Secret message from mesh1";
4434        let encrypted = mesh1.send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000);
4435        assert!(encrypted.is_some());
4436
4437        let encrypted = encrypted.unwrap();
4438        // Should start with PEER_E2EE_MARKER
4439        assert_eq!(encrypted[0], crate::document::PEER_E2EE_MARKER);
4440
4441        // mesh2 receives and decrypts
4442        let observer2 = Arc::new(CollectingObserver::new());
4443        mesh2.add_observer(observer2.clone());
4444
4445        let decrypted = mesh2.handle_peer_e2ee_message(&encrypted, 2000);
4446        assert!(decrypted.is_some());
4447        assert_eq!(decrypted.unwrap(), plaintext);
4448
4449        // Should have received message event
4450        let events = observer2.events();
4451        assert!(events.iter().any(|e| matches!(
4452            e,
4453            PeatEvent::PeerE2eeMessageReceived { from_node, data }
4454            if from_node.as_u32() == 0x11111111 && data == plaintext
4455        )));
4456    }
4457
4458    #[test]
4459    fn test_peer_e2ee_bidirectional() {
4460        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4461        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4462
4463        mesh1.enable_peer_e2ee();
4464        mesh2.enable_peer_e2ee();
4465
4466        // Establish session
4467        let key_exchange1 = mesh1
4468            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4469            .unwrap();
4470        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4471        mesh1.handle_key_exchange(&key_exchange2, 1000);
4472
4473        // mesh1 -> mesh2
4474        let msg1 = mesh1
4475            .send_peer_e2ee(NodeId::new(0x22222222), b"Hello from mesh1", 2000)
4476            .unwrap();
4477        let dec1 = mesh2.handle_peer_e2ee_message(&msg1, 2000).unwrap();
4478        assert_eq!(dec1, b"Hello from mesh1");
4479
4480        // mesh2 -> mesh1
4481        let msg2 = mesh2
4482            .send_peer_e2ee(NodeId::new(0x11111111), b"Hello from mesh2", 2000)
4483            .unwrap();
4484        let dec2 = mesh1.handle_peer_e2ee_message(&msg2, 2000).unwrap();
4485        assert_eq!(dec2, b"Hello from mesh2");
4486    }
4487
4488    #[test]
4489    fn test_peer_e2ee_close_session() {
4490        let mesh = create_mesh(0x11111111, "ALPHA-1");
4491        mesh.enable_peer_e2ee();
4492
4493        let observer = Arc::new(CollectingObserver::new());
4494        mesh.add_observer(observer.clone());
4495
4496        // Initiate a session
4497        mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4498        assert_eq!(mesh.peer_e2ee_session_count(), 1);
4499
4500        // Close session
4501        mesh.close_peer_e2ee(NodeId::new(0x22222222));
4502
4503        // Check close event
4504        let events = observer.events();
4505        assert!(events
4506            .iter()
4507            .any(|e| matches!(e, PeatEvent::PeerE2eeClosed { .. })));
4508    }
4509
4510    #[test]
4511    fn test_peer_e2ee_without_enabling() {
4512        let mesh = create_mesh(0x11111111, "ALPHA-1");
4513
4514        // E2EE not enabled - should return None
4515        let result = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4516        assert!(result.is_none());
4517
4518        let result = mesh.send_peer_e2ee(NodeId::new(0x22222222), b"test", 1000);
4519        assert!(result.is_none());
4520
4521        assert!(!mesh.has_peer_e2ee_session(NodeId::new(0x22222222)));
4522    }
4523
4524    #[test]
4525    fn test_peer_e2ee_overhead() {
4526        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4527        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4528
4529        mesh1.enable_peer_e2ee();
4530        mesh2.enable_peer_e2ee();
4531
4532        // Establish session
4533        let key_exchange1 = mesh1
4534            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4535            .unwrap();
4536        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4537        mesh1.handle_key_exchange(&key_exchange2, 1000);
4538
4539        // Encrypt a message
4540        let plaintext = b"Test message";
4541        let encrypted = mesh1
4542            .send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000)
4543            .unwrap();
4544
4545        // Overhead should be:
4546        // - 2 bytes marker header
4547        // - 4 bytes recipient node ID
4548        // - 4 bytes sender node ID
4549        // - 8 bytes counter
4550        // - 12 bytes nonce
4551        // - 16 bytes auth tag
4552        // Total: 46 bytes overhead
4553        let overhead = encrypted.len() - plaintext.len();
4554        assert_eq!(overhead, 46);
4555    }
4556
4557    // ==================== Strict Encryption Mode Tests ====================
4558
4559    fn create_strict_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4560        let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST")
4561            .with_encryption(secret)
4562            .with_strict_encryption();
4563        PeatMesh::new(config)
4564    }
4565
4566    #[test]
4567    fn test_strict_encryption_enabled() {
4568        let secret = [0x42u8; 32];
4569        let mesh = create_strict_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4570
4571        assert!(mesh.is_encryption_enabled());
4572        assert!(mesh.is_strict_encryption_enabled());
4573    }
4574
4575    #[test]
4576    fn test_strict_encryption_disabled_by_default() {
4577        let secret = [0x42u8; 32];
4578        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4579
4580        assert!(mesh.is_encryption_enabled());
4581        assert!(!mesh.is_strict_encryption_enabled());
4582    }
4583
4584    #[test]
4585    fn test_strict_encryption_requires_encryption_enabled() {
4586        // strict_encryption without encryption should have no effect
4587        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4588            .with_strict_encryption(); // No encryption!
4589        let mesh = PeatMesh::new(config);
4590
4591        assert!(!mesh.is_encryption_enabled());
4592        assert!(!mesh.is_strict_encryption_enabled());
4593    }
4594
4595    #[test]
4596    fn test_strict_mode_accepts_encrypted_documents() {
4597        let secret = [0x42u8; 32];
4598        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4599        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4600
4601        // mesh1 sends encrypted document
4602        let doc = mesh1.build_document();
4603        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4604
4605        // mesh2 (strict mode) should accept encrypted documents
4606        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4607        assert!(result.is_some());
4608    }
4609
4610    #[test]
4611    fn test_strict_mode_rejects_unencrypted_documents() {
4612        let secret = [0x42u8; 32];
4613        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // Unencrypted sender
4614        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret); // Strict receiver
4615
4616        let observer = Arc::new(CollectingObserver::new());
4617        mesh2.add_observer(observer.clone());
4618
4619        // mesh1 sends unencrypted document
4620        let doc = mesh1.build_document();
4621        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4622
4623        // mesh2 (strict mode) should reject unencrypted documents
4624        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4625        assert!(result.is_none());
4626
4627        // Should have SecurityViolation event
4628        let events = observer.events();
4629        assert!(events.iter().any(|e| matches!(
4630            e,
4631            PeatEvent::SecurityViolation {
4632                kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
4633                ..
4634            }
4635        )));
4636    }
4637
4638    #[test]
4639    fn test_non_strict_mode_accepts_unencrypted_documents() {
4640        let secret = [0x42u8; 32];
4641        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // Unencrypted sender
4642        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); // Non-strict receiver
4643
4644        // mesh1 sends unencrypted document
4645        let doc = mesh1.build_document();
4646
4647        // mesh2 (non-strict) should accept unencrypted documents (backward compat)
4648        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4649        assert!(result.is_some());
4650    }
4651
4652    #[test]
4653    fn test_strict_mode_security_violation_event_includes_source() {
4654        let secret = [0x42u8; 32];
4655        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4656        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4657
4658        let observer = Arc::new(CollectingObserver::new());
4659        mesh2.add_observer(observer.clone());
4660
4661        let doc = mesh1.build_document();
4662
4663        // Use on_ble_data_received with identifier to test source is captured
4664        mesh2.on_ble_discovered(
4665            "test-device-uuid",
4666            Some("PEAT_TEST-11111111"),
4667            -65,
4668            Some("TEST"),
4669            500,
4670        );
4671        mesh2.on_ble_connected("test-device-uuid", 600);
4672
4673        let _result = mesh2.on_ble_data_received("test-device-uuid", &doc, 1000);
4674
4675        // Check SecurityViolation event has source
4676        let events = observer.events();
4677        let violation = events.iter().find(|e| {
4678            matches!(
4679                e,
4680                PeatEvent::SecurityViolation {
4681                    kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
4682                    ..
4683                }
4684            )
4685        });
4686        assert!(violation.is_some());
4687
4688        if let Some(PeatEvent::SecurityViolation { source, .. }) = violation {
4689            assert!(source.is_some());
4690            assert_eq!(source.as_ref().unwrap(), "test-device-uuid");
4691        }
4692    }
4693
4694    #[test]
4695    fn test_decryption_failure_emits_security_violation() {
4696        let secret1 = [0x42u8; 32];
4697        let secret2 = [0x43u8; 32]; // Different key
4698        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
4699        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
4700
4701        let observer = Arc::new(CollectingObserver::new());
4702        mesh2.add_observer(observer.clone());
4703
4704        // mesh1 sends encrypted document
4705        let doc = mesh1.build_document();
4706
4707        // mesh2 cannot decrypt (wrong key)
4708        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4709        assert!(result.is_none());
4710
4711        // Should have SecurityViolation event for decryption failure
4712        let events = observer.events();
4713        assert!(events.iter().any(|e| matches!(
4714            e,
4715            PeatEvent::SecurityViolation {
4716                kind: crate::observer::SecurityViolationKind::DecryptionFailed,
4717                ..
4718            }
4719        )));
4720    }
4721
4722    #[test]
4723    fn test_strict_mode_builder_chain() {
4724        let secret = [0x42u8; 32];
4725        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4726            .with_encryption(secret)
4727            .with_strict_encryption()
4728            .with_sync_interval(10_000)
4729            .with_peer_timeout(60_000);
4730
4731        let mesh = PeatMesh::new(config);
4732
4733        assert!(mesh.is_encryption_enabled());
4734        assert!(mesh.is_strict_encryption_enabled());
4735    }
4736
4737    // ==================== Multi-Hop Relay Tests ====================
4738
4739    fn create_relay_mesh(node_id: u32, callsign: &str) -> PeatMesh {
4740        let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_relay();
4741        PeatMesh::new(config)
4742    }
4743
4744    #[test]
4745    fn test_relay_disabled_by_default() {
4746        let mesh = create_mesh(0x11111111, "ALPHA-1");
4747        assert!(!mesh.is_relay_enabled());
4748    }
4749
4750    #[test]
4751    fn test_relay_enabled() {
4752        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4753        assert!(mesh.is_relay_enabled());
4754    }
4755
4756    #[test]
4757    fn test_relay_config_builder() {
4758        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4759            .with_relay()
4760            .with_max_relay_hops(5)
4761            .with_relay_fanout(3)
4762            .with_seen_cache_ttl(60_000);
4763
4764        assert!(config.enable_relay);
4765        assert_eq!(config.max_relay_hops, 5);
4766        assert_eq!(config.relay_fanout, 3);
4767        assert_eq!(config.seen_cache_ttl_ms, 60_000);
4768    }
4769
4770    #[test]
4771    fn test_seen_message_deduplication() {
4772        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4773        let origin = NodeId::new(0x22222222);
4774        let msg_id = crate::relay::MessageId::from_content(origin, 1000, 0xDEADBEEF);
4775
4776        // First time - should be new
4777        assert!(mesh.mark_message_seen(msg_id, origin, 1000));
4778
4779        // Second time - should be duplicate
4780        assert!(!mesh.mark_message_seen(msg_id, origin, 2000));
4781
4782        assert_eq!(mesh.seen_cache_size(), 1);
4783    }
4784
4785    #[test]
4786    fn test_wrap_for_relay() {
4787        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4788
4789        let payload = vec![1, 2, 3, 4, 5];
4790        let wrapped = mesh.wrap_for_relay(payload.clone());
4791
4792        // Should start with relay envelope marker
4793        assert_eq!(wrapped[0], crate::relay::RELAY_ENVELOPE_MARKER);
4794
4795        // Decode and verify
4796        let envelope = crate::relay::RelayEnvelope::decode(&wrapped).unwrap();
4797        assert_eq!(envelope.payload, payload);
4798        assert_eq!(envelope.origin_node, NodeId::new(0x11111111));
4799        assert_eq!(envelope.hop_count, 0);
4800    }
4801
4802    #[test]
4803    fn test_process_relay_envelope_new_message() {
4804        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4805        let observer = Arc::new(CollectingObserver::new());
4806        mesh.add_observer(observer.clone());
4807
4808        // Create an envelope from another node
4809        let payload = vec![1, 2, 3, 4, 5];
4810        let envelope =
4811            crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
4812                .with_max_hops(7);
4813        let data = envelope.encode();
4814
4815        // Process it
4816        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
4817
4818        assert!(decision.is_some());
4819        let decision = decision.unwrap();
4820        assert_eq!(decision.payload, payload);
4821        assert_eq!(decision.origin_node.as_u32(), 0x22222222);
4822        assert_eq!(decision.hop_count, 0);
4823        assert!(decision.should_relay);
4824        assert!(decision.relay_envelope.is_some());
4825
4826        // Relay envelope should have incremented hop count
4827        let relay_env = decision.relay_envelope.unwrap();
4828        assert_eq!(relay_env.hop_count, 1);
4829    }
4830
4831    #[test]
4832    fn test_process_relay_envelope_duplicate() {
4833        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4834        let observer = Arc::new(CollectingObserver::new());
4835        mesh.add_observer(observer.clone());
4836
4837        let payload = vec![1, 2, 3, 4, 5];
4838        let envelope = crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload);
4839        let data = envelope.encode();
4840
4841        // First time - should succeed
4842        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
4843        assert!(decision.is_some());
4844
4845        // Second time - should be duplicate
4846        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 2000);
4847        assert!(decision.is_none());
4848
4849        // Should have DuplicateMessageDropped event
4850        let events = observer.events();
4851        assert!(events
4852            .iter()
4853            .any(|e| matches!(e, PeatEvent::DuplicateMessageDropped { .. })));
4854    }
4855
4856    #[test]
4857    fn test_process_relay_envelope_ttl_expired() {
4858        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4859        let observer = Arc::new(CollectingObserver::new());
4860        mesh.add_observer(observer.clone());
4861
4862        // Create envelope at max hops (TTL expired)
4863        let payload = vec![1, 2, 3, 4, 5];
4864        let mut envelope =
4865            crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
4866                .with_max_hops(3);
4867
4868        // Simulate having been relayed 3 times already
4869        envelope = envelope.relay().unwrap(); // hop 1
4870        envelope = envelope.relay().unwrap(); // hop 2
4871        envelope = envelope.relay().unwrap(); // hop 3 - at max now
4872
4873        let data = envelope.encode();
4874
4875        // Process - should still process locally but not relay further
4876        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
4877
4878        assert!(decision.is_some());
4879        let decision = decision.unwrap();
4880        assert_eq!(decision.payload, payload);
4881        assert!(!decision.should_relay); // Cannot relay further
4882        assert!(decision.relay_envelope.is_none());
4883
4884        // Should have MessageTtlExpired event
4885        let events = observer.events();
4886        assert!(events
4887            .iter()
4888            .any(|e| matches!(e, PeatEvent::MessageTtlExpired { .. })));
4889    }
4890
4891    #[test]
4892    fn test_build_relay_document() {
4893        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4894
4895        let relay_doc = mesh.build_relay_document();
4896
4897        // Should be a valid relay envelope
4898        assert_eq!(relay_doc[0], crate::relay::RELAY_ENVELOPE_MARKER);
4899
4900        // Decode and verify it contains a valid document
4901        let envelope = crate::relay::RelayEnvelope::decode(&relay_doc).unwrap();
4902        assert_eq!(envelope.origin_node.as_u32(), 0x11111111);
4903
4904        // The payload should be a valid PeatDocument
4905        let doc = crate::document::PeatDocument::decode(&envelope.payload);
4906        assert!(doc.is_some());
4907    }
4908
4909    #[test]
4910    fn test_relay_targets_excludes_source() {
4911        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4912
4913        // Add some peers
4914        mesh.on_ble_discovered(
4915            "peer-1",
4916            Some("PEAT_TEST-22222222"),
4917            -60,
4918            Some("TEST"),
4919            1000,
4920        );
4921        mesh.on_ble_connected("peer-1", 1000);
4922
4923        mesh.on_ble_discovered(
4924            "peer-2",
4925            Some("PEAT_TEST-33333333"),
4926            -65,
4927            Some("TEST"),
4928            1000,
4929        );
4930        mesh.on_ble_connected("peer-2", 1000);
4931
4932        mesh.on_ble_discovered(
4933            "peer-3",
4934            Some("PEAT_TEST-44444444"),
4935            -70,
4936            Some("TEST"),
4937            1000,
4938        );
4939        mesh.on_ble_connected("peer-3", 1000);
4940
4941        // Get relay targets excluding peer-2
4942        let targets = mesh.get_relay_targets(Some(NodeId::new(0x33333333)));
4943
4944        // Should not include peer-2 in targets
4945        assert!(targets.iter().all(|p| p.node_id.as_u32() != 0x33333333));
4946    }
4947
4948    #[test]
4949    fn test_clear_seen_cache() {
4950        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4951        let origin = NodeId::new(0x22222222);
4952
4953        // Add some messages
4954        mesh.mark_message_seen(
4955            crate::relay::MessageId::from_content(origin, 1000, 0x11111111),
4956            origin,
4957            1000,
4958        );
4959        mesh.mark_message_seen(
4960            crate::relay::MessageId::from_content(origin, 2000, 0x22222222),
4961            origin,
4962            2000,
4963        );
4964
4965        assert_eq!(mesh.seen_cache_size(), 2);
4966
4967        // Clear
4968        mesh.clear_seen_cache();
4969        assert_eq!(mesh.seen_cache_size(), 0);
4970    }
4971}