saorsa_core/dht.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Distributed Hash Table (DHT) Implementation
15//!
16//! This module provides a Kademlia-based DHT for distributed peer routing and data storage.
17//! It implements the core Kademlia algorithm with proper distance metrics, k-buckets,
18//! and network operations for a fully decentralized P2P system.
19//!
20//! The implementation includes S/Kademlia security extensions for enhanced protection
21//! against various attacks on the DHT infrastructure.
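//!
//! # Example (illustrative sketch)
//!
//! A minimal usage sketch of the API defined in this module. It is marked
//! `ignore` because it assumes an async runtime and that the crate is consumed
//! as `saorsa_core`; adjust the paths to your setup.
//!
//! ```ignore
//! use saorsa_core::dht::{DHT, DHTConfig, Key};
//!
//! // Standard Kademlia parameters: k = 20, alpha = 3.
//! let dht = DHT::new(Key::random(), DHTConfig::default());
//!
//! // Store and retrieve a value; with no peers this only touches local storage.
//! let key = Key::new(b"example-key");
//! dht.put(key.clone(), b"example-value".to_vec()).await?;
//! assert!(dht.get(&key).await.is_some());
//! ```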
22
23use crate::error::{P2PError, P2pResult as Result};
24use crate::validation::{
25    Validate, ValidationContext, validate_dht_key, validate_dht_value, validate_peer_id,
26};
27use crate::{Multiaddr, PeerId};
28use futures;
29use serde::{Deserialize, Serialize};
30use sha2::{Digest, Sha256};
31use std::collections::{HashMap, VecDeque};
32use std::fmt;
33use std::time::{Duration, Instant, SystemTime};
34use tokio::sync::RwLock;
35use tracing::{debug, info};
36
37// S/Kademlia security extensions
38pub mod skademlia;
39
40// IPv6-based node identity system
41pub mod ipv6_identity;
42
43// Enhanced storage with K=8 replication
44pub mod enhanced_storage;
45
46// Optimized storage with LRU cache and performance improvements
47pub mod optimized_storage;
48
49// New DHT v2 components
50#[allow(unused)]
51pub mod content_addressing;
52#[allow(unused)]
53pub mod core_engine;
54#[allow(unused)]
55pub mod network_integration;
56#[allow(unused)]
57pub mod reed_solomon;
58#[allow(unused)]
59pub mod witness;
60
61// DHT client for messaging integration
62pub mod client;
63
64// RSPS integration for efficient content discovery
65pub mod rsps_integration;
66
67// Geographic-aware DHT routing (Phase 2.2.2)
68pub mod geographic_network_integration;
69pub mod geographic_routing;
70pub mod geographic_routing_table;
71pub mod latency_aware_selection;
72
73// Test modules
74#[cfg(test)]
75mod content_addressing_test;
76#[cfg(test)]
77mod core_engine_test;
78#[cfg(test)]
79mod reed_solomon_test;
80
81/// DHT configuration parameters
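///
/// # Example (illustrative sketch)
///
/// Overriding a couple of defaults with struct update syntax; all field names
/// come from the definition below.
///
/// ```ignore
/// let config = DHTConfig {
///     replication_factor: 8,
///     bucket_size: 8,
///     ..DHTConfig::default()
/// };
/// assert_eq!(config.alpha, 3);
/// ```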
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct DHTConfig {
84    /// Replication parameter (k) - number of nodes to store each record
85    pub replication_factor: usize,
86    /// Maximum nodes per k-bucket
87    pub bucket_size: usize,
88    /// Concurrency parameter for parallel lookups
89    pub alpha: usize,
90    /// Record expiration time
91    pub record_ttl: Duration,
92    /// Refresh interval for buckets
93    pub bucket_refresh_interval: Duration,
94    /// Republish interval for stored records
95    pub republish_interval: Duration,
96    /// Maximum distance for considering nodes "close"
97    pub max_distance: u8,
98}
99
100/// DHT key type with proper Kademlia distance calculation
101#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
102pub struct Key {
103    /// 256-bit key hash
104    hash: [u8; 32],
105}
106
107impl Validate for Key {
108    fn validate(&self, ctx: &ValidationContext) -> Result<()> {
109        // Validate key hash
110        validate_dht_key(&self.hash, ctx)?;
111        Ok(())
112    }
113}
114
115/// DHT record containing key-value data with metadata
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct Record {
118    /// Record key
119    pub key: Key,
120    /// Record value
121    pub value: Vec<u8>,
122    /// Publisher peer ID
123    pub publisher: PeerId,
124    /// Record creation time
125    pub created_at: SystemTime,
126    /// Record expiration time
127    pub expires_at: SystemTime,
128    /// Signature for verification (optional)
129    pub signature: Option<Vec<u8>>,
130}
131
132impl Validate for Record {
133    fn validate(&self, ctx: &ValidationContext) -> Result<()> {
134        // Validate key
135        self.key.validate(ctx)?;
136
137        // Validate value size
138        validate_dht_value(&self.value, ctx)?;
139
140        // Validate publisher
141        validate_peer_id(&self.publisher)?;
142
143        // Validate timestamps
144        let now = SystemTime::now();
145        if self.created_at > now {
146            return Err(P2PError::validation(
147                "Record creation time is in the future",
148            ));
149        }
150
151        if self.expires_at < self.created_at {
152            return Err(P2PError::validation(
153                "Record expiration time is before creation time",
154            ));
155        }
156
157        // Validate signature if present
158        if let Some(sig) = &self.signature
159            && (sig.is_empty() || sig.len() > 512)
160        {
161            return Err(P2PError::validation("Invalid signature size"));
162        }
163
164        Ok(())
165    }
166}
167
168/// DHT node information
169#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
170pub struct DHTNode {
171    /// Node peer ID
172    pub peer_id: PeerId,
173    /// Node addresses
174    pub addresses: Vec<Multiaddr>,
175    /// Last seen timestamp (seconds since epoch)
176    #[serde(with = "instant_as_secs")]
177    pub last_seen: Instant,
178    /// Node distance from local node
179    pub distance: Key,
180    /// Connection status
181    pub is_connected: bool,
182}
183
184/// Serde helper for Instant serialization
185mod instant_as_secs {
186    use serde::{Deserialize, Deserializer, Serialize, Serializer};
187    use std::time::Instant;
188
189    pub fn serialize<S>(instant: &Instant, serializer: S) -> Result<S::Ok, S::Error>
190    where
191        S: Serializer,
192    {
        // Serialize as the number of whole seconds elapsed since this instant
194        instant.elapsed().as_secs().serialize(serializer)
195    }
196
197    pub fn deserialize<'de, D>(deserializer: D) -> Result<Instant, D::Error>
198    where
199        D: Deserializer<'de>,
200    {
201        let secs = u64::deserialize(deserializer)?;
        // Reconstruct an approximate instant; fall back to `now()` if the
        // subtraction would underflow the platform's monotonic clock.
        Ok(Instant::now()
            .checked_sub(std::time::Duration::from_secs(secs))
            .unwrap_or_else(Instant::now))
204    }
205}
206
207/// Serializable DHT node for network transmission
208#[derive(Debug, Clone, Serialize, Deserialize)]
209pub struct SerializableDHTNode {
210    /// Node peer ID
211    pub peer_id: PeerId,
212    /// Node addresses
213    pub addresses: Vec<Multiaddr>,
214    /// Last seen timestamp as seconds since epoch
215    pub last_seen_secs: u64,
216    /// Node distance from local node
217    pub distance: Key,
218    /// Connection status
219    pub is_connected: bool,
220}
221
222/// Kademlia routing table bucket
223#[derive(Debug)]
224struct KBucket {
225    /// Nodes in this bucket (up to k nodes)
226    nodes: VecDeque<DHTNode>,
227    /// Bucket capacity
228    capacity: usize,
229    /// Last refresh time
230    last_refresh: Instant,
231}
232
233/// Kademlia routing table
234#[derive(Debug)]
235pub struct RoutingTable {
236    /// Local node ID
237    local_id: Key,
238    /// K-buckets indexed by distance
239    buckets: Vec<RwLock<KBucket>>,
240    /// Configuration (reserved for future use)
241    #[allow(dead_code)]
242    config: DHTConfig,
243}
244
245/// DHT storage for local records with performance optimizations
246#[derive(Debug)]
247pub struct DHTStorage {
248    /// Optimized storage with LRU cache and indexed operations
249    optimized_storage: optimized_storage::OptimizedDHTStorage,
250    /// Configuration (reserved for future use)
251    #[allow(dead_code)]
252    config: DHTConfig,
253}
254
255/// Main DHT implementation with S/Kademlia security extensions
256#[derive(Debug)]
257pub struct DHT {
258    /// Local node ID
259    local_id: Key,
260    /// Routing table
261    routing_table: RoutingTable,
262    /// Local storage
263    storage: DHTStorage,
264    /// Configuration (reserved for future use)
265    #[allow(dead_code)]
266    config: DHTConfig,
267    /// S/Kademlia security extensions
268    pub skademlia: Option<skademlia::SKademlia>,
269    /// IPv6-based identity manager
270    pub ipv6_identity_manager: Option<ipv6_identity::IPv6DHTIdentityManager>,
271}
272
273/// DHT query types
274#[derive(Debug, Clone, Serialize, Deserialize)]
275pub enum DHTQuery {
276    /// Find nodes close to a key
277    FindNode {
278        /// The key to find nodes near
279        key: Key,
280        /// ID of the requesting peer
281        requester: PeerId,
282    },
283    /// Find value for a key
284    FindValue {
285        /// The key to find value for
286        key: Key,
287        /// ID of the requesting peer
288        requester: PeerId,
289    },
290    /// Store a record
291    Store {
292        /// The record to store
293        record: Record,
294        /// ID of the requesting peer
295        requester: PeerId,
296    },
297    /// Ping to check node availability
298    Ping {
299        /// ID of the requesting peer
300        requester: PeerId,
301    },
302}
303
304/// DHT response types
305#[derive(Debug, Clone, Serialize, Deserialize)]
306pub enum DHTResponse {
307    /// Response to FindNode query
308    Nodes {
309        /// List of nodes near the requested key
310        nodes: Vec<SerializableDHTNode>,
311    },
312    /// Response to FindValue query
313    Value {
314        /// The found record
315        record: Record,
316    },
317    /// Response to Store query
318    Stored {
319        /// Whether storage was successful
320        success: bool,
321    },
322    /// Response to Ping query
323    Pong {
324        /// ID of the responding peer
325        responder: PeerId,
326    },
327    /// Error response
328    Error {
329        /// Error message describing what went wrong
330        message: String,
331    },
332}
333
334/// DHT lookup state for iterative queries
335#[derive(Debug)]
336pub struct LookupState {
337    /// Target key
338    pub target: Key,
339    /// Nodes queried so far
340    pub queried: HashMap<PeerId, Instant>,
341    /// Nodes to query next
342    pub to_query: VecDeque<DHTNode>,
343    /// Closest nodes found
344    pub closest: Vec<DHTNode>,
345    /// Lookup start time
346    pub started_at: Instant,
347    /// Maximum nodes to query in parallel
348    pub alpha: usize,
349}
350
351impl Default for DHTConfig {
352    fn default() -> Self {
353        Self {
354            replication_factor: 20,                        // k = 20 (standard Kademlia)
355            bucket_size: 20,                               // k = 20 nodes per bucket
356            alpha: 3,                                      // α = 3 concurrent lookups
357            record_ttl: Duration::from_secs(24 * 60 * 60), // 24 hours
358            bucket_refresh_interval: Duration::from_secs(60 * 60), // 1 hour
359            republish_interval: Duration::from_secs(24 * 60 * 60), // 24 hours
            max_distance: 160,                             // maximum "close" distance in bits
361        }
362    }
363}
364
365impl Key {
366    /// Create a new key from raw data
367    pub fn new(data: &[u8]) -> Self {
368        let mut hasher = Sha256::new();
369        hasher.update(data);
370        let hash: [u8; 32] = hasher.finalize().into();
371        Self { hash }
372    }
373
374    /// Create a key from existing hash
375    pub fn from_hash(hash: [u8; 32]) -> Self {
376        Self { hash }
377    }
378
379    /// Create a random key
380    pub fn random() -> Self {
381        use rand::RngCore;
382        let mut hash = [0u8; 32];
383        rand::thread_rng().fill_bytes(&mut hash);
384        Self { hash }
385    }
386
387    /// Get key as bytes
388    pub fn as_bytes(&self) -> &[u8] {
389        &self.hash
390    }
391
392    /// Get raw 32-byte hash (by value) for integrations
393    pub fn hash_bytes(&self) -> [u8; 32] {
394        self.hash
395    }
396
397    /// Get key as hex string
398    pub fn to_hex(&self) -> String {
399        hex::encode(self.hash)
400    }
401
402    /// Calculate XOR distance between two keys (Kademlia distance metric)
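    ///
    /// # Example (illustrative sketch)
    ///
    /// The XOR metric is symmetric, and a key's distance to itself is zero
    /// (256 leading zero bits).
    ///
    /// ```ignore
    /// let a = Key::new(b"peer-a");
    /// let b = Key::new(b"peer-b");
    /// assert_eq!(a.distance(&b).as_bytes(), b.distance(&a).as_bytes());
    /// assert_eq!(a.distance(&a).leading_zeros(), 256);
    /// ```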
403    pub fn distance(&self, other: &Key) -> Key {
404        let mut result = [0u8; 32];
405        for (i, out) in result.iter_mut().enumerate() {
406            *out = self.hash[i] ^ other.hash[i];
407        }
408        Key { hash: result }
409    }
410
    /// Count the leading zero bits of the key (256 means every bit is zero)
412    pub fn leading_zeros(&self) -> u32 {
413        for (i, &byte) in self.hash.iter().enumerate() {
414            if byte != 0 {
415                return (i * 8) as u32 + byte.leading_zeros();
416            }
417        }
418        256 // All bits are zero
419    }
420
421    /// Get bucket index for this key relative to local node
422    pub fn bucket_index(&self, local_id: &Key) -> usize {
423        let distance = self.distance(local_id);
424        let leading_zeros = distance.leading_zeros();
        if leading_zeros >= 256 {
            0 // Zero distance (identical keys): clamp to the closest bucket
427        } else {
428            (255 - leading_zeros) as usize
429        }
430    }
431}
432
433impl fmt::Display for Key {
434    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
435        write!(f, "{}", hex::encode(&self.hash[..8]))
436    }
437}
438
439impl Record {
440    /// Create a new record
441    pub fn new(key: Key, value: Vec<u8>, publisher: PeerId) -> Self {
442        let now = SystemTime::now();
443        let ttl = Duration::from_secs(24 * 60 * 60); // 24 hours default
444
445        Self {
446            key,
447            value,
448            publisher,
449            created_at: now,
450            expires_at: now + ttl,
451            signature: None,
452        }
453    }
454
455    /// Create a record with custom TTL
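    ///
    /// # Example (illustrative sketch)
    ///
    /// Assumes `PeerId` is the string-like identifier used elsewhere in this file.
    ///
    /// ```ignore
    /// let record = Record::with_ttl(
    ///     Key::new(b"session"),
    ///     b"payload".to_vec(),
    ///     "publisher-peer".to_string(),
    ///     Duration::from_secs(60),
    /// );
    /// assert!(!record.is_expired());
    /// ```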
456    pub fn with_ttl(key: Key, value: Vec<u8>, publisher: PeerId, ttl: Duration) -> Self {
457        let now = SystemTime::now();
458
459        Self {
460            key,
461            value,
462            publisher,
463            created_at: now,
464            expires_at: now + ttl,
465            signature: None,
466        }
467    }
468
469    /// Check if record has expired
470    pub fn is_expired(&self) -> bool {
471        SystemTime::now() > self.expires_at
472    }
473
474    /// Get record age
475    pub fn age(&self) -> Duration {
476        SystemTime::now()
477            .duration_since(self.created_at)
478            .unwrap_or(Duration::ZERO)
479    }
480
481    /// Sign the record (placeholder for future cryptographic verification)
482    pub fn sign(&mut self, _private_key: &[u8]) -> Result<()> {
483        // Placeholder implementation
484        // In real implementation, this would create a cryptographic signature
485        self.signature = Some(vec![0u8; 64]); // Dummy signature
486        Ok(())
487    }
488
489    /// Verify record signature (placeholder)
490    pub fn verify(&self, _public_key: &[u8]) -> bool {
491        // Placeholder implementation
492        // In real implementation, this would verify the cryptographic signature
493        self.signature.is_some()
494    }
495}
496
497impl DHTNode {
498    /// Create a new DHT node
499    pub fn new(peer_id: PeerId, addresses: Vec<Multiaddr>, local_id: &Key) -> Self {
500        let node_key = Key::new(peer_id.as_bytes());
501        let distance = node_key.distance(local_id);
502
503        Self {
504            peer_id,
505            addresses,
506            last_seen: Instant::now(),
507            distance,
508            is_connected: false,
509        }
510    }
511
512    /// Create a new DHT node with explicit key (for testing)
513    pub fn new_with_key(peer_id: PeerId, addresses: Vec<Multiaddr>, key: Key) -> Self {
514        Self {
515            peer_id,
516            addresses,
517            last_seen: Instant::now(),
518            distance: key,
519            is_connected: false,
520        }
521    }
522
523    /// Update last seen timestamp
524    pub fn touch(&mut self) {
525        self.last_seen = Instant::now();
526    }
527
528    /// Check if node is stale
529    pub fn is_stale(&self, timeout: Duration) -> bool {
530        self.last_seen.elapsed() > timeout
531    }
532
533    /// Get node key
534    pub fn key(&self) -> Key {
535        Key::new(self.peer_id.as_bytes())
536    }
537
538    /// Convert to serializable form
539    pub fn to_serializable(&self) -> SerializableDHTNode {
540        SerializableDHTNode {
541            peer_id: self.peer_id.clone(),
542            addresses: self.addresses.clone(),
543            last_seen_secs: self.last_seen.elapsed().as_secs(),
544            distance: self.distance.clone(),
545            is_connected: self.is_connected,
546        }
547    }
548}
549
550impl SerializableDHTNode {
551    /// Convert from serializable form to DHTNode
552    pub fn to_dht_node(&self) -> DHTNode {
553        DHTNode {
554            peer_id: self.peer_id.clone(),
555            addresses: self.addresses.clone(),
            last_seen: Instant::now()
                .checked_sub(Duration::from_secs(self.last_seen_secs))
                .unwrap_or_else(Instant::now),
557            distance: self.distance.clone(),
558            is_connected: self.is_connected,
559        }
560    }
561}
562
563impl KBucket {
564    /// Create a new k-bucket
565    fn new(capacity: usize) -> Self {
566        Self {
567            nodes: VecDeque::new(),
568            capacity,
569            last_refresh: Instant::now(),
570        }
571    }
572
573    /// Add a node to the bucket
574    fn add_node(&mut self, node: DHTNode) -> bool {
        // If the node is already present, refresh it and move it to the front
        // (most recently seen) without cloning.
        if let Some(pos) = self.nodes.iter().position(|n| n.peer_id == node.peer_id) {
            return match self.nodes.remove(pos) {
                Some(mut existing) => {
                    existing.touch();
                    existing.is_connected = node.is_connected;
                    self.nodes.push_front(existing);
                    true
                }
                None => false, // Position invalid (should not happen)
            };
        }
591        if self.nodes.len() < self.capacity {
592            // Add new node to front
593            self.nodes.push_front(node);
594            true
595        } else {
596            // Bucket is full - could implement replacement strategy here
597            false
598        }
599    }
600
601    /// Remove a node from the bucket
602    fn remove_node(&mut self, peer_id: &PeerId) -> bool {
603        if let Some(pos) = self.nodes.iter().position(|n| n.peer_id == *peer_id) {
604            self.nodes.remove(pos);
605            true
606        } else {
607            false
608        }
609    }
610
611    /// Get nodes closest to a target
612    fn closest_nodes(&self, target: &Key, count: usize) -> Vec<DHTNode> {
613        let mut nodes: Vec<_> = self.nodes.iter().cloned().collect();
        nodes.sort_by_key(|node| node.key().distance(target).hash_bytes());
615        nodes.into_iter().take(count).collect()
616    }
617
618    /// Check if bucket needs refresh
619    fn needs_refresh(&self, interval: Duration) -> bool {
620        self.last_refresh.elapsed() > interval
621    }
622}
623
624impl RoutingTable {
625    /// Create a new routing table
626    pub fn new(local_id: Key, config: DHTConfig) -> Self {
627        let mut buckets = Vec::new();
628        for _ in 0..256 {
629            buckets.push(RwLock::new(KBucket::new(config.bucket_size)));
630        }
631
632        Self {
633            local_id,
634            buckets,
635            config,
636        }
637    }
638
639    /// Add a node to the routing table
640    pub async fn add_node(&self, node: DHTNode) -> Result<()> {
641        let bucket_index = node.key().bucket_index(&self.local_id);
642        let mut bucket = self.buckets[bucket_index].write().await;
643
644        if bucket.add_node(node.clone()) {
645            debug!("Added node {} to bucket {}", node.peer_id, bucket_index);
646        } else {
647            debug!(
648                "Bucket {} full, could not add node {}",
649                bucket_index, node.peer_id
650            );
651        }
652
653        Ok(())
654    }
655
656    /// Remove a node from the routing table
657    pub async fn remove_node(&self, peer_id: &PeerId) -> Result<()> {
658        let node_key = Key::new(peer_id.as_bytes());
659        let bucket_index = node_key.bucket_index(&self.local_id);
660        let mut bucket = self.buckets[bucket_index].write().await;
661
662        if bucket.remove_node(peer_id) {
663            debug!("Removed node {} from bucket {}", peer_id, bucket_index);
664        }
665
666        Ok(())
667    }
668
669    /// Find nodes closest to a target key
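    ///
    /// # Example (illustrative sketch)
    ///
    /// Requires an async runtime; with no nodes added yet the result is empty.
    ///
    /// ```ignore
    /// let table = RoutingTable::new(Key::random(), DHTConfig::default());
    /// let nearest = table.closest_nodes(&Key::random(), 20).await;
    /// assert!(nearest.is_empty());
    /// ```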
670    pub async fn closest_nodes(&self, target: &Key, count: usize) -> Vec<DHTNode> {
671        let mut all_nodes = Vec::new();
672
673        // Check buckets in order of distance from target
674        let target_bucket = target.bucket_index(&self.local_id);
675
676        // Start with the target bucket and expand outward
677        let mut checked = vec![false; 256];
678        let mut to_check = VecDeque::new();
679        to_check.push_back(target_bucket);
680
681        while let Some(bucket_idx) = to_check.pop_front() {
682            if checked[bucket_idx] {
683                continue;
684            }
685            checked[bucket_idx] = true;
686
687            let bucket = self.buckets[bucket_idx].read().await;
688            all_nodes.extend(bucket.closest_nodes(target, bucket.nodes.len()));
689
690            // Add adjacent buckets to check
691            if bucket_idx > 0 && !checked[bucket_idx - 1] {
692                to_check.push_back(bucket_idx - 1);
693            }
694            if bucket_idx < 255 && !checked[bucket_idx + 1] {
695                to_check.push_back(bucket_idx + 1);
696            }
697
698            // Stop if we have enough nodes
699            if all_nodes.len() >= count * 2 {
700                break;
701            }
702        }
703
704        // Sort by distance and return closest
        all_nodes.sort_by_key(|node| node.key().distance(target).hash_bytes());
706        all_nodes.into_iter().take(count).collect()
707    }
708
709    /// Get routing table statistics
710    pub async fn stats(&self) -> (usize, usize) {
711        let mut total_nodes = 0;
712        let mut active_buckets = 0;
713
714        for bucket in &self.buckets {
715            let bucket_guard = bucket.read().await;
716            let node_count = bucket_guard.nodes.len();
717            total_nodes += node_count;
718            if node_count > 0 {
719                active_buckets += 1;
720            }
721        }
722
723        (total_nodes, active_buckets)
724    }
725}
726
727impl DHTStorage {
728    /// Create new DHT storage with performance optimizations
729    pub fn new(config: DHTConfig) -> Self {
730        Self {
731            optimized_storage: optimized_storage::OptimizedDHTStorage::new(config.clone()),
732            config,
733        }
734    }
735
736    /// Store a record with O(1) performance and memory bounds
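    ///
    /// # Example (illustrative sketch)
    ///
    /// A store/get round trip against a fresh storage instance, assuming a
    /// string-like `PeerId` for the publisher.
    ///
    /// ```ignore
    /// let storage = DHTStorage::new(DHTConfig::default());
    /// let key = Key::new(b"entry");
    /// storage
    ///     .store(Record::new(key.clone(), b"value".to_vec(), "publisher".to_string()))
    ///     .await?;
    /// assert!(storage.get(&key).await.is_some());
    /// ```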
737    pub async fn store(&self, record: Record) -> Result<()> {
738        self.optimized_storage.store(record).await
739    }
740
741    /// Retrieve a record with O(1) performance
742    pub async fn get(&self, key: &Key) -> Option<Record> {
743        self.optimized_storage.get(key).await
744    }
745
746    /// Remove expired records with O(log n) performance
747    pub async fn cleanup_expired(&self) -> usize {
748        self.optimized_storage.cleanup_expired().await.unwrap_or(0)
749    }
750
    /// Get all stored records (for republishing)
    ///
    /// The optimized storage deliberately has no "load everything" accessor, so
    /// this currently returns an empty vector; republishing should be redesigned
    /// around the indexed accessors (e.g. `get_expiring_records`).
    /// TODO: Implement efficient republishing that doesn't require loading all records
    pub async fn all_records(&self) -> Vec<Record> {
        let stats = self.optimized_storage.get_stats().await;
        debug!(
            "all_records() called on optimized storage with {} records - returning empty for performance",
            stats.total_records
        );
        Vec::new()
    }
766
767    /// Get storage statistics with enhanced performance metrics
768    pub async fn stats(&self) -> (usize, usize) {
769        let stats = self.optimized_storage.get_stats().await;
770        (stats.total_records, stats.expired_records)
771    }
772
773    /// Get records by publisher with indexed lookup (new optimized method)
774    pub async fn get_records_by_publisher(
775        &self,
776        publisher: &str,
777        limit: Option<usize>,
778    ) -> Vec<Record> {
779        self.optimized_storage
780            .get_records_by_publisher(publisher, limit)
781            .await
782    }
783
784    /// Get records expiring within a time window (new optimized method)
785    pub async fn get_expiring_records(&self, within: Duration) -> Vec<Record> {
786        self.optimized_storage.get_expiring_records(within).await
787    }
788
789    /// Check if storage is near memory limits
790    pub async fn is_near_memory_limit(&self) -> bool {
791        self.optimized_storage.is_near_memory_limit().await
792    }
793
794    /// Force eviction of oldest records if needed
795    pub async fn force_eviction(&self, target_count: usize) -> Result<usize> {
796        self.optimized_storage.force_eviction(target_count).await
797    }
798
799    /// Get detailed storage statistics
800    pub async fn get_detailed_stats(&self) -> optimized_storage::StorageStats {
801        self.optimized_storage.get_stats().await
802    }
803}
804
805impl DHT {
806    /// Create a new DHT instance
807    pub fn new(local_id: Key, config: DHTConfig) -> Self {
808        let routing_table = RoutingTable::new(local_id.clone(), config.clone());
809        let storage = DHTStorage::new(config.clone());
810
811        Self {
812            local_id,
813            routing_table,
814            storage,
815            config,
816            skademlia: None,
817            ipv6_identity_manager: None,
818        }
819    }
820
821    /// Create a new DHT instance with S/Kademlia security extensions
822    pub fn new_with_security(
823        local_id: Key,
824        config: DHTConfig,
825        skademlia_config: skademlia::SKademliaConfig,
826    ) -> Self {
827        let routing_table = RoutingTable::new(local_id.clone(), config.clone());
828        let storage = DHTStorage::new(config.clone());
829        let skademlia = skademlia::SKademlia::new(skademlia_config);
830
831        Self {
832            local_id,
833            routing_table,
834            storage,
835            config,
836            skademlia: Some(skademlia),
837            ipv6_identity_manager: None,
838        }
839    }
840
841    /// Create a new DHT instance with full IPv6 security integration
842    pub fn new_with_ipv6_security(
843        local_id: Key,
844        config: DHTConfig,
845        skademlia_config: skademlia::SKademliaConfig,
846        ipv6_config: ipv6_identity::IPv6DHTConfig,
847    ) -> Self {
848        let routing_table = RoutingTable::new(local_id.clone(), config.clone());
849        let storage = DHTStorage::new(config.clone());
850        let skademlia = skademlia::SKademlia::new(skademlia_config);
851        let ipv6_identity_manager = ipv6_identity::IPv6DHTIdentityManager::new(ipv6_config);
852
853        Self {
854            local_id,
855            routing_table,
856            storage,
857            config,
858            skademlia: Some(skademlia),
859            ipv6_identity_manager: Some(ipv6_identity_manager),
860        }
861    }
862
863    /// Initialize local IPv6 identity
864    pub fn set_local_ipv6_identity(&mut self, identity: crate::security::IPv6NodeID) -> Result<()> {
865        if let Some(ref mut manager) = self.ipv6_identity_manager {
866            // Update local_id to match IPv6 identity
867            self.local_id = ipv6_identity::IPv6DHTIdentityManager::generate_dht_key(&identity);
868            manager.set_local_identity(identity)?;
869            info!("Local IPv6 identity set and DHT key updated");
870            Ok(())
871        } else {
872            Err(P2PError::Security(
873                crate::error::SecurityError::AuthorizationFailed(
874                    "IPv6 identity manager not enabled".into(),
875                ),
876            ))
877        }
878    }
879
880    /// Add a bootstrap node to the DHT
881    pub async fn add_bootstrap_node(
882        &self,
883        peer_id: PeerId,
884        addresses: Vec<Multiaddr>,
885    ) -> Result<()> {
886        let node = DHTNode::new(peer_id, addresses, &self.local_id);
887        self.routing_table.add_node(node).await
888    }
889
890    /// Add an IPv6-verified node to the DHT
891    pub async fn add_ipv6_node(
892        &mut self,
893        peer_id: PeerId,
894        addresses: Vec<Multiaddr>,
895        ipv6_identity: crate::security::IPv6NodeID,
896    ) -> Result<()> {
897        if let Some(ref mut manager) = self.ipv6_identity_manager {
898            // Validate the node join with IPv6 security checks
899            let base_node = DHTNode::new(peer_id.clone(), addresses, &self.local_id);
900            let security_event = manager
901                .validate_node_join(&base_node, &ipv6_identity)
902                .await?;
903
904            match security_event {
905                ipv6_identity::IPv6SecurityEvent::NodeJoined {
906                    verification_confidence,
907                    ..
908                } => {
909                    // Enhance the node with IPv6 identity
910                    let ipv6_node = manager
911                        .enhance_dht_node(base_node.clone(), ipv6_identity)
912                        .await?;
913
914                    // Update the DHT key to match IPv6 identity
915                    let mut enhanced_base_node = base_node;
916                    enhanced_base_node.distance = ipv6_node.get_dht_key().distance(&self.local_id);
917
918                    // Add to routing table
919                    self.routing_table.add_node(enhanced_base_node).await?;
920
921                    info!(
922                        "Added IPv6-verified node {} with confidence {:.2}",
923                        peer_id, verification_confidence
924                    );
925                    Ok(())
926                }
927                ipv6_identity::IPv6SecurityEvent::VerificationFailed { .. } => Err(
928                    P2PError::Security(crate::error::SecurityError::AuthenticationFailed),
929                ),
930                ipv6_identity::IPv6SecurityEvent::DiversityViolation { subnet_type, .. } => Err(
931                    P2PError::Security(crate::error::SecurityError::AuthorizationFailed(
932                        format!("IP diversity violation: {subnet_type}").into(),
933                    )),
934                ),
935                ipv6_identity::IPv6SecurityEvent::NodeBanned { reason, .. } => Err(
936                    P2PError::Security(crate::error::SecurityError::AuthorizationFailed(
937                        format!("Node banned: {reason}").into(),
938                    )),
939                ),
940                _ => Err(P2PError::Security(
941                    crate::error::SecurityError::AuthenticationFailed,
942                )),
943            }
944        } else {
945            // Fallback to regular node addition if IPv6 manager not enabled
946            self.add_bootstrap_node(peer_id, addresses).await
947        }
948    }
949
950    /// Remove node with IPv6 cleanup
951    pub async fn remove_ipv6_node(&mut self, peer_id: &PeerId) -> Result<()> {
952        // Remove from routing table
953        self.routing_table.remove_node(peer_id).await?;
954
955        // Clean up IPv6 tracking
956        if let Some(ref mut manager) = self.ipv6_identity_manager {
957            manager.remove_node(peer_id);
958        }
959
960        Ok(())
961    }
962
963    /// Check if node is banned due to IPv6 security violations
964    pub fn is_node_banned(&self, peer_id: &PeerId) -> bool {
965        if let Some(ref manager) = self.ipv6_identity_manager {
966            manager.is_node_banned(peer_id)
967        } else {
968            false
969        }
970    }
971
972    /// Store a record in the DHT with replication
973    pub async fn put(&self, key: Key, value: Vec<u8>) -> Result<()> {
974        let record = Record::new(key.clone(), value, self.local_id.to_hex());
975
976        // Store locally first
977        self.storage.store(record.clone()).await?;
978
979        // Find nodes closest to the key for replication
980        let closest_nodes = self
981            .routing_table
982            .closest_nodes(&key, self.config.replication_factor)
983            .await;
984
985        info!(
986            "Storing record with key {} on {} nodes",
987            key.to_hex(),
988            closest_nodes.len()
989        );
990
991        // If no other nodes available, just store locally (single node scenario)
992        if closest_nodes.is_empty() {
993            info!("No other nodes available for replication, storing only locally");
994            return Ok(());
995        }
996
997        // Replicate to closest nodes (simulated for now)
998        let mut successful_replications = 0;
999        for node in &closest_nodes {
1000            if self.replicate_record(&record, node).await.is_ok() {
1001                successful_replications += 1;
1002            }
1003        }
1004
1005        info!(
1006            "Successfully replicated record {} to {}/{} nodes",
1007            key.to_hex(),
1008            successful_replications,
1009            closest_nodes.len()
1010        );
1011
1012        // Consider replication successful if we stored to at least 1 node or have reasonable coverage
1013        let required_replications = if closest_nodes.len() == 1 {
1014            1
1015        } else {
1016            std::cmp::max(1, closest_nodes.len() / 2)
1017        };
1018
1019        if successful_replications >= required_replications {
1020            Ok(())
1021        } else {
1022            Err(P2PError::Dht(crate::error::DhtError::ReplicationFailed(
1023                format!(
1024                    "Insufficient replication for key {}: only {}/{} nodes stored the record (required: {})",
1025                    key,
1026                    successful_replications, closest_nodes.len(), required_replications
1027                ).into()
1028            )))
1029        }
1030    }
1031
1032    /// Retrieve a record from the DHT with consistency checks
1033    pub async fn get(&self, key: &Key) -> Option<Record> {
1034        // Check local storage first
1035        if let Some(record) = self.storage.get(key).await
1036            && !record.is_expired()
1037        {
1038            return Some(record);
1039        }
1040
1041        // Perform iterative lookup to find the record
1042        if let Some(record) = self.iterative_find_value(key).await {
1043            // Store locally for future access (caching)
1044            if self.storage.store(record.clone()).await.is_ok() {
1045                debug!("Cached retrieved record with key {}", key.to_hex());
1046            }
1047            return Some(record);
1048        }
1049
1050        None
1051    }
1052
1053    /// Find nodes close to a key
1054    pub async fn find_node(&self, key: &Key) -> Vec<DHTNode> {
1055        self.routing_table
1056            .closest_nodes(key, self.config.replication_factor)
1057            .await
1058    }
1059
1060    /// Handle incoming DHT query
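    ///
    /// # Example (illustrative sketch)
    ///
    /// A `Ping` query always yields a `Pong` carrying the local node ID; the
    /// requester is assumed to be the string-like `PeerId` used in this file.
    ///
    /// ```ignore
    /// let dht = DHT::new(Key::random(), DHTConfig::default());
    /// let response = dht
    ///     .handle_query(DHTQuery::Ping { requester: "remote-peer".to_string() })
    ///     .await;
    /// assert!(matches!(response, DHTResponse::Pong { .. }));
    /// ```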
1061    pub async fn handle_query(&self, query: DHTQuery) -> DHTResponse {
1062        match query {
1063            DHTQuery::FindNode { key, requester: _ } => {
1064                let nodes = self.find_node(&key).await;
1065                let serializable_nodes = nodes.into_iter().map(|n| n.to_serializable()).collect();
1066                DHTResponse::Nodes {
1067                    nodes: serializable_nodes,
1068                }
1069            }
1070            DHTQuery::FindValue { key, requester: _ } => {
1071                if let Some(record) = self.storage.get(&key).await
1072                    && !record.is_expired()
1073                {
1074                    return DHTResponse::Value { record };
1075                }
1076                let nodes = self.find_node(&key).await;
1077                let serializable_nodes = nodes.into_iter().map(|n| n.to_serializable()).collect();
1078                DHTResponse::Nodes {
1079                    nodes: serializable_nodes,
1080                }
1081            }
1082            DHTQuery::Store {
1083                record,
1084                requester: _,
1085            } => match self.storage.store(record).await {
1086                Ok(()) => DHTResponse::Stored { success: true },
1087                Err(_) => DHTResponse::Stored { success: false },
1088            },
1089            DHTQuery::Ping { requester: _ } => DHTResponse::Pong {
1090                responder: self.local_id.to_hex(),
1091            },
1092        }
1093    }
1094
1095    /// Get DHT statistics
1096    pub async fn stats(&self) -> DHTStats {
1097        let (total_nodes, active_buckets) = self.routing_table.stats().await;
1098        let (stored_records, expired_records) = self.storage.stats().await;
1099
1100        DHTStats {
1101            local_id: self.local_id.clone(),
1102            total_nodes,
1103            active_buckets,
1104            stored_records,
1105            expired_records,
1106        }
1107    }
1108
1109    /// Perform periodic maintenance
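    ///
    /// # Example (illustrative sketch)
    ///
    /// A typical driver loop on a tokio runtime, running maintenance once a minute.
    ///
    /// ```ignore
    /// let mut ticker = tokio::time::interval(Duration::from_secs(60));
    /// loop {
    ///     ticker.tick().await;
    ///     dht.maintenance().await?;
    /// }
    /// ```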
1110    pub async fn maintenance(&self) -> Result<()> {
1111        // Clean up expired records
1112        let expired_count = self.storage.cleanup_expired().await;
1113        if expired_count > 0 {
1114            debug!("Cleaned up {} expired records", expired_count);
1115        }
1116
1117        // Republish records that are close to expiration
1118        self.republish_records().await?;
1119
1120        // Refresh buckets that haven't been active
1121        self.refresh_buckets().await?;
1122
1123        // Note: S/Kademlia cleanup would happen here in a mutable context
1124
1125        Ok(())
1126    }
1127
1128    /// Perform secure lookup using S/Kademlia disjoint paths with distance verification
1129    pub async fn secure_get(&mut self, key: &Key) -> Result<Option<Record>> {
1130        // Check local storage first
1131        if let Some(record) = self.storage.get(key).await
1132            && !record.is_expired()
1133        {
1134            return Ok(Some(record));
1135        }
1136
1137        // Check if S/Kademlia is enabled and extract configuration
1138        let (enable_distance_verification, disjoint_path_count, min_reputation) =
1139            if let Some(ref skademlia) = self.skademlia {
1140                (
1141                    skademlia.config.enable_distance_verification,
1142                    skademlia.config.disjoint_path_count,
1143                    skademlia.config.min_routing_reputation,
1144                )
1145            } else {
1146                // Fallback to regular get if S/Kademlia not enabled
1147                return Ok(self.get(key).await);
1148            };
1149
1150        // Get initial nodes for disjoint path lookup
1151        let initial_nodes = self
1152            .routing_table
1153            .closest_nodes(key, disjoint_path_count * 3)
1154            .await;
1155
1156        if initial_nodes.is_empty() {
1157            return Ok(None);
1158        }
1159
1160        // Perform secure lookup with disjoint paths
1161        let secure_nodes = if let Some(ref mut skademlia) = self.skademlia {
1162            skademlia.secure_lookup(key.clone(), initial_nodes).await?
1163        } else {
1164            return Ok(None);
1165        };
1166
1167        // Query the securely found nodes with distance verification
1168        for node in &secure_nodes {
1169            // Verify node distance before querying if enabled
1170            if enable_distance_verification {
1171                let witness_nodes = self.select_witness_nodes(&node.peer_id, 3).await;
1172
1173                let consensus = if let Some(ref mut skademlia) = self.skademlia {
1174                    skademlia
1175                        .verify_distance_consensus(&node.peer_id, key, witness_nodes)
1176                        .await?
1177                } else {
1178                    continue;
1179                };
1180
1181                if consensus.confidence < min_reputation {
1182                    debug!(
1183                        "Skipping node {} due to low distance verification confidence",
1184                        node.peer_id
1185                    );
1186                    continue;
1187                }
1188            }
1189
1190            let query = DHTQuery::FindValue {
1191                key: key.clone(),
1192                requester: self.local_id.to_hex(),
1193            };
1194            if let Ok(DHTResponse::Value { record }) = self.simulate_query(node, query).await {
1195                // Store locally for future access
1196                let _ = self.storage.store(record.clone()).await;
1197                return Ok(Some(record));
1198            }
1199        }
1200
1201        Ok(None)
1202    }
1203
1204    /// Store a record using S/Kademlia security-aware node selection
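    ///
    /// # Example (illustrative sketch)
    ///
    /// `sk_cfg` is a placeholder for however `skademlia::SKademliaConfig` is
    /// constructed in your deployment.
    ///
    /// ```ignore
    /// let mut dht = DHT::new_with_security(Key::random(), DHTConfig::default(), sk_cfg);
    /// dht.secure_put(Key::new(b"greeting"), b"hello".to_vec()).await?;
    /// ```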
1205    pub async fn secure_put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
1206        let record = Record::new(key.clone(), value, self.local_id.to_hex());
1207
1208        // Store locally first
1209        self.storage.store(record.clone()).await?;
1210
1211        // Get secure nodes and perform replications
1212        let secure_nodes = if let Some(ref skademlia) = self.skademlia {
1213            // Get candidate nodes
1214            let candidate_nodes = self
1215                .routing_table
1216                .closest_nodes(&key, self.config.replication_factor * 2)
1217                .await;
1218
1219            // Use S/Kademlia to select secure nodes based on reputation
1220            skademlia.select_secure_nodes(&candidate_nodes, &key, self.config.replication_factor)
1221        } else {
1222            // Fallback to regular closest nodes
1223            self.routing_table
1224                .closest_nodes(&key, self.config.replication_factor)
1225                .await
1226        };
1227
1228        info!(
1229            "Storing record with key {} on {} secure nodes",
1230            key.to_hex(),
1231            secure_nodes.len()
1232        );
1233
1234        // Perform replications and collect results
1235        let mut replication_results = Vec::new();
1236        let mut successful_replications = 0;
1237
1238        for node in &secure_nodes {
1239            let success = self.replicate_record(&record, node).await.is_ok();
1240            replication_results.push((node.peer_id.clone(), success));
1241            if success {
1242                successful_replications += 1;
1243            }
1244        }
1245
1246        // Update reputations if S/Kademlia enabled
1247        if let Some(ref mut skademlia) = self.skademlia {
1248            for (peer_id, success) in replication_results {
1249                skademlia.reputation_manager.update_reputation(
1250                    &peer_id,
1251                    success,
1252                    Duration::from_millis(100),
1253                );
1254            }
1255        }
1256
1257        if successful_replications > 0 {
1258            info!(
1259                "Successfully replicated to {}/{} secure nodes",
1260                successful_replications,
1261                secure_nodes.len()
1262            );
1263        }
1264
1265        Ok(())
1266    }
1267
1268    /// Update sibling lists for a key range
1269    pub async fn update_sibling_list(&mut self, key: Key) -> Result<()> {
1270        if let Some(ref mut skademlia) = self.skademlia {
1271            let nodes = self
1272                .routing_table
1273                .closest_nodes(&key, skademlia.config.sibling_list_size)
1274                .await;
1275            skademlia.update_sibling_list(key, nodes);
1276        }
1277        Ok(())
1278    }
1279
1280    /// Validate routing table consistency using S/Kademlia
1281    pub async fn validate_routing_consistency(&self) -> Result<skademlia::ConsistencyReport> {
1282        if let Some(ref skademlia) = self.skademlia {
1283            // Get sample of nodes for validation (using closest_nodes as a proxy)
1284            let sample_key = Key::random();
1285            let sample_nodes = self.routing_table.closest_nodes(&sample_key, 100).await;
1286            skademlia.validate_routing_consistency(&sample_nodes).await
1287        } else {
1288            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1289                "S/Kademlia not enabled".to_string().into(),
1290            )))
1291        }
1292    }
1293
1294    /// Create a distance verification challenge for a peer
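    ///
    /// # Example (illustrative sketch)
    ///
    /// Returns `None` unless the DHT was built with S/Kademlia enabled; sending
    /// the challenge and verifying the returned proof are left to the caller.
    ///
    /// ```ignore
    /// if let Some(challenge) = dht.create_distance_challenge(&peer_id, &key) {
    ///     // transmit `challenge`, then check the reply with `verify_distance_proof`
    /// }
    /// ```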
1295    pub fn create_distance_challenge(
1296        &mut self,
1297        peer_id: &PeerId,
1298        key: &Key,
1299    ) -> Option<skademlia::DistanceChallenge> {
1300        self.skademlia
1301            .as_mut()
1302            .map(|skademlia| skademlia.create_distance_challenge(peer_id, key))
1303    }
1304
1305    /// Verify a distance proof
1306    pub fn verify_distance_proof(&self, proof: &skademlia::DistanceProof) -> Result<bool> {
1307        if let Some(ref skademlia) = self.skademlia {
1308            skademlia.verify_distance_proof(proof)
1309        } else {
1310            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1311                "S/Kademlia not enabled".to_string().into(),
1312            )))
1313        }
1314    }
1315
1316    /// Verify distances of multiple nodes using enhanced consensus
1317    #[allow(dead_code)]
1318    async fn verify_node_distances(
1319        &self,
1320        nodes: &[DHTNode],
1321        _target_key: &Key,
1322        min_reputation: f64,
1323    ) -> Result<Vec<DHTNode>> {
1324        let mut verified_nodes = Vec::new();
1325
1326        for node in nodes {
1327            let witness_nodes = self.select_witness_nodes(&node.peer_id, 3).await;
1328
1329            // Only proceed if we have enough witness nodes
1330            if witness_nodes.len() >= 2 {
1331                // Simulate consensus verification (simplified for now)
1332                let consensus_confidence = 0.8; // Placeholder
1333
1334                if consensus_confidence >= min_reputation {
1335                    verified_nodes.push(node.clone());
1336                } else {
1337                    debug!(
1338                        "Node {} failed distance verification with confidence {}",
1339                        node.peer_id, consensus_confidence
1340                    );
1341                }
1342            }
1343        }
1344
1345        Ok(verified_nodes)
1346    }
1347
1348    /// Select witness nodes for distance verification
1349    async fn select_witness_nodes(&self, target_peer: &PeerId, count: usize) -> Vec<PeerId> {
1350        // Get nodes that are close to the target but not the target itself
1351        let target_key = Key::new(target_peer.as_bytes());
1352        let candidate_nodes = self
1353            .routing_table
1354            .closest_nodes(&target_key, count * 2)
1355            .await;
1356
1357        candidate_nodes
1358            .into_iter()
1359            .filter(|node| node.peer_id != *target_peer)
1360            .take(count)
1361            .map(|node| node.peer_id)
1362            .collect()
1363    }
1364
1365    /// Create enhanced distance challenge with adaptive difficulty
1366    pub fn create_enhanced_distance_challenge(
1367        &mut self,
1368        peer_id: &PeerId,
1369        key: &Key,
1370        suspected_attack: bool,
1371    ) -> Option<skademlia::EnhancedDistanceChallenge> {
1372        self.skademlia.as_mut().map(|skademlia| {
1373            skademlia.create_adaptive_distance_challenge(peer_id, key, suspected_attack)
1374        })
1375    }
1376
1377    /// Verify distance using multi-round challenge protocol
1378    pub async fn verify_distance_multi_round(
1379        &mut self,
1380        challenge: &skademlia::EnhancedDistanceChallenge,
1381    ) -> Result<bool> {
1382        if let Some(ref mut skademlia) = self.skademlia {
1383            skademlia.verify_distance_multi_round(challenge).await
1384        } else {
1385            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1386                "S/Kademlia not enabled".to_string().into(),
1387            )))
1388        }
1389    }
1390
1391    /// Get security bucket for a key range
1392    pub fn get_security_bucket(&mut self, key: &Key) -> Option<&mut skademlia::SecurityBucket> {
1393        self.skademlia
1394            .as_mut()
1395            .map(|skademlia| skademlia.get_security_bucket(key))
1396    }
1397
1398    /// Add trusted node to security bucket
1399    pub async fn add_trusted_node(
1400        &mut self,
1401        key: &Key,
1402        peer_id: PeerId,
1403        addresses: Vec<Multiaddr>,
1404    ) -> Result<()> {
1405        if let Some(ref mut skademlia) = self.skademlia {
1406            let node = DHTNode::new(peer_id, addresses, &self.local_id);
1407            let security_bucket = skademlia.get_security_bucket(key);
1408            security_bucket.add_trusted_node(node);
1409        }
1410        Ok(())
1411    }
1412
1413    /// Perform IPv6-enhanced secure get operation
1414    pub async fn ipv6_secure_get(&mut self, key: &Key) -> Result<Option<Record>> {
        // Refuse to serve the lookup if the local node itself has been banned
1416        if self.is_node_banned(&self.local_id.to_hex()) {
1417            return Err(P2PError::Security(
1418                crate::error::SecurityError::AuthorizationFailed("Local node is banned".into()),
1419            ));
1420        }
1421
1422        // Check local storage first
1423        if let Some(record) = self.storage.get(key).await
1424            && !record.is_expired()
1425        {
1426            return Ok(Some(record));
1427        }
1428
1429        // Get IPv6-verified nodes for secure lookup
1430        let verified_nodes = self.get_ipv6_verified_nodes_for_key(key).await?;
1431
1432        if verified_nodes.is_empty() {
1433            // Fallback to regular secure_get if no IPv6 nodes available
1434            return self.secure_get(key).await;
1435        }
1436
1437        // Perform S/Kademlia secure lookup with IPv6-verified nodes
1438        if let Some(ref mut skademlia) = self.skademlia {
1439            let secure_nodes = skademlia.secure_lookup(key.clone(), verified_nodes).await?;
1440
1441            // Query nodes with both distance and IPv6 verification
1442            for node in &secure_nodes {
1443                // Additional IPv6 verification
1444                if let Some(ref manager) = self.ipv6_identity_manager {
1445                    if let Some(ipv6_node) = manager.get_verified_node(&node.peer_id) {
1446                        // Check if IPv6 identity needs refresh
1447                        if ipv6_node
1448                            .needs_identity_refresh(manager.config.identity_refresh_interval)
1449                        {
1450                            debug!("Skipping node {} due to stale IPv6 identity", node.peer_id);
1451                            continue;
1452                        }
1453                    } else {
1454                        debug!(
1455                            "Skipping node {} without verified IPv6 identity",
1456                            node.peer_id
1457                        );
1458                        continue;
1459                    }
1460                }
1461
1462                let query = DHTQuery::FindValue {
1463                    key: key.clone(),
1464                    requester: self.local_id.to_hex(),
1465                };
1466                if let Ok(DHTResponse::Value { record }) = self.simulate_query(node, query).await {
1467                    // Update IPv6 reputation for successful response
1468                    if let Some(ref mut manager) = self.ipv6_identity_manager {
1469                        manager.update_ipv6_reputation(&node.peer_id, true);
1470                    }
1471
1472                    // Store locally for future access
1473                    let _ = self.storage.store(record.clone()).await;
1474                    return Ok(Some(record));
1475                }
1476            }
1477        }
1478
1479        Ok(None)
1480    }
1481
1482    /// Perform IPv6-enhanced secure put operation
1483    pub async fn ipv6_secure_put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
1484        // Check if local node would be banned
1485        if self.is_node_banned(&self.local_id.to_hex()) {
1486            return Err(P2PError::Security(
1487                crate::error::SecurityError::AuthorizationFailed("Local node is banned".into()),
1488            ));
1489        }
1490
1491        let record = Record::new(key.clone(), value, self.local_id.to_hex());
1492
1493        // Store locally first
1494        self.storage.store(record.clone()).await?;
1495
1496        // Get IPv6-verified nodes for secure replication
1497        let verified_nodes = self.get_ipv6_verified_nodes_for_key(&key).await?;
1498
1499        // Use S/Kademlia for secure node selection among verified nodes
1500        let secure_nodes = if let Some(ref skademlia) = self.skademlia {
1501            skademlia.select_secure_nodes(&verified_nodes, &key, self.config.replication_factor)
1502        } else {
1503            verified_nodes
1504                .into_iter()
1505                .take(self.config.replication_factor)
1506                .collect()
1507        };
1508
1509        info!(
1510            "Storing record with key {} on {} IPv6-verified secure nodes",
1511            key.to_hex(),
1512            secure_nodes.len()
1513        );
1514
1515        // Perform replications with IPv6 reputation tracking
1516        let mut successful_replications = 0;
1517
1518        for node in &secure_nodes {
1519            let success = self.replicate_record(&record, node).await.is_ok();
1520
1521            // Update IPv6 reputation based on replication success
1522            if let Some(ref mut manager) = self.ipv6_identity_manager {
1523                manager.update_ipv6_reputation(&node.peer_id, success);
1524            }
1525
1526            if success {
1527                successful_replications += 1;
1528            }
1529        }
1530
1531        if successful_replications == 0 && !secure_nodes.is_empty() {
1532            return Err(P2PError::Dht(crate::error::DhtError::ReplicationFailed(
1533                format!("Failed to replicate key {} to any IPv6-verified nodes", key).into(),
1534            )));
1535        }
1536
1537        info!(
1538            "Successfully replicated to {}/{} IPv6-verified nodes",
1539            successful_replications,
1540            secure_nodes.len()
1541        );
1542        Ok(())
1543    }
1544
1545    /// Get IPv6-verified nodes suitable for a key
1546    async fn get_ipv6_verified_nodes_for_key(&self, key: &Key) -> Result<Vec<DHTNode>> {
1547        let mut verified_nodes = Vec::new();
1548
1549        // Get closest nodes from routing table
1550        let candidate_nodes = self
1551            .routing_table
1552            .closest_nodes(key, self.config.replication_factor * 2)
1553            .await;
1554
1555        if let Some(ref manager) = self.ipv6_identity_manager {
1556            for node in candidate_nodes {
1557                // Check if node has verified IPv6 identity
1558                if let Some(ipv6_node) = manager.get_verified_node(&node.peer_id) {
1559                    // Check if identity is still fresh
1560                    if !ipv6_node.needs_identity_refresh(manager.config.identity_refresh_interval) {
1561                        // Check if node is not banned
1562                        if !manager.is_node_banned(&node.peer_id) {
1563                            verified_nodes.push(node);
1564                        }
1565                    }
1566                }
1567            }
1568        } else {
1569            // If no IPv6 manager, return all candidates
1570            verified_nodes = candidate_nodes;
1571        }
1572
1573        Ok(verified_nodes)
1574    }
1575
1576    /// Get IPv6 diversity statistics
1577    pub fn get_ipv6_diversity_stats(&self) -> Option<crate::security::DiversityStats> {
1578        self.ipv6_identity_manager
1579            .as_ref()
1580            .map(|manager| manager.get_ipv6_diversity_stats())
1581    }
1582
1583    /// Clean up expired IPv6 identities and reputation data
1584    pub fn cleanup_ipv6_data(&mut self) {
1585        if let Some(ref mut manager) = self.ipv6_identity_manager {
1586            manager.cleanup_expired();
1587        }
1588    }
1589
1590    /// Ban a node for IPv6 security violations
1591    pub fn ban_ipv6_node(&mut self, peer_id: &PeerId, reason: &str) {
1592        if let Some(ref mut manager) = self.ipv6_identity_manager {
1593            manager.ban_node(peer_id, reason);
1594        }
1595    }
1596
1597    /// Get local IPv6 identity
1598    pub fn get_local_ipv6_identity(&self) -> Option<&crate::security::IPv6NodeID> {
1599        self.ipv6_identity_manager
1600            .as_ref()
1601            .and_then(|manager| manager.get_local_identity())
1602    }
1603
1604    /// Replicate a record to a specific node
1605    async fn replicate_record(&self, record: &Record, node: &DHTNode) -> Result<()> {
1606        // In a real implementation, this would send a STORE message over the network
1607        // For now, we simulate successful replication to nodes in our routing table
1608        debug!(
1609            "Replicating record {} to node {}",
1610            record.key.to_hex(),
1611            node.peer_id
1612        );
1613
1614        // Simulate network delay and occasional failures
1615        tokio::time::sleep(Duration::from_millis(10)).await;
1616
1617        // Simulate 95% success rate for replication (high success rate for testing)
1618        if rand::random::<f64>() < 0.95 {
1619            Ok(())
1620        } else {
1621            Err(P2PError::Network(
1622                crate::error::NetworkError::ProtocolError("Replication failed".to_string().into()),
1623            ))
1624        }
1625    }
1626
1627    /// Perform iterative lookup to find a value
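    ///
    /// Implements the Kademlia-style iterative lookup used here: seed the candidate
    /// set with the `alpha` closest known nodes, then repeatedly query up to `alpha`
    /// unqueried candidates in parallel, folding any returned nodes back into the
    /// candidate set, until a value is found, no candidates remain, or
    /// `MAX_ITERATIONS` rounds have run.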
1628    async fn iterative_find_value(&self, key: &Key) -> Option<Record> {
1629        debug!("Starting iterative lookup for key {}", key.to_hex());
1630
1631        let mut lookup_state = LookupState::new(key.clone(), self.config.alpha);
1632
1633        // Start with closest nodes from routing table
1634        let initial_nodes = self
1635            .routing_table
1636            .closest_nodes(key, self.config.alpha)
1637            .await;
1638        lookup_state.add_nodes(initial_nodes);
1639
1640        // Perform iterative queries
1641        let mut iterations = 0;
1642        const MAX_ITERATIONS: usize = 10;
1643
1644        while !lookup_state.is_complete() && iterations < MAX_ITERATIONS {
1645            let nodes_to_query = lookup_state.next_nodes();
1646            if nodes_to_query.is_empty() {
1647                break;
1648            }
1649
1650            // Query nodes in parallel
1651            let mut queries = Vec::new();
1652            for node in &nodes_to_query {
1653                let query = DHTQuery::FindValue {
1654                    key: key.clone(),
1655                    requester: self.local_id.to_hex(),
1656                };
1657                queries.push(self.simulate_query(node, query));
1658            }
1659
1660            // Process responses
1661            for query_result in futures::future::join_all(queries).await {
1662                match query_result {
1663                    Ok(DHTResponse::Value { record }) => {
1664                        debug!(
1665                            "Found value for key {} in iteration {}",
1666                            key.to_hex(),
1667                            iterations
1668                        );
1669                        return Some(record);
1670                    }
1671                    Ok(DHTResponse::Nodes { nodes }) => {
1672                        let dht_nodes: Vec<DHTNode> =
1673                            nodes.into_iter().map(|n| n.to_dht_node()).collect();
1674                        lookup_state.add_nodes(dht_nodes);
1675                    }
1676                    _ => {
1677                        // Query failed or returned unexpected response
1678                        debug!("Query failed during iterative lookup");
1679                    }
1680                }
1681            }
1682
1683            iterations += 1;
1684        }
1685
1686        debug!(
1687            "Iterative lookup for key {} completed after {} iterations, value not found",
1688            key.to_hex(),
1689            iterations
1690        );
1691        None
1692    }
1693
1694    /// Simulate a query to a remote node (placeholder for real network implementation)
1695    async fn simulate_query(&self, _node: &DHTNode, query: DHTQuery) -> Result<DHTResponse> {
1696        // Add some realistic delay
1697        tokio::time::sleep(Duration::from_millis(50)).await;
1698
1699        // Handle the query locally (simulating remote node response)
1700        Ok(self.handle_query(query).await)
1701    }
1702
1703    /// Republish records that are close to expiration
1704    async fn republish_records(&self) -> Result<()> {
1705        let all_records = self.storage.all_records().await;
1706        let mut republished_count = 0;
1707
1708        for record in all_records {
1709            // Republish if record has less than 1/4 of its TTL remaining
1710            let remaining_ttl = record
1711                .expires_at
1712                .duration_since(SystemTime::now())
1713                .unwrap_or(Duration::ZERO);
1714
1715            if remaining_ttl < self.config.record_ttl / 4 {
1716                // Find nodes responsible for this key
1717                let closest_nodes = self
1718                    .routing_table
1719                    .closest_nodes(&record.key, self.config.replication_factor)
1720                    .await;
1721
1722                // Republish to closest nodes
1723                for node in &closest_nodes {
1724                    if self.replicate_record(&record, node).await.is_ok() {
1725                        republished_count += 1;
1726                    }
1727                }
1728            }
1729        }
1730
1731        if republished_count > 0 {
1732            debug!(
1733                "Republished {} records during maintenance",
1734                republished_count
1735            );
1736        }
1737
1738        Ok(())
1739    }
1740
1741    /// Refresh buckets that haven't been active recently
1742    async fn refresh_buckets(&self) -> Result<()> {
1743        let mut refreshed_count = 0;
1744
1745        for bucket_index in 0..256 {
1746            let needs_refresh = {
1747                let bucket = self.routing_table.buckets[bucket_index].read().await;
1748                bucket.needs_refresh(self.config.bucket_refresh_interval)
1749            };
1750
1751            if needs_refresh {
1752                // Generate a random key in this bucket's range and perform lookup
1753                let target_key = self.generate_key_for_bucket(bucket_index);
1754                let _nodes = self.iterative_find_node(&target_key).await;
1755                refreshed_count += 1;
1756
1757                // Update bucket refresh time
1758                {
1759                    let mut bucket = self.routing_table.buckets[bucket_index].write().await;
1760                    bucket.last_refresh = Instant::now();
1761                }
1762            }
1763        }
1764
1765        if refreshed_count > 0 {
1766            debug!("Refreshed {} buckets during maintenance", refreshed_count);
1767        }
1768
1769        Ok(())
1770    }
1771
1772    /// Generate a key that would fall into the specified bucket
1773    fn generate_key_for_bucket(&self, bucket_index: usize) -> Key {
1774        let mut key_bytes = self.local_id.as_bytes().to_vec();
1775
1776        // Flip the bit at position (255 - bucket_index) so the key lands at the XOR distance for this bucket
1777        if bucket_index < 256 {
1778            let byte_index = (255 - bucket_index) / 8;
1779            let bit_index = (255 - bucket_index) % 8;
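            // Worked example: bucket_index = 200 gives 255 - 200 = 55, so
            // byte_index = 6 and bit_index = 7 -- bit 7 of byte 6 of the local ID
            // is toggled below to produce the target key.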
1780
1781            if byte_index < key_bytes.len() {
1782                key_bytes[byte_index] ^= 1 << bit_index;
1783            }
1784        }
1785
1786        let mut hash = [0u8; 32];
1787        hash.copy_from_slice(&key_bytes);
1788        Key::from_hash(hash)
1789    }
1790
1791    /// Perform iterative node lookup
1792    async fn iterative_find_node(&self, key: &Key) -> Vec<DHTNode> {
1793        debug!("Starting iterative node lookup for key {}", key.to_hex());
1794
1795        let mut lookup_state = LookupState::new(key.clone(), self.config.alpha);
1796
1797        // Start with closest nodes from routing table
1798        let initial_nodes = self
1799            .routing_table
1800            .closest_nodes(key, self.config.alpha)
1801            .await;
1802        lookup_state.add_nodes(initial_nodes);
1803
1804        // Perform iterative queries
1805        let mut iterations = 0;
1806        const MAX_ITERATIONS: usize = 10;
1807
1808        while !lookup_state.is_complete() && iterations < MAX_ITERATIONS {
1809            let nodes_to_query = lookup_state.next_nodes();
1810            if nodes_to_query.is_empty() {
1811                break;
1812            }
1813
1814            // Query nodes in parallel
1815            let mut queries = Vec::new();
1816            for node in &nodes_to_query {
1817                let query = DHTQuery::FindNode {
1818                    key: key.clone(),
1819                    requester: self.local_id.to_hex(),
1820                };
1821                queries.push(self.simulate_query(node, query));
1822            }
1823
1824            // Process responses
1825            for query_result in futures::future::join_all(queries).await {
1826                if let Ok(DHTResponse::Nodes { nodes }) = query_result {
1827                    let dht_nodes: Vec<DHTNode> =
1828                        nodes.into_iter().map(|n| n.to_dht_node()).collect();
1829                    lookup_state.add_nodes(dht_nodes);
1830                }
1831            }
1832
1833            iterations += 1;
1834        }
1835
1836        debug!(
1837            "Iterative node lookup for key {} completed after {} iterations",
1838            key.to_hex(),
1839            iterations
1840        );
1841
1842        // Return the closest nodes found
1843        lookup_state
1844            .closest
1845            .into_iter()
1846            .take(self.config.replication_factor)
1847            .collect()
1848    }
1849
1850    /// Check consistency of a record across multiple nodes
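    ///
    /// Usage sketch (not compiled; `dht` and `key` are illustrative placeholders):
    ///
    /// ```ignore
    /// let report = dht.check_consistency(&key).await?;
    /// if !report.consistent {
    ///     println!("{} conflicting copies detected", report.conflicts.len());
    /// }
    /// ```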
1851    pub async fn check_consistency(&self, key: &Key) -> Result<ConsistencyReport> {
1852        debug!("Checking consistency for key {}", key.to_hex());
1853
1854        // Find nodes that should have this record
1855        let closest_nodes = self
1856            .routing_table
1857            .closest_nodes(key, self.config.replication_factor)
1858            .await;
1859
1860        let mut records_found = Vec::new();
1861        let mut nodes_queried = 0;
1862        let mut nodes_responded = 0;
1863
1864        // Query each node for the record
1865        for node in &closest_nodes {
1866            nodes_queried += 1;
1867
1868            let query = DHTQuery::FindValue {
1869                key: key.clone(),
1870                requester: self.local_id.to_hex(),
1871            };
1872
1873            match self.simulate_query(node, query).await {
1874                Ok(DHTResponse::Value { record }) => {
1875                    nodes_responded += 1;
1876                    records_found.push((node.peer_id.clone(), record));
1877                }
1878                Ok(DHTResponse::Nodes { .. }) => {
1879                    nodes_responded += 1;
1880                    // Node doesn't have the record
1881                }
1882                _ => {
1883                    // Node didn't respond or error occurred
1884                }
1885            }
1886        }
1887
1888        // Analyze consistency
1889        let mut consistent = true;
1890        let mut canonical_record: Option<Record> = None;
1891        let mut conflicts = Vec::new();
1892
1893        for (node_id, record) in &records_found {
1894            if let Some(ref canonical) = canonical_record {
1895                // Check if records match
1896                if record.value != canonical.value
1897                    || record.created_at != canonical.created_at
1898                    || record.publisher != canonical.publisher
1899                {
1900                    consistent = false;
1901                    conflicts.push((node_id.clone(), record.clone()));
1902                }
1903            } else {
1904                canonical_record = Some(record.clone());
1905            }
1906        }
1907
1908        let report = ConsistencyReport {
1909            key: key.clone(),
1910            nodes_queried,
1911            nodes_responded,
1912            records_found: records_found.len(),
1913            consistent,
1914            canonical_record,
1915            conflicts,
1916            replication_factor: self.config.replication_factor,
1917        };
1918
1919        debug!(
1920            "Consistency check for key {}: {} nodes queried, {} responded, {} records found, consistent: {}",
1921            key.to_hex(),
1922            report.nodes_queried,
1923            report.nodes_responded,
1924            report.records_found,
1925            report.consistent
1926        );
1927
1928        Ok(report)
1929    }
1930
1931    /// Repair inconsistencies for a specific key
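    ///
    /// Usage sketch (not compiled); `repair_record` re-checks consistency internally
    /// before attempting repairs:
    ///
    /// ```ignore
    /// let result = dht.repair_record(&key).await?;
    /// if result.repairs_needed {
    ///     println!(
    ///         "{}/{} repairs succeeded, final state: {}",
    ///         result.repairs_successful, result.repairs_attempted, result.final_state
    ///     );
    /// }
    /// ```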
1932    pub async fn repair_record(&self, key: &Key) -> Result<RepairResult> {
1933        debug!("Starting repair for key {}", key.to_hex());
1934
1935        let consistency_report = self.check_consistency(key).await?;
1936
1937        if consistency_report.consistent {
1938            return Ok(RepairResult {
1939                key: key.clone(),
1940                repairs_needed: false,
1941                repairs_attempted: 0,
1942                repairs_successful: 0,
1943                final_state: "consistent".to_string(),
1944            });
1945        }
1946
1947        // Start from the reported canonical record; the most recent version is selected below
1948        let canonical_record = if let Some(canonical) = consistency_report.canonical_record {
1949            canonical
1950        } else {
1951            return Ok(RepairResult {
1952                key: key.clone(),
1953                repairs_needed: false,
1954                repairs_attempted: 0,
1955                repairs_successful: 0,
1956                final_state: "no_records_found".to_string(),
1957            });
1958        };
1959
1960        // Find the most recent version among conflicts
1961        let mut most_recent = canonical_record.clone();
1962        for (_, conflicted_record) in &consistency_report.conflicts {
1963            if conflicted_record.created_at > most_recent.created_at {
1964                most_recent = conflicted_record.clone();
1965            }
1966        }
1967
1968        // Replicate the canonical version to all responsible nodes
1969        let closest_nodes = self
1970            .routing_table
1971            .closest_nodes(key, self.config.replication_factor)
1972            .await;
1973
1974        let mut repairs_attempted = 0;
1975        let mut repairs_successful = 0;
1976
1977        for node in &closest_nodes {
1978            repairs_attempted += 1;
1979            if self.replicate_record(&most_recent, node).await.is_ok() {
1980                repairs_successful += 1;
1981            }
1982        }
1983
1984        let final_state = if repairs_successful >= (self.config.replication_factor / 2) {
1985            "repaired".to_string()
1986        } else {
1987            "repair_failed".to_string()
1988        };
1989
1990        debug!(
1991            "Repair for key {} completed: {}/{} repairs successful, final state: {}",
1992            key.to_hex(),
1993            repairs_successful,
1994            repairs_attempted,
1995            final_state
1996        );
1997
1998        Ok(RepairResult {
1999            key: key.clone(),
2000            repairs_needed: true,
2001            repairs_attempted,
2002            repairs_successful,
2003            final_state,
2004        })
2005    }
2006
2007    // ===============================
2008    // INBOX SYSTEM IMPLEMENTATION
2009    // ===============================
2010
2011    /// Create a new inbox for a user with infinite TTL
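    ///
    /// Usage sketch (not compiled; `dht` and `my_peer_id` are illustrative placeholders):
    ///
    /// ```ignore
    /// let info = dht.create_inbox("alice-inbox", my_peer_id).await?;
    /// println!("inbox reachable at {}", info.four_word_address);
    /// ```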
2012    pub async fn create_inbox(&self, inbox_id: &str, owner_peer_id: PeerId) -> Result<InboxInfo> {
2013        info!("Creating inbox {} for peer {}", inbox_id, owner_peer_id);
2014
2015        let inbox_key = Key::from_inbox_id(inbox_id);
2016
2017        // Create inbox metadata record with infinite TTL
2018        let inbox_metadata = InboxMetadata {
2019            inbox_id: inbox_id.to_string(),
2020            owner: owner_peer_id.clone(),
2021            created_at: SystemTime::now(),
2022            message_count: 0,
2023            max_messages: 1000, // Default per-inbox limit (not yet configurable)
2024            is_public: true,
2025            access_keys: vec![owner_peer_id.clone()],
2026        };
2027
2028        let metadata_value = serde_json::to_vec(&inbox_metadata)
2029            .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2030
2031        let metadata_record = Record {
2032            key: inbox_key.clone(),
2033            value: metadata_value,
2034            publisher: owner_peer_id.clone(),
2035            created_at: SystemTime::now(),
2036            expires_at: SystemTime::now() + Duration::from_secs(100 * 365 * 24 * 60 * 60), // Effectively infinite TTL (~100 years; u64::MAX secs would overflow SystemTime)
2037            signature: None,
2038        };
2039
2040        // Store metadata with infinite TTL
2041        self.put_record_with_infinite_ttl(metadata_record).await?;
2042
2043        // Create empty message index
2044        let index_key = Key::from_inbox_index(inbox_id);
2045        let empty_index = InboxMessageIndex {
2046            inbox_id: inbox_id.to_string(),
2047            messages: Vec::new(),
2048            last_updated: SystemTime::now(),
2049        };
2050
2051        let index_value = serde_json::to_vec(&empty_index)
2052            .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2053
2054        let index_record = Record {
2055            key: index_key,
2056            value: index_value,
2057            publisher: owner_peer_id.clone(),
2058            created_at: SystemTime::now(),
2059            expires_at: SystemTime::now() + Duration::from_secs(100 * 365 * 24 * 60 * 60), // Effectively infinite TTL (~100 years; u64::MAX secs would overflow SystemTime)
2060            signature: None,
2061        };
2062
2063        self.put_record_with_infinite_ttl(index_record).await?;
2064
2065        let inbox_info = InboxInfo {
2066            inbox_id: inbox_id.to_string(),
2067            four_word_address: self.generate_four_word_address(inbox_id),
2068            owner: owner_peer_id,
2069            created_at: SystemTime::now(),
2070            message_count: 0,
2071            is_accessible: true,
2072        };
2073
2074        info!(
2075            "Successfully created inbox {} with four-word address: {}",
2076            inbox_id, inbox_info.four_word_address
2077        );
2078
2079        Ok(inbox_info)
2080    }
2081
2082    /// Send a message to an inbox
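    ///
    /// Usage sketch (not compiled; `dht` and `my_peer_id` are illustrative placeholders):
    ///
    /// ```ignore
    /// let message = InboxMessage {
    ///     id: "msg-001".to_string(),
    ///     sender: my_peer_id,
    ///     recipient_inbox: "alice-inbox".to_string(),
    ///     content: "hello".to_string(),
    ///     message_type: "text".to_string(),
    ///     timestamp: std::time::SystemTime::now(),
    ///     attachments: Vec::new(),
    /// };
    /// dht.send_message_to_inbox("alice-inbox", message).await?;
    /// ```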
2083    pub async fn send_message_to_inbox(&self, inbox_id: &str, message: InboxMessage) -> Result<()> {
2084        info!("Sending message to inbox {}", inbox_id);
2085
2086        // Get current inbox metadata
2087        let inbox_key = Key::from_inbox_id(inbox_id);
2088        let metadata_record = self.get(&inbox_key).await.ok_or_else(|| {
2089            P2PError::Dht(crate::error::DhtError::KeyNotFound(
2090                format!("inbox:{inbox_id}").into(),
2091            ))
2092        })?;
2093
2094        let mut inbox_metadata: InboxMetadata = serde_json::from_slice(&metadata_record.value)
2095            .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2096
2097        // Check message limit
2098        if inbox_metadata.message_count >= inbox_metadata.max_messages {
2099            return Err(P2PError::Dht(crate::error::DhtError::StoreFailed(
2100                format!("Inbox {} is full", inbox_id).into(),
2101            )));
2102        }
2103
2104        // Create message record with infinite TTL
2105        let message_key = Key::from_inbox_message(inbox_id, &message.id);
2106        let message_value = serde_json::to_vec(&message)
2107            .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2108
2109        let message_record = Record {
2110            key: message_key.clone(),
2111            value: message_value,
2112            publisher: message.sender.clone(),
2113            created_at: message.timestamp,
2114            expires_at: SystemTime::now() + Duration::from_secs(100 * 365 * 24 * 60 * 60), // Effectively infinite TTL (~100 years; u64::MAX secs would overflow SystemTime)
2115            signature: None,
2116        };
2117
2118        self.put_record_with_infinite_ttl(message_record).await?;
2119
2120        // Update message index
2121        let index_key = Key::from_inbox_index(inbox_id);
2122        let index_record = self.get(&index_key).await.ok_or_else(|| {
2123            P2PError::Dht(crate::error::DhtError::KeyNotFound(
2124                format!("inbox_index:{inbox_id}").into(),
2125            ))
2126        })?;
2127
2128        let mut message_index: InboxMessageIndex = serde_json::from_slice(&index_record.value)
2129            .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2130
2131        message_index.messages.push(MessageRef {
2132            message_id: message.id.clone(),
2133            sender: message.sender.clone(),
2134            timestamp: message.timestamp,
2135            message_type: message.message_type.clone(),
2136        });
2137        message_index.last_updated = SystemTime::now();
2138
2139        // Update metadata
2140        inbox_metadata.message_count += 1;
2141
2142        // Store updated index and metadata
2143        let updated_index_value = serde_json::to_vec(&message_index)
2144            .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2145
2146        let updated_metadata_value = serde_json::to_vec(&inbox_metadata)
2147            .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2148
2149        let updated_index_record = Record {
2150            key: index_key,
2151            value: updated_index_value,
2152            publisher: message.sender.clone(),
2153            created_at: SystemTime::now(),
2154            expires_at: SystemTime::now() + Duration::from_secs(100 * 365 * 24 * 60 * 60), // Effectively infinite TTL
2155            signature: None,
2156        };
2157
2158        let updated_metadata_record = Record {
2159            key: inbox_key,
2160            value: updated_metadata_value,
2161            publisher: message.sender.clone(),
2162            created_at: SystemTime::now(),
2163            expires_at: SystemTime::now() + Duration::from_secs(100 * 365 * 24 * 60 * 60), // Effectively infinite TTL
2164            signature: None,
2165        };
2166
2167        self.put_record_with_infinite_ttl(updated_index_record)
2168            .await?;
2169        self.put_record_with_infinite_ttl(updated_metadata_record)
2170            .await?;
2171
2172        info!(
2173            "Successfully sent message {} to inbox {}",
2174            message.id, inbox_id
2175        );
2176        Ok(())
2177    }
2178
2179    /// Get messages from an inbox
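    ///
    /// Usage sketch (not compiled): fetch the ten most recent messages.
    ///
    /// ```ignore
    /// let recent = dht.get_inbox_messages("alice-inbox", Some(10)).await?;
    /// for msg in &recent {
    ///     println!("{}", msg.content);
    /// }
    /// ```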
2180    pub async fn get_inbox_messages(
2181        &self,
2182        inbox_id: &str,
2183        limit: Option<usize>,
2184    ) -> Result<Vec<InboxMessage>> {
2185        info!("Retrieving messages from inbox {}", inbox_id);
2186
2187        let index_key = Key::from_inbox_index(inbox_id);
2188        let index_record = self.get(&index_key).await.ok_or_else(|| {
2189            P2PError::Dht(crate::error::DhtError::KeyNotFound(
2190                format!("inbox:{inbox_id}").into(),
2191            ))
2192        })?;
2193
2194        let message_index: InboxMessageIndex = serde_json::from_slice(&index_record.value)
2195            .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2196
2197        let mut messages = Vec::new();
2198        let message_refs: Vec<&MessageRef> = if let Some(limit) = limit {
2199            message_index.messages.iter().rev().take(limit).collect()
2200        } else {
2201            message_index.messages.iter().collect()
2202        };
2203
2204        for message_ref in message_refs {
2205            let message_key = Key::from_inbox_message(inbox_id, &message_ref.message_id);
2206            if let Some(message_record) = self.get(&message_key).await
2207                && let Ok(message) = serde_json::from_slice::<InboxMessage>(&message_record.value)
2208            {
2209                messages.push(message);
2210            }
2211        }
2212
2213        // Sort messages by timestamp (newest first)
2214        messages.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
2215
2216        info!(
2217            "Retrieved {} messages from inbox {}",
2218            messages.len(),
2219            inbox_id
2220        );
2221        Ok(messages)
2222    }
2223
2224    /// Get inbox information
2225    pub async fn get_inbox_info(&self, inbox_id: &str) -> Result<Option<InboxInfo>> {
2226        let inbox_key = Key::from_inbox_id(inbox_id);
2227        let metadata_record = self.get(&inbox_key).await;
2228
2229        if let Some(record) = metadata_record {
2230            let metadata: InboxMetadata = serde_json::from_slice(&record.value)
2231                .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2232
2233            let inbox_info = InboxInfo {
2234                inbox_id: inbox_id.to_string(),
2235                four_word_address: self.generate_four_word_address(inbox_id),
2236                owner: metadata.owner,
2237                created_at: metadata.created_at,
2238                message_count: metadata.message_count,
2239                is_accessible: true,
2240            };
2241
2242            Ok(Some(inbox_info))
2243        } else {
2244            Ok(None)
2245        }
2246    }
2247
2248    /// Store a record with infinite TTL (never expires)
2249    async fn put_record_with_infinite_ttl(&self, record: Record) -> Result<()> {
2250        // Store locally first
2251        self.storage.store(record.clone()).await?;
2252
2253        // Find closest nodes for replication
2254        let closest_nodes = self
2255            .routing_table
2256            .closest_nodes(&record.key, self.config.replication_factor)
2257            .await;
2258
2259        // Replicate to closest nodes
2260        for node in &closest_nodes {
2261            if let Err(e) = self.replicate_record(&record, node).await {
2262                debug!(
2263                    "Failed to replicate infinite TTL record to node {}: {}",
2264                    node.peer_id, e
2265                );
2266            }
2267        }
2268
2269        Ok(())
2270    }
2271
2272    /// Generate a four-word address for an inbox
2273    fn generate_four_word_address(&self, inbox_id: &str) -> String {
2274        // TODO: Replace with real four_word_networking when available
2275        // For now, use a simple placeholder implementation
2276        let words = [
2277            "alpha", "beta", "gamma", "delta", "epsilon", "zeta", "eta", "theta",
2278        ];
2279        let mut hash = 0u32;
2280
2281        for byte in inbox_id.as_bytes() {
2282            hash = hash.wrapping_mul(31).wrapping_add(*byte as u32);
2283        }
2284
2285        let word1 = words[hash as usize % words.len()];
2286        let word2 = words[(hash >> 8) as usize % words.len()];
2287        let word3 = words[(hash >> 16) as usize % words.len()];
2288        let word4 = words[(hash >> 24) as usize % words.len()];
2289        format!("{word1}.{word2}.{word3}.{word4}")
2290    }
2291
2292    /// Add a node to the DHT routing table
2293    pub async fn add_node(&self, node: DHTNode) -> Result<()> {
2294        self.routing_table.add_node(node).await
2295    }
2296
2297    /// Remove a node from the DHT routing table
2298    pub async fn remove_node(&self, peer_id: &PeerId) -> Result<()> {
2299        self.routing_table.remove_node(peer_id).await
2300    }
2301}
2302
2303/// DHT statistics
2304#[derive(Debug, Clone)]
2305pub struct DHTStats {
2306    /// Local node ID
2307    pub local_id: Key,
2308    /// Total nodes in routing table
2309    pub total_nodes: usize,
2310    /// Number of active buckets
2311    pub active_buckets: usize,
2312    /// Number of stored records
2313    pub stored_records: usize,
2314    /// Number of expired records
2315    pub expired_records: usize,
2316}
2317
2318/// Consistency check report
2319#[derive(Debug, Clone)]
2320pub struct ConsistencyReport {
2321    /// Key being checked
2322    pub key: Key,
2323    /// Number of nodes queried
2324    pub nodes_queried: usize,
2325    /// Number of nodes that responded
2326    pub nodes_responded: usize,
2327    /// Number of records found
2328    pub records_found: usize,
2329    /// Whether all records are consistent
2330    pub consistent: bool,
2331    /// The canonical record (if any)
2332    pub canonical_record: Option<Record>,
2333    /// Conflicting records found
2334    pub conflicts: Vec<(PeerId, Record)>,
2335    /// Expected replication factor
2336    pub replication_factor: usize,
2337}
2338
2339/// Result of a repair operation
2340#[derive(Debug, Clone)]
2341pub struct RepairResult {
2342    /// Key that was repaired
2343    pub key: Key,
2344    /// Whether repairs were needed
2345    pub repairs_needed: bool,
2346    /// Number of repair attempts made
2347    pub repairs_attempted: usize,
2348    /// Number of successful repairs
2349    pub repairs_successful: usize,
2350    /// Final state description
2351    pub final_state: String,
2352}
2353
2354impl LookupState {
2355    /// Create a new lookup state
2356    pub fn new(target: Key, alpha: usize) -> Self {
2357        Self {
2358            target,
2359            queried: HashMap::new(),
2360            to_query: VecDeque::new(),
2361            closest: Vec::new(),
2362            started_at: Instant::now(),
2363            alpha,
2364        }
2365    }
2366
2367    /// Add nodes to query
2368    pub fn add_nodes(&mut self, nodes: Vec<DHTNode>) {
2369        for node in nodes {
2370            if !self.queried.contains_key(&node.peer_id) {
2371                self.to_query.push_back(node);
2372            }
2373        }
2374
2375        // Sort by distance to target
2376        let target = &self.target;
2377        self.to_query
2378            .make_contiguous()
2379            .sort_by_key(|node| node.key().distance(target).as_bytes().to_vec());
2380    }
2381
2382    /// Get next nodes to query
2383    pub fn next_nodes(&mut self) -> Vec<DHTNode> {
2384        let mut nodes = Vec::new();
2385        for _ in 0..self.alpha {
2386            if let Some(node) = self.to_query.pop_front() {
2387                self.queried.insert(node.peer_id.clone(), Instant::now());
2388                nodes.push(node);
2389            } else {
2390                break;
2391            }
2392        }
2393        nodes
2394    }
2395
2396    /// Check if lookup is complete
2397    pub fn is_complete(&self) -> bool {
2398        self.to_query.is_empty() || self.started_at.elapsed() > Duration::from_secs(30)
2399    }
2400}
2401
2402// ===============================
2403// INBOX SYSTEM DATA STRUCTURES
2404// ===============================
2405
2406/// Inbox metadata stored in DHT
2407#[derive(Debug, Clone, Serialize, Deserialize)]
2408pub struct InboxMetadata {
2409    pub inbox_id: String,
2410    pub owner: PeerId,
2411    pub created_at: SystemTime,
2412    pub message_count: usize,
2413    pub max_messages: usize,
2414    pub is_public: bool,
2415    pub access_keys: Vec<PeerId>,
2416}
2417
2418/// Inbox message stored in DHT
2419#[derive(Debug, Clone, Serialize, Deserialize)]
2420pub struct InboxMessage {
2421    pub id: String,
2422    pub sender: PeerId,
2423    pub recipient_inbox: String,
2424    pub content: String,
2425    pub message_type: String,
2426    pub timestamp: SystemTime,
2427    pub attachments: Vec<MessageAttachment>,
2428}
2429
2430/// Message attachment metadata
2431#[derive(Debug, Clone, Serialize, Deserialize)]
2432pub struct MessageAttachment {
2433    pub filename: String,
2434    pub content_type: String,
2435    pub size: u64,
2436    pub hash: String,
2437}
2438
2439/// Message index for efficient inbox querying
2440#[derive(Debug, Clone, Serialize, Deserialize)]
2441pub struct InboxMessageIndex {
2442    pub inbox_id: String,
2443    pub messages: Vec<MessageRef>,
2444    pub last_updated: SystemTime,
2445}
2446
2447/// Reference to a message in the index
2448#[derive(Debug, Clone, Serialize, Deserialize)]
2449pub struct MessageRef {
2450    pub message_id: String,
2451    pub sender: PeerId,
2452    pub timestamp: SystemTime,
2453    pub message_type: String,
2454}
2455
2456/// Inbox information returned to users
2457#[derive(Debug, Clone, Serialize, Deserialize)]
2458pub struct InboxInfo {
2459    pub inbox_id: String,
2460    pub four_word_address: String,
2461    pub owner: PeerId,
2462    pub created_at: SystemTime,
2463    pub message_count: usize,
2464    pub is_accessible: bool,
2465}
2466
2467// ===============================
2468// KEY EXTENSIONS FOR INBOX SYSTEM
2469// ===============================
2470
2471impl Key {
2472    /// Create a key for inbox metadata
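    ///
    /// The inbox key constructors domain-separate their SHA-256 input with distinct
    /// prefixes, so metadata, index, and message records for the same inbox never
    /// collide. Sketch (not compiled):
    ///
    /// ```ignore
    /// let meta = Key::from_inbox_id("alice-inbox");
    /// let index = Key::from_inbox_index("alice-inbox");
    /// assert_ne!(meta.to_hex(), index.to_hex());
    /// ```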
2473    pub fn from_inbox_id(inbox_id: &str) -> Self {
2474        let mut hasher = Sha256::new();
2475        hasher.update(b"INBOX_METADATA:");
2476        hasher.update(inbox_id.as_bytes());
2477        let hash = hasher.finalize();
2478        Key { hash: hash.into() }
2479    }
2480
2481    /// Create a key for inbox message index
2482    pub fn from_inbox_index(inbox_id: &str) -> Self {
2483        let mut hasher = Sha256::new();
2484        hasher.update(b"INBOX_INDEX:");
2485        hasher.update(inbox_id.as_bytes());
2486        let hash = hasher.finalize();
2487        Key { hash: hash.into() }
2488    }
2489
2490    /// Create a key for a specific message in an inbox
2491    pub fn from_inbox_message(inbox_id: &str, message_id: &str) -> Self {
2492        let mut hasher = Sha256::new();
2493        hasher.update(b"INBOX_MESSAGE:");
2494        hasher.update(inbox_id.as_bytes());
2495        hasher.update(b":");
2496        hasher.update(message_id.as_bytes());
2497        let hash = hasher.finalize();
2498        Key { hash: hash.into() }
2499    }
2500}
2501
2502// NOTE: hex display for keys requires the `hex` crate as a Cargo.toml dependency.