Skip to main content

peat_btle/
peat_mesh.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! PeatMesh - Unified mesh management facade
17//!
18//! This module provides the main entry point for Peat BLE mesh operations.
19//! It composes peer management, document sync, and observer notifications
20//! into a single interface that platform implementations can use.
21//!
22//! ## Usage
23//!
24//! ```ignore
25//! use peat_btle::peat_mesh::{PeatMesh, PeatMeshConfig};
26//! use peat_btle::observer::{PeatEvent, PeatObserver};
27//! use peat_btle::NodeId;
28//! use std::sync::Arc;
29//!
30//! // Create mesh configuration
31//! let config = PeatMeshConfig::new(NodeId::new(0x12345678), "ALPHA-1", "DEMO");
32//!
33//! // Create mesh instance
34//! let mesh = PeatMesh::new(config);
35//!
36//! // Add observer for events
37//! struct MyObserver;
38//! impl PeatObserver for MyObserver {
39//!     fn on_event(&self, event: PeatEvent) {
40//!         println!("Event: {:?}", event);
41//!     }
42//! }
43//! mesh.add_observer(Arc::new(MyObserver));
44//!
45//! // Platform BLE callbacks
46//! mesh.on_ble_discovered("device-uuid", Some("PEAT_DEMO-AABBCCDD"), -65, Some("DEMO"), now_ms);
47//! mesh.on_ble_connected("device-uuid", now_ms);
48//! mesh.on_ble_data_received("device-uuid", &data, now_ms);
49//!
50//! // Periodic maintenance
51//! if let Some(sync_data) = mesh.tick(now_ms) {
52//!     // Broadcast sync_data to connected peers
53//! }
54//! ```
55
56#[cfg(not(feature = "std"))]
57use alloc::{string::String, sync::Arc, vec::Vec};
58#[cfg(feature = "std")]
59use std::collections::HashMap;
60#[cfg(feature = "std")]
61use std::sync::Arc;
62
63use crate::document::{ENCRYPTED_MARKER, KEY_EXCHANGE_MARKER, PEER_E2EE_MARKER};
64#[cfg(feature = "mesh-translator")]
65use crate::document::{
66    TRANSLATOR_FRAME_MARKER, TRANSLATOR_RESERVED_MARKER_END, TRANSLATOR_RESERVED_MARKER_START,
67};
68use crate::document_sync::DocumentSync;
69use crate::gossip::{GossipStrategy, RandomFanout};
70use crate::observer::{DisconnectReason, PeatEvent, PeatObserver, SecurityViolationKind};
71use crate::peer::{
72    ConnectionStateGraph, FullStateCountSummary, IndirectPeer, PeatPeer, PeerConnectionState,
73    PeerDegree, PeerManagerConfig, StateCountSummary,
74};
75use crate::peer_manager::PeerManager;
76use crate::relay::{
77    MessageId, RelayEnvelope, SeenMessageCache, DEFAULT_MAX_HOPS, DEFAULT_SEEN_TTL_MS,
78    RELAY_ENVELOPE_MARKER,
79};
80use crate::security::{
81    DeviceIdentity, IdentityAttestation, IdentityRegistry, KeyExchangeMessage, MeshEncryptionKey,
82    PeerEncryptedMessage, PeerSessionManager, RegistryResult, SessionState,
83};
84use crate::sync::crdt::{EventType, Peripheral, PeripheralType};
85use crate::sync::delta::{DeltaEncoder, DeltaStats};
86use crate::sync::delta_document::{DeltaDocument, Operation};
87use crate::NodeId;
88
89#[cfg(feature = "std")]
90use crate::observer::ObserverManager;
91
92use crate::registry::DocumentRegistry;
93
94/// Configuration for PeatMesh
95#[derive(Debug, Clone)]
96pub struct PeatMeshConfig {
97    /// Our node ID
98    pub node_id: NodeId,
99
100    /// Our callsign (e.g., "ALPHA-1")
101    pub callsign: String,
102
103    /// Mesh ID to filter peers (e.g., "DEMO")
104    pub mesh_id: String,
105
106    /// Peripheral type for this device
107    pub peripheral_type: PeripheralType,
108
109    /// Peer management configuration
110    pub peer_config: PeerManagerConfig,
111
112    /// Sync interval in milliseconds (how often to broadcast state)
113    pub sync_interval_ms: u64,
114
115    /// Whether to auto-broadcast on emergency/ack
116    pub auto_broadcast_events: bool,
117
118    /// Optional shared secret for mesh-wide encryption (32 bytes)
119    ///
120    /// When set, all documents are encrypted using ChaCha20-Poly1305 before
121    /// transmission and decrypted upon receipt. All nodes in the mesh must
122    /// share the same secret to communicate.
123    pub encryption_secret: Option<[u8; 32]>,
124
125    /// Strict encryption mode - reject unencrypted documents when encryption is enabled
126    ///
127    /// When true and encryption is enabled, any unencrypted documents received
128    /// will be rejected and trigger a SecurityViolation event. This prevents
129    /// downgrade attacks where an adversary sends unencrypted malicious documents.
130    ///
131    /// Default: false (backward compatible - accepts unencrypted for gradual rollout)
132    pub strict_encryption: bool,
133
134    /// Enable multi-hop relay
135    ///
136    /// When enabled, received messages will be forwarded to other peers based
137    /// on the gossip strategy. Requires message deduplication to prevent loops.
138    ///
139    /// Default: false
140    pub enable_relay: bool,
141
142    /// Maximum hops for relay messages (TTL)
143    ///
144    /// Messages will not be relayed beyond this many hops from the origin.
145    /// Default: 7
146    pub max_relay_hops: u8,
147
148    /// Gossip fanout for relay
149    ///
150    /// Number of peers to forward each message to. Higher values increase
151    /// convergence speed but also bandwidth usage.
152    /// Default: 2
153    pub relay_fanout: usize,
154
155    /// TTL for seen message cache (milliseconds)
156    ///
157    /// How long to remember message IDs for deduplication.
158    /// Default: 300_000 (5 minutes)
159    pub seen_cache_ttl_ms: u64,
160}
161
162impl PeatMeshConfig {
163    /// Create a new configuration with required fields
164    pub fn new(node_id: NodeId, callsign: &str, mesh_id: &str) -> Self {
165        Self {
166            node_id,
167            callsign: callsign.into(),
168            mesh_id: mesh_id.into(),
169            peripheral_type: PeripheralType::SoldierSensor,
170            peer_config: PeerManagerConfig::with_mesh_id(mesh_id),
171            sync_interval_ms: 5000,
172            auto_broadcast_events: true,
173            encryption_secret: None,
174            strict_encryption: false,
175            enable_relay: false,
176            max_relay_hops: DEFAULT_MAX_HOPS,
177            relay_fanout: 2,
178            seen_cache_ttl_ms: DEFAULT_SEEN_TTL_MS,
179        }
180    }
181
182    /// Enable mesh-wide encryption with a shared secret
183    ///
184    /// All documents will be encrypted using ChaCha20-Poly1305 before
185    /// transmission. All mesh participants must use the same secret.
186    pub fn with_encryption(mut self, secret: [u8; 32]) -> Self {
187        self.encryption_secret = Some(secret);
188        self
189    }
190
191    /// Set peripheral type
192    pub fn with_peripheral_type(mut self, ptype: PeripheralType) -> Self {
193        self.peripheral_type = ptype;
194        self
195    }
196
197    /// Set sync interval
198    pub fn with_sync_interval(mut self, interval_ms: u64) -> Self {
199        self.sync_interval_ms = interval_ms;
200        self
201    }
202
203    /// Set peer timeout
204    pub fn with_peer_timeout(mut self, timeout_ms: u64) -> Self {
205        self.peer_config.peer_timeout_ms = timeout_ms;
206        self
207    }
208
209    /// Set max peers (for embedded systems)
210    pub fn with_max_peers(mut self, max: usize) -> Self {
211        self.peer_config.max_peers = max;
212        self
213    }
214
215    /// Enable strict encryption mode
216    ///
217    /// When enabled (and encryption is also enabled), any unencrypted documents
218    /// received will be rejected and trigger a `SecurityViolation` event.
219    /// This prevents downgrade attacks.
220    ///
221    /// Note: This only has effect when encryption is enabled via `with_encryption()`.
222    pub fn with_strict_encryption(mut self) -> Self {
223        self.strict_encryption = true;
224        self
225    }
226
227    /// Enable multi-hop relay
228    ///
229    /// When enabled, received messages will be forwarded to other connected peers
230    /// based on the gossip strategy. This enables mesh-wide message propagation.
231    pub fn with_relay(mut self) -> Self {
232        self.enable_relay = true;
233        self
234    }
235
236    /// Set maximum relay hops (TTL)
237    ///
238    /// Messages will not be relayed beyond this many hops from the origin.
239    pub fn with_max_relay_hops(mut self, max_hops: u8) -> Self {
240        self.max_relay_hops = max_hops;
241        self
242    }
243
244    /// Set gossip fanout for relay
245    ///
246    /// Number of peers to forward each message to.
247    pub fn with_relay_fanout(mut self, fanout: usize) -> Self {
248        self.relay_fanout = fanout.max(1);
249        self
250    }
251
252    /// Set TTL for seen message cache
253    ///
254    /// How long to remember message IDs for deduplication (milliseconds).
255    pub fn with_seen_cache_ttl(mut self, ttl_ms: u64) -> Self {
256        self.seen_cache_ttl_ms = ttl_ms;
257        self
258    }
259}
260
261/// Type alias for app document storage to reduce type complexity
262#[cfg(feature = "std")]
263type AppDocumentStore =
264    std::sync::RwLock<HashMap<(u8, u32, u64), Box<dyn core::any::Any + Send + Sync>>>;
265
266/// Main facade for Peat BLE mesh operations
267///
268/// Composes peer management, document sync, and observer notifications.
269/// Platform implementations call into this from their BLE callbacks.
270#[cfg(feature = "std")]
271pub struct PeatMesh {
272    /// Configuration
273    config: PeatMeshConfig,
274
275    /// Peer manager
276    peer_manager: PeerManager,
277
278    /// Document sync
279    document_sync: DocumentSync,
280
281    /// Observer manager
282    observers: ObserverManager,
283
284    /// Last sync broadcast time (u32 wraps every ~49 days, sufficient for intervals)
285    last_sync_ms: std::sync::atomic::AtomicU32,
286
287    /// Last cleanup time
288    last_cleanup_ms: std::sync::atomic::AtomicU32,
289
290    /// Optional mesh-wide encryption key (derived from shared secret)
291    encryption_key: Option<MeshEncryptionKey>,
292
293    /// Optional per-peer E2EE session manager
294    peer_sessions: std::sync::Mutex<Option<PeerSessionManager>>,
295
296    /// Connection state graph for tracking peer connection lifecycle
297    connection_graph: std::sync::Mutex<ConnectionStateGraph>,
298
299    /// Seen message cache for relay deduplication
300    seen_cache: std::sync::Mutex<SeenMessageCache>,
301
302    /// Gossip strategy for relay peer selection
303    gossip_strategy: Box<dyn GossipStrategy>,
304
305    /// Delta encoder for per-peer sync state tracking
306    ///
307    /// Tracks what data has been sent to each peer to enable delta sync
308    /// (sending only changes instead of full documents).
309    delta_encoder: std::sync::Mutex<DeltaEncoder>,
310
311    /// This node's cryptographic identity (Ed25519 keypair)
312    ///
313    /// When set, the node_id is derived from the public key and documents
314    /// can be signed for authenticity verification.
315    identity: Option<DeviceIdentity>,
316
317    /// TOFU identity registry for tracking peer identities
318    ///
319    /// Maps node_id to public key on first contact, rejects mismatches.
320    identity_registry: std::sync::Mutex<IdentityRegistry>,
321
322    /// Peripheral state received from peers
323    ///
324    /// Stores the most recent peripheral data (callsign, location, etc.)
325    /// received from each peer via document sync.
326    peer_peripherals: std::sync::RwLock<HashMap<NodeId, Peripheral>>,
327
328    /// Document registry for app-layer CRDT types.
329    ///
330    /// Enables external crates to register custom document types that sync
331    /// through the mesh using the extensible registry pattern.
332    document_registry: DocumentRegistry,
333
334    /// Storage for app-layer documents.
335    ///
336    /// Keyed by (type_id, source_node, timestamp) to uniquely identify each
337    /// document instance. Values are type-erased boxes that can be downcast
338    /// using the document registry handlers.
339    app_documents: AppDocumentStore,
340
341    /// ADR-059 BleTranslator instance for inbound translator-frame decoding.
342    ///
343    /// One translator per PeatMesh, constructed with `TranslationConfig::default()`.
344    /// Operators wanting custom collection names can swap via
345    /// `set_translator_config`. Always present when the `mesh-translator`
346    /// feature is enabled — the receive-dispatch branch in
347    /// `on_ble_data_received_anonymous` (and siblings) calls
348    /// `decode_inbound_sync` on it for every 0xB6 frame.
349    #[cfg(feature = "mesh-translator")]
350    ble_translator: std::sync::RwLock<crate::translator::BleTranslator>,
351
352    /// Optional `DecodedDocumentCallback` set by Rust-native consumers
353    /// (peat-sim integration tests, future Rust hosts). Originally
354    /// specified as the peat-ffi-side wiring in Amendment 1; Amendment 2
355    /// re-aimed Slice 1.b.4 at the host (peat-atak-plugin et al.) via
356    /// the UniFFI [`decoded_document_json_callback`](Self::decoded_document_json_callback)
357    /// path because peat-ffi never owns a `PeatMesh` handle. This Rust
358    /// trait callback is preserved for in-process Rust consumers.
359    ///
360    /// `None` until installed. Both this and the JSON callback fire
361    /// independently when both are set; `PeatEvent::TranslatorNoCallback`
362    /// fires on the observer channel only when **both** are absent so
363    /// the release-skew window stays operator-observable in real time
364    /// (see `crate::observer::PeatEvent`).
365    #[cfg(feature = "mesh-translator")]
366    decoded_document_callback: std::sync::RwLock<Option<Arc<dyn crate::DecodedDocumentCallback>>>,
367
368    /// Optional UniFFI-exported `DecodedDocumentJsonCallback` set by
369    /// host bindings (Kotlin / Swift via peat-btle's UniFFI surface).
370    /// ADR-059 Amendment 2 specifies this as the production wiring
371    /// point for cross-transport receive — peat-atak-plugin's
372    /// `BleDecodedDocumentBridge` implements the trait and forwards
373    /// each decoded doc into peat-ffi's `publishDocument` with
374    /// `origin="ble"`.
375    #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
376    decoded_document_json_callback:
377        std::sync::RwLock<Option<Arc<dyn crate::DecodedDocumentJsonCallback>>>,
378
379    /// ADR-059 Amendment 3 — set when a host calls
380    /// [`acknowledge_polled_translator_consumer`](Self::acknowledge_polled_translator_consumer)
381    /// to attest that it consumes
382    /// [`DataReceivedResult::decoded_translator_frame`] per receive.
383    /// Read by the receive dispatch as the third suppression branch
384    /// for `PeatEvent::TranslatorNoCallback` (the event fires only
385    /// when all three are absent — no Rust-trait callback, no UniFFI
386    /// JSON callback, no polled attestation).
387    ///
388    /// **Defined unconditionally** (not `cfg`-gated) so the binding
389    /// shape is stable across feature combos — UniFFI hosts call the
390    /// same Kotlin signature against any peat-btle build. When
391    /// `mesh-translator` is off the read site (suppression check) is
392    /// gated alongside the rest of the dispatch logic; the flag still
393    /// exists, it just has no effect.
394    polled_translator_consumer_attested: std::sync::atomic::AtomicBool,
395}
396
397#[cfg(feature = "std")]
398impl PeatMesh {
399    /// Create a new PeatMesh instance
400    pub fn new(config: PeatMeshConfig) -> Self {
401        let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
402        let document_sync = DocumentSync::with_peripheral_type(
403            config.node_id,
404            &config.callsign,
405            config.peripheral_type,
406        );
407
408        // Derive encryption key from shared secret if configured
409        let encryption_key = config
410            .encryption_secret
411            .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
412
413        // Create connection state graph with config thresholds
414        let connection_graph = ConnectionStateGraph::with_config(
415            config.peer_config.rssi_degraded_threshold,
416            config.peer_config.lost_timeout_ms,
417        );
418
419        // Create seen message cache for relay deduplication
420        let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
421
422        // Create gossip strategy for relay
423        let gossip_strategy: Box<dyn GossipStrategy> =
424            Box::new(RandomFanout::new(config.relay_fanout));
425
426        // Create delta encoder for per-peer sync state tracking
427        let delta_encoder = DeltaEncoder::new(config.node_id);
428
429        let document_registry = DocumentRegistry::new();
430
431        Self {
432            config,
433            peer_manager,
434            document_sync,
435            observers: ObserverManager::new(),
436            last_sync_ms: std::sync::atomic::AtomicU32::new(0),
437            last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
438            encryption_key,
439            peer_sessions: std::sync::Mutex::new(None),
440            connection_graph: std::sync::Mutex::new(connection_graph),
441            seen_cache: std::sync::Mutex::new(seen_cache),
442            gossip_strategy,
443            delta_encoder: std::sync::Mutex::new(delta_encoder),
444            identity: None,
445            identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
446            peer_peripherals: std::sync::RwLock::new(HashMap::new()),
447            document_registry,
448            app_documents: std::sync::RwLock::new(HashMap::new()),
449            #[cfg(feature = "mesh-translator")]
450            ble_translator: std::sync::RwLock::new(
451                crate::translator::BleTranslator::with_defaults(),
452            ),
453            #[cfg(feature = "mesh-translator")]
454            decoded_document_callback: std::sync::RwLock::new(None),
455            #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
456            decoded_document_json_callback: std::sync::RwLock::new(None),
457            polled_translator_consumer_attested: std::sync::atomic::AtomicBool::new(false),
458        }
459    }
460
461    /// Create a new PeatMesh with a cryptographic identity
462    ///
463    /// The node_id will be derived from the identity's public key, overriding
464    /// any node_id specified in the config. This ensures cryptographic binding
465    /// between node_id and identity.
466    pub fn with_identity(config: PeatMeshConfig, identity: DeviceIdentity) -> Self {
467        // Override node_id with identity-derived value
468        let mut config = config;
469        config.node_id = identity.node_id();
470
471        let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
472        let document_sync = DocumentSync::with_peripheral_type(
473            config.node_id,
474            &config.callsign,
475            config.peripheral_type,
476        );
477
478        let encryption_key = config
479            .encryption_secret
480            .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
481
482        let connection_graph = ConnectionStateGraph::with_config(
483            config.peer_config.rssi_degraded_threshold,
484            config.peer_config.lost_timeout_ms,
485        );
486
487        let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
488        let gossip_strategy: Box<dyn GossipStrategy> =
489            Box::new(RandomFanout::new(config.relay_fanout));
490        let delta_encoder = DeltaEncoder::new(config.node_id);
491
492        let document_registry = DocumentRegistry::new();
493
494        Self {
495            config,
496            peer_manager,
497            document_sync,
498            observers: ObserverManager::new(),
499            last_sync_ms: std::sync::atomic::AtomicU32::new(0),
500            last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
501            encryption_key,
502            peer_sessions: std::sync::Mutex::new(None),
503            connection_graph: std::sync::Mutex::new(connection_graph),
504            seen_cache: std::sync::Mutex::new(seen_cache),
505            gossip_strategy,
506            delta_encoder: std::sync::Mutex::new(delta_encoder),
507            identity: Some(identity),
508            identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
509            peer_peripherals: std::sync::RwLock::new(HashMap::new()),
510            document_registry,
511            app_documents: std::sync::RwLock::new(HashMap::new()),
512            #[cfg(feature = "mesh-translator")]
513            ble_translator: std::sync::RwLock::new(
514                crate::translator::BleTranslator::with_defaults(),
515            ),
516            #[cfg(feature = "mesh-translator")]
517            decoded_document_callback: std::sync::RwLock::new(None),
518            #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
519            decoded_document_json_callback: std::sync::RwLock::new(None),
520            polled_translator_consumer_attested: std::sync::atomic::AtomicBool::new(false),
521        }
522    }
523
524    /// Create a new PeatMesh from genesis data
525    ///
526    /// This is the recommended way to create a mesh for production use.
527    /// The mesh will be configured with:
528    /// - node_id derived from identity
529    /// - mesh_id from genesis
530    /// - encryption enabled using genesis-derived secret
531    pub fn from_genesis(
532        genesis: &crate::security::MeshGenesis,
533        identity: DeviceIdentity,
534        callsign: &str,
535    ) -> Self {
536        let config = PeatMeshConfig::new(identity.node_id(), callsign, &genesis.mesh_id())
537            .with_encryption(genesis.encryption_secret());
538
539        Self::with_identity(config, identity)
540    }
541
542    /// Create a PeatMesh from persisted state.
543    ///
544    /// Restores mesh configuration from previously saved state, including:
545    /// - Device identity (Ed25519 keypair)
546    /// - Mesh genesis (if present)
547    /// - Identity registry (TOFU cache)
548    ///
549    /// Use this on device boot to restore mesh membership without re-provisioning.
550    ///
551    /// # Arguments
552    ///
553    /// * `state` - Previously persisted state
554    /// * `callsign` - Human-readable identifier (may differ from original)
555    ///
556    /// # Errors
557    ///
558    /// Returns `PersistenceError` if the identity cannot be restored.
559    ///
560    /// # Example
561    ///
562    /// ```ignore
563    /// // On boot, restore from secure storage
564    /// let state = PersistedState::load(&storage)?;
565    /// let mesh = PeatMesh::from_persisted(state, "SENSOR-01")?;
566    /// ```
567    #[cfg(feature = "std")]
568    pub fn from_persisted(
569        state: crate::security::PersistedState,
570        callsign: &str,
571    ) -> Result<Self, crate::security::PersistenceError> {
572        // Restore identity
573        let identity = state.restore_identity()?;
574
575        // Restore genesis (if present)
576        let genesis = state.restore_genesis();
577
578        // Create mesh with or without genesis
579        let mesh = if let Some(ref gen) = genesis {
580            Self::from_genesis(gen, identity, callsign)
581        } else {
582            let config = PeatMeshConfig::new(identity.node_id(), callsign, "RESTORED");
583            Self::with_identity(config, identity)
584        };
585
586        // Restore identity registry
587        let restored_registry = state.restore_registry();
588        if let Ok(mut registry) = mesh.identity_registry.lock() {
589            *registry = restored_registry;
590        }
591
592        log::info!(
593            "PeatMesh restored from persisted state: node_id={:08X}, known_peers={}",
594            mesh.config.node_id.as_u32(),
595            mesh.known_identity_count()
596        );
597
598        Ok(mesh)
599    }
600
601    /// Create persisted state from current mesh state.
602    ///
603    /// Captures the current identity, genesis, and registry for persistence.
604    /// Call this periodically or before shutdown to save state.
605    ///
606    /// # Arguments
607    ///
608    /// * `genesis` - Optional genesis to include (if mesh was created from genesis)
609    ///
610    /// # Returns
611    ///
612    /// `None` if the mesh has no identity bound.
613    #[cfg(feature = "std")]
614    pub fn to_persisted_state(
615        &self,
616        genesis: Option<&crate::security::MeshGenesis>,
617    ) -> Option<crate::security::PersistedState> {
618        let identity = self.identity.as_ref()?;
619        let registry = self.identity_registry.lock().ok()?;
620
621        Some(crate::security::PersistedState::with_registry(
622            identity, genesis, &registry,
623        ))
624    }
625
626    // ==================== ADR-059 translator-frame receive ====================
627
628    /// Install the [`DecodedDocumentCallback`](crate::DecodedDocumentCallback)
629    /// invoked when a 0xB6 translator frame decodes successfully.
630    ///
631    /// Idempotent — calling again replaces the previous callback. Pass an
632    /// `Arc<dyn DecodedDocumentCallback>` so the receive dispatch can clone
633    /// the handle without holding the lock during invocation.
634    ///
635    /// Released-skew note: Slice 1.b.3 ships peat-btle with this setter and
636    /// the receive-dispatch branch; Slice 1.b.4 wires the callback from
637    /// peat-ffi in a *later* release. Until peat-ffi installs one, decoded
638    /// frames are silently dropped (logged at debug level).
639    #[cfg(feature = "mesh-translator")]
640    pub fn set_decoded_document_callback(&self, cb: Arc<dyn crate::DecodedDocumentCallback>) {
641        if let Ok(mut slot) = self.decoded_document_callback.write() {
642            *slot = Some(cb);
643        }
644    }
645
646    /// Install the UniFFI-exported
647    /// [`DecodedDocumentJsonCallback`](crate::DecodedDocumentJsonCallback)
648    /// invoked when a 0xB6 translator frame decodes successfully (ADR-059
649    /// Amendment 2 — host-side wiring point for cross-transport receive).
650    ///
651    /// Idempotent — calling again replaces the previous callback. Fires
652    /// independently of the Rust-trait
653    /// [`DecodedDocumentCallback`](crate::DecodedDocumentCallback): when
654    /// both are installed, both are invoked once per decoded frame; when
655    /// neither is installed, `PeatEvent::TranslatorNoCallback` fires on
656    /// the observer channel so the gap is operator-observable.
657    #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
658    pub fn set_decoded_document_json_callback(
659        &self,
660        cb: Box<dyn crate::DecodedDocumentJsonCallback>,
661    ) {
662        // UniFFI 0.31 callback_interface generates a `Box<dyn Trait>` on
663        // the FFI surface. We store as `Arc<dyn Trait>` internally so the
664        // receive dispatch can snapshot-clone the handle out of the
665        // RwLock without holding the lock during user-supplied dispatch
666        // — same pattern as `decoded_document_callback`.
667        let cb_arc: Arc<dyn crate::DecodedDocumentJsonCallback> = Arc::from(cb);
668        if let Ok(mut slot) = self.decoded_document_json_callback.write() {
669            *slot = Some(cb_arc);
670        }
671    }
672
673    /// Mark this `PeatMesh` as having a polled-field consumer (ADR-059
674    /// Amendment 3). Hosts that read
675    /// [`DataReceivedResult::decoded_translator_frame`] per receive and
676    /// forward through their own publish path SHOULD call this once at
677    /// startup, after constructing `PeatMesh` and before any GATT
678    /// receive arrives. Without this attestation, the receive dispatch
679    /// has no way to distinguish a polled-consumer host from one that
680    /// decoded the frame and dropped it on the floor, and
681    /// `PeatEvent::TranslatorNoCallback` will fire for every frame.
682    ///
683    /// Idempotent — calling again is a no-op. Defined unconditionally
684    /// so the binding shape is stable across feature combos; when
685    /// `mesh-translator` is off the suppression-check read site is
686    /// gated alongside the rest of the dispatch logic, so the flag has
687    /// no effect (still safe to call).
688    pub fn acknowledge_polled_translator_consumer(&self) {
689        self.polled_translator_consumer_attested
690            .store(true, std::sync::atomic::Ordering::Release);
691    }
692
693    /// Replace the active `BleTranslator` configuration. Optional; the
694    /// default (operator collection names = "tracks"/"platforms"/
695    /// "alerts"/"canned_messages") covers most deployments.
696    #[cfg(feature = "mesh-translator")]
697    pub fn set_translator_config(&self, config: crate::translator::TranslationConfig) {
698        if let Ok(mut t) = self.ble_translator.write() {
699            *t = crate::translator::BleTranslator::new(config);
700        }
701    }
702
703    /// Translator-frame / reserved-marker dispatcher. Returns `true` if the
704    /// first byte of `decrypted` fell in `0xB6..=0xBF` (handled or silently
705    /// dropped); the caller MUST stop processing and return `None`. Returns
706    /// `false` if the byte was something else; the caller continues with
707    /// the existing legacy / delta-document flow.
708    ///
709    /// `peer` is the BLE peer's identifier (typically a peripheral address).
710    /// `source_node` is the peer's `NodeId` when known — used to populate
711    /// `TranslationContext.local_wire_id` (the hex-u32 form), which the
712    /// `tracks` collection's decoder requires per the trait contract.
713    /// Anonymous receive paths pass `None`; tracks decoding will then return
714    /// an `Err` (logged + dropped) but other collections succeed.
715    #[cfg(feature = "mesh-translator")]
716    fn try_handle_translator_marker(
717        &self,
718        decrypted: &[u8],
719        peer: Option<&str>,
720        source_node: Option<NodeId>,
721    ) -> TranslatorMarkerOutcome {
722        if decrypted.is_empty() {
723            return TranslatorMarkerOutcome::NotTranslatorMarker;
724        }
725        let marker = decrypted[0];
726
727        // Reserved range — silent drop for forward-compat. Receivers running
728        // *this* peat-btle release MUST drop unknown future-translator
729        // markers without falling through to merge_document, otherwise a
730        // 0xB7..=0xBF frame would corrupt the GCounter (ADR-059 Amendment 1
731        // §"Backwards compatibility… 2").
732        if (TRANSLATOR_RESERVED_MARKER_START..=TRANSLATOR_RESERVED_MARKER_END).contains(&marker) {
733            log::warn!(
734                "ble: dropping reserved translator-marker frame (marker=0x{marker:02X}, len={})",
735                decrypted.len()
736            );
737            return TranslatorMarkerOutcome::Handled;
738        }
739
740        if marker != TRANSLATOR_FRAME_MARKER {
741            return TranslatorMarkerOutcome::NotTranslatorMarker;
742        }
743
744        // 0xB6 translator dispatch.
745        if decrypted.len() < 2 {
746            log::warn!(
747                "ble: dropping truncated translator frame (len={}, missing collection code)",
748                decrypted.len()
749            );
750            return TranslatorMarkerOutcome::Handled;
751        }
752
753        let code = decrypted[1];
754        let payload = &decrypted[2..];
755
756        // Resolve the collection name + build a TranslationContext under
757        // the read-lock, drop the lock before invoking decode_inbound_sync.
758        let (collection, decode_result) = {
759            let translator = match self.ble_translator.read() {
760                Ok(g) => g,
761                Err(_) => {
762                    log::warn!("ble: translator RwLock poisoned; dropping frame");
763                    return TranslatorMarkerOutcome::Handled;
764                }
765            };
766            let collection = match translator.code_to_collection(code) {
767                Some(c) => c.to_string(),
768                None => {
769                    log::warn!(
770                        "ble: dropping translator frame with unknown collection code 0x{code:02X}"
771                    );
772                    return TranslatorMarkerOutcome::Handled;
773                }
774            };
775
776            let mut ctx =
777                peat_mesh::transport::TranslationContext::inbound(peer.unwrap_or("unknown"))
778                    .with_collection(collection.clone());
779            if let Some(node) = source_node {
780                ctx = ctx.with_local_wire_id(format!("{:08X}", node.as_u32()));
781            }
782
783            let result = translator.decode_inbound_sync(payload, &ctx);
784            (collection, result)
785        };
786
787        match decode_result {
788            Ok(Some(doc)) => {
789                // Snapshot both callback Arcs so we don't hold the read
790                // lock during user-supplied dispatch. Per ADR-059
791                // Amendment 2, both callbacks fire independently when
792                // both are installed. ADR-059 Amendment 3 adds the
793                // polled `decoded_translator_frame` field as a third
794                // independent dispatch path; the suppression rule for
795                // `PeatEvent::TranslatorNoCallback` fires only when
796                // **all three** are empty (no Rust-trait callback,
797                // no UniFFI JSON callback, no polled-consumer
798                // attestation via `acknowledge_polled_translator_consumer`).
799                let rust_cb = self
800                    .decoded_document_callback
801                    .read()
802                    .ok()
803                    .and_then(|g| g.as_ref().cloned());
804
805                #[cfg(feature = "uniffi")]
806                let json_cb = self
807                    .decoded_document_json_callback
808                    .read()
809                    .ok()
810                    .and_then(|g| g.as_ref().cloned());
811                #[cfg(not(feature = "uniffi"))]
812                let json_cb: Option<()> = None;
813
814                let polled_attested = self
815                    .polled_translator_consumer_attested
816                    .load(std::sync::atomic::Ordering::Acquire);
817
818                let any_consumer = rust_cb.is_some() || json_cb.is_some() || polled_attested;
819
820                // Fire the Rust-trait callback (Rust-native consumers,
821                // peat-sim integration tests, future Rust-native hosts).
822                if let Some(cb) = &rust_cb {
823                    cb.on_document(&collection, doc.clone(), peer);
824                }
825
826                // Serialize once for both the UniFFI callback and the
827                // polled-field outcome — same JSON shape per Amendment 2
828                // §"Why JSON-string instead of typed Document". A serde
829                // failure here is a wire-format-drift signal worth
830                // surfacing; log and skip both JSON-bearing paths. The
831                // Rust-trait callback (if installed) still fired above.
832                let doc_json_result = serde_json::to_string(&doc);
833
834                #[cfg(feature = "uniffi")]
835                if let (Some(json_cb), Ok(ref doc_json)) = (json_cb, &doc_json_result) {
836                    json_cb.on_document(
837                        collection.clone(),
838                        doc_json.clone(),
839                        peer.map(str::to_string),
840                    );
841                }
842
843                if let Err(ref e) = doc_json_result {
844                    log::warn!(
845                        "ble: failed to serialize decoded {} doc to JSON: {}",
846                        collection,
847                        e
848                    );
849                }
850
851                if !any_consumer {
852                    // No callback installed AND host hasn't attested to
853                    // polling. Decoded successfully but nobody's
854                    // listening. Per ADR-059 Amendment 1/2/3, surface
855                    // through the operator-observable PeatEvent channel
856                    // so staged-rollout deployments see, in real time,
857                    // how many translator-frame payloads the bridge is
858                    // decoding into the void. `log::debug!` alone is
859                    // not a monitoring signal — disabled at default
860                    // log levels.
861                    log::debug!(
862                        "ble: decoded {} frame but no consumer registered (no callback, no polled attestation)",
863                        collection
864                    );
865                    self.notify(crate::observer::PeatEvent::TranslatorNoCallback {
866                        collection: collection.clone(),
867                        peer: peer.map(str::to_string),
868                    });
869                }
870
871                // Surface the decoded frame to the caller as the
872                // polled-consumer output. Caller hoists into
873                // `DataReceivedResult.decoded_translator_frame`.
874                // Falls through to `Handled` (no frame to surface) if
875                // JSON serialization failed above.
876                if let Ok(doc_json) = doc_json_result {
877                    return TranslatorMarkerOutcome::Decoded(DecodedTranslatorFrame {
878                        collection,
879                        doc_json,
880                        peer: peer.map(str::to_string),
881                    });
882                }
883            }
884            Ok(None) => {
885                log::debug!(
886                    "ble: codec declined translator frame for collection {}",
887                    collection
888                );
889            }
890            Err(e) => {
891                log::warn!(
892                    "ble: translator frame decode error (collection={}): {:#}",
893                    collection,
894                    e
895                );
896            }
897        }
898
899        TranslatorMarkerOutcome::Handled
900    }
901
902    // ==================== Encryption ====================
903
904    /// Check if mesh-wide encryption is enabled
905    pub fn is_encryption_enabled(&self) -> bool {
906        self.encryption_key.is_some()
907    }
908
909    /// Check if strict encryption mode is enabled
910    ///
911    /// Returns true only if both encryption and strict_encryption are enabled.
912    pub fn is_strict_encryption_enabled(&self) -> bool {
913        self.config.strict_encryption && self.encryption_key.is_some()
914    }
915
916    /// Enable mesh-wide encryption with a shared secret
917    ///
918    /// Derives a ChaCha20-Poly1305 key from the secret using HKDF-SHA256.
919    /// All mesh participants must use the same secret to communicate.
920    pub fn enable_encryption(&mut self, secret: &[u8; 32]) {
921        self.encryption_key = Some(MeshEncryptionKey::from_shared_secret(
922            &self.config.mesh_id,
923            secret,
924        ));
925    }
926
927    /// Disable mesh-wide encryption
928    pub fn disable_encryption(&mut self) {
929        self.encryption_key = None;
930    }
931
932    /// Encrypt document bytes for transmission
933    ///
934    /// Returns the encrypted bytes with ENCRYPTED_MARKER prefix, or the
935    /// original bytes if encryption is disabled.
936    fn encrypt_document(&self, plaintext: &[u8]) -> Vec<u8> {
937        match &self.encryption_key {
938            Some(key) => {
939                // Encrypt and prepend marker
940                match key.encrypt_to_bytes(plaintext) {
941                    Ok(ciphertext) => {
942                        let mut buf = Vec::with_capacity(2 + ciphertext.len());
943                        buf.push(ENCRYPTED_MARKER);
944                        buf.push(0x00); // reserved
945                        buf.extend_from_slice(&ciphertext);
946                        buf
947                    }
948                    Err(e) => {
949                        log::error!("Encryption failed: {}", e);
950                        // Fall back to unencrypted on error (shouldn't happen)
951                        plaintext.to_vec()
952                    }
953                }
954            }
955            None => plaintext.to_vec(),
956        }
957    }
958
959    /// Decrypt document bytes received from peer
960    ///
961    /// Returns the decrypted bytes if encrypted and valid, or the original
962    /// bytes if not encrypted. Returns None if decryption fails.
963    ///
964    /// In strict encryption mode (when both encryption and strict_encryption are enabled),
965    /// unencrypted documents are rejected and trigger a SecurityViolation event.
966    fn decrypt_document<'a>(
967        &self,
968        data: &'a [u8],
969        source_hint: Option<&str>,
970    ) -> Option<std::borrow::Cow<'a, [u8]>> {
971        log::debug!(
972            "decrypt_document: len={}, first_byte=0x{:02X}, source={:?}",
973            data.len(),
974            data.first().copied().unwrap_or(0),
975            source_hint
976        );
977
978        // Check for encrypted marker
979        if data.len() >= 2 && data[0] == ENCRYPTED_MARKER {
980            // Encrypted document
981            let _reserved = data[1];
982            let encrypted_payload = &data[2..];
983
984            log::debug!(
985                "decrypt_document: encrypted payload len={}, nonce+ciphertext",
986                encrypted_payload.len()
987            );
988
989            match &self.encryption_key {
990                Some(key) => match key.decrypt_from_bytes(encrypted_payload) {
991                    Ok(plaintext) => {
992                        log::debug!(
993                            "decrypt_document: SUCCESS, plaintext len={}",
994                            plaintext.len()
995                        );
996                        Some(std::borrow::Cow::Owned(plaintext))
997                    }
998                    Err(e) => {
999                        log::warn!(
1000                            "decrypt_document: FAILED (wrong key or corrupted): {} [payload_len={}, source={:?}]",
1001                            e,
1002                            encrypted_payload.len(),
1003                            source_hint
1004                        );
1005                        self.notify(PeatEvent::SecurityViolation {
1006                            kind: SecurityViolationKind::DecryptionFailed,
1007                            source: source_hint.map(String::from),
1008                        });
1009                        None
1010                    }
1011                },
1012                None => {
1013                    log::warn!(
1014                        "decrypt_document: encryption not enabled but received encrypted doc"
1015                    );
1016                    None
1017                }
1018            }
1019        } else {
1020            // Unencrypted document
1021            // Check strict encryption mode
1022            if self.config.strict_encryption && self.encryption_key.is_some() {
1023                log::warn!(
1024                    "Rejected unencrypted document in strict encryption mode (source: {:?})",
1025                    source_hint
1026                );
1027                self.notify(PeatEvent::SecurityViolation {
1028                    kind: SecurityViolationKind::UnencryptedInStrictMode,
1029                    source: source_hint.map(String::from),
1030                });
1031                None
1032            } else {
1033                // Permissive mode: accept unencrypted for backward compatibility
1034                Some(std::borrow::Cow::Borrowed(data))
1035            }
1036        }
1037    }
1038
1039    // ==================== Transport Layer API ====================
1040
1041    /// Decrypt data without parsing (transport-only operation)
1042    ///
1043    /// This method provides raw decrypted bytes for apps that want to handle
1044    /// message parsing themselves (using peat-lite or other libraries).
1045    ///
1046    /// # Arguments
1047    /// * `data` - Potentially encrypted data (0xAE marker indicates encryption)
1048    ///
1049    /// # Returns
1050    /// * `Some(plaintext)` - Decrypted bytes if successful, or original bytes if unencrypted
1051    /// * `None` - If decryption failed (wrong key, corrupted, or strict mode violation)
1052    pub fn decrypt_only(&self, data: &[u8]) -> Option<Vec<u8>> {
1053        self.decrypt_document(data, None)
1054            .map(|cow| cow.into_owned())
1055    }
1056
1057    // ==================== Identity ====================
1058
1059    /// Check if this mesh has a cryptographic identity
1060    pub fn has_identity(&self) -> bool {
1061        self.identity.is_some()
1062    }
1063
1064    /// Get this node's public key (if identity is configured)
1065    pub fn public_key(&self) -> Option<[u8; 32]> {
1066        self.identity.as_ref().map(|id| id.public_key())
1067    }
1068
1069    /// Create an identity attestation for this node
1070    ///
1071    /// Returns None if no identity is configured.
1072    pub fn create_attestation(&self, now_ms: u64) -> Option<IdentityAttestation> {
1073        self.identity
1074            .as_ref()
1075            .map(|id| id.create_attestation(now_ms))
1076    }
1077
1078    /// Verify and register a peer's identity attestation
1079    ///
1080    /// Implements TOFU (Trust On First Use):
1081    /// - On first contact, registers the node_id → public_key binding
1082    /// - On subsequent contacts, verifies the public key matches
1083    ///
1084    /// Returns the verification result. Security violations should be handled
1085    /// by the caller (e.g., disconnect, alert).
1086    pub fn verify_peer_identity(&self, attestation: &IdentityAttestation) -> RegistryResult {
1087        self.identity_registry
1088            .lock()
1089            .unwrap()
1090            .verify_or_register(attestation)
1091    }
1092
1093    /// Check if a peer's identity is known (has been registered)
1094    pub fn is_peer_identity_known(&self, node_id: NodeId) -> bool {
1095        self.identity_registry.lock().unwrap().is_known(node_id)
1096    }
1097
1098    /// Get a peer's public key if known
1099    pub fn peer_public_key(&self, node_id: NodeId) -> Option<[u8; 32]> {
1100        self.identity_registry
1101            .lock()
1102            .unwrap()
1103            .get_public_key(node_id)
1104            .copied()
1105    }
1106
1107    /// Get the number of known peer identities
1108    pub fn known_identity_count(&self) -> usize {
1109        self.identity_registry.lock().unwrap().len()
1110    }
1111
1112    /// Pre-register a peer's identity (for out-of-band key exchange)
1113    ///
1114    /// Use this when keys are exchanged through a secure side channel
1115    /// (e.g., QR code, NFC tap, or provisioning server).
1116    pub fn pre_register_peer_identity(&self, node_id: NodeId, public_key: [u8; 32], now_ms: u64) {
1117        self.identity_registry
1118            .lock()
1119            .unwrap()
1120            .pre_register(node_id, public_key, now_ms);
1121    }
1122
1123    /// Remove a peer's identity from the registry
1124    ///
1125    /// Use with caution - this allows re-registration with a different key.
1126    pub fn forget_peer_identity(&self, node_id: NodeId) {
1127        self.identity_registry.lock().unwrap().remove(node_id);
1128    }
1129
1130    /// Sign arbitrary data with this node's identity
1131    ///
1132    /// Returns None if no identity is configured.
1133    pub fn sign(&self, data: &[u8]) -> Option<[u8; 64]> {
1134        self.identity.as_ref().map(|id| id.sign(data))
1135    }
1136
1137    /// Verify a signature from a peer
1138    ///
1139    /// Uses the peer's public key from the identity registry.
1140    /// Returns false if peer is unknown or signature is invalid.
1141    pub fn verify_peer_signature(
1142        &self,
1143        node_id: NodeId,
1144        data: &[u8],
1145        signature: &[u8; 64],
1146    ) -> bool {
1147        if let Some(public_key) = self.peer_public_key(node_id) {
1148            crate::security::verify_signature(&public_key, data, signature)
1149        } else {
1150            false
1151        }
1152    }
1153
1154    // ==================== Multi-Hop Relay ====================
1155
1156    /// Check if multi-hop relay is enabled
1157    pub fn is_relay_enabled(&self) -> bool {
1158        self.config.enable_relay
1159    }
1160
1161    /// Enable multi-hop relay
1162    pub fn enable_relay(&mut self) {
1163        self.config.enable_relay = true;
1164    }
1165
1166    /// Disable multi-hop relay
1167    pub fn disable_relay(&mut self) {
1168        self.config.enable_relay = false;
1169    }
1170
1171    /// Check if a message has been seen before (for deduplication)
1172    ///
1173    /// Returns true if the message was already seen (duplicate).
1174    pub fn has_seen_message(&self, message_id: &MessageId) -> bool {
1175        self.seen_cache.lock().unwrap().has_seen(message_id)
1176    }
1177
1178    /// Mark a message as seen
1179    ///
1180    /// Returns true if this is a new message (first time seen).
1181    pub fn mark_message_seen(&self, message_id: MessageId, origin: NodeId, now_ms: u64) -> bool {
1182        self.seen_cache
1183            .lock()
1184            .unwrap()
1185            .check_and_mark(message_id, origin, now_ms)
1186    }
1187
1188    /// Get the number of entries in the seen message cache
1189    pub fn seen_cache_size(&self) -> usize {
1190        self.seen_cache.lock().unwrap().len()
1191    }
1192
1193    /// Clear the seen message cache
1194    pub fn clear_seen_cache(&self) {
1195        self.seen_cache.lock().unwrap().clear();
1196    }
1197
1198    /// Wrap a document in a relay envelope for multi-hop transmission
1199    ///
1200    /// The returned bytes can be sent to peers and will be automatically
1201    /// relayed through the mesh if relay is enabled on receiving nodes.
1202    pub fn wrap_for_relay(&self, payload: Vec<u8>) -> Vec<u8> {
1203        let envelope = RelayEnvelope::broadcast(self.config.node_id, payload)
1204            .with_max_hops(self.config.max_relay_hops);
1205        envelope.encode()
1206    }
1207
1208    /// Get peers to relay a message to
1209    ///
1210    /// Uses the configured gossip strategy to select relay targets.
1211    /// Excludes the source peer (if provided) to avoid sending back to sender.
1212    pub fn get_relay_targets(&self, exclude_peer: Option<NodeId>) -> Vec<PeatPeer> {
1213        let connected = self.peer_manager.get_connected_peers();
1214        let filtered: Vec<_> = if let Some(exclude) = exclude_peer {
1215            connected
1216                .into_iter()
1217                .filter(|p| p.node_id != exclude)
1218                .collect()
1219        } else {
1220            connected
1221        };
1222
1223        self.gossip_strategy
1224            .select_peers(&filtered)
1225            .into_iter()
1226            .cloned()
1227            .collect()
1228    }
1229
1230    /// Process an incoming relay envelope
1231    ///
1232    /// Handles deduplication, TTL checking, and determines if the message
1233    /// should be processed and/or relayed.
1234    ///
1235    /// Returns:
1236    /// - `Ok(Some(RelayDecision))` if message should be processed/relayed
1237    /// - `Ok(None)` if message was a duplicate or TTL expired
1238    /// - `Err` if parsing failed
1239    pub fn process_relay_envelope(
1240        &self,
1241        data: &[u8],
1242        source_peer: NodeId,
1243        now_ms: u64,
1244    ) -> Option<RelayDecision> {
1245        // Parse envelope
1246        let envelope = RelayEnvelope::decode(data)?;
1247
1248        // Update indirect peer graph if origin differs from source
1249        // This means the message was relayed through source_peer from origin_node
1250        if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
1251            let is_new = self.connection_graph.lock().unwrap().on_relay_received(
1252                source_peer,
1253                envelope.origin_node,
1254                envelope.hop_count,
1255                now_ms,
1256            );
1257
1258            if is_new {
1259                log::debug!(
1260                    "Discovered indirect peer {:08X} via {:08X} ({} hops)",
1261                    envelope.origin_node.as_u32(),
1262                    source_peer.as_u32(),
1263                    envelope.hop_count
1264                );
1265            }
1266        }
1267
1268        // Check deduplication
1269        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
1270            // Duplicate message
1271            let stats = self
1272                .seen_cache
1273                .lock()
1274                .unwrap()
1275                .get_stats(&envelope.message_id);
1276            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
1277
1278            self.notify(PeatEvent::DuplicateMessageDropped {
1279                origin_node: envelope.origin_node,
1280                seen_count,
1281            });
1282
1283            log::debug!(
1284                "Dropped duplicate message {} from {:08X} (seen {} times)",
1285                envelope.message_id,
1286                envelope.origin_node.as_u32(),
1287                seen_count
1288            );
1289            return None;
1290        }
1291
1292        // Check TTL
1293        if !envelope.can_relay() {
1294            self.notify(PeatEvent::MessageTtlExpired {
1295                origin_node: envelope.origin_node,
1296                hop_count: envelope.hop_count,
1297            });
1298
1299            log::debug!(
1300                "Message {} from {:08X} TTL expired at hop {}",
1301                envelope.message_id,
1302                envelope.origin_node.as_u32(),
1303                envelope.hop_count
1304            );
1305
1306            // Still process locally even if TTL expired
1307            return Some(RelayDecision {
1308                payload: envelope.payload,
1309                origin_node: envelope.origin_node,
1310                hop_count: envelope.hop_count,
1311                should_relay: false,
1312                relay_envelope: None,
1313            });
1314        }
1315
1316        // Determine if we should relay
1317        let should_relay = self.config.enable_relay;
1318        let relay_envelope = if should_relay {
1319            envelope.relay() // Increments hop count
1320        } else {
1321            None
1322        };
1323
1324        Some(RelayDecision {
1325            payload: envelope.payload,
1326            origin_node: envelope.origin_node,
1327            hop_count: envelope.hop_count,
1328            should_relay,
1329            relay_envelope,
1330        })
1331    }
1332
1333    /// Build a document wrapped in a relay envelope
1334    ///
1335    /// Convenience method that builds the document, encrypts it (if enabled),
1336    /// and wraps it in a relay envelope for multi-hop transmission.
1337    pub fn build_relay_document(&self) -> Vec<u8> {
1338        let doc = self.build_document(); // Already encrypted if encryption enabled
1339        self.wrap_for_relay(doc)
1340    }
1341
1342    // ==================== Delta Sync ====================
1343
1344    /// Register a peer for delta sync tracking
1345    ///
1346    /// Call this when a peer connects to start tracking what data has been
1347    /// sent to them. This enables future delta sync (sending only changes).
1348    pub fn register_peer_for_delta(&self, peer_id: &NodeId) {
1349        let mut encoder = self.delta_encoder.lock().unwrap();
1350        encoder.add_peer(peer_id);
1351        log::debug!(
1352            "Registered peer {:08X} for delta sync tracking",
1353            peer_id.as_u32()
1354        );
1355    }
1356
1357    /// Unregister a peer from delta sync tracking
1358    ///
1359    /// Call this when a peer disconnects to clean up tracking state.
1360    pub fn unregister_peer_for_delta(&self, peer_id: &NodeId) {
1361        let mut encoder = self.delta_encoder.lock().unwrap();
1362        encoder.remove_peer(peer_id);
1363        log::debug!(
1364            "Unregistered peer {:08X} from delta sync tracking",
1365            peer_id.as_u32()
1366        );
1367    }
1368
1369    /// Reset delta sync state for a peer
1370    ///
1371    /// Call this when a peer reconnects to force a full sync on next
1372    /// communication. This clears the record of what was previously sent.
1373    pub fn reset_peer_delta_state(&self, peer_id: &NodeId) {
1374        let mut encoder = self.delta_encoder.lock().unwrap();
1375        encoder.reset_peer(peer_id);
1376        log::debug!("Reset delta sync state for peer {:08X}", peer_id.as_u32());
1377    }
1378
1379    /// Record bytes sent to a peer (for delta statistics)
1380    pub fn record_delta_sent(&self, peer_id: &NodeId, bytes: usize) {
1381        let mut encoder = self.delta_encoder.lock().unwrap();
1382        encoder.record_sent(peer_id, bytes);
1383    }
1384
1385    /// Record bytes received from a peer (for delta statistics)
1386    pub fn record_delta_received(&self, peer_id: &NodeId, bytes: usize, timestamp: u64) {
1387        let mut encoder = self.delta_encoder.lock().unwrap();
1388        encoder.record_received(peer_id, bytes, timestamp);
1389    }
1390
1391    /// Get delta sync statistics
1392    ///
1393    /// Returns aggregate statistics about delta sync across all peers,
1394    /// including bytes sent/received and sync counts.
1395    pub fn delta_stats(&self) -> DeltaStats {
1396        self.delta_encoder.lock().unwrap().stats()
1397    }
1398
1399    /// Get delta sync statistics for a specific peer
1400    ///
1401    /// Returns the bytes sent/received and sync count for a single peer.
1402    pub fn peer_delta_stats(&self, peer_id: &NodeId) -> Option<(u64, u64, u32)> {
1403        let encoder = self.delta_encoder.lock().unwrap();
1404        encoder
1405            .get_peer_state(peer_id)
1406            .map(|state| (state.bytes_sent, state.bytes_received, state.sync_count))
1407    }
1408
1409    /// Build a delta document for a specific peer
1410    ///
1411    /// This only includes operations that have changed since the last sync
1412    /// with this peer. Uses the delta encoder to track per-peer state.
1413    ///
1414    /// Returns the encoded delta document bytes, or None if there's nothing
1415    /// new to send to this peer.
1416    pub fn build_delta_document_for_peer(&self, peer_id: &NodeId, now_ms: u64) -> Option<Vec<u8>> {
1417        // Collect all current operations
1418        let mut all_operations: Vec<Operation> = Vec::new();
1419
1420        // Add counter operations (one per node that has contributed)
1421        // Use the count value as the "timestamp" for tracking - only send if count increased
1422        for (node_id_u32, count) in self.document_sync.counter_entries() {
1423            all_operations.push(Operation::IncrementCounter {
1424                counter_id: 0, // Default mesh counter
1425                node_id: NodeId::new(node_id_u32),
1426                amount: count,
1427                timestamp: count, // Use count as timestamp for delta tracking
1428            });
1429        }
1430
1431        // Add peripheral update
1432        // Use event timestamp if available, otherwise use 1 for initial send
1433        let peripheral = self.document_sync.peripheral_snapshot();
1434        let peripheral_timestamp = peripheral
1435            .last_event
1436            .as_ref()
1437            .map(|e| e.timestamp)
1438            .unwrap_or(1); // Use 1 (not 0) so it's sent initially
1439        all_operations.push(Operation::UpdatePeripheral {
1440            peripheral,
1441            timestamp: peripheral_timestamp,
1442        });
1443
1444        // Add emergency operations if active
1445        if let Some(emergency) = self.document_sync.emergency_snapshot() {
1446            let source_node = NodeId::new(emergency.source_node());
1447            let timestamp = emergency.timestamp();
1448
1449            // Add SetEmergency operation
1450            all_operations.push(Operation::SetEmergency {
1451                source_node,
1452                timestamp,
1453                known_peers: emergency.all_nodes(),
1454            });
1455
1456            // Add AckEmergency for each node that has acked
1457            for acked_node in emergency.acked_nodes() {
1458                all_operations.push(Operation::AckEmergency {
1459                    node_id: NodeId::new(acked_node),
1460                    emergency_timestamp: timestamp,
1461                });
1462            }
1463        }
1464
1465        // Add app document operations
1466        for app_op in self.app_document_delta_ops() {
1467            all_operations.push(Operation::App(app_op));
1468        }
1469
1470        // Filter operations for this peer (only send what's new)
1471        let filtered_operations: Vec<Operation> = {
1472            let encoder = self.delta_encoder.lock().unwrap();
1473            if let Some(peer_state) = encoder.get_peer_state(peer_id) {
1474                all_operations
1475                    .into_iter()
1476                    .filter(|op| peer_state.needs_send(&op.key(), op.timestamp()))
1477                    .collect()
1478            } else {
1479                // Unknown peer, send all operations
1480                all_operations
1481            }
1482        };
1483
1484        // If nothing new to send, return None
1485        if filtered_operations.is_empty() {
1486            return None;
1487        }
1488
1489        // Mark operations as sent
1490        {
1491            let mut encoder = self.delta_encoder.lock().unwrap();
1492            if let Some(peer_state) = encoder.get_peer_state_mut(peer_id) {
1493                for op in &filtered_operations {
1494                    peer_state.mark_sent(&op.key(), op.timestamp());
1495                }
1496            }
1497        }
1498
1499        // Build the delta document
1500        let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1501        for op in filtered_operations {
1502            delta.add_operation(op);
1503        }
1504
1505        // Encode and optionally encrypt
1506        let encoded = delta.encode();
1507        let result = self.encrypt_document(&encoded);
1508
1509        // Record stats
1510        {
1511            let mut encoder = self.delta_encoder.lock().unwrap();
1512            encoder.record_sent(peer_id, result.len());
1513        }
1514
1515        Some(result)
1516    }
1517
1518    /// Build a full delta document (for broadcast or new peers)
1519    ///
1520    /// Unlike `build_delta_document_for_peer`, this includes all state
1521    /// regardless of what has been sent before. Use this for broadcasts.
1522    pub fn build_full_delta_document(&self, now_ms: u64) -> Vec<u8> {
1523        let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1524
1525        // Add all counter operations
1526        for (node_id_u32, count) in self.document_sync.counter_entries() {
1527            delta.add_operation(Operation::IncrementCounter {
1528                counter_id: 0,
1529                node_id: NodeId::new(node_id_u32),
1530                amount: count,
1531                timestamp: now_ms,
1532            });
1533        }
1534
1535        // Add peripheral
1536        let peripheral = self.document_sync.peripheral_snapshot();
1537        let peripheral_timestamp = peripheral
1538            .last_event
1539            .as_ref()
1540            .map(|e| e.timestamp)
1541            .unwrap_or(now_ms);
1542        delta.add_operation(Operation::UpdatePeripheral {
1543            peripheral,
1544            timestamp: peripheral_timestamp,
1545        });
1546
1547        // Add emergency if active
1548        if let Some(emergency) = self.document_sync.emergency_snapshot() {
1549            let source_node = NodeId::new(emergency.source_node());
1550            let timestamp = emergency.timestamp();
1551
1552            delta.add_operation(Operation::SetEmergency {
1553                source_node,
1554                timestamp,
1555                known_peers: emergency.all_nodes(),
1556            });
1557
1558            for acked_node in emergency.acked_nodes() {
1559                delta.add_operation(Operation::AckEmergency {
1560                    node_id: NodeId::new(acked_node),
1561                    emergency_timestamp: timestamp,
1562                });
1563            }
1564        }
1565
1566        // Add app document operations
1567        for app_op in self.app_document_delta_ops() {
1568            delta.add_operation(Operation::App(app_op));
1569        }
1570
1571        let encoded = delta.encode();
1572        self.encrypt_document(&encoded)
1573    }
1574
1575    /// Internal: Process a received delta document
1576    ///
1577    /// Applies operations from a delta document to local state.
1578    fn process_delta_document_internal(
1579        &self,
1580        source_node: NodeId,
1581        data: &[u8],
1582        now_ms: u64,
1583        relay_data: Option<Vec<u8>>,
1584        origin_node: Option<NodeId>,
1585        hop_count: u8,
1586    ) -> Option<DataReceivedResult> {
1587        // Decode the delta document
1588        let delta = DeltaDocument::decode(data)?;
1589
1590        // Don't process our own documents
1591        if delta.origin_node == self.config.node_id {
1592            return None;
1593        }
1594
1595        // Apply operations to local state
1596        let mut counter_changed = false;
1597        let mut emergency_changed = false;
1598        let mut is_emergency = false;
1599        let mut is_ack = false;
1600        let mut event_timestamp = 0u64;
1601        let mut peer_peripheral: Option<crate::sync::crdt::Peripheral> = None;
1602
1603        log::info!(
1604            "Delta document from {:08X}: {} operations, data_len={}",
1605            delta.origin_node.as_u32(),
1606            delta.operations.len(),
1607            data.len()
1608        );
1609        for op in &delta.operations {
1610            log::info!("  Operation: {}", op.key());
1611            match op {
1612                Operation::IncrementCounter {
1613                    node_id, amount, ..
1614                } => {
1615                    // Merge counter value (take max)
1616                    let current = self.document_sync.counter_entries();
1617                    let current_value = current
1618                        .iter()
1619                        .find(|(id, _)| *id == node_id.as_u32())
1620                        .map(|(_, v)| *v)
1621                        .unwrap_or(0);
1622
1623                    if *amount > current_value {
1624                        // Need to merge - this is handled by the counter merge logic
1625                        // For now, we record that counter changed
1626                        counter_changed = true;
1627                    }
1628                }
1629                Operation::UpdatePeripheral {
1630                    peripheral,
1631                    timestamp,
1632                } => {
1633                    // Store peer peripheral for callsign lookup
1634                    if let Ok(mut peripherals) = self.peer_peripherals.write() {
1635                        peripherals.insert(delta.origin_node, peripheral.clone());
1636                    }
1637                    // Track the peripheral for the result
1638                    peer_peripheral = Some(peripheral.clone());
1639                    // Track the timestamp for the result
1640                    if *timestamp > event_timestamp {
1641                        event_timestamp = *timestamp;
1642                    }
1643                }
1644                Operation::SetEmergency { timestamp, .. } => {
1645                    is_emergency = true;
1646                    emergency_changed = true;
1647                    event_timestamp = *timestamp;
1648                }
1649                Operation::AckEmergency {
1650                    emergency_timestamp,
1651                    ..
1652                } => {
1653                    is_ack = true;
1654                    emergency_changed = true;
1655                    if *emergency_timestamp > event_timestamp {
1656                        event_timestamp = *emergency_timestamp;
1657                    }
1658                }
1659                Operation::ClearEmergency {
1660                    emergency_timestamp,
1661                } => {
1662                    emergency_changed = true;
1663                    if *emergency_timestamp > event_timestamp {
1664                        event_timestamp = *emergency_timestamp;
1665                    }
1666                }
1667                Operation::App(app_op) => {
1668                    // Handle app-layer document operation
1669                    //
1670                    // The timestamp field may contain version info in the upper bits
1671                    // (used by delta encoder for change detection). Extract the
1672                    // original document timestamp from the lower 48 bits.
1673                    let doc_timestamp = app_op.timestamp & 0x0000_FFFF_FFFF_FFFF;
1674
1675                    log::info!(
1676                        "App operation received: type={:02X} op_code={:02X} from {:08X} ts={} payload_len={}",
1677                        app_op.type_id,
1678                        app_op.op_code,
1679                        app_op.source_node,
1680                        doc_timestamp,
1681                        app_op.payload.len()
1682                    );
1683
1684                    // Try to find/create document and apply the delta operation
1685                    let doc_key = (app_op.type_id, app_op.source_node, doc_timestamp);
1686                    let changed = {
1687                        let mut docs = self.app_documents.write().unwrap();
1688
1689                        if let Some(existing) = docs.get_mut(&doc_key) {
1690                            // Apply delta to existing document (merge via CRDT)
1691                            self.document_registry.apply_delta_op(
1692                                app_op.type_id,
1693                                existing.as_mut(),
1694                                app_op,
1695                            )
1696                        } else {
1697                            // Try to decode from payload (for FULL_STATE operations)
1698                            if let Some(decoded) = self
1699                                .document_registry
1700                                .decode(app_op.type_id, &app_op.payload)
1701                            {
1702                                docs.insert(doc_key, decoded);
1703                                true
1704                            } else {
1705                                // For delta ops on unknown documents, we can't apply
1706                                // The full state will be sent later
1707                                log::debug!(
1708                                    "Received delta for unknown doc {:?}, waiting for full state",
1709                                    doc_key
1710                                );
1711                                false
1712                            }
1713                        }
1714                    };
1715
1716                    // Emit observer event with the real document timestamp
1717                    self.observers.notify(PeatEvent::app_document_received(
1718                        app_op.type_id,
1719                        NodeId::new(app_op.source_node),
1720                        doc_timestamp,
1721                        changed,
1722                    ));
1723                }
1724            }
1725        }
1726
1727        // Record sync
1728        self.peer_manager.record_sync(source_node, now_ms);
1729
1730        // Record delta received
1731        {
1732            let mut encoder = self.delta_encoder.lock().unwrap();
1733            encoder.record_received(&source_node, data.len(), now_ms);
1734        }
1735
1736        // Generate events based on what was received
1737        if is_emergency {
1738            self.notify(PeatEvent::EmergencyReceived {
1739                from_node: delta.origin_node,
1740            });
1741        } else if is_ack {
1742            self.notify(PeatEvent::AckReceived {
1743                from_node: delta.origin_node,
1744            });
1745        }
1746
1747        if counter_changed {
1748            let total_count = self.document_sync.total_count();
1749            self.notify(PeatEvent::DocumentSynced {
1750                from_node: delta.origin_node,
1751                total_count,
1752            });
1753        }
1754
1755        // Emit relay event if we're relaying
1756        if relay_data.is_some() {
1757            let relay_targets = self.get_relay_targets(Some(source_node));
1758            self.notify(PeatEvent::MessageRelayed {
1759                origin_node: origin_node.unwrap_or(delta.origin_node),
1760                relay_count: relay_targets.len(),
1761                hop_count,
1762            });
1763        }
1764
1765        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
1766            DataReceivedResult::peripheral_fields(&peer_peripheral);
1767
1768        Some(DataReceivedResult {
1769            source_node: delta.origin_node,
1770            is_emergency,
1771            is_ack,
1772            counter_changed,
1773            emergency_changed,
1774            total_count: self.document_sync.total_count(),
1775            event_timestamp,
1776            relay_data,
1777            origin_node,
1778            hop_count,
1779            callsign,
1780            battery_percent,
1781            heart_rate,
1782            event_type,
1783            latitude,
1784            longitude,
1785            altitude,
1786            ..Default::default()
1787        })
1788    }
1789
1790    // ==================== Per-Peer E2EE ====================
1791
1792    /// Enable per-peer E2EE capability
1793    ///
1794    /// Creates a new identity key for this node. This allows establishing
1795    /// encrypted sessions with specific peers where only the sender and
1796    /// recipient can read messages (other mesh members cannot).
1797    pub fn enable_peer_e2ee(&self) {
1798        let mut sessions = self.peer_sessions.lock().unwrap();
1799        if sessions.is_none() {
1800            *sessions = Some(PeerSessionManager::new(self.config.node_id));
1801            log::info!(
1802                "Per-peer E2EE enabled for node {:08X}",
1803                self.config.node_id.as_u32()
1804            );
1805        }
1806    }
1807
1808    /// Disable per-peer E2EE capability
1809    ///
1810    /// Clears all peer sessions and disables E2EE.
1811    pub fn disable_peer_e2ee(&self) {
1812        let mut sessions = self.peer_sessions.lock().unwrap();
1813        *sessions = None;
1814        log::info!("Per-peer E2EE disabled");
1815    }
1816
1817    /// Check if per-peer E2EE is enabled
1818    pub fn is_peer_e2ee_enabled(&self) -> bool {
1819        self.peer_sessions.lock().unwrap().is_some()
1820    }
1821
1822    /// Get our E2EE public key (for sharing with peers)
1823    ///
1824    /// Returns None if per-peer E2EE is not enabled.
1825    pub fn peer_e2ee_public_key(&self) -> Option<[u8; 32]> {
1826        self.peer_sessions
1827            .lock()
1828            .unwrap()
1829            .as_ref()
1830            .map(|s| s.our_public_key())
1831    }
1832
1833    /// Initiate E2EE session with a specific peer
1834    ///
1835    /// Returns the key exchange message bytes to send to the peer.
1836    /// The message should be broadcast/sent to the peer.
1837    /// Returns None if per-peer E2EE is not enabled.
1838    pub fn initiate_peer_e2ee(&self, peer_node_id: NodeId, now_ms: u64) -> Option<Vec<u8>> {
1839        let mut sessions = self.peer_sessions.lock().unwrap();
1840        let session_mgr = sessions.as_mut()?;
1841
1842        let key_exchange = session_mgr.initiate_session(peer_node_id, now_ms);
1843        let mut buf = Vec::with_capacity(2 + 37);
1844        buf.push(KEY_EXCHANGE_MARKER);
1845        buf.push(0x00); // reserved
1846        buf.extend_from_slice(&key_exchange.encode());
1847
1848        log::info!(
1849            "Initiated E2EE session with peer {:08X}",
1850            peer_node_id.as_u32()
1851        );
1852        Some(buf)
1853    }
1854
1855    /// Check if we have an established E2EE session with a peer
1856    pub fn has_peer_e2ee_session(&self, peer_node_id: NodeId) -> bool {
1857        self.peer_sessions
1858            .lock()
1859            .unwrap()
1860            .as_ref()
1861            .is_some_and(|s| s.has_session(peer_node_id))
1862    }
1863
1864    /// Get E2EE session state with a peer
1865    pub fn peer_e2ee_session_state(&self, peer_node_id: NodeId) -> Option<SessionState> {
1866        self.peer_sessions
1867            .lock()
1868            .unwrap()
1869            .as_ref()
1870            .and_then(|s| s.session_state(peer_node_id))
1871    }
1872
1873    /// Send an E2EE encrypted message to a specific peer
1874    ///
1875    /// Returns the encrypted message bytes to send, or None if no session exists.
1876    /// The message should be sent directly to the peer (not broadcast).
1877    pub fn send_peer_e2ee(
1878        &self,
1879        peer_node_id: NodeId,
1880        plaintext: &[u8],
1881        now_ms: u64,
1882    ) -> Option<Vec<u8>> {
1883        let mut sessions = self.peer_sessions.lock().unwrap();
1884        let session_mgr = sessions.as_mut()?;
1885
1886        match session_mgr.encrypt_for_peer(peer_node_id, plaintext, now_ms) {
1887            Ok(encrypted) => {
1888                let mut buf = Vec::with_capacity(2 + encrypted.encode().len());
1889                buf.push(PEER_E2EE_MARKER);
1890                buf.push(0x00); // reserved
1891                buf.extend_from_slice(&encrypted.encode());
1892                Some(buf)
1893            }
1894            Err(e) => {
1895                log::warn!(
1896                    "Failed to encrypt for peer {:08X}: {:?}",
1897                    peer_node_id.as_u32(),
1898                    e
1899                );
1900                None
1901            }
1902        }
1903    }
1904
1905    /// Close E2EE session with a peer
1906    pub fn close_peer_e2ee(&self, peer_node_id: NodeId) {
1907        let mut sessions = self.peer_sessions.lock().unwrap();
1908        if let Some(session_mgr) = sessions.as_mut() {
1909            session_mgr.close_session(peer_node_id);
1910            self.notify(PeatEvent::PeerE2eeClosed { peer_node_id });
1911            log::info!(
1912                "Closed E2EE session with peer {:08X}",
1913                peer_node_id.as_u32()
1914            );
1915        }
1916    }
1917
1918    /// Get count of active E2EE sessions
1919    pub fn peer_e2ee_session_count(&self) -> usize {
1920        self.peer_sessions
1921            .lock()
1922            .unwrap()
1923            .as_ref()
1924            .map(|s| s.session_count())
1925            .unwrap_or(0)
1926    }
1927
1928    /// Get count of established E2EE sessions
1929    pub fn peer_e2ee_established_count(&self) -> usize {
1930        self.peer_sessions
1931            .lock()
1932            .unwrap()
1933            .as_ref()
1934            .map(|s| s.established_count())
1935            .unwrap_or(0)
1936    }
1937
1938    /// Handle incoming key exchange message
1939    ///
1940    /// Called internally when we receive a KEY_EXCHANGE_MARKER message.
1941    /// Returns the response key exchange bytes to send back, or None if invalid.
1942    fn handle_key_exchange(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1943        if data.len() < 2 || data[0] != KEY_EXCHANGE_MARKER {
1944            return None;
1945        }
1946
1947        let payload = &data[2..];
1948        let msg = KeyExchangeMessage::decode(payload)?;
1949
1950        let mut sessions = self.peer_sessions.lock().unwrap();
1951        let session_mgr = sessions.as_mut()?;
1952
1953        let (response, established) = session_mgr.handle_key_exchange(&msg, now_ms)?;
1954
1955        if established {
1956            self.notify(PeatEvent::PeerE2eeEstablished {
1957                peer_node_id: msg.sender_node_id,
1958            });
1959            log::info!(
1960                "E2EE session established with peer {:08X}",
1961                msg.sender_node_id.as_u32()
1962            );
1963        }
1964
1965        // Return response key exchange
1966        let mut buf = Vec::with_capacity(2 + 37);
1967        buf.push(KEY_EXCHANGE_MARKER);
1968        buf.push(0x00);
1969        buf.extend_from_slice(&response.encode());
1970        Some(buf)
1971    }
1972
1973    /// Handle incoming E2EE encrypted message
1974    ///
1975    /// Called internally when we receive a PEER_E2EE_MARKER message.
1976    /// Decrypts and notifies observers of the received message.
1977    fn handle_peer_e2ee_message(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1978        if data.len() < 2 || data[0] != PEER_E2EE_MARKER {
1979            return None;
1980        }
1981
1982        let payload = &data[2..];
1983        let msg = PeerEncryptedMessage::decode(payload)?;
1984
1985        let mut sessions = self.peer_sessions.lock().unwrap();
1986        let session_mgr = sessions.as_mut()?;
1987
1988        match session_mgr.decrypt_from_peer(&msg, now_ms) {
1989            Ok(plaintext) => {
1990                // Notify observers of the decrypted message
1991                self.notify(PeatEvent::PeerE2eeMessageReceived {
1992                    from_node: msg.sender_node_id,
1993                    data: plaintext.clone(),
1994                });
1995                Some(plaintext)
1996            }
1997            Err(e) => {
1998                log::warn!(
1999                    "Failed to decrypt E2EE message from {:08X}: {:?}",
2000                    msg.sender_node_id.as_u32(),
2001                    e
2002                );
2003                None
2004            }
2005        }
2006    }
2007
2008    // ==================== Configuration ====================
2009
2010    /// Get our node ID
2011    pub fn node_id(&self) -> NodeId {
2012        self.config.node_id
2013    }
2014
2015    /// Get our callsign
2016    pub fn callsign(&self) -> &str {
2017        &self.config.callsign
2018    }
2019
2020    /// Get the mesh ID
2021    pub fn mesh_id(&self) -> &str {
2022        &self.config.mesh_id
2023    }
2024
2025    /// Get the device name for BLE advertising
2026    pub fn device_name(&self) -> String {
2027        format!(
2028            "PEAT_{}-{:08X}",
2029            self.config.mesh_id,
2030            self.config.node_id.as_u32()
2031        )
2032    }
2033
2034    /// Get a peer's callsign by node ID
2035    ///
2036    /// Returns the callsign from the peer's most recently received peripheral data,
2037    /// or None if no peripheral data has been received from this peer.
2038    pub fn get_peer_callsign(&self, node_id: NodeId) -> Option<String> {
2039        self.peer_peripherals.read().ok().and_then(|peripherals| {
2040            peripherals
2041                .get(&node_id)
2042                .map(|p| p.callsign_str().to_string())
2043        })
2044    }
2045
2046    /// Get a peer's full peripheral data by node ID
2047    ///
2048    /// Returns a clone of the peripheral data from the peer's most recently received
2049    /// document, or None if no peripheral data has been received from this peer.
2050    pub fn get_peer_peripheral(&self, node_id: NodeId) -> Option<Peripheral> {
2051        self.peer_peripherals
2052            .read()
2053            .ok()
2054            .and_then(|peripherals| peripherals.get(&node_id).cloned())
2055    }
2056
2057    // ==================== Document Registry ====================
2058
2059    /// Get a reference to the document registry.
2060    ///
2061    /// Use this to register custom document types for mesh synchronization.
2062    ///
2063    /// # Example
2064    ///
2065    /// ```ignore
2066    /// use peat_btle::registry::DocumentType;
2067    ///
2068    /// // Register a custom document type
2069    /// mesh.document_registry().register::<MyCustomDocument>();
2070    /// ```
2071    pub fn document_registry(&self) -> &DocumentRegistry {
2072        &self.document_registry
2073    }
2074
2075    /// Store an app-layer document.
2076    ///
2077    /// If a document with the same identity (type_id, source_node, timestamp)
2078    /// already exists, it will be merged using CRDT semantics.
2079    ///
2080    /// Returns true if the document was newly added or changed via merge.
2081    pub fn store_app_document<T: crate::registry::DocumentType>(&self, doc: T) -> bool {
2082        let type_id = T::TYPE_ID;
2083        let (source_node, timestamp) = doc.identity();
2084        let key = (type_id, source_node, timestamp);
2085
2086        let mut docs = self.app_documents.write().unwrap();
2087
2088        if let Some(existing) = docs.get_mut(&key) {
2089            // Merge with existing document
2090            self.document_registry
2091                .merge(type_id, existing.as_mut(), &doc)
2092        } else {
2093            // Insert new document
2094            docs.insert(key, Box::new(doc));
2095            true
2096        }
2097    }
2098
2099    /// Store a type-erased app-layer document.
2100    ///
2101    /// Used when receiving documents from the network where the type is
2102    /// determined at runtime.
2103    ///
2104    /// Returns true if the document was newly added or changed via merge.
2105    pub fn store_app_document_boxed(
2106        &self,
2107        type_id: u8,
2108        source_node: u32,
2109        timestamp: u64,
2110        doc: Box<dyn core::any::Any + Send + Sync>,
2111    ) -> bool {
2112        let key = (type_id, source_node, timestamp);
2113
2114        let mut docs = self.app_documents.write().unwrap();
2115
2116        if let Some(existing) = docs.get_mut(&key) {
2117            // Merge with existing document
2118            self.document_registry
2119                .merge(type_id, existing.as_mut(), doc.as_ref())
2120        } else {
2121            // Insert new document
2122            docs.insert(key, doc);
2123            true
2124        }
2125    }
2126
2127    /// Get a stored app-layer document by identity.
2128    ///
2129    /// Returns None if not found or if downcast to T fails.
2130    pub fn get_app_document<T: crate::registry::DocumentType>(
2131        &self,
2132        source_node: u32,
2133        timestamp: u64,
2134    ) -> Option<T> {
2135        let key = (T::TYPE_ID, source_node, timestamp);
2136
2137        let docs = self.app_documents.read().unwrap();
2138        docs.get(&key).and_then(|d| d.downcast_ref::<T>()).cloned()
2139    }
2140
2141    /// Get all stored app-layer documents of a specific type.
2142    ///
2143    /// Returns a vector of all documents of type T currently stored.
2144    pub fn get_all_app_documents_of_type<T: crate::registry::DocumentType>(&self) -> Vec<T> {
2145        let docs = self.app_documents.read().unwrap();
2146        docs.iter()
2147            .filter(|((type_id, _, _), _)| *type_id == T::TYPE_ID)
2148            .filter_map(|(_, doc)| doc.downcast_ref::<T>().cloned())
2149            .collect()
2150    }
2151
2152    /// Get delta operations for all stored app documents.
2153    ///
2154    /// Used during sync to include app documents in the delta stream.
2155    pub fn app_document_delta_ops(&self) -> Vec<crate::registry::AppOperation> {
2156        let docs = self.app_documents.read().unwrap();
2157        let mut ops = Vec::new();
2158
2159        for ((type_id, _source, _ts), doc) in docs.iter() {
2160            if let Some(op) = self.document_registry.to_delta_op(*type_id, doc.as_ref()) {
2161                ops.push(op);
2162            }
2163        }
2164
2165        ops
2166    }
2167
2168    /// Get all document keys for a given type.
2169    ///
2170    /// Returns (source_node, timestamp) pairs for all documents of the specified type.
2171    pub fn app_document_keys(&self, type_id: u8) -> Vec<(u32, u64)> {
2172        let docs = self.app_documents.read().unwrap();
2173        docs.keys()
2174            .filter(|(tid, _, _)| *tid == type_id)
2175            .map(|(_, source, ts)| (*source, *ts))
2176            .collect()
2177    }
2178
2179    /// Get the number of stored app documents.
2180    pub fn app_document_count(&self) -> usize {
2181        self.app_documents.read().unwrap().len()
2182    }
2183
2184    // ==================== Observer Management ====================
2185
2186    /// Add an observer for mesh events
2187    pub fn add_observer(&self, observer: Arc<dyn PeatObserver>) {
2188        self.observers.add(observer);
2189    }
2190
2191    /// Remove an observer
2192    pub fn remove_observer(&self, observer: &Arc<dyn PeatObserver>) {
2193        self.observers.remove(observer);
2194    }
2195
2196    // ==================== User Actions ====================
2197
2198    /// Send an emergency alert
2199    ///
2200    /// Returns the document bytes to broadcast to all peers.
2201    /// If encryption is enabled, the document is encrypted.
2202    pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
2203        let data = self.document_sync.send_emergency(timestamp);
2204        self.notify(PeatEvent::MeshStateChanged {
2205            peer_count: self.peer_manager.peer_count(),
2206            connected_count: self.peer_manager.connected_count(),
2207        });
2208        self.encrypt_document(&data)
2209    }
2210
2211    /// Send an ACK response
2212    ///
2213    /// Returns the document bytes to broadcast to all peers.
2214    /// If encryption is enabled, the document is encrypted.
2215    pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
2216        let data = self.document_sync.send_ack(timestamp);
2217        self.notify(PeatEvent::MeshStateChanged {
2218            peer_count: self.peer_manager.peer_count(),
2219            connected_count: self.peer_manager.connected_count(),
2220        });
2221        self.encrypt_document(&data)
2222    }
2223
2224    /// Broadcast arbitrary bytes over the mesh.
2225    ///
2226    /// Takes raw payload bytes, encrypts them (if encryption is enabled),
2227    /// and returns bytes ready to send to all connected peers.
2228    ///
2229    /// This is useful for sending extension data like CannedMessages from peat-lite.
2230    pub fn broadcast_bytes(&self, payload: &[u8]) -> Vec<u8> {
2231        self.encrypt_document(payload)
2232    }
2233
2234    /// Clear the current event (emergency or ack)
2235    pub fn clear_event(&self) {
2236        self.document_sync.clear_event();
2237    }
2238
2239    /// Check if emergency is active
2240    pub fn is_emergency_active(&self) -> bool {
2241        self.document_sync.is_emergency_active()
2242    }
2243
2244    /// Check if ACK is active
2245    pub fn is_ack_active(&self) -> bool {
2246        self.document_sync.is_ack_active()
2247    }
2248
2249    /// Get current event type
2250    pub fn current_event(&self) -> Option<EventType> {
2251        self.document_sync.current_event()
2252    }
2253
2254    // ==================== Emergency Management (Document-Based) ====================
2255
2256    /// Start a new emergency event with ACK tracking
2257    ///
2258    /// Creates an emergency event that tracks ACKs from all known peers.
2259    /// Pass the list of known peer node IDs to track.
2260    /// Returns the document bytes to broadcast.
2261    /// If encryption is enabled, the document is encrypted.
2262    pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
2263        let data = self.document_sync.start_emergency(timestamp, known_peers);
2264        self.notify(PeatEvent::MeshStateChanged {
2265            peer_count: self.peer_manager.peer_count(),
2266            connected_count: self.peer_manager.connected_count(),
2267        });
2268        self.encrypt_document(&data)
2269    }
2270
2271    /// Start a new emergency using all currently known peers
2272    ///
2273    /// Convenience method that automatically includes all discovered peers.
2274    pub fn start_emergency_with_known_peers(&self, timestamp: u64) -> Vec<u8> {
2275        let peers: Vec<u32> = self
2276            .peer_manager
2277            .get_peers()
2278            .iter()
2279            .map(|p| p.node_id.as_u32())
2280            .collect();
2281        self.start_emergency(timestamp, &peers)
2282    }
2283
2284    /// Record our ACK for the current emergency
2285    ///
2286    /// Returns the document bytes to broadcast, or None if no emergency is active.
2287    /// If encryption is enabled, the document is encrypted.
2288    pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
2289        let result = self.document_sync.ack_emergency(timestamp);
2290        if result.is_some() {
2291            self.notify(PeatEvent::MeshStateChanged {
2292                peer_count: self.peer_manager.peer_count(),
2293                connected_count: self.peer_manager.connected_count(),
2294            });
2295        }
2296        result.map(|data| self.encrypt_document(&data))
2297    }
2298
2299    /// Clear the current emergency event
2300    pub fn clear_emergency(&self) {
2301        self.document_sync.clear_emergency();
2302    }
2303
2304    /// Check if there's an active emergency
2305    pub fn has_active_emergency(&self) -> bool {
2306        self.document_sync.has_active_emergency()
2307    }
2308
2309    /// Get emergency status info
2310    ///
2311    /// Returns (source_node, timestamp, acked_count, pending_count) if emergency is active.
2312    pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
2313        self.document_sync.get_emergency_status()
2314    }
2315
2316    /// Check if a specific peer has ACKed the current emergency
2317    pub fn has_peer_acked(&self, peer_id: u32) -> bool {
2318        self.document_sync.has_peer_acked(peer_id)
2319    }
2320
2321    /// Check if all peers have ACKed the current emergency
2322    pub fn all_peers_acked(&self) -> bool {
2323        self.document_sync.all_peers_acked()
2324    }
2325
2326    // ==================== Chat Methods (requires `legacy-chat` feature) ====================
2327
2328    /// Send a chat message
2329    ///
2330    /// Adds the message to the local CRDT and returns the document bytes
2331    /// to broadcast to all peers. If encryption is enabled, the document is encrypted.
2332    ///
2333    /// Returns the encrypted document bytes if the message was new,
2334    /// or None if it was a duplicate.
2335    #[cfg(feature = "legacy-chat")]
2336    pub fn send_chat(&self, sender: &str, text: &str, timestamp: u64) -> Option<Vec<u8>> {
2337        if self.document_sync.add_chat_message(sender, text, timestamp) {
2338            Some(self.encrypt_document(&self.build_document()))
2339        } else {
2340            None
2341        }
2342    }
2343
2344    /// Send a chat reply
2345    ///
2346    /// Adds the reply to the local CRDT with reply-to information and returns
2347    /// the document bytes to broadcast. If encryption is enabled, the document is encrypted.
2348    ///
2349    /// Returns the encrypted document bytes if the message was new,
2350    /// or None if it was a duplicate.
2351    #[cfg(feature = "legacy-chat")]
2352    pub fn send_chat_reply(
2353        &self,
2354        sender: &str,
2355        text: &str,
2356        reply_to_node: u32,
2357        reply_to_timestamp: u64,
2358        timestamp: u64,
2359    ) -> Option<Vec<u8>> {
2360        if self.document_sync.add_chat_reply(
2361            sender,
2362            text,
2363            reply_to_node,
2364            reply_to_timestamp,
2365            timestamp,
2366        ) {
2367            Some(self.encrypt_document(&self.build_document()))
2368        } else {
2369            None
2370        }
2371    }
2372
2373    /// Get the number of chat messages in the local CRDT
2374    #[cfg(feature = "legacy-chat")]
2375    pub fn chat_count(&self) -> usize {
2376        self.document_sync.chat_count()
2377    }
2378
2379    /// Get chat messages newer than a timestamp
2380    ///
2381    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
2382    #[cfg(feature = "legacy-chat")]
2383    pub fn chat_messages_since(
2384        &self,
2385        since_timestamp: u64,
2386    ) -> Vec<(u32, u64, String, String, u32, u64)> {
2387        self.document_sync.chat_messages_since(since_timestamp)
2388    }
2389
2390    /// Get all chat messages
2391    ///
2392    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
2393    #[cfg(feature = "legacy-chat")]
2394    pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
2395        self.document_sync.all_chat_messages()
2396    }
2397
2398    // ==================== BLE Callbacks (Platform -> Mesh) ====================
2399
2400    /// Called when a BLE device is discovered
2401    ///
2402    /// Returns `Some(PeatPeer)` if this is a new Peat peer on our mesh.
2403    pub fn on_ble_discovered(
2404        &self,
2405        identifier: &str,
2406        name: Option<&str>,
2407        rssi: i8,
2408        mesh_id: Option<&str>,
2409        now_ms: u64,
2410    ) -> Option<PeatPeer> {
2411        let (node_id, is_new) = self
2412            .peer_manager
2413            .on_discovered(identifier, name, rssi, mesh_id, now_ms)?;
2414
2415        let peer = self.peer_manager.get_peer(node_id)?;
2416
2417        // Update connection graph
2418        {
2419            let mut graph = self.connection_graph.lock().unwrap();
2420            graph.on_discovered(
2421                node_id,
2422                identifier.to_string(),
2423                name.map(|s| s.to_string()),
2424                mesh_id.map(|s| s.to_string()),
2425                rssi,
2426                now_ms,
2427            );
2428        }
2429
2430        if is_new {
2431            self.notify(PeatEvent::PeerDiscovered { peer: peer.clone() });
2432            self.notify_mesh_state_changed();
2433        }
2434
2435        Some(peer)
2436    }
2437
2438    /// Called when a BLE connection is established (outgoing)
2439    ///
2440    /// Returns the NodeId if this identifier is known.
2441    pub fn on_ble_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
2442        let node_id = match self.peer_manager.on_connected(identifier, now_ms) {
2443            Some(id) => id,
2444            None => {
2445                log::warn!(
2446                    "on_ble_connected: identifier {:?} not in peer map — \
2447                     use on_incoming_connection() for peripheral connections",
2448                    identifier
2449                );
2450                return None;
2451            }
2452        };
2453
2454        // Update connection graph
2455        {
2456            let mut graph = self.connection_graph.lock().unwrap();
2457            graph.on_connected(node_id, now_ms);
2458        }
2459
2460        // Register peer for delta sync tracking
2461        self.register_peer_for_delta(&node_id);
2462
2463        self.notify(PeatEvent::PeerConnected { node_id });
2464        self.notify_mesh_state_changed();
2465        Some(node_id)
2466    }
2467
2468    /// Called when a BLE connection is lost
2469    pub fn on_ble_disconnected(
2470        &self,
2471        identifier: &str,
2472        reason: DisconnectReason,
2473    ) -> Option<NodeId> {
2474        let (node_id, observer_reason) = self.peer_manager.on_disconnected(identifier, reason)?;
2475
2476        // Update connection graph (convert observer reason to platform reason)
2477        {
2478            let mut graph = self.connection_graph.lock().unwrap();
2479            let platform_reason = match observer_reason {
2480                DisconnectReason::LocalRequest => crate::platform::DisconnectReason::LocalRequest,
2481                DisconnectReason::RemoteRequest => crate::platform::DisconnectReason::RemoteRequest,
2482                DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2483                DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2484                DisconnectReason::ConnectionFailed => {
2485                    crate::platform::DisconnectReason::ConnectionFailed
2486                }
2487                DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2488            };
2489            let now_ms = std::time::SystemTime::now()
2490                .duration_since(std::time::UNIX_EPOCH)
2491                .map(|d| d.as_millis() as u64)
2492                .unwrap_or(0);
2493            graph.on_disconnected(node_id, platform_reason, now_ms);
2494
2495            // Remove indirect peer paths that went through this peer
2496            // These paths may no longer be valid since the direct connection is lost
2497            graph.remove_via_peer(node_id);
2498        }
2499
2500        // Unregister peer from delta sync tracking
2501        self.unregister_peer_for_delta(&node_id);
2502
2503        self.notify(PeatEvent::PeerDisconnected {
2504            node_id,
2505            reason: observer_reason,
2506        });
2507        self.notify_mesh_state_changed();
2508        Some(node_id)
2509    }
2510
2511    /// Called when a BLE connection is lost, using NodeId directly
2512    ///
2513    /// Alternative to on_ble_disconnected() when only NodeId is known (e.g., ESP32).
2514    pub fn on_peer_disconnected(&self, node_id: NodeId, reason: DisconnectReason) {
2515        if self
2516            .peer_manager
2517            .on_disconnected_by_node_id(node_id, reason)
2518        {
2519            // Update connection graph
2520            {
2521                let mut graph = self.connection_graph.lock().unwrap();
2522                let platform_reason = match reason {
2523                    DisconnectReason::LocalRequest => {
2524                        crate::platform::DisconnectReason::LocalRequest
2525                    }
2526                    DisconnectReason::RemoteRequest => {
2527                        crate::platform::DisconnectReason::RemoteRequest
2528                    }
2529                    DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2530                    DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2531                    DisconnectReason::ConnectionFailed => {
2532                        crate::platform::DisconnectReason::ConnectionFailed
2533                    }
2534                    DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2535                };
2536                let now_ms = std::time::SystemTime::now()
2537                    .duration_since(std::time::UNIX_EPOCH)
2538                    .map(|d| d.as_millis() as u64)
2539                    .unwrap_or(0);
2540                graph.on_disconnected(node_id, platform_reason, now_ms);
2541
2542                // Remove indirect peer paths that went through this peer
2543                graph.remove_via_peer(node_id);
2544            }
2545
2546            // Unregister peer from delta sync tracking
2547            self.unregister_peer_for_delta(&node_id);
2548
2549            self.notify(PeatEvent::PeerDisconnected { node_id, reason });
2550            self.notify_mesh_state_changed();
2551        }
2552    }
2553
2554    /// Called when a remote device connects to us (incoming connection)
2555    ///
2556    /// Use this when we're acting as a peripheral and a central connects to us.
2557    pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
2558        let is_new = self
2559            .peer_manager
2560            .on_incoming_connection(identifier, node_id, now_ms);
2561
2562        // Update connection graph
2563        {
2564            let mut graph = self.connection_graph.lock().unwrap();
2565            if is_new {
2566                graph.on_discovered(
2567                    node_id,
2568                    identifier.to_string(),
2569                    None,
2570                    Some(self.config.mesh_id.clone()),
2571                    -50, // Default good RSSI for incoming connections
2572                    now_ms,
2573                );
2574            }
2575            graph.on_connected(node_id, now_ms);
2576        }
2577
2578        // Register peer for delta sync tracking
2579        self.register_peer_for_delta(&node_id);
2580
2581        if is_new {
2582            if let Some(peer) = self.peer_manager.get_peer(node_id) {
2583                self.notify(PeatEvent::PeerDiscovered { peer });
2584            }
2585        }
2586
2587        self.notify(PeatEvent::PeerConnected { node_id });
2588        self.notify_mesh_state_changed();
2589
2590        is_new
2591    }
2592
2593    /// Called when data is received from a peer
2594    ///
2595    /// Parses the document, merges it, and generates appropriate events.
2596    /// If encryption is enabled, decrypts the document first.
2597    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2598    /// Returns the source NodeId and whether the document contained an event.
2599    pub fn on_ble_data_received(
2600        &self,
2601        identifier: &str,
2602        data: &[u8],
2603        now_ms: u64,
2604    ) -> Option<DataReceivedResult> {
2605        // Get node ID from identifier
2606        let node_id = self.peer_manager.get_node_id(identifier)?;
2607
2608        // Check for special message types first
2609        if data.len() >= 2 {
2610            match data[0] {
2611                KEY_EXCHANGE_MARKER => {
2612                    // Handle key exchange - returns response to send back
2613                    let _response = self.handle_key_exchange(data, now_ms);
2614                    // Return None as this isn't a document sync
2615                    return None;
2616                }
2617                PEER_E2EE_MARKER => {
2618                    // Handle encrypted peer message
2619                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2620                    // Return None as this isn't a document sync
2621                    return None;
2622                }
2623                RELAY_ENVELOPE_MARKER => {
2624                    // Handle relay envelope for multi-hop
2625                    return self
2626                        .handle_relay_envelope_with_identifier(node_id, identifier, data, now_ms);
2627                }
2628                _ => {}
2629            }
2630        }
2631
2632        // Direct document (not relay envelope)
2633        self.process_document_data_with_identifier(node_id, identifier, data, now_ms, None, None, 0)
2634    }
2635
2636    /// Internal: Process document data with identifier as source hint
2637    #[allow(clippy::too_many_arguments)]
2638    fn process_document_data_with_identifier(
2639        &self,
2640        source_node: NodeId,
2641        identifier: &str,
2642        data: &[u8],
2643        now_ms: u64,
2644        relay_data: Option<Vec<u8>>,
2645        origin_node: Option<NodeId>,
2646        hop_count: u8,
2647    ) -> Option<DataReceivedResult> {
2648        // Decrypt if encrypted (mesh-wide encryption) - use identifier as source hint
2649        let decrypted = self.decrypt_document(data, Some(identifier))?;
2650
2651        // ADR-059 Amendment 1 §"Receive-side dispatch": route 0xB6 frames
2652        // through BleTranslator + silent-drop reserved 0xB7..=0xBF.
2653        // Amendment 3: surface decoded frames via the polled
2654        // `decoded_translator_frame` field on the returned result so
2655        // hosts (peat-atak-plugin et al.) don't depend on the JNA-broken
2656        // UniFFI callback path in ATAK.
2657        #[cfg(feature = "mesh-translator")]
2658        match self.try_handle_translator_marker(&decrypted, Some(identifier), Some(source_node)) {
2659            TranslatorMarkerOutcome::NotTranslatorMarker => {}
2660            TranslatorMarkerOutcome::Handled => return None,
2661            TranslatorMarkerOutcome::Decoded(frame) => {
2662                return Some(DataReceivedResult::translator_frame(source_node, frame));
2663            }
2664        }
2665
2666        // Check if this is a delta document (wire format v2)
2667        if DeltaDocument::is_delta_document(&decrypted) {
2668            return self.process_delta_document_internal(
2669                source_node,
2670                &decrypted,
2671                now_ms,
2672                relay_data,
2673                origin_node,
2674                hop_count,
2675            );
2676        }
2677
2678        // Merge the document (legacy wire format v1)
2679        let result = self.document_sync.merge_document(&decrypted)?;
2680
2681        // Store peer peripheral if present (for callsign lookup)
2682        if let Some(ref peripheral) = result.peer_peripheral {
2683            if let Ok(mut peripherals) = self.peer_peripherals.write() {
2684                peripherals.insert(result.source_node, peripheral.clone());
2685            }
2686        }
2687
2688        // Record sync
2689        self.peer_manager.record_sync(source_node, now_ms);
2690
2691        // Generate events based on what was received
2692        if result.is_emergency() {
2693            self.notify(PeatEvent::EmergencyReceived {
2694                from_node: result.source_node,
2695            });
2696        } else if result.is_ack() {
2697            self.notify(PeatEvent::AckReceived {
2698                from_node: result.source_node,
2699            });
2700        }
2701
2702        if result.counter_changed {
2703            self.notify(PeatEvent::DocumentSynced {
2704                from_node: result.source_node,
2705                total_count: result.total_count,
2706            });
2707        }
2708
2709        // Emit relay event if we're relaying
2710        if relay_data.is_some() {
2711            let relay_targets = self.get_relay_targets(Some(source_node));
2712            self.notify(PeatEvent::MessageRelayed {
2713                origin_node: origin_node.unwrap_or(result.source_node),
2714                relay_count: relay_targets.len(),
2715                hop_count,
2716            });
2717        }
2718
2719        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2720            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2721
2722        Some(DataReceivedResult {
2723            source_node: result.source_node,
2724            is_emergency: result.is_emergency(),
2725            is_ack: result.is_ack(),
2726            counter_changed: result.counter_changed,
2727            emergency_changed: result.emergency_changed,
2728            total_count: result.total_count,
2729            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2730            relay_data,
2731            origin_node,
2732            hop_count,
2733            callsign,
2734            battery_percent,
2735            heart_rate,
2736            event_type,
2737            latitude,
2738            longitude,
2739            altitude,
2740            ..Default::default()
2741        })
2742    }
2743
2744    /// Internal: Handle relay envelope with identifier as source hint
2745    fn handle_relay_envelope_with_identifier(
2746        &self,
2747        source_node: NodeId,
2748        identifier: &str,
2749        data: &[u8],
2750        now_ms: u64,
2751    ) -> Option<DataReceivedResult> {
2752        // Process the relay envelope
2753        let envelope = RelayEnvelope::decode(data)?;
2754
2755        // Check deduplication
2756        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2757            let stats = self
2758                .seen_cache
2759                .lock()
2760                .unwrap()
2761                .get_stats(&envelope.message_id);
2762            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2763
2764            self.notify(PeatEvent::DuplicateMessageDropped {
2765                origin_node: envelope.origin_node,
2766                seen_count,
2767            });
2768            return None;
2769        }
2770
2771        // Check TTL and get relay data
2772        let relay_data = if envelope.can_relay() && self.config.enable_relay {
2773            envelope.relay().map(|e| e.encode())
2774        } else {
2775            if !envelope.can_relay() {
2776                self.notify(PeatEvent::MessageTtlExpired {
2777                    origin_node: envelope.origin_node,
2778                    hop_count: envelope.hop_count,
2779                });
2780            }
2781            None
2782        };
2783
2784        // Process the inner payload
2785        self.process_document_data_with_identifier(
2786            source_node,
2787            identifier,
2788            &envelope.payload,
2789            now_ms,
2790            relay_data,
2791            Some(envelope.origin_node),
2792            envelope.hop_count,
2793        )
2794    }
2795
2796    /// Called when data is received but we don't have the identifier mapped
2797    ///
2798    /// Use this when receiving data from a peripheral we discovered.
2799    /// If encryption is enabled, decrypts the document first.
2800    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2801    /// Handles relay envelopes for multi-hop mesh operation.
2802    pub fn on_ble_data_received_from_node(
2803        &self,
2804        node_id: NodeId,
2805        data: &[u8],
2806        now_ms: u64,
2807    ) -> Option<DataReceivedResult> {
2808        // Check for special message types first
2809        if data.len() >= 2 {
2810            match data[0] {
2811                KEY_EXCHANGE_MARKER => {
2812                    let _response = self.handle_key_exchange(data, now_ms);
2813                    return None;
2814                }
2815                PEER_E2EE_MARKER => {
2816                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2817                    return None;
2818                }
2819                RELAY_ENVELOPE_MARKER => {
2820                    // Handle relay envelope for multi-hop
2821                    return self.handle_relay_envelope(node_id, data, now_ms);
2822                }
2823                _ => {}
2824            }
2825        }
2826
2827        // Direct document (not relay envelope)
2828        self.process_document_data(node_id, data, now_ms, None, None, 0)
2829    }
2830
2831    /// Called when encrypted data is received from an unknown peer
2832    ///
2833    /// This handles the case where we receive an encrypted document from a BLE address
2834    /// that isn't registered in our peer manager (e.g., due to BLE address rotation).
2835    /// The function decrypts first using the mesh key, then extracts the source_node
2836    /// from the decrypted document header and registers the peer.
2837    ///
2838    /// Returns `Some(DataReceivedResult)` if decryption and processing succeed.
2839    /// Returns `None` if decryption fails or the document is invalid.
2840    pub fn on_ble_data_received_anonymous(
2841        &self,
2842        identifier: &str,
2843        data: &[u8],
2844        now_ms: u64,
2845    ) -> Option<DataReceivedResult> {
2846        log::debug!(
2847            "on_ble_data_received_anonymous: identifier={}, len={}, marker=0x{:02X}",
2848            identifier,
2849            data.len(),
2850            data.first().copied().unwrap_or(0)
2851        );
2852
2853        // Try to decrypt (handles both encrypted and unencrypted documents)
2854        let decrypted = match self.decrypt_document(data, Some(identifier)) {
2855            Some(d) => d,
2856            None => {
2857                log::warn!(
2858                    "on_ble_data_received_anonymous: decrypt/parse FAILED for {} byte doc from {}",
2859                    data.len(),
2860                    identifier
2861                );
2862                return None;
2863            }
2864        };
2865
2866        // ADR-059 Amendment 1 §"Receive-side dispatch": route 0xB6
2867        // translator frames through BleTranslator::decode_inbound, and
2868        // silent-drop reserved 0xB7..=0xBF frames before they reach
2869        // PeatDocument::decode (where a 0xB6+ payload could hit the
2870        // GCounter-pollution hazard for inputs whose bytes 8..11 fall
2871        // <= MAX_COUNTER_ENTRIES). Anonymous receive: source_node is
2872        // unknown until the legacy path extracts it; for translator
2873        // frames there's no PeatDocument header to extract from, so we
2874        // pass None and accept that `tracks` decoding will Err for lack
2875        // of `local_wire_id` (other collections succeed).
2876        #[cfg(feature = "mesh-translator")]
2877        match self.try_handle_translator_marker(&decrypted, Some(identifier), None) {
2878            TranslatorMarkerOutcome::NotTranslatorMarker => {}
2879            TranslatorMarkerOutcome::Handled => return None,
2880            TranslatorMarkerOutcome::Decoded(frame) => {
2881                // Anonymous path has no source_node yet — translator
2882                // frames carry no PeatDocument header. Use a placeholder
2883                // NodeId; the meaningful payload rides
2884                // `decoded_translator_frame` per Amendment 3.
2885                return Some(DataReceivedResult::translator_frame(NodeId::new(0), frame));
2886            }
2887        }
2888
2889        // Extract source_node from decrypted document header
2890        // Header format: [version: 4 bytes (LE)][node_id: 4 bytes (LE)]
2891        if decrypted.len() < 8 {
2892            log::warn!("Decrypted document too short to extract source_node");
2893            return None;
2894        }
2895
2896        let source_node_u32 =
2897            u32::from_le_bytes([decrypted[4], decrypted[5], decrypted[6], decrypted[7]]);
2898        let source_node = NodeId::new(source_node_u32);
2899
2900        log::info!(
2901            "Anonymous document from {}: source_node={:08X}, len={}",
2902            identifier,
2903            source_node_u32,
2904            decrypted.len()
2905        );
2906
2907        // Register the peer with this identifier so future lookups work
2908        // This handles BLE address rotation
2909        self.peer_manager
2910            .register_identifier(identifier, source_node);
2911
2912        // Check if this is a delta document
2913        let is_delta = DeltaDocument::is_delta_document(&decrypted);
2914        log::info!(
2915            "Document format: delta={}, first_byte=0x{:02X}, len={}",
2916            is_delta,
2917            decrypted.first().copied().unwrap_or(0),
2918            decrypted.len()
2919        );
2920
2921        if is_delta {
2922            return self.process_delta_document_internal(
2923                source_node,
2924                &decrypted,
2925                now_ms,
2926                None,
2927                None,
2928                0,
2929            );
2930        }
2931
2932        // Handle app-layer message (0xAF marker from peat-lite CannedMessages)
2933        // These are handled by consumers who register DocumentType implementations
2934        // via the DocumentRegistry. Pass through as relay_data for platform layer.
2935        const APP_LAYER_MARKER: u8 = 0xAF;
2936        if decrypted.first().copied() == Some(APP_LAYER_MARKER) {
2937            log::debug!(
2938                "App-layer message (0xAF) from {:08X}, {} bytes - passing to relay",
2939                source_node.as_u32(),
2940                decrypted.len()
2941            );
2942            return Some(DataReceivedResult {
2943                source_node,
2944                is_emergency: false,
2945                is_ack: false,
2946                counter_changed: false,
2947                emergency_changed: false,
2948                total_count: 0,
2949                event_timestamp: now_ms,
2950                relay_data: Some(decrypted.to_vec()),
2951                origin_node: None,
2952                hop_count: 0,
2953                callsign: None,
2954                battery_percent: None,
2955                heart_rate: None,
2956                event_type: None,
2957                latitude: None,
2958                longitude: None,
2959                altitude: None,
2960                ..Default::default()
2961            });
2962        }
2963
2964        // Merge the document (legacy wire format v1)
2965        log::info!(
2966            "Processing legacy document from {:08X}",
2967            source_node.as_u32()
2968        );
2969        let result = self.document_sync.merge_document(&decrypted)?;
2970
2971        // Log what we got from the merge
2972        log::info!(
2973            "Merge result: peer_peripheral={}, counter_changed={}",
2974            result.peer_peripheral.is_some(),
2975            result.counter_changed
2976        );
2977        if let Some(ref p) = result.peer_peripheral {
2978            log::info!("Peripheral callsign: '{}'", p.callsign_str());
2979        }
2980
2981        // Record sync
2982        self.peer_manager.record_sync(source_node, now_ms);
2983
2984        // Generate events
2985        if result.is_emergency() {
2986            self.notify(PeatEvent::EmergencyReceived {
2987                from_node: result.source_node,
2988            });
2989        } else if result.is_ack() {
2990            self.notify(PeatEvent::AckReceived {
2991                from_node: result.source_node,
2992            });
2993        }
2994
2995        if result.counter_changed {
2996            self.notify(PeatEvent::DocumentSynced {
2997                from_node: result.source_node,
2998                total_count: result.total_count,
2999            });
3000        }
3001
3002        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
3003            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3004
3005        Some(DataReceivedResult {
3006            source_node: result.source_node,
3007            is_emergency: result.is_emergency(),
3008            is_ack: result.is_ack(),
3009            counter_changed: result.counter_changed,
3010            emergency_changed: result.emergency_changed,
3011            total_count: result.total_count,
3012            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3013            relay_data: None,
3014            origin_node: None,
3015            hop_count: 0,
3016            callsign,
3017            battery_percent,
3018            heart_rate,
3019            event_type,
3020            latitude,
3021            longitude,
3022            altitude,
3023            ..Default::default()
3024        })
3025    }
3026
3027    /// Internal: Process document data (shared by direct and relay paths)
3028    fn process_document_data(
3029        &self,
3030        source_node: NodeId,
3031        data: &[u8],
3032        now_ms: u64,
3033        relay_data: Option<Vec<u8>>,
3034        origin_node: Option<NodeId>,
3035        hop_count: u8,
3036    ) -> Option<DataReceivedResult> {
3037        // Decrypt if encrypted (mesh-wide encryption)
3038        let source_hint = format!("node:{:08X}", source_node.as_u32());
3039        let decrypted = self.decrypt_document(data, Some(&source_hint))?;
3040
3041        // ADR-059 Amendment 1 §"Receive-side dispatch": route 0xB6 frames
3042        // through BleTranslator + silent-drop reserved 0xB7..=0xBF.
3043        // Amendment 3: surface decoded frames via `decoded_translator_frame`.
3044        #[cfg(feature = "mesh-translator")]
3045        match self.try_handle_translator_marker(&decrypted, None, Some(source_node)) {
3046            TranslatorMarkerOutcome::NotTranslatorMarker => {}
3047            TranslatorMarkerOutcome::Handled => return None,
3048            TranslatorMarkerOutcome::Decoded(frame) => {
3049                return Some(DataReceivedResult::translator_frame(source_node, frame));
3050            }
3051        }
3052
3053        // Check if this is a delta document (wire format v2)
3054        if DeltaDocument::is_delta_document(&decrypted) {
3055            return self.process_delta_document_internal(
3056                source_node,
3057                &decrypted,
3058                now_ms,
3059                relay_data,
3060                origin_node,
3061                hop_count,
3062            );
3063        }
3064
3065        // Merge the document (legacy wire format v1)
3066        let result = self.document_sync.merge_document(&decrypted)?;
3067
3068        // Store peer peripheral if present (for callsign lookup)
3069        if let Some(ref peripheral) = result.peer_peripheral {
3070            if let Ok(mut peripherals) = self.peer_peripherals.write() {
3071                peripherals.insert(result.source_node, peripheral.clone());
3072            }
3073        }
3074
3075        // Record sync
3076        self.peer_manager.record_sync(source_node, now_ms);
3077
3078        // Generate events based on what was received
3079        if result.is_emergency() {
3080            self.notify(PeatEvent::EmergencyReceived {
3081                from_node: result.source_node,
3082            });
3083        } else if result.is_ack() {
3084            self.notify(PeatEvent::AckReceived {
3085                from_node: result.source_node,
3086            });
3087        }
3088
3089        if result.counter_changed {
3090            self.notify(PeatEvent::DocumentSynced {
3091                from_node: result.source_node,
3092                total_count: result.total_count,
3093            });
3094        }
3095
3096        // Emit relay event if we're relaying
3097        if relay_data.is_some() {
3098            let relay_targets = self.get_relay_targets(Some(source_node));
3099            self.notify(PeatEvent::MessageRelayed {
3100                origin_node: origin_node.unwrap_or(result.source_node),
3101                relay_count: relay_targets.len(),
3102                hop_count,
3103            });
3104        }
3105
3106        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
3107            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3108
3109        Some(DataReceivedResult {
3110            source_node: result.source_node,
3111            is_emergency: result.is_emergency(),
3112            is_ack: result.is_ack(),
3113            counter_changed: result.counter_changed,
3114            emergency_changed: result.emergency_changed,
3115            total_count: result.total_count,
3116            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3117            relay_data,
3118            origin_node,
3119            hop_count,
3120            callsign,
3121            battery_percent,
3122            heart_rate,
3123            event_type,
3124            latitude,
3125            longitude,
3126            altitude,
3127            ..Default::default()
3128        })
3129    }
3130
3131    /// Internal: Handle relay envelope
3132    fn handle_relay_envelope(
3133        &self,
3134        source_node: NodeId,
3135        data: &[u8],
3136        now_ms: u64,
3137    ) -> Option<DataReceivedResult> {
3138        // Process the relay envelope
3139        let decision = self.process_relay_envelope(data, source_node, now_ms)?;
3140
3141        // Get relay data if we should relay
3142        let relay_data = if decision.should_relay {
3143            decision.relay_data()
3144        } else {
3145            None
3146        };
3147
3148        // Process the inner payload
3149        self.process_document_data(
3150            source_node,
3151            &decision.payload,
3152            now_ms,
3153            relay_data,
3154            Some(decision.origin_node),
3155            decision.hop_count,
3156        )
3157    }
3158
3159    /// Called when data is received without a known identifier
3160    ///
3161    /// This is the simplest data receive method - it extracts the source node_id
3162    /// from the document itself. Use this when you don't track identifiers
3163    /// (e.g., ESP32 NimBLE).
3164    /// If encryption is enabled, decrypts the document first.
3165    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
3166    /// Handles relay envelopes for multi-hop mesh operation.
3167    pub fn on_ble_data(
3168        &self,
3169        identifier: &str,
3170        data: &[u8],
3171        now_ms: u64,
3172    ) -> Option<DataReceivedResult> {
3173        // Check for special message types first
3174        if data.len() >= 2 {
3175            match data[0] {
3176                KEY_EXCHANGE_MARKER => {
3177                    let _response = self.handle_key_exchange(data, now_ms);
3178                    return None;
3179                }
3180                PEER_E2EE_MARKER => {
3181                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
3182                    return None;
3183                }
3184                RELAY_ENVELOPE_MARKER => {
3185                    // Handle relay envelope - extract origin from envelope
3186                    return self.handle_relay_envelope_with_incoming(identifier, data, now_ms);
3187                }
3188                _ => {}
3189            }
3190        }
3191
3192        // Direct document - process normally
3193        self.process_incoming_document(identifier, data, now_ms, None, None, 0)
3194    }
3195
3196    /// Internal: Process incoming document (handles peer registration)
3197    fn process_incoming_document(
3198        &self,
3199        identifier: &str,
3200        data: &[u8],
3201        now_ms: u64,
3202        relay_data: Option<Vec<u8>>,
3203        origin_node: Option<NodeId>,
3204        hop_count: u8,
3205    ) -> Option<DataReceivedResult> {
3206        // Decrypt if encrypted (mesh-wide encryption)
3207        let decrypted = self.decrypt_document(data, Some(identifier))?;
3208
3209        // Merge the document (extracts node_id internally)
3210        let result = self.document_sync.merge_document(&decrypted)?;
3211
3212        // Record sync using the source_node from the merged document
3213        self.peer_manager.record_sync(result.source_node, now_ms);
3214
3215        // Only register the identifier mapping for direct messages (not relayed)
3216        // For relayed messages, the identifier belongs to the relay source (who forwarded it),
3217        // not the origin node (who created the document). The relay source is already registered
3218        // via handle_relay_envelope_with_incoming when the relay envelope is first processed.
3219        if origin_node.is_none() {
3220            // Direct message - register the peer with this identifier
3221            let is_new =
3222                self.peer_manager
3223                    .on_incoming_connection(identifier, result.source_node, now_ms);
3224
3225            // Update connection graph to track connection state
3226            {
3227                let mut graph = self.connection_graph.lock().unwrap();
3228                if is_new {
3229                    graph.on_discovered(
3230                        result.source_node,
3231                        identifier.to_string(),
3232                        None,
3233                        Some(self.config.mesh_id.clone()),
3234                        -50, // Default RSSI for data-based discovery
3235                        now_ms,
3236                    );
3237                }
3238                graph.on_connected(result.source_node, now_ms);
3239            }
3240        }
3241
3242        // Generate events based on what was received
3243        if result.is_emergency() {
3244            self.notify(PeatEvent::EmergencyReceived {
3245                from_node: result.source_node,
3246            });
3247        } else if result.is_ack() {
3248            self.notify(PeatEvent::AckReceived {
3249                from_node: result.source_node,
3250            });
3251        }
3252
3253        if result.counter_changed {
3254            self.notify(PeatEvent::DocumentSynced {
3255                from_node: result.source_node,
3256                total_count: result.total_count,
3257            });
3258        }
3259
3260        // Emit relay event if we're relaying
3261        if relay_data.is_some() {
3262            let relay_targets = self.get_relay_targets(Some(result.source_node));
3263            self.notify(PeatEvent::MessageRelayed {
3264                origin_node: origin_node.unwrap_or(result.source_node),
3265                relay_count: relay_targets.len(),
3266                hop_count,
3267            });
3268        }
3269
3270        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
3271            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3272
3273        Some(DataReceivedResult {
3274            source_node: result.source_node,
3275            is_emergency: result.is_emergency(),
3276            is_ack: result.is_ack(),
3277            counter_changed: result.counter_changed,
3278            emergency_changed: result.emergency_changed,
3279            total_count: result.total_count,
3280            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3281            relay_data,
3282            origin_node,
3283            hop_count,
3284            callsign,
3285            battery_percent,
3286            heart_rate,
3287            event_type,
3288            latitude,
3289            longitude,
3290            altitude,
3291            ..Default::default()
3292        })
3293    }
3294
3295    /// Internal: Handle relay envelope with incoming connection registration
3296    fn handle_relay_envelope_with_incoming(
3297        &self,
3298        identifier: &str,
3299        data: &[u8],
3300        now_ms: u64,
3301    ) -> Option<DataReceivedResult> {
3302        // Parse envelope to get origin
3303        let envelope = RelayEnvelope::decode(data)?;
3304
3305        // Try to look up the source peer from identifier to register indirect path
3306        // If we know who sent this relay, we can track indirect peers via them
3307        if let Some(source_peer) = self.peer_manager.get_node_id(identifier) {
3308            if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
3309                let is_new = self.connection_graph.lock().unwrap().on_relay_received(
3310                    source_peer,
3311                    envelope.origin_node,
3312                    envelope.hop_count,
3313                    now_ms,
3314                );
3315
3316                if is_new {
3317                    log::debug!(
3318                        "Discovered indirect peer {:08X} via {:08X} ({} hops)",
3319                        envelope.origin_node.as_u32(),
3320                        source_peer.as_u32(),
3321                        envelope.hop_count
3322                    );
3323                }
3324            }
3325        }
3326
3327        // Check deduplication
3328        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
3329            // Duplicate - get stats for event
3330            let stats = self
3331                .seen_cache
3332                .lock()
3333                .unwrap()
3334                .get_stats(&envelope.message_id);
3335            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
3336
3337            self.notify(PeatEvent::DuplicateMessageDropped {
3338                origin_node: envelope.origin_node,
3339                seen_count,
3340            });
3341            return None;
3342        }
3343
3344        // Check TTL
3345        let (should_relay, relay_data) = if envelope.can_relay() && self.config.enable_relay {
3346            let relay_env = envelope.relay();
3347            (true, relay_env.map(|e| e.encode()))
3348        } else {
3349            if !envelope.can_relay() {
3350                self.notify(PeatEvent::MessageTtlExpired {
3351                    origin_node: envelope.origin_node,
3352                    hop_count: envelope.hop_count,
3353                });
3354            }
3355            (false, None)
3356        };
3357
3358        // Process the inner payload
3359        self.process_incoming_document(
3360            identifier,
3361            &envelope.payload,
3362            now_ms,
3363            if should_relay { relay_data } else { None },
3364            Some(envelope.origin_node),
3365            envelope.hop_count,
3366        )
3367    }
3368
3369    // ==================== Periodic Maintenance ====================
3370
3371    /// Periodic tick - call this regularly (e.g., every second)
3372    ///
3373    /// Performs:
3374    /// - Stale peer cleanup
3375    /// - Periodic sync broadcast (if interval elapsed)
3376    ///
3377    /// Returns `Some(data)` if a sync broadcast is needed.
3378    pub fn tick(&self, now_ms: u64) -> Option<Vec<u8>> {
3379        use std::sync::atomic::Ordering;
3380
3381        // Use u32 for atomic storage (wraps every ~49 days, intervals still work)
3382        let now_ms_32 = now_ms as u32;
3383
3384        // Cleanup stale peers
3385        let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3386        let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3387        if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3388            self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3389            let removed = self.peer_manager.cleanup_stale(now_ms);
3390            for node_id in &removed {
3391                self.notify(PeatEvent::PeerLost { node_id: *node_id });
3392            }
3393            if !removed.is_empty() {
3394                self.notify_mesh_state_changed();
3395            }
3396
3397            // Run connection graph maintenance (transition Disconnected -> Lost)
3398            {
3399                let mut graph = self.connection_graph.lock().unwrap();
3400                let newly_lost = graph.tick(now_ms);
3401                // Also cleanup peers lost for more than peer_timeout
3402                graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3403                drop(graph);
3404
3405                // Emit PeerLost events for newly lost peers from graph
3406                // (these may differ from peer_manager removals)
3407                for node_id in newly_lost {
3408                    // Only notify if not already notified by peer_manager
3409                    if !removed.contains(&node_id) {
3410                        self.notify(PeatEvent::PeerLost { node_id });
3411                    }
3412                }
3413            }
3414        }
3415
3416        // Check if sync broadcast is needed
3417        let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3418        let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3419        if sync_elapsed >= self.config.sync_interval_ms as u32 {
3420            self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3421            // Only broadcast if we have connected peers
3422            if self.peer_manager.connected_count() > 0 {
3423                let doc = self.document_sync.build_document();
3424                return Some(self.encrypt_document(&doc));
3425            }
3426        }
3427
3428        None
3429    }
3430
3431    /// Periodic tick returning per-peer delta documents
3432    ///
3433    /// Unlike `tick()` which broadcasts a single document to all peers,
3434    /// this returns targeted deltas that only include changes each peer
3435    /// hasn't seen. Use this for platforms that support per-peer transmission.
3436    ///
3437    /// Returns a list of (NodeId, encrypted_delta) tuples, one per connected peer.
3438    /// Empty vector if no sync is needed (interval not elapsed or no connected peers).
3439    pub fn tick_with_peer_deltas(&self, now_ms: u64) -> Vec<(NodeId, Vec<u8>)> {
3440        use std::sync::atomic::Ordering;
3441        let now_ms_32 = now_ms as u32;
3442
3443        // Cleanup stale peers (same as tick())
3444        let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3445        let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3446        if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3447            self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3448            let removed = self.peer_manager.cleanup_stale(now_ms);
3449            for node_id in &removed {
3450                self.notify(PeatEvent::PeerLost { node_id: *node_id });
3451            }
3452            if !removed.is_empty() {
3453                self.notify_mesh_state_changed();
3454            }
3455
3456            // Run connection graph maintenance
3457            {
3458                let mut graph = self.connection_graph.lock().unwrap();
3459                let newly_lost = graph.tick(now_ms);
3460                graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3461                drop(graph);
3462
3463                for node_id in newly_lost {
3464                    if !removed.contains(&node_id) {
3465                        self.notify(PeatEvent::PeerLost { node_id });
3466                    }
3467                }
3468            }
3469        }
3470
3471        // Check if sync is needed
3472        let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3473        let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3474        if sync_elapsed >= self.config.sync_interval_ms as u32 {
3475            self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3476
3477            // Build document for each connected peer
3478            let doc = self.document_sync.build_document();
3479            let encrypted = self.encrypt_document(&doc);
3480            let mut results = Vec::new();
3481            for peer in self.get_connected_peers() {
3482                results.push((peer.node_id, encrypted.clone()));
3483            }
3484            return results;
3485        }
3486
3487        Vec::new()
3488    }
3489
3490    // ==================== State Queries ====================
3491
3492    /// Get all known peers
3493    pub fn get_peers(&self) -> Vec<PeatPeer> {
3494        self.peer_manager.get_peers()
3495    }
3496
3497    /// Get connected peers only
3498    pub fn get_connected_peers(&self) -> Vec<PeatPeer> {
3499        self.peer_manager.get_connected_peers()
3500    }
3501
3502    /// Get a specific peer by NodeId
3503    pub fn get_peer(&self, node_id: NodeId) -> Option<PeatPeer> {
3504        self.peer_manager.get_peer(node_id)
3505    }
3506
3507    /// Get peer count
3508    pub fn peer_count(&self) -> usize {
3509        self.peer_manager.peer_count()
3510    }
3511
3512    /// Get connected peer count
3513    pub fn connected_count(&self) -> usize {
3514        self.peer_manager.connected_count()
3515    }
3516
3517    /// Check if a device mesh ID matches our mesh
3518    pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
3519        self.peer_manager.matches_mesh(device_mesh_id)
3520    }
3521
3522    // ==================== Connection State Graph ====================
3523
3524    /// Get the connection state graph with all peer states
3525    ///
3526    /// Returns a snapshot of all tracked peers and their connection lifecycle state.
3527    /// Apps can use this to display appropriate UI indicators:
3528    /// - Green for Connected peers
3529    /// - Yellow for Degraded or RecentlyDisconnected peers
3530    /// - Gray for Lost peers
3531    ///
3532    /// # Example
3533    /// ```ignore
3534    /// let states = mesh.get_connection_graph();
3535    /// for peer in states {
3536    ///     match peer.state {
3537    ///         ConnectionState::Connected => show_green_indicator(&peer),
3538    ///         ConnectionState::Degraded => show_yellow_indicator(&peer),
3539    ///         ConnectionState::Disconnected => show_stale_indicator(&peer),
3540    ///         ConnectionState::Lost => show_gray_indicator(&peer),
3541    ///         _ => {}
3542    ///     }
3543    /// }
3544    /// ```
3545    pub fn get_connection_graph(&self) -> Vec<PeerConnectionState> {
3546        self.connection_graph.lock().unwrap().get_all_owned()
3547    }
3548
3549    /// Get a specific peer's connection state
3550    pub fn get_peer_connection_state(&self, node_id: NodeId) -> Option<PeerConnectionState> {
3551        self.connection_graph
3552            .lock()
3553            .unwrap()
3554            .get_peer(node_id)
3555            .cloned()
3556    }
3557
3558    /// Get all currently connected peers from the connection graph
3559    pub fn get_connected_states(&self) -> Vec<PeerConnectionState> {
3560        self.connection_graph
3561            .lock()
3562            .unwrap()
3563            .get_connected()
3564            .into_iter()
3565            .cloned()
3566            .collect()
3567    }
3568
3569    /// Get peers in degraded state (connected but poor signal quality)
3570    pub fn get_degraded_peers(&self) -> Vec<PeerConnectionState> {
3571        self.connection_graph
3572            .lock()
3573            .unwrap()
3574            .get_degraded()
3575            .into_iter()
3576            .cloned()
3577            .collect()
3578    }
3579
3580    /// Get peers that disconnected within the specified time window
3581    ///
3582    /// Useful for showing "stale" peers that were recently connected.
3583    pub fn get_recently_disconnected(
3584        &self,
3585        within_ms: u64,
3586        now_ms: u64,
3587    ) -> Vec<PeerConnectionState> {
3588        self.connection_graph
3589            .lock()
3590            .unwrap()
3591            .get_recently_disconnected(within_ms, now_ms)
3592            .into_iter()
3593            .cloned()
3594            .collect()
3595    }
3596
3597    /// Get peers in Lost state (disconnected and no longer advertising)
3598    pub fn get_lost_peers(&self) -> Vec<PeerConnectionState> {
3599        self.connection_graph
3600            .lock()
3601            .unwrap()
3602            .get_lost()
3603            .into_iter()
3604            .cloned()
3605            .collect()
3606    }
3607
3608    /// Get summary counts of peers in each connection state
3609    pub fn get_connection_state_counts(&self) -> StateCountSummary {
3610        self.connection_graph.lock().unwrap().state_counts()
3611    }
3612
3613    // ==================== Indirect Peer Methods ====================
3614
3615    /// Get all indirect (multi-hop) peers
3616    ///
3617    /// Returns peers discovered via relay messages that are not directly
3618    /// connected via BLE. Each indirect peer includes the minimum hop count
3619    /// and the direct peers through which they can be reached.
3620    pub fn get_indirect_peers(&self) -> Vec<IndirectPeer> {
3621        self.connection_graph
3622            .lock()
3623            .unwrap()
3624            .get_indirect_peers_owned()
3625    }
3626
3627    /// Get the degree (hop count) for a specific peer
3628    ///
3629    /// Returns:
3630    /// - `Some(PeerDegree::Direct)` for directly connected BLE peers
3631    /// - `Some(PeerDegree::OneHop/TwoHop/ThreeHop)` for indirect peers
3632    /// - `None` if peer is not known
3633    pub fn get_peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
3634        self.connection_graph.lock().unwrap().peer_degree(node_id)
3635    }
3636
3637    /// Get full state counts including indirect peers
3638    ///
3639    /// Returns counts of direct peers by connection state plus counts
3640    /// of indirect peers by hop count (1-hop, 2-hop, 3-hop).
3641    pub fn get_full_state_counts(&self) -> FullStateCountSummary {
3642        self.connection_graph.lock().unwrap().full_state_counts()
3643    }
3644
3645    /// Get all paths to reach an indirect peer
3646    ///
3647    /// Returns a list of (via_peer_id, hop_count) pairs showing all
3648    /// known routes to the specified peer.
3649    pub fn get_paths_to_peer(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
3650        self.connection_graph.lock().unwrap().get_paths_to(node_id)
3651    }
3652
3653    /// Check if a node is known (either direct or indirect)
3654    pub fn is_peer_known(&self, node_id: NodeId) -> bool {
3655        self.connection_graph.lock().unwrap().is_known(node_id)
3656    }
3657
3658    /// Get number of indirect peers
3659    pub fn indirect_peer_count(&self) -> usize {
3660        self.connection_graph.lock().unwrap().indirect_peer_count()
3661    }
3662
3663    /// Cleanup stale indirect peers
3664    ///
3665    /// Removes indirect peers that haven't been seen within the timeout.
3666    /// Returns the list of removed peer IDs.
3667    pub fn cleanup_indirect_peers(&self, now_ms: u64) -> Vec<NodeId> {
3668        self.connection_graph
3669            .lock()
3670            .unwrap()
3671            .cleanup_indirect(now_ms)
3672    }
3673
3674    /// Get total counter value
3675    pub fn total_count(&self) -> u64 {
3676        self.document_sync.total_count()
3677    }
3678
3679    /// Get document version
3680    pub fn document_version(&self) -> u32 {
3681        self.document_sync.version()
3682    }
3683
3684    /// Get document version (alias)
3685    pub fn version(&self) -> u32 {
3686        self.document_sync.version()
3687    }
3688
3689    /// Update health status (battery percentage)
3690    pub fn update_health(&self, battery_percent: u8) {
3691        self.document_sync.update_health(battery_percent);
3692    }
3693
3694    /// Update activity level (0=still, 1=walking, 2=running, 3=fall)
3695    pub fn update_activity(&self, activity: u8) {
3696        self.document_sync.update_activity(activity);
3697    }
3698
3699    /// Update full health status (battery and activity)
3700    pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
3701        self.document_sync
3702            .update_health_full(battery_percent, activity);
3703    }
3704
3705    /// Update heart rate
3706    pub fn update_heart_rate(&self, heart_rate: u8) {
3707        self.document_sync.update_heart_rate(heart_rate);
3708    }
3709
3710    /// Update location
3711    pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
3712        self.document_sync
3713            .update_location(latitude, longitude, altitude);
3714    }
3715
3716    /// Clear location
3717    pub fn clear_location(&self) {
3718        self.document_sync.clear_location();
3719    }
3720
3721    /// Update callsign
3722    pub fn update_callsign(&self, callsign: &str) {
3723        self.document_sync.update_callsign(callsign);
3724    }
3725
3726    /// Set peripheral event type
3727    pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
3728        self.document_sync
3729            .set_peripheral_event(event_type, timestamp);
3730    }
3731
3732    /// Clear peripheral event
3733    pub fn clear_peripheral_event(&self) {
3734        self.document_sync.clear_peripheral_event();
3735    }
3736
3737    /// Update full peripheral state in one call
3738    ///
3739    /// This is the most efficient way to update all peripheral data before
3740    /// calling `build_document()` for encrypted transmission.
3741    #[allow(clippy::too_many_arguments)]
3742    pub fn update_peripheral_state(
3743        &self,
3744        callsign: &str,
3745        battery_percent: u8,
3746        heart_rate: Option<u8>,
3747        latitude: Option<f32>,
3748        longitude: Option<f32>,
3749        altitude: Option<f32>,
3750        event_type: Option<EventType>,
3751        timestamp: u64,
3752    ) {
3753        self.document_sync.update_peripheral_state(
3754            callsign,
3755            battery_percent,
3756            heart_rate,
3757            latitude,
3758            longitude,
3759            altitude,
3760            event_type,
3761            timestamp,
3762        );
3763    }
3764
3765    /// Build current document for transmission
3766    ///
3767    /// If encryption is enabled, the document is encrypted.
3768    pub fn build_document(&self) -> Vec<u8> {
3769        let doc = self.document_sync.build_document();
3770        self.encrypt_document(&doc)
3771    }
3772
3773    /// Get peers that should be synced with
3774    pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<PeatPeer> {
3775        self.peer_manager.peers_needing_sync(now_ms)
3776    }
3777
3778    // ==================== Internal Helpers ====================
3779
3780    fn notify(&self, event: PeatEvent) {
3781        self.observers.notify(event);
3782    }
3783
3784    fn notify_mesh_state_changed(&self) {
3785        self.notify(PeatEvent::MeshStateChanged {
3786            peer_count: self.peer_manager.peer_count(),
3787            connected_count: self.peer_manager.connected_count(),
3788        });
3789    }
3790
3791    // ==================== CannedMessage Integration ====================
3792    //
3793    // These methods provide deduplication support for peat-lite CannedMessages.
3794    // They use document identity (source_node + timestamp) instead of content hash,
3795    // because CRDT merge can change byte ordering.
3796
3797    /// Check if a CannedMessage should be processed.
3798    ///
3799    /// Uses document identity (source_node + timestamp) for deduplication.
3800    /// This prevents broadcast storms when relaying CannedMessages across the mesh.
3801    ///
3802    /// # Arguments
3803    /// * `source_node` - The source node ID from the CannedMessage
3804    /// * `timestamp` - The timestamp from the CannedMessage
3805    /// * `_ttl_ms` - TTL parameter (currently unused, uses cache's default TTL)
3806    ///
3807    /// # Returns
3808    /// `true` if this message is new and should be processed,
3809    /// `false` if it was seen recently and should be skipped.
3810    pub fn check_canned_message(&self, source_node: u32, timestamp: u64, _ttl_ms: u64) -> bool {
3811        // Create a unique key from source_node and timestamp
3812        // MessageId is 16 bytes: [source_node: 4B][timestamp: 8B][padding: 4B]
3813        let mut id_bytes = [0u8; 16];
3814        id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3815        id_bytes[4..12].copy_from_slice(&timestamp.to_le_bytes());
3816        let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3817
3818        // Check the seen cache
3819        let seen = self.seen_cache.lock().unwrap();
3820        !seen.has_seen(&message_id)
3821    }
3822
3823    /// Mark a CannedMessage as seen (for deduplication).
3824    ///
3825    /// Call this after processing a CannedMessage to prevent reprocessing
3826    /// the same message from other relay paths.
3827    pub fn mark_canned_message_seen(&self, source_node: u32, timestamp: u64) {
3828        let now = std::time::SystemTime::now()
3829            .duration_since(std::time::UNIX_EPOCH)
3830            .map(|d| d.as_millis() as u64)
3831            .unwrap_or(0);
3832
3833        // MessageId is 16 bytes: [source_node: 4B][timestamp: 8B][padding: 4B]
3834        let mut id_bytes = [0u8; 16];
3835        id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3836        id_bytes[4..12].copy_from_slice(&timestamp.to_le_bytes());
3837        let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3838        let origin = NodeId::new(source_node);
3839
3840        let mut seen = self.seen_cache.lock().unwrap();
3841        seen.mark_seen(message_id, origin, now);
3842    }
3843
3844    /// Get list of connected peer identifiers for relay.
3845    ///
3846    /// Used by the platform layer (Kotlin/Swift) to relay CannedMessages
3847    /// to other peers after deduplication check.
3848    pub fn get_connected_peer_identifiers(&self) -> Vec<String> {
3849        self.peer_manager.get_connected_identifiers()
3850    }
3851}
3852
3853/// Result from receiving BLE data
3854#[derive(Debug, Clone, Default)]
3855pub struct DataReceivedResult {
3856    /// Node that sent this data
3857    pub source_node: NodeId,
3858
3859    /// Whether this contained an emergency event
3860    pub is_emergency: bool,
3861
3862    /// Whether this contained an ACK event
3863    pub is_ack: bool,
3864
3865    /// Whether the counter changed (new data)
3866    pub counter_changed: bool,
3867
3868    /// Whether emergency state changed (new emergency or ACK updates)
3869    pub emergency_changed: bool,
3870
3871    /// Updated total count
3872    pub total_count: u64,
3873
3874    /// Event timestamp (if event present) - use to detect duplicate events
3875    pub event_timestamp: u64,
3876
3877    /// Data to relay to other peers (if multi-hop relay is enabled)
3878    ///
3879    /// When present, the platform adapter should send this data to peers
3880    /// returned by `get_relay_targets(Some(source_node))`.
3881    pub relay_data: Option<Vec<u8>>,
3882
3883    /// Origin node for relay (may differ from source_node for relayed messages)
3884    pub origin_node: Option<NodeId>,
3885
3886    /// Current hop count (for relayed messages)
3887    pub hop_count: u8,
3888
3889    // ========== Peripheral data from sender ==========
3890    /// Sender's callsign (up to 12 chars)
3891    pub callsign: Option<String>,
3892
3893    /// Sender's battery percentage (0-100)
3894    pub battery_percent: Option<u8>,
3895
3896    /// Sender's heart rate (BPM)
3897    pub heart_rate: Option<u8>,
3898
3899    /// Sender's event type (from PeripheralEvent)
3900    pub event_type: Option<u8>,
3901
3902    /// Sender's latitude
3903    pub latitude: Option<f32>,
3904
3905    /// Sender's longitude
3906    pub longitude: Option<f32>,
3907
3908    /// Sender's altitude (meters)
3909    pub altitude: Option<f32>,
3910
3911    /// ADR-059 Amendment 3 — set when the receive dispatch decoded a
3912    /// 0xB6 translator frame on this call. `None` for legacy/delta/
3913    /// reserved-marker paths, for 0xB6 frames that declined or errored,
3914    /// and for builds compiled without the `mesh-translator` feature
3915    /// (the field stays in the binding shape unconditionally so hosts
3916    /// don't see binding drift across feature combos; only the
3917    /// population logic is feature-gated).
3918    pub decoded_translator_frame: Option<DecodedTranslatorFrame>,
3919}
3920
3921/// ADR-059 Amendment 3 — payload returned in
3922/// [`DataReceivedResult::decoded_translator_frame`] when a 0xB6
3923/// translator frame decodes successfully on the receive dispatch.
3924/// Hosts (peat-atak-plugin and equivalents) forward populated entries
3925/// through their existing publish-with-origin FFI surface (e.g.
3926/// peat-ffi's `publishDocumentWithOriginJni(collection, doc_json,
3927/// "ble")`). Defined unconditionally — no `#[cfg(feature = "mesh-translator")]`
3928/// — so the UniFFI binding shape is stable across feature combos.
3929/// Population only happens when `mesh-translator` is on.
3930#[derive(Debug, Clone)]
3931pub struct DecodedTranslatorFrame {
3932    /// `BleTranslator` collection name, e.g. `"tracks"` / `"platforms"`.
3933    pub collection: String,
3934    /// serde-JSON serialization of the decoded `Document` — same shape
3935    /// as the JSON callback variant from 0.3.1, just delivered via
3936    /// struct return rather than callback invocation.
3937    pub doc_json: String,
3938    /// BLE peer identifier from the receive context (when known).
3939    pub peer: Option<String>,
3940}
3941
3942/// ADR-059 Amendment 3 — per-call outcome of the
3943/// `try_handle_translator_marker` dispatcher. Per-call return-threaded;
3944/// no shared mutable state, so concurrent receives on different
3945/// threads can't race on a slot.
3946#[cfg(feature = "mesh-translator")]
3947#[derive(Debug, Clone)]
3948enum TranslatorMarkerOutcome {
3949    /// First byte didn't match the translator-frame range; caller
3950    /// continues the legacy / delta-document dispatch.
3951    NotTranslatorMarker,
3952    /// 0xB6 frame decoded successfully. Caller hoists the payload into
3953    /// `DataReceivedResult.decoded_translator_frame` before returning.
3954    Decoded(DecodedTranslatorFrame),
3955    /// 0xB6 reserved-range / decode-error / codec-decline / unknown-code
3956    /// — caller stops processing but no frame to surface.
3957    Handled,
3958}
3959
3960impl DataReceivedResult {
3961    /// ADR-059 Amendment 3 — construct a `DataReceivedResult` whose
3962    /// only meaningful payload is a successfully-decoded 0xB6 translator
3963    /// frame. Other fields default (no event, no counter change,
3964    /// no peripheral data); the host reads `decoded_translator_frame`
3965    /// and forwards through its publish-with-origin FFI surface.
3966    /// Used by the receive entry points when
3967    /// `try_handle_translator_marker` returns `Decoded(frame)`.
3968    #[cfg(feature = "mesh-translator")]
3969    pub(crate) fn translator_frame(source_node: NodeId, frame: DecodedTranslatorFrame) -> Self {
3970        Self {
3971            source_node,
3972            decoded_translator_frame: Some(frame),
3973            ..Default::default()
3974        }
3975    }
3976
3977    /// Extract peripheral fields from an Option<Peripheral>
3978    #[allow(clippy::type_complexity)]
3979    fn peripheral_fields(
3980        peripheral: &Option<crate::sync::crdt::Peripheral>,
3981    ) -> (
3982        Option<String>,
3983        Option<u8>,
3984        Option<u8>,
3985        Option<u8>,
3986        Option<f32>,
3987        Option<f32>,
3988        Option<f32>,
3989    ) {
3990        match peripheral {
3991            Some(p) => {
3992                let callsign = {
3993                    let s = p.callsign_str();
3994                    if s.is_empty() {
3995                        None
3996                    } else {
3997                        Some(s.to_string())
3998                    }
3999                };
4000                let battery = if p.health.battery_percent > 0 {
4001                    Some(p.health.battery_percent)
4002                } else {
4003                    None
4004                };
4005                let heart_rate = p.health.heart_rate;
4006                let event_type = p.last_event.as_ref().map(|e| e.event_type as u8);
4007                let (lat, lon, alt) = match &p.location {
4008                    Some(loc) => (Some(loc.latitude), Some(loc.longitude), loc.altitude),
4009                    None => (None, None, None),
4010                };
4011                (callsign, battery, heart_rate, event_type, lat, lon, alt)
4012            }
4013            None => (None, None, None, None, None, None, None),
4014        }
4015    }
4016}
4017
4018/// Decision from processing a relay envelope
4019#[derive(Debug, Clone)]
4020pub struct RelayDecision {
4021    /// The payload (document) to process locally
4022    pub payload: Vec<u8>,
4023
4024    /// Original sender of the message
4025    pub origin_node: NodeId,
4026
4027    /// Current hop count
4028    pub hop_count: u8,
4029
4030    /// Whether this message should be relayed to other peers
4031    pub should_relay: bool,
4032
4033    /// The relay envelope to forward (with incremented hop count)
4034    ///
4035    /// Only present if `should_relay` is true and TTL not expired.
4036    pub relay_envelope: Option<RelayEnvelope>,
4037}
4038
4039impl RelayDecision {
4040    /// Get the relay data to send to peers
4041    ///
4042    /// Returns None if relay is not needed.
4043    pub fn relay_data(&self) -> Option<Vec<u8>> {
4044        self.relay_envelope.as_ref().map(|e| e.encode())
4045    }
4046}
4047
4048#[cfg(all(test, feature = "std"))]
4049mod tests {
4050    use super::*;
4051    use crate::observer::CollectingObserver;
4052
4053    // Valid timestamp for testing (2024-01-15 00:00:00 UTC)
4054    const TEST_TIMESTAMP: u64 = 1705276800000;
4055
4056    fn create_mesh(node_id: u32, callsign: &str) -> PeatMesh {
4057        let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST");
4058        PeatMesh::new(config)
4059    }
4060
4061    #[test]
4062    fn test_mesh_creation() {
4063        let mesh = create_mesh(0x12345678, "ALPHA-1");
4064
4065        assert_eq!(mesh.node_id().as_u32(), 0x12345678);
4066        assert_eq!(mesh.callsign(), "ALPHA-1");
4067        assert_eq!(mesh.mesh_id(), "TEST");
4068        assert_eq!(mesh.device_name(), "PEAT_TEST-12345678");
4069    }
4070
4071    #[test]
4072    fn test_peer_discovery() {
4073        let mesh = create_mesh(0x11111111, "ALPHA-1");
4074        let observer = Arc::new(CollectingObserver::new());
4075        mesh.add_observer(observer.clone());
4076
4077        // Discover a peer
4078        let peer = mesh.on_ble_discovered(
4079            "device-uuid",
4080            Some("PEAT_TEST-22222222"),
4081            -65,
4082            Some("TEST"),
4083            1000,
4084        );
4085
4086        assert!(peer.is_some());
4087        let peer = peer.unwrap();
4088        assert_eq!(peer.node_id.as_u32(), 0x22222222);
4089
4090        // Check events were generated
4091        let events = observer.events();
4092        assert!(events
4093            .iter()
4094            .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
4095        assert!(events
4096            .iter()
4097            .any(|e| matches!(e, PeatEvent::MeshStateChanged { .. })));
4098    }
4099
4100    #[test]
4101    fn test_connection_lifecycle() {
4102        let mesh = create_mesh(0x11111111, "ALPHA-1");
4103        let observer = Arc::new(CollectingObserver::new());
4104        mesh.add_observer(observer.clone());
4105
4106        // Discover and connect
4107        mesh.on_ble_discovered(
4108            "device-uuid",
4109            Some("PEAT_TEST-22222222"),
4110            -65,
4111            Some("TEST"),
4112            1000,
4113        );
4114
4115        let node_id = mesh.on_ble_connected("device-uuid", 2000);
4116        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
4117        assert_eq!(mesh.connected_count(), 1);
4118
4119        // Disconnect
4120        let node_id = mesh.on_ble_disconnected("device-uuid", DisconnectReason::RemoteRequest);
4121        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
4122        assert_eq!(mesh.connected_count(), 0);
4123
4124        // Check events
4125        let events = observer.events();
4126        assert!(events
4127            .iter()
4128            .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
4129        assert!(events
4130            .iter()
4131            .any(|e| matches!(e, PeatEvent::PeerDisconnected { .. })));
4132    }
4133
4134    #[test]
4135    fn test_emergency_flow() {
4136        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4137        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4138
4139        let observer2 = Arc::new(CollectingObserver::new());
4140        mesh2.add_observer(observer2.clone());
4141
4142        // mesh1 sends emergency
4143        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
4144        assert!(mesh1.is_emergency_active());
4145
4146        // mesh2 receives it
4147        let result =
4148            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4149
4150        assert!(result.is_some());
4151        let result = result.unwrap();
4152        assert!(result.is_emergency);
4153        assert_eq!(result.source_node.as_u32(), 0x11111111);
4154
4155        // Check events on mesh2
4156        let events = observer2.events();
4157        assert!(events
4158            .iter()
4159            .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
4160    }
4161
4162    #[test]
4163    fn test_ack_flow() {
4164        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4165        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4166
4167        let observer2 = Arc::new(CollectingObserver::new());
4168        mesh2.add_observer(observer2.clone());
4169
4170        // mesh1 sends ACK
4171        let doc = mesh1.send_ack(TEST_TIMESTAMP);
4172        assert!(mesh1.is_ack_active());
4173
4174        // mesh2 receives it
4175        let result =
4176            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4177
4178        assert!(result.is_some());
4179        let result = result.unwrap();
4180        assert!(result.is_ack);
4181
4182        // Check events on mesh2
4183        let events = observer2.events();
4184        assert!(events
4185            .iter()
4186            .any(|e| matches!(e, PeatEvent::AckReceived { .. })));
4187    }
4188
4189    #[test]
4190    fn test_tick_cleanup() {
4191        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4192            .with_peer_timeout(10_000);
4193        let mesh = PeatMesh::new(config);
4194
4195        let observer = Arc::new(CollectingObserver::new());
4196        mesh.add_observer(observer.clone());
4197
4198        // Discover a peer
4199        mesh.on_ble_discovered(
4200            "device-uuid",
4201            Some("PEAT_TEST-22222222"),
4202            -65,
4203            Some("TEST"),
4204            1000,
4205        );
4206        assert_eq!(mesh.peer_count(), 1);
4207
4208        // Tick at t=5000 - not stale yet
4209        mesh.tick(5000);
4210        assert_eq!(mesh.peer_count(), 1);
4211
4212        // Tick at t=20000 - peer is stale (10s timeout exceeded)
4213        mesh.tick(20000);
4214        assert_eq!(mesh.peer_count(), 0);
4215
4216        // Check PeerLost event
4217        let events = observer.events();
4218        assert!(events
4219            .iter()
4220            .any(|e| matches!(e, PeatEvent::PeerLost { .. })));
4221    }
4222
4223    #[test]
4224    fn test_tick_sync_broadcast() {
4225        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4226            .with_sync_interval(5000);
4227        let mesh = PeatMesh::new(config);
4228
4229        // Discover and connect a peer first
4230        mesh.on_ble_discovered(
4231            "device-uuid",
4232            Some("PEAT_TEST-22222222"),
4233            -65,
4234            Some("TEST"),
4235            1000,
4236        );
4237        mesh.on_ble_connected("device-uuid", 1000);
4238
4239        // First tick at t=0 sets last_sync
4240        let _result = mesh.tick(0);
4241        // May or may not broadcast depending on initial state
4242
4243        // Tick before interval - no broadcast
4244        let result = mesh.tick(3000);
4245        assert!(result.is_none());
4246
4247        // After interval - should broadcast
4248        let result = mesh.tick(6000);
4249        assert!(result.is_some());
4250
4251        // Immediate second tick - no broadcast (interval not elapsed)
4252        let result = mesh.tick(6100);
4253        assert!(result.is_none());
4254
4255        // After another interval - should broadcast again
4256        let result = mesh.tick(12000);
4257        assert!(result.is_some());
4258    }
4259
4260    #[test]
4261    fn test_incoming_connection() {
4262        let mesh = create_mesh(0x11111111, "ALPHA-1");
4263        let observer = Arc::new(CollectingObserver::new());
4264        mesh.add_observer(observer.clone());
4265
4266        // Incoming connection from unknown peer
4267        let is_new = mesh.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);
4268
4269        assert!(is_new);
4270        assert_eq!(mesh.peer_count(), 1);
4271        assert_eq!(mesh.connected_count(), 1);
4272
4273        // Check events
4274        let events = observer.events();
4275        assert!(events
4276            .iter()
4277            .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
4278        assert!(events
4279            .iter()
4280            .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
4281    }
4282
4283    #[test]
4284    fn test_mesh_filtering() {
4285        let mesh = create_mesh(0x11111111, "ALPHA-1");
4286
4287        // Wrong mesh - ignored
4288        let peer = mesh.on_ble_discovered(
4289            "device-uuid-1",
4290            Some("PEAT_OTHER-22222222"),
4291            -65,
4292            Some("OTHER"),
4293            1000,
4294        );
4295        assert!(peer.is_none());
4296        assert_eq!(mesh.peer_count(), 0);
4297
4298        // Correct mesh - accepted
4299        let peer = mesh.on_ble_discovered(
4300            "device-uuid-2",
4301            Some("PEAT_TEST-33333333"),
4302            -65,
4303            Some("TEST"),
4304            1000,
4305        );
4306        assert!(peer.is_some());
4307        assert_eq!(mesh.peer_count(), 1);
4308    }
4309
4310    // ==================== Encryption Tests ====================
4311
4312    fn create_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4313        let config =
4314            PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_encryption(secret);
4315        PeatMesh::new(config)
4316    }
4317
4318    #[test]
4319    fn test_encryption_enabled() {
4320        let secret = [0x42u8; 32];
4321        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4322
4323        assert!(mesh.is_encryption_enabled());
4324    }
4325
4326    #[test]
4327    fn test_encryption_disabled_by_default() {
4328        let mesh = create_mesh(0x11111111, "ALPHA-1");
4329
4330        assert!(!mesh.is_encryption_enabled());
4331    }
4332
4333    #[test]
4334    fn test_encrypted_document_exchange() {
4335        let secret = [0x42u8; 32];
4336        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4337        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4338
4339        // mesh1 sends document
4340        let doc = mesh1.build_document();
4341
4342        // Document should be encrypted (starts with ENCRYPTED_MARKER)
4343        assert!(doc.len() >= 2);
4344        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4345
4346        // mesh2 receives and decrypts
4347        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4348
4349        assert!(result.is_some());
4350        let result = result.unwrap();
4351        assert_eq!(result.source_node.as_u32(), 0x11111111);
4352    }
4353
4354    #[test]
4355    fn test_encrypted_emergency_exchange() {
4356        let secret = [0x42u8; 32];
4357        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4358        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4359
4360        let observer = Arc::new(CollectingObserver::new());
4361        mesh2.add_observer(observer.clone());
4362
4363        // mesh1 sends emergency
4364        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
4365
4366        // mesh2 receives and decrypts
4367        let result =
4368            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4369
4370        assert!(result.is_some());
4371        let result = result.unwrap();
4372        assert!(result.is_emergency);
4373
4374        // Check EmergencyReceived event was fired
4375        let events = observer.events();
4376        assert!(events
4377            .iter()
4378            .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
4379    }
4380
4381    #[test]
4382    fn test_wrong_key_fails_decrypt() {
4383        let secret1 = [0x42u8; 32];
4384        let secret2 = [0x43u8; 32]; // Different key
4385        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
4386        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
4387
4388        // mesh1 sends document
4389        let doc = mesh1.build_document();
4390
4391        // mesh2 cannot decrypt (wrong key)
4392        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4393
4394        assert!(result.is_none());
4395    }
4396
4397    #[test]
4398    fn test_unencrypted_mesh_can_read_unencrypted() {
4399        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4400        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4401
4402        // mesh1 sends document (unencrypted)
4403        let doc = mesh1.build_document();
4404
4405        // mesh2 receives (also unencrypted)
4406        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4407
4408        assert!(result.is_some());
4409    }
4410
4411    #[test]
4412    fn test_encrypted_mesh_can_receive_unencrypted() {
4413        // Backward compatibility: encrypted mesh can receive unencrypted docs
4414        let secret = [0x42u8; 32];
4415        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // unencrypted
4416        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); // encrypted
4417
4418        // mesh1 sends unencrypted document
4419        let doc = mesh1.build_document();
4420
4421        // mesh2 can receive unencrypted (backward compat)
4422        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4423
4424        assert!(result.is_some());
4425    }
4426
4427    #[test]
4428    fn test_unencrypted_mesh_cannot_receive_encrypted() {
4429        let secret = [0x42u8; 32];
4430        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret); // encrypted
4431        let mesh2 = create_mesh(0x22222222, "BRAVO-1"); // unencrypted
4432
4433        // mesh1 sends encrypted document
4434        let doc = mesh1.build_document();
4435
4436        // mesh2 cannot decrypt (no key)
4437        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4438
4439        assert!(result.is_none());
4440    }
4441
4442    #[test]
4443    fn test_enable_disable_encryption() {
4444        let mut mesh = create_mesh(0x11111111, "ALPHA-1");
4445
4446        assert!(!mesh.is_encryption_enabled());
4447
4448        // Enable encryption
4449        let secret = [0x42u8; 32];
4450        mesh.enable_encryption(&secret);
4451        assert!(mesh.is_encryption_enabled());
4452
4453        // Build document should now be encrypted
4454        let doc = mesh.build_document();
4455        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4456
4457        // Disable encryption
4458        mesh.disable_encryption();
4459        assert!(!mesh.is_encryption_enabled());
4460
4461        // Build document should now be unencrypted
4462        let doc = mesh.build_document();
4463        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4464    }
4465
4466    #[test]
4467    fn test_encryption_overhead() {
4468        let secret = [0x42u8; 32];
4469        let mesh_encrypted = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4470        let mesh_unencrypted = create_mesh(0x22222222, "BRAVO-1");
4471
4472        let doc_encrypted = mesh_encrypted.build_document();
4473        let doc_unencrypted = mesh_unencrypted.build_document();
4474
4475        // Encrypted doc should be larger by:
4476        // - 2 bytes marker header (0xAE + reserved)
4477        // - 12 bytes nonce
4478        // - 16 bytes auth tag
4479        // Total: 30 bytes overhead
4480        let overhead = doc_encrypted.len() - doc_unencrypted.len();
4481        assert_eq!(overhead, 30); // 2 (marker) + 12 (nonce) + 16 (tag)
4482    }
4483
4484    // ==================== Per-Peer E2EE Tests ====================
4485
4486    #[test]
4487    fn test_peer_e2ee_enable_disable() {
4488        let mesh = create_mesh(0x11111111, "ALPHA-1");
4489
4490        assert!(!mesh.is_peer_e2ee_enabled());
4491        assert!(mesh.peer_e2ee_public_key().is_none());
4492
4493        mesh.enable_peer_e2ee();
4494        assert!(mesh.is_peer_e2ee_enabled());
4495        assert!(mesh.peer_e2ee_public_key().is_some());
4496
4497        mesh.disable_peer_e2ee();
4498        assert!(!mesh.is_peer_e2ee_enabled());
4499    }
4500
4501    #[test]
4502    fn test_peer_e2ee_initiate_session() {
4503        let mesh = create_mesh(0x11111111, "ALPHA-1");
4504        mesh.enable_peer_e2ee();
4505
4506        let key_exchange = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4507        assert!(key_exchange.is_some());
4508
4509        let key_exchange = key_exchange.unwrap();
4510        // Should start with KEY_EXCHANGE_MARKER
4511        assert_eq!(key_exchange[0], crate::document::KEY_EXCHANGE_MARKER);
4512
4513        // Should have a pending session
4514        assert_eq!(mesh.peer_e2ee_session_count(), 1);
4515        assert_eq!(mesh.peer_e2ee_established_count(), 0);
4516    }
4517
4518    #[test]
4519    fn test_peer_e2ee_full_handshake() {
4520        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4521        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4522
4523        mesh1.enable_peer_e2ee();
4524        mesh2.enable_peer_e2ee();
4525
4526        let observer1 = Arc::new(CollectingObserver::new());
4527        let observer2 = Arc::new(CollectingObserver::new());
4528        mesh1.add_observer(observer1.clone());
4529        mesh2.add_observer(observer2.clone());
4530
4531        // mesh1 initiates to mesh2
4532        let key_exchange1 = mesh1
4533            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4534            .unwrap();
4535
4536        // mesh2 receives and responds
4537        let response = mesh2.handle_key_exchange(&key_exchange1, 1000);
4538        assert!(response.is_some());
4539
4540        // Check mesh2 has established session
4541        assert!(mesh2.has_peer_e2ee_session(NodeId::new(0x11111111)));
4542
4543        // mesh1 receives mesh2's response
4544        let key_exchange2 = response.unwrap();
4545        let _ = mesh1.handle_key_exchange(&key_exchange2, 1000);
4546
4547        // Check mesh1 has established session
4548        assert!(mesh1.has_peer_e2ee_session(NodeId::new(0x22222222)));
4549
4550        // Both should have E2EE established events
4551        let events1 = observer1.events();
4552        assert!(events1
4553            .iter()
4554            .any(|e| matches!(e, PeatEvent::PeerE2eeEstablished { .. })));
4555
4556        let events2 = observer2.events();
4557        assert!(events2
4558            .iter()
4559            .any(|e| matches!(e, PeatEvent::PeerE2eeEstablished { .. })));
4560    }
4561
4562    #[test]
4563    fn test_peer_e2ee_encrypt_decrypt() {
4564        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4565        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4566
4567        mesh1.enable_peer_e2ee();
4568        mesh2.enable_peer_e2ee();
4569
4570        // Establish session via key exchange
4571        let key_exchange1 = mesh1
4572            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4573            .unwrap();
4574        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4575        mesh1.handle_key_exchange(&key_exchange2, 1000);
4576
4577        // mesh1 sends encrypted message to mesh2
4578        let plaintext = b"Secret message from mesh1";
4579        let encrypted = mesh1.send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000);
4580        assert!(encrypted.is_some());
4581
4582        let encrypted = encrypted.unwrap();
4583        // Should start with PEER_E2EE_MARKER
4584        assert_eq!(encrypted[0], crate::document::PEER_E2EE_MARKER);
4585
4586        // mesh2 receives and decrypts
4587        let observer2 = Arc::new(CollectingObserver::new());
4588        mesh2.add_observer(observer2.clone());
4589
4590        let decrypted = mesh2.handle_peer_e2ee_message(&encrypted, 2000);
4591        assert!(decrypted.is_some());
4592        assert_eq!(decrypted.unwrap(), plaintext);
4593
4594        // Should have received message event
4595        let events = observer2.events();
4596        assert!(events.iter().any(|e| matches!(
4597            e,
4598            PeatEvent::PeerE2eeMessageReceived { from_node, data }
4599            if from_node.as_u32() == 0x11111111 && data == plaintext
4600        )));
4601    }
4602
4603    #[test]
4604    fn test_peer_e2ee_bidirectional() {
4605        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4606        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4607
4608        mesh1.enable_peer_e2ee();
4609        mesh2.enable_peer_e2ee();
4610
4611        // Establish session
4612        let key_exchange1 = mesh1
4613            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4614            .unwrap();
4615        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4616        mesh1.handle_key_exchange(&key_exchange2, 1000);
4617
4618        // mesh1 -> mesh2
4619        let msg1 = mesh1
4620            .send_peer_e2ee(NodeId::new(0x22222222), b"Hello from mesh1", 2000)
4621            .unwrap();
4622        let dec1 = mesh2.handle_peer_e2ee_message(&msg1, 2000).unwrap();
4623        assert_eq!(dec1, b"Hello from mesh1");
4624
4625        // mesh2 -> mesh1
4626        let msg2 = mesh2
4627            .send_peer_e2ee(NodeId::new(0x11111111), b"Hello from mesh2", 2000)
4628            .unwrap();
4629        let dec2 = mesh1.handle_peer_e2ee_message(&msg2, 2000).unwrap();
4630        assert_eq!(dec2, b"Hello from mesh2");
4631    }
4632
4633    #[test]
4634    fn test_peer_e2ee_close_session() {
4635        let mesh = create_mesh(0x11111111, "ALPHA-1");
4636        mesh.enable_peer_e2ee();
4637
4638        let observer = Arc::new(CollectingObserver::new());
4639        mesh.add_observer(observer.clone());
4640
4641        // Initiate a session
4642        mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4643        assert_eq!(mesh.peer_e2ee_session_count(), 1);
4644
4645        // Close session
4646        mesh.close_peer_e2ee(NodeId::new(0x22222222));
4647
4648        // Check close event
4649        let events = observer.events();
4650        assert!(events
4651            .iter()
4652            .any(|e| matches!(e, PeatEvent::PeerE2eeClosed { .. })));
4653    }
4654
4655    #[test]
4656    fn test_peer_e2ee_without_enabling() {
4657        let mesh = create_mesh(0x11111111, "ALPHA-1");
4658
4659        // E2EE not enabled - should return None
4660        let result = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4661        assert!(result.is_none());
4662
4663        let result = mesh.send_peer_e2ee(NodeId::new(0x22222222), b"test", 1000);
4664        assert!(result.is_none());
4665
4666        assert!(!mesh.has_peer_e2ee_session(NodeId::new(0x22222222)));
4667    }
4668
4669    #[test]
4670    fn test_peer_e2ee_overhead() {
4671        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4672        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4673
4674        mesh1.enable_peer_e2ee();
4675        mesh2.enable_peer_e2ee();
4676
4677        // Establish session
4678        let key_exchange1 = mesh1
4679            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4680            .unwrap();
4681        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4682        mesh1.handle_key_exchange(&key_exchange2, 1000);
4683
4684        // Encrypt a message
4685        let plaintext = b"Test message";
4686        let encrypted = mesh1
4687            .send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000)
4688            .unwrap();
4689
4690        // Overhead should be:
4691        // - 2 bytes marker header
4692        // - 4 bytes recipient node ID
4693        // - 4 bytes sender node ID
4694        // - 8 bytes counter
4695        // - 12 bytes nonce
4696        // - 16 bytes auth tag
4697        // Total: 46 bytes overhead
4698        let overhead = encrypted.len() - plaintext.len();
4699        assert_eq!(overhead, 46);
4700    }
4701
4702    // ==================== Strict Encryption Mode Tests ====================
4703
4704    fn create_strict_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4705        let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST")
4706            .with_encryption(secret)
4707            .with_strict_encryption();
4708        PeatMesh::new(config)
4709    }
4710
4711    #[test]
4712    fn test_strict_encryption_enabled() {
4713        let secret = [0x42u8; 32];
4714        let mesh = create_strict_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4715
4716        assert!(mesh.is_encryption_enabled());
4717        assert!(mesh.is_strict_encryption_enabled());
4718    }
4719
4720    #[test]
4721    fn test_strict_encryption_disabled_by_default() {
4722        let secret = [0x42u8; 32];
4723        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4724
4725        assert!(mesh.is_encryption_enabled());
4726        assert!(!mesh.is_strict_encryption_enabled());
4727    }
4728
4729    #[test]
4730    fn test_strict_encryption_requires_encryption_enabled() {
4731        // strict_encryption without encryption should have no effect
4732        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4733            .with_strict_encryption(); // No encryption!
4734        let mesh = PeatMesh::new(config);
4735
4736        assert!(!mesh.is_encryption_enabled());
4737        assert!(!mesh.is_strict_encryption_enabled());
4738    }
4739
4740    #[test]
4741    fn test_strict_mode_accepts_encrypted_documents() {
4742        let secret = [0x42u8; 32];
4743        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4744        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4745
4746        // mesh1 sends encrypted document
4747        let doc = mesh1.build_document();
4748        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4749
4750        // mesh2 (strict mode) should accept encrypted documents
4751        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4752        assert!(result.is_some());
4753    }
4754
4755    #[test]
4756    fn test_strict_mode_rejects_unencrypted_documents() {
4757        let secret = [0x42u8; 32];
4758        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // Unencrypted sender
4759        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret); // Strict receiver
4760
4761        let observer = Arc::new(CollectingObserver::new());
4762        mesh2.add_observer(observer.clone());
4763
4764        // mesh1 sends unencrypted document
4765        let doc = mesh1.build_document();
4766        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4767
4768        // mesh2 (strict mode) should reject unencrypted documents
4769        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4770        assert!(result.is_none());
4771
4772        // Should have SecurityViolation event
4773        let events = observer.events();
4774        assert!(events.iter().any(|e| matches!(
4775            e,
4776            PeatEvent::SecurityViolation {
4777                kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
4778                ..
4779            }
4780        )));
4781    }
4782
4783    #[test]
4784    fn test_non_strict_mode_accepts_unencrypted_documents() {
4785        let secret = [0x42u8; 32];
4786        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // Unencrypted sender
4787        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); // Non-strict receiver
4788
4789        // mesh1 sends unencrypted document
4790        let doc = mesh1.build_document();
4791
4792        // mesh2 (non-strict) should accept unencrypted documents (backward compat)
4793        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4794        assert!(result.is_some());
4795    }
4796
4797    #[test]
4798    fn test_strict_mode_security_violation_event_includes_source() {
4799        let secret = [0x42u8; 32];
4800        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4801        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4802
4803        let observer = Arc::new(CollectingObserver::new());
4804        mesh2.add_observer(observer.clone());
4805
4806        let doc = mesh1.build_document();
4807
4808        // Use on_ble_data_received with identifier to test source is captured
4809        mesh2.on_ble_discovered(
4810            "test-device-uuid",
4811            Some("PEAT_TEST-11111111"),
4812            -65,
4813            Some("TEST"),
4814            500,
4815        );
4816        mesh2.on_ble_connected("test-device-uuid", 600);
4817
4818        let _result = mesh2.on_ble_data_received("test-device-uuid", &doc, 1000);
4819
4820        // Check SecurityViolation event has source
4821        let events = observer.events();
4822        let violation = events.iter().find(|e| {
4823            matches!(
4824                e,
4825                PeatEvent::SecurityViolation {
4826                    kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
4827                    ..
4828                }
4829            )
4830        });
4831        assert!(violation.is_some());
4832
4833        if let Some(PeatEvent::SecurityViolation { source, .. }) = violation {
4834            assert!(source.is_some());
4835            assert_eq!(source.as_ref().unwrap(), "test-device-uuid");
4836        }
4837    }
4838
4839    #[test]
4840    fn test_decryption_failure_emits_security_violation() {
4841        let secret1 = [0x42u8; 32];
4842        let secret2 = [0x43u8; 32]; // Different key
4843        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
4844        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
4845
4846        let observer = Arc::new(CollectingObserver::new());
4847        mesh2.add_observer(observer.clone());
4848
4849        // mesh1 sends encrypted document
4850        let doc = mesh1.build_document();
4851
4852        // mesh2 cannot decrypt (wrong key)
4853        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4854        assert!(result.is_none());
4855
4856        // Should have SecurityViolation event for decryption failure
4857        let events = observer.events();
4858        assert!(events.iter().any(|e| matches!(
4859            e,
4860            PeatEvent::SecurityViolation {
4861                kind: crate::observer::SecurityViolationKind::DecryptionFailed,
4862                ..
4863            }
4864        )));
4865    }
4866
4867    #[test]
4868    fn test_strict_mode_builder_chain() {
4869        let secret = [0x42u8; 32];
4870        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4871            .with_encryption(secret)
4872            .with_strict_encryption()
4873            .with_sync_interval(10_000)
4874            .with_peer_timeout(60_000);
4875
4876        let mesh = PeatMesh::new(config);
4877
4878        assert!(mesh.is_encryption_enabled());
4879        assert!(mesh.is_strict_encryption_enabled());
4880    }
4881
4882    // ==================== Multi-Hop Relay Tests ====================
4883
4884    fn create_relay_mesh(node_id: u32, callsign: &str) -> PeatMesh {
4885        let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_relay();
4886        PeatMesh::new(config)
4887    }
4888
4889    #[test]
4890    fn test_relay_disabled_by_default() {
4891        let mesh = create_mesh(0x11111111, "ALPHA-1");
4892        assert!(!mesh.is_relay_enabled());
4893    }
4894
4895    #[test]
4896    fn test_relay_enabled() {
4897        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4898        assert!(mesh.is_relay_enabled());
4899    }
4900
4901    #[test]
4902    fn test_relay_config_builder() {
4903        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4904            .with_relay()
4905            .with_max_relay_hops(5)
4906            .with_relay_fanout(3)
4907            .with_seen_cache_ttl(60_000);
4908
4909        assert!(config.enable_relay);
4910        assert_eq!(config.max_relay_hops, 5);
4911        assert_eq!(config.relay_fanout, 3);
4912        assert_eq!(config.seen_cache_ttl_ms, 60_000);
4913    }
4914
4915    #[test]
4916    fn test_seen_message_deduplication() {
4917        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4918        let origin = NodeId::new(0x22222222);
4919        let msg_id = crate::relay::MessageId::from_content(origin, 1000, 0xDEADBEEF);
4920
4921        // First time - should be new
4922        assert!(mesh.mark_message_seen(msg_id, origin, 1000));
4923
4924        // Second time - should be duplicate
4925        assert!(!mesh.mark_message_seen(msg_id, origin, 2000));
4926
4927        assert_eq!(mesh.seen_cache_size(), 1);
4928    }
4929
4930    #[test]
4931    fn test_wrap_for_relay() {
4932        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4933
4934        let payload = vec![1, 2, 3, 4, 5];
4935        let wrapped = mesh.wrap_for_relay(payload.clone());
4936
4937        // Should start with relay envelope marker
4938        assert_eq!(wrapped[0], crate::relay::RELAY_ENVELOPE_MARKER);
4939
4940        // Decode and verify
4941        let envelope = crate::relay::RelayEnvelope::decode(&wrapped).unwrap();
4942        assert_eq!(envelope.payload, payload);
4943        assert_eq!(envelope.origin_node, NodeId::new(0x11111111));
4944        assert_eq!(envelope.hop_count, 0);
4945    }
4946
4947    #[test]
4948    fn test_process_relay_envelope_new_message() {
4949        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4950        let observer = Arc::new(CollectingObserver::new());
4951        mesh.add_observer(observer.clone());
4952
4953        // Create an envelope from another node
4954        let payload = vec![1, 2, 3, 4, 5];
4955        let envelope =
4956            crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
4957                .with_max_hops(7);
4958        let data = envelope.encode();
4959
4960        // Process it
4961        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
4962
4963        assert!(decision.is_some());
4964        let decision = decision.unwrap();
4965        assert_eq!(decision.payload, payload);
4966        assert_eq!(decision.origin_node.as_u32(), 0x22222222);
4967        assert_eq!(decision.hop_count, 0);
4968        assert!(decision.should_relay);
4969        assert!(decision.relay_envelope.is_some());
4970
4971        // Relay envelope should have incremented hop count
4972        let relay_env = decision.relay_envelope.unwrap();
4973        assert_eq!(relay_env.hop_count, 1);
4974    }
4975
4976    #[test]
4977    fn test_process_relay_envelope_duplicate() {
4978        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4979        let observer = Arc::new(CollectingObserver::new());
4980        mesh.add_observer(observer.clone());
4981
4982        let payload = vec![1, 2, 3, 4, 5];
4983        let envelope = crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload);
4984        let data = envelope.encode();
4985
4986        // First time - should succeed
4987        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
4988        assert!(decision.is_some());
4989
4990        // Second time - should be duplicate
4991        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 2000);
4992        assert!(decision.is_none());
4993
4994        // Should have DuplicateMessageDropped event
4995        let events = observer.events();
4996        assert!(events
4997            .iter()
4998            .any(|e| matches!(e, PeatEvent::DuplicateMessageDropped { .. })));
4999    }
5000
5001    #[test]
5002    fn test_process_relay_envelope_ttl_expired() {
5003        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5004        let observer = Arc::new(CollectingObserver::new());
5005        mesh.add_observer(observer.clone());
5006
5007        // Create envelope at max hops (TTL expired)
5008        let payload = vec![1, 2, 3, 4, 5];
5009        let mut envelope =
5010            crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
5011                .with_max_hops(3);
5012
5013        // Simulate having been relayed 3 times already
5014        envelope = envelope.relay().unwrap(); // hop 1
5015        envelope = envelope.relay().unwrap(); // hop 2
5016        envelope = envelope.relay().unwrap(); // hop 3 - at max now
5017
5018        let data = envelope.encode();
5019
5020        // Process - should still process locally but not relay further
5021        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
5022
5023        assert!(decision.is_some());
5024        let decision = decision.unwrap();
5025        assert_eq!(decision.payload, payload);
5026        assert!(!decision.should_relay); // Cannot relay further
5027        assert!(decision.relay_envelope.is_none());
5028
5029        // Should have MessageTtlExpired event
5030        let events = observer.events();
5031        assert!(events
5032            .iter()
5033            .any(|e| matches!(e, PeatEvent::MessageTtlExpired { .. })));
5034    }
5035
5036    #[test]
5037    fn test_build_relay_document() {
5038        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5039
5040        let relay_doc = mesh.build_relay_document();
5041
5042        // Should be a valid relay envelope
5043        assert_eq!(relay_doc[0], crate::relay::RELAY_ENVELOPE_MARKER);
5044
5045        // Decode and verify it contains a valid document
5046        let envelope = crate::relay::RelayEnvelope::decode(&relay_doc).unwrap();
5047        assert_eq!(envelope.origin_node.as_u32(), 0x11111111);
5048
5049        // The payload should be a valid PeatDocument
5050        let doc = crate::document::PeatDocument::decode(&envelope.payload);
5051        assert!(doc.is_some());
5052    }
5053
5054    #[test]
5055    fn test_relay_targets_excludes_source() {
5056        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5057
5058        // Add some peers
5059        mesh.on_ble_discovered(
5060            "peer-1",
5061            Some("PEAT_TEST-22222222"),
5062            -60,
5063            Some("TEST"),
5064            1000,
5065        );
5066        mesh.on_ble_connected("peer-1", 1000);
5067
5068        mesh.on_ble_discovered(
5069            "peer-2",
5070            Some("PEAT_TEST-33333333"),
5071            -65,
5072            Some("TEST"),
5073            1000,
5074        );
5075        mesh.on_ble_connected("peer-2", 1000);
5076
5077        mesh.on_ble_discovered(
5078            "peer-3",
5079            Some("PEAT_TEST-44444444"),
5080            -70,
5081            Some("TEST"),
5082            1000,
5083        );
5084        mesh.on_ble_connected("peer-3", 1000);
5085
5086        // Get relay targets excluding peer-2
5087        let targets = mesh.get_relay_targets(Some(NodeId::new(0x33333333)));
5088
5089        // Should not include peer-2 in targets
5090        assert!(targets.iter().all(|p| p.node_id.as_u32() != 0x33333333));
5091    }
5092
5093    #[test]
5094    fn test_clear_seen_cache() {
5095        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5096        let origin = NodeId::new(0x22222222);
5097
5098        // Add some messages
5099        mesh.mark_message_seen(
5100            crate::relay::MessageId::from_content(origin, 1000, 0x11111111),
5101            origin,
5102            1000,
5103        );
5104        mesh.mark_message_seen(
5105            crate::relay::MessageId::from_content(origin, 2000, 0x22222222),
5106            origin,
5107            2000,
5108        );
5109
5110        assert_eq!(mesh.seen_cache_size(), 2);
5111
5112        // Clear
5113        mesh.clear_seen_cache();
5114        assert_eq!(mesh.seen_cache_size(), 0);
5115    }
5116}