hive_btle/
hive_mesh.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! HiveMesh - Unified mesh management facade
17//!
18//! This module provides the main entry point for HIVE BLE mesh operations.
19//! It composes peer management, document sync, and observer notifications
20//! into a single interface that platform implementations can use.
21//!
22//! ## Usage
23//!
24//! ```ignore
25//! use hive_btle::hive_mesh::{HiveMesh, HiveMeshConfig};
26//! use hive_btle::observer::{HiveEvent, HiveObserver};
27//! use hive_btle::NodeId;
28//! use std::sync::Arc;
29//!
30//! // Create mesh configuration
31//! let config = HiveMeshConfig::new(NodeId::new(0x12345678), "ALPHA-1", "DEMO");
32//!
33//! // Create mesh instance
34//! let mesh = HiveMesh::new(config);
35//!
36//! // Add observer for events
37//! struct MyObserver;
38//! impl HiveObserver for MyObserver {
39//!     fn on_event(&self, event: HiveEvent) {
40//!         println!("Event: {:?}", event);
41//!     }
42//! }
43//! mesh.add_observer(Arc::new(MyObserver));
44//!
45//! // Platform BLE callbacks
46//! mesh.on_ble_discovered("device-uuid", Some("HIVE_DEMO-AABBCCDD"), -65, Some("DEMO"), now_ms);
47//! mesh.on_ble_connected("device-uuid", now_ms);
48//! mesh.on_ble_data_received("device-uuid", &data, now_ms);
49//!
50//! // Periodic maintenance
51//! if let Some(sync_data) = mesh.tick(now_ms) {
52//!     // Broadcast sync_data to connected peers
53//! }
54//! ```
55
56#[cfg(not(feature = "std"))]
57use alloc::{string::String, sync::Arc, vec::Vec};
58#[cfg(feature = "std")]
59use std::sync::Arc;
60
61use crate::document::{ENCRYPTED_MARKER, KEY_EXCHANGE_MARKER, PEER_E2EE_MARKER};
62use crate::document_sync::DocumentSync;
63use crate::gossip::{GossipStrategy, RandomFanout};
64use crate::observer::{DisconnectReason, HiveEvent, HiveObserver, SecurityViolationKind};
65use crate::peer::{
66    ConnectionStateGraph, FullStateCountSummary, HivePeer, IndirectPeer, PeerConnectionState,
67    PeerDegree, PeerManagerConfig, StateCountSummary,
68};
69use crate::peer_manager::PeerManager;
70use crate::relay::{
71    MessageId, RelayEnvelope, SeenMessageCache, DEFAULT_MAX_HOPS, DEFAULT_SEEN_TTL_MS,
72    RELAY_ENVELOPE_MARKER,
73};
74use crate::security::{
75    KeyExchangeMessage, MeshEncryptionKey, PeerEncryptedMessage, PeerSessionManager, SessionState,
76};
77use crate::sync::crdt::{EventType, PeripheralType};
78use crate::sync::delta::{DeltaEncoder, DeltaStats};
79use crate::sync::delta_document::{DeltaDocument, Operation};
80use crate::NodeId;
81
82#[cfg(feature = "std")]
83use crate::observer::ObserverManager;
84
85/// Configuration for HiveMesh
86#[derive(Debug, Clone)]
87pub struct HiveMeshConfig {
88    /// Our node ID
89    pub node_id: NodeId,
90
91    /// Our callsign (e.g., "ALPHA-1")
92    pub callsign: String,
93
94    /// Mesh ID to filter peers (e.g., "DEMO")
95    pub mesh_id: String,
96
97    /// Peripheral type for this device
98    pub peripheral_type: PeripheralType,
99
100    /// Peer management configuration
101    pub peer_config: PeerManagerConfig,
102
103    /// Sync interval in milliseconds (how often to broadcast state)
104    pub sync_interval_ms: u64,
105
106    /// Whether to auto-broadcast on emergency/ack
107    pub auto_broadcast_events: bool,
108
109    /// Optional shared secret for mesh-wide encryption (32 bytes)
110    ///
111    /// When set, all documents are encrypted using ChaCha20-Poly1305 before
112    /// transmission and decrypted upon receipt. All nodes in the mesh must
113    /// share the same secret to communicate.
114    pub encryption_secret: Option<[u8; 32]>,
115
116    /// Strict encryption mode - reject unencrypted documents when encryption is enabled
117    ///
118    /// When true and encryption is enabled, any unencrypted documents received
119    /// will be rejected and trigger a SecurityViolation event. This prevents
120    /// downgrade attacks where an adversary sends unencrypted malicious documents.
121    ///
122    /// Default: false (backward compatible - accepts unencrypted for gradual rollout)
123    pub strict_encryption: bool,
124
125    /// Enable multi-hop relay
126    ///
127    /// When enabled, received messages will be forwarded to other peers based
128    /// on the gossip strategy. Requires message deduplication to prevent loops.
129    ///
130    /// Default: false
131    pub enable_relay: bool,
132
133    /// Maximum hops for relay messages (TTL)
134    ///
135    /// Messages will not be relayed beyond this many hops from the origin.
136    /// Default: 7
137    pub max_relay_hops: u8,
138
139    /// Gossip fanout for relay
140    ///
141    /// Number of peers to forward each message to. Higher values increase
142    /// convergence speed but also bandwidth usage.
143    /// Default: 2
144    pub relay_fanout: usize,
145
146    /// TTL for seen message cache (milliseconds)
147    ///
148    /// How long to remember message IDs for deduplication.
149    /// Default: 300_000 (5 minutes)
150    pub seen_cache_ttl_ms: u64,
151}
152
153impl HiveMeshConfig {
154    /// Create a new configuration with required fields
155    pub fn new(node_id: NodeId, callsign: &str, mesh_id: &str) -> Self {
156        Self {
157            node_id,
158            callsign: callsign.into(),
159            mesh_id: mesh_id.into(),
160            peripheral_type: PeripheralType::SoldierSensor,
161            peer_config: PeerManagerConfig::with_mesh_id(mesh_id),
162            sync_interval_ms: 5000,
163            auto_broadcast_events: true,
164            encryption_secret: None,
165            strict_encryption: false,
166            enable_relay: false,
167            max_relay_hops: DEFAULT_MAX_HOPS,
168            relay_fanout: 2,
169            seen_cache_ttl_ms: DEFAULT_SEEN_TTL_MS,
170        }
171    }
172
173    /// Enable mesh-wide encryption with a shared secret
174    ///
175    /// All documents will be encrypted using ChaCha20-Poly1305 before
176    /// transmission. All mesh participants must use the same secret.
177    pub fn with_encryption(mut self, secret: [u8; 32]) -> Self {
178        self.encryption_secret = Some(secret);
179        self
180    }
181
182    /// Set peripheral type
183    pub fn with_peripheral_type(mut self, ptype: PeripheralType) -> Self {
184        self.peripheral_type = ptype;
185        self
186    }
187
188    /// Set sync interval
189    pub fn with_sync_interval(mut self, interval_ms: u64) -> Self {
190        self.sync_interval_ms = interval_ms;
191        self
192    }
193
194    /// Set peer timeout
195    pub fn with_peer_timeout(mut self, timeout_ms: u64) -> Self {
196        self.peer_config.peer_timeout_ms = timeout_ms;
197        self
198    }
199
200    /// Set max peers (for embedded systems)
201    pub fn with_max_peers(mut self, max: usize) -> Self {
202        self.peer_config.max_peers = max;
203        self
204    }
205
206    /// Enable strict encryption mode
207    ///
208    /// When enabled (and encryption is also enabled), any unencrypted documents
209    /// received will be rejected and trigger a `SecurityViolation` event.
210    /// This prevents downgrade attacks.
211    ///
212    /// Note: This only has effect when encryption is enabled via `with_encryption()`.
213    pub fn with_strict_encryption(mut self) -> Self {
214        self.strict_encryption = true;
215        self
216    }
217
218    /// Enable multi-hop relay
219    ///
220    /// When enabled, received messages will be forwarded to other connected peers
221    /// based on the gossip strategy. This enables mesh-wide message propagation.
222    pub fn with_relay(mut self) -> Self {
223        self.enable_relay = true;
224        self
225    }
226
227    /// Set maximum relay hops (TTL)
228    ///
229    /// Messages will not be relayed beyond this many hops from the origin.
230    pub fn with_max_relay_hops(mut self, max_hops: u8) -> Self {
231        self.max_relay_hops = max_hops;
232        self
233    }
234
235    /// Set gossip fanout for relay
236    ///
237    /// Number of peers to forward each message to.
238    pub fn with_relay_fanout(mut self, fanout: usize) -> Self {
239        self.relay_fanout = fanout.max(1);
240        self
241    }
242
243    /// Set TTL for seen message cache
244    ///
245    /// How long to remember message IDs for deduplication (milliseconds).
246    pub fn with_seen_cache_ttl(mut self, ttl_ms: u64) -> Self {
247        self.seen_cache_ttl_ms = ttl_ms;
248        self
249    }
250}
251
/// Facade that ties together the pieces of a HIVE BLE mesh node.
///
/// It owns the peer manager, the document sync state and the observer
/// registry, plus the supporting state for encryption, relay and delta sync.
/// Platform BLE callbacks are expected to call into this type.
#[cfg(feature = "std")]
pub struct HiveMesh {
    /// Settings this mesh was created with (some can be toggled at runtime).
    config: HiveMeshConfig,

    /// Tracks discovered and connected peers.
    peer_manager: PeerManager,

    /// Local document state that is synchronised with peers.
    document_sync: DocumentSync,

    /// Registered observers that receive `HiveEvent`s.
    observers: ObserverManager,

    /// Time of the last sync broadcast, in milliseconds.
    ///
    /// Stored as `u32`, which wraps roughly every 49 days; that is enough
    /// for measuring intervals.
    last_sync_ms: std::sync::atomic::AtomicU32,

    /// Time of the last cleanup pass, stored the same way as `last_sync_ms`.
    last_cleanup_ms: std::sync::atomic::AtomicU32,

    /// Mesh-wide encryption key derived from the shared secret, if any.
    encryption_key: Option<MeshEncryptionKey>,

    /// Per-peer end-to-end encryption sessions; `None` until enabled.
    peer_sessions: std::sync::Mutex<Option<PeerSessionManager>>,

    /// State graph following each peer's connection lifecycle.
    connection_graph: std::sync::Mutex<ConnectionStateGraph>,

    /// Recently seen message IDs, used to drop relay duplicates.
    seen_cache: std::sync::Mutex<SeenMessageCache>,

    /// Strategy that picks which peers a relayed message is forwarded to.
    gossip_strategy: Box<dyn GossipStrategy>,

    /// Per-peer record of what has already been sent.
    ///
    /// This is what makes delta sync possible: only changes are sent to a
    /// peer instead of the full document.
    delta_encoder: std::sync::Mutex<DeltaEncoder>,
}
297
298#[cfg(feature = "std")]
299impl HiveMesh {
300    /// Create a new HiveMesh instance
301    pub fn new(config: HiveMeshConfig) -> Self {
302        let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
303        let document_sync = DocumentSync::with_peripheral_type(
304            config.node_id,
305            &config.callsign,
306            config.peripheral_type,
307        );
308
309        // Derive encryption key from shared secret if configured
310        let encryption_key = config
311            .encryption_secret
312            .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
313
314        // Create connection state graph with config thresholds
315        let connection_graph = ConnectionStateGraph::with_config(
316            config.peer_config.rssi_degraded_threshold,
317            config.peer_config.lost_timeout_ms,
318        );
319
320        // Create seen message cache for relay deduplication
321        let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
322
323        // Create gossip strategy for relay
324        let gossip_strategy: Box<dyn GossipStrategy> =
325            Box::new(RandomFanout::new(config.relay_fanout));
326
327        // Create delta encoder for per-peer sync state tracking
328        let delta_encoder = DeltaEncoder::new(config.node_id);
329
330        Self {
331            config,
332            peer_manager,
333            document_sync,
334            observers: ObserverManager::new(),
335            last_sync_ms: std::sync::atomic::AtomicU32::new(0),
336            last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
337            encryption_key,
338            peer_sessions: std::sync::Mutex::new(None),
339            connection_graph: std::sync::Mutex::new(connection_graph),
340            seen_cache: std::sync::Mutex::new(seen_cache),
341            gossip_strategy,
342            delta_encoder: std::sync::Mutex::new(delta_encoder),
343        }
344    }
345
346    // ==================== Encryption ====================
347
348    /// Check if mesh-wide encryption is enabled
349    pub fn is_encryption_enabled(&self) -> bool {
350        self.encryption_key.is_some()
351    }
352
353    /// Check if strict encryption mode is enabled
354    ///
355    /// Returns true only if both encryption and strict_encryption are enabled.
356    pub fn is_strict_encryption_enabled(&self) -> bool {
357        self.config.strict_encryption && self.encryption_key.is_some()
358    }
359
360    /// Enable mesh-wide encryption with a shared secret
361    ///
362    /// Derives a ChaCha20-Poly1305 key from the secret using HKDF-SHA256.
363    /// All mesh participants must use the same secret to communicate.
364    pub fn enable_encryption(&mut self, secret: &[u8; 32]) {
365        self.encryption_key = Some(MeshEncryptionKey::from_shared_secret(
366            &self.config.mesh_id,
367            secret,
368        ));
369    }
370
371    /// Disable mesh-wide encryption
372    pub fn disable_encryption(&mut self) {
373        self.encryption_key = None;
374    }
375
376    /// Encrypt document bytes for transmission
377    ///
378    /// Returns the encrypted bytes with ENCRYPTED_MARKER prefix, or the
379    /// original bytes if encryption is disabled.
380    fn encrypt_document(&self, plaintext: &[u8]) -> Vec<u8> {
381        match &self.encryption_key {
382            Some(key) => {
383                // Encrypt and prepend marker
384                match key.encrypt_to_bytes(plaintext) {
385                    Ok(ciphertext) => {
386                        let mut buf = Vec::with_capacity(2 + ciphertext.len());
387                        buf.push(ENCRYPTED_MARKER);
388                        buf.push(0x00); // reserved
389                        buf.extend_from_slice(&ciphertext);
390                        buf
391                    }
392                    Err(e) => {
393                        log::error!("Encryption failed: {}", e);
394                        // Fall back to unencrypted on error (shouldn't happen)
395                        plaintext.to_vec()
396                    }
397                }
398            }
399            None => plaintext.to_vec(),
400        }
401    }
402
403    /// Decrypt document bytes received from peer
404    ///
405    /// Returns the decrypted bytes if encrypted and valid, or the original
406    /// bytes if not encrypted. Returns None if decryption fails.
407    ///
408    /// In strict encryption mode (when both encryption and strict_encryption are enabled),
409    /// unencrypted documents are rejected and trigger a SecurityViolation event.
410    fn decrypt_document<'a>(
411        &self,
412        data: &'a [u8],
413        source_hint: Option<&str>,
414    ) -> Option<std::borrow::Cow<'a, [u8]>> {
415        // Check for encrypted marker
416        if data.len() >= 2 && data[0] == ENCRYPTED_MARKER {
417            // Encrypted document
418            let _reserved = data[1];
419            let encrypted_payload = &data[2..];
420
421            match &self.encryption_key {
422                Some(key) => match key.decrypt_from_bytes(encrypted_payload) {
423                    Ok(plaintext) => Some(std::borrow::Cow::Owned(plaintext)),
424                    Err(e) => {
425                        log::warn!("Decryption failed (wrong key or corrupted): {}", e);
426                        self.notify(HiveEvent::SecurityViolation {
427                            kind: SecurityViolationKind::DecryptionFailed,
428                            source: source_hint.map(String::from),
429                        });
430                        None
431                    }
432                },
433                None => {
434                    log::warn!("Received encrypted document but encryption not enabled");
435                    None
436                }
437            }
438        } else {
439            // Unencrypted document
440            // Check strict encryption mode
441            if self.config.strict_encryption && self.encryption_key.is_some() {
442                log::warn!(
443                    "Rejected unencrypted document in strict encryption mode (source: {:?})",
444                    source_hint
445                );
446                self.notify(HiveEvent::SecurityViolation {
447                    kind: SecurityViolationKind::UnencryptedInStrictMode,
448                    source: source_hint.map(String::from),
449                });
450                None
451            } else {
452                // Permissive mode: accept unencrypted for backward compatibility
453                Some(std::borrow::Cow::Borrowed(data))
454            }
455        }
456    }
457
458    // ==================== Multi-Hop Relay ====================
459
460    /// Check if multi-hop relay is enabled
461    pub fn is_relay_enabled(&self) -> bool {
462        self.config.enable_relay
463    }
464
465    /// Enable multi-hop relay
466    pub fn enable_relay(&mut self) {
467        self.config.enable_relay = true;
468    }
469
470    /// Disable multi-hop relay
471    pub fn disable_relay(&mut self) {
472        self.config.enable_relay = false;
473    }
474
475    /// Check if a message has been seen before (for deduplication)
476    ///
477    /// Returns true if the message was already seen (duplicate).
478    pub fn has_seen_message(&self, message_id: &MessageId) -> bool {
479        self.seen_cache.lock().unwrap().has_seen(message_id)
480    }
481
482    /// Mark a message as seen
483    ///
484    /// Returns true if this is a new message (first time seen).
485    pub fn mark_message_seen(&self, message_id: MessageId, origin: NodeId, now_ms: u64) -> bool {
486        self.seen_cache
487            .lock()
488            .unwrap()
489            .check_and_mark(message_id, origin, now_ms)
490    }
491
492    /// Get the number of entries in the seen message cache
493    pub fn seen_cache_size(&self) -> usize {
494        self.seen_cache.lock().unwrap().len()
495    }
496
497    /// Clear the seen message cache
498    pub fn clear_seen_cache(&self) {
499        self.seen_cache.lock().unwrap().clear();
500    }
501
502    /// Wrap a document in a relay envelope for multi-hop transmission
503    ///
504    /// The returned bytes can be sent to peers and will be automatically
505    /// relayed through the mesh if relay is enabled on receiving nodes.
506    pub fn wrap_for_relay(&self, payload: Vec<u8>) -> Vec<u8> {
507        let envelope = RelayEnvelope::broadcast(self.config.node_id, payload)
508            .with_max_hops(self.config.max_relay_hops);
509        envelope.encode()
510    }
511
512    /// Get peers to relay a message to
513    ///
514    /// Uses the configured gossip strategy to select relay targets.
515    /// Excludes the source peer (if provided) to avoid sending back to sender.
516    pub fn get_relay_targets(&self, exclude_peer: Option<NodeId>) -> Vec<HivePeer> {
517        let connected = self.peer_manager.get_connected_peers();
518        let filtered: Vec<_> = if let Some(exclude) = exclude_peer {
519            connected
520                .into_iter()
521                .filter(|p| p.node_id != exclude)
522                .collect()
523        } else {
524            connected
525        };
526
527        self.gossip_strategy
528            .select_peers(&filtered)
529            .into_iter()
530            .cloned()
531            .collect()
532    }
533
534    /// Process an incoming relay envelope
535    ///
536    /// Handles deduplication, TTL checking, and determines if the message
537    /// should be processed and/or relayed.
538    ///
539    /// Returns:
540    /// - `Ok(Some(RelayDecision))` if message should be processed/relayed
541    /// - `Ok(None)` if message was a duplicate or TTL expired
542    /// - `Err` if parsing failed
543    pub fn process_relay_envelope(
544        &self,
545        data: &[u8],
546        source_peer: NodeId,
547        now_ms: u64,
548    ) -> Option<RelayDecision> {
549        // Parse envelope
550        let envelope = RelayEnvelope::decode(data)?;
551
552        // Update indirect peer graph if origin differs from source
553        // This means the message was relayed through source_peer from origin_node
554        if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
555            let is_new = self.connection_graph.lock().unwrap().on_relay_received(
556                source_peer,
557                envelope.origin_node,
558                envelope.hop_count,
559                now_ms,
560            );
561
562            if is_new {
563                log::debug!(
564                    "Discovered indirect peer {:08X} via {:08X} ({} hops)",
565                    envelope.origin_node.as_u32(),
566                    source_peer.as_u32(),
567                    envelope.hop_count
568                );
569            }
570        }
571
572        // Check deduplication
573        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
574            // Duplicate message
575            let stats = self
576                .seen_cache
577                .lock()
578                .unwrap()
579                .get_stats(&envelope.message_id);
580            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
581
582            self.notify(HiveEvent::DuplicateMessageDropped {
583                origin_node: envelope.origin_node,
584                seen_count,
585            });
586
587            log::debug!(
588                "Dropped duplicate message {} from {:08X} (seen {} times)",
589                envelope.message_id,
590                envelope.origin_node.as_u32(),
591                seen_count
592            );
593            return None;
594        }
595
596        // Check TTL
597        if !envelope.can_relay() {
598            self.notify(HiveEvent::MessageTtlExpired {
599                origin_node: envelope.origin_node,
600                hop_count: envelope.hop_count,
601            });
602
603            log::debug!(
604                "Message {} from {:08X} TTL expired at hop {}",
605                envelope.message_id,
606                envelope.origin_node.as_u32(),
607                envelope.hop_count
608            );
609
610            // Still process locally even if TTL expired
611            return Some(RelayDecision {
612                payload: envelope.payload,
613                origin_node: envelope.origin_node,
614                hop_count: envelope.hop_count,
615                should_relay: false,
616                relay_envelope: None,
617            });
618        }
619
620        // Determine if we should relay
621        let should_relay = self.config.enable_relay;
622        let relay_envelope = if should_relay {
623            envelope.relay() // Increments hop count
624        } else {
625            None
626        };
627
628        Some(RelayDecision {
629            payload: envelope.payload,
630            origin_node: envelope.origin_node,
631            hop_count: envelope.hop_count,
632            should_relay,
633            relay_envelope,
634        })
635    }
636
637    /// Build a document wrapped in a relay envelope
638    ///
639    /// Convenience method that builds the document, encrypts it (if enabled),
640    /// and wraps it in a relay envelope for multi-hop transmission.
641    pub fn build_relay_document(&self) -> Vec<u8> {
642        let doc = self.build_document(); // Already encrypted if encryption enabled
643        self.wrap_for_relay(doc)
644    }
645
646    // ==================== Delta Sync ====================
647
648    /// Register a peer for delta sync tracking
649    ///
650    /// Call this when a peer connects to start tracking what data has been
651    /// sent to them. This enables future delta sync (sending only changes).
652    pub fn register_peer_for_delta(&self, peer_id: &NodeId) {
653        let mut encoder = self.delta_encoder.lock().unwrap();
654        encoder.add_peer(peer_id);
655        log::debug!(
656            "Registered peer {:08X} for delta sync tracking",
657            peer_id.as_u32()
658        );
659    }
660
661    /// Unregister a peer from delta sync tracking
662    ///
663    /// Call this when a peer disconnects to clean up tracking state.
664    pub fn unregister_peer_for_delta(&self, peer_id: &NodeId) {
665        let mut encoder = self.delta_encoder.lock().unwrap();
666        encoder.remove_peer(peer_id);
667        log::debug!(
668            "Unregistered peer {:08X} from delta sync tracking",
669            peer_id.as_u32()
670        );
671    }
672
673    /// Reset delta sync state for a peer
674    ///
675    /// Call this when a peer reconnects to force a full sync on next
676    /// communication. This clears the record of what was previously sent.
677    pub fn reset_peer_delta_state(&self, peer_id: &NodeId) {
678        let mut encoder = self.delta_encoder.lock().unwrap();
679        encoder.reset_peer(peer_id);
680        log::debug!("Reset delta sync state for peer {:08X}", peer_id.as_u32());
681    }
682
683    /// Record bytes sent to a peer (for delta statistics)
684    pub fn record_delta_sent(&self, peer_id: &NodeId, bytes: usize) {
685        let mut encoder = self.delta_encoder.lock().unwrap();
686        encoder.record_sent(peer_id, bytes);
687    }
688
689    /// Record bytes received from a peer (for delta statistics)
690    pub fn record_delta_received(&self, peer_id: &NodeId, bytes: usize, timestamp: u64) {
691        let mut encoder = self.delta_encoder.lock().unwrap();
692        encoder.record_received(peer_id, bytes, timestamp);
693    }
694
695    /// Get delta sync statistics
696    ///
697    /// Returns aggregate statistics about delta sync across all peers,
698    /// including bytes sent/received and sync counts.
699    pub fn delta_stats(&self) -> DeltaStats {
700        self.delta_encoder.lock().unwrap().stats()
701    }
702
703    /// Get delta sync statistics for a specific peer
704    ///
705    /// Returns the bytes sent/received and sync count for a single peer.
706    pub fn peer_delta_stats(&self, peer_id: &NodeId) -> Option<(u64, u64, u32)> {
707        let encoder = self.delta_encoder.lock().unwrap();
708        encoder
709            .get_peer_state(peer_id)
710            .map(|state| (state.bytes_sent, state.bytes_received, state.sync_count))
711    }
712
713    /// Build a delta document for a specific peer
714    ///
715    /// This only includes operations that have changed since the last sync
716    /// with this peer. Uses the delta encoder to track per-peer state.
717    ///
718    /// Returns the encoded delta document bytes, or None if there's nothing
719    /// new to send to this peer.
720    pub fn build_delta_document_for_peer(&self, peer_id: &NodeId, now_ms: u64) -> Option<Vec<u8>> {
721        // Collect all current operations
722        let mut all_operations: Vec<Operation> = Vec::new();
723
724        // Add counter operations (one per node that has contributed)
725        // Use the count value as the "timestamp" for tracking - only send if count increased
726        for (node_id_u32, count) in self.document_sync.counter_entries() {
727            all_operations.push(Operation::IncrementCounter {
728                counter_id: 0, // Default mesh counter
729                node_id: NodeId::new(node_id_u32),
730                amount: count,
731                timestamp: count, // Use count as timestamp for delta tracking
732            });
733        }
734
735        // Add peripheral update
736        // Use event timestamp if available, otherwise use 1 for initial send
737        let peripheral = self.document_sync.peripheral_snapshot();
738        let peripheral_timestamp = peripheral
739            .last_event
740            .as_ref()
741            .map(|e| e.timestamp)
742            .unwrap_or(1); // Use 1 (not 0) so it's sent initially
743        all_operations.push(Operation::UpdatePeripheral {
744            peripheral,
745            timestamp: peripheral_timestamp,
746        });
747
748        // Add emergency operations if active
749        if let Some(emergency) = self.document_sync.emergency_snapshot() {
750            let source_node = NodeId::new(emergency.source_node());
751            let timestamp = emergency.timestamp();
752
753            // Add SetEmergency operation
754            all_operations.push(Operation::SetEmergency {
755                source_node,
756                timestamp,
757                known_peers: emergency.all_nodes(),
758            });
759
760            // Add AckEmergency for each node that has acked
761            for acked_node in emergency.acked_nodes() {
762                all_operations.push(Operation::AckEmergency {
763                    node_id: NodeId::new(acked_node),
764                    emergency_timestamp: timestamp,
765                });
766            }
767        }
768
769        // Filter operations for this peer (only send what's new)
770        let filtered_operations: Vec<Operation> = {
771            let encoder = self.delta_encoder.lock().unwrap();
772            if let Some(peer_state) = encoder.get_peer_state(peer_id) {
773                all_operations
774                    .into_iter()
775                    .filter(|op| peer_state.needs_send(&op.key(), op.timestamp()))
776                    .collect()
777            } else {
778                // Unknown peer, send all operations
779                all_operations
780            }
781        };
782
783        // If nothing new to send, return None
784        if filtered_operations.is_empty() {
785            return None;
786        }
787
788        // Mark operations as sent
789        {
790            let mut encoder = self.delta_encoder.lock().unwrap();
791            if let Some(peer_state) = encoder.get_peer_state_mut(peer_id) {
792                for op in &filtered_operations {
793                    peer_state.mark_sent(&op.key(), op.timestamp());
794                }
795            }
796        }
797
798        // Build the delta document
799        let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
800        for op in filtered_operations {
801            delta.add_operation(op);
802        }
803
804        // Encode and optionally encrypt
805        let encoded = delta.encode();
806        let result = self.encrypt_document(&encoded);
807
808        // Record stats
809        {
810            let mut encoder = self.delta_encoder.lock().unwrap();
811            encoder.record_sent(peer_id, result.len());
812        }
813
814        Some(result)
815    }
816
817    /// Build a full delta document (for broadcast or new peers)
818    ///
819    /// Unlike `build_delta_document_for_peer`, this includes all state
820    /// regardless of what has been sent before. Use this for broadcasts.
821    pub fn build_full_delta_document(&self, now_ms: u64) -> Vec<u8> {
822        let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
823
824        // Add all counter operations
825        for (node_id_u32, count) in self.document_sync.counter_entries() {
826            delta.add_operation(Operation::IncrementCounter {
827                counter_id: 0,
828                node_id: NodeId::new(node_id_u32),
829                amount: count,
830                timestamp: now_ms,
831            });
832        }
833
834        // Add peripheral
835        let peripheral = self.document_sync.peripheral_snapshot();
836        let peripheral_timestamp = peripheral
837            .last_event
838            .as_ref()
839            .map(|e| e.timestamp)
840            .unwrap_or(now_ms);
841        delta.add_operation(Operation::UpdatePeripheral {
842            peripheral,
843            timestamp: peripheral_timestamp,
844        });
845
846        // Add emergency if active
847        if let Some(emergency) = self.document_sync.emergency_snapshot() {
848            let source_node = NodeId::new(emergency.source_node());
849            let timestamp = emergency.timestamp();
850
851            delta.add_operation(Operation::SetEmergency {
852                source_node,
853                timestamp,
854                known_peers: emergency.all_nodes(),
855            });
856
857            for acked_node in emergency.acked_nodes() {
858                delta.add_operation(Operation::AckEmergency {
859                    node_id: NodeId::new(acked_node),
860                    emergency_timestamp: timestamp,
861                });
862            }
863        }
864
865        let encoded = delta.encode();
866        self.encrypt_document(&encoded)
867    }
868
869    /// Internal: Process a received delta document
870    ///
871    /// Applies operations from a delta document to local state.
872    fn process_delta_document_internal(
873        &self,
874        source_node: NodeId,
875        data: &[u8],
876        now_ms: u64,
877        relay_data: Option<Vec<u8>>,
878        origin_node: Option<NodeId>,
879        hop_count: u8,
880    ) -> Option<DataReceivedResult> {
881        // Decode the delta document
882        let delta = DeltaDocument::decode(data)?;
883
884        // Don't process our own documents
885        if delta.origin_node == self.config.node_id {
886            return None;
887        }
888
889        // Apply operations to local state
890        let mut counter_changed = false;
891        let mut emergency_changed = false;
892        let mut is_emergency = false;
893        let mut is_ack = false;
894        let mut event_timestamp = 0u64;
895
896        for op in &delta.operations {
897            match op {
898                Operation::IncrementCounter {
899                    node_id, amount, ..
900                } => {
901                    // Merge counter value (take max)
902                    let current = self.document_sync.counter_entries();
903                    let current_value = current
904                        .iter()
905                        .find(|(id, _)| *id == node_id.as_u32())
906                        .map(|(_, v)| *v)
907                        .unwrap_or(0);
908
909                    if *amount > current_value {
910                        // Need to merge - this is handled by the counter merge logic
911                        // For now, we record that counter changed
912                        counter_changed = true;
913                    }
914                }
915                Operation::UpdatePeripheral { timestamp, .. } => {
916                    // Track the timestamp for the result
917                    if *timestamp > event_timestamp {
918                        event_timestamp = *timestamp;
919                    }
920                }
921                Operation::SetEmergency { timestamp, .. } => {
922                    is_emergency = true;
923                    emergency_changed = true;
924                    event_timestamp = *timestamp;
925                }
926                Operation::AckEmergency {
927                    emergency_timestamp,
928                    ..
929                } => {
930                    is_ack = true;
931                    emergency_changed = true;
932                    if *emergency_timestamp > event_timestamp {
933                        event_timestamp = *emergency_timestamp;
934                    }
935                }
936                Operation::ClearEmergency {
937                    emergency_timestamp,
938                } => {
939                    emergency_changed = true;
940                    if *emergency_timestamp > event_timestamp {
941                        event_timestamp = *emergency_timestamp;
942                    }
943                }
944            }
945        }
946
947        // Record sync
948        self.peer_manager.record_sync(source_node, now_ms);
949
950        // Record delta received
951        {
952            let mut encoder = self.delta_encoder.lock().unwrap();
953            encoder.record_received(&source_node, data.len(), now_ms);
954        }
955
956        // Generate events based on what was received
957        if is_emergency {
958            self.notify(HiveEvent::EmergencyReceived {
959                from_node: delta.origin_node,
960            });
961        } else if is_ack {
962            self.notify(HiveEvent::AckReceived {
963                from_node: delta.origin_node,
964            });
965        }
966
967        if counter_changed {
968            let total_count = self.document_sync.total_count();
969            self.notify(HiveEvent::DocumentSynced {
970                from_node: delta.origin_node,
971                total_count,
972            });
973        }
974
975        // Emit relay event if we're relaying
976        if relay_data.is_some() {
977            let relay_targets = self.get_relay_targets(Some(source_node));
978            self.notify(HiveEvent::MessageRelayed {
979                origin_node: origin_node.unwrap_or(delta.origin_node),
980                relay_count: relay_targets.len(),
981                hop_count,
982            });
983        }
984
985        Some(DataReceivedResult {
986            source_node: delta.origin_node,
987            is_emergency,
988            is_ack,
989            counter_changed,
990            emergency_changed,
991            total_count: self.document_sync.total_count(),
992            event_timestamp,
993            relay_data,
994            origin_node,
995            hop_count,
996        })
997    }
998
999    // ==================== Per-Peer E2EE ====================
1000
1001    /// Enable per-peer E2EE capability
1002    ///
1003    /// Creates a new identity key for this node. This allows establishing
1004    /// encrypted sessions with specific peers where only the sender and
1005    /// recipient can read messages (other mesh members cannot).
1006    pub fn enable_peer_e2ee(&self) {
1007        let mut sessions = self.peer_sessions.lock().unwrap();
1008        if sessions.is_none() {
1009            *sessions = Some(PeerSessionManager::new(self.config.node_id));
1010            log::info!(
1011                "Per-peer E2EE enabled for node {:08X}",
1012                self.config.node_id.as_u32()
1013            );
1014        }
1015    }
1016
1017    /// Disable per-peer E2EE capability
1018    ///
1019    /// Clears all peer sessions and disables E2EE.
1020    pub fn disable_peer_e2ee(&self) {
1021        let mut sessions = self.peer_sessions.lock().unwrap();
1022        *sessions = None;
1023        log::info!("Per-peer E2EE disabled");
1024    }
1025
1026    /// Check if per-peer E2EE is enabled
1027    pub fn is_peer_e2ee_enabled(&self) -> bool {
1028        self.peer_sessions.lock().unwrap().is_some()
1029    }
1030
1031    /// Get our E2EE public key (for sharing with peers)
1032    ///
1033    /// Returns None if per-peer E2EE is not enabled.
1034    pub fn peer_e2ee_public_key(&self) -> Option<[u8; 32]> {
1035        self.peer_sessions
1036            .lock()
1037            .unwrap()
1038            .as_ref()
1039            .map(|s| s.our_public_key())
1040    }
1041
1042    /// Initiate E2EE session with a specific peer
1043    ///
1044    /// Returns the key exchange message bytes to send to the peer.
1045    /// The message should be broadcast/sent to the peer.
1046    /// Returns None if per-peer E2EE is not enabled.
1047    pub fn initiate_peer_e2ee(&self, peer_node_id: NodeId, now_ms: u64) -> Option<Vec<u8>> {
1048        let mut sessions = self.peer_sessions.lock().unwrap();
1049        let session_mgr = sessions.as_mut()?;
1050
1051        let key_exchange = session_mgr.initiate_session(peer_node_id, now_ms);
1052        let mut buf = Vec::with_capacity(2 + 37);
1053        buf.push(KEY_EXCHANGE_MARKER);
1054        buf.push(0x00); // reserved
1055        buf.extend_from_slice(&key_exchange.encode());
1056
1057        log::info!(
1058            "Initiated E2EE session with peer {:08X}",
1059            peer_node_id.as_u32()
1060        );
1061        Some(buf)
1062    }
1063
1064    /// Check if we have an established E2EE session with a peer
1065    pub fn has_peer_e2ee_session(&self, peer_node_id: NodeId) -> bool {
1066        self.peer_sessions
1067            .lock()
1068            .unwrap()
1069            .as_ref()
1070            .is_some_and(|s| s.has_session(peer_node_id))
1071    }
1072
1073    /// Get E2EE session state with a peer
1074    pub fn peer_e2ee_session_state(&self, peer_node_id: NodeId) -> Option<SessionState> {
1075        self.peer_sessions
1076            .lock()
1077            .unwrap()
1078            .as_ref()
1079            .and_then(|s| s.session_state(peer_node_id))
1080    }
1081
1082    /// Send an E2EE encrypted message to a specific peer
1083    ///
1084    /// Returns the encrypted message bytes to send, or None if no session exists.
1085    /// The message should be sent directly to the peer (not broadcast).
1086    pub fn send_peer_e2ee(
1087        &self,
1088        peer_node_id: NodeId,
1089        plaintext: &[u8],
1090        now_ms: u64,
1091    ) -> Option<Vec<u8>> {
1092        let mut sessions = self.peer_sessions.lock().unwrap();
1093        let session_mgr = sessions.as_mut()?;
1094
1095        match session_mgr.encrypt_for_peer(peer_node_id, plaintext, now_ms) {
1096            Ok(encrypted) => {
1097                let mut buf = Vec::with_capacity(2 + encrypted.encode().len());
1098                buf.push(PEER_E2EE_MARKER);
1099                buf.push(0x00); // reserved
1100                buf.extend_from_slice(&encrypted.encode());
1101                Some(buf)
1102            }
1103            Err(e) => {
1104                log::warn!(
1105                    "Failed to encrypt for peer {:08X}: {:?}",
1106                    peer_node_id.as_u32(),
1107                    e
1108                );
1109                None
1110            }
1111        }
1112    }
1113
1114    /// Close E2EE session with a peer
1115    pub fn close_peer_e2ee(&self, peer_node_id: NodeId) {
1116        let mut sessions = self.peer_sessions.lock().unwrap();
1117        if let Some(session_mgr) = sessions.as_mut() {
1118            session_mgr.close_session(peer_node_id);
1119            self.notify(HiveEvent::PeerE2eeClosed { peer_node_id });
1120            log::info!(
1121                "Closed E2EE session with peer {:08X}",
1122                peer_node_id.as_u32()
1123            );
1124        }
1125    }
1126
1127    /// Get count of active E2EE sessions
1128    pub fn peer_e2ee_session_count(&self) -> usize {
1129        self.peer_sessions
1130            .lock()
1131            .unwrap()
1132            .as_ref()
1133            .map(|s| s.session_count())
1134            .unwrap_or(0)
1135    }
1136
1137    /// Get count of established E2EE sessions
1138    pub fn peer_e2ee_established_count(&self) -> usize {
1139        self.peer_sessions
1140            .lock()
1141            .unwrap()
1142            .as_ref()
1143            .map(|s| s.established_count())
1144            .unwrap_or(0)
1145    }
1146
1147    /// Handle incoming key exchange message
1148    ///
1149    /// Called internally when we receive a KEY_EXCHANGE_MARKER message.
1150    /// Returns the response key exchange bytes to send back, or None if invalid.
1151    fn handle_key_exchange(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1152        if data.len() < 2 || data[0] != KEY_EXCHANGE_MARKER {
1153            return None;
1154        }
1155
1156        let payload = &data[2..];
1157        let msg = KeyExchangeMessage::decode(payload)?;
1158
1159        let mut sessions = self.peer_sessions.lock().unwrap();
1160        let session_mgr = sessions.as_mut()?;
1161
1162        let (response, established) = session_mgr.handle_key_exchange(&msg, now_ms)?;
1163
1164        if established {
1165            self.notify(HiveEvent::PeerE2eeEstablished {
1166                peer_node_id: msg.sender_node_id,
1167            });
1168            log::info!(
1169                "E2EE session established with peer {:08X}",
1170                msg.sender_node_id.as_u32()
1171            );
1172        }
1173
1174        // Return response key exchange
1175        let mut buf = Vec::with_capacity(2 + 37);
1176        buf.push(KEY_EXCHANGE_MARKER);
1177        buf.push(0x00);
1178        buf.extend_from_slice(&response.encode());
1179        Some(buf)
1180    }
1181
1182    /// Handle incoming E2EE encrypted message
1183    ///
1184    /// Called internally when we receive a PEER_E2EE_MARKER message.
1185    /// Decrypts and notifies observers of the received message.
1186    fn handle_peer_e2ee_message(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1187        if data.len() < 2 || data[0] != PEER_E2EE_MARKER {
1188            return None;
1189        }
1190
1191        let payload = &data[2..];
1192        let msg = PeerEncryptedMessage::decode(payload)?;
1193
1194        let mut sessions = self.peer_sessions.lock().unwrap();
1195        let session_mgr = sessions.as_mut()?;
1196
1197        match session_mgr.decrypt_from_peer(&msg, now_ms) {
1198            Ok(plaintext) => {
1199                // Notify observers of the decrypted message
1200                self.notify(HiveEvent::PeerE2eeMessageReceived {
1201                    from_node: msg.sender_node_id,
1202                    data: plaintext.clone(),
1203                });
1204                Some(plaintext)
1205            }
1206            Err(e) => {
1207                log::warn!(
1208                    "Failed to decrypt E2EE message from {:08X}: {:?}",
1209                    msg.sender_node_id.as_u32(),
1210                    e
1211                );
1212                None
1213            }
1214        }
1215    }
1216
1217    // ==================== Configuration ====================
1218
1219    /// Get our node ID
1220    pub fn node_id(&self) -> NodeId {
1221        self.config.node_id
1222    }
1223
1224    /// Get our callsign
1225    pub fn callsign(&self) -> &str {
1226        &self.config.callsign
1227    }
1228
1229    /// Get the mesh ID
1230    pub fn mesh_id(&self) -> &str {
1231        &self.config.mesh_id
1232    }
1233
1234    /// Get the device name for BLE advertising
1235    pub fn device_name(&self) -> String {
1236        format!(
1237            "HIVE_{}-{:08X}",
1238            self.config.mesh_id,
1239            self.config.node_id.as_u32()
1240        )
1241    }
1242
1243    // ==================== Observer Management ====================
1244
1245    /// Add an observer for mesh events
1246    pub fn add_observer(&self, observer: Arc<dyn HiveObserver>) {
1247        self.observers.add(observer);
1248    }
1249
1250    /// Remove an observer
1251    pub fn remove_observer(&self, observer: &Arc<dyn HiveObserver>) {
1252        self.observers.remove(observer);
1253    }
1254
1255    // ==================== User Actions ====================
1256
1257    /// Send an emergency alert
1258    ///
1259    /// Returns the document bytes to broadcast to all peers.
1260    /// If encryption is enabled, the document is encrypted.
1261    pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
1262        let data = self.document_sync.send_emergency(timestamp);
1263        self.notify(HiveEvent::MeshStateChanged {
1264            peer_count: self.peer_manager.peer_count(),
1265            connected_count: self.peer_manager.connected_count(),
1266        });
1267        self.encrypt_document(&data)
1268    }
1269
1270    /// Send an ACK response
1271    ///
1272    /// Returns the document bytes to broadcast to all peers.
1273    /// If encryption is enabled, the document is encrypted.
1274    pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
1275        let data = self.document_sync.send_ack(timestamp);
1276        self.notify(HiveEvent::MeshStateChanged {
1277            peer_count: self.peer_manager.peer_count(),
1278            connected_count: self.peer_manager.connected_count(),
1279        });
1280        self.encrypt_document(&data)
1281    }
1282
1283    /// Clear the current event (emergency or ack)
1284    pub fn clear_event(&self) {
1285        self.document_sync.clear_event();
1286    }
1287
1288    /// Check if emergency is active
1289    pub fn is_emergency_active(&self) -> bool {
1290        self.document_sync.is_emergency_active()
1291    }
1292
1293    /// Check if ACK is active
1294    pub fn is_ack_active(&self) -> bool {
1295        self.document_sync.is_ack_active()
1296    }
1297
1298    /// Get current event type
1299    pub fn current_event(&self) -> Option<EventType> {
1300        self.document_sync.current_event()
1301    }
1302
1303    // ==================== Emergency Management (Document-Based) ====================
1304
1305    /// Start a new emergency event with ACK tracking
1306    ///
1307    /// Creates an emergency event that tracks ACKs from all known peers.
1308    /// Pass the list of known peer node IDs to track.
1309    /// Returns the document bytes to broadcast.
1310    /// If encryption is enabled, the document is encrypted.
1311    pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
1312        let data = self.document_sync.start_emergency(timestamp, known_peers);
1313        self.notify(HiveEvent::MeshStateChanged {
1314            peer_count: self.peer_manager.peer_count(),
1315            connected_count: self.peer_manager.connected_count(),
1316        });
1317        self.encrypt_document(&data)
1318    }
1319
1320    /// Start a new emergency using all currently known peers
1321    ///
1322    /// Convenience method that automatically includes all discovered peers.
1323    pub fn start_emergency_with_known_peers(&self, timestamp: u64) -> Vec<u8> {
1324        let peers: Vec<u32> = self
1325            .peer_manager
1326            .get_peers()
1327            .iter()
1328            .map(|p| p.node_id.as_u32())
1329            .collect();
1330        self.start_emergency(timestamp, &peers)
1331    }
1332
1333    /// Record our ACK for the current emergency
1334    ///
1335    /// Returns the document bytes to broadcast, or None if no emergency is active.
1336    /// If encryption is enabled, the document is encrypted.
1337    pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
1338        let result = self.document_sync.ack_emergency(timestamp);
1339        if result.is_some() {
1340            self.notify(HiveEvent::MeshStateChanged {
1341                peer_count: self.peer_manager.peer_count(),
1342                connected_count: self.peer_manager.connected_count(),
1343            });
1344        }
1345        result.map(|data| self.encrypt_document(&data))
1346    }
1347
1348    /// Clear the current emergency event
1349    pub fn clear_emergency(&self) {
1350        self.document_sync.clear_emergency();
1351    }
1352
1353    /// Check if there's an active emergency
1354    pub fn has_active_emergency(&self) -> bool {
1355        self.document_sync.has_active_emergency()
1356    }
1357
1358    /// Get emergency status info
1359    ///
1360    /// Returns (source_node, timestamp, acked_count, pending_count) if emergency is active.
1361    pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
1362        self.document_sync.get_emergency_status()
1363    }
1364
1365    /// Check if a specific peer has ACKed the current emergency
1366    pub fn has_peer_acked(&self, peer_id: u32) -> bool {
1367        self.document_sync.has_peer_acked(peer_id)
1368    }
1369
1370    /// Check if all peers have ACKed the current emergency
1371    pub fn all_peers_acked(&self) -> bool {
1372        self.document_sync.all_peers_acked()
1373    }
1374
1375    // ==================== Chat Methods ====================
1376
1377    /// Send a chat message
1378    ///
1379    /// Adds the message to the local CRDT and returns the document bytes
1380    /// to broadcast to all peers. If encryption is enabled, the document is encrypted.
1381    ///
1382    /// Returns the encrypted document bytes if the message was new,
1383    /// or None if it was a duplicate.
1384    pub fn send_chat(&self, sender: &str, text: &str, timestamp: u64) -> Option<Vec<u8>> {
1385        if self.document_sync.add_chat_message(sender, text, timestamp) {
1386            Some(self.encrypt_document(&self.build_document()))
1387        } else {
1388            None
1389        }
1390    }
1391
1392    /// Send a chat reply
1393    ///
1394    /// Adds the reply to the local CRDT with reply-to information and returns
1395    /// the document bytes to broadcast. If encryption is enabled, the document is encrypted.
1396    ///
1397    /// Returns the encrypted document bytes if the message was new,
1398    /// or None if it was a duplicate.
1399    pub fn send_chat_reply(
1400        &self,
1401        sender: &str,
1402        text: &str,
1403        reply_to_node: u32,
1404        reply_to_timestamp: u64,
1405        timestamp: u64,
1406    ) -> Option<Vec<u8>> {
1407        if self.document_sync.add_chat_reply(
1408            sender,
1409            text,
1410            reply_to_node,
1411            reply_to_timestamp,
1412            timestamp,
1413        ) {
1414            Some(self.encrypt_document(&self.build_document()))
1415        } else {
1416            None
1417        }
1418    }
1419
1420    /// Get the number of chat messages in the local CRDT
1421    pub fn chat_count(&self) -> usize {
1422        self.document_sync.chat_count()
1423    }
1424
1425    /// Get chat messages newer than a timestamp
1426    ///
1427    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
1428    pub fn chat_messages_since(
1429        &self,
1430        since_timestamp: u64,
1431    ) -> Vec<(u32, u64, String, String, u32, u64)> {
1432        self.document_sync.chat_messages_since(since_timestamp)
1433    }
1434
1435    /// Get all chat messages
1436    ///
1437    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
1438    pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
1439        self.document_sync.all_chat_messages()
1440    }
1441
1442    // ==================== BLE Callbacks (Platform -> Mesh) ====================
1443
1444    /// Called when a BLE device is discovered
1445    ///
1446    /// Returns `Some(HivePeer)` if this is a new HIVE peer on our mesh.
1447    pub fn on_ble_discovered(
1448        &self,
1449        identifier: &str,
1450        name: Option<&str>,
1451        rssi: i8,
1452        mesh_id: Option<&str>,
1453        now_ms: u64,
1454    ) -> Option<HivePeer> {
1455        let (node_id, is_new) = self
1456            .peer_manager
1457            .on_discovered(identifier, name, rssi, mesh_id, now_ms)?;
1458
1459        let peer = self.peer_manager.get_peer(node_id)?;
1460
1461        // Update connection graph
1462        {
1463            let mut graph = self.connection_graph.lock().unwrap();
1464            graph.on_discovered(
1465                node_id,
1466                identifier.to_string(),
1467                name.map(|s| s.to_string()),
1468                mesh_id.map(|s| s.to_string()),
1469                rssi,
1470                now_ms,
1471            );
1472        }
1473
1474        if is_new {
1475            self.notify(HiveEvent::PeerDiscovered { peer: peer.clone() });
1476            self.notify_mesh_state_changed();
1477        }
1478
1479        Some(peer)
1480    }
1481
1482    /// Called when a BLE connection is established (outgoing)
1483    ///
1484    /// Returns the NodeId if this identifier is known.
1485    pub fn on_ble_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
1486        let node_id = self.peer_manager.on_connected(identifier, now_ms)?;
1487
1488        // Update connection graph
1489        {
1490            let mut graph = self.connection_graph.lock().unwrap();
1491            graph.on_connected(node_id, now_ms);
1492        }
1493
1494        self.notify(HiveEvent::PeerConnected { node_id });
1495        self.notify_mesh_state_changed();
1496        Some(node_id)
1497    }
1498
1499    /// Called when a BLE connection is lost
1500    pub fn on_ble_disconnected(
1501        &self,
1502        identifier: &str,
1503        reason: DisconnectReason,
1504    ) -> Option<NodeId> {
1505        let (node_id, observer_reason) = self.peer_manager.on_disconnected(identifier, reason)?;
1506
1507        // Update connection graph (convert observer reason to platform reason)
1508        {
1509            let mut graph = self.connection_graph.lock().unwrap();
1510            let platform_reason = match observer_reason {
1511                DisconnectReason::LocalRequest => crate::platform::DisconnectReason::LocalRequest,
1512                DisconnectReason::RemoteRequest => crate::platform::DisconnectReason::RemoteRequest,
1513                DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
1514                DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
1515                DisconnectReason::ConnectionFailed => {
1516                    crate::platform::DisconnectReason::ConnectionFailed
1517                }
1518                DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
1519            };
1520            let now_ms = std::time::SystemTime::now()
1521                .duration_since(std::time::UNIX_EPOCH)
1522                .map(|d| d.as_millis() as u64)
1523                .unwrap_or(0);
1524            graph.on_disconnected(node_id, platform_reason, now_ms);
1525        }
1526
1527        self.notify(HiveEvent::PeerDisconnected {
1528            node_id,
1529            reason: observer_reason,
1530        });
1531        self.notify_mesh_state_changed();
1532        Some(node_id)
1533    }
1534
1535    /// Called when a BLE connection is lost, using NodeId directly
1536    ///
1537    /// Alternative to on_ble_disconnected() when only NodeId is known (e.g., ESP32).
1538    pub fn on_peer_disconnected(&self, node_id: NodeId, reason: DisconnectReason) {
1539        if self
1540            .peer_manager
1541            .on_disconnected_by_node_id(node_id, reason)
1542        {
1543            // Update connection graph
1544            {
1545                let mut graph = self.connection_graph.lock().unwrap();
1546                let platform_reason = match reason {
1547                    DisconnectReason::LocalRequest => {
1548                        crate::platform::DisconnectReason::LocalRequest
1549                    }
1550                    DisconnectReason::RemoteRequest => {
1551                        crate::platform::DisconnectReason::RemoteRequest
1552                    }
1553                    DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
1554                    DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
1555                    DisconnectReason::ConnectionFailed => {
1556                        crate::platform::DisconnectReason::ConnectionFailed
1557                    }
1558                    DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
1559                };
1560                let now_ms = std::time::SystemTime::now()
1561                    .duration_since(std::time::UNIX_EPOCH)
1562                    .map(|d| d.as_millis() as u64)
1563                    .unwrap_or(0);
1564                graph.on_disconnected(node_id, platform_reason, now_ms);
1565            }
1566
1567            self.notify(HiveEvent::PeerDisconnected { node_id, reason });
1568            self.notify_mesh_state_changed();
1569        }
1570    }
1571
1572    /// Called when a remote device connects to us (incoming connection)
1573    ///
1574    /// Use this when we're acting as a peripheral and a central connects to us.
1575    pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
1576        let is_new = self
1577            .peer_manager
1578            .on_incoming_connection(identifier, node_id, now_ms);
1579
1580        // Update connection graph
1581        {
1582            let mut graph = self.connection_graph.lock().unwrap();
1583            if is_new {
1584                graph.on_discovered(
1585                    node_id,
1586                    identifier.to_string(),
1587                    None,
1588                    Some(self.config.mesh_id.clone()),
1589                    -50, // Default good RSSI for incoming connections
1590                    now_ms,
1591                );
1592            }
1593            graph.on_connected(node_id, now_ms);
1594        }
1595
1596        if is_new {
1597            if let Some(peer) = self.peer_manager.get_peer(node_id) {
1598                self.notify(HiveEvent::PeerDiscovered { peer });
1599            }
1600        }
1601
1602        self.notify(HiveEvent::PeerConnected { node_id });
1603        self.notify_mesh_state_changed();
1604
1605        is_new
1606    }
1607
1608    /// Called when data is received from a peer
1609    ///
1610    /// Parses the document, merges it, and generates appropriate events.
1611    /// If encryption is enabled, decrypts the document first.
1612    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
1613    /// Returns the source NodeId and whether the document contained an event.
1614    pub fn on_ble_data_received(
1615        &self,
1616        identifier: &str,
1617        data: &[u8],
1618        now_ms: u64,
1619    ) -> Option<DataReceivedResult> {
1620        // Get node ID from identifier
1621        let node_id = self.peer_manager.get_node_id(identifier)?;
1622
1623        // Check for special message types first
1624        if data.len() >= 2 {
1625            match data[0] {
1626                KEY_EXCHANGE_MARKER => {
1627                    // Handle key exchange - returns response to send back
1628                    let _response = self.handle_key_exchange(data, now_ms);
1629                    // Return None as this isn't a document sync
1630                    return None;
1631                }
1632                PEER_E2EE_MARKER => {
1633                    // Handle encrypted peer message
1634                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
1635                    // Return None as this isn't a document sync
1636                    return None;
1637                }
1638                RELAY_ENVELOPE_MARKER => {
1639                    // Handle relay envelope for multi-hop
1640                    return self
1641                        .handle_relay_envelope_with_identifier(node_id, identifier, data, now_ms);
1642                }
1643                _ => {}
1644            }
1645        }
1646
1647        // Direct document (not relay envelope)
1648        self.process_document_data_with_identifier(node_id, identifier, data, now_ms, None, None, 0)
1649    }
1650
1651    /// Internal: Process document data with identifier as source hint
1652    #[allow(clippy::too_many_arguments)]
1653    fn process_document_data_with_identifier(
1654        &self,
1655        source_node: NodeId,
1656        identifier: &str,
1657        data: &[u8],
1658        now_ms: u64,
1659        relay_data: Option<Vec<u8>>,
1660        origin_node: Option<NodeId>,
1661        hop_count: u8,
1662    ) -> Option<DataReceivedResult> {
1663        // Decrypt if encrypted (mesh-wide encryption) - use identifier as source hint
1664        let decrypted = self.decrypt_document(data, Some(identifier))?;
1665
1666        // Check if this is a delta document (wire format v2)
1667        if DeltaDocument::is_delta_document(&decrypted) {
1668            return self.process_delta_document_internal(
1669                source_node,
1670                &decrypted,
1671                now_ms,
1672                relay_data,
1673                origin_node,
1674                hop_count,
1675            );
1676        }
1677
1678        // Merge the document (legacy wire format v1)
1679        let result = self.document_sync.merge_document(&decrypted)?;
1680
1681        // Record sync
1682        self.peer_manager.record_sync(source_node, now_ms);
1683
1684        // Generate events based on what was received
1685        if result.is_emergency() {
1686            self.notify(HiveEvent::EmergencyReceived {
1687                from_node: result.source_node,
1688            });
1689        } else if result.is_ack() {
1690            self.notify(HiveEvent::AckReceived {
1691                from_node: result.source_node,
1692            });
1693        }
1694
1695        if result.counter_changed {
1696            self.notify(HiveEvent::DocumentSynced {
1697                from_node: result.source_node,
1698                total_count: result.total_count,
1699            });
1700        }
1701
1702        // Emit relay event if we're relaying
1703        if relay_data.is_some() {
1704            let relay_targets = self.get_relay_targets(Some(source_node));
1705            self.notify(HiveEvent::MessageRelayed {
1706                origin_node: origin_node.unwrap_or(result.source_node),
1707                relay_count: relay_targets.len(),
1708                hop_count,
1709            });
1710        }
1711
1712        Some(DataReceivedResult {
1713            source_node: result.source_node,
1714            is_emergency: result.is_emergency(),
1715            is_ack: result.is_ack(),
1716            counter_changed: result.counter_changed,
1717            emergency_changed: result.emergency_changed,
1718            total_count: result.total_count,
1719            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
1720            relay_data,
1721            origin_node,
1722            hop_count,
1723        })
1724    }
1725
1726    /// Internal: Handle relay envelope with identifier as source hint
1727    fn handle_relay_envelope_with_identifier(
1728        &self,
1729        source_node: NodeId,
1730        identifier: &str,
1731        data: &[u8],
1732        now_ms: u64,
1733    ) -> Option<DataReceivedResult> {
1734        // Process the relay envelope
1735        let envelope = RelayEnvelope::decode(data)?;
1736
1737        // Check deduplication
1738        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
1739            let stats = self
1740                .seen_cache
1741                .lock()
1742                .unwrap()
1743                .get_stats(&envelope.message_id);
1744            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
1745
1746            self.notify(HiveEvent::DuplicateMessageDropped {
1747                origin_node: envelope.origin_node,
1748                seen_count,
1749            });
1750            return None;
1751        }
1752
1753        // Check TTL and get relay data
1754        let relay_data = if envelope.can_relay() && self.config.enable_relay {
1755            envelope.relay().map(|e| e.encode())
1756        } else {
1757            if !envelope.can_relay() {
1758                self.notify(HiveEvent::MessageTtlExpired {
1759                    origin_node: envelope.origin_node,
1760                    hop_count: envelope.hop_count,
1761                });
1762            }
1763            None
1764        };
1765
1766        // Process the inner payload
1767        self.process_document_data_with_identifier(
1768            source_node,
1769            identifier,
1770            &envelope.payload,
1771            now_ms,
1772            relay_data,
1773            Some(envelope.origin_node),
1774            envelope.hop_count,
1775        )
1776    }
1777
1778    /// Called when data is received but we don't have the identifier mapped
1779    ///
1780    /// Use this when receiving data from a peripheral we discovered.
1781    /// If encryption is enabled, decrypts the document first.
1782    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
1783    /// Handles relay envelopes for multi-hop mesh operation.
1784    pub fn on_ble_data_received_from_node(
1785        &self,
1786        node_id: NodeId,
1787        data: &[u8],
1788        now_ms: u64,
1789    ) -> Option<DataReceivedResult> {
1790        // Check for special message types first
1791        if data.len() >= 2 {
1792            match data[0] {
1793                KEY_EXCHANGE_MARKER => {
1794                    let _response = self.handle_key_exchange(data, now_ms);
1795                    return None;
1796                }
1797                PEER_E2EE_MARKER => {
1798                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
1799                    return None;
1800                }
1801                RELAY_ENVELOPE_MARKER => {
1802                    // Handle relay envelope for multi-hop
1803                    return self.handle_relay_envelope(node_id, data, now_ms);
1804                }
1805                _ => {}
1806            }
1807        }
1808
1809        // Direct document (not relay envelope)
1810        self.process_document_data(node_id, data, now_ms, None, None, 0)
1811    }
1812
1813    /// Internal: Process document data (shared by direct and relay paths)
1814    fn process_document_data(
1815        &self,
1816        source_node: NodeId,
1817        data: &[u8],
1818        now_ms: u64,
1819        relay_data: Option<Vec<u8>>,
1820        origin_node: Option<NodeId>,
1821        hop_count: u8,
1822    ) -> Option<DataReceivedResult> {
1823        // Decrypt if encrypted (mesh-wide encryption)
1824        let source_hint = format!("node:{:08X}", source_node.as_u32());
1825        let decrypted = self.decrypt_document(data, Some(&source_hint))?;
1826
1827        // Check if this is a delta document (wire format v2)
1828        if DeltaDocument::is_delta_document(&decrypted) {
1829            return self.process_delta_document_internal(
1830                source_node,
1831                &decrypted,
1832                now_ms,
1833                relay_data,
1834                origin_node,
1835                hop_count,
1836            );
1837        }
1838
1839        // Merge the document (legacy wire format v1)
1840        let result = self.document_sync.merge_document(&decrypted)?;
1841
1842        // Record sync
1843        self.peer_manager.record_sync(source_node, now_ms);
1844
1845        // Generate events based on what was received
1846        if result.is_emergency() {
1847            self.notify(HiveEvent::EmergencyReceived {
1848                from_node: result.source_node,
1849            });
1850        } else if result.is_ack() {
1851            self.notify(HiveEvent::AckReceived {
1852                from_node: result.source_node,
1853            });
1854        }
1855
1856        if result.counter_changed {
1857            self.notify(HiveEvent::DocumentSynced {
1858                from_node: result.source_node,
1859                total_count: result.total_count,
1860            });
1861        }
1862
1863        // Emit relay event if we're relaying
1864        if relay_data.is_some() {
1865            let relay_targets = self.get_relay_targets(Some(source_node));
1866            self.notify(HiveEvent::MessageRelayed {
1867                origin_node: origin_node.unwrap_or(result.source_node),
1868                relay_count: relay_targets.len(),
1869                hop_count,
1870            });
1871        }
1872
1873        Some(DataReceivedResult {
1874            source_node: result.source_node,
1875            is_emergency: result.is_emergency(),
1876            is_ack: result.is_ack(),
1877            counter_changed: result.counter_changed,
1878            emergency_changed: result.emergency_changed,
1879            total_count: result.total_count,
1880            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
1881            relay_data,
1882            origin_node,
1883            hop_count,
1884        })
1885    }
1886
1887    /// Internal: Handle relay envelope
1888    fn handle_relay_envelope(
1889        &self,
1890        source_node: NodeId,
1891        data: &[u8],
1892        now_ms: u64,
1893    ) -> Option<DataReceivedResult> {
1894        // Process the relay envelope
1895        let decision = self.process_relay_envelope(data, source_node, now_ms)?;
1896
1897        // Get relay data if we should relay
1898        let relay_data = if decision.should_relay {
1899            decision.relay_data()
1900        } else {
1901            None
1902        };
1903
1904        // Process the inner payload
1905        self.process_document_data(
1906            source_node,
1907            &decision.payload,
1908            now_ms,
1909            relay_data,
1910            Some(decision.origin_node),
1911            decision.hop_count,
1912        )
1913    }
1914
1915    /// Called when data is received without a known identifier
1916    ///
1917    /// This is the simplest data receive method - it extracts the source node_id
1918    /// from the document itself. Use this when you don't track identifiers
1919    /// (e.g., ESP32 NimBLE).
1920    /// If encryption is enabled, decrypts the document first.
1921    /// Handles per-peer E2EE messages (KEY_EXCHANGE and PEER_E2EE markers).
1922    /// Handles relay envelopes for multi-hop mesh operation.
1923    pub fn on_ble_data(
1924        &self,
1925        identifier: &str,
1926        data: &[u8],
1927        now_ms: u64,
1928    ) -> Option<DataReceivedResult> {
1929        // Check for special message types first
1930        if data.len() >= 2 {
1931            match data[0] {
1932                KEY_EXCHANGE_MARKER => {
1933                    let _response = self.handle_key_exchange(data, now_ms);
1934                    return None;
1935                }
1936                PEER_E2EE_MARKER => {
1937                    let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
1938                    return None;
1939                }
1940                RELAY_ENVELOPE_MARKER => {
1941                    // Handle relay envelope - extract origin from envelope
1942                    return self.handle_relay_envelope_with_incoming(identifier, data, now_ms);
1943                }
1944                _ => {}
1945            }
1946        }
1947
1948        // Direct document - process normally
1949        self.process_incoming_document(identifier, data, now_ms, None, None, 0)
1950    }
1951
1952    /// Internal: Process incoming document (handles peer registration)
1953    fn process_incoming_document(
1954        &self,
1955        identifier: &str,
1956        data: &[u8],
1957        now_ms: u64,
1958        relay_data: Option<Vec<u8>>,
1959        origin_node: Option<NodeId>,
1960        hop_count: u8,
1961    ) -> Option<DataReceivedResult> {
1962        // Decrypt if encrypted (mesh-wide encryption)
1963        let decrypted = self.decrypt_document(data, Some(identifier))?;
1964
1965        // Merge the document (extracts node_id internally)
1966        let result = self.document_sync.merge_document(&decrypted)?;
1967
1968        // Record sync using the source_node from the merged document
1969        self.peer_manager.record_sync(result.source_node, now_ms);
1970
1971        // Add the peer if not already known (creates peer entry from document data)
1972        self.peer_manager
1973            .on_incoming_connection(identifier, result.source_node, now_ms);
1974
1975        // Generate events based on what was received
1976        if result.is_emergency() {
1977            self.notify(HiveEvent::EmergencyReceived {
1978                from_node: result.source_node,
1979            });
1980        } else if result.is_ack() {
1981            self.notify(HiveEvent::AckReceived {
1982                from_node: result.source_node,
1983            });
1984        }
1985
1986        if result.counter_changed {
1987            self.notify(HiveEvent::DocumentSynced {
1988                from_node: result.source_node,
1989                total_count: result.total_count,
1990            });
1991        }
1992
1993        // Emit relay event if we're relaying
1994        if relay_data.is_some() {
1995            let relay_targets = self.get_relay_targets(Some(result.source_node));
1996            self.notify(HiveEvent::MessageRelayed {
1997                origin_node: origin_node.unwrap_or(result.source_node),
1998                relay_count: relay_targets.len(),
1999                hop_count,
2000            });
2001        }
2002
2003        Some(DataReceivedResult {
2004            source_node: result.source_node,
2005            is_emergency: result.is_emergency(),
2006            is_ack: result.is_ack(),
2007            counter_changed: result.counter_changed,
2008            emergency_changed: result.emergency_changed,
2009            total_count: result.total_count,
2010            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2011            relay_data,
2012            origin_node,
2013            hop_count,
2014        })
2015    }
2016
2017    /// Internal: Handle relay envelope with incoming connection registration
2018    fn handle_relay_envelope_with_incoming(
2019        &self,
2020        identifier: &str,
2021        data: &[u8],
2022        now_ms: u64,
2023    ) -> Option<DataReceivedResult> {
2024        // Parse envelope to get origin
2025        let envelope = RelayEnvelope::decode(data)?;
2026
2027        // Check deduplication
2028        if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2029            // Duplicate - get stats for event
2030            let stats = self
2031                .seen_cache
2032                .lock()
2033                .unwrap()
2034                .get_stats(&envelope.message_id);
2035            let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2036
2037            self.notify(HiveEvent::DuplicateMessageDropped {
2038                origin_node: envelope.origin_node,
2039                seen_count,
2040            });
2041            return None;
2042        }
2043
2044        // Check TTL
2045        let (should_relay, relay_data) = if envelope.can_relay() && self.config.enable_relay {
2046            let relay_env = envelope.relay();
2047            (true, relay_env.map(|e| e.encode()))
2048        } else {
2049            if !envelope.can_relay() {
2050                self.notify(HiveEvent::MessageTtlExpired {
2051                    origin_node: envelope.origin_node,
2052                    hop_count: envelope.hop_count,
2053                });
2054            }
2055            (false, None)
2056        };
2057
2058        // Process the inner payload
2059        self.process_incoming_document(
2060            identifier,
2061            &envelope.payload,
2062            now_ms,
2063            if should_relay { relay_data } else { None },
2064            Some(envelope.origin_node),
2065            envelope.hop_count,
2066        )
2067    }
2068
2069    // ==================== Periodic Maintenance ====================
2070
2071    /// Periodic tick - call this regularly (e.g., every second)
2072    ///
2073    /// Performs:
2074    /// - Stale peer cleanup
2075    /// - Periodic sync broadcast (if interval elapsed)
2076    ///
2077    /// Returns `Some(data)` if a sync broadcast is needed.
2078    pub fn tick(&self, now_ms: u64) -> Option<Vec<u8>> {
2079        use std::sync::atomic::Ordering;
2080
2081        // Use u32 for atomic storage (wraps every ~49 days, intervals still work)
2082        let now_ms_32 = now_ms as u32;
2083
2084        // Cleanup stale peers
2085        let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
2086        let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
2087        if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
2088            self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
2089            let removed = self.peer_manager.cleanup_stale(now_ms);
2090            for node_id in &removed {
2091                self.notify(HiveEvent::PeerLost { node_id: *node_id });
2092            }
2093            if !removed.is_empty() {
2094                self.notify_mesh_state_changed();
2095            }
2096
2097            // Run connection graph maintenance (transition Disconnected -> Lost)
2098            {
2099                let mut graph = self.connection_graph.lock().unwrap();
2100                let newly_lost = graph.tick(now_ms);
2101                // Also cleanup peers lost for more than peer_timeout
2102                graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
2103                drop(graph);
2104
2105                // Emit PeerLost events for newly lost peers from graph
2106                // (these may differ from peer_manager removals)
2107                for node_id in newly_lost {
2108                    // Only notify if not already notified by peer_manager
2109                    if !removed.contains(&node_id) {
2110                        self.notify(HiveEvent::PeerLost { node_id });
2111                    }
2112                }
2113            }
2114        }
2115
2116        // Check if sync broadcast is needed
2117        let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
2118        let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
2119        if sync_elapsed >= self.config.sync_interval_ms as u32 {
2120            self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
2121            // Only broadcast if we have connected peers
2122            if self.peer_manager.connected_count() > 0 {
2123                let doc = self.document_sync.build_document();
2124                return Some(self.encrypt_document(&doc));
2125            }
2126        }
2127
2128        None
2129    }
2130
2131    // ==================== State Queries ====================
2132
    /// Get all known peers
    ///
    /// Thin pass-through to the peer manager's `get_peers`; the returned `Vec`
    /// is owned by the caller.
    pub fn get_peers(&self) -> Vec<HivePeer> {
        self.peer_manager.get_peers()
    }
2137
    /// Get connected peers only
    ///
    /// Thin pass-through to the peer manager's `get_connected_peers`; the
    /// returned `Vec` is owned by the caller.
    pub fn get_connected_peers(&self) -> Vec<HivePeer> {
        self.peer_manager.get_connected_peers()
    }
2142
    /// Get a specific peer by NodeId
    ///
    /// Returns whatever the peer manager's `get_peer` yields for `node_id`
    /// (`None` is passed through unchanged).
    pub fn get_peer(&self, node_id: NodeId) -> Option<HivePeer> {
        self.peer_manager.get_peer(node_id)
    }
2147
    /// Get peer count
    ///
    /// Reported by the peer manager; this is the same value published as
    /// `peer_count` in `MeshStateChanged` events.
    pub fn peer_count(&self) -> usize {
        self.peer_manager.peer_count()
    }
2152
    /// Get connected peer count
    ///
    /// Reported by the peer manager; `tick` uses the same value to decide
    /// whether a sync broadcast has any recipients.
    pub fn connected_count(&self) -> usize {
        self.peer_manager.connected_count()
    }
2157
    /// Check if a device mesh ID matches our mesh
    ///
    /// The decision, including how a missing (`None`) mesh ID is treated, is made
    /// by the peer manager's `matches_mesh`.
    pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
        self.peer_manager.matches_mesh(device_mesh_id)
    }
2162
2163    // ==================== Connection State Graph ====================
2164
2165    /// Get the connection state graph with all peer states
2166    ///
2167    /// Returns a snapshot of all tracked peers and their connection lifecycle state.
2168    /// Apps can use this to display appropriate UI indicators:
2169    /// - Green for Connected peers
2170    /// - Yellow for Degraded or RecentlyDisconnected peers
2171    /// - Gray for Lost peers
2172    ///
2173    /// # Example
2174    /// ```ignore
2175    /// let states = mesh.get_connection_graph();
2176    /// for peer in states {
2177    ///     match peer.state {
2178    ///         ConnectionState::Connected => show_green_indicator(&peer),
2179    ///         ConnectionState::Degraded => show_yellow_indicator(&peer),
2180    ///         ConnectionState::Disconnected => show_stale_indicator(&peer),
2181    ///         ConnectionState::Lost => show_gray_indicator(&peer),
2182    ///         _ => {}
2183    ///     }
2184    /// }
2185    /// ```
2186    pub fn get_connection_graph(&self) -> Vec<PeerConnectionState> {
2187        self.connection_graph.lock().unwrap().get_all_owned()
2188    }
2189
2190    /// Get a specific peer's connection state
2191    pub fn get_peer_connection_state(&self, node_id: NodeId) -> Option<PeerConnectionState> {
2192        self.connection_graph
2193            .lock()
2194            .unwrap()
2195            .get_peer(node_id)
2196            .cloned()
2197    }
2198
2199    /// Get all currently connected peers from the connection graph
2200    pub fn get_connected_states(&self) -> Vec<PeerConnectionState> {
2201        self.connection_graph
2202            .lock()
2203            .unwrap()
2204            .get_connected()
2205            .into_iter()
2206            .cloned()
2207            .collect()
2208    }
2209
2210    /// Get peers in degraded state (connected but poor signal quality)
2211    pub fn get_degraded_peers(&self) -> Vec<PeerConnectionState> {
2212        self.connection_graph
2213            .lock()
2214            .unwrap()
2215            .get_degraded()
2216            .into_iter()
2217            .cloned()
2218            .collect()
2219    }
2220
2221    /// Get peers that disconnected within the specified time window
2222    ///
2223    /// Useful for showing "stale" peers that were recently connected.
2224    pub fn get_recently_disconnected(
2225        &self,
2226        within_ms: u64,
2227        now_ms: u64,
2228    ) -> Vec<PeerConnectionState> {
2229        self.connection_graph
2230            .lock()
2231            .unwrap()
2232            .get_recently_disconnected(within_ms, now_ms)
2233            .into_iter()
2234            .cloned()
2235            .collect()
2236    }
2237
2238    /// Get peers in Lost state (disconnected and no longer advertising)
2239    pub fn get_lost_peers(&self) -> Vec<PeerConnectionState> {
2240        self.connection_graph
2241            .lock()
2242            .unwrap()
2243            .get_lost()
2244            .into_iter()
2245            .cloned()
2246            .collect()
2247    }
2248
2249    /// Get summary counts of peers in each connection state
2250    pub fn get_connection_state_counts(&self) -> StateCountSummary {
2251        self.connection_graph.lock().unwrap().state_counts()
2252    }
2253
2254    // ==================== Indirect Peer Methods ====================
2255
2256    /// Get all indirect (multi-hop) peers
2257    ///
2258    /// Returns peers discovered via relay messages that are not directly
2259    /// connected via BLE. Each indirect peer includes the minimum hop count
2260    /// and the direct peers through which they can be reached.
2261    pub fn get_indirect_peers(&self) -> Vec<IndirectPeer> {
2262        self.connection_graph
2263            .lock()
2264            .unwrap()
2265            .get_indirect_peers_owned()
2266    }
2267
2268    /// Get the degree (hop count) for a specific peer
2269    ///
2270    /// Returns:
2271    /// - `Some(PeerDegree::Direct)` for directly connected BLE peers
2272    /// - `Some(PeerDegree::OneHop/TwoHop/ThreeHop)` for indirect peers
2273    /// - `None` if peer is not known
2274    pub fn get_peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
2275        self.connection_graph.lock().unwrap().peer_degree(node_id)
2276    }
2277
2278    /// Get full state counts including indirect peers
2279    ///
2280    /// Returns counts of direct peers by connection state plus counts
2281    /// of indirect peers by hop count (1-hop, 2-hop, 3-hop).
2282    pub fn get_full_state_counts(&self) -> FullStateCountSummary {
2283        self.connection_graph.lock().unwrap().full_state_counts()
2284    }
2285
2286    /// Get all paths to reach an indirect peer
2287    ///
2288    /// Returns a list of (via_peer_id, hop_count) pairs showing all
2289    /// known routes to the specified peer.
2290    pub fn get_paths_to_peer(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
2291        self.connection_graph.lock().unwrap().get_paths_to(node_id)
2292    }
2293
2294    /// Check if a node is known (either direct or indirect)
2295    pub fn is_peer_known(&self, node_id: NodeId) -> bool {
2296        self.connection_graph.lock().unwrap().is_known(node_id)
2297    }
2298
2299    /// Get number of indirect peers
2300    pub fn indirect_peer_count(&self) -> usize {
2301        self.connection_graph.lock().unwrap().indirect_peer_count()
2302    }
2303
2304    /// Cleanup stale indirect peers
2305    ///
2306    /// Removes indirect peers that haven't been seen within the timeout.
2307    /// Returns the list of removed peer IDs.
2308    pub fn cleanup_indirect_peers(&self, now_ms: u64) -> Vec<NodeId> {
2309        self.connection_graph
2310            .lock()
2311            .unwrap()
2312            .cleanup_indirect(now_ms)
2313    }
2314
    /// Get total counter value
    ///
    /// Read from the document sync layer's `total_count`.
    pub fn total_count(&self) -> u64 {
        self.document_sync.total_count()
    }
2319
    /// Get document version
    ///
    /// Read from the document sync layer's `version`; `version()` on this type
    /// returns the same value.
    pub fn document_version(&self) -> u32 {
        self.document_sync.version()
    }
2324
2325    /// Get document version (alias)
2326    pub fn version(&self) -> u32 {
2327        self.document_sync.version()
2328    }
2329
    /// Update health status (battery percentage)
    ///
    /// Forwards `battery_percent` unchanged to the document sync layer; no range
    /// check is performed here.
    pub fn update_health(&self, battery_percent: u8) {
        self.document_sync.update_health(battery_percent);
    }
2334
    /// Update activity level (0=still, 1=walking, 2=running, 3=fall)
    ///
    /// Forwards `activity` unchanged to the document sync layer; no range check
    /// is performed here.
    pub fn update_activity(&self, activity: u8) {
        self.document_sync.update_activity(activity);
    }
2339
    /// Update full health status (battery and activity)
    ///
    /// Forwards both values unchanged to the document sync layer in a single
    /// call; no range check is performed here.
    pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
        self.document_sync
            .update_health_full(battery_percent, activity);
    }
2345
2346    /// Build current document for transmission
2347    ///
2348    /// If encryption is enabled, the document is encrypted.
2349    pub fn build_document(&self) -> Vec<u8> {
2350        let doc = self.document_sync.build_document();
2351        self.encrypt_document(&doc)
2352    }
2353
    /// Get peers that should be synced with
    ///
    /// The selection is made by the peer manager's `peers_needing_sync`,
    /// evaluated against the supplied `now_ms`.
    pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<HivePeer> {
        self.peer_manager.peers_needing_sync(now_ms)
    }
2358
2359    // ==================== Internal Helpers ====================
2360
    /// Deliver `event` to the registered observers via `observers.notify`.
    fn notify(&self, event: HiveEvent) {
        self.observers.notify(event);
    }
2364
2365    fn notify_mesh_state_changed(&self) {
2366        self.notify(HiveEvent::MeshStateChanged {
2367            peer_count: self.peer_manager.peer_count(),
2368            connected_count: self.peer_manager.connected_count(),
2369        });
2370    }
2371}
2372
/// Result from receiving BLE data
///
/// Returned by the `on_ble_data*` entry points when a received document was
/// decrypted and merged. Key-exchange frames, per-peer E2EE frames, duplicate
/// relay envelopes and undecodable data produce `None` instead.
#[derive(Debug, Clone)]
pub struct DataReceivedResult {
    /// Node that sent this data
    ///
    /// On the legacy (v1) document path this is the source node reported by
    /// the merge, not the BLE peer the bytes arrived from.
    pub source_node: NodeId,

    /// Whether this contained an emergency event
    pub is_emergency: bool,

    /// Whether this contained an ACK event
    pub is_ack: bool,

    /// Whether the counter changed (new data)
    pub counter_changed: bool,

    /// Whether emergency state changed (new emergency or ACK updates)
    pub emergency_changed: bool,

    /// Updated total count
    pub total_count: u64,

    /// Event timestamp (if event present) - use to detect duplicate events
    ///
    /// Set to 0 when the merged document carried no event.
    pub event_timestamp: u64,

    /// Data to relay to other peers (if multi-hop relay is enabled)
    ///
    /// When present, the platform adapter should send this data to peers
    /// returned by `get_relay_targets(Some(source_node))`.
    /// Always `None` for documents that did not arrive inside a relay envelope.
    pub relay_data: Option<Vec<u8>>,

    /// Origin node for relay (may differ from source_node for relayed messages)
    ///
    /// `None` for documents that did not arrive inside a relay envelope.
    pub origin_node: Option<NodeId>,

    /// Current hop count (for relayed messages)
    ///
    /// 0 for documents that did not arrive inside a relay envelope.
    pub hop_count: u8,
}
2409
/// Decision from processing a relay envelope
///
/// Consumed by `handle_relay_envelope`, which processes `payload` locally and,
/// if `should_relay` is set, attaches the encoded `relay_envelope` to the result
/// so the platform layer can forward it.
#[derive(Debug, Clone)]
pub struct RelayDecision {
    /// The payload (document) to process locally
    pub payload: Vec<u8>,

    /// Original sender of the message
    pub origin_node: NodeId,

    /// Current hop count
    pub hop_count: u8,

    /// Whether this message should be relayed to other peers
    pub should_relay: bool,

    /// The relay envelope to forward (with incremented hop count)
    ///
    /// Only present if `should_relay` is true and TTL not expired.
    /// Use [`RelayDecision::relay_data`] to obtain its encoded bytes.
    pub relay_envelope: Option<RelayEnvelope>,
}
2430
2431impl RelayDecision {
2432    /// Get the relay data to send to peers
2433    ///
2434    /// Returns None if relay is not needed.
2435    pub fn relay_data(&self) -> Option<Vec<u8>> {
2436        self.relay_envelope.as_ref().map(|e| e.encode())
2437    }
2438}
2439
2440#[cfg(all(test, feature = "std"))]
2441mod tests {
2442    use super::*;
2443    use crate::observer::CollectingObserver;
2444
2445    // Valid timestamp for testing (2024-01-15 00:00:00 UTC)
2446    const TEST_TIMESTAMP: u64 = 1705276800000;
2447
2448    fn create_mesh(node_id: u32, callsign: &str) -> HiveMesh {
2449        let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST");
2450        HiveMesh::new(config)
2451    }
2452
2453    #[test]
2454    fn test_mesh_creation() {
2455        let mesh = create_mesh(0x12345678, "ALPHA-1");
2456
2457        assert_eq!(mesh.node_id().as_u32(), 0x12345678);
2458        assert_eq!(mesh.callsign(), "ALPHA-1");
2459        assert_eq!(mesh.mesh_id(), "TEST");
2460        assert_eq!(mesh.device_name(), "HIVE_TEST-12345678");
2461    }
2462
2463    #[test]
2464    fn test_peer_discovery() {
2465        let mesh = create_mesh(0x11111111, "ALPHA-1");
2466        let observer = Arc::new(CollectingObserver::new());
2467        mesh.add_observer(observer.clone());
2468
2469        // Discover a peer
2470        let peer = mesh.on_ble_discovered(
2471            "device-uuid",
2472            Some("HIVE_TEST-22222222"),
2473            -65,
2474            Some("TEST"),
2475            1000,
2476        );
2477
2478        assert!(peer.is_some());
2479        let peer = peer.unwrap();
2480        assert_eq!(peer.node_id.as_u32(), 0x22222222);
2481
2482        // Check events were generated
2483        let events = observer.events();
2484        assert!(events
2485            .iter()
2486            .any(|e| matches!(e, HiveEvent::PeerDiscovered { .. })));
2487        assert!(events
2488            .iter()
2489            .any(|e| matches!(e, HiveEvent::MeshStateChanged { .. })));
2490    }
2491
2492    #[test]
2493    fn test_connection_lifecycle() {
2494        let mesh = create_mesh(0x11111111, "ALPHA-1");
2495        let observer = Arc::new(CollectingObserver::new());
2496        mesh.add_observer(observer.clone());
2497
2498        // Discover and connect
2499        mesh.on_ble_discovered(
2500            "device-uuid",
2501            Some("HIVE_TEST-22222222"),
2502            -65,
2503            Some("TEST"),
2504            1000,
2505        );
2506
2507        let node_id = mesh.on_ble_connected("device-uuid", 2000);
2508        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
2509        assert_eq!(mesh.connected_count(), 1);
2510
2511        // Disconnect
2512        let node_id = mesh.on_ble_disconnected("device-uuid", DisconnectReason::RemoteRequest);
2513        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
2514        assert_eq!(mesh.connected_count(), 0);
2515
2516        // Check events
2517        let events = observer.events();
2518        assert!(events
2519            .iter()
2520            .any(|e| matches!(e, HiveEvent::PeerConnected { .. })));
2521        assert!(events
2522            .iter()
2523            .any(|e| matches!(e, HiveEvent::PeerDisconnected { .. })));
2524    }
2525
2526    #[test]
2527    fn test_emergency_flow() {
2528        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
2529        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
2530
2531        let observer2 = Arc::new(CollectingObserver::new());
2532        mesh2.add_observer(observer2.clone());
2533
2534        // mesh1 sends emergency
2535        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
2536        assert!(mesh1.is_emergency_active());
2537
2538        // mesh2 receives it
2539        let result =
2540            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
2541
2542        assert!(result.is_some());
2543        let result = result.unwrap();
2544        assert!(result.is_emergency);
2545        assert_eq!(result.source_node.as_u32(), 0x11111111);
2546
2547        // Check events on mesh2
2548        let events = observer2.events();
2549        assert!(events
2550            .iter()
2551            .any(|e| matches!(e, HiveEvent::EmergencyReceived { .. })));
2552    }
2553
2554    #[test]
2555    fn test_ack_flow() {
2556        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
2557        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
2558
2559        let observer2 = Arc::new(CollectingObserver::new());
2560        mesh2.add_observer(observer2.clone());
2561
2562        // mesh1 sends ACK
2563        let doc = mesh1.send_ack(TEST_TIMESTAMP);
2564        assert!(mesh1.is_ack_active());
2565
2566        // mesh2 receives it
2567        let result =
2568            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
2569
2570        assert!(result.is_some());
2571        let result = result.unwrap();
2572        assert!(result.is_ack);
2573
2574        // Check events on mesh2
2575        let events = observer2.events();
2576        assert!(events
2577            .iter()
2578            .any(|e| matches!(e, HiveEvent::AckReceived { .. })));
2579    }
2580
2581    #[test]
2582    fn test_tick_cleanup() {
2583        let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
2584            .with_peer_timeout(10_000);
2585        let mesh = HiveMesh::new(config);
2586
2587        let observer = Arc::new(CollectingObserver::new());
2588        mesh.add_observer(observer.clone());
2589
2590        // Discover a peer
2591        mesh.on_ble_discovered(
2592            "device-uuid",
2593            Some("HIVE_TEST-22222222"),
2594            -65,
2595            Some("TEST"),
2596            1000,
2597        );
2598        assert_eq!(mesh.peer_count(), 1);
2599
2600        // Tick at t=5000 - not stale yet
2601        mesh.tick(5000);
2602        assert_eq!(mesh.peer_count(), 1);
2603
2604        // Tick at t=20000 - peer is stale (10s timeout exceeded)
2605        mesh.tick(20000);
2606        assert_eq!(mesh.peer_count(), 0);
2607
2608        // Check PeerLost event
2609        let events = observer.events();
2610        assert!(events
2611            .iter()
2612            .any(|e| matches!(e, HiveEvent::PeerLost { .. })));
2613    }
2614
2615    #[test]
2616    fn test_tick_sync_broadcast() {
2617        let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
2618            .with_sync_interval(5000);
2619        let mesh = HiveMesh::new(config);
2620
2621        // Discover and connect a peer first
2622        mesh.on_ble_discovered(
2623            "device-uuid",
2624            Some("HIVE_TEST-22222222"),
2625            -65,
2626            Some("TEST"),
2627            1000,
2628        );
2629        mesh.on_ble_connected("device-uuid", 1000);
2630
2631        // First tick at t=0 sets last_sync
2632        let _result = mesh.tick(0);
2633        // May or may not broadcast depending on initial state
2634
2635        // Tick before interval - no broadcast
2636        let result = mesh.tick(3000);
2637        assert!(result.is_none());
2638
2639        // After interval - should broadcast
2640        let result = mesh.tick(6000);
2641        assert!(result.is_some());
2642
2643        // Immediate second tick - no broadcast (interval not elapsed)
2644        let result = mesh.tick(6100);
2645        assert!(result.is_none());
2646
2647        // After another interval - should broadcast again
2648        let result = mesh.tick(12000);
2649        assert!(result.is_some());
2650    }
2651
2652    #[test]
2653    fn test_incoming_connection() {
2654        let mesh = create_mesh(0x11111111, "ALPHA-1");
2655        let observer = Arc::new(CollectingObserver::new());
2656        mesh.add_observer(observer.clone());
2657
2658        // Incoming connection from unknown peer
2659        let is_new = mesh.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);
2660
2661        assert!(is_new);
2662        assert_eq!(mesh.peer_count(), 1);
2663        assert_eq!(mesh.connected_count(), 1);
2664
2665        // Check events
2666        let events = observer.events();
2667        assert!(events
2668            .iter()
2669            .any(|e| matches!(e, HiveEvent::PeerDiscovered { .. })));
2670        assert!(events
2671            .iter()
2672            .any(|e| matches!(e, HiveEvent::PeerConnected { .. })));
2673    }
2674
2675    #[test]
2676    fn test_mesh_filtering() {
2677        let mesh = create_mesh(0x11111111, "ALPHA-1");
2678
2679        // Wrong mesh - ignored
2680        let peer = mesh.on_ble_discovered(
2681            "device-uuid-1",
2682            Some("HIVE_OTHER-22222222"),
2683            -65,
2684            Some("OTHER"),
2685            1000,
2686        );
2687        assert!(peer.is_none());
2688        assert_eq!(mesh.peer_count(), 0);
2689
2690        // Correct mesh - accepted
2691        let peer = mesh.on_ble_discovered(
2692            "device-uuid-2",
2693            Some("HIVE_TEST-33333333"),
2694            -65,
2695            Some("TEST"),
2696            1000,
2697        );
2698        assert!(peer.is_some());
2699        assert_eq!(mesh.peer_count(), 1);
2700    }
2701
2702    // ==================== Encryption Tests ====================
2703
2704    fn create_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> HiveMesh {
2705        let config =
2706            HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_encryption(secret);
2707        HiveMesh::new(config)
2708    }
2709
2710    #[test]
2711    fn test_encryption_enabled() {
2712        let secret = [0x42u8; 32];
2713        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
2714
2715        assert!(mesh.is_encryption_enabled());
2716    }
2717
2718    #[test]
2719    fn test_encryption_disabled_by_default() {
2720        let mesh = create_mesh(0x11111111, "ALPHA-1");
2721
2722        assert!(!mesh.is_encryption_enabled());
2723    }
2724
2725    #[test]
2726    fn test_encrypted_document_exchange() {
2727        let secret = [0x42u8; 32];
2728        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
2729        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
2730
2731        // mesh1 sends document
2732        let doc = mesh1.build_document();
2733
2734        // Document should be encrypted (starts with ENCRYPTED_MARKER)
2735        assert!(doc.len() >= 2);
2736        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
2737
2738        // mesh2 receives and decrypts
2739        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
2740
2741        assert!(result.is_some());
2742        let result = result.unwrap();
2743        assert_eq!(result.source_node.as_u32(), 0x11111111);
2744    }
2745
2746    #[test]
2747    fn test_encrypted_emergency_exchange() {
2748        let secret = [0x42u8; 32];
2749        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
2750        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
2751
2752        let observer = Arc::new(CollectingObserver::new());
2753        mesh2.add_observer(observer.clone());
2754
2755        // mesh1 sends emergency
2756        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
2757
2758        // mesh2 receives and decrypts
2759        let result =
2760            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
2761
2762        assert!(result.is_some());
2763        let result = result.unwrap();
2764        assert!(result.is_emergency);
2765
2766        // Check EmergencyReceived event was fired
2767        let events = observer.events();
2768        assert!(events
2769            .iter()
2770            .any(|e| matches!(e, HiveEvent::EmergencyReceived { .. })));
2771    }
2772
2773    #[test]
2774    fn test_wrong_key_fails_decrypt() {
2775        let secret1 = [0x42u8; 32];
2776        let secret2 = [0x43u8; 32]; // Different key
2777        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
2778        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
2779
2780        // mesh1 sends document
2781        let doc = mesh1.build_document();
2782
2783        // mesh2 cannot decrypt (wrong key)
2784        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
2785
2786        assert!(result.is_none());
2787    }
2788
2789    #[test]
2790    fn test_unencrypted_mesh_can_read_unencrypted() {
2791        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
2792        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
2793
2794        // mesh1 sends document (unencrypted)
2795        let doc = mesh1.build_document();
2796
2797        // mesh2 receives (also unencrypted)
2798        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
2799
2800        assert!(result.is_some());
2801    }
2802
2803    #[test]
2804    fn test_encrypted_mesh_can_receive_unencrypted() {
2805        // Backward compatibility: encrypted mesh can receive unencrypted docs
2806        let secret = [0x42u8; 32];
2807        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // unencrypted
2808        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); // encrypted
2809
2810        // mesh1 sends unencrypted document
2811        let doc = mesh1.build_document();
2812
2813        // mesh2 can receive unencrypted (backward compat)
2814        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
2815
2816        assert!(result.is_some());
2817    }
2818
2819    #[test]
2820    fn test_unencrypted_mesh_cannot_receive_encrypted() {
2821        let secret = [0x42u8; 32];
2822        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret); // encrypted
2823        let mesh2 = create_mesh(0x22222222, "BRAVO-1"); // unencrypted
2824
2825        // mesh1 sends encrypted document
2826        let doc = mesh1.build_document();
2827
2828        // mesh2 cannot decrypt (no key)
2829        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
2830
2831        assert!(result.is_none());
2832    }
2833
2834    #[test]
2835    fn test_enable_disable_encryption() {
2836        let mut mesh = create_mesh(0x11111111, "ALPHA-1");
2837
2838        assert!(!mesh.is_encryption_enabled());
2839
2840        // Enable encryption
2841        let secret = [0x42u8; 32];
2842        mesh.enable_encryption(&secret);
2843        assert!(mesh.is_encryption_enabled());
2844
2845        // Build document should now be encrypted
2846        let doc = mesh.build_document();
2847        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
2848
2849        // Disable encryption
2850        mesh.disable_encryption();
2851        assert!(!mesh.is_encryption_enabled());
2852
2853        // Build document should now be unencrypted
2854        let doc = mesh.build_document();
2855        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
2856    }
2857
2858    #[test]
2859    fn test_encryption_overhead() {
2860        let secret = [0x42u8; 32];
2861        let mesh_encrypted = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
2862        let mesh_unencrypted = create_mesh(0x22222222, "BRAVO-1");
2863
2864        let doc_encrypted = mesh_encrypted.build_document();
2865        let doc_unencrypted = mesh_unencrypted.build_document();
2866
2867        // Encrypted doc should be larger by:
2868        // - 2 bytes marker header (0xAE + reserved)
2869        // - 12 bytes nonce
2870        // - 16 bytes auth tag
2871        // Total: 30 bytes overhead
2872        let overhead = doc_encrypted.len() - doc_unencrypted.len();
2873        assert_eq!(overhead, 30); // 2 (marker) + 12 (nonce) + 16 (tag)
2874    }
2875
2876    // ==================== Per-Peer E2EE Tests ====================
2877
2878    #[test]
2879    fn test_peer_e2ee_enable_disable() {
2880        let mesh = create_mesh(0x11111111, "ALPHA-1");
2881
2882        assert!(!mesh.is_peer_e2ee_enabled());
2883        assert!(mesh.peer_e2ee_public_key().is_none());
2884
2885        mesh.enable_peer_e2ee();
2886        assert!(mesh.is_peer_e2ee_enabled());
2887        assert!(mesh.peer_e2ee_public_key().is_some());
2888
2889        mesh.disable_peer_e2ee();
2890        assert!(!mesh.is_peer_e2ee_enabled());
2891    }
2892
2893    #[test]
2894    fn test_peer_e2ee_initiate_session() {
2895        let mesh = create_mesh(0x11111111, "ALPHA-1");
2896        mesh.enable_peer_e2ee();
2897
2898        let key_exchange = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
2899        assert!(key_exchange.is_some());
2900
2901        let key_exchange = key_exchange.unwrap();
2902        // Should start with KEY_EXCHANGE_MARKER
2903        assert_eq!(key_exchange[0], crate::document::KEY_EXCHANGE_MARKER);
2904
2905        // Should have a pending session
2906        assert_eq!(mesh.peer_e2ee_session_count(), 1);
2907        assert_eq!(mesh.peer_e2ee_established_count(), 0);
2908    }
2909
2910    #[test]
2911    fn test_peer_e2ee_full_handshake() {
2912        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
2913        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
2914
2915        mesh1.enable_peer_e2ee();
2916        mesh2.enable_peer_e2ee();
2917
2918        let observer1 = Arc::new(CollectingObserver::new());
2919        let observer2 = Arc::new(CollectingObserver::new());
2920        mesh1.add_observer(observer1.clone());
2921        mesh2.add_observer(observer2.clone());
2922
2923        // mesh1 initiates to mesh2
2924        let key_exchange1 = mesh1
2925            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
2926            .unwrap();
2927
2928        // mesh2 receives and responds
2929        let response = mesh2.handle_key_exchange(&key_exchange1, 1000);
2930        assert!(response.is_some());
2931
2932        // Check mesh2 has established session
2933        assert!(mesh2.has_peer_e2ee_session(NodeId::new(0x11111111)));
2934
2935        // mesh1 receives mesh2's response
2936        let key_exchange2 = response.unwrap();
2937        let _ = mesh1.handle_key_exchange(&key_exchange2, 1000);
2938
2939        // Check mesh1 has established session
2940        assert!(mesh1.has_peer_e2ee_session(NodeId::new(0x22222222)));
2941
2942        // Both should have E2EE established events
2943        let events1 = observer1.events();
2944        assert!(events1
2945            .iter()
2946            .any(|e| matches!(e, HiveEvent::PeerE2eeEstablished { .. })));
2947
2948        let events2 = observer2.events();
2949        assert!(events2
2950            .iter()
2951            .any(|e| matches!(e, HiveEvent::PeerE2eeEstablished { .. })));
2952    }
2953
2954    #[test]
2955    fn test_peer_e2ee_encrypt_decrypt() {
2956        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
2957        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
2958
2959        mesh1.enable_peer_e2ee();
2960        mesh2.enable_peer_e2ee();
2961
2962        // Establish session via key exchange
2963        let key_exchange1 = mesh1
2964            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
2965            .unwrap();
2966        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
2967        mesh1.handle_key_exchange(&key_exchange2, 1000);
2968
2969        // mesh1 sends encrypted message to mesh2
2970        let plaintext = b"Secret message from mesh1";
2971        let encrypted = mesh1.send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000);
2972        assert!(encrypted.is_some());
2973
2974        let encrypted = encrypted.unwrap();
2975        // Should start with PEER_E2EE_MARKER
2976        assert_eq!(encrypted[0], crate::document::PEER_E2EE_MARKER);
2977
2978        // mesh2 receives and decrypts
2979        let observer2 = Arc::new(CollectingObserver::new());
2980        mesh2.add_observer(observer2.clone());
2981
2982        let decrypted = mesh2.handle_peer_e2ee_message(&encrypted, 2000);
2983        assert!(decrypted.is_some());
2984        assert_eq!(decrypted.unwrap(), plaintext);
2985
2986        // Should have received message event
2987        let events = observer2.events();
2988        assert!(events.iter().any(|e| matches!(
2989            e,
2990            HiveEvent::PeerE2eeMessageReceived { from_node, data }
2991            if from_node.as_u32() == 0x11111111 && data == plaintext
2992        )));
2993    }
2994
2995    #[test]
2996    fn test_peer_e2ee_bidirectional() {
2997        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
2998        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
2999
3000        mesh1.enable_peer_e2ee();
3001        mesh2.enable_peer_e2ee();
3002
3003        // Establish session
3004        let key_exchange1 = mesh1
3005            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
3006            .unwrap();
3007        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
3008        mesh1.handle_key_exchange(&key_exchange2, 1000);
3009
3010        // mesh1 -> mesh2
3011        let msg1 = mesh1
3012            .send_peer_e2ee(NodeId::new(0x22222222), b"Hello from mesh1", 2000)
3013            .unwrap();
3014        let dec1 = mesh2.handle_peer_e2ee_message(&msg1, 2000).unwrap();
3015        assert_eq!(dec1, b"Hello from mesh1");
3016
3017        // mesh2 -> mesh1
3018        let msg2 = mesh2
3019            .send_peer_e2ee(NodeId::new(0x11111111), b"Hello from mesh2", 2000)
3020            .unwrap();
3021        let dec2 = mesh1.handle_peer_e2ee_message(&msg2, 2000).unwrap();
3022        assert_eq!(dec2, b"Hello from mesh2");
3023    }
3024
3025    #[test]
3026    fn test_peer_e2ee_close_session() {
3027        let mesh = create_mesh(0x11111111, "ALPHA-1");
3028        mesh.enable_peer_e2ee();
3029
3030        let observer = Arc::new(CollectingObserver::new());
3031        mesh.add_observer(observer.clone());
3032
3033        // Initiate a session
3034        mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
3035        assert_eq!(mesh.peer_e2ee_session_count(), 1);
3036
3037        // Close session
3038        mesh.close_peer_e2ee(NodeId::new(0x22222222));
3039
3040        // Check close event
3041        let events = observer.events();
3042        assert!(events
3043            .iter()
3044            .any(|e| matches!(e, HiveEvent::PeerE2eeClosed { .. })));
3045    }
3046
3047    #[test]
3048    fn test_peer_e2ee_without_enabling() {
3049        let mesh = create_mesh(0x11111111, "ALPHA-1");
3050
3051        // E2EE not enabled - should return None
3052        let result = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
3053        assert!(result.is_none());
3054
3055        let result = mesh.send_peer_e2ee(NodeId::new(0x22222222), b"test", 1000);
3056        assert!(result.is_none());
3057
3058        assert!(!mesh.has_peer_e2ee_session(NodeId::new(0x22222222)));
3059    }
3060
3061    #[test]
3062    fn test_peer_e2ee_overhead() {
3063        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3064        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3065
3066        mesh1.enable_peer_e2ee();
3067        mesh2.enable_peer_e2ee();
3068
3069        // Establish session
3070        let key_exchange1 = mesh1
3071            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
3072            .unwrap();
3073        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
3074        mesh1.handle_key_exchange(&key_exchange2, 1000);
3075
3076        // Encrypt a message
3077        let plaintext = b"Test message";
3078        let encrypted = mesh1
3079            .send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000)
3080            .unwrap();
3081
3082        // Overhead should be:
3083        // - 2 bytes marker header
3084        // - 4 bytes recipient node ID
3085        // - 4 bytes sender node ID
3086        // - 8 bytes counter
3087        // - 12 bytes nonce
3088        // - 16 bytes auth tag
3089        // Total: 46 bytes overhead
3090        let overhead = encrypted.len() - plaintext.len();
3091        assert_eq!(overhead, 46);
3092    }
3093
3094    // ==================== Strict Encryption Mode Tests ====================
3095
3096    fn create_strict_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> HiveMesh {
3097        let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST")
3098            .with_encryption(secret)
3099            .with_strict_encryption();
3100        HiveMesh::new(config)
3101    }
3102
3103    #[test]
3104    fn test_strict_encryption_enabled() {
3105        let secret = [0x42u8; 32];
3106        let mesh = create_strict_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3107
3108        assert!(mesh.is_encryption_enabled());
3109        assert!(mesh.is_strict_encryption_enabled());
3110    }
3111
3112    #[test]
3113    fn test_strict_encryption_disabled_by_default() {
3114        let secret = [0x42u8; 32];
3115        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3116
3117        assert!(mesh.is_encryption_enabled());
3118        assert!(!mesh.is_strict_encryption_enabled());
3119    }
3120
3121    #[test]
3122    fn test_strict_encryption_requires_encryption_enabled() {
3123        // strict_encryption without encryption should have no effect
3124        let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
3125            .with_strict_encryption(); // No encryption!
3126        let mesh = HiveMesh::new(config);
3127
3128        assert!(!mesh.is_encryption_enabled());
3129        assert!(!mesh.is_strict_encryption_enabled());
3130    }
3131
3132    #[test]
3133    fn test_strict_mode_accepts_encrypted_documents() {
3134        let secret = [0x42u8; 32];
3135        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3136        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
3137
3138        // mesh1 sends encrypted document
3139        let doc = mesh1.build_document();
3140        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
3141
3142        // mesh2 (strict mode) should accept encrypted documents
3143        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3144        assert!(result.is_some());
3145    }
3146
3147    #[test]
3148    fn test_strict_mode_rejects_unencrypted_documents() {
3149        let secret = [0x42u8; 32];
3150        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // Unencrypted sender
3151        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret); // Strict receiver
3152
3153        let observer = Arc::new(CollectingObserver::new());
3154        mesh2.add_observer(observer.clone());
3155
3156        // mesh1 sends unencrypted document
3157        let doc = mesh1.build_document();
3158        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
3159
3160        // mesh2 (strict mode) should reject unencrypted documents
3161        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3162        assert!(result.is_none());
3163
3164        // Should have SecurityViolation event
3165        let events = observer.events();
3166        assert!(events.iter().any(|e| matches!(
3167            e,
3168            HiveEvent::SecurityViolation {
3169                kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
3170                ..
3171            }
3172        )));
3173    }
3174
3175    #[test]
3176    fn test_non_strict_mode_accepts_unencrypted_documents() {
3177        let secret = [0x42u8; 32];
3178        let mesh1 = create_mesh(0x11111111, "ALPHA-1"); // Unencrypted sender
3179        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); // Non-strict receiver
3180
3181        // mesh1 sends unencrypted document
3182        let doc = mesh1.build_document();
3183
3184        // mesh2 (non-strict) should accept unencrypted documents (backward compat)
3185        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3186        assert!(result.is_some());
3187    }
3188
3189    #[test]
3190    fn test_strict_mode_security_violation_event_includes_source() {
3191        let secret = [0x42u8; 32];
3192        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3193        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
3194
3195        let observer = Arc::new(CollectingObserver::new());
3196        mesh2.add_observer(observer.clone());
3197
3198        let doc = mesh1.build_document();
3199
3200        // Use on_ble_data_received with identifier to test source is captured
3201        mesh2.on_ble_discovered(
3202            "test-device-uuid",
3203            Some("HIVE_TEST-11111111"),
3204            -65,
3205            Some("TEST"),
3206            500,
3207        );
3208        mesh2.on_ble_connected("test-device-uuid", 600);
3209
3210        let _result = mesh2.on_ble_data_received("test-device-uuid", &doc, 1000);
3211
3212        // Check SecurityViolation event has source
3213        let events = observer.events();
3214        let violation = events.iter().find(|e| {
3215            matches!(
3216                e,
3217                HiveEvent::SecurityViolation {
3218                    kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
3219                    ..
3220                }
3221            )
3222        });
3223        assert!(violation.is_some());
3224
3225        if let Some(HiveEvent::SecurityViolation { source, .. }) = violation {
3226            assert!(source.is_some());
3227            assert_eq!(source.as_ref().unwrap(), "test-device-uuid");
3228        }
3229    }
3230
3231    #[test]
3232    fn test_decryption_failure_emits_security_violation() {
3233        let secret1 = [0x42u8; 32];
3234        let secret2 = [0x43u8; 32]; // Different key
3235        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
3236        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
3237
3238        let observer = Arc::new(CollectingObserver::new());
3239        mesh2.add_observer(observer.clone());
3240
3241        // mesh1 sends encrypted document
3242        let doc = mesh1.build_document();
3243
3244        // mesh2 cannot decrypt (wrong key)
3245        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3246        assert!(result.is_none());
3247
3248        // Should have SecurityViolation event for decryption failure
3249        let events = observer.events();
3250        assert!(events.iter().any(|e| matches!(
3251            e,
3252            HiveEvent::SecurityViolation {
3253                kind: crate::observer::SecurityViolationKind::DecryptionFailed,
3254                ..
3255            }
3256        )));
3257    }
3258
3259    #[test]
3260    fn test_strict_mode_builder_chain() {
3261        let secret = [0x42u8; 32];
3262        let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
3263            .with_encryption(secret)
3264            .with_strict_encryption()
3265            .with_sync_interval(10_000)
3266            .with_peer_timeout(60_000);
3267
3268        let mesh = HiveMesh::new(config);
3269
3270        assert!(mesh.is_encryption_enabled());
3271        assert!(mesh.is_strict_encryption_enabled());
3272    }
3273
3274    // ==================== Multi-Hop Relay Tests ====================
3275
3276    fn create_relay_mesh(node_id: u32, callsign: &str) -> HiveMesh {
3277        let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_relay();
3278        HiveMesh::new(config)
3279    }
3280
3281    #[test]
3282    fn test_relay_disabled_by_default() {
3283        let mesh = create_mesh(0x11111111, "ALPHA-1");
3284        assert!(!mesh.is_relay_enabled());
3285    }
3286
3287    #[test]
3288    fn test_relay_enabled() {
3289        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3290        assert!(mesh.is_relay_enabled());
3291    }
3292
3293    #[test]
3294    fn test_relay_config_builder() {
3295        let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
3296            .with_relay()
3297            .with_max_relay_hops(5)
3298            .with_relay_fanout(3)
3299            .with_seen_cache_ttl(60_000);
3300
3301        assert!(config.enable_relay);
3302        assert_eq!(config.max_relay_hops, 5);
3303        assert_eq!(config.relay_fanout, 3);
3304        assert_eq!(config.seen_cache_ttl_ms, 60_000);
3305    }
3306
3307    #[test]
3308    fn test_seen_message_deduplication() {
3309        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3310        let origin = NodeId::new(0x22222222);
3311        let msg_id = crate::relay::MessageId::from_content(origin, 1000, 0xDEADBEEF);
3312
3313        // First time - should be new
3314        assert!(mesh.mark_message_seen(msg_id, origin, 1000));
3315
3316        // Second time - should be duplicate
3317        assert!(!mesh.mark_message_seen(msg_id, origin, 2000));
3318
3319        assert_eq!(mesh.seen_cache_size(), 1);
3320    }
3321
3322    #[test]
3323    fn test_wrap_for_relay() {
3324        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3325
3326        let payload = vec![1, 2, 3, 4, 5];
3327        let wrapped = mesh.wrap_for_relay(payload.clone());
3328
3329        // Should start with relay envelope marker
3330        assert_eq!(wrapped[0], crate::relay::RELAY_ENVELOPE_MARKER);
3331
3332        // Decode and verify
3333        let envelope = crate::relay::RelayEnvelope::decode(&wrapped).unwrap();
3334        assert_eq!(envelope.payload, payload);
3335        assert_eq!(envelope.origin_node, NodeId::new(0x11111111));
3336        assert_eq!(envelope.hop_count, 0);
3337    }
3338
3339    #[test]
3340    fn test_process_relay_envelope_new_message() {
3341        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3342        let observer = Arc::new(CollectingObserver::new());
3343        mesh.add_observer(observer.clone());
3344
3345        // Create an envelope from another node
3346        let payload = vec![1, 2, 3, 4, 5];
3347        let envelope =
3348            crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
3349                .with_max_hops(7);
3350        let data = envelope.encode();
3351
3352        // Process it
3353        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
3354
3355        assert!(decision.is_some());
3356        let decision = decision.unwrap();
3357        assert_eq!(decision.payload, payload);
3358        assert_eq!(decision.origin_node.as_u32(), 0x22222222);
3359        assert_eq!(decision.hop_count, 0);
3360        assert!(decision.should_relay);
3361        assert!(decision.relay_envelope.is_some());
3362
3363        // Relay envelope should have incremented hop count
3364        let relay_env = decision.relay_envelope.unwrap();
3365        assert_eq!(relay_env.hop_count, 1);
3366    }
3367
3368    #[test]
3369    fn test_process_relay_envelope_duplicate() {
3370        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3371        let observer = Arc::new(CollectingObserver::new());
3372        mesh.add_observer(observer.clone());
3373
3374        let payload = vec![1, 2, 3, 4, 5];
3375        let envelope = crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload);
3376        let data = envelope.encode();
3377
3378        // First time - should succeed
3379        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
3380        assert!(decision.is_some());
3381
3382        // Second time - should be duplicate
3383        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 2000);
3384        assert!(decision.is_none());
3385
3386        // Should have DuplicateMessageDropped event
3387        let events = observer.events();
3388        assert!(events
3389            .iter()
3390            .any(|e| matches!(e, HiveEvent::DuplicateMessageDropped { .. })));
3391    }
3392
3393    #[test]
3394    fn test_process_relay_envelope_ttl_expired() {
3395        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3396        let observer = Arc::new(CollectingObserver::new());
3397        mesh.add_observer(observer.clone());
3398
3399        // Create envelope at max hops (TTL expired)
3400        let payload = vec![1, 2, 3, 4, 5];
3401        let mut envelope =
3402            crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
3403                .with_max_hops(3);
3404
3405        // Simulate having been relayed 3 times already
3406        envelope = envelope.relay().unwrap(); // hop 1
3407        envelope = envelope.relay().unwrap(); // hop 2
3408        envelope = envelope.relay().unwrap(); // hop 3 - at max now
3409
3410        let data = envelope.encode();
3411
3412        // Process - should still process locally but not relay further
3413        let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
3414
3415        assert!(decision.is_some());
3416        let decision = decision.unwrap();
3417        assert_eq!(decision.payload, payload);
3418        assert!(!decision.should_relay); // Cannot relay further
3419        assert!(decision.relay_envelope.is_none());
3420
3421        // Should have MessageTtlExpired event
3422        let events = observer.events();
3423        assert!(events
3424            .iter()
3425            .any(|e| matches!(e, HiveEvent::MessageTtlExpired { .. })));
3426    }
3427
3428    #[test]
3429    fn test_build_relay_document() {
3430        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3431
3432        let relay_doc = mesh.build_relay_document();
3433
3434        // Should be a valid relay envelope
3435        assert_eq!(relay_doc[0], crate::relay::RELAY_ENVELOPE_MARKER);
3436
3437        // Decode and verify it contains a valid document
3438        let envelope = crate::relay::RelayEnvelope::decode(&relay_doc).unwrap();
3439        assert_eq!(envelope.origin_node.as_u32(), 0x11111111);
3440
3441        // The payload should be a valid HiveDocument
3442        let doc = crate::document::HiveDocument::decode(&envelope.payload);
3443        assert!(doc.is_some());
3444    }
3445
3446    #[test]
3447    fn test_relay_targets_excludes_source() {
3448        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3449
3450        // Add some peers
3451        mesh.on_ble_discovered(
3452            "peer-1",
3453            Some("HIVE_TEST-22222222"),
3454            -60,
3455            Some("TEST"),
3456            1000,
3457        );
3458        mesh.on_ble_connected("peer-1", 1000);
3459
3460        mesh.on_ble_discovered(
3461            "peer-2",
3462            Some("HIVE_TEST-33333333"),
3463            -65,
3464            Some("TEST"),
3465            1000,
3466        );
3467        mesh.on_ble_connected("peer-2", 1000);
3468
3469        mesh.on_ble_discovered(
3470            "peer-3",
3471            Some("HIVE_TEST-44444444"),
3472            -70,
3473            Some("TEST"),
3474            1000,
3475        );
3476        mesh.on_ble_connected("peer-3", 1000);
3477
3478        // Get relay targets excluding peer-2
3479        let targets = mesh.get_relay_targets(Some(NodeId::new(0x33333333)));
3480
3481        // Should not include peer-2 in targets
3482        assert!(targets.iter().all(|p| p.node_id.as_u32() != 0x33333333));
3483    }
3484
3485    #[test]
3486    fn test_clear_seen_cache() {
3487        let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3488        let origin = NodeId::new(0x22222222);
3489
3490        // Add some messages
3491        mesh.mark_message_seen(
3492            crate::relay::MessageId::from_content(origin, 1000, 0x11111111),
3493            origin,
3494            1000,
3495        );
3496        mesh.mark_message_seen(
3497            crate::relay::MessageId::from_content(origin, 2000, 0x22222222),
3498            origin,
3499            2000,
3500        );
3501
3502        assert_eq!(mesh.seen_cache_size(), 2);
3503
3504        // Clear
3505        mesh.clear_seen_cache();
3506        assert_eq!(mesh.seen_cache_size(), 0);
3507    }
3508}