Skip to main content

hive_btle/
hive_mesh.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! HiveMesh - Unified mesh management facade
17//!
18//! This module provides the main entry point for HIVE BLE mesh operations.
19//! It composes peer management, document sync, and observer notifications
20//! into a single interface that platform implementations can use.
21//!
22//! ## Usage
23//!
24//! ```ignore
25//! use hive_btle::hive_mesh::{HiveMesh, HiveMeshConfig};
26//! use hive_btle::observer::{HiveEvent, HiveObserver};
27//! use hive_btle::NodeId;
28//! use std::sync::Arc;
29//!
30//! // Create mesh configuration
31//! let config = HiveMeshConfig::new(NodeId::new(0x12345678), "ALPHA-1", "DEMO");
32//!
33//! // Create mesh instance
34//! let mesh = HiveMesh::new(config);
35//!
36//! // Add observer for events
37//! struct MyObserver;
38//! impl HiveObserver for MyObserver {
39//!     fn on_event(&self, event: HiveEvent) {
40//!         println!("Event: {:?}", event);
41//!     }
42//! }
43//! mesh.add_observer(Arc::new(MyObserver));
44//!
45//! // Platform BLE callbacks
46//! mesh.on_ble_discovered("device-uuid", Some("HIVE_DEMO-AABBCCDD"), -65, Some("DEMO"), now_ms);
47//! mesh.on_ble_connected("device-uuid", now_ms);
48//! mesh.on_ble_data_received("device-uuid", &data, now_ms);
49//!
50//! // Periodic maintenance
51//! if let Some(sync_data) = mesh.tick(now_ms) {
52//!     // Broadcast sync_data to connected peers
53//! }
54//! ```
55
56#[cfg(not(feature = "std"))]
57use alloc::{string::String, sync::Arc, vec::Vec};
58#[cfg(feature = "std")]
59use std::collections::HashMap;
60#[cfg(feature = "std")]
61use std::sync::Arc;
62
63use crate::document::{ENCRYPTED_MARKER, KEY_EXCHANGE_MARKER, PEER_E2EE_MARKER};
64use crate::document_sync::DocumentSync;
65use crate::gossip::{GossipStrategy, RandomFanout};
66use crate::observer::{DisconnectReason, HiveEvent, HiveObserver, SecurityViolationKind};
67use crate::peer::{
68    ConnectionStateGraph, FullStateCountSummary, HivePeer, IndirectPeer, PeerConnectionState,
69    PeerDegree, PeerManagerConfig, StateCountSummary,
70};
71use crate::peer_manager::PeerManager;
72use crate::relay::{
73    MessageId, RelayEnvelope, SeenMessageCache, DEFAULT_MAX_HOPS, DEFAULT_SEEN_TTL_MS,
74    RELAY_ENVELOPE_MARKER,
75};
76use crate::security::{
77    DeviceIdentity, IdentityAttestation, IdentityRegistry, KeyExchangeMessage, MeshEncryptionKey,
78    PeerEncryptedMessage, PeerSessionManager, RegistryResult, SessionState,
79};
80use crate::sync::crdt::{EventType, Peripheral, PeripheralType};
81use crate::sync::delta::{DeltaEncoder, DeltaStats};
82use crate::sync::delta_document::{DeltaDocument, Operation};
83use crate::NodeId;
84
85#[cfg(feature = "std")]
86use crate::observer::ObserverManager;
87
88use crate::registry::DocumentRegistry;
89
90/// Configuration for HiveMesh
91#[derive(Debug, Clone)]
92pub struct HiveMeshConfig {
93    /// Our node ID
94    pub node_id: NodeId,
95
96    /// Our callsign (e.g., "ALPHA-1")
97    pub callsign: String,
98
99    /// Mesh ID to filter peers (e.g., "DEMO")
100    pub mesh_id: String,
101
102    /// Peripheral type for this device
103    pub peripheral_type: PeripheralType,
104
105    /// Peer management configuration
106    pub peer_config: PeerManagerConfig,
107
108    /// Sync interval in milliseconds (how often to broadcast state)
109    pub sync_interval_ms: u64,
110
111    /// Whether to auto-broadcast on emergency/ack
112    pub auto_broadcast_events: bool,
113
114    /// Optional shared secret for mesh-wide encryption (32 bytes)
115    ///
116    /// When set, all documents are encrypted using ChaCha20-Poly1305 before
117    /// transmission and decrypted upon receipt. All nodes in the mesh must
118    /// share the same secret to communicate.
119    pub encryption_secret: Option<[u8; 32]>,
120
121    /// Strict encryption mode - reject unencrypted documents when encryption is enabled
122    ///
123    /// When true and encryption is enabled, any unencrypted documents received
124    /// will be rejected and trigger a SecurityViolation event. This prevents
125    /// downgrade attacks where an adversary sends unencrypted malicious documents.
126    ///
127    /// Default: false (backward compatible - accepts unencrypted for gradual rollout)
128    pub strict_encryption: bool,
129
130    /// Enable multi-hop relay
131    ///
132    /// When enabled, received messages will be forwarded to other peers based
133    /// on the gossip strategy. Requires message deduplication to prevent loops.
134    ///
135    /// Default: false
136    pub enable_relay: bool,
137
138    /// Maximum hops for relay messages (TTL)
139    ///
140    /// Messages will not be relayed beyond this many hops from the origin.
141    /// Default: 7
142    pub max_relay_hops: u8,
143
144    /// Gossip fanout for relay
145    ///
146    /// Number of peers to forward each message to. Higher values increase
147    /// convergence speed but also bandwidth usage.
148    /// Default: 2
149    pub relay_fanout: usize,
150
151    /// TTL for seen message cache (milliseconds)
152    ///
153    /// How long to remember message IDs for deduplication.
154    /// Default: 300_000 (5 minutes)
155    pub seen_cache_ttl_ms: u64,
156}
157
158impl HiveMeshConfig {
159    /// Create a new configuration with required fields
160    pub fn new(node_id: NodeId, callsign: &str, mesh_id: &str) -> Self {
161        Self {
162            node_id,
163            callsign: callsign.into(),
164            mesh_id: mesh_id.into(),
165            peripheral_type: PeripheralType::SoldierSensor,
166            peer_config: PeerManagerConfig::with_mesh_id(mesh_id),
167            sync_interval_ms: 5000,
168            auto_broadcast_events: true,
169            encryption_secret: None,
170            strict_encryption: false,
171            enable_relay: false,
172            max_relay_hops: DEFAULT_MAX_HOPS,
173            relay_fanout: 2,
174            seen_cache_ttl_ms: DEFAULT_SEEN_TTL_MS,
175        }
176    }
177
178    /// Enable mesh-wide encryption with a shared secret
179    ///
180    /// All documents will be encrypted using ChaCha20-Poly1305 before
181    /// transmission. All mesh participants must use the same secret.
182    pub fn with_encryption(mut self, secret: [u8; 32]) -> Self {
183        self.encryption_secret = Some(secret);
184        self
185    }
186
187    /// Set peripheral type
188    pub fn with_peripheral_type(mut self, ptype: PeripheralType) -> Self {
189        self.peripheral_type = ptype;
190        self
191    }
192
193    /// Set sync interval
194    pub fn with_sync_interval(mut self, interval_ms: u64) -> Self {
195        self.sync_interval_ms = interval_ms;
196        self
197    }
198
199    /// Set peer timeout
200    pub fn with_peer_timeout(mut self, timeout_ms: u64) -> Self {
201        self.peer_config.peer_timeout_ms = timeout_ms;
202        self
203    }
204
205    /// Set max peers (for embedded systems)
206    pub fn with_max_peers(mut self, max: usize) -> Self {
207        self.peer_config.max_peers = max;
208        self
209    }
210
211    /// Enable strict encryption mode
212    ///
213    /// When enabled (and encryption is also enabled), any unencrypted documents
214    /// received will be rejected and trigger a `SecurityViolation` event.
215    /// This prevents downgrade attacks.
216    ///
217    /// Note: This only has effect when encryption is enabled via `with_encryption()`.
218    pub fn with_strict_encryption(mut self) -> Self {
219        self.strict_encryption = true;
220        self
221    }
222
223    /// Enable multi-hop relay
224    ///
225    /// When enabled, received messages will be forwarded to other connected peers
226    /// based on the gossip strategy. This enables mesh-wide message propagation.
227    pub fn with_relay(mut self) -> Self {
228        self.enable_relay = true;
229        self
230    }
231
232    /// Set maximum relay hops (TTL)
233    ///
234    /// Messages will not be relayed beyond this many hops from the origin.
235    pub fn with_max_relay_hops(mut self, max_hops: u8) -> Self {
236        self.max_relay_hops = max_hops;
237        self
238    }
239
240    /// Set gossip fanout for relay
241    ///
242    /// Number of peers to forward each message to.
243    pub fn with_relay_fanout(mut self, fanout: usize) -> Self {
244        self.relay_fanout = fanout.max(1);
245        self
246    }
247
248    /// Set TTL for seen message cache
249    ///
250    /// How long to remember message IDs for deduplication (milliseconds).
251    pub fn with_seen_cache_ttl(mut self, ttl_ms: u64) -> Self {
252        self.seen_cache_ttl_ms = ttl_ms;
253        self
254    }
255}
256
257/// Type alias for app document storage to reduce type complexity
258#[cfg(feature = "std")]
259type AppDocumentStore =
260    std::sync::RwLock<HashMap<(u8, u32, u64), Box<dyn core::any::Any + Send + Sync>>>;
261
262/// Main facade for HIVE BLE mesh operations
263///
264/// Composes peer management, document sync, and observer notifications.
265/// Platform implementations call into this from their BLE callbacks.
266#[cfg(feature = "std")]
267pub struct HiveMesh {
268    /// Configuration
269    config: HiveMeshConfig,
270
271    /// Peer manager
272    peer_manager: PeerManager,
273
274    /// Document sync
275    document_sync: DocumentSync,
276
277    /// Observer manager
278    observers: ObserverManager,
279
280    /// Last sync broadcast time (u32 wraps every ~49 days, sufficient for intervals)
281    last_sync_ms: std::sync::atomic::AtomicU32,
282
283    /// Last cleanup time
284    last_cleanup_ms: std::sync::atomic::AtomicU32,
285
286    /// Optional mesh-wide encryption key (derived from shared secret)
287    encryption_key: Option<MeshEncryptionKey>,
288
289    /// Optional per-peer E2EE session manager
290    peer_sessions: std::sync::Mutex<Option<PeerSessionManager>>,
291
292    /// Connection state graph for tracking peer connection lifecycle
293    connection_graph: std::sync::Mutex<ConnectionStateGraph>,
294
295    /// Seen message cache for relay deduplication
296    seen_cache: std::sync::Mutex<SeenMessageCache>,
297
298    /// Gossip strategy for relay peer selection
299    gossip_strategy: Box<dyn GossipStrategy>,
300
301    /// Delta encoder for per-peer sync state tracking
302    ///
303    /// Tracks what data has been sent to each peer to enable delta sync
304    /// (sending only changes instead of full documents).
305    delta_encoder: std::sync::Mutex<DeltaEncoder>,
306
307    /// This node's cryptographic identity (Ed25519 keypair)
308    ///
309    /// When set, the node_id is derived from the public key and documents
310    /// can be signed for authenticity verification.
311    identity: Option<DeviceIdentity>,
312
313    /// TOFU identity registry for tracking peer identities
314    ///
315    /// Maps node_id to public key on first contact, rejects mismatches.
316    identity_registry: std::sync::Mutex<IdentityRegistry>,
317
318    /// Peripheral state received from peers
319    ///
320    /// Stores the most recent peripheral data (callsign, location, etc.)
321    /// received from each peer via document sync.
322    peer_peripherals: std::sync::RwLock<HashMap<NodeId, Peripheral>>,
323
324    /// Document registry for app-layer CRDT types.
325    ///
326    /// Enables external crates to register custom document types that sync
327    /// through the mesh using the extensible registry pattern.
328    document_registry: DocumentRegistry,
329
330    /// Storage for app-layer documents.
331    ///
332    /// Keyed by (type_id, source_node, timestamp) to uniquely identify each
333    /// document instance. Values are type-erased boxes that can be downcast
334    /// using the document registry handlers.
335    app_documents: AppDocumentStore,
336}
337
338#[cfg(feature = "std")]
339impl HiveMesh {
340    /// Create a new HiveMesh instance
341    pub fn new(config: HiveMeshConfig) -> Self {
342        let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
343        let document_sync = DocumentSync::with_peripheral_type(
344            config.node_id,
345            &config.callsign,
346            config.peripheral_type,
347        );
348
349        // Derive encryption key from shared secret if configured
350        let encryption_key = config
351            .encryption_secret
352            .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
353
354        // Create connection state graph with config thresholds
355        let connection_graph = ConnectionStateGraph::with_config(
356            config.peer_config.rssi_degraded_threshold,
357            config.peer_config.lost_timeout_ms,
358        );
359
360        // Create seen message cache for relay deduplication
361        let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
362
363        // Create gossip strategy for relay
364        let gossip_strategy: Box<dyn GossipStrategy> =
365            Box::new(RandomFanout::new(config.relay_fanout));
366
367        // Create delta encoder for per-peer sync state tracking
368        let delta_encoder = DeltaEncoder::new(config.node_id);
369
370        // Create document registry and auto-register types
371        let document_registry = DocumentRegistry::new();
372        #[cfg(feature = "hive-lite-sync")]
373        {
374            use crate::hive_lite_sync::CannedMessageDocument;
375            document_registry.register::<CannedMessageDocument>();
376            log::info!("Auto-registered CannedMessageDocument (0xC0) for hive-lite sync");
377        }
378
379        Self {
380            config,
381            peer_manager,
382            document_sync,
383            observers: ObserverManager::new(),
384            last_sync_ms: std::sync::atomic::AtomicU32::new(0),
385            last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
386            encryption_key,
387            peer_sessions: std::sync::Mutex::new(None),
388            connection_graph: std::sync::Mutex::new(connection_graph),
389            seen_cache: std::sync::Mutex::new(seen_cache),
390            gossip_strategy,
391            delta_encoder: std::sync::Mutex::new(delta_encoder),
392            identity: None,
393            identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
394            peer_peripherals: std::sync::RwLock::new(HashMap::new()),
395            document_registry,
396            app_documents: std::sync::RwLock::new(HashMap::new()),
397        }
398    }
399
400    /// Create a new HiveMesh with a cryptographic identity
401    ///
402    /// The node_id will be derived from the identity's public key, overriding
403    /// any node_id specified in the config. This ensures cryptographic binding
404    /// between node_id and identity.
405    pub fn with_identity(config: HiveMeshConfig, identity: DeviceIdentity) -> Self {
406        // Override node_id with identity-derived value
407        let mut config = config;
408        config.node_id = identity.node_id();
409
410        let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
411        let document_sync = DocumentSync::with_peripheral_type(
412            config.node_id,
413            &config.callsign,
414            config.peripheral_type,
415        );
416
417        let encryption_key = config
418            .encryption_secret
419            .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
420
421        let connection_graph = ConnectionStateGraph::with_config(
422            config.peer_config.rssi_degraded_threshold,
423            config.peer_config.lost_timeout_ms,
424        );
425
426        let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
427        let gossip_strategy: Box<dyn GossipStrategy> =
428            Box::new(RandomFanout::new(config.relay_fanout));
429        let delta_encoder = DeltaEncoder::new(config.node_id);
430
431        // Create document registry and auto-register types
432        let document_registry = DocumentRegistry::new();
433        #[cfg(feature = "hive-lite-sync")]
434        {
435            use crate::hive_lite_sync::CannedMessageDocument;
436            document_registry.register::<CannedMessageDocument>();
437            log::info!("Auto-registered CannedMessageDocument (0xC0) for hive-lite sync");
438        }
439
440        Self {
441            config,
442            peer_manager,
443            document_sync,
444            observers: ObserverManager::new(),
445            last_sync_ms: std::sync::atomic::AtomicU32::new(0),
446            last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
447            encryption_key,
448            peer_sessions: std::sync::Mutex::new(None),
449            connection_graph: std::sync::Mutex::new(connection_graph),
450            seen_cache: std::sync::Mutex::new(seen_cache),
451            gossip_strategy,
452            delta_encoder: std::sync::Mutex::new(delta_encoder),
453            identity: Some(identity),
454            identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
455            peer_peripherals: std::sync::RwLock::new(HashMap::new()),
456            document_registry,
457            app_documents: std::sync::RwLock::new(HashMap::new()),
458        }
459    }
460
461    /// Create a new HiveMesh from genesis data
462    ///
463    /// This is the recommended way to create a mesh for production use.
464    /// The mesh will be configured with:
465    /// - node_id derived from identity
466    /// - mesh_id from genesis
467    /// - encryption enabled using genesis-derived secret
468    pub fn from_genesis(
469        genesis: &crate::security::MeshGenesis,
470        identity: DeviceIdentity,
471        callsign: &str,
472    ) -> Self {
473        let config = HiveMeshConfig::new(identity.node_id(), callsign, &genesis.mesh_id())
474            .with_encryption(genesis.encryption_secret());
475
476        Self::with_identity(config, identity)
477    }
478
479    /// Create a HiveMesh from persisted state.
480    ///
481    /// Restores mesh configuration from previously saved state, including:
482    /// - Device identity (Ed25519 keypair)
483    /// - Mesh genesis (if present)
484    /// - Identity registry (TOFU cache)
485    ///
486    /// Use this on device boot to restore mesh membership without re-provisioning.
487    ///
488    /// # Arguments
489    ///
490    /// * `state` - Previously persisted state
491    /// * `callsign` - Human-readable identifier (may differ from original)
492    ///
493    /// # Errors
494    ///
495    /// Returns `PersistenceError` if the identity cannot be restored.
496    ///
497    /// # Example
498    ///
499    /// ```ignore
500    /// // On boot, restore from secure storage
501    /// let state = PersistedState::load(&storage)?;
502    /// let mesh = HiveMesh::from_persisted(state, "SENSOR-01")?;
503    /// ```
504    #[cfg(feature = "std")]
505    pub fn from_persisted(
506        state: crate::security::PersistedState,
507        callsign: &str,
508    ) -> Result<Self, crate::security::PersistenceError> {
509        // Restore identity
510        let identity = state.restore_identity()?;
511
512        // Restore genesis (if present)
513        let genesis = state.restore_genesis();
514
515        // Create mesh with or without genesis
516        let mesh = if let Some(ref gen) = genesis {
517            Self::from_genesis(gen, identity, callsign)
518        } else {
519            let config = HiveMeshConfig::new(identity.node_id(), callsign, "RESTORED");
520            Self::with_identity(config, identity)
521        };
522
523        // Restore identity registry
524        let restored_registry = state.restore_registry();
525        if let Ok(mut registry) = mesh.identity_registry.lock() {
526            *registry = restored_registry;
527        }
528
529        log::info!(
530            "HiveMesh restored from persisted state: node_id={:08X}, known_peers={}",
531            mesh.config.node_id.as_u32(),
532            mesh.known_identity_count()
533        );
534
535        Ok(mesh)
536    }
537
538    /// Create persisted state from current mesh state.
539    ///
540    /// Captures the current identity, genesis, and registry for persistence.
541    /// Call this periodically or before shutdown to save state.
542    ///
543    /// # Arguments
544    ///
545    /// * `genesis` - Optional genesis to include (if mesh was created from genesis)
546    ///
547    /// # Returns
548    ///
549    /// `None` if the mesh has no identity bound.
550    #[cfg(feature = "std")]
551    pub fn to_persisted_state(
552        &self,
553        genesis: Option<&crate::security::MeshGenesis>,
554    ) -> Option<crate::security::PersistedState> {
555        let identity = self.identity.as_ref()?;
556        let registry = self.identity_registry.lock().ok()?;
557
558        Some(crate::security::PersistedState::with_registry(
559            identity, genesis, &registry,
560        ))
561    }
562
563    // ==================== Encryption ====================
564
565    /// Check if mesh-wide encryption is enabled
566    pub fn is_encryption_enabled(&self) -> bool {
567        self.encryption_key.is_some()
568    }
569
570    /// Check if strict encryption mode is enabled
571    ///
572    /// Returns true only if both encryption and strict_encryption are enabled.
573    pub fn is_strict_encryption_enabled(&self) -> bool {
574        self.config.strict_encryption && self.encryption_key.is_some()
575    }
576
577    /// Enable mesh-wide encryption with a shared secret
578    ///
579    /// Derives a ChaCha20-Poly1305 key from the secret using HKDF-SHA256.
580    /// All mesh participants must use the same secret to communicate.
581    pub fn enable_encryption(&mut self, secret: &[u8; 32]) {
582        self.encryption_key = Some(MeshEncryptionKey::from_shared_secret(
583            &self.config.mesh_id,
584            secret,
585        ));
586    }
587
588    /// Disable mesh-wide encryption
589    pub fn disable_encryption(&mut self) {
590        self.encryption_key = None;
591    }
592
593    /// Encrypt document bytes for transmission
594    ///
595    /// Returns the encrypted bytes with ENCRYPTED_MARKER prefix, or the
596    /// original bytes if encryption is disabled.
597    fn encrypt_document(&self, plaintext: &[u8]) -> Vec<u8> {
598        match &self.encryption_key {
599            Some(key) => {
600                // Encrypt and prepend marker
601                match key.encrypt_to_bytes(plaintext) {
602                    Ok(ciphertext) => {
603                        let mut buf = Vec::with_capacity(2 + ciphertext.len());
604                        buf.push(ENCRYPTED_MARKER);
605                        buf.push(0x00); // reserved
606                        buf.extend_from_slice(&ciphertext);
607                        buf
608                    }
609                    Err(e) => {
610                        log::error!("Encryption failed: {}", e);
611                        // Fall back to unencrypted on error (shouldn't happen)
612                        plaintext.to_vec()
613                    }
614                }
615            }
616            None => plaintext.to_vec(),
617        }
618    }
619
620    /// Decrypt document bytes received from peer
621    ///
622    /// Returns the decrypted bytes if encrypted and valid, or the original
623    /// bytes if not encrypted. Returns None if decryption fails.
624    ///
625    /// In strict encryption mode (when both encryption and strict_encryption are enabled),
626    /// unencrypted documents are rejected and trigger a SecurityViolation event.
627    fn decrypt_document<'a>(
628        &self,
629        data: &'a [u8],
630        source_hint: Option<&str>,
631    ) -> Option<std::borrow::Cow<'a, [u8]>> {
632        log::debug!(
633            "decrypt_document: len={}, first_byte=0x{:02X}, source={:?}",
634            data.len(),
635            data.first().copied().unwrap_or(0),
636            source_hint
637        );
638
639        // Check for encrypted marker
640        if data.len() >= 2 && data[0] == ENCRYPTED_MARKER {
641            // Encrypted document
642            let _reserved = data[1];
643            let encrypted_payload = &data[2..];
644
645            log::debug!(
646                "decrypt_document: encrypted payload len={}, nonce+ciphertext",
647                encrypted_payload.len()
648            );
649
650            match &self.encryption_key {
651                Some(key) => match key.decrypt_from_bytes(encrypted_payload) {
652                    Ok(plaintext) => {
653                        log::debug!(
654                            "decrypt_document: SUCCESS, plaintext len={}",
655                            plaintext.len()
656                        );
657                        Some(std::borrow::Cow::Owned(plaintext))
658                    }
659                    Err(e) => {
660                        log::warn!(
661                            "decrypt_document: FAILED (wrong key or corrupted): {} [payload_len={}, source={:?}]",
662                            e,
663                            encrypted_payload.len(),
664                            source_hint
665                        );
666                        self.notify(HiveEvent::SecurityViolation {
667                            kind: SecurityViolationKind::DecryptionFailed,
668                            source: source_hint.map(String::from),
669                        });
670                        None
671                    }
672                },
673                None => {
674                    log::warn!(
675                        "decrypt_document: encryption not enabled but received encrypted doc"
676                    );
677                    None
678                }
679            }
680        } else {
681            // Unencrypted document
682            // Check strict encryption mode
683            if self.config.strict_encryption && self.encryption_key.is_some() {
684                log::warn!(
685                    "Rejected unencrypted document in strict encryption mode (source: {:?})",
686                    source_hint
687                );
688                self.notify(HiveEvent::SecurityViolation {
689                    kind: SecurityViolationKind::UnencryptedInStrictMode,
690                    source: source_hint.map(String::from),
691                });
692                None
693            } else {
694                // Permissive mode: accept unencrypted for backward compatibility
695                Some(std::borrow::Cow::Borrowed(data))
696            }
697        }
698    }
699
700    // ==================== Transport Layer API ====================
701
702    /// Decrypt data without parsing (transport-only operation)
703    ///
704    /// This method provides raw decrypted bytes for apps that want to handle
705    /// message parsing themselves (using hive-lite or other libraries).
706    ///
707    /// # Arguments
708    /// * `data` - Potentially encrypted data (0xAE marker indicates encryption)
709    ///
710    /// # Returns
711    /// * `Some(plaintext)` - Decrypted bytes if successful, or original bytes if unencrypted
712    /// * `None` - If decryption failed (wrong key, corrupted, or strict mode violation)
713    pub fn decrypt_only(&self, data: &[u8]) -> Option<Vec<u8>> {
714        self.decrypt_document(data, None)
715            .map(|cow| cow.into_owned())
716    }
717
718    // ==================== Identity ====================
719
720    /// Check if this mesh has a cryptographic identity
721    pub fn has_identity(&self) -> bool {
722        self.identity.is_some()
723    }
724
725    /// Get this node's public key (if identity is configured)
726    pub fn public_key(&self) -> Option<[u8; 32]> {
727        self.identity.as_ref().map(|id| id.public_key())
728    }
729
730    /// Create an identity attestation for this node
731    ///
732    /// Returns None if no identity is configured.
733    pub fn create_attestation(&self, now_ms: u64) -> Option<IdentityAttestation> {
734        self.identity
735            .as_ref()
736            .map(|id| id.create_attestation(now_ms))
737    }
738
739    /// Verify and register a peer's identity attestation
740    ///
741    /// Implements TOFU (Trust On First Use):
742    /// - On first contact, registers the node_id → public_key binding
743    /// - On subsequent contacts, verifies the public key matches
744    ///
745    /// Returns the verification result. Security violations should be handled
746    /// by the caller (e.g., disconnect, alert).
747    pub fn verify_peer_identity(&self, attestation: &IdentityAttestation) -> RegistryResult {
748        self.identity_registry
749            .lock()
750            .unwrap()
751            .verify_or_register(attestation)
752    }
753
754    /// Check if a peer's identity is known (has been registered)
755    pub fn is_peer_identity_known(&self, node_id: NodeId) -> bool {
756        self.identity_registry.lock().unwrap().is_known(node_id)
757    }
758
759    /// Get a peer's public key if known
760    pub fn peer_public_key(&self, node_id: NodeId) -> Option<[u8; 32]> {
761        self.identity_registry
762            .lock()
763            .unwrap()
764            .get_public_key(node_id)
765            .copied()
766    }
767
768    /// Get the number of known peer identities
769    pub fn known_identity_count(&self) -> usize {
770        self.identity_registry.lock().unwrap().len()
771    }
772
773    /// Pre-register a peer's identity (for out-of-band key exchange)
774    ///
775    /// Use this when keys are exchanged through a secure side channel
776    /// (e.g., QR code, NFC tap, or provisioning server).
777    pub fn pre_register_peer_identity(&self, node_id: NodeId, public_key: [u8; 32], now_ms: u64) {
778        self.identity_registry
779            .lock()
780            .unwrap()
781            .pre_register(node_id, public_key, now_ms);
782    }
783
784    /// Remove a peer's identity from the registry
785    ///
786    /// Use with caution - this allows re-registration with a different key.
787    pub fn forget_peer_identity(&self, node_id: NodeId) {
788        self.identity_registry.lock().unwrap().remove(node_id);
789    }
790
791    /// Sign arbitrary data with this node's identity
792    ///
793    /// Returns None if no identity is configured.
794    pub fn sign(&self, data: &[u8]) -> Option<[u8; 64]> {
795        self.identity.as_ref().map(|id| id.sign(data))
796    }
797
798    /// Verify a signature from a peer
799    ///
800    /// Uses the peer's public key from the identity registry.
801    /// Returns false if peer is unknown or signature is invalid.
802    pub fn verify_peer_signature(
803        &self,
804        node_id: NodeId,
805        data: &[u8],
806        signature: &[u8; 64],
807    ) -> bool {
808        if let Some(public_key) = self.peer_public_key(node_id) {
809            crate::security::verify_signature(&public_key, data, signature)
810        } else {
811            false
812        }
813    }
814
815    // ==================== Multi-Hop Relay ====================
816
817    /// Check if multi-hop relay is enabled
818    pub fn is_relay_enabled(&self) -> bool {
819        self.config.enable_relay
820    }
821
822    /// Enable multi-hop relay
823    pub fn enable_relay(&mut self) {
824        self.config.enable_relay = true;
825    }
826
827    /// Disable multi-hop relay
828    pub fn disable_relay(&mut self) {
829        self.config.enable_relay = false;
830    }
831
832    /// Check if a message has been seen before (for deduplication)
833    ///
834    /// Returns true if the message was already seen (duplicate).
835    pub fn has_seen_message(&self, message_id: &MessageId) -> bool {
836        self.seen_cache.lock().unwrap().has_seen(message_id)
837    }
838
839    /// Mark a message as seen
840    ///
841    /// Returns true if this is a new message (first time seen).
842    pub fn mark_message_seen(&self, message_id: MessageId, origin: NodeId, now_ms: u64) -> bool {
843        self.seen_cache
844            .lock()
845            .unwrap()
846            .check_and_mark(message_id, origin, now_ms)
847    }
848
849    /// Get the number of entries in the seen message cache
850    pub fn seen_cache_size(&self) -> usize {
851        self.seen_cache.lock().unwrap().len()
852    }
853
854    /// Clear the seen message cache
855    pub fn clear_seen_cache(&self) {
856        self.seen_cache.lock().unwrap().clear();
857    }
858
859    /// Wrap a document in a relay envelope for multi-hop transmission
860    ///
861    /// The returned bytes can be sent to peers and will be automatically
862    /// relayed through the mesh if relay is enabled on receiving nodes.
863    pub fn wrap_for_relay(&self, payload: Vec<u8>) -> Vec<u8> {
864        let envelope = RelayEnvelope::broadcast(self.config.node_id, payload)
865            .with_max_hops(self.config.max_relay_hops);
866        envelope.encode()
867    }
868
869    /// Get peers to relay a message to
870    ///
871    /// Uses the configured gossip strategy to select relay targets.
872    /// Excludes the source peer (if provided) to avoid sending back to sender.
873    pub fn get_relay_targets(&self, exclude_peer: Option<NodeId>) -> Vec<HivePeer> {
874        let connected = self.peer_manager.get_connected_peers();
875        let filtered: Vec<_> = if let Some(exclude) = exclude_peer {
876            connected
877                .into_iter()
878                .filter(|p| p.node_id != exclude)
879                .collect()
880        } else {
881            connected
882        };
883
884        self.gossip_strategy
885            .select_peers(&filtered)
886            .into_iter()
887            .cloned()
888            .collect()
889    }
890
891    /// Process an incoming relay envelope
892    ///
893    /// Handles deduplication, TTL checking, and determines if the message
894    /// should be processed and/or relayed.
895    ///
896    /// Returns:
897    /// - `Ok(Some(RelayDecision))` if message should be processed/relayed
898    /// - `Ok(None)` if message was a duplicate or TTL expired
899    /// - `Err` if parsing failed
900    pub fn process_relay_envelope(
901        &self,
902        data: &[u8],
903        source_peer: NodeId,
904        now_ms: u64,
905    ) -> Option<RelayDecision> {
906        // Parse envelope
907        let envelope = RelayEnvelope::decode(data)?;
908
909        // Update indirect peer graph if origin differs from source
910        // This means the message was relayed through source_peer from origin_node
911        if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
912            let is_new = self.connection_graph.lock().unwrap().on_relay_received(
913                source_peer,
914                envelope.origin_node,
915                envelope.hop_count,
916                now_ms,
917            );
918
919            if is_new {
920                log::debug!(
921                    "Discovered indirect peer {:08X} via {:08X} ({} hops)",
922                    envelope.origin_node.as_u32(),
923                    source_peer.as_u32(),
924                    envelope.hop_count
925                );
926            }
927        }
928
929        // Check deduplication
930        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
931            // Duplicate message
932            let stats = self
933                .seen_cache
934                .lock()
935                .unwrap()
936                .get_stats(&envelope.message_id);
937            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
938
939            self.notify(HiveEvent::DuplicateMessageDropped {
940                origin_node: envelope.origin_node,
941                seen_count,
942            });
943
944            log::debug!(
945                "Dropped duplicate message {} from {:08X} (seen {} times)",
946                envelope.message_id,
947                envelope.origin_node.as_u32(),
948                seen_count
949            );
950            return None;
951        }
952
953        // Check TTL
954        if !envelope.can_relay() {
955            self.notify(HiveEvent::MessageTtlExpired {
956                origin_node: envelope.origin_node,
957                hop_count: envelope.hop_count,
958            });
959
960            log::debug!(
961                "Message {} from {:08X} TTL expired at hop {}",
962                envelope.message_id,
963                envelope.origin_node.as_u32(),
964                envelope.hop_count
965            );
966
967            // Still process locally even if TTL expired
968            return Some(RelayDecision {
969                payload: envelope.payload,
970                origin_node: envelope.origin_node,
971                hop_count: envelope.hop_count,
972                should_relay: false,
973                relay_envelope: None,
974            });
975        }
976
977        // Determine if we should relay
978        let should_relay = self.config.enable_relay;
979        let relay_envelope = if should_relay {
980            envelope.relay() // Increments hop count
981        } else {
982            None
983        };
984
985        Some(RelayDecision {
986            payload: envelope.payload,
987            origin_node: envelope.origin_node,
988            hop_count: envelope.hop_count,
989            should_relay,
990            relay_envelope,
991        })
992    }
993
994    /// Build a document wrapped in a relay envelope
995    ///
996    /// Convenience method that builds the document, encrypts it (if enabled),
997    /// and wraps it in a relay envelope for multi-hop transmission.
998    pub fn build_relay_document(&self) -> Vec<u8> {
999        let doc = self.build_document(); // Already encrypted if encryption enabled
1000        self.wrap_for_relay(doc)
1001    }
1002
1003    // ==================== Delta Sync ====================
1004
1005    /// Register a peer for delta sync tracking
1006    ///
1007    /// Call this when a peer connects to start tracking what data has been
1008    /// sent to them. This enables future delta sync (sending only changes).
1009    pub fn register_peer_for_delta(&self, peer_id: &NodeId) {
1010        let mut encoder = self.delta_encoder.lock().unwrap();
1011        encoder.add_peer(peer_id);
1012        log::debug!(
1013            "Registered peer {:08X} for delta sync tracking",
1014            peer_id.as_u32()
1015        );
1016    }
1017
1018    /// Unregister a peer from delta sync tracking
1019    ///
1020    /// Call this when a peer disconnects to clean up tracking state.
1021    pub fn unregister_peer_for_delta(&self, peer_id: &NodeId) {
1022        let mut encoder = self.delta_encoder.lock().unwrap();
1023        encoder.remove_peer(peer_id);
1024        log::debug!(
1025            "Unregistered peer {:08X} from delta sync tracking",
1026            peer_id.as_u32()
1027        );
1028    }
1029
1030    /// Reset delta sync state for a peer
1031    ///
1032    /// Call this when a peer reconnects to force a full sync on next
1033    /// communication. This clears the record of what was previously sent.
1034    pub fn reset_peer_delta_state(&self, peer_id: &NodeId) {
1035        let mut encoder = self.delta_encoder.lock().unwrap();
1036        encoder.reset_peer(peer_id);
1037        log::debug!("Reset delta sync state for peer {:08X}", peer_id.as_u32());
1038    }
1039
1040    /// Record bytes sent to a peer (for delta statistics)
1041    pub fn record_delta_sent(&self, peer_id: &NodeId, bytes: usize) {
1042        let mut encoder = self.delta_encoder.lock().unwrap();
1043        encoder.record_sent(peer_id, bytes);
1044    }
1045
1046    /// Record bytes received from a peer (for delta statistics)
1047    pub fn record_delta_received(&self, peer_id: &NodeId, bytes: usize, timestamp: u64) {
1048        let mut encoder = self.delta_encoder.lock().unwrap();
1049        encoder.record_received(peer_id, bytes, timestamp);
1050    }
1051
1052    /// Get delta sync statistics
1053    ///
1054    /// Returns aggregate statistics about delta sync across all peers,
1055    /// including bytes sent/received and sync counts.
1056    pub fn delta_stats(&self) -> DeltaStats {
1057        self.delta_encoder.lock().unwrap().stats()
1058    }
1059
1060    /// Get delta sync statistics for a specific peer
1061    ///
1062    /// Returns the bytes sent/received and sync count for a single peer.
1063    pub fn peer_delta_stats(&self, peer_id: &NodeId) -> Option<(u64, u64, u32)> {
1064        let encoder = self.delta_encoder.lock().unwrap();
1065        encoder
1066            .get_peer_state(peer_id)
1067            .map(|state| (state.bytes_sent, state.bytes_received, state.sync_count))
1068    }
1069
1070    /// Build a delta document for a specific peer
1071    ///
1072    /// This only includes operations that have changed since the last sync
1073    /// with this peer. Uses the delta encoder to track per-peer state.
1074    ///
1075    /// Returns the encoded delta document bytes, or None if there's nothing
1076    /// new to send to this peer.
1077    pub fn build_delta_document_for_peer(&self, peer_id: &NodeId, now_ms: u64) -> Option<Vec<u8>> {
1078        // Collect all current operations
1079        let mut all_operations: Vec<Operation> = Vec::new();
1080
1081        // Add counter operations (one per node that has contributed)
1082        // Use the count value as the "timestamp" for tracking - only send if count increased
1083        for (node_id_u32, count) in self.document_sync.counter_entries() {
1084            all_operations.push(Operation::IncrementCounter {
1085                counter_id: 0, // Default mesh counter
1086                node_id: NodeId::new(node_id_u32),
1087                amount: count,
1088                timestamp: count, // Use count as timestamp for delta tracking
1089            });
1090        }
1091
1092        // Add peripheral update
1093        // Use event timestamp if available, otherwise use 1 for initial send
1094        let peripheral = self.document_sync.peripheral_snapshot();
1095        let peripheral_timestamp = peripheral
1096            .last_event
1097            .as_ref()
1098            .map(|e| e.timestamp)
1099            .unwrap_or(1); // Use 1 (not 0) so it's sent initially
1100        all_operations.push(Operation::UpdatePeripheral {
1101            peripheral,
1102            timestamp: peripheral_timestamp,
1103        });
1104
1105        // Add emergency operations if active
1106        if let Some(emergency) = self.document_sync.emergency_snapshot() {
1107            let source_node = NodeId::new(emergency.source_node());
1108            let timestamp = emergency.timestamp();
1109
1110            // Add SetEmergency operation
1111            all_operations.push(Operation::SetEmergency {
1112                source_node,
1113                timestamp,
1114                known_peers: emergency.all_nodes(),
1115            });
1116
1117            // Add AckEmergency for each node that has acked
1118            for acked_node in emergency.acked_nodes() {
1119                all_operations.push(Operation::AckEmergency {
1120                    node_id: NodeId::new(acked_node),
1121                    emergency_timestamp: timestamp,
1122                });
1123            }
1124        }
1125
1126        // Add app document operations
1127        for app_op in self.app_document_delta_ops() {
1128            all_operations.push(Operation::App(app_op));
1129        }
1130
1131        // Filter operations for this peer (only send what's new)
1132        let filtered_operations: Vec<Operation> = {
1133            let encoder = self.delta_encoder.lock().unwrap();
1134            if let Some(peer_state) = encoder.get_peer_state(peer_id) {
1135                all_operations
1136                    .into_iter()
1137                    .filter(|op| peer_state.needs_send(&op.key(), op.timestamp()))
1138                    .collect()
1139            } else {
1140                // Unknown peer, send all operations
1141                all_operations
1142            }
1143        };
1144
1145        // If nothing new to send, return None
1146        if filtered_operations.is_empty() {
1147            return None;
1148        }
1149
1150        // Mark operations as sent
1151        {
1152            let mut encoder = self.delta_encoder.lock().unwrap();
1153            if let Some(peer_state) = encoder.get_peer_state_mut(peer_id) {
1154                for op in &filtered_operations {
1155                    peer_state.mark_sent(&op.key(), op.timestamp());
1156                }
1157            }
1158        }
1159
1160        // Build the delta document
1161        let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1162        for op in filtered_operations {
1163            delta.add_operation(op);
1164        }
1165
1166        // Encode and optionally encrypt
1167        let encoded = delta.encode();
1168        let result = self.encrypt_document(&encoded);
1169
1170        // Record stats
1171        {
1172            let mut encoder = self.delta_encoder.lock().unwrap();
1173            encoder.record_sent(peer_id, result.len());
1174        }
1175
1176        Some(result)
1177    }
1178
1179    /// Build a full delta document (for broadcast or new peers)
1180    ///
1181    /// Unlike `build_delta_document_for_peer`, this includes all state
1182    /// regardless of what has been sent before. Use this for broadcasts.
1183    pub fn build_full_delta_document(&self, now_ms: u64) -> Vec<u8> {
1184        let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1185
1186        // Add all counter operations
1187        for (node_id_u32, count) in self.document_sync.counter_entries() {
1188            delta.add_operation(Operation::IncrementCounter {
1189                counter_id: 0,
1190                node_id: NodeId::new(node_id_u32),
1191                amount: count,
1192                timestamp: now_ms,
1193            });
1194        }
1195
1196        // Add peripheral
1197        let peripheral = self.document_sync.peripheral_snapshot();
1198        let peripheral_timestamp = peripheral
1199            .last_event
1200            .as_ref()
1201            .map(|e| e.timestamp)
1202            .unwrap_or(now_ms);
1203        delta.add_operation(Operation::UpdatePeripheral {
1204            peripheral,
1205            timestamp: peripheral_timestamp,
1206        });
1207
1208        // Add emergency if active
1209        if let Some(emergency) = self.document_sync.emergency_snapshot() {
1210            let source_node = NodeId::new(emergency.source_node());
1211            let timestamp = emergency.timestamp();
1212
1213            delta.add_operation(Operation::SetEmergency {
1214                source_node,
1215                timestamp,
1216                known_peers: emergency.all_nodes(),
1217            });
1218
1219            for acked_node in emergency.acked_nodes() {
1220                delta.add_operation(Operation::AckEmergency {
1221                    node_id: NodeId::new(acked_node),
1222                    emergency_timestamp: timestamp,
1223                });
1224            }
1225        }
1226
1227        // Add app document operations
1228        for app_op in self.app_document_delta_ops() {
1229            delta.add_operation(Operation::App(app_op));
1230        }
1231
1232        let encoded = delta.encode();
1233        self.encrypt_document(&encoded)
1234    }
1235
1236    /// Internal: Process a received delta document
1237    ///
1238    /// Applies operations from a delta document to local state.
1239    fn process_delta_document_internal(
1240        &self,
1241        source_node: NodeId,
1242        data: &[u8],
1243        now_ms: u64,
1244        relay_data: Option<Vec<u8>>,
1245        origin_node: Option<NodeId>,
1246        hop_count: u8,
1247    ) -> Option<DataReceivedResult> {
1248        // Decode the delta document
1249        let delta = DeltaDocument::decode(data)?;
1250
1251        // Don't process our own documents
1252        if delta.origin_node == self.config.node_id {
1253            return None;
1254        }
1255
1256        // Apply operations to local state
1257        let mut counter_changed = false;
1258        let mut emergency_changed = false;
1259        let mut is_emergency = false;
1260        let mut is_ack = false;
1261        let mut event_timestamp = 0u64;
1262        let mut peer_peripheral: Option<crate::sync::crdt::Peripheral> = None;
1263
1264        log::info!(
1265            "Delta document from {:08X}: {} operations, data_len={}",
1266            delta.origin_node.as_u32(),
1267            delta.operations.len(),
1268            data.len()
1269        );
1270        for op in &delta.operations {
1271            log::info!("  Operation: {}", op.key());
1272            match op {
1273                Operation::IncrementCounter {
1274                    node_id, amount, ..
1275                } => {
1276                    // Merge counter value (take max)
1277                    let current = self.document_sync.counter_entries();
1278                    let current_value = current
1279                        .iter()
1280                        .find(|(id, _)| *id == node_id.as_u32())
1281                        .map(|(_, v)| *v)
1282                        .unwrap_or(0);
1283
1284                    if *amount > current_value {
1285                        // Need to merge - this is handled by the counter merge logic
1286                        // For now, we record that counter changed
1287                        counter_changed = true;
1288                    }
1289                }
1290                Operation::UpdatePeripheral {
1291                    peripheral,
1292                    timestamp,
1293                } => {
1294                    // Store peer peripheral for callsign lookup
1295                    if let Ok(mut peripherals) = self.peer_peripherals.write() {
1296                        peripherals.insert(delta.origin_node, peripheral.clone());
1297                    }
1298                    // Track the peripheral for the result
1299                    peer_peripheral = Some(peripheral.clone());
1300                    // Track the timestamp for the result
1301                    if *timestamp > event_timestamp {
1302                        event_timestamp = *timestamp;
1303                    }
1304                }
1305                Operation::SetEmergency { timestamp, .. } => {
1306                    is_emergency = true;
1307                    emergency_changed = true;
1308                    event_timestamp = *timestamp;
1309                }
1310                Operation::AckEmergency {
1311                    emergency_timestamp,
1312                    ..
1313                } => {
1314                    is_ack = true;
1315                    emergency_changed = true;
1316                    if *emergency_timestamp > event_timestamp {
1317                        event_timestamp = *emergency_timestamp;
1318                    }
1319                }
1320                Operation::ClearEmergency {
1321                    emergency_timestamp,
1322                } => {
1323                    emergency_changed = true;
1324                    if *emergency_timestamp > event_timestamp {
1325                        event_timestamp = *emergency_timestamp;
1326                    }
1327                }
1328                Operation::App(app_op) => {
1329                    // Handle app-layer document operation
1330                    //
1331                    // The timestamp field may contain version info in the upper bits
1332                    // (used by delta encoder for change detection). Extract the
1333                    // original document timestamp from the lower 48 bits.
1334                    let doc_timestamp = app_op.timestamp & 0x0000_FFFF_FFFF_FFFF;
1335
1336                    log::info!(
1337                        "App operation received: type={:02X} op_code={:02X} from {:08X} ts={} payload_len={}",
1338                        app_op.type_id,
1339                        app_op.op_code,
1340                        app_op.source_node,
1341                        doc_timestamp,
1342                        app_op.payload.len()
1343                    );
1344
1345                    // Try to find/create document and apply the delta operation
1346                    let doc_key = (app_op.type_id, app_op.source_node, doc_timestamp);
1347                    let changed = {
1348                        let mut docs = self.app_documents.write().unwrap();
1349
1350                        if let Some(existing) = docs.get_mut(&doc_key) {
1351                            // Apply delta to existing document (merge via CRDT)
1352                            self.document_registry.apply_delta_op(
1353                                app_op.type_id,
1354                                existing.as_mut(),
1355                                app_op,
1356                            )
1357                        } else {
1358                            // Try to decode from payload (for FULL_STATE operations)
1359                            if let Some(decoded) = self
1360                                .document_registry
1361                                .decode(app_op.type_id, &app_op.payload)
1362                            {
1363                                docs.insert(doc_key, decoded);
1364                                true
1365                            } else {
1366                                // For delta ops on unknown documents, we can't apply
1367                                // The full state will be sent later
1368                                log::debug!(
1369                                    "Received delta for unknown doc {:?}, waiting for full state",
1370                                    doc_key
1371                                );
1372                                false
1373                            }
1374                        }
1375                    };
1376
1377                    // Emit observer event with the real document timestamp
1378                    self.observers.notify(HiveEvent::app_document_received(
1379                        app_op.type_id,
1380                        NodeId::new(app_op.source_node),
1381                        doc_timestamp,
1382                        changed,
1383                    ));
1384                }
1385            }
1386        }
1387
1388        // Record sync
1389        self.peer_manager.record_sync(source_node, now_ms);
1390
1391        // Record delta received
1392        {
1393            let mut encoder = self.delta_encoder.lock().unwrap();
1394            encoder.record_received(&source_node, data.len(), now_ms);
1395        }
1396
1397        // Generate events based on what was received
1398        if is_emergency {
1399            self.notify(HiveEvent::EmergencyReceived {
1400                from_node: delta.origin_node,
1401            });
1402        } else if is_ack {
1403            self.notify(HiveEvent::AckReceived {
1404                from_node: delta.origin_node,
1405            });
1406        }
1407
1408        if counter_changed {
1409            let total_count = self.document_sync.total_count();
1410            self.notify(HiveEvent::DocumentSynced {
1411                from_node: delta.origin_node,
1412                total_count,
1413            });
1414        }
1415
1416        // Emit relay event if we're relaying
1417        if relay_data.is_some() {
1418            let relay_targets = self.get_relay_targets(Some(source_node));
1419            self.notify(HiveEvent::MessageRelayed {
1420                origin_node: origin_node.unwrap_or(delta.origin_node),
1421                relay_count: relay_targets.len(),
1422                hop_count,
1423            });
1424        }
1425
1426        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
1427            DataReceivedResult::peripheral_fields(&peer_peripheral);
1428
1429        Some(DataReceivedResult {
1430            source_node: delta.origin_node,
1431            is_emergency,
1432            is_ack,
1433            counter_changed,
1434            emergency_changed,
1435            total_count: self.document_sync.total_count(),
1436            event_timestamp,
1437            relay_data,
1438            origin_node,
1439            hop_count,
1440            callsign,
1441            battery_percent,
1442            heart_rate,
1443            event_type,
1444            latitude,
1445            longitude,
1446            altitude,
1447        })
1448    }
1449
1450    // ==================== Per-Peer E2EE ====================
1451
1452    /// Enable per-peer E2EE capability
1453    ///
1454    /// Creates a new identity key for this node. This allows establishing
1455    /// encrypted sessions with specific peers where only the sender and
1456    /// recipient can read messages (other mesh members cannot).
1457    pub fn enable_peer_e2ee(&self) {
1458        let mut sessions = self.peer_sessions.lock().unwrap();
1459        if sessions.is_none() {
1460            *sessions = Some(PeerSessionManager::new(self.config.node_id));
1461            log::info!(
1462                "Per-peer E2EE enabled for node {:08X}",
1463                self.config.node_id.as_u32()
1464            );
1465        }
1466    }
1467
1468    /// Disable per-peer E2EE capability
1469    ///
1470    /// Clears all peer sessions and disables E2EE.
1471    pub fn disable_peer_e2ee(&self) {
1472        let mut sessions = self.peer_sessions.lock().unwrap();
1473        *sessions = None;
1474        log::info!("Per-peer E2EE disabled");
1475    }
1476
1477    /// Check if per-peer E2EE is enabled
1478    pub fn is_peer_e2ee_enabled(&self) -> bool {
1479        self.peer_sessions.lock().unwrap().is_some()
1480    }
1481
1482    /// Get our E2EE public key (for sharing with peers)
1483    ///
1484    /// Returns None if per-peer E2EE is not enabled.
1485    pub fn peer_e2ee_public_key(&self) -> Option<[u8; 32]> {
1486        self.peer_sessions
1487            .lock()
1488            .unwrap()
1489            .as_ref()
1490            .map(|s| s.our_public_key())
1491    }
1492
1493    /// Initiate E2EE session with a specific peer
1494    ///
1495    /// Returns the key exchange message bytes to send to the peer.
1496    /// The message should be broadcast/sent to the peer.
1497    /// Returns None if per-peer E2EE is not enabled.
1498    pub fn initiate_peer_e2ee(&self, peer_node_id: NodeId, now_ms: u64) -> Option<Vec<u8>> {
1499        let mut sessions = self.peer_sessions.lock().unwrap();
1500        let session_mgr = sessions.as_mut()?;
1501
1502        let key_exchange = session_mgr.initiate_session(peer_node_id, now_ms);
1503        let mut buf = Vec::with_capacity(2 + 37);
1504        buf.push(KEY_EXCHANGE_MARKER);
1505        buf.push(0x00); // reserved
1506        buf.extend_from_slice(&key_exchange.encode());
1507
1508        log::info!(
1509            "Initiated E2EE session with peer {:08X}",
1510            peer_node_id.as_u32()
1511        );
1512        Some(buf)
1513    }
1514
1515    /// Check if we have an established E2EE session with a peer
1516    pub fn has_peer_e2ee_session(&self, peer_node_id: NodeId) -> bool {
1517        self.peer_sessions
1518            .lock()
1519            .unwrap()
1520            .as_ref()
1521            .is_some_and(|s| s.has_session(peer_node_id))
1522    }
1523
1524    /// Get E2EE session state with a peer
1525    pub fn peer_e2ee_session_state(&self, peer_node_id: NodeId) -> Option<SessionState> {
1526        self.peer_sessions
1527            .lock()
1528            .unwrap()
1529            .as_ref()
1530            .and_then(|s| s.session_state(peer_node_id))
1531    }
1532
1533    /// Send an E2EE encrypted message to a specific peer
1534    ///
1535    /// Returns the encrypted message bytes to send, or None if no session exists.
1536    /// The message should be sent directly to the peer (not broadcast).
1537    pub fn send_peer_e2ee(
1538        &self,
1539        peer_node_id: NodeId,
1540        plaintext: &[u8],
1541        now_ms: u64,
1542    ) -> Option<Vec<u8>> {
1543        let mut sessions = self.peer_sessions.lock().unwrap();
1544        let session_mgr = sessions.as_mut()?;
1545
1546        match session_mgr.encrypt_for_peer(peer_node_id, plaintext, now_ms) {
1547            Ok(encrypted) => {
1548                let mut buf = Vec::with_capacity(2 + encrypted.encode().len());
1549                buf.push(PEER_E2EE_MARKER);
1550                buf.push(0x00); // reserved
1551                buf.extend_from_slice(&encrypted.encode());
1552                Some(buf)
1553            }
1554            Err(e) => {
1555                log::warn!(
1556                    "Failed to encrypt for peer {:08X}: {:?}",
1557                    peer_node_id.as_u32(),
1558                    e
1559                );
1560                None
1561            }
1562        }
1563    }
1564
1565    /// Close E2EE session with a peer
1566    pub fn close_peer_e2ee(&self, peer_node_id: NodeId) {
1567        let mut sessions = self.peer_sessions.lock().unwrap();
1568        if let Some(session_mgr) = sessions.as_mut() {
1569            session_mgr.close_session(peer_node_id);
1570            self.notify(HiveEvent::PeerE2eeClosed { peer_node_id });
1571            log::info!(
1572                "Closed E2EE session with peer {:08X}",
1573                peer_node_id.as_u32()
1574            );
1575        }
1576    }
1577
1578    /// Get count of active E2EE sessions
1579    pub fn peer_e2ee_session_count(&self) -> usize {
1580        self.peer_sessions
1581            .lock()
1582            .unwrap()
1583            .as_ref()
1584            .map(|s| s.session_count())
1585            .unwrap_or(0)
1586    }
1587
1588    /// Get count of established E2EE sessions
1589    pub fn peer_e2ee_established_count(&self) -> usize {
1590        self.peer_sessions
1591            .lock()
1592            .unwrap()
1593            .as_ref()
1594            .map(|s| s.established_count())
1595            .unwrap_or(0)
1596    }
1597
1598    /// Handle incoming key exchange message
1599    ///
1600    /// Called internally when we receive a KEY_EXCHANGE_MARKER message.
1601    /// Returns the response key exchange bytes to send back, or None if invalid.
1602    fn handle_key_exchange(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1603        if data.len() < 2 || data[0] != KEY_EXCHANGE_MARKER {
1604            return None;
1605        }
1606
1607        let payload = &data[2..];
1608        let msg = KeyExchangeMessage::decode(payload)?;
1609
1610        let mut sessions = self.peer_sessions.lock().unwrap();
1611        let session_mgr = sessions.as_mut()?;
1612
1613        let (response, established) = session_mgr.handle_key_exchange(&msg, now_ms)?;
1614
1615        if established {
1616            self.notify(HiveEvent::PeerE2eeEstablished {
1617                peer_node_id: msg.sender_node_id,
1618            });
1619            log::info!(
1620                "E2EE session established with peer {:08X}",
1621                msg.sender_node_id.as_u32()
1622            );
1623        }
1624
1625        // Return response key exchange
1626        let mut buf = Vec::with_capacity(2 + 37);
1627        buf.push(KEY_EXCHANGE_MARKER);
1628        buf.push(0x00);
1629        buf.extend_from_slice(&response.encode());
1630        Some(buf)
1631    }
1632
1633    /// Handle incoming E2EE encrypted message
1634    ///
1635    /// Called internally when we receive a PEER_E2EE_MARKER message.
1636    /// Decrypts and notifies observers of the received message.
1637    fn handle_peer_e2ee_message(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1638        if data.len() < 2 || data[0] != PEER_E2EE_MARKER {
1639            return None;
1640        }
1641
1642        let payload = &data[2..];
1643        let msg = PeerEncryptedMessage::decode(payload)?;
1644
1645        let mut sessions = self.peer_sessions.lock().unwrap();
1646        let session_mgr = sessions.as_mut()?;
1647
1648        match session_mgr.decrypt_from_peer(&msg, now_ms) {
1649            Ok(plaintext) => {
1650                // Notify observers of the decrypted message
1651                self.notify(HiveEvent::PeerE2eeMessageReceived {
1652                    from_node: msg.sender_node_id,
1653                    data: plaintext.clone(),
1654                });
1655                Some(plaintext)
1656            }
1657            Err(e) => {
1658                log::warn!(
1659                    "Failed to decrypt E2EE message from {:08X}: {:?}",
1660                    msg.sender_node_id.as_u32(),
1661                    e
1662                );
1663                None
1664            }
1665        }
1666    }
1667
1668    // ==================== Configuration ====================
1669
1670    /// Get our node ID
1671    pub fn node_id(&self) -> NodeId {
1672        self.config.node_id
1673    }
1674
1675    /// Get our callsign
1676    pub fn callsign(&self) -> &str {
1677        &self.config.callsign
1678    }
1679
1680    /// Get the mesh ID
1681    pub fn mesh_id(&self) -> &str {
1682        &self.config.mesh_id
1683    }
1684
1685    /// Get the device name for BLE advertising
1686    pub fn device_name(&self) -> String {
1687        format!(
1688            "HIVE_{}-{:08X}",
1689            self.config.mesh_id,
1690            self.config.node_id.as_u32()
1691        )
1692    }
1693
1694    /// Get a peer's callsign by node ID
1695    ///
1696    /// Returns the callsign from the peer's most recently received peripheral data,
1697    /// or None if no peripheral data has been received from this peer.
1698    pub fn get_peer_callsign(&self, node_id: NodeId) -> Option<String> {
1699        self.peer_peripherals.read().ok().and_then(|peripherals| {
1700            peripherals
1701                .get(&node_id)
1702                .map(|p| p.callsign_str().to_string())
1703        })
1704    }
1705
1706    /// Get a peer's full peripheral data by node ID
1707    ///
1708    /// Returns a clone of the peripheral data from the peer's most recently received
1709    /// document, or None if no peripheral data has been received from this peer.
1710    pub fn get_peer_peripheral(&self, node_id: NodeId) -> Option<Peripheral> {
1711        self.peer_peripherals
1712            .read()
1713            .ok()
1714            .and_then(|peripherals| peripherals.get(&node_id).cloned())
1715    }
1716
1717    // ==================== Document Registry ====================
1718
1719    /// Get a reference to the document registry.
1720    ///
1721    /// Use this to register custom document types for mesh synchronization.
1722    ///
1723    /// # Example
1724    ///
1725    /// ```ignore
1726    /// use hive_btle::registry::DocumentType;
1727    ///
1728    /// // Register a custom document type
1729    /// mesh.document_registry().register::<MyCustomDocument>();
1730    /// ```
1731    pub fn document_registry(&self) -> &DocumentRegistry {
1732        &self.document_registry
1733    }
1734
1735    /// Store an app-layer document.
1736    ///
1737    /// If a document with the same identity (type_id, source_node, timestamp)
1738    /// already exists, it will be merged using CRDT semantics.
1739    ///
1740    /// Returns true if the document was newly added or changed via merge.
1741    pub fn store_app_document<T: crate::registry::DocumentType>(&self, doc: T) -> bool {
1742        let type_id = T::TYPE_ID;
1743        let (source_node, timestamp) = doc.identity();
1744        let key = (type_id, source_node, timestamp);
1745
1746        let mut docs = self.app_documents.write().unwrap();
1747
1748        if let Some(existing) = docs.get_mut(&key) {
1749            // Merge with existing document
1750            self.document_registry
1751                .merge(type_id, existing.as_mut(), &doc)
1752        } else {
1753            // Insert new document
1754            docs.insert(key, Box::new(doc));
1755            true
1756        }
1757    }
1758
1759    /// Store a type-erased app-layer document.
1760    ///
1761    /// Used when receiving documents from the network where the type is
1762    /// determined at runtime.
1763    ///
1764    /// Returns true if the document was newly added or changed via merge.
1765    pub fn store_app_document_boxed(
1766        &self,
1767        type_id: u8,
1768        source_node: u32,
1769        timestamp: u64,
1770        doc: Box<dyn core::any::Any + Send + Sync>,
1771    ) -> bool {
1772        let key = (type_id, source_node, timestamp);
1773
1774        let mut docs = self.app_documents.write().unwrap();
1775
1776        if let Some(existing) = docs.get_mut(&key) {
1777            // Merge with existing document
1778            self.document_registry
1779                .merge(type_id, existing.as_mut(), doc.as_ref())
1780        } else {
1781            // Insert new document
1782            docs.insert(key, doc);
1783            true
1784        }
1785    }
1786
1787    /// Get a stored app-layer document by identity.
1788    ///
1789    /// Returns None if not found or if downcast to T fails.
1790    pub fn get_app_document<T: crate::registry::DocumentType>(
1791        &self,
1792        source_node: u32,
1793        timestamp: u64,
1794    ) -> Option<T> {
1795        let key = (T::TYPE_ID, source_node, timestamp);
1796
1797        let docs = self.app_documents.read().unwrap();
1798        docs.get(&key).and_then(|d| d.downcast_ref::<T>()).cloned()
1799    }
1800
1801    /// Get all stored app-layer documents of a specific type.
1802    ///
1803    /// Returns a vector of all documents of type T currently stored.
1804    pub fn get_all_app_documents_of_type<T: crate::registry::DocumentType>(&self) -> Vec<T> {
1805        let docs = self.app_documents.read().unwrap();
1806        docs.iter()
1807            .filter(|((type_id, _, _), _)| *type_id == T::TYPE_ID)
1808            .filter_map(|(_, doc)| doc.downcast_ref::<T>().cloned())
1809            .collect()
1810    }
1811
1812    /// Get delta operations for all stored app documents.
1813    ///
1814    /// Used during sync to include app documents in the delta stream.
1815    pub fn app_document_delta_ops(&self) -> Vec<crate::registry::AppOperation> {
1816        let docs = self.app_documents.read().unwrap();
1817        let mut ops = Vec::new();
1818
1819        for ((type_id, _source, _ts), doc) in docs.iter() {
1820            if let Some(op) = self.document_registry.to_delta_op(*type_id, doc.as_ref()) {
1821                ops.push(op);
1822            }
1823        }
1824
1825        ops
1826    }
1827
1828    /// Get all document keys for a given type.
1829    ///
1830    /// Returns (source_node, timestamp) pairs for all documents of the specified type.
1831    pub fn app_document_keys(&self, type_id: u8) -> Vec<(u32, u64)> {
1832        let docs = self.app_documents.read().unwrap();
1833        docs.keys()
1834            .filter(|(tid, _, _)| *tid == type_id)
1835            .map(|(_, source, ts)| (*source, *ts))
1836            .collect()
1837    }
1838
1839    /// Get the number of stored app documents.
1840    pub fn app_document_count(&self) -> usize {
1841        self.app_documents.read().unwrap().len()
1842    }
1843
1844    // ==================== Observer Management ====================
1845
1846    /// Add an observer for mesh events
1847    pub fn add_observer(&self, observer: Arc<dyn HiveObserver>) {
1848        self.observers.add(observer);
1849    }
1850
1851    /// Remove an observer
1852    pub fn remove_observer(&self, observer: &Arc<dyn HiveObserver>) {
1853        self.observers.remove(observer);
1854    }
1855
1856    // ==================== User Actions ====================
1857
1858    /// Send an emergency alert
1859    ///
1860    /// Returns the document bytes to broadcast to all peers.
1861    /// If encryption is enabled, the document is encrypted.
1862    pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
1863        let data = self.document_sync.send_emergency(timestamp);
1864        self.notify(HiveEvent::MeshStateChanged {
1865            peer_count: self.peer_manager.peer_count(),
1866            connected_count: self.peer_manager.connected_count(),
1867        });
1868        self.encrypt_document(&data)
1869    }
1870
1871    /// Send an ACK response
1872    ///
1873    /// Returns the document bytes to broadcast to all peers.
1874    /// If encryption is enabled, the document is encrypted.
1875    pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
1876        let data = self.document_sync.send_ack(timestamp);
1877        self.notify(HiveEvent::MeshStateChanged {
1878            peer_count: self.peer_manager.peer_count(),
1879            connected_count: self.peer_manager.connected_count(),
1880        });
1881        self.encrypt_document(&data)
1882    }
1883
1884    /// Broadcast arbitrary bytes over the mesh.
1885    ///
1886    /// Takes raw payload bytes, encrypts them (if encryption is enabled),
1887    /// and returns bytes ready to send to all connected peers.
1888    ///
1889    /// This is useful for sending extension data like CannedMessages from hive-lite.
1890    pub fn broadcast_bytes(&self, payload: &[u8]) -> Vec<u8> {
1891        self.encrypt_document(payload)
1892    }
1893
1894    /// Clear the current event (emergency or ack)
1895    pub fn clear_event(&self) {
1896        self.document_sync.clear_event();
1897    }
1898
1899    /// Check if emergency is active
1900    pub fn is_emergency_active(&self) -> bool {
1901        self.document_sync.is_emergency_active()
1902    }
1903
1904    /// Check if ACK is active
1905    pub fn is_ack_active(&self) -> bool {
1906        self.document_sync.is_ack_active()
1907    }
1908
1909    /// Get current event type
1910    pub fn current_event(&self) -> Option<EventType> {
1911        self.document_sync.current_event()
1912    }
1913
1914    // ==================== Emergency Management (Document-Based) ====================
1915
1916    /// Start a new emergency event with ACK tracking
1917    ///
1918    /// Creates an emergency event that tracks ACKs from all known peers.
1919    /// Pass the list of known peer node IDs to track.
1920    /// Returns the document bytes to broadcast.
1921    /// If encryption is enabled, the document is encrypted.
1922    pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
1923        let data = self.document_sync.start_emergency(timestamp, known_peers);
1924        self.notify(HiveEvent::MeshStateChanged {
1925            peer_count: self.peer_manager.peer_count(),
1926            connected_count: self.peer_manager.connected_count(),
1927        });
1928        self.encrypt_document(&data)
1929    }
1930
1931    /// Start a new emergency using all currently known peers
1932    ///
1933    /// Convenience method that automatically includes all discovered peers.
1934    pub fn start_emergency_with_known_peers(&self, timestamp: u64) -> Vec<u8> {
1935        let peers: Vec<u32> = self
1936            .peer_manager
1937            .get_peers()
1938            .iter()
1939            .map(|p| p.node_id.as_u32())
1940            .collect();
1941        self.start_emergency(timestamp, &peers)
1942    }
1943
1944    /// Record our ACK for the current emergency
1945    ///
1946    /// Returns the document bytes to broadcast, or None if no emergency is active.
1947    /// If encryption is enabled, the document is encrypted.
1948    pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
1949        let result = self.document_sync.ack_emergency(timestamp);
1950        if result.is_some() {
1951            self.notify(HiveEvent::MeshStateChanged {
1952                peer_count: self.peer_manager.peer_count(),
1953                connected_count: self.peer_manager.connected_count(),
1954            });
1955        }
1956        result.map(|data| self.encrypt_document(&data))
1957    }
1958
1959    /// Clear the current emergency event
1960    pub fn clear_emergency(&self) {
1961        self.document_sync.clear_emergency();
1962    }
1963
1964    /// Check if there's an active emergency
1965    pub fn has_active_emergency(&self) -> bool {
1966        self.document_sync.has_active_emergency()
1967    }
1968
1969    /// Get emergency status info
1970    ///
1971    /// Returns (source_node, timestamp, acked_count, pending_count) if emergency is active.
1972    pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
1973        self.document_sync.get_emergency_status()
1974    }
1975
1976    /// Check if a specific peer has ACKed the current emergency
1977    pub fn has_peer_acked(&self, peer_id: u32) -> bool {
1978        self.document_sync.has_peer_acked(peer_id)
1979    }
1980
1981    /// Check if all peers have ACKed the current emergency
1982    pub fn all_peers_acked(&self) -> bool {
1983        self.document_sync.all_peers_acked()
1984    }
1985
1986    // ==================== Chat Methods ====================
1987
1988    /// Send a chat message
1989    ///
1990    /// Adds the message to the local CRDT and returns the document bytes
1991    /// to broadcast to all peers. If encryption is enabled, the document is encrypted.
1992    ///
1993    /// Returns the encrypted document bytes if the message was new,
1994    /// or None if it was a duplicate.
1995    pub fn send_chat(&self, sender: &str, text: &str, timestamp: u64) -> Option<Vec<u8>> {
1996        if self.document_sync.add_chat_message(sender, text, timestamp) {
1997            Some(self.encrypt_document(&self.build_document()))
1998        } else {
1999            None
2000        }
2001    }
2002
2003    /// Send a chat reply
2004    ///
2005    /// Adds the reply to the local CRDT with reply-to information and returns
2006    /// the document bytes to broadcast. If encryption is enabled, the document is encrypted.
2007    ///
2008    /// Returns the encrypted document bytes if the message was new,
2009    /// or None if it was a duplicate.
2010    pub fn send_chat_reply(
2011        &self,
2012        sender: &str,
2013        text: &str,
2014        reply_to_node: u32,
2015        reply_to_timestamp: u64,
2016        timestamp: u64,
2017    ) -> Option<Vec<u8>> {
2018        if self.document_sync.add_chat_reply(
2019            sender,
2020            text,
2021            reply_to_node,
2022            reply_to_timestamp,
2023            timestamp,
2024        ) {
2025            Some(self.encrypt_document(&self.build_document()))
2026        } else {
2027            None
2028        }
2029    }
2030
2031    /// Get the number of chat messages in the local CRDT
2032    pub fn chat_count(&self) -> usize {
2033        self.document_sync.chat_count()
2034    }
2035
2036    /// Get chat messages newer than a timestamp
2037    ///
2038    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
2039    pub fn chat_messages_since(
2040        &self,
2041        since_timestamp: u64,
2042    ) -> Vec<(u32, u64, String, String, u32, u64)> {
2043        self.document_sync.chat_messages_since(since_timestamp)
2044    }
2045
2046    /// Get all chat messages
2047    ///
2048    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
2049    pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
2050        self.document_sync.all_chat_messages()
2051    }
2052
2053    // ==================== BLE Callbacks (Platform -> Mesh) ====================
2054
2055    /// Called when a BLE device is discovered
2056    ///
2057    /// Returns `Some(HivePeer)` if this is a new HIVE peer on our mesh.
2058    pub fn on_ble_discovered(
2059        &self,
2060        identifier: &str,
2061        name: Option<&str>,
2062        rssi: i8,
2063        mesh_id: Option<&str>,
2064        now_ms: u64,
2065    ) -> Option<HivePeer> {
2066        let (node_id, is_new) = self
2067            .peer_manager
2068            .on_discovered(identifier, name, rssi, mesh_id, now_ms)?;
2069
2070        let peer = self.peer_manager.get_peer(node_id)?;
2071
2072        // Update connection graph
2073        {
2074            let mut graph = self.connection_graph.lock().unwrap();
2075            graph.on_discovered(
2076                node_id,
2077                identifier.to_string(),
2078                name.map(|s| s.to_string()),
2079                mesh_id.map(|s| s.to_string()),
2080                rssi,
2081                now_ms,
2082            );
2083        }
2084
2085        if is_new {
2086            self.notify(HiveEvent::PeerDiscovered { peer: peer.clone() });
2087            self.notify_mesh_state_changed();
2088        }
2089
2090        Some(peer)
2091    }
2092
2093    /// Called when a BLE connection is established (outgoing)
2094    ///
2095    /// Returns the NodeId if this identifier is known.
2096    pub fn on_ble_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
2097        let node_id = self.peer_manager.on_connected(identifier, now_ms)?;
2098
2099        // Update connection graph
2100        {
2101            let mut graph = self.connection_graph.lock().unwrap();
2102            graph.on_connected(node_id, now_ms);
2103        }
2104
2105        // Register peer for delta sync tracking
2106        self.register_peer_for_delta(&node_id);
2107
2108        self.notify(HiveEvent::PeerConnected { node_id });
2109        self.notify_mesh_state_changed();
2110        Some(node_id)
2111    }
2112
2113    /// Called when a BLE connection is lost
2114    pub fn on_ble_disconnected(
2115        &self,
2116        identifier: &str,
2117        reason: DisconnectReason,
2118    ) -> Option<NodeId> {
2119        let (node_id, observer_reason) = self.peer_manager.on_disconnected(identifier, reason)?;
2120
2121        // Update connection graph (convert observer reason to platform reason)
2122        {
2123            let mut graph = self.connection_graph.lock().unwrap();
2124            let platform_reason = match observer_reason {
2125                DisconnectReason::LocalRequest => crate::platform::DisconnectReason::LocalRequest,
2126                DisconnectReason::RemoteRequest => crate::platform::DisconnectReason::RemoteRequest,
2127                DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2128                DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2129                DisconnectReason::ConnectionFailed => {
2130                    crate::platform::DisconnectReason::ConnectionFailed
2131                }
2132                DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2133            };
2134            let now_ms = std::time::SystemTime::now()
2135                .duration_since(std::time::UNIX_EPOCH)
2136                .map(|d| d.as_millis() as u64)
2137                .unwrap_or(0);
2138            graph.on_disconnected(node_id, platform_reason, now_ms);
2139
2140            // Remove indirect peer paths that went through this peer
2141            // These paths may no longer be valid since the direct connection is lost
2142            graph.remove_via_peer(node_id);
2143        }
2144
2145        // Unregister peer from delta sync tracking
2146        self.unregister_peer_for_delta(&node_id);
2147
2148        self.notify(HiveEvent::PeerDisconnected {
2149            node_id,
2150            reason: observer_reason,
2151        });
2152        self.notify_mesh_state_changed();
2153        Some(node_id)
2154    }
2155
2156    /// Called when a BLE connection is lost, using NodeId directly
2157    ///
2158    /// Alternative to on_ble_disconnected() when only NodeId is known (e.g., ESP32).
2159    pub fn on_peer_disconnected(&self, node_id: NodeId, reason: DisconnectReason) {
2160        if self
2161            .peer_manager
2162            .on_disconnected_by_node_id(node_id, reason)
2163        {
2164            // Update connection graph
2165            {
2166                let mut graph = self.connection_graph.lock().unwrap();
2167                let platform_reason = match reason {
2168                    DisconnectReason::LocalRequest => {
2169                        crate::platform::DisconnectReason::LocalRequest
2170                    }
2171                    DisconnectReason::RemoteRequest => {
2172                        crate::platform::DisconnectReason::RemoteRequest
2173                    }
2174                    DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2175                    DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2176                    DisconnectReason::ConnectionFailed => {
2177                        crate::platform::DisconnectReason::ConnectionFailed
2178                    }
2179                    DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2180                };
2181                let now_ms = std::time::SystemTime::now()
2182                    .duration_since(std::time::UNIX_EPOCH)
2183                    .map(|d| d.as_millis() as u64)
2184                    .unwrap_or(0);
2185                graph.on_disconnected(node_id, platform_reason, now_ms);
2186
2187                // Remove indirect peer paths that went through this peer
2188                graph.remove_via_peer(node_id);
2189            }
2190
2191            // Unregister peer from delta sync tracking
2192            self.unregister_peer_for_delta(&node_id);
2193
2194            self.notify(HiveEvent::PeerDisconnected { node_id, reason });
2195            self.notify_mesh_state_changed();
2196        }
2197    }
2198
2199    /// Called when a remote device connects to us (incoming connection)
2200    ///
2201    /// Use this when we're acting as a peripheral and a central connects to us.
2202    pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
2203        let is_new = self
2204            .peer_manager
2205            .on_incoming_connection(identifier, node_id, now_ms);
2206
2207        // Update connection graph
2208        {
2209            let mut graph = self.connection_graph.lock().unwrap();
2210            if is_new {
2211                graph.on_discovered(
2212                    node_id,
2213                    identifier.to_string(),
2214                    None,
2215                    Some(self.config.mesh_id.clone()),
2216                    -50, // Default good RSSI for incoming connections
2217                    now_ms,
2218                );
2219            }
2220            graph.on_connected(node_id, now_ms);
2221        }
2222
2223        // Register peer for delta sync tracking
2224        self.register_peer_for_delta(&node_id);
2225
2226        if is_new {
2227            if let Some(peer) = self.peer_manager.get_peer(node_id) {
2228                self.notify(HiveEvent::PeerDiscovered { peer });
2229            }
2230        }
2231
2232        self.notify(HiveEvent::PeerConnected { node_id });
2233        self.notify_mesh_state_changed();
2234
2235        is_new
2236    }
2237
2238    /// Called when data is received from a peer
2239    ///
2240    /// Parses the document, merges it, and generates appropriate events.
2241    /// If encryption is enabled, decrypts the document first.
2242    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2243    /// Returns the source NodeId and whether the document contained an event.
2244    pub fn on_ble_data_received(
2245        &self,
2246        identifier: &str,
2247        data: &[u8],
2248        now_ms: u64,
2249    ) -> Option<DataReceivedResult> {
2250        // Get node ID from identifier
2251        let node_id = self.peer_manager.get_node_id(identifier)?;
2252
2253        // Check for special message types first
2254        if data.len() >= 2 {
2255            match data[0] {
2256                KEY_EXCHANGE_MARKER => {
2257                    // Handle key exchange - returns response to send back
2258                    let _response = self.handle_key_exchange(data, now_ms);
2259                    // Return None as this isn't a document sync
2260                    return None;
2261                }
2262                PEER_E2EE_MARKER => {
2263                    // Handle encrypted peer message
2264                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2265                    // Return None as this isn't a document sync
2266                    return None;
2267                }
2268                RELAY_ENVELOPE_MARKER => {
2269                    // Handle relay envelope for multi-hop
2270                    return self
2271                        .handle_relay_envelope_with_identifier(node_id, identifier, data, now_ms);
2272                }
2273                _ => {}
2274            }
2275        }
2276
2277        // Direct document (not relay envelope)
2278        self.process_document_data_with_identifier(node_id, identifier, data, now_ms, None, None, 0)
2279    }
2280
2281    /// Internal: Process document data with identifier as source hint
2282    #[allow(clippy::too_many_arguments)]
2283    fn process_document_data_with_identifier(
2284        &self,
2285        source_node: NodeId,
2286        identifier: &str,
2287        data: &[u8],
2288        now_ms: u64,
2289        relay_data: Option<Vec<u8>>,
2290        origin_node: Option<NodeId>,
2291        hop_count: u8,
2292    ) -> Option<DataReceivedResult> {
2293        // Decrypt if encrypted (mesh-wide encryption) - use identifier as source hint
2294        let decrypted = self.decrypt_document(data, Some(identifier))?;
2295
2296        // Check if this is a delta document (wire format v2)
2297        if DeltaDocument::is_delta_document(&decrypted) {
2298            return self.process_delta_document_internal(
2299                source_node,
2300                &decrypted,
2301                now_ms,
2302                relay_data,
2303                origin_node,
2304                hop_count,
2305            );
2306        }
2307
2308        // Merge the document (legacy wire format v1)
2309        let result = self.document_sync.merge_document(&decrypted)?;
2310
2311        // Store peer peripheral if present (for callsign lookup)
2312        if let Some(ref peripheral) = result.peer_peripheral {
2313            if let Ok(mut peripherals) = self.peer_peripherals.write() {
2314                peripherals.insert(result.source_node, peripheral.clone());
2315            }
2316        }
2317
2318        // Record sync
2319        self.peer_manager.record_sync(source_node, now_ms);
2320
2321        // Generate events based on what was received
2322        if result.is_emergency() {
2323            self.notify(HiveEvent::EmergencyReceived {
2324                from_node: result.source_node,
2325            });
2326        } else if result.is_ack() {
2327            self.notify(HiveEvent::AckReceived {
2328                from_node: result.source_node,
2329            });
2330        }
2331
2332        if result.counter_changed {
2333            self.notify(HiveEvent::DocumentSynced {
2334                from_node: result.source_node,
2335                total_count: result.total_count,
2336            });
2337        }
2338
2339        // Emit relay event if we're relaying
2340        if relay_data.is_some() {
2341            let relay_targets = self.get_relay_targets(Some(source_node));
2342            self.notify(HiveEvent::MessageRelayed {
2343                origin_node: origin_node.unwrap_or(result.source_node),
2344                relay_count: relay_targets.len(),
2345                hop_count,
2346            });
2347        }
2348
2349        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2350            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2351
2352        Some(DataReceivedResult {
2353            source_node: result.source_node,
2354            is_emergency: result.is_emergency(),
2355            is_ack: result.is_ack(),
2356            counter_changed: result.counter_changed,
2357            emergency_changed: result.emergency_changed,
2358            total_count: result.total_count,
2359            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2360            relay_data,
2361            origin_node,
2362            hop_count,
2363            callsign,
2364            battery_percent,
2365            heart_rate,
2366            event_type,
2367            latitude,
2368            longitude,
2369            altitude,
2370        })
2371    }
2372
2373    /// Internal: Handle relay envelope with identifier as source hint
2374    fn handle_relay_envelope_with_identifier(
2375        &self,
2376        source_node: NodeId,
2377        identifier: &str,
2378        data: &[u8],
2379        now_ms: u64,
2380    ) -> Option<DataReceivedResult> {
2381        // Process the relay envelope
2382        let envelope = RelayEnvelope::decode(data)?;
2383
2384        // Check deduplication
2385        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2386            let stats = self
2387                .seen_cache
2388                .lock()
2389                .unwrap()
2390                .get_stats(&envelope.message_id);
2391            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2392
2393            self.notify(HiveEvent::DuplicateMessageDropped {
2394                origin_node: envelope.origin_node,
2395                seen_count,
2396            });
2397            return None;
2398        }
2399
2400        // Check TTL and get relay data
2401        let relay_data = if envelope.can_relay() && self.config.enable_relay {
2402            envelope.relay().map(|e| e.encode())
2403        } else {
2404            if !envelope.can_relay() {
2405                self.notify(HiveEvent::MessageTtlExpired {
2406                    origin_node: envelope.origin_node,
2407                    hop_count: envelope.hop_count,
2408                });
2409            }
2410            None
2411        };
2412
2413        // Process the inner payload
2414        self.process_document_data_with_identifier(
2415            source_node,
2416            identifier,
2417            &envelope.payload,
2418            now_ms,
2419            relay_data,
2420            Some(envelope.origin_node),
2421            envelope.hop_count,
2422        )
2423    }
2424
2425    /// Called when data is received but we don't have the identifier mapped
2426    ///
2427    /// Use this when receiving data from a peripheral we discovered.
2428    /// If encryption is enabled, decrypts the document first.
2429    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2430    /// Handles relay envelopes for multi-hop mesh operation.
2431    pub fn on_ble_data_received_from_node(
2432        &self,
2433        node_id: NodeId,
2434        data: &[u8],
2435        now_ms: u64,
2436    ) -> Option<DataReceivedResult> {
2437        // Check for special message types first
2438        if data.len() >= 2 {
2439            match data[0] {
2440                KEY_EXCHANGE_MARKER => {
2441                    let _response = self.handle_key_exchange(data, now_ms);
2442                    return None;
2443                }
2444                PEER_E2EE_MARKER => {
2445                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2446                    return None;
2447                }
2448                RELAY_ENVELOPE_MARKER => {
2449                    // Handle relay envelope for multi-hop
2450                    return self.handle_relay_envelope(node_id, data, now_ms);
2451                }
2452                _ => {}
2453            }
2454        }
2455
2456        // Direct document (not relay envelope)
2457        self.process_document_data(node_id, data, now_ms, None, None, 0)
2458    }
2459
2460    /// Called when encrypted data is received from an unknown peer
2461    ///
2462    /// This handles the case where we receive an encrypted document from a BLE address
2463    /// that isn't registered in our peer manager (e.g., due to BLE address rotation).
2464    /// The function decrypts first using the mesh key, then extracts the source_node
2465    /// from the decrypted document header and registers the peer.
2466    ///
2467    /// Returns `Some(DataReceivedResult)` if decryption and processing succeed.
2468    /// Returns `None` if decryption fails or the document is invalid.
2469    pub fn on_ble_data_received_anonymous(
2470        &self,
2471        identifier: &str,
2472        data: &[u8],
2473        now_ms: u64,
2474    ) -> Option<DataReceivedResult> {
2475        log::debug!(
2476            "on_ble_data_received_anonymous: identifier={}, len={}, marker=0x{:02X}",
2477            identifier,
2478            data.len(),
2479            data.first().copied().unwrap_or(0)
2480        );
2481
2482        // Try to decrypt (handles both encrypted and unencrypted documents)
2483        let decrypted = match self.decrypt_document(data, Some(identifier)) {
2484            Some(d) => d,
2485            None => {
2486                log::warn!(
2487                    "on_ble_data_received_anonymous: decrypt/parse FAILED for {} byte doc from {}",
2488                    data.len(),
2489                    identifier
2490                );
2491                return None;
2492            }
2493        };
2494
2495        // Extract source_node from decrypted document header
2496        // Header format: [version: 4 bytes (LE)][node_id: 4 bytes (LE)]
2497        if decrypted.len() < 8 {
2498            log::warn!("Decrypted document too short to extract source_node");
2499            return None;
2500        }
2501
2502        let source_node_u32 =
2503            u32::from_le_bytes([decrypted[4], decrypted[5], decrypted[6], decrypted[7]]);
2504        let source_node = NodeId::new(source_node_u32);
2505
2506        log::info!(
2507            "Anonymous document from {}: source_node={:08X}, len={}",
2508            identifier,
2509            source_node_u32,
2510            decrypted.len()
2511        );
2512
2513        // Register the peer with this identifier so future lookups work
2514        // This handles BLE address rotation
2515        self.peer_manager
2516            .register_identifier(identifier, source_node);
2517
2518        // Check if this is a delta document
2519        let is_delta = DeltaDocument::is_delta_document(&decrypted);
2520        log::info!(
2521            "Document format: delta={}, first_byte=0x{:02X}, len={}",
2522            is_delta,
2523            decrypted.first().copied().unwrap_or(0),
2524            decrypted.len()
2525        );
2526
2527        if is_delta {
2528            return self.process_delta_document_internal(
2529                source_node,
2530                &decrypted,
2531                now_ms,
2532                None,
2533                None,
2534                0,
2535            );
2536        }
2537
2538        // Handle app-layer message (0xAF marker from hive-lite CannedMessages)
2539        // Store in document registry for CRDT sync instead of bypassing to relay_data.
2540        const APP_LAYER_MARKER: u8 = 0xAF;
2541        if decrypted.first().copied() == Some(APP_LAYER_MARKER) {
2542            #[cfg(feature = "hive-lite-sync")]
2543            {
2544                use crate::hive_lite_sync::CannedMessageDocument;
2545                use crate::registry::DocumentType;
2546
2547                log::info!(
2548                    "App-layer message (0xAF) from {:08X}, {} bytes - storing in registry",
2549                    source_node.as_u32(),
2550                    decrypted.len()
2551                );
2552
2553                // Decode and store the document
2554                // CannedMessageDocument::decode expects payload without 0xAF (it prepends it)
2555                let payload = &decrypted[1..];
2556                if let Some(doc) = CannedMessageDocument::decode(payload) {
2557                    let (doc_source, doc_ts) = doc.identity();
2558                    let changed = self.store_app_document(doc);
2559                    log::info!(
2560                        "Stored CannedMessage: source={:08X} ts={} changed={}",
2561                        doc_source,
2562                        doc_ts,
2563                        changed
2564                    );
2565
2566                    // Emit observer event
2567                    self.observers.notify(HiveEvent::app_document_received(
2568                        CannedMessageDocument::TYPE_ID,
2569                        NodeId::new(doc_source),
2570                        doc_ts,
2571                        changed,
2572                    ));
2573
2574                    // Return minimal result - document is now in registry
2575                    return Some(DataReceivedResult {
2576                        source_node,
2577                        is_emergency: false,
2578                        is_ack: false,
2579                        counter_changed: false,
2580                        emergency_changed: false,
2581                        total_count: 0,
2582                        event_timestamp: doc_ts,
2583                        relay_data: None, // No longer needed - flows via delta sync
2584                        origin_node: None,
2585                        hop_count: 0,
2586                        callsign: None,
2587                        battery_percent: None,
2588                        heart_rate: None,
2589                        event_type: None,
2590                        latitude: None,
2591                        longitude: None,
2592                        altitude: None,
2593                    });
2594                } else {
2595                    log::warn!("Failed to decode 0xAF message as CannedMessageDocument");
2596                }
2597            }
2598
2599            #[cfg(not(feature = "hive-lite-sync"))]
2600            {
2601                log::debug!("Ignoring 0xAF message (hive-lite-sync feature not enabled)");
2602            }
2603
2604            return None;
2605        }
2606
2607        // Merge the document (legacy wire format v1)
2608        log::info!(
2609            "Processing legacy document from {:08X}",
2610            source_node.as_u32()
2611        );
2612        let result = self.document_sync.merge_document(&decrypted)?;
2613
2614        // Log what we got from the merge
2615        log::info!(
2616            "Merge result: peer_peripheral={}, counter_changed={}",
2617            result.peer_peripheral.is_some(),
2618            result.counter_changed
2619        );
2620        if let Some(ref p) = result.peer_peripheral {
2621            log::info!("Peripheral callsign: '{}'", p.callsign_str());
2622        }
2623
2624        // Record sync
2625        self.peer_manager.record_sync(source_node, now_ms);
2626
2627        // Generate events
2628        if result.is_emergency() {
2629            self.notify(HiveEvent::EmergencyReceived {
2630                from_node: result.source_node,
2631            });
2632        } else if result.is_ack() {
2633            self.notify(HiveEvent::AckReceived {
2634                from_node: result.source_node,
2635            });
2636        }
2637
2638        if result.counter_changed {
2639            self.notify(HiveEvent::DocumentSynced {
2640                from_node: result.source_node,
2641                total_count: result.total_count,
2642            });
2643        }
2644
2645        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2646            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2647
2648        Some(DataReceivedResult {
2649            source_node: result.source_node,
2650            is_emergency: result.is_emergency(),
2651            is_ack: result.is_ack(),
2652            counter_changed: result.counter_changed,
2653            emergency_changed: result.emergency_changed,
2654            total_count: result.total_count,
2655            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2656            relay_data: None,
2657            origin_node: None,
2658            hop_count: 0,
2659            callsign,
2660            battery_percent,
2661            heart_rate,
2662            event_type,
2663            latitude,
2664            longitude,
2665            altitude,
2666        })
2667    }
2668
2669    /// Internal: Process document data (shared by direct and relay paths)
2670    fn process_document_data(
2671        &self,
2672        source_node: NodeId,
2673        data: &[u8],
2674        now_ms: u64,
2675        relay_data: Option<Vec<u8>>,
2676        origin_node: Option<NodeId>,
2677        hop_count: u8,
2678    ) -> Option<DataReceivedResult> {
2679        // Decrypt if encrypted (mesh-wide encryption)
2680        let source_hint = format!("node:{:08X}", source_node.as_u32());
2681        let decrypted = self.decrypt_document(data, Some(&source_hint))?;
2682
2683        // Check if this is a delta document (wire format v2)
2684        if DeltaDocument::is_delta_document(&decrypted) {
2685            return self.process_delta_document_internal(
2686                source_node,
2687                &decrypted,
2688                now_ms,
2689                relay_data,
2690                origin_node,
2691                hop_count,
2692            );
2693        }
2694
2695        // Merge the document (legacy wire format v1)
2696        let result = self.document_sync.merge_document(&decrypted)?;
2697
2698        // Store peer peripheral if present (for callsign lookup)
2699        if let Some(ref peripheral) = result.peer_peripheral {
2700            if let Ok(mut peripherals) = self.peer_peripherals.write() {
2701                peripherals.insert(result.source_node, peripheral.clone());
2702            }
2703        }
2704
2705        // Record sync
2706        self.peer_manager.record_sync(source_node, now_ms);
2707
2708        // Generate events based on what was received
2709        if result.is_emergency() {
2710            self.notify(HiveEvent::EmergencyReceived {
2711                from_node: result.source_node,
2712            });
2713        } else if result.is_ack() {
2714            self.notify(HiveEvent::AckReceived {
2715                from_node: result.source_node,
2716            });
2717        }
2718
2719        if result.counter_changed {
2720            self.notify(HiveEvent::DocumentSynced {
2721                from_node: result.source_node,
2722                total_count: result.total_count,
2723            });
2724        }
2725
2726        // Emit relay event if we're relaying
2727        if relay_data.is_some() {
2728            let relay_targets = self.get_relay_targets(Some(source_node));
2729            self.notify(HiveEvent::MessageRelayed {
2730                origin_node: origin_node.unwrap_or(result.source_node),
2731                relay_count: relay_targets.len(),
2732                hop_count,
2733            });
2734        }
2735
2736        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2737            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2738
2739        Some(DataReceivedResult {
2740            source_node: result.source_node,
2741            is_emergency: result.is_emergency(),
2742            is_ack: result.is_ack(),
2743            counter_changed: result.counter_changed,
2744            emergency_changed: result.emergency_changed,
2745            total_count: result.total_count,
2746            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2747            relay_data,
2748            origin_node,
2749            hop_count,
2750            callsign,
2751            battery_percent,
2752            heart_rate,
2753            event_type,
2754            latitude,
2755            longitude,
2756            altitude,
2757        })
2758    }
2759
2760    /// Internal: Handle relay envelope
2761    fn handle_relay_envelope(
2762        &self,
2763        source_node: NodeId,
2764        data: &[u8],
2765        now_ms: u64,
2766    ) -> Option<DataReceivedResult> {
2767        // Process the relay envelope
2768        let decision = self.process_relay_envelope(data, source_node, now_ms)?;
2769
2770        // Get relay data if we should relay
2771        let relay_data = if decision.should_relay {
2772            decision.relay_data()
2773        } else {
2774            None
2775        };
2776
2777        // Process the inner payload
2778        self.process_document_data(
2779            source_node,
2780            &decision.payload,
2781            now_ms,
2782            relay_data,
2783            Some(decision.origin_node),
2784            decision.hop_count,
2785        )
2786    }
2787
2788    /// Called when data is received without a known identifier
2789    ///
2790    /// This is the simplest data receive method - it extracts the source node_id
2791    /// from the document itself. Use this when you don't track identifiers
2792    /// (e.g., ESP32 NimBLE).
2793    /// If encryption is enabled, decrypts the document first.
2794    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2795    /// Handles relay envelopes for multi-hop mesh operation.
2796    pub fn on_ble_data(
2797        &self,
2798        identifier: &str,
2799        data: &[u8],
2800        now_ms: u64,
2801    ) -> Option<DataReceivedResult> {
2802        // Check for special message types first
2803        if data.len() >= 2 {
2804            match data[0] {
2805                KEY_EXCHANGE_MARKER => {
2806                    let _response = self.handle_key_exchange(data, now_ms);
2807                    return None;
2808                }
2809                PEER_E2EE_MARKER => {
2810                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2811                    return None;
2812                }
2813                RELAY_ENVELOPE_MARKER => {
2814                    // Handle relay envelope - extract origin from envelope
2815                    return self.handle_relay_envelope_with_incoming(identifier, data, now_ms);
2816                }
2817                _ => {}
2818            }
2819        }
2820
2821        // Direct document - process normally
2822        self.process_incoming_document(identifier, data, now_ms, None, None, 0)
2823    }
2824
2825    /// Internal: Process incoming document (handles peer registration)
2826    fn process_incoming_document(
2827        &self,
2828        identifier: &str,
2829        data: &[u8],
2830        now_ms: u64,
2831        relay_data: Option<Vec<u8>>,
2832        origin_node: Option<NodeId>,
2833        hop_count: u8,
2834    ) -> Option<DataReceivedResult> {
2835        // Decrypt if encrypted (mesh-wide encryption)
2836        let decrypted = self.decrypt_document(data, Some(identifier))?;
2837
2838        // Merge the document (extracts node_id internally)
2839        let result = self.document_sync.merge_document(&decrypted)?;
2840
2841        // Record sync using the source_node from the merged document
2842        self.peer_manager.record_sync(result.source_node, now_ms);
2843
2844        // Only register the identifier mapping for direct messages (not relayed)
2845        // For relayed messages, the identifier belongs to the relay source (who forwarded it),
2846        // not the origin node (who created the document). The relay source is already registered
2847        // via handle_relay_envelope_with_incoming when the relay envelope is first processed.
2848        if origin_node.is_none() {
2849            // Direct message - register the peer with this identifier
2850            let is_new =
2851                self.peer_manager
2852                    .on_incoming_connection(identifier, result.source_node, now_ms);
2853
2854            // Update connection graph to track connection state
2855            {
2856                let mut graph = self.connection_graph.lock().unwrap();
2857                if is_new {
2858                    graph.on_discovered(
2859                        result.source_node,
2860                        identifier.to_string(),
2861                        None,
2862                        Some(self.config.mesh_id.clone()),
2863                        -50, // Default RSSI for data-based discovery
2864                        now_ms,
2865                    );
2866                }
2867                graph.on_connected(result.source_node, now_ms);
2868            }
2869        }
2870
2871        // Generate events based on what was received
2872        if result.is_emergency() {
2873            self.notify(HiveEvent::EmergencyReceived {
2874                from_node: result.source_node,
2875            });
2876        } else if result.is_ack() {
2877            self.notify(HiveEvent::AckReceived {
2878                from_node: result.source_node,
2879            });
2880        }
2881
2882        if result.counter_changed {
2883            self.notify(HiveEvent::DocumentSynced {
2884                from_node: result.source_node,
2885                total_count: result.total_count,
2886            });
2887        }
2888
2889        // Emit relay event if we're relaying
2890        if relay_data.is_some() {
2891            let relay_targets = self.get_relay_targets(Some(result.source_node));
2892            self.notify(HiveEvent::MessageRelayed {
2893                origin_node: origin_node.unwrap_or(result.source_node),
2894                relay_count: relay_targets.len(),
2895                hop_count,
2896            });
2897        }
2898
2899        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2900            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2901
2902        Some(DataReceivedResult {
2903            source_node: result.source_node,
2904            is_emergency: result.is_emergency(),
2905            is_ack: result.is_ack(),
2906            counter_changed: result.counter_changed,
2907            emergency_changed: result.emergency_changed,
2908            total_count: result.total_count,
2909            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2910            relay_data,
2911            origin_node,
2912            hop_count,
2913            callsign,
2914            battery_percent,
2915            heart_rate,
2916            event_type,
2917            latitude,
2918            longitude,
2919            altitude,
2920        })
2921    }
2922
2923    /// Internal: Handle relay envelope with incoming connection registration
2924    fn handle_relay_envelope_with_incoming(
2925        &self,
2926        identifier: &str,
2927        data: &[u8],
2928        now_ms: u64,
2929    ) -> Option<DataReceivedResult> {
2930        // Parse envelope to get origin
2931        let envelope = RelayEnvelope::decode(data)?;
2932
2933        // Try to look up the source peer from identifier to register indirect path
2934        // If we know who sent this relay, we can track indirect peers via them
2935        if let Some(source_peer) = self.peer_manager.get_node_id(identifier) {
2936            if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
2937                let is_new = self.connection_graph.lock().unwrap().on_relay_received(
2938                    source_peer,
2939                    envelope.origin_node,
2940                    envelope.hop_count,
2941                    now_ms,
2942                );
2943
2944                if is_new {
2945                    log::debug!(
2946                        "Discovered indirect peer {:08X} via {:08X} ({} hops)",
2947                        envelope.origin_node.as_u32(),
2948                        source_peer.as_u32(),
2949                        envelope.hop_count
2950                    );
2951                }
2952            }
2953        }
2954
2955        // Check deduplication
2956        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2957            // Duplicate - get stats for event
2958            let stats = self
2959                .seen_cache
2960                .lock()
2961                .unwrap()
2962                .get_stats(&envelope.message_id);
2963            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2964
2965            self.notify(HiveEvent::DuplicateMessageDropped {
2966                origin_node: envelope.origin_node,
2967                seen_count,
2968            });
2969            return None;
2970        }
2971
2972        // Check TTL
2973        let (should_relay, relay_data) = if envelope.can_relay() && self.config.enable_relay {
2974            let relay_env = envelope.relay();
2975            (true, relay_env.map(|e| e.encode()))
2976        } else {
2977            if !envelope.can_relay() {
2978                self.notify(HiveEvent::MessageTtlExpired {
2979                    origin_node: envelope.origin_node,
2980                    hop_count: envelope.hop_count,
2981                });
2982            }
2983            (false, None)
2984        };
2985
2986        // Process the inner payload
2987        self.process_incoming_document(
2988            identifier,
2989            &envelope.payload,
2990            now_ms,
2991            if should_relay { relay_data } else { None },
2992            Some(envelope.origin_node),
2993            envelope.hop_count,
2994        )
2995    }
2996
2997    // ==================== Periodic Maintenance ====================
2998
2999    /// Periodic tick - call this regularly (e.g., every second)
3000    ///
3001    /// Performs:
3002    /// - Stale peer cleanup
3003    /// - Periodic sync broadcast (if interval elapsed)
3004    ///
3005    /// Returns `Some(data)` if a sync broadcast is needed.
3006    pub fn tick(&self, now_ms: u64) -> Option<Vec<u8>> {
3007        use std::sync::atomic::Ordering;
3008
3009        // Use u32 for atomic storage (wraps every ~49 days, intervals still work)
3010        let now_ms_32 = now_ms as u32;
3011
3012        // Cleanup stale peers
3013        let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3014        let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3015        if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3016            self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3017            let removed = self.peer_manager.cleanup_stale(now_ms);
3018            for node_id in &removed {
3019                self.notify(HiveEvent::PeerLost { node_id: *node_id });
3020            }
3021            if !removed.is_empty() {
3022                self.notify_mesh_state_changed();
3023            }
3024
3025            // Run connection graph maintenance (transition Disconnected -> Lost)
3026            {
3027                let mut graph = self.connection_graph.lock().unwrap();
3028                let newly_lost = graph.tick(now_ms);
3029                // Also cleanup peers lost for more than peer_timeout
3030                graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3031                drop(graph);
3032
3033                // Emit PeerLost events for newly lost peers from graph
3034                // (these may differ from peer_manager removals)
3035                for node_id in newly_lost {
3036                    // Only notify if not already notified by peer_manager
3037                    if !removed.contains(&node_id) {
3038                        self.notify(HiveEvent::PeerLost { node_id });
3039                    }
3040                }
3041            }
3042        }
3043
3044        // Check if sync broadcast is needed
3045        let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3046        let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3047        if sync_elapsed >= self.config.sync_interval_ms as u32 {
3048            self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3049            // Only broadcast if we have connected peers
3050            if self.peer_manager.connected_count() > 0 {
3051                let doc = self.document_sync.build_document();
3052                return Some(self.encrypt_document(&doc));
3053            }
3054        }
3055
3056        None
3057    }
3058
3059    /// Periodic tick returning per-peer delta documents
3060    ///
3061    /// Unlike `tick()` which broadcasts a single document to all peers,
3062    /// this returns targeted deltas that only include changes each peer
3063    /// hasn't seen. Use this for platforms that support per-peer transmission.
3064    ///
3065    /// Returns a list of (NodeId, encrypted_delta) tuples, one per connected peer.
3066    /// Empty vector if no sync is needed (interval not elapsed or no connected peers).
3067    pub fn tick_with_peer_deltas(&self, now_ms: u64) -> Vec<(NodeId, Vec<u8>)> {
3068        use std::sync::atomic::Ordering;
3069        let now_ms_32 = now_ms as u32;
3070
3071        // Cleanup stale peers (same as tick())
3072        let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3073        let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3074        if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3075            self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3076            let removed = self.peer_manager.cleanup_stale(now_ms);
3077            for node_id in &removed {
3078                self.notify(HiveEvent::PeerLost { node_id: *node_id });
3079            }
3080            if !removed.is_empty() {
3081                self.notify_mesh_state_changed();
3082            }
3083
3084            // Run connection graph maintenance
3085            {
3086                let mut graph = self.connection_graph.lock().unwrap();
3087                let newly_lost = graph.tick(now_ms);
3088                graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3089                drop(graph);
3090
3091                for node_id in newly_lost {
3092                    if !removed.contains(&node_id) {
3093                        self.notify(HiveEvent::PeerLost { node_id });
3094                    }
3095                }
3096            }
3097        }
3098
3099        // Check if sync is needed
3100        let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3101        let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3102        if sync_elapsed >= self.config.sync_interval_ms as u32 {
3103            self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3104
3105            // Build document for each connected peer
3106            let doc = self.document_sync.build_document();
3107            let encrypted = self.encrypt_document(&doc);
3108            let mut results = Vec::new();
3109            for peer in self.get_connected_peers() {
3110                results.push((peer.node_id, encrypted.clone()));
3111            }
3112            return results;
3113        }
3114
3115        Vec::new()
3116    }
3117
3118    // ==================== State Queries ====================
3119
3120    /// Get all known peers
3121    pub fn get_peers(&self) -> Vec<HivePeer> {
3122        self.peer_manager.get_peers()
3123    }
3124
3125    /// Get connected peers only
3126    pub fn get_connected_peers(&self) -> Vec<HivePeer> {
3127        self.peer_manager.get_connected_peers()
3128    }
3129
3130    /// Get a specific peer by NodeId
3131    pub fn get_peer(&self, node_id: NodeId) -> Option<HivePeer> {
3132        self.peer_manager.get_peer(node_id)
3133    }
3134
3135    /// Get peer count
3136    pub fn peer_count(&self) -> usize {
3137        self.peer_manager.peer_count()
3138    }
3139
3140    /// Get connected peer count
3141    pub fn connected_count(&self) -> usize {
3142        self.peer_manager.connected_count()
3143    }
3144
3145    /// Check if a device mesh ID matches our mesh
3146    pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
3147        self.peer_manager.matches_mesh(device_mesh_id)
3148    }
3149
3150    // ==================== Connection State Graph ====================
3151
3152    /// Get the connection state graph with all peer states
3153    ///
3154    /// Returns a snapshot of all tracked peers and their connection lifecycle state.
3155    /// Apps can use this to display appropriate UI indicators:
3156    /// - Green for Connected peers
3157    /// - Yellow for Degraded or RecentlyDisconnected peers
3158    /// - Gray for Lost peers
3159    ///
3160    /// # Example
3161    /// ```ignore
3162    /// let states = mesh.get_connection_graph();
3163    /// for peer in states {
3164    ///     match peer.state {
3165    ///         ConnectionState::Connected => show_green_indicator(&peer),
3166    ///         ConnectionState::Degraded => show_yellow_indicator(&peer),
3167    ///         ConnectionState::Disconnected => show_stale_indicator(&peer),
3168    ///         ConnectionState::Lost => show_gray_indicator(&peer),
3169    ///         _ => {}
3170    ///     }
3171    /// }
3172    /// ```
3173    pub fn get_connection_graph(&self) -> Vec<PeerConnectionState> {
3174        self.connection_graph.lock().unwrap().get_all_owned()
3175    }
3176
3177    /// Get a specific peer's connection state
3178    pub fn get_peer_connection_state(&self, node_id: NodeId) -> Option<PeerConnectionState> {
3179        self.connection_graph
3180            .lock()
3181            .unwrap()
3182            .get_peer(node_id)
3183            .cloned()
3184    }
3185
3186    /// Get all currently connected peers from the connection graph
3187    pub fn get_connected_states(&self) -> Vec<PeerConnectionState> {
3188        self.connection_graph
3189            .lock()
3190            .unwrap()
3191            .get_connected()
3192            .into_iter()
3193            .cloned()
3194            .collect()
3195    }
3196
3197    /// Get peers in degraded state (connected but poor signal quality)
3198    pub fn get_degraded_peers(&self) -> Vec<PeerConnectionState> {
3199        self.connection_graph
3200            .lock()
3201            .unwrap()
3202            .get_degraded()
3203            .into_iter()
3204            .cloned()
3205            .collect()
3206    }
3207
3208    /// Get peers that disconnected within the specified time window
3209    ///
3210    /// Useful for showing "stale" peers that were recently connected.
3211    pub fn get_recently_disconnected(
3212        &self,
3213        within_ms: u64,
3214        now_ms: u64,
3215    ) -> Vec<PeerConnectionState> {
3216        self.connection_graph
3217            .lock()
3218            .unwrap()
3219            .get_recently_disconnected(within_ms, now_ms)
3220            .into_iter()
3221            .cloned()
3222            .collect()
3223    }
3224
3225    /// Get peers in Lost state (disconnected and no longer advertising)
3226    pub fn get_lost_peers(&self) -> Vec<PeerConnectionState> {
3227        self.connection_graph
3228            .lock()
3229            .unwrap()
3230            .get_lost()
3231            .into_iter()
3232            .cloned()
3233            .collect()
3234    }
3235
3236    /// Get summary counts of peers in each connection state
3237    pub fn get_connection_state_counts(&self) -> StateCountSummary {
3238        self.connection_graph.lock().unwrap().state_counts()
3239    }
3240
3241    // ==================== Indirect Peer Methods ====================
3242
3243    /// Get all indirect (multi-hop) peers
3244    ///
3245    /// Returns peers discovered via relay messages that are not directly
3246    /// connected via BLE. Each indirect peer includes the minimum hop count
3247    /// and the direct peers through which they can be reached.
3248    pub fn get_indirect_peers(&self) -> Vec<IndirectPeer> {
3249        self.connection_graph
3250            .lock()
3251            .unwrap()
3252            .get_indirect_peers_owned()
3253    }
3254
3255    /// Get the degree (hop count) for a specific peer
3256    ///
3257    /// Returns:
3258    /// - `Some(PeerDegree::Direct)` for directly connected BLE peers
3259    /// - `Some(PeerDegree::OneHop/TwoHop/ThreeHop)` for indirect peers
3260    /// - `None` if peer is not known
3261    pub fn get_peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
3262        self.connection_graph.lock().unwrap().peer_degree(node_id)
3263    }
3264
3265    /// Get full state counts including indirect peers
3266    ///
3267    /// Returns counts of direct peers by connection state plus counts
3268    /// of indirect peers by hop count (1-hop, 2-hop, 3-hop).
3269    pub fn get_full_state_counts(&self) -> FullStateCountSummary {
3270        self.connection_graph.lock().unwrap().full_state_counts()
3271    }
3272
3273    /// Get all paths to reach an indirect peer
3274    ///
3275    /// Returns a list of (via_peer_id, hop_count) pairs showing all
3276    /// known routes to the specified peer.
3277    pub fn get_paths_to_peer(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
3278        self.connection_graph.lock().unwrap().get_paths_to(node_id)
3279    }
3280
3281    /// Check if a node is known (either direct or indirect)
3282    pub fn is_peer_known(&self, node_id: NodeId) -> bool {
3283        self.connection_graph.lock().unwrap().is_known(node_id)
3284    }
3285
3286    /// Get number of indirect peers
3287    pub fn indirect_peer_count(&self) -> usize {
3288        self.connection_graph.lock().unwrap().indirect_peer_count()
3289    }
3290
3291    /// Cleanup stale indirect peers
3292    ///
3293    /// Removes indirect peers that haven't been seen within the timeout.
3294    /// Returns the list of removed peer IDs.
3295    pub fn cleanup_indirect_peers(&self, now_ms: u64) -> Vec<NodeId> {
3296        self.connection_graph
3297            .lock()
3298            .unwrap()
3299            .cleanup_indirect(now_ms)
3300    }
3301
3302    /// Get total counter value
3303    pub fn total_count(&self) -> u64 {
3304        self.document_sync.total_count()
3305    }
3306
3307    /// Get document version
3308    pub fn document_version(&self) -> u32 {
3309        self.document_sync.version()
3310    }
3311
3312    /// Get document version (alias)
3313    pub fn version(&self) -> u32 {
3314        self.document_sync.version()
3315    }
3316
3317    /// Update health status (battery percentage)
3318    pub fn update_health(&self, battery_percent: u8) {
3319        self.document_sync.update_health(battery_percent);
3320    }
3321
3322    /// Update activity level (0=still, 1=walking, 2=running, 3=fall)
3323    pub fn update_activity(&self, activity: u8) {
3324        self.document_sync.update_activity(activity);
3325    }
3326
3327    /// Update full health status (battery and activity)
3328    pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
3329        self.document_sync
3330            .update_health_full(battery_percent, activity);
3331    }
3332
3333    /// Update heart rate
3334    pub fn update_heart_rate(&self, heart_rate: u8) {
3335        self.document_sync.update_heart_rate(heart_rate);
3336    }
3337
3338    /// Update location
3339    pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
3340        self.document_sync
3341            .update_location(latitude, longitude, altitude);
3342    }
3343
3344    /// Clear location
3345    pub fn clear_location(&self) {
3346        self.document_sync.clear_location();
3347    }
3348
3349    /// Update callsign
3350    pub fn update_callsign(&self, callsign: &str) {
3351        self.document_sync.update_callsign(callsign);
3352    }
3353
3354    /// Set peripheral event type
3355    pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
3356        self.document_sync
3357            .set_peripheral_event(event_type, timestamp);
3358    }
3359
3360    /// Clear peripheral event
3361    pub fn clear_peripheral_event(&self) {
3362        self.document_sync.clear_peripheral_event();
3363    }
3364
3365    /// Update full peripheral state in one call
3366    ///
3367    /// This is the most efficient way to update all peripheral data before
3368    /// calling `build_document()` for encrypted transmission.
3369    #[allow(clippy::too_many_arguments)]
3370    pub fn update_peripheral_state(
3371        &self,
3372        callsign: &str,
3373        battery_percent: u8,
3374        heart_rate: Option<u8>,
3375        latitude: Option<f32>,
3376        longitude: Option<f32>,
3377        altitude: Option<f32>,
3378        event_type: Option<EventType>,
3379        timestamp: u64,
3380    ) {
3381        self.document_sync.update_peripheral_state(
3382            callsign,
3383            battery_percent,
3384            heart_rate,
3385            latitude,
3386            longitude,
3387            altitude,
3388            event_type,
3389            timestamp,
3390        );
3391    }
3392
3393    /// Build current document for transmission
3394    ///
3395    /// If encryption is enabled, the document is encrypted.
3396    pub fn build_document(&self) -> Vec<u8> {
3397        let doc = self.document_sync.build_document();
3398        self.encrypt_document(&doc)
3399    }
3400
3401    /// Get peers that should be synced with
3402    pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<HivePeer> {
3403        self.peer_manager.peers_needing_sync(now_ms)
3404    }
3405
3406    // ==================== Internal Helpers ====================
3407
3408    fn notify(&self, event: HiveEvent) {
3409        self.observers.notify(event);
3410    }
3411
3412    fn notify_mesh_state_changed(&self) {
3413        self.notify(HiveEvent::MeshStateChanged {
3414            peer_count: self.peer_manager.peer_count(),
3415            connected_count: self.peer_manager.connected_count(),
3416        });
3417    }
3418
3419    // ==================== CannedMessage Integration ====================
3420    //
3421    // These methods provide deduplication support for hive-lite CannedMessages.
3422    // They use document identity (source_node + timestamp) instead of content hash,
3423    // because CRDT merge can change byte ordering.
3424
3425    /// Check if a CannedMessage should be processed.
3426    ///
3427    /// Uses document identity (source_node + timestamp) for deduplication.
3428    /// This prevents broadcast storms when relaying CannedMessages across the mesh.
3429    ///
3430    /// # Arguments
3431    /// * `source_node` - The source node ID from the CannedMessage
3432    /// * `timestamp` - The timestamp from the CannedMessage
3433    /// * `_ttl_ms` - TTL parameter (currently unused, uses cache's default TTL)
3434    ///
3435    /// # Returns
3436    /// `true` if this message is new and should be processed,
3437    /// `false` if it was seen recently and should be skipped.
3438    pub fn check_canned_message(&self, source_node: u32, timestamp: u64, _ttl_ms: u64) -> bool {
3439        // Create a unique key from source_node and timestamp
3440        // MessageId is 16 bytes: [source_node: 4B][timestamp: 8B][padding: 4B]
3441        let mut id_bytes = [0u8; 16];
3442        id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3443        id_bytes[4..12].copy_from_slice(&timestamp.to_le_bytes());
3444        let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3445
3446        // Check the seen cache
3447        let seen = self.seen_cache.lock().unwrap();
3448        !seen.has_seen(&message_id)
3449    }
3450
3451    /// Mark a CannedMessage as seen (for deduplication).
3452    ///
3453    /// Call this after processing a CannedMessage to prevent reprocessing
3454    /// the same message from other relay paths.
3455    pub fn mark_canned_message_seen(&self, source_node: u32, timestamp: u64) {
3456        let now = std::time::SystemTime::now()
3457            .duration_since(std::time::UNIX_EPOCH)
3458            .map(|d| d.as_millis() as u64)
3459            .unwrap_or(0);
3460
3461        // MessageId is 16 bytes: [source_node: 4B][timestamp: 8B][padding: 4B]
3462        let mut id_bytes = [0u8; 16];
3463        id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3464        id_bytes[4..12].copy_from_slice(&timestamp.to_le_bytes());
3465        let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3466        let origin = NodeId::new(source_node);
3467
3468        let mut seen = self.seen_cache.lock().unwrap();
3469        seen.mark_seen(message_id, origin, now);
3470    }
3471
3472    /// Get list of connected peer identifiers for relay.
3473    ///
3474    /// Used by the platform layer (Kotlin/Swift) to relay CannedMessages
3475    /// to other peers after deduplication check.
3476    pub fn get_connected_peer_identifiers(&self) -> Vec<String> {
3477        self.peer_manager.get_connected_identifiers()
3478    }
3479}
3480
3481/// Result from receiving BLE data
3482#[derive(Debug, Clone)]
3483pub struct DataReceivedResult {
3484    /// Node that sent this data
3485    pub source_node: NodeId,
3486
3487    /// Whether this contained an emergency event
3488    pub is_emergency: bool,
3489
3490    /// Whether this contained an ACK event
3491    pub is_ack: bool,
3492
3493    /// Whether the counter changed (new data)
3494    pub counter_changed: bool,
3495
3496    /// Whether emergency state changed (new emergency or ACK updates)
3497    pub emergency_changed: bool,
3498
3499    /// Updated total count
3500    pub total_count: u64,
3501
3502    /// Event timestamp (if event present) - use to detect duplicate events
3503    pub event_timestamp: u64,
3504
3505    /// Data to relay to other peers (if multi-hop relay is enabled)
3506    ///
3507    /// When present, the platform adapter should send this data to peers
3508    /// returned by `get_relay_targets(Some(source_node))`.
3509    pub relay_data: Option<Vec<u8>>,
3510
3511    /// Origin node for relay (may differ from source_node for relayed messages)
3512    pub origin_node: Option<NodeId>,
3513
3514    /// Current hop count (for relayed messages)
3515    pub hop_count: u8,
3516
3517    // ========== Peripheral data from sender ==========
3518    /// Sender's callsign (up to 12 chars)
3519    pub callsign: Option<String>,
3520
3521    /// Sender's battery percentage (0-100)
3522    pub battery_percent: Option<u8>,
3523
3524    /// Sender's heart rate (BPM)
3525    pub heart_rate: Option<u8>,
3526
3527    /// Sender's event type (from PeripheralEvent)
3528    pub event_type: Option<u8>,
3529
3530    /// Sender's latitude
3531    pub latitude: Option<f32>,
3532
3533    /// Sender's longitude
3534    pub longitude: Option<f32>,
3535
3536    /// Sender's altitude (meters)
3537    pub altitude: Option<f32>,
3538}
3539
3540impl DataReceivedResult {
3541    /// Extract peripheral fields from an Option<Peripheral>
3542    #[allow(clippy::type_complexity)]
3543    fn peripheral_fields(
3544        peripheral: &Option<crate::sync::crdt::Peripheral>,
3545    ) -> (
3546        Option<String>,
3547        Option<u8>,
3548        Option<u8>,
3549        Option<u8>,
3550        Option<f32>,
3551        Option<f32>,
3552        Option<f32>,
3553    ) {
3554        match peripheral {
3555            Some(p) => {
3556                let callsign = {
3557                    let s = p.callsign_str();
3558                    if s.is_empty() {
3559                        None
3560                    } else {
3561                        Some(s.to_string())
3562                    }
3563                };
3564                let battery = if p.health.battery_percent > 0 {
3565                    Some(p.health.battery_percent)
3566                } else {
3567                    None
3568                };
3569                let heart_rate = p.health.heart_rate;
3570                let event_type = p.last_event.as_ref().map(|e| e.event_type as u8);
3571                let (lat, lon, alt) = match &p.location {
3572                    Some(loc) => (Some(loc.latitude), Some(loc.longitude), loc.altitude),
3573                    None => (None, None, None),
3574                };
3575                (callsign, battery, heart_rate, event_type, lat, lon, alt)
3576            }
3577            None => (None, None, None, None, None, None, None),
3578        }
3579    }
3580}
3581
3582/// Decision from processing a relay envelope
3583#[derive(Debug, Clone)]
3584pub struct RelayDecision {
3585    /// The payload (document) to process locally
3586    pub payload: Vec<u8>,
3587
3588    /// Original sender of the message
3589    pub origin_node: NodeId,
3590
3591    /// Current hop count
3592    pub hop_count: u8,
3593
3594    /// Whether this message should be relayed to other peers
3595    pub should_relay: bool,
3596
3597    /// The relay envelope to forward (with incremented hop count)
3598    ///
3599    /// Only present if `should_relay` is true and TTL not expired.
3600    pub relay_envelope: Option<RelayEnvelope>,
3601}
3602
3603impl RelayDecision {
3604    /// Get the relay data to send to peers
3605    ///
3606    /// Returns None if relay is not needed.
3607    pub fn relay_data(&self) -> Option<Vec<u8>> {
3608        self.relay_envelope.as_ref().map(|e| e.encode())
3609    }
3610}
3611
3612#[cfg(all(test, feature = "std"))]
3613mod tests {
3614    use super::*;
3615    use crate::observer::CollectingObserver;
3616
3617    // Valid timestamp for testing (2024-01-15 00:00:00 UTC)
3618    const TEST_TIMESTAMP: u64 = 1705276800000;
3619
3620    fn create_mesh(node_id: u32, callsign: &str) -> HiveMesh {
3621        let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST");
3622        HiveMesh::new(config)
3623    }
3624
3625    #[test]
3626    fn test_mesh_creation() {
3627        let mesh = create_mesh(0x12345678, "ALPHA-1");
3628
3629        assert_eq!(mesh.node_id().as_u32(), 0x12345678);
3630        assert_eq!(mesh.callsign(), "ALPHA-1");
3631        assert_eq!(mesh.mesh_id(), "TEST");
3632        assert_eq!(mesh.device_name(), "HIVE_TEST-12345678");
3633    }
3634
3635    #[test]
3636    fn test_peer_discovery() {
3637        let mesh = create_mesh(0x11111111, "ALPHA-1");
3638        let observer = Arc::new(CollectingObserver::new());
3639        mesh.add_observer(observer.clone());
3640
3641        // Discover a peer
3642        let peer = mesh.on_ble_discovered(
3643            "device-uuid",
3644            Some("HIVE_TEST-22222222"),
3645            -65,
3646            Some("TEST"),
3647            1000,
3648        );
3649
3650        assert!(peer.is_some());
3651        let peer = peer.unwrap();
3652        assert_eq!(peer.node_id.as_u32(), 0x22222222);
3653
3654        // Check events were generated
3655        let events = observer.events();
3656        assert!(events
3657            .iter()
3658            .any(|e| matches!(e, HiveEvent::PeerDiscovered { .. })));
3659        assert!(events
3660            .iter()
3661            .any(|e| matches!(e, HiveEvent::MeshStateChanged { .. })));
3662    }
3663
3664    #[test]
3665    fn test_connection_lifecycle() {
3666        let mesh = create_mesh(0x11111111, "ALPHA-1");
3667        let observer = Arc::new(CollectingObserver::new());
3668        mesh.add_observer(observer.clone());
3669
3670        // Discover and connect
3671        mesh.on_ble_discovered(
3672            "device-uuid",
3673            Some("HIVE_TEST-22222222"),
3674            -65,
3675            Some("TEST"),
3676            1000,
3677        );
3678
3679        let node_id = mesh.on_ble_connected("device-uuid", 2000);
3680        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
3681        assert_eq!(mesh.connected_count(), 1);
3682
3683        // Disconnect
3684        let node_id = mesh.on_ble_disconnected("device-uuid", DisconnectReason::RemoteRequest);
3685        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
3686        assert_eq!(mesh.connected_count(), 0);
3687
3688        // Check events
3689        let events = observer.events();
3690        assert!(events
3691            .iter()
3692            .any(|e| matches!(e, HiveEvent::PeerConnected { .. })));
3693        assert!(events
3694            .iter()
3695            .any(|e| matches!(e, HiveEvent::PeerDisconnected { .. })));
3696    }
3697
3698    #[test]
3699    fn test_emergency_flow() {
3700        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3701        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3702
3703        let observer2 = Arc::new(CollectingObserver::new());
3704        mesh2.add_observer(observer2.clone());
3705
3706        // mesh1 sends emergency
3707        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
3708        assert!(mesh1.is_emergency_active());
3709
3710        // mesh2 receives it
3711        let result =
3712            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
3713
3714        assert!(result.is_some());
3715        let result = result.unwrap();
3716        assert!(result.is_emergency);
3717        assert_eq!(result.source_node.as_u32(), 0x11111111);
3718
3719        // Check events on mesh2
3720        let events = observer2.events();
3721        assert!(events
3722            .iter()
3723            .any(|e| matches!(e, HiveEvent::EmergencyReceived { .. })));
3724    }
3725
3726    #[test]
3727    fn test_ack_flow() {
3728        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3729        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3730
3731        let observer2 = Arc::new(CollectingObserver::new());
3732        mesh2.add_observer(observer2.clone());
3733
3734        // mesh1 sends ACK
3735        let doc = mesh1.send_ack(TEST_TIMESTAMP);
3736        assert!(mesh1.is_ack_active());
3737
3738        // mesh2 receives it
3739        let result =
3740            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
3741
3742        assert!(result.is_some());
3743        let result = result.unwrap();
3744        assert!(result.is_ack);
3745
3746        // Check events on mesh2
3747        let events = observer2.events();
3748        assert!(events
3749            .iter()
3750            .any(|e| matches!(e, HiveEvent::AckReceived { .. })));
3751    }
3752
3753    #[test]
3754    fn test_tick_cleanup() {
3755        let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
3756            .with_peer_timeout(10_000);
3757        let mesh = HiveMesh::new(config);
3758
3759        let observer = Arc::new(CollectingObserver::new());
3760        mesh.add_observer(observer.clone());
3761
3762        // Discover a peer
3763        mesh.on_ble_discovered(
3764            "device-uuid",
3765            Some("HIVE_TEST-22222222"),
3766            -65,
3767            Some("TEST"),
3768            1000,
3769        );
3770        assert_eq!(mesh.peer_count(), 1);
3771
3772        // Tick at t=5000 - not stale yet
3773        mesh.tick(5000);
3774        assert_eq!(mesh.peer_count(), 1);
3775
3776        // Tick at t=20000 - peer is stale (10s timeout exceeded)
3777        mesh.tick(20000);
3778        assert_eq!(mesh.peer_count(), 0);
3779
3780        // Check PeerLost event
3781        let events = observer.events();
3782        assert!(events
3783            .iter()
3784            .any(|e| matches!(e, HiveEvent::PeerLost { .. })));
3785    }
3786
3787    #[test]
3788    fn test_tick_sync_broadcast() {
3789        let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
3790            .with_sync_interval(5000);
3791        let mesh = HiveMesh::new(config);
3792
3793        // Discover and connect a peer first
3794        mesh.on_ble_discovered(
3795            "device-uuid",
3796            Some("HIVE_TEST-22222222"),
3797            -65,
3798            Some("TEST"),
3799            1000,
3800        );
3801        mesh.on_ble_connected("device-uuid", 1000);
3802
3803        // First tick at t=0 sets last_sync
3804        let _result = mesh.tick(0);
3805        // May or may not broadcast depending on initial state
3806
3807        // Tick before interval - no broadcast
3808        let result = mesh.tick(3000);
3809        assert!(result.is_none());
3810
3811        // After interval - should broadcast
3812        let result = mesh.tick(6000);
3813        assert!(result.is_some());
3814
3815        // Immediate second tick - no broadcast (interval not elapsed)
3816        let result = mesh.tick(6100);
3817        assert!(result.is_none());
3818
3819        // After another interval - should broadcast again
3820        let result = mesh.tick(12000);
3821        assert!(result.is_some());
3822    }
3823
3824    #[test]
3825    fn test_incoming_connection() {
3826        let mesh = create_mesh(0x11111111, "ALPHA-1");
3827        let observer = Arc::new(CollectingObserver::new());
3828        mesh.add_observer(observer.clone());
3829
3830        // Incoming connection from unknown peer
3831        let is_new = mesh.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);
3832
3833        assert!(is_new);
3834        assert_eq!(mesh.peer_count(), 1);
3835        assert_eq!(mesh.connected_count(), 1);
3836
3837        // Check events
3838        let events = observer.events();
3839        assert!(events
3840            .iter()
3841            .any(|e| matches!(e, HiveEvent::PeerDiscovered { .. })));
3842        assert!(events
3843            .iter()
3844            .any(|e| matches!(e, HiveEvent::PeerConnected { .. })));
3845    }
3846
3847    #[test]
3848    fn test_mesh_filtering() {
3849        let mesh = create_mesh(0x11111111, "ALPHA-1");
3850
3851        // Wrong mesh - ignored
3852        let peer = mesh.on_ble_discovered(
3853            "device-uuid-1",
3854            Some("HIVE_OTHER-22222222"),
3855            -65,
3856            Some("OTHER"),
3857            1000,
3858        );
3859        assert!(peer.is_none());
3860        assert_eq!(mesh.peer_count(), 0);
3861
3862        // Correct mesh - accepted
3863        let peer = mesh.on_ble_discovered(
3864            "device-uuid-2",
3865            Some("HIVE_TEST-33333333"),
3866            -65,
3867            Some("TEST"),
3868            1000,
3869        );
3870        assert!(peer.is_some());
3871        assert_eq!(mesh.peer_count(), 1);
3872    }
3873
3874    // ==================== Encryption Tests ====================
3875
3876    fn create_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> HiveMesh {
3877        let config =
3878            HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_encryption(secret);
3879        HiveMesh::new(config)
3880    }
3881
3882    #[test]
3883    fn test_encryption_enabled() {
3884        let secret = [0x42u8; 32];
3885        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3886
3887        assert!(mesh.is_encryption_enabled());
3888    }
3889
3890    #[test]
3891    fn test_encryption_disabled_by_default() {
3892        let mesh = create_mesh(0x11111111, "ALPHA-1");
3893
3894        assert!(!mesh.is_encryption_enabled());
3895    }
3896
3897    #[test]
3898    fn test_encrypted_document_exchange() {
3899        let secret = [0x42u8; 32];
3900        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3901        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
3902
3903        // mesh1 sends document
3904        let doc = mesh1.build_document();
3905
3906        // Document should be encrypted (starts with ENCRYPTED_MARKER)
3907        assert!(doc.len() >= 2);
3908        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
3909
3910        // mesh2 receives and decrypts
3911        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3912
3913        assert!(result.is_some());
3914        let result = result.unwrap();
3915        assert_eq!(result.source_node.as_u32(), 0x11111111);
3916    }
3917
3918    #[test]
3919    fn test_encrypted_emergency_exchange() {
3920        let secret = [0x42u8; 32];
3921        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3922        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
3923
3924        let observer = Arc::new(CollectingObserver::new());
3925        mesh2.add_observer(observer.clone());
3926
3927        // mesh1 sends emergency
3928        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
3929
3930        // mesh2 receives and decrypts
3931        let result =
3932            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
3933
3934        assert!(result.is_some());
3935        let result = result.unwrap();
3936        assert!(result.is_emergency);
3937
3938        // Check EmergencyReceived event was fired
3939        let events = observer.events();
3940        assert!(events
3941            .iter()
3942            .any(|e| matches!(e, HiveEvent::EmergencyReceived { .. })));
3943    }
3944
3945    #[test]
3946    fn test_wrong_key_fails_decrypt() {
3947        let secret1 = [0x42u8; 32];
3948        let secret2 = [0x43u8; 32]; // Different key
3949        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
3950        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
3951
3952        // mesh1 sends document
3953        let doc = mesh1.build_document();
3954
3955        // mesh2 cannot decrypt (wrong key)
3956        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3957
3958        assert!(result.is_none());
3959    }
3960
3961    #[test]
3962    fn test_unencrypted_mesh_can_read_unencrypted() {
3963        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3964        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3965
3966        // mesh1 sends document (unencrypted)
3967        let doc = mesh1.build_document();
3968
3969        // mesh2 receives (also unencrypted)
3970        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3971
3972        assert!(result.is_some());
3973    }
3974
3975    #[test]
3976    fn test_encrypted_mesh_can_receive_unencrypted() {
3977        // Backward compatibility: encrypted mesh can receive unencrypted docs
3978        let secret = [0x42u8; 32];
3979        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // unencrypted
3980        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); // encrypted
3981
3982        // mesh1 sends unencrypted document
3983        let doc = mesh1.build_document();
3984
3985        // mesh2 can receive unencrypted (backward compat)
3986        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3987
3988        assert!(result.is_some());
3989    }
3990
3991    #[test]
3992    fn test_unencrypted_mesh_cannot_receive_encrypted() {
3993        let secret = [0x42u8; 32];
3994        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret); // encrypted
3995        let mesh2 = create_mesh(0x22222222, "BRAVO-1"); // unencrypted
3996
3997        // mesh1 sends encrypted document
3998        let doc = mesh1.build_document();
3999
4000        // mesh2 cannot decrypt (no key)
4001        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4002
4003        assert!(result.is_none());
4004    }
4005
4006    #[test]
4007    fn test_enable_disable_encryption() {
4008        let mut mesh = create_mesh(0x11111111, "ALPHA-1");
4009
4010        assert!(!mesh.is_encryption_enabled());
4011
4012        // Enable encryption
4013        let secret = [0x42u8; 32];
4014        mesh.enable_encryption(&secret);
4015        assert!(mesh.is_encryption_enabled());
4016
4017        // Build document should now be encrypted
4018        let doc = mesh.build_document();
4019        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4020
4021        // Disable encryption
4022        mesh.disable_encryption();
4023        assert!(!mesh.is_encryption_enabled());
4024
4025        // Build document should now be unencrypted
4026        let doc = mesh.build_document();
4027        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4028    }
4029
4030    #[test]
4031    fn test_encryption_overhead() {
4032        let secret = [0x42u8; 32];
4033        let mesh_encrypted = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4034        let mesh_unencrypted = create_mesh(0x22222222, "BRAVO-1");
4035
4036        let doc_encrypted = mesh_encrypted.build_document();
4037        let doc_unencrypted = mesh_unencrypted.build_document();
4038
4039        // Encrypted doc should be larger by:
4040        // - 2 bytes marker header (0xAE + reserved)
4041        // - 12 bytes nonce
4042        // - 16 bytes auth tag
4043        // Total: 30 bytes overhead
4044        let overhead = doc_encrypted.len() - doc_unencrypted.len();
4045        assert_eq!(overhead, 30); // 2 (marker) + 12 (nonce) + 16 (tag)
4046    }
4047
4048    // ==================== Per-Peer E2EE Tests ====================
4049
4050    #[test]
4051    fn test_peer_e2ee_enable_disable() {
4052        let mesh = create_mesh(0x11111111, "ALPHA-1");
4053
4054        assert!(!mesh.is_peer_e2ee_enabled());
4055        assert!(mesh.peer_e2ee_public_key().is_none());
4056
4057        mesh.enable_peer_e2ee();
4058        assert!(mesh.is_peer_e2ee_enabled());
4059        assert!(mesh.peer_e2ee_public_key().is_some());
4060
4061        mesh.disable_peer_e2ee();
4062        assert!(!mesh.is_peer_e2ee_enabled());
4063    }
4064
4065    #[test]
4066    fn test_peer_e2ee_initiate_session() {
4067        let mesh = create_mesh(0x11111111, "ALPHA-1");
4068        mesh.enable_peer_e2ee();
4069
4070        let key_exchange = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4071        assert!(key_exchange.is_some());
4072
4073        let key_exchange = key_exchange.unwrap();
4074        // Should start with KEY_EXCHANGE_MARKER
4075        assert_eq!(key_exchange[0], crate::document::KEY_EXCHANGE_MARKER);
4076
4077        // Should have a pending session
4078        assert_eq!(mesh.peer_e2ee_session_count(), 1);
4079        assert_eq!(mesh.peer_e2ee_established_count(), 0);
4080    }
4081
4082    #[test]
4083    fn test_peer_e2ee_full_handshake() {
4084        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4085        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4086
4087        mesh1.enable_peer_e2ee();
4088        mesh2.enable_peer_e2ee();
4089
4090        let observer1 = Arc::new(CollectingObserver::new());
4091        let observer2 = Arc::new(CollectingObserver::new());
4092        mesh1.add_observer(observer1.clone());
4093        mesh2.add_observer(observer2.clone());
4094
4095        // mesh1 initiates to mesh2
4096        let key_exchange1 = mesh1
4097            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4098            .unwrap();
4099
4100        // mesh2 receives and responds
4101        let response = mesh2.handle_key_exchange(&key_exchange1, 1000);
4102        assert!(response.is_some());
4103
4104        // Check mesh2 has established session
4105        assert!(mesh2.has_peer_e2ee_session(NodeId::new(0x11111111)));
4106
4107        // mesh1 receives mesh2's response
4108        let key_exchange2 = response.unwrap();
4109        let _ = mesh1.handle_key_exchange(&key_exchange2, 1000);
4110
4111        // Check mesh1 has established session
4112        assert!(mesh1.has_peer_e2ee_session(NodeId::new(0x22222222)));
4113
4114        // Both should have E2EE established events
4115        let events1 = observer1.events();
4116        assert!(events1
4117            .iter()
4118            .any(|e| matches!(e, HiveEvent::PeerE2eeEstablished { .. })));
4119
4120        let events2 = observer2.events();
4121        assert!(events2
4122            .iter()
4123            .any(|e| matches!(e, HiveEvent::PeerE2eeEstablished { .. })));
4124    }
4125
4126    #[test]
4127    fn test_peer_e2ee_encrypt_decrypt() {
4128        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4129        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4130
4131        mesh1.enable_peer_e2ee();
4132        mesh2.enable_peer_e2ee();
4133
4134        // Establish session via key exchange
4135        let key_exchange1 = mesh1
4136            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4137            .unwrap();
4138        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4139        mesh1.handle_key_exchange(&key_exchange2, 1000);
4140
4141        // mesh1 sends encrypted message to mesh2
4142        let plaintext = b"Secret message from mesh1";
4143        let encrypted = mesh1.send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000);
4144        assert!(encrypted.is_some());
4145
4146        let encrypted = encrypted.unwrap();
4147        // Should start with PEER_E2EE_MARKER
4148        assert_eq!(encrypted[0], crate::document::PEER_E2EE_MARKER);
4149
4150        // mesh2 receives and decrypts
4151        let observer2 = Arc::new(CollectingObserver::new());
4152        mesh2.add_observer(observer2.clone());
4153
4154        let decrypted = mesh2.handle_peer_e2ee_message(&encrypted, 2000);
4155        assert!(decrypted.is_some());
4156        assert_eq!(decrypted.unwrap(), plaintext);
4157
4158        // Should have received message event
4159        let events = observer2.events();
4160        assert!(events.iter().any(|e| matches!(
4161            e,
4162            HiveEvent::PeerE2eeMessageReceived { from_node, data }
4163            if from_node.as_u32() == 0x11111111 && data == plaintext
4164        )));
4165    }
4166
4167    #[test]
4168    fn test_peer_e2ee_bidirectional() {
4169        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4170        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4171
4172        mesh1.enable_peer_e2ee();
4173        mesh2.enable_peer_e2ee();
4174
4175        // Establish session
4176        let key_exchange1 = mesh1
4177            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4178            .unwrap();
4179        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4180        mesh1.handle_key_exchange(&key_exchange2, 1000);
4181
4182        // mesh1 -> mesh2
4183        let msg1 = mesh1
4184            .send_peer_e2ee(NodeId::new(0x22222222), b"Hello from mesh1", 2000)
4185            .unwrap();
4186        let dec1 = mesh2.handle_peer_e2ee_message(&msg1, 2000).unwrap();
4187        assert_eq!(dec1, b"Hello from mesh1");
4188
4189        // mesh2 -> mesh1
4190        let msg2 = mesh2
4191            .send_peer_e2ee(NodeId::new(0x11111111), b"Hello from mesh2", 2000)
4192            .unwrap();
4193        let dec2 = mesh1.handle_peer_e2ee_message(&msg2, 2000).unwrap();
4194        assert_eq!(dec2, b"Hello from mesh2");
4195    }
4196
4197    #[test]
4198    fn test_peer_e2ee_close_session() {
4199        let mesh = create_mesh(0x11111111, "ALPHA-1");
4200        mesh.enable_peer_e2ee();
4201
4202        let observer = Arc::new(CollectingObserver::new());
4203        mesh.add_observer(observer.clone());
4204
4205        // Initiate a session
4206        mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4207        assert_eq!(mesh.peer_e2ee_session_count(), 1);
4208
4209        // Close session
4210        mesh.close_peer_e2ee(NodeId::new(0x22222222));
4211
4212        // Check close event
4213        let events = observer.events();
4214        assert!(events
4215            .iter()
4216            .any(|e| matches!(e, HiveEvent::PeerE2eeClosed { .. })));
4217    }
4218
4219    #[test]
4220    fn test_peer_e2ee_without_enabling() {
4221        let mesh = create_mesh(0x11111111, "ALPHA-1");
4222
4223        // E2EE not enabled - should return None
4224        let result = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4225        assert!(result.is_none());
4226
4227        let result = mesh.send_peer_e2ee(NodeId::new(0x22222222), b"test", 1000);
4228        assert!(result.is_none());
4229
4230        assert!(!mesh.has_peer_e2ee_session(NodeId::new(0x22222222)));
4231    }
4232
4233    #[test]
4234    fn test_peer_e2ee_overhead() {
4235        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4236        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4237
4238        mesh1.enable_peer_e2ee();
4239        mesh2.enable_peer_e2ee();
4240
4241        // Establish session
4242        let key_exchange1 = mesh1
4243            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4244            .unwrap();
4245        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4246        mesh1.handle_key_exchange(&key_exchange2, 1000);
4247
4248        // Encrypt a message
4249        let plaintext = b"Test message";
4250        let encrypted = mesh1
4251            .send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000)
4252            .unwrap();
4253
4254        // Overhead should be:
4255        // - 2 bytes marker header
4256        // - 4 bytes recipient node ID
4257        // - 4 bytes sender node ID
4258        // - 8 bytes counter
4259        // - 12 bytes nonce
4260        // - 16 bytes auth tag
4261        // Total: 46 bytes overhead
4262        let overhead = encrypted.len() - plaintext.len();
4263        assert_eq!(overhead, 46);
4264    }
4265
4266    // ==================== Strict Encryption Mode Tests ====================
4267
4268    fn create_strict_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> HiveMesh {
4269        let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST")
4270            .with_encryption(secret)
4271            .with_strict_encryption();
4272        HiveMesh::new(config)
4273    }
4274
4275    #[test]
4276    fn test_strict_encryption_enabled() {
4277        let secret = [0x42u8; 32];
4278        let mesh = create_strict_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4279
4280        assert!(mesh.is_encryption_enabled());
4281        assert!(mesh.is_strict_encryption_enabled());
4282    }
4283
4284    #[test]
4285    fn test_strict_encryption_disabled_by_default() {
4286        let secret = [0x42u8; 32];
4287        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4288
4289        assert!(mesh.is_encryption_enabled());
4290        assert!(!mesh.is_strict_encryption_enabled());
4291    }
4292
4293    #[test]
4294    fn test_strict_encryption_requires_encryption_enabled() {
4295        // strict_encryption without encryption should have no effect
4296        let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4297            .with_strict_encryption(); // No encryption!
4298        let mesh = HiveMesh::new(config);
4299
4300        assert!(!mesh.is_encryption_enabled());
4301        assert!(!mesh.is_strict_encryption_enabled());
4302    }
4303
4304    #[test]
4305    fn test_strict_mode_accepts_encrypted_documents() {
4306        let secret = [0x42u8; 32];
4307        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4308        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4309
4310        // mesh1 sends encrypted document
4311        let doc = mesh1.build_document();
4312        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4313
4314        // mesh2 (strict mode) should accept encrypted documents
4315        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4316        assert!(result.is_some());
4317    }
4318
4319    #[test]
4320    fn test_strict_mode_rejects_unencrypted_documents() {
4321        let secret = [0x42u8; 32];
4322        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // Unencrypted sender
4323        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret); // Strict receiver
4324
4325        let observer = Arc::new(CollectingObserver::new());
4326        mesh2.add_observer(observer.clone());
4327
4328        // mesh1 sends unencrypted document
4329        let doc = mesh1.build_document();
4330        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4331
4332        // mesh2 (strict mode) should reject unencrypted documents
4333        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4334        assert!(result.is_none());
4335
4336        // Should have SecurityViolation event
4337        let events = observer.events();
4338        assert!(events.iter().any(|e| matches!(
4339            e,
4340            HiveEvent::SecurityViolation {
4341                kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
4342                ..
4343            }
4344        )));
4345    }
4346
4347    #[test]
4348    fn test_non_strict_mode_accepts_unencrypted_documents() {
4349        let secret = [0x42u8; 32];
4350        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // Unencrypted sender
4351        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); // Non-strict receiver
4352
4353        // mesh1 sends unencrypted document
4354        let doc = mesh1.build_document();
4355
4356        // mesh2 (non-strict) should accept unencrypted documents (backward compat)
4357        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4358        assert!(result.is_some());
4359    }
4360
4361    #[test]
4362    fn test_strict_mode_security_violation_event_includes_source() {
4363        let secret = [0x42u8; 32];
4364        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4365        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4366
4367        let observer = Arc::new(CollectingObserver::new());
4368        mesh2.add_observer(observer.clone());
4369
4370        let doc = mesh1.build_document();
4371
4372        // Use on_ble_data_received with identifier to test source is captured
4373        mesh2.on_ble_discovered(
4374            "test-device-uuid",
4375            Some("HIVE_TEST-11111111"),
4376            -65,
4377            Some("TEST"),
4378            500,
4379        );
4380        mesh2.on_ble_connected("test-device-uuid", 600);
4381
4382        let _result = mesh2.on_ble_data_received("test-device-uuid", &doc, 1000);
4383
4384        // Check SecurityViolation event has source
4385        let events = observer.events();
4386        let violation = events.iter().find(|e| {
4387            matches!(
4388                e,
4389                HiveEvent::SecurityViolation {
4390                    kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
4391                    ..
4392                }
4393            )
4394        });
4395        assert!(violation.is_some());
4396
4397        if let Some(HiveEvent::SecurityViolation { source, .. }) = violation {
4398            assert!(source.is_some());
4399            assert_eq!(source.as_ref().unwrap(), "test-device-uuid");
4400        }
4401    }
4402
4403    #[test]
4404    fn test_decryption_failure_emits_security_violation() {
4405        let secret1 = [0x42u8; 32];
4406        let secret2 = [0x43u8; 32]; // Different key
4407        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
4408        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
4409
4410        let observer = Arc::new(CollectingObserver::new());
4411        mesh2.add_observer(observer.clone());
4412
4413        // mesh1 sends encrypted document
4414        let doc = mesh1.build_document();
4415
4416        // mesh2 cannot decrypt (wrong key)
4417        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4418        assert!(result.is_none());
4419
4420        // Should have SecurityViolation event for decryption failure
4421        let events = observer.events();
4422        assert!(events.iter().any(|e| matches!(
4423            e,
4424            HiveEvent::SecurityViolation {
4425                kind: crate::observer::SecurityViolationKind::DecryptionFailed,
4426                ..
4427            }
4428        )));
4429    }
4430
4431    #[test]
4432    fn test_strict_mode_builder_chain() {
4433        let secret = [0x42u8; 32];
4434        let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4435            .with_encryption(secret)
4436            .with_strict_encryption()
4437            .with_sync_interval(10_000)
4438            .with_peer_timeout(60_000);
4439
4440        let mesh = HiveMesh::new(config);
4441
4442        assert!(mesh.is_encryption_enabled());
4443        assert!(mesh.is_strict_encryption_enabled());
4444    }
4445
4446    // ==================== Multi-Hop Relay Tests ====================
4447
4448    fn create_relay_mesh(node_id: u32, callsign: &str) -> HiveMesh {
4449        let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_relay();
4450        HiveMesh::new(config)
4451    }
4452
4453    #[test]
4454    fn test_relay_disabled_by_default() {
4455        let mesh = create_mesh(0x11111111, "ALPHA-1");
4456        assert!(!mesh.is_relay_enabled());
4457    }
4458
4459    #[test]
4460    fn test_relay_enabled() {
4461        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4462        assert!(mesh.is_relay_enabled());
4463    }
4464
4465    #[test]
4466    fn test_relay_config_builder() {
4467        let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4468            .with_relay()
4469            .with_max_relay_hops(5)
4470            .with_relay_fanout(3)
4471            .with_seen_cache_ttl(60_000);
4472
4473        assert!(config.enable_relay);
4474        assert_eq!(config.max_relay_hops, 5);
4475        assert_eq!(config.relay_fanout, 3);
4476        assert_eq!(config.seen_cache_ttl_ms, 60_000);
4477    }
4478
4479    #[test]
4480    fn test_seen_message_deduplication() {
4481        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4482        let origin = NodeId::new(0x22222222);
4483        let msg_id = crate::relay::MessageId::from_content(origin, 1000, 0xDEADBEEF);
4484
4485        // First time - should be new
4486        assert!(mesh.mark_message_seen(msg_id, origin, 1000));
4487
4488        // Second time - should be duplicate
4489        assert!(!mesh.mark_message_seen(msg_id, origin, 2000));
4490
4491        assert_eq!(mesh.seen_cache_size(), 1);
4492    }
4493
4494    #[test]
4495    fn test_wrap_for_relay() {
4496        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4497
4498        let payload = vec![1, 2, 3, 4, 5];
4499        let wrapped = mesh.wrap_for_relay(payload.clone());
4500
4501        // Should start with relay envelope marker
4502        assert_eq!(wrapped[0], crate::relay::RELAY_ENVELOPE_MARKER);
4503
4504        // Decode and verify
4505        let envelope = crate::relay::RelayEnvelope::decode(&wrapped).unwrap();
4506        assert_eq!(envelope.payload, payload);
4507        assert_eq!(envelope.origin_node, NodeId::new(0x11111111));
4508        assert_eq!(envelope.hop_count, 0);
4509    }
4510
4511    #[test]
4512    fn test_process_relay_envelope_new_message() {
4513        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4514        let observer = Arc::new(CollectingObserver::new());
4515        mesh.add_observer(observer.clone());
4516
4517        // Create an envelope from another node
4518        let payload = vec![1, 2, 3, 4, 5];
4519        let envelope =
4520            crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
4521                .with_max_hops(7);
4522        let data = envelope.encode();
4523
4524        // Process it
4525        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
4526
4527        assert!(decision.is_some());
4528        let decision = decision.unwrap();
4529        assert_eq!(decision.payload, payload);
4530        assert_eq!(decision.origin_node.as_u32(), 0x22222222);
4531        assert_eq!(decision.hop_count, 0);
4532        assert!(decision.should_relay);
4533        assert!(decision.relay_envelope.is_some());
4534
4535        // Relay envelope should have incremented hop count
4536        let relay_env = decision.relay_envelope.unwrap();
4537        assert_eq!(relay_env.hop_count, 1);
4538    }
4539
4540    #[test]
4541    fn test_process_relay_envelope_duplicate() {
4542        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4543        let observer = Arc::new(CollectingObserver::new());
4544        mesh.add_observer(observer.clone());
4545
4546        let payload = vec![1, 2, 3, 4, 5];
4547        let envelope = crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload);
4548        let data = envelope.encode();
4549
4550        // First time - should succeed
4551        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
4552        assert!(decision.is_some());
4553
4554        // Second time - should be duplicate
4555        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 2000);
4556        assert!(decision.is_none());
4557
4558        // Should have DuplicateMessageDropped event
4559        let events = observer.events();
4560        assert!(events
4561            .iter()
4562            .any(|e| matches!(e, HiveEvent::DuplicateMessageDropped { .. })));
4563    }
4564
4565    #[test]
4566    fn test_process_relay_envelope_ttl_expired() {
4567        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4568        let observer = Arc::new(CollectingObserver::new());
4569        mesh.add_observer(observer.clone());
4570
4571        // Create envelope at max hops (TTL expired)
4572        let payload = vec![1, 2, 3, 4, 5];
4573        let mut envelope =
4574            crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
4575                .with_max_hops(3);
4576
4577        // Simulate having been relayed 3 times already
4578        envelope = envelope.relay().unwrap(); // hop 1
4579        envelope = envelope.relay().unwrap(); // hop 2
4580        envelope = envelope.relay().unwrap(); // hop 3 - at max now
4581
4582        let data = envelope.encode();
4583
4584        // Process - should still process locally but not relay further
4585        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
4586
4587        assert!(decision.is_some());
4588        let decision = decision.unwrap();
4589        assert_eq!(decision.payload, payload);
4590        assert!(!decision.should_relay); // Cannot relay further
4591        assert!(decision.relay_envelope.is_none());
4592
4593        // Should have MessageTtlExpired event
4594        let events = observer.events();
4595        assert!(events
4596            .iter()
4597            .any(|e| matches!(e, HiveEvent::MessageTtlExpired { .. })));
4598    }
4599
4600    #[test]
4601    fn test_build_relay_document() {
4602        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4603
4604        let relay_doc = mesh.build_relay_document();
4605
4606        // Should be a valid relay envelope
4607        assert_eq!(relay_doc[0], crate::relay::RELAY_ENVELOPE_MARKER);
4608
4609        // Decode and verify it contains a valid document
4610        let envelope = crate::relay::RelayEnvelope::decode(&relay_doc).unwrap();
4611        assert_eq!(envelope.origin_node.as_u32(), 0x11111111);
4612
4613        // The payload should be a valid HiveDocument
4614        let doc = crate::document::HiveDocument::decode(&envelope.payload);
4615        assert!(doc.is_some());
4616    }
4617
4618    #[test]
4619    fn test_relay_targets_excludes_source() {
4620        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4621
4622        // Add some peers
4623        mesh.on_ble_discovered(
4624            "peer-1",
4625            Some("HIVE_TEST-22222222"),
4626            -60,
4627            Some("TEST"),
4628            1000,
4629        );
4630        mesh.on_ble_connected("peer-1", 1000);
4631
4632        mesh.on_ble_discovered(
4633            "peer-2",
4634            Some("HIVE_TEST-33333333"),
4635            -65,
4636            Some("TEST"),
4637            1000,
4638        );
4639        mesh.on_ble_connected("peer-2", 1000);
4640
4641        mesh.on_ble_discovered(
4642            "peer-3",
4643            Some("HIVE_TEST-44444444"),
4644            -70,
4645            Some("TEST"),
4646            1000,
4647        );
4648        mesh.on_ble_connected("peer-3", 1000);
4649
4650        // Get relay targets excluding peer-2
4651        let targets = mesh.get_relay_targets(Some(NodeId::new(0x33333333)));
4652
4653        // Should not include peer-2 in targets
4654        assert!(targets.iter().all(|p| p.node_id.as_u32() != 0x33333333));
4655    }
4656
4657    #[test]
4658    fn test_clear_seen_cache() {
4659        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4660        let origin = NodeId::new(0x22222222);
4661
4662        // Add some messages
4663        mesh.mark_message_seen(
4664            crate::relay::MessageId::from_content(origin, 1000, 0x11111111),
4665            origin,
4666            1000,
4667        );
4668        mesh.mark_message_seen(
4669            crate::relay::MessageId::from_content(origin, 2000, 0x22222222),
4670            origin,
4671            2000,
4672        );
4673
4674        assert_eq!(mesh.seen_cache_size(), 2);
4675
4676        // Clear
4677        mesh.clear_seen_cache();
4678        assert_eq!(mesh.seen_cache_size(), 0);
4679    }
4680}