Skip to main content

hive_btle/
hive_mesh.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! HiveMesh - Unified mesh management facade
17//!
18//! This module provides the main entry point for HIVE BLE mesh operations.
19//! It composes peer management, document sync, and observer notifications
20//! into a single interface that platform implementations can use.
21//!
22//! ## Usage
23//!
24//! ```ignore
25//! use hive_btle::hive_mesh::{HiveMesh, HiveMeshConfig};
26//! use hive_btle::observer::{HiveEvent, HiveObserver};
27//! use hive_btle::NodeId;
28//! use std::sync::Arc;
29//!
30//! // Create mesh configuration
31//! let config = HiveMeshConfig::new(NodeId::new(0x12345678), "ALPHA-1", "DEMO");
32//!
33//! // Create mesh instance
34//! let mesh = HiveMesh::new(config);
35//!
36//! // Add observer for events
37//! struct MyObserver;
38//! impl HiveObserver for MyObserver {
39//!     fn on_event(&self, event: HiveEvent) {
40//!         println!("Event: {:?}", event);
41//!     }
42//! }
43//! mesh.add_observer(Arc::new(MyObserver));
44//!
45//! // Platform BLE callbacks
46//! mesh.on_ble_discovered("device-uuid", Some("HIVE_DEMO-AABBCCDD"), -65, Some("DEMO"), now_ms);
47//! mesh.on_ble_connected("device-uuid", now_ms);
48//! mesh.on_ble_data_received("device-uuid", &data, now_ms);
49//!
50//! // Periodic maintenance
51//! if let Some(sync_data) = mesh.tick(now_ms) {
52//!     // Broadcast sync_data to connected peers
53//! }
54//! ```
55
56#[cfg(not(feature = "std"))]
57use alloc::{string::String, sync::Arc, vec::Vec};
58#[cfg(feature = "std")]
59use std::collections::HashMap;
60#[cfg(feature = "std")]
61use std::sync::Arc;
62
63use crate::document::{ENCRYPTED_MARKER, KEY_EXCHANGE_MARKER, PEER_E2EE_MARKER};
64use crate::document_sync::DocumentSync;
65use crate::gossip::{GossipStrategy, RandomFanout};
66use crate::observer::{DisconnectReason, HiveEvent, HiveObserver, SecurityViolationKind};
67use crate::peer::{
68    ConnectionStateGraph, FullStateCountSummary, HivePeer, IndirectPeer, PeerConnectionState,
69    PeerDegree, PeerManagerConfig, StateCountSummary,
70};
71use crate::peer_manager::PeerManager;
72use crate::relay::{
73    MessageId, RelayEnvelope, SeenMessageCache, DEFAULT_MAX_HOPS, DEFAULT_SEEN_TTL_MS,
74    RELAY_ENVELOPE_MARKER,
75};
76use crate::security::{
77    DeviceIdentity, IdentityAttestation, IdentityRegistry, KeyExchangeMessage, MeshEncryptionKey,
78    PeerEncryptedMessage, PeerSessionManager, RegistryResult, SessionState,
79};
80use crate::sync::crdt::{EventType, Peripheral, PeripheralType};
81use crate::sync::delta::{DeltaEncoder, DeltaStats};
82use crate::sync::delta_document::{DeltaDocument, Operation};
83use crate::NodeId;
84
85#[cfg(feature = "std")]
86use crate::observer::ObserverManager;
87
88use crate::registry::DocumentRegistry;
89
90/// Configuration for HiveMesh
91#[derive(Debug, Clone)]
92pub struct HiveMeshConfig {
93    /// Our node ID
94    pub node_id: NodeId,
95
96    /// Our callsign (e.g., "ALPHA-1")
97    pub callsign: String,
98
99    /// Mesh ID to filter peers (e.g., "DEMO")
100    pub mesh_id: String,
101
102    /// Peripheral type for this device
103    pub peripheral_type: PeripheralType,
104
105    /// Peer management configuration
106    pub peer_config: PeerManagerConfig,
107
108    /// Sync interval in milliseconds (how often to broadcast state)
109    pub sync_interval_ms: u64,
110
111    /// Whether to auto-broadcast on emergency/ack
112    pub auto_broadcast_events: bool,
113
114    /// Optional shared secret for mesh-wide encryption (32 bytes)
115    ///
116    /// When set, all documents are encrypted using ChaCha20-Poly1305 before
117    /// transmission and decrypted upon receipt. All nodes in the mesh must
118    /// share the same secret to communicate.
119    pub encryption_secret: Option<[u8; 32]>,
120
121    /// Strict encryption mode - reject unencrypted documents when encryption is enabled
122    ///
123    /// When true and encryption is enabled, any unencrypted documents received
124    /// will be rejected and trigger a SecurityViolation event. This prevents
125    /// downgrade attacks where an adversary sends unencrypted malicious documents.
126    ///
127    /// Default: false (backward compatible - accepts unencrypted for gradual rollout)
128    pub strict_encryption: bool,
129
130    /// Enable multi-hop relay
131    ///
132    /// When enabled, received messages will be forwarded to other peers based
133    /// on the gossip strategy. Requires message deduplication to prevent loops.
134    ///
135    /// Default: false
136    pub enable_relay: bool,
137
138    /// Maximum hops for relay messages (TTL)
139    ///
140    /// Messages will not be relayed beyond this many hops from the origin.
141    /// Default: 7
142    pub max_relay_hops: u8,
143
144    /// Gossip fanout for relay
145    ///
146    /// Number of peers to forward each message to. Higher values increase
147    /// convergence speed but also bandwidth usage.
148    /// Default: 2
149    pub relay_fanout: usize,
150
151    /// TTL for seen message cache (milliseconds)
152    ///
153    /// How long to remember message IDs for deduplication.
154    /// Default: 300_000 (5 minutes)
155    pub seen_cache_ttl_ms: u64,
156}
157
158impl HiveMeshConfig {
159    /// Create a new configuration with required fields
160    pub fn new(node_id: NodeId, callsign: &str, mesh_id: &str) -> Self {
161        Self {
162            node_id,
163            callsign: callsign.into(),
164            mesh_id: mesh_id.into(),
165            peripheral_type: PeripheralType::SoldierSensor,
166            peer_config: PeerManagerConfig::with_mesh_id(mesh_id),
167            sync_interval_ms: 5000,
168            auto_broadcast_events: true,
169            encryption_secret: None,
170            strict_encryption: false,
171            enable_relay: false,
172            max_relay_hops: DEFAULT_MAX_HOPS,
173            relay_fanout: 2,
174            seen_cache_ttl_ms: DEFAULT_SEEN_TTL_MS,
175        }
176    }
177
178    /// Enable mesh-wide encryption with a shared secret
179    ///
180    /// All documents will be encrypted using ChaCha20-Poly1305 before
181    /// transmission. All mesh participants must use the same secret.
182    pub fn with_encryption(mut self, secret: [u8; 32]) -> Self {
183        self.encryption_secret = Some(secret);
184        self
185    }
186
187    /// Set peripheral type
188    pub fn with_peripheral_type(mut self, ptype: PeripheralType) -> Self {
189        self.peripheral_type = ptype;
190        self
191    }
192
193    /// Set sync interval
194    pub fn with_sync_interval(mut self, interval_ms: u64) -> Self {
195        self.sync_interval_ms = interval_ms;
196        self
197    }
198
199    /// Set peer timeout
200    pub fn with_peer_timeout(mut self, timeout_ms: u64) -> Self {
201        self.peer_config.peer_timeout_ms = timeout_ms;
202        self
203    }
204
205    /// Set max peers (for embedded systems)
206    pub fn with_max_peers(mut self, max: usize) -> Self {
207        self.peer_config.max_peers = max;
208        self
209    }
210
211    /// Enable strict encryption mode
212    ///
213    /// When enabled (and encryption is also enabled), any unencrypted documents
214    /// received will be rejected and trigger a `SecurityViolation` event.
215    /// This prevents downgrade attacks.
216    ///
217    /// Note: This only has effect when encryption is enabled via `with_encryption()`.
218    pub fn with_strict_encryption(mut self) -> Self {
219        self.strict_encryption = true;
220        self
221    }
222
223    /// Enable multi-hop relay
224    ///
225    /// When enabled, received messages will be forwarded to other connected peers
226    /// based on the gossip strategy. This enables mesh-wide message propagation.
227    pub fn with_relay(mut self) -> Self {
228        self.enable_relay = true;
229        self
230    }
231
232    /// Set maximum relay hops (TTL)
233    ///
234    /// Messages will not be relayed beyond this many hops from the origin.
235    pub fn with_max_relay_hops(mut self, max_hops: u8) -> Self {
236        self.max_relay_hops = max_hops;
237        self
238    }
239
240    /// Set gossip fanout for relay
241    ///
242    /// Number of peers to forward each message to.
243    pub fn with_relay_fanout(mut self, fanout: usize) -> Self {
244        self.relay_fanout = fanout.max(1);
245        self
246    }
247
248    /// Set TTL for seen message cache
249    ///
250    /// How long to remember message IDs for deduplication (milliseconds).
251    pub fn with_seen_cache_ttl(mut self, ttl_ms: u64) -> Self {
252        self.seen_cache_ttl_ms = ttl_ms;
253        self
254    }
255}
256
/// Lock-protected map holding type-erased app-layer documents.
///
/// Exists purely to keep the `HiveMesh::app_documents` field declaration
/// readable; the key is a `(u8, u32, u64)` tuple.
#[cfg(feature = "std")]
type AppDocumentStore =
    std::sync::RwLock<HashMap<(u8, u32, u64), Box<dyn core::any::Any + Send + Sync>>>;
261
/// Facade that platform BLE code drives to take part in a HIVE mesh.
///
/// It bundles peer management, document sync and observer notification
/// behind one type; platform implementations call its methods from their
/// BLE callbacks.
#[cfg(feature = "std")]
pub struct HiveMesh {
    /// Settings this mesh was built with.
    config: HiveMeshConfig,

    /// Tracks peers.
    peer_manager: PeerManager,

    /// Handles document synchronisation.
    document_sync: DocumentSync,

    /// Registered event observers.
    observers: ObserverManager,

    /// Time of the last sync broadcast (a u32 wraps about every 49 days,
    /// which is enough for interval checks).
    last_sync_ms: std::sync::atomic::AtomicU32,

    /// Time of the last cleanup pass.
    last_cleanup_ms: std::sync::atomic::AtomicU32,

    /// Mesh-wide encryption key derived from the shared secret, if any.
    encryption_key: Option<MeshEncryptionKey>,

    /// Per-peer end-to-end encryption session manager, if one is installed.
    peer_sessions: std::sync::Mutex<Option<PeerSessionManager>>,

    /// Graph that follows each peer's connection lifecycle.
    connection_graph: std::sync::Mutex<ConnectionStateGraph>,

    /// Remembers message IDs so relayed duplicates can be dropped.
    seen_cache: std::sync::Mutex<SeenMessageCache>,

    /// Strategy that picks which peers a relayed message goes to.
    gossip_strategy: Box<dyn GossipStrategy>,

    /// Per-peer delta sync bookkeeping.
    ///
    /// Records what has already been sent to each peer so that only changes,
    /// rather than whole documents, need to be transmitted.
    delta_encoder: std::sync::Mutex<DeltaEncoder>,

    /// Cryptographic identity (Ed25519 keypair) of this node, if bound.
    ///
    /// When present, the node_id is derived from the public key and
    /// documents can be signed so receivers can verify authenticity.
    identity: Option<DeviceIdentity>,

    /// Trust-on-first-use registry of peer identities.
    ///
    /// Binds a node_id to a public key on first contact and rejects later
    /// mismatches.
    identity_registry: std::sync::Mutex<IdentityRegistry>,

    /// Latest peripheral state learned from peers.
    ///
    /// Holds the most recent peripheral data (callsign, location, etc.)
    /// received from each peer through document sync.
    peer_peripherals: std::sync::RwLock<HashMap<NodeId, Peripheral>>,

    /// Registry of app-layer CRDT document types.
    ///
    /// Lets external crates register their own document types so they are
    /// synced through the mesh via the extensible registry pattern.
    document_registry: DocumentRegistry,

    /// App-layer document storage.
    ///
    /// Each document instance is keyed by (type_id, source_node, timestamp).
    /// Values are type-erased boxes that the document registry handlers can
    /// downcast.
    app_documents: AppDocumentStore,
}
337
338#[cfg(feature = "std")]
339impl HiveMesh {
340    /// Create a new HiveMesh instance
341    pub fn new(config: HiveMeshConfig) -> Self {
342        let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
343        let document_sync = DocumentSync::with_peripheral_type(
344            config.node_id,
345            &config.callsign,
346            config.peripheral_type,
347        );
348
349        // Derive encryption key from shared secret if configured
350        let encryption_key = config
351            .encryption_secret
352            .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
353
354        // Create connection state graph with config thresholds
355        let connection_graph = ConnectionStateGraph::with_config(
356            config.peer_config.rssi_degraded_threshold,
357            config.peer_config.lost_timeout_ms,
358        );
359
360        // Create seen message cache for relay deduplication
361        let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
362
363        // Create gossip strategy for relay
364        let gossip_strategy: Box<dyn GossipStrategy> =
365            Box::new(RandomFanout::new(config.relay_fanout));
366
367        // Create delta encoder for per-peer sync state tracking
368        let delta_encoder = DeltaEncoder::new(config.node_id);
369
370        // Create document registry and auto-register types
371        let document_registry = DocumentRegistry::new();
372        #[cfg(feature = "hive-lite-sync")]
373        {
374            use crate::hive_lite_sync::CannedMessageDocument;
375            document_registry.register::<CannedMessageDocument>();
376            log::info!("Auto-registered CannedMessageDocument (0xC0) for hive-lite sync");
377        }
378
379        Self {
380            config,
381            peer_manager,
382            document_sync,
383            observers: ObserverManager::new(),
384            last_sync_ms: std::sync::atomic::AtomicU32::new(0),
385            last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
386            encryption_key,
387            peer_sessions: std::sync::Mutex::new(None),
388            connection_graph: std::sync::Mutex::new(connection_graph),
389            seen_cache: std::sync::Mutex::new(seen_cache),
390            gossip_strategy,
391            delta_encoder: std::sync::Mutex::new(delta_encoder),
392            identity: None,
393            identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
394            peer_peripherals: std::sync::RwLock::new(HashMap::new()),
395            document_registry,
396            app_documents: std::sync::RwLock::new(HashMap::new()),
397        }
398    }
399
400    /// Create a new HiveMesh with a cryptographic identity
401    ///
402    /// The node_id will be derived from the identity's public key, overriding
403    /// any node_id specified in the config. This ensures cryptographic binding
404    /// between node_id and identity.
405    pub fn with_identity(config: HiveMeshConfig, identity: DeviceIdentity) -> Self {
406        // Override node_id with identity-derived value
407        let mut config = config;
408        config.node_id = identity.node_id();
409
410        let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
411        let document_sync = DocumentSync::with_peripheral_type(
412            config.node_id,
413            &config.callsign,
414            config.peripheral_type,
415        );
416
417        let encryption_key = config
418            .encryption_secret
419            .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
420
421        let connection_graph = ConnectionStateGraph::with_config(
422            config.peer_config.rssi_degraded_threshold,
423            config.peer_config.lost_timeout_ms,
424        );
425
426        let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
427        let gossip_strategy: Box<dyn GossipStrategy> =
428            Box::new(RandomFanout::new(config.relay_fanout));
429        let delta_encoder = DeltaEncoder::new(config.node_id);
430
431        // Create document registry and auto-register types
432        let document_registry = DocumentRegistry::new();
433        #[cfg(feature = "hive-lite-sync")]
434        {
435            use crate::hive_lite_sync::CannedMessageDocument;
436            document_registry.register::<CannedMessageDocument>();
437            log::info!("Auto-registered CannedMessageDocument (0xC0) for hive-lite sync");
438        }
439
440        Self {
441            config,
442            peer_manager,
443            document_sync,
444            observers: ObserverManager::new(),
445            last_sync_ms: std::sync::atomic::AtomicU32::new(0),
446            last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
447            encryption_key,
448            peer_sessions: std::sync::Mutex::new(None),
449            connection_graph: std::sync::Mutex::new(connection_graph),
450            seen_cache: std::sync::Mutex::new(seen_cache),
451            gossip_strategy,
452            delta_encoder: std::sync::Mutex::new(delta_encoder),
453            identity: Some(identity),
454            identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
455            peer_peripherals: std::sync::RwLock::new(HashMap::new()),
456            document_registry,
457            app_documents: std::sync::RwLock::new(HashMap::new()),
458        }
459    }
460
461    /// Create a new HiveMesh from genesis data
462    ///
463    /// This is the recommended way to create a mesh for production use.
464    /// The mesh will be configured with:
465    /// - node_id derived from identity
466    /// - mesh_id from genesis
467    /// - encryption enabled using genesis-derived secret
468    pub fn from_genesis(
469        genesis: &crate::security::MeshGenesis,
470        identity: DeviceIdentity,
471        callsign: &str,
472    ) -> Self {
473        let config = HiveMeshConfig::new(identity.node_id(), callsign, &genesis.mesh_id())
474            .with_encryption(genesis.encryption_secret());
475
476        Self::with_identity(config, identity)
477    }
478
479    /// Create a HiveMesh from persisted state.
480    ///
481    /// Restores mesh configuration from previously saved state, including:
482    /// - Device identity (Ed25519 keypair)
483    /// - Mesh genesis (if present)
484    /// - Identity registry (TOFU cache)
485    ///
486    /// Use this on device boot to restore mesh membership without re-provisioning.
487    ///
488    /// # Arguments
489    ///
490    /// * `state` - Previously persisted state
491    /// * `callsign` - Human-readable identifier (may differ from original)
492    ///
493    /// # Errors
494    ///
495    /// Returns `PersistenceError` if the identity cannot be restored.
496    ///
497    /// # Example
498    ///
499    /// ```ignore
500    /// // On boot, restore from secure storage
501    /// let state = PersistedState::load(&storage)?;
502    /// let mesh = HiveMesh::from_persisted(state, "SENSOR-01")?;
503    /// ```
504    #[cfg(feature = "std")]
505    pub fn from_persisted(
506        state: crate::security::PersistedState,
507        callsign: &str,
508    ) -> Result<Self, crate::security::PersistenceError> {
509        // Restore identity
510        let identity = state.restore_identity()?;
511
512        // Restore genesis (if present)
513        let genesis = state.restore_genesis();
514
515        // Create mesh with or without genesis
516        let mesh = if let Some(ref gen) = genesis {
517            Self::from_genesis(gen, identity, callsign)
518        } else {
519            let config = HiveMeshConfig::new(identity.node_id(), callsign, "RESTORED");
520            Self::with_identity(config, identity)
521        };
522
523        // Restore identity registry
524        let restored_registry = state.restore_registry();
525        if let Ok(mut registry) = mesh.identity_registry.lock() {
526            *registry = restored_registry;
527        }
528
529        log::info!(
530            "HiveMesh restored from persisted state: node_id={:08X}, known_peers={}",
531            mesh.config.node_id.as_u32(),
532            mesh.known_identity_count()
533        );
534
535        Ok(mesh)
536    }
537
538    /// Create persisted state from current mesh state.
539    ///
540    /// Captures the current identity, genesis, and registry for persistence.
541    /// Call this periodically or before shutdown to save state.
542    ///
543    /// # Arguments
544    ///
545    /// * `genesis` - Optional genesis to include (if mesh was created from genesis)
546    ///
547    /// # Returns
548    ///
549    /// `None` if the mesh has no identity bound.
550    #[cfg(feature = "std")]
551    pub fn to_persisted_state(
552        &self,
553        genesis: Option<&crate::security::MeshGenesis>,
554    ) -> Option<crate::security::PersistedState> {
555        let identity = self.identity.as_ref()?;
556        let registry = self.identity_registry.lock().ok()?;
557
558        Some(crate::security::PersistedState::with_registry(
559            identity, genesis, &registry,
560        ))
561    }
562
563    // ==================== Encryption ====================
564
565    /// Check if mesh-wide encryption is enabled
566    pub fn is_encryption_enabled(&self) -> bool {
567        self.encryption_key.is_some()
568    }
569
570    /// Check if strict encryption mode is enabled
571    ///
572    /// Returns true only if both encryption and strict_encryption are enabled.
573    pub fn is_strict_encryption_enabled(&self) -> bool {
574        self.config.strict_encryption && self.encryption_key.is_some()
575    }
576
577    /// Enable mesh-wide encryption with a shared secret
578    ///
579    /// Derives a ChaCha20-Poly1305 key from the secret using HKDF-SHA256.
580    /// All mesh participants must use the same secret to communicate.
581    pub fn enable_encryption(&mut self, secret: &[u8; 32]) {
582        self.encryption_key = Some(MeshEncryptionKey::from_shared_secret(
583            &self.config.mesh_id,
584            secret,
585        ));
586    }
587
588    /// Disable mesh-wide encryption
589    pub fn disable_encryption(&mut self) {
590        self.encryption_key = None;
591    }
592
593    /// Encrypt document bytes for transmission
594    ///
595    /// Returns the encrypted bytes with ENCRYPTED_MARKER prefix, or the
596    /// original bytes if encryption is disabled.
597    fn encrypt_document(&self, plaintext: &[u8]) -> Vec<u8> {
598        match &self.encryption_key {
599            Some(key) => {
600                // Encrypt and prepend marker
601                match key.encrypt_to_bytes(plaintext) {
602                    Ok(ciphertext) => {
603                        let mut buf = Vec::with_capacity(2 + ciphertext.len());
604                        buf.push(ENCRYPTED_MARKER);
605                        buf.push(0x00); // reserved
606                        buf.extend_from_slice(&ciphertext);
607                        buf
608                    }
609                    Err(e) => {
610                        log::error!("Encryption failed: {}", e);
611                        // Fall back to unencrypted on error (shouldn't happen)
612                        plaintext.to_vec()
613                    }
614                }
615            }
616            None => plaintext.to_vec(),
617        }
618    }
619
620    /// Decrypt document bytes received from peer
621    ///
622    /// Returns the decrypted bytes if encrypted and valid, or the original
623    /// bytes if not encrypted. Returns None if decryption fails.
624    ///
625    /// In strict encryption mode (when both encryption and strict_encryption are enabled),
626    /// unencrypted documents are rejected and trigger a SecurityViolation event.
627    fn decrypt_document<'a>(
628        &self,
629        data: &'a [u8],
630        source_hint: Option<&str>,
631    ) -> Option<std::borrow::Cow<'a, [u8]>> {
632        log::debug!(
633            "decrypt_document: len={}, first_byte=0x{:02X}, source={:?}",
634            data.len(),
635            data.first().copied().unwrap_or(0),
636            source_hint
637        );
638
639        // Check for encrypted marker
640        if data.len() >= 2 && data[0] == ENCRYPTED_MARKER {
641            // Encrypted document
642            let _reserved = data[1];
643            let encrypted_payload = &data[2..];
644
645            log::debug!(
646                "decrypt_document: encrypted payload len={}, nonce+ciphertext",
647                encrypted_payload.len()
648            );
649
650            match &self.encryption_key {
651                Some(key) => match key.decrypt_from_bytes(encrypted_payload) {
652                    Ok(plaintext) => {
653                        log::debug!(
654                            "decrypt_document: SUCCESS, plaintext len={}",
655                            plaintext.len()
656                        );
657                        Some(std::borrow::Cow::Owned(plaintext))
658                    }
659                    Err(e) => {
660                        log::warn!(
661                            "decrypt_document: FAILED (wrong key or corrupted): {} [payload_len={}, source={:?}]",
662                            e,
663                            encrypted_payload.len(),
664                            source_hint
665                        );
666                        self.notify(HiveEvent::SecurityViolation {
667                            kind: SecurityViolationKind::DecryptionFailed,
668                            source: source_hint.map(String::from),
669                        });
670                        None
671                    }
672                },
673                None => {
674                    log::warn!(
675                        "decrypt_document: encryption not enabled but received encrypted doc"
676                    );
677                    None
678                }
679            }
680        } else {
681            // Unencrypted document
682            // Check strict encryption mode
683            if self.config.strict_encryption && self.encryption_key.is_some() {
684                log::warn!(
685                    "Rejected unencrypted document in strict encryption mode (source: {:?})",
686                    source_hint
687                );
688                self.notify(HiveEvent::SecurityViolation {
689                    kind: SecurityViolationKind::UnencryptedInStrictMode,
690                    source: source_hint.map(String::from),
691                });
692                None
693            } else {
694                // Permissive mode: accept unencrypted for backward compatibility
695                Some(std::borrow::Cow::Borrowed(data))
696            }
697        }
698    }
699
700    // ==================== Transport Layer API ====================
701
702    /// Decrypt data without parsing (transport-only operation)
703    ///
704    /// This method provides raw decrypted bytes for apps that want to handle
705    /// message parsing themselves (using hive-lite or other libraries).
706    ///
707    /// # Arguments
708    /// * `data` - Potentially encrypted data (0xAE marker indicates encryption)
709    ///
710    /// # Returns
711    /// * `Some(plaintext)` - Decrypted bytes if successful, or original bytes if unencrypted
712    /// * `None` - If decryption failed (wrong key, corrupted, or strict mode violation)
713    pub fn decrypt_only(&self, data: &[u8]) -> Option<Vec<u8>> {
714        self.decrypt_document(data, None)
715            .map(|cow| cow.into_owned())
716    }
717
718    // ==================== Identity ====================
719
720    /// Check if this mesh has a cryptographic identity
721    pub fn has_identity(&self) -> bool {
722        self.identity.is_some()
723    }
724
725    /// Get this node's public key (if identity is configured)
726    pub fn public_key(&self) -> Option<[u8; 32]> {
727        self.identity.as_ref().map(|id| id.public_key())
728    }
729
730    /// Create an identity attestation for this node
731    ///
732    /// Returns None if no identity is configured.
733    pub fn create_attestation(&self, now_ms: u64) -> Option<IdentityAttestation> {
734        self.identity
735            .as_ref()
736            .map(|id| id.create_attestation(now_ms))
737    }
738
739    /// Verify and register a peer's identity attestation
740    ///
741    /// Implements TOFU (Trust On First Use):
742    /// - On first contact, registers the node_id → public_key binding
743    /// - On subsequent contacts, verifies the public key matches
744    ///
745    /// Returns the verification result. Security violations should be handled
746    /// by the caller (e.g., disconnect, alert).
747    pub fn verify_peer_identity(&self, attestation: &IdentityAttestation) -> RegistryResult {
748        self.identity_registry
749            .lock()
750            .unwrap()
751            .verify_or_register(attestation)
752    }
753
754    /// Check if a peer's identity is known (has been registered)
755    pub fn is_peer_identity_known(&self, node_id: NodeId) -> bool {
756        self.identity_registry.lock().unwrap().is_known(node_id)
757    }
758
759    /// Get a peer's public key if known
760    pub fn peer_public_key(&self, node_id: NodeId) -> Option<[u8; 32]> {
761        self.identity_registry
762            .lock()
763            .unwrap()
764            .get_public_key(node_id)
765            .copied()
766    }
767
768    /// Get the number of known peer identities
769    pub fn known_identity_count(&self) -> usize {
770        self.identity_registry.lock().unwrap().len()
771    }
772
773    /// Pre-register a peer's identity (for out-of-band key exchange)
774    ///
775    /// Use this when keys are exchanged through a secure side channel
776    /// (e.g., QR code, NFC tap, or provisioning server).
777    pub fn pre_register_peer_identity(&self, node_id: NodeId, public_key: [u8; 32], now_ms: u64) {
778        self.identity_registry
779            .lock()
780            .unwrap()
781            .pre_register(node_id, public_key, now_ms);
782    }
783
784    /// Remove a peer's identity from the registry
785    ///
786    /// Use with caution - this allows re-registration with a different key.
787    pub fn forget_peer_identity(&self, node_id: NodeId) {
788        self.identity_registry.lock().unwrap().remove(node_id);
789    }
790
791    /// Sign arbitrary data with this node's identity
792    ///
793    /// Returns None if no identity is configured.
794    pub fn sign(&self, data: &[u8]) -> Option<[u8; 64]> {
795        self.identity.as_ref().map(|id| id.sign(data))
796    }
797
798    /// Verify a signature from a peer
799    ///
800    /// Uses the peer's public key from the identity registry.
801    /// Returns false if peer is unknown or signature is invalid.
802    pub fn verify_peer_signature(
803        &self,
804        node_id: NodeId,
805        data: &[u8],
806        signature: &[u8; 64],
807    ) -> bool {
808        if let Some(public_key) = self.peer_public_key(node_id) {
809            crate::security::verify_signature(&public_key, data, signature)
810        } else {
811            false
812        }
813    }
814
815    // ==================== Multi-Hop Relay ====================
816
817    /// Check if multi-hop relay is enabled
818    pub fn is_relay_enabled(&self) -> bool {
819        self.config.enable_relay
820    }
821
822    /// Enable multi-hop relay
823    pub fn enable_relay(&mut self) {
824        self.config.enable_relay = true;
825    }
826
827    /// Disable multi-hop relay
828    pub fn disable_relay(&mut self) {
829        self.config.enable_relay = false;
830    }
831
832    /// Check if a message has been seen before (for deduplication)
833    ///
834    /// Returns true if the message was already seen (duplicate).
835    pub fn has_seen_message(&self, message_id: &MessageId) -> bool {
836        self.seen_cache.lock().unwrap().has_seen(message_id)
837    }
838
839    /// Mark a message as seen
840    ///
841    /// Returns true if this is a new message (first time seen).
842    pub fn mark_message_seen(&self, message_id: MessageId, origin: NodeId, now_ms: u64) -> bool {
843        self.seen_cache
844            .lock()
845            .unwrap()
846            .check_and_mark(message_id, origin, now_ms)
847    }
848
849    /// Get the number of entries in the seen message cache
850    pub fn seen_cache_size(&self) -> usize {
851        self.seen_cache.lock().unwrap().len()
852    }
853
854    /// Clear the seen message cache
855    pub fn clear_seen_cache(&self) {
856        self.seen_cache.lock().unwrap().clear();
857    }
858
859    /// Wrap a document in a relay envelope for multi-hop transmission
860    ///
861    /// The returned bytes can be sent to peers and will be automatically
862    /// relayed through the mesh if relay is enabled on receiving nodes.
863    pub fn wrap_for_relay(&self, payload: Vec<u8>) -> Vec<u8> {
864        let envelope = RelayEnvelope::broadcast(self.config.node_id, payload)
865            .with_max_hops(self.config.max_relay_hops);
866        envelope.encode()
867    }
868
869    /// Get peers to relay a message to
870    ///
871    /// Uses the configured gossip strategy to select relay targets.
872    /// Excludes the source peer (if provided) to avoid sending back to sender.
873    pub fn get_relay_targets(&self, exclude_peer: Option<NodeId>) -> Vec<HivePeer> {
874        let connected = self.peer_manager.get_connected_peers();
875        let filtered: Vec<_> = if let Some(exclude) = exclude_peer {
876            connected
877                .into_iter()
878                .filter(|p| p.node_id != exclude)
879                .collect()
880        } else {
881            connected
882        };
883
884        self.gossip_strategy
885            .select_peers(&filtered)
886            .into_iter()
887            .cloned()
888            .collect()
889    }
890
891    /// Process an incoming relay envelope
892    ///
893    /// Handles deduplication, TTL checking, and determines if the message
894    /// should be processed and/or relayed.
895    ///
896    /// Returns:
897    /// - `Ok(Some(RelayDecision))` if message should be processed/relayed
898    /// - `Ok(None)` if message was a duplicate or TTL expired
899    /// - `Err` if parsing failed
900    pub fn process_relay_envelope(
901        &self,
902        data: &[u8],
903        source_peer: NodeId,
904        now_ms: u64,
905    ) -> Option<RelayDecision> {
906        // Parse envelope
907        let envelope = RelayEnvelope::decode(data)?;
908
909        // Update indirect peer graph if origin differs from source
910        // This means the message was relayed through source_peer from origin_node
911        if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
912            let is_new = self.connection_graph.lock().unwrap().on_relay_received(
913                source_peer,
914                envelope.origin_node,
915                envelope.hop_count,
916                now_ms,
917            );
918
919            if is_new {
920                log::debug!(
921                    "Discovered indirect peer {:08X} via {:08X} ({} hops)",
922                    envelope.origin_node.as_u32(),
923                    source_peer.as_u32(),
924                    envelope.hop_count
925                );
926            }
927        }
928
929        // Check deduplication
930        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
931            // Duplicate message
932            let stats = self
933                .seen_cache
934                .lock()
935                .unwrap()
936                .get_stats(&envelope.message_id);
937            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
938
939            self.notify(HiveEvent::DuplicateMessageDropped {
940                origin_node: envelope.origin_node,
941                seen_count,
942            });
943
944            log::debug!(
945                "Dropped duplicate message {} from {:08X} (seen {} times)",
946                envelope.message_id,
947                envelope.origin_node.as_u32(),
948                seen_count
949            );
950            return None;
951        }
952
953        // Check TTL
954        if !envelope.can_relay() {
955            self.notify(HiveEvent::MessageTtlExpired {
956                origin_node: envelope.origin_node,
957                hop_count: envelope.hop_count,
958            });
959
960            log::debug!(
961                "Message {} from {:08X} TTL expired at hop {}",
962                envelope.message_id,
963                envelope.origin_node.as_u32(),
964                envelope.hop_count
965            );
966
967            // Still process locally even if TTL expired
968            return Some(RelayDecision {
969                payload: envelope.payload,
970                origin_node: envelope.origin_node,
971                hop_count: envelope.hop_count,
972                should_relay: false,
973                relay_envelope: None,
974            });
975        }
976
977        // Determine if we should relay
978        let should_relay = self.config.enable_relay;
979        let relay_envelope = if should_relay {
980            envelope.relay() // Increments hop count
981        } else {
982            None
983        };
984
985        Some(RelayDecision {
986            payload: envelope.payload,
987            origin_node: envelope.origin_node,
988            hop_count: envelope.hop_count,
989            should_relay,
990            relay_envelope,
991        })
992    }
993
994    /// Build a document wrapped in a relay envelope
995    ///
996    /// Convenience method that builds the document, encrypts it (if enabled),
997    /// and wraps it in a relay envelope for multi-hop transmission.
998    pub fn build_relay_document(&self) -> Vec<u8> {
999        let doc = self.build_document(); // Already encrypted if encryption enabled
1000        self.wrap_for_relay(doc)
1001    }
1002
1003    // ==================== Delta Sync ====================
1004
1005    /// Register a peer for delta sync tracking
1006    ///
1007    /// Call this when a peer connects to start tracking what data has been
1008    /// sent to them. This enables future delta sync (sending only changes).
1009    pub fn register_peer_for_delta(&self, peer_id: &NodeId) {
1010        let mut encoder = self.delta_encoder.lock().unwrap();
1011        encoder.add_peer(peer_id);
1012        log::debug!(
1013            "Registered peer {:08X} for delta sync tracking",
1014            peer_id.as_u32()
1015        );
1016    }
1017
1018    /// Unregister a peer from delta sync tracking
1019    ///
1020    /// Call this when a peer disconnects to clean up tracking state.
1021    pub fn unregister_peer_for_delta(&self, peer_id: &NodeId) {
1022        let mut encoder = self.delta_encoder.lock().unwrap();
1023        encoder.remove_peer(peer_id);
1024        log::debug!(
1025            "Unregistered peer {:08X} from delta sync tracking",
1026            peer_id.as_u32()
1027        );
1028    }
1029
1030    /// Reset delta sync state for a peer
1031    ///
1032    /// Call this when a peer reconnects to force a full sync on next
1033    /// communication. This clears the record of what was previously sent.
1034    pub fn reset_peer_delta_state(&self, peer_id: &NodeId) {
1035        let mut encoder = self.delta_encoder.lock().unwrap();
1036        encoder.reset_peer(peer_id);
1037        log::debug!("Reset delta sync state for peer {:08X}", peer_id.as_u32());
1038    }
1039
1040    /// Record bytes sent to a peer (for delta statistics)
1041    pub fn record_delta_sent(&self, peer_id: &NodeId, bytes: usize) {
1042        let mut encoder = self.delta_encoder.lock().unwrap();
1043        encoder.record_sent(peer_id, bytes);
1044    }
1045
1046    /// Record bytes received from a peer (for delta statistics)
1047    pub fn record_delta_received(&self, peer_id: &NodeId, bytes: usize, timestamp: u64) {
1048        let mut encoder = self.delta_encoder.lock().unwrap();
1049        encoder.record_received(peer_id, bytes, timestamp);
1050    }
1051
1052    /// Get delta sync statistics
1053    ///
1054    /// Returns aggregate statistics about delta sync across all peers,
1055    /// including bytes sent/received and sync counts.
1056    pub fn delta_stats(&self) -> DeltaStats {
1057        self.delta_encoder.lock().unwrap().stats()
1058    }
1059
1060    /// Get delta sync statistics for a specific peer
1061    ///
1062    /// Returns the bytes sent/received and sync count for a single peer.
1063    pub fn peer_delta_stats(&self, peer_id: &NodeId) -> Option<(u64, u64, u32)> {
1064        let encoder = self.delta_encoder.lock().unwrap();
1065        encoder
1066            .get_peer_state(peer_id)
1067            .map(|state| (state.bytes_sent, state.bytes_received, state.sync_count))
1068    }
1069
1070    /// Build a delta document for a specific peer
1071    ///
1072    /// This only includes operations that have changed since the last sync
1073    /// with this peer. Uses the delta encoder to track per-peer state.
1074    ///
1075    /// Returns the encoded delta document bytes, or None if there's nothing
1076    /// new to send to this peer.
1077    pub fn build_delta_document_for_peer(&self, peer_id: &NodeId, now_ms: u64) -> Option<Vec<u8>> {
1078        // Collect all current operations
1079        let mut all_operations: Vec<Operation> = Vec::new();
1080
1081        // Add counter operations (one per node that has contributed)
1082        // Use the count value as the "timestamp" for tracking - only send if count increased
1083        for (node_id_u32, count) in self.document_sync.counter_entries() {
1084            all_operations.push(Operation::IncrementCounter {
1085                counter_id: 0, // Default mesh counter
1086                node_id: NodeId::new(node_id_u32),
1087                amount: count,
1088                timestamp: count, // Use count as timestamp for delta tracking
1089            });
1090        }
1091
1092        // Add peripheral update
1093        // Use event timestamp if available, otherwise use 1 for initial send
1094        let peripheral = self.document_sync.peripheral_snapshot();
1095        let peripheral_timestamp = peripheral
1096            .last_event
1097            .as_ref()
1098            .map(|e| e.timestamp)
1099            .unwrap_or(1); // Use 1 (not 0) so it's sent initially
1100        all_operations.push(Operation::UpdatePeripheral {
1101            peripheral,
1102            timestamp: peripheral_timestamp,
1103        });
1104
1105        // Add emergency operations if active
1106        if let Some(emergency) = self.document_sync.emergency_snapshot() {
1107            let source_node = NodeId::new(emergency.source_node());
1108            let timestamp = emergency.timestamp();
1109
1110            // Add SetEmergency operation
1111            all_operations.push(Operation::SetEmergency {
1112                source_node,
1113                timestamp,
1114                known_peers: emergency.all_nodes(),
1115            });
1116
1117            // Add AckEmergency for each node that has acked
1118            for acked_node in emergency.acked_nodes() {
1119                all_operations.push(Operation::AckEmergency {
1120                    node_id: NodeId::new(acked_node),
1121                    emergency_timestamp: timestamp,
1122                });
1123            }
1124        }
1125
1126        // Add app document operations
1127        for app_op in self.app_document_delta_ops() {
1128            all_operations.push(Operation::App(app_op));
1129        }
1130
1131        // Filter operations for this peer (only send what's new)
1132        let filtered_operations: Vec<Operation> = {
1133            let encoder = self.delta_encoder.lock().unwrap();
1134            if let Some(peer_state) = encoder.get_peer_state(peer_id) {
1135                all_operations
1136                    .into_iter()
1137                    .filter(|op| peer_state.needs_send(&op.key(), op.timestamp()))
1138                    .collect()
1139            } else {
1140                // Unknown peer, send all operations
1141                all_operations
1142            }
1143        };
1144
1145        // If nothing new to send, return None
1146        if filtered_operations.is_empty() {
1147            return None;
1148        }
1149
1150        // Mark operations as sent
1151        {
1152            let mut encoder = self.delta_encoder.lock().unwrap();
1153            if let Some(peer_state) = encoder.get_peer_state_mut(peer_id) {
1154                for op in &filtered_operations {
1155                    peer_state.mark_sent(&op.key(), op.timestamp());
1156                }
1157            }
1158        }
1159
1160        // Build the delta document
1161        let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1162        for op in filtered_operations {
1163            delta.add_operation(op);
1164        }
1165
1166        // Encode and optionally encrypt
1167        let encoded = delta.encode();
1168        let result = self.encrypt_document(&encoded);
1169
1170        // Record stats
1171        {
1172            let mut encoder = self.delta_encoder.lock().unwrap();
1173            encoder.record_sent(peer_id, result.len());
1174        }
1175
1176        Some(result)
1177    }
1178
1179    /// Build a full delta document (for broadcast or new peers)
1180    ///
1181    /// Unlike `build_delta_document_for_peer`, this includes all state
1182    /// regardless of what has been sent before. Use this for broadcasts.
1183    pub fn build_full_delta_document(&self, now_ms: u64) -> Vec<u8> {
1184        let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1185
1186        // Add all counter operations
1187        for (node_id_u32, count) in self.document_sync.counter_entries() {
1188            delta.add_operation(Operation::IncrementCounter {
1189                counter_id: 0,
1190                node_id: NodeId::new(node_id_u32),
1191                amount: count,
1192                timestamp: now_ms,
1193            });
1194        }
1195
1196        // Add peripheral
1197        let peripheral = self.document_sync.peripheral_snapshot();
1198        let peripheral_timestamp = peripheral
1199            .last_event
1200            .as_ref()
1201            .map(|e| e.timestamp)
1202            .unwrap_or(now_ms);
1203        delta.add_operation(Operation::UpdatePeripheral {
1204            peripheral,
1205            timestamp: peripheral_timestamp,
1206        });
1207
1208        // Add emergency if active
1209        if let Some(emergency) = self.document_sync.emergency_snapshot() {
1210            let source_node = NodeId::new(emergency.source_node());
1211            let timestamp = emergency.timestamp();
1212
1213            delta.add_operation(Operation::SetEmergency {
1214                source_node,
1215                timestamp,
1216                known_peers: emergency.all_nodes(),
1217            });
1218
1219            for acked_node in emergency.acked_nodes() {
1220                delta.add_operation(Operation::AckEmergency {
1221                    node_id: NodeId::new(acked_node),
1222                    emergency_timestamp: timestamp,
1223                });
1224            }
1225        }
1226
1227        // Add app document operations
1228        for app_op in self.app_document_delta_ops() {
1229            delta.add_operation(Operation::App(app_op));
1230        }
1231
1232        let encoded = delta.encode();
1233        self.encrypt_document(&encoded)
1234    }
1235
    /// Internal: Process a received delta document
    ///
    /// Decodes `data` as a [`DeltaDocument`], walks its operations, updates
    /// local bookkeeping, emits observer events, and summarises the outcome.
    ///
    /// Parameters:
    /// - `source_node`: the peer the bytes were received from; used for sync
    ///   and received-bytes accounting and excluded from relay targets.
    /// - `data`: the encoded delta document.
    /// - `now_ms`: current time, recorded with the sync and receive stats.
    /// - `relay_data`: bytes to relay onward, if any; passed through to the
    ///   result and, when present, triggers a `MessageRelayed` event.
    /// - `origin_node` / `hop_count`: relay metadata, passed through to the
    ///   result and the relay event.
    ///
    /// Returns `None` if `data` does not decode or if the document originated
    /// from this node; otherwise a [`DataReceivedResult`] whose `source_node`
    /// is the document's origin node.
    fn process_delta_document_internal(
        &self,
        source_node: NodeId,
        data: &[u8],
        now_ms: u64,
        relay_data: Option<Vec<u8>>,
        origin_node: Option<NodeId>,
        hop_count: u8,
    ) -> Option<DataReceivedResult> {
        // Decode the delta document
        let delta = DeltaDocument::decode(data)?;

        // Don't process our own documents
        if delta.origin_node == self.config.node_id {
            return None;
        }

        // Apply operations to local state
        // These flags/values are accumulated across all operations and feed
        // both the events below and the returned result.
        let mut counter_changed = false;
        let mut emergency_changed = false;
        let mut is_emergency = false;
        let mut is_ack = false;
        let mut event_timestamp = 0u64;
        let mut peer_peripheral: Option<crate::sync::crdt::Peripheral> = None;

        log::info!(
            "Delta document from {:08X}: {} operations, data_len={}",
            delta.origin_node.as_u32(),
            delta.operations.len(),
            data.len()
        );
        for op in &delta.operations {
            log::info!("  Operation: {}", op.key());
            match op {
                Operation::IncrementCounter {
                    node_id, amount, ..
                } => {
                    // Merge counter value (take max)
                    // NOTE(review): counter_entries() is fetched again for
                    // every counter operation in the document.
                    let current = self.document_sync.counter_entries();
                    let current_value = current
                        .iter()
                        .find(|(id, _)| *id == node_id.as_u32())
                        .map(|(_, v)| *v)
                        .unwrap_or(0);

                    if *amount > current_value {
                        // Need to merge - this is handled by the counter merge logic
                        // For now, we record that counter changed
                        // NOTE(review): nothing in this branch writes `amount`
                        // into document_sync; only the flag is set here.
                        counter_changed = true;
                    }
                }
                Operation::UpdatePeripheral {
                    peripheral,
                    timestamp,
                } => {
                    // Store peer peripheral for callsign lookup
                    // (a failed write-lock acquisition is skipped silently)
                    if let Ok(mut peripherals) = self.peer_peripherals.write() {
                        peripherals.insert(delta.origin_node, peripheral.clone());
                    }
                    // Track the peripheral for the result
                    peer_peripheral = Some(peripheral.clone());
                    // Track the timestamp for the result
                    if *timestamp > event_timestamp {
                        event_timestamp = *timestamp;
                    }
                }
                Operation::SetEmergency { timestamp, .. } => {
                    is_emergency = true;
                    emergency_changed = true;
                    // Unlike the other branches, this overwrites the running
                    // timestamp unconditionally instead of taking the maximum.
                    event_timestamp = *timestamp;
                }
                Operation::AckEmergency {
                    emergency_timestamp,
                    ..
                } => {
                    is_ack = true;
                    emergency_changed = true;
                    if *emergency_timestamp > event_timestamp {
                        event_timestamp = *emergency_timestamp;
                    }
                }
                Operation::ClearEmergency {
                    emergency_timestamp,
                } => {
                    emergency_changed = true;
                    if *emergency_timestamp > event_timestamp {
                        event_timestamp = *emergency_timestamp;
                    }
                }
                Operation::App(app_op) => {
                    // Handle app-layer document operation
                    //
                    // The timestamp field may contain version info in the upper bits
                    // (used by delta encoder for change detection). Extract the
                    // original document timestamp from the lower 48 bits.
                    let doc_timestamp = app_op.timestamp & 0x0000_FFFF_FFFF_FFFF;

                    log::info!(
                        "App operation received: type={:02X} op_code={:02X} from {:08X} ts={} payload_len={}",
                        app_op.type_id,
                        app_op.op_code,
                        app_op.source_node,
                        doc_timestamp,
                        app_op.payload.len()
                    );

                    // Try to find/create document and apply the delta operation
                    // Documents are keyed by (type, source node, masked timestamp).
                    let doc_key = (app_op.type_id, app_op.source_node, doc_timestamp);
                    let changed = {
                        // The write lock is held only for this inner block and
                        // is released before observers are notified below.
                        let mut docs = self.app_documents.write().unwrap();

                        if let Some(existing) = docs.get_mut(&doc_key) {
                            // Apply delta to existing document (merge via CRDT)
                            self.document_registry.apply_delta_op(
                                app_op.type_id,
                                existing.as_mut(),
                                app_op,
                            )
                        } else {
                            // Try to decode from payload (for FULL_STATE operations)
                            if let Some(decoded) = self
                                .document_registry
                                .decode(app_op.type_id, &app_op.payload)
                            {
                                docs.insert(doc_key, decoded);
                                true
                            } else {
                                // For delta ops on unknown documents, we can't apply
                                // The full state will be sent later
                                log::debug!(
                                    "Received delta for unknown doc {:?}, waiting for full state",
                                    doc_key
                                );
                                false
                            }
                        }
                    };

                    // Emit observer event with the real document timestamp
                    // (emitted for every app operation; `changed` tells the
                    // observer whether local state was actually modified)
                    self.observers.notify(HiveEvent::app_document_received(
                        app_op.type_id,
                        NodeId::new(app_op.source_node),
                        doc_timestamp,
                        changed,
                    ));
                }
            }
        }

        // Record sync
        // Accounting is attributed to `source_node` (the peer that delivered
        // the bytes), whereas the events below name `delta.origin_node`.
        self.peer_manager.record_sync(source_node, now_ms);

        // Record delta received
        {
            let mut encoder = self.delta_encoder.lock().unwrap();
            encoder.record_received(&source_node, data.len(), now_ms);
        }

        // Generate events based on what was received
        // An emergency takes precedence: if the document carried both a
        // SetEmergency and an AckEmergency, only EmergencyReceived is emitted.
        if is_emergency {
            self.notify(HiveEvent::EmergencyReceived {
                from_node: delta.origin_node,
            });
        } else if is_ack {
            self.notify(HiveEvent::AckReceived {
                from_node: delta.origin_node,
            });
        }

        if counter_changed {
            let total_count = self.document_sync.total_count();
            self.notify(HiveEvent::DocumentSynced {
                from_node: delta.origin_node,
                total_count,
            });
        }

        // Emit relay event if we're relaying
        // `relay_count` is the number of targets selected with the delivering
        // peer excluded; the relay bytes themselves are returned in the result.
        if relay_data.is_some() {
            let relay_targets = self.get_relay_targets(Some(source_node));
            self.notify(HiveEvent::MessageRelayed {
                origin_node: origin_node.unwrap_or(delta.origin_node),
                relay_count: relay_targets.len(),
                hop_count,
            });
        }

        // Flatten the last peripheral seen in this document (if any) into the
        // individual fields carried by the result.
        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
            DataReceivedResult::peripheral_fields(&peer_peripheral);

        Some(DataReceivedResult {
            source_node: delta.origin_node,
            is_emergency,
            is_ack,
            counter_changed,
            emergency_changed,
            total_count: self.document_sync.total_count(),
            event_timestamp,
            relay_data,
            origin_node,
            hop_count,
            callsign,
            battery_percent,
            heart_rate,
            event_type,
            latitude,
            longitude,
            altitude,
        })
    }
1449
1450    // ==================== Per-Peer E2EE ====================
1451
1452    /// Enable per-peer E2EE capability
1453    ///
1454    /// Creates a new identity key for this node. This allows establishing
1455    /// encrypted sessions with specific peers where only the sender and
1456    /// recipient can read messages (other mesh members cannot).
1457    pub fn enable_peer_e2ee(&self) {
1458        let mut sessions = self.peer_sessions.lock().unwrap();
1459        if sessions.is_none() {
1460            *sessions = Some(PeerSessionManager::new(self.config.node_id));
1461            log::info!(
1462                "Per-peer E2EE enabled for node {:08X}",
1463                self.config.node_id.as_u32()
1464            );
1465        }
1466    }
1467
1468    /// Disable per-peer E2EE capability
1469    ///
1470    /// Clears all peer sessions and disables E2EE.
1471    pub fn disable_peer_e2ee(&self) {
1472        let mut sessions = self.peer_sessions.lock().unwrap();
1473        *sessions = None;
1474        log::info!("Per-peer E2EE disabled");
1475    }
1476
1477    /// Check if per-peer E2EE is enabled
1478    pub fn is_peer_e2ee_enabled(&self) -> bool {
1479        self.peer_sessions.lock().unwrap().is_some()
1480    }
1481
1482    /// Get our E2EE public key (for sharing with peers)
1483    ///
1484    /// Returns None if per-peer E2EE is not enabled.
1485    pub fn peer_e2ee_public_key(&self) -> Option<[u8; 32]> {
1486        self.peer_sessions
1487            .lock()
1488            .unwrap()
1489            .as_ref()
1490            .map(|s| s.our_public_key())
1491    }
1492
1493    /// Initiate E2EE session with a specific peer
1494    ///
1495    /// Returns the key exchange message bytes to send to the peer.
1496    /// The message should be broadcast/sent to the peer.
1497    /// Returns None if per-peer E2EE is not enabled.
1498    pub fn initiate_peer_e2ee(&self, peer_node_id: NodeId, now_ms: u64) -> Option<Vec<u8>> {
1499        let mut sessions = self.peer_sessions.lock().unwrap();
1500        let session_mgr = sessions.as_mut()?;
1501
1502        let key_exchange = session_mgr.initiate_session(peer_node_id, now_ms);
1503        let mut buf = Vec::with_capacity(2 + 37);
1504        buf.push(KEY_EXCHANGE_MARKER);
1505        buf.push(0x00); // reserved
1506        buf.extend_from_slice(&key_exchange.encode());
1507
1508        log::info!(
1509            "Initiated E2EE session with peer {:08X}",
1510            peer_node_id.as_u32()
1511        );
1512        Some(buf)
1513    }
1514
1515    /// Check if we have an established E2EE session with a peer
1516    pub fn has_peer_e2ee_session(&self, peer_node_id: NodeId) -> bool {
1517        self.peer_sessions
1518            .lock()
1519            .unwrap()
1520            .as_ref()
1521            .is_some_and(|s| s.has_session(peer_node_id))
1522    }
1523
1524    /// Get E2EE session state with a peer
1525    pub fn peer_e2ee_session_state(&self, peer_node_id: NodeId) -> Option<SessionState> {
1526        self.peer_sessions
1527            .lock()
1528            .unwrap()
1529            .as_ref()
1530            .and_then(|s| s.session_state(peer_node_id))
1531    }
1532
1533    /// Send an E2EE encrypted message to a specific peer
1534    ///
1535    /// Returns the encrypted message bytes to send, or None if no session exists.
1536    /// The message should be sent directly to the peer (not broadcast).
1537    pub fn send_peer_e2ee(
1538        &self,
1539        peer_node_id: NodeId,
1540        plaintext: &[u8],
1541        now_ms: u64,
1542    ) -> Option<Vec<u8>> {
1543        let mut sessions = self.peer_sessions.lock().unwrap();
1544        let session_mgr = sessions.as_mut()?;
1545
1546        match session_mgr.encrypt_for_peer(peer_node_id, plaintext, now_ms) {
1547            Ok(encrypted) => {
1548                let mut buf = Vec::with_capacity(2 + encrypted.encode().len());
1549                buf.push(PEER_E2EE_MARKER);
1550                buf.push(0x00); // reserved
1551                buf.extend_from_slice(&encrypted.encode());
1552                Some(buf)
1553            }
1554            Err(e) => {
1555                log::warn!(
1556                    "Failed to encrypt for peer {:08X}: {:?}",
1557                    peer_node_id.as_u32(),
1558                    e
1559                );
1560                None
1561            }
1562        }
1563    }
1564
1565    /// Close E2EE session with a peer
1566    pub fn close_peer_e2ee(&self, peer_node_id: NodeId) {
1567        let mut sessions = self.peer_sessions.lock().unwrap();
1568        if let Some(session_mgr) = sessions.as_mut() {
1569            session_mgr.close_session(peer_node_id);
1570            self.notify(HiveEvent::PeerE2eeClosed { peer_node_id });
1571            log::info!(
1572                "Closed E2EE session with peer {:08X}",
1573                peer_node_id.as_u32()
1574            );
1575        }
1576    }
1577
1578    /// Get count of active E2EE sessions
1579    pub fn peer_e2ee_session_count(&self) -> usize {
1580        self.peer_sessions
1581            .lock()
1582            .unwrap()
1583            .as_ref()
1584            .map(|s| s.session_count())
1585            .unwrap_or(0)
1586    }
1587
1588    /// Get count of established E2EE sessions
1589    pub fn peer_e2ee_established_count(&self) -> usize {
1590        self.peer_sessions
1591            .lock()
1592            .unwrap()
1593            .as_ref()
1594            .map(|s| s.established_count())
1595            .unwrap_or(0)
1596    }
1597
1598    /// Handle incoming key exchange message
1599    ///
1600    /// Called internally when we receive a KEY_EXCHANGE_MARKER message.
1601    /// Returns the response key exchange bytes to send back, or None if invalid.
1602    fn handle_key_exchange(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1603        if data.len() < 2 || data[0] != KEY_EXCHANGE_MARKER {
1604            return None;
1605        }
1606
1607        let payload = &data[2..];
1608        let msg = KeyExchangeMessage::decode(payload)?;
1609
1610        let mut sessions = self.peer_sessions.lock().unwrap();
1611        let session_mgr = sessions.as_mut()?;
1612
1613        let (response, established) = session_mgr.handle_key_exchange(&msg, now_ms)?;
1614
1615        if established {
1616            self.notify(HiveEvent::PeerE2eeEstablished {
1617                peer_node_id: msg.sender_node_id,
1618            });
1619            log::info!(
1620                "E2EE session established with peer {:08X}",
1621                msg.sender_node_id.as_u32()
1622            );
1623        }
1624
1625        // Return response key exchange
1626        let mut buf = Vec::with_capacity(2 + 37);
1627        buf.push(KEY_EXCHANGE_MARKER);
1628        buf.push(0x00);
1629        buf.extend_from_slice(&response.encode());
1630        Some(buf)
1631    }
1632
1633    /// Handle incoming E2EE encrypted message
1634    ///
1635    /// Called internally when we receive a PEER_E2EE_MARKER message.
1636    /// Decrypts and notifies observers of the received message.
1637    fn handle_peer_e2ee_message(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1638        if data.len() < 2 || data[0] != PEER_E2EE_MARKER {
1639            return None;
1640        }
1641
1642        let payload = &data[2..];
1643        let msg = PeerEncryptedMessage::decode(payload)?;
1644
1645        let mut sessions = self.peer_sessions.lock().unwrap();
1646        let session_mgr = sessions.as_mut()?;
1647
1648        match session_mgr.decrypt_from_peer(&msg, now_ms) {
1649            Ok(plaintext) => {
1650                // Notify observers of the decrypted message
1651                self.notify(HiveEvent::PeerE2eeMessageReceived {
1652                    from_node: msg.sender_node_id,
1653                    data: plaintext.clone(),
1654                });
1655                Some(plaintext)
1656            }
1657            Err(e) => {
1658                log::warn!(
1659                    "Failed to decrypt E2EE message from {:08X}: {:?}",
1660                    msg.sender_node_id.as_u32(),
1661                    e
1662                );
1663                None
1664            }
1665        }
1666    }
1667
1668    // ==================== Configuration ====================
1669
    /// Get our node ID.
    ///
    /// Returns the `node_id` stored in this mesh's configuration.
    pub fn node_id(&self) -> NodeId {
        self.config.node_id
    }
1674
    /// Get our callsign.
    ///
    /// Borrows the `callsign` stored in this mesh's configuration.
    pub fn callsign(&self) -> &str {
        &self.config.callsign
    }
1679
    /// Get the mesh ID.
    ///
    /// Borrows the `mesh_id` stored in this mesh's configuration.
    pub fn mesh_id(&self) -> &str {
        &self.config.mesh_id
    }
1684
1685    /// Get the device name for BLE advertising
1686    pub fn device_name(&self) -> String {
1687        format!(
1688            "HIVE_{}-{:08X}",
1689            self.config.mesh_id,
1690            self.config.node_id.as_u32()
1691        )
1692    }
1693
1694    /// Get a peer's callsign by node ID
1695    ///
1696    /// Returns the callsign from the peer's most recently received peripheral data,
1697    /// or None if no peripheral data has been received from this peer.
1698    pub fn get_peer_callsign(&self, node_id: NodeId) -> Option<String> {
1699        self.peer_peripherals.read().ok().and_then(|peripherals| {
1700            peripherals
1701                .get(&node_id)
1702                .map(|p| p.callsign_str().to_string())
1703        })
1704    }
1705
1706    /// Get a peer's full peripheral data by node ID
1707    ///
1708    /// Returns a clone of the peripheral data from the peer's most recently received
1709    /// document, or None if no peripheral data has been received from this peer.
1710    pub fn get_peer_peripheral(&self, node_id: NodeId) -> Option<Peripheral> {
1711        self.peer_peripherals
1712            .read()
1713            .ok()
1714            .and_then(|peripherals| peripherals.get(&node_id).cloned())
1715    }
1716
1717    // ==================== Document Registry ====================
1718
    /// Get a reference to the document registry.
    ///
    /// Use this to register custom document types for mesh synchronization.
    /// The returned reference borrows from this mesh instance.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use hive_btle::registry::DocumentType;
    ///
    /// // Register a custom document type
    /// mesh.document_registry().register::<MyCustomDocument>();
    /// ```
    pub fn document_registry(&self) -> &DocumentRegistry {
        &self.document_registry
    }
1734
1735    /// Store an app-layer document.
1736    ///
1737    /// If a document with the same identity (type_id, source_node, timestamp)
1738    /// already exists, it will be merged using CRDT semantics.
1739    ///
1740    /// Returns true if the document was newly added or changed via merge.
1741    pub fn store_app_document<T: crate::registry::DocumentType>(&self, doc: T) -> bool {
1742        let type_id = T::TYPE_ID;
1743        let (source_node, timestamp) = doc.identity();
1744        let key = (type_id, source_node, timestamp);
1745
1746        let mut docs = self.app_documents.write().unwrap();
1747
1748        if let Some(existing) = docs.get_mut(&key) {
1749            // Merge with existing document
1750            self.document_registry
1751                .merge(type_id, existing.as_mut(), &doc)
1752        } else {
1753            // Insert new document
1754            docs.insert(key, Box::new(doc));
1755            true
1756        }
1757    }
1758
1759    /// Store a type-erased app-layer document.
1760    ///
1761    /// Used when receiving documents from the network where the type is
1762    /// determined at runtime.
1763    ///
1764    /// Returns true if the document was newly added or changed via merge.
1765    pub fn store_app_document_boxed(
1766        &self,
1767        type_id: u8,
1768        source_node: u32,
1769        timestamp: u64,
1770        doc: Box<dyn core::any::Any + Send + Sync>,
1771    ) -> bool {
1772        let key = (type_id, source_node, timestamp);
1773
1774        let mut docs = self.app_documents.write().unwrap();
1775
1776        if let Some(existing) = docs.get_mut(&key) {
1777            // Merge with existing document
1778            self.document_registry
1779                .merge(type_id, existing.as_mut(), doc.as_ref())
1780        } else {
1781            // Insert new document
1782            docs.insert(key, doc);
1783            true
1784        }
1785    }
1786
1787    /// Get a stored app-layer document by identity.
1788    ///
1789    /// Returns None if not found or if downcast to T fails.
1790    pub fn get_app_document<T: crate::registry::DocumentType>(
1791        &self,
1792        source_node: u32,
1793        timestamp: u64,
1794    ) -> Option<T> {
1795        let key = (T::TYPE_ID, source_node, timestamp);
1796
1797        let docs = self.app_documents.read().unwrap();
1798        docs.get(&key).and_then(|d| d.downcast_ref::<T>()).cloned()
1799    }
1800
1801    /// Get all stored app-layer documents of a specific type.
1802    ///
1803    /// Returns a vector of all documents of type T currently stored.
1804    pub fn get_all_app_documents_of_type<T: crate::registry::DocumentType>(&self) -> Vec<T> {
1805        let docs = self.app_documents.read().unwrap();
1806        docs.iter()
1807            .filter(|((type_id, _, _), _)| *type_id == T::TYPE_ID)
1808            .filter_map(|(_, doc)| doc.downcast_ref::<T>().cloned())
1809            .collect()
1810    }
1811
1812    /// Get delta operations for all stored app documents.
1813    ///
1814    /// Used during sync to include app documents in the delta stream.
1815    pub fn app_document_delta_ops(&self) -> Vec<crate::registry::AppOperation> {
1816        let docs = self.app_documents.read().unwrap();
1817        let mut ops = Vec::new();
1818
1819        for ((type_id, _source, _ts), doc) in docs.iter() {
1820            if let Some(op) = self.document_registry.to_delta_op(*type_id, doc.as_ref()) {
1821                ops.push(op);
1822            }
1823        }
1824
1825        ops
1826    }
1827
1828    /// Get all document keys for a given type.
1829    ///
1830    /// Returns (source_node, timestamp) pairs for all documents of the specified type.
1831    pub fn app_document_keys(&self, type_id: u8) -> Vec<(u32, u64)> {
1832        let docs = self.app_documents.read().unwrap();
1833        docs.keys()
1834            .filter(|(tid, _, _)| *tid == type_id)
1835            .map(|(_, source, ts)| (*source, *ts))
1836            .collect()
1837    }
1838
    /// Get the number of stored app documents.
    ///
    /// Counts entries across all document types.
    ///
    /// # Panics
    ///
    /// Panics if the `app_documents` lock is poisoned.
    pub fn app_document_count(&self) -> usize {
        self.app_documents.read().unwrap().len()
    }
1843
1844    // ==================== Observer Management ====================
1845
    /// Add an observer for mesh events.
    ///
    /// The observer is handed to the internal observer collection via
    /// `observers.add`.
    pub fn add_observer(&self, observer: Arc<dyn HiveObserver>) {
        self.observers.add(observer);
    }
1850
    /// Remove an observer.
    ///
    /// Forwards to `observers.remove`. How the observer is matched is decided
    /// there (presumably by `Arc` pointer identity — confirm in the observer
    /// collection).
    pub fn remove_observer(&self, observer: &Arc<dyn HiveObserver>) {
        self.observers.remove(observer);
    }
1855
1856    // ==================== User Actions ====================
1857
1858    /// Send an emergency alert
1859    ///
1860    /// Returns the document bytes to broadcast to all peers.
1861    /// If encryption is enabled, the document is encrypted.
1862    pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
1863        let data = self.document_sync.send_emergency(timestamp);
1864        self.notify(HiveEvent::MeshStateChanged {
1865            peer_count: self.peer_manager.peer_count(),
1866            connected_count: self.peer_manager.connected_count(),
1867        });
1868        self.encrypt_document(&data)
1869    }
1870
1871    /// Send an ACK response
1872    ///
1873    /// Returns the document bytes to broadcast to all peers.
1874    /// If encryption is enabled, the document is encrypted.
1875    pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
1876        let data = self.document_sync.send_ack(timestamp);
1877        self.notify(HiveEvent::MeshStateChanged {
1878            peer_count: self.peer_manager.peer_count(),
1879            connected_count: self.peer_manager.connected_count(),
1880        });
1881        self.encrypt_document(&data)
1882    }
1883
    /// Broadcast arbitrary bytes over the mesh.
    ///
    /// Takes raw payload bytes, passes them through `encrypt_document`
    /// (which encrypts them if encryption is enabled), and returns bytes ready
    /// to send to all connected peers. This method does not transmit anything
    /// itself; the caller sends the returned bytes.
    ///
    /// This is useful for sending extension data like CannedMessages from hive-lite.
    pub fn broadcast_bytes(&self, payload: &[u8]) -> Vec<u8> {
        self.encrypt_document(payload)
    }
1893
    /// Clear the current event (emergency or ack).
    ///
    /// Delegates to `document_sync.clear_event()`.
    pub fn clear_event(&self) {
        self.document_sync.clear_event();
    }
1898
    /// Check if emergency is active.
    ///
    /// Delegates to `document_sync.is_emergency_active()`.
    pub fn is_emergency_active(&self) -> bool {
        self.document_sync.is_emergency_active()
    }
1903
    /// Check if ACK is active.
    ///
    /// Delegates to `document_sync.is_ack_active()`.
    pub fn is_ack_active(&self) -> bool {
        self.document_sync.is_ack_active()
    }
1908
    /// Get current event type.
    ///
    /// Delegates to `document_sync.current_event()`; `None` presumably means no
    /// event is currently set (confirm against document sync).
    pub fn current_event(&self) -> Option<EventType> {
        self.document_sync.current_event()
    }
1913
1914    // ==================== Emergency Management (Document-Based) ====================
1915
1916    /// Start a new emergency event with ACK tracking
1917    ///
1918    /// Creates an emergency event that tracks ACKs from all known peers.
1919    /// Pass the list of known peer node IDs to track.
1920    /// Returns the document bytes to broadcast.
1921    /// If encryption is enabled, the document is encrypted.
1922    pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
1923        let data = self.document_sync.start_emergency(timestamp, known_peers);
1924        self.notify(HiveEvent::MeshStateChanged {
1925            peer_count: self.peer_manager.peer_count(),
1926            connected_count: self.peer_manager.connected_count(),
1927        });
1928        self.encrypt_document(&data)
1929    }
1930
1931    /// Start a new emergency using all currently known peers
1932    ///
1933    /// Convenience method that automatically includes all discovered peers.
1934    pub fn start_emergency_with_known_peers(&self, timestamp: u64) -> Vec<u8> {
1935        let peers: Vec<u32> = self
1936            .peer_manager
1937            .get_peers()
1938            .iter()
1939            .map(|p| p.node_id.as_u32())
1940            .collect();
1941        self.start_emergency(timestamp, &peers)
1942    }
1943
1944    /// Record our ACK for the current emergency
1945    ///
1946    /// Returns the document bytes to broadcast, or None if no emergency is active.
1947    /// If encryption is enabled, the document is encrypted.
1948    pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
1949        let result = self.document_sync.ack_emergency(timestamp);
1950        if result.is_some() {
1951            self.notify(HiveEvent::MeshStateChanged {
1952                peer_count: self.peer_manager.peer_count(),
1953                connected_count: self.peer_manager.connected_count(),
1954            });
1955        }
1956        result.map(|data| self.encrypt_document(&data))
1957    }
1958
    /// Clear the current emergency event.
    ///
    /// Delegates to `document_sync.clear_emergency()`.
    pub fn clear_emergency(&self) {
        self.document_sync.clear_emergency();
    }
1963
    /// Check if there's an active emergency.
    ///
    /// Delegates to `document_sync.has_active_emergency()`.
    pub fn has_active_emergency(&self) -> bool {
        self.document_sync.has_active_emergency()
    }
1968
    /// Get emergency status info.
    ///
    /// Returns (source_node, timestamp, acked_count, pending_count) if emergency is active.
    /// Delegates to `document_sync.get_emergency_status()`.
    pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
        self.document_sync.get_emergency_status()
    }
1975
    /// Check if a specific peer has ACKed the current emergency.
    ///
    /// Delegates to `document_sync.has_peer_acked(peer_id)`.
    pub fn has_peer_acked(&self, peer_id: u32) -> bool {
        self.document_sync.has_peer_acked(peer_id)
    }
1980
    /// Check if all peers have ACKed the current emergency.
    ///
    /// Delegates to `document_sync.all_peers_acked()`.
    pub fn all_peers_acked(&self) -> bool {
        self.document_sync.all_peers_acked()
    }
1985
1986    // ==================== Chat Methods (requires `legacy-chat` feature) ====================
1987
1988    /// Send a chat message
1989    ///
1990    /// Adds the message to the local CRDT and returns the document bytes
1991    /// to broadcast to all peers. If encryption is enabled, the document is encrypted.
1992    ///
1993    /// Returns the encrypted document bytes if the message was new,
1994    /// or None if it was a duplicate.
1995    #[cfg(feature = "legacy-chat")]
1996    pub fn send_chat(&self, sender: &str, text: &str, timestamp: u64) -> Option<Vec<u8>> {
1997        if self.document_sync.add_chat_message(sender, text, timestamp) {
1998            Some(self.encrypt_document(&self.build_document()))
1999        } else {
2000            None
2001        }
2002    }
2003
2004    /// Send a chat reply
2005    ///
2006    /// Adds the reply to the local CRDT with reply-to information and returns
2007    /// the document bytes to broadcast. If encryption is enabled, the document is encrypted.
2008    ///
2009    /// Returns the encrypted document bytes if the message was new,
2010    /// or None if it was a duplicate.
2011    #[cfg(feature = "legacy-chat")]
2012    pub fn send_chat_reply(
2013        &self,
2014        sender: &str,
2015        text: &str,
2016        reply_to_node: u32,
2017        reply_to_timestamp: u64,
2018        timestamp: u64,
2019    ) -> Option<Vec<u8>> {
2020        if self.document_sync.add_chat_reply(
2021            sender,
2022            text,
2023            reply_to_node,
2024            reply_to_timestamp,
2025            timestamp,
2026        ) {
2027            Some(self.encrypt_document(&self.build_document()))
2028        } else {
2029            None
2030        }
2031    }
2032
    /// Get the number of chat messages in the local CRDT.
    ///
    /// Delegates to `document_sync.chat_count()`. Only available with the
    /// `legacy-chat` feature.
    #[cfg(feature = "legacy-chat")]
    pub fn chat_count(&self) -> usize {
        self.document_sync.chat_count()
    }
2038
    /// Get chat messages newer than a timestamp.
    ///
    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
    /// Delegates to `document_sync.chat_messages_since(since_timestamp)`; whether
    /// the bound is inclusive is decided there. Only available with the
    /// `legacy-chat` feature.
    #[cfg(feature = "legacy-chat")]
    pub fn chat_messages_since(
        &self,
        since_timestamp: u64,
    ) -> Vec<(u32, u64, String, String, u32, u64)> {
        self.document_sync.chat_messages_since(since_timestamp)
    }
2049
    /// Get all chat messages.
    ///
    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
    /// Delegates to `document_sync.all_chat_messages()`. Only available with the
    /// `legacy-chat` feature.
    #[cfg(feature = "legacy-chat")]
    pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
        self.document_sync.all_chat_messages()
    }
2057
2058    // ==================== BLE Callbacks (Platform -> Mesh) ====================
2059
2060    /// Called when a BLE device is discovered
2061    ///
2062    /// Returns `Some(HivePeer)` if this is a new HIVE peer on our mesh.
2063    pub fn on_ble_discovered(
2064        &self,
2065        identifier: &str,
2066        name: Option<&str>,
2067        rssi: i8,
2068        mesh_id: Option<&str>,
2069        now_ms: u64,
2070    ) -> Option<HivePeer> {
2071        let (node_id, is_new) = self
2072            .peer_manager
2073            .on_discovered(identifier, name, rssi, mesh_id, now_ms)?;
2074
2075        let peer = self.peer_manager.get_peer(node_id)?;
2076
2077        // Update connection graph
2078        {
2079            let mut graph = self.connection_graph.lock().unwrap();
2080            graph.on_discovered(
2081                node_id,
2082                identifier.to_string(),
2083                name.map(|s| s.to_string()),
2084                mesh_id.map(|s| s.to_string()),
2085                rssi,
2086                now_ms,
2087            );
2088        }
2089
2090        if is_new {
2091            self.notify(HiveEvent::PeerDiscovered { peer: peer.clone() });
2092            self.notify_mesh_state_changed();
2093        }
2094
2095        Some(peer)
2096    }
2097
2098    /// Called when a BLE connection is established (outgoing)
2099    ///
2100    /// Returns the NodeId if this identifier is known.
2101    pub fn on_ble_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
2102        let node_id = match self.peer_manager.on_connected(identifier, now_ms) {
2103            Some(id) => id,
2104            None => {
2105                log::warn!(
2106                    "on_ble_connected: identifier {:?} not in peer map — \
2107                     use on_incoming_connection() for peripheral connections",
2108                    identifier
2109                );
2110                return None;
2111            }
2112        };
2113
2114        // Update connection graph
2115        {
2116            let mut graph = self.connection_graph.lock().unwrap();
2117            graph.on_connected(node_id, now_ms);
2118        }
2119
2120        // Register peer for delta sync tracking
2121        self.register_peer_for_delta(&node_id);
2122
2123        self.notify(HiveEvent::PeerConnected { node_id });
2124        self.notify_mesh_state_changed();
2125        Some(node_id)
2126    }
2127
    /// Called when a BLE connection is lost.
    ///
    /// Returns the peer's `NodeId` when the peer manager recognises
    /// `identifier`; otherwise returns `None` and nothing else is touched.
    ///
    /// On success the connection graph records the disconnect, indirect paths
    /// routed via this peer are removed, the peer is unregistered from delta
    /// sync tracking, and observers receive `HiveEvent::PeerDisconnected`
    /// followed by a mesh-state-changed notification. The reason reported to
    /// observers is the one returned by the peer manager.
    ///
    /// # Panics
    ///
    /// Panics if the `connection_graph` mutex is poisoned.
    pub fn on_ble_disconnected(
        &self,
        identifier: &str,
        reason: DisconnectReason,
    ) -> Option<NodeId> {
        let (node_id, observer_reason) = self.peer_manager.on_disconnected(identifier, reason)?;

        // Update connection graph (convert observer reason to platform reason)
        {
            let mut graph = self.connection_graph.lock().unwrap();
            // One-to-one mapping between the observer-facing enum and the
            // platform-layer enum of the same name.
            let platform_reason = match observer_reason {
                DisconnectReason::LocalRequest => crate::platform::DisconnectReason::LocalRequest,
                DisconnectReason::RemoteRequest => crate::platform::DisconnectReason::RemoteRequest,
                DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
                DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
                DisconnectReason::ConnectionFailed => {
                    crate::platform::DisconnectReason::ConnectionFailed
                }
                DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
            };
            // NOTE(review): this callback has no `now_ms` parameter, so the
            // timestamp comes from the wall clock (0 if the clock is before the
            // Unix epoch). The other BLE callbacks use a caller-supplied
            // `now_ms`; confirm both share the same time base.
            let now_ms = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_millis() as u64)
                .unwrap_or(0);
            graph.on_disconnected(node_id, platform_reason, now_ms);

            // Remove indirect peer paths that went through this peer
            // These paths may no longer be valid since the direct connection is lost
            graph.remove_via_peer(node_id);
        }

        // Unregister peer from delta sync tracking
        self.unregister_peer_for_delta(&node_id);

        // Observers are notified only after the graph lock has been released.
        self.notify(HiveEvent::PeerDisconnected {
            node_id,
            reason: observer_reason,
        });
        self.notify_mesh_state_changed();
        Some(node_id)
    }
2170
2171    /// Called when a BLE connection is lost, using NodeId directly
2172    ///
2173    /// Alternative to on_ble_disconnected() when only NodeId is known (e.g., ESP32).
2174    pub fn on_peer_disconnected(&self, node_id: NodeId, reason: DisconnectReason) {
2175        if self
2176            .peer_manager
2177            .on_disconnected_by_node_id(node_id, reason)
2178        {
2179            // Update connection graph
2180            {
2181                let mut graph = self.connection_graph.lock().unwrap();
2182                let platform_reason = match reason {
2183                    DisconnectReason::LocalRequest => {
2184                        crate::platform::DisconnectReason::LocalRequest
2185                    }
2186                    DisconnectReason::RemoteRequest => {
2187                        crate::platform::DisconnectReason::RemoteRequest
2188                    }
2189                    DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2190                    DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2191                    DisconnectReason::ConnectionFailed => {
2192                        crate::platform::DisconnectReason::ConnectionFailed
2193                    }
2194                    DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2195                };
2196                let now_ms = std::time::SystemTime::now()
2197                    .duration_since(std::time::UNIX_EPOCH)
2198                    .map(|d| d.as_millis() as u64)
2199                    .unwrap_or(0);
2200                graph.on_disconnected(node_id, platform_reason, now_ms);
2201
2202                // Remove indirect peer paths that went through this peer
2203                graph.remove_via_peer(node_id);
2204            }
2205
2206            // Unregister peer from delta sync tracking
2207            self.unregister_peer_for_delta(&node_id);
2208
2209            self.notify(HiveEvent::PeerDisconnected { node_id, reason });
2210            self.notify_mesh_state_changed();
2211        }
2212    }
2213
2214    /// Called when a remote device connects to us (incoming connection)
2215    ///
2216    /// Use this when we're acting as a peripheral and a central connects to us.
2217    pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
2218        let is_new = self
2219            .peer_manager
2220            .on_incoming_connection(identifier, node_id, now_ms);
2221
2222        // Update connection graph
2223        {
2224            let mut graph = self.connection_graph.lock().unwrap();
2225            if is_new {
2226                graph.on_discovered(
2227                    node_id,
2228                    identifier.to_string(),
2229                    None,
2230                    Some(self.config.mesh_id.clone()),
2231                    -50, // Default good RSSI for incoming connections
2232                    now_ms,
2233                );
2234            }
2235            graph.on_connected(node_id, now_ms);
2236        }
2237
2238        // Register peer for delta sync tracking
2239        self.register_peer_for_delta(&node_id);
2240
2241        if is_new {
2242            if let Some(peer) = self.peer_manager.get_peer(node_id) {
2243                self.notify(HiveEvent::PeerDiscovered { peer });
2244            }
2245        }
2246
2247        self.notify(HiveEvent::PeerConnected { node_id });
2248        self.notify_mesh_state_changed();
2249
2250        is_new
2251    }
2252
2253    /// Called when data is received from a peer
2254    ///
2255    /// Parses the document, merges it, and generates appropriate events.
2256    /// If encryption is enabled, decrypts the document first.
2257    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2258    /// Returns the source NodeId and whether the document contained an event.
2259    pub fn on_ble_data_received(
2260        &self,
2261        identifier: &str,
2262        data: &[u8],
2263        now_ms: u64,
2264    ) -> Option<DataReceivedResult> {
2265        // Get node ID from identifier
2266        let node_id = self.peer_manager.get_node_id(identifier)?;
2267
2268        // Check for special message types first
2269        if data.len() >= 2 {
2270            match data[0] {
2271                KEY_EXCHANGE_MARKER => {
2272                    // Handle key exchange - returns response to send back
2273                    let _response = self.handle_key_exchange(data, now_ms);
2274                    // Return None as this isn't a document sync
2275                    return None;
2276                }
2277                PEER_E2EE_MARKER => {
2278                    // Handle encrypted peer message
2279                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2280                    // Return None as this isn't a document sync
2281                    return None;
2282                }
2283                RELAY_ENVELOPE_MARKER => {
2284                    // Handle relay envelope for multi-hop
2285                    return self
2286                        .handle_relay_envelope_with_identifier(node_id, identifier, data, now_ms);
2287                }
2288                _ => {}
2289            }
2290        }
2291
2292        // Direct document (not relay envelope)
2293        self.process_document_data_with_identifier(node_id, identifier, data, now_ms, None, None, 0)
2294    }
2295
    /// Internal: Process document data with identifier as source hint.
    ///
    /// Steps, in order:
    /// 1. Pass `data` through `decrypt_document`, giving it `identifier` as the
    ///    source hint; return `None` if that yields nothing.
    /// 2. If the result is a delta document, hand off to
    ///    `process_delta_document_internal` and return its result.
    /// 3. Otherwise merge it as a legacy document; return `None` if the merge
    ///    yields nothing.
    /// 4. Store the peer's peripheral data (if present), record the sync, and
    ///    notify observers about emergency/ACK, counter changes, and relaying.
    ///
    /// `relay_data`, `origin_node` and `hop_count` are copied into the returned
    /// `DataReceivedResult`; `relay_data` being `Some` also triggers the
    /// `MessageRelayed` event.
    #[allow(clippy::too_many_arguments)]
    fn process_document_data_with_identifier(
        &self,
        source_node: NodeId,
        identifier: &str,
        data: &[u8],
        now_ms: u64,
        relay_data: Option<Vec<u8>>,
        origin_node: Option<NodeId>,
        hop_count: u8,
    ) -> Option<DataReceivedResult> {
        // Decrypt if encrypted (mesh-wide encryption) - use identifier as source hint
        let decrypted = self.decrypt_document(data, Some(identifier))?;

        // Check if this is a delta document (wire format v2)
        if DeltaDocument::is_delta_document(&decrypted) {
            return self.process_delta_document_internal(
                source_node,
                &decrypted,
                now_ms,
                relay_data,
                origin_node,
                hop_count,
            );
        }

        // Merge the document (legacy wire format v1)
        let result = self.document_sync.merge_document(&decrypted)?;

        // Store peer peripheral if present (for callsign lookup).
        // Keyed by the source node named in the document, not by the BLE
        // sender; a failed write-lock acquisition is silently skipped.
        if let Some(ref peripheral) = result.peer_peripheral {
            if let Ok(mut peripherals) = self.peer_peripherals.write() {
                peripherals.insert(result.source_node, peripheral.clone());
            }
        }

        // Record sync against the peer we received the bytes from
        self.peer_manager.record_sync(source_node, now_ms);

        // Generate events based on what was received.
        // Emergency takes precedence: at most one of the two events is emitted.
        if result.is_emergency() {
            self.notify(HiveEvent::EmergencyReceived {
                from_node: result.source_node,
            });
        } else if result.is_ack() {
            self.notify(HiveEvent::AckReceived {
                from_node: result.source_node,
            });
        }

        if result.counter_changed {
            self.notify(HiveEvent::DocumentSynced {
                from_node: result.source_node,
                total_count: result.total_count,
            });
        }

        // Emit relay event if we're relaying.
        // Falls back to the document's source node when no origin was supplied.
        if relay_data.is_some() {
            let relay_targets = self.get_relay_targets(Some(source_node));
            self.notify(HiveEvent::MessageRelayed {
                origin_node: origin_node.unwrap_or(result.source_node),
                relay_count: relay_targets.len(),
                hop_count,
            });
        }

        // Flatten the optional peripheral into the individual result fields
        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
            DataReceivedResult::peripheral_fields(&result.peer_peripheral);

        Some(DataReceivedResult {
            source_node: result.source_node,
            is_emergency: result.is_emergency(),
            is_ack: result.is_ack(),
            counter_changed: result.counter_changed,
            emergency_changed: result.emergency_changed,
            total_count: result.total_count,
            // 0 when the merged document carried no event
            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
            relay_data,
            origin_node,
            hop_count,
            callsign,
            battery_percent,
            heart_rate,
            event_type,
            latitude,
            longitude,
            altitude,
        })
    }
2387
    /// Internal: Handle relay envelope with identifier as source hint.
    ///
    /// Decodes `data` as a `RelayEnvelope` and returns `None` if decoding
    /// fails. An envelope whose message ID was already seen is dropped with a
    /// `DuplicateMessageDropped` event. Otherwise the inner payload is
    /// processed as a document, carrying the envelope's origin node and hop
    /// count, plus the re-encoded envelope to forward when relaying applies.
    ///
    /// # Panics
    ///
    /// Panics if the `seen_cache` mutex is poisoned.
    fn handle_relay_envelope_with_identifier(
        &self,
        source_node: NodeId,
        identifier: &str,
        data: &[u8],
        now_ms: u64,
    ) -> Option<DataReceivedResult> {
        // Process the relay envelope
        let envelope = RelayEnvelope::decode(data)?;

        // Check deduplication
        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
            let stats = self
                .seen_cache
                .lock()
                .unwrap()
                .get_stats(&envelope.message_id);
            // Report a count of 1 when the cache has no stats for this message
            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);

            self.notify(HiveEvent::DuplicateMessageDropped {
                origin_node: envelope.origin_node,
                seen_count,
            });
            return None;
        }

        // Check TTL and get relay data.
        // Relay data is produced only when the envelope may still be relayed
        // AND relaying is enabled in our config.
        let relay_data = if envelope.can_relay() && self.config.enable_relay {
            envelope.relay().map(|e| e.encode())
        } else {
            // TTL-expired is reported only when the envelope itself can no
            // longer be relayed, not when relaying is merely disabled locally.
            if !envelope.can_relay() {
                self.notify(HiveEvent::MessageTtlExpired {
                    origin_node: envelope.origin_node,
                    hop_count: envelope.hop_count,
                });
            }
            None
        };

        // Process the inner payload (still processed locally even when it
        // will not be relayed further)
        self.process_document_data_with_identifier(
            source_node,
            identifier,
            &envelope.payload,
            now_ms,
            relay_data,
            Some(envelope.origin_node),
            envelope.hop_count,
        )
    }
2439
2440    /// Called when data is received but we don't have the identifier mapped
2441    ///
2442    /// Use this when receiving data from a peripheral we discovered.
2443    /// If encryption is enabled, decrypts the document first.
2444    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2445    /// Handles relay envelopes for multi-hop mesh operation.
2446    pub fn on_ble_data_received_from_node(
2447        &self,
2448        node_id: NodeId,
2449        data: &[u8],
2450        now_ms: u64,
2451    ) -> Option<DataReceivedResult> {
2452        // Check for special message types first
2453        if data.len() >= 2 {
2454            match data[0] {
2455                KEY_EXCHANGE_MARKER => {
2456                    let _response = self.handle_key_exchange(data, now_ms);
2457                    return None;
2458                }
2459                PEER_E2EE_MARKER => {
2460                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2461                    return None;
2462                }
2463                RELAY_ENVELOPE_MARKER => {
2464                    // Handle relay envelope for multi-hop
2465                    return self.handle_relay_envelope(node_id, data, now_ms);
2466                }
2467                _ => {}
2468            }
2469        }
2470
2471        // Direct document (not relay envelope)
2472        self.process_document_data(node_id, data, now_ms, None, None, 0)
2473    }
2474
2475    /// Called when encrypted data is received from an unknown peer
2476    ///
2477    /// This handles the case where we receive an encrypted document from a BLE address
2478    /// that isn't registered in our peer manager (e.g., due to BLE address rotation).
2479    /// The function decrypts first using the mesh key, then extracts the source_node
2480    /// from the decrypted document header and registers the peer.
2481    ///
2482    /// Returns `Some(DataReceivedResult)` if decryption and processing succeed.
2483    /// Returns `None` if decryption fails or the document is invalid.
2484    pub fn on_ble_data_received_anonymous(
2485        &self,
2486        identifier: &str,
2487        data: &[u8],
2488        now_ms: u64,
2489    ) -> Option<DataReceivedResult> {
2490        log::debug!(
2491            "on_ble_data_received_anonymous: identifier={}, len={}, marker=0x{:02X}",
2492            identifier,
2493            data.len(),
2494            data.first().copied().unwrap_or(0)
2495        );
2496
2497        // Try to decrypt (handles both encrypted and unencrypted documents)
2498        let decrypted = match self.decrypt_document(data, Some(identifier)) {
2499            Some(d) => d,
2500            None => {
2501                log::warn!(
2502                    "on_ble_data_received_anonymous: decrypt/parse FAILED for {} byte doc from {}",
2503                    data.len(),
2504                    identifier
2505                );
2506                return None;
2507            }
2508        };
2509
2510        // Extract source_node from decrypted document header
2511        // Header format: [version: 4 bytes (LE)][node_id: 4 bytes (LE)]
2512        if decrypted.len() < 8 {
2513            log::warn!("Decrypted document too short to extract source_node");
2514            return None;
2515        }
2516
2517        let source_node_u32 =
2518            u32::from_le_bytes([decrypted[4], decrypted[5], decrypted[6], decrypted[7]]);
2519        let source_node = NodeId::new(source_node_u32);
2520
2521        log::info!(
2522            "Anonymous document from {}: source_node={:08X}, len={}",
2523            identifier,
2524            source_node_u32,
2525            decrypted.len()
2526        );
2527
2528        // Register the peer with this identifier so future lookups work
2529        // This handles BLE address rotation
2530        self.peer_manager
2531            .register_identifier(identifier, source_node);
2532
2533        // Check if this is a delta document
2534        let is_delta = DeltaDocument::is_delta_document(&decrypted);
2535        log::info!(
2536            "Document format: delta={}, first_byte=0x{:02X}, len={}",
2537            is_delta,
2538            decrypted.first().copied().unwrap_or(0),
2539            decrypted.len()
2540        );
2541
2542        if is_delta {
2543            return self.process_delta_document_internal(
2544                source_node,
2545                &decrypted,
2546                now_ms,
2547                None,
2548                None,
2549                0,
2550            );
2551        }
2552
2553        // Handle app-layer message (0xAF marker from hive-lite CannedMessages)
2554        // Store in document registry for CRDT sync instead of bypassing to relay_data.
2555        const APP_LAYER_MARKER: u8 = 0xAF;
2556        if decrypted.first().copied() == Some(APP_LAYER_MARKER) {
2557            #[cfg(feature = "hive-lite-sync")]
2558            {
2559                use crate::hive_lite_sync::CannedMessageDocument;
2560                use crate::registry::DocumentType;
2561
2562                log::info!(
2563                    "App-layer message (0xAF) from {:08X}, {} bytes - storing in registry",
2564                    source_node.as_u32(),
2565                    decrypted.len()
2566                );
2567
2568                // Decode and store the document
2569                // CannedMessageDocument::decode expects payload without 0xAF (it prepends it)
2570                let payload = &decrypted[1..];
2571                if let Some(doc) = CannedMessageDocument::decode(payload) {
2572                    let (doc_source, doc_ts) = doc.identity();
2573                    let changed = self.store_app_document(doc);
2574                    log::info!(
2575                        "Stored CannedMessage: source={:08X} ts={} changed={}",
2576                        doc_source,
2577                        doc_ts,
2578                        changed
2579                    );
2580
2581                    // Emit observer event
2582                    self.observers.notify(HiveEvent::app_document_received(
2583                        CannedMessageDocument::TYPE_ID,
2584                        NodeId::new(doc_source),
2585                        doc_ts,
2586                        changed,
2587                    ));
2588
2589                    // Return minimal result - document is now in registry
2590                    return Some(DataReceivedResult {
2591                        source_node,
2592                        is_emergency: false,
2593                        is_ack: false,
2594                        counter_changed: false,
2595                        emergency_changed: false,
2596                        total_count: 0,
2597                        event_timestamp: doc_ts,
2598                        relay_data: None, // No longer needed - flows via delta sync
2599                        origin_node: None,
2600                        hop_count: 0,
2601                        callsign: None,
2602                        battery_percent: None,
2603                        heart_rate: None,
2604                        event_type: None,
2605                        latitude: None,
2606                        longitude: None,
2607                        altitude: None,
2608                    });
2609                } else {
2610                    log::warn!("Failed to decode 0xAF message as CannedMessageDocument");
2611                }
2612            }
2613
2614            #[cfg(not(feature = "hive-lite-sync"))]
2615            {
2616                log::debug!("Ignoring 0xAF message (hive-lite-sync feature not enabled)");
2617            }
2618
2619            return None;
2620        }
2621
2622        // Merge the document (legacy wire format v1)
2623        log::info!(
2624            "Processing legacy document from {:08X}",
2625            source_node.as_u32()
2626        );
2627        let result = self.document_sync.merge_document(&decrypted)?;
2628
2629        // Log what we got from the merge
2630        log::info!(
2631            "Merge result: peer_peripheral={}, counter_changed={}",
2632            result.peer_peripheral.is_some(),
2633            result.counter_changed
2634        );
2635        if let Some(ref p) = result.peer_peripheral {
2636            log::info!("Peripheral callsign: '{}'", p.callsign_str());
2637        }
2638
2639        // Record sync
2640        self.peer_manager.record_sync(source_node, now_ms);
2641
2642        // Generate events
2643        if result.is_emergency() {
2644            self.notify(HiveEvent::EmergencyReceived {
2645                from_node: result.source_node,
2646            });
2647        } else if result.is_ack() {
2648            self.notify(HiveEvent::AckReceived {
2649                from_node: result.source_node,
2650            });
2651        }
2652
2653        if result.counter_changed {
2654            self.notify(HiveEvent::DocumentSynced {
2655                from_node: result.source_node,
2656                total_count: result.total_count,
2657            });
2658        }
2659
2660        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2661            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2662
2663        Some(DataReceivedResult {
2664            source_node: result.source_node,
2665            is_emergency: result.is_emergency(),
2666            is_ack: result.is_ack(),
2667            counter_changed: result.counter_changed,
2668            emergency_changed: result.emergency_changed,
2669            total_count: result.total_count,
2670            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2671            relay_data: None,
2672            origin_node: None,
2673            hop_count: 0,
2674            callsign,
2675            battery_percent,
2676            heart_rate,
2677            event_type,
2678            latitude,
2679            longitude,
2680            altitude,
2681        })
2682    }
2683
2684    /// Internal: Process document data (shared by direct and relay paths)
2685    fn process_document_data(
2686        &self,
2687        source_node: NodeId,
2688        data: &[u8],
2689        now_ms: u64,
2690        relay_data: Option<Vec<u8>>,
2691        origin_node: Option<NodeId>,
2692        hop_count: u8,
2693    ) -> Option<DataReceivedResult> {
2694        // Decrypt if encrypted (mesh-wide encryption)
2695        let source_hint = format!("node:{:08X}", source_node.as_u32());
2696        let decrypted = self.decrypt_document(data, Some(&source_hint))?;
2697
2698        // Check if this is a delta document (wire format v2)
2699        if DeltaDocument::is_delta_document(&decrypted) {
2700            return self.process_delta_document_internal(
2701                source_node,
2702                &decrypted,
2703                now_ms,
2704                relay_data,
2705                origin_node,
2706                hop_count,
2707            );
2708        }
2709
2710        // Merge the document (legacy wire format v1)
2711        let result = self.document_sync.merge_document(&decrypted)?;
2712
2713        // Store peer peripheral if present (for callsign lookup)
2714        if let Some(ref peripheral) = result.peer_peripheral {
2715            if let Ok(mut peripherals) = self.peer_peripherals.write() {
2716                peripherals.insert(result.source_node, peripheral.clone());
2717            }
2718        }
2719
2720        // Record sync
2721        self.peer_manager.record_sync(source_node, now_ms);
2722
2723        // Generate events based on what was received
2724        if result.is_emergency() {
2725            self.notify(HiveEvent::EmergencyReceived {
2726                from_node: result.source_node,
2727            });
2728        } else if result.is_ack() {
2729            self.notify(HiveEvent::AckReceived {
2730                from_node: result.source_node,
2731            });
2732        }
2733
2734        if result.counter_changed {
2735            self.notify(HiveEvent::DocumentSynced {
2736                from_node: result.source_node,
2737                total_count: result.total_count,
2738            });
2739        }
2740
2741        // Emit relay event if we're relaying
2742        if relay_data.is_some() {
2743            let relay_targets = self.get_relay_targets(Some(source_node));
2744            self.notify(HiveEvent::MessageRelayed {
2745                origin_node: origin_node.unwrap_or(result.source_node),
2746                relay_count: relay_targets.len(),
2747                hop_count,
2748            });
2749        }
2750
2751        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2752            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2753
2754        Some(DataReceivedResult {
2755            source_node: result.source_node,
2756            is_emergency: result.is_emergency(),
2757            is_ack: result.is_ack(),
2758            counter_changed: result.counter_changed,
2759            emergency_changed: result.emergency_changed,
2760            total_count: result.total_count,
2761            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2762            relay_data,
2763            origin_node,
2764            hop_count,
2765            callsign,
2766            battery_percent,
2767            heart_rate,
2768            event_type,
2769            latitude,
2770            longitude,
2771            altitude,
2772        })
2773    }
2774
2775    /// Internal: Handle relay envelope
2776    fn handle_relay_envelope(
2777        &self,
2778        source_node: NodeId,
2779        data: &[u8],
2780        now_ms: u64,
2781    ) -> Option<DataReceivedResult> {
2782        // Process the relay envelope
2783        let decision = self.process_relay_envelope(data, source_node, now_ms)?;
2784
2785        // Get relay data if we should relay
2786        let relay_data = if decision.should_relay {
2787            decision.relay_data()
2788        } else {
2789            None
2790        };
2791
2792        // Process the inner payload
2793        self.process_document_data(
2794            source_node,
2795            &decision.payload,
2796            now_ms,
2797            relay_data,
2798            Some(decision.origin_node),
2799            decision.hop_count,
2800        )
2801    }
2802
2803    /// Called when data is received without a known identifier
2804    ///
2805    /// This is the simplest data receive method - it extracts the source node_id
2806    /// from the document itself. Use this when you don't track identifiers
2807    /// (e.g., ESP32 NimBLE).
2808    /// If encryption is enabled, decrypts the document first.
2809    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
2810    /// Handles relay envelopes for multi-hop mesh operation.
2811    pub fn on_ble_data(
2812        &self,
2813        identifier: &str,
2814        data: &[u8],
2815        now_ms: u64,
2816    ) -> Option<DataReceivedResult> {
2817        // Check for special message types first
2818        if data.len() >= 2 {
2819            match data[0] {
2820                KEY_EXCHANGE_MARKER => {
2821                    let _response = self.handle_key_exchange(data, now_ms);
2822                    return None;
2823                }
2824                PEER_E2EE_MARKER => {
2825                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2826                    return None;
2827                }
2828                RELAY_ENVELOPE_MARKER => {
2829                    // Handle relay envelope - extract origin from envelope
2830                    return self.handle_relay_envelope_with_incoming(identifier, data, now_ms);
2831                }
2832                _ => {}
2833            }
2834        }
2835
2836        // Direct document - process normally
2837        self.process_incoming_document(identifier, data, now_ms, None, None, 0)
2838    }
2839
2840    /// Internal: Process incoming document (handles peer registration)
2841    fn process_incoming_document(
2842        &self,
2843        identifier: &str,
2844        data: &[u8],
2845        now_ms: u64,
2846        relay_data: Option<Vec<u8>>,
2847        origin_node: Option<NodeId>,
2848        hop_count: u8,
2849    ) -> Option<DataReceivedResult> {
2850        // Decrypt if encrypted (mesh-wide encryption)
2851        let decrypted = self.decrypt_document(data, Some(identifier))?;
2852
2853        // Merge the document (extracts node_id internally)
2854        let result = self.document_sync.merge_document(&decrypted)?;
2855
2856        // Record sync using the source_node from the merged document
2857        self.peer_manager.record_sync(result.source_node, now_ms);
2858
2859        // Only register the identifier mapping for direct messages (not relayed)
2860        // For relayed messages, the identifier belongs to the relay source (who forwarded it),
2861        // not the origin node (who created the document). The relay source is already registered
2862        // via handle_relay_envelope_with_incoming when the relay envelope is first processed.
2863        if origin_node.is_none() {
2864            // Direct message - register the peer with this identifier
2865            let is_new =
2866                self.peer_manager
2867                    .on_incoming_connection(identifier, result.source_node, now_ms);
2868
2869            // Update connection graph to track connection state
2870            {
2871                let mut graph = self.connection_graph.lock().unwrap();
2872                if is_new {
2873                    graph.on_discovered(
2874                        result.source_node,
2875                        identifier.to_string(),
2876                        None,
2877                        Some(self.config.mesh_id.clone()),
2878                        -50, // Default RSSI for data-based discovery
2879                        now_ms,
2880                    );
2881                }
2882                graph.on_connected(result.source_node, now_ms);
2883            }
2884        }
2885
2886        // Generate events based on what was received
2887        if result.is_emergency() {
2888            self.notify(HiveEvent::EmergencyReceived {
2889                from_node: result.source_node,
2890            });
2891        } else if result.is_ack() {
2892            self.notify(HiveEvent::AckReceived {
2893                from_node: result.source_node,
2894            });
2895        }
2896
2897        if result.counter_changed {
2898            self.notify(HiveEvent::DocumentSynced {
2899                from_node: result.source_node,
2900                total_count: result.total_count,
2901            });
2902        }
2903
2904        // Emit relay event if we're relaying
2905        if relay_data.is_some() {
2906            let relay_targets = self.get_relay_targets(Some(result.source_node));
2907            self.notify(HiveEvent::MessageRelayed {
2908                origin_node: origin_node.unwrap_or(result.source_node),
2909                relay_count: relay_targets.len(),
2910                hop_count,
2911            });
2912        }
2913
2914        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2915            DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2916
2917        Some(DataReceivedResult {
2918            source_node: result.source_node,
2919            is_emergency: result.is_emergency(),
2920            is_ack: result.is_ack(),
2921            counter_changed: result.counter_changed,
2922            emergency_changed: result.emergency_changed,
2923            total_count: result.total_count,
2924            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2925            relay_data,
2926            origin_node,
2927            hop_count,
2928            callsign,
2929            battery_percent,
2930            heart_rate,
2931            event_type,
2932            latitude,
2933            longitude,
2934            altitude,
2935        })
2936    }
2937
2938    /// Internal: Handle relay envelope with incoming connection registration
2939    fn handle_relay_envelope_with_incoming(
2940        &self,
2941        identifier: &str,
2942        data: &[u8],
2943        now_ms: u64,
2944    ) -> Option<DataReceivedResult> {
2945        // Parse envelope to get origin
2946        let envelope = RelayEnvelope::decode(data)?;
2947
2948        // Try to look up the source peer from identifier to register indirect path
2949        // If we know who sent this relay, we can track indirect peers via them
2950        if let Some(source_peer) = self.peer_manager.get_node_id(identifier) {
2951            if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
2952                let is_new = self.connection_graph.lock().unwrap().on_relay_received(
2953                    source_peer,
2954                    envelope.origin_node,
2955                    envelope.hop_count,
2956                    now_ms,
2957                );
2958
2959                if is_new {
2960                    log::debug!(
2961                        "Discovered indirect peer {:08X} via {:08X} ({} hops)",
2962                        envelope.origin_node.as_u32(),
2963                        source_peer.as_u32(),
2964                        envelope.hop_count
2965                    );
2966                }
2967            }
2968        }
2969
2970        // Check deduplication
2971        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2972            // Duplicate - get stats for event
2973            let stats = self
2974                .seen_cache
2975                .lock()
2976                .unwrap()
2977                .get_stats(&envelope.message_id);
2978            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2979
2980            self.notify(HiveEvent::DuplicateMessageDropped {
2981                origin_node: envelope.origin_node,
2982                seen_count,
2983            });
2984            return None;
2985        }
2986
2987        // Check TTL
2988        let (should_relay, relay_data) = if envelope.can_relay() && self.config.enable_relay {
2989            let relay_env = envelope.relay();
2990            (true, relay_env.map(|e| e.encode()))
2991        } else {
2992            if !envelope.can_relay() {
2993                self.notify(HiveEvent::MessageTtlExpired {
2994                    origin_node: envelope.origin_node,
2995                    hop_count: envelope.hop_count,
2996                });
2997            }
2998            (false, None)
2999        };
3000
3001        // Process the inner payload
3002        self.process_incoming_document(
3003            identifier,
3004            &envelope.payload,
3005            now_ms,
3006            if should_relay { relay_data } else { None },
3007            Some(envelope.origin_node),
3008            envelope.hop_count,
3009        )
3010    }
3011
3012    // ==================== Periodic Maintenance ====================
3013
3014    /// Periodic tick - call this regularly (e.g., every second)
3015    ///
3016    /// Performs:
3017    /// - Stale peer cleanup
3018    /// - Periodic sync broadcast (if interval elapsed)
3019    ///
3020    /// Returns `Some(data)` if a sync broadcast is needed.
3021    pub fn tick(&self, now_ms: u64) -> Option<Vec<u8>> {
3022        use std::sync::atomic::Ordering;
3023
3024        // Use u32 for atomic storage (wraps every ~49 days, intervals still work)
3025        let now_ms_32 = now_ms as u32;
3026
3027        // Cleanup stale peers
3028        let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3029        let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3030        if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3031            self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3032            let removed = self.peer_manager.cleanup_stale(now_ms);
3033            for node_id in &removed {
3034                self.notify(HiveEvent::PeerLost { node_id: *node_id });
3035            }
3036            if !removed.is_empty() {
3037                self.notify_mesh_state_changed();
3038            }
3039
3040            // Run connection graph maintenance (transition Disconnected -> Lost)
3041            {
3042                let mut graph = self.connection_graph.lock().unwrap();
3043                let newly_lost = graph.tick(now_ms);
3044                // Also cleanup peers lost for more than peer_timeout
3045                graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3046                drop(graph);
3047
3048                // Emit PeerLost events for newly lost peers from graph
3049                // (these may differ from peer_manager removals)
3050                for node_id in newly_lost {
3051                    // Only notify if not already notified by peer_manager
3052                    if !removed.contains(&node_id) {
3053                        self.notify(HiveEvent::PeerLost { node_id });
3054                    }
3055                }
3056            }
3057        }
3058
3059        // Check if sync broadcast is needed
3060        let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3061        let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3062        if sync_elapsed >= self.config.sync_interval_ms as u32 {
3063            self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3064            // Only broadcast if we have connected peers
3065            if self.peer_manager.connected_count() > 0 {
3066                let doc = self.document_sync.build_document();
3067                return Some(self.encrypt_document(&doc));
3068            }
3069        }
3070
3071        None
3072    }
3073
3074    /// Periodic tick returning per-peer delta documents
3075    ///
3076    /// Unlike `tick()` which broadcasts a single document to all peers,
3077    /// this returns targeted deltas that only include changes each peer
3078    /// hasn't seen. Use this for platforms that support per-peer transmission.
3079    ///
3080    /// Returns a list of (NodeId, encrypted_delta) tuples, one per connected peer.
3081    /// Empty vector if no sync is needed (interval not elapsed or no connected peers).
3082    pub fn tick_with_peer_deltas(&self, now_ms: u64) -> Vec<(NodeId, Vec<u8>)> {
3083        use std::sync::atomic::Ordering;
3084        let now_ms_32 = now_ms as u32;
3085
3086        // Cleanup stale peers (same as tick())
3087        let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3088        let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3089        if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3090            self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3091            let removed = self.peer_manager.cleanup_stale(now_ms);
3092            for node_id in &removed {
3093                self.notify(HiveEvent::PeerLost { node_id: *node_id });
3094            }
3095            if !removed.is_empty() {
3096                self.notify_mesh_state_changed();
3097            }
3098
3099            // Run connection graph maintenance
3100            {
3101                let mut graph = self.connection_graph.lock().unwrap();
3102                let newly_lost = graph.tick(now_ms);
3103                graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3104                drop(graph);
3105
3106                for node_id in newly_lost {
3107                    if !removed.contains(&node_id) {
3108                        self.notify(HiveEvent::PeerLost { node_id });
3109                    }
3110                }
3111            }
3112        }
3113
3114        // Check if sync is needed
3115        let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3116        let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3117        if sync_elapsed >= self.config.sync_interval_ms as u32 {
3118            self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3119
3120            // Build document for each connected peer
3121            let doc = self.document_sync.build_document();
3122            let encrypted = self.encrypt_document(&doc);
3123            let mut results = Vec::new();
3124            for peer in self.get_connected_peers() {
3125                results.push((peer.node_id, encrypted.clone()));
3126            }
3127            return results;
3128        }
3129
3130        Vec::new()
3131    }
3132
3133    // ==================== State Queries ====================
3134
3135    /// Get all known peers
3136    pub fn get_peers(&self) -> Vec<HivePeer> {
3137        self.peer_manager.get_peers()
3138    }
3139
3140    /// Get connected peers only
3141    pub fn get_connected_peers(&self) -> Vec<HivePeer> {
3142        self.peer_manager.get_connected_peers()
3143    }
3144
3145    /// Get a specific peer by NodeId
3146    pub fn get_peer(&self, node_id: NodeId) -> Option<HivePeer> {
3147        self.peer_manager.get_peer(node_id)
3148    }
3149
3150    /// Get peer count
3151    pub fn peer_count(&self) -> usize {
3152        self.peer_manager.peer_count()
3153    }
3154
3155    /// Get connected peer count
3156    pub fn connected_count(&self) -> usize {
3157        self.peer_manager.connected_count()
3158    }
3159
3160    /// Check if a device mesh ID matches our mesh
3161    pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
3162        self.peer_manager.matches_mesh(device_mesh_id)
3163    }
3164
3165    // ==================== Connection State Graph ====================
3166
3167    /// Get the connection state graph with all peer states
3168    ///
3169    /// Returns a snapshot of all tracked peers and their connection lifecycle state.
3170    /// Apps can use this to display appropriate UI indicators:
3171    /// - Green for Connected peers
3172    /// - Yellow for Degraded or RecentlyDisconnected peers
3173    /// - Gray for Lost peers
3174    ///
3175    /// # Example
3176    /// ```ignore
3177    /// let states = mesh.get_connection_graph();
3178    /// for peer in states {
3179    ///     match peer.state {
3180    ///         ConnectionState::Connected => show_green_indicator(&peer),
3181    ///         ConnectionState::Degraded => show_yellow_indicator(&peer),
3182    ///         ConnectionState::Disconnected => show_stale_indicator(&peer),
3183    ///         ConnectionState::Lost => show_gray_indicator(&peer),
3184    ///         _ => {}
3185    ///     }
3186    /// }
3187    /// ```
3188    pub fn get_connection_graph(&self) -> Vec<PeerConnectionState> {
3189        self.connection_graph.lock().unwrap().get_all_owned()
3190    }
3191
3192    /// Get a specific peer's connection state
3193    pub fn get_peer_connection_state(&self, node_id: NodeId) -> Option<PeerConnectionState> {
3194        self.connection_graph
3195            .lock()
3196            .unwrap()
3197            .get_peer(node_id)
3198            .cloned()
3199    }
3200
3201    /// Get all currently connected peers from the connection graph
3202    pub fn get_connected_states(&self) -> Vec<PeerConnectionState> {
3203        self.connection_graph
3204            .lock()
3205            .unwrap()
3206            .get_connected()
3207            .into_iter()
3208            .cloned()
3209            .collect()
3210    }
3211
3212    /// Get peers in degraded state (connected but poor signal quality)
3213    pub fn get_degraded_peers(&self) -> Vec<PeerConnectionState> {
3214        self.connection_graph
3215            .lock()
3216            .unwrap()
3217            .get_degraded()
3218            .into_iter()
3219            .cloned()
3220            .collect()
3221    }
3222
3223    /// Get peers that disconnected within the specified time window
3224    ///
3225    /// Useful for showing "stale" peers that were recently connected.
3226    pub fn get_recently_disconnected(
3227        &self,
3228        within_ms: u64,
3229        now_ms: u64,
3230    ) -> Vec<PeerConnectionState> {
3231        self.connection_graph
3232            .lock()
3233            .unwrap()
3234            .get_recently_disconnected(within_ms, now_ms)
3235            .into_iter()
3236            .cloned()
3237            .collect()
3238    }
3239
3240    /// Get peers in Lost state (disconnected and no longer advertising)
3241    pub fn get_lost_peers(&self) -> Vec<PeerConnectionState> {
3242        self.connection_graph
3243            .lock()
3244            .unwrap()
3245            .get_lost()
3246            .into_iter()
3247            .cloned()
3248            .collect()
3249    }
3250
3251    /// Get summary counts of peers in each connection state
3252    pub fn get_connection_state_counts(&self) -> StateCountSummary {
3253        self.connection_graph.lock().unwrap().state_counts()
3254    }
3255
3256    // ==================== Indirect Peer Methods ====================
3257
3258    /// Get all indirect (multi-hop) peers
3259    ///
3260    /// Returns peers discovered via relay messages that are not directly
3261    /// connected via BLE. Each indirect peer includes the minimum hop count
3262    /// and the direct peers through which they can be reached.
3263    pub fn get_indirect_peers(&self) -> Vec<IndirectPeer> {
3264        self.connection_graph
3265            .lock()
3266            .unwrap()
3267            .get_indirect_peers_owned()
3268    }
3269
3270    /// Get the degree (hop count) for a specific peer
3271    ///
3272    /// Returns:
3273    /// - `Some(PeerDegree::Direct)` for directly connected BLE peers
3274    /// - `Some(PeerDegree::OneHop/TwoHop/ThreeHop)` for indirect peers
3275    /// - `None` if peer is not known
3276    pub fn get_peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
3277        self.connection_graph.lock().unwrap().peer_degree(node_id)
3278    }
3279
3280    /// Get full state counts including indirect peers
3281    ///
3282    /// Returns counts of direct peers by connection state plus counts
3283    /// of indirect peers by hop count (1-hop, 2-hop, 3-hop).
3284    pub fn get_full_state_counts(&self) -> FullStateCountSummary {
3285        self.connection_graph.lock().unwrap().full_state_counts()
3286    }
3287
3288    /// Get all paths to reach an indirect peer
3289    ///
3290    /// Returns a list of (via_peer_id, hop_count) pairs showing all
3291    /// known routes to the specified peer.
3292    pub fn get_paths_to_peer(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
3293        self.connection_graph.lock().unwrap().get_paths_to(node_id)
3294    }
3295
3296    /// Check if a node is known (either direct or indirect)
3297    pub fn is_peer_known(&self, node_id: NodeId) -> bool {
3298        self.connection_graph.lock().unwrap().is_known(node_id)
3299    }
3300
3301    /// Get number of indirect peers
3302    pub fn indirect_peer_count(&self) -> usize {
3303        self.connection_graph.lock().unwrap().indirect_peer_count()
3304    }
3305
3306    /// Cleanup stale indirect peers
3307    ///
3308    /// Removes indirect peers that haven't been seen within the timeout.
3309    /// Returns the list of removed peer IDs.
3310    pub fn cleanup_indirect_peers(&self, now_ms: u64) -> Vec<NodeId> {
3311        self.connection_graph
3312            .lock()
3313            .unwrap()
3314            .cleanup_indirect(now_ms)
3315    }
3316
3317    /// Get total counter value
3318    pub fn total_count(&self) -> u64 {
3319        self.document_sync.total_count()
3320    }
3321
3322    /// Get document version
3323    pub fn document_version(&self) -> u32 {
3324        self.document_sync.version()
3325    }
3326
3327    /// Get document version (alias)
3328    pub fn version(&self) -> u32 {
3329        self.document_sync.version()
3330    }
3331
3332    /// Update health status (battery percentage)
3333    pub fn update_health(&self, battery_percent: u8) {
3334        self.document_sync.update_health(battery_percent);
3335    }
3336
3337    /// Update activity level (0=still, 1=walking, 2=running, 3=fall)
3338    pub fn update_activity(&self, activity: u8) {
3339        self.document_sync.update_activity(activity);
3340    }
3341
3342    /// Update full health status (battery and activity)
3343    pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
3344        self.document_sync
3345            .update_health_full(battery_percent, activity);
3346    }
3347
3348    /// Update heart rate
3349    pub fn update_heart_rate(&self, heart_rate: u8) {
3350        self.document_sync.update_heart_rate(heart_rate);
3351    }
3352
3353    /// Update location
3354    pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
3355        self.document_sync
3356            .update_location(latitude, longitude, altitude);
3357    }
3358
3359    /// Clear location
3360    pub fn clear_location(&self) {
3361        self.document_sync.clear_location();
3362    }
3363
3364    /// Update callsign
3365    pub fn update_callsign(&self, callsign: &str) {
3366        self.document_sync.update_callsign(callsign);
3367    }
3368
3369    /// Set peripheral event type
3370    pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
3371        self.document_sync
3372            .set_peripheral_event(event_type, timestamp);
3373    }
3374
3375    /// Clear peripheral event
3376    pub fn clear_peripheral_event(&self) {
3377        self.document_sync.clear_peripheral_event();
3378    }
3379
3380    /// Update full peripheral state in one call
3381    ///
3382    /// This is the most efficient way to update all peripheral data before
3383    /// calling `build_document()` for encrypted transmission.
3384    #[allow(clippy::too_many_arguments)]
3385    pub fn update_peripheral_state(
3386        &self,
3387        callsign: &str,
3388        battery_percent: u8,
3389        heart_rate: Option<u8>,
3390        latitude: Option<f32>,
3391        longitude: Option<f32>,
3392        altitude: Option<f32>,
3393        event_type: Option<EventType>,
3394        timestamp: u64,
3395    ) {
3396        self.document_sync.update_peripheral_state(
3397            callsign,
3398            battery_percent,
3399            heart_rate,
3400            latitude,
3401            longitude,
3402            altitude,
3403            event_type,
3404            timestamp,
3405        );
3406    }
3407
3408    /// Build current document for transmission
3409    ///
3410    /// If encryption is enabled, the document is encrypted.
3411    pub fn build_document(&self) -> Vec<u8> {
3412        let doc = self.document_sync.build_document();
3413        self.encrypt_document(&doc)
3414    }
3415
3416    /// Get peers that should be synced with
3417    pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<HivePeer> {
3418        self.peer_manager.peers_needing_sync(now_ms)
3419    }
3420
3421    // ==================== Internal Helpers ====================
3422
3423    fn notify(&self, event: HiveEvent) {
3424        self.observers.notify(event);
3425    }
3426
3427    fn notify_mesh_state_changed(&self) {
3428        self.notify(HiveEvent::MeshStateChanged {
3429            peer_count: self.peer_manager.peer_count(),
3430            connected_count: self.peer_manager.connected_count(),
3431        });
3432    }
3433
3434    // ==================== CannedMessage Integration ====================
3435    //
3436    // These methods provide deduplication support for hive-lite CannedMessages.
3437    // They use document identity (source_node + timestamp) instead of content hash,
3438    // because CRDT merge can change byte ordering.
3439
3440    /// Check if a CannedMessage should be processed.
3441    ///
3442    /// Uses document identity (source_node + timestamp) for deduplication.
3443    /// This prevents broadcast storms when relaying CannedMessages across the mesh.
3444    ///
3445    /// # Arguments
3446    /// * `source_node` - The source node ID from the CannedMessage
3447    /// * `timestamp` - The timestamp from the CannedMessage
3448    /// * `_ttl_ms` - TTL parameter (currently unused, uses cache's default TTL)
3449    ///
3450    /// # Returns
3451    /// `true` if this message is new and should be processed,
3452    /// `false` if it was seen recently and should be skipped.
3453    pub fn check_canned_message(&self, source_node: u32, timestamp: u64, _ttl_ms: u64) -> bool {
3454        // Create a unique key from source_node and timestamp
3455        // MessageId is 16 bytes: [source_node: 4B][timestamp: 8B][padding: 4B]
3456        let mut id_bytes = [0u8; 16];
3457        id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3458        id_bytes[4..12].copy_from_slice(&timestamp.to_le_bytes());
3459        let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3460
3461        // Check the seen cache
3462        let seen = self.seen_cache.lock().unwrap();
3463        !seen.has_seen(&message_id)
3464    }
3465
3466    /// Mark a CannedMessage as seen (for deduplication).
3467    ///
3468    /// Call this after processing a CannedMessage to prevent reprocessing
3469    /// the same message from other relay paths.
3470    pub fn mark_canned_message_seen(&self, source_node: u32, timestamp: u64) {
3471        let now = std::time::SystemTime::now()
3472            .duration_since(std::time::UNIX_EPOCH)
3473            .map(|d| d.as_millis() as u64)
3474            .unwrap_or(0);
3475
3476        // MessageId is 16 bytes: [source_node: 4B][timestamp: 8B][padding: 4B]
3477        let mut id_bytes = [0u8; 16];
3478        id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3479        id_bytes[4..12].copy_from_slice(&timestamp.to_le_bytes());
3480        let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3481        let origin = NodeId::new(source_node);
3482
3483        let mut seen = self.seen_cache.lock().unwrap();
3484        seen.mark_seen(message_id, origin, now);
3485    }
3486
3487    /// Get list of connected peer identifiers for relay.
3488    ///
3489    /// Used by the platform layer (Kotlin/Swift) to relay CannedMessages
3490    /// to other peers after deduplication check.
3491    pub fn get_connected_peer_identifiers(&self) -> Vec<String> {
3492        self.peer_manager.get_connected_identifiers()
3493    }
3494}
3495
3496/// Result from receiving BLE data
3497#[derive(Debug, Clone)]
3498pub struct DataReceivedResult {
3499    /// Node that sent this data
3500    pub source_node: NodeId,
3501
3502    /// Whether this contained an emergency event
3503    pub is_emergency: bool,
3504
3505    /// Whether this contained an ACK event
3506    pub is_ack: bool,
3507
3508    /// Whether the counter changed (new data)
3509    pub counter_changed: bool,
3510
3511    /// Whether emergency state changed (new emergency or ACK updates)
3512    pub emergency_changed: bool,
3513
3514    /// Updated total count
3515    pub total_count: u64,
3516
3517    /// Event timestamp (if event present) - use to detect duplicate events
3518    pub event_timestamp: u64,
3519
3520    /// Data to relay to other peers (if multi-hop relay is enabled)
3521    ///
3522    /// When present, the platform adapter should send this data to peers
3523    /// returned by `get_relay_targets(Some(source_node))`.
3524    pub relay_data: Option<Vec<u8>>,
3525
3526    /// Origin node for relay (may differ from source_node for relayed messages)
3527    pub origin_node: Option<NodeId>,
3528
3529    /// Current hop count (for relayed messages)
3530    pub hop_count: u8,
3531
3532    // ========== Peripheral data from sender ==========
3533    /// Sender's callsign (up to 12 chars)
3534    pub callsign: Option<String>,
3535
3536    /// Sender's battery percentage (0-100)
3537    pub battery_percent: Option<u8>,
3538
3539    /// Sender's heart rate (BPM)
3540    pub heart_rate: Option<u8>,
3541
3542    /// Sender's event type (from PeripheralEvent)
3543    pub event_type: Option<u8>,
3544
3545    /// Sender's latitude
3546    pub latitude: Option<f32>,
3547
3548    /// Sender's longitude
3549    pub longitude: Option<f32>,
3550
3551    /// Sender's altitude (meters)
3552    pub altitude: Option<f32>,
3553}
3554
3555impl DataReceivedResult {
3556    /// Extract peripheral fields from an Option<Peripheral>
3557    #[allow(clippy::type_complexity)]
3558    fn peripheral_fields(
3559        peripheral: &Option<crate::sync::crdt::Peripheral>,
3560    ) -> (
3561        Option<String>,
3562        Option<u8>,
3563        Option<u8>,
3564        Option<u8>,
3565        Option<f32>,
3566        Option<f32>,
3567        Option<f32>,
3568    ) {
3569        match peripheral {
3570            Some(p) => {
3571                let callsign = {
3572                    let s = p.callsign_str();
3573                    if s.is_empty() {
3574                        None
3575                    } else {
3576                        Some(s.to_string())
3577                    }
3578                };
3579                let battery = if p.health.battery_percent > 0 {
3580                    Some(p.health.battery_percent)
3581                } else {
3582                    None
3583                };
3584                let heart_rate = p.health.heart_rate;
3585                let event_type = p.last_event.as_ref().map(|e| e.event_type as u8);
3586                let (lat, lon, alt) = match &p.location {
3587                    Some(loc) => (Some(loc.latitude), Some(loc.longitude), loc.altitude),
3588                    None => (None, None, None),
3589                };
3590                (callsign, battery, heart_rate, event_type, lat, lon, alt)
3591            }
3592            None => (None, None, None, None, None, None, None),
3593        }
3594    }
3595}
3596
3597/// Decision from processing a relay envelope
3598#[derive(Debug, Clone)]
3599pub struct RelayDecision {
3600    /// The payload (document) to process locally
3601    pub payload: Vec<u8>,
3602
3603    /// Original sender of the message
3604    pub origin_node: NodeId,
3605
3606    /// Current hop count
3607    pub hop_count: u8,
3608
3609    /// Whether this message should be relayed to other peers
3610    pub should_relay: bool,
3611
3612    /// The relay envelope to forward (with incremented hop count)
3613    ///
3614    /// Only present if `should_relay` is true and TTL not expired.
3615    pub relay_envelope: Option<RelayEnvelope>,
3616}
3617
3618impl RelayDecision {
3619    /// Get the relay data to send to peers
3620    ///
3621    /// Returns None if relay is not needed.
3622    pub fn relay_data(&self) -> Option<Vec<u8>> {
3623        self.relay_envelope.as_ref().map(|e| e.encode())
3624    }
3625}
3626
3627#[cfg(all(test, feature = "std"))]
3628mod tests {
3629    use super::*;
3630    use crate::observer::CollectingObserver;
3631
3632    // Valid timestamp for testing (2024-01-15 00:00:00 UTC)
3633    const TEST_TIMESTAMP: u64 = 1705276800000;
3634
3635    fn create_mesh(node_id: u32, callsign: &str) -> HiveMesh {
3636        let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST");
3637        HiveMesh::new(config)
3638    }
3639
3640    #[test]
3641    fn test_mesh_creation() {
3642        let mesh = create_mesh(0x12345678, "ALPHA-1");
3643
3644        assert_eq!(mesh.node_id().as_u32(), 0x12345678);
3645        assert_eq!(mesh.callsign(), "ALPHA-1");
3646        assert_eq!(mesh.mesh_id(), "TEST");
3647        assert_eq!(mesh.device_name(), "HIVE_TEST-12345678");
3648    }
3649
3650    #[test]
3651    fn test_peer_discovery() {
3652        let mesh = create_mesh(0x11111111, "ALPHA-1");
3653        let observer = Arc::new(CollectingObserver::new());
3654        mesh.add_observer(observer.clone());
3655
3656        // Discover a peer
3657        let peer = mesh.on_ble_discovered(
3658            "device-uuid",
3659            Some("HIVE_TEST-22222222"),
3660            -65,
3661            Some("TEST"),
3662            1000,
3663        );
3664
3665        assert!(peer.is_some());
3666        let peer = peer.unwrap();
3667        assert_eq!(peer.node_id.as_u32(), 0x22222222);
3668
3669        // Check events were generated
3670        let events = observer.events();
3671        assert!(events
3672            .iter()
3673            .any(|e| matches!(e, HiveEvent::PeerDiscovered { .. })));
3674        assert!(events
3675            .iter()
3676            .any(|e| matches!(e, HiveEvent::MeshStateChanged { .. })));
3677    }
3678
3679    #[test]
3680    fn test_connection_lifecycle() {
3681        let mesh = create_mesh(0x11111111, "ALPHA-1");
3682        let observer = Arc::new(CollectingObserver::new());
3683        mesh.add_observer(observer.clone());
3684
3685        // Discover and connect
3686        mesh.on_ble_discovered(
3687            "device-uuid",
3688            Some("HIVE_TEST-22222222"),
3689            -65,
3690            Some("TEST"),
3691            1000,
3692        );
3693
3694        let node_id = mesh.on_ble_connected("device-uuid", 2000);
3695        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
3696        assert_eq!(mesh.connected_count(), 1);
3697
3698        // Disconnect
3699        let node_id = mesh.on_ble_disconnected("device-uuid", DisconnectReason::RemoteRequest);
3700        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
3701        assert_eq!(mesh.connected_count(), 0);
3702
3703        // Check events
3704        let events = observer.events();
3705        assert!(events
3706            .iter()
3707            .any(|e| matches!(e, HiveEvent::PeerConnected { .. })));
3708        assert!(events
3709            .iter()
3710            .any(|e| matches!(e, HiveEvent::PeerDisconnected { .. })));
3711    }
3712
3713    #[test]
3714    fn test_emergency_flow() {
3715        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3716        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3717
3718        let observer2 = Arc::new(CollectingObserver::new());
3719        mesh2.add_observer(observer2.clone());
3720
3721        // mesh1 sends emergency
3722        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
3723        assert!(mesh1.is_emergency_active());
3724
3725        // mesh2 receives it
3726        let result =
3727            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
3728
3729        assert!(result.is_some());
3730        let result = result.unwrap();
3731        assert!(result.is_emergency);
3732        assert_eq!(result.source_node.as_u32(), 0x11111111);
3733
3734        // Check events on mesh2
3735        let events = observer2.events();
3736        assert!(events
3737            .iter()
3738            .any(|e| matches!(e, HiveEvent::EmergencyReceived { .. })));
3739    }
3740
3741    #[test]
3742    fn test_ack_flow() {
3743        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3744        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3745
3746        let observer2 = Arc::new(CollectingObserver::new());
3747        mesh2.add_observer(observer2.clone());
3748
3749        // mesh1 sends ACK
3750        let doc = mesh1.send_ack(TEST_TIMESTAMP);
3751        assert!(mesh1.is_ack_active());
3752
3753        // mesh2 receives it
3754        let result =
3755            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
3756
3757        assert!(result.is_some());
3758        let result = result.unwrap();
3759        assert!(result.is_ack);
3760
3761        // Check events on mesh2
3762        let events = observer2.events();
3763        assert!(events
3764            .iter()
3765            .any(|e| matches!(e, HiveEvent::AckReceived { .. })));
3766    }
3767
3768    #[test]
3769    fn test_tick_cleanup() {
3770        let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
3771            .with_peer_timeout(10_000);
3772        let mesh = HiveMesh::new(config);
3773
3774        let observer = Arc::new(CollectingObserver::new());
3775        mesh.add_observer(observer.clone());
3776
3777        // Discover a peer
3778        mesh.on_ble_discovered(
3779            "device-uuid",
3780            Some("HIVE_TEST-22222222"),
3781            -65,
3782            Some("TEST"),
3783            1000,
3784        );
3785        assert_eq!(mesh.peer_count(), 1);
3786
3787        // Tick at t=5000 - not stale yet
3788        mesh.tick(5000);
3789        assert_eq!(mesh.peer_count(), 1);
3790
3791        // Tick at t=20000 - peer is stale (10s timeout exceeded)
3792        mesh.tick(20000);
3793        assert_eq!(mesh.peer_count(), 0);
3794
3795        // Check PeerLost event
3796        let events = observer.events();
3797        assert!(events
3798            .iter()
3799            .any(|e| matches!(e, HiveEvent::PeerLost { .. })));
3800    }
3801
3802    #[test]
3803    fn test_tick_sync_broadcast() {
3804        let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
3805            .with_sync_interval(5000);
3806        let mesh = HiveMesh::new(config);
3807
3808        // Discover and connect a peer first
3809        mesh.on_ble_discovered(
3810            "device-uuid",
3811            Some("HIVE_TEST-22222222"),
3812            -65,
3813            Some("TEST"),
3814            1000,
3815        );
3816        mesh.on_ble_connected("device-uuid", 1000);
3817
3818        // First tick at t=0 sets last_sync
3819        let _result = mesh.tick(0);
3820        // May or may not broadcast depending on initial state
3821
3822        // Tick before interval - no broadcast
3823        let result = mesh.tick(3000);
3824        assert!(result.is_none());
3825
3826        // After interval - should broadcast
3827        let result = mesh.tick(6000);
3828        assert!(result.is_some());
3829
3830        // Immediate second tick - no broadcast (interval not elapsed)
3831        let result = mesh.tick(6100);
3832        assert!(result.is_none());
3833
3834        // After another interval - should broadcast again
3835        let result = mesh.tick(12000);
3836        assert!(result.is_some());
3837    }
3838
3839    #[test]
3840    fn test_incoming_connection() {
3841        let mesh = create_mesh(0x11111111, "ALPHA-1");
3842        let observer = Arc::new(CollectingObserver::new());
3843        mesh.add_observer(observer.clone());
3844
3845        // Incoming connection from unknown peer
3846        let is_new = mesh.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);
3847
3848        assert!(is_new);
3849        assert_eq!(mesh.peer_count(), 1);
3850        assert_eq!(mesh.connected_count(), 1);
3851
3852        // Check events
3853        let events = observer.events();
3854        assert!(events
3855            .iter()
3856            .any(|e| matches!(e, HiveEvent::PeerDiscovered { .. })));
3857        assert!(events
3858            .iter()
3859            .any(|e| matches!(e, HiveEvent::PeerConnected { .. })));
3860    }
3861
3862    #[test]
3863    fn test_mesh_filtering() {
3864        let mesh = create_mesh(0x11111111, "ALPHA-1");
3865
3866        // Wrong mesh - ignored
3867        let peer = mesh.on_ble_discovered(
3868            "device-uuid-1",
3869            Some("HIVE_OTHER-22222222"),
3870            -65,
3871            Some("OTHER"),
3872            1000,
3873        );
3874        assert!(peer.is_none());
3875        assert_eq!(mesh.peer_count(), 0);
3876
3877        // Correct mesh - accepted
3878        let peer = mesh.on_ble_discovered(
3879            "device-uuid-2",
3880            Some("HIVE_TEST-33333333"),
3881            -65,
3882            Some("TEST"),
3883            1000,
3884        );
3885        assert!(peer.is_some());
3886        assert_eq!(mesh.peer_count(), 1);
3887    }
3888
3889    // ==================== Encryption Tests ====================
3890
3891    fn create_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> HiveMesh {
3892        let config =
3893            HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_encryption(secret);
3894        HiveMesh::new(config)
3895    }
3896
3897    #[test]
3898    fn test_encryption_enabled() {
3899        let secret = [0x42u8; 32];
3900        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3901
3902        assert!(mesh.is_encryption_enabled());
3903    }
3904
3905    #[test]
3906    fn test_encryption_disabled_by_default() {
3907        let mesh = create_mesh(0x11111111, "ALPHA-1");
3908
3909        assert!(!mesh.is_encryption_enabled());
3910    }
3911
3912    #[test]
3913    fn test_encrypted_document_exchange() {
3914        let secret = [0x42u8; 32];
3915        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3916        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
3917
3918        // mesh1 sends document
3919        let doc = mesh1.build_document();
3920
3921        // Document should be encrypted (starts with ENCRYPTED_MARKER)
3922        assert!(doc.len() >= 2);
3923        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
3924
3925        // mesh2 receives and decrypts
3926        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3927
3928        assert!(result.is_some());
3929        let result = result.unwrap();
3930        assert_eq!(result.source_node.as_u32(), 0x11111111);
3931    }
3932
3933    #[test]
3934    fn test_encrypted_emergency_exchange() {
3935        let secret = [0x42u8; 32];
3936        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3937        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
3938
3939        let observer = Arc::new(CollectingObserver::new());
3940        mesh2.add_observer(observer.clone());
3941
3942        // mesh1 sends emergency
3943        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
3944
3945        // mesh2 receives and decrypts
3946        let result =
3947            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
3948
3949        assert!(result.is_some());
3950        let result = result.unwrap();
3951        assert!(result.is_emergency);
3952
3953        // Check EmergencyReceived event was fired
3954        let events = observer.events();
3955        assert!(events
3956            .iter()
3957            .any(|e| matches!(e, HiveEvent::EmergencyReceived { .. })));
3958    }
3959
3960    #[test]
3961    fn test_wrong_key_fails_decrypt() {
3962        let secret1 = [0x42u8; 32];
3963        let secret2 = [0x43u8; 32]; // Different key
3964        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
3965        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
3966
3967        // mesh1 sends document
3968        let doc = mesh1.build_document();
3969
3970        // mesh2 cannot decrypt (wrong key)
3971        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3972
3973        assert!(result.is_none());
3974    }
3975
3976    #[test]
3977    fn test_unencrypted_mesh_can_read_unencrypted() {
3978        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3979        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3980
3981        // mesh1 sends document (unencrypted)
3982        let doc = mesh1.build_document();
3983
3984        // mesh2 receives (also unencrypted)
3985        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3986
3987        assert!(result.is_some());
3988    }
3989
3990    #[test]
3991    fn test_encrypted_mesh_can_receive_unencrypted() {
3992        // Backward compatibility: encrypted mesh can receive unencrypted docs
3993        let secret = [0x42u8; 32];
3994        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // unencrypted
3995        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); // encrypted
3996
3997        // mesh1 sends unencrypted document
3998        let doc = mesh1.build_document();
3999
4000        // mesh2 can receive unencrypted (backward compat)
4001        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4002
4003        assert!(result.is_some());
4004    }
4005
4006    #[test]
4007    fn test_unencrypted_mesh_cannot_receive_encrypted() {
4008        let secret = [0x42u8; 32];
4009        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret); // encrypted
4010        let mesh2 = create_mesh(0x22222222, "BRAVO-1"); // unencrypted
4011
4012        // mesh1 sends encrypted document
4013        let doc = mesh1.build_document();
4014
4015        // mesh2 cannot decrypt (no key)
4016        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4017
4018        assert!(result.is_none());
4019    }
4020
4021    #[test]
4022    fn test_enable_disable_encryption() {
4023        let mut mesh = create_mesh(0x11111111, "ALPHA-1");
4024
4025        assert!(!mesh.is_encryption_enabled());
4026
4027        // Enable encryption
4028        let secret = [0x42u8; 32];
4029        mesh.enable_encryption(&secret);
4030        assert!(mesh.is_encryption_enabled());
4031
4032        // Build document should now be encrypted
4033        let doc = mesh.build_document();
4034        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4035
4036        // Disable encryption
4037        mesh.disable_encryption();
4038        assert!(!mesh.is_encryption_enabled());
4039
4040        // Build document should now be unencrypted
4041        let doc = mesh.build_document();
4042        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4043    }
4044
4045    #[test]
4046    fn test_encryption_overhead() {
4047        let secret = [0x42u8; 32];
4048        let mesh_encrypted = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4049        let mesh_unencrypted = create_mesh(0x22222222, "BRAVO-1");
4050
4051        let doc_encrypted = mesh_encrypted.build_document();
4052        let doc_unencrypted = mesh_unencrypted.build_document();
4053
4054        // Encrypted doc should be larger by:
4055        // - 2 bytes marker header (0xAE + reserved)
4056        // - 12 bytes nonce
4057        // - 16 bytes auth tag
4058        // Total: 30 bytes overhead
4059        let overhead = doc_encrypted.len() - doc_unencrypted.len();
4060        assert_eq!(overhead, 30); // 2 (marker) + 12 (nonce) + 16 (tag)
4061    }
4062
4063    // ==================== Per-Peer E2EE Tests ====================
4064
/// Toggles per-peer E2EE on and off and checks the reported state each time.
#[test]
fn test_peer_e2ee_enable_disable() {
    let node = create_mesh(0x11111111, "ALPHA-1");

    // Fresh mesh: feature is off and no public key is exposed
    assert!(!node.is_peer_e2ee_enabled());
    assert!(node.peer_e2ee_public_key().is_none());

    // After enabling: feature is on and a public key is available
    node.enable_peer_e2ee();
    assert!(node.is_peer_e2ee_enabled());
    assert!(node.peer_e2ee_public_key().is_some());

    // After disabling: feature reports off again
    node.disable_peer_e2ee();
    assert!(!node.is_peer_e2ee_enabled());
}
4079
/// Initiating a session must produce a key-exchange packet and leave exactly
/// one session that is not yet established.
#[test]
fn test_peer_e2ee_initiate_session() {
    let initiator = create_mesh(0x11111111, "ALPHA-1");
    initiator.enable_peer_e2ee();

    let packet = initiator
        .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
        .expect("initiating with E2EE enabled should return a key-exchange packet");

    // First byte identifies the packet as a key exchange
    assert_eq!(packet[0], crate::document::KEY_EXCHANGE_MARKER);

    // One session is tracked, none established yet
    assert_eq!(initiator.peer_e2ee_session_count(), 1);
    assert_eq!(initiator.peer_e2ee_established_count(), 0);
}
4096
/// Runs a two-party key exchange and checks that both sides report a session
/// with the other and that both observers saw `PeerE2eeEstablished`.
#[test]
fn test_peer_e2ee_full_handshake() {
    let alpha_id = NodeId::new(0x11111111);
    let bravo_id = NodeId::new(0x22222222);

    let alpha = create_mesh(0x11111111, "ALPHA-1");
    let bravo = create_mesh(0x22222222, "BRAVO-1");
    alpha.enable_peer_e2ee();
    bravo.enable_peer_e2ee();

    let alpha_events = Arc::new(CollectingObserver::new());
    let bravo_events = Arc::new(CollectingObserver::new());
    alpha.add_observer(alpha_events.clone());
    bravo.add_observer(bravo_events.clone());

    // ALPHA opens the exchange towards BRAVO
    let offer = alpha.initiate_peer_e2ee(bravo_id, 1000).unwrap();

    // BRAVO answers and should now hold a session with ALPHA
    let answer = bravo
        .handle_key_exchange(&offer, 1000)
        .expect("responder should reply to the key exchange");
    assert!(bravo.has_peer_e2ee_session(alpha_id));

    // ALPHA consumes the answer; its return value is intentionally ignored
    let _ = alpha.handle_key_exchange(&answer, 1000);
    assert!(alpha.has_peer_e2ee_session(bravo_id));

    // Each side must have been notified that E2EE was established
    for collector in [&alpha_events, &bravo_events] {
        assert!(collector
            .events()
            .iter()
            .any(|event| matches!(event, HiveEvent::PeerE2eeEstablished { .. })));
    }
}
4140
/// Sends one E2EE message across an established session and checks the
/// wire marker, the decrypted bytes, and the receive-side event.
#[test]
fn test_peer_e2ee_encrypt_decrypt() {
    let sender = create_mesh(0x11111111, "ALPHA-1");
    let receiver = create_mesh(0x22222222, "BRAVO-1");
    sender.enable_peer_e2ee();
    receiver.enable_peer_e2ee();

    let receiver_id = NodeId::new(0x22222222);

    // Key exchange: offer -> answer -> sender processes the answer
    let offer = sender.initiate_peer_e2ee(receiver_id, 1000).unwrap();
    let answer = receiver.handle_key_exchange(&offer, 1000).unwrap();
    sender.handle_key_exchange(&answer, 1000);

    // Encrypt on the sending side
    let plaintext = b"Secret message from mesh1";
    let ciphertext = sender
        .send_peer_e2ee(receiver_id, plaintext, 2000)
        .expect("sending over an established session should produce a packet");
    assert_eq!(ciphertext[0], crate::document::PEER_E2EE_MARKER);

    // Observer is attached only now, after the handshake
    let collector = Arc::new(CollectingObserver::new());
    receiver.add_observer(collector.clone());

    // Decrypt on the receiving side
    let recovered = receiver
        .handle_peer_e2ee_message(&ciphertext, 2000)
        .expect("receiver should decrypt the message");
    assert_eq!(recovered, plaintext);

    // The receiver must report the message together with its origin
    let delivered = collector.events().iter().any(|event| {
        matches!(
            event,
            HiveEvent::PeerE2eeMessageReceived { from_node, data }
                if from_node.as_u32() == 0x11111111 && data == plaintext
        )
    });
    assert!(delivered);
}
4181
/// After one handshake, messages must decrypt correctly in both directions.
#[test]
fn test_peer_e2ee_bidirectional() {
    let alpha_id = NodeId::new(0x11111111);
    let bravo_id = NodeId::new(0x22222222);

    let alpha = create_mesh(0x11111111, "ALPHA-1");
    let bravo = create_mesh(0x22222222, "BRAVO-1");
    alpha.enable_peer_e2ee();
    bravo.enable_peer_e2ee();

    // Handshake initiated by ALPHA
    let offer = alpha.initiate_peer_e2ee(bravo_id, 1000).unwrap();
    let answer = bravo.handle_key_exchange(&offer, 1000).unwrap();
    alpha.handle_key_exchange(&answer, 1000);

    // ALPHA -> BRAVO
    let to_bravo = alpha
        .send_peer_e2ee(bravo_id, b"Hello from mesh1", 2000)
        .unwrap();
    let at_bravo = bravo.handle_peer_e2ee_message(&to_bravo, 2000).unwrap();
    assert_eq!(at_bravo, b"Hello from mesh1");

    // BRAVO -> ALPHA
    let to_alpha = bravo
        .send_peer_e2ee(alpha_id, b"Hello from mesh2", 2000)
        .unwrap();
    let at_alpha = alpha.handle_peer_e2ee_message(&to_alpha, 2000).unwrap();
    assert_eq!(at_alpha, b"Hello from mesh2");
}
4211
/// Closing a pending session must notify observers with `PeerE2eeClosed`.
#[test]
fn test_peer_e2ee_close_session() {
    let peer = NodeId::new(0x22222222);

    let mesh = create_mesh(0x11111111, "ALPHA-1");
    mesh.enable_peer_e2ee();

    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    // Open a session towards the peer (the returned packet is not needed here)
    mesh.initiate_peer_e2ee(peer, 1000);
    assert_eq!(mesh.peer_e2ee_session_count(), 1);

    // Tear it down again
    mesh.close_peer_e2ee(peer);

    // The close must have been reported
    let closed = collector
        .events()
        .iter()
        .any(|event| matches!(event, HiveEvent::PeerE2eeClosed { .. }));
    assert!(closed);
}
4233
/// With per-peer E2EE never enabled, initiating and sending both return
/// `None` and no session is reported.
#[test]
fn test_peer_e2ee_without_enabling() {
    let peer = NodeId::new(0x22222222);
    let mesh = create_mesh(0x11111111, "ALPHA-1");

    // Initiation is refused
    assert!(mesh.initiate_peer_e2ee(peer, 1000).is_none());

    // Sending is refused
    assert!(mesh.send_peer_e2ee(peer, b"test", 1000).is_none());

    // Nothing was created as a side effect
    assert!(!mesh.has_peer_e2ee_session(peer));
}
4247
/// Checks the fixed number of bytes a per-peer E2EE packet adds on top of
/// the plaintext it carries.
#[test]
fn test_peer_e2ee_overhead() {
    // Expected framing of a peer E2EE packet, in bytes.
    const MARKER_HEADER_LEN: usize = 2;
    const RECIPIENT_ID_LEN: usize = 4;
    const SENDER_ID_LEN: usize = 4;
    const COUNTER_LEN: usize = 8;
    const NONCE_LEN: usize = 12;
    const AUTH_TAG_LEN: usize = 16;

    let alpha = create_mesh(0x11111111, "ALPHA-1");
    let bravo = create_mesh(0x22222222, "BRAVO-1");
    alpha.enable_peer_e2ee();
    bravo.enable_peer_e2ee();

    let bravo_id = NodeId::new(0x22222222);

    // Handshake initiated by ALPHA
    let offer = alpha.initiate_peer_e2ee(bravo_id, 1000).unwrap();
    let answer = bravo.handle_key_exchange(&offer, 1000).unwrap();
    alpha.handle_key_exchange(&answer, 1000);

    // Encrypt one message and measure the growth
    let plaintext = b"Test message";
    let packet = alpha.send_peer_e2ee(bravo_id, plaintext, 2000).unwrap();

    // 2 + 4 + 4 + 8 + 12 + 16 = 46 bytes of overhead
    assert_eq!(
        packet.len() - plaintext.len(),
        MARKER_HEADER_LEN
            + RECIPIENT_ID_LEN
            + SENDER_ID_LEN
            + COUNTER_LEN
            + NONCE_LEN
            + AUTH_TAG_LEN
    );
}
4280
4281    // ==================== Strict Encryption Mode Tests ====================
4282
4283    fn create_strict_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> HiveMesh {
4284        let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST")
4285            .with_encryption(secret)
4286            .with_strict_encryption();
4287        HiveMesh::new(config)
4288    }
4289
/// A mesh built with the strict helper reports both encryption and strict mode.
#[test]
fn test_strict_encryption_enabled() {
    let strict = create_strict_encrypted_mesh(0x11111111, "ALPHA-1", [0x42u8; 32]);

    assert!(strict.is_encryption_enabled());
    assert!(strict.is_strict_encryption_enabled());
}
4298
/// Encryption alone must not switch strict mode on.
#[test]
fn test_strict_encryption_disabled_by_default() {
    let encrypted_only = create_encrypted_mesh(0x11111111, "ALPHA-1", [0x42u8; 32]);

    assert!(encrypted_only.is_encryption_enabled());
    assert!(!encrypted_only.is_strict_encryption_enabled());
}
4307
/// Requesting strict mode without configuring encryption must leave both
/// encryption and strict mode reported as off.
#[test]
fn test_strict_encryption_requires_encryption_enabled() {
    // Deliberately no `with_encryption(..)` in this chain
    let mesh = HiveMesh::new(
        HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST").with_strict_encryption(),
    );

    assert!(!mesh.is_encryption_enabled());
    assert!(!mesh.is_strict_encryption_enabled());
}
4318
/// A strict receiver sharing the sender's secret must accept an encrypted
/// document.
#[test]
fn test_strict_mode_accepts_encrypted_documents() {
    let shared_key = [0x42u8; 32];
    let sender = create_encrypted_mesh(0x11111111, "ALPHA-1", shared_key);
    let strict_receiver = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", shared_key);

    // Sender output carries the encrypted marker
    let wire = sender.build_document();
    assert_eq!(wire[0], crate::document::ENCRYPTED_MARKER);

    // Strict receiver processes it successfully
    let outcome =
        strict_receiver.on_ble_data_received_from_node(NodeId::new(0x11111111), &wire, 1000);
    assert!(outcome.is_some());
}
4333
/// A strict receiver must drop an unencrypted document and emit a
/// `SecurityViolation` of kind `UnencryptedInStrictMode`.
#[test]
fn test_strict_mode_rejects_unencrypted_documents() {
    let plain_sender = create_mesh(0x11111111, "ALPHA-1");
    let strict_receiver = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", [0x42u8; 32]);

    let collector = Arc::new(CollectingObserver::new());
    strict_receiver.add_observer(collector.clone());

    // Sender output is not marked as encrypted
    let wire = plain_sender.build_document();
    assert_ne!(wire[0], crate::document::ENCRYPTED_MARKER);

    // Strict receiver refuses it
    let outcome =
        strict_receiver.on_ble_data_received_from_node(NodeId::new(0x11111111), &wire, 1000);
    assert!(outcome.is_none());

    // ...and reports the violation
    let reported = collector.events().iter().any(|event| {
        matches!(
            event,
            HiveEvent::SecurityViolation {
                kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
                ..
            }
        )
    });
    assert!(reported);
}
4361
/// An encrypted but non-strict receiver must still accept an unencrypted
/// document (backward compatibility).
#[test]
fn test_non_strict_mode_accepts_unencrypted_documents() {
    let plain_sender = create_mesh(0x11111111, "ALPHA-1");
    let lenient_receiver = create_encrypted_mesh(0x22222222, "BRAVO-1", [0x42u8; 32]);

    let wire = plain_sender.build_document();

    let outcome =
        lenient_receiver.on_ble_data_received_from_node(NodeId::new(0x11111111), &wire, 1000);
    assert!(outcome.is_some());
}
4375
/// When an unencrypted document arrives via an identifier-based BLE callback,
/// the resulting `SecurityViolation` must carry that identifier as `source`.
#[test]
fn test_strict_mode_security_violation_event_includes_source() {
    let device = "test-device-uuid";

    let plain_sender = create_mesh(0x11111111, "ALPHA-1");
    let strict_receiver = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", [0x42u8; 32]);

    let collector = Arc::new(CollectingObserver::new());
    strict_receiver.add_observer(collector.clone());

    let wire = plain_sender.build_document();

    // Go through discovery/connection so the data arrives with an identifier
    strict_receiver.on_ble_discovered(device, Some("HIVE_TEST-11111111"), -65, Some("TEST"), 500);
    strict_receiver.on_ble_connected(device, 600);

    let _result = strict_receiver.on_ble_data_received(device, &wire, 1000);

    // Pull the `source` out of the first matching violation event
    let events = collector.events();
    let source = events
        .iter()
        .find_map(|event| match event {
            HiveEvent::SecurityViolation {
                kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
                source,
                ..
            } => Some(source),
            _ => None,
        })
        .expect("expected an UnencryptedInStrictMode security violation event");

    assert!(source.is_some());
    assert_eq!(source.as_ref().unwrap(), device);
}
4417
/// A document encrypted under a different secret must be rejected and
/// reported as a `DecryptionFailed` security violation.
#[test]
fn test_decryption_failure_emits_security_violation() {
    let sender_key = [0x42u8; 32];
    let receiver_key = [0x43u8; 32]; // intentionally not the sender's key

    let sender = create_encrypted_mesh(0x11111111, "ALPHA-1", sender_key);
    let receiver = create_encrypted_mesh(0x22222222, "BRAVO-1", receiver_key);

    let collector = Arc::new(CollectingObserver::new());
    receiver.add_observer(collector.clone());

    let wire = sender.build_document();

    // Receiver holds the wrong key, so processing must fail
    let outcome = receiver.on_ble_data_received_from_node(NodeId::new(0x11111111), &wire, 1000);
    assert!(outcome.is_none());

    // The failure must be surfaced to observers
    let reported = collector.events().iter().any(|event| {
        matches!(
            event,
            HiveEvent::SecurityViolation {
                kind: crate::observer::SecurityViolationKind::DecryptionFailed,
                ..
            }
        )
    });
    assert!(reported);
}
4445
/// Strict mode must survive being combined with further builder calls.
#[test]
fn test_strict_mode_builder_chain() {
    let key = [0x42u8; 32];

    let mesh = HiveMesh::new(
        HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
            .with_encryption(key)
            .with_strict_encryption()
            .with_sync_interval(10_000)
            .with_peer_timeout(60_000),
    );

    assert!(mesh.is_encryption_enabled());
    assert!(mesh.is_strict_encryption_enabled());
}
4460
4461    // ==================== Multi-Hop Relay Tests ====================
4462
4463    fn create_relay_mesh(node_id: u32, callsign: &str) -> HiveMesh {
4464        let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_relay();
4465        HiveMesh::new(config)
4466    }
4467
/// A mesh created without `with_relay()` must report relay as off.
#[test]
fn test_relay_disabled_by_default() {
    let plain = create_mesh(0x11111111, "ALPHA-1");
    let relay_on = plain.is_relay_enabled();
    assert!(!relay_on);
}
4473
/// A mesh created through the relay helper must report relay as on.
#[test]
fn test_relay_enabled() {
    let relaying = create_relay_mesh(0x11111111, "ALPHA-1");
    let relay_on = relaying.is_relay_enabled();
    assert!(relay_on);
}
4479
/// Each relay-related builder call must land in its config field.
#[test]
fn test_relay_config_builder() {
    let base = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST");
    let tuned = base
        .with_relay()
        .with_max_relay_hops(5)
        .with_relay_fanout(3)
        .with_seen_cache_ttl(60_000);

    assert!(tuned.enable_relay);
    assert_eq!(tuned.max_relay_hops, 5);
    assert_eq!(tuned.relay_fanout, 3);
    assert_eq!(tuned.seen_cache_ttl_ms, 60_000);
}
4493
/// Marking the same message id twice must report "new" only the first time
/// and keep a single cache entry.
#[test]
fn test_seen_message_deduplication() {
    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let origin = NodeId::new(0x22222222);
    let id = crate::relay::MessageId::from_content(origin, 1000, 0xDEADBEEF);

    // First sighting is new
    let first_is_new = mesh.mark_message_seen(id, origin, 1000);
    assert!(first_is_new);

    // Second sighting of the same id is a duplicate
    let second_is_new = mesh.mark_message_seen(id, origin, 2000);
    assert!(!second_is_new);

    assert_eq!(mesh.seen_cache_size(), 1);
}
4508
/// Wrapping a payload must yield a decodable relay envelope that originates
/// from this node with a hop count of zero.
#[test]
fn test_wrap_for_relay() {
    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");

    let body = vec![1, 2, 3, 4, 5];
    let framed = mesh.wrap_for_relay(body.clone());

    // First byte is the relay envelope marker
    assert_eq!(framed[0], crate::relay::RELAY_ENVELOPE_MARKER);

    // Round-trip through the decoder and inspect the fields
    let decoded = crate::relay::RelayEnvelope::decode(&framed).unwrap();
    assert_eq!(decoded.payload, body);
    assert_eq!(decoded.origin_node, NodeId::new(0x11111111));
    assert_eq!(decoded.hop_count, 0);
}
4525
/// A first-seen envelope with hops remaining must be delivered locally and
/// come back with a relay envelope whose hop count is incremented.
#[test]
fn test_process_relay_envelope_new_message() {
    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    // Envelope originated by a different node
    let body = vec![1, 2, 3, 4, 5];
    let inbound = crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), body.clone())
        .with_max_hops(7);
    let wire = inbound.encode();

    let decision = mesh
        .process_relay_envelope(&wire, NodeId::new(0x33333333), 1000)
        .expect("a new envelope should produce a relay decision");

    assert_eq!(decision.payload, body);
    assert_eq!(decision.origin_node.as_u32(), 0x22222222);
    assert_eq!(decision.hop_count, 0);
    assert!(decision.should_relay);

    // The envelope to forward has one more hop recorded
    let outbound = decision
        .relay_envelope
        .expect("a relayable decision should carry a relay envelope");
    assert_eq!(outbound.hop_count, 1);
}
4554
/// Processing the same envelope twice must drop the second copy and emit
/// `DuplicateMessageDropped`.
#[test]
fn test_process_relay_envelope_duplicate() {
    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    let via = NodeId::new(0x33333333);
    let inbound =
        crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), vec![1, 2, 3, 4, 5]);
    let wire = inbound.encode();

    // First delivery is processed
    let first = mesh.process_relay_envelope(&wire, via, 1000);
    assert!(first.is_some());

    // Identical bytes again are rejected as a duplicate
    let second = mesh.process_relay_envelope(&wire, via, 2000);
    assert!(second.is_none());

    // The drop must be visible to observers
    let dropped = collector
        .events()
        .iter()
        .any(|event| matches!(event, HiveEvent::DuplicateMessageDropped { .. }));
    assert!(dropped);
}
4579
/// An envelope that has already used all of its hops must still be delivered
/// locally, must not be relayed, and must emit `MessageTtlExpired`.
#[test]
fn test_process_relay_envelope_ttl_expired() {
    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    let body = vec![1, 2, 3, 4, 5];
    let fresh = crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), body.clone())
        .with_max_hops(3);

    // Relay three times (hops 1, 2, 3) so the envelope sits at its hop limit
    let exhausted = (0..3).fold(fresh, |hop, _| hop.relay().unwrap());
    let wire = exhausted.encode();

    let decision = mesh
        .process_relay_envelope(&wire, NodeId::new(0x33333333), 1000)
        .expect("an envelope at its hop limit should still be processed locally");

    assert_eq!(decision.payload, body);
    assert!(!decision.should_relay); // no hops left
    assert!(decision.relay_envelope.is_none());

    // The expiry must be visible to observers
    let expired = collector
        .events()
        .iter()
        .any(|event| matches!(event, HiveEvent::MessageTtlExpired { .. }));
    assert!(expired);
}
4614
/// The relay document must be a relay envelope from this node whose payload
/// decodes as a `HiveDocument`.
#[test]
fn test_build_relay_document() {
    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");

    let framed = mesh.build_relay_document();

    // Outer layer: relay envelope marker
    assert_eq!(framed[0], crate::relay::RELAY_ENVELOPE_MARKER);

    // Envelope decodes and names this node as the origin
    let decoded = crate::relay::RelayEnvelope::decode(&framed).unwrap();
    assert_eq!(decoded.origin_node.as_u32(), 0x11111111);

    // Inner layer: the payload parses as a document
    let inner = crate::document::HiveDocument::decode(&decoded.payload);
    assert!(inner.is_some());
}
4632
/// The node passed to `get_relay_targets` must not appear among the targets.
///
/// NOTE(review): the final check uses `all`, so it also passes when the
/// returned target list is empty — confirm whether a non-empty check is wanted.
#[test]
fn test_relay_targets_excludes_source() {
    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");

    // (identifier, advertised name, RSSI) for each peer to register
    let peers = [
        ("peer-1", "HIVE_TEST-22222222", -60),
        ("peer-2", "HIVE_TEST-33333333", -65),
        ("peer-3", "HIVE_TEST-44444444", -70),
    ];
    for (identifier, advertised_name, rssi) in peers {
        mesh.on_ble_discovered(identifier, Some(advertised_name), rssi, Some("TEST"), 1000);
        mesh.on_ble_connected(identifier, 1000);
    }

    // Ask for targets while excluding peer-2's node id
    let targets = mesh.get_relay_targets(Some(NodeId::new(0x33333333)));

    // peer-2 must be absent from the result
    assert!(targets
        .iter()
        .all(|target| target.node_id.as_u32() != 0x33333333));
}
4671
/// Clearing the seen-message cache must remove every recorded entry.
#[test]
fn test_clear_seen_cache() {
    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let origin = NodeId::new(0x22222222);

    // Record two distinct messages: (id timestamp, id hash, time marked seen)
    for (stamp, hash, seen_at) in [(1000, 0x11111111, 1000), (2000, 0x22222222, 2000)] {
        mesh.mark_message_seen(
            crate::relay::MessageId::from_content(origin, stamp, hash),
            origin,
            seen_at,
        );
    }
    assert_eq!(mesh.seen_cache_size(), 2);

    // Wipe the cache and confirm it is empty
    mesh.clear_seen_cache();
    assert_eq!(mesh.seen_cache_size(), 0);
}
4695}