Skip to main content

peat_btle/
peat_mesh.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! PeatMesh - Unified mesh management facade
17//!
18//! This module provides the main entry point for Peat BLE mesh operations.
19//! It composes peer management, document sync, and observer notifications
20//! into a single interface that platform implementations can use.
21//!
22//! ## Usage
23//!
24//! ```ignore
25//! use peat_btle::peat_mesh::{PeatMesh, PeatMeshConfig};
26//! use peat_btle::observer::{PeatEvent, PeatObserver};
27//! use peat_btle::NodeId;
28//! use std::sync::Arc;
29//!
30//! // Create mesh configuration
31//! let config = PeatMeshConfig::new(NodeId::new(0x12345678), "ALPHA-1", "DEMO");
32//!
33//! // Create mesh instance
34//! let mesh = PeatMesh::new(config);
35//!
36//! // Add observer for events
37//! struct MyObserver;
38//! impl PeatObserver for MyObserver {
39//!     fn on_event(&self, event: PeatEvent) {
40//!         println!("Event: {:?}", event);
41//!     }
42//! }
43//! mesh.add_observer(Arc::new(MyObserver));
44//!
45//! // Platform BLE callbacks
46//! mesh.on_ble_discovered("device-uuid", Some("PEAT_DEMO-AABBCCDD"), -65, Some("DEMO"), now_ms);
47//! mesh.on_ble_connected("device-uuid", now_ms);
48//! mesh.on_ble_data_received("device-uuid", &data, now_ms);
49//!
50//! // Periodic maintenance
51//! if let Some(sync_data) = mesh.tick(now_ms) {
52//!     // Broadcast sync_data to connected peers
53//! }
54//! ```
55
56#[cfg(not(feature = "std"))]
57use alloc::{string::String, sync::Arc, vec::Vec};
58#[cfg(feature = "std")]
59use std::collections::HashMap;
60#[cfg(feature = "std")]
61use std::sync::Arc;
62
63use crate::document::{ENCRYPTED_MARKER, KEY_EXCHANGE_MARKER, PEER_E2EE_MARKER};
64use crate::document_sync::DocumentSync;
65use crate::gossip::{GossipStrategy, RandomFanout};
66use crate::observer::{DisconnectReason, PeatEvent, PeatObserver, SecurityViolationKind};
67use crate::peer::{
68    ConnectionStateGraph, FullStateCountSummary, IndirectPeer, PeatPeer, PeerConnectionState,
69    PeerDegree, PeerManagerConfig, StateCountSummary,
70};
71use crate::peer_manager::PeerManager;
72use crate::relay::{
73    MessageId, RelayEnvelope, SeenMessageCache, DEFAULT_MAX_HOPS, DEFAULT_SEEN_TTL_MS,
74    RELAY_ENVELOPE_MARKER,
75};
76use crate::security::{
77    DeviceIdentity, IdentityAttestation, IdentityRegistry, KeyExchangeMessage, MeshEncryptionKey,
78    PeerEncryptedMessage, PeerSessionManager, RegistryResult, SessionState,
79};
80use crate::sync::crdt::{EventType, Peripheral, PeripheralType};
81use crate::sync::delta::{DeltaEncoder, DeltaStats};
82use crate::sync::delta_document::{DeltaDocument, Operation};
83use crate::NodeId;
84
85#[cfg(feature = "std")]
86use crate::observer::ObserverManager;
87
88use crate::registry::DocumentRegistry;
89
90/// Configuration for PeatMesh
91#[derive(Debug, Clone)]
92pub struct PeatMeshConfig {
93    /// Our node ID
94    pub node_id: NodeId,
95
96    /// Our callsign (e.g., "ALPHA-1")
97    pub callsign: String,
98
99    /// Mesh ID to filter peers (e.g., "DEMO")
100    pub mesh_id: String,
101
102    /// Peripheral type for this device
103    pub peripheral_type: PeripheralType,
104
105    /// Peer management configuration
106    pub peer_config: PeerManagerConfig,
107
108    /// Sync interval in milliseconds (how often to broadcast state)
109    pub sync_interval_ms: u64,
110
111    /// Whether to auto-broadcast on emergency/ack
112    pub auto_broadcast_events: bool,
113
114    /// Optional shared secret for mesh-wide encryption (32 bytes)
115    ///
116    /// When set, all documents are encrypted using ChaCha20-Poly1305 before
117    /// transmission and decrypted upon receipt. All nodes in the mesh must
118    /// share the same secret to communicate.
119    pub encryption_secret: Option<[u8; 32]>,
120
121    /// Strict encryption mode - reject unencrypted documents when encryption is enabled
122    ///
123    /// When true and encryption is enabled, any unencrypted documents received
124    /// will be rejected and trigger a SecurityViolation event. This prevents
125    /// downgrade attacks where an adversary sends unencrypted malicious documents.
126    ///
127    /// Default: false (backward compatible - accepts unencrypted for gradual rollout)
128    pub strict_encryption: bool,
129
130    /// Enable multi-hop relay
131    ///
132    /// When enabled, received messages will be forwarded to other peers based
133    /// on the gossip strategy. Requires message deduplication to prevent loops.
134    ///
135    /// Default: false
136    pub enable_relay: bool,
137
138    /// Maximum hops for relay messages (TTL)
139    ///
140    /// Messages will not be relayed beyond this many hops from the origin.
141    /// Default: 7
142    pub max_relay_hops: u8,
143
144    /// Gossip fanout for relay
145    ///
146    /// Number of peers to forward each message to. Higher values increase
147    /// convergence speed but also bandwidth usage.
148    /// Default: 2
149    pub relay_fanout: usize,
150
151    /// TTL for seen message cache (milliseconds)
152    ///
153    /// How long to remember message IDs for deduplication.
154    /// Default: 300_000 (5 minutes)
155    pub seen_cache_ttl_ms: u64,
156}
157
158impl PeatMeshConfig {
159    /// Create a new configuration with required fields
160    pub fn new(node_id: NodeId, callsign: &str, mesh_id: &str) -> Self {
161        Self {
162            node_id,
163            callsign: callsign.into(),
164            mesh_id: mesh_id.into(),
165            peripheral_type: PeripheralType::SoldierSensor,
166            peer_config: PeerManagerConfig::with_mesh_id(mesh_id),
167            sync_interval_ms: 5000,
168            auto_broadcast_events: true,
169            encryption_secret: None,
170            strict_encryption: false,
171            enable_relay: false,
172            max_relay_hops: DEFAULT_MAX_HOPS,
173            relay_fanout: 2,
174            seen_cache_ttl_ms: DEFAULT_SEEN_TTL_MS,
175        }
176    }
177
178    /// Enable mesh-wide encryption with a shared secret
179    ///
180    /// All documents will be encrypted using ChaCha20-Poly1305 before
181    /// transmission. All mesh participants must use the same secret.
182    pub fn with_encryption(mut self, secret: [u8; 32]) -> Self {
183        self.encryption_secret = Some(secret);
184        self
185    }
186
187    /// Set peripheral type
188    pub fn with_peripheral_type(mut self, ptype: PeripheralType) -> Self {
189        self.peripheral_type = ptype;
190        self
191    }
192
193    /// Set sync interval
194    pub fn with_sync_interval(mut self, interval_ms: u64) -> Self {
195        self.sync_interval_ms = interval_ms;
196        self
197    }
198
199    /// Set peer timeout
200    pub fn with_peer_timeout(mut self, timeout_ms: u64) -> Self {
201        self.peer_config.peer_timeout_ms = timeout_ms;
202        self
203    }
204
205    /// Set max peers (for embedded systems)
206    pub fn with_max_peers(mut self, max: usize) -> Self {
207        self.peer_config.max_peers = max;
208        self
209    }
210
211    /// Enable strict encryption mode
212    ///
213    /// When enabled (and encryption is also enabled), any unencrypted documents
214    /// received will be rejected and trigger a `SecurityViolation` event.
215    /// This prevents downgrade attacks.
216    ///
217    /// Note: This only has effect when encryption is enabled via `with_encryption()`.
218    pub fn with_strict_encryption(mut self) -> Self {
219        self.strict_encryption = true;
220        self
221    }
222
223    /// Enable multi-hop relay
224    ///
225    /// When enabled, received messages will be forwarded to other connected peers
226    /// based on the gossip strategy. This enables mesh-wide message propagation.
227    pub fn with_relay(mut self) -> Self {
228        self.enable_relay = true;
229        self
230    }
231
232    /// Set maximum relay hops (TTL)
233    ///
234    /// Messages will not be relayed beyond this many hops from the origin.
235    pub fn with_max_relay_hops(mut self, max_hops: u8) -> Self {
236        self.max_relay_hops = max_hops;
237        self
238    }
239
240    /// Set gossip fanout for relay
241    ///
242    /// Number of peers to forward each message to.
243    pub fn with_relay_fanout(mut self, fanout: usize) -> Self {
244        self.relay_fanout = fanout.max(1);
245        self
246    }
247
248    /// Set TTL for seen message cache
249    ///
250    /// How long to remember message IDs for deduplication (milliseconds).
251    pub fn with_seen_cache_ttl(mut self, ttl_ms: u64) -> Self {
252        self.seen_cache_ttl_ms = ttl_ms;
253        self
254    }
255}
256
257/// Type alias for app document storage to reduce type complexity
258#[cfg(feature = "std")]
259type AppDocumentStore =
260    std::sync::RwLock<HashMap<(u8, u32, u64), Box<dyn core::any::Any + Send + Sync>>>;
261
262/// Main facade for Peat BLE mesh operations
263///
264/// Composes peer management, document sync, and observer notifications.
265/// Platform implementations call into this from their BLE callbacks.
266#[cfg(feature = "std")]
267pub struct PeatMesh {
268    /// Configuration
269    config: PeatMeshConfig,
270
271    /// Peer manager
272    peer_manager: PeerManager,
273
274    /// Document sync
275    document_sync: DocumentSync,
276
277    /// Observer manager
278    observers: ObserverManager,
279
280    /// Last sync broadcast time (u32 wraps every ~49 days, sufficient for intervals)
281    last_sync_ms: std::sync::atomic::AtomicU32,
282
283    /// Last cleanup time
284    last_cleanup_ms: std::sync::atomic::AtomicU32,
285
286    /// Optional mesh-wide encryption key (derived from shared secret)
287    encryption_key: Option<MeshEncryptionKey>,
288
289    /// Optional per-peer E2EE session manager
290    peer_sessions: std::sync::Mutex<Option<PeerSessionManager>>,
291
292    /// Connection state graph for tracking peer connection lifecycle
293    connection_graph: std::sync::Mutex<ConnectionStateGraph>,
294
295    /// Seen message cache for relay deduplication
296    seen_cache: std::sync::Mutex<SeenMessageCache>,
297
298    /// Gossip strategy for relay peer selection
299    gossip_strategy: Box<dyn GossipStrategy>,
300
301    /// Delta encoder for per-peer sync state tracking
302    ///
303    /// Tracks what data has been sent to each peer to enable delta sync
304    /// (sending only changes instead of full documents).
305    delta_encoder: std::sync::Mutex<DeltaEncoder>,
306
307    /// This node's cryptographic identity (Ed25519 keypair)
308    ///
309    /// When set, the node_id is derived from the public key and documents
310    /// can be signed for authenticity verification.
311    identity: Option<DeviceIdentity>,
312
313    /// TOFU identity registry for tracking peer identities
314    ///
315    /// Maps node_id to public key on first contact, rejects mismatches.
316    identity_registry: std::sync::Mutex<IdentityRegistry>,
317
318    /// Peripheral state received from peers
319    ///
320    /// Stores the most recent peripheral data (callsign, location, etc.)
321    /// received from each peer via document sync.
322    peer_peripherals: std::sync::RwLock<HashMap<NodeId, Peripheral>>,
323
324    /// Document registry for app-layer CRDT types.
325    ///
326    /// Enables external crates to register custom document types that sync
327    /// through the mesh using the extensible registry pattern.
328    document_registry: DocumentRegistry,
329
330    /// Storage for app-layer documents.
331    ///
332    /// Keyed by (type_id, source_node, timestamp) to uniquely identify each
333    /// document instance. Values are type-erased boxes that can be downcast
334    /// using the document registry handlers.
335    app_documents: AppDocumentStore,
336}
337
338#[cfg(feature = "std")]
339impl PeatMesh {
340    /// Create a new PeatMesh instance
341    pub fn new(config: PeatMeshConfig) -> Self {
342        let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
343        let document_sync = DocumentSync::with_peripheral_type(
344            config.node_id,
345            &config.callsign,
346            config.peripheral_type,
347        );
348
349        // Derive encryption key from shared secret if configured
350        let encryption_key = config
351            .encryption_secret
352            .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
353
354        // Create connection state graph with config thresholds
355        let connection_graph = ConnectionStateGraph::with_config(
356            config.peer_config.rssi_degraded_threshold,
357            config.peer_config.lost_timeout_ms,
358        );
359
360        // Create seen message cache for relay deduplication
361        let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
362
363        // Create gossip strategy for relay
364        let gossip_strategy: Box<dyn GossipStrategy> =
365            Box::new(RandomFanout::new(config.relay_fanout));
366
367        // Create delta encoder for per-peer sync state tracking
368        let delta_encoder = DeltaEncoder::new(config.node_id);
369
370        let document_registry = DocumentRegistry::new();
371
372        Self {
373            config,
374            peer_manager,
375            document_sync,
376            observers: ObserverManager::new(),
377            last_sync_ms: std::sync::atomic::AtomicU32::new(0),
378            last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
379            encryption_key,
380            peer_sessions: std::sync::Mutex::new(None),
381            connection_graph: std::sync::Mutex::new(connection_graph),
382            seen_cache: std::sync::Mutex::new(seen_cache),
383            gossip_strategy,
384            delta_encoder: std::sync::Mutex::new(delta_encoder),
385            identity: None,
386            identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
387            peer_peripherals: std::sync::RwLock::new(HashMap::new()),
388            document_registry,
389            app_documents: std::sync::RwLock::new(HashMap::new()),
390        }
391    }
392
393    /// Create a new PeatMesh with a cryptographic identity
394    ///
395    /// The node_id will be derived from the identity's public key, overriding
396    /// any node_id specified in the config. This ensures cryptographic binding
397    /// between node_id and identity.
398    pub fn with_identity(config: PeatMeshConfig, identity: DeviceIdentity) -> Self {
399        // Override node_id with identity-derived value
400        let mut config = config;
401        config.node_id = identity.node_id();
402
403        let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
404        let document_sync = DocumentSync::with_peripheral_type(
405            config.node_id,
406            &config.callsign,
407            config.peripheral_type,
408        );
409
410        let encryption_key = config
411            .encryption_secret
412            .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
413
414        let connection_graph = ConnectionStateGraph::with_config(
415            config.peer_config.rssi_degraded_threshold,
416            config.peer_config.lost_timeout_ms,
417        );
418
419        let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
420        let gossip_strategy: Box<dyn GossipStrategy> =
421            Box::new(RandomFanout::new(config.relay_fanout));
422        let delta_encoder = DeltaEncoder::new(config.node_id);
423
424        let document_registry = DocumentRegistry::new();
425
426        Self {
427            config,
428            peer_manager,
429            document_sync,
430            observers: ObserverManager::new(),
431            last_sync_ms: std::sync::atomic::AtomicU32::new(0),
432            last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
433            encryption_key,
434            peer_sessions: std::sync::Mutex::new(None),
435            connection_graph: std::sync::Mutex::new(connection_graph),
436            seen_cache: std::sync::Mutex::new(seen_cache),
437            gossip_strategy,
438            delta_encoder: std::sync::Mutex::new(delta_encoder),
439            identity: Some(identity),
440            identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
441            peer_peripherals: std::sync::RwLock::new(HashMap::new()),
442            document_registry,
443            app_documents: std::sync::RwLock::new(HashMap::new()),
444        }
445    }
446
447    /// Create a new PeatMesh from genesis data
448    ///
449    /// This is the recommended way to create a mesh for production use.
450    /// The mesh will be configured with:
451    /// - node_id derived from identity
452    /// - mesh_id from genesis
453    /// - encryption enabled using genesis-derived secret
454    pub fn from_genesis(
455        genesis: &crate::security::MeshGenesis,
456        identity: DeviceIdentity,
457        callsign: &str,
458    ) -> Self {
459        let config = PeatMeshConfig::new(identity.node_id(), callsign, &genesis.mesh_id())
460            .with_encryption(genesis.encryption_secret());
461
462        Self::with_identity(config, identity)
463    }
464
465    /// Create a PeatMesh from persisted state.
466    ///
467    /// Restores mesh configuration from previously saved state, including:
468    /// - Device identity (Ed25519 keypair)
469    /// - Mesh genesis (if present)
470    /// - Identity registry (TOFU cache)
471    ///
472    /// Use this on device boot to restore mesh membership without re-provisioning.
473    ///
474    /// # Arguments
475    ///
476    /// * `state` - Previously persisted state
477    /// * `callsign` - Human-readable identifier (may differ from original)
478    ///
479    /// # Errors
480    ///
481    /// Returns `PersistenceError` if the identity cannot be restored.
482    ///
483    /// # Example
484    ///
485    /// ```ignore
486    /// // On boot, restore from secure storage
487    /// let state = PersistedState::load(&storage)?;
488    /// let mesh = PeatMesh::from_persisted(state, "SENSOR-01")?;
489    /// ```
490    #[cfg(feature = "std")]
491    pub fn from_persisted(
492        state: crate::security::PersistedState,
493        callsign: &str,
494    ) -> Result<Self, crate::security::PersistenceError> {
495        // Restore identity
496        let identity = state.restore_identity()?;
497
498        // Restore genesis (if present)
499        let genesis = state.restore_genesis();
500
501        // Create mesh with or without genesis
502        let mesh = if let Some(ref gen) = genesis {
503            Self::from_genesis(gen, identity, callsign)
504        } else {
505            let config = PeatMeshConfig::new(identity.node_id(), callsign, "RESTORED");
506            Self::with_identity(config, identity)
507        };
508
509        // Restore identity registry
510        let restored_registry = state.restore_registry();
511        if let Ok(mut registry) = mesh.identity_registry.lock() {
512            *registry = restored_registry;
513        }
514
515        log::info!(
516            "PeatMesh restored from persisted state: node_id={:08X}, known_peers={}",
517            mesh.config.node_id.as_u32(),
518            mesh.known_identity_count()
519        );
520
521        Ok(mesh)
522    }
523
524    /// Create persisted state from current mesh state.
525    ///
526    /// Captures the current identity, genesis, and registry for persistence.
527    /// Call this periodically or before shutdown to save state.
528    ///
529    /// # Arguments
530    ///
531    /// * `genesis` - Optional genesis to include (if mesh was created from genesis)
532    ///
533    /// # Returns
534    ///
535    /// `None` if the mesh has no identity bound.
536    #[cfg(feature = "std")]
537    pub fn to_persisted_state(
538        &self,
539        genesis: Option<&crate::security::MeshGenesis>,
540    ) -> Option<crate::security::PersistedState> {
541        let identity = self.identity.as_ref()?;
542        let registry = self.identity_registry.lock().ok()?;
543
544        Some(crate::security::PersistedState::with_registry(
545            identity, genesis, &registry,
546        ))
547    }
548
549    // ==================== Encryption ====================
550
551    /// Check if mesh-wide encryption is enabled
552    pub fn is_encryption_enabled(&self) -> bool {
553        self.encryption_key.is_some()
554    }
555
556    /// Check if strict encryption mode is enabled
557    ///
558    /// Returns true only if both encryption and strict_encryption are enabled.
559    pub fn is_strict_encryption_enabled(&self) -> bool {
560        self.config.strict_encryption && self.encryption_key.is_some()
561    }
562
563    /// Enable mesh-wide encryption with a shared secret
564    ///
565    /// Derives a ChaCha20-Poly1305 key from the secret using HKDF-SHA256.
566    /// All mesh participants must use the same secret to communicate.
567    pub fn enable_encryption(&mut self, secret: &[u8; 32]) {
568        self.encryption_key = Some(MeshEncryptionKey::from_shared_secret(
569            &self.config.mesh_id,
570            secret,
571        ));
572    }
573
574    /// Disable mesh-wide encryption
575    pub fn disable_encryption(&mut self) {
576        self.encryption_key = None;
577    }
578
579    /// Encrypt document bytes for transmission
580    ///
581    /// Returns the encrypted bytes with ENCRYPTED_MARKER prefix, or the
582    /// original bytes if encryption is disabled.
583    fn encrypt_document(&self, plaintext: &[u8]) -> Vec<u8> {
584        match &self.encryption_key {
585            Some(key) => {
586                // Encrypt and prepend marker
587                match key.encrypt_to_bytes(plaintext) {
588                    Ok(ciphertext) => {
589                        let mut buf = Vec::with_capacity(2 + ciphertext.len());
590                        buf.push(ENCRYPTED_MARKER);
591                        buf.push(0x00); // reserved
592                        buf.extend_from_slice(&ciphertext);
593                        buf
594                    }
595                    Err(e) => {
596                        log::error!("Encryption failed: {}", e);
597                        // Fall back to unencrypted on error (shouldn't happen)
598                        plaintext.to_vec()
599                    }
600                }
601            }
602            None => plaintext.to_vec(),
603        }
604    }
605
606    /// Decrypt document bytes received from peer
607    ///
608    /// Returns the decrypted bytes if encrypted and valid, or the original
609    /// bytes if not encrypted. Returns None if decryption fails.
610    ///
611    /// In strict encryption mode (when both encryption and strict_encryption are enabled),
612    /// unencrypted documents are rejected and trigger a SecurityViolation event.
613    fn decrypt_document<'a>(
614        &self,
615        data: &'a [u8],
616        source_hint: Option<&str>,
617    ) -> Option<std::borrow::Cow<'a, [u8]>> {
618        log::debug!(
619            "decrypt_document: len={}, first_byte=0x{:02X}, source={:?}",
620            data.len(),
621            data.first().copied().unwrap_or(0),
622            source_hint
623        );
624
625        // Check for encrypted marker
626        if data.len() >= 2 && data[0] == ENCRYPTED_MARKER {
627            // Encrypted document
628            let _reserved = data[1];
629            let encrypted_payload = &data[2..];
630
631            log::debug!(
632                "decrypt_document: encrypted payload len={}, nonce+ciphertext",
633                encrypted_payload.len()
634            );
635
636            match &self.encryption_key {
637                Some(key) => match key.decrypt_from_bytes(encrypted_payload) {
638                    Ok(plaintext) => {
639                        log::debug!(
640                            "decrypt_document: SUCCESS, plaintext len={}",
641                            plaintext.len()
642                        );
643                        Some(std::borrow::Cow::Owned(plaintext))
644                    }
645                    Err(e) => {
646                        log::warn!(
647                            "decrypt_document: FAILED (wrong key or corrupted): {} [payload_len={}, source={:?}]",
648                            e,
649                            encrypted_payload.len(),
650                            source_hint
651                        );
652                        self.notify(PeatEvent::SecurityViolation {
653                            kind: SecurityViolationKind::DecryptionFailed,
654                            source: source_hint.map(String::from),
655                        });
656                        None
657                    }
658                },
659                None => {
660                    log::warn!(
661                        "decrypt_document: encryption not enabled but received encrypted doc"
662                    );
663                    None
664                }
665            }
666        } else {
667            // Unencrypted document
668            // Check strict encryption mode
669            if self.config.strict_encryption && self.encryption_key.is_some() {
670                log::warn!(
671                    "Rejected unencrypted document in strict encryption mode (source: {:?})",
672                    source_hint
673                );
674                self.notify(PeatEvent::SecurityViolation {
675                    kind: SecurityViolationKind::UnencryptedInStrictMode,
676                    source: source_hint.map(String::from),
677                });
678                None
679            } else {
680                // Permissive mode: accept unencrypted for backward compatibility
681                Some(std::borrow::Cow::Borrowed(data))
682            }
683        }
684    }
685
686    // ==================== Transport Layer API ====================
687
688    /// Decrypt data without parsing (transport-only operation)
689    ///
690    /// This method provides raw decrypted bytes for apps that want to handle
691    /// message parsing themselves (using peat-lite or other libraries).
692    ///
693    /// # Arguments
694    /// * `data` - Potentially encrypted data (0xAE marker indicates encryption)
695    ///
696    /// # Returns
697    /// * `Some(plaintext)` - Decrypted bytes if successful, or original bytes if unencrypted
698    /// * `None` - If decryption failed (wrong key, corrupted, or strict mode violation)
699    pub fn decrypt_only(&self, data: &[u8]) -> Option<Vec<u8>> {
700        self.decrypt_document(data, None)
701            .map(|cow| cow.into_owned())
702    }
703
704    // ==================== Identity ====================
705
706    /// Check if this mesh has a cryptographic identity
707    pub fn has_identity(&self) -> bool {
708        self.identity.is_some()
709    }
710
711    /// Get this node's public key (if identity is configured)
712    pub fn public_key(&self) -> Option<[u8; 32]> {
713        self.identity.as_ref().map(|id| id.public_key())
714    }
715
716    /// Create an identity attestation for this node
717    ///
718    /// Returns None if no identity is configured.
719    pub fn create_attestation(&self, now_ms: u64) -> Option<IdentityAttestation> {
720        self.identity
721            .as_ref()
722            .map(|id| id.create_attestation(now_ms))
723    }
724
725    /// Verify and register a peer's identity attestation
726    ///
727    /// Implements TOFU (Trust On First Use):
728    /// - On first contact, registers the node_id → public_key binding
729    /// - On subsequent contacts, verifies the public key matches
730    ///
731    /// Returns the verification result. Security violations should be handled
732    /// by the caller (e.g., disconnect, alert).
733    pub fn verify_peer_identity(&self, attestation: &IdentityAttestation) -> RegistryResult {
734        self.identity_registry
735            .lock()
736            .unwrap()
737            .verify_or_register(attestation)
738    }
739
740    /// Check if a peer's identity is known (has been registered)
741    pub fn is_peer_identity_known(&self, node_id: NodeId) -> bool {
742        self.identity_registry.lock().unwrap().is_known(node_id)
743    }
744
745    /// Get a peer's public key if known
746    pub fn peer_public_key(&self, node_id: NodeId) -> Option<[u8; 32]> {
747        self.identity_registry
748            .lock()
749            .unwrap()
750            .get_public_key(node_id)
751            .copied()
752    }
753
754    /// Get the number of known peer identities
755    pub fn known_identity_count(&self) -> usize {
756        self.identity_registry.lock().unwrap().len()
757    }
758
759    /// Pre-register a peer's identity (for out-of-band key exchange)
760    ///
761    /// Use this when keys are exchanged through a secure side channel
762    /// (e.g., QR code, NFC tap, or provisioning server).
763    pub fn pre_register_peer_identity(&self, node_id: NodeId, public_key: [u8; 32], now_ms: u64) {
764        self.identity_registry
765            .lock()
766            .unwrap()
767            .pre_register(node_id, public_key, now_ms);
768    }
769
770    /// Remove a peer's identity from the registry
771    ///
772    /// Use with caution - this allows re-registration with a different key.
773    pub fn forget_peer_identity(&self, node_id: NodeId) {
774        self.identity_registry.lock().unwrap().remove(node_id);
775    }
776
777    /// Sign arbitrary data with this node's identity
778    ///
779    /// Returns None if no identity is configured.
780    pub fn sign(&self, data: &[u8]) -> Option<[u8; 64]> {
781        self.identity.as_ref().map(|id| id.sign(data))
782    }
783
784    /// Verify a signature from a peer
785    ///
786    /// Uses the peer's public key from the identity registry.
787    /// Returns false if peer is unknown or signature is invalid.
788    pub fn verify_peer_signature(
789        &self,
790        node_id: NodeId,
791        data: &[u8],
792        signature: &[u8; 64],
793    ) -> bool {
794        if let Some(public_key) = self.peer_public_key(node_id) {
795            crate::security::verify_signature(&public_key, data, signature)
796        } else {
797            false
798        }
799    }
800
801    // ==================== Multi-Hop Relay ====================
802
803    /// Check if multi-hop relay is enabled
804    pub fn is_relay_enabled(&self) -> bool {
805        self.config.enable_relay
806    }
807
808    /// Enable multi-hop relay
809    pub fn enable_relay(&mut self) {
810        self.config.enable_relay = true;
811    }
812
813    /// Disable multi-hop relay
814    pub fn disable_relay(&mut self) {
815        self.config.enable_relay = false;
816    }
817
818    /// Check if a message has been seen before (for deduplication)
819    ///
820    /// Returns true if the message was already seen (duplicate).
821    pub fn has_seen_message(&self, message_id: &MessageId) -> bool {
822        self.seen_cache.lock().unwrap().has_seen(message_id)
823    }
824
825    /// Mark a message as seen
826    ///
827    /// Returns true if this is a new message (first time seen).
828    pub fn mark_message_seen(&self, message_id: MessageId, origin: NodeId, now_ms: u64) -> bool {
829        self.seen_cache
830            .lock()
831            .unwrap()
832            .check_and_mark(message_id, origin, now_ms)
833    }
834
835    /// Get the number of entries in the seen message cache
836    pub fn seen_cache_size(&self) -> usize {
837        self.seen_cache.lock().unwrap().len()
838    }
839
840    /// Clear the seen message cache
841    pub fn clear_seen_cache(&self) {
842        self.seen_cache.lock().unwrap().clear();
843    }
844
845    /// Wrap a document in a relay envelope for multi-hop transmission
846    ///
847    /// The returned bytes can be sent to peers and will be automatically
848    /// relayed through the mesh if relay is enabled on receiving nodes.
849    pub fn wrap_for_relay(&self, payload: Vec<u8>) -> Vec<u8> {
850        let envelope = RelayEnvelope::broadcast(self.config.node_id, payload)
851            .with_max_hops(self.config.max_relay_hops);
852        envelope.encode()
853    }
854
855    /// Get peers to relay a message to
856    ///
857    /// Uses the configured gossip strategy to select relay targets.
858    /// Excludes the source peer (if provided) to avoid sending back to sender.
859    pub fn get_relay_targets(&self, exclude_peer: Option<NodeId>) -> Vec<PeatPeer> {
860        let connected = self.peer_manager.get_connected_peers();
861        let filtered: Vec<_> = if let Some(exclude) = exclude_peer {
862            connected
863                .into_iter()
864                .filter(|p| p.node_id != exclude)
865                .collect()
866        } else {
867            connected
868        };
869
870        self.gossip_strategy
871            .select_peers(&filtered)
872            .into_iter()
873            .cloned()
874            .collect()
875    }
876
877    /// Process an incoming relay envelope
878    ///
879    /// Handles deduplication, TTL checking, and determines if the message
880    /// should be processed and/or relayed.
881    ///
882    /// Returns:
883    /// - `Ok(Some(RelayDecision))` if message should be processed/relayed
884    /// - `Ok(None)` if message was a duplicate or TTL expired
885    /// - `Err` if parsing failed
886    pub fn process_relay_envelope(
887        &self,
888        data: &[u8],
889        source_peer: NodeId,
890        now_ms: u64,
891    ) -> Option<RelayDecision> {
892        // Parse envelope
893        let envelope = RelayEnvelope::decode(data)?;
894
895        // Update indirect peer graph if origin differs from source
896        // This means the message was relayed through source_peer from origin_node
897        if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
898            let is_new = self.connection_graph.lock().unwrap().on_relay_received(
899                source_peer,
900                envelope.origin_node,
901                envelope.hop_count,
902                now_ms,
903            );
904
905            if is_new {
906                log::debug!(
907                    "Discovered indirect peer {:08X} via {:08X} ({} hops)",
908                    envelope.origin_node.as_u32(),
909                    source_peer.as_u32(),
910                    envelope.hop_count
911                );
912            }
913        }
914
915        // Check deduplication
916        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
917            // Duplicate message
918            let stats = self
919                .seen_cache
920                .lock()
921                .unwrap()
922                .get_stats(&envelope.message_id);
923            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
924
925            self.notify(PeatEvent::DuplicateMessageDropped {
926                origin_node: envelope.origin_node,
927                seen_count,
928            });
929
930            log::debug!(
931                "Dropped duplicate message {} from {:08X} (seen {} times)",
932                envelope.message_id,
933                envelope.origin_node.as_u32(),
934                seen_count
935            );
936            return None;
937        }
938
939        // Check TTL
940        if !envelope.can_relay() {
941            self.notify(PeatEvent::MessageTtlExpired {
942                origin_node: envelope.origin_node,
943                hop_count: envelope.hop_count,
944            });
945
946            log::debug!(
947                "Message {} from {:08X} TTL expired at hop {}",
948                envelope.message_id,
949                envelope.origin_node.as_u32(),
950                envelope.hop_count
951            );
952
953            // Still process locally even if TTL expired
954            return Some(RelayDecision {
955                payload: envelope.payload,
956                origin_node: envelope.origin_node,
957                hop_count: envelope.hop_count,
958                should_relay: false,
959                relay_envelope: None,
960            });
961        }
962
963        // Determine if we should relay
964        let should_relay = self.config.enable_relay;
965        let relay_envelope = if should_relay {
966            envelope.relay() // Increments hop count
967        } else {
968            None
969        };
970
971        Some(RelayDecision {
972            payload: envelope.payload,
973            origin_node: envelope.origin_node,
974            hop_count: envelope.hop_count,
975            should_relay,
976            relay_envelope,
977        })
978    }
979
980    /// Build a document wrapped in a relay envelope
981    ///
982    /// Convenience method that builds the document, encrypts it (if enabled),
983    /// and wraps it in a relay envelope for multi-hop transmission.
984    pub fn build_relay_document(&self) -> Vec<u8> {
985        let doc = self.build_document(); // Already encrypted if encryption enabled
986        self.wrap_for_relay(doc)
987    }
988
989    // ==================== Delta Sync ====================
990
991    /// Register a peer for delta sync tracking
992    ///
993    /// Call this when a peer connects to start tracking what data has been
994    /// sent to them. This enables future delta sync (sending only changes).
995    pub fn register_peer_for_delta(&self, peer_id: &NodeId) {
996        let mut encoder = self.delta_encoder.lock().unwrap();
997        encoder.add_peer(peer_id);
998        log::debug!(
999            "Registered peer {:08X} for delta sync tracking",
1000            peer_id.as_u32()
1001        );
1002    }
1003
1004    /// Unregister a peer from delta sync tracking
1005    ///
1006    /// Call this when a peer disconnects to clean up tracking state.
1007    pub fn unregister_peer_for_delta(&self, peer_id: &NodeId) {
1008        let mut encoder = self.delta_encoder.lock().unwrap();
1009        encoder.remove_peer(peer_id);
1010        log::debug!(
1011            "Unregistered peer {:08X} from delta sync tracking",
1012            peer_id.as_u32()
1013        );
1014    }
1015
1016    /// Reset delta sync state for a peer
1017    ///
1018    /// Call this when a peer reconnects to force a full sync on next
1019    /// communication. This clears the record of what was previously sent.
1020    pub fn reset_peer_delta_state(&self, peer_id: &NodeId) {
1021        let mut encoder = self.delta_encoder.lock().unwrap();
1022        encoder.reset_peer(peer_id);
1023        log::debug!("Reset delta sync state for peer {:08X}", peer_id.as_u32());
1024    }
1025
1026    /// Record bytes sent to a peer (for delta statistics)
1027    pub fn record_delta_sent(&self, peer_id: &NodeId, bytes: usize) {
1028        let mut encoder = self.delta_encoder.lock().unwrap();
1029        encoder.record_sent(peer_id, bytes);
1030    }
1031
1032    /// Record bytes received from a peer (for delta statistics)
1033    pub fn record_delta_received(&self, peer_id: &NodeId, bytes: usize, timestamp: u64) {
1034        let mut encoder = self.delta_encoder.lock().unwrap();
1035        encoder.record_received(peer_id, bytes, timestamp);
1036    }
1037
1038    /// Get delta sync statistics
1039    ///
1040    /// Returns aggregate statistics about delta sync across all peers,
1041    /// including bytes sent/received and sync counts.
1042    pub fn delta_stats(&self) -> DeltaStats {
1043        self.delta_encoder.lock().unwrap().stats()
1044    }
1045
1046    /// Get delta sync statistics for a specific peer
1047    ///
1048    /// Returns the bytes sent/received and sync count for a single peer.
1049    pub fn peer_delta_stats(&self, peer_id: &NodeId) -> Option<(u64, u64, u32)> {
1050        let encoder = self.delta_encoder.lock().unwrap();
1051        encoder
1052            .get_peer_state(peer_id)
1053            .map(|state| (state.bytes_sent, state.bytes_received, state.sync_count))
1054    }
1055
1056    /// Build a delta document for a specific peer
1057    ///
1058    /// This only includes operations that have changed since the last sync
1059    /// with this peer. Uses the delta encoder to track per-peer state.
1060    ///
1061    /// Returns the encoded delta document bytes, or None if there's nothing
1062    /// new to send to this peer.
1063    pub fn build_delta_document_for_peer(&self, peer_id: &NodeId, now_ms: u64) -> Option<Vec<u8>> {
1064        // Collect all current operations
1065        let mut all_operations: Vec<Operation> = Vec::new();
1066
1067        // Add counter operations (one per node that has contributed)
1068        // Use the count value as the "timestamp" for tracking - only send if count increased
1069        for (node_id_u32, count) in self.document_sync.counter_entries() {
1070            all_operations.push(Operation::IncrementCounter {
1071                counter_id: 0, // Default mesh counter
1072                node_id: NodeId::new(node_id_u32),
1073                amount: count,
1074                timestamp: count, // Use count as timestamp for delta tracking
1075            });
1076        }
1077
1078        // Add peripheral update
1079        // Use event timestamp if available, otherwise use 1 for initial send
1080        let peripheral = self.document_sync.peripheral_snapshot();
1081        let peripheral_timestamp = peripheral
1082            .last_event
1083            .as_ref()
1084            .map(|e| e.timestamp)
1085            .unwrap_or(1); // Use 1 (not 0) so it's sent initially
1086        all_operations.push(Operation::UpdatePeripheral {
1087            peripheral,
1088            timestamp: peripheral_timestamp,
1089        });
1090
1091        // Add emergency operations if active
1092        if let Some(emergency) = self.document_sync.emergency_snapshot() {
1093            let source_node = NodeId::new(emergency.source_node());
1094            let timestamp = emergency.timestamp();
1095
1096            // Add SetEmergency operation
1097            all_operations.push(Operation::SetEmergency {
1098                source_node,
1099                timestamp,
1100                known_peers: emergency.all_nodes(),
1101            });
1102
1103            // Add AckEmergency for each node that has acked
1104            for acked_node in emergency.acked_nodes() {
1105                all_operations.push(Operation::AckEmergency {
1106                    node_id: NodeId::new(acked_node),
1107                    emergency_timestamp: timestamp,
1108                });
1109            }
1110        }
1111
1112        // Add app document operations
1113        for app_op in self.app_document_delta_ops() {
1114            all_operations.push(Operation::App(app_op));
1115        }
1116
1117        // Filter operations for this peer (only send what's new)
1118        let filtered_operations: Vec<Operation> = {
1119            let encoder = self.delta_encoder.lock().unwrap();
1120            if let Some(peer_state) = encoder.get_peer_state(peer_id) {
1121                all_operations
1122                    .into_iter()
1123                    .filter(|op| peer_state.needs_send(&op.key(), op.timestamp()))
1124                    .collect()
1125            } else {
1126                // Unknown peer, send all operations
1127                all_operations
1128            }
1129        };
1130
1131        // If nothing new to send, return None
1132        if filtered_operations.is_empty() {
1133            return None;
1134        }
1135
1136        // Mark operations as sent
1137        {
1138            let mut encoder = self.delta_encoder.lock().unwrap();
1139            if let Some(peer_state) = encoder.get_peer_state_mut(peer_id) {
1140                for op in &filtered_operations {
1141                    peer_state.mark_sent(&op.key(), op.timestamp());
1142                }
1143            }
1144        }
1145
1146        // Build the delta document
1147        let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1148        for op in filtered_operations {
1149            delta.add_operation(op);
1150        }
1151
1152        // Encode and optionally encrypt
1153        let encoded = delta.encode();
1154        let result = self.encrypt_document(&encoded);
1155
1156        // Record stats
1157        {
1158            let mut encoder = self.delta_encoder.lock().unwrap();
1159            encoder.record_sent(peer_id, result.len());
1160        }
1161
1162        Some(result)
1163    }
1164
1165    /// Build a full delta document (for broadcast or new peers)
1166    ///
1167    /// Unlike `build_delta_document_for_peer`, this includes all state
1168    /// regardless of what has been sent before. Use this for broadcasts.
1169    pub fn build_full_delta_document(&self, now_ms: u64) -> Vec<u8> {
1170        let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1171
1172        // Add all counter operations
1173        for (node_id_u32, count) in self.document_sync.counter_entries() {
1174            delta.add_operation(Operation::IncrementCounter {
1175                counter_id: 0,
1176                node_id: NodeId::new(node_id_u32),
1177                amount: count,
1178                timestamp: now_ms,
1179            });
1180        }
1181
1182        // Add peripheral
1183        let peripheral = self.document_sync.peripheral_snapshot();
1184        let peripheral_timestamp = peripheral
1185            .last_event
1186            .as_ref()
1187            .map(|e| e.timestamp)
1188            .unwrap_or(now_ms);
1189        delta.add_operation(Operation::UpdatePeripheral {
1190            peripheral,
1191            timestamp: peripheral_timestamp,
1192        });
1193
1194        // Add emergency if active
1195        if let Some(emergency) = self.document_sync.emergency_snapshot() {
1196            let source_node = NodeId::new(emergency.source_node());
1197            let timestamp = emergency.timestamp();
1198
1199            delta.add_operation(Operation::SetEmergency {
1200                source_node,
1201                timestamp,
1202                known_peers: emergency.all_nodes(),
1203            });
1204
1205            for acked_node in emergency.acked_nodes() {
1206                delta.add_operation(Operation::AckEmergency {
1207                    node_id: NodeId::new(acked_node),
1208                    emergency_timestamp: timestamp,
1209                });
1210            }
1211        }
1212
1213        // Add app document operations
1214        for app_op in self.app_document_delta_ops() {
1215            delta.add_operation(Operation::App(app_op));
1216        }
1217
1218        let encoded = delta.encode();
1219        self.encrypt_document(&encoded)
1220    }
1221
1222    /// Internal: Process a received delta document
1223    ///
1224    /// Applies operations from a delta document to local state.
1225    fn process_delta_document_internal(
1226        &self,
1227        source_node: NodeId,
1228        data: &[u8],
1229        now_ms: u64,
1230        relay_data: Option<Vec<u8>>,
1231        origin_node: Option<NodeId>,
1232        hop_count: u8,
1233    ) -> Option<DataReceivedResult> {
1234        // Decode the delta document
1235        let delta = DeltaDocument::decode(data)?;
1236
1237        // Don't process our own documents
1238        if delta.origin_node == self.config.node_id {
1239            return None;
1240        }
1241
1242        // Apply operations to local state
1243        let mut counter_changed = false;
1244        let mut emergency_changed = false;
1245        let mut is_emergency = false;
1246        let mut is_ack = false;
1247        let mut event_timestamp = 0u64;
1248        let mut peer_peripheral: Option<crate::sync::crdt::Peripheral> = None;
1249
1250        log::info!(
1251            "Delta document from {:08X}: {} operations, data_len={}",
1252            delta.origin_node.as_u32(),
1253            delta.operations.len(),
1254            data.len()
1255        );
1256        for op in &delta.operations {
1257            log::info!("  Operation: {}", op.key());
1258            match op {
1259                Operation::IncrementCounter {
1260                    node_id, amount, ..
1261                } => {
1262                    // Merge counter value (take max)
1263                    let current = self.document_sync.counter_entries();
1264                    let current_value = current
1265                        .iter()
1266                        .find(|(id, _)| *id == node_id.as_u32())
1267                        .map(|(_, v)| *v)
1268                        .unwrap_or(0);
1269
1270                    if *amount > current_value {
1271                        // Need to merge - this is handled by the counter merge logic
1272                        // For now, we record that counter changed
1273                        counter_changed = true;
1274                    }
1275                }
1276                Operation::UpdatePeripheral {
1277                    peripheral,
1278                    timestamp,
1279                } => {
1280                    // Store peer peripheral for callsign lookup
1281                    if let Ok(mut peripherals) = self.peer_peripherals.write() {
1282                        peripherals.insert(delta.origin_node, peripheral.clone());
1283                    }
1284                    // Track the peripheral for the result
1285                    peer_peripheral = Some(peripheral.clone());
1286                    // Track the timestamp for the result
1287                    if *timestamp > event_timestamp {
1288                        event_timestamp = *timestamp;
1289                    }
1290                }
1291                Operation::SetEmergency { timestamp, .. } => {
1292                    is_emergency = true;
1293                    emergency_changed = true;
1294                    event_timestamp = *timestamp;
1295                }
1296                Operation::AckEmergency {
1297                    emergency_timestamp,
1298                    ..
1299                } => {
1300                    is_ack = true;
1301                    emergency_changed = true;
1302                    if *emergency_timestamp > event_timestamp {
1303                        event_timestamp = *emergency_timestamp;
1304                    }
1305                }
1306                Operation::ClearEmergency {
1307                    emergency_timestamp,
1308                } => {
1309                    emergency_changed = true;
1310                    if *emergency_timestamp > event_timestamp {
1311                        event_timestamp = *emergency_timestamp;
1312                    }
1313                }
1314                Operation::App(app_op) => {
1315                    // Handle app-layer document operation
1316                    //
1317                    // The timestamp field may contain version info in the upper bits
1318                    // (used by delta encoder for change detection). Extract the
1319                    // original document timestamp from the lower 48 bits.
1320                    let doc_timestamp = app_op.timestamp & 0x0000_FFFF_FFFF_FFFF;
1321
1322                    log::info!(
1323                        "App operation received: type={:02X} op_code={:02X} from {:08X} ts={} payload_len={}",
1324                        app_op.type_id,
1325                        app_op.op_code,
1326                        app_op.source_node,
1327                        doc_timestamp,
1328                        app_op.payload.len()
1329                    );
1330
1331                    // Try to find/create document and apply the delta operation
1332                    let doc_key = (app_op.type_id, app_op.source_node, doc_timestamp);
1333                    let changed = {
1334                        let mut docs = self.app_documents.write().unwrap();
1335
1336                        if let Some(existing) = docs.get_mut(&doc_key) {
1337                            // Apply delta to existing document (merge via CRDT)
1338                            self.document_registry.apply_delta_op(
1339                                app_op.type_id,
1340                                existing.as_mut(),
1341                                app_op,
1342                            )
1343                        } else {
1344                            // Try to decode from payload (for FULL_STATE operations)
1345                            if let Some(decoded) = self
1346                                .document_registry
1347                                .decode(app_op.type_id, &app_op.payload)
1348                            {
1349                                docs.insert(doc_key, decoded);
1350                                true
1351                            } else {
1352                                // For delta ops on unknown documents, we can't apply
1353                                // The full state will be sent later
1354                                log::debug!(
1355                                    "Received delta for unknown doc {:?}, waiting for full state",
1356                                    doc_key
1357                                );
1358                                false
1359                            }
1360                        }
1361                    };
1362
1363                    // Emit observer event with the real document timestamp
1364                    self.observers.notify(PeatEvent::app_document_received(
1365                        app_op.type_id,
1366                        NodeId::new(app_op.source_node),
1367                        doc_timestamp,
1368                        changed,
1369                    ));
1370                }
1371            }
1372        }
1373
1374        // Record sync
1375        self.peer_manager.record_sync(source_node, now_ms);
1376
1377        // Record delta received
1378        {
1379            let mut encoder = self.delta_encoder.lock().unwrap();
1380            encoder.record_received(&source_node, data.len(), now_ms);
1381        }
1382
1383        // Generate events based on what was received
1384        if is_emergency {
1385            self.notify(PeatEvent::EmergencyReceived {
1386                from_node: delta.origin_node,
1387            });
1388        } else if is_ack {
1389            self.notify(PeatEvent::AckReceived {
1390                from_node: delta.origin_node,
1391            });
1392        }
1393
1394        if counter_changed {
1395            let total_count = self.document_sync.total_count();
1396            self.notify(PeatEvent::DocumentSynced {
1397                from_node: delta.origin_node,
1398                total_count,
1399            });
1400        }
1401
1402        // Emit relay event if we're relaying
1403        if relay_data.is_some() {
1404            let relay_targets = self.get_relay_targets(Some(source_node));
1405            self.notify(PeatEvent::MessageRelayed {
1406                origin_node: origin_node.unwrap_or(delta.origin_node),
1407                relay_count: relay_targets.len(),
1408                hop_count,
1409            });
1410        }
1411
1412        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
1413            DataReceivedResult::peripheral_fields(&peer_peripheral);
1414
1415        Some(DataReceivedResult {
1416            source_node: delta.origin_node,
1417            is_emergency,
1418            is_ack,
1419            counter_changed,
1420            emergency_changed,
1421            total_count: self.document_sync.total_count(),
1422            event_timestamp,
1423            relay_data,
1424            origin_node,
1425            hop_count,
1426            callsign,
1427            battery_percent,
1428            heart_rate,
1429            event_type,
1430            latitude,
1431            longitude,
1432            altitude,
1433        })
1434    }
1435
1436    // ==================== Per-Peer E2EE ====================
1437
1438    /// Enable per-peer E2EE capability
1439    ///
1440    /// Creates a new identity key for this node. This allows establishing
1441    /// encrypted sessions with specific peers where only the sender and
1442    /// recipient can read messages (other mesh members cannot).
1443    pub fn enable_peer_e2ee(&self) {
1444        let mut sessions = self.peer_sessions.lock().unwrap();
1445        if sessions.is_none() {
1446            *sessions = Some(PeerSessionManager::new(self.config.node_id));
1447            log::info!(
1448                "Per-peer E2EE enabled for node {:08X}",
1449                self.config.node_id.as_u32()
1450            );
1451        }
1452    }
1453
1454    /// Disable per-peer E2EE capability
1455    ///
1456    /// Clears all peer sessions and disables E2EE.
1457    pub fn disable_peer_e2ee(&self) {
1458        let mut sessions = self.peer_sessions.lock().unwrap();
1459        *sessions = None;
1460        log::info!("Per-peer E2EE disabled");
1461    }
1462
1463    /// Check if per-peer E2EE is enabled
1464    pub fn is_peer_e2ee_enabled(&self) -> bool {
1465        self.peer_sessions.lock().unwrap().is_some()
1466    }
1467
1468    /// Get our E2EE public key (for sharing with peers)
1469    ///
1470    /// Returns None if per-peer E2EE is not enabled.
1471    pub fn peer_e2ee_public_key(&self) -> Option<[u8; 32]> {
1472        self.peer_sessions
1473            .lock()
1474            .unwrap()
1475            .as_ref()
1476            .map(|s| s.our_public_key())
1477    }
1478
1479    /// Initiate E2EE session with a specific peer
1480    ///
1481    /// Returns the key exchange message bytes to send to the peer.
1482    /// The message should be broadcast/sent to the peer.
1483    /// Returns None if per-peer E2EE is not enabled.
1484    pub fn initiate_peer_e2ee(&self, peer_node_id: NodeId, now_ms: u64) -> Option<Vec<u8>> {
1485        let mut sessions = self.peer_sessions.lock().unwrap();
1486        let session_mgr = sessions.as_mut()?;
1487
1488        let key_exchange = session_mgr.initiate_session(peer_node_id, now_ms);
1489        let mut buf = Vec::with_capacity(2 + 37);
1490        buf.push(KEY_EXCHANGE_MARKER);
1491        buf.push(0x00); // reserved
1492        buf.extend_from_slice(&key_exchange.encode());
1493
1494        log::info!(
1495            "Initiated E2EE session with peer {:08X}",
1496            peer_node_id.as_u32()
1497        );
1498        Some(buf)
1499    }
1500
1501    /// Check if we have an established E2EE session with a peer
1502    pub fn has_peer_e2ee_session(&self, peer_node_id: NodeId) -> bool {
1503        self.peer_sessions
1504            .lock()
1505            .unwrap()
1506            .as_ref()
1507            .is_some_and(|s| s.has_session(peer_node_id))
1508    }
1509
1510    /// Get E2EE session state with a peer
1511    pub fn peer_e2ee_session_state(&self, peer_node_id: NodeId) -> Option<SessionState> {
1512        self.peer_sessions
1513            .lock()
1514            .unwrap()
1515            .as_ref()
1516            .and_then(|s| s.session_state(peer_node_id))
1517    }
1518
1519    /// Send an E2EE encrypted message to a specific peer
1520    ///
1521    /// Returns the encrypted message bytes to send, or None if no session exists.
1522    /// The message should be sent directly to the peer (not broadcast).
1523    pub fn send_peer_e2ee(
1524        &self,
1525        peer_node_id: NodeId,
1526        plaintext: &[u8],
1527        now_ms: u64,
1528    ) -> Option<Vec<u8>> {
1529        let mut sessions = self.peer_sessions.lock().unwrap();
1530        let session_mgr = sessions.as_mut()?;
1531
1532        match session_mgr.encrypt_for_peer(peer_node_id, plaintext, now_ms) {
1533            Ok(encrypted) => {
1534                let mut buf = Vec::with_capacity(2 + encrypted.encode().len());
1535                buf.push(PEER_E2EE_MARKER);
1536                buf.push(0x00); // reserved
1537                buf.extend_from_slice(&encrypted.encode());
1538                Some(buf)
1539            }
1540            Err(e) => {
1541                log::warn!(
1542                    "Failed to encrypt for peer {:08X}: {:?}",
1543                    peer_node_id.as_u32(),
1544                    e
1545                );
1546                None
1547            }
1548        }
1549    }
1550
1551    /// Close E2EE session with a peer
1552    pub fn close_peer_e2ee(&self, peer_node_id: NodeId) {
1553        let mut sessions = self.peer_sessions.lock().unwrap();
1554        if let Some(session_mgr) = sessions.as_mut() {
1555            session_mgr.close_session(peer_node_id);
1556            self.notify(PeatEvent::PeerE2eeClosed { peer_node_id });
1557            log::info!(
1558                "Closed E2EE session with peer {:08X}",
1559                peer_node_id.as_u32()
1560            );
1561        }
1562    }
1563
1564    /// Get count of active E2EE sessions
1565    pub fn peer_e2ee_session_count(&self) -> usize {
1566        self.peer_sessions
1567            .lock()
1568            .unwrap()
1569            .as_ref()
1570            .map(|s| s.session_count())
1571            .unwrap_or(0)
1572    }
1573
1574    /// Get count of established E2EE sessions
1575    pub fn peer_e2ee_established_count(&self) -> usize {
1576        self.peer_sessions
1577            .lock()
1578            .unwrap()
1579            .as_ref()
1580            .map(|s| s.established_count())
1581            .unwrap_or(0)
1582    }
1583
1584    /// Handle incoming key exchange message
1585    ///
1586    /// Called internally when we receive a KEY_EXCHANGE_MARKER message.
1587    /// Returns the response key exchange bytes to send back, or None if invalid.
1588    fn handle_key_exchange(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1589        if data.len() < 2 || data[0] != KEY_EXCHANGE_MARKER {
1590            return None;
1591        }
1592
1593        let payload = &data[2..];
1594        let msg = KeyExchangeMessage::decode(payload)?;
1595
1596        let mut sessions = self.peer_sessions.lock().unwrap();
1597        let session_mgr = sessions.as_mut()?;
1598
1599        let (response, established) = session_mgr.handle_key_exchange(&msg, now_ms)?;
1600
1601        if established {
1602            self.notify(PeatEvent::PeerE2eeEstablished {
1603                peer_node_id: msg.sender_node_id,
1604            });
1605            log::info!(
1606                "E2EE session established with peer {:08X}",
1607                msg.sender_node_id.as_u32()
1608            );
1609        }
1610
1611        // Return response key exchange
1612        let mut buf = Vec::with_capacity(2 + 37);
1613        buf.push(KEY_EXCHANGE_MARKER);
1614        buf.push(0x00);
1615        buf.extend_from_slice(&response.encode());
1616        Some(buf)
1617    }
1618
1619    /// Handle incoming E2EE encrypted message
1620    ///
1621    /// Called internally when we receive a PEER_E2EE_MARKER message.
1622    /// Decrypts and notifies observers of the received message.
1623    fn handle_peer_e2ee_message(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1624        if data.len() < 2 || data[0] != PEER_E2EE_MARKER {
1625            return None;
1626        }
1627
1628        let payload = &data[2..];
1629        let msg = PeerEncryptedMessage::decode(payload)?;
1630
1631        let mut sessions = self.peer_sessions.lock().unwrap();
1632        let session_mgr = sessions.as_mut()?;
1633
1634        match session_mgr.decrypt_from_peer(&msg, now_ms) {
1635            Ok(plaintext) => {
1636                // Notify observers of the decrypted message
1637                self.notify(PeatEvent::PeerE2eeMessageReceived {
1638                    from_node: msg.sender_node_id,
1639                    data: plaintext.clone(),
1640                });
1641                Some(plaintext)
1642            }
1643            Err(e) => {
1644                log::warn!(
1645                    "Failed to decrypt E2EE message from {:08X}: {:?}",
1646                    msg.sender_node_id.as_u32(),
1647                    e
1648                );
1649                None
1650            }
1651        }
1652    }
1653
1654    // ==================== Configuration ====================
1655
1656    /// Get our node ID
1657    pub fn node_id(&self) -> NodeId {
1658        self.config.node_id
1659    }
1660
1661    /// Get our callsign
1662    pub fn callsign(&self) -> &str {
1663        &self.config.callsign
1664    }
1665
1666    /// Get the mesh ID
1667    pub fn mesh_id(&self) -> &str {
1668        &self.config.mesh_id
1669    }
1670
1671    /// Get the device name for BLE advertising
1672    pub fn device_name(&self) -> String {
1673        format!(
1674            "PEAT_{}-{:08X}",
1675            self.config.mesh_id,
1676            self.config.node_id.as_u32()
1677        )
1678    }
1679
1680    /// Get a peer's callsign by node ID
1681    ///
1682    /// Returns the callsign from the peer's most recently received peripheral data,
1683    /// or None if no peripheral data has been received from this peer.
1684    pub fn get_peer_callsign(&self, node_id: NodeId) -> Option<String> {
1685        self.peer_peripherals.read().ok().and_then(|peripherals| {
1686            peripherals
1687                .get(&node_id)
1688                .map(|p| p.callsign_str().to_string())
1689        })
1690    }
1691
1692    /// Get a peer's full peripheral data by node ID
1693    ///
1694    /// Returns a clone of the peripheral data from the peer's most recently received
1695    /// document, or None if no peripheral data has been received from this peer.
1696    pub fn get_peer_peripheral(&self, node_id: NodeId) -> Option<Peripheral> {
1697        self.peer_peripherals
1698            .read()
1699            .ok()
1700            .and_then(|peripherals| peripherals.get(&node_id).cloned())
1701    }
1702
1703    // ==================== Document Registry ====================
1704
1705    /// Get a reference to the document registry.
1706    ///
1707    /// Use this to register custom document types for mesh synchronization.
1708    ///
1709    /// # Example
1710    ///
1711    /// ```ignore
1712    /// use peat_btle::registry::DocumentType;
1713    ///
1714    /// // Register a custom document type
1715    /// mesh.document_registry().register::<MyCustomDocument>();
1716    /// ```
1717    pub fn document_registry(&self) -> &DocumentRegistry {
1718        &self.document_registry
1719    }
1720
1721    /// Store an app-layer document.
1722    ///
1723    /// If a document with the same identity (type_id, source_node, timestamp)
1724    /// already exists, it will be merged using CRDT semantics.
1725    ///
1726    /// Returns true if the document was newly added or changed via merge.
1727    pub fn store_app_document<T: crate::registry::DocumentType>(&self, doc: T) -> bool {
1728        let type_id = T::TYPE_ID;
1729        let (source_node, timestamp) = doc.identity();
1730        let key = (type_id, source_node, timestamp);
1731
1732        let mut docs = self.app_documents.write().unwrap();
1733
1734        if let Some(existing) = docs.get_mut(&key) {
1735            // Merge with existing document
1736            self.document_registry
1737                .merge(type_id, existing.as_mut(), &doc)
1738        } else {
1739            // Insert new document
1740            docs.insert(key, Box::new(doc));
1741            true
1742        }
1743    }
1744
1745    /// Store a type-erased app-layer document.
1746    ///
1747    /// Used when receiving documents from the network where the type is
1748    /// determined at runtime.
1749    ///
1750    /// Returns true if the document was newly added or changed via merge.
1751    pub fn store_app_document_boxed(
1752        &self,
1753        type_id: u8,
1754        source_node: u32,
1755        timestamp: u64,
1756        doc: Box<dyn core::any::Any + Send + Sync>,
1757    ) -> bool {
1758        let key = (type_id, source_node, timestamp);
1759
1760        let mut docs = self.app_documents.write().unwrap();
1761
1762        if let Some(existing) = docs.get_mut(&key) {
1763            // Merge with existing document
1764            self.document_registry
1765                .merge(type_id, existing.as_mut(), doc.as_ref())
1766        } else {
1767            // Insert new document
1768            docs.insert(key, doc);
1769            true
1770        }
1771    }
1772
1773    /// Get a stored app-layer document by identity.
1774    ///
1775    /// Returns None if not found or if downcast to T fails.
1776    pub fn get_app_document<T: crate::registry::DocumentType>(
1777        &self,
1778        source_node: u32,
1779        timestamp: u64,
1780    ) -> Option<T> {
1781        let key = (T::TYPE_ID, source_node, timestamp);
1782
1783        let docs = self.app_documents.read().unwrap();
1784        docs.get(&key).and_then(|d| d.downcast_ref::<T>()).cloned()
1785    }
1786
1787    /// Get all stored app-layer documents of a specific type.
1788    ///
1789    /// Returns a vector of all documents of type T currently stored.
1790    pub fn get_all_app_documents_of_type<T: crate::registry::DocumentType>(&self) -> Vec<T> {
1791        let docs = self.app_documents.read().unwrap();
1792        docs.iter()
1793            .filter(|((type_id, _, _), _)| *type_id == T::TYPE_ID)
1794            .filter_map(|(_, doc)| doc.downcast_ref::<T>().cloned())
1795            .collect()
1796    }
1797
1798    /// Get delta operations for all stored app documents.
1799    ///
1800    /// Used during sync to include app documents in the delta stream.
1801    pub fn app_document_delta_ops(&self) -> Vec<crate::registry::AppOperation> {
1802        let docs = self.app_documents.read().unwrap();
1803        let mut ops = Vec::new();
1804
1805        for ((type_id, _source, _ts), doc) in docs.iter() {
1806            if let Some(op) = self.document_registry.to_delta_op(*type_id, doc.as_ref()) {
1807                ops.push(op);
1808            }
1809        }
1810
1811        ops
1812    }
1813
1814    /// Get all document keys for a given type.
1815    ///
1816    /// Returns (source_node, timestamp) pairs for all documents of the specified type.
1817    pub fn app_document_keys(&self, type_id: u8) -> Vec<(u32, u64)> {
1818        let docs = self.app_documents.read().unwrap();
1819        docs.keys()
1820            .filter(|(tid, _, _)| *tid == type_id)
1821            .map(|(_, source, ts)| (*source, *ts))
1822            .collect()
1823    }
1824
1825    /// Get the number of stored app documents.
1826    pub fn app_document_count(&self) -> usize {
1827        self.app_documents.read().unwrap().len()
1828    }
1829
1830    // ==================== Observer Management ====================
1831
1832    /// Add an observer for mesh events
1833    pub fn add_observer(&self, observer: Arc<dyn PeatObserver>) {
1834        self.observers.add(observer);
1835    }
1836
1837    /// Remove an observer
1838    pub fn remove_observer(&self, observer: &Arc<dyn PeatObserver>) {
1839        self.observers.remove(observer);
1840    }
1841
1842    // ==================== User Actions ====================
1843
1844    /// Send an emergency alert
1845    ///
1846    /// Returns the document bytes to broadcast to all peers.
1847    /// If encryption is enabled, the document is encrypted.
1848    pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
1849        let data = self.document_sync.send_emergency(timestamp);
1850        self.notify(PeatEvent::MeshStateChanged {
1851            peer_count: self.peer_manager.peer_count(),
1852            connected_count: self.peer_manager.connected_count(),
1853        });
1854        self.encrypt_document(&data)
1855    }
1856
1857    /// Send an ACK response
1858    ///
1859    /// Returns the document bytes to broadcast to all peers.
1860    /// If encryption is enabled, the document is encrypted.
1861    pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
1862        let data = self.document_sync.send_ack(timestamp);
1863        self.notify(PeatEvent::MeshStateChanged {
1864            peer_count: self.peer_manager.peer_count(),
1865            connected_count: self.peer_manager.connected_count(),
1866        });
1867        self.encrypt_document(&data)
1868    }
1869
1870    /// Broadcast arbitrary bytes over the mesh.
1871    ///
1872    /// Takes raw payload bytes, encrypts them (if encryption is enabled),
1873    /// and returns bytes ready to send to all connected peers.
1874    ///
1875    /// This is useful for sending extension data like CannedMessages from peat-lite.
1876    pub fn broadcast_bytes(&self, payload: &[u8]) -> Vec<u8> {
1877        self.encrypt_document(payload)
1878    }
1879
1880    /// Clear the current event (emergency or ack)
1881    pub fn clear_event(&self) {
1882        self.document_sync.clear_event();
1883    }
1884
1885    /// Check if emergency is active
1886    pub fn is_emergency_active(&self) -> bool {
1887        self.document_sync.is_emergency_active()
1888    }
1889
1890    /// Check if ACK is active
1891    pub fn is_ack_active(&self) -> bool {
1892        self.document_sync.is_ack_active()
1893    }
1894
1895    /// Get current event type
1896    pub fn current_event(&self) -> Option<EventType> {
1897        self.document_sync.current_event()
1898    }
1899
1900    // ==================== Emergency Management (Document-Based) ====================
1901
1902    /// Start a new emergency event with ACK tracking
1903    ///
1904    /// Creates an emergency event that tracks ACKs from all known peers.
1905    /// Pass the list of known peer node IDs to track.
1906    /// Returns the document bytes to broadcast.
1907    /// If encryption is enabled, the document is encrypted.
1908    pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
1909        let data = self.document_sync.start_emergency(timestamp, known_peers);
1910        self.notify(PeatEvent::MeshStateChanged {
1911            peer_count: self.peer_manager.peer_count(),
1912            connected_count: self.peer_manager.connected_count(),
1913        });
1914        self.encrypt_document(&data)
1915    }
1916
1917    /// Start a new emergency using all currently known peers
1918    ///
1919    /// Convenience method that automatically includes all discovered peers.
1920    pub fn start_emergency_with_known_peers(&self, timestamp: u64) -> Vec<u8> {
1921        let peers: Vec<u32> = self
1922            .peer_manager
1923            .get_peers()
1924            .iter()
1925            .map(|p| p.node_id.as_u32())
1926            .collect();
1927        self.start_emergency(timestamp, &peers)
1928    }
1929
1930    /// Record our ACK for the current emergency
1931    ///
1932    /// Returns the document bytes to broadcast, or None if no emergency is active.
1933    /// If encryption is enabled, the document is encrypted.
1934    pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
1935        let result = self.document_sync.ack_emergency(timestamp);
1936        if result.is_some() {
1937            self.notify(PeatEvent::MeshStateChanged {
1938                peer_count: self.peer_manager.peer_count(),
1939                connected_count: self.peer_manager.connected_count(),
1940            });
1941        }
1942        result.map(|data| self.encrypt_document(&data))
1943    }
1944
1945    /// Clear the current emergency event
1946    pub fn clear_emergency(&self) {
1947        self.document_sync.clear_emergency();
1948    }
1949
1950    /// Check if there's an active emergency
1951    pub fn has_active_emergency(&self) -> bool {
1952        self.document_sync.has_active_emergency()
1953    }
1954
1955    /// Get emergency status info
1956    ///
1957    /// Returns (source_node, timestamp, acked_count, pending_count) if emergency is active.
1958    pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
1959        self.document_sync.get_emergency_status()
1960    }
1961
1962    /// Check if a specific peer has ACKed the current emergency
1963    pub fn has_peer_acked(&self, peer_id: u32) -> bool {
1964        self.document_sync.has_peer_acked(peer_id)
1965    }
1966
1967    /// Check if all peers have ACKed the current emergency
1968    pub fn all_peers_acked(&self) -> bool {
1969        self.document_sync.all_peers_acked()
1970    }
1971
1972    // ==================== Chat Methods (requires `legacy-chat` feature) ====================
1973
1974    /// Send a chat message
1975    ///
1976    /// Adds the message to the local CRDT and returns the document bytes
1977    /// to broadcast to all peers. If encryption is enabled, the document is encrypted.
1978    ///
1979    /// Returns the encrypted document bytes if the message was new,
1980    /// or None if it was a duplicate.
1981    #[cfg(feature = "legacy-chat")]
1982    pub fn send_chat(&self, sender: &str, text: &str, timestamp: u64) -> Option<Vec<u8>> {
1983        if self.document_sync.add_chat_message(sender, text, timestamp) {
1984            Some(self.encrypt_document(&self.build_document()))
1985        } else {
1986            None
1987        }
1988    }
1989
1990    /// Send a chat reply
1991    ///
1992    /// Adds the reply to the local CRDT with reply-to information and returns
1993    /// the document bytes to broadcast. If encryption is enabled, the document is encrypted.
1994    ///
1995    /// Returns the encrypted document bytes if the message was new,
1996    /// or None if it was a duplicate.
1997    #[cfg(feature = "legacy-chat")]
1998    pub fn send_chat_reply(
1999        &self,
2000        sender: &str,
2001        text: &str,
2002        reply_to_node: u32,
2003        reply_to_timestamp: u64,
2004        timestamp: u64,
2005    ) -> Option<Vec<u8>> {
2006        if self.document_sync.add_chat_reply(
2007            sender,
2008            text,
2009            reply_to_node,
2010            reply_to_timestamp,
2011            timestamp,
2012        ) {
2013            Some(self.encrypt_document(&self.build_document()))
2014        } else {
2015            None
2016        }
2017    }
2018
2019    /// Get the number of chat messages in the local CRDT
2020    #[cfg(feature = "legacy-chat")]
2021    pub fn chat_count(&self) -> usize {
2022        self.document_sync.chat_count()
2023    }
2024
2025    /// Get chat messages newer than a timestamp
2026    ///
2027    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
2028    #[cfg(feature = "legacy-chat")]
2029    pub fn chat_messages_since(
2030        &self,
2031        since_timestamp: u64,
2032    ) -> Vec<(u32, u64, String, String, u32, u64)> {
2033        self.document_sync.chat_messages_since(since_timestamp)
2034    }
2035
2036    /// Get all chat messages
2037    ///
2038    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
2039    #[cfg(feature = "legacy-chat")]
2040    pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
2041        self.document_sync.all_chat_messages()
2042    }
2043
2044    // ==================== BLE Callbacks (Platform -> Mesh) ====================
2045
2046    /// Called when a BLE device is discovered
2047    ///
2048    /// Returns `Some(PeatPeer)` if this is a new Peat peer on our mesh.
2049    pub fn on_ble_discovered(
2050        &self,
2051        identifier: &str,
2052        name: Option<&str>,
2053        rssi: i8,
2054        mesh_id: Option<&str>,
2055        now_ms: u64,
2056    ) -> Option<PeatPeer> {
2057        let (node_id, is_new) = self
2058            .peer_manager
2059            .on_discovered(identifier, name, rssi, mesh_id, now_ms)?;
2060
2061        let peer = self.peer_manager.get_peer(node_id)?;
2062
2063        // Update connection graph
2064        {
2065            let mut graph = self.connection_graph.lock().unwrap();
2066            graph.on_discovered(
2067                node_id,
2068                identifier.to_string(),
2069                name.map(|s| s.to_string()),
2070                mesh_id.map(|s| s.to_string()),
2071                rssi,
2072                now_ms,
2073            );
2074        }
2075
2076        if is_new {
2077            self.notify(PeatEvent::PeerDiscovered { peer: peer.clone() });
2078            self.notify_mesh_state_changed();
2079        }
2080
2081        Some(peer)
2082    }
2083
2084    /// Called when a BLE connection is established (outgoing)
2085    ///
2086    /// Returns the NodeId if this identifier is known.
2087    pub fn on_ble_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
2088        let node_id = match self.peer_manager.on_connected(identifier, now_ms) {
2089            Some(id) => id,
2090            None => {
2091                log::warn!(
2092                    "on_ble_connected: identifier {:?} not in peer map — \
2093                     use on_incoming_connection() for peripheral connections",
2094                    identifier
2095                );
2096                return None;
2097            }
2098        };
2099
2100        // Update connection graph
2101        {
2102            let mut graph = self.connection_graph.lock().unwrap();
2103            graph.on_connected(node_id, now_ms);
2104        }
2105
2106        // Register peer for delta sync tracking
2107        self.register_peer_for_delta(&node_id);
2108
2109        self.notify(PeatEvent::PeerConnected { node_id });
2110        self.notify_mesh_state_changed();
2111        Some(node_id)
2112    }
2113
2114    /// Called when a BLE connection is lost
2115    pub fn on_ble_disconnected(
2116        &self,
2117        identifier: &str,
2118        reason: DisconnectReason,
2119    ) -> Option<NodeId> {
2120        let (node_id, observer_reason) = self.peer_manager.on_disconnected(identifier, reason)?;
2121
2122        // Update connection graph (convert observer reason to platform reason)
2123        {
2124            let mut graph = self.connection_graph.lock().unwrap();
2125            let platform_reason = match observer_reason {
2126                DisconnectReason::LocalRequest => crate::platform::DisconnectReason::LocalRequest,
2127                DisconnectReason::RemoteRequest => crate::platform::DisconnectReason::RemoteRequest,
2128                DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2129                DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2130                DisconnectReason::ConnectionFailed => {
2131                    crate::platform::DisconnectReason::ConnectionFailed
2132                }
2133                DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2134            };
2135            let now_ms = std::time::SystemTime::now()
2136                .duration_since(std::time::UNIX_EPOCH)
2137                .map(|d| d.as_millis() as u64)
2138                .unwrap_or(0);
2139            graph.on_disconnected(node_id, platform_reason, now_ms);
2140
2141            // Remove indirect peer paths that went through this peer
2142            // These paths may no longer be valid since the direct connection is lost
2143            graph.remove_via_peer(node_id);
2144        }
2145
2146        // Unregister peer from delta sync tracking
2147        self.unregister_peer_for_delta(&node_id);
2148
2149        self.notify(PeatEvent::PeerDisconnected {
2150            node_id,
2151            reason: observer_reason,
2152        });
2153        self.notify_mesh_state_changed();
2154        Some(node_id)
2155    }
2156
2157    /// Called when a BLE connection is lost, using NodeId directly
2158    ///
2159    /// Alternative to on_ble_disconnected() when only NodeId is known (e.g., ESP32).
2160    pub fn on_peer_disconnected(&self, node_id: NodeId, reason: DisconnectReason) {
2161        if self
2162            .peer_manager
2163            .on_disconnected_by_node_id(node_id, reason)
2164        {
2165            // Update connection graph
2166            {
2167                let mut graph = self.connection_graph.lock().unwrap();
2168                let platform_reason = match reason {
2169                    DisconnectReason::LocalRequest => {
2170                        crate::platform::DisconnectReason::LocalRequest
2171                    }
2172                    DisconnectReason::RemoteRequest => {
2173                        crate::platform::DisconnectReason::RemoteRequest
2174                    }
2175                    DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2176                    DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2177                    DisconnectReason::ConnectionFailed => {
2178                        crate::platform::DisconnectReason::ConnectionFailed
2179                    }
2180                    DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2181                };
2182                let now_ms = std::time::SystemTime::now()
2183                    .duration_since(std::time::UNIX_EPOCH)
2184                    .map(|d| d.as_millis() as u64)
2185                    .unwrap_or(0);
2186                graph.on_disconnected(node_id, platform_reason, now_ms);
2187
2188                // Remove indirect peer paths that went through this peer
2189                graph.remove_via_peer(node_id);
2190            }
2191
2192            // Unregister peer from delta sync tracking
2193            self.unregister_peer_for_delta(&node_id);
2194
2195            self.notify(PeatEvent::PeerDisconnected { node_id, reason });
2196            self.notify_mesh_state_changed();
2197        }
2198    }
2199
2200    /// Called when a remote device connects to us (incoming connection)
2201    ///
2202    /// Use this when we're acting as a peripheral and a central connects to us.
2203    pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
2204        let is_new = self
2205            .peer_manager
2206            .on_incoming_connection(identifier, node_id, now_ms);
2207
2208        // Update connection graph
2209        {
2210            let mut graph = self.connection_graph.lock().unwrap();
2211            if is_new {
2212                graph.on_discovered(
2213                    node_id,
2214                    identifier.to_string(),
2215                    None,
2216                    Some(self.config.mesh_id.clone()),
2217                    -50, // Default good RSSI for incoming connections
2218                    now_ms,
2219                );
2220            }
2221            graph.on_connected(node_id, now_ms);
2222        }
2223
2224        // Register peer for delta sync tracking
2225        self.register_peer_for_delta(&node_id);
2226
2227        if is_new {
2228            if let Some(peer) = self.peer_manager.get_peer(node_id) {
2229                self.notify(PeatEvent::PeerDiscovered { peer });
2230            }
2231        }
2232
2233        self.notify(PeatEvent::PeerConnected { node_id });
2234        self.notify_mesh_state_changed();
2235
2236        is_new
2237    }
2238
2239    /// Called when data is received from a peer
2240    ///
2241    /// Parses the document, merges it, and generates appropriate events.
2242    /// If encryption is enabled, decrypts the document first.
2243    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2244    /// Returns the source NodeId and whether the document contained an event.
2245    pub fn on_ble_data_received(
2246        &self,
2247        identifier: &str,
2248        data: &[u8],
2249        now_ms: u64,
2250    ) -> Option<DataReceivedResult> {
2251        // Get node ID from identifier
2252        let node_id = self.peer_manager.get_node_id(identifier)?;
2253
2254        // Check for special message types first
2255        if data.len() >= 2 {
2256            match data[0] {
2257                KEY_EXCHANGE_MARKER => {
2258                    // Handle key exchange - returns response to send back
2259                    let _response = self.handle_key_exchange(data, now_ms);
2260                    // Return None as this isn't a document sync
2261                    return None;
2262                }
2263                PEER_E2EE_MARKER => {
2264                    // Handle encrypted peer message
2265                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2266                    // Return None as this isn't a document sync
2267                    return None;
2268                }
2269                RELAY_ENVELOPE_MARKER => {
2270                    // Handle relay envelope for multi-hop
2271                    return self
2272                        .handle_relay_envelope_with_identifier(node_id, identifier, data, now_ms);
2273                }
2274                _ => {}
2275            }
2276        }
2277
2278        // Direct document (not relay envelope)
2279        self.process_document_data_with_identifier(node_id, identifier, data, now_ms, None, None, 0)
2280    }
2281
2282    /// Internal: Process document data with identifier as source hint
2283    #[allow(clippy::too_many_arguments)]
2284    fn process_document_data_with_identifier(
2285        &self,
2286        source_node: NodeId,
2287        identifier: &str,
2288        data: &[u8],
2289        now_ms: u64,
2290        relay_data: Option<Vec<u8>>,
2291        origin_node: Option<NodeId>,
2292        hop_count: u8,
2293    ) -> Option<DataReceivedResult> {
2294        // Decrypt if encrypted (mesh-wide encryption) - use identifier as source hint
2295        let decrypted = self.decrypt_document(data, Some(identifier))?;
2296
2297        // Check if this is a delta document (wire format v2)
2298        if DeltaDocument::is_delta_document(&decrypted) {
2299            return self.process_delta_document_internal(
2300                source_node,
2301                &decrypted,
2302                now_ms,
2303                relay_data,
2304                origin_node,
2305                hop_count,
2306            );
2307        }
2308
2309        // Merge the document (legacy wire format v1)
2310        let result = self.document_sync.merge_document(&decrypted)?;
2311
2312        // Store peer peripheral if present (for callsign lookup)
2313        if let Some(ref peripheral) = result.peer_peripheral {
2314            if let Ok(mut peripherals) = self.peer_peripherals.write() {
2315                peripherals.insert(result.source_node, peripheral.clone());
2316            }
2317        }
2318
2319        // Record sync
2320        self.peer_manager.record_sync(source_node, now_ms);
2321
2322        // Generate events based on what was received
2323        if result.is_emergency() {
2324            self.notify(PeatEvent::EmergencyReceived {
2325                from_node: result.source_node,
2326            });
2327        } else if result.is_ack() {
2328            self.notify(PeatEvent::AckReceived {
2329                from_node: result.source_node,
2330            });
2331        }
2332
2333        if result.counter_changed {
2334            self.notify(PeatEvent::DocumentSynced {
2335                from_node: result.source_node,
2336                total_count: result.total_count,
2337            });
2338        }
2339
2340        // Emit relay event if we're relaying
2341        if relay_data.is_some() {
2342            let relay_targets = self.get_relay_targets(Some(source_node));
2343            self.notify(PeatEvent::MessageRelayed {
2344                origin_node: origin_node.unwrap_or(result.source_node),
2345                relay_count: relay_targets.len(),
2346                hop_count,
2347            });
2348        }
2349
2350        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2351            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2352
2353        Some(DataReceivedResult {
2354            source_node: result.source_node,
2355            is_emergency: result.is_emergency(),
2356            is_ack: result.is_ack(),
2357            counter_changed: result.counter_changed,
2358            emergency_changed: result.emergency_changed,
2359            total_count: result.total_count,
2360            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2361            relay_data,
2362            origin_node,
2363            hop_count,
2364            callsign,
2365            battery_percent,
2366            heart_rate,
2367            event_type,
2368            latitude,
2369            longitude,
2370            altitude,
2371        })
2372    }
2373
2374    /// Internal: Handle relay envelope with identifier as source hint
2375    fn handle_relay_envelope_with_identifier(
2376        &self,
2377        source_node: NodeId,
2378        identifier: &str,
2379        data: &[u8],
2380        now_ms: u64,
2381    ) -> Option<DataReceivedResult> {
2382        // Process the relay envelope
2383        let envelope = RelayEnvelope::decode(data)?;
2384
2385        // Check deduplication
2386        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2387            let stats = self
2388                .seen_cache
2389                .lock()
2390                .unwrap()
2391                .get_stats(&envelope.message_id);
2392            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2393
2394            self.notify(PeatEvent::DuplicateMessageDropped {
2395                origin_node: envelope.origin_node,
2396                seen_count,
2397            });
2398            return None;
2399        }
2400
2401        // Check TTL and get relay data
2402        let relay_data = if envelope.can_relay() && self.config.enable_relay {
2403            envelope.relay().map(|e| e.encode())
2404        } else {
2405            if !envelope.can_relay() {
2406                self.notify(PeatEvent::MessageTtlExpired {
2407                    origin_node: envelope.origin_node,
2408                    hop_count: envelope.hop_count,
2409                });
2410            }
2411            None
2412        };
2413
2414        // Process the inner payload
2415        self.process_document_data_with_identifier(
2416            source_node,
2417            identifier,
2418            &envelope.payload,
2419            now_ms,
2420            relay_data,
2421            Some(envelope.origin_node),
2422            envelope.hop_count,
2423        )
2424    }
2425
2426    /// Called when data is received but we don't have the identifier mapped
2427    ///
2428    /// Use this when receiving data from a peripheral we discovered.
2429    /// If encryption is enabled, decrypts the document first.
2430    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2431    /// Handles relay envelopes for multi-hop mesh operation.
2432    pub fn on_ble_data_received_from_node(
2433        &self,
2434        node_id: NodeId,
2435        data: &[u8],
2436        now_ms: u64,
2437    ) -> Option<DataReceivedResult> {
2438        // Check for special message types first
2439        if data.len() >= 2 {
2440            match data[0] {
2441                KEY_EXCHANGE_MARKER => {
2442                    let _response = self.handle_key_exchange(data, now_ms);
2443                    return None;
2444                }
2445                PEER_E2EE_MARKER => {
2446                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2447                    return None;
2448                }
2449                RELAY_ENVELOPE_MARKER => {
2450                    // Handle relay envelope for multi-hop
2451                    return self.handle_relay_envelope(node_id, data, now_ms);
2452                }
2453                _ => {}
2454            }
2455        }
2456
2457        // Direct document (not relay envelope)
2458        self.process_document_data(node_id, data, now_ms, None, None, 0)
2459    }
2460
2461    /// Called when encrypted data is received from an unknown peer
2462    ///
2463    /// This handles the case where we receive an encrypted document from a BLE address
2464    /// that isn't registered in our peer manager (e.g., due to BLE address rotation).
2465    /// The function decrypts first using the mesh key, then extracts the source_node
2466    /// from the decrypted document header and registers the peer.
2467    ///
2468    /// Returns `Some(DataReceivedResult)` if decryption and processing succeed.
2469    /// Returns `None` if decryption fails or the document is invalid.
2470    pub fn on_ble_data_received_anonymous(
2471        &self,
2472        identifier: &str,
2473        data: &[u8],
2474        now_ms: u64,
2475    ) -> Option<DataReceivedResult> {
2476        log::debug!(
2477            "on_ble_data_received_anonymous: identifier={}, len={}, marker=0x{:02X}",
2478            identifier,
2479            data.len(),
2480            data.first().copied().unwrap_or(0)
2481        );
2482
2483        // Try to decrypt (handles both encrypted and unencrypted documents)
2484        let decrypted = match self.decrypt_document(data, Some(identifier)) {
2485            Some(d) => d,
2486            None => {
2487                log::warn!(
2488                    "on_ble_data_received_anonymous: decrypt/parse FAILED for {} byte doc from {}",
2489                    data.len(),
2490                    identifier
2491                );
2492                return None;
2493            }
2494        };
2495
2496        // Extract source_node from decrypted document header
2497        // Header format: [version: 4 bytes (LE)][node_id: 4 bytes (LE)]
2498        if decrypted.len() < 8 {
2499            log::warn!("Decrypted document too short to extract source_node");
2500            return None;
2501        }
2502
2503        let source_node_u32 =
2504            u32::from_le_bytes([decrypted[4], decrypted[5], decrypted[6], decrypted[7]]);
2505        let source_node = NodeId::new(source_node_u32);
2506
2507        log::info!(
2508            "Anonymous document from {}: source_node={:08X}, len={}",
2509            identifier,
2510            source_node_u32,
2511            decrypted.len()
2512        );
2513
2514        // Register the peer with this identifier so future lookups work
2515        // This handles BLE address rotation
2516        self.peer_manager
2517            .register_identifier(identifier, source_node);
2518
2519        // Check if this is a delta document
2520        let is_delta = DeltaDocument::is_delta_document(&decrypted);
2521        log::info!(
2522            "Document format: delta={}, first_byte=0x{:02X}, len={}",
2523            is_delta,
2524            decrypted.first().copied().unwrap_or(0),
2525            decrypted.len()
2526        );
2527
2528        if is_delta {
2529            return self.process_delta_document_internal(
2530                source_node,
2531                &decrypted,
2532                now_ms,
2533                None,
2534                None,
2535                0,
2536            );
2537        }
2538
2539        // Handle app-layer message (0xAF marker from peat-lite CannedMessages)
2540        // These are handled by consumers who register DocumentType implementations
2541        // via the DocumentRegistry. Pass through as relay_data for platform layer.
2542        const APP_LAYER_MARKER: u8 = 0xAF;
2543        if decrypted.first().copied() == Some(APP_LAYER_MARKER) {
2544            log::debug!(
2545                "App-layer message (0xAF) from {:08X}, {} bytes - passing to relay",
2546                source_node.as_u32(),
2547                decrypted.len()
2548            );
2549            return Some(DataReceivedResult {
2550                source_node,
2551                is_emergency: false,
2552                is_ack: false,
2553                counter_changed: false,
2554                emergency_changed: false,
2555                total_count: 0,
2556                event_timestamp: now_ms,
2557                relay_data: Some(decrypted.to_vec()),
2558                origin_node: None,
2559                hop_count: 0,
2560                callsign: None,
2561                battery_percent: None,
2562                heart_rate: None,
2563                event_type: None,
2564                latitude: None,
2565                longitude: None,
2566                altitude: None,
2567            });
2568        }
2569
2570        // Merge the document (legacy wire format v1)
2571        log::info!(
2572            "Processing legacy document from {:08X}",
2573            source_node.as_u32()
2574        );
2575        let result = self.document_sync.merge_document(&decrypted)?;
2576
2577        // Log what we got from the merge
2578        log::info!(
2579            "Merge result: peer_peripheral={}, counter_changed={}",
2580            result.peer_peripheral.is_some(),
2581            result.counter_changed
2582        );
2583        if let Some(ref p) = result.peer_peripheral {
2584            log::info!("Peripheral callsign: '{}'", p.callsign_str());
2585        }
2586
2587        // Record sync
2588        self.peer_manager.record_sync(source_node, now_ms);
2589
2590        // Generate events
2591        if result.is_emergency() {
2592            self.notify(PeatEvent::EmergencyReceived {
2593                from_node: result.source_node,
2594            });
2595        } else if result.is_ack() {
2596            self.notify(PeatEvent::AckReceived {
2597                from_node: result.source_node,
2598            });
2599        }
2600
2601        if result.counter_changed {
2602            self.notify(PeatEvent::DocumentSynced {
2603                from_node: result.source_node,
2604                total_count: result.total_count,
2605            });
2606        }
2607
2608        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2609            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2610
2611        Some(DataReceivedResult {
2612            source_node: result.source_node,
2613            is_emergency: result.is_emergency(),
2614            is_ack: result.is_ack(),
2615            counter_changed: result.counter_changed,
2616            emergency_changed: result.emergency_changed,
2617            total_count: result.total_count,
2618            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2619            relay_data: None,
2620            origin_node: None,
2621            hop_count: 0,
2622            callsign,
2623            battery_percent,
2624            heart_rate,
2625            event_type,
2626            latitude,
2627            longitude,
2628            altitude,
2629        })
2630    }
2631
2632    /// Internal: Process document data (shared by direct and relay paths)
2633    fn process_document_data(
2634        &self,
2635        source_node: NodeId,
2636        data: &[u8],
2637        now_ms: u64,
2638        relay_data: Option<Vec<u8>>,
2639        origin_node: Option<NodeId>,
2640        hop_count: u8,
2641    ) -> Option<DataReceivedResult> {
2642        // Decrypt if encrypted (mesh-wide encryption)
2643        let source_hint = format!("node:{:08X}", source_node.as_u32());
2644        let decrypted = self.decrypt_document(data, Some(&source_hint))?;
2645
2646        // Check if this is a delta document (wire format v2)
2647        if DeltaDocument::is_delta_document(&decrypted) {
2648            return self.process_delta_document_internal(
2649                source_node,
2650                &decrypted,
2651                now_ms,
2652                relay_data,
2653                origin_node,
2654                hop_count,
2655            );
2656        }
2657
2658        // Merge the document (legacy wire format v1)
2659        let result = self.document_sync.merge_document(&decrypted)?;
2660
2661        // Store peer peripheral if present (for callsign lookup)
2662        if let Some(ref peripheral) = result.peer_peripheral {
2663            if let Ok(mut peripherals) = self.peer_peripherals.write() {
2664                peripherals.insert(result.source_node, peripheral.clone());
2665            }
2666        }
2667
2668        // Record sync
2669        self.peer_manager.record_sync(source_node, now_ms);
2670
2671        // Generate events based on what was received
2672        if result.is_emergency() {
2673            self.notify(PeatEvent::EmergencyReceived {
2674                from_node: result.source_node,
2675            });
2676        } else if result.is_ack() {
2677            self.notify(PeatEvent::AckReceived {
2678                from_node: result.source_node,
2679            });
2680        }
2681
2682        if result.counter_changed {
2683            self.notify(PeatEvent::DocumentSynced {
2684                from_node: result.source_node,
2685                total_count: result.total_count,
2686            });
2687        }
2688
2689        // Emit relay event if we're relaying
2690        if relay_data.is_some() {
2691            let relay_targets = self.get_relay_targets(Some(source_node));
2692            self.notify(PeatEvent::MessageRelayed {
2693                origin_node: origin_node.unwrap_or(result.source_node),
2694                relay_count: relay_targets.len(),
2695                hop_count,
2696            });
2697        }
2698
2699        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2700            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2701
2702        Some(DataReceivedResult {
2703            source_node: result.source_node,
2704            is_emergency: result.is_emergency(),
2705            is_ack: result.is_ack(),
2706            counter_changed: result.counter_changed,
2707            emergency_changed: result.emergency_changed,
2708            total_count: result.total_count,
2709            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2710            relay_data,
2711            origin_node,
2712            hop_count,
2713            callsign,
2714            battery_percent,
2715            heart_rate,
2716            event_type,
2717            latitude,
2718            longitude,
2719            altitude,
2720        })
2721    }
2722
2723    /// Internal: Handle relay envelope
2724    fn handle_relay_envelope(
2725        &self,
2726        source_node: NodeId,
2727        data: &[u8],
2728        now_ms: u64,
2729    ) -> Option<DataReceivedResult> {
2730        // Process the relay envelope
2731        let decision = self.process_relay_envelope(data, source_node, now_ms)?;
2732
2733        // Get relay data if we should relay
2734        let relay_data = if decision.should_relay {
2735            decision.relay_data()
2736        } else {
2737            None
2738        };
2739
2740        // Process the inner payload
2741        self.process_document_data(
2742            source_node,
2743            &decision.payload,
2744            now_ms,
2745            relay_data,
2746            Some(decision.origin_node),
2747            decision.hop_count,
2748        )
2749    }
2750
2751    /// Called when data is received without a known identifier
2752    ///
2753    /// This is the simplest data receive method - it extracts the source node_id
2754    /// from the document itself. Use this when you don't track identifiers
2755    /// (e.g., ESP32 NimBLE).
2756    /// If encryption is enabled, decrypts the document first.
2757    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2758    /// Handles relay envelopes for multi-hop mesh operation.
2759    pub fn on_ble_data(
2760        &self,
2761        identifier: &str,
2762        data: &[u8],
2763        now_ms: u64,
2764    ) -> Option<DataReceivedResult> {
2765        // Check for special message types first
2766        if data.len() >= 2 {
2767            match data[0] {
2768                KEY_EXCHANGE_MARKER => {
2769                    let _response = self.handle_key_exchange(data, now_ms);
2770                    return None;
2771                }
2772                PEER_E2EE_MARKER => {
2773                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2774                    return None;
2775                }
2776                RELAY_ENVELOPE_MARKER => {
2777                    // Handle relay envelope - extract origin from envelope
2778                    return self.handle_relay_envelope_with_incoming(identifier, data, now_ms);
2779                }
2780                _ => {}
2781            }
2782        }
2783
2784        // Direct document - process normally
2785        self.process_incoming_document(identifier, data, now_ms, None, None, 0)
2786    }
2787
2788    /// Internal: Process incoming document (handles peer registration)
2789    fn process_incoming_document(
2790        &self,
2791        identifier: &str,
2792        data: &[u8],
2793        now_ms: u64,
2794        relay_data: Option<Vec<u8>>,
2795        origin_node: Option<NodeId>,
2796        hop_count: u8,
2797    ) -> Option<DataReceivedResult> {
2798        // Decrypt if encrypted (mesh-wide encryption)
2799        let decrypted = self.decrypt_document(data, Some(identifier))?;
2800
2801        // Merge the document (extracts node_id internally)
2802        let result = self.document_sync.merge_document(&decrypted)?;
2803
2804        // Record sync using the source_node from the merged document
2805        self.peer_manager.record_sync(result.source_node, now_ms);
2806
2807        // Only register the identifier mapping for direct messages (not relayed)
2808        // For relayed messages, the identifier belongs to the relay source (who forwarded it),
2809        // not the origin node (who created the document). The relay source is already registered
2810        // via handle_relay_envelope_with_incoming when the relay envelope is first processed.
2811        if origin_node.is_none() {
2812            // Direct message - register the peer with this identifier
2813            let is_new =
2814                self.peer_manager
2815                    .on_incoming_connection(identifier, result.source_node, now_ms);
2816
2817            // Update connection graph to track connection state
2818            {
2819                let mut graph = self.connection_graph.lock().unwrap();
2820                if is_new {
2821                    graph.on_discovered(
2822                        result.source_node,
2823                        identifier.to_string(),
2824                        None,
2825                        Some(self.config.mesh_id.clone()),
2826                        -50, // Default RSSI for data-based discovery
2827                        now_ms,
2828                    );
2829                }
2830                graph.on_connected(result.source_node, now_ms);
2831            }
2832        }
2833
2834        // Generate events based on what was received
2835        if result.is_emergency() {
2836            self.notify(PeatEvent::EmergencyReceived {
2837                from_node: result.source_node,
2838            });
2839        } else if result.is_ack() {
2840            self.notify(PeatEvent::AckReceived {
2841                from_node: result.source_node,
2842            });
2843        }
2844
2845        if result.counter_changed {
2846            self.notify(PeatEvent::DocumentSynced {
2847                from_node: result.source_node,
2848                total_count: result.total_count,
2849            });
2850        }
2851
2852        // Emit relay event if we're relaying
2853        if relay_data.is_some() {
2854            let relay_targets = self.get_relay_targets(Some(result.source_node));
2855            self.notify(PeatEvent::MessageRelayed {
2856                origin_node: origin_node.unwrap_or(result.source_node),
2857                relay_count: relay_targets.len(),
2858                hop_count,
2859            });
2860        }
2861
2862        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2863            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2864
2865        Some(DataReceivedResult {
2866            source_node: result.source_node,
2867            is_emergency: result.is_emergency(),
2868            is_ack: result.is_ack(),
2869            counter_changed: result.counter_changed,
2870            emergency_changed: result.emergency_changed,
2871            total_count: result.total_count,
2872            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2873            relay_data,
2874            origin_node,
2875            hop_count,
2876            callsign,
2877            battery_percent,
2878            heart_rate,
2879            event_type,
2880            latitude,
2881            longitude,
2882            altitude,
2883        })
2884    }
2885
2886    /// Internal: Handle relay envelope with incoming connection registration
2887    fn handle_relay_envelope_with_incoming(
2888        &self,
2889        identifier: &str,
2890        data: &[u8],
2891        now_ms: u64,
2892    ) -> Option<DataReceivedResult> {
2893        // Parse envelope to get origin
2894        let envelope = RelayEnvelope::decode(data)?;
2895
2896        // Try to look up the source peer from identifier to register indirect path
2897        // If we know who sent this relay, we can track indirect peers via them
2898        if let Some(source_peer) = self.peer_manager.get_node_id(identifier) {
2899            if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
2900                let is_new = self.connection_graph.lock().unwrap().on_relay_received(
2901                    source_peer,
2902                    envelope.origin_node,
2903                    envelope.hop_count,
2904                    now_ms,
2905                );
2906
2907                if is_new {
2908                    log::debug!(
2909                        "Discovered indirect peer {:08X} via {:08X} ({} hops)",
2910                        envelope.origin_node.as_u32(),
2911                        source_peer.as_u32(),
2912                        envelope.hop_count
2913                    );
2914                }
2915            }
2916        }
2917
2918        // Check deduplication
2919        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2920            // Duplicate - get stats for event
2921            let stats = self
2922                .seen_cache
2923                .lock()
2924                .unwrap()
2925                .get_stats(&envelope.message_id);
2926            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2927
2928            self.notify(PeatEvent::DuplicateMessageDropped {
2929                origin_node: envelope.origin_node,
2930                seen_count,
2931            });
2932            return None;
2933        }
2934
2935        // Check TTL
2936        let (should_relay, relay_data) = if envelope.can_relay() && self.config.enable_relay {
2937            let relay_env = envelope.relay();
2938            (true, relay_env.map(|e| e.encode()))
2939        } else {
2940            if !envelope.can_relay() {
2941                self.notify(PeatEvent::MessageTtlExpired {
2942                    origin_node: envelope.origin_node,
2943                    hop_count: envelope.hop_count,
2944                });
2945            }
2946            (false, None)
2947        };
2948
2949        // Process the inner payload
2950        self.process_incoming_document(
2951            identifier,
2952            &envelope.payload,
2953            now_ms,
2954            if should_relay { relay_data } else { None },
2955            Some(envelope.origin_node),
2956            envelope.hop_count,
2957        )
2958    }
2959
2960    // ==================== Periodic Maintenance ====================
2961
2962    /// Periodic tick - call this regularly (e.g., every second)
2963    ///
2964    /// Performs:
2965    /// - Stale peer cleanup
2966    /// - Periodic sync broadcast (if interval elapsed)
2967    ///
2968    /// Returns `Some(data)` if a sync broadcast is needed.
2969    pub fn tick(&self, now_ms: u64) -> Option<Vec<u8>> {
2970        use std::sync::atomic::Ordering;
2971
2972        // Use u32 for atomic storage (wraps every ~49 days, intervals still work)
2973        let now_ms_32 = now_ms as u32;
2974
2975        // Cleanup stale peers
2976        let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
2977        let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
2978        if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
2979            self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
2980            let removed = self.peer_manager.cleanup_stale(now_ms);
2981            for node_id in &removed {
2982                self.notify(PeatEvent::PeerLost { node_id: *node_id });
2983            }
2984            if !removed.is_empty() {
2985                self.notify_mesh_state_changed();
2986            }
2987
2988            // Run connection graph maintenance (transition Disconnected -> Lost)
2989            {
2990                let mut graph = self.connection_graph.lock().unwrap();
2991                let newly_lost = graph.tick(now_ms);
2992                // Also cleanup peers lost for more than peer_timeout
2993                graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
2994                drop(graph);
2995
2996                // Emit PeerLost events for newly lost peers from graph
2997                // (these may differ from peer_manager removals)
2998                for node_id in newly_lost {
2999                    // Only notify if not already notified by peer_manager
3000                    if !removed.contains(&node_id) {
3001                        self.notify(PeatEvent::PeerLost { node_id });
3002                    }
3003                }
3004            }
3005        }
3006
3007        // Check if sync broadcast is needed
3008        let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3009        let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3010        if sync_elapsed >= self.config.sync_interval_ms as u32 {
3011            self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3012            // Only broadcast if we have connected peers
3013            if self.peer_manager.connected_count() > 0 {
3014                let doc = self.document_sync.build_document();
3015                return Some(self.encrypt_document(&doc));
3016            }
3017        }
3018
3019        None
3020    }
3021
3022    /// Periodic tick returning per-peer delta documents
3023    ///
3024    /// Unlike `tick()` which broadcasts a single document to all peers,
3025    /// this returns targeted deltas that only include changes each peer
3026    /// hasn't seen. Use this for platforms that support per-peer transmission.
3027    ///
3028    /// Returns a list of (NodeId, encrypted_delta) tuples, one per connected peer.
3029    /// Empty vector if no sync is needed (interval not elapsed or no connected peers).
3030    pub fn tick_with_peer_deltas(&self, now_ms: u64) -> Vec<(NodeId, Vec<u8>)> {
3031        use std::sync::atomic::Ordering;
3032        let now_ms_32 = now_ms as u32;
3033
3034        // Cleanup stale peers (same as tick())
3035        let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3036        let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3037        if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3038            self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3039            let removed = self.peer_manager.cleanup_stale(now_ms);
3040            for node_id in &removed {
3041                self.notify(PeatEvent::PeerLost { node_id: *node_id });
3042            }
3043            if !removed.is_empty() {
3044                self.notify_mesh_state_changed();
3045            }
3046
3047            // Run connection graph maintenance
3048            {
3049                let mut graph = self.connection_graph.lock().unwrap();
3050                let newly_lost = graph.tick(now_ms);
3051                graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3052                drop(graph);
3053
3054                for node_id in newly_lost {
3055                    if !removed.contains(&node_id) {
3056                        self.notify(PeatEvent::PeerLost { node_id });
3057                    }
3058                }
3059            }
3060        }
3061
3062        // Check if sync is needed
3063        let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3064        let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3065        if sync_elapsed >= self.config.sync_interval_ms as u32 {
3066            self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3067
3068            // Build document for each connected peer
3069            let doc = self.document_sync.build_document();
3070            let encrypted = self.encrypt_document(&doc);
3071            let mut results = Vec::new();
3072            for peer in self.get_connected_peers() {
3073                results.push((peer.node_id, encrypted.clone()));
3074            }
3075            return results;
3076        }
3077
3078        Vec::new()
3079    }
3080
3081    // ==================== State Queries ====================
3082
3083    /// Get all known peers
3084    pub fn get_peers(&self) -> Vec<PeatPeer> {
3085        self.peer_manager.get_peers()
3086    }
3087
3088    /// Get connected peers only
3089    pub fn get_connected_peers(&self) -> Vec<PeatPeer> {
3090        self.peer_manager.get_connected_peers()
3091    }
3092
3093    /// Get a specific peer by NodeId
3094    pub fn get_peer(&self, node_id: NodeId) -> Option<PeatPeer> {
3095        self.peer_manager.get_peer(node_id)
3096    }
3097
3098    /// Get peer count
3099    pub fn peer_count(&self) -> usize {
3100        self.peer_manager.peer_count()
3101    }
3102
3103    /// Get connected peer count
3104    pub fn connected_count(&self) -> usize {
3105        self.peer_manager.connected_count()
3106    }
3107
3108    /// Check if a device mesh ID matches our mesh
3109    pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
3110        self.peer_manager.matches_mesh(device_mesh_id)
3111    }
3112
3113    // ==================== Connection State Graph ====================
3114
3115    /// Get the connection state graph with all peer states
3116    ///
3117    /// Returns a snapshot of all tracked peers and their connection lifecycle state.
3118    /// Apps can use this to display appropriate UI indicators:
3119    /// - Green for Connected peers
3120    /// - Yellow for Degraded or RecentlyDisconnected peers
3121    /// - Gray for Lost peers
3122    ///
3123    /// # Example
3124    /// ```ignore
3125    /// let states = mesh.get_connection_graph();
3126    /// for peer in states {
3127    ///     match peer.state {
3128    ///         ConnectionState::Connected => show_green_indicator(&peer),
3129    ///         ConnectionState::Degraded => show_yellow_indicator(&peer),
3130    ///         ConnectionState::Disconnected => show_stale_indicator(&peer),
3131    ///         ConnectionState::Lost => show_gray_indicator(&peer),
3132    ///         _ => {}
3133    ///     }
3134    /// }
3135    /// ```
3136    pub fn get_connection_graph(&self) -> Vec<PeerConnectionState> {
3137        self.connection_graph.lock().unwrap().get_all_owned()
3138    }
3139
3140    /// Get a specific peer's connection state
3141    pub fn get_peer_connection_state(&self, node_id: NodeId) -> Option<PeerConnectionState> {
3142        self.connection_graph
3143            .lock()
3144            .unwrap()
3145            .get_peer(node_id)
3146            .cloned()
3147    }
3148
3149    /// Get all currently connected peers from the connection graph
3150    pub fn get_connected_states(&self) -> Vec<PeerConnectionState> {
3151        self.connection_graph
3152            .lock()
3153            .unwrap()
3154            .get_connected()
3155            .into_iter()
3156            .cloned()
3157            .collect()
3158    }
3159
3160    /// Get peers in degraded state (connected but poor signal quality)
3161    pub fn get_degraded_peers(&self) -> Vec<PeerConnectionState> {
3162        self.connection_graph
3163            .lock()
3164            .unwrap()
3165            .get_degraded()
3166            .into_iter()
3167            .cloned()
3168            .collect()
3169    }
3170
3171    /// Get peers that disconnected within the specified time window
3172    ///
3173    /// Useful for showing "stale" peers that were recently connected.
3174    pub fn get_recently_disconnected(
3175        &self,
3176        within_ms: u64,
3177        now_ms: u64,
3178    ) -> Vec<PeerConnectionState> {
3179        self.connection_graph
3180            .lock()
3181            .unwrap()
3182            .get_recently_disconnected(within_ms, now_ms)
3183            .into_iter()
3184            .cloned()
3185            .collect()
3186    }
3187
3188    /// Get peers in Lost state (disconnected and no longer advertising)
3189    pub fn get_lost_peers(&self) -> Vec<PeerConnectionState> {
3190        self.connection_graph
3191            .lock()
3192            .unwrap()
3193            .get_lost()
3194            .into_iter()
3195            .cloned()
3196            .collect()
3197    }
3198
3199    /// Get summary counts of peers in each connection state
3200    pub fn get_connection_state_counts(&self) -> StateCountSummary {
3201        self.connection_graph.lock().unwrap().state_counts()
3202    }
3203
3204    // ==================== Indirect Peer Methods ====================
3205
3206    /// Get all indirect (multi-hop) peers
3207    ///
3208    /// Returns peers discovered via relay messages that are not directly
3209    /// connected via BLE. Each indirect peer includes the minimum hop count
3210    /// and the direct peers through which they can be reached.
3211    pub fn get_indirect_peers(&self) -> Vec<IndirectPeer> {
3212        self.connection_graph
3213            .lock()
3214            .unwrap()
3215            .get_indirect_peers_owned()
3216    }
3217
3218    /// Get the degree (hop count) for a specific peer
3219    ///
3220    /// Returns:
3221    /// - `Some(PeerDegree::Direct)` for directly connected BLE peers
3222    /// - `Some(PeerDegree::OneHop/TwoHop/ThreeHop)` for indirect peers
3223    /// - `None` if peer is not known
3224    pub fn get_peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
3225        self.connection_graph.lock().unwrap().peer_degree(node_id)
3226    }
3227
3228    /// Get full state counts including indirect peers
3229    ///
3230    /// Returns counts of direct peers by connection state plus counts
3231    /// of indirect peers by hop count (1-hop, 2-hop, 3-hop).
3232    pub fn get_full_state_counts(&self) -> FullStateCountSummary {
3233        self.connection_graph.lock().unwrap().full_state_counts()
3234    }
3235
3236    /// Get all paths to reach an indirect peer
3237    ///
3238    /// Returns a list of (via_peer_id, hop_count) pairs showing all
3239    /// known routes to the specified peer.
3240    pub fn get_paths_to_peer(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
3241        self.connection_graph.lock().unwrap().get_paths_to(node_id)
3242    }
3243
3244    /// Check if a node is known (either direct or indirect)
3245    pub fn is_peer_known(&self, node_id: NodeId) -> bool {
3246        self.connection_graph.lock().unwrap().is_known(node_id)
3247    }
3248
3249    /// Get number of indirect peers
3250    pub fn indirect_peer_count(&self) -> usize {
3251        self.connection_graph.lock().unwrap().indirect_peer_count()
3252    }
3253
3254    /// Cleanup stale indirect peers
3255    ///
3256    /// Removes indirect peers that haven't been seen within the timeout.
3257    /// Returns the list of removed peer IDs.
3258    pub fn cleanup_indirect_peers(&self, now_ms: u64) -> Vec<NodeId> {
3259        self.connection_graph
3260            .lock()
3261            .unwrap()
3262            .cleanup_indirect(now_ms)
3263    }
3264
3265    /// Get total counter value
3266    pub fn total_count(&self) -> u64 {
3267        self.document_sync.total_count()
3268    }
3269
3270    /// Get document version
3271    pub fn document_version(&self) -> u32 {
3272        self.document_sync.version()
3273    }
3274
3275    /// Get document version (alias)
3276    pub fn version(&self) -> u32 {
3277        self.document_sync.version()
3278    }
3279
3280    /// Update health status (battery percentage)
3281    pub fn update_health(&self, battery_percent: u8) {
3282        self.document_sync.update_health(battery_percent);
3283    }
3284
3285    /// Update activity level (0=still, 1=walking, 2=running, 3=fall)
3286    pub fn update_activity(&self, activity: u8) {
3287        self.document_sync.update_activity(activity);
3288    }
3289
3290    /// Update full health status (battery and activity)
3291    pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
3292        self.document_sync
3293            .update_health_full(battery_percent, activity);
3294    }
3295
3296    /// Update heart rate
3297    pub fn update_heart_rate(&self, heart_rate: u8) {
3298        self.document_sync.update_heart_rate(heart_rate);
3299    }
3300
3301    /// Update location
3302    pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
3303        self.document_sync
3304            .update_location(latitude, longitude, altitude);
3305    }
3306
3307    /// Clear location
3308    pub fn clear_location(&self) {
3309        self.document_sync.clear_location();
3310    }
3311
3312    /// Update callsign
3313    pub fn update_callsign(&self, callsign: &str) {
3314        self.document_sync.update_callsign(callsign);
3315    }
3316
3317    /// Set peripheral event type
3318    pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
3319        self.document_sync
3320            .set_peripheral_event(event_type, timestamp);
3321    }
3322
3323    /// Clear peripheral event
3324    pub fn clear_peripheral_event(&self) {
3325        self.document_sync.clear_peripheral_event();
3326    }
3327
3328    /// Update full peripheral state in one call
3329    ///
3330    /// This is the most efficient way to update all peripheral data before
3331    /// calling `build_document()` for encrypted transmission.
3332    #[allow(clippy::too_many_arguments)]
3333    pub fn update_peripheral_state(
3334        &self,
3335        callsign: &str,
3336        battery_percent: u8,
3337        heart_rate: Option<u8>,
3338        latitude: Option<f32>,
3339        longitude: Option<f32>,
3340        altitude: Option<f32>,
3341        event_type: Option<EventType>,
3342        timestamp: u64,
3343    ) {
3344        self.document_sync.update_peripheral_state(
3345            callsign,
3346            battery_percent,
3347            heart_rate,
3348            latitude,
3349            longitude,
3350            altitude,
3351            event_type,
3352            timestamp,
3353        );
3354    }
3355
3356    /// Build current document for transmission
3357    ///
3358    /// If encryption is enabled, the document is encrypted.
3359    pub fn build_document(&self) -> Vec<u8> {
3360        let doc = self.document_sync.build_document();
3361        self.encrypt_document(&doc)
3362    }
3363
3364    /// Get peers that should be synced with
3365    pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<PeatPeer> {
3366        self.peer_manager.peers_needing_sync(now_ms)
3367    }
3368
3369    // ==================== Internal Helpers ====================
3370
3371    fn notify(&self, event: PeatEvent) {
3372        self.observers.notify(event);
3373    }
3374
3375    fn notify_mesh_state_changed(&self) {
3376        self.notify(PeatEvent::MeshStateChanged {
3377            peer_count: self.peer_manager.peer_count(),
3378            connected_count: self.peer_manager.connected_count(),
3379        });
3380    }
3381
3382    // ==================== CannedMessage Integration ====================
3383    //
3384    // These methods provide deduplication support for peat-lite CannedMessages.
3385    // They use document identity (source_node + timestamp) instead of content hash,
3386    // because CRDT merge can change byte ordering.
3387
3388    /// Check if a CannedMessage should be processed.
3389    ///
3390    /// Uses document identity (source_node + timestamp) for deduplication.
3391    /// This prevents broadcast storms when relaying CannedMessages across the mesh.
3392    ///
3393    /// # Arguments
3394    /// * `source_node` - The source node ID from the CannedMessage
3395    /// * `timestamp` - The timestamp from the CannedMessage
3396    /// * `_ttl_ms` - TTL parameter (currently unused, uses cache's default TTL)
3397    ///
3398    /// # Returns
3399    /// `true` if this message is new and should be processed,
3400    /// `false` if it was seen recently and should be skipped.
3401    pub fn check_canned_message(&self, source_node: u32, timestamp: u64, _ttl_ms: u64) -> bool {
3402        // Create a unique key from source_node and timestamp
3403        // MessageId is 16 bytes: [source_node: 4B][timestamp: 8B][padding: 4B]
3404        let mut id_bytes = [0u8; 16];
3405        id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3406        id_bytes[4..12].copy_from_slice(&timestamp.to_le_bytes());
3407        let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3408
3409        // Check the seen cache
3410        let seen = self.seen_cache.lock().unwrap();
3411        !seen.has_seen(&message_id)
3412    }
3413
3414    /// Mark a CannedMessage as seen (for deduplication).
3415    ///
3416    /// Call this after processing a CannedMessage to prevent reprocessing
3417    /// the same message from other relay paths.
3418    pub fn mark_canned_message_seen(&self, source_node: u32, timestamp: u64) {
3419        let now = std::time::SystemTime::now()
3420            .duration_since(std::time::UNIX_EPOCH)
3421            .map(|d| d.as_millis() as u64)
3422            .unwrap_or(0);
3423
3424        // MessageId is 16 bytes: [source_node: 4B][timestamp: 8B][padding: 4B]
3425        let mut id_bytes = [0u8; 16];
3426        id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3427        id_bytes[4..12].copy_from_slice(&timestamp.to_le_bytes());
3428        let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3429        let origin = NodeId::new(source_node);
3430
3431        let mut seen = self.seen_cache.lock().unwrap();
3432        seen.mark_seen(message_id, origin, now);
3433    }
3434
3435    /// Get list of connected peer identifiers for relay.
3436    ///
3437    /// Used by the platform layer (Kotlin/Swift) to relay CannedMessages
3438    /// to other peers after deduplication check.
3439    pub fn get_connected_peer_identifiers(&self) -> Vec<String> {
3440        self.peer_manager.get_connected_identifiers()
3441    }
3442}
3443
3444/// Result from receiving BLE data
3445#[derive(Debug, Clone)]
3446pub struct DataReceivedResult {
3447    /// Node that sent this data
3448    pub source_node: NodeId,
3449
3450    /// Whether this contained an emergency event
3451    pub is_emergency: bool,
3452
3453    /// Whether this contained an ACK event
3454    pub is_ack: bool,
3455
3456    /// Whether the counter changed (new data)
3457    pub counter_changed: bool,
3458
3459    /// Whether emergency state changed (new emergency or ACK updates)
3460    pub emergency_changed: bool,
3461
3462    /// Updated total count
3463    pub total_count: u64,
3464
3465    /// Event timestamp (if event present) - use to detect duplicate events
3466    pub event_timestamp: u64,
3467
3468    /// Data to relay to other peers (if multi-hop relay is enabled)
3469    ///
3470    /// When present, the platform adapter should send this data to peers
3471    /// returned by `get_relay_targets(Some(source_node))`.
3472    pub relay_data: Option<Vec<u8>>,
3473
3474    /// Origin node for relay (may differ from source_node for relayed messages)
3475    pub origin_node: Option<NodeId>,
3476
3477    /// Current hop count (for relayed messages)
3478    pub hop_count: u8,
3479
3480    // ========== Peripheral data from sender ==========
3481    /// Sender's callsign (up to 12 chars)
3482    pub callsign: Option<String>,
3483
3484    /// Sender's battery percentage (0-100)
3485    pub battery_percent: Option<u8>,
3486
3487    /// Sender's heart rate (BPM)
3488    pub heart_rate: Option<u8>,
3489
3490    /// Sender's event type (from PeripheralEvent)
3491    pub event_type: Option<u8>,
3492
3493    /// Sender's latitude
3494    pub latitude: Option<f32>,
3495
3496    /// Sender's longitude
3497    pub longitude: Option<f32>,
3498
3499    /// Sender's altitude (meters)
3500    pub altitude: Option<f32>,
3501}
3502
3503impl DataReceivedResult {
3504    /// Extract peripheral fields from an Option<Peripheral>
3505    #[allow(clippy::type_complexity)]
3506    fn peripheral_fields(
3507        peripheral: &Option<crate::sync::crdt::Peripheral>,
3508    ) -> (
3509        Option<String>,
3510        Option<u8>,
3511        Option<u8>,
3512        Option<u8>,
3513        Option<f32>,
3514        Option<f32>,
3515        Option<f32>,
3516    ) {
3517        match peripheral {
3518            Some(p) => {
3519                let callsign = {
3520                    let s = p.callsign_str();
3521                    if s.is_empty() {
3522                        None
3523                    } else {
3524                        Some(s.to_string())
3525                    }
3526                };
3527                let battery = if p.health.battery_percent > 0 {
3528                    Some(p.health.battery_percent)
3529                } else {
3530                    None
3531                };
3532                let heart_rate = p.health.heart_rate;
3533                let event_type = p.last_event.as_ref().map(|e| e.event_type as u8);
3534                let (lat, lon, alt) = match &p.location {
3535                    Some(loc) => (Some(loc.latitude), Some(loc.longitude), loc.altitude),
3536                    None => (None, None, None),
3537                };
3538                (callsign, battery, heart_rate, event_type, lat, lon, alt)
3539            }
3540            None => (None, None, None, None, None, None, None),
3541        }
3542    }
3543}
3544
3545/// Decision from processing a relay envelope
3546#[derive(Debug, Clone)]
3547pub struct RelayDecision {
3548    /// The payload (document) to process locally
3549    pub payload: Vec<u8>,
3550
3551    /// Original sender of the message
3552    pub origin_node: NodeId,
3553
3554    /// Current hop count
3555    pub hop_count: u8,
3556
3557    /// Whether this message should be relayed to other peers
3558    pub should_relay: bool,
3559
3560    /// The relay envelope to forward (with incremented hop count)
3561    ///
3562    /// Only present if `should_relay` is true and TTL not expired.
3563    pub relay_envelope: Option<RelayEnvelope>,
3564}
3565
3566impl RelayDecision {
3567    /// Get the relay data to send to peers
3568    ///
3569    /// Returns None if relay is not needed.
3570    pub fn relay_data(&self) -> Option<Vec<u8>> {
3571        self.relay_envelope.as_ref().map(|e| e.encode())
3572    }
3573}
3574
3575#[cfg(all(test, feature = "std"))]
3576mod tests {
3577    use super::*;
3578    use crate::observer::CollectingObserver;
3579
3580    // Valid timestamp for testing (2024-01-15 00:00:00 UTC)
3581    const TEST_TIMESTAMP: u64 = 1705276800000;
3582
3583    fn create_mesh(node_id: u32, callsign: &str) -> PeatMesh {
3584        let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST");
3585        PeatMesh::new(config)
3586    }
3587
3588    #[test]
3589    fn test_mesh_creation() {
3590        let mesh = create_mesh(0x12345678, "ALPHA-1");
3591
3592        assert_eq!(mesh.node_id().as_u32(), 0x12345678);
3593        assert_eq!(mesh.callsign(), "ALPHA-1");
3594        assert_eq!(mesh.mesh_id(), "TEST");
3595        assert_eq!(mesh.device_name(), "PEAT_TEST-12345678");
3596    }
3597
3598    #[test]
3599    fn test_peer_discovery() {
3600        let mesh = create_mesh(0x11111111, "ALPHA-1");
3601        let observer = Arc::new(CollectingObserver::new());
3602        mesh.add_observer(observer.clone());
3603
3604        // Discover a peer
3605        let peer = mesh.on_ble_discovered(
3606            "device-uuid",
3607            Some("PEAT_TEST-22222222"),
3608            -65,
3609            Some("TEST"),
3610            1000,
3611        );
3612
3613        assert!(peer.is_some());
3614        let peer = peer.unwrap();
3615        assert_eq!(peer.node_id.as_u32(), 0x22222222);
3616
3617        // Check events were generated
3618        let events = observer.events();
3619        assert!(events
3620            .iter()
3621            .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
3622        assert!(events
3623            .iter()
3624            .any(|e| matches!(e, PeatEvent::MeshStateChanged { .. })));
3625    }
3626
3627    #[test]
3628    fn test_connection_lifecycle() {
3629        let mesh = create_mesh(0x11111111, "ALPHA-1");
3630        let observer = Arc::new(CollectingObserver::new());
3631        mesh.add_observer(observer.clone());
3632
3633        // Discover and connect
3634        mesh.on_ble_discovered(
3635            "device-uuid",
3636            Some("PEAT_TEST-22222222"),
3637            -65,
3638            Some("TEST"),
3639            1000,
3640        );
3641
3642        let node_id = mesh.on_ble_connected("device-uuid", 2000);
3643        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
3644        assert_eq!(mesh.connected_count(), 1);
3645
3646        // Disconnect
3647        let node_id = mesh.on_ble_disconnected("device-uuid", DisconnectReason::RemoteRequest);
3648        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
3649        assert_eq!(mesh.connected_count(), 0);
3650
3651        // Check events
3652        let events = observer.events();
3653        assert!(events
3654            .iter()
3655            .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
3656        assert!(events
3657            .iter()
3658            .any(|e| matches!(e, PeatEvent::PeerDisconnected { .. })));
3659    }
3660
3661    #[test]
3662    fn test_emergency_flow() {
3663        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3664        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3665
3666        let observer2 = Arc::new(CollectingObserver::new());
3667        mesh2.add_observer(observer2.clone());
3668
3669        // mesh1 sends emergency
3670        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
3671        assert!(mesh1.is_emergency_active());
3672
3673        // mesh2 receives it
3674        let result =
3675            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
3676
3677        assert!(result.is_some());
3678        let result = result.unwrap();
3679        assert!(result.is_emergency);
3680        assert_eq!(result.source_node.as_u32(), 0x11111111);
3681
3682        // Check events on mesh2
3683        let events = observer2.events();
3684        assert!(events
3685            .iter()
3686            .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
3687    }
3688
3689    #[test]
3690    fn test_ack_flow() {
3691        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3692        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3693
3694        let observer2 = Arc::new(CollectingObserver::new());
3695        mesh2.add_observer(observer2.clone());
3696
3697        // mesh1 sends ACK
3698        let doc = mesh1.send_ack(TEST_TIMESTAMP);
3699        assert!(mesh1.is_ack_active());
3700
3701        // mesh2 receives it
3702        let result =
3703            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
3704
3705        assert!(result.is_some());
3706        let result = result.unwrap();
3707        assert!(result.is_ack);
3708
3709        // Check events on mesh2
3710        let events = observer2.events();
3711        assert!(events
3712            .iter()
3713            .any(|e| matches!(e, PeatEvent::AckReceived { .. })));
3714    }
3715
3716    #[test]
3717    fn test_tick_cleanup() {
3718        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
3719            .with_peer_timeout(10_000);
3720        let mesh = PeatMesh::new(config);
3721
3722        let observer = Arc::new(CollectingObserver::new());
3723        mesh.add_observer(observer.clone());
3724
3725        // Discover a peer
3726        mesh.on_ble_discovered(
3727            "device-uuid",
3728            Some("PEAT_TEST-22222222"),
3729            -65,
3730            Some("TEST"),
3731            1000,
3732        );
3733        assert_eq!(mesh.peer_count(), 1);
3734
3735        // Tick at t=5000 - not stale yet
3736        mesh.tick(5000);
3737        assert_eq!(mesh.peer_count(), 1);
3738
3739        // Tick at t=20000 - peer is stale (10s timeout exceeded)
3740        mesh.tick(20000);
3741        assert_eq!(mesh.peer_count(), 0);
3742
3743        // Check PeerLost event
3744        let events = observer.events();
3745        assert!(events
3746            .iter()
3747            .any(|e| matches!(e, PeatEvent::PeerLost { .. })));
3748    }
3749
3750    #[test]
3751    fn test_tick_sync_broadcast() {
3752        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
3753            .with_sync_interval(5000);
3754        let mesh = PeatMesh::new(config);
3755
3756        // Discover and connect a peer first
3757        mesh.on_ble_discovered(
3758            "device-uuid",
3759            Some("PEAT_TEST-22222222"),
3760            -65,
3761            Some("TEST"),
3762            1000,
3763        );
3764        mesh.on_ble_connected("device-uuid", 1000);
3765
3766        // First tick at t=0 sets last_sync
3767        let _result = mesh.tick(0);
3768        // May or may not broadcast depending on initial state
3769
3770        // Tick before interval - no broadcast
3771        let result = mesh.tick(3000);
3772        assert!(result.is_none());
3773
3774        // After interval - should broadcast
3775        let result = mesh.tick(6000);
3776        assert!(result.is_some());
3777
3778        // Immediate second tick - no broadcast (interval not elapsed)
3779        let result = mesh.tick(6100);
3780        assert!(result.is_none());
3781
3782        // After another interval - should broadcast again
3783        let result = mesh.tick(12000);
3784        assert!(result.is_some());
3785    }
3786
3787    #[test]
3788    fn test_incoming_connection() {
3789        let mesh = create_mesh(0x11111111, "ALPHA-1");
3790        let observer = Arc::new(CollectingObserver::new());
3791        mesh.add_observer(observer.clone());
3792
3793        // Incoming connection from unknown peer
3794        let is_new = mesh.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);
3795
3796        assert!(is_new);
3797        assert_eq!(mesh.peer_count(), 1);
3798        assert_eq!(mesh.connected_count(), 1);
3799
3800        // Check events
3801        let events = observer.events();
3802        assert!(events
3803            .iter()
3804            .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
3805        assert!(events
3806            .iter()
3807            .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
3808    }
3809
3810    #[test]
3811    fn test_mesh_filtering() {
3812        let mesh = create_mesh(0x11111111, "ALPHA-1");
3813
3814        // Wrong mesh - ignored
3815        let peer = mesh.on_ble_discovered(
3816            "device-uuid-1",
3817            Some("PEAT_OTHER-22222222"),
3818            -65,
3819            Some("OTHER"),
3820            1000,
3821        );
3822        assert!(peer.is_none());
3823        assert_eq!(mesh.peer_count(), 0);
3824
3825        // Correct mesh - accepted
3826        let peer = mesh.on_ble_discovered(
3827            "device-uuid-2",
3828            Some("PEAT_TEST-33333333"),
3829            -65,
3830            Some("TEST"),
3831            1000,
3832        );
3833        assert!(peer.is_some());
3834        assert_eq!(mesh.peer_count(), 1);
3835    }
3836
3837    // ==================== Encryption Tests ====================
3838
3839    fn create_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
3840        let config =
3841            PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_encryption(secret);
3842        PeatMesh::new(config)
3843    }
3844
3845    #[test]
3846    fn test_encryption_enabled() {
3847        let secret = [0x42u8; 32];
3848        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3849
3850        assert!(mesh.is_encryption_enabled());
3851    }
3852
3853    #[test]
3854    fn test_encryption_disabled_by_default() {
3855        let mesh = create_mesh(0x11111111, "ALPHA-1");
3856
3857        assert!(!mesh.is_encryption_enabled());
3858    }
3859
3860    #[test]
3861    fn test_encrypted_document_exchange() {
3862        let secret = [0x42u8; 32];
3863        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3864        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
3865
3866        // mesh1 sends document
3867        let doc = mesh1.build_document();
3868
3869        // Document should be encrypted (starts with ENCRYPTED_MARKER)
3870        assert!(doc.len() >= 2);
3871        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
3872
3873        // mesh2 receives and decrypts
3874        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3875
3876        assert!(result.is_some());
3877        let result = result.unwrap();
3878        assert_eq!(result.source_node.as_u32(), 0x11111111);
3879    }
3880
3881    #[test]
3882    fn test_encrypted_emergency_exchange() {
3883        let secret = [0x42u8; 32];
3884        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3885        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
3886
3887        let observer = Arc::new(CollectingObserver::new());
3888        mesh2.add_observer(observer.clone());
3889
3890        // mesh1 sends emergency
3891        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
3892
3893        // mesh2 receives and decrypts
3894        let result =
3895            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
3896
3897        assert!(result.is_some());
3898        let result = result.unwrap();
3899        assert!(result.is_emergency);
3900
3901        // Check EmergencyReceived event was fired
3902        let events = observer.events();
3903        assert!(events
3904            .iter()
3905            .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
3906    }
3907
3908    #[test]
3909    fn test_wrong_key_fails_decrypt() {
3910        let secret1 = [0x42u8; 32];
3911        let secret2 = [0x43u8; 32]; // Different key
3912        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
3913        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
3914
3915        // mesh1 sends document
3916        let doc = mesh1.build_document();
3917
3918        // mesh2 cannot decrypt (wrong key)
3919        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3920
3921        assert!(result.is_none());
3922    }
3923
3924    #[test]
3925    fn test_unencrypted_mesh_can_read_unencrypted() {
3926        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3927        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3928
3929        // mesh1 sends document (unencrypted)
3930        let doc = mesh1.build_document();
3931
3932        // mesh2 receives (also unencrypted)
3933        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3934
3935        assert!(result.is_some());
3936    }
3937
3938    #[test]
3939    fn test_encrypted_mesh_can_receive_unencrypted() {
3940        // Backward compatibility: encrypted mesh can receive unencrypted docs
3941        let secret = [0x42u8; 32];
3942        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // unencrypted
3943        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); // encrypted
3944
3945        // mesh1 sends unencrypted document
3946        let doc = mesh1.build_document();
3947
3948        // mesh2 can receive unencrypted (backward compat)
3949        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3950
3951        assert!(result.is_some());
3952    }
3953
3954    #[test]
3955    fn test_unencrypted_mesh_cannot_receive_encrypted() {
3956        let secret = [0x42u8; 32];
3957        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret); // encrypted
3958        let mesh2 = create_mesh(0x22222222, "BRAVO-1"); // unencrypted
3959
3960        // mesh1 sends encrypted document
3961        let doc = mesh1.build_document();
3962
3963        // mesh2 cannot decrypt (no key)
3964        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3965
3966        assert!(result.is_none());
3967    }
3968
3969    #[test]
3970    fn test_enable_disable_encryption() {
3971        let mut mesh = create_mesh(0x11111111, "ALPHA-1");
3972
3973        assert!(!mesh.is_encryption_enabled());
3974
3975        // Enable encryption
3976        let secret = [0x42u8; 32];
3977        mesh.enable_encryption(&secret);
3978        assert!(mesh.is_encryption_enabled());
3979
3980        // Build document should now be encrypted
3981        let doc = mesh.build_document();
3982        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
3983
3984        // Disable encryption
3985        mesh.disable_encryption();
3986        assert!(!mesh.is_encryption_enabled());
3987
3988        // Build document should now be unencrypted
3989        let doc = mesh.build_document();
3990        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
3991    }
3992
3993    #[test]
3994    fn test_encryption_overhead() {
3995        let secret = [0x42u8; 32];
3996        let mesh_encrypted = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3997        let mesh_unencrypted = create_mesh(0x22222222, "BRAVO-1");
3998
3999        let doc_encrypted = mesh_encrypted.build_document();
4000        let doc_unencrypted = mesh_unencrypted.build_document();
4001
4002        // Encrypted doc should be larger by:
4003        // - 2 bytes marker header (0xAE + reserved)
4004        // - 12 bytes nonce
4005        // - 16 bytes auth tag
4006        // Total: 30 bytes overhead
4007        let overhead = doc_encrypted.len() - doc_unencrypted.len();
4008        assert_eq!(overhead, 30); // 2 (marker) + 12 (nonce) + 16 (tag)
4009    }
4010
4011    // ==================== Per-Peer E2EE Tests ====================
4012
4013    #[test]
4014    fn test_peer_e2ee_enable_disable() {
4015        let mesh = create_mesh(0x11111111, "ALPHA-1");
4016
4017        assert!(!mesh.is_peer_e2ee_enabled());
4018        assert!(mesh.peer_e2ee_public_key().is_none());
4019
4020        mesh.enable_peer_e2ee();
4021        assert!(mesh.is_peer_e2ee_enabled());
4022        assert!(mesh.peer_e2ee_public_key().is_some());
4023
4024        mesh.disable_peer_e2ee();
4025        assert!(!mesh.is_peer_e2ee_enabled());
4026    }
4027
4028    #[test]
4029    fn test_peer_e2ee_initiate_session() {
4030        let mesh = create_mesh(0x11111111, "ALPHA-1");
4031        mesh.enable_peer_e2ee();
4032
4033        let key_exchange = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4034        assert!(key_exchange.is_some());
4035
4036        let key_exchange = key_exchange.unwrap();
4037        // Should start with KEY_EXCHANGE_MARKER
4038        assert_eq!(key_exchange[0], crate::document::KEY_EXCHANGE_MARKER);
4039
4040        // Should have a pending session
4041        assert_eq!(mesh.peer_e2ee_session_count(), 1);
4042        assert_eq!(mesh.peer_e2ee_established_count(), 0);
4043    }
4044
4045    #[test]
4046    fn test_peer_e2ee_full_handshake() {
4047        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4048        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4049
4050        mesh1.enable_peer_e2ee();
4051        mesh2.enable_peer_e2ee();
4052
4053        let observer1 = Arc::new(CollectingObserver::new());
4054        let observer2 = Arc::new(CollectingObserver::new());
4055        mesh1.add_observer(observer1.clone());
4056        mesh2.add_observer(observer2.clone());
4057
4058        // mesh1 initiates to mesh2
4059        let key_exchange1 = mesh1
4060            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4061            .unwrap();
4062
4063        // mesh2 receives and responds
4064        let response = mesh2.handle_key_exchange(&key_exchange1, 1000);
4065        assert!(response.is_some());
4066
4067        // Check mesh2 has established session
4068        assert!(mesh2.has_peer_e2ee_session(NodeId::new(0x11111111)));
4069
4070        // mesh1 receives mesh2's response
4071        let key_exchange2 = response.unwrap();
4072        let _ = mesh1.handle_key_exchange(&key_exchange2, 1000);
4073
4074        // Check mesh1 has established session
4075        assert!(mesh1.has_peer_e2ee_session(NodeId::new(0x22222222)));
4076
4077        // Both should have E2EE established events
4078        let events1 = observer1.events();
4079        assert!(events1
4080            .iter()
4081            .any(|e| matches!(e, PeatEvent::PeerE2eeEstablished { .. })));
4082
4083        let events2 = observer2.events();
4084        assert!(events2
4085            .iter()
4086            .any(|e| matches!(e, PeatEvent::PeerE2eeEstablished { .. })));
4087    }
4088
4089    #[test]
4090    fn test_peer_e2ee_encrypt_decrypt() {
4091        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4092        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4093
4094        mesh1.enable_peer_e2ee();
4095        mesh2.enable_peer_e2ee();
4096
4097        // Establish session via key exchange
4098        let key_exchange1 = mesh1
4099            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4100            .unwrap();
4101        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4102        mesh1.handle_key_exchange(&key_exchange2, 1000);
4103
4104        // mesh1 sends encrypted message to mesh2
4105        let plaintext = b"Secret message from mesh1";
4106        let encrypted = mesh1.send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000);
4107        assert!(encrypted.is_some());
4108
4109        let encrypted = encrypted.unwrap();
4110        // Should start with PEER_E2EE_MARKER
4111        assert_eq!(encrypted[0], crate::document::PEER_E2EE_MARKER);
4112
4113        // mesh2 receives and decrypts
4114        let observer2 = Arc::new(CollectingObserver::new());
4115        mesh2.add_observer(observer2.clone());
4116
4117        let decrypted = mesh2.handle_peer_e2ee_message(&encrypted, 2000);
4118        assert!(decrypted.is_some());
4119        assert_eq!(decrypted.unwrap(), plaintext);
4120
4121        // Should have received message event
4122        let events = observer2.events();
4123        assert!(events.iter().any(|e| matches!(
4124            e,
4125            PeatEvent::PeerE2eeMessageReceived { from_node, data }
4126            if from_node.as_u32() == 0x11111111 && data == plaintext
4127        )));
4128    }
4129
4130    #[test]
4131    fn test_peer_e2ee_bidirectional() {
4132        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4133        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4134
4135        mesh1.enable_peer_e2ee();
4136        mesh2.enable_peer_e2ee();
4137
4138        // Establish session
4139        let key_exchange1 = mesh1
4140            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4141            .unwrap();
4142        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4143        mesh1.handle_key_exchange(&key_exchange2, 1000);
4144
4145        // mesh1 -> mesh2
4146        let msg1 = mesh1
4147            .send_peer_e2ee(NodeId::new(0x22222222), b"Hello from mesh1", 2000)
4148            .unwrap();
4149        let dec1 = mesh2.handle_peer_e2ee_message(&msg1, 2000).unwrap();
4150        assert_eq!(dec1, b"Hello from mesh1");
4151
4152        // mesh2 -> mesh1
4153        let msg2 = mesh2
4154            .send_peer_e2ee(NodeId::new(0x11111111), b"Hello from mesh2", 2000)
4155            .unwrap();
4156        let dec2 = mesh1.handle_peer_e2ee_message(&msg2, 2000).unwrap();
4157        assert_eq!(dec2, b"Hello from mesh2");
4158    }
4159
4160    #[test]
4161    fn test_peer_e2ee_close_session() {
4162        let mesh = create_mesh(0x11111111, "ALPHA-1");
4163        mesh.enable_peer_e2ee();
4164
4165        let observer = Arc::new(CollectingObserver::new());
4166        mesh.add_observer(observer.clone());
4167
4168        // Initiate a session
4169        mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4170        assert_eq!(mesh.peer_e2ee_session_count(), 1);
4171
4172        // Close session
4173        mesh.close_peer_e2ee(NodeId::new(0x22222222));
4174
4175        // Check close event
4176        let events = observer.events();
4177        assert!(events
4178            .iter()
4179            .any(|e| matches!(e, PeatEvent::PeerE2eeClosed { .. })));
4180    }
4181
4182    #[test]
4183    fn test_peer_e2ee_without_enabling() {
4184        let mesh = create_mesh(0x11111111, "ALPHA-1");
4185
4186        // E2EE not enabled - should return None
4187        let result = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4188        assert!(result.is_none());
4189
4190        let result = mesh.send_peer_e2ee(NodeId::new(0x22222222), b"test", 1000);
4191        assert!(result.is_none());
4192
4193        assert!(!mesh.has_peer_e2ee_session(NodeId::new(0x22222222)));
4194    }
4195
4196    #[test]
4197    fn test_peer_e2ee_overhead() {
4198        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4199        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4200
4201        mesh1.enable_peer_e2ee();
4202        mesh2.enable_peer_e2ee();
4203
4204        // Establish session
4205        let key_exchange1 = mesh1
4206            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4207            .unwrap();
4208        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4209        mesh1.handle_key_exchange(&key_exchange2, 1000);
4210
4211        // Encrypt a message
4212        let plaintext = b"Test message";
4213        let encrypted = mesh1
4214            .send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000)
4215            .unwrap();
4216
4217        // Overhead should be:
4218        // - 2 bytes marker header
4219        // - 4 bytes recipient node ID
4220        // - 4 bytes sender node ID
4221        // - 8 bytes counter
4222        // - 12 bytes nonce
4223        // - 16 bytes auth tag
4224        // Total: 46 bytes overhead
4225        let overhead = encrypted.len() - plaintext.len();
4226        assert_eq!(overhead, 46);
4227    }
4228
4229    // ==================== Strict Encryption Mode Tests ====================
4230
4231    fn create_strict_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4232        let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST")
4233            .with_encryption(secret)
4234            .with_strict_encryption();
4235        PeatMesh::new(config)
4236    }
4237
4238    #[test]
4239    fn test_strict_encryption_enabled() {
4240        let secret = [0x42u8; 32];
4241        let mesh = create_strict_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4242
4243        assert!(mesh.is_encryption_enabled());
4244        assert!(mesh.is_strict_encryption_enabled());
4245    }
4246
4247    #[test]
4248    fn test_strict_encryption_disabled_by_default() {
4249        let secret = [0x42u8; 32];
4250        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4251
4252        assert!(mesh.is_encryption_enabled());
4253        assert!(!mesh.is_strict_encryption_enabled());
4254    }
4255
4256    #[test]
4257    fn test_strict_encryption_requires_encryption_enabled() {
4258        // strict_encryption without encryption should have no effect
4259        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4260            .with_strict_encryption(); // No encryption!
4261        let mesh = PeatMesh::new(config);
4262
4263        assert!(!mesh.is_encryption_enabled());
4264        assert!(!mesh.is_strict_encryption_enabled());
4265    }
4266
4267    #[test]
4268    fn test_strict_mode_accepts_encrypted_documents() {
4269        let secret = [0x42u8; 32];
4270        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4271        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4272
4273        // mesh1 sends encrypted document
4274        let doc = mesh1.build_document();
4275        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4276
4277        // mesh2 (strict mode) should accept encrypted documents
4278        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4279        assert!(result.is_some());
4280    }
4281
4282    #[test]
4283    fn test_strict_mode_rejects_unencrypted_documents() {
4284        let secret = [0x42u8; 32];
4285        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // Unencrypted sender
4286        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret); // Strict receiver
4287
4288        let observer = Arc::new(CollectingObserver::new());
4289        mesh2.add_observer(observer.clone());
4290
4291        // mesh1 sends unencrypted document
4292        let doc = mesh1.build_document();
4293        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4294
4295        // mesh2 (strict mode) should reject unencrypted documents
4296        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4297        assert!(result.is_none());
4298
4299        // Should have SecurityViolation event
4300        let events = observer.events();
4301        assert!(events.iter().any(|e| matches!(
4302            e,
4303            PeatEvent::SecurityViolation {
4304                kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
4305                ..
4306            }
4307        )));
4308    }
4309
4310    #[test]
4311    fn test_non_strict_mode_accepts_unencrypted_documents() {
4312        let secret = [0x42u8; 32];
4313        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // Unencrypted sender
4314        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); // Non-strict receiver
4315
4316        // mesh1 sends unencrypted document
4317        let doc = mesh1.build_document();
4318
4319        // mesh2 (non-strict) should accept unencrypted documents (backward compat)
4320        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4321        assert!(result.is_some());
4322    }
4323
4324    #[test]
4325    fn test_strict_mode_security_violation_event_includes_source() {
4326        let secret = [0x42u8; 32];
4327        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4328        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4329
4330        let observer = Arc::new(CollectingObserver::new());
4331        mesh2.add_observer(observer.clone());
4332
4333        let doc = mesh1.build_document();
4334
4335        // Use on_ble_data_received with identifier to test source is captured
4336        mesh2.on_ble_discovered(
4337            "test-device-uuid",
4338            Some("PEAT_TEST-11111111"),
4339            -65,
4340            Some("TEST"),
4341            500,
4342        );
4343        mesh2.on_ble_connected("test-device-uuid", 600);
4344
4345        let _result = mesh2.on_ble_data_received("test-device-uuid", &doc, 1000);
4346
4347        // Check SecurityViolation event has source
4348        let events = observer.events();
4349        let violation = events.iter().find(|e| {
4350            matches!(
4351                e,
4352                PeatEvent::SecurityViolation {
4353                    kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
4354                    ..
4355                }
4356            )
4357        });
4358        assert!(violation.is_some());
4359
4360        if let Some(PeatEvent::SecurityViolation { source, .. }) = violation {
4361            assert!(source.is_some());
4362            assert_eq!(source.as_ref().unwrap(), "test-device-uuid");
4363        }
4364    }
4365
4366    #[test]
4367    fn test_decryption_failure_emits_security_violation() {
4368        let secret1 = [0x42u8; 32];
4369        let secret2 = [0x43u8; 32]; // Different key
4370        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
4371        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
4372
4373        let observer = Arc::new(CollectingObserver::new());
4374        mesh2.add_observer(observer.clone());
4375
4376        // mesh1 sends encrypted document
4377        let doc = mesh1.build_document();
4378
4379        // mesh2 cannot decrypt (wrong key)
4380        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4381        assert!(result.is_none());
4382
4383        // Should have SecurityViolation event for decryption failure
4384        let events = observer.events();
4385        assert!(events.iter().any(|e| matches!(
4386            e,
4387            PeatEvent::SecurityViolation {
4388                kind: crate::observer::SecurityViolationKind::DecryptionFailed,
4389                ..
4390            }
4391        )));
4392    }
4393
4394    #[test]
4395    fn test_strict_mode_builder_chain() {
4396        let secret = [0x42u8; 32];
4397        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4398            .with_encryption(secret)
4399            .with_strict_encryption()
4400            .with_sync_interval(10_000)
4401            .with_peer_timeout(60_000);
4402
4403        let mesh = PeatMesh::new(config);
4404
4405        assert!(mesh.is_encryption_enabled());
4406        assert!(mesh.is_strict_encryption_enabled());
4407    }
4408
4409    // ==================== Multi-Hop Relay Tests ====================
4410
4411    fn create_relay_mesh(node_id: u32, callsign: &str) -> PeatMesh {
4412        let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_relay();
4413        PeatMesh::new(config)
4414    }
4415
4416    #[test]
4417    fn test_relay_disabled_by_default() {
4418        let mesh = create_mesh(0x11111111, "ALPHA-1");
4419        assert!(!mesh.is_relay_enabled());
4420    }
4421
4422    #[test]
4423    fn test_relay_enabled() {
4424        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4425        assert!(mesh.is_relay_enabled());
4426    }
4427
4428    #[test]
4429    fn test_relay_config_builder() {
4430        let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4431            .with_relay()
4432            .with_max_relay_hops(5)
4433            .with_relay_fanout(3)
4434            .with_seen_cache_ttl(60_000);
4435
4436        assert!(config.enable_relay);
4437        assert_eq!(config.max_relay_hops, 5);
4438        assert_eq!(config.relay_fanout, 3);
4439        assert_eq!(config.seen_cache_ttl_ms, 60_000);
4440    }
4441
4442    #[test]
4443    fn test_seen_message_deduplication() {
4444        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4445        let origin = NodeId::new(0x22222222);
4446        let msg_id = crate::relay::MessageId::from_content(origin, 1000, 0xDEADBEEF);
4447
4448        // First time - should be new
4449        assert!(mesh.mark_message_seen(msg_id, origin, 1000));
4450
4451        // Second time - should be duplicate
4452        assert!(!mesh.mark_message_seen(msg_id, origin, 2000));
4453
4454        assert_eq!(mesh.seen_cache_size(), 1);
4455    }
4456
4457    #[test]
4458    fn test_wrap_for_relay() {
4459        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4460
4461        let payload = vec![1, 2, 3, 4, 5];
4462        let wrapped = mesh.wrap_for_relay(payload.clone());
4463
4464        // Should start with relay envelope marker
4465        assert_eq!(wrapped[0], crate::relay::RELAY_ENVELOPE_MARKER);
4466
4467        // Decode and verify
4468        let envelope = crate::relay::RelayEnvelope::decode(&wrapped).unwrap();
4469        assert_eq!(envelope.payload, payload);
4470        assert_eq!(envelope.origin_node, NodeId::new(0x11111111));
4471        assert_eq!(envelope.hop_count, 0);
4472    }
4473
4474    #[test]
4475    fn test_process_relay_envelope_new_message() {
4476        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4477        let observer = Arc::new(CollectingObserver::new());
4478        mesh.add_observer(observer.clone());
4479
4480        // Create an envelope from another node
4481        let payload = vec![1, 2, 3, 4, 5];
4482        let envelope =
4483            crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
4484                .with_max_hops(7);
4485        let data = envelope.encode();
4486
4487        // Process it
4488        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
4489
4490        assert!(decision.is_some());
4491        let decision = decision.unwrap();
4492        assert_eq!(decision.payload, payload);
4493        assert_eq!(decision.origin_node.as_u32(), 0x22222222);
4494        assert_eq!(decision.hop_count, 0);
4495        assert!(decision.should_relay);
4496        assert!(decision.relay_envelope.is_some());
4497
4498        // Relay envelope should have incremented hop count
4499        let relay_env = decision.relay_envelope.unwrap();
4500        assert_eq!(relay_env.hop_count, 1);
4501    }
4502
4503    #[test]
4504    fn test_process_relay_envelope_duplicate() {
4505        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4506        let observer = Arc::new(CollectingObserver::new());
4507        mesh.add_observer(observer.clone());
4508
4509        let payload = vec![1, 2, 3, 4, 5];
4510        let envelope = crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload);
4511        let data = envelope.encode();
4512
4513        // First time - should succeed
4514        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
4515        assert!(decision.is_some());
4516
4517        // Second time - should be duplicate
4518        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 2000);
4519        assert!(decision.is_none());
4520
4521        // Should have DuplicateMessageDropped event
4522        let events = observer.events();
4523        assert!(events
4524            .iter()
4525            .any(|e| matches!(e, PeatEvent::DuplicateMessageDropped { .. })));
4526    }
4527
4528    #[test]
4529    fn test_process_relay_envelope_ttl_expired() {
4530        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4531        let observer = Arc::new(CollectingObserver::new());
4532        mesh.add_observer(observer.clone());
4533
4534        // Create envelope at max hops (TTL expired)
4535        let payload = vec![1, 2, 3, 4, 5];
4536        let mut envelope =
4537            crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
4538                .with_max_hops(3);
4539
4540        // Simulate having been relayed 3 times already
4541        envelope = envelope.relay().unwrap(); // hop 1
4542        envelope = envelope.relay().unwrap(); // hop 2
4543        envelope = envelope.relay().unwrap(); // hop 3 - at max now
4544
4545        let data = envelope.encode();
4546
4547        // Process - should still process locally but not relay further
4548        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
4549
4550        assert!(decision.is_some());
4551        let decision = decision.unwrap();
4552        assert_eq!(decision.payload, payload);
4553        assert!(!decision.should_relay); // Cannot relay further
4554        assert!(decision.relay_envelope.is_none());
4555
4556        // Should have MessageTtlExpired event
4557        let events = observer.events();
4558        assert!(events
4559            .iter()
4560            .any(|e| matches!(e, PeatEvent::MessageTtlExpired { .. })));
4561    }
4562
4563    #[test]
4564    fn test_build_relay_document() {
4565        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4566
4567        let relay_doc = mesh.build_relay_document();
4568
4569        // Should be a valid relay envelope
4570        assert_eq!(relay_doc[0], crate::relay::RELAY_ENVELOPE_MARKER);
4571
4572        // Decode and verify it contains a valid document
4573        let envelope = crate::relay::RelayEnvelope::decode(&relay_doc).unwrap();
4574        assert_eq!(envelope.origin_node.as_u32(), 0x11111111);
4575
4576        // The payload should be a valid PeatDocument
4577        let doc = crate::document::PeatDocument::decode(&envelope.payload);
4578        assert!(doc.is_some());
4579    }
4580
4581    #[test]
4582    fn test_relay_targets_excludes_source() {
4583        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4584
4585        // Add some peers
4586        mesh.on_ble_discovered(
4587            "peer-1",
4588            Some("PEAT_TEST-22222222"),
4589            -60,
4590            Some("TEST"),
4591            1000,
4592        );
4593        mesh.on_ble_connected("peer-1", 1000);
4594
4595        mesh.on_ble_discovered(
4596            "peer-2",
4597            Some("PEAT_TEST-33333333"),
4598            -65,
4599            Some("TEST"),
4600            1000,
4601        );
4602        mesh.on_ble_connected("peer-2", 1000);
4603
4604        mesh.on_ble_discovered(
4605            "peer-3",
4606            Some("PEAT_TEST-44444444"),
4607            -70,
4608            Some("TEST"),
4609            1000,
4610        );
4611        mesh.on_ble_connected("peer-3", 1000);
4612
4613        // Get relay targets excluding peer-2
4614        let targets = mesh.get_relay_targets(Some(NodeId::new(0x33333333)));
4615
4616        // Should not include peer-2 in targets
4617        assert!(targets.iter().all(|p| p.node_id.as_u32() != 0x33333333));
4618    }
4619
4620    #[test]
4621    fn test_clear_seen_cache() {
4622        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4623        let origin = NodeId::new(0x22222222);
4624
4625        // Add some messages
4626        mesh.mark_message_seen(
4627            crate::relay::MessageId::from_content(origin, 1000, 0x11111111),
4628            origin,
4629            1000,
4630        );
4631        mesh.mark_message_seen(
4632            crate::relay::MessageId::from_content(origin, 2000, 0x22222222),
4633            origin,
4634            2000,
4635        );
4636
4637        assert_eq!(mesh.seen_cache_size(), 2);
4638
4639        // Clear
4640        mesh.clear_seen_cache();
4641        assert_eq!(mesh.seen_cache_size(), 0);
4642    }
4643}