Skip to main content

peat_btle/
peat_mesh.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! PeatMesh - Unified mesh management facade
17//!
18//! This module provides the main entry point for Peat BLE mesh operations.
19//! It composes peer management, document sync, and observer notifications
20//! into a single interface that platform implementations can use.
21//!
22//! ## Usage
23//!
24//! ```ignore
25//! use peat_btle::peat_mesh::{PeatMesh, PeatMeshConfig};
26//! use peat_btle::observer::{PeatEvent, PeatObserver};
27//! use peat_btle::NodeId;
28//! use std::sync::Arc;
29//!
30//! // Create mesh configuration
31//! let config = PeatMeshConfig::new(NodeId::new(0x12345678), "ALPHA-1", "DEMO");
32//!
33//! // Create mesh instance
34//! let mesh = PeatMesh::new(config);
35//!
36//! // Add observer for events
37//! struct MyObserver;
38//! impl PeatObserver for MyObserver {
39//!     fn on_event(&self, event: PeatEvent) {
40//!         println!("Event: {:?}", event);
41//!     }
42//! }
43//! mesh.add_observer(Arc::new(MyObserver));
44//!
45//! // Platform BLE callbacks
46//! mesh.on_ble_discovered("device-uuid", Some("PEAT_DEMO-AABBCCDD"), -65, Some("DEMO"), now_ms);
47//! mesh.on_ble_connected("device-uuid", now_ms);
48//! mesh.on_ble_data_received("device-uuid", &data, now_ms);
49//!
50//! // Periodic maintenance
51//! if let Some(sync_data) = mesh.tick(now_ms) {
52//!     // Broadcast sync_data to connected peers
53//! }
54//! ```
55
56#[cfg(not(feature = "std"))]
57use alloc::{string::String, sync::Arc, vec::Vec};
58#[cfg(feature = "std")]
59use std::collections::HashMap;
60#[cfg(feature = "std")]
61use std::sync::Arc;
62
63use crate::document::{ENCRYPTED_MARKER, KEY_EXCHANGE_MARKER, PEER_E2EE_MARKER};
64#[cfg(feature = "translator-codec")]
65use crate::document::{
66    TRANSLATOR_FRAME_MARKER, TRANSLATOR_RESERVED_MARKER_END, TRANSLATOR_RESERVED_MARKER_START,
67};
68use crate::document_sync::DocumentSync;
69use crate::gossip::{GossipStrategy, RandomFanout};
70use crate::observer::{DisconnectReason, PeatEvent, PeatObserver, SecurityViolationKind};
71use crate::peer::{
72    ConnectionStateGraph, FullStateCountSummary, IndirectPeer, PeatPeer, PeerConnectionState,
73    PeerDegree, PeerManagerConfig, StateCountSummary,
74};
75use crate::peer_manager::PeerManager;
76use crate::relay::{
77    MessageId, RelayEnvelope, SeenMessageCache, DEFAULT_MAX_HOPS, DEFAULT_SEEN_TTL_MS,
78    RELAY_ENVELOPE_MARKER,
79};
80use crate::security::{
81    DeviceIdentity, IdentityAttestation, IdentityRegistry, KeyExchangeMessage, MeshEncryptionKey,
82    PeerEncryptedMessage, PeerSessionManager, RegistryResult, SessionState,
83};
84use crate::sync::crdt::{EventType, Peripheral, PeripheralType};
85use crate::sync::delta::{DeltaEncoder, DeltaStats};
86use crate::sync::delta_document::{DeltaDocument, Operation};
87use crate::NodeId;
88
89#[cfg(feature = "std")]
90use crate::observer::ObserverManager;
91
92use crate::registry::DocumentRegistry;
93
94/// Configuration for PeatMesh (build with `PeatMeshConfig::new`, then refine with the `with_*` builders)
95#[derive(Debug, Clone)]
96pub struct PeatMeshConfig {
97    /// Our node ID (`PeatMesh::with_identity` replaces it with the identity-derived ID)
98    pub node_id: NodeId,
99
100    /// Our callsign (e.g., "ALPHA-1")
101    pub callsign: String,
102
103    /// Mesh ID to filter peers (e.g., "DEMO"); also fed into the mesh encryption key derivation
104    pub mesh_id: String,
105
106    /// Peripheral type for this device (`new` defaults to `PeripheralType::SoldierSensor`)
107    pub peripheral_type: PeripheralType,
108
109    /// Peer management configuration (`new` builds it with `PeerManagerConfig::with_mesh_id`)
110    pub peer_config: PeerManagerConfig,
111
112    /// Sync interval in milliseconds (how often to broadcast state; `new` defaults to 5000)
113    pub sync_interval_ms: u64,
114
115    /// Whether to auto-broadcast on emergency/ack (`new` defaults to true)
116    pub auto_broadcast_events: bool,
117
118    /// Optional shared secret for mesh-wide encryption (32 bytes)
119    ///
120    /// When set, all documents are encrypted using ChaCha20-Poly1305 before
121    /// transmission and decrypted upon receipt. All nodes in the mesh must
122    /// share the same secret to communicate.
123    pub encryption_secret: Option<[u8; 32]>,
124
125    /// Strict encryption mode - reject unencrypted documents when encryption is enabled
126    ///
127    /// When true and encryption is enabled, any unencrypted documents received
128    /// will be rejected and trigger a SecurityViolation event. This prevents
129    /// downgrade attacks where an adversary sends unencrypted malicious documents.
130    ///
131    /// Default: false (backward compatible - accepts unencrypted for gradual rollout)
132    pub strict_encryption: bool,
133
134    /// Enable multi-hop relay
135    ///
136    /// When enabled, received messages will be forwarded to other peers based
137    /// on the gossip strategy. Requires message deduplication to prevent loops.
138    ///
139    /// Default: false
140    pub enable_relay: bool,
141
142    /// Maximum hops for relay messages (TTL)
143    ///
144    /// Messages will not be relayed beyond this many hops from the origin.
145    /// Default: 7
146    pub max_relay_hops: u8,
147
148    /// Gossip fanout for relay
149    ///
150    /// Number of peers to forward each message to. Higher values increase
151    /// convergence speed but also bandwidth usage.
152    /// Default: 2 (`with_relay_fanout` clamps the value to at least 1)
153    pub relay_fanout: usize,
154
155    /// TTL for seen message cache (milliseconds)
156    ///
157    /// How long to remember message IDs for deduplication.
158    /// Default: 300_000 (5 minutes)
159    pub seen_cache_ttl_ms: u64,
160}
161
162impl PeatMeshConfig {
163    /// Create a new configuration with required fields
164    pub fn new(node_id: NodeId, callsign: &str, mesh_id: &str) -> Self {
165        Self {
166            node_id,
167            callsign: callsign.into(),
168            mesh_id: mesh_id.into(),
169            peripheral_type: PeripheralType::SoldierSensor,
170            peer_config: PeerManagerConfig::with_mesh_id(mesh_id),
171            sync_interval_ms: 5000,
172            auto_broadcast_events: true,
173            encryption_secret: None,
174            strict_encryption: false,
175            enable_relay: false,
176            max_relay_hops: DEFAULT_MAX_HOPS,
177            relay_fanout: 2,
178            seen_cache_ttl_ms: DEFAULT_SEEN_TTL_MS,
179        }
180    }
181
182    /// Enable mesh-wide encryption with a shared secret
183    ///
184    /// All documents will be encrypted using ChaCha20-Poly1305 before
185    /// transmission. All mesh participants must use the same secret.
186    pub fn with_encryption(mut self, secret: [u8; 32]) -> Self {
187        self.encryption_secret = Some(secret);
188        self
189    }
190
191    /// Set peripheral type
192    pub fn with_peripheral_type(mut self, ptype: PeripheralType) -> Self {
193        self.peripheral_type = ptype;
194        self
195    }
196
197    /// Set sync interval
198    pub fn with_sync_interval(mut self, interval_ms: u64) -> Self {
199        self.sync_interval_ms = interval_ms;
200        self
201    }
202
203    /// Set peer timeout
204    pub fn with_peer_timeout(mut self, timeout_ms: u64) -> Self {
205        self.peer_config.peer_timeout_ms = timeout_ms;
206        self
207    }
208
209    /// Set max peers (for embedded systems)
210    pub fn with_max_peers(mut self, max: usize) -> Self {
211        self.peer_config.max_peers = max;
212        self
213    }
214
215    /// Enable strict encryption mode
216    ///
217    /// When enabled (and encryption is also enabled), any unencrypted documents
218    /// received will be rejected and trigger a `SecurityViolation` event.
219    /// This prevents downgrade attacks.
220    ///
221    /// Note: This only has effect when encryption is enabled via `with_encryption()`.
222    pub fn with_strict_encryption(mut self) -> Self {
223        self.strict_encryption = true;
224        self
225    }
226
227    /// Enable multi-hop relay
228    ///
229    /// When enabled, received messages will be forwarded to other connected peers
230    /// based on the gossip strategy. This enables mesh-wide message propagation.
231    pub fn with_relay(mut self) -> Self {
232        self.enable_relay = true;
233        self
234    }
235
236    /// Set maximum relay hops (TTL)
237    ///
238    /// Messages will not be relayed beyond this many hops from the origin.
239    pub fn with_max_relay_hops(mut self, max_hops: u8) -> Self {
240        self.max_relay_hops = max_hops;
241        self
242    }
243
244    /// Set gossip fanout for relay
245    ///
246    /// Number of peers to forward each message to.
247    pub fn with_relay_fanout(mut self, fanout: usize) -> Self {
248        self.relay_fanout = fanout.max(1);
249        self
250    }
251
252    /// Set TTL for seen message cache
253    ///
254    /// How long to remember message IDs for deduplication (milliseconds).
255    pub fn with_seen_cache_ttl(mut self, ttl_ms: u64) -> Self {
256        self.seen_cache_ttl_ms = ttl_ms;
257        self
258    }
259}
260
261/// Type alias for app document storage to reduce type complexity
///
/// An `RwLock`-guarded `HashMap` from a `(u8, u32, u64)` key to a type-erased
/// `Any + Send + Sync` box. The `PeatMesh::app_documents` field documents the
/// key as (type_id, source_node, timestamp).
262#[cfg(feature = "std")]
263type AppDocumentStore =
264    std::sync::RwLock<HashMap<(u8, u32, u64), Box<dyn core::any::Any + Send + Sync>>>;
265
266/// Main facade for Peat BLE mesh operations
267///
268/// Composes peer management, document sync, and observer notifications.
269/// Platform implementations call into this from their BLE callbacks.
270#[cfg(feature = "std")]
271pub struct PeatMesh {
272    /// Configuration (`node_id` is overridden by the identity when built via `with_identity`)
273    config: PeatMeshConfig,
274
275    /// Peer manager
276    peer_manager: PeerManager,
277
278    /// Document sync
279    document_sync: DocumentSync,
280
281    /// Observer manager
282    observers: ObserverManager,
283
284    /// Last sync broadcast time (u32 wraps every ~49 days, sufficient for intervals)
285    last_sync_ms: std::sync::atomic::AtomicU32,
286
287    /// Last cleanup time
288    last_cleanup_ms: std::sync::atomic::AtomicU32,
289
290    /// Monotonic outbound counter for peat-lite Document frames
291    /// (`MessageType::Document`). Threaded into the wire header so
292    /// polled receivers can dedup on `(source_node_id, seq_num)` —
293    /// pre-fix callers passed 0 by default and silently disabled
294    /// receive-side dedup. Wraps every ~4B publishes (u32), well
295    /// past any realistic per-session frame rate.
296    #[cfg(feature = "peat-lite-frame")]
297    peat_lite_seq: std::sync::atomic::AtomicU32,
298
299    /// Optional mesh-wide encryption key (derived from `mesh_id` and the shared secret)
300    encryption_key: Option<MeshEncryptionKey>,
301
302    /// Optional per-peer E2EE session manager (`None` after construction)
303    peer_sessions: std::sync::Mutex<Option<PeerSessionManager>>,
304
305    /// Connection state graph for tracking peer connection lifecycle
306    connection_graph: std::sync::Mutex<ConnectionStateGraph>,
307
308    /// Seen message cache for relay deduplication
309    seen_cache: std::sync::Mutex<SeenMessageCache>,
310
311    /// Gossip strategy for relay peer selection (constructors install a `RandomFanout`)
312    gossip_strategy: Box<dyn GossipStrategy>,
313
314    /// Delta encoder for per-peer sync state tracking
315    ///
316    /// Tracks what data has been sent to each peer to enable delta sync
317    /// (sending only changes instead of full documents).
318    delta_encoder: std::sync::Mutex<DeltaEncoder>,
319
320    /// This node's cryptographic identity (Ed25519 keypair)
321    ///
322    /// When set, the node_id is derived from the public key and documents
323    /// can be signed for authenticity verification.
324    identity: Option<DeviceIdentity>,
325
326    /// TOFU identity registry for tracking peer identities
327    ///
328    /// Maps node_id to public key on first contact, rejects mismatches.
329    identity_registry: std::sync::Mutex<IdentityRegistry>,
330
331    /// Peripheral state received from peers
332    ///
333    /// Stores the most recent peripheral data (callsign, location, etc.)
334    /// received from each peer via document sync.
335    peer_peripherals: std::sync::RwLock<HashMap<NodeId, Peripheral>>,
336
337    /// Document registry for app-layer CRDT types.
338    ///
339    /// Enables external crates to register custom document types that sync
340    /// through the mesh using the extensible registry pattern.
341    document_registry: DocumentRegistry,
342
343    /// Storage for app-layer documents.
344    ///
345    /// Keyed by (type_id, source_node, timestamp) to uniquely identify each
346    /// document instance. Values are type-erased boxes that can be downcast
347    /// using the document registry handlers.
348    app_documents: AppDocumentStore,
349
350    /// ADR-059 BleTranslator instance for inbound translator-frame decoding.
351    ///
352    /// One translator per PeatMesh, constructed via `BleTranslator::with_defaults()`.
353    /// Operators wanting custom collection names can swap via
354    /// `set_translator_config`. Always present when the
355    /// `translator-codec` feature is enabled — the receive-dispatch
356    /// branch in `on_ble_data_received_anonymous` (and siblings) calls
357    /// `decode_inbound_sync` on it for every 0xB6 frame.
358    #[cfg(feature = "translator-codec")]
359    ble_translator: std::sync::RwLock<crate::translator::BleTranslator>,
360
361    /// Optional UniFFI-exported `DecodedDocumentJsonCallback` set by
362    /// host bindings (Kotlin / Swift via peat-btle's UniFFI surface).
363    /// ADR-059 Amendment 2 specifies this as the production wiring
364    /// point for cross-transport receive — peat-atak-plugin's
365    /// `BleDecodedDocumentBridge` implements the trait and forwards
366    /// each decoded doc into peat-ffi's `publishDocument` with
367    /// `origin="ble"`.
368    ///
369    /// **ADR-059 Amendment 4 Slice 4.b note.** The Rust-native
370    /// `DecodedDocumentCallback` trait that previously lived alongside
371    /// this one (taking the typed `peat_mesh::sync::Document`) is
372    /// deleted: peat-btle has no peat-mesh dep at all post-Slice-4.b,
373    /// so the only payload shape that crosses the FFI boundary now is
374    /// the JSON projection. Rust-native consumers either consume the
375    /// polled `DataReceivedResult::decoded_translator_frame` field
376    /// (Amendment 3) or wire a `Translator` impl on the peat-mesh
377    /// side (Slice 4.a's `peat_mesh::transport::btle_translator`).
378    #[cfg(all(feature = "translator-codec", feature = "uniffi"))]
379    decoded_document_json_callback:
380        std::sync::RwLock<Option<Arc<dyn crate::DecodedDocumentJsonCallback>>>,
381
382    /// ADR-059 Amendment 3 — set when a host calls
383    /// [`acknowledge_polled_translator_consumer`](Self::acknowledge_polled_translator_consumer)
384    /// to attest that it consumes
385    /// [`DataReceivedResult::decoded_translator_frame`] per receive.
386    /// Read by the receive dispatch as a suppression branch
387    /// for `PeatEvent::TranslatorNoCallback` (the event fires only
388    /// when both remaining sinks are absent — no UniFFI JSON callback,
389    /// no polled attestation; Slice 4.b deleted the Rust-trait callback).
390    ///
391    /// **Defined unconditionally** (not `cfg`-gated) so the binding
392    /// shape is stable across feature combos — UniFFI hosts call the
393    /// same Kotlin signature against any peat-btle build. When
394    /// `translator-codec` is off the read site (suppression check) is
395    /// gated alongside the rest of the dispatch logic; the flag still
396    /// exists, it just has no effect.
397    polled_translator_consumer_attested: std::sync::atomic::AtomicBool,
398}
399
400#[cfg(feature = "std")]
401impl PeatMesh {
402    /// Create a new PeatMesh instance
403    pub fn new(config: PeatMeshConfig) -> Self {
404        let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
405        let document_sync = DocumentSync::with_peripheral_type(
406            config.node_id,
407            &config.callsign,
408            config.peripheral_type,
409        );
410
411        // Derive encryption key from shared secret if configured
412        let encryption_key = config
413            .encryption_secret
414            .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
415
416        // Create connection state graph with config thresholds
417        let connection_graph = ConnectionStateGraph::with_config(
418            config.peer_config.rssi_degraded_threshold,
419            config.peer_config.lost_timeout_ms,
420        );
421
422        // Create seen message cache for relay deduplication
423        let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
424
425        // Create gossip strategy for relay
426        let gossip_strategy: Box<dyn GossipStrategy> =
427            Box::new(RandomFanout::new(config.relay_fanout));
428
429        // Create delta encoder for per-peer sync state tracking
430        let delta_encoder = DeltaEncoder::new(config.node_id);
431
432        let document_registry = DocumentRegistry::new();
433
434        Self {
435            config,
436            peer_manager,
437            document_sync,
438            observers: ObserverManager::new(),
439            last_sync_ms: std::sync::atomic::AtomicU32::new(0),
440            last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
441            #[cfg(feature = "peat-lite-frame")]
442            peat_lite_seq: std::sync::atomic::AtomicU32::new(0),
443            encryption_key,
444            peer_sessions: std::sync::Mutex::new(None),
445            connection_graph: std::sync::Mutex::new(connection_graph),
446            seen_cache: std::sync::Mutex::new(seen_cache),
447            gossip_strategy,
448            delta_encoder: std::sync::Mutex::new(delta_encoder),
449            identity: None,
450            identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
451            peer_peripherals: std::sync::RwLock::new(HashMap::new()),
452            document_registry,
453            app_documents: std::sync::RwLock::new(HashMap::new()),
454            #[cfg(feature = "translator-codec")]
455            ble_translator: std::sync::RwLock::new(
456                crate::translator::BleTranslator::with_defaults(),
457            ),
458            #[cfg(all(feature = "translator-codec", feature = "uniffi"))]
459            decoded_document_json_callback: std::sync::RwLock::new(None),
460            polled_translator_consumer_attested: std::sync::atomic::AtomicBool::new(false),
461        }
462    }
463
464    /// Create a new PeatMesh with a cryptographic identity
465    ///
466    /// The node_id will be derived from the identity's public key, overriding
467    /// any node_id specified in the config. This ensures cryptographic binding
468    /// between node_id and identity.
469    pub fn with_identity(config: PeatMeshConfig, identity: DeviceIdentity) -> Self {
470        // Override node_id with identity-derived value
471        let mut config = config;
472        config.node_id = identity.node_id();
473
474        let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
475        let document_sync = DocumentSync::with_peripheral_type(
476            config.node_id,
477            &config.callsign,
478            config.peripheral_type,
479        );
480
481        let encryption_key = config
482            .encryption_secret
483            .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
484
485        let connection_graph = ConnectionStateGraph::with_config(
486            config.peer_config.rssi_degraded_threshold,
487            config.peer_config.lost_timeout_ms,
488        );
489
490        let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
491        let gossip_strategy: Box<dyn GossipStrategy> =
492            Box::new(RandomFanout::new(config.relay_fanout));
493        let delta_encoder = DeltaEncoder::new(config.node_id);
494
495        let document_registry = DocumentRegistry::new();
496
497        Self {
498            config,
499            peer_manager,
500            document_sync,
501            observers: ObserverManager::new(),
502            last_sync_ms: std::sync::atomic::AtomicU32::new(0),
503            last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
504            #[cfg(feature = "peat-lite-frame")]
505            peat_lite_seq: std::sync::atomic::AtomicU32::new(0),
506            encryption_key,
507            peer_sessions: std::sync::Mutex::new(None),
508            connection_graph: std::sync::Mutex::new(connection_graph),
509            seen_cache: std::sync::Mutex::new(seen_cache),
510            gossip_strategy,
511            delta_encoder: std::sync::Mutex::new(delta_encoder),
512            identity: Some(identity),
513            identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
514            peer_peripherals: std::sync::RwLock::new(HashMap::new()),
515            document_registry,
516            app_documents: std::sync::RwLock::new(HashMap::new()),
517            #[cfg(feature = "translator-codec")]
518            ble_translator: std::sync::RwLock::new(
519                crate::translator::BleTranslator::with_defaults(),
520            ),
521            #[cfg(all(feature = "translator-codec", feature = "uniffi"))]
522            decoded_document_json_callback: std::sync::RwLock::new(None),
523            polled_translator_consumer_attested: std::sync::atomic::AtomicBool::new(false),
524        }
525    }
526
527    /// Create a new PeatMesh from genesis data
528    ///
529    /// This is the recommended way to create a mesh for production use.
530    /// The mesh will be configured with:
531    /// - node_id derived from identity
532    /// - mesh_id from genesis
533    /// - encryption enabled using genesis-derived secret
534    pub fn from_genesis(
535        genesis: &crate::security::MeshGenesis,
536        identity: DeviceIdentity,
537        callsign: &str,
538    ) -> Self {
539        let config = PeatMeshConfig::new(identity.node_id(), callsign, &genesis.mesh_id())
540            .with_encryption(genesis.encryption_secret());
541
542        Self::with_identity(config, identity)
543    }
544
545    /// Create a PeatMesh from persisted state.
546    ///
547    /// Restores mesh configuration from previously saved state, including:
548    /// - Device identity (Ed25519 keypair)
549    /// - Mesh genesis (if present)
550    /// - Identity registry (TOFU cache)
551    ///
552    /// Use this on device boot to restore mesh membership without re-provisioning.
553    ///
554    /// # Arguments
555    ///
556    /// * `state` - Previously persisted state
557    /// * `callsign` - Human-readable identifier (may differ from original)
558    ///
559    /// # Errors
560    ///
561    /// Returns `PersistenceError` if the identity cannot be restored.
562    ///
563    /// # Example
564    ///
565    /// ```ignore
566    /// // On boot, restore from secure storage
567    /// let state = PersistedState::load(&storage)?;
568    /// let mesh = PeatMesh::from_persisted(state, "SENSOR-01")?;
569    /// ```
570    #[cfg(feature = "std")]
571    pub fn from_persisted(
572        state: crate::security::PersistedState,
573        callsign: &str,
574    ) -> Result<Self, crate::security::PersistenceError> {
575        // Restore identity
576        let identity = state.restore_identity()?;
577
578        // Restore genesis (if present)
579        let genesis = state.restore_genesis();
580
581        // Create mesh with or without genesis
582        let mesh = if let Some(ref gen) = genesis {
583            Self::from_genesis(gen, identity, callsign)
584        } else {
585            let config = PeatMeshConfig::new(identity.node_id(), callsign, "RESTORED");
586            Self::with_identity(config, identity)
587        };
588
589        // Restore identity registry
590        let restored_registry = state.restore_registry();
591        if let Ok(mut registry) = mesh.identity_registry.lock() {
592            *registry = restored_registry;
593        }
594
595        log::info!(
596            "PeatMesh restored from persisted state: node_id={:08X}, known_peers={}",
597            mesh.config.node_id.as_u32(),
598            mesh.known_identity_count()
599        );
600
601        Ok(mesh)
602    }
603
604    /// Create persisted state from current mesh state.
605    ///
606    /// Captures the current identity, genesis, and registry for persistence.
607    /// Call this periodically or before shutdown to save state.
608    ///
609    /// # Arguments
610    ///
611    /// * `genesis` - Optional genesis to include (if mesh was created from genesis)
612    ///
613    /// # Returns
614    ///
615    /// `None` if the mesh has no identity bound.
616    #[cfg(feature = "std")]
617    pub fn to_persisted_state(
618        &self,
619        genesis: Option<&crate::security::MeshGenesis>,
620    ) -> Option<crate::security::PersistedState> {
621        let identity = self.identity.as_ref()?;
622        let registry = self.identity_registry.lock().ok()?;
623
624        Some(crate::security::PersistedState::with_registry(
625            identity, genesis, &registry,
626        ))
627    }
628
629    // ==================== ADR-059 translator-frame receive ====================
630
631    /// Install the UniFFI-exported
632    /// [`DecodedDocumentJsonCallback`](crate::DecodedDocumentJsonCallback)
633    /// invoked when a 0xB6 translator frame decodes successfully (ADR-059
634    /// Amendment 2 — host-side wiring point for cross-transport receive).
635    ///
636    /// Idempotent — calling again replaces the previous callback. When
637    /// no callback is installed and no polled-consumer attestation has
638    /// fired (see [`acknowledge_polled_translator_consumer`](Self::acknowledge_polled_translator_consumer)),
639    /// `PeatEvent::TranslatorNoCallback` fires on the observer channel
640    /// so the gap is operator-observable.
641    ///
642    /// **ADR-059 Amendment 4 Slice 4.b note.** The previously parallel
643    /// Rust-trait `set_decoded_document_callback` (taking
644    /// `peat_mesh::sync::Document`) is deleted along with the trait
645    /// itself; peat-btle has no peat-mesh dep at all post-Slice-4.b, so
646    /// the only payload shape is the JSON projection. Rust-native
647    /// consumers that need typed `Document` should wire a `Translator`
648    /// impl on the peat-mesh side (Slice 4.a's
649    /// `peat_mesh::transport::btle_translator`).
650    #[cfg(all(feature = "translator-codec", feature = "uniffi"))]
651    pub fn set_decoded_document_json_callback(
652        &self,
653        cb: Box<dyn crate::DecodedDocumentJsonCallback>,
654    ) {
655        // UniFFI 0.31 callback_interface generates a `Box<dyn Trait>` on
656        // the FFI surface. We store as `Arc<dyn Trait>` internally so the
657        // receive dispatch can snapshot-clone the handle out of the
658        // RwLock without holding the lock during user-supplied dispatch.
659        let cb_arc: Arc<dyn crate::DecodedDocumentJsonCallback> = Arc::from(cb);
660        if let Ok(mut slot) = self.decoded_document_json_callback.write() {
661            *slot = Some(cb_arc);
662        }
663    }
664
665    /// Mark this `PeatMesh` as having a polled-field consumer (ADR-059
666    /// Amendment 3). Hosts that read
667    /// [`DataReceivedResult::decoded_translator_frame`] per receive and
668    /// forward through their own publish path SHOULD call this once at
669    /// startup, after constructing `PeatMesh` and before any GATT
670    /// receive arrives. Without this attestation, the receive dispatch
671    /// has no way to distinguish a polled-consumer host from one that
672    /// decoded the frame and dropped it on the floor, and
673    /// `PeatEvent::TranslatorNoCallback` will fire for every frame.
674    ///
675    /// Idempotent — calling again is a no-op. Defined unconditionally
676    /// so the binding shape is stable across feature combos; when
677    /// `translator-codec` is off the suppression-check read site is
678    /// gated alongside the rest of the dispatch logic, so the flag has
679    /// no effect (still safe to call).
680    pub fn acknowledge_polled_translator_consumer(&self) {
681        self.polled_translator_consumer_attested
682            .store(true, std::sync::atomic::Ordering::Release);
683    }
684
685    /// Replace the active `BleTranslator` configuration. Optional; the
686    /// default (operator collection names = "tracks"/"platforms"/
687    /// "alerts"/"canned_messages") covers most deployments.
688    #[cfg(feature = "translator-codec")]
689    pub fn set_translator_config(&self, config: crate::translator::TranslationConfig) {
690        if let Ok(mut t) = self.ble_translator.write() {
691            *t = crate::translator::BleTranslator::new(config);
692        }
693    }
694
695    /// Translator-frame / reserved-marker dispatcher. Returns
696    /// `TranslatorMarkerOutcome::NotTranslatorMarker` if `decrypted` is empty or its
697    /// first byte is outside `0xB6..=0xBF`; the caller continues with the existing
698    /// legacy / delta-document flow. Any other outcome means the frame was handled or
699    /// silently dropped here; the caller MUST stop processing and return `None`.
700    ///
701    /// `peer` is the BLE peer's identifier (typically a peripheral address).
702    /// `source_node` is the peer's `NodeId` when known — used to populate
703    /// the `peripheral_id` field on `DecodeInboundCtx`, which the `tracks`
704    /// collection's decoder requires per the codec contract. Anonymous
705    /// receive paths pass `None`; tracks decoding will then return an
706    /// `Err` (logged + dropped) but other collections succeed.
707    #[cfg(feature = "translator-codec")]
708    fn try_handle_translator_marker(
709        &self,
710        decrypted: &[u8],
711        peer: Option<&str>,
712        source_node: Option<NodeId>,
713    ) -> TranslatorMarkerOutcome {
714        if decrypted.is_empty() {
715            return TranslatorMarkerOutcome::NotTranslatorMarker;
716        }
717        let marker = decrypted[0];
718
719        // Reserved range — silent drop for forward-compat. Receivers running
720        // *this* peat-btle release MUST drop unknown future-translator
721        // markers without falling through to merge_document, otherwise a
722        // 0xB7..=0xBF frame would corrupt the GCounter (ADR-059 Amendment 1
723        // §"Backwards compatibility… 2").
724        if (TRANSLATOR_RESERVED_MARKER_START..=TRANSLATOR_RESERVED_MARKER_END).contains(&marker) {
725            log::warn!(
726                "ble: dropping reserved translator-marker frame (marker=0x{marker:02X}, len={})",
727                decrypted.len()
728            );
729            return TranslatorMarkerOutcome::Handled;
730        }
731
732        if marker != TRANSLATOR_FRAME_MARKER {
733            return TranslatorMarkerOutcome::NotTranslatorMarker;
734        }
735
736        // 0xB6 translator dispatch.
737        if decrypted.len() < 2 {
738            log::warn!(
739                "ble: dropping truncated translator frame (len={}, missing collection code)",
740                decrypted.len()
741            );
742            return TranslatorMarkerOutcome::Handled;
743        }
744
745        let code = decrypted[1];
746        let payload = &decrypted[2..];
747
748        // Resolve the collection name + build a DecodeInboundCtx under
749        // the read-lock, drop the lock before invoking decode_inbound_sync.
750        let (collection, decode_result) = {
751            let translator = match self.ble_translator.read() {
752                Ok(g) => g,
753                Err(_) => {
754                    log::warn!("ble: translator RwLock poisoned; dropping frame");
755                    return TranslatorMarkerOutcome::Handled;
756                }
757            };
758            let collection = match translator.code_to_collection(code) {
759                Some(c) => c.to_string(),
760                None => {
761                    log::warn!(
762                        "ble: dropping translator frame with unknown collection code 0x{code:02X}"
763                    );
764                    return TranslatorMarkerOutcome::Handled;
765                }
766            };
767
768            let ctx = crate::translator::DecodeInboundCtx {
769                collection: &collection,
770                callsign: None,
771                cell_id: None,
772                peripheral_id: source_node.map(|n| n.as_u32()),
773            };
774
775            let result = translator.decode_inbound_sync(payload, &ctx);
776            (collection, result)
777        };
778
779        match decode_result {
780            Ok(Some(value)) => {
781                // ADR-059 Amendment 2: UniFFI JSON callback. Amendment 3:
782                // polled `decoded_translator_frame` struct field. Slice 4.b
783                // deleted the parallel Rust-trait callback (it took
784                // peat_mesh::sync::Document; peat-btle no longer depends
785                // on peat-mesh). Suppression rule for
786                // `PeatEvent::TranslatorNoCallback` fires only when both
787                // remaining sinks are absent — no UniFFI JSON callback,
788                // no polled-consumer attestation via
789                // `acknowledge_polled_translator_consumer`.
790                #[cfg(feature = "uniffi")]
791                let json_cb = self
792                    .decoded_document_json_callback
793                    .read()
794                    .ok()
795                    .and_then(|g| g.as_ref().cloned());
796                #[cfg(not(feature = "uniffi"))]
797                let json_cb: Option<()> = None;
798
799                let polled_attested = self
800                    .polled_translator_consumer_attested
801                    .load(std::sync::atomic::Ordering::Acquire);
802
803                let any_consumer = json_cb.is_some() || polled_attested;
804
805                // Serialize the Value projection to a JSON string. Same
806                // wire shape Amendment 2 specifies — peat-mesh's
807                // `Translator` impl on the receive side projects the
808                // string back into `peat_mesh::sync::Document`; consumers
809                // forwarding through peat-ffi pass the string through.
810                let doc_json_result = serde_json::to_string(&value);
811
812                #[cfg(feature = "uniffi")]
813                if let (Some(json_cb), Ok(ref doc_json)) = (json_cb, &doc_json_result) {
814                    json_cb.on_document(
815                        collection.clone(),
816                        doc_json.clone(),
817                        peer.map(str::to_string),
818                    );
819                }
820
821                if let Err(ref e) = doc_json_result {
822                    log::warn!(
823                        "ble: failed to serialize decoded {} value to JSON: {}",
824                        collection,
825                        e
826                    );
827                }
828
829                if !any_consumer {
830                    // No callback installed AND host hasn't attested to
831                    // polling. Decoded successfully but nobody's
832                    // listening. Per ADR-059 Amendment 1/2/3, surface
833                    // through the operator-observable PeatEvent channel
834                    // so staged-rollout deployments see, in real time,
835                    // how many translator-frame payloads the bridge is
836                    // decoding into the void. `log::debug!` alone is
837                    // not a monitoring signal — disabled at default
838                    // log levels.
839                    log::debug!(
840                        "ble: decoded {} frame but no consumer registered (no callback, no polled attestation)",
841                        collection
842                    );
843                    self.notify(crate::observer::PeatEvent::TranslatorNoCallback {
844                        collection: collection.clone(),
845                        peer: peer.map(str::to_string),
846                    });
847                }
848
849                // Surface the decoded frame to the caller as the
850                // polled-consumer output. Caller hoists into
851                // `DataReceivedResult.decoded_translator_frame`.
852                // Falls through to `Handled` (no frame to surface) if
853                // JSON serialization failed above.
854                if let Ok(doc_json) = doc_json_result {
855                    return TranslatorMarkerOutcome::Decoded(DecodedTranslatorFrame {
856                        collection,
857                        doc_json,
858                        peer: peer.map(str::to_string),
859                    });
860                }
861            }
862            Ok(None) => {
863                log::debug!(
864                    "ble: codec declined translator frame for collection {}",
865                    collection
866                );
867            }
868            Err(e) => {
869                log::warn!(
870                    "ble: translator frame decode error (collection={}): {:#}",
871                    collection,
872                    e
873                );
874            }
875        }
876
877        TranslatorMarkerOutcome::Handled
878    }
879
880    // ==================== Encryption ====================
881
882    /// Check if mesh-wide encryption is enabled
883    pub fn is_encryption_enabled(&self) -> bool {
884        self.encryption_key.is_some()
885    }
886
887    /// Check if strict encryption mode is enabled
888    ///
889    /// Returns true only if both encryption and strict_encryption are enabled.
890    pub fn is_strict_encryption_enabled(&self) -> bool {
891        self.config.strict_encryption && self.encryption_key.is_some()
892    }
893
894    /// Enable mesh-wide encryption with a shared secret
895    ///
896    /// Derives a ChaCha20-Poly1305 key from the secret using HKDF-SHA256.
897    /// All mesh participants must use the same secret to communicate.
898    pub fn enable_encryption(&mut self, secret: &[u8; 32]) {
899        self.encryption_key = Some(MeshEncryptionKey::from_shared_secret(
900            &self.config.mesh_id,
901            secret,
902        ));
903    }
904
905    /// Disable mesh-wide encryption
906    pub fn disable_encryption(&mut self) {
907        self.encryption_key = None;
908    }
909
910    /// Encrypt document bytes for transmission
911    ///
912    /// Returns the encrypted bytes with ENCRYPTED_MARKER prefix, or the
913    /// original bytes if encryption is disabled.
914    fn encrypt_document(&self, plaintext: &[u8]) -> Vec<u8> {
915        match &self.encryption_key {
916            Some(key) => {
917                // Encrypt and prepend marker
918                match key.encrypt_to_bytes(plaintext) {
919                    Ok(ciphertext) => {
920                        let mut buf = Vec::with_capacity(2 + ciphertext.len());
921                        buf.push(ENCRYPTED_MARKER);
922                        buf.push(0x00); // reserved
923                        buf.extend_from_slice(&ciphertext);
924                        buf
925                    }
926                    Err(e) => {
927                        log::error!("Encryption failed: {}", e);
928                        // Fall back to unencrypted on error (shouldn't happen)
929                        plaintext.to_vec()
930                    }
931                }
932            }
933            None => plaintext.to_vec(),
934        }
935    }
936
937    /// Decrypt document bytes received from peer
938    ///
939    /// Returns the decrypted bytes if encrypted and valid, or the original
940    /// bytes if not encrypted. Returns None if decryption fails.
941    ///
942    /// In strict encryption mode (when both encryption and strict_encryption are enabled),
943    /// unencrypted documents are rejected and trigger a SecurityViolation event.
944    fn decrypt_document<'a>(
945        &self,
946        data: &'a [u8],
947        source_hint: Option<&str>,
948    ) -> Option<std::borrow::Cow<'a, [u8]>> {
949        log::debug!(
950            "decrypt_document: len={}, first_byte=0x{:02X}, source={:?}",
951            data.len(),
952            data.first().copied().unwrap_or(0),
953            source_hint
954        );
955
956        // Check for encrypted marker
957        if data.len() >= 2 && data[0] == ENCRYPTED_MARKER {
958            // Encrypted document
959            let _reserved = data[1];
960            let encrypted_payload = &data[2..];
961
962            log::debug!(
963                "decrypt_document: encrypted payload len={}, nonce+ciphertext",
964                encrypted_payload.len()
965            );
966
967            match &self.encryption_key {
968                Some(key) => match key.decrypt_from_bytes(encrypted_payload) {
969                    Ok(plaintext) => {
970                        log::debug!(
971                            "decrypt_document: SUCCESS, plaintext len={}",
972                            plaintext.len()
973                        );
974                        Some(std::borrow::Cow::Owned(plaintext))
975                    }
976                    Err(e) => {
977                        log::warn!(
978                            "decrypt_document: FAILED (wrong key or corrupted): {} [payload_len={}, source={:?}]",
979                            e,
980                            encrypted_payload.len(),
981                            source_hint
982                        );
983                        self.notify(PeatEvent::SecurityViolation {
984                            kind: SecurityViolationKind::DecryptionFailed,
985                            source: source_hint.map(String::from),
986                        });
987                        None
988                    }
989                },
990                None => {
991                    log::warn!(
992                        "decrypt_document: encryption not enabled but received encrypted doc"
993                    );
994                    None
995                }
996            }
997        } else {
998            // Unencrypted document
999            // Check strict encryption mode
1000            if self.config.strict_encryption && self.encryption_key.is_some() {
1001                log::warn!(
1002                    "Rejected unencrypted document in strict encryption mode (source: {:?})",
1003                    source_hint
1004                );
1005                self.notify(PeatEvent::SecurityViolation {
1006                    kind: SecurityViolationKind::UnencryptedInStrictMode,
1007                    source: source_hint.map(String::from),
1008                });
1009                None
1010            } else {
1011                // Permissive mode: accept unencrypted for backward compatibility
1012                Some(std::borrow::Cow::Borrowed(data))
1013            }
1014        }
1015    }
1016
1017    // ==================== Transport Layer API ====================
1018
1019    /// Decrypt data without parsing (transport-only operation)
1020    ///
1021    /// This method provides raw decrypted bytes for apps that want to handle
1022    /// message parsing themselves (using peat-lite or other libraries).
1023    ///
1024    /// # Arguments
1025    /// * `data` - Potentially encrypted data (0xAE marker indicates encryption)
1026    ///
1027    /// # Returns
1028    /// * `Some(plaintext)` - Decrypted bytes if successful, or original bytes if unencrypted
1029    /// * `None` - If decryption failed (wrong key, corrupted, or strict mode violation)
1030    pub fn decrypt_only(&self, data: &[u8]) -> Option<Vec<u8>> {
1031        self.decrypt_document(data, None)
1032            .map(|cow| cow.into_owned())
1033    }
1034
1035    // ==================== Identity ====================
1036
1037    /// Check if this mesh has a cryptographic identity
1038    pub fn has_identity(&self) -> bool {
1039        self.identity.is_some()
1040    }
1041
1042    /// Get this node's public key (if identity is configured)
1043    pub fn public_key(&self) -> Option<[u8; 32]> {
1044        self.identity.as_ref().map(|id| id.public_key())
1045    }
1046
1047    /// Create an identity attestation for this node
1048    ///
1049    /// Returns None if no identity is configured.
1050    pub fn create_attestation(&self, now_ms: u64) -> Option<IdentityAttestation> {
1051        self.identity
1052            .as_ref()
1053            .map(|id| id.create_attestation(now_ms))
1054    }
1055
1056    /// Verify and register a peer's identity attestation
1057    ///
1058    /// Implements TOFU (Trust On First Use):
1059    /// - On first contact, registers the node_id → public_key binding
1060    /// - On subsequent contacts, verifies the public key matches
1061    ///
1062    /// Returns the verification result. Security violations should be handled
1063    /// by the caller (e.g., disconnect, alert).
1064    pub fn verify_peer_identity(&self, attestation: &IdentityAttestation) -> RegistryResult {
1065        self.identity_registry
1066            .lock()
1067            .unwrap()
1068            .verify_or_register(attestation)
1069    }
1070
1071    /// Check if a peer's identity is known (has been registered)
1072    pub fn is_peer_identity_known(&self, node_id: NodeId) -> bool {
1073        self.identity_registry.lock().unwrap().is_known(node_id)
1074    }
1075
1076    /// Get a peer's public key if known
1077    pub fn peer_public_key(&self, node_id: NodeId) -> Option<[u8; 32]> {
1078        self.identity_registry
1079            .lock()
1080            .unwrap()
1081            .get_public_key(node_id)
1082            .copied()
1083    }
1084
1085    /// Get the number of known peer identities
1086    pub fn known_identity_count(&self) -> usize {
1087        self.identity_registry.lock().unwrap().len()
1088    }
1089
1090    /// Pre-register a peer's identity (for out-of-band key exchange)
1091    ///
1092    /// Use this when keys are exchanged through a secure side channel
1093    /// (e.g., QR code, NFC tap, or provisioning server).
1094    pub fn pre_register_peer_identity(&self, node_id: NodeId, public_key: [u8; 32], now_ms: u64) {
1095        self.identity_registry
1096            .lock()
1097            .unwrap()
1098            .pre_register(node_id, public_key, now_ms);
1099    }
1100
1101    /// Remove a peer's identity from the registry
1102    ///
1103    /// Use with caution - this allows re-registration with a different key.
1104    pub fn forget_peer_identity(&self, node_id: NodeId) {
1105        self.identity_registry.lock().unwrap().remove(node_id);
1106    }
1107
1108    /// Sign arbitrary data with this node's identity
1109    ///
1110    /// Returns None if no identity is configured.
1111    pub fn sign(&self, data: &[u8]) -> Option<[u8; 64]> {
1112        self.identity.as_ref().map(|id| id.sign(data))
1113    }
1114
1115    /// Verify a signature from a peer
1116    ///
1117    /// Uses the peer's public key from the identity registry.
1118    /// Returns false if peer is unknown or signature is invalid.
1119    pub fn verify_peer_signature(
1120        &self,
1121        node_id: NodeId,
1122        data: &[u8],
1123        signature: &[u8; 64],
1124    ) -> bool {
1125        if let Some(public_key) = self.peer_public_key(node_id) {
1126            crate::security::verify_signature(&public_key, data, signature)
1127        } else {
1128            false
1129        }
1130    }
1131
1132    // ==================== Multi-Hop Relay ====================
1133
1134    /// Check if multi-hop relay is enabled
1135    pub fn is_relay_enabled(&self) -> bool {
1136        self.config.enable_relay
1137    }
1138
1139    /// Enable multi-hop relay
1140    pub fn enable_relay(&mut self) {
1141        self.config.enable_relay = true;
1142    }
1143
1144    /// Disable multi-hop relay
1145    pub fn disable_relay(&mut self) {
1146        self.config.enable_relay = false;
1147    }
1148
1149    /// Check if a message has been seen before (for deduplication)
1150    ///
1151    /// Returns true if the message was already seen (duplicate).
1152    pub fn has_seen_message(&self, message_id: &MessageId) -> bool {
1153        self.seen_cache.lock().unwrap().has_seen(message_id)
1154    }
1155
1156    /// Mark a message as seen
1157    ///
1158    /// Returns true if this is a new message (first time seen).
1159    pub fn mark_message_seen(&self, message_id: MessageId, origin: NodeId, now_ms: u64) -> bool {
1160        self.seen_cache
1161            .lock()
1162            .unwrap()
1163            .check_and_mark(message_id, origin, now_ms)
1164    }
1165
1166    /// Get the number of entries in the seen message cache
1167    pub fn seen_cache_size(&self) -> usize {
1168        self.seen_cache.lock().unwrap().len()
1169    }
1170
1171    /// Clear the seen message cache
1172    pub fn clear_seen_cache(&self) {
1173        self.seen_cache.lock().unwrap().clear();
1174    }
1175
1176    /// Wrap a document in a relay envelope for multi-hop transmission
1177    ///
1178    /// The returned bytes can be sent to peers and will be automatically
1179    /// relayed through the mesh if relay is enabled on receiving nodes.
1180    pub fn wrap_for_relay(&self, payload: Vec<u8>) -> Vec<u8> {
1181        let envelope = RelayEnvelope::broadcast(self.config.node_id, payload)
1182            .with_max_hops(self.config.max_relay_hops);
1183        envelope.encode()
1184    }
1185
1186    /// Get peers to relay a message to
1187    ///
1188    /// Uses the configured gossip strategy to select relay targets.
1189    /// Excludes the source peer (if provided) to avoid sending back to sender.
1190    pub fn get_relay_targets(&self, exclude_peer: Option<NodeId>) -> Vec<PeatPeer> {
1191        let connected = self.peer_manager.get_connected_peers();
1192        let filtered: Vec<_> = if let Some(exclude) = exclude_peer {
1193            connected
1194                .into_iter()
1195                .filter(|p| p.node_id != exclude)
1196                .collect()
1197        } else {
1198            connected
1199        };
1200
1201        self.gossip_strategy
1202            .select_peers(&filtered)
1203            .into_iter()
1204            .cloned()
1205            .collect()
1206    }
1207
1208    /// Process an incoming relay envelope
1209    ///
1210    /// Handles deduplication, TTL checking, and determines if the message
1211    /// should be processed and/or relayed.
1212    ///
1213    /// Returns:
1214    /// - `Ok(Some(RelayDecision))` if message should be processed/relayed
1215    /// - `Ok(None)` if message was a duplicate or TTL expired
1216    /// - `Err` if parsing failed
1217    pub fn process_relay_envelope(
1218        &self,
1219        data: &[u8],
1220        source_peer: NodeId,
1221        now_ms: u64,
1222    ) -> Option<RelayDecision> {
1223        // Parse envelope
1224        let envelope = RelayEnvelope::decode(data)?;
1225
1226        // Update indirect peer graph if origin differs from source
1227        // This means the message was relayed through source_peer from origin_node
1228        if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
1229            let is_new = self.connection_graph.lock().unwrap().on_relay_received(
1230                source_peer,
1231                envelope.origin_node,
1232                envelope.hop_count,
1233                now_ms,
1234            );
1235
1236            if is_new {
1237                log::debug!(
1238                    "Discovered indirect peer {:08X} via {:08X} ({} hops)",
1239                    envelope.origin_node.as_u32(),
1240                    source_peer.as_u32(),
1241                    envelope.hop_count
1242                );
1243            }
1244        }
1245
1246        // Check deduplication
1247        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
1248            // Duplicate message
1249            let stats = self
1250                .seen_cache
1251                .lock()
1252                .unwrap()
1253                .get_stats(&envelope.message_id);
1254            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
1255
1256            self.notify(PeatEvent::DuplicateMessageDropped {
1257                origin_node: envelope.origin_node,
1258                seen_count,
1259            });
1260
1261            log::debug!(
1262                "Dropped duplicate message {} from {:08X} (seen {} times)",
1263                envelope.message_id,
1264                envelope.origin_node.as_u32(),
1265                seen_count
1266            );
1267            return None;
1268        }
1269
1270        // Check TTL
1271        if !envelope.can_relay() {
1272            self.notify(PeatEvent::MessageTtlExpired {
1273                origin_node: envelope.origin_node,
1274                hop_count: envelope.hop_count,
1275            });
1276
1277            log::debug!(
1278                "Message {} from {:08X} TTL expired at hop {}",
1279                envelope.message_id,
1280                envelope.origin_node.as_u32(),
1281                envelope.hop_count
1282            );
1283
1284            // Still process locally even if TTL expired
1285            return Some(RelayDecision {
1286                payload: envelope.payload,
1287                origin_node: envelope.origin_node,
1288                hop_count: envelope.hop_count,
1289                should_relay: false,
1290                relay_envelope: None,
1291            });
1292        }
1293
1294        // Determine if we should relay
1295        let should_relay = self.config.enable_relay;
1296        let relay_envelope = if should_relay {
1297            envelope.relay() // Increments hop count
1298        } else {
1299            None
1300        };
1301
1302        Some(RelayDecision {
1303            payload: envelope.payload,
1304            origin_node: envelope.origin_node,
1305            hop_count: envelope.hop_count,
1306            should_relay,
1307            relay_envelope,
1308        })
1309    }
1310
1311    /// Build a document wrapped in a relay envelope
1312    ///
1313    /// Convenience method that builds the document, encrypts it (if enabled),
1314    /// and wraps it in a relay envelope for multi-hop transmission.
1315    pub fn build_relay_document(&self) -> Vec<u8> {
1316        let doc = self.build_document(); // Already encrypted if encryption enabled
1317        self.wrap_for_relay(doc)
1318    }
1319
1320    // ==================== Delta Sync ====================
1321
1322    /// Register a peer for delta sync tracking
1323    ///
1324    /// Call this when a peer connects to start tracking what data has been
1325    /// sent to them. This enables future delta sync (sending only changes).
1326    pub fn register_peer_for_delta(&self, peer_id: &NodeId) {
1327        let mut encoder = self.delta_encoder.lock().unwrap();
1328        encoder.add_peer(peer_id);
1329        log::debug!(
1330            "Registered peer {:08X} for delta sync tracking",
1331            peer_id.as_u32()
1332        );
1333    }
1334
1335    /// Unregister a peer from delta sync tracking
1336    ///
1337    /// Call this when a peer disconnects to clean up tracking state.
1338    pub fn unregister_peer_for_delta(&self, peer_id: &NodeId) {
1339        let mut encoder = self.delta_encoder.lock().unwrap();
1340        encoder.remove_peer(peer_id);
1341        log::debug!(
1342            "Unregistered peer {:08X} from delta sync tracking",
1343            peer_id.as_u32()
1344        );
1345    }
1346
1347    /// Reset delta sync state for a peer
1348    ///
1349    /// Call this when a peer reconnects to force a full sync on next
1350    /// communication. This clears the record of what was previously sent.
1351    pub fn reset_peer_delta_state(&self, peer_id: &NodeId) {
1352        let mut encoder = self.delta_encoder.lock().unwrap();
1353        encoder.reset_peer(peer_id);
1354        log::debug!("Reset delta sync state for peer {:08X}", peer_id.as_u32());
1355    }
1356
1357    /// Record bytes sent to a peer (for delta statistics)
1358    pub fn record_delta_sent(&self, peer_id: &NodeId, bytes: usize) {
1359        let mut encoder = self.delta_encoder.lock().unwrap();
1360        encoder.record_sent(peer_id, bytes);
1361    }
1362
1363    /// Record bytes received from a peer (for delta statistics)
1364    pub fn record_delta_received(&self, peer_id: &NodeId, bytes: usize, timestamp: u64) {
1365        let mut encoder = self.delta_encoder.lock().unwrap();
1366        encoder.record_received(peer_id, bytes, timestamp);
1367    }
1368
1369    /// Get delta sync statistics
1370    ///
1371    /// Returns aggregate statistics about delta sync across all peers,
1372    /// including bytes sent/received and sync counts.
1373    pub fn delta_stats(&self) -> DeltaStats {
1374        self.delta_encoder.lock().unwrap().stats()
1375    }
1376
1377    /// Get delta sync statistics for a specific peer
1378    ///
1379    /// Returns the bytes sent/received and sync count for a single peer.
1380    pub fn peer_delta_stats(&self, peer_id: &NodeId) -> Option<(u64, u64, u32)> {
1381        let encoder = self.delta_encoder.lock().unwrap();
1382        encoder
1383            .get_peer_state(peer_id)
1384            .map(|state| (state.bytes_sent, state.bytes_received, state.sync_count))
1385    }
1386
1387    /// Build a delta document for a specific peer
1388    ///
1389    /// This only includes operations that have changed since the last sync
1390    /// with this peer. Uses the delta encoder to track per-peer state.
1391    ///
1392    /// Returns the encoded delta document bytes, or None if there's nothing
1393    /// new to send to this peer.
1394    pub fn build_delta_document_for_peer(&self, peer_id: &NodeId, now_ms: u64) -> Option<Vec<u8>> {
1395        // Collect all current operations
1396        let mut all_operations: Vec<Operation> = Vec::new();
1397
1398        // Add counter operations (one per node that has contributed)
1399        // Use the count value as the "timestamp" for tracking - only send if count increased
1400        for (node_id_u32, count) in self.document_sync.counter_entries() {
1401            all_operations.push(Operation::IncrementCounter {
1402                counter_id: 0, // Default mesh counter
1403                node_id: NodeId::new(node_id_u32),
1404                amount: count,
1405                timestamp: count, // Use count as timestamp for delta tracking
1406            });
1407        }
1408
1409        // Add peripheral update
1410        // Use event timestamp if available, otherwise use 1 for initial send
1411        let peripheral = self.document_sync.peripheral_snapshot();
1412        let peripheral_timestamp = peripheral
1413            .last_event
1414            .as_ref()
1415            .map(|e| e.timestamp)
1416            .unwrap_or(1); // Use 1 (not 0) so it's sent initially
1417        all_operations.push(Operation::UpdatePeripheral {
1418            peripheral,
1419            timestamp: peripheral_timestamp,
1420        });
1421
1422        // Add emergency operations if active
1423        if let Some(emergency) = self.document_sync.emergency_snapshot() {
1424            let source_node = NodeId::new(emergency.source_node());
1425            let timestamp = emergency.timestamp();
1426
1427            // Add SetEmergency operation
1428            all_operations.push(Operation::SetEmergency {
1429                source_node,
1430                timestamp,
1431                known_peers: emergency.all_nodes(),
1432            });
1433
1434            // Add AckEmergency for each node that has acked
1435            for acked_node in emergency.acked_nodes() {
1436                all_operations.push(Operation::AckEmergency {
1437                    node_id: NodeId::new(acked_node),
1438                    emergency_timestamp: timestamp,
1439                });
1440            }
1441        }
1442
1443        // Add app document operations
1444        for app_op in self.app_document_delta_ops() {
1445            all_operations.push(Operation::App(app_op));
1446        }
1447
1448        // Filter operations for this peer (only send what's new)
1449        let filtered_operations: Vec<Operation> = {
1450            let encoder = self.delta_encoder.lock().unwrap();
1451            if let Some(peer_state) = encoder.get_peer_state(peer_id) {
1452                all_operations
1453                    .into_iter()
1454                    .filter(|op| peer_state.needs_send(&op.key(), op.timestamp()))
1455                    .collect()
1456            } else {
1457                // Unknown peer, send all operations
1458                all_operations
1459            }
1460        };
1461
1462        // If nothing new to send, return None
1463        if filtered_operations.is_empty() {
1464            return None;
1465        }
1466
1467        // Mark operations as sent
1468        {
1469            let mut encoder = self.delta_encoder.lock().unwrap();
1470            if let Some(peer_state) = encoder.get_peer_state_mut(peer_id) {
1471                for op in &filtered_operations {
1472                    peer_state.mark_sent(&op.key(), op.timestamp());
1473                }
1474            }
1475        }
1476
1477        // Build the delta document
1478        let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1479        for op in filtered_operations {
1480            delta.add_operation(op);
1481        }
1482
1483        // Encode and optionally encrypt
1484        let encoded = delta.encode();
1485        let result = self.encrypt_document(&encoded);
1486
1487        // Record stats
1488        {
1489            let mut encoder = self.delta_encoder.lock().unwrap();
1490            encoder.record_sent(peer_id, result.len());
1491        }
1492
1493        Some(result)
1494    }
1495
1496    /// Build a full delta document (for broadcast or new peers)
1497    ///
1498    /// Unlike `build_delta_document_for_peer`, this includes all state
1499    /// regardless of what has been sent before. Use this for broadcasts.
1500    pub fn build_full_delta_document(&self, now_ms: u64) -> Vec<u8> {
1501        let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1502
1503        // Add all counter operations
1504        for (node_id_u32, count) in self.document_sync.counter_entries() {
1505            delta.add_operation(Operation::IncrementCounter {
1506                counter_id: 0,
1507                node_id: NodeId::new(node_id_u32),
1508                amount: count,
1509                timestamp: now_ms,
1510            });
1511        }
1512
1513        // Add peripheral
1514        let peripheral = self.document_sync.peripheral_snapshot();
1515        let peripheral_timestamp = peripheral
1516            .last_event
1517            .as_ref()
1518            .map(|e| e.timestamp)
1519            .unwrap_or(now_ms);
1520        delta.add_operation(Operation::UpdatePeripheral {
1521            peripheral,
1522            timestamp: peripheral_timestamp,
1523        });
1524
1525        // Add emergency if active
1526        if let Some(emergency) = self.document_sync.emergency_snapshot() {
1527            let source_node = NodeId::new(emergency.source_node());
1528            let timestamp = emergency.timestamp();
1529
1530            delta.add_operation(Operation::SetEmergency {
1531                source_node,
1532                timestamp,
1533                known_peers: emergency.all_nodes(),
1534            });
1535
1536            for acked_node in emergency.acked_nodes() {
1537                delta.add_operation(Operation::AckEmergency {
1538                    node_id: NodeId::new(acked_node),
1539                    emergency_timestamp: timestamp,
1540                });
1541            }
1542        }
1543
1544        // Add app document operations
1545        for app_op in self.app_document_delta_ops() {
1546            delta.add_operation(Operation::App(app_op));
1547        }
1548
1549        let encoded = delta.encode();
1550        self.encrypt_document(&encoded)
1551    }
1552
1553    /// Internal: Process a received delta document
1554    ///
1555    /// Applies operations from a delta document to local state.
1556    fn process_delta_document_internal(
1557        &self,
1558        source_node: NodeId,
1559        data: &[u8],
1560        now_ms: u64,
1561        relay_data: Option<Vec<u8>>,
1562        origin_node: Option<NodeId>,
1563        hop_count: u8,
1564    ) -> Option<DataReceivedResult> {
1565        // Decode the delta document
1566        let delta = DeltaDocument::decode(data)?;
1567
1568        // Don't process our own documents
1569        if delta.origin_node == self.config.node_id {
1570            return None;
1571        }
1572
1573        // Apply operations to local state
1574        let mut counter_changed = false;
1575        let mut emergency_changed = false;
1576        let mut is_emergency = false;
1577        let mut is_ack = false;
1578        let mut event_timestamp = 0u64;
1579        let mut peer_peripheral: Option<crate::sync::crdt::Peripheral> = None;
1580
1581        log::info!(
1582            "Delta document from {:08X}: {} operations, data_len={}",
1583            delta.origin_node.as_u32(),
1584            delta.operations.len(),
1585            data.len()
1586        );
1587        for op in &delta.operations {
1588            log::info!("  Operation: {}", op.key());
1589            match op {
1590                Operation::IncrementCounter {
1591                    node_id, amount, ..
1592                } => {
1593                    // Merge counter value (take max)
1594                    let current = self.document_sync.counter_entries();
1595                    let current_value = current
1596                        .iter()
1597                        .find(|(id, _)| *id == node_id.as_u32())
1598                        .map(|(_, v)| *v)
1599                        .unwrap_or(0);
1600
1601                    if *amount > current_value {
1602                        // Need to merge - this is handled by the counter merge logic
1603                        // For now, we record that counter changed
1604                        counter_changed = true;
1605                    }
1606                }
1607                Operation::UpdatePeripheral {
1608                    peripheral,
1609                    timestamp,
1610                } => {
1611                    // Store peer peripheral for callsign lookup
1612                    if let Ok(mut peripherals) = self.peer_peripherals.write() {
1613                        peripherals.insert(delta.origin_node, peripheral.clone());
1614                    }
1615                    // Track the peripheral for the result
1616                    peer_peripheral = Some(peripheral.clone());
1617                    // Track the timestamp for the result
1618                    if *timestamp > event_timestamp {
1619                        event_timestamp = *timestamp;
1620                    }
1621                }
1622                Operation::SetEmergency { timestamp, .. } => {
1623                    is_emergency = true;
1624                    emergency_changed = true;
1625                    event_timestamp = *timestamp;
1626                }
1627                Operation::AckEmergency {
1628                    emergency_timestamp,
1629                    ..
1630                } => {
1631                    is_ack = true;
1632                    emergency_changed = true;
1633                    if *emergency_timestamp > event_timestamp {
1634                        event_timestamp = *emergency_timestamp;
1635                    }
1636                }
1637                Operation::ClearEmergency {
1638                    emergency_timestamp,
1639                } => {
1640                    emergency_changed = true;
1641                    if *emergency_timestamp > event_timestamp {
1642                        event_timestamp = *emergency_timestamp;
1643                    }
1644                }
1645                Operation::App(app_op) => {
1646                    // Handle app-layer document operation
1647                    //
1648                    // The timestamp field may contain version info in the upper bits
1649                    // (used by delta encoder for change detection). Extract the
1650                    // original document timestamp from the lower 48 bits.
1651                    let doc_timestamp = app_op.timestamp & 0x0000_FFFF_FFFF_FFFF;
1652
1653                    log::info!(
1654                        "App operation received: type={:02X} op_code={:02X} from {:08X} ts={} payload_len={}",
1655                        app_op.type_id,
1656                        app_op.op_code,
1657                        app_op.source_node,
1658                        doc_timestamp,
1659                        app_op.payload.len()
1660                    );
1661
1662                    // Try to find/create document and apply the delta operation
1663                    let doc_key = (app_op.type_id, app_op.source_node, doc_timestamp);
1664                    let changed = {
1665                        let mut docs = self.app_documents.write().unwrap();
1666
1667                        if let Some(existing) = docs.get_mut(&doc_key) {
1668                            // Apply delta to existing document (merge via CRDT)
1669                            self.document_registry.apply_delta_op(
1670                                app_op.type_id,
1671                                existing.as_mut(),
1672                                app_op,
1673                            )
1674                        } else {
1675                            // Try to decode from payload (for FULL_STATE operations)
1676                            if let Some(decoded) = self
1677                                .document_registry
1678                                .decode(app_op.type_id, &app_op.payload)
1679                            {
1680                                docs.insert(doc_key, decoded);
1681                                true
1682                            } else {
1683                                // For delta ops on unknown documents, we can't apply
1684                                // The full state will be sent later
1685                                log::debug!(
1686                                    "Received delta for unknown doc {:?}, waiting for full state",
1687                                    doc_key
1688                                );
1689                                false
1690                            }
1691                        }
1692                    };
1693
1694                    // Emit observer event with the real document timestamp
1695                    self.observers.notify(PeatEvent::app_document_received(
1696                        app_op.type_id,
1697                        NodeId::new(app_op.source_node),
1698                        doc_timestamp,
1699                        changed,
1700                    ));
1701                }
1702            }
1703        }
1704
1705        // Record sync
1706        self.peer_manager.record_sync(source_node, now_ms);
1707
1708        // Record delta received
1709        {
1710            let mut encoder = self.delta_encoder.lock().unwrap();
1711            encoder.record_received(&source_node, data.len(), now_ms);
1712        }
1713
1714        // Generate events based on what was received
1715        if is_emergency {
1716            self.notify(PeatEvent::EmergencyReceived {
1717                from_node: delta.origin_node,
1718            });
1719        } else if is_ack {
1720            self.notify(PeatEvent::AckReceived {
1721                from_node: delta.origin_node,
1722            });
1723        }
1724
1725        if counter_changed {
1726            let total_count = self.document_sync.total_count();
1727            self.notify(PeatEvent::DocumentSynced {
1728                from_node: delta.origin_node,
1729                total_count,
1730            });
1731        }
1732
1733        // Emit relay event if we're relaying
1734        if relay_data.is_some() {
1735            let relay_targets = self.get_relay_targets(Some(source_node));
1736            self.notify(PeatEvent::MessageRelayed {
1737                origin_node: origin_node.unwrap_or(delta.origin_node),
1738                relay_count: relay_targets.len(),
1739                hop_count,
1740            });
1741        }
1742
1743        let PeripheralFields {
1744            callsign,
1745            battery_percent,
1746            heart_rate,
1747            event_type,
1748            latitude,
1749            longitude,
1750            altitude,
1751            activity_level,
1752            alerts,
1753        } = DataReceivedResult::peripheral_fields(&peer_peripheral);
1754
1755        Some(DataReceivedResult {
1756            source_node: delta.origin_node,
1757            is_emergency,
1758            is_ack,
1759            counter_changed,
1760            emergency_changed,
1761            total_count: self.document_sync.total_count(),
1762            event_timestamp,
1763            relay_data,
1764            origin_node,
1765            hop_count,
1766            callsign,
1767            battery_percent,
1768            heart_rate,
1769            event_type,
1770            latitude,
1771            longitude,
1772            altitude,
1773            activity_level,
1774            alerts,
1775            ..Default::default()
1776        })
1777    }
1778
1779    // ==================== Per-Peer E2EE ====================
1780
1781    /// Enable per-peer E2EE capability
1782    ///
1783    /// Creates a new identity key for this node. This allows establishing
1784    /// encrypted sessions with specific peers where only the sender and
1785    /// recipient can read messages (other mesh members cannot).
1786    pub fn enable_peer_e2ee(&self) {
1787        let mut sessions = self.peer_sessions.lock().unwrap();
1788        if sessions.is_none() {
1789            *sessions = Some(PeerSessionManager::new(self.config.node_id));
1790            log::info!(
1791                "Per-peer E2EE enabled for node {:08X}",
1792                self.config.node_id.as_u32()
1793            );
1794        }
1795    }
1796
1797    /// Disable per-peer E2EE capability
1798    ///
1799    /// Clears all peer sessions and disables E2EE.
1800    pub fn disable_peer_e2ee(&self) {
1801        let mut sessions = self.peer_sessions.lock().unwrap();
1802        *sessions = None;
1803        log::info!("Per-peer E2EE disabled");
1804    }
1805
1806    /// Check if per-peer E2EE is enabled
1807    pub fn is_peer_e2ee_enabled(&self) -> bool {
1808        self.peer_sessions.lock().unwrap().is_some()
1809    }
1810
1811    /// Get our E2EE public key (for sharing with peers)
1812    ///
1813    /// Returns None if per-peer E2EE is not enabled.
1814    pub fn peer_e2ee_public_key(&self) -> Option<[u8; 32]> {
1815        self.peer_sessions
1816            .lock()
1817            .unwrap()
1818            .as_ref()
1819            .map(|s| s.our_public_key())
1820    }
1821
1822    /// Initiate E2EE session with a specific peer
1823    ///
1824    /// Returns the key exchange message bytes to send to the peer.
1825    /// The message should be broadcast/sent to the peer.
1826    /// Returns None if per-peer E2EE is not enabled.
1827    pub fn initiate_peer_e2ee(&self, peer_node_id: NodeId, now_ms: u64) -> Option<Vec<u8>> {
1828        let mut sessions = self.peer_sessions.lock().unwrap();
1829        let session_mgr = sessions.as_mut()?;
1830
1831        let key_exchange = session_mgr.initiate_session(peer_node_id, now_ms);
1832        let mut buf = Vec::with_capacity(2 + 37);
1833        buf.push(KEY_EXCHANGE_MARKER);
1834        buf.push(0x00); // reserved
1835        buf.extend_from_slice(&key_exchange.encode());
1836
1837        log::info!(
1838            "Initiated E2EE session with peer {:08X}",
1839            peer_node_id.as_u32()
1840        );
1841        Some(buf)
1842    }
1843
1844    /// Check if we have an established E2EE session with a peer
1845    pub fn has_peer_e2ee_session(&self, peer_node_id: NodeId) -> bool {
1846        self.peer_sessions
1847            .lock()
1848            .unwrap()
1849            .as_ref()
1850            .is_some_and(|s| s.has_session(peer_node_id))
1851    }
1852
1853    /// Get E2EE session state with a peer
1854    pub fn peer_e2ee_session_state(&self, peer_node_id: NodeId) -> Option<SessionState> {
1855        self.peer_sessions
1856            .lock()
1857            .unwrap()
1858            .as_ref()
1859            .and_then(|s| s.session_state(peer_node_id))
1860    }
1861
1862    /// Send an E2EE encrypted message to a specific peer
1863    ///
1864    /// Returns the encrypted message bytes to send, or None if no session exists.
1865    /// The message should be sent directly to the peer (not broadcast).
1866    pub fn send_peer_e2ee(
1867        &self,
1868        peer_node_id: NodeId,
1869        plaintext: &[u8],
1870        now_ms: u64,
1871    ) -> Option<Vec<u8>> {
1872        let mut sessions = self.peer_sessions.lock().unwrap();
1873        let session_mgr = sessions.as_mut()?;
1874
1875        match session_mgr.encrypt_for_peer(peer_node_id, plaintext, now_ms) {
1876            Ok(encrypted) => {
1877                let mut buf = Vec::with_capacity(2 + encrypted.encode().len());
1878                buf.push(PEER_E2EE_MARKER);
1879                buf.push(0x00); // reserved
1880                buf.extend_from_slice(&encrypted.encode());
1881                Some(buf)
1882            }
1883            Err(e) => {
1884                log::warn!(
1885                    "Failed to encrypt for peer {:08X}: {:?}",
1886                    peer_node_id.as_u32(),
1887                    e
1888                );
1889                None
1890            }
1891        }
1892    }
1893
1894    /// Close E2EE session with a peer
1895    pub fn close_peer_e2ee(&self, peer_node_id: NodeId) {
1896        let mut sessions = self.peer_sessions.lock().unwrap();
1897        if let Some(session_mgr) = sessions.as_mut() {
1898            session_mgr.close_session(peer_node_id);
1899            self.notify(PeatEvent::PeerE2eeClosed { peer_node_id });
1900            log::info!(
1901                "Closed E2EE session with peer {:08X}",
1902                peer_node_id.as_u32()
1903            );
1904        }
1905    }
1906
1907    /// Get count of active E2EE sessions
1908    pub fn peer_e2ee_session_count(&self) -> usize {
1909        self.peer_sessions
1910            .lock()
1911            .unwrap()
1912            .as_ref()
1913            .map(|s| s.session_count())
1914            .unwrap_or(0)
1915    }
1916
1917    /// Get count of established E2EE sessions
1918    pub fn peer_e2ee_established_count(&self) -> usize {
1919        self.peer_sessions
1920            .lock()
1921            .unwrap()
1922            .as_ref()
1923            .map(|s| s.established_count())
1924            .unwrap_or(0)
1925    }
1926
1927    /// Handle incoming key exchange message
1928    ///
1929    /// Called internally when we receive a KEY_EXCHANGE_MARKER message.
1930    /// Returns the response key exchange bytes to send back, or None if invalid.
1931    fn handle_key_exchange(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1932        if data.len() < 2 || data[0] != KEY_EXCHANGE_MARKER {
1933            return None;
1934        }
1935
1936        let payload = &data[2..];
1937        let msg = KeyExchangeMessage::decode(payload)?;
1938
1939        let mut sessions = self.peer_sessions.lock().unwrap();
1940        let session_mgr = sessions.as_mut()?;
1941
1942        let (response, established) = session_mgr.handle_key_exchange(&msg, now_ms)?;
1943
1944        if established {
1945            self.notify(PeatEvent::PeerE2eeEstablished {
1946                peer_node_id: msg.sender_node_id,
1947            });
1948            log::info!(
1949                "E2EE session established with peer {:08X}",
1950                msg.sender_node_id.as_u32()
1951            );
1952        }
1953
1954        // Return response key exchange
1955        let mut buf = Vec::with_capacity(2 + 37);
1956        buf.push(KEY_EXCHANGE_MARKER);
1957        buf.push(0x00);
1958        buf.extend_from_slice(&response.encode());
1959        Some(buf)
1960    }
1961
1962    /// Handle incoming E2EE encrypted message
1963    ///
1964    /// Called internally when we receive a PEER_E2EE_MARKER message.
1965    /// Decrypts and notifies observers of the received message.
1966    fn handle_peer_e2ee_message(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1967        if data.len() < 2 || data[0] != PEER_E2EE_MARKER {
1968            return None;
1969        }
1970
1971        let payload = &data[2..];
1972        let msg = PeerEncryptedMessage::decode(payload)?;
1973
1974        let mut sessions = self.peer_sessions.lock().unwrap();
1975        let session_mgr = sessions.as_mut()?;
1976
1977        match session_mgr.decrypt_from_peer(&msg, now_ms) {
1978            Ok(plaintext) => {
1979                // Notify observers of the decrypted message
1980                self.notify(PeatEvent::PeerE2eeMessageReceived {
1981                    from_node: msg.sender_node_id,
1982                    data: plaintext.clone(),
1983                });
1984                Some(plaintext)
1985            }
1986            Err(e) => {
1987                log::warn!(
1988                    "Failed to decrypt E2EE message from {:08X}: {:?}",
1989                    msg.sender_node_id.as_u32(),
1990                    e
1991                );
1992                None
1993            }
1994        }
1995    }
1996
1997    // ==================== Configuration ====================
1998
1999    /// Get our node ID
2000    pub fn node_id(&self) -> NodeId {
2001        self.config.node_id
2002    }
2003
2004    /// Get our callsign
2005    pub fn callsign(&self) -> &str {
2006        &self.config.callsign
2007    }
2008
2009    /// Get the mesh ID
2010    pub fn mesh_id(&self) -> &str {
2011        &self.config.mesh_id
2012    }
2013
2014    /// Get the device name for BLE advertising
2015    pub fn device_name(&self) -> String {
2016        format!(
2017            "PEAT_{}-{:08X}",
2018            self.config.mesh_id,
2019            self.config.node_id.as_u32()
2020        )
2021    }
2022
2023    /// Get a peer's callsign by node ID
2024    ///
2025    /// Returns the callsign from the peer's most recently received peripheral data,
2026    /// or None if no peripheral data has been received from this peer.
2027    pub fn get_peer_callsign(&self, node_id: NodeId) -> Option<String> {
2028        self.peer_peripherals.read().ok().and_then(|peripherals| {
2029            peripherals
2030                .get(&node_id)
2031                .map(|p| p.callsign_str().to_string())
2032        })
2033    }
2034
2035    /// Get a peer's full peripheral data by node ID
2036    ///
2037    /// Returns a clone of the peripheral data from the peer's most recently received
2038    /// document, or None if no peripheral data has been received from this peer.
2039    pub fn get_peer_peripheral(&self, node_id: NodeId) -> Option<Peripheral> {
2040        self.peer_peripherals
2041            .read()
2042            .ok()
2043            .and_then(|peripherals| peripherals.get(&node_id).cloned())
2044    }
2045
2046    // ==================== Document Registry ====================
2047
2048    /// Get a reference to the document registry.
2049    ///
2050    /// Use this to register custom document types for mesh synchronization.
2051    ///
2052    /// # Example
2053    ///
2054    /// ```ignore
2055    /// use peat_btle::registry::DocumentType;
2056    ///
2057    /// // Register a custom document type
2058    /// mesh.document_registry().register::<MyCustomDocument>();
2059    /// ```
2060    pub fn document_registry(&self) -> &DocumentRegistry {
2061        &self.document_registry
2062    }
2063
2064    /// Store an app-layer document.
2065    ///
2066    /// If a document with the same identity (type_id, source_node, timestamp)
2067    /// already exists, it will be merged using CRDT semantics.
2068    ///
2069    /// Returns true if the document was newly added or changed via merge.
2070    pub fn store_app_document<T: crate::registry::DocumentType>(&self, doc: T) -> bool {
2071        let type_id = T::TYPE_ID;
2072        let (source_node, timestamp) = doc.identity();
2073        let key = (type_id, source_node, timestamp);
2074
2075        let mut docs = self.app_documents.write().unwrap();
2076
2077        if let Some(existing) = docs.get_mut(&key) {
2078            // Merge with existing document
2079            self.document_registry
2080                .merge(type_id, existing.as_mut(), &doc)
2081        } else {
2082            // Insert new document
2083            docs.insert(key, Box::new(doc));
2084            true
2085        }
2086    }
2087
2088    /// Store a type-erased app-layer document.
2089    ///
2090    /// Used when receiving documents from the network where the type is
2091    /// determined at runtime.
2092    ///
2093    /// Returns true if the document was newly added or changed via merge.
2094    pub fn store_app_document_boxed(
2095        &self,
2096        type_id: u8,
2097        source_node: u32,
2098        timestamp: u64,
2099        doc: Box<dyn core::any::Any + Send + Sync>,
2100    ) -> bool {
2101        let key = (type_id, source_node, timestamp);
2102
2103        let mut docs = self.app_documents.write().unwrap();
2104
2105        if let Some(existing) = docs.get_mut(&key) {
2106            // Merge with existing document
2107            self.document_registry
2108                .merge(type_id, existing.as_mut(), doc.as_ref())
2109        } else {
2110            // Insert new document
2111            docs.insert(key, doc);
2112            true
2113        }
2114    }
2115
2116    /// Get a stored app-layer document by identity.
2117    ///
2118    /// Returns None if not found or if downcast to T fails.
2119    pub fn get_app_document<T: crate::registry::DocumentType>(
2120        &self,
2121        source_node: u32,
2122        timestamp: u64,
2123    ) -> Option<T> {
2124        let key = (T::TYPE_ID, source_node, timestamp);
2125
2126        let docs = self.app_documents.read().unwrap();
2127        docs.get(&key).and_then(|d| d.downcast_ref::<T>()).cloned()
2128    }
2129
2130    /// Get all stored app-layer documents of a specific type.
2131    ///
2132    /// Returns a vector of all documents of type T currently stored.
2133    pub fn get_all_app_documents_of_type<T: crate::registry::DocumentType>(&self) -> Vec<T> {
2134        let docs = self.app_documents.read().unwrap();
2135        docs.iter()
2136            .filter(|((type_id, _, _), _)| *type_id == T::TYPE_ID)
2137            .filter_map(|(_, doc)| doc.downcast_ref::<T>().cloned())
2138            .collect()
2139    }
2140
2141    /// Get delta operations for all stored app documents.
2142    ///
2143    /// Used during sync to include app documents in the delta stream.
2144    pub fn app_document_delta_ops(&self) -> Vec<crate::registry::AppOperation> {
2145        let docs = self.app_documents.read().unwrap();
2146        let mut ops = Vec::new();
2147
2148        for ((type_id, _source, _ts), doc) in docs.iter() {
2149            if let Some(op) = self.document_registry.to_delta_op(*type_id, doc.as_ref()) {
2150                ops.push(op);
2151            }
2152        }
2153
2154        ops
2155    }
2156
2157    /// Get all document keys for a given type.
2158    ///
2159    /// Returns (source_node, timestamp) pairs for all documents of the specified type.
2160    pub fn app_document_keys(&self, type_id: u8) -> Vec<(u32, u64)> {
2161        let docs = self.app_documents.read().unwrap();
2162        docs.keys()
2163            .filter(|(tid, _, _)| *tid == type_id)
2164            .map(|(_, source, ts)| (*source, *ts))
2165            .collect()
2166    }
2167
2168    /// Get the number of stored app documents.
2169    pub fn app_document_count(&self) -> usize {
2170        self.app_documents.read().unwrap().len()
2171    }
2172
2173    // ==================== Observer Management ====================
2174
2175    /// Add an observer for mesh events
2176    pub fn add_observer(&self, observer: Arc<dyn PeatObserver>) {
2177        self.observers.add(observer);
2178    }
2179
2180    /// Remove an observer
2181    pub fn remove_observer(&self, observer: &Arc<dyn PeatObserver>) {
2182        self.observers.remove(observer);
2183    }
2184
2185    // ==================== User Actions ====================
2186
2187    /// Send an emergency alert
2188    ///
2189    /// Returns the document bytes to broadcast to all peers.
2190    /// If encryption is enabled, the document is encrypted.
2191    pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
2192        let data = self.document_sync.send_emergency(timestamp);
2193        self.notify(PeatEvent::MeshStateChanged {
2194            peer_count: self.peer_manager.peer_count(),
2195            connected_count: self.peer_manager.connected_count(),
2196        });
2197        self.encrypt_document(&data)
2198    }
2199
2200    /// Send an ACK response
2201    ///
2202    /// Returns the document bytes to broadcast to all peers.
2203    /// If encryption is enabled, the document is encrypted.
2204    pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
2205        let data = self.document_sync.send_ack(timestamp);
2206        self.notify(PeatEvent::MeshStateChanged {
2207            peer_count: self.peer_manager.peer_count(),
2208            connected_count: self.peer_manager.connected_count(),
2209        });
2210        self.encrypt_document(&data)
2211    }
2212
2213    /// Broadcast arbitrary bytes over the mesh.
2214    ///
2215    /// Takes raw payload bytes, encrypts them (if encryption is enabled),
2216    /// and returns bytes ready to send to all connected peers.
2217    ///
2218    /// This is useful for sending extension data like CannedMessages from peat-lite.
2219    pub fn broadcast_bytes(&self, payload: &[u8]) -> Vec<u8> {
2220        self.encrypt_document(payload)
2221    }
2222
2223    /// Clear the current event (emergency or ack)
2224    pub fn clear_event(&self) {
2225        self.document_sync.clear_event();
2226    }
2227
2228    /// Check if emergency is active
2229    pub fn is_emergency_active(&self) -> bool {
2230        self.document_sync.is_emergency_active()
2231    }
2232
2233    /// Check if ACK is active
2234    pub fn is_ack_active(&self) -> bool {
2235        self.document_sync.is_ack_active()
2236    }
2237
2238    /// Get current event type
2239    pub fn current_event(&self) -> Option<EventType> {
2240        self.document_sync.current_event()
2241    }
2242
2243    // ==================== Emergency Management (Document-Based) ====================
2244
2245    /// Start a new emergency event with ACK tracking
2246    ///
2247    /// Creates an emergency event that tracks ACKs from all known peers.
2248    /// Pass the list of known peer node IDs to track.
2249    /// Returns the document bytes to broadcast.
2250    /// If encryption is enabled, the document is encrypted.
2251    pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
2252        let data = self.document_sync.start_emergency(timestamp, known_peers);
2253        self.notify(PeatEvent::MeshStateChanged {
2254            peer_count: self.peer_manager.peer_count(),
2255            connected_count: self.peer_manager.connected_count(),
2256        });
2257        self.encrypt_document(&data)
2258    }
2259
2260    /// Start a new emergency using all currently known peers
2261    ///
2262    /// Convenience method that automatically includes all discovered peers.
2263    pub fn start_emergency_with_known_peers(&self, timestamp: u64) -> Vec<u8> {
2264        let peers: Vec<u32> = self
2265            .peer_manager
2266            .get_peers()
2267            .iter()
2268            .map(|p| p.node_id.as_u32())
2269            .collect();
2270        self.start_emergency(timestamp, &peers)
2271    }
2272
2273    /// Record our ACK for the current emergency
2274    ///
2275    /// Returns the document bytes to broadcast, or None if no emergency is active.
2276    /// If encryption is enabled, the document is encrypted.
2277    pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
2278        let result = self.document_sync.ack_emergency(timestamp);
2279        if result.is_some() {
2280            self.notify(PeatEvent::MeshStateChanged {
2281                peer_count: self.peer_manager.peer_count(),
2282                connected_count: self.peer_manager.connected_count(),
2283            });
2284        }
2285        result.map(|data| self.encrypt_document(&data))
2286    }
2287
2288    /// Clear the current emergency event
2289    pub fn clear_emergency(&self) {
2290        self.document_sync.clear_emergency();
2291    }
2292
2293    /// Check if there's an active emergency
2294    pub fn has_active_emergency(&self) -> bool {
2295        self.document_sync.has_active_emergency()
2296    }
2297
2298    /// Get emergency status info
2299    ///
2300    /// Returns (source_node, timestamp, acked_count, pending_count) if emergency is active.
2301    pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
2302        self.document_sync.get_emergency_status()
2303    }
2304
2305    /// Check if a specific peer has ACKed the current emergency
2306    pub fn has_peer_acked(&self, peer_id: u32) -> bool {
2307        self.document_sync.has_peer_acked(peer_id)
2308    }
2309
2310    /// Check if all peers have ACKed the current emergency
2311    pub fn all_peers_acked(&self) -> bool {
2312        self.document_sync.all_peers_acked()
2313    }
2314
2315    // ==================== Chat Methods (requires `legacy-chat` feature) ====================
2316
/// Send a chat message.
///
/// Adds the message to the local CRDT and, if it was new, builds the
/// current document and returns its bytes to broadcast to all peers.
/// If encryption is enabled, the document is encrypted.
///
/// Returns `None` if the message was a duplicate.
#[cfg(feature = "legacy-chat")]
pub fn send_chat(&self, sender: &str, text: &str, timestamp: u64) -> Option<Vec<u8>> {
    let is_new = self.document_sync.add_chat_message(sender, text, timestamp);
    if !is_new {
        return None;
    }

    let document = self.build_document();
    Some(self.encrypt_document(&document))
}
2332
2333    /// Send a chat reply
2334    ///
2335    /// Adds the reply to the local CRDT with reply-to information and returns
2336    /// the document bytes to broadcast. If encryption is enabled, the document is encrypted.
2337    ///
2338    /// Returns the encrypted document bytes if the message was new,
2339    /// or None if it was a duplicate.
2340    #[cfg(feature = "legacy-chat")]
2341    pub fn send_chat_reply(
2342        &self,
2343        sender: &str,
2344        text: &str,
2345        reply_to_node: u32,
2346        reply_to_timestamp: u64,
2347        timestamp: u64,
2348    ) -> Option<Vec<u8>> {
2349        if self.document_sync.add_chat_reply(
2350            sender,
2351            text,
2352            reply_to_node,
2353            reply_to_timestamp,
2354            timestamp,
2355        ) {
2356            Some(self.encrypt_document(&self.build_document()))
2357        } else {
2358            None
2359        }
2360    }
2361
2362    /// Get the number of chat messages in the local CRDT
2363    #[cfg(feature = "legacy-chat")]
2364    pub fn chat_count(&self) -> usize {
2365        self.document_sync.chat_count()
2366    }
2367
2368    /// Get chat messages newer than a timestamp
2369    ///
2370    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
2371    #[cfg(feature = "legacy-chat")]
2372    pub fn chat_messages_since(
2373        &self,
2374        since_timestamp: u64,
2375    ) -> Vec<(u32, u64, String, String, u32, u64)> {
2376        self.document_sync.chat_messages_since(since_timestamp)
2377    }
2378
2379    /// Get all chat messages
2380    ///
2381    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
2382    #[cfg(feature = "legacy-chat")]
2383    pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
2384        self.document_sync.all_chat_messages()
2385    }
2386
2387    // ==================== BLE Callbacks (Platform -> Mesh) ====================
2388
2389    /// Called when a BLE device is discovered
2390    ///
2391    /// Returns `Some(PeatPeer)` if this is a new Peat peer on our mesh.
2392    pub fn on_ble_discovered(
2393        &self,
2394        identifier: &str,
2395        name: Option<&str>,
2396        rssi: i8,
2397        mesh_id: Option<&str>,
2398        now_ms: u64,
2399    ) -> Option<PeatPeer> {
2400        let (node_id, is_new) = self
2401            .peer_manager
2402            .on_discovered(identifier, name, rssi, mesh_id, now_ms)?;
2403
2404        let peer = self.peer_manager.get_peer(node_id)?;
2405
2406        // Update connection graph
2407        {
2408            let mut graph = self.connection_graph.lock().unwrap();
2409            graph.on_discovered(
2410                node_id,
2411                identifier.to_string(),
2412                name.map(|s| s.to_string()),
2413                mesh_id.map(|s| s.to_string()),
2414                rssi,
2415                now_ms,
2416            );
2417        }
2418
2419        if is_new {
2420            self.notify(PeatEvent::PeerDiscovered { peer: peer.clone() });
2421            self.notify_mesh_state_changed();
2422        }
2423
2424        Some(peer)
2425    }
2426
2427    /// Called when a BLE connection is established (outgoing)
2428    ///
2429    /// Returns the NodeId if this identifier is known.
2430    pub fn on_ble_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
2431        let node_id = match self.peer_manager.on_connected(identifier, now_ms) {
2432            Some(id) => id,
2433            None => {
2434                log::warn!(
2435                    "on_ble_connected: identifier {:?} not in peer map — \
2436                     use on_incoming_connection() for peripheral connections",
2437                    identifier
2438                );
2439                return None;
2440            }
2441        };
2442
2443        // Update connection graph
2444        {
2445            let mut graph = self.connection_graph.lock().unwrap();
2446            graph.on_connected(node_id, now_ms);
2447        }
2448
2449        // Register peer for delta sync tracking
2450        self.register_peer_for_delta(&node_id);
2451
2452        self.notify(PeatEvent::PeerConnected { node_id });
2453        self.notify_mesh_state_changed();
2454        Some(node_id)
2455    }
2456
2457    /// Called when a BLE connection is lost
2458    pub fn on_ble_disconnected(
2459        &self,
2460        identifier: &str,
2461        reason: DisconnectReason,
2462    ) -> Option<NodeId> {
2463        let (node_id, observer_reason) = self.peer_manager.on_disconnected(identifier, reason)?;
2464
2465        // Update connection graph (convert observer reason to platform reason)
2466        {
2467            let mut graph = self.connection_graph.lock().unwrap();
2468            let platform_reason = match observer_reason {
2469                DisconnectReason::LocalRequest => crate::platform::DisconnectReason::LocalRequest,
2470                DisconnectReason::RemoteRequest => crate::platform::DisconnectReason::RemoteRequest,
2471                DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2472                DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2473                DisconnectReason::ConnectionFailed => {
2474                    crate::platform::DisconnectReason::ConnectionFailed
2475                }
2476                DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2477            };
2478            let now_ms = std::time::SystemTime::now()
2479                .duration_since(std::time::UNIX_EPOCH)
2480                .map(|d| d.as_millis() as u64)
2481                .unwrap_or(0);
2482            graph.on_disconnected(node_id, platform_reason, now_ms);
2483
2484            // Remove indirect peer paths that went through this peer
2485            // These paths may no longer be valid since the direct connection is lost
2486            graph.remove_via_peer(node_id);
2487        }
2488
2489        // Unregister peer from delta sync tracking
2490        self.unregister_peer_for_delta(&node_id);
2491
2492        self.notify(PeatEvent::PeerDisconnected {
2493            node_id,
2494            reason: observer_reason,
2495        });
2496        self.notify_mesh_state_changed();
2497        Some(node_id)
2498    }
2499
2500    /// Called when a BLE connection is lost, using NodeId directly
2501    ///
2502    /// Alternative to on_ble_disconnected() when only NodeId is known (e.g., ESP32).
2503    pub fn on_peer_disconnected(&self, node_id: NodeId, reason: DisconnectReason) {
2504        if self
2505            .peer_manager
2506            .on_disconnected_by_node_id(node_id, reason)
2507        {
2508            // Update connection graph
2509            {
2510                let mut graph = self.connection_graph.lock().unwrap();
2511                let platform_reason = match reason {
2512                    DisconnectReason::LocalRequest => {
2513                        crate::platform::DisconnectReason::LocalRequest
2514                    }
2515                    DisconnectReason::RemoteRequest => {
2516                        crate::platform::DisconnectReason::RemoteRequest
2517                    }
2518                    DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2519                    DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2520                    DisconnectReason::ConnectionFailed => {
2521                        crate::platform::DisconnectReason::ConnectionFailed
2522                    }
2523                    DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2524                };
2525                let now_ms = std::time::SystemTime::now()
2526                    .duration_since(std::time::UNIX_EPOCH)
2527                    .map(|d| d.as_millis() as u64)
2528                    .unwrap_or(0);
2529                graph.on_disconnected(node_id, platform_reason, now_ms);
2530
2531                // Remove indirect peer paths that went through this peer
2532                graph.remove_via_peer(node_id);
2533            }
2534
2535            // Unregister peer from delta sync tracking
2536            self.unregister_peer_for_delta(&node_id);
2537
2538            self.notify(PeatEvent::PeerDisconnected { node_id, reason });
2539            self.notify_mesh_state_changed();
2540        }
2541    }
2542
2543    /// Called when a remote device connects to us (incoming connection)
2544    ///
2545    /// Use this when we're acting as a peripheral and a central connects to us.
2546    pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
2547        let is_new = self
2548            .peer_manager
2549            .on_incoming_connection(identifier, node_id, now_ms);
2550
2551        // Update connection graph
2552        {
2553            let mut graph = self.connection_graph.lock().unwrap();
2554            if is_new {
2555                graph.on_discovered(
2556                    node_id,
2557                    identifier.to_string(),
2558                    None,
2559                    Some(self.config.mesh_id.clone()),
2560                    -50, // Default good RSSI for incoming connections
2561                    now_ms,
2562                );
2563            }
2564            graph.on_connected(node_id, now_ms);
2565        }
2566
2567        // Register peer for delta sync tracking
2568        self.register_peer_for_delta(&node_id);
2569
2570        if is_new {
2571            if let Some(peer) = self.peer_manager.get_peer(node_id) {
2572                self.notify(PeatEvent::PeerDiscovered { peer });
2573            }
2574        }
2575
2576        self.notify(PeatEvent::PeerConnected { node_id });
2577        self.notify_mesh_state_changed();
2578
2579        is_new
2580    }
2581
2582    /// Called when data is received from a peer
2583    ///
2584    /// Parses the document, merges it, and generates appropriate events.
2585    /// If encryption is enabled, decrypts the document first.
2586    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2587    /// Returns the source NodeId and whether the document contained an event.
2588    pub fn on_ble_data_received(
2589        &self,
2590        identifier: &str,
2591        data: &[u8],
2592        now_ms: u64,
2593    ) -> Option<DataReceivedResult> {
2594        // Get node ID from identifier
2595        let node_id = self.peer_manager.get_node_id(identifier)?;
2596
2597        // Check for special message types first
2598        if data.len() >= 2 {
2599            match data[0] {
2600                KEY_EXCHANGE_MARKER => {
2601                    // Handle key exchange - returns response to send back
2602                    let _response = self.handle_key_exchange(data, now_ms);
2603                    // Return None as this isn't a document sync
2604                    return None;
2605                }
2606                PEER_E2EE_MARKER => {
2607                    // Handle encrypted peer message
2608                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2609                    // Return None as this isn't a document sync
2610                    return None;
2611                }
2612                RELAY_ENVELOPE_MARKER => {
2613                    // Handle relay envelope for multi-hop
2614                    return self
2615                        .handle_relay_envelope_with_identifier(node_id, identifier, data, now_ms);
2616                }
2617                _ => {}
2618            }
2619        }
2620
2621        // Direct document (not relay envelope)
2622        self.process_document_data_with_identifier(node_id, identifier, data, now_ms, None, None, 0)
2623    }
2624
2625    /// Internal: Process document data with identifier as source hint
2626    #[allow(clippy::too_many_arguments)]
2627    fn process_document_data_with_identifier(
2628        &self,
2629        source_node: NodeId,
2630        identifier: &str,
2631        data: &[u8],
2632        now_ms: u64,
2633        relay_data: Option<Vec<u8>>,
2634        origin_node: Option<NodeId>,
2635        hop_count: u8,
2636    ) -> Option<DataReceivedResult> {
2637        // Decrypt if encrypted (mesh-wide encryption) - use identifier as source hint
2638        let decrypted = self.decrypt_document(data, Some(identifier))?;
2639
2640        // Universal Document transport (peat-lite envelope, magic
2641        // prefix b"PEAT" / 0x50). Checked BEFORE 0xB6 translator
2642        // dispatch so a peat-lite frame doesn't get mis-decoded as a
2643        // translator frame (the magic byte 0x50 is outside the
2644        // 0xB6-0xBF translator range, but the routing fall-through
2645        // would otherwise reach delta-document handling and fail
2646        // there). Population is feature-gated; the field
2647        // `peat_lite_document` on `DataReceivedResult` is in the
2648        // binding shape unconditionally.
2649        #[cfg(feature = "peat-lite-frame")]
2650        match crate::peat_lite_frame::try_handle_peat_lite_frame(&decrypted) {
2651            crate::peat_lite_frame::PeatLiteFrameOutcome::NotPeatLiteFrame => {}
2652            crate::peat_lite_frame::PeatLiteFrameOutcome::Handled => return None,
2653            crate::peat_lite_frame::PeatLiteFrameOutcome::Decoded(frame) => {
2654                return Some(DataReceivedResult {
2655                    source_node,
2656                    peat_lite_document: Some(frame),
2657                    ..Default::default()
2658                });
2659            }
2660        }
2661
2662        // ADR-059 Amendment 1 §"Receive-side dispatch": route 0xB6 frames
2663        // through BleTranslator + silent-drop reserved 0xB7..=0xBF.
2664        // Amendment 3: surface decoded frames via the polled
2665        // `decoded_translator_frame` field on the returned result so
2666        // hosts (peat-atak-plugin et al.) don't depend on the JNA-broken
2667        // UniFFI callback path in ATAK.
2668        #[cfg(feature = "translator-codec")]
2669        match self.try_handle_translator_marker(&decrypted, Some(identifier), Some(source_node)) {
2670            TranslatorMarkerOutcome::NotTranslatorMarker => {}
2671            TranslatorMarkerOutcome::Handled => return None,
2672            TranslatorMarkerOutcome::Decoded(frame) => {
2673                return Some(DataReceivedResult::translator_frame(source_node, frame));
2674            }
2675        }
2676
2677        // Check if this is a delta document (wire format v2)
2678        if DeltaDocument::is_delta_document(&decrypted) {
2679            return self.process_delta_document_internal(
2680                source_node,
2681                &decrypted,
2682                now_ms,
2683                relay_data,
2684                origin_node,
2685                hop_count,
2686            );
2687        }
2688
2689        // Merge the document (legacy wire format v1)
2690        let result = self.document_sync.merge_document(&decrypted)?;
2691
2692        // Store peer peripheral if present (for callsign lookup)
2693        if let Some(ref peripheral) = result.peer_peripheral {
2694            if let Ok(mut peripherals) = self.peer_peripherals.write() {
2695                peripherals.insert(result.source_node, peripheral.clone());
2696            }
2697        }
2698
2699        // Record sync
2700        self.peer_manager.record_sync(source_node, now_ms);
2701
2702        // Generate events based on what was received
2703        if result.is_emergency() {
2704            self.notify(PeatEvent::EmergencyReceived {
2705                from_node: result.source_node,
2706            });
2707        } else if result.is_ack() {
2708            self.notify(PeatEvent::AckReceived {
2709                from_node: result.source_node,
2710            });
2711        }
2712
2713        if result.counter_changed {
2714            self.notify(PeatEvent::DocumentSynced {
2715                from_node: result.source_node,
2716                total_count: result.total_count,
2717            });
2718        }
2719
2720        // Emit relay event if we're relaying
2721        if relay_data.is_some() {
2722            let relay_targets = self.get_relay_targets(Some(source_node));
2723            self.notify(PeatEvent::MessageRelayed {
2724                origin_node: origin_node.unwrap_or(result.source_node),
2725                relay_count: relay_targets.len(),
2726                hop_count,
2727            });
2728        }
2729
2730        let PeripheralFields {
2731            callsign,
2732            battery_percent,
2733            heart_rate,
2734            event_type,
2735            latitude,
2736            longitude,
2737            altitude,
2738            activity_level,
2739            alerts,
2740        } = DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2741
2742        Some(DataReceivedResult {
2743            source_node: result.source_node,
2744            is_emergency: result.is_emergency(),
2745            is_ack: result.is_ack(),
2746            counter_changed: result.counter_changed,
2747            emergency_changed: result.emergency_changed,
2748            total_count: result.total_count,
2749            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2750            relay_data,
2751            origin_node,
2752            hop_count,
2753            callsign,
2754            battery_percent,
2755            heart_rate,
2756            event_type,
2757            latitude,
2758            longitude,
2759            altitude,
2760            activity_level,
2761            alerts,
2762            ..Default::default()
2763        })
2764    }
2765
2766    /// Internal: Handle relay envelope with identifier as source hint
2767    fn handle_relay_envelope_with_identifier(
2768        &self,
2769        source_node: NodeId,
2770        identifier: &str,
2771        data: &[u8],
2772        now_ms: u64,
2773    ) -> Option<DataReceivedResult> {
2774        // Process the relay envelope
2775        let envelope = RelayEnvelope::decode(data)?;
2776
2777        // Check deduplication
2778        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2779            let stats = self
2780                .seen_cache
2781                .lock()
2782                .unwrap()
2783                .get_stats(&envelope.message_id);
2784            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2785
2786            self.notify(PeatEvent::DuplicateMessageDropped {
2787                origin_node: envelope.origin_node,
2788                seen_count,
2789            });
2790            return None;
2791        }
2792
2793        // Check TTL and get relay data
2794        let relay_data = if envelope.can_relay() && self.config.enable_relay {
2795            envelope.relay().map(|e| e.encode())
2796        } else {
2797            if !envelope.can_relay() {
2798                self.notify(PeatEvent::MessageTtlExpired {
2799                    origin_node: envelope.origin_node,
2800                    hop_count: envelope.hop_count,
2801                });
2802            }
2803            None
2804        };
2805
2806        // Process the inner payload
2807        self.process_document_data_with_identifier(
2808            source_node,
2809            identifier,
2810            &envelope.payload,
2811            now_ms,
2812            relay_data,
2813            Some(envelope.origin_node),
2814            envelope.hop_count,
2815        )
2816    }
2817
2818    /// Called when data is received but we don't have the identifier mapped
2819    ///
2820    /// Use this when receiving data from a peripheral we discovered.
2821    /// If encryption is enabled, decrypts the document first.
2822    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2823    /// Handles relay envelopes for multi-hop mesh operation.
2824    pub fn on_ble_data_received_from_node(
2825        &self,
2826        node_id: NodeId,
2827        data: &[u8],
2828        now_ms: u64,
2829    ) -> Option<DataReceivedResult> {
2830        // Check for special message types first
2831        if data.len() >= 2 {
2832            match data[0] {
2833                KEY_EXCHANGE_MARKER => {
2834                    let _response = self.handle_key_exchange(data, now_ms);
2835                    return None;
2836                }
2837                PEER_E2EE_MARKER => {
2838                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2839                    return None;
2840                }
2841                RELAY_ENVELOPE_MARKER => {
2842                    // Handle relay envelope for multi-hop
2843                    return self.handle_relay_envelope(node_id, data, now_ms);
2844                }
2845                _ => {}
2846            }
2847        }
2848
2849        // Direct document (not relay envelope)
2850        self.process_document_data(node_id, data, now_ms, None, None, 0)
2851    }
2852
2853    /// Called when encrypted data is received from an unknown peer
2854    ///
2855    /// This handles the case where we receive an encrypted document from a BLE address
2856    /// that isn't registered in our peer manager (e.g., due to BLE address rotation).
2857    /// The function decrypts first using the mesh key, then extracts the source_node
2858    /// from the decrypted document header and registers the peer.
2859    ///
2860    /// Returns `Some(DataReceivedResult)` if decryption and processing succeed.
2861    /// Returns `None` if decryption fails or the document is invalid.
2862    pub fn on_ble_data_received_anonymous(
2863        &self,
2864        identifier: &str,
2865        data: &[u8],
2866        now_ms: u64,
2867    ) -> Option<DataReceivedResult> {
2868        log::debug!(
2869            "on_ble_data_received_anonymous: identifier={}, len={}, marker=0x{:02X}",
2870            identifier,
2871            data.len(),
2872            data.first().copied().unwrap_or(0)
2873        );
2874
2875        // Try to decrypt (handles both encrypted and unencrypted documents)
2876        let decrypted = match self.decrypt_document(data, Some(identifier)) {
2877            Some(d) => d,
2878            None => {
2879                log::warn!(
2880                    "on_ble_data_received_anonymous: decrypt/parse FAILED for {} byte doc from {}",
2881                    data.len(),
2882                    identifier
2883                );
2884                return None;
2885            }
2886        };
2887
2888        // Universal Document transport (peat-lite envelope) — checked
2889        // before 0xB6 translator dispatch. Anonymous-path: the
2890        // peat-lite header itself carries source_node_id, so we can
2891        // surface it on the returned result even though the outer
2892        // identifier didn't yield a NodeId.
2893        #[cfg(feature = "peat-lite-frame")]
2894        match crate::peat_lite_frame::try_handle_peat_lite_frame(&decrypted) {
2895            crate::peat_lite_frame::PeatLiteFrameOutcome::NotPeatLiteFrame => {}
2896            crate::peat_lite_frame::PeatLiteFrameOutcome::Handled => return None,
2897            crate::peat_lite_frame::PeatLiteFrameOutcome::Decoded(frame) => {
2898                let source = NodeId::new(frame.source_node_id);
2899                return Some(DataReceivedResult {
2900                    source_node: source,
2901                    peat_lite_document: Some(frame),
2902                    ..Default::default()
2903                });
2904            }
2905        }
2906
2907        // ADR-059 Amendment 1 §"Receive-side dispatch": route 0xB6
2908        // translator frames through BleTranslator::decode_inbound, and
2909        // silent-drop reserved 0xB7..=0xBF frames before they reach
2910        // PeatDocument::decode (where a 0xB6+ payload could hit the
2911        // GCounter-pollution hazard for inputs whose bytes 8..11 fall
2912        // <= MAX_COUNTER_ENTRIES). Anonymous receive: source_node is
2913        // unknown until the legacy path extracts it; for translator
2914        // frames there's no PeatDocument header to extract from, so we
2915        // pass None and accept that `tracks` decoding will Err for lack
2916        // of `local_wire_id` (other collections succeed).
2917        #[cfg(feature = "translator-codec")]
2918        match self.try_handle_translator_marker(&decrypted, Some(identifier), None) {
2919            TranslatorMarkerOutcome::NotTranslatorMarker => {}
2920            TranslatorMarkerOutcome::Handled => return None,
2921            TranslatorMarkerOutcome::Decoded(frame) => {
2922                // Anonymous path has no source_node yet — translator
2923                // frames carry no PeatDocument header. Use a placeholder
2924                // NodeId; the meaningful payload rides
2925                // `decoded_translator_frame` per Amendment 3.
2926                return Some(DataReceivedResult::translator_frame(NodeId::new(0), frame));
2927            }
2928        }
2929
2930        // Extract source_node from decrypted document header
2931        // Header format: [version: 4 bytes (LE)][node_id: 4 bytes (LE)]
2932        if decrypted.len() < 8 {
2933            log::warn!("Decrypted document too short to extract source_node");
2934            return None;
2935        }
2936
2937        let source_node_u32 =
2938            u32::from_le_bytes([decrypted[4], decrypted[5], decrypted[6], decrypted[7]]);
2939        let source_node = NodeId::new(source_node_u32);
2940
2941        log::info!(
2942            "Anonymous document from {}: source_node={:08X}, len={}",
2943            identifier,
2944            source_node_u32,
2945            decrypted.len()
2946        );
2947
2948        // Register the peer with this identifier so future lookups work
2949        // This handles BLE address rotation
2950        self.peer_manager
2951            .register_identifier(identifier, source_node);
2952
2953        // Check if this is a delta document
2954        let is_delta = DeltaDocument::is_delta_document(&decrypted);
2955        log::info!(
2956            "Document format: delta={}, first_byte=0x{:02X}, len={}",
2957            is_delta,
2958            decrypted.first().copied().unwrap_or(0),
2959            decrypted.len()
2960        );
2961
2962        if is_delta {
2963            return self.process_delta_document_internal(
2964                source_node,
2965                &decrypted,
2966                now_ms,
2967                None,
2968                None,
2969                0,
2970            );
2971        }
2972
2973        // Handle app-layer message (0xAF marker from peat-lite CannedMessages)
2974        // These are handled by consumers who register DocumentType implementations
2975        // via the DocumentRegistry. Pass through as relay_data for platform layer.
2976        const APP_LAYER_MARKER: u8 = 0xAF;
2977        if decrypted.first().copied() == Some(APP_LAYER_MARKER) {
2978            log::debug!(
2979                "App-layer message (0xAF) from {:08X}, {} bytes - passing to relay",
2980                source_node.as_u32(),
2981                decrypted.len()
2982            );
2983            return Some(DataReceivedResult {
2984                source_node,
2985                is_emergency: false,
2986                is_ack: false,
2987                counter_changed: false,
2988                emergency_changed: false,
2989                total_count: 0,
2990                event_timestamp: now_ms,
2991                relay_data: Some(decrypted.to_vec()),
2992                origin_node: None,
2993                hop_count: 0,
2994                callsign: None,
2995                battery_percent: None,
2996                heart_rate: None,
2997                event_type: None,
2998                latitude: None,
2999                longitude: None,
3000                altitude: None,
3001                ..Default::default()
3002            });
3003        }
3004
3005        // Merge the document (legacy wire format v1)
3006        log::info!(
3007            "Processing legacy document from {:08X}",
3008            source_node.as_u32()
3009        );
3010        let result = self.document_sync.merge_document(&decrypted)?;
3011
3012        // Log what we got from the merge
3013        log::info!(
3014            "Merge result: peer_peripheral={}, counter_changed={}",
3015            result.peer_peripheral.is_some(),
3016            result.counter_changed
3017        );
3018        if let Some(ref p) = result.peer_peripheral {
3019            log::info!("Peripheral callsign: '{}'", p.callsign_str());
3020        }
3021
3022        // Record sync
3023        self.peer_manager.record_sync(source_node, now_ms);
3024
3025        // Generate events
3026        if result.is_emergency() {
3027            self.notify(PeatEvent::EmergencyReceived {
3028                from_node: result.source_node,
3029            });
3030        } else if result.is_ack() {
3031            self.notify(PeatEvent::AckReceived {
3032                from_node: result.source_node,
3033            });
3034        }
3035
3036        if result.counter_changed {
3037            self.notify(PeatEvent::DocumentSynced {
3038                from_node: result.source_node,
3039                total_count: result.total_count,
3040            });
3041        }
3042
3043        let PeripheralFields {
3044            callsign,
3045            battery_percent,
3046            heart_rate,
3047            event_type,
3048            latitude,
3049            longitude,
3050            altitude,
3051            activity_level,
3052            alerts,
3053        } = DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3054
3055        Some(DataReceivedResult {
3056            source_node: result.source_node,
3057            is_emergency: result.is_emergency(),
3058            is_ack: result.is_ack(),
3059            counter_changed: result.counter_changed,
3060            emergency_changed: result.emergency_changed,
3061            total_count: result.total_count,
3062            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3063            relay_data: None,
3064            origin_node: None,
3065            hop_count: 0,
3066            callsign,
3067            battery_percent,
3068            heart_rate,
3069            event_type,
3070            latitude,
3071            longitude,
3072            altitude,
3073            activity_level,
3074            alerts,
3075            ..Default::default()
3076        })
3077    }
3078
3079    /// Internal: Process document data (shared by direct and relay paths)
        ///
        /// Decrypts `data` and then tries each wire format in turn: peat-lite
        /// frame (feature `peat-lite-frame`), translator marker (feature
        /// `translator-codec`), delta document, and finally a legacy v1 merge.
        ///
        /// Returns `None` if decryption or the legacy merge yields nothing, or if a
        /// frame handler reports the payload as `Handled`. On the peat-lite, delta
        /// and legacy paths `relay_data`, `origin_node` and `hop_count` are carried
        /// into the result (or forwarded to the delta handler) unchanged.
3080    fn process_document_data(
3081        &self,
3082        source_node: NodeId,
3083        data: &[u8],
3084        now_ms: u64,
3085        relay_data: Option<Vec<u8>>,
3086        origin_node: Option<NodeId>,
3087        hop_count: u8,
3088    ) -> Option<DataReceivedResult> {
3089        // Decrypt if encrypted (mesh-wide encryption)
            // The hint string identifies the sender to `decrypt_document`.
3090        let source_hint = format!("node:{:08X}", source_node.as_u32());
3091        let decrypted = self.decrypt_document(data, Some(&source_hint))?;
3092
3093        // Universal Document transport (peat-lite envelope) — checked
3094        // before 0xB6 translator dispatch.
3095        #[cfg(feature = "peat-lite-frame")]
3096        match crate::peat_lite_frame::try_handle_peat_lite_frame(&decrypted) {
3097            crate::peat_lite_frame::PeatLiteFrameOutcome::NotPeatLiteFrame => {}
3098            crate::peat_lite_frame::PeatLiteFrameOutcome::Handled => return None,
3099            crate::peat_lite_frame::PeatLiteFrameOutcome::Decoded(frame) => {
3100                return Some(DataReceivedResult {
3101                    source_node,
3102                    relay_data,
3103                    origin_node,
3104                    hop_count,
3105                    peat_lite_document: Some(frame),
3106                    ..Default::default()
3107                });
3108            }
3109        }
3110
3111        // ADR-059 Amendment 1 §"Receive-side dispatch": route 0xB6 frames
3112        // through BleTranslator + silent-drop reserved 0xB7..=0xBF.
3113        // Amendment 3: surface decoded frames via `decoded_translator_frame`.
            // NOTE(review): this path does not forward relay_data / origin_node /
            // hop_count into the result — confirm that is intended for relayed frames.
3114        #[cfg(feature = "translator-codec")]
3115        match self.try_handle_translator_marker(&decrypted, None, Some(source_node)) {
3116            TranslatorMarkerOutcome::NotTranslatorMarker => {}
3117            TranslatorMarkerOutcome::Handled => return None,
3118            TranslatorMarkerOutcome::Decoded(frame) => {
3119                return Some(DataReceivedResult::translator_frame(source_node, frame));
3120            }
3121        }
3122
3123        // Check if this is a delta document (wire format v2)
3124        if DeltaDocument::is_delta_document(&decrypted) {
3125            return self.process_delta_document_internal(
3126                source_node,
3127                &decrypted,
3128                now_ms,
3129                relay_data,
3130                origin_node,
3131                hop_count,
3132            );
3133        }
3134
3135        // Merge the document (legacy wire format v1)
3136        let result = self.document_sync.merge_document(&decrypted)?;
3137
3138        // Store peer peripheral if present (for callsign lookup)
            // A failed write lock is skipped silently (best effort).
3139        if let Some(ref peripheral) = result.peer_peripheral {
3140            if let Ok(mut peripherals) = self.peer_peripherals.write() {
3141                peripherals.insert(result.source_node, peripheral.clone());
3142            }
3143        }
3144
3145        // Record sync
            // Recorded against the `source_node` argument; the events below use
            // `result.source_node` taken from the merged document.
3146        self.peer_manager.record_sync(source_node, now_ms);
3147
3148        // Generate events based on what was received
            // Emergency takes precedence over ACK: at most one of the two is emitted.
3149        if result.is_emergency() {
3150            self.notify(PeatEvent::EmergencyReceived {
3151                from_node: result.source_node,
3152            });
3153        } else if result.is_ack() {
3154            self.notify(PeatEvent::AckReceived {
3155                from_node: result.source_node,
3156            });
3157        }
3158
3159        if result.counter_changed {
3160            self.notify(PeatEvent::DocumentSynced {
3161                from_node: result.source_node,
3162                total_count: result.total_count,
3163            });
3164        }
3165
3166        // Emit relay event if we're relaying
3167        if relay_data.is_some() {
3168            let relay_targets = self.get_relay_targets(Some(source_node));
3169            self.notify(PeatEvent::MessageRelayed {
3170                origin_node: origin_node.unwrap_or(result.source_node),
3171                relay_count: relay_targets.len(),
3172                hop_count,
3173            });
3174        }
3175
            // Flatten the optional peripheral into the individual result fields.
3176        let PeripheralFields {
3177            callsign,
3178            battery_percent,
3179            heart_rate,
3180            event_type,
3181            latitude,
3182            longitude,
3183            altitude,
3184            activity_level,
3185            alerts,
3186        } = DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3187
3188        Some(DataReceivedResult {
3189            source_node: result.source_node,
3190            is_emergency: result.is_emergency(),
3191            is_ack: result.is_ack(),
3192            counter_changed: result.counter_changed,
3193            emergency_changed: result.emergency_changed,
3194            total_count: result.total_count,
                // Falls back to 0 when the merged document carries no event.
3195            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3196            relay_data,
3197            origin_node,
3198            hop_count,
3199            callsign,
3200            battery_percent,
3201            heart_rate,
3202            event_type,
3203            latitude,
3204            longitude,
3205            altitude,
3206            activity_level,
3207            alerts,
3208            ..Default::default()
3209        })
3210    }
3211
3212    /// Internal: Handle relay envelope
3213    fn handle_relay_envelope(
3214        &self,
3215        source_node: NodeId,
3216        data: &[u8],
3217        now_ms: u64,
3218    ) -> Option<DataReceivedResult> {
3219        // Process the relay envelope
3220        let decision = self.process_relay_envelope(data, source_node, now_ms)?;
3221
3222        // Get relay data if we should relay
3223        let relay_data = if decision.should_relay {
3224            decision.relay_data()
3225        } else {
3226            None
3227        };
3228
3229        // Process the inner payload
3230        self.process_document_data(
3231            source_node,
3232            &decision.payload,
3233            now_ms,
3234            relay_data,
3235            Some(decision.origin_node),
3236            decision.hop_count,
3237        )
3238    }
3239
3240    /// Called when data is received without a known identifier
3241    ///
3242    /// This is the simplest data receive method - it extracts the source node_id
3243    /// from the document itself. Use this when you don't track identifiers
3244    /// (e.g., ESP32 NimBLE).
3245    /// If encryption is enabled, decrypts the document first.
3246    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
3247    /// Handles relay envelopes for multi-hop mesh operation.
3248    pub fn on_ble_data(
3249        &self,
3250        identifier: &str,
3251        data: &[u8],
3252        now_ms: u64,
3253    ) -> Option<DataReceivedResult> {
3254        // Check for special message types first
3255        if data.len() >= 2 {
3256            match data[0] {
3257                KEY_EXCHANGE_MARKER => {
3258                    let _response = self.handle_key_exchange(data, now_ms);
3259                    return None;
3260                }
3261                PEER_E2EE_MARKER => {
3262                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
3263                    return None;
3264                }
3265                RELAY_ENVELOPE_MARKER => {
3266                    // Handle relay envelope - extract origin from envelope
3267                    return self.handle_relay_envelope_with_incoming(identifier, data, now_ms);
3268                }
3269                _ => {}
3270            }
3271        }
3272
3273        // Direct document - process normally
3274        self.process_incoming_document(identifier, data, now_ms, None, None, 0)
3275    }
3276
3277    /// Internal: Process incoming document (handles peer registration)
        ///
        /// Decrypts `data`, merges it via `document_sync.merge_document`, and records
        /// the sync against the node id found in the merged document. For direct
        /// messages only (`origin_node` is `None`) it also registers `identifier`
        /// with the peer manager and updates the connection graph.
        ///
        /// Returns `None` if decryption or the merge yields nothing.
        ///
        /// # Panics
        ///
        /// Panics if the `connection_graph` lock result is an error (`lock().unwrap()`).
        ///
        /// NOTE(review): unlike `process_document_data`, this path does not check for
        /// peat-lite, translator or delta frames and does not store
        /// `result.peer_peripheral` in `peer_peripherals` — confirm this is intended.
3278    fn process_incoming_document(
3279        &self,
3280        identifier: &str,
3281        data: &[u8],
3282        now_ms: u64,
3283        relay_data: Option<Vec<u8>>,
3284        origin_node: Option<NodeId>,
3285        hop_count: u8,
3286    ) -> Option<DataReceivedResult> {
3287        // Decrypt if encrypted (mesh-wide encryption)
3288        let decrypted = self.decrypt_document(data, Some(identifier))?;
3289
3290        // Merge the document (extracts node_id internally)
3291        let result = self.document_sync.merge_document(&decrypted)?;
3292
3293        // Record sync using the source_node from the merged document
3294        self.peer_manager.record_sync(result.source_node, now_ms);
3295
3296        // Only register the identifier mapping for direct messages (not relayed)
3297        // For relayed messages, the identifier belongs to the relay source (who forwarded it),
3298        // not the origin node (who created the document). The relay source is already registered
3299        // via handle_relay_envelope_with_incoming when the relay envelope is first processed.
3300        if origin_node.is_none() {
3301            // Direct message - register the peer with this identifier
3302            let is_new =
3303                self.peer_manager
3304                    .on_incoming_connection(identifier, result.source_node, now_ms);
3305
3306            // Update connection graph to track connection state
                // The inner block scopes the lock guard so it is released before
                // any observer is notified below.
3307            {
3308                let mut graph = self.connection_graph.lock().unwrap();
3309                if is_new {
3310                    graph.on_discovered(
3311                        result.source_node,
3312                        identifier.to_string(),
3313                        None,
3314                        Some(self.config.mesh_id.clone()),
3315                        -50, // Default RSSI for data-based discovery
3316                        now_ms,
3317                    );
3318                }
3319                graph.on_connected(result.source_node, now_ms);
3320            }
3321        }
3322
3323        // Generate events based on what was received
            // Emergency takes precedence over ACK: at most one of the two is emitted.
3324        if result.is_emergency() {
3325            self.notify(PeatEvent::EmergencyReceived {
3326                from_node: result.source_node,
3327            });
3328        } else if result.is_ack() {
3329            self.notify(PeatEvent::AckReceived {
3330                from_node: result.source_node,
3331            });
3332        }
3333
3334        if result.counter_changed {
3335            self.notify(PeatEvent::DocumentSynced {
3336                from_node: result.source_node,
3337                total_count: result.total_count,
3338            });
3339        }
3340
3341        // Emit relay event if we're relaying
            // NOTE(review): relay targets are computed from the document's source node
            // here, while `process_document_data` passes the forwarding peer — confirm.
3342        if relay_data.is_some() {
3343            let relay_targets = self.get_relay_targets(Some(result.source_node));
3344            self.notify(PeatEvent::MessageRelayed {
3345                origin_node: origin_node.unwrap_or(result.source_node),
3346                relay_count: relay_targets.len(),
3347                hop_count,
3348            });
3349        }
3350
            // Flatten the optional peripheral into the individual result fields.
3351        let PeripheralFields {
3352            callsign,
3353            battery_percent,
3354            heart_rate,
3355            event_type,
3356            latitude,
3357            longitude,
3358            altitude,
3359            activity_level,
3360            alerts,
3361        } = DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3362
3363        Some(DataReceivedResult {
3364            source_node: result.source_node,
3365            is_emergency: result.is_emergency(),
3366            is_ack: result.is_ack(),
3367            counter_changed: result.counter_changed,
3368            emergency_changed: result.emergency_changed,
3369            total_count: result.total_count,
                // Falls back to 0 when the merged document carries no event.
3370            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3371            relay_data,
3372            origin_node,
3373            hop_count,
3374            callsign,
3375            battery_percent,
3376            heart_rate,
3377            event_type,
3378            latitude,
3379            longitude,
3380            altitude,
3381            activity_level,
3382            alerts,
3383            ..Default::default()
3384        })
3385    }
3386
3387    /// Internal: Handle relay envelope with incoming connection registration
3388    fn handle_relay_envelope_with_incoming(
3389        &self,
3390        identifier: &str,
3391        data: &[u8],
3392        now_ms: u64,
3393    ) -> Option<DataReceivedResult> {
3394        // Parse envelope to get origin
3395        let envelope = RelayEnvelope::decode(data)?;
3396
3397        // Try to look up the source peer from identifier to register indirect path
3398        // If we know who sent this relay, we can track indirect peers via them
3399        if let Some(source_peer) = self.peer_manager.get_node_id(identifier) {
3400            if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
3401                let is_new = self.connection_graph.lock().unwrap().on_relay_received(
3402                    source_peer,
3403                    envelope.origin_node,
3404                    envelope.hop_count,
3405                    now_ms,
3406                );
3407
3408                if is_new {
3409                    log::debug!(
3410                        "Discovered indirect peer {:08X} via {:08X} ({} hops)",
3411                        envelope.origin_node.as_u32(),
3412                        source_peer.as_u32(),
3413                        envelope.hop_count
3414                    );
3415                }
3416            }
3417        }
3418
3419        // Check deduplication
3420        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
3421            // Duplicate - get stats for event
3422            let stats = self
3423                .seen_cache
3424                .lock()
3425                .unwrap()
3426                .get_stats(&envelope.message_id);
3427            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
3428
3429            self.notify(PeatEvent::DuplicateMessageDropped {
3430                origin_node: envelope.origin_node,
3431                seen_count,
3432            });
3433            return None;
3434        }
3435
3436        // Check TTL
3437        let (should_relay, relay_data) = if envelope.can_relay() && self.config.enable_relay {
3438            let relay_env = envelope.relay();
3439            (true, relay_env.map(|e| e.encode()))
3440        } else {
3441            if !envelope.can_relay() {
3442                self.notify(PeatEvent::MessageTtlExpired {
3443                    origin_node: envelope.origin_node,
3444                    hop_count: envelope.hop_count,
3445                });
3446            }
3447            (false, None)
3448        };
3449
3450        // Process the inner payload
3451        self.process_incoming_document(
3452            identifier,
3453            &envelope.payload,
3454            now_ms,
3455            if should_relay { relay_data } else { None },
3456            Some(envelope.origin_node),
3457            envelope.hop_count,
3458        )
3459    }
3460
3461    // ==================== Periodic Maintenance ====================
3462
3463    /// Periodic tick - call this regularly (e.g., every second)
3464    ///
3465    /// Performs:
3466    /// - Stale peer cleanup
        /// - Connection graph maintenance (`graph.tick` and `cleanup_lost`), run in the
        ///   same cleanup pass
3467    /// - Periodic sync broadcast (if interval elapsed)
3468    ///
3469    /// Returns `Some(data)` if a sync broadcast is needed.
        /// That requires the sync interval to have elapsed and at least one connected
        /// peer; the last-sync timestamp is updated even when no peer is connected.
        ///
        /// # Panics
        ///
        /// Panics if the `connection_graph` lock result is an error (`lock().unwrap()`).
3470    pub fn tick(&self, now_ms: u64) -> Option<Vec<u8>> {
3471        use std::sync::atomic::Ordering;
3472
3473        // Use u32 for atomic storage (wraps every ~49 days, intervals still work)
3474        let now_ms_32 = now_ms as u32;
3475
3476        // Cleanup stale peers
            // NOTE(review): the load and the later store are separate operations, so two
            // concurrent callers could both pass the interval check — confirm callers
            // are serialized.
3477        let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3478        let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
            // NOTE(review): `as u32` truncates if the configured interval is wider than
            // 32 bits — confirm the config type/range.
3479        if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3480            self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3481            let removed = self.peer_manager.cleanup_stale(now_ms);
3482            for node_id in &removed {
3483                self.notify(PeatEvent::PeerLost { node_id: *node_id });
3484            }
3485            if !removed.is_empty() {
3486                self.notify_mesh_state_changed();
3487            }
3488
3489            // Run connection graph maintenance (transition Disconnected -> Lost)
3490            {
3491                let mut graph = self.connection_graph.lock().unwrap();
3492                let newly_lost = graph.tick(now_ms);
3493                // Also cleanup peers lost for more than peer_timeout
3494                graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
                    // Release the lock before notifying observers.
3495                drop(graph);
3496
3497                // Emit PeerLost events for newly lost peers from graph
3498                // (these may differ from peer_manager removals)
3499                for node_id in newly_lost {
3500                    // Only notify if not already notified by peer_manager
3501                    if !removed.contains(&node_id) {
3502                        self.notify(PeatEvent::PeerLost { node_id });
3503                    }
3504                }
3505            }
3506        }
3507
3508        // Check if sync broadcast is needed
3509        let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3510        let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3511        if sync_elapsed >= self.config.sync_interval_ms as u32 {
3512            self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3513            // Only broadcast if we have connected peers
3514            if self.peer_manager.connected_count() > 0 {
3515                let doc = self.document_sync.build_document();
3516                return Some(self.encrypt_document(&doc));
3517            }
3518        }
3519
3520        None
3521    }
3522
3523    /// Periodic tick returning per-peer delta documents
3524    ///
3525    /// Unlike `tick()` which broadcasts a single document to all peers,
3526    /// this returns targeted deltas that only include changes each peer
3527    /// hasn't seen. Use this for platforms that support per-peer transmission.
3528    ///
3529    /// Returns a list of (NodeId, encrypted_delta) tuples, one per connected peer.
3530    /// Empty vector if no sync is needed (interval not elapsed or no connected peers).
3531    pub fn tick_with_peer_deltas(&self, now_ms: u64) -> Vec<(NodeId, Vec<u8>)> {
3532        use std::sync::atomic::Ordering;
3533        let now_ms_32 = now_ms as u32;
3534
3535        // Cleanup stale peers (same as tick())
3536        let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3537        let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3538        if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3539            self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3540            let removed = self.peer_manager.cleanup_stale(now_ms);
3541            for node_id in &removed {
3542                self.notify(PeatEvent::PeerLost { node_id: *node_id });
3543            }
3544            if !removed.is_empty() {
3545                self.notify_mesh_state_changed();
3546            }
3547
3548            // Run connection graph maintenance
3549            {
3550                let mut graph = self.connection_graph.lock().unwrap();
3551                let newly_lost = graph.tick(now_ms);
3552                graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3553                drop(graph);
3554
3555                for node_id in newly_lost {
3556                    if !removed.contains(&node_id) {
3557                        self.notify(PeatEvent::PeerLost { node_id });
3558                    }
3559                }
3560            }
3561        }
3562
3563        // Check if sync is needed
3564        let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3565        let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3566        if sync_elapsed >= self.config.sync_interval_ms as u32 {
3567            self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3568
3569            // Build document for each connected peer
3570            let doc = self.document_sync.build_document();
3571            let encrypted = self.encrypt_document(&doc);
3572            let mut results = Vec::new();
3573            for peer in self.get_connected_peers() {
3574                results.push((peer.node_id, encrypted.clone()));
3575            }
3576            return results;
3577        }
3578
3579        Vec::new()
3580    }
3581
3582    // ==================== State Queries ====================
3583
3584    /// Get all known peers
        ///
        /// Delegates to `peer_manager.get_peers()` and returns its result unchanged.
3585    pub fn get_peers(&self) -> Vec<PeatPeer> {
3586        self.peer_manager.get_peers()
3587    }
3588
3589    /// Get connected peers only
        ///
        /// Delegates to `peer_manager.get_connected_peers()`.
3590    pub fn get_connected_peers(&self) -> Vec<PeatPeer> {
3591        self.peer_manager.get_connected_peers()
3592    }
3593
3594    /// Get a specific peer by NodeId
        ///
        /// Returns whatever `peer_manager.get_peer(node_id)` yields; presumably `None`
        /// when the peer manager does not track `node_id` — confirm against `PeerManager`.
3595    pub fn get_peer(&self, node_id: NodeId) -> Option<PeatPeer> {
3596        self.peer_manager.get_peer(node_id)
3597    }
3598
3599    /// Get peer count
        ///
        /// Delegates to `peer_manager.peer_count()`.
3600    pub fn peer_count(&self) -> usize {
3601        self.peer_manager.peer_count()
3602    }
3603
3604    /// Get connected peer count
        ///
        /// Delegates to `peer_manager.connected_count()`; `tick()` uses the same
        /// count to decide whether a sync broadcast is worth building.
3605    pub fn connected_count(&self) -> usize {
3606        self.peer_manager.connected_count()
3607    }
3608
3609    /// Check if a device mesh ID matches our mesh
        ///
        /// The decision (including how a `None` mesh ID is treated) is made entirely
        /// by `peer_manager.matches_mesh`.
3610    pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
3611        self.peer_manager.matches_mesh(device_mesh_id)
3612    }
3613
3614    // ==================== Connection State Graph ====================
3615
3616    /// Get the connection state graph with all peer states
3617    ///
3618    /// Returns a snapshot of all tracked peers and their connection lifecycle state.
3619    /// Apps can use this to display appropriate UI indicators:
3620    /// - Green for Connected peers
3621    /// - Yellow for Degraded or RecentlyDisconnected peers
3622    /// - Gray for Lost peers
3623    ///
3624    /// # Example
3625    /// ```ignore
3626    /// let states = mesh.get_connection_graph();
3627    /// for peer in states {
3628    ///     match peer.state {
3629    ///         ConnectionState::Connected => show_green_indicator(&peer),
3630    ///         ConnectionState::Degraded => show_yellow_indicator(&peer),
3631    ///         ConnectionState::Disconnected => show_stale_indicator(&peer),
3632    ///         ConnectionState::Lost => show_gray_indicator(&peer),
3633    ///         _ => {}
3634    ///     }
3635    /// }
3636    /// ```
        ///
        /// # Panics
        ///
        /// Panics if the `connection_graph` lock result is an error (`lock().unwrap()`).
3637    pub fn get_connection_graph(&self) -> Vec<PeerConnectionState> {
            // The lock guard is a temporary: it is released once the owned copy exists.
3638        self.connection_graph.lock().unwrap().get_all_owned()
3639    }
3640
3641    /// Get a specific peer's connection state
        ///
        /// Returns a clone of the graph entry for `node_id`, or `None` when the graph's
        /// `get_peer` finds nothing.
        ///
        /// # Panics
        ///
        /// Panics if the `connection_graph` lock result is an error (`lock().unwrap()`).
3642    pub fn get_peer_connection_state(&self, node_id: NodeId) -> Option<PeerConnectionState> {
3643        self.connection_graph
3644            .lock()
3645            .unwrap()
3646            .get_peer(node_id)
3647            .cloned()
3648    }
3649
3650    /// Get all currently connected peers from the connection graph
        ///
        /// The entries returned by the graph's `get_connected()` are cloned into an
        /// owned `Vec`, so the result stays valid after the lock is released.
        ///
        /// # Panics
        ///
        /// Panics if the `connection_graph` lock result is an error (`lock().unwrap()`).
3651    pub fn get_connected_states(&self) -> Vec<PeerConnectionState> {
3652        self.connection_graph
3653            .lock()
3654            .unwrap()
3655            .get_connected()
3656            .into_iter()
3657            .cloned()
3658            .collect()
3659    }
3660
3661    /// Get peers in degraded state (connected but poor signal quality)
        ///
        /// Entries from the graph's `get_degraded()` are cloned into an owned `Vec`.
        ///
        /// # Panics
        ///
        /// Panics if the `connection_graph` lock result is an error (`lock().unwrap()`).
3662    pub fn get_degraded_peers(&self) -> Vec<PeerConnectionState> {
3663        self.connection_graph
3664            .lock()
3665            .unwrap()
3666            .get_degraded()
3667            .into_iter()
3668            .cloned()
3669            .collect()
3670    }
3671
3672    /// Get peers that disconnected within the specified time window
3673    ///
3674    /// Useful for showing "stale" peers that were recently connected.
        ///
        /// `within_ms` and `now_ms` are forwarded unchanged to the graph's
        /// `get_recently_disconnected`; the matching entries are cloned into an
        /// owned `Vec`.
        ///
        /// # Panics
        ///
        /// Panics if the `connection_graph` lock result is an error (`lock().unwrap()`).
3675    pub fn get_recently_disconnected(
3676        &self,
3677        within_ms: u64,
3678        now_ms: u64,
3679    ) -> Vec<PeerConnectionState> {
3680        self.connection_graph
3681            .lock()
3682            .unwrap()
3683            .get_recently_disconnected(within_ms, now_ms)
3684            .into_iter()
3685            .cloned()
3686            .collect()
3687    }
3688
3689    /// Get peers in Lost state (disconnected and no longer advertising)
        ///
        /// Entries from the graph's `get_lost()` are cloned into an owned `Vec`.
        ///
        /// # Panics
        ///
        /// Panics if the `connection_graph` lock result is an error (`lock().unwrap()`).
3690    pub fn get_lost_peers(&self) -> Vec<PeerConnectionState> {
3691        self.connection_graph
3692            .lock()
3693            .unwrap()
3694            .get_lost()
3695            .into_iter()
3696            .cloned()
3697            .collect()
3698    }
3699
3700    /// Get summary counts of peers in each connection state
        ///
        /// Returns the graph's `state_counts()` result. Panics if the
        /// `connection_graph` lock result is an error (`lock().unwrap()`).
3701    pub fn get_connection_state_counts(&self) -> StateCountSummary {
3702        self.connection_graph.lock().unwrap().state_counts()
3703    }
3704
3705    // ==================== Indirect Peer Methods ====================
3706
3707    /// Get all indirect (multi-hop) peers
3708    ///
3709    /// Returns peers discovered via relay messages that are not directly
3710    /// connected via BLE. Each indirect peer includes the minimum hop count
3711    /// and the direct peers through which they can be reached.
        ///
        /// The list is the owned copy produced by the graph's
        /// `get_indirect_peers_owned()`. Panics if the `connection_graph` lock result
        /// is an error (`lock().unwrap()`).
3712    pub fn get_indirect_peers(&self) -> Vec<IndirectPeer> {
3713        self.connection_graph
3714            .lock()
3715            .unwrap()
3716            .get_indirect_peers_owned()
3717    }
3718
3719    /// Get the degree (hop count) for a specific peer
3720    ///
3721    /// Returns:
3722    /// - `Some(PeerDegree::Direct)` for directly connected BLE peers
3723    /// - `Some(PeerDegree::OneHop/TwoHop/ThreeHop)` for indirect peers
3724    /// - `None` if peer is not known
        ///
        /// The value comes straight from the graph's `peer_degree(node_id)`. Panics if
        /// the `connection_graph` lock result is an error (`lock().unwrap()`).
3725    pub fn get_peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
3726        self.connection_graph.lock().unwrap().peer_degree(node_id)
3727    }
3728
3729    /// Get full state counts including indirect peers
3730    ///
3731    /// Returns counts of direct peers by connection state plus counts
3732    /// of indirect peers by hop count (1-hop, 2-hop, 3-hop).
        ///
        /// Delegates to the graph's `full_state_counts()`. Panics if the
        /// `connection_graph` lock result is an error (`lock().unwrap()`).
3733    pub fn get_full_state_counts(&self) -> FullStateCountSummary {
3734        self.connection_graph.lock().unwrap().full_state_counts()
3735    }
3736
3737    /// Get all paths to reach an indirect peer
3738    ///
3739    /// Returns a list of (via_peer_id, hop_count) pairs showing all
3740    /// known routes to the specified peer.
        ///
        /// Delegates to the graph's `get_paths_to(node_id)`. Panics if the
        /// `connection_graph` lock result is an error (`lock().unwrap()`).
3741    pub fn get_paths_to_peer(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
3742        self.connection_graph.lock().unwrap().get_paths_to(node_id)
3743    }
3744
3745    /// Check if a node is known (either direct or indirect)
        ///
        /// Delegates to the graph's `is_known(node_id)`. Panics if the
        /// `connection_graph` lock result is an error (`lock().unwrap()`).
3746    pub fn is_peer_known(&self, node_id: NodeId) -> bool {
3747        self.connection_graph.lock().unwrap().is_known(node_id)
3748    }
3749
3750    /// Get number of indirect peers
        ///
        /// Delegates to the graph's `indirect_peer_count()`. Panics if the
        /// `connection_graph` lock result is an error (`lock().unwrap()`).
3751    pub fn indirect_peer_count(&self) -> usize {
3752        self.connection_graph.lock().unwrap().indirect_peer_count()
3753    }
3754
3755    /// Cleanup stale indirect peers
3756    ///
3757    /// Removes indirect peers that haven't been seen within the timeout.
3758    /// Returns the list of removed peer IDs.
        ///
        /// The timeout itself is not passed in here; it is whatever the graph's
        /// `cleanup_indirect(now_ms)` applies. Panics if the `connection_graph` lock
        /// result is an error (`lock().unwrap()`).
3759    pub fn cleanup_indirect_peers(&self, now_ms: u64) -> Vec<NodeId> {
3760        self.connection_graph
3761            .lock()
3762            .unwrap()
3763            .cleanup_indirect(now_ms)
3764    }
3765
3766    /// Get total counter value
        ///
        /// Delegates to `document_sync.total_count()`.
3767    pub fn total_count(&self) -> u64 {
3768        self.document_sync.total_count()
3769    }
3770
3771    /// Get document version
        ///
        /// Delegates to `document_sync.version()`; returns the same value as `version()`.
3772    pub fn document_version(&self) -> u32 {
3773        self.document_sync.version()
3774    }
3775
3776    /// Get document version (alias)
        ///
        /// Same call as `document_version()`: both return `document_sync.version()`.
3777    pub fn version(&self) -> u32 {
3778        self.document_sync.version()
3779    }
3780
3781    /// Update health status (battery percentage)
        ///
        /// Forwards `battery_percent` unchanged to `document_sync.update_health`; no
        /// range check is applied here.
3782    pub fn update_health(&self, battery_percent: u8) {
3783        self.document_sync.update_health(battery_percent);
3784    }
3785
3786    /// Update activity level. The wire field is a non-validated `u8`
3787    /// render hint (see `HealthStatus::decode`); receivers map known
3788    /// values and silently ignore unknown ones. Current emitters use:
3789    /// 0=still/standing, 3=PossibleFall, 4+=reserved for sender-side
3790    /// extensions.
        ///
        /// The value is forwarded unchanged to `document_sync.update_activity`.
3791    pub fn update_activity(&self, activity: u8) {
3792        self.document_sync.update_activity(activity);
3793    }
3794
3795    /// Update full health status (battery and activity)
        ///
        /// Forwards both values unchanged to `document_sync.update_health_full` in a
        /// single call.
3796    pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
3797        self.document_sync
3798            .update_health_full(battery_percent, activity);
3799    }
3800
3801    /// Update heart rate
        ///
        /// Forwards `heart_rate` unchanged to `document_sync.update_heart_rate`
        /// (presumably beats per minute — confirm against `DocumentSync`).
3802    pub fn update_heart_rate(&self, heart_rate: u8) {
3803        self.document_sync.update_heart_rate(heart_rate);
3804    }
3805
3806    /// Replace the alert flags bitfield. Callers OR together
3807    /// `HealthStatus::ALERT_*` constants (e.g. `ALERT_MAN_DOWN`) to
3808    /// express which alerts are currently active. Passing `0` clears all.
        ///
        /// The bitfield is forwarded unchanged to `document_sync.update_alerts`.
3809    pub fn update_alerts(&self, alerts: u8) {
3810        self.document_sync.update_alerts(alerts);
3811    }
3812
3813    /// Update location
        ///
        /// Forwards `latitude`, `longitude` and the optional `altitude` unchanged to
        /// `document_sync.update_location`. Units are not established here (presumably
        /// decimal degrees and meters — confirm against `DocumentSync`).
3814    pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
3815        self.document_sync
3816            .update_location(latitude, longitude, altitude);
3817    }
3818
3819    /// Clear location
        ///
        /// Delegates to `document_sync.clear_location()`.
3820    pub fn clear_location(&self) {
3821        self.document_sync.clear_location();
3822    }
3823
3824    /// Update callsign
        ///
        /// Forwards `callsign` to `document_sync.update_callsign`; any length or
        /// character limits are applied there, not here.
3825    pub fn update_callsign(&self, callsign: &str) {
3826        self.document_sync.update_callsign(callsign);
3827    }
3828
3829    /// Set peripheral event type
        ///
        /// Forwards `event_type` and `timestamp` unchanged to
        /// `document_sync.set_peripheral_event`.
3830    pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
3831        self.document_sync
3832            .set_peripheral_event(event_type, timestamp);
3833    }
3834
3835    /// Clear peripheral event
        ///
        /// Delegates to `document_sync.clear_peripheral_event()`.
3836    pub fn clear_peripheral_event(&self) {
3837        self.document_sync.clear_peripheral_event();
3838    }
3839
3840    /// Update full peripheral state in one call
3841    ///
3842    /// This is the most efficient way to update all peripheral data before
3843    /// calling `build_document()` for encrypted transmission.
        ///
        /// All eight arguments are forwarded, in the same order and unchanged, to
        /// `document_sync.update_peripheral_state`.
        // The lint is silenced because the signature mirrors the wrapped call.
3844    #[allow(clippy::too_many_arguments)]
3845    pub fn update_peripheral_state(
3846        &self,
3847        callsign: &str,
3848        battery_percent: u8,
3849        heart_rate: Option<u8>,
3850        latitude: Option<f32>,
3851        longitude: Option<f32>,
3852        altitude: Option<f32>,
3853        event_type: Option<EventType>,
3854        timestamp: u64,
3855    ) {
3856        self.document_sync.update_peripheral_state(
3857            callsign,
3858            battery_percent,
3859            heart_rate,
3860            latitude,
3861            longitude,
3862            altitude,
3863            event_type,
3864            timestamp,
3865        );
3866    }
3867
3868    /// Build current document for transmission
3869    ///
3870    /// If encryption is enabled, the document is encrypted.
        ///
        /// The bytes come from `document_sync.build_document()` and are always passed
        /// through `encrypt_document`, which decides whether encryption applies.
3871    pub fn build_document(&self) -> Vec<u8> {
3872        let doc = self.document_sync.build_document();
3873        self.encrypt_document(&doc)
3874    }
3875
3876    /// Build a translator-framed (`0xB6`) wire payload for the given
3877    /// collection + document, encrypted under the mesh secret if
3878    /// encryption is configured. Returns ready-to-broadcast bytes for
3879    /// the caller's transport (BLE notification / GATT write / nimble
3880    /// gossip), or `None` if the translator declines the collection.
        /// `None` is also returned when the `ble_translator` read lock cannot
        /// be acquired (`read().ok()?`).
3881    ///
3882    /// Symmetric with [`Self::build_document`] (legacy peripheral
3883    /// path). Hosts that already integrate with peat-mesh use this
3884    /// shape from peat-mesh's own `Translator` impl (Slice 4.a
3885    /// `peat_mesh::transport::btle_translator::BleTranslator`).
3886    /// Embedded callers without peat-mesh in their dep graph (M5Stack
3887    /// Core2 et al.) use the slim siblings
3888    /// `publish_platform_advertisement` / future per-collection
3889    /// methods, which take peat-btle-native typed input directly. The
3890    /// wire shape (encrypt-wrap of `[0xB6, collection_code,
3891    /// postcard...]`) is identical across both — receivers can't
3892    /// distinguish publishers.
3893    ///
3894    /// **ADR-059 Amendment 4 Slice 4.b note.** Pre-Slice-4.b this
3895    /// method took `&peat_mesh::sync::Document`. peat-btle no longer
3896    /// depends on peat-mesh, so the parameter is now
3897    /// `&serde_json::Value` (the same JSON projection peat-mesh's
3898    /// `Translator` impl produces internally before invoking the
3899    /// peat-btle codec). peat-mesh-using callers serialize their
3900    /// `Document` to a `Value` once and pass it through; standalone
3901    /// callers use the slim siblings instead.
3902    #[cfg(feature = "translator-codec")]
3903    pub fn publish_translator_frame(
3904        &self,
3905        collection: &str,
3906        value: &serde_json::Value,
3907    ) -> Option<Vec<u8>> {
3908        let translator = self.ble_translator.read().ok()?;
3909        let framed = translator.encode_outbound_sync(value, collection)?;
            // Release the read lock before encrypting the framed bytes.
3910        drop(translator);
3911        Some(self.encrypt_document(&framed))
3912    }
3913
3914    /// Build a translator-framed platform advertisement from a
3915    /// [`crate::translator::BlePeripheral`] directly — codec-only,
3916    /// no peat-mesh dep needed. This is the publish surface embedded
3917    /// firmware uses (M5Stack Core2 emits one of these on every
3918    /// activity transition, encoding ALERT_MAN_DOWN into
3919    /// `peripheral.health.alerts` to drive the receiver-side
3920    /// "unavailable" status semantic).
3921    ///
3922    /// Wire format is identical to [`Self::publish_translator_frame`]
3923    /// invoked with `collection = "platforms"` and a peat-mesh
3924    /// `Document` whose fields came from
3925    /// `peripheral_to_platform_in_cell` — the receiver dispatcher
3926    /// (`try_handle_translator_marker`) can't distinguish the two.
3927    /// Skipping the `BlePeripheral → JSON Value → BlePeripheral`
3928    /// round-trip the generic path takes saves an allocation pair on
3929    /// the embedded side and lets the M5Stack avoid pulling
3930    /// peat-mesh's tokio + signal-hook chain (which doesn't build for
3931    /// xtensa-esp32).
3932    #[cfg(feature = "translator-codec")]
3933    pub fn publish_platform_advertisement(
3934        &self,
3935        peripheral: &crate::translator::BlePeripheral,
3936    ) -> Option<Vec<u8>> {
3937        let payload = crate::translator::postcard_encode(peripheral)?;
3938        let mut framed = Vec::with_capacity(2 + payload.len());
3939        framed.push(crate::document::TRANSLATOR_FRAME_MARKER);
3940        framed.push(crate::translator::COLLECTION_CODE_PLATFORMS);
3941        framed.extend_from_slice(&payload);
3942        Some(self.encrypt_document(&framed))
3943    }
3944
3945    /// Build a wire-ready peat-lite Document frame for transmission.
3946    ///
3947    /// Wraps the supplied envelope payload (the bytes
3948    /// `peat_mesh::transport::document_codec::encode_document`
3949    /// produced) with the 16-byte peat-lite header
3950    /// (`MessageType::Document`, this node's id + sequence number),
3951    /// then encrypts via the same `encrypt_document` path
3952    /// translator-frame and platform-advertisement publishers use, so
3953    /// the wire-level encryption stays uniform across frame types.
3954    ///
3955    /// Caller hands the returned `Vec<u8>` to whatever GATT-write +
3956    /// chunked-transport path the platform adapter uses for outbound
3957    /// frames. peat-lite envelopes that exceed BLE MTU pass through
3958    /// `crate::sync::protocol::chunk_data` /
3959    /// `ChunkReassembler` transparently — no caller-side
3960    /// fragmentation logic.
3961    ///
3962    /// The wire `seq_num` is sourced from an internal monotonic
3963    /// counter (`peat_lite_seq`) so the polled-host receive-side dedup
3964    /// on `(source_node_id, seq_num)` always sees a unique pair per
3965    /// outbound frame. Pre-dedup-fix the caller threaded their own
3966    /// counter and the API documented "pass 0 if dedup happens
3967    /// elsewhere," which silently disabled dedup whenever the caller
3968    /// took the escape hatch — fixed per the QA review on the initial
3969    /// PR. The counter is `Relaxed`-ordered: only the per-emission
3970    /// monotonicity matters; cross-thread visibility is irrelevant.
3971    #[cfg(feature = "peat-lite-frame")]
3972    pub fn publish_peat_lite_document(&self, envelope_payload: &[u8]) -> Option<Vec<u8>> {
3973        let seq_num = self
3974            .peat_lite_seq
3975            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
3976        let framed = crate::peat_lite_frame::encode_peat_lite_document(
3977            self.node_id().as_u32(),
3978            seq_num,
3979            envelope_payload,
3980        )
3981        .ok()?;
3982        Some(self.encrypt_document(&framed))
3983    }
3984
    /// Get peers that should be synced with
    ///
    /// Delegates to the peer manager, passing `now_ms` through unchanged;
    /// the selection criteria live there.
    pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<PeatPeer> {
        self.peer_manager.peers_needing_sync(now_ms)
    }
3989
3990    // ==================== Internal Helpers ====================
3991
    /// Hand `event` to the observer collection (`self.observers`), which
    /// performs the actual dispatch.
    fn notify(&self, event: PeatEvent) {
        self.observers.notify(event);
    }
3995
3996    fn notify_mesh_state_changed(&self) {
3997        self.notify(PeatEvent::MeshStateChanged {
3998            peer_count: self.peer_manager.peer_count(),
3999            connected_count: self.peer_manager.connected_count(),
4000        });
4001    }
4002
4003    // ==================== CannedMessage Integration ====================
4004    //
4005    // These methods provide deduplication support for peat-lite CannedMessages.
4006    // They use document identity (source_node + timestamp) instead of content hash,
4007    // because CRDT merge can change byte ordering.
4008
4009    /// Check if a CannedMessage should be processed.
4010    ///
4011    /// Uses document identity (source_node + timestamp) for deduplication.
4012    /// This prevents broadcast storms when relaying CannedMessages across the mesh.
4013    ///
4014    /// # Arguments
4015    /// * `source_node` - The source node ID from the CannedMessage
4016    /// * `timestamp` - The timestamp from the CannedMessage
4017    /// * `_ttl_ms` - TTL parameter (currently unused, uses cache's default TTL)
4018    ///
4019    /// # Returns
4020    /// `true` if this message is new and should be processed,
4021    /// `false` if it was seen recently and should be skipped.
4022    pub fn check_canned_message(&self, source_node: u32, timestamp: u64, _ttl_ms: u64) -> bool {
4023        // Create a unique key from source_node and timestamp
4024        // MessageId is 16 bytes: [source_node: 4B][timestamp: 8B][padding: 4B]
4025        let mut id_bytes = [0u8; 16];
4026        id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
4027        id_bytes[4..12].copy_from_slice(&timestamp.to_le_bytes());
4028        let message_id = crate::relay::MessageId::from_bytes(id_bytes);
4029
4030        // Check the seen cache
4031        let seen = self.seen_cache.lock().unwrap();
4032        !seen.has_seen(&message_id)
4033    }
4034
4035    /// Mark a CannedMessage as seen (for deduplication).
4036    ///
4037    /// Call this after processing a CannedMessage to prevent reprocessing
4038    /// the same message from other relay paths.
4039    pub fn mark_canned_message_seen(&self, source_node: u32, timestamp: u64) {
4040        let now = std::time::SystemTime::now()
4041            .duration_since(std::time::UNIX_EPOCH)
4042            .map(|d| d.as_millis() as u64)
4043            .unwrap_or(0);
4044
4045        // MessageId is 16 bytes: [source_node: 4B][timestamp: 8B][padding: 4B]
4046        let mut id_bytes = [0u8; 16];
4047        id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
4048        id_bytes[4..12].copy_from_slice(&timestamp.to_le_bytes());
4049        let message_id = crate::relay::MessageId::from_bytes(id_bytes);
4050        let origin = NodeId::new(source_node);
4051
4052        let mut seen = self.seen_cache.lock().unwrap();
4053        seen.mark_seen(message_id, origin, now);
4054    }
4055
    /// Get list of connected peer identifiers for relay.
    ///
    /// Used by the platform layer (Kotlin/Swift) to relay CannedMessages
    /// to other peers after deduplication check.
    ///
    /// Delegates to the peer manager's `get_connected_identifiers`; the
    /// returned strings are owned, so the caller may keep them freely.
    pub fn get_connected_peer_identifiers(&self) -> Vec<String> {
        self.peer_manager.get_connected_identifiers()
    }
4063}
4064
/// Result from receiving BLE data
///
/// Plain owned data returned to the platform adapter. The type derives
/// `Default`, so constructors can fill in only the fields relevant to a
/// given receive path and leave the rest at their zero / `None` / `false`
/// defaults.
#[derive(Debug, Clone, Default)]
pub struct DataReceivedResult {
    /// Node that sent this data
    pub source_node: NodeId,

    /// Whether this contained an emergency event
    pub is_emergency: bool,

    /// Whether this contained an ACK event
    pub is_ack: bool,

    /// Whether the counter changed (new data)
    pub counter_changed: bool,

    /// Whether emergency state changed (new emergency or ACK updates)
    pub emergency_changed: bool,

    /// Updated total count
    pub total_count: u64,

    /// Event timestamp (if event present) - use to detect duplicate events.
    /// Not an `Option`: the derived default is `0`.
    pub event_timestamp: u64,

    /// Data to relay to other peers (if multi-hop relay is enabled)
    ///
    /// When present, the platform adapter should send this data to peers
    /// returned by `get_relay_targets(Some(source_node))`.
    pub relay_data: Option<Vec<u8>>,

    /// Origin node for relay (may differ from source_node for relayed messages)
    pub origin_node: Option<NodeId>,

    /// Current hop count (for relayed messages)
    pub hop_count: u8,

    // ========== Peripheral data from sender ==========
    /// Sender's callsign (up to 12 chars)
    pub callsign: Option<String>,

    /// Sender's battery percentage (0-100)
    pub battery_percent: Option<u8>,

    /// Sender's heart rate (BPM)
    pub heart_rate: Option<u8>,

    /// Sender's event type (from PeripheralEvent)
    pub event_type: Option<u8>,

    /// Sender's latitude
    pub latitude: Option<f32>,

    /// Sender's longitude
    pub longitude: Option<f32>,

    /// Sender's altitude (meters)
    pub altitude: Option<f32>,

    /// Sender's activity level (0=Standing, 3=PossibleFall, 4=Prone, …).
    /// Mirrors `HealthStatus::activity` from the merged peripheral so
    /// hosts can render posture state without re-decoding the wire
    /// bytes. None when no peripheral data was attached to the merge.
    pub activity_level: Option<u8>,

    /// Sender's health alerts bitfield (e.g. `ALERT_MAN_DOWN = 0x01`).
    /// Mirrors `HealthStatus::alerts` from the merged peripheral. The
    /// receive-side surfaces this so consumers (the ATAK plugin's
    /// `_platformOfflinePeers` toggle) can flip OFFLINE state directly
    /// from peripheral broadcasts, without needing the higher-level
    /// 0xB6 platforms translator-frame to fire.
    pub alerts: Option<u8>,

    /// ADR-059 Amendment 3 — set when the receive dispatch decoded a
    /// 0xB6 translator frame on this call. `None` for legacy/delta/
    /// reserved-marker paths, for 0xB6 frames that declined or errored,
    /// and for builds compiled without the `translator-codec` feature
    /// (the field stays in the binding shape unconditionally so hosts
    /// don't see binding drift across feature combos; only the
    /// population logic is feature-gated).
    pub decoded_translator_frame: Option<DecodedTranslatorFrame>,

    /// Universal-Document-transport amendment — set when the receive
    /// dispatch decoded a peat-lite Document frame
    /// (`MessageType::Document`, magic-prefix `b"PEAT"`) on this
    /// call. Polled-consumer hosts (peat-mesh, peat-atak-plugin's
    /// marker bridge, …) forward populated entries through their
    /// existing publish-with-origin FFI surface to plug the body
    /// into peat-mesh's doc store.
    ///
    /// `None` for typed-translator (0xB6) / delta / legacy-peripheral
    /// paths, for peat-lite frames that declined or errored, and for
    /// builds compiled without the `peat-lite-frame` feature (the
    /// field stays in the binding shape unconditionally so hosts
    /// don't see binding drift across feature combos; only the
    /// population logic is feature-gated).
    pub peat_lite_document: Option<PeatLiteDocumentFrame>,
}
4162
/// Owned copy of a decoded peat-lite Document frame.
///
/// Surfaced through [`DataReceivedResult::peat_lite_document`] when the
/// receive dispatch successfully decodes a `MessageType::Document`
/// envelope.
///
/// Follows the same design as [`DecodedTranslatorFrame`]: the type is
/// defined unconditionally so the UniFFI binding shape stays stable across
/// feature combos, and it is only populated when the `peat-lite-frame`
/// feature is on (the codec dispatcher in [`crate::peat_lite_frame`] is
/// feature-gated).
///
/// Hosts forward populated entries through their existing
/// publish-with-origin FFI surface to plug into peat-mesh's doc store. The
/// `body` is opaque to peat-btle; its schema is owned by the consumers
/// (peat-mesh's `transport::document_codec`, peat-atak-plugin's marker
/// bridge, M5Stack firmware).
#[derive(Debug, Clone)]
pub struct PeatLiteDocumentFrame {
    /// Source node id taken from the peat-lite header. Polled hosts use it
    /// for loop-prevention and dedup tracking.
    pub source_node_id: u32,
    /// Sequence number taken from the peat-lite header. Polled hosts use it
    /// to detect duplicate deliveries.
    pub seq_num: u32,
    /// Flags byte of the Document envelope (bit 0 = tombstone).
    pub flags: u8,
    /// Collection name (UTF-8); together with `doc_id` it forms the
    /// doc-store routing key.
    pub collection: String,
    /// Document id (UTF-8). An empty string signals
    /// "publisher-delegated id assignment."
    pub doc_id: String,
    /// Timestamp in Unix epoch milliseconds.
    pub timestamp_ms: i64,
    /// Opaque body bytes (typically JSON-encoded `Document.fields` from
    /// peat-mesh's `transport::document_codec`).
    pub body: Vec<u8>,
}

impl PeatLiteDocumentFrame {
    /// Reports whether the deletion-tombstone flag (bit 0 of `flags`) is set.
    #[inline]
    pub fn is_tombstone(&self) -> bool {
        // Mirrors peat-lite's bit-0 tombstone flag; kept as a local const so
        // this type compiles without the optional peat-lite dependency.
        const TOMBSTONE_BIT: u8 = 0x01;
        (self.flags & TOMBSTONE_BIT) == TOMBSTONE_BIT
    }
}
4211
/// ADR-059 Amendment 3 — payload carried in
/// [`DataReceivedResult::decoded_translator_frame`] after a 0xB6
/// translator frame decodes successfully on the receive dispatch.
///
/// Hosts (peat-atak-plugin and equivalents) forward populated entries
/// through their existing publish-with-origin FFI surface (e.g. peat-ffi's
/// `publishDocumentWithOriginJni(collection, doc_json, "ble")`).
///
/// The type is defined unconditionally — without
/// `#[cfg(feature = "translator-codec")]` — so the UniFFI binding shape is
/// stable across feature combos. It is only populated when
/// `translator-codec` is on.
#[derive(Debug, Clone)]
pub struct DecodedTranslatorFrame {
    /// `BleTranslator` collection name, e.g. `"tracks"` / `"platforms"`.
    pub collection: String,
    /// serde-JSON serialization of the decoded `Document` — the same shape
    /// as the JSON callback variant from 0.3.1, delivered via struct return
    /// instead of callback invocation.
    pub doc_json: String,
    /// BLE peer identifier from the receive context, when known.
    pub peer: Option<String>,
}
4232
/// ADR-059 Amendment 3 — per-call outcome of the
/// `try_handle_translator_marker` dispatcher. Per-call return-threaded;
/// no shared mutable state, so concurrent receives on different
/// threads can't race on a slot.
///
/// Private to this module and compiled only with the `translator-codec`
/// feature.
#[cfg(feature = "translator-codec")]
#[derive(Debug, Clone)]
enum TranslatorMarkerOutcome {
    /// First byte didn't match the translator-frame range; caller
    /// continues the legacy / delta-document dispatch.
    NotTranslatorMarker,
    /// 0xB6 frame decoded successfully. Caller hoists the payload into
    /// `DataReceivedResult.decoded_translator_frame` before returning.
    Decoded(DecodedTranslatorFrame),
    /// 0xB6 reserved-range / decode-error / codec-decline / unknown-code
    /// — caller stops processing but no frame to surface.
    Handled,
}
4250
4251impl DataReceivedResult {
4252    /// ADR-059 Amendment 3 — construct a `DataReceivedResult` whose
4253    /// only meaningful payload is a successfully-decoded 0xB6 translator
4254    /// frame. Other fields default (no event, no counter change,
4255    /// no peripheral data); the host reads `decoded_translator_frame`
4256    /// and forwards through its publish-with-origin FFI surface.
4257    /// Used by the receive entry points when
4258    /// `try_handle_translator_marker` returns `Decoded(frame)`.
4259    #[cfg(feature = "translator-codec")]
4260    pub(crate) fn translator_frame(source_node: NodeId, frame: DecodedTranslatorFrame) -> Self {
4261        Self {
4262            source_node,
4263            decoded_translator_frame: Some(frame),
4264            ..Default::default()
4265        }
4266    }
4267
4268    /// Extract peripheral fields from an `Option<Peripheral>`.
4269    ///
4270    /// Returns a [`PeripheralFields`] struct rather than a positional
4271    /// tuple so future additions don't require lockstep updates at
4272    /// every destructure site (the prior 9-tuple had 5 callers and
4273    /// adding the next field would have meant 6 mechanical edits with
4274    /// the type system unable to catch a misordered destructure).
4275    fn peripheral_fields(peripheral: &Option<crate::sync::crdt::Peripheral>) -> PeripheralFields {
4276        match peripheral {
4277            Some(p) => {
4278                let callsign = {
4279                    let s = p.callsign_str();
4280                    if s.is_empty() {
4281                        None
4282                    } else {
4283                        Some(s.to_string())
4284                    }
4285                };
4286                let battery_percent = if p.health.battery_percent > 0 {
4287                    Some(p.health.battery_percent)
4288                } else {
4289                    None
4290                };
4291                let (latitude, longitude, altitude) = match &p.location {
4292                    Some(loc) => (Some(loc.latitude), Some(loc.longitude), loc.altitude),
4293                    None => (None, None, None),
4294                };
4295                PeripheralFields {
4296                    callsign,
4297                    battery_percent,
4298                    heart_rate: p.health.heart_rate,
4299                    event_type: p.last_event.as_ref().map(|e| e.event_type as u8),
4300                    latitude,
4301                    longitude,
4302                    altitude,
4303                    activity_level: Some(p.health.activity),
4304                    alerts: Some(p.health.alerts),
4305                }
4306            }
4307            None => PeripheralFields::default(),
4308        }
4309    }
4310}
4311
/// The merged-peripheral fields that are surfaced on
/// [`DataReceivedResult`].
///
/// Produced by [`DataReceivedResult::peripheral_fields`] so callers work
/// with named fields instead of a positional tuple: a new field becomes a
/// struct update that every consumer must read or ignore explicitly,
/// rather than a silent re-indexing of existing destructures.
#[derive(Debug, Clone, Default)]
pub struct PeripheralFields {
    /// Sender's callsign; `None` when the peripheral's callsign is empty.
    pub callsign: Option<String>,
    /// Sender's battery percentage; `None` when the peripheral reports 0,
    /// which the wire convention treats as "absent".
    pub battery_percent: Option<u8>,
    /// Sender's heart rate; `None` when the wire byte is 0 (the
    /// `unwrap_or(0)` convention from the Rust encode side).
    pub heart_rate: Option<u8>,
    /// Sender's PeripheralEvent type; `None` when no event is attached.
    pub event_type: Option<u8>,
    /// Sender's latitude; `None` when no location record is attached.
    pub latitude: Option<f32>,
    /// Sender's longitude; `None` when no location record is attached.
    pub longitude: Option<f32>,
    /// Sender's altitude in meters; `None` when no location record is
    /// attached or the location's own altitude field is absent.
    pub altitude: Option<f32>,
    /// Sender's activity level (0=Standing, 3=PossibleFall, 4=Prone …).
    pub activity_level: Option<u8>,
    /// Sender's health-alerts bitfield (e.g. `ALERT_MAN_DOWN = 0x01`).
    pub alerts: Option<u8>,
}
4343
4344/// Decision from processing a relay envelope
4345#[derive(Debug, Clone)]
4346pub struct RelayDecision {
4347    /// The payload (document) to process locally
4348    pub payload: Vec<u8>,
4349
4350    /// Original sender of the message
4351    pub origin_node: NodeId,
4352
4353    /// Current hop count
4354    pub hop_count: u8,
4355
4356    /// Whether this message should be relayed to other peers
4357    pub should_relay: bool,
4358
4359    /// The relay envelope to forward (with incremented hop count)
4360    ///
4361    /// Only present if `should_relay` is true and TTL not expired.
4362    pub relay_envelope: Option<RelayEnvelope>,
4363}
4364
4365impl RelayDecision {
4366    /// Get the relay data to send to peers
4367    ///
4368    /// Returns None if relay is not needed.
4369    pub fn relay_data(&self) -> Option<Vec<u8>> {
4370        self.relay_envelope.as_ref().map(|e| e.encode())
4371    }
4372}
4373
4374#[cfg(all(test, feature = "std"))]
4375mod tests {
4376    use super::*;
4377    use crate::observer::CollectingObserver;
4378
4379    // Valid timestamp for testing (2024-01-15 00:00:00 UTC)
4380    const TEST_TIMESTAMP: u64 = 1705276800000;
4381
4382    fn create_mesh(node_id: u32, callsign: &str) -> PeatMesh {
4383        let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST");
4384        PeatMesh::new(config)
4385    }
4386
4387    #[test]
4388    fn test_mesh_creation() {
4389        let mesh = create_mesh(0x12345678, "ALPHA-1");
4390
4391        assert_eq!(mesh.node_id().as_u32(), 0x12345678);
4392        assert_eq!(mesh.callsign(), "ALPHA-1");
4393        assert_eq!(mesh.mesh_id(), "TEST");
4394        assert_eq!(mesh.device_name(), "PEAT_TEST-12345678");
4395    }
4396
4397    #[test]
4398    fn test_peer_discovery() {
4399        let mesh = create_mesh(0x11111111, "ALPHA-1");
4400        let observer = Arc::new(CollectingObserver::new());
4401        mesh.add_observer(observer.clone());
4402
4403        // Discover a peer
4404        let peer = mesh.on_ble_discovered(
4405            "device-uuid",
4406            Some("PEAT_TEST-22222222"),
4407            -65,
4408            Some("TEST"),
4409            1000,
4410        );
4411
4412        assert!(peer.is_some());
4413        let peer = peer.unwrap();
4414        assert_eq!(peer.node_id.as_u32(), 0x22222222);
4415
4416        // Check events were generated
4417        let events = observer.events();
4418        assert!(events
4419            .iter()
4420            .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
4421        assert!(events
4422            .iter()
4423            .any(|e| matches!(e, PeatEvent::MeshStateChanged { .. })));
4424    }
4425
4426    #[test]
4427    fn test_connection_lifecycle() {
4428        let mesh = create_mesh(0x11111111, "ALPHA-1");
4429        let observer = Arc::new(CollectingObserver::new());
4430        mesh.add_observer(observer.clone());
4431
4432        // Discover and connect
4433        mesh.on_ble_discovered(
4434            "device-uuid",
4435            Some("PEAT_TEST-22222222"),
4436            -65,
4437            Some("TEST"),
4438            1000,
4439        );
4440
4441        let node_id = mesh.on_ble_connected("device-uuid", 2000);
4442        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
4443        assert_eq!(mesh.connected_count(), 1);
4444
4445        // Disconnect
4446        let node_id = mesh.on_ble_disconnected("device-uuid", DisconnectReason::RemoteRequest);
4447        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
4448        assert_eq!(mesh.connected_count(), 0);
4449
4450        // Check events
4451        let events = observer.events();
4452        assert!(events
4453            .iter()
4454            .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
4455        assert!(events
4456            .iter()
4457            .any(|e| matches!(e, PeatEvent::PeerDisconnected { .. })));
4458    }
4459
4460    #[test]
4461    fn test_emergency_flow() {
4462        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4463        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4464
4465        let observer2 = Arc::new(CollectingObserver::new());
4466        mesh2.add_observer(observer2.clone());
4467
4468        // mesh1 sends emergency
4469        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
4470        assert!(mesh1.is_emergency_active());
4471
4472        // mesh2 receives it
4473        let result =
4474            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4475
4476        assert!(result.is_some());
4477        let result = result.unwrap();
4478        assert!(result.is_emergency);
4479        assert_eq!(result.source_node.as_u32(), 0x11111111);
4480
4481        // Check events on mesh2
4482        let events = observer2.events();
4483        assert!(events
4484            .iter()
4485            .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
4486    }
4487
4488    #[test]
4489    fn test_ack_flow() {
4490        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4491        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4492
4493        let observer2 = Arc::new(CollectingObserver::new());
4494        mesh2.add_observer(observer2.clone());
4495
4496        // mesh1 sends ACK
4497        let doc = mesh1.send_ack(TEST_TIMESTAMP);
4498        assert!(mesh1.is_ack_active());
4499
4500        // mesh2 receives it
4501        let result =
4502            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4503
4504        assert!(result.is_some());
4505        let result = result.unwrap();
4506        assert!(result.is_ack);
4507
4508        // Check events on mesh2
4509        let events = observer2.events();
4510        assert!(events
4511            .iter()
4512            .any(|e| matches!(e, PeatEvent::AckReceived { .. })));
4513    }
4514
4515    #[test]
4516    fn test_tick_cleanup() {
4517        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4518            .with_peer_timeout(10_000);
4519        let mesh = PeatMesh::new(config);
4520
4521        let observer = Arc::new(CollectingObserver::new());
4522        mesh.add_observer(observer.clone());
4523
4524        // Discover a peer
4525        mesh.on_ble_discovered(
4526            "device-uuid",
4527            Some("PEAT_TEST-22222222"),
4528            -65,
4529            Some("TEST"),
4530            1000,
4531        );
4532        assert_eq!(mesh.peer_count(), 1);
4533
4534        // Tick at t=5000 - not stale yet
4535        mesh.tick(5000);
4536        assert_eq!(mesh.peer_count(), 1);
4537
4538        // Tick at t=20000 - peer is stale (10s timeout exceeded)
4539        mesh.tick(20000);
4540        assert_eq!(mesh.peer_count(), 0);
4541
4542        // Check PeerLost event
4543        let events = observer.events();
4544        assert!(events
4545            .iter()
4546            .any(|e| matches!(e, PeatEvent::PeerLost { .. })));
4547    }
4548
4549    #[test]
4550    fn test_tick_sync_broadcast() {
4551        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4552            .with_sync_interval(5000);
4553        let mesh = PeatMesh::new(config);
4554
4555        // Discover and connect a peer first
4556        mesh.on_ble_discovered(
4557            "device-uuid",
4558            Some("PEAT_TEST-22222222"),
4559            -65,
4560            Some("TEST"),
4561            1000,
4562        );
4563        mesh.on_ble_connected("device-uuid", 1000);
4564
4565        // First tick at t=0 sets last_sync
4566        let _result = mesh.tick(0);
4567        // May or may not broadcast depending on initial state
4568
4569        // Tick before interval - no broadcast
4570        let result = mesh.tick(3000);
4571        assert!(result.is_none());
4572
4573        // After interval - should broadcast
4574        let result = mesh.tick(6000);
4575        assert!(result.is_some());
4576
4577        // Immediate second tick - no broadcast (interval not elapsed)
4578        let result = mesh.tick(6100);
4579        assert!(result.is_none());
4580
4581        // After another interval - should broadcast again
4582        let result = mesh.tick(12000);
4583        assert!(result.is_some());
4584    }
4585
4586    #[test]
4587    fn test_incoming_connection() {
4588        let mesh = create_mesh(0x11111111, "ALPHA-1");
4589        let observer = Arc::new(CollectingObserver::new());
4590        mesh.add_observer(observer.clone());
4591
4592        // Incoming connection from unknown peer
4593        let is_new = mesh.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);
4594
4595        assert!(is_new);
4596        assert_eq!(mesh.peer_count(), 1);
4597        assert_eq!(mesh.connected_count(), 1);
4598
4599        // Check events
4600        let events = observer.events();
4601        assert!(events
4602            .iter()
4603            .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
4604        assert!(events
4605            .iter()
4606            .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
4607    }
4608
4609    #[test]
4610    fn test_mesh_filtering() {
4611        let mesh = create_mesh(0x11111111, "ALPHA-1");
4612
4613        // Wrong mesh - ignored
4614        let peer = mesh.on_ble_discovered(
4615            "device-uuid-1",
4616            Some("PEAT_OTHER-22222222"),
4617            -65,
4618            Some("OTHER"),
4619            1000,
4620        );
4621        assert!(peer.is_none());
4622        assert_eq!(mesh.peer_count(), 0);
4623
4624        // Correct mesh - accepted
4625        let peer = mesh.on_ble_discovered(
4626            "device-uuid-2",
4627            Some("PEAT_TEST-33333333"),
4628            -65,
4629            Some("TEST"),
4630            1000,
4631        );
4632        assert!(peer.is_some());
4633        assert_eq!(mesh.peer_count(), 1);
4634    }
4635
4636    // ==================== Encryption Tests ====================
4637
4638    fn create_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4639        let config =
4640            PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_encryption(secret);
4641        PeatMesh::new(config)
4642    }
4643
4644    #[test]
4645    fn test_encryption_enabled() {
4646        let secret = [0x42u8; 32];
4647        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4648
4649        assert!(mesh.is_encryption_enabled());
4650    }
4651
4652    #[test]
4653    fn test_encryption_disabled_by_default() {
4654        let mesh = create_mesh(0x11111111, "ALPHA-1");
4655
4656        assert!(!mesh.is_encryption_enabled());
4657    }
4658
4659    #[test]
4660    fn test_encrypted_document_exchange() {
4661        let secret = [0x42u8; 32];
4662        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4663        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4664
4665        // mesh1 sends document
4666        let doc = mesh1.build_document();
4667
4668        // Document should be encrypted (starts with ENCRYPTED_MARKER)
4669        assert!(doc.len() >= 2);
4670        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4671
4672        // mesh2 receives and decrypts
4673        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4674
4675        assert!(result.is_some());
4676        let result = result.unwrap();
4677        assert_eq!(result.source_node.as_u32(), 0x11111111);
4678    }
4679
4680    #[test]
4681    fn test_encrypted_emergency_exchange() {
4682        let secret = [0x42u8; 32];
4683        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4684        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4685
4686        let observer = Arc::new(CollectingObserver::new());
4687        mesh2.add_observer(observer.clone());
4688
4689        // mesh1 sends emergency
4690        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
4691
4692        // mesh2 receives and decrypts
4693        let result =
4694            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4695
4696        assert!(result.is_some());
4697        let result = result.unwrap();
4698        assert!(result.is_emergency);
4699
4700        // Check EmergencyReceived event was fired
4701        let events = observer.events();
4702        assert!(events
4703            .iter()
4704            .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
4705    }
4706
4707    #[test]
4708    fn test_wrong_key_fails_decrypt() {
4709        let secret1 = [0x42u8; 32];
4710        let secret2 = [0x43u8; 32]; // Different key
4711        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
4712        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
4713
4714        // mesh1 sends document
4715        let doc = mesh1.build_document();
4716
4717        // mesh2 cannot decrypt (wrong key)
4718        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4719
4720        assert!(result.is_none());
4721    }
4722
4723    #[test]
4724    fn test_unencrypted_mesh_can_read_unencrypted() {
4725        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4726        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4727
4728        // mesh1 sends document (unencrypted)
4729        let doc = mesh1.build_document();
4730
4731        // mesh2 receives (also unencrypted)
4732        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4733
4734        assert!(result.is_some());
4735    }
4736
4737    #[test]
4738    fn test_encrypted_mesh_can_receive_unencrypted() {
4739        // Backward compatibility: encrypted mesh can receive unencrypted docs
4740        let secret = [0x42u8; 32];
4741        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // unencrypted
4742        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); // encrypted
4743
4744        // mesh1 sends unencrypted document
4745        let doc = mesh1.build_document();
4746
4747        // mesh2 can receive unencrypted (backward compat)
4748        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4749
4750        assert!(result.is_some());
4751    }
4752
4753    #[test]
4754    fn test_unencrypted_mesh_cannot_receive_encrypted() {
4755        let secret = [0x42u8; 32];
4756        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret); // encrypted
4757        let mesh2 = create_mesh(0x22222222, "BRAVO-1"); // unencrypted
4758
4759        // mesh1 sends encrypted document
4760        let doc = mesh1.build_document();
4761
4762        // mesh2 cannot decrypt (no key)
4763        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4764
4765        assert!(result.is_none());
4766    }
4767
4768    #[test]
4769    fn test_enable_disable_encryption() {
4770        let mut mesh = create_mesh(0x11111111, "ALPHA-1");
4771
4772        assert!(!mesh.is_encryption_enabled());
4773
4774        // Enable encryption
4775        let secret = [0x42u8; 32];
4776        mesh.enable_encryption(&secret);
4777        assert!(mesh.is_encryption_enabled());
4778
4779        // Build document should now be encrypted
4780        let doc = mesh.build_document();
4781        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4782
4783        // Disable encryption
4784        mesh.disable_encryption();
4785        assert!(!mesh.is_encryption_enabled());
4786
4787        // Build document should now be unencrypted
4788        let doc = mesh.build_document();
4789        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4790    }
4791
4792    #[test]
4793    fn test_encryption_overhead() {
4794        let secret = [0x42u8; 32];
4795        let mesh_encrypted = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4796        let mesh_unencrypted = create_mesh(0x22222222, "BRAVO-1");
4797
4798        let doc_encrypted = mesh_encrypted.build_document();
4799        let doc_unencrypted = mesh_unencrypted.build_document();
4800
4801        // Encrypted doc should be larger by:
4802        // - 2 bytes marker header (0xAE + reserved)
4803        // - 12 bytes nonce
4804        // - 16 bytes auth tag
4805        // Total: 30 bytes overhead
4806        let overhead = doc_encrypted.len() - doc_unencrypted.len();
4807        assert_eq!(overhead, 30); // 2 (marker) + 12 (nonce) + 16 (tag)
4808    }
4809
4810    // ==================== Per-Peer E2EE Tests ====================
4811
4812    #[test]
4813    fn test_peer_e2ee_enable_disable() {
4814        let mesh = create_mesh(0x11111111, "ALPHA-1");
4815
4816        assert!(!mesh.is_peer_e2ee_enabled());
4817        assert!(mesh.peer_e2ee_public_key().is_none());
4818
4819        mesh.enable_peer_e2ee();
4820        assert!(mesh.is_peer_e2ee_enabled());
4821        assert!(mesh.peer_e2ee_public_key().is_some());
4822
4823        mesh.disable_peer_e2ee();
4824        assert!(!mesh.is_peer_e2ee_enabled());
4825    }
4826
4827    #[test]
4828    fn test_peer_e2ee_initiate_session() {
4829        let mesh = create_mesh(0x11111111, "ALPHA-1");
4830        mesh.enable_peer_e2ee();
4831
4832        let key_exchange = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4833        assert!(key_exchange.is_some());
4834
4835        let key_exchange = key_exchange.unwrap();
4836        // Should start with KEY_EXCHANGE_MARKER
4837        assert_eq!(key_exchange[0], crate::document::KEY_EXCHANGE_MARKER);
4838
4839        // Should have a pending session
4840        assert_eq!(mesh.peer_e2ee_session_count(), 1);
4841        assert_eq!(mesh.peer_e2ee_established_count(), 0);
4842    }
4843
4844    #[test]
4845    fn test_peer_e2ee_full_handshake() {
4846        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4847        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4848
4849        mesh1.enable_peer_e2ee();
4850        mesh2.enable_peer_e2ee();
4851
4852        let observer1 = Arc::new(CollectingObserver::new());
4853        let observer2 = Arc::new(CollectingObserver::new());
4854        mesh1.add_observer(observer1.clone());
4855        mesh2.add_observer(observer2.clone());
4856
4857        // mesh1 initiates to mesh2
4858        let key_exchange1 = mesh1
4859            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4860            .unwrap();
4861
4862        // mesh2 receives and responds
4863        let response = mesh2.handle_key_exchange(&key_exchange1, 1000);
4864        assert!(response.is_some());
4865
4866        // Check mesh2 has established session
4867        assert!(mesh2.has_peer_e2ee_session(NodeId::new(0x11111111)));
4868
4869        // mesh1 receives mesh2's response
4870        let key_exchange2 = response.unwrap();
4871        let _ = mesh1.handle_key_exchange(&key_exchange2, 1000);
4872
4873        // Check mesh1 has established session
4874        assert!(mesh1.has_peer_e2ee_session(NodeId::new(0x22222222)));
4875
4876        // Both should have E2EE established events
4877        let events1 = observer1.events();
4878        assert!(events1
4879            .iter()
4880            .any(|e| matches!(e, PeatEvent::PeerE2eeEstablished { .. })));
4881
4882        let events2 = observer2.events();
4883        assert!(events2
4884            .iter()
4885            .any(|e| matches!(e, PeatEvent::PeerE2eeEstablished { .. })));
4886    }
4887
4888    #[test]
4889    fn test_peer_e2ee_encrypt_decrypt() {
4890        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4891        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4892
4893        mesh1.enable_peer_e2ee();
4894        mesh2.enable_peer_e2ee();
4895
4896        // Establish session via key exchange
4897        let key_exchange1 = mesh1
4898            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4899            .unwrap();
4900        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4901        mesh1.handle_key_exchange(&key_exchange2, 1000);
4902
4903        // mesh1 sends encrypted message to mesh2
4904        let plaintext = b"Secret message from mesh1";
4905        let encrypted = mesh1.send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000);
4906        assert!(encrypted.is_some());
4907
4908        let encrypted = encrypted.unwrap();
4909        // Should start with PEER_E2EE_MARKER
4910        assert_eq!(encrypted[0], crate::document::PEER_E2EE_MARKER);
4911
4912        // mesh2 receives and decrypts
4913        let observer2 = Arc::new(CollectingObserver::new());
4914        mesh2.add_observer(observer2.clone());
4915
4916        let decrypted = mesh2.handle_peer_e2ee_message(&encrypted, 2000);
4917        assert!(decrypted.is_some());
4918        assert_eq!(decrypted.unwrap(), plaintext);
4919
4920        // Should have received message event
4921        let events = observer2.events();
4922        assert!(events.iter().any(|e| matches!(
4923            e,
4924            PeatEvent::PeerE2eeMessageReceived { from_node, data }
4925            if from_node.as_u32() == 0x11111111 && data == plaintext
4926        )));
4927    }
4928
4929    #[test]
4930    fn test_peer_e2ee_bidirectional() {
4931        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4932        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4933
4934        mesh1.enable_peer_e2ee();
4935        mesh2.enable_peer_e2ee();
4936
4937        // Establish session
4938        let key_exchange1 = mesh1
4939            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4940            .unwrap();
4941        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4942        mesh1.handle_key_exchange(&key_exchange2, 1000);
4943
4944        // mesh1 -> mesh2
4945        let msg1 = mesh1
4946            .send_peer_e2ee(NodeId::new(0x22222222), b"Hello from mesh1", 2000)
4947            .unwrap();
4948        let dec1 = mesh2.handle_peer_e2ee_message(&msg1, 2000).unwrap();
4949        assert_eq!(dec1, b"Hello from mesh1");
4950
4951        // mesh2 -> mesh1
4952        let msg2 = mesh2
4953            .send_peer_e2ee(NodeId::new(0x11111111), b"Hello from mesh2", 2000)
4954            .unwrap();
4955        let dec2 = mesh1.handle_peer_e2ee_message(&msg2, 2000).unwrap();
4956        assert_eq!(dec2, b"Hello from mesh2");
4957    }
4958
4959    #[test]
4960    fn test_peer_e2ee_close_session() {
4961        let mesh = create_mesh(0x11111111, "ALPHA-1");
4962        mesh.enable_peer_e2ee();
4963
4964        let observer = Arc::new(CollectingObserver::new());
4965        mesh.add_observer(observer.clone());
4966
4967        // Initiate a session
4968        mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4969        assert_eq!(mesh.peer_e2ee_session_count(), 1);
4970
4971        // Close session
4972        mesh.close_peer_e2ee(NodeId::new(0x22222222));
4973
4974        // Check close event
4975        let events = observer.events();
4976        assert!(events
4977            .iter()
4978            .any(|e| matches!(e, PeatEvent::PeerE2eeClosed { .. })));
4979    }
4980
4981    #[test]
4982    fn test_peer_e2ee_without_enabling() {
4983        let mesh = create_mesh(0x11111111, "ALPHA-1");
4984
4985        // E2EE not enabled - should return None
4986        let result = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4987        assert!(result.is_none());
4988
4989        let result = mesh.send_peer_e2ee(NodeId::new(0x22222222), b"test", 1000);
4990        assert!(result.is_none());
4991
4992        assert!(!mesh.has_peer_e2ee_session(NodeId::new(0x22222222)));
4993    }
4994
4995    #[test]
4996    fn test_peer_e2ee_overhead() {
4997        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4998        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4999
5000        mesh1.enable_peer_e2ee();
5001        mesh2.enable_peer_e2ee();
5002
5003        // Establish session
5004        let key_exchange1 = mesh1
5005            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
5006            .unwrap();
5007        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
5008        mesh1.handle_key_exchange(&key_exchange2, 1000);
5009
5010        // Encrypt a message
5011        let plaintext = b"Test message";
5012        let encrypted = mesh1
5013            .send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000)
5014            .unwrap();
5015
5016        // Overhead should be:
5017        // - 2 bytes marker header
5018        // - 4 bytes recipient node ID
5019        // - 4 bytes sender node ID
5020        // - 8 bytes counter
5021        // - 12 bytes nonce
5022        // - 16 bytes auth tag
5023        // Total: 46 bytes overhead
5024        let overhead = encrypted.len() - plaintext.len();
5025        assert_eq!(overhead, 46);
5026    }
5027
5028    // ==================== Strict Encryption Mode Tests ====================
5029
5030    fn create_strict_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
5031        let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST")
5032            .with_encryption(secret)
5033            .with_strict_encryption();
5034        PeatMesh::new(config)
5035    }
5036
5037    #[test]
5038    fn test_strict_encryption_enabled() {
5039        let secret = [0x42u8; 32];
5040        let mesh = create_strict_encrypted_mesh(0x11111111, "ALPHA-1", secret);
5041
5042        assert!(mesh.is_encryption_enabled());
5043        assert!(mesh.is_strict_encryption_enabled());
5044    }
5045
5046    #[test]
5047    fn test_strict_encryption_disabled_by_default() {
5048        let secret = [0x42u8; 32];
5049        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
5050
5051        assert!(mesh.is_encryption_enabled());
5052        assert!(!mesh.is_strict_encryption_enabled());
5053    }
5054
5055    #[test]
5056    fn test_strict_encryption_requires_encryption_enabled() {
5057        // strict_encryption without encryption should have no effect
5058        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
5059            .with_strict_encryption(); // No encryption!
5060        let mesh = PeatMesh::new(config);
5061
5062        assert!(!mesh.is_encryption_enabled());
5063        assert!(!mesh.is_strict_encryption_enabled());
5064    }
5065
5066    #[test]
5067    fn test_strict_mode_accepts_encrypted_documents() {
5068        let secret = [0x42u8; 32];
5069        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
5070        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
5071
5072        // mesh1 sends encrypted document
5073        let doc = mesh1.build_document();
5074        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
5075
5076        // mesh2 (strict mode) should accept encrypted documents
5077        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
5078        assert!(result.is_some());
5079    }
5080
5081    #[test]
5082    fn test_strict_mode_rejects_unencrypted_documents() {
5083        let secret = [0x42u8; 32];
5084        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // Unencrypted sender
5085        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret); // Strict receiver
5086
5087        let observer = Arc::new(CollectingObserver::new());
5088        mesh2.add_observer(observer.clone());
5089
5090        // mesh1 sends unencrypted document
5091        let doc = mesh1.build_document();
5092        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
5093
5094        // mesh2 (strict mode) should reject unencrypted documents
5095        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
5096        assert!(result.is_none());
5097
5098        // Should have SecurityViolation event
5099        let events = observer.events();
5100        assert!(events.iter().any(|e| matches!(
5101            e,
5102            PeatEvent::SecurityViolation {
5103                kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
5104                ..
5105            }
5106        )));
5107    }
5108
5109    #[test]
5110    fn test_non_strict_mode_accepts_unencrypted_documents() {
5111        let secret = [0x42u8; 32];
5112        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // Unencrypted sender
5113        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); // Non-strict receiver
5114
5115        // mesh1 sends unencrypted document
5116        let doc = mesh1.build_document();
5117
5118        // mesh2 (non-strict) should accept unencrypted documents (backward compat)
5119        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
5120        assert!(result.is_some());
5121    }
5122
5123    #[test]
5124    fn test_strict_mode_security_violation_event_includes_source() {
5125        let secret = [0x42u8; 32];
5126        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
5127        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
5128
5129        let observer = Arc::new(CollectingObserver::new());
5130        mesh2.add_observer(observer.clone());
5131
5132        let doc = mesh1.build_document();
5133
5134        // Use on_ble_data_received with identifier to test source is captured
5135        mesh2.on_ble_discovered(
5136            "test-device-uuid",
5137            Some("PEAT_TEST-11111111"),
5138            -65,
5139            Some("TEST"),
5140            500,
5141        );
5142        mesh2.on_ble_connected("test-device-uuid", 600);
5143
5144        let _result = mesh2.on_ble_data_received("test-device-uuid", &doc, 1000);
5145
5146        // Check SecurityViolation event has source
5147        let events = observer.events();
5148        let violation = events.iter().find(|e| {
5149            matches!(
5150                e,
5151                PeatEvent::SecurityViolation {
5152                    kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
5153                    ..
5154                }
5155            )
5156        });
5157        assert!(violation.is_some());
5158
5159        if let Some(PeatEvent::SecurityViolation { source, .. }) = violation {
5160            assert!(source.is_some());
5161            assert_eq!(source.as_ref().unwrap(), "test-device-uuid");
5162        }
5163    }
5164
5165    #[test]
5166    fn test_decryption_failure_emits_security_violation() {
5167        let secret1 = [0x42u8; 32];
5168        let secret2 = [0x43u8; 32]; // Different key
5169        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
5170        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
5171
5172        let observer = Arc::new(CollectingObserver::new());
5173        mesh2.add_observer(observer.clone());
5174
5175        // mesh1 sends encrypted document
5176        let doc = mesh1.build_document();
5177
5178        // mesh2 cannot decrypt (wrong key)
5179        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
5180        assert!(result.is_none());
5181
5182        // Should have SecurityViolation event for decryption failure
5183        let events = observer.events();
5184        assert!(events.iter().any(|e| matches!(
5185            e,
5186            PeatEvent::SecurityViolation {
5187                kind: crate::observer::SecurityViolationKind::DecryptionFailed,
5188                ..
5189            }
5190        )));
5191    }
5192
5193    #[test]
5194    fn test_strict_mode_builder_chain() {
5195        let secret = [0x42u8; 32];
5196        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
5197            .with_encryption(secret)
5198            .with_strict_encryption()
5199            .with_sync_interval(10_000)
5200            .with_peer_timeout(60_000);
5201
5202        let mesh = PeatMesh::new(config);
5203
5204        assert!(mesh.is_encryption_enabled());
5205        assert!(mesh.is_strict_encryption_enabled());
5206    }
5207
5208    // ==================== Multi-Hop Relay Tests ====================
5209
5210    fn create_relay_mesh(node_id: u32, callsign: &str) -> PeatMesh {
5211        let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_relay();
5212        PeatMesh::new(config)
5213    }
5214
5215    #[test]
5216    fn test_relay_disabled_by_default() {
5217        let mesh = create_mesh(0x11111111, "ALPHA-1");
5218        assert!(!mesh.is_relay_enabled());
5219    }
5220
5221    #[test]
5222    fn test_relay_enabled() {
5223        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5224        assert!(mesh.is_relay_enabled());
5225    }
5226
5227    #[test]
5228    fn test_relay_config_builder() {
5229        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
5230            .with_relay()
5231            .with_max_relay_hops(5)
5232            .with_relay_fanout(3)
5233            .with_seen_cache_ttl(60_000);
5234
5235        assert!(config.enable_relay);
5236        assert_eq!(config.max_relay_hops, 5);
5237        assert_eq!(config.relay_fanout, 3);
5238        assert_eq!(config.seen_cache_ttl_ms, 60_000);
5239    }
5240
5241    #[test]
5242    fn test_seen_message_deduplication() {
5243        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5244        let origin = NodeId::new(0x22222222);
5245        let msg_id = crate::relay::MessageId::from_content(origin, 1000, 0xDEADBEEF);
5246
5247        // First time - should be new
5248        assert!(mesh.mark_message_seen(msg_id, origin, 1000));
5249
5250        // Second time - should be duplicate
5251        assert!(!mesh.mark_message_seen(msg_id, origin, 2000));
5252
5253        assert_eq!(mesh.seen_cache_size(), 1);
5254    }
5255
5256    #[test]
5257    fn test_wrap_for_relay() {
5258        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5259
5260        let payload = vec![1, 2, 3, 4, 5];
5261        let wrapped = mesh.wrap_for_relay(payload.clone());
5262
5263        // Should start with relay envelope marker
5264        assert_eq!(wrapped[0], crate::relay::RELAY_ENVELOPE_MARKER);
5265
5266        // Decode and verify
5267        let envelope = crate::relay::RelayEnvelope::decode(&wrapped).unwrap();
5268        assert_eq!(envelope.payload, payload);
5269        assert_eq!(envelope.origin_node, NodeId::new(0x11111111));
5270        assert_eq!(envelope.hop_count, 0);
5271    }
5272
5273    #[test]
5274    fn test_process_relay_envelope_new_message() {
5275        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5276        let observer = Arc::new(CollectingObserver::new());
5277        mesh.add_observer(observer.clone());
5278
5279        // Create an envelope from another node
5280        let payload = vec![1, 2, 3, 4, 5];
5281        let envelope =
5282            crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
5283                .with_max_hops(7);
5284        let data = envelope.encode();
5285
5286        // Process it
5287        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
5288
5289        assert!(decision.is_some());
5290        let decision = decision.unwrap();
5291        assert_eq!(decision.payload, payload);
5292        assert_eq!(decision.origin_node.as_u32(), 0x22222222);
5293        assert_eq!(decision.hop_count, 0);
5294        assert!(decision.should_relay);
5295        assert!(decision.relay_envelope.is_some());
5296
5297        // Relay envelope should have incremented hop count
5298        let relay_env = decision.relay_envelope.unwrap();
5299        assert_eq!(relay_env.hop_count, 1);
5300    }
5301
5302    #[test]
5303    fn test_process_relay_envelope_duplicate() {
5304        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5305        let observer = Arc::new(CollectingObserver::new());
5306        mesh.add_observer(observer.clone());
5307
5308        let payload = vec![1, 2, 3, 4, 5];
5309        let envelope = crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload);
5310        let data = envelope.encode();
5311
5312        // First time - should succeed
5313        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
5314        assert!(decision.is_some());
5315
5316        // Second time - should be duplicate
5317        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 2000);
5318        assert!(decision.is_none());
5319
5320        // Should have DuplicateMessageDropped event
5321        let events = observer.events();
5322        assert!(events
5323            .iter()
5324            .any(|e| matches!(e, PeatEvent::DuplicateMessageDropped { .. })));
5325    }
5326
5327    #[test]
5328    fn test_process_relay_envelope_ttl_expired() {
5329        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5330        let observer = Arc::new(CollectingObserver::new());
5331        mesh.add_observer(observer.clone());
5332
5333        // Create envelope at max hops (TTL expired)
5334        let payload = vec![1, 2, 3, 4, 5];
5335        let mut envelope =
5336            crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
5337                .with_max_hops(3);
5338
5339        // Simulate having been relayed 3 times already
5340        envelope = envelope.relay().unwrap(); // hop 1
5341        envelope = envelope.relay().unwrap(); // hop 2
5342        envelope = envelope.relay().unwrap(); // hop 3 - at max now
5343
5344        let data = envelope.encode();
5345
5346        // Process - should still process locally but not relay further
5347        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
5348
5349        assert!(decision.is_some());
5350        let decision = decision.unwrap();
5351        assert_eq!(decision.payload, payload);
5352        assert!(!decision.should_relay); // Cannot relay further
5353        assert!(decision.relay_envelope.is_none());
5354
5355        // Should have MessageTtlExpired event
5356        let events = observer.events();
5357        assert!(events
5358            .iter()
5359            .any(|e| matches!(e, PeatEvent::MessageTtlExpired { .. })));
5360    }
5361
5362    #[test]
5363    fn test_build_relay_document() {
5364        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5365
5366        let relay_doc = mesh.build_relay_document();
5367
5368        // Should be a valid relay envelope
5369        assert_eq!(relay_doc[0], crate::relay::RELAY_ENVELOPE_MARKER);
5370
5371        // Decode and verify it contains a valid document
5372        let envelope = crate::relay::RelayEnvelope::decode(&relay_doc).unwrap();
5373        assert_eq!(envelope.origin_node.as_u32(), 0x11111111);
5374
5375        // The payload should be a valid PeatDocument
5376        let doc = crate::document::PeatDocument::decode(&envelope.payload);
5377        assert!(doc.is_some());
5378    }
5379
5380    #[test]
5381    fn test_relay_targets_excludes_source() {
5382        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5383
5384        // Add some peers
5385        mesh.on_ble_discovered(
5386            "peer-1",
5387            Some("PEAT_TEST-22222222"),
5388            -60,
5389            Some("TEST"),
5390            1000,
5391        );
5392        mesh.on_ble_connected("peer-1", 1000);
5393
5394        mesh.on_ble_discovered(
5395            "peer-2",
5396            Some("PEAT_TEST-33333333"),
5397            -65,
5398            Some("TEST"),
5399            1000,
5400        );
5401        mesh.on_ble_connected("peer-2", 1000);
5402
5403        mesh.on_ble_discovered(
5404            "peer-3",
5405            Some("PEAT_TEST-44444444"),
5406            -70,
5407            Some("TEST"),
5408            1000,
5409        );
5410        mesh.on_ble_connected("peer-3", 1000);
5411
5412        // Get relay targets excluding peer-2
5413        let targets = mesh.get_relay_targets(Some(NodeId::new(0x33333333)));
5414
5415        // Should not include peer-2 in targets
5416        assert!(targets.iter().all(|p| p.node_id.as_u32() != 0x33333333));
5417    }
5418
5419    #[test]
5420    fn test_clear_seen_cache() {
5421        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5422        let origin = NodeId::new(0x22222222);
5423
5424        // Add some messages
5425        mesh.mark_message_seen(
5426            crate::relay::MessageId::from_content(origin, 1000, 0x11111111),
5427            origin,
5428            1000,
5429        );
5430        mesh.mark_message_seen(
5431            crate::relay::MessageId::from_content(origin, 2000, 0x22222222),
5432            origin,
5433            2000,
5434        );
5435
5436        assert_eq!(mesh.seen_cache_size(), 2);
5437
5438        // Clear
5439        mesh.clear_seen_cache();
5440        assert_eq!(mesh.seen_cache_size(), 0);
5441    }
5442
5443    // ====================================================================
5444    // Universal Document transport (peat-lite envelope) integration tests
5445    // ====================================================================
5446
5447    /// **Outbound encode end-to-end**: build a peat-lite envelope payload,
5448    /// run it through `publish_peat_lite_document`, peel the encryption,
5449    /// verify the wire bytes start with peat-lite's MAGIC + carry the
5450    /// expected sender node id + seq num + envelope contents. Locks the
5451    /// outbound contract that the marker-bridge demo will rely on.
5452    #[cfg(feature = "peat-lite-frame")]
5453    #[test]
5454    fn publish_peat_lite_document_emits_well_formed_frame() {
5455        use peat_lite::protocol::document as pl_doc;
5456
5457        let mesh = create_mesh(0xCAFEBABE, "ALPHA-1");
5458
5459        // Build a Document envelope payload (mirrors what
5460        // `peat_mesh::transport::document_codec::encode_document`
5461        // produces).
5462        let mut env = vec![0u8; 256];
5463        let env_n = pl_doc::encode(
5464            0,
5465            "markers",
5466            "test-id",
5467            1_700_000_000_000,
5468            b"body",
5469            &mut env,
5470        )
5471        .expect("envelope encode");
5472        env.truncate(env_n);
5473
5474        // Publish via the peat-mesh outbound API. `seq_num` is now
5475        // sourced from the internal monotonic counter — first publish
5476        // starts at 0 and increments per emission.
5477        let wire = mesh
5478            .publish_peat_lite_document(&env)
5479            .expect("publish should succeed");
5480
5481        // Peel the wire-level encryption (matches what the receive
5482        // path does on inbound).
5483        let decrypted = mesh
5484            .decrypt_document(&wire, None)
5485            .expect("decrypt own outbound");
5486
5487        // Decrypted bytes are the peat-lite frame; verify shape.
5488        assert!(
5489            crate::peat_lite_frame::is_peat_lite_frame(&decrypted),
5490            "decrypted outbound must start with peat-lite MAGIC"
5491        );
5492
5493        match crate::peat_lite_frame::try_handle_peat_lite_frame(&decrypted) {
5494            crate::peat_lite_frame::PeatLiteFrameOutcome::Decoded(doc) => {
5495                assert_eq!(doc.source_node_id, 0xCAFEBABE);
5496                assert_eq!(doc.seq_num, 0, "first publish must use seq=0");
5497                assert_eq!(doc.collection, "markers");
5498                assert_eq!(doc.doc_id, "test-id");
5499                assert_eq!(doc.body.as_slice(), b"body");
5500            }
5501            other => panic!("decoded outbound frame mis-routed: {:?}", other),
5502        }
5503
5504        // Second publish must increment — locks the dedup contract
5505        // that the QA review on the initial PR flagged. If a future
5506        // refactor drops the internal counter back to a per-call
5507        // parameter with a 0 escape hatch, this assertion catches it.
5508        let wire2 = mesh
5509            .publish_peat_lite_document(&env)
5510            .expect("second publish should succeed");
5511        let decrypted2 = mesh
5512            .decrypt_document(&wire2, None)
5513            .expect("decrypt second outbound");
5514        match crate::peat_lite_frame::try_handle_peat_lite_frame(&decrypted2) {
5515            crate::peat_lite_frame::PeatLiteFrameOutcome::Decoded(doc) => {
5516                assert_eq!(
5517                    doc.seq_num, 1,
5518                    "second publish must increment seq, not collide with first"
5519                );
5520            }
5521            other => panic!("second outbound frame mis-routed: {:?}", other),
5522        }
5523    }
5524
5525    /// **Routing-safety end-to-end**: a peat-lite frame produced by
5526    /// `publish_peat_lite_document` must NOT be classified as a
5527    /// translator (0xB6) frame on the receive side. peat-lite's
5528    /// magic byte 0x50 is outside translator range; this test pins
5529    /// the routing invariant so a future dispatcher refactor can't
5530    /// silently re-route.
5531    #[cfg(feature = "peat-lite-frame")]
5532    #[test]
5533    fn peat_lite_frames_do_not_collide_with_translator_routing() {
5534        use peat_lite::protocol::document as pl_doc;
5535
5536        let mesh = create_mesh(0x11111111, "ALPHA-1");
5537        let mut env = vec![0u8; 64];
5538        let n = pl_doc::encode(0, "tracks", "id", 0, b"x", &mut env).expect("envelope encode");
5539        env.truncate(n);
5540
5541        let wire = mesh.publish_peat_lite_document(&env).expect("publish");
5542        let decrypted = mesh.decrypt_document(&wire, None).expect("decrypt");
5543
5544        // First byte MUST be peat-lite's 'P' (0x50), not in the
5545        // translator range 0xB6..=0xBF.
5546        assert_eq!(
5547            decrypted[0], 0x50,
5548            "peat-lite frame's first byte must be 0x50, not in translator range"
5549        );
5550        assert!(
5551            !(0xB6..=0xBF).contains(&decrypted[0]),
5552            "peat-lite frame must not collide with translator marker range"
5553        );
5554    }
5555
5556    /// **DataReceivedResult binding-shape stability**: the
5557    /// `peat_lite_document` field is unconditional in the struct
5558    /// definition, so consumers compiled without the
5559    /// `peat-lite-frame` feature still get the field (always `None`).
5560    /// This locks the binding shape contract so UniFFI consumers
5561    /// don't see drift across feature combos.
5562    #[test]
5563    fn data_received_result_default_has_peat_lite_field() {
5564        let result = DataReceivedResult::default();
5565        assert!(
5566            result.peat_lite_document.is_none(),
5567            "default DataReceivedResult must have peat_lite_document = None"
5568        );
5569    }
5570}