Skip to main content

peat_btle/
peat_mesh.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! PeatMesh - Unified mesh management facade
17//!
18//! This module provides the main entry point for Peat BLE mesh operations.
19//! It composes peer management, document sync, and observer notifications
20//! into a single interface that platform implementations can use.
21//!
22//! ## Usage
23//!
24//! ```ignore
25//! use peat_btle::peat_mesh::{PeatMesh, PeatMeshConfig};
26//! use peat_btle::observer::{PeatEvent, PeatObserver};
27//! use peat_btle::NodeId;
28//! use std::sync::Arc;
29//!
30//! // Create mesh configuration
31//! let config = PeatMeshConfig::new(NodeId::new(0x12345678), "ALPHA-1", "DEMO");
32//!
33//! // Create mesh instance
34//! let mesh = PeatMesh::new(config);
35//!
36//! // Add observer for events
37//! struct MyObserver;
38//! impl PeatObserver for MyObserver {
39//!     fn on_event(&self, event: PeatEvent) {
40//!         println!("Event: {:?}", event);
41//!     }
42//! }
43//! mesh.add_observer(Arc::new(MyObserver));
44//!
45//! // Platform BLE callbacks
46//! mesh.on_ble_discovered("device-uuid", Some("PEAT_DEMO-AABBCCDD"), -65, Some("DEMO"), now_ms);
47//! mesh.on_ble_connected("device-uuid", now_ms);
48//! mesh.on_ble_data_received("device-uuid", &data, now_ms);
49//!
50//! // Periodic maintenance
51//! if let Some(sync_data) = mesh.tick(now_ms) {
52//!     // Broadcast sync_data to connected peers
53//! }
54//! ```
55
56#[cfg(not(feature = "std"))]
57use alloc::{string::String, sync::Arc, vec::Vec};
58#[cfg(feature = "std")]
59use std::collections::HashMap;
60#[cfg(feature = "std")]
61use std::sync::Arc;
62
63use crate::document::{ENCRYPTED_MARKER, KEY_EXCHANGE_MARKER, PEER_E2EE_MARKER};
64#[cfg(feature = "mesh-translator")]
65use crate::document::{
66    TRANSLATOR_FRAME_MARKER, TRANSLATOR_RESERVED_MARKER_END, TRANSLATOR_RESERVED_MARKER_START,
67};
68use crate::document_sync::DocumentSync;
69use crate::gossip::{GossipStrategy, RandomFanout};
70use crate::observer::{DisconnectReason, PeatEvent, PeatObserver, SecurityViolationKind};
71use crate::peer::{
72    ConnectionStateGraph, FullStateCountSummary, IndirectPeer, PeatPeer, PeerConnectionState,
73    PeerDegree, PeerManagerConfig, StateCountSummary,
74};
75use crate::peer_manager::PeerManager;
76use crate::relay::{
77    MessageId, RelayEnvelope, SeenMessageCache, DEFAULT_MAX_HOPS, DEFAULT_SEEN_TTL_MS,
78    RELAY_ENVELOPE_MARKER,
79};
80use crate::security::{
81    DeviceIdentity, IdentityAttestation, IdentityRegistry, KeyExchangeMessage, MeshEncryptionKey,
82    PeerEncryptedMessage, PeerSessionManager, RegistryResult, SessionState,
83};
84use crate::sync::crdt::{EventType, Peripheral, PeripheralType};
85use crate::sync::delta::{DeltaEncoder, DeltaStats};
86use crate::sync::delta_document::{DeltaDocument, Operation};
87use crate::NodeId;
88
89#[cfg(feature = "std")]
90use crate::observer::ObserverManager;
91
92use crate::registry::DocumentRegistry;
93
/// Configuration for `PeatMesh`.
///
/// Build one with `PeatMeshConfig::new` (which fills in the defaults noted
/// on each field) and refine it with the chained `with_*` setters. Every
/// field is public, so it can also be assigned directly.
#[derive(Debug, Clone)]
pub struct PeatMeshConfig {
    /// Our node ID.
    ///
    /// `PeatMesh::with_identity` replaces this with the ID derived from the
    /// supplied identity.
    pub node_id: NodeId,

    /// Our callsign (e.g., "ALPHA-1")
    pub callsign: String,

    /// Mesh ID to filter peers (e.g., "DEMO")
    ///
    /// Also passed to the mesh encryption key derivation when
    /// `encryption_secret` is set.
    pub mesh_id: String,

    /// Peripheral type for this device
    ///
    /// Default: `PeripheralType::SoldierSensor`
    pub peripheral_type: PeripheralType,

    /// Peer management configuration
    ///
    /// Default: `PeerManagerConfig::with_mesh_id(mesh_id)`
    pub peer_config: PeerManagerConfig,

    /// Sync interval in milliseconds (how often to broadcast state)
    ///
    /// Default: 5000
    pub sync_interval_ms: u64,

    /// Whether to auto-broadcast on emergency/ack
    ///
    /// Default: true
    pub auto_broadcast_events: bool,

    /// Optional shared secret for mesh-wide encryption (32 bytes)
    ///
    /// When set, all documents are encrypted using ChaCha20-Poly1305 before
    /// transmission and decrypted upon receipt. All nodes in the mesh must
    /// share the same secret to communicate.
    ///
    /// Default: `None` (encryption disabled)
    pub encryption_secret: Option<[u8; 32]>,

    /// Strict encryption mode - reject unencrypted documents when encryption is enabled
    ///
    /// When true and encryption is enabled, any unencrypted documents received
    /// will be rejected and trigger a SecurityViolation event. This prevents
    /// downgrade attacks where an adversary sends unencrypted malicious documents.
    ///
    /// Default: false (backward compatible - accepts unencrypted for gradual rollout)
    pub strict_encryption: bool,

    /// Enable multi-hop relay
    ///
    /// When enabled, received messages will be forwarded to other peers based
    /// on the gossip strategy. Requires message deduplication to prevent loops.
    ///
    /// Default: false
    pub enable_relay: bool,

    /// Maximum hops for relay messages (TTL)
    ///
    /// Messages will not be relayed beyond this many hops from the origin.
    ///
    /// Default: `DEFAULT_MAX_HOPS`
    pub max_relay_hops: u8,

    /// Gossip fanout for relay
    ///
    /// Number of peers to forward each message to. Higher values increase
    /// convergence speed but also bandwidth usage. `with_relay_fanout`
    /// raises a requested value of 0 to 1; direct assignment is not clamped.
    ///
    /// Default: 2
    pub relay_fanout: usize,

    /// TTL for seen message cache (milliseconds)
    ///
    /// How long to remember message IDs for deduplication.
    ///
    /// Default: `DEFAULT_SEEN_TTL_MS`
    pub seen_cache_ttl_ms: u64,
}
161
162impl PeatMeshConfig {
163    /// Create a new configuration with required fields
164    pub fn new(node_id: NodeId, callsign: &str, mesh_id: &str) -> Self {
165        Self {
166            node_id,
167            callsign: callsign.into(),
168            mesh_id: mesh_id.into(),
169            peripheral_type: PeripheralType::SoldierSensor,
170            peer_config: PeerManagerConfig::with_mesh_id(mesh_id),
171            sync_interval_ms: 5000,
172            auto_broadcast_events: true,
173            encryption_secret: None,
174            strict_encryption: false,
175            enable_relay: false,
176            max_relay_hops: DEFAULT_MAX_HOPS,
177            relay_fanout: 2,
178            seen_cache_ttl_ms: DEFAULT_SEEN_TTL_MS,
179        }
180    }
181
182    /// Enable mesh-wide encryption with a shared secret
183    ///
184    /// All documents will be encrypted using ChaCha20-Poly1305 before
185    /// transmission. All mesh participants must use the same secret.
186    pub fn with_encryption(mut self, secret: [u8; 32]) -> Self {
187        self.encryption_secret = Some(secret);
188        self
189    }
190
191    /// Set peripheral type
192    pub fn with_peripheral_type(mut self, ptype: PeripheralType) -> Self {
193        self.peripheral_type = ptype;
194        self
195    }
196
197    /// Set sync interval
198    pub fn with_sync_interval(mut self, interval_ms: u64) -> Self {
199        self.sync_interval_ms = interval_ms;
200        self
201    }
202
203    /// Set peer timeout
204    pub fn with_peer_timeout(mut self, timeout_ms: u64) -> Self {
205        self.peer_config.peer_timeout_ms = timeout_ms;
206        self
207    }
208
209    /// Set max peers (for embedded systems)
210    pub fn with_max_peers(mut self, max: usize) -> Self {
211        self.peer_config.max_peers = max;
212        self
213    }
214
215    /// Enable strict encryption mode
216    ///
217    /// When enabled (and encryption is also enabled), any unencrypted documents
218    /// received will be rejected and trigger a `SecurityViolation` event.
219    /// This prevents downgrade attacks.
220    ///
221    /// Note: This only has effect when encryption is enabled via `with_encryption()`.
222    pub fn with_strict_encryption(mut self) -> Self {
223        self.strict_encryption = true;
224        self
225    }
226
227    /// Enable multi-hop relay
228    ///
229    /// When enabled, received messages will be forwarded to other connected peers
230    /// based on the gossip strategy. This enables mesh-wide message propagation.
231    pub fn with_relay(mut self) -> Self {
232        self.enable_relay = true;
233        self
234    }
235
236    /// Set maximum relay hops (TTL)
237    ///
238    /// Messages will not be relayed beyond this many hops from the origin.
239    pub fn with_max_relay_hops(mut self, max_hops: u8) -> Self {
240        self.max_relay_hops = max_hops;
241        self
242    }
243
244    /// Set gossip fanout for relay
245    ///
246    /// Number of peers to forward each message to.
247    pub fn with_relay_fanout(mut self, fanout: usize) -> Self {
248        self.relay_fanout = fanout.max(1);
249        self
250    }
251
252    /// Set TTL for seen message cache
253    ///
254    /// How long to remember message IDs for deduplication (milliseconds).
255    pub fn with_seen_cache_ttl(mut self, ttl_ms: u64) -> Self {
256        self.seen_cache_ttl_ms = ttl_ms;
257        self
258    }
259}
260
/// Type alias for app document storage to reduce type complexity.
///
/// A reader/writer-locked map from a `(u8, u32, u64)` key to a type-erased,
/// thread-shareable boxed value. `PeatMesh::app_documents` documents the key
/// as `(type_id, source_node, timestamp)`.
#[cfg(feature = "std")]
type AppDocumentStore =
    std::sync::RwLock<HashMap<(u8, u32, u64), Box<dyn core::any::Any + Send + Sync>>>;
265
/// Main facade for Peat BLE mesh operations
///
/// Composes peer management, document sync, and observer notifications.
/// Platform implementations call into this from their BLE callbacks.
///
/// Only available with the `std` feature. State that is mutated through
/// `&self` methods is wrapped in `Mutex`/`RwLock`/atomics.
#[cfg(feature = "std")]
pub struct PeatMesh {
    /// Configuration (with `node_id` already overridden when an identity was supplied)
    config: PeatMeshConfig,

    /// Peer manager
    peer_manager: PeerManager,

    /// Document sync
    document_sync: DocumentSync,

    /// Observer manager
    observers: ObserverManager,

    /// Last sync broadcast time (u32 wraps every ~49 days, sufficient for intervals)
    ///
    /// Starts at 0.
    last_sync_ms: std::sync::atomic::AtomicU32,

    /// Last cleanup time
    ///
    /// Starts at 0.
    last_cleanup_ms: std::sync::atomic::AtomicU32,

    /// Optional mesh-wide encryption key (derived from shared secret)
    ///
    /// `None` means documents are sent and accepted unencrypted.
    encryption_key: Option<MeshEncryptionKey>,

    /// Optional per-peer E2EE session manager
    ///
    /// Starts as `None`.
    peer_sessions: std::sync::Mutex<Option<PeerSessionManager>>,

    /// Connection state graph for tracking peer connection lifecycle
    ///
    /// Built from the peer config's RSSI-degraded threshold and lost timeout.
    connection_graph: std::sync::Mutex<ConnectionStateGraph>,

    /// Seen message cache for relay deduplication
    seen_cache: std::sync::Mutex<SeenMessageCache>,

    /// Gossip strategy for relay peer selection
    ///
    /// The constructors install a `RandomFanout` sized by `config.relay_fanout`.
    gossip_strategy: Box<dyn GossipStrategy>,

    /// Delta encoder for per-peer sync state tracking
    ///
    /// Tracks what data has been sent to each peer to enable delta sync
    /// (sending only changes instead of full documents).
    delta_encoder: std::sync::Mutex<DeltaEncoder>,

    /// This node's cryptographic identity (Ed25519 keypair)
    ///
    /// When set, the node_id is derived from the public key and documents
    /// can be signed for authenticity verification. `None` for meshes built
    /// with `PeatMesh::new`.
    identity: Option<DeviceIdentity>,

    /// TOFU identity registry for tracking peer identities
    ///
    /// Maps node_id to public key on first contact, rejects mismatches.
    identity_registry: std::sync::Mutex<IdentityRegistry>,

    /// Peripheral state received from peers
    ///
    /// Stores the most recent peripheral data (callsign, location, etc.)
    /// received from each peer via document sync.
    peer_peripherals: std::sync::RwLock<HashMap<NodeId, Peripheral>>,

    /// Document registry for app-layer CRDT types.
    ///
    /// Enables external crates to register custom document types that sync
    /// through the mesh using the extensible registry pattern.
    document_registry: DocumentRegistry,

    /// Storage for app-layer documents.
    ///
    /// Keyed by (type_id, source_node, timestamp) to uniquely identify each
    /// document instance. Values are type-erased boxes that can be downcast
    /// using the document registry handlers.
    app_documents: AppDocumentStore,

    /// ADR-059 BleTranslator instance for inbound translator-frame decoding.
    ///
    /// One translator per PeatMesh, constructed via
    /// `BleTranslator::with_defaults()`. Operators wanting custom collection
    /// names can swap it via `set_translator_config`. Always present when the
    /// `mesh-translator` feature is enabled — `try_handle_translator_marker`
    /// resolves the collection code and calls `decode_inbound_sync` on it
    /// for translator frames.
    #[cfg(feature = "mesh-translator")]
    ble_translator: std::sync::RwLock<crate::translator::BleTranslator>,

    /// Optional `DecodedDocumentCallback` set by peat-ffi (Slice 1.b.4).
    ///
    /// `None` until installed via `set_decoded_document_callback`. The
    /// amendment specifies a real release-skew window between Slice 1.b.3
    /// (this crate) shipping and Slice 1.b.4 (peat-ffi) wiring the callback —
    /// during that window translator frames are decoded successfully and
    /// dropped, with a `PeatEvent::TranslatorNoCallback { collection, peer }`
    /// event emitted on the observer channel so the gap is operator-observable
    /// in real time (see `crate::observer::PeatEvent`).
    #[cfg(feature = "mesh-translator")]
    decoded_document_callback: std::sync::RwLock<Option<Arc<dyn crate::DecodedDocumentCallback>>>,
}
364
365#[cfg(feature = "std")]
366impl PeatMesh {
367    /// Create a new PeatMesh instance
368    pub fn new(config: PeatMeshConfig) -> Self {
369        let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
370        let document_sync = DocumentSync::with_peripheral_type(
371            config.node_id,
372            &config.callsign,
373            config.peripheral_type,
374        );
375
376        // Derive encryption key from shared secret if configured
377        let encryption_key = config
378            .encryption_secret
379            .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
380
381        // Create connection state graph with config thresholds
382        let connection_graph = ConnectionStateGraph::with_config(
383            config.peer_config.rssi_degraded_threshold,
384            config.peer_config.lost_timeout_ms,
385        );
386
387        // Create seen message cache for relay deduplication
388        let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
389
390        // Create gossip strategy for relay
391        let gossip_strategy: Box<dyn GossipStrategy> =
392            Box::new(RandomFanout::new(config.relay_fanout));
393
394        // Create delta encoder for per-peer sync state tracking
395        let delta_encoder = DeltaEncoder::new(config.node_id);
396
397        let document_registry = DocumentRegistry::new();
398
399        Self {
400            config,
401            peer_manager,
402            document_sync,
403            observers: ObserverManager::new(),
404            last_sync_ms: std::sync::atomic::AtomicU32::new(0),
405            last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
406            encryption_key,
407            peer_sessions: std::sync::Mutex::new(None),
408            connection_graph: std::sync::Mutex::new(connection_graph),
409            seen_cache: std::sync::Mutex::new(seen_cache),
410            gossip_strategy,
411            delta_encoder: std::sync::Mutex::new(delta_encoder),
412            identity: None,
413            identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
414            peer_peripherals: std::sync::RwLock::new(HashMap::new()),
415            document_registry,
416            app_documents: std::sync::RwLock::new(HashMap::new()),
417            #[cfg(feature = "mesh-translator")]
418            ble_translator: std::sync::RwLock::new(
419                crate::translator::BleTranslator::with_defaults(),
420            ),
421            #[cfg(feature = "mesh-translator")]
422            decoded_document_callback: std::sync::RwLock::new(None),
423        }
424    }
425
426    /// Create a new PeatMesh with a cryptographic identity
427    ///
428    /// The node_id will be derived from the identity's public key, overriding
429    /// any node_id specified in the config. This ensures cryptographic binding
430    /// between node_id and identity.
431    pub fn with_identity(config: PeatMeshConfig, identity: DeviceIdentity) -> Self {
432        // Override node_id with identity-derived value
433        let mut config = config;
434        config.node_id = identity.node_id();
435
436        let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
437        let document_sync = DocumentSync::with_peripheral_type(
438            config.node_id,
439            &config.callsign,
440            config.peripheral_type,
441        );
442
443        let encryption_key = config
444            .encryption_secret
445            .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
446
447        let connection_graph = ConnectionStateGraph::with_config(
448            config.peer_config.rssi_degraded_threshold,
449            config.peer_config.lost_timeout_ms,
450        );
451
452        let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
453        let gossip_strategy: Box<dyn GossipStrategy> =
454            Box::new(RandomFanout::new(config.relay_fanout));
455        let delta_encoder = DeltaEncoder::new(config.node_id);
456
457        let document_registry = DocumentRegistry::new();
458
459        Self {
460            config,
461            peer_manager,
462            document_sync,
463            observers: ObserverManager::new(),
464            last_sync_ms: std::sync::atomic::AtomicU32::new(0),
465            last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
466            encryption_key,
467            peer_sessions: std::sync::Mutex::new(None),
468            connection_graph: std::sync::Mutex::new(connection_graph),
469            seen_cache: std::sync::Mutex::new(seen_cache),
470            gossip_strategy,
471            delta_encoder: std::sync::Mutex::new(delta_encoder),
472            identity: Some(identity),
473            identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
474            peer_peripherals: std::sync::RwLock::new(HashMap::new()),
475            document_registry,
476            app_documents: std::sync::RwLock::new(HashMap::new()),
477            #[cfg(feature = "mesh-translator")]
478            ble_translator: std::sync::RwLock::new(
479                crate::translator::BleTranslator::with_defaults(),
480            ),
481            #[cfg(feature = "mesh-translator")]
482            decoded_document_callback: std::sync::RwLock::new(None),
483        }
484    }
485
486    /// Create a new PeatMesh from genesis data
487    ///
488    /// This is the recommended way to create a mesh for production use.
489    /// The mesh will be configured with:
490    /// - node_id derived from identity
491    /// - mesh_id from genesis
492    /// - encryption enabled using genesis-derived secret
493    pub fn from_genesis(
494        genesis: &crate::security::MeshGenesis,
495        identity: DeviceIdentity,
496        callsign: &str,
497    ) -> Self {
498        let config = PeatMeshConfig::new(identity.node_id(), callsign, &genesis.mesh_id())
499            .with_encryption(genesis.encryption_secret());
500
501        Self::with_identity(config, identity)
502    }
503
504    /// Create a PeatMesh from persisted state.
505    ///
506    /// Restores mesh configuration from previously saved state, including:
507    /// - Device identity (Ed25519 keypair)
508    /// - Mesh genesis (if present)
509    /// - Identity registry (TOFU cache)
510    ///
511    /// Use this on device boot to restore mesh membership without re-provisioning.
512    ///
513    /// # Arguments
514    ///
515    /// * `state` - Previously persisted state
516    /// * `callsign` - Human-readable identifier (may differ from original)
517    ///
518    /// # Errors
519    ///
520    /// Returns `PersistenceError` if the identity cannot be restored.
521    ///
522    /// # Example
523    ///
524    /// ```ignore
525    /// // On boot, restore from secure storage
526    /// let state = PersistedState::load(&storage)?;
527    /// let mesh = PeatMesh::from_persisted(state, "SENSOR-01")?;
528    /// ```
529    #[cfg(feature = "std")]
530    pub fn from_persisted(
531        state: crate::security::PersistedState,
532        callsign: &str,
533    ) -> Result<Self, crate::security::PersistenceError> {
534        // Restore identity
535        let identity = state.restore_identity()?;
536
537        // Restore genesis (if present)
538        let genesis = state.restore_genesis();
539
540        // Create mesh with or without genesis
541        let mesh = if let Some(ref gen) = genesis {
542            Self::from_genesis(gen, identity, callsign)
543        } else {
544            let config = PeatMeshConfig::new(identity.node_id(), callsign, "RESTORED");
545            Self::with_identity(config, identity)
546        };
547
548        // Restore identity registry
549        let restored_registry = state.restore_registry();
550        if let Ok(mut registry) = mesh.identity_registry.lock() {
551            *registry = restored_registry;
552        }
553
554        log::info!(
555            "PeatMesh restored from persisted state: node_id={:08X}, known_peers={}",
556            mesh.config.node_id.as_u32(),
557            mesh.known_identity_count()
558        );
559
560        Ok(mesh)
561    }
562
563    /// Create persisted state from current mesh state.
564    ///
565    /// Captures the current identity, genesis, and registry for persistence.
566    /// Call this periodically or before shutdown to save state.
567    ///
568    /// # Arguments
569    ///
570    /// * `genesis` - Optional genesis to include (if mesh was created from genesis)
571    ///
572    /// # Returns
573    ///
574    /// `None` if the mesh has no identity bound.
575    #[cfg(feature = "std")]
576    pub fn to_persisted_state(
577        &self,
578        genesis: Option<&crate::security::MeshGenesis>,
579    ) -> Option<crate::security::PersistedState> {
580        let identity = self.identity.as_ref()?;
581        let registry = self.identity_registry.lock().ok()?;
582
583        Some(crate::security::PersistedState::with_registry(
584            identity, genesis, &registry,
585        ))
586    }
587
588    // ==================== ADR-059 translator-frame receive ====================
589
590    /// Install the [`DecodedDocumentCallback`](crate::DecodedDocumentCallback)
591    /// invoked when a 0xB6 translator frame decodes successfully.
592    ///
593    /// Idempotent — calling again replaces the previous callback. Pass an
594    /// `Arc<dyn DecodedDocumentCallback>` so the receive dispatch can clone
595    /// the handle without holding the lock during invocation.
596    ///
597    /// Released-skew note: Slice 1.b.3 ships peat-btle with this setter and
598    /// the receive-dispatch branch; Slice 1.b.4 wires the callback from
599    /// peat-ffi in a *later* release. Until peat-ffi installs one, decoded
600    /// frames are silently dropped (logged at debug level).
601    #[cfg(feature = "mesh-translator")]
602    pub fn set_decoded_document_callback(&self, cb: Arc<dyn crate::DecodedDocumentCallback>) {
603        if let Ok(mut slot) = self.decoded_document_callback.write() {
604            *slot = Some(cb);
605        }
606    }
607
608    /// Replace the active `BleTranslator` configuration. Optional; the
609    /// default (operator collection names = "tracks"/"platforms"/
610    /// "alerts"/"canned_messages") covers most deployments.
611    #[cfg(feature = "mesh-translator")]
612    pub fn set_translator_config(&self, config: crate::translator::TranslationConfig) {
613        if let Ok(mut t) = self.ble_translator.write() {
614            *t = crate::translator::BleTranslator::new(config);
615        }
616    }
617
618    /// Translator-frame / reserved-marker dispatcher. Returns `true` if the
619    /// first byte of `decrypted` fell in `0xB6..=0xBF` (handled or silently
620    /// dropped); the caller MUST stop processing and return `None`. Returns
621    /// `false` if the byte was something else; the caller continues with
622    /// the existing legacy / delta-document flow.
623    ///
624    /// `peer` is the BLE peer's identifier (typically a peripheral address).
625    /// `source_node` is the peer's `NodeId` when known — used to populate
626    /// `TranslationContext.local_wire_id` (the hex-u32 form), which the
627    /// `tracks` collection's decoder requires per the trait contract.
628    /// Anonymous receive paths pass `None`; tracks decoding will then return
629    /// an `Err` (logged + dropped) but other collections succeed.
630    #[cfg(feature = "mesh-translator")]
631    fn try_handle_translator_marker(
632        &self,
633        decrypted: &[u8],
634        peer: Option<&str>,
635        source_node: Option<NodeId>,
636    ) -> bool {
637        if decrypted.is_empty() {
638            return false;
639        }
640        let marker = decrypted[0];
641
642        // Reserved range — silent drop for forward-compat. Receivers running
643        // *this* peat-btle release MUST drop unknown future-translator
644        // markers without falling through to merge_document, otherwise a
645        // 0xB7..=0xBF frame would corrupt the GCounter (ADR-059 Amendment 1
646        // §"Backwards compatibility… 2").
647        if (TRANSLATOR_RESERVED_MARKER_START..=TRANSLATOR_RESERVED_MARKER_END).contains(&marker) {
648            log::warn!(
649                "ble: dropping reserved translator-marker frame (marker=0x{marker:02X}, len={})",
650                decrypted.len()
651            );
652            return true;
653        }
654
655        if marker != TRANSLATOR_FRAME_MARKER {
656            return false;
657        }
658
659        // 0xB6 translator dispatch.
660        if decrypted.len() < 2 {
661            log::warn!(
662                "ble: dropping truncated translator frame (len={}, missing collection code)",
663                decrypted.len()
664            );
665            return true;
666        }
667
668        let code = decrypted[1];
669        let payload = &decrypted[2..];
670
671        // Resolve the collection name + build a TranslationContext under
672        // the read-lock, drop the lock before invoking decode_inbound_sync.
673        let (collection, decode_result) = {
674            let translator = match self.ble_translator.read() {
675                Ok(g) => g,
676                Err(_) => {
677                    log::warn!("ble: translator RwLock poisoned; dropping frame");
678                    return true;
679                }
680            };
681            let collection = match translator.code_to_collection(code) {
682                Some(c) => c.to_string(),
683                None => {
684                    log::warn!(
685                        "ble: dropping translator frame with unknown collection code 0x{code:02X}"
686                    );
687                    return true;
688                }
689            };
690
691            let mut ctx =
692                peat_mesh::transport::TranslationContext::inbound(peer.unwrap_or("unknown"))
693                    .with_collection(collection.clone());
694            if let Some(node) = source_node {
695                ctx = ctx.with_local_wire_id(format!("{:08X}", node.as_u32()));
696            }
697
698            let result = translator.decode_inbound_sync(payload, &ctx);
699            (collection, result)
700        };
701
702        match decode_result {
703            Ok(Some(doc)) => {
704                // Snapshot the callback Arc so we don't hold the read lock
705                // during user-supplied dispatch.
706                let cb = self
707                    .decoded_document_callback
708                    .read()
709                    .ok()
710                    .and_then(|g| g.as_ref().cloned());
711                match cb {
712                    Some(cb) => cb.on_document(&collection, doc, peer),
713                    None => {
714                        // Slice 1.b.3-ships-before-1.b.4 release-skew window:
715                        // decoded successfully but no consumer is wired yet.
716                        // Per ADR-059 Amendment 1, this branch must surface
717                        // through the operator-observable PeatEvent channel
718                        // so deployments staging the rollout see, in real
719                        // time, how many translator-frame payloads the
720                        // bridge is decoding into the void. `log::debug!`
721                        // alone is not a monitoring signal — it is disabled
722                        // at default log levels.
723                        log::debug!(
724                            "ble: decoded {} frame but no DecodedDocumentCallback installed",
725                            collection
726                        );
727                        self.notify(crate::observer::PeatEvent::TranslatorNoCallback {
728                            collection: collection.clone(),
729                            peer: peer.map(str::to_string),
730                        });
731                    }
732                }
733            }
734            Ok(None) => {
735                log::debug!(
736                    "ble: codec declined translator frame for collection {}",
737                    collection
738                );
739            }
740            Err(e) => {
741                log::warn!(
742                    "ble: translator frame decode error (collection={}): {:#}",
743                    collection,
744                    e
745                );
746            }
747        }
748
749        true
750    }
751
752    // ==================== Encryption ====================
753
754    /// Check if mesh-wide encryption is enabled
755    pub fn is_encryption_enabled(&self) -> bool {
756        self.encryption_key.is_some()
757    }
758
759    /// Check if strict encryption mode is enabled
760    ///
761    /// Returns true only if both encryption and strict_encryption are enabled.
762    pub fn is_strict_encryption_enabled(&self) -> bool {
763        self.config.strict_encryption && self.encryption_key.is_some()
764    }
765
766    /// Enable mesh-wide encryption with a shared secret
767    ///
768    /// Derives a ChaCha20-Poly1305 key from the secret using HKDF-SHA256.
769    /// All mesh participants must use the same secret to communicate.
770    pub fn enable_encryption(&mut self, secret: &[u8; 32]) {
771        self.encryption_key = Some(MeshEncryptionKey::from_shared_secret(
772            &self.config.mesh_id,
773            secret,
774        ));
775    }
776
777    /// Disable mesh-wide encryption
778    pub fn disable_encryption(&mut self) {
779        self.encryption_key = None;
780    }
781
782    /// Encrypt document bytes for transmission
783    ///
784    /// Returns the encrypted bytes with ENCRYPTED_MARKER prefix, or the
785    /// original bytes if encryption is disabled.
786    fn encrypt_document(&self, plaintext: &[u8]) -> Vec<u8> {
787        match &self.encryption_key {
788            Some(key) => {
789                // Encrypt and prepend marker
790                match key.encrypt_to_bytes(plaintext) {
791                    Ok(ciphertext) => {
792                        let mut buf = Vec::with_capacity(2 + ciphertext.len());
793                        buf.push(ENCRYPTED_MARKER);
794                        buf.push(0x00); // reserved
795                        buf.extend_from_slice(&ciphertext);
796                        buf
797                    }
798                    Err(e) => {
799                        log::error!("Encryption failed: {}", e);
800                        // Fall back to unencrypted on error (shouldn't happen)
801                        plaintext.to_vec()
802                    }
803                }
804            }
805            None => plaintext.to_vec(),
806        }
807    }
808
809    /// Decrypt document bytes received from peer
810    ///
811    /// Returns the decrypted bytes if encrypted and valid, or the original
812    /// bytes if not encrypted. Returns None if decryption fails.
813    ///
814    /// In strict encryption mode (when both encryption and strict_encryption are enabled),
815    /// unencrypted documents are rejected and trigger a SecurityViolation event.
816    fn decrypt_document<'a>(
817        &self,
818        data: &'a [u8],
819        source_hint: Option<&str>,
820    ) -> Option<std::borrow::Cow<'a, [u8]>> {
821        log::debug!(
822            "decrypt_document: len={}, first_byte=0x{:02X}, source={:?}",
823            data.len(),
824            data.first().copied().unwrap_or(0),
825            source_hint
826        );
827
828        // Check for encrypted marker
829        if data.len() >= 2 && data[0] == ENCRYPTED_MARKER {
830            // Encrypted document
831            let _reserved = data[1];
832            let encrypted_payload = &data[2..];
833
834            log::debug!(
835                "decrypt_document: encrypted payload len={}, nonce+ciphertext",
836                encrypted_payload.len()
837            );
838
839            match &self.encryption_key {
840                Some(key) => match key.decrypt_from_bytes(encrypted_payload) {
841                    Ok(plaintext) => {
842                        log::debug!(
843                            "decrypt_document: SUCCESS, plaintext len={}",
844                            plaintext.len()
845                        );
846                        Some(std::borrow::Cow::Owned(plaintext))
847                    }
848                    Err(e) => {
849                        log::warn!(
850                            "decrypt_document: FAILED (wrong key or corrupted): {} [payload_len={}, source={:?}]",
851                            e,
852                            encrypted_payload.len(),
853                            source_hint
854                        );
855                        self.notify(PeatEvent::SecurityViolation {
856                            kind: SecurityViolationKind::DecryptionFailed,
857                            source: source_hint.map(String::from),
858                        });
859                        None
860                    }
861                },
862                None => {
863                    log::warn!(
864                        "decrypt_document: encryption not enabled but received encrypted doc"
865                    );
866                    None
867                }
868            }
869        } else {
870            // Unencrypted document
871            // Check strict encryption mode
872            if self.config.strict_encryption && self.encryption_key.is_some() {
873                log::warn!(
874                    "Rejected unencrypted document in strict encryption mode (source: {:?})",
875                    source_hint
876                );
877                self.notify(PeatEvent::SecurityViolation {
878                    kind: SecurityViolationKind::UnencryptedInStrictMode,
879                    source: source_hint.map(String::from),
880                });
881                None
882            } else {
883                // Permissive mode: accept unencrypted for backward compatibility
884                Some(std::borrow::Cow::Borrowed(data))
885            }
886        }
887    }
888
889    // ==================== Transport Layer API ====================
890
891    /// Decrypt data without parsing (transport-only operation)
892    ///
893    /// This method provides raw decrypted bytes for apps that want to handle
894    /// message parsing themselves (using peat-lite or other libraries).
895    ///
896    /// # Arguments
897    /// * `data` - Potentially encrypted data (0xAE marker indicates encryption)
898    ///
899    /// # Returns
900    /// * `Some(plaintext)` - Decrypted bytes if successful, or original bytes if unencrypted
901    /// * `None` - If decryption failed (wrong key, corrupted, or strict mode violation)
902    pub fn decrypt_only(&self, data: &[u8]) -> Option<Vec<u8>> {
903        self.decrypt_document(data, None)
904            .map(|cow| cow.into_owned())
905    }
906
907    // ==================== Identity ====================
908
909    /// Check if this mesh has a cryptographic identity
910    pub fn has_identity(&self) -> bool {
911        self.identity.is_some()
912    }
913
914    /// Get this node's public key (if identity is configured)
915    pub fn public_key(&self) -> Option<[u8; 32]> {
916        self.identity.as_ref().map(|id| id.public_key())
917    }
918
919    /// Create an identity attestation for this node
920    ///
921    /// Returns None if no identity is configured.
922    pub fn create_attestation(&self, now_ms: u64) -> Option<IdentityAttestation> {
923        self.identity
924            .as_ref()
925            .map(|id| id.create_attestation(now_ms))
926    }
927
928    /// Verify and register a peer's identity attestation
929    ///
930    /// Implements TOFU (Trust On First Use):
931    /// - On first contact, registers the node_id → public_key binding
932    /// - On subsequent contacts, verifies the public key matches
933    ///
934    /// Returns the verification result. Security violations should be handled
935    /// by the caller (e.g., disconnect, alert).
936    pub fn verify_peer_identity(&self, attestation: &IdentityAttestation) -> RegistryResult {
937        self.identity_registry
938            .lock()
939            .unwrap()
940            .verify_or_register(attestation)
941    }
942
943    /// Check if a peer's identity is known (has been registered)
944    pub fn is_peer_identity_known(&self, node_id: NodeId) -> bool {
945        self.identity_registry.lock().unwrap().is_known(node_id)
946    }
947
948    /// Get a peer's public key if known
949    pub fn peer_public_key(&self, node_id: NodeId) -> Option<[u8; 32]> {
950        self.identity_registry
951            .lock()
952            .unwrap()
953            .get_public_key(node_id)
954            .copied()
955    }
956
957    /// Get the number of known peer identities
958    pub fn known_identity_count(&self) -> usize {
959        self.identity_registry.lock().unwrap().len()
960    }
961
962    /// Pre-register a peer's identity (for out-of-band key exchange)
963    ///
964    /// Use this when keys are exchanged through a secure side channel
965    /// (e.g., QR code, NFC tap, or provisioning server).
966    pub fn pre_register_peer_identity(&self, node_id: NodeId, public_key: [u8; 32], now_ms: u64) {
967        self.identity_registry
968            .lock()
969            .unwrap()
970            .pre_register(node_id, public_key, now_ms);
971    }
972
973    /// Remove a peer's identity from the registry
974    ///
975    /// Use with caution - this allows re-registration with a different key.
976    pub fn forget_peer_identity(&self, node_id: NodeId) {
977        self.identity_registry.lock().unwrap().remove(node_id);
978    }
979
980    /// Sign arbitrary data with this node's identity
981    ///
982    /// Returns None if no identity is configured.
983    pub fn sign(&self, data: &[u8]) -> Option<[u8; 64]> {
984        self.identity.as_ref().map(|id| id.sign(data))
985    }
986
987    /// Verify a signature from a peer
988    ///
989    /// Uses the peer's public key from the identity registry.
990    /// Returns false if peer is unknown or signature is invalid.
991    pub fn verify_peer_signature(
992        &self,
993        node_id: NodeId,
994        data: &[u8],
995        signature: &[u8; 64],
996    ) -> bool {
997        if let Some(public_key) = self.peer_public_key(node_id) {
998            crate::security::verify_signature(&public_key, data, signature)
999        } else {
1000            false
1001        }
1002    }
1003
1004    // ==================== Multi-Hop Relay ====================
1005
1006    /// Check if multi-hop relay is enabled
1007    pub fn is_relay_enabled(&self) -> bool {
1008        self.config.enable_relay
1009    }
1010
1011    /// Enable multi-hop relay
1012    pub fn enable_relay(&mut self) {
1013        self.config.enable_relay = true;
1014    }
1015
1016    /// Disable multi-hop relay
1017    pub fn disable_relay(&mut self) {
1018        self.config.enable_relay = false;
1019    }
1020
1021    /// Check if a message has been seen before (for deduplication)
1022    ///
1023    /// Returns true if the message was already seen (duplicate).
1024    pub fn has_seen_message(&self, message_id: &MessageId) -> bool {
1025        self.seen_cache.lock().unwrap().has_seen(message_id)
1026    }
1027
1028    /// Mark a message as seen
1029    ///
1030    /// Returns true if this is a new message (first time seen).
1031    pub fn mark_message_seen(&self, message_id: MessageId, origin: NodeId, now_ms: u64) -> bool {
1032        self.seen_cache
1033            .lock()
1034            .unwrap()
1035            .check_and_mark(message_id, origin, now_ms)
1036    }
1037
1038    /// Get the number of entries in the seen message cache
1039    pub fn seen_cache_size(&self) -> usize {
1040        self.seen_cache.lock().unwrap().len()
1041    }
1042
1043    /// Clear the seen message cache
1044    pub fn clear_seen_cache(&self) {
1045        self.seen_cache.lock().unwrap().clear();
1046    }
1047
1048    /// Wrap a document in a relay envelope for multi-hop transmission
1049    ///
1050    /// The returned bytes can be sent to peers and will be automatically
1051    /// relayed through the mesh if relay is enabled on receiving nodes.
1052    pub fn wrap_for_relay(&self, payload: Vec<u8>) -> Vec<u8> {
1053        let envelope = RelayEnvelope::broadcast(self.config.node_id, payload)
1054            .with_max_hops(self.config.max_relay_hops);
1055        envelope.encode()
1056    }
1057
1058    /// Get peers to relay a message to
1059    ///
1060    /// Uses the configured gossip strategy to select relay targets.
1061    /// Excludes the source peer (if provided) to avoid sending back to sender.
1062    pub fn get_relay_targets(&self, exclude_peer: Option<NodeId>) -> Vec<PeatPeer> {
1063        let connected = self.peer_manager.get_connected_peers();
1064        let filtered: Vec<_> = if let Some(exclude) = exclude_peer {
1065            connected
1066                .into_iter()
1067                .filter(|p| p.node_id != exclude)
1068                .collect()
1069        } else {
1070            connected
1071        };
1072
1073        self.gossip_strategy
1074            .select_peers(&filtered)
1075            .into_iter()
1076            .cloned()
1077            .collect()
1078    }
1079
1080    /// Process an incoming relay envelope
1081    ///
1082    /// Handles deduplication, TTL checking, and determines if the message
1083    /// should be processed and/or relayed.
1084    ///
1085    /// Returns:
1086    /// - `Ok(Some(RelayDecision))` if message should be processed/relayed
1087    /// - `Ok(None)` if message was a duplicate or TTL expired
1088    /// - `Err` if parsing failed
1089    pub fn process_relay_envelope(
1090        &self,
1091        data: &[u8],
1092        source_peer: NodeId,
1093        now_ms: u64,
1094    ) -> Option<RelayDecision> {
1095        // Parse envelope
1096        let envelope = RelayEnvelope::decode(data)?;
1097
1098        // Update indirect peer graph if origin differs from source
1099        // This means the message was relayed through source_peer from origin_node
1100        if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
1101            let is_new = self.connection_graph.lock().unwrap().on_relay_received(
1102                source_peer,
1103                envelope.origin_node,
1104                envelope.hop_count,
1105                now_ms,
1106            );
1107
1108            if is_new {
1109                log::debug!(
1110                    "Discovered indirect peer {:08X} via {:08X} ({} hops)",
1111                    envelope.origin_node.as_u32(),
1112                    source_peer.as_u32(),
1113                    envelope.hop_count
1114                );
1115            }
1116        }
1117
1118        // Check deduplication
1119        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
1120            // Duplicate message
1121            let stats = self
1122                .seen_cache
1123                .lock()
1124                .unwrap()
1125                .get_stats(&envelope.message_id);
1126            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
1127
1128            self.notify(PeatEvent::DuplicateMessageDropped {
1129                origin_node: envelope.origin_node,
1130                seen_count,
1131            });
1132
1133            log::debug!(
1134                "Dropped duplicate message {} from {:08X} (seen {} times)",
1135                envelope.message_id,
1136                envelope.origin_node.as_u32(),
1137                seen_count
1138            );
1139            return None;
1140        }
1141
1142        // Check TTL
1143        if !envelope.can_relay() {
1144            self.notify(PeatEvent::MessageTtlExpired {
1145                origin_node: envelope.origin_node,
1146                hop_count: envelope.hop_count,
1147            });
1148
1149            log::debug!(
1150                "Message {} from {:08X} TTL expired at hop {}",
1151                envelope.message_id,
1152                envelope.origin_node.as_u32(),
1153                envelope.hop_count
1154            );
1155
1156            // Still process locally even if TTL expired
1157            return Some(RelayDecision {
1158                payload: envelope.payload,
1159                origin_node: envelope.origin_node,
1160                hop_count: envelope.hop_count,
1161                should_relay: false,
1162                relay_envelope: None,
1163            });
1164        }
1165
1166        // Determine if we should relay
1167        let should_relay = self.config.enable_relay;
1168        let relay_envelope = if should_relay {
1169            envelope.relay() // Increments hop count
1170        } else {
1171            None
1172        };
1173
1174        Some(RelayDecision {
1175            payload: envelope.payload,
1176            origin_node: envelope.origin_node,
1177            hop_count: envelope.hop_count,
1178            should_relay,
1179            relay_envelope,
1180        })
1181    }
1182
1183    /// Build a document wrapped in a relay envelope
1184    ///
1185    /// Convenience method that builds the document, encrypts it (if enabled),
1186    /// and wraps it in a relay envelope for multi-hop transmission.
1187    pub fn build_relay_document(&self) -> Vec<u8> {
1188        let doc = self.build_document(); // Already encrypted if encryption enabled
1189        self.wrap_for_relay(doc)
1190    }
1191
1192    // ==================== Delta Sync ====================
1193
1194    /// Register a peer for delta sync tracking
1195    ///
1196    /// Call this when a peer connects to start tracking what data has been
1197    /// sent to them. This enables future delta sync (sending only changes).
1198    pub fn register_peer_for_delta(&self, peer_id: &NodeId) {
1199        let mut encoder = self.delta_encoder.lock().unwrap();
1200        encoder.add_peer(peer_id);
1201        log::debug!(
1202            "Registered peer {:08X} for delta sync tracking",
1203            peer_id.as_u32()
1204        );
1205    }
1206
1207    /// Unregister a peer from delta sync tracking
1208    ///
1209    /// Call this when a peer disconnects to clean up tracking state.
1210    pub fn unregister_peer_for_delta(&self, peer_id: &NodeId) {
1211        let mut encoder = self.delta_encoder.lock().unwrap();
1212        encoder.remove_peer(peer_id);
1213        log::debug!(
1214            "Unregistered peer {:08X} from delta sync tracking",
1215            peer_id.as_u32()
1216        );
1217    }
1218
1219    /// Reset delta sync state for a peer
1220    ///
1221    /// Call this when a peer reconnects to force a full sync on next
1222    /// communication. This clears the record of what was previously sent.
1223    pub fn reset_peer_delta_state(&self, peer_id: &NodeId) {
1224        let mut encoder = self.delta_encoder.lock().unwrap();
1225        encoder.reset_peer(peer_id);
1226        log::debug!("Reset delta sync state for peer {:08X}", peer_id.as_u32());
1227    }
1228
1229    /// Record bytes sent to a peer (for delta statistics)
1230    pub fn record_delta_sent(&self, peer_id: &NodeId, bytes: usize) {
1231        let mut encoder = self.delta_encoder.lock().unwrap();
1232        encoder.record_sent(peer_id, bytes);
1233    }
1234
1235    /// Record bytes received from a peer (for delta statistics)
1236    pub fn record_delta_received(&self, peer_id: &NodeId, bytes: usize, timestamp: u64) {
1237        let mut encoder = self.delta_encoder.lock().unwrap();
1238        encoder.record_received(peer_id, bytes, timestamp);
1239    }
1240
1241    /// Get delta sync statistics
1242    ///
1243    /// Returns aggregate statistics about delta sync across all peers,
1244    /// including bytes sent/received and sync counts.
1245    pub fn delta_stats(&self) -> DeltaStats {
1246        self.delta_encoder.lock().unwrap().stats()
1247    }
1248
1249    /// Get delta sync statistics for a specific peer
1250    ///
1251    /// Returns the bytes sent/received and sync count for a single peer.
1252    pub fn peer_delta_stats(&self, peer_id: &NodeId) -> Option<(u64, u64, u32)> {
1253        let encoder = self.delta_encoder.lock().unwrap();
1254        encoder
1255            .get_peer_state(peer_id)
1256            .map(|state| (state.bytes_sent, state.bytes_received, state.sync_count))
1257    }
1258
1259    /// Build a delta document for a specific peer
1260    ///
1261    /// This only includes operations that have changed since the last sync
1262    /// with this peer. Uses the delta encoder to track per-peer state.
1263    ///
1264    /// Returns the encoded delta document bytes, or None if there's nothing
1265    /// new to send to this peer.
1266    pub fn build_delta_document_for_peer(&self, peer_id: &NodeId, now_ms: u64) -> Option<Vec<u8>> {
1267        // Collect all current operations
1268        let mut all_operations: Vec<Operation> = Vec::new();
1269
1270        // Add counter operations (one per node that has contributed)
1271        // Use the count value as the "timestamp" for tracking - only send if count increased
1272        for (node_id_u32, count) in self.document_sync.counter_entries() {
1273            all_operations.push(Operation::IncrementCounter {
1274                counter_id: 0, // Default mesh counter
1275                node_id: NodeId::new(node_id_u32),
1276                amount: count,
1277                timestamp: count, // Use count as timestamp for delta tracking
1278            });
1279        }
1280
1281        // Add peripheral update
1282        // Use event timestamp if available, otherwise use 1 for initial send
1283        let peripheral = self.document_sync.peripheral_snapshot();
1284        let peripheral_timestamp = peripheral
1285            .last_event
1286            .as_ref()
1287            .map(|e| e.timestamp)
1288            .unwrap_or(1); // Use 1 (not 0) so it's sent initially
1289        all_operations.push(Operation::UpdatePeripheral {
1290            peripheral,
1291            timestamp: peripheral_timestamp,
1292        });
1293
1294        // Add emergency operations if active
1295        if let Some(emergency) = self.document_sync.emergency_snapshot() {
1296            let source_node = NodeId::new(emergency.source_node());
1297            let timestamp = emergency.timestamp();
1298
1299            // Add SetEmergency operation
1300            all_operations.push(Operation::SetEmergency {
1301                source_node,
1302                timestamp,
1303                known_peers: emergency.all_nodes(),
1304            });
1305
1306            // Add AckEmergency for each node that has acked
1307            for acked_node in emergency.acked_nodes() {
1308                all_operations.push(Operation::AckEmergency {
1309                    node_id: NodeId::new(acked_node),
1310                    emergency_timestamp: timestamp,
1311                });
1312            }
1313        }
1314
1315        // Add app document operations
1316        for app_op in self.app_document_delta_ops() {
1317            all_operations.push(Operation::App(app_op));
1318        }
1319
1320        // Filter operations for this peer (only send what's new)
1321        let filtered_operations: Vec<Operation> = {
1322            let encoder = self.delta_encoder.lock().unwrap();
1323            if let Some(peer_state) = encoder.get_peer_state(peer_id) {
1324                all_operations
1325                    .into_iter()
1326                    .filter(|op| peer_state.needs_send(&op.key(), op.timestamp()))
1327                    .collect()
1328            } else {
1329                // Unknown peer, send all operations
1330                all_operations
1331            }
1332        };
1333
1334        // If nothing new to send, return None
1335        if filtered_operations.is_empty() {
1336            return None;
1337        }
1338
1339        // Mark operations as sent
1340        {
1341            let mut encoder = self.delta_encoder.lock().unwrap();
1342            if let Some(peer_state) = encoder.get_peer_state_mut(peer_id) {
1343                for op in &filtered_operations {
1344                    peer_state.mark_sent(&op.key(), op.timestamp());
1345                }
1346            }
1347        }
1348
1349        // Build the delta document
1350        let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1351        for op in filtered_operations {
1352            delta.add_operation(op);
1353        }
1354
1355        // Encode and optionally encrypt
1356        let encoded = delta.encode();
1357        let result = self.encrypt_document(&encoded);
1358
1359        // Record stats
1360        {
1361            let mut encoder = self.delta_encoder.lock().unwrap();
1362            encoder.record_sent(peer_id, result.len());
1363        }
1364
1365        Some(result)
1366    }
1367
1368    /// Build a full delta document (for broadcast or new peers)
1369    ///
1370    /// Unlike `build_delta_document_for_peer`, this includes all state
1371    /// regardless of what has been sent before. Use this for broadcasts.
1372    pub fn build_full_delta_document(&self, now_ms: u64) -> Vec<u8> {
1373        let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1374
1375        // Add all counter operations
1376        for (node_id_u32, count) in self.document_sync.counter_entries() {
1377            delta.add_operation(Operation::IncrementCounter {
1378                counter_id: 0,
1379                node_id: NodeId::new(node_id_u32),
1380                amount: count,
1381                timestamp: now_ms,
1382            });
1383        }
1384
1385        // Add peripheral
1386        let peripheral = self.document_sync.peripheral_snapshot();
1387        let peripheral_timestamp = peripheral
1388            .last_event
1389            .as_ref()
1390            .map(|e| e.timestamp)
1391            .unwrap_or(now_ms);
1392        delta.add_operation(Operation::UpdatePeripheral {
1393            peripheral,
1394            timestamp: peripheral_timestamp,
1395        });
1396
1397        // Add emergency if active
1398        if let Some(emergency) = self.document_sync.emergency_snapshot() {
1399            let source_node = NodeId::new(emergency.source_node());
1400            let timestamp = emergency.timestamp();
1401
1402            delta.add_operation(Operation::SetEmergency {
1403                source_node,
1404                timestamp,
1405                known_peers: emergency.all_nodes(),
1406            });
1407
1408            for acked_node in emergency.acked_nodes() {
1409                delta.add_operation(Operation::AckEmergency {
1410                    node_id: NodeId::new(acked_node),
1411                    emergency_timestamp: timestamp,
1412                });
1413            }
1414        }
1415
1416        // Add app document operations
1417        for app_op in self.app_document_delta_ops() {
1418            delta.add_operation(Operation::App(app_op));
1419        }
1420
1421        let encoded = delta.encode();
1422        self.encrypt_document(&encoded)
1423    }
1424
1425    /// Internal: Process a received delta document
1426    ///
1427    /// Applies operations from a delta document to local state.
1428    fn process_delta_document_internal(
1429        &self,
1430        source_node: NodeId,
1431        data: &[u8],
1432        now_ms: u64,
1433        relay_data: Option<Vec<u8>>,
1434        origin_node: Option<NodeId>,
1435        hop_count: u8,
1436    ) -> Option<DataReceivedResult> {
1437        // Decode the delta document
1438        let delta = DeltaDocument::decode(data)?;
1439
1440        // Don't process our own documents
1441        if delta.origin_node == self.config.node_id {
1442            return None;
1443        }
1444
1445        // Apply operations to local state
1446        let mut counter_changed = false;
1447        let mut emergency_changed = false;
1448        let mut is_emergency = false;
1449        let mut is_ack = false;
1450        let mut event_timestamp = 0u64;
1451        let mut peer_peripheral: Option<crate::sync::crdt::Peripheral> = None;
1452
1453        log::info!(
1454            "Delta document from {:08X}: {} operations, data_len={}",
1455            delta.origin_node.as_u32(),
1456            delta.operations.len(),
1457            data.len()
1458        );
1459        for op in &delta.operations {
1460            log::info!("  Operation: {}", op.key());
1461            match op {
1462                Operation::IncrementCounter {
1463                    node_id, amount, ..
1464                } => {
1465                    // Merge counter value (take max)
1466                    let current = self.document_sync.counter_entries();
1467                    let current_value = current
1468                        .iter()
1469                        .find(|(id, _)| *id == node_id.as_u32())
1470                        .map(|(_, v)| *v)
1471                        .unwrap_or(0);
1472
1473                    if *amount > current_value {
1474                        // Need to merge - this is handled by the counter merge logic
1475                        // For now, we record that counter changed
1476                        counter_changed = true;
1477                    }
1478                }
1479                Operation::UpdatePeripheral {
1480                    peripheral,
1481                    timestamp,
1482                } => {
1483                    // Store peer peripheral for callsign lookup
1484                    if let Ok(mut peripherals) = self.peer_peripherals.write() {
1485                        peripherals.insert(delta.origin_node, peripheral.clone());
1486                    }
1487                    // Track the peripheral for the result
1488                    peer_peripheral = Some(peripheral.clone());
1489                    // Track the timestamp for the result
1490                    if *timestamp > event_timestamp {
1491                        event_timestamp = *timestamp;
1492                    }
1493                }
1494                Operation::SetEmergency { timestamp, .. } => {
1495                    is_emergency = true;
1496                    emergency_changed = true;
1497                    event_timestamp = *timestamp;
1498                }
1499                Operation::AckEmergency {
1500                    emergency_timestamp,
1501                    ..
1502                } => {
1503                    is_ack = true;
1504                    emergency_changed = true;
1505                    if *emergency_timestamp > event_timestamp {
1506                        event_timestamp = *emergency_timestamp;
1507                    }
1508                }
1509                Operation::ClearEmergency {
1510                    emergency_timestamp,
1511                } => {
1512                    emergency_changed = true;
1513                    if *emergency_timestamp > event_timestamp {
1514                        event_timestamp = *emergency_timestamp;
1515                    }
1516                }
1517                Operation::App(app_op) => {
1518                    // Handle app-layer document operation
1519                    //
1520                    // The timestamp field may contain version info in the upper bits
1521                    // (used by delta encoder for change detection). Extract the
1522                    // original document timestamp from the lower 48 bits.
1523                    let doc_timestamp = app_op.timestamp & 0x0000_FFFF_FFFF_FFFF;
1524
1525                    log::info!(
1526                        "App operation received: type={:02X} op_code={:02X} from {:08X} ts={} payload_len={}",
1527                        app_op.type_id,
1528                        app_op.op_code,
1529                        app_op.source_node,
1530                        doc_timestamp,
1531                        app_op.payload.len()
1532                    );
1533
1534                    // Try to find/create document and apply the delta operation
1535                    let doc_key = (app_op.type_id, app_op.source_node, doc_timestamp);
1536                    let changed = {
1537                        let mut docs = self.app_documents.write().unwrap();
1538
1539                        if let Some(existing) = docs.get_mut(&doc_key) {
1540                            // Apply delta to existing document (merge via CRDT)
1541                            self.document_registry.apply_delta_op(
1542                                app_op.type_id,
1543                                existing.as_mut(),
1544                                app_op,
1545                            )
1546                        } else {
1547                            // Try to decode from payload (for FULL_STATE operations)
1548                            if let Some(decoded) = self
1549                                .document_registry
1550                                .decode(app_op.type_id, &app_op.payload)
1551                            {
1552                                docs.insert(doc_key, decoded);
1553                                true
1554                            } else {
1555                                // For delta ops on unknown documents, we can't apply
1556                                // The full state will be sent later
1557                                log::debug!(
1558                                    "Received delta for unknown doc {:?}, waiting for full state",
1559                                    doc_key
1560                                );
1561                                false
1562                            }
1563                        }
1564                    };
1565
1566                    // Emit observer event with the real document timestamp
1567                    self.observers.notify(PeatEvent::app_document_received(
1568                        app_op.type_id,
1569                        NodeId::new(app_op.source_node),
1570                        doc_timestamp,
1571                        changed,
1572                    ));
1573                }
1574            }
1575        }
1576
1577        // Record sync
1578        self.peer_manager.record_sync(source_node, now_ms);
1579
1580        // Record delta received
1581        {
1582            let mut encoder = self.delta_encoder.lock().unwrap();
1583            encoder.record_received(&source_node, data.len(), now_ms);
1584        }
1585
1586        // Generate events based on what was received
1587        if is_emergency {
1588            self.notify(PeatEvent::EmergencyReceived {
1589                from_node: delta.origin_node,
1590            });
1591        } else if is_ack {
1592            self.notify(PeatEvent::AckReceived {
1593                from_node: delta.origin_node,
1594            });
1595        }
1596
1597        if counter_changed {
1598            let total_count = self.document_sync.total_count();
1599            self.notify(PeatEvent::DocumentSynced {
1600                from_node: delta.origin_node,
1601                total_count,
1602            });
1603        }
1604
1605        // Emit relay event if we're relaying
1606        if relay_data.is_some() {
1607            let relay_targets = self.get_relay_targets(Some(source_node));
1608            self.notify(PeatEvent::MessageRelayed {
1609                origin_node: origin_node.unwrap_or(delta.origin_node),
1610                relay_count: relay_targets.len(),
1611                hop_count,
1612            });
1613        }
1614
1615        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
1616            DataReceivedResult::peripheral_fields(&peer_peripheral);
1617
1618        Some(DataReceivedResult {
1619            source_node: delta.origin_node,
1620            is_emergency,
1621            is_ack,
1622            counter_changed,
1623            emergency_changed,
1624            total_count: self.document_sync.total_count(),
1625            event_timestamp,
1626            relay_data,
1627            origin_node,
1628            hop_count,
1629            callsign,
1630            battery_percent,
1631            heart_rate,
1632            event_type,
1633            latitude,
1634            longitude,
1635            altitude,
1636        })
1637    }
1638
1639    // ==================== Per-Peer E2EE ====================
1640
1641    /// Enable per-peer E2EE capability
1642    ///
1643    /// Creates a new identity key for this node. This allows establishing
1644    /// encrypted sessions with specific peers where only the sender and
1645    /// recipient can read messages (other mesh members cannot).
1646    pub fn enable_peer_e2ee(&self) {
1647        let mut sessions = self.peer_sessions.lock().unwrap();
1648        if sessions.is_none() {
1649            *sessions = Some(PeerSessionManager::new(self.config.node_id));
1650            log::info!(
1651                "Per-peer E2EE enabled for node {:08X}",
1652                self.config.node_id.as_u32()
1653            );
1654        }
1655    }
1656
1657    /// Disable per-peer E2EE capability
1658    ///
1659    /// Clears all peer sessions and disables E2EE.
1660    pub fn disable_peer_e2ee(&self) {
1661        let mut sessions = self.peer_sessions.lock().unwrap();
1662        *sessions = None;
1663        log::info!("Per-peer E2EE disabled");
1664    }
1665
1666    /// Check if per-peer E2EE is enabled
1667    pub fn is_peer_e2ee_enabled(&self) -> bool {
1668        self.peer_sessions.lock().unwrap().is_some()
1669    }
1670
1671    /// Get our E2EE public key (for sharing with peers)
1672    ///
1673    /// Returns None if per-peer E2EE is not enabled.
1674    pub fn peer_e2ee_public_key(&self) -> Option<[u8; 32]> {
1675        self.peer_sessions
1676            .lock()
1677            .unwrap()
1678            .as_ref()
1679            .map(|s| s.our_public_key())
1680    }
1681
1682    /// Initiate E2EE session with a specific peer
1683    ///
1684    /// Returns the key exchange message bytes to send to the peer.
1685    /// The message should be broadcast/sent to the peer.
1686    /// Returns None if per-peer E2EE is not enabled.
1687    pub fn initiate_peer_e2ee(&self, peer_node_id: NodeId, now_ms: u64) -> Option<Vec<u8>> {
1688        let mut sessions = self.peer_sessions.lock().unwrap();
1689        let session_mgr = sessions.as_mut()?;
1690
1691        let key_exchange = session_mgr.initiate_session(peer_node_id, now_ms);
1692        let mut buf = Vec::with_capacity(2 + 37);
1693        buf.push(KEY_EXCHANGE_MARKER);
1694        buf.push(0x00); // reserved
1695        buf.extend_from_slice(&key_exchange.encode());
1696
1697        log::info!(
1698            "Initiated E2EE session with peer {:08X}",
1699            peer_node_id.as_u32()
1700        );
1701        Some(buf)
1702    }
1703
1704    /// Check if we have an established E2EE session with a peer
1705    pub fn has_peer_e2ee_session(&self, peer_node_id: NodeId) -> bool {
1706        self.peer_sessions
1707            .lock()
1708            .unwrap()
1709            .as_ref()
1710            .is_some_and(|s| s.has_session(peer_node_id))
1711    }
1712
1713    /// Get E2EE session state with a peer
1714    pub fn peer_e2ee_session_state(&self, peer_node_id: NodeId) -> Option<SessionState> {
1715        self.peer_sessions
1716            .lock()
1717            .unwrap()
1718            .as_ref()
1719            .and_then(|s| s.session_state(peer_node_id))
1720    }
1721
1722    /// Send an E2EE encrypted message to a specific peer
1723    ///
1724    /// Returns the encrypted message bytes to send, or None if no session exists.
1725    /// The message should be sent directly to the peer (not broadcast).
1726    pub fn send_peer_e2ee(
1727        &self,
1728        peer_node_id: NodeId,
1729        plaintext: &[u8],
1730        now_ms: u64,
1731    ) -> Option<Vec<u8>> {
1732        let mut sessions = self.peer_sessions.lock().unwrap();
1733        let session_mgr = sessions.as_mut()?;
1734
1735        match session_mgr.encrypt_for_peer(peer_node_id, plaintext, now_ms) {
1736            Ok(encrypted) => {
1737                let mut buf = Vec::with_capacity(2 + encrypted.encode().len());
1738                buf.push(PEER_E2EE_MARKER);
1739                buf.push(0x00); // reserved
1740                buf.extend_from_slice(&encrypted.encode());
1741                Some(buf)
1742            }
1743            Err(e) => {
1744                log::warn!(
1745                    "Failed to encrypt for peer {:08X}: {:?}",
1746                    peer_node_id.as_u32(),
1747                    e
1748                );
1749                None
1750            }
1751        }
1752    }
1753
1754    /// Close E2EE session with a peer
1755    pub fn close_peer_e2ee(&self, peer_node_id: NodeId) {
1756        let mut sessions = self.peer_sessions.lock().unwrap();
1757        if let Some(session_mgr) = sessions.as_mut() {
1758            session_mgr.close_session(peer_node_id);
1759            self.notify(PeatEvent::PeerE2eeClosed { peer_node_id });
1760            log::info!(
1761                "Closed E2EE session with peer {:08X}",
1762                peer_node_id.as_u32()
1763            );
1764        }
1765    }
1766
1767    /// Get count of active E2EE sessions
1768    pub fn peer_e2ee_session_count(&self) -> usize {
1769        self.peer_sessions
1770            .lock()
1771            .unwrap()
1772            .as_ref()
1773            .map(|s| s.session_count())
1774            .unwrap_or(0)
1775    }
1776
1777    /// Get count of established E2EE sessions
1778    pub fn peer_e2ee_established_count(&self) -> usize {
1779        self.peer_sessions
1780            .lock()
1781            .unwrap()
1782            .as_ref()
1783            .map(|s| s.established_count())
1784            .unwrap_or(0)
1785    }
1786
1787    /// Handle incoming key exchange message
1788    ///
1789    /// Called internally when we receive a KEY_EXCHANGE_MARKER message.
1790    /// Returns the response key exchange bytes to send back, or None if invalid.
1791    fn handle_key_exchange(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1792        if data.len() < 2 || data[0] != KEY_EXCHANGE_MARKER {
1793            return None;
1794        }
1795
1796        let payload = &data[2..];
1797        let msg = KeyExchangeMessage::decode(payload)?;
1798
1799        let mut sessions = self.peer_sessions.lock().unwrap();
1800        let session_mgr = sessions.as_mut()?;
1801
1802        let (response, established) = session_mgr.handle_key_exchange(&msg, now_ms)?;
1803
1804        if established {
1805            self.notify(PeatEvent::PeerE2eeEstablished {
1806                peer_node_id: msg.sender_node_id,
1807            });
1808            log::info!(
1809                "E2EE session established with peer {:08X}",
1810                msg.sender_node_id.as_u32()
1811            );
1812        }
1813
1814        // Return response key exchange
1815        let mut buf = Vec::with_capacity(2 + 37);
1816        buf.push(KEY_EXCHANGE_MARKER);
1817        buf.push(0x00);
1818        buf.extend_from_slice(&response.encode());
1819        Some(buf)
1820    }
1821
1822    /// Handle incoming E2EE encrypted message
1823    ///
1824    /// Called internally when we receive a PEER_E2EE_MARKER message.
1825    /// Decrypts and notifies observers of the received message.
1826    fn handle_peer_e2ee_message(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1827        if data.len() < 2 || data[0] != PEER_E2EE_MARKER {
1828            return None;
1829        }
1830
1831        let payload = &data[2..];
1832        let msg = PeerEncryptedMessage::decode(payload)?;
1833
1834        let mut sessions = self.peer_sessions.lock().unwrap();
1835        let session_mgr = sessions.as_mut()?;
1836
1837        match session_mgr.decrypt_from_peer(&msg, now_ms) {
1838            Ok(plaintext) => {
1839                // Notify observers of the decrypted message
1840                self.notify(PeatEvent::PeerE2eeMessageReceived {
1841                    from_node: msg.sender_node_id,
1842                    data: plaintext.clone(),
1843                });
1844                Some(plaintext)
1845            }
1846            Err(e) => {
1847                log::warn!(
1848                    "Failed to decrypt E2EE message from {:08X}: {:?}",
1849                    msg.sender_node_id.as_u32(),
1850                    e
1851                );
1852                None
1853            }
1854        }
1855    }
1856
1857    // ==================== Configuration ====================
1858
1859    /// Get our node ID
1860    pub fn node_id(&self) -> NodeId {
1861        self.config.node_id
1862    }
1863
1864    /// Get our callsign
1865    pub fn callsign(&self) -> &str {
1866        &self.config.callsign
1867    }
1868
1869    /// Get the mesh ID
1870    pub fn mesh_id(&self) -> &str {
1871        &self.config.mesh_id
1872    }
1873
1874    /// Get the device name for BLE advertising
1875    pub fn device_name(&self) -> String {
1876        format!(
1877            "PEAT_{}-{:08X}",
1878            self.config.mesh_id,
1879            self.config.node_id.as_u32()
1880        )
1881    }
1882
1883    /// Get a peer's callsign by node ID
1884    ///
1885    /// Returns the callsign from the peer's most recently received peripheral data,
1886    /// or None if no peripheral data has been received from this peer.
1887    pub fn get_peer_callsign(&self, node_id: NodeId) -> Option<String> {
1888        self.peer_peripherals.read().ok().and_then(|peripherals| {
1889            peripherals
1890                .get(&node_id)
1891                .map(|p| p.callsign_str().to_string())
1892        })
1893    }
1894
1895    /// Get a peer's full peripheral data by node ID
1896    ///
1897    /// Returns a clone of the peripheral data from the peer's most recently received
1898    /// document, or None if no peripheral data has been received from this peer.
1899    pub fn get_peer_peripheral(&self, node_id: NodeId) -> Option<Peripheral> {
1900        self.peer_peripherals
1901            .read()
1902            .ok()
1903            .and_then(|peripherals| peripherals.get(&node_id).cloned())
1904    }
1905
1906    // ==================== Document Registry ====================
1907
1908    /// Get a reference to the document registry.
1909    ///
1910    /// Use this to register custom document types for mesh synchronization.
1911    ///
1912    /// # Example
1913    ///
1914    /// ```ignore
1915    /// use peat_btle::registry::DocumentType;
1916    ///
1917    /// // Register a custom document type
1918    /// mesh.document_registry().register::<MyCustomDocument>();
1919    /// ```
1920    pub fn document_registry(&self) -> &DocumentRegistry {
1921        &self.document_registry
1922    }
1923
1924    /// Store an app-layer document.
1925    ///
1926    /// If a document with the same identity (type_id, source_node, timestamp)
1927    /// already exists, it will be merged using CRDT semantics.
1928    ///
1929    /// Returns true if the document was newly added or changed via merge.
1930    pub fn store_app_document<T: crate::registry::DocumentType>(&self, doc: T) -> bool {
1931        let type_id = T::TYPE_ID;
1932        let (source_node, timestamp) = doc.identity();
1933        let key = (type_id, source_node, timestamp);
1934
1935        let mut docs = self.app_documents.write().unwrap();
1936
1937        if let Some(existing) = docs.get_mut(&key) {
1938            // Merge with existing document
1939            self.document_registry
1940                .merge(type_id, existing.as_mut(), &doc)
1941        } else {
1942            // Insert new document
1943            docs.insert(key, Box::new(doc));
1944            true
1945        }
1946    }
1947
1948    /// Store a type-erased app-layer document.
1949    ///
1950    /// Used when receiving documents from the network where the type is
1951    /// determined at runtime.
1952    ///
1953    /// Returns true if the document was newly added or changed via merge.
1954    pub fn store_app_document_boxed(
1955        &self,
1956        type_id: u8,
1957        source_node: u32,
1958        timestamp: u64,
1959        doc: Box<dyn core::any::Any + Send + Sync>,
1960    ) -> bool {
1961        let key = (type_id, source_node, timestamp);
1962
1963        let mut docs = self.app_documents.write().unwrap();
1964
1965        if let Some(existing) = docs.get_mut(&key) {
1966            // Merge with existing document
1967            self.document_registry
1968                .merge(type_id, existing.as_mut(), doc.as_ref())
1969        } else {
1970            // Insert new document
1971            docs.insert(key, doc);
1972            true
1973        }
1974    }
1975
1976    /// Get a stored app-layer document by identity.
1977    ///
1978    /// Returns None if not found or if downcast to T fails.
1979    pub fn get_app_document<T: crate::registry::DocumentType>(
1980        &self,
1981        source_node: u32,
1982        timestamp: u64,
1983    ) -> Option<T> {
1984        let key = (T::TYPE_ID, source_node, timestamp);
1985
1986        let docs = self.app_documents.read().unwrap();
1987        docs.get(&key).and_then(|d| d.downcast_ref::<T>()).cloned()
1988    }
1989
1990    /// Get all stored app-layer documents of a specific type.
1991    ///
1992    /// Returns a vector of all documents of type T currently stored.
1993    pub fn get_all_app_documents_of_type<T: crate::registry::DocumentType>(&self) -> Vec<T> {
1994        let docs = self.app_documents.read().unwrap();
1995        docs.iter()
1996            .filter(|((type_id, _, _), _)| *type_id == T::TYPE_ID)
1997            .filter_map(|(_, doc)| doc.downcast_ref::<T>().cloned())
1998            .collect()
1999    }
2000
2001    /// Get delta operations for all stored app documents.
2002    ///
2003    /// Used during sync to include app documents in the delta stream.
2004    pub fn app_document_delta_ops(&self) -> Vec<crate::registry::AppOperation> {
2005        let docs = self.app_documents.read().unwrap();
2006        let mut ops = Vec::new();
2007
2008        for ((type_id, _source, _ts), doc) in docs.iter() {
2009            if let Some(op) = self.document_registry.to_delta_op(*type_id, doc.as_ref()) {
2010                ops.push(op);
2011            }
2012        }
2013
2014        ops
2015    }
2016
2017    /// Get all document keys for a given type.
2018    ///
2019    /// Returns (source_node, timestamp) pairs for all documents of the specified type.
2020    pub fn app_document_keys(&self, type_id: u8) -> Vec<(u32, u64)> {
2021        let docs = self.app_documents.read().unwrap();
2022        docs.keys()
2023            .filter(|(tid, _, _)| *tid == type_id)
2024            .map(|(_, source, ts)| (*source, *ts))
2025            .collect()
2026    }
2027
2028    /// Get the number of stored app documents.
2029    pub fn app_document_count(&self) -> usize {
2030        self.app_documents.read().unwrap().len()
2031    }
2032
2033    // ==================== Observer Management ====================
2034
2035    /// Add an observer for mesh events
2036    pub fn add_observer(&self, observer: Arc<dyn PeatObserver>) {
2037        self.observers.add(observer);
2038    }
2039
2040    /// Remove an observer
2041    pub fn remove_observer(&self, observer: &Arc<dyn PeatObserver>) {
2042        self.observers.remove(observer);
2043    }
2044
2045    // ==================== User Actions ====================
2046
2047    /// Send an emergency alert
2048    ///
2049    /// Returns the document bytes to broadcast to all peers.
2050    /// If encryption is enabled, the document is encrypted.
2051    pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
2052        let data = self.document_sync.send_emergency(timestamp);
2053        self.notify(PeatEvent::MeshStateChanged {
2054            peer_count: self.peer_manager.peer_count(),
2055            connected_count: self.peer_manager.connected_count(),
2056        });
2057        self.encrypt_document(&data)
2058    }
2059
2060    /// Send an ACK response
2061    ///
2062    /// Returns the document bytes to broadcast to all peers.
2063    /// If encryption is enabled, the document is encrypted.
2064    pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
2065        let data = self.document_sync.send_ack(timestamp);
2066        self.notify(PeatEvent::MeshStateChanged {
2067            peer_count: self.peer_manager.peer_count(),
2068            connected_count: self.peer_manager.connected_count(),
2069        });
2070        self.encrypt_document(&data)
2071    }
2072
2073    /// Broadcast arbitrary bytes over the mesh.
2074    ///
2075    /// Takes raw payload bytes, encrypts them (if encryption is enabled),
2076    /// and returns bytes ready to send to all connected peers.
2077    ///
2078    /// This is useful for sending extension data like CannedMessages from peat-lite.
2079    pub fn broadcast_bytes(&self, payload: &[u8]) -> Vec<u8> {
2080        self.encrypt_document(payload)
2081    }
2082
2083    /// Clear the current event (emergency or ack)
2084    pub fn clear_event(&self) {
2085        self.document_sync.clear_event();
2086    }
2087
2088    /// Check if emergency is active
2089    pub fn is_emergency_active(&self) -> bool {
2090        self.document_sync.is_emergency_active()
2091    }
2092
2093    /// Check if ACK is active
2094    pub fn is_ack_active(&self) -> bool {
2095        self.document_sync.is_ack_active()
2096    }
2097
2098    /// Get current event type
2099    pub fn current_event(&self) -> Option<EventType> {
2100        self.document_sync.current_event()
2101    }
2102
2103    // ==================== Emergency Management (Document-Based) ====================
2104
2105    /// Start a new emergency event with ACK tracking
2106    ///
2107    /// Creates an emergency event that tracks ACKs from all known peers.
2108    /// Pass the list of known peer node IDs to track.
2109    /// Returns the document bytes to broadcast.
2110    /// If encryption is enabled, the document is encrypted.
2111    pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
2112        let data = self.document_sync.start_emergency(timestamp, known_peers);
2113        self.notify(PeatEvent::MeshStateChanged {
2114            peer_count: self.peer_manager.peer_count(),
2115            connected_count: self.peer_manager.connected_count(),
2116        });
2117        self.encrypt_document(&data)
2118    }
2119
2120    /// Start a new emergency using all currently known peers
2121    ///
2122    /// Convenience method that automatically includes all discovered peers.
2123    pub fn start_emergency_with_known_peers(&self, timestamp: u64) -> Vec<u8> {
2124        let peers: Vec<u32> = self
2125            .peer_manager
2126            .get_peers()
2127            .iter()
2128            .map(|p| p.node_id.as_u32())
2129            .collect();
2130        self.start_emergency(timestamp, &peers)
2131    }
2132
2133    /// Record our ACK for the current emergency
2134    ///
2135    /// Returns the document bytes to broadcast, or None if no emergency is active.
2136    /// If encryption is enabled, the document is encrypted.
2137    pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
2138        let result = self.document_sync.ack_emergency(timestamp);
2139        if result.is_some() {
2140            self.notify(PeatEvent::MeshStateChanged {
2141                peer_count: self.peer_manager.peer_count(),
2142                connected_count: self.peer_manager.connected_count(),
2143            });
2144        }
2145        result.map(|data| self.encrypt_document(&data))
2146    }
2147
2148    /// Clear the current emergency event
2149    pub fn clear_emergency(&self) {
2150        self.document_sync.clear_emergency();
2151    }
2152
2153    /// Check if there's an active emergency
2154    pub fn has_active_emergency(&self) -> bool {
2155        self.document_sync.has_active_emergency()
2156    }
2157
2158    /// Get emergency status info
2159    ///
2160    /// Returns (source_node, timestamp, acked_count, pending_count) if emergency is active.
2161    pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
2162        self.document_sync.get_emergency_status()
2163    }
2164
2165    /// Check if a specific peer has ACKed the current emergency
2166    pub fn has_peer_acked(&self, peer_id: u32) -> bool {
2167        self.document_sync.has_peer_acked(peer_id)
2168    }
2169
2170    /// Check if all peers have ACKed the current emergency
2171    pub fn all_peers_acked(&self) -> bool {
2172        self.document_sync.all_peers_acked()
2173    }
2174
2175    // ==================== Chat Methods (requires `legacy-chat` feature) ====================
2176
2177    /// Send a chat message
2178    ///
2179    /// Adds the message to the local CRDT and returns the document bytes
2180    /// to broadcast to all peers. If encryption is enabled, the document is encrypted.
2181    ///
2182    /// Returns the encrypted document bytes if the message was new,
2183    /// or None if it was a duplicate.
2184    #[cfg(feature = "legacy-chat")]
2185    pub fn send_chat(&self, sender: &str, text: &str, timestamp: u64) -> Option<Vec<u8>> {
2186        if self.document_sync.add_chat_message(sender, text, timestamp) {
2187            Some(self.encrypt_document(&self.build_document()))
2188        } else {
2189            None
2190        }
2191    }
2192
2193    /// Send a chat reply
2194    ///
2195    /// Adds the reply to the local CRDT with reply-to information and returns
2196    /// the document bytes to broadcast. If encryption is enabled, the document is encrypted.
2197    ///
2198    /// Returns the encrypted document bytes if the message was new,
2199    /// or None if it was a duplicate.
2200    #[cfg(feature = "legacy-chat")]
2201    pub fn send_chat_reply(
2202        &self,
2203        sender: &str,
2204        text: &str,
2205        reply_to_node: u32,
2206        reply_to_timestamp: u64,
2207        timestamp: u64,
2208    ) -> Option<Vec<u8>> {
2209        if self.document_sync.add_chat_reply(
2210            sender,
2211            text,
2212            reply_to_node,
2213            reply_to_timestamp,
2214            timestamp,
2215        ) {
2216            Some(self.encrypt_document(&self.build_document()))
2217        } else {
2218            None
2219        }
2220    }
2221
2222    /// Get the number of chat messages in the local CRDT
2223    #[cfg(feature = "legacy-chat")]
2224    pub fn chat_count(&self) -> usize {
2225        self.document_sync.chat_count()
2226    }
2227
2228    /// Get chat messages newer than a timestamp
2229    ///
2230    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
2231    #[cfg(feature = "legacy-chat")]
2232    pub fn chat_messages_since(
2233        &self,
2234        since_timestamp: u64,
2235    ) -> Vec<(u32, u64, String, String, u32, u64)> {
2236        self.document_sync.chat_messages_since(since_timestamp)
2237    }
2238
2239    /// Get all chat messages
2240    ///
2241    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
2242    #[cfg(feature = "legacy-chat")]
2243    pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
2244        self.document_sync.all_chat_messages()
2245    }
2246
2247    // ==================== BLE Callbacks (Platform -> Mesh) ====================
2248
2249    /// Called when a BLE device is discovered
2250    ///
2251    /// Returns `Some(PeatPeer)` if this is a new Peat peer on our mesh.
2252    pub fn on_ble_discovered(
2253        &self,
2254        identifier: &str,
2255        name: Option<&str>,
2256        rssi: i8,
2257        mesh_id: Option<&str>,
2258        now_ms: u64,
2259    ) -> Option<PeatPeer> {
2260        let (node_id, is_new) = self
2261            .peer_manager
2262            .on_discovered(identifier, name, rssi, mesh_id, now_ms)?;
2263
2264        let peer = self.peer_manager.get_peer(node_id)?;
2265
2266        // Update connection graph
2267        {
2268            let mut graph = self.connection_graph.lock().unwrap();
2269            graph.on_discovered(
2270                node_id,
2271                identifier.to_string(),
2272                name.map(|s| s.to_string()),
2273                mesh_id.map(|s| s.to_string()),
2274                rssi,
2275                now_ms,
2276            );
2277        }
2278
2279        if is_new {
2280            self.notify(PeatEvent::PeerDiscovered { peer: peer.clone() });
2281            self.notify_mesh_state_changed();
2282        }
2283
2284        Some(peer)
2285    }
2286
2287    /// Called when a BLE connection is established (outgoing)
2288    ///
2289    /// Returns the NodeId if this identifier is known.
2290    pub fn on_ble_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
2291        let node_id = match self.peer_manager.on_connected(identifier, now_ms) {
2292            Some(id) => id,
2293            None => {
2294                log::warn!(
2295                    "on_ble_connected: identifier {:?} not in peer map — \
2296                     use on_incoming_connection() for peripheral connections",
2297                    identifier
2298                );
2299                return None;
2300            }
2301        };
2302
2303        // Update connection graph
2304        {
2305            let mut graph = self.connection_graph.lock().unwrap();
2306            graph.on_connected(node_id, now_ms);
2307        }
2308
2309        // Register peer for delta sync tracking
2310        self.register_peer_for_delta(&node_id);
2311
2312        self.notify(PeatEvent::PeerConnected { node_id });
2313        self.notify_mesh_state_changed();
2314        Some(node_id)
2315    }
2316
2317    /// Called when a BLE connection is lost
2318    pub fn on_ble_disconnected(
2319        &self,
2320        identifier: &str,
2321        reason: DisconnectReason,
2322    ) -> Option<NodeId> {
2323        let (node_id, observer_reason) = self.peer_manager.on_disconnected(identifier, reason)?;
2324
2325        // Update connection graph (convert observer reason to platform reason)
2326        {
2327            let mut graph = self.connection_graph.lock().unwrap();
2328            let platform_reason = match observer_reason {
2329                DisconnectReason::LocalRequest => crate::platform::DisconnectReason::LocalRequest,
2330                DisconnectReason::RemoteRequest => crate::platform::DisconnectReason::RemoteRequest,
2331                DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2332                DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2333                DisconnectReason::ConnectionFailed => {
2334                    crate::platform::DisconnectReason::ConnectionFailed
2335                }
2336                DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2337            };
2338            let now_ms = std::time::SystemTime::now()
2339                .duration_since(std::time::UNIX_EPOCH)
2340                .map(|d| d.as_millis() as u64)
2341                .unwrap_or(0);
2342            graph.on_disconnected(node_id, platform_reason, now_ms);
2343
2344            // Remove indirect peer paths that went through this peer
2345            // These paths may no longer be valid since the direct connection is lost
2346            graph.remove_via_peer(node_id);
2347        }
2348
2349        // Unregister peer from delta sync tracking
2350        self.unregister_peer_for_delta(&node_id);
2351
2352        self.notify(PeatEvent::PeerDisconnected {
2353            node_id,
2354            reason: observer_reason,
2355        });
2356        self.notify_mesh_state_changed();
2357        Some(node_id)
2358    }
2359
2360    /// Called when a BLE connection is lost, using NodeId directly
2361    ///
2362    /// Alternative to on_ble_disconnected() when only NodeId is known (e.g., ESP32).
2363    pub fn on_peer_disconnected(&self, node_id: NodeId, reason: DisconnectReason) {
2364        if self
2365            .peer_manager
2366            .on_disconnected_by_node_id(node_id, reason)
2367        {
2368            // Update connection graph
2369            {
2370                let mut graph = self.connection_graph.lock().unwrap();
2371                let platform_reason = match reason {
2372                    DisconnectReason::LocalRequest => {
2373                        crate::platform::DisconnectReason::LocalRequest
2374                    }
2375                    DisconnectReason::RemoteRequest => {
2376                        crate::platform::DisconnectReason::RemoteRequest
2377                    }
2378                    DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2379                    DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2380                    DisconnectReason::ConnectionFailed => {
2381                        crate::platform::DisconnectReason::ConnectionFailed
2382                    }
2383                    DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2384                };
2385                let now_ms = std::time::SystemTime::now()
2386                    .duration_since(std::time::UNIX_EPOCH)
2387                    .map(|d| d.as_millis() as u64)
2388                    .unwrap_or(0);
2389                graph.on_disconnected(node_id, platform_reason, now_ms);
2390
2391                // Remove indirect peer paths that went through this peer
2392                graph.remove_via_peer(node_id);
2393            }
2394
2395            // Unregister peer from delta sync tracking
2396            self.unregister_peer_for_delta(&node_id);
2397
2398            self.notify(PeatEvent::PeerDisconnected { node_id, reason });
2399            self.notify_mesh_state_changed();
2400        }
2401    }
2402
2403    /// Called when a remote device connects to us (incoming connection)
2404    ///
2405    /// Use this when we're acting as a peripheral and a central connects to us.
2406    pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
2407        let is_new = self
2408            .peer_manager
2409            .on_incoming_connection(identifier, node_id, now_ms);
2410
2411        // Update connection graph
2412        {
2413            let mut graph = self.connection_graph.lock().unwrap();
2414            if is_new {
2415                graph.on_discovered(
2416                    node_id,
2417                    identifier.to_string(),
2418                    None,
2419                    Some(self.config.mesh_id.clone()),
2420                    -50, // Default good RSSI for incoming connections
2421                    now_ms,
2422                );
2423            }
2424            graph.on_connected(node_id, now_ms);
2425        }
2426
2427        // Register peer for delta sync tracking
2428        self.register_peer_for_delta(&node_id);
2429
2430        if is_new {
2431            if let Some(peer) = self.peer_manager.get_peer(node_id) {
2432                self.notify(PeatEvent::PeerDiscovered { peer });
2433            }
2434        }
2435
2436        self.notify(PeatEvent::PeerConnected { node_id });
2437        self.notify_mesh_state_changed();
2438
2439        is_new
2440    }
2441
2442    /// Called when data is received from a peer
2443    ///
2444    /// Parses the document, merges it, and generates appropriate events.
2445    /// If encryption is enabled, decrypts the document first.
2446    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2447    /// Returns the source NodeId and whether the document contained an event.
2448    pub fn on_ble_data_received(
2449        &self,
2450        identifier: &str,
2451        data: &[u8],
2452        now_ms: u64,
2453    ) -> Option<DataReceivedResult> {
2454        // Get node ID from identifier
2455        let node_id = self.peer_manager.get_node_id(identifier)?;
2456
2457        // Check for special message types first
2458        if data.len() >= 2 {
2459            match data[0] {
2460                KEY_EXCHANGE_MARKER => {
2461                    // Handle key exchange - returns response to send back
2462                    let _response = self.handle_key_exchange(data, now_ms);
2463                    // Return None as this isn't a document sync
2464                    return None;
2465                }
2466                PEER_E2EE_MARKER => {
2467                    // Handle encrypted peer message
2468                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2469                    // Return None as this isn't a document sync
2470                    return None;
2471                }
2472                RELAY_ENVELOPE_MARKER => {
2473                    // Handle relay envelope for multi-hop
2474                    return self
2475                        .handle_relay_envelope_with_identifier(node_id, identifier, data, now_ms);
2476                }
2477                _ => {}
2478            }
2479        }
2480
2481        // Direct document (not relay envelope)
2482        self.process_document_data_with_identifier(node_id, identifier, data, now_ms, None, None, 0)
2483    }
2484
2485    /// Internal: Process document data with identifier as source hint
2486    #[allow(clippy::too_many_arguments)]
2487    fn process_document_data_with_identifier(
2488        &self,
2489        source_node: NodeId,
2490        identifier: &str,
2491        data: &[u8],
2492        now_ms: u64,
2493        relay_data: Option<Vec<u8>>,
2494        origin_node: Option<NodeId>,
2495        hop_count: u8,
2496    ) -> Option<DataReceivedResult> {
2497        // Decrypt if encrypted (mesh-wide encryption) - use identifier as source hint
2498        let decrypted = self.decrypt_document(data, Some(identifier))?;
2499
2500        // ADR-059 Amendment 1 §"Receive-side dispatch": route 0xB6 frames
2501        // through BleTranslator + silent-drop reserved 0xB7..=0xBF.
2502        #[cfg(feature = "mesh-translator")]
2503        if self.try_handle_translator_marker(&decrypted, Some(identifier), Some(source_node)) {
2504            return None;
2505        }
2506
2507        // Check if this is a delta document (wire format v2)
2508        if DeltaDocument::is_delta_document(&decrypted) {
2509            return self.process_delta_document_internal(
2510                source_node,
2511                &decrypted,
2512                now_ms,
2513                relay_data,
2514                origin_node,
2515                hop_count,
2516            );
2517        }
2518
2519        // Merge the document (legacy wire format v1)
2520        let result = self.document_sync.merge_document(&decrypted)?;
2521
2522        // Store peer peripheral if present (for callsign lookup)
2523        if let Some(ref peripheral) = result.peer_peripheral {
2524            if let Ok(mut peripherals) = self.peer_peripherals.write() {
2525                peripherals.insert(result.source_node, peripheral.clone());
2526            }
2527        }
2528
2529        // Record sync
2530        self.peer_manager.record_sync(source_node, now_ms);
2531
2532        // Generate events based on what was received
2533        if result.is_emergency() {
2534            self.notify(PeatEvent::EmergencyReceived {
2535                from_node: result.source_node,
2536            });
2537        } else if result.is_ack() {
2538            self.notify(PeatEvent::AckReceived {
2539                from_node: result.source_node,
2540            });
2541        }
2542
2543        if result.counter_changed {
2544            self.notify(PeatEvent::DocumentSynced {
2545                from_node: result.source_node,
2546                total_count: result.total_count,
2547            });
2548        }
2549
2550        // Emit relay event if we're relaying
2551        if relay_data.is_some() {
2552            let relay_targets = self.get_relay_targets(Some(source_node));
2553            self.notify(PeatEvent::MessageRelayed {
2554                origin_node: origin_node.unwrap_or(result.source_node),
2555                relay_count: relay_targets.len(),
2556                hop_count,
2557            });
2558        }
2559
2560        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2561            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2562
2563        Some(DataReceivedResult {
2564            source_node: result.source_node,
2565            is_emergency: result.is_emergency(),
2566            is_ack: result.is_ack(),
2567            counter_changed: result.counter_changed,
2568            emergency_changed: result.emergency_changed,
2569            total_count: result.total_count,
2570            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2571            relay_data,
2572            origin_node,
2573            hop_count,
2574            callsign,
2575            battery_percent,
2576            heart_rate,
2577            event_type,
2578            latitude,
2579            longitude,
2580            altitude,
2581        })
2582    }
2583
2584    /// Internal: Handle relay envelope with identifier as source hint
2585    fn handle_relay_envelope_with_identifier(
2586        &self,
2587        source_node: NodeId,
2588        identifier: &str,
2589        data: &[u8],
2590        now_ms: u64,
2591    ) -> Option<DataReceivedResult> {
2592        // Process the relay envelope
2593        let envelope = RelayEnvelope::decode(data)?;
2594
2595        // Check deduplication
2596        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2597            let stats = self
2598                .seen_cache
2599                .lock()
2600                .unwrap()
2601                .get_stats(&envelope.message_id);
2602            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2603
2604            self.notify(PeatEvent::DuplicateMessageDropped {
2605                origin_node: envelope.origin_node,
2606                seen_count,
2607            });
2608            return None;
2609        }
2610
2611        // Check TTL and get relay data
2612        let relay_data = if envelope.can_relay() && self.config.enable_relay {
2613            envelope.relay().map(|e| e.encode())
2614        } else {
2615            if !envelope.can_relay() {
2616                self.notify(PeatEvent::MessageTtlExpired {
2617                    origin_node: envelope.origin_node,
2618                    hop_count: envelope.hop_count,
2619                });
2620            }
2621            None
2622        };
2623
2624        // Process the inner payload
2625        self.process_document_data_with_identifier(
2626            source_node,
2627            identifier,
2628            &envelope.payload,
2629            now_ms,
2630            relay_data,
2631            Some(envelope.origin_node),
2632            envelope.hop_count,
2633        )
2634    }
2635
2636    /// Called when data is received but we don't have the identifier mapped
2637    ///
2638    /// Use this when receiving data from a peripheral we discovered.
2639    /// If encryption is enabled, decrypts the document first.
2640    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2641    /// Handles relay envelopes for multi-hop mesh operation.
2642    pub fn on_ble_data_received_from_node(
2643        &self,
2644        node_id: NodeId,
2645        data: &[u8],
2646        now_ms: u64,
2647    ) -> Option<DataReceivedResult> {
2648        // Check for special message types first
2649        if data.len() >= 2 {
2650            match data[0] {
2651                KEY_EXCHANGE_MARKER => {
2652                    let _response = self.handle_key_exchange(data, now_ms);
2653                    return None;
2654                }
2655                PEER_E2EE_MARKER => {
2656                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2657                    return None;
2658                }
2659                RELAY_ENVELOPE_MARKER => {
2660                    // Handle relay envelope for multi-hop
2661                    return self.handle_relay_envelope(node_id, data, now_ms);
2662                }
2663                _ => {}
2664            }
2665        }
2666
2667        // Direct document (not relay envelope)
2668        self.process_document_data(node_id, data, now_ms, None, None, 0)
2669    }
2670
2671    /// Called when encrypted data is received from an unknown peer
2672    ///
2673    /// This handles the case where we receive an encrypted document from a BLE address
2674    /// that isn't registered in our peer manager (e.g., due to BLE address rotation).
2675    /// The function decrypts first using the mesh key, then extracts the source_node
2676    /// from the decrypted document header and registers the peer.
2677    ///
2678    /// Returns `Some(DataReceivedResult)` if decryption and processing succeed.
2679    /// Returns `None` if decryption fails or the document is invalid.
2680    pub fn on_ble_data_received_anonymous(
2681        &self,
2682        identifier: &str,
2683        data: &[u8],
2684        now_ms: u64,
2685    ) -> Option<DataReceivedResult> {
2686        log::debug!(
2687            "on_ble_data_received_anonymous: identifier={}, len={}, marker=0x{:02X}",
2688            identifier,
2689            data.len(),
2690            data.first().copied().unwrap_or(0)
2691        );
2692
2693        // Try to decrypt (handles both encrypted and unencrypted documents)
2694        let decrypted = match self.decrypt_document(data, Some(identifier)) {
2695            Some(d) => d,
2696            None => {
2697                log::warn!(
2698                    "on_ble_data_received_anonymous: decrypt/parse FAILED for {} byte doc from {}",
2699                    data.len(),
2700                    identifier
2701                );
2702                return None;
2703            }
2704        };
2705
2706        // ADR-059 Amendment 1 §"Receive-side dispatch": route 0xB6
2707        // translator frames through BleTranslator::decode_inbound, and
2708        // silent-drop reserved 0xB7..=0xBF frames before they reach
2709        // PeatDocument::decode (where a 0xB6+ payload could hit the
2710        // GCounter-pollution hazard for inputs whose bytes 8..11 fall
2711        // <= MAX_COUNTER_ENTRIES). Anonymous receive: source_node is
2712        // unknown until the legacy path extracts it; for translator
2713        // frames there's no PeatDocument header to extract from, so we
2714        // pass None and accept that `tracks` decoding will Err for lack
2715        // of `local_wire_id` (other collections succeed).
2716        #[cfg(feature = "mesh-translator")]
2717        if self.try_handle_translator_marker(&decrypted, Some(identifier), None) {
2718            return None;
2719        }
2720
2721        // Extract source_node from decrypted document header
2722        // Header format: [version: 4 bytes (LE)][node_id: 4 bytes (LE)]
2723        if decrypted.len() < 8 {
2724            log::warn!("Decrypted document too short to extract source_node");
2725            return None;
2726        }
2727
2728        let source_node_u32 =
2729            u32::from_le_bytes([decrypted[4], decrypted[5], decrypted[6], decrypted[7]]);
2730        let source_node = NodeId::new(source_node_u32);
2731
2732        log::info!(
2733            "Anonymous document from {}: source_node={:08X}, len={}",
2734            identifier,
2735            source_node_u32,
2736            decrypted.len()
2737        );
2738
2739        // Register the peer with this identifier so future lookups work
2740        // This handles BLE address rotation
2741        self.peer_manager
2742            .register_identifier(identifier, source_node);
2743
2744        // Check if this is a delta document
2745        let is_delta = DeltaDocument::is_delta_document(&decrypted);
2746        log::info!(
2747            "Document format: delta={}, first_byte=0x{:02X}, len={}",
2748            is_delta,
2749            decrypted.first().copied().unwrap_or(0),
2750            decrypted.len()
2751        );
2752
2753        if is_delta {
2754            return self.process_delta_document_internal(
2755                source_node,
2756                &decrypted,
2757                now_ms,
2758                None,
2759                None,
2760                0,
2761            );
2762        }
2763
2764        // Handle app-layer message (0xAF marker from peat-lite CannedMessages)
2765        // These are handled by consumers who register DocumentType implementations
2766        // via the DocumentRegistry. Pass through as relay_data for platform layer.
2767        const APP_LAYER_MARKER: u8 = 0xAF;
2768        if decrypted.first().copied() == Some(APP_LAYER_MARKER) {
2769            log::debug!(
2770                "App-layer message (0xAF) from {:08X}, {} bytes - passing to relay",
2771                source_node.as_u32(),
2772                decrypted.len()
2773            );
2774            return Some(DataReceivedResult {
2775                source_node,
2776                is_emergency: false,
2777                is_ack: false,
2778                counter_changed: false,
2779                emergency_changed: false,
2780                total_count: 0,
2781                event_timestamp: now_ms,
2782                relay_data: Some(decrypted.to_vec()),
2783                origin_node: None,
2784                hop_count: 0,
2785                callsign: None,
2786                battery_percent: None,
2787                heart_rate: None,
2788                event_type: None,
2789                latitude: None,
2790                longitude: None,
2791                altitude: None,
2792            });
2793        }
2794
2795        // Merge the document (legacy wire format v1)
2796        log::info!(
2797            "Processing legacy document from {:08X}",
2798            source_node.as_u32()
2799        );
2800        let result = self.document_sync.merge_document(&decrypted)?;
2801
2802        // Log what we got from the merge
2803        log::info!(
2804            "Merge result: peer_peripheral={}, counter_changed={}",
2805            result.peer_peripheral.is_some(),
2806            result.counter_changed
2807        );
2808        if let Some(ref p) = result.peer_peripheral {
2809            log::info!("Peripheral callsign: '{}'", p.callsign_str());
2810        }
2811
2812        // Record sync
2813        self.peer_manager.record_sync(source_node, now_ms);
2814
2815        // Generate events
2816        if result.is_emergency() {
2817            self.notify(PeatEvent::EmergencyReceived {
2818                from_node: result.source_node,
2819            });
2820        } else if result.is_ack() {
2821            self.notify(PeatEvent::AckReceived {
2822                from_node: result.source_node,
2823            });
2824        }
2825
2826        if result.counter_changed {
2827            self.notify(PeatEvent::DocumentSynced {
2828                from_node: result.source_node,
2829                total_count: result.total_count,
2830            });
2831        }
2832
2833        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2834            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2835
2836        Some(DataReceivedResult {
2837            source_node: result.source_node,
2838            is_emergency: result.is_emergency(),
2839            is_ack: result.is_ack(),
2840            counter_changed: result.counter_changed,
2841            emergency_changed: result.emergency_changed,
2842            total_count: result.total_count,
2843            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2844            relay_data: None,
2845            origin_node: None,
2846            hop_count: 0,
2847            callsign,
2848            battery_percent,
2849            heart_rate,
2850            event_type,
2851            latitude,
2852            longitude,
2853            altitude,
2854        })
2855    }
2856
2857    /// Internal: Process document data (shared by direct and relay paths)
2858    fn process_document_data(
2859        &self,
2860        source_node: NodeId,
2861        data: &[u8],
2862        now_ms: u64,
2863        relay_data: Option<Vec<u8>>,
2864        origin_node: Option<NodeId>,
2865        hop_count: u8,
2866    ) -> Option<DataReceivedResult> {
2867        // Decrypt if encrypted (mesh-wide encryption)
2868        let source_hint = format!("node:{:08X}", source_node.as_u32());
2869        let decrypted = self.decrypt_document(data, Some(&source_hint))?;
2870
2871        // ADR-059 Amendment 1 §"Receive-side dispatch": route 0xB6 frames
2872        // through BleTranslator + silent-drop reserved 0xB7..=0xBF.
2873        #[cfg(feature = "mesh-translator")]
2874        if self.try_handle_translator_marker(&decrypted, None, Some(source_node)) {
2875            return None;
2876        }
2877
2878        // Check if this is a delta document (wire format v2)
2879        if DeltaDocument::is_delta_document(&decrypted) {
2880            return self.process_delta_document_internal(
2881                source_node,
2882                &decrypted,
2883                now_ms,
2884                relay_data,
2885                origin_node,
2886                hop_count,
2887            );
2888        }
2889
2890        // Merge the document (legacy wire format v1)
2891        let result = self.document_sync.merge_document(&decrypted)?;
2892
2893        // Store peer peripheral if present (for callsign lookup)
2894        if let Some(ref peripheral) = result.peer_peripheral {
2895            if let Ok(mut peripherals) = self.peer_peripherals.write() {
2896                peripherals.insert(result.source_node, peripheral.clone());
2897            }
2898        }
2899
2900        // Record sync
2901        self.peer_manager.record_sync(source_node, now_ms);
2902
2903        // Generate events based on what was received
2904        if result.is_emergency() {
2905            self.notify(PeatEvent::EmergencyReceived {
2906                from_node: result.source_node,
2907            });
2908        } else if result.is_ack() {
2909            self.notify(PeatEvent::AckReceived {
2910                from_node: result.source_node,
2911            });
2912        }
2913
2914        if result.counter_changed {
2915            self.notify(PeatEvent::DocumentSynced {
2916                from_node: result.source_node,
2917                total_count: result.total_count,
2918            });
2919        }
2920
2921        // Emit relay event if we're relaying
2922        if relay_data.is_some() {
2923            let relay_targets = self.get_relay_targets(Some(source_node));
2924            self.notify(PeatEvent::MessageRelayed {
2925                origin_node: origin_node.unwrap_or(result.source_node),
2926                relay_count: relay_targets.len(),
2927                hop_count,
2928            });
2929        }
2930
2931        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2932            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2933
2934        Some(DataReceivedResult {
2935            source_node: result.source_node,
2936            is_emergency: result.is_emergency(),
2937            is_ack: result.is_ack(),
2938            counter_changed: result.counter_changed,
2939            emergency_changed: result.emergency_changed,
2940            total_count: result.total_count,
2941            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2942            relay_data,
2943            origin_node,
2944            hop_count,
2945            callsign,
2946            battery_percent,
2947            heart_rate,
2948            event_type,
2949            latitude,
2950            longitude,
2951            altitude,
2952        })
2953    }
2954
2955    /// Internal: Handle relay envelope
2956    fn handle_relay_envelope(
2957        &self,
2958        source_node: NodeId,
2959        data: &[u8],
2960        now_ms: u64,
2961    ) -> Option<DataReceivedResult> {
2962        // Process the relay envelope
2963        let decision = self.process_relay_envelope(data, source_node, now_ms)?;
2964
2965        // Get relay data if we should relay
2966        let relay_data = if decision.should_relay {
2967            decision.relay_data()
2968        } else {
2969            None
2970        };
2971
2972        // Process the inner payload
2973        self.process_document_data(
2974            source_node,
2975            &decision.payload,
2976            now_ms,
2977            relay_data,
2978            Some(decision.origin_node),
2979            decision.hop_count,
2980        )
2981    }
2982
2983    /// Called when data is received without a known identifier
2984    ///
2985    /// This is the simplest data receive method - it extracts the source node_id
2986    /// from the document itself. Use this when you don't track identifiers
2987    /// (e.g., ESP32 NimBLE).
2988    /// If encryption is enabled, decrypts the document first.
2989    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2990    /// Handles relay envelopes for multi-hop mesh operation.
2991    pub fn on_ble_data(
2992        &self,
2993        identifier: &str,
2994        data: &[u8],
2995        now_ms: u64,
2996    ) -> Option<DataReceivedResult> {
2997        // Check for special message types first
2998        if data.len() >= 2 {
2999            match data[0] {
3000                KEY_EXCHANGE_MARKER => {
3001                    let _response = self.handle_key_exchange(data, now_ms);
3002                    return None;
3003                }
3004                PEER_E2EE_MARKER => {
3005                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
3006                    return None;
3007                }
3008                RELAY_ENVELOPE_MARKER => {
3009                    // Handle relay envelope - extract origin from envelope
3010                    return self.handle_relay_envelope_with_incoming(identifier, data, now_ms);
3011                }
3012                _ => {}
3013            }
3014        }
3015
3016        // Direct document - process normally
3017        self.process_incoming_document(identifier, data, now_ms, None, None, 0)
3018    }
3019
3020    /// Internal: Process incoming document (handles peer registration)
3021    fn process_incoming_document(
3022        &self,
3023        identifier: &str,
3024        data: &[u8],
3025        now_ms: u64,
3026        relay_data: Option<Vec<u8>>,
3027        origin_node: Option<NodeId>,
3028        hop_count: u8,
3029    ) -> Option<DataReceivedResult> {
3030        // Decrypt if encrypted (mesh-wide encryption)
3031        let decrypted = self.decrypt_document(data, Some(identifier))?;
3032
3033        // Merge the document (extracts node_id internally)
3034        let result = self.document_sync.merge_document(&decrypted)?;
3035
3036        // Record sync using the source_node from the merged document
3037        self.peer_manager.record_sync(result.source_node, now_ms);
3038
3039        // Only register the identifier mapping for direct messages (not relayed)
3040        // For relayed messages, the identifier belongs to the relay source (who forwarded it),
3041        // not the origin node (who created the document). The relay source is already registered
3042        // via handle_relay_envelope_with_incoming when the relay envelope is first processed.
3043        if origin_node.is_none() {
3044            // Direct message - register the peer with this identifier
3045            let is_new =
3046                self.peer_manager
3047                    .on_incoming_connection(identifier, result.source_node, now_ms);
3048
3049            // Update connection graph to track connection state
3050            {
3051                let mut graph = self.connection_graph.lock().unwrap();
3052                if is_new {
3053                    graph.on_discovered(
3054                        result.source_node,
3055                        identifier.to_string(),
3056                        None,
3057                        Some(self.config.mesh_id.clone()),
3058                        -50, // Default RSSI for data-based discovery
3059                        now_ms,
3060                    );
3061                }
3062                graph.on_connected(result.source_node, now_ms);
3063            }
3064        }
3065
3066        // Generate events based on what was received
3067        if result.is_emergency() {
3068            self.notify(PeatEvent::EmergencyReceived {
3069                from_node: result.source_node,
3070            });
3071        } else if result.is_ack() {
3072            self.notify(PeatEvent::AckReceived {
3073                from_node: result.source_node,
3074            });
3075        }
3076
3077        if result.counter_changed {
3078            self.notify(PeatEvent::DocumentSynced {
3079                from_node: result.source_node,
3080                total_count: result.total_count,
3081            });
3082        }
3083
3084        // Emit relay event if we're relaying
3085        if relay_data.is_some() {
3086            let relay_targets = self.get_relay_targets(Some(result.source_node));
3087            self.notify(PeatEvent::MessageRelayed {
3088                origin_node: origin_node.unwrap_or(result.source_node),
3089                relay_count: relay_targets.len(),
3090                hop_count,
3091            });
3092        }
3093
3094        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
3095            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3096
3097        Some(DataReceivedResult {
3098            source_node: result.source_node,
3099            is_emergency: result.is_emergency(),
3100            is_ack: result.is_ack(),
3101            counter_changed: result.counter_changed,
3102            emergency_changed: result.emergency_changed,
3103            total_count: result.total_count,
3104            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3105            relay_data,
3106            origin_node,
3107            hop_count,
3108            callsign,
3109            battery_percent,
3110            heart_rate,
3111            event_type,
3112            latitude,
3113            longitude,
3114            altitude,
3115        })
3116    }
3117
3118    /// Internal: Handle relay envelope with incoming connection registration
3119    fn handle_relay_envelope_with_incoming(
3120        &self,
3121        identifier: &str,
3122        data: &[u8],
3123        now_ms: u64,
3124    ) -> Option<DataReceivedResult> {
3125        // Parse envelope to get origin
3126        let envelope = RelayEnvelope::decode(data)?;
3127
3128        // Try to look up the source peer from identifier to register indirect path
3129        // If we know who sent this relay, we can track indirect peers via them
3130        if let Some(source_peer) = self.peer_manager.get_node_id(identifier) {
3131            if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
3132                let is_new = self.connection_graph.lock().unwrap().on_relay_received(
3133                    source_peer,
3134                    envelope.origin_node,
3135                    envelope.hop_count,
3136                    now_ms,
3137                );
3138
3139                if is_new {
3140                    log::debug!(
3141                        "Discovered indirect peer {:08X} via {:08X} ({} hops)",
3142                        envelope.origin_node.as_u32(),
3143                        source_peer.as_u32(),
3144                        envelope.hop_count
3145                    );
3146                }
3147            }
3148        }
3149
3150        // Check deduplication
3151        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
3152            // Duplicate - get stats for event
3153            let stats = self
3154                .seen_cache
3155                .lock()
3156                .unwrap()
3157                .get_stats(&envelope.message_id);
3158            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
3159
3160            self.notify(PeatEvent::DuplicateMessageDropped {
3161                origin_node: envelope.origin_node,
3162                seen_count,
3163            });
3164            return None;
3165        }
3166
3167        // Check TTL
3168        let (should_relay, relay_data) = if envelope.can_relay() && self.config.enable_relay {
3169            let relay_env = envelope.relay();
3170            (true, relay_env.map(|e| e.encode()))
3171        } else {
3172            if !envelope.can_relay() {
3173                self.notify(PeatEvent::MessageTtlExpired {
3174                    origin_node: envelope.origin_node,
3175                    hop_count: envelope.hop_count,
3176                });
3177            }
3178            (false, None)
3179        };
3180
3181        // Process the inner payload
3182        self.process_incoming_document(
3183            identifier,
3184            &envelope.payload,
3185            now_ms,
3186            if should_relay { relay_data } else { None },
3187            Some(envelope.origin_node),
3188            envelope.hop_count,
3189        )
3190    }
3191
3192    // ==================== Periodic Maintenance ====================
3193
3194    /// Periodic tick - call this regularly (e.g., every second)
3195    ///
3196    /// Performs:
3197    /// - Stale peer cleanup
3198    /// - Periodic sync broadcast (if interval elapsed)
3199    ///
3200    /// Returns `Some(data)` if a sync broadcast is needed.
3201    pub fn tick(&self, now_ms: u64) -> Option<Vec<u8>> {
3202        use std::sync::atomic::Ordering;
3203
3204        // Use u32 for atomic storage (wraps every ~49 days, intervals still work)
3205        let now_ms_32 = now_ms as u32;
3206
3207        // Cleanup stale peers
3208        let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3209        let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3210        if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3211            self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3212            let removed = self.peer_manager.cleanup_stale(now_ms);
3213            for node_id in &removed {
3214                self.notify(PeatEvent::PeerLost { node_id: *node_id });
3215            }
3216            if !removed.is_empty() {
3217                self.notify_mesh_state_changed();
3218            }
3219
3220            // Run connection graph maintenance (transition Disconnected -> Lost)
3221            {
3222                let mut graph = self.connection_graph.lock().unwrap();
3223                let newly_lost = graph.tick(now_ms);
3224                // Also cleanup peers lost for more than peer_timeout
3225                graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3226                drop(graph);
3227
3228                // Emit PeerLost events for newly lost peers from graph
3229                // (these may differ from peer_manager removals)
3230                for node_id in newly_lost {
3231                    // Only notify if not already notified by peer_manager
3232                    if !removed.contains(&node_id) {
3233                        self.notify(PeatEvent::PeerLost { node_id });
3234                    }
3235                }
3236            }
3237        }
3238
3239        // Check if sync broadcast is needed
3240        let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3241        let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3242        if sync_elapsed >= self.config.sync_interval_ms as u32 {
3243            self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3244            // Only broadcast if we have connected peers
3245            if self.peer_manager.connected_count() > 0 {
3246                let doc = self.document_sync.build_document();
3247                return Some(self.encrypt_document(&doc));
3248            }
3249        }
3250
3251        None
3252    }
3253
3254    /// Periodic tick returning per-peer delta documents
3255    ///
3256    /// Unlike `tick()` which broadcasts a single document to all peers,
3257    /// this returns targeted deltas that only include changes each peer
3258    /// hasn't seen. Use this for platforms that support per-peer transmission.
3259    ///
3260    /// Returns a list of (NodeId, encrypted_delta) tuples, one per connected peer.
3261    /// Empty vector if no sync is needed (interval not elapsed or no connected peers).
3262    pub fn tick_with_peer_deltas(&self, now_ms: u64) -> Vec<(NodeId, Vec<u8>)> {
3263        use std::sync::atomic::Ordering;
3264        let now_ms_32 = now_ms as u32;
3265
3266        // Cleanup stale peers (same as tick())
3267        let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3268        let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3269        if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3270            self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3271            let removed = self.peer_manager.cleanup_stale(now_ms);
3272            for node_id in &removed {
3273                self.notify(PeatEvent::PeerLost { node_id: *node_id });
3274            }
3275            if !removed.is_empty() {
3276                self.notify_mesh_state_changed();
3277            }
3278
3279            // Run connection graph maintenance
3280            {
3281                let mut graph = self.connection_graph.lock().unwrap();
3282                let newly_lost = graph.tick(now_ms);
3283                graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3284                drop(graph);
3285
3286                for node_id in newly_lost {
3287                    if !removed.contains(&node_id) {
3288                        self.notify(PeatEvent::PeerLost { node_id });
3289                    }
3290                }
3291            }
3292        }
3293
3294        // Check if sync is needed
3295        let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3296        let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3297        if sync_elapsed >= self.config.sync_interval_ms as u32 {
3298            self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3299
3300            // Build document for each connected peer
3301            let doc = self.document_sync.build_document();
3302            let encrypted = self.encrypt_document(&doc);
3303            let mut results = Vec::new();
3304            for peer in self.get_connected_peers() {
3305                results.push((peer.node_id, encrypted.clone()));
3306            }
3307            return results;
3308        }
3309
3310        Vec::new()
3311    }
3312
3313    // ==================== State Queries ====================
3314
3315    /// Get all known peers
3316    pub fn get_peers(&self) -> Vec<PeatPeer> {
3317        self.peer_manager.get_peers()
3318    }
3319
3320    /// Get connected peers only
3321    pub fn get_connected_peers(&self) -> Vec<PeatPeer> {
3322        self.peer_manager.get_connected_peers()
3323    }
3324
3325    /// Get a specific peer by NodeId
3326    pub fn get_peer(&self, node_id: NodeId) -> Option<PeatPeer> {
3327        self.peer_manager.get_peer(node_id)
3328    }
3329
3330    /// Get peer count
3331    pub fn peer_count(&self) -> usize {
3332        self.peer_manager.peer_count()
3333    }
3334
3335    /// Get connected peer count
3336    pub fn connected_count(&self) -> usize {
3337        self.peer_manager.connected_count()
3338    }
3339
3340    /// Check if a device mesh ID matches our mesh
3341    pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
3342        self.peer_manager.matches_mesh(device_mesh_id)
3343    }
3344
3345    // ==================== Connection State Graph ====================
3346
3347    /// Get the connection state graph with all peer states
3348    ///
3349    /// Returns a snapshot of all tracked peers and their connection lifecycle state.
3350    /// Apps can use this to display appropriate UI indicators:
3351    /// - Green for Connected peers
3352    /// - Yellow for Degraded or RecentlyDisconnected peers
3353    /// - Gray for Lost peers
3354    ///
3355    /// # Example
3356    /// ```ignore
3357    /// let states = mesh.get_connection_graph();
3358    /// for peer in states {
3359    ///     match peer.state {
3360    ///         ConnectionState::Connected => show_green_indicator(&peer),
3361    ///         ConnectionState::Degraded => show_yellow_indicator(&peer),
3362    ///         ConnectionState::Disconnected => show_stale_indicator(&peer),
3363    ///         ConnectionState::Lost => show_gray_indicator(&peer),
3364    ///         _ => {}
3365    ///     }
3366    /// }
3367    /// ```
3368    pub fn get_connection_graph(&self) -> Vec<PeerConnectionState> {
3369        self.connection_graph.lock().unwrap().get_all_owned()
3370    }
3371
3372    /// Get a specific peer's connection state
3373    pub fn get_peer_connection_state(&self, node_id: NodeId) -> Option<PeerConnectionState> {
3374        self.connection_graph
3375            .lock()
3376            .unwrap()
3377            .get_peer(node_id)
3378            .cloned()
3379    }
3380
3381    /// Get all currently connected peers from the connection graph
3382    pub fn get_connected_states(&self) -> Vec<PeerConnectionState> {
3383        self.connection_graph
3384            .lock()
3385            .unwrap()
3386            .get_connected()
3387            .into_iter()
3388            .cloned()
3389            .collect()
3390    }
3391
3392    /// Get peers in degraded state (connected but poor signal quality)
3393    pub fn get_degraded_peers(&self) -> Vec<PeerConnectionState> {
3394        self.connection_graph
3395            .lock()
3396            .unwrap()
3397            .get_degraded()
3398            .into_iter()
3399            .cloned()
3400            .collect()
3401    }
3402
3403    /// Get peers that disconnected within the specified time window
3404    ///
3405    /// Useful for showing "stale" peers that were recently connected.
3406    pub fn get_recently_disconnected(
3407        &self,
3408        within_ms: u64,
3409        now_ms: u64,
3410    ) -> Vec<PeerConnectionState> {
3411        self.connection_graph
3412            .lock()
3413            .unwrap()
3414            .get_recently_disconnected(within_ms, now_ms)
3415            .into_iter()
3416            .cloned()
3417            .collect()
3418    }
3419
3420    /// Get peers in Lost state (disconnected and no longer advertising)
3421    pub fn get_lost_peers(&self) -> Vec<PeerConnectionState> {
3422        self.connection_graph
3423            .lock()
3424            .unwrap()
3425            .get_lost()
3426            .into_iter()
3427            .cloned()
3428            .collect()
3429    }
3430
3431    /// Get summary counts of peers in each connection state
3432    pub fn get_connection_state_counts(&self) -> StateCountSummary {
3433        self.connection_graph.lock().unwrap().state_counts()
3434    }
3435
3436    // ==================== Indirect Peer Methods ====================
3437
3438    /// Get all indirect (multi-hop) peers
3439    ///
3440    /// Returns peers discovered via relay messages that are not directly
3441    /// connected via BLE. Each indirect peer includes the minimum hop count
3442    /// and the direct peers through which they can be reached.
3443    pub fn get_indirect_peers(&self) -> Vec<IndirectPeer> {
3444        self.connection_graph
3445            .lock()
3446            .unwrap()
3447            .get_indirect_peers_owned()
3448    }
3449
3450    /// Get the degree (hop count) for a specific peer
3451    ///
3452    /// Returns:
3453    /// - `Some(PeerDegree::Direct)` for directly connected BLE peers
3454    /// - `Some(PeerDegree::OneHop/TwoHop/ThreeHop)` for indirect peers
3455    /// - `None` if peer is not known
3456    pub fn get_peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
3457        self.connection_graph.lock().unwrap().peer_degree(node_id)
3458    }
3459
3460    /// Get full state counts including indirect peers
3461    ///
3462    /// Returns counts of direct peers by connection state plus counts
3463    /// of indirect peers by hop count (1-hop, 2-hop, 3-hop).
3464    pub fn get_full_state_counts(&self) -> FullStateCountSummary {
3465        self.connection_graph.lock().unwrap().full_state_counts()
3466    }
3467
3468    /// Get all paths to reach an indirect peer
3469    ///
3470    /// Returns a list of (via_peer_id, hop_count) pairs showing all
3471    /// known routes to the specified peer.
3472    pub fn get_paths_to_peer(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
3473        self.connection_graph.lock().unwrap().get_paths_to(node_id)
3474    }
3475
3476    /// Check if a node is known (either direct or indirect)
3477    pub fn is_peer_known(&self, node_id: NodeId) -> bool {
3478        self.connection_graph.lock().unwrap().is_known(node_id)
3479    }
3480
3481    /// Get number of indirect peers
3482    pub fn indirect_peer_count(&self) -> usize {
3483        self.connection_graph.lock().unwrap().indirect_peer_count()
3484    }
3485
3486    /// Cleanup stale indirect peers
3487    ///
3488    /// Removes indirect peers that haven't been seen within the timeout.
3489    /// Returns the list of removed peer IDs.
3490    pub fn cleanup_indirect_peers(&self, now_ms: u64) -> Vec<NodeId> {
3491        self.connection_graph
3492            .lock()
3493            .unwrap()
3494            .cleanup_indirect(now_ms)
3495    }
3496
3497    /// Get total counter value
3498    pub fn total_count(&self) -> u64 {
3499        self.document_sync.total_count()
3500    }
3501
3502    /// Get document version
3503    pub fn document_version(&self) -> u32 {
3504        self.document_sync.version()
3505    }
3506
3507    /// Get document version (alias)
3508    pub fn version(&self) -> u32 {
3509        self.document_sync.version()
3510    }
3511
3512    /// Update health status (battery percentage)
3513    pub fn update_health(&self, battery_percent: u8) {
3514        self.document_sync.update_health(battery_percent);
3515    }
3516
3517    /// Update activity level (0=still, 1=walking, 2=running, 3=fall)
3518    pub fn update_activity(&self, activity: u8) {
3519        self.document_sync.update_activity(activity);
3520    }
3521
3522    /// Update full health status (battery and activity)
3523    pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
3524        self.document_sync
3525            .update_health_full(battery_percent, activity);
3526    }
3527
3528    /// Update heart rate
3529    pub fn update_heart_rate(&self, heart_rate: u8) {
3530        self.document_sync.update_heart_rate(heart_rate);
3531    }
3532
3533    /// Update location
3534    pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
3535        self.document_sync
3536            .update_location(latitude, longitude, altitude);
3537    }
3538
3539    /// Clear location
3540    pub fn clear_location(&self) {
3541        self.document_sync.clear_location();
3542    }
3543
3544    /// Update callsign
3545    pub fn update_callsign(&self, callsign: &str) {
3546        self.document_sync.update_callsign(callsign);
3547    }
3548
3549    /// Set peripheral event type
3550    pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
3551        self.document_sync
3552            .set_peripheral_event(event_type, timestamp);
3553    }
3554
3555    /// Clear peripheral event
3556    pub fn clear_peripheral_event(&self) {
3557        self.document_sync.clear_peripheral_event();
3558    }
3559
3560    /// Update full peripheral state in one call
3561    ///
3562    /// This is the most efficient way to update all peripheral data before
3563    /// calling `build_document()` for encrypted transmission.
3564    #[allow(clippy::too_many_arguments)]
3565    pub fn update_peripheral_state(
3566        &self,
3567        callsign: &str,
3568        battery_percent: u8,
3569        heart_rate: Option<u8>,
3570        latitude: Option<f32>,
3571        longitude: Option<f32>,
3572        altitude: Option<f32>,
3573        event_type: Option<EventType>,
3574        timestamp: u64,
3575    ) {
3576        self.document_sync.update_peripheral_state(
3577            callsign,
3578            battery_percent,
3579            heart_rate,
3580            latitude,
3581            longitude,
3582            altitude,
3583            event_type,
3584            timestamp,
3585        );
3586    }
3587
3588    /// Build current document for transmission
3589    ///
3590    /// If encryption is enabled, the document is encrypted.
3591    pub fn build_document(&self) -> Vec<u8> {
3592        let doc = self.document_sync.build_document();
3593        self.encrypt_document(&doc)
3594    }
3595
3596    /// Get peers that should be synced with
3597    pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<PeatPeer> {
3598        self.peer_manager.peers_needing_sync(now_ms)
3599    }
3600
3601    // ==================== Internal Helpers ====================
3602
3603    fn notify(&self, event: PeatEvent) {
3604        self.observers.notify(event);
3605    }
3606
3607    fn notify_mesh_state_changed(&self) {
3608        self.notify(PeatEvent::MeshStateChanged {
3609            peer_count: self.peer_manager.peer_count(),
3610            connected_count: self.peer_manager.connected_count(),
3611        });
3612    }
3613
3614    // ==================== CannedMessage Integration ====================
3615    //
3616    // These methods provide deduplication support for peat-lite CannedMessages.
3617    // They use document identity (source_node + timestamp) instead of content hash,
3618    // because CRDT merge can change byte ordering.
3619
3620    /// Check if a CannedMessage should be processed.
3621    ///
3622    /// Uses document identity (source_node + timestamp) for deduplication.
3623    /// This prevents broadcast storms when relaying CannedMessages across the mesh.
3624    ///
3625    /// # Arguments
3626    /// * `source_node` - The source node ID from the CannedMessage
3627    /// * `timestamp` - The timestamp from the CannedMessage
3628    /// * `_ttl_ms` - TTL parameter (currently unused, uses cache's default TTL)
3629    ///
3630    /// # Returns
3631    /// `true` if this message is new and should be processed,
3632    /// `false` if it was seen recently and should be skipped.
3633    pub fn check_canned_message(&self, source_node: u32, timestamp: u64, _ttl_ms: u64) -> bool {
3634        // Create a unique key from source_node and timestamp
3635        // MessageId is 16 bytes: [source_node: 4B][timestamp: 8B][padding: 4B]
3636        let mut id_bytes = [0u8; 16];
3637        id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3638        id_bytes[4..12].copy_from_slice(&timestamp.to_le_bytes());
3639        let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3640
3641        // Check the seen cache
3642        let seen = self.seen_cache.lock().unwrap();
3643        !seen.has_seen(&message_id)
3644    }
3645
3646    /// Mark a CannedMessage as seen (for deduplication).
3647    ///
3648    /// Call this after processing a CannedMessage to prevent reprocessing
3649    /// the same message from other relay paths.
3650    pub fn mark_canned_message_seen(&self, source_node: u32, timestamp: u64) {
3651        let now = std::time::SystemTime::now()
3652            .duration_since(std::time::UNIX_EPOCH)
3653            .map(|d| d.as_millis() as u64)
3654            .unwrap_or(0);
3655
3656        // MessageId is 16 bytes: [source_node: 4B][timestamp: 8B][padding: 4B]
3657        let mut id_bytes = [0u8; 16];
3658        id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3659        id_bytes[4..12].copy_from_slice(&timestamp.to_le_bytes());
3660        let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3661        let origin = NodeId::new(source_node);
3662
3663        let mut seen = self.seen_cache.lock().unwrap();
3664        seen.mark_seen(message_id, origin, now);
3665    }
3666
3667    /// Get list of connected peer identifiers for relay.
3668    ///
3669    /// Used by the platform layer (Kotlin/Swift) to relay CannedMessages
3670    /// to other peers after deduplication check.
3671    pub fn get_connected_peer_identifiers(&self) -> Vec<String> {
3672        self.peer_manager.get_connected_identifiers()
3673    }
3674}
3675
3676/// Result from receiving BLE data
3677#[derive(Debug, Clone)]
3678pub struct DataReceivedResult {
3679    /// Node that sent this data
3680    pub source_node: NodeId,
3681
3682    /// Whether this contained an emergency event
3683    pub is_emergency: bool,
3684
3685    /// Whether this contained an ACK event
3686    pub is_ack: bool,
3687
3688    /// Whether the counter changed (new data)
3689    pub counter_changed: bool,
3690
3691    /// Whether emergency state changed (new emergency or ACK updates)
3692    pub emergency_changed: bool,
3693
3694    /// Updated total count
3695    pub total_count: u64,
3696
3697    /// Event timestamp (if event present) - use to detect duplicate events
3698    pub event_timestamp: u64,
3699
3700    /// Data to relay to other peers (if multi-hop relay is enabled)
3701    ///
3702    /// When present, the platform adapter should send this data to peers
3703    /// returned by `get_relay_targets(Some(source_node))`.
3704    pub relay_data: Option<Vec<u8>>,
3705
3706    /// Origin node for relay (may differ from source_node for relayed messages)
3707    pub origin_node: Option<NodeId>,
3708
3709    /// Current hop count (for relayed messages)
3710    pub hop_count: u8,
3711
3712    // ========== Peripheral data from sender ==========
3713    /// Sender's callsign (up to 12 chars)
3714    pub callsign: Option<String>,
3715
3716    /// Sender's battery percentage (0-100)
3717    pub battery_percent: Option<u8>,
3718
3719    /// Sender's heart rate (BPM)
3720    pub heart_rate: Option<u8>,
3721
3722    /// Sender's event type (from PeripheralEvent)
3723    pub event_type: Option<u8>,
3724
3725    /// Sender's latitude
3726    pub latitude: Option<f32>,
3727
3728    /// Sender's longitude
3729    pub longitude: Option<f32>,
3730
3731    /// Sender's altitude (meters)
3732    pub altitude: Option<f32>,
3733}
3734
3735impl DataReceivedResult {
3736    /// Extract peripheral fields from an Option<Peripheral>
3737    #[allow(clippy::type_complexity)]
3738    fn peripheral_fields(
3739        peripheral: &Option<crate::sync::crdt::Peripheral>,
3740    ) -> (
3741        Option<String>,
3742        Option<u8>,
3743        Option<u8>,
3744        Option<u8>,
3745        Option<f32>,
3746        Option<f32>,
3747        Option<f32>,
3748    ) {
3749        match peripheral {
3750            Some(p) => {
3751                let callsign = {
3752                    let s = p.callsign_str();
3753                    if s.is_empty() {
3754                        None
3755                    } else {
3756                        Some(s.to_string())
3757                    }
3758                };
3759                let battery = if p.health.battery_percent > 0 {
3760                    Some(p.health.battery_percent)
3761                } else {
3762                    None
3763                };
3764                let heart_rate = p.health.heart_rate;
3765                let event_type = p.last_event.as_ref().map(|e| e.event_type as u8);
3766                let (lat, lon, alt) = match &p.location {
3767                    Some(loc) => (Some(loc.latitude), Some(loc.longitude), loc.altitude),
3768                    None => (None, None, None),
3769                };
3770                (callsign, battery, heart_rate, event_type, lat, lon, alt)
3771            }
3772            None => (None, None, None, None, None, None, None),
3773        }
3774    }
3775}
3776
3777/// Decision from processing a relay envelope
3778#[derive(Debug, Clone)]
3779pub struct RelayDecision {
3780    /// The payload (document) to process locally
3781    pub payload: Vec<u8>,
3782
3783    /// Original sender of the message
3784    pub origin_node: NodeId,
3785
3786    /// Current hop count
3787    pub hop_count: u8,
3788
3789    /// Whether this message should be relayed to other peers
3790    pub should_relay: bool,
3791
3792    /// The relay envelope to forward (with incremented hop count)
3793    ///
3794    /// Only present if `should_relay` is true and TTL not expired.
3795    pub relay_envelope: Option<RelayEnvelope>,
3796}
3797
3798impl RelayDecision {
3799    /// Get the relay data to send to peers
3800    ///
3801    /// Returns None if relay is not needed.
3802    pub fn relay_data(&self) -> Option<Vec<u8>> {
3803        self.relay_envelope.as_ref().map(|e| e.encode())
3804    }
3805}
3806
3807#[cfg(all(test, feature = "std"))]
3808mod tests {
3809    use super::*;
3810    use crate::observer::CollectingObserver;
3811
3812    // Valid timestamp for testing (2024-01-15 00:00:00 UTC)
3813    const TEST_TIMESTAMP: u64 = 1705276800000;
3814
3815    fn create_mesh(node_id: u32, callsign: &str) -> PeatMesh {
3816        let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST");
3817        PeatMesh::new(config)
3818    }
3819
3820    #[test]
3821    fn test_mesh_creation() {
3822        let mesh = create_mesh(0x12345678, "ALPHA-1");
3823
3824        assert_eq!(mesh.node_id().as_u32(), 0x12345678);
3825        assert_eq!(mesh.callsign(), "ALPHA-1");
3826        assert_eq!(mesh.mesh_id(), "TEST");
3827        assert_eq!(mesh.device_name(), "PEAT_TEST-12345678");
3828    }
3829
3830    #[test]
3831    fn test_peer_discovery() {
3832        let mesh = create_mesh(0x11111111, "ALPHA-1");
3833        let observer = Arc::new(CollectingObserver::new());
3834        mesh.add_observer(observer.clone());
3835
3836        // Discover a peer
3837        let peer = mesh.on_ble_discovered(
3838            "device-uuid",
3839            Some("PEAT_TEST-22222222"),
3840            -65,
3841            Some("TEST"),
3842            1000,
3843        );
3844
3845        assert!(peer.is_some());
3846        let peer = peer.unwrap();
3847        assert_eq!(peer.node_id.as_u32(), 0x22222222);
3848
3849        // Check events were generated
3850        let events = observer.events();
3851        assert!(events
3852            .iter()
3853            .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
3854        assert!(events
3855            .iter()
3856            .any(|e| matches!(e, PeatEvent::MeshStateChanged { .. })));
3857    }
3858
3859    #[test]
3860    fn test_connection_lifecycle() {
3861        let mesh = create_mesh(0x11111111, "ALPHA-1");
3862        let observer = Arc::new(CollectingObserver::new());
3863        mesh.add_observer(observer.clone());
3864
3865        // Discover and connect
3866        mesh.on_ble_discovered(
3867            "device-uuid",
3868            Some("PEAT_TEST-22222222"),
3869            -65,
3870            Some("TEST"),
3871            1000,
3872        );
3873
3874        let node_id = mesh.on_ble_connected("device-uuid", 2000);
3875        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
3876        assert_eq!(mesh.connected_count(), 1);
3877
3878        // Disconnect
3879        let node_id = mesh.on_ble_disconnected("device-uuid", DisconnectReason::RemoteRequest);
3880        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
3881        assert_eq!(mesh.connected_count(), 0);
3882
3883        // Check events
3884        let events = observer.events();
3885        assert!(events
3886            .iter()
3887            .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
3888        assert!(events
3889            .iter()
3890            .any(|e| matches!(e, PeatEvent::PeerDisconnected { .. })));
3891    }
3892
3893    #[test]
3894    fn test_emergency_flow() {
3895        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3896        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3897
3898        let observer2 = Arc::new(CollectingObserver::new());
3899        mesh2.add_observer(observer2.clone());
3900
3901        // mesh1 sends emergency
3902        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
3903        assert!(mesh1.is_emergency_active());
3904
3905        // mesh2 receives it
3906        let result =
3907            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
3908
3909        assert!(result.is_some());
3910        let result = result.unwrap();
3911        assert!(result.is_emergency);
3912        assert_eq!(result.source_node.as_u32(), 0x11111111);
3913
3914        // Check events on mesh2
3915        let events = observer2.events();
3916        assert!(events
3917            .iter()
3918            .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
3919    }
3920
3921    #[test]
3922    fn test_ack_flow() {
3923        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3924        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3925
3926        let observer2 = Arc::new(CollectingObserver::new());
3927        mesh2.add_observer(observer2.clone());
3928
3929        // mesh1 sends ACK
3930        let doc = mesh1.send_ack(TEST_TIMESTAMP);
3931        assert!(mesh1.is_ack_active());
3932
3933        // mesh2 receives it
3934        let result =
3935            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
3936
3937        assert!(result.is_some());
3938        let result = result.unwrap();
3939        assert!(result.is_ack);
3940
3941        // Check events on mesh2
3942        let events = observer2.events();
3943        assert!(events
3944            .iter()
3945            .any(|e| matches!(e, PeatEvent::AckReceived { .. })));
3946    }
3947
3948    #[test]
3949    fn test_tick_cleanup() {
3950        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
3951            .with_peer_timeout(10_000);
3952        let mesh = PeatMesh::new(config);
3953
3954        let observer = Arc::new(CollectingObserver::new());
3955        mesh.add_observer(observer.clone());
3956
3957        // Discover a peer
3958        mesh.on_ble_discovered(
3959            "device-uuid",
3960            Some("PEAT_TEST-22222222"),
3961            -65,
3962            Some("TEST"),
3963            1000,
3964        );
3965        assert_eq!(mesh.peer_count(), 1);
3966
3967        // Tick at t=5000 - not stale yet
3968        mesh.tick(5000);
3969        assert_eq!(mesh.peer_count(), 1);
3970
3971        // Tick at t=20000 - peer is stale (10s timeout exceeded)
3972        mesh.tick(20000);
3973        assert_eq!(mesh.peer_count(), 0);
3974
3975        // Check PeerLost event
3976        let events = observer.events();
3977        assert!(events
3978            .iter()
3979            .any(|e| matches!(e, PeatEvent::PeerLost { .. })));
3980    }
3981
3982    #[test]
3983    fn test_tick_sync_broadcast() {
3984        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
3985            .with_sync_interval(5000);
3986        let mesh = PeatMesh::new(config);
3987
3988        // Discover and connect a peer first
3989        mesh.on_ble_discovered(
3990            "device-uuid",
3991            Some("PEAT_TEST-22222222"),
3992            -65,
3993            Some("TEST"),
3994            1000,
3995        );
3996        mesh.on_ble_connected("device-uuid", 1000);
3997
3998        // First tick at t=0 sets last_sync
3999        let _result = mesh.tick(0);
4000        // May or may not broadcast depending on initial state
4001
4002        // Tick before interval - no broadcast
4003        let result = mesh.tick(3000);
4004        assert!(result.is_none());
4005
4006        // After interval - should broadcast
4007        let result = mesh.tick(6000);
4008        assert!(result.is_some());
4009
4010        // Immediate second tick - no broadcast (interval not elapsed)
4011        let result = mesh.tick(6100);
4012        assert!(result.is_none());
4013
4014        // After another interval - should broadcast again
4015        let result = mesh.tick(12000);
4016        assert!(result.is_some());
4017    }
4018
4019    #[test]
4020    fn test_incoming_connection() {
4021        let mesh = create_mesh(0x11111111, "ALPHA-1");
4022        let observer = Arc::new(CollectingObserver::new());
4023        mesh.add_observer(observer.clone());
4024
4025        // Incoming connection from unknown peer
4026        let is_new = mesh.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);
4027
4028        assert!(is_new);
4029        assert_eq!(mesh.peer_count(), 1);
4030        assert_eq!(mesh.connected_count(), 1);
4031
4032        // Check events
4033        let events = observer.events();
4034        assert!(events
4035            .iter()
4036            .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
4037        assert!(events
4038            .iter()
4039            .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
4040    }
4041
4042    #[test]
4043    fn test_mesh_filtering() {
4044        let mesh = create_mesh(0x11111111, "ALPHA-1");
4045
4046        // Wrong mesh - ignored
4047        let peer = mesh.on_ble_discovered(
4048            "device-uuid-1",
4049            Some("PEAT_OTHER-22222222"),
4050            -65,
4051            Some("OTHER"),
4052            1000,
4053        );
4054        assert!(peer.is_none());
4055        assert_eq!(mesh.peer_count(), 0);
4056
4057        // Correct mesh - accepted
4058        let peer = mesh.on_ble_discovered(
4059            "device-uuid-2",
4060            Some("PEAT_TEST-33333333"),
4061            -65,
4062            Some("TEST"),
4063            1000,
4064        );
4065        assert!(peer.is_some());
4066        assert_eq!(mesh.peer_count(), 1);
4067    }
4068
4069    // ==================== Encryption Tests ====================
4070
4071    fn create_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4072        let config =
4073            PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_encryption(secret);
4074        PeatMesh::new(config)
4075    }
4076
4077    #[test]
4078    fn test_encryption_enabled() {
4079        let secret = [0x42u8; 32];
4080        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4081
4082        assert!(mesh.is_encryption_enabled());
4083    }
4084
4085    #[test]
4086    fn test_encryption_disabled_by_default() {
4087        let mesh = create_mesh(0x11111111, "ALPHA-1");
4088
4089        assert!(!mesh.is_encryption_enabled());
4090    }
4091
4092    #[test]
4093    fn test_encrypted_document_exchange() {
4094        let secret = [0x42u8; 32];
4095        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4096        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4097
4098        // mesh1 sends document
4099        let doc = mesh1.build_document();
4100
4101        // Document should be encrypted (starts with ENCRYPTED_MARKER)
4102        assert!(doc.len() >= 2);
4103        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4104
4105        // mesh2 receives and decrypts
4106        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4107
4108        assert!(result.is_some());
4109        let result = result.unwrap();
4110        assert_eq!(result.source_node.as_u32(), 0x11111111);
4111    }
4112
4113    #[test]
4114    fn test_encrypted_emergency_exchange() {
4115        let secret = [0x42u8; 32];
4116        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4117        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4118
4119        let observer = Arc::new(CollectingObserver::new());
4120        mesh2.add_observer(observer.clone());
4121
4122        // mesh1 sends emergency
4123        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
4124
4125        // mesh2 receives and decrypts
4126        let result =
4127            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4128
4129        assert!(result.is_some());
4130        let result = result.unwrap();
4131        assert!(result.is_emergency);
4132
4133        // Check EmergencyReceived event was fired
4134        let events = observer.events();
4135        assert!(events
4136            .iter()
4137            .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
4138    }
4139
4140    #[test]
4141    fn test_wrong_key_fails_decrypt() {
4142        let secret1 = [0x42u8; 32];
4143        let secret2 = [0x43u8; 32]; // Different key
4144        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
4145        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
4146
4147        // mesh1 sends document
4148        let doc = mesh1.build_document();
4149
4150        // mesh2 cannot decrypt (wrong key)
4151        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4152
4153        assert!(result.is_none());
4154    }
4155
4156    #[test]
4157    fn test_unencrypted_mesh_can_read_unencrypted() {
4158        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4159        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4160
4161        // mesh1 sends document (unencrypted)
4162        let doc = mesh1.build_document();
4163
4164        // mesh2 receives (also unencrypted)
4165        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4166
4167        assert!(result.is_some());
4168    }
4169
4170    #[test]
4171    fn test_encrypted_mesh_can_receive_unencrypted() {
4172        // Backward compatibility: encrypted mesh can receive unencrypted docs
4173        let secret = [0x42u8; 32];
4174        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // unencrypted
4175        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); // encrypted
4176
4177        // mesh1 sends unencrypted document
4178        let doc = mesh1.build_document();
4179
4180        // mesh2 can receive unencrypted (backward compat)
4181        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4182
4183        assert!(result.is_some());
4184    }
4185
4186    #[test]
4187    fn test_unencrypted_mesh_cannot_receive_encrypted() {
4188        let secret = [0x42u8; 32];
4189        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret); // encrypted
4190        let mesh2 = create_mesh(0x22222222, "BRAVO-1"); // unencrypted
4191
4192        // mesh1 sends encrypted document
4193        let doc = mesh1.build_document();
4194
4195        // mesh2 cannot decrypt (no key)
4196        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4197
4198        assert!(result.is_none());
4199    }
4200
4201    #[test]
4202    fn test_enable_disable_encryption() {
4203        let mut mesh = create_mesh(0x11111111, "ALPHA-1");
4204
4205        assert!(!mesh.is_encryption_enabled());
4206
4207        // Enable encryption
4208        let secret = [0x42u8; 32];
4209        mesh.enable_encryption(&secret);
4210        assert!(mesh.is_encryption_enabled());
4211
4212        // Build document should now be encrypted
4213        let doc = mesh.build_document();
4214        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4215
4216        // Disable encryption
4217        mesh.disable_encryption();
4218        assert!(!mesh.is_encryption_enabled());
4219
4220        // Build document should now be unencrypted
4221        let doc = mesh.build_document();
4222        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4223    }
4224
4225    #[test]
4226    fn test_encryption_overhead() {
4227        let secret = [0x42u8; 32];
4228        let mesh_encrypted = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4229        let mesh_unencrypted = create_mesh(0x22222222, "BRAVO-1");
4230
4231        let doc_encrypted = mesh_encrypted.build_document();
4232        let doc_unencrypted = mesh_unencrypted.build_document();
4233
4234        // Encrypted doc should be larger by:
4235        // - 2 bytes marker header (0xAE + reserved)
4236        // - 12 bytes nonce
4237        // - 16 bytes auth tag
4238        // Total: 30 bytes overhead
4239        let overhead = doc_encrypted.len() - doc_unencrypted.len();
4240        assert_eq!(overhead, 30); // 2 (marker) + 12 (nonce) + 16 (tag)
4241    }
4242
4243    // ==================== Per-Peer E2EE Tests ====================
4244
4245    #[test]
4246    fn test_peer_e2ee_enable_disable() {
4247        let mesh = create_mesh(0x11111111, "ALPHA-1");
4248
4249        assert!(!mesh.is_peer_e2ee_enabled());
4250        assert!(mesh.peer_e2ee_public_key().is_none());
4251
4252        mesh.enable_peer_e2ee();
4253        assert!(mesh.is_peer_e2ee_enabled());
4254        assert!(mesh.peer_e2ee_public_key().is_some());
4255
4256        mesh.disable_peer_e2ee();
4257        assert!(!mesh.is_peer_e2ee_enabled());
4258    }
4259
4260    #[test]
4261    fn test_peer_e2ee_initiate_session() {
4262        let mesh = create_mesh(0x11111111, "ALPHA-1");
4263        mesh.enable_peer_e2ee();
4264
4265        let key_exchange = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4266        assert!(key_exchange.is_some());
4267
4268        let key_exchange = key_exchange.unwrap();
4269        // Should start with KEY_EXCHANGE_MARKER
4270        assert_eq!(key_exchange[0], crate::document::KEY_EXCHANGE_MARKER);
4271
4272        // Should have a pending session
4273        assert_eq!(mesh.peer_e2ee_session_count(), 1);
4274        assert_eq!(mesh.peer_e2ee_established_count(), 0);
4275    }
4276
4277    #[test]
4278    fn test_peer_e2ee_full_handshake() {
4279        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4280        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4281
4282        mesh1.enable_peer_e2ee();
4283        mesh2.enable_peer_e2ee();
4284
4285        let observer1 = Arc::new(CollectingObserver::new());
4286        let observer2 = Arc::new(CollectingObserver::new());
4287        mesh1.add_observer(observer1.clone());
4288        mesh2.add_observer(observer2.clone());
4289
4290        // mesh1 initiates to mesh2
4291        let key_exchange1 = mesh1
4292            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4293            .unwrap();
4294
4295        // mesh2 receives and responds
4296        let response = mesh2.handle_key_exchange(&key_exchange1, 1000);
4297        assert!(response.is_some());
4298
4299        // Check mesh2 has established session
4300        assert!(mesh2.has_peer_e2ee_session(NodeId::new(0x11111111)));
4301
4302        // mesh1 receives mesh2's response
4303        let key_exchange2 = response.unwrap();
4304        let _ = mesh1.handle_key_exchange(&key_exchange2, 1000);
4305
4306        // Check mesh1 has established session
4307        assert!(mesh1.has_peer_e2ee_session(NodeId::new(0x22222222)));
4308
4309        // Both should have E2EE established events
4310        let events1 = observer1.events();
4311        assert!(events1
4312            .iter()
4313            .any(|e| matches!(e, PeatEvent::PeerE2eeEstablished { .. })));
4314
4315        let events2 = observer2.events();
4316        assert!(events2
4317            .iter()
4318            .any(|e| matches!(e, PeatEvent::PeerE2eeEstablished { .. })));
4319    }
4320
4321    #[test]
4322    fn test_peer_e2ee_encrypt_decrypt() {
4323        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4324        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4325
4326        mesh1.enable_peer_e2ee();
4327        mesh2.enable_peer_e2ee();
4328
4329        // Establish session via key exchange
4330        let key_exchange1 = mesh1
4331            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4332            .unwrap();
4333        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4334        mesh1.handle_key_exchange(&key_exchange2, 1000);
4335
4336        // mesh1 sends encrypted message to mesh2
4337        let plaintext = b"Secret message from mesh1";
4338        let encrypted = mesh1.send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000);
4339        assert!(encrypted.is_some());
4340
4341        let encrypted = encrypted.unwrap();
4342        // Should start with PEER_E2EE_MARKER
4343        assert_eq!(encrypted[0], crate::document::PEER_E2EE_MARKER);
4344
4345        // mesh2 receives and decrypts
4346        let observer2 = Arc::new(CollectingObserver::new());
4347        mesh2.add_observer(observer2.clone());
4348
4349        let decrypted = mesh2.handle_peer_e2ee_message(&encrypted, 2000);
4350        assert!(decrypted.is_some());
4351        assert_eq!(decrypted.unwrap(), plaintext);
4352
4353        // Should have received message event
4354        let events = observer2.events();
4355        assert!(events.iter().any(|e| matches!(
4356            e,
4357            PeatEvent::PeerE2eeMessageReceived { from_node, data }
4358            if from_node.as_u32() == 0x11111111 && data == plaintext
4359        )));
4360    }
4361
4362    #[test]
4363    fn test_peer_e2ee_bidirectional() {
4364        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4365        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4366
4367        mesh1.enable_peer_e2ee();
4368        mesh2.enable_peer_e2ee();
4369
4370        // Establish session
4371        let key_exchange1 = mesh1
4372            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4373            .unwrap();
4374        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4375        mesh1.handle_key_exchange(&key_exchange2, 1000);
4376
4377        // mesh1 -> mesh2
4378        let msg1 = mesh1
4379            .send_peer_e2ee(NodeId::new(0x22222222), b"Hello from mesh1", 2000)
4380            .unwrap();
4381        let dec1 = mesh2.handle_peer_e2ee_message(&msg1, 2000).unwrap();
4382        assert_eq!(dec1, b"Hello from mesh1");
4383
4384        // mesh2 -> mesh1
4385        let msg2 = mesh2
4386            .send_peer_e2ee(NodeId::new(0x11111111), b"Hello from mesh2", 2000)
4387            .unwrap();
4388        let dec2 = mesh1.handle_peer_e2ee_message(&msg2, 2000).unwrap();
4389        assert_eq!(dec2, b"Hello from mesh2");
4390    }
4391
4392    #[test]
4393    fn test_peer_e2ee_close_session() {
4394        let mesh = create_mesh(0x11111111, "ALPHA-1");
4395        mesh.enable_peer_e2ee();
4396
4397        let observer = Arc::new(CollectingObserver::new());
4398        mesh.add_observer(observer.clone());
4399
4400        // Initiate a session
4401        mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4402        assert_eq!(mesh.peer_e2ee_session_count(), 1);
4403
4404        // Close session
4405        mesh.close_peer_e2ee(NodeId::new(0x22222222));
4406
4407        // Check close event
4408        let events = observer.events();
4409        assert!(events
4410            .iter()
4411            .any(|e| matches!(e, PeatEvent::PeerE2eeClosed { .. })));
4412    }
4413
4414    #[test]
4415    fn test_peer_e2ee_without_enabling() {
4416        let mesh = create_mesh(0x11111111, "ALPHA-1");
4417
4418        // E2EE not enabled - should return None
4419        let result = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4420        assert!(result.is_none());
4421
4422        let result = mesh.send_peer_e2ee(NodeId::new(0x22222222), b"test", 1000);
4423        assert!(result.is_none());
4424
4425        assert!(!mesh.has_peer_e2ee_session(NodeId::new(0x22222222)));
4426    }
4427
4428    #[test]
4429    fn test_peer_e2ee_overhead() {
4430        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4431        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4432
4433        mesh1.enable_peer_e2ee();
4434        mesh2.enable_peer_e2ee();
4435
4436        // Establish session
4437        let key_exchange1 = mesh1
4438            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4439            .unwrap();
4440        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4441        mesh1.handle_key_exchange(&key_exchange2, 1000);
4442
4443        // Encrypt a message
4444        let plaintext = b"Test message";
4445        let encrypted = mesh1
4446            .send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000)
4447            .unwrap();
4448
4449        // Overhead should be:
4450        // - 2 bytes marker header
4451        // - 4 bytes recipient node ID
4452        // - 4 bytes sender node ID
4453        // - 8 bytes counter
4454        // - 12 bytes nonce
4455        // - 16 bytes auth tag
4456        // Total: 46 bytes overhead
4457        let overhead = encrypted.len() - plaintext.len();
4458        assert_eq!(overhead, 46);
4459    }
4460
4461    // ==================== Strict Encryption Mode Tests ====================
4462
4463    fn create_strict_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4464        let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST")
4465            .with_encryption(secret)
4466            .with_strict_encryption();
4467        PeatMesh::new(config)
4468    }
4469
4470    #[test]
4471    fn test_strict_encryption_enabled() {
4472        let secret = [0x42u8; 32];
4473        let mesh = create_strict_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4474
4475        assert!(mesh.is_encryption_enabled());
4476        assert!(mesh.is_strict_encryption_enabled());
4477    }
4478
4479    #[test]
4480    fn test_strict_encryption_disabled_by_default() {
4481        let secret = [0x42u8; 32];
4482        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4483
4484        assert!(mesh.is_encryption_enabled());
4485        assert!(!mesh.is_strict_encryption_enabled());
4486    }
4487
4488    #[test]
4489    fn test_strict_encryption_requires_encryption_enabled() {
4490        // strict_encryption without encryption should have no effect
4491        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4492            .with_strict_encryption(); // No encryption!
4493        let mesh = PeatMesh::new(config);
4494
4495        assert!(!mesh.is_encryption_enabled());
4496        assert!(!mesh.is_strict_encryption_enabled());
4497    }
4498
4499    #[test]
4500    fn test_strict_mode_accepts_encrypted_documents() {
4501        let secret = [0x42u8; 32];
4502        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4503        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4504
4505        // mesh1 sends encrypted document
4506        let doc = mesh1.build_document();
4507        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4508
4509        // mesh2 (strict mode) should accept encrypted documents
4510        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4511        assert!(result.is_some());
4512    }
4513
4514    #[test]
4515    fn test_strict_mode_rejects_unencrypted_documents() {
4516        let secret = [0x42u8; 32];
4517        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // Unencrypted sender
4518        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret); // Strict receiver
4519
4520        let observer = Arc::new(CollectingObserver::new());
4521        mesh2.add_observer(observer.clone());
4522
4523        // mesh1 sends unencrypted document
4524        let doc = mesh1.build_document();
4525        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4526
4527        // mesh2 (strict mode) should reject unencrypted documents
4528        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4529        assert!(result.is_none());
4530
4531        // Should have SecurityViolation event
4532        let events = observer.events();
4533        assert!(events.iter().any(|e| matches!(
4534            e,
4535            PeatEvent::SecurityViolation {
4536                kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
4537                ..
4538            }
4539        )));
4540    }
4541
4542    #[test]
4543    fn test_non_strict_mode_accepts_unencrypted_documents() {
4544        let secret = [0x42u8; 32];
4545        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // Unencrypted sender
4546        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); // Non-strict receiver
4547
4548        // mesh1 sends unencrypted document
4549        let doc = mesh1.build_document();
4550
4551        // mesh2 (non-strict) should accept unencrypted documents (backward compat)
4552        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4553        assert!(result.is_some());
4554    }
4555
4556    #[test]
4557    fn test_strict_mode_security_violation_event_includes_source() {
4558        let secret = [0x42u8; 32];
4559        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4560        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4561
4562        let observer = Arc::new(CollectingObserver::new());
4563        mesh2.add_observer(observer.clone());
4564
4565        let doc = mesh1.build_document();
4566
4567        // Use on_ble_data_received with identifier to test source is captured
4568        mesh2.on_ble_discovered(
4569            "test-device-uuid",
4570            Some("PEAT_TEST-11111111"),
4571            -65,
4572            Some("TEST"),
4573            500,
4574        );
4575        mesh2.on_ble_connected("test-device-uuid", 600);
4576
4577        let _result = mesh2.on_ble_data_received("test-device-uuid", &doc, 1000);
4578
4579        // Check SecurityViolation event has source
4580        let events = observer.events();
4581        let violation = events.iter().find(|e| {
4582            matches!(
4583                e,
4584                PeatEvent::SecurityViolation {
4585                    kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
4586                    ..
4587                }
4588            )
4589        });
4590        assert!(violation.is_some());
4591
4592        if let Some(PeatEvent::SecurityViolation { source, .. }) = violation {
4593            assert!(source.is_some());
4594            assert_eq!(source.as_ref().unwrap(), "test-device-uuid");
4595        }
4596    }
4597
4598    #[test]
4599    fn test_decryption_failure_emits_security_violation() {
4600        let secret1 = [0x42u8; 32];
4601        let secret2 = [0x43u8; 32]; // Different key
4602        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
4603        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
4604
4605        let observer = Arc::new(CollectingObserver::new());
4606        mesh2.add_observer(observer.clone());
4607
4608        // mesh1 sends encrypted document
4609        let doc = mesh1.build_document();
4610
4611        // mesh2 cannot decrypt (wrong key)
4612        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4613        assert!(result.is_none());
4614
4615        // Should have SecurityViolation event for decryption failure
4616        let events = observer.events();
4617        assert!(events.iter().any(|e| matches!(
4618            e,
4619            PeatEvent::SecurityViolation {
4620                kind: crate::observer::SecurityViolationKind::DecryptionFailed,
4621                ..
4622            }
4623        )));
4624    }
4625
4626    #[test]
4627    fn test_strict_mode_builder_chain() {
4628        let secret = [0x42u8; 32];
4629        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4630            .with_encryption(secret)
4631            .with_strict_encryption()
4632            .with_sync_interval(10_000)
4633            .with_peer_timeout(60_000);
4634
4635        let mesh = PeatMesh::new(config);
4636
4637        assert!(mesh.is_encryption_enabled());
4638        assert!(mesh.is_strict_encryption_enabled());
4639    }
4640
4641    // ==================== Multi-Hop Relay Tests ====================
4642
4643    fn create_relay_mesh(node_id: u32, callsign: &str) -> PeatMesh {
4644        let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_relay();
4645        PeatMesh::new(config)
4646    }
4647
4648    #[test]
4649    fn test_relay_disabled_by_default() {
4650        let mesh = create_mesh(0x11111111, "ALPHA-1");
4651        assert!(!mesh.is_relay_enabled());
4652    }
4653
4654    #[test]
4655    fn test_relay_enabled() {
4656        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4657        assert!(mesh.is_relay_enabled());
4658    }
4659
4660    #[test]
4661    fn test_relay_config_builder() {
4662        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4663            .with_relay()
4664            .with_max_relay_hops(5)
4665            .with_relay_fanout(3)
4666            .with_seen_cache_ttl(60_000);
4667
4668        assert!(config.enable_relay);
4669        assert_eq!(config.max_relay_hops, 5);
4670        assert_eq!(config.relay_fanout, 3);
4671        assert_eq!(config.seen_cache_ttl_ms, 60_000);
4672    }
4673
4674    #[test]
4675    fn test_seen_message_deduplication() {
4676        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4677        let origin = NodeId::new(0x22222222);
4678        let msg_id = crate::relay::MessageId::from_content(origin, 1000, 0xDEADBEEF);
4679
4680        // First time - should be new
4681        assert!(mesh.mark_message_seen(msg_id, origin, 1000));
4682
4683        // Second time - should be duplicate
4684        assert!(!mesh.mark_message_seen(msg_id, origin, 2000));
4685
4686        assert_eq!(mesh.seen_cache_size(), 1);
4687    }
4688
4689    #[test]
4690    fn test_wrap_for_relay() {
4691        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4692
4693        let payload = vec![1, 2, 3, 4, 5];
4694        let wrapped = mesh.wrap_for_relay(payload.clone());
4695
4696        // Should start with relay envelope marker
4697        assert_eq!(wrapped[0], crate::relay::RELAY_ENVELOPE_MARKER);
4698
4699        // Decode and verify
4700        let envelope = crate::relay::RelayEnvelope::decode(&wrapped).unwrap();
4701        assert_eq!(envelope.payload, payload);
4702        assert_eq!(envelope.origin_node, NodeId::new(0x11111111));
4703        assert_eq!(envelope.hop_count, 0);
4704    }
4705
4706    #[test]
4707    fn test_process_relay_envelope_new_message() {
4708        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4709        let observer = Arc::new(CollectingObserver::new());
4710        mesh.add_observer(observer.clone());
4711
4712        // Create an envelope from another node
4713        let payload = vec![1, 2, 3, 4, 5];
4714        let envelope =
4715            crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
4716                .with_max_hops(7);
4717        let data = envelope.encode();
4718
4719        // Process it
4720        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
4721
4722        assert!(decision.is_some());
4723        let decision = decision.unwrap();
4724        assert_eq!(decision.payload, payload);
4725        assert_eq!(decision.origin_node.as_u32(), 0x22222222);
4726        assert_eq!(decision.hop_count, 0);
4727        assert!(decision.should_relay);
4728        assert!(decision.relay_envelope.is_some());
4729
4730        // Relay envelope should have incremented hop count
4731        let relay_env = decision.relay_envelope.unwrap();
4732        assert_eq!(relay_env.hop_count, 1);
4733    }
4734
4735    #[test]
4736    fn test_process_relay_envelope_duplicate() {
4737        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4738        let observer = Arc::new(CollectingObserver::new());
4739        mesh.add_observer(observer.clone());
4740
4741        let payload = vec![1, 2, 3, 4, 5];
4742        let envelope = crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload);
4743        let data = envelope.encode();
4744
4745        // First time - should succeed
4746        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
4747        assert!(decision.is_some());
4748
4749        // Second time - should be duplicate
4750        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 2000);
4751        assert!(decision.is_none());
4752
4753        // Should have DuplicateMessageDropped event
4754        let events = observer.events();
4755        assert!(events
4756            .iter()
4757            .any(|e| matches!(e, PeatEvent::DuplicateMessageDropped { .. })));
4758    }
4759
4760    #[test]
4761    fn test_process_relay_envelope_ttl_expired() {
4762        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4763        let observer = Arc::new(CollectingObserver::new());
4764        mesh.add_observer(observer.clone());
4765
4766        // Create envelope at max hops (TTL expired)
4767        let payload = vec![1, 2, 3, 4, 5];
4768        let mut envelope =
4769            crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
4770                .with_max_hops(3);
4771
4772        // Simulate having been relayed 3 times already
4773        envelope = envelope.relay().unwrap(); // hop 1
4774        envelope = envelope.relay().unwrap(); // hop 2
4775        envelope = envelope.relay().unwrap(); // hop 3 - at max now
4776
4777        let data = envelope.encode();
4778
4779        // Process - should still process locally but not relay further
4780        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
4781
4782        assert!(decision.is_some());
4783        let decision = decision.unwrap();
4784        assert_eq!(decision.payload, payload);
4785        assert!(!decision.should_relay); // Cannot relay further
4786        assert!(decision.relay_envelope.is_none());
4787
4788        // Should have MessageTtlExpired event
4789        let events = observer.events();
4790        assert!(events
4791            .iter()
4792            .any(|e| matches!(e, PeatEvent::MessageTtlExpired { .. })));
4793    }
4794
4795    #[test]
4796    fn test_build_relay_document() {
4797        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4798
4799        let relay_doc = mesh.build_relay_document();
4800
4801        // Should be a valid relay envelope
4802        assert_eq!(relay_doc[0], crate::relay::RELAY_ENVELOPE_MARKER);
4803
4804        // Decode and verify it contains a valid document
4805        let envelope = crate::relay::RelayEnvelope::decode(&relay_doc).unwrap();
4806        assert_eq!(envelope.origin_node.as_u32(), 0x11111111);
4807
4808        // The payload should be a valid PeatDocument
4809        let doc = crate::document::PeatDocument::decode(&envelope.payload);
4810        assert!(doc.is_some());
4811    }
4812
4813    #[test]
4814    fn test_relay_targets_excludes_source() {
4815        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4816
4817        // Add some peers
4818        mesh.on_ble_discovered(
4819            "peer-1",
4820            Some("PEAT_TEST-22222222"),
4821            -60,
4822            Some("TEST"),
4823            1000,
4824        );
4825        mesh.on_ble_connected("peer-1", 1000);
4826
4827        mesh.on_ble_discovered(
4828            "peer-2",
4829            Some("PEAT_TEST-33333333"),
4830            -65,
4831            Some("TEST"),
4832            1000,
4833        );
4834        mesh.on_ble_connected("peer-2", 1000);
4835
4836        mesh.on_ble_discovered(
4837            "peer-3",
4838            Some("PEAT_TEST-44444444"),
4839            -70,
4840            Some("TEST"),
4841            1000,
4842        );
4843        mesh.on_ble_connected("peer-3", 1000);
4844
4845        // Get relay targets excluding peer-2
4846        let targets = mesh.get_relay_targets(Some(NodeId::new(0x33333333)));
4847
4848        // Should not include peer-2 in targets
4849        assert!(targets.iter().all(|p| p.node_id.as_u32() != 0x33333333));
4850    }
4851
4852    #[test]
4853    fn test_clear_seen_cache() {
4854        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4855        let origin = NodeId::new(0x22222222);
4856
4857        // Add some messages
4858        mesh.mark_message_seen(
4859            crate::relay::MessageId::from_content(origin, 1000, 0x11111111),
4860            origin,
4861            1000,
4862        );
4863        mesh.mark_message_seen(
4864            crate::relay::MessageId::from_content(origin, 2000, 0x22222222),
4865            origin,
4866            2000,
4867        );
4868
4869        assert_eq!(mesh.seen_cache_size(), 2);
4870
4871        // Clear
4872        mesh.clear_seen_cache();
4873        assert_eq!(mesh.seen_cache_size(), 0);
4874    }
4875}