ant_core/dht.rs

1//! Distributed Hash Table (DHT) Implementation
2//!
3//! This module provides a Kademlia-based DHT for distributed peer routing and data storage.
4//! It implements the core Kademlia algorithm with proper distance metrics, k-buckets,
5//! and network operations for a fully decentralized P2P system.
6//!
7//! The implementation includes S/Kademlia security extensions for enhanced protection
8//! against various attacks on the DHT infrastructure.
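//!
//! # Example
//!
//! A minimal, illustrative sketch of storing and retrieving a record locally.
//! It assumes the crate is exposed as `ant_core`, that this module is public,
//! and that `PeerId` is a string-like identifier (as used throughout this file):
//!
//! ```no_run
//! use ant_core::dht::{DHT, DHTConfig, Key};
//!
//! # async fn demo() -> ant_core::Result<()> {
//! let dht = DHT::new(Key::random(), DHTConfig::default());
//!
//! // With no peers in the routing table the record is only stored locally.
//! let key = Key::new(b"example");
//! dht.put(key.clone(), b"hello dht".to_vec()).await?;
//! assert!(dht.get(&key).await.is_some());
//! # Ok(())
//! # }
//! ```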
9
10use crate::{PeerId, Multiaddr, Result, P2PError};
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, VecDeque};
13use std::time::{Duration, Instant, SystemTime};
14use sha2::{Digest, Sha256};
15use tokio::sync::RwLock;
16use tracing::{debug, info};
17use futures;
18
19// S/Kademlia security extensions
20pub mod skademlia;
21
22// IPv6-based node identity system
23pub mod ipv6_identity;
24
25/// DHT configuration parameters
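///
/// Defaults follow the standard Kademlia parameters (k = 20, α = 3). A hedged
/// sketch of overriding selected fields (crate path assumed to be `ant_core`):
///
/// ```no_run
/// use std::time::Duration;
/// use ant_core::dht::DHTConfig;
///
/// let config = DHTConfig {
///     replication_factor: 8,
///     record_ttl: Duration::from_secs(60 * 60), // 1 hour
///     ..DHTConfig::default()
/// };
/// ```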
26#[derive(Debug, Clone)]
27pub struct DHTConfig {
28    /// Replication parameter (k) - number of nodes to store each record
29    pub replication_factor: usize,
30    /// Maximum nodes per k-bucket
31    pub bucket_size: usize,
32    /// Concurrency parameter for parallel lookups
33    pub alpha: usize,
34    /// Record expiration time
35    pub record_ttl: Duration,
36    /// Refresh interval for buckets
37    pub bucket_refresh_interval: Duration,
38    /// Republish interval for stored records
39    pub republish_interval: Duration,
40    /// Maximum distance for considering nodes "close"
41    pub max_distance: u8,
42}
43
/// DHT key type with XOR-based Kademlia distance calculation
45#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
46pub struct Key {
47    /// 256-bit key hash
48    hash: [u8; 32],
49}
50
51/// DHT record containing key-value data with metadata
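///
/// An illustrative sketch of creating a short-lived record (assumes `PeerId`
/// is a string-like identifier, as used elsewhere in this module):
///
/// ```no_run
/// use std::time::Duration;
/// use ant_core::dht::{Key, Record};
///
/// let record = Record::with_ttl(
///     Key::new(b"session"),
///     b"token".to_vec(),
///     "peer-123".to_string(),
///     Duration::from_secs(30),
/// );
/// assert!(!record.is_expired());
/// ```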
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct Record {
54    /// Record key
55    pub key: Key,
56    /// Record value
57    pub value: Vec<u8>,
58    /// Publisher peer ID
59    pub publisher: PeerId,
60    /// Record creation time
61    pub created_at: SystemTime,
62    /// Record expiration time
63    pub expires_at: SystemTime,
64    /// Signature for verification (optional)
65    pub signature: Option<Vec<u8>>,
66}
67
68/// DHT node information
69#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
70pub struct DHTNode {
71    /// Node peer ID
72    pub peer_id: PeerId,
73    /// Node addresses
74    pub addresses: Vec<Multiaddr>,
    /// Last seen timestamp (serialized as seconds elapsed since the node was last seen)
76    #[serde(with = "instant_as_secs")]
77    pub last_seen: Instant,
78    /// Node distance from local node
79    pub distance: Key,
80    /// Connection status
81    pub is_connected: bool,
82}
83
84/// Serde helper for Instant serialization
85mod instant_as_secs {
86    use serde::{Deserializer, Serializer, Deserialize, Serialize};
87    use std::time::Instant;
88    
89    pub fn serialize<S>(instant: &Instant, serializer: S) -> Result<S::Ok, S::Error>
90    where
91        S: Serializer,
92    {
        // Serialize as the approximate age in seconds (time elapsed since this Instant)
94        instant.elapsed().as_secs().serialize(serializer)
95    }
96    
97    pub fn deserialize<'de, D>(deserializer: D) -> Result<Instant, D::Error>
98    where
99        D: Deserializer<'de>,
100    {
101        let secs = u64::deserialize(deserializer)?;
        // Reconstruct an approximate Instant by subtracting the stored age;
        // fall back to `now()` if the subtraction would underflow.
        Ok(Instant::now()
            .checked_sub(std::time::Duration::from_secs(secs))
            .unwrap_or_else(Instant::now))
104    }
105}
106
107/// Serializable DHT node for network transmission
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct SerializableDHTNode {
110    /// Node peer ID
111    pub peer_id: PeerId,
112    /// Node addresses
113    pub addresses: Vec<Multiaddr>,
    /// Age of the last-seen timestamp, in seconds
115    pub last_seen_secs: u64,
116    /// Node distance from local node
117    pub distance: Key,
118    /// Connection status
119    pub is_connected: bool,
120}
121
122/// Kademlia routing table bucket
123#[derive(Debug)]
124struct KBucket {
125    /// Nodes in this bucket (up to k nodes)
126    nodes: VecDeque<DHTNode>,
127    /// Bucket capacity
128    capacity: usize,
129    /// Last refresh time
130    last_refresh: Instant,
131}
132
133/// Kademlia routing table
134#[derive(Debug)]
135pub struct RoutingTable {
136    /// Local node ID
137    local_id: Key,
138    /// K-buckets indexed by distance
139    buckets: Vec<RwLock<KBucket>>,
140    /// Configuration (reserved for future use)
141    #[allow(dead_code)]
142    config: DHTConfig,
143}
144
145/// DHT storage for local records
146#[derive(Debug)]
147pub struct DHTStorage {
148    /// Stored records
149    records: RwLock<HashMap<Key, Record>>,
150    /// Configuration (reserved for future use)
151    #[allow(dead_code)]
152    config: DHTConfig,
153}
154
155/// Main DHT implementation with S/Kademlia security extensions
156#[derive(Debug)]
157pub struct DHT {
158    /// Local node ID
159    local_id: Key,
160    /// Routing table
161    routing_table: RoutingTable,
162    /// Local storage
163    storage: DHTStorage,
164    /// Configuration (reserved for future use)
165    #[allow(dead_code)]
166    config: DHTConfig,
167    /// S/Kademlia security extensions
168    pub skademlia: Option<skademlia::SKademlia>,
169    /// IPv6-based identity manager
170    pub ipv6_identity_manager: Option<ipv6_identity::IPv6DHTIdentityManager>,
171}
172
173/// DHT query types
174#[derive(Debug, Clone, Serialize, Deserialize)]
175pub enum DHTQuery {
176    /// Find nodes close to a key
177    FindNode { 
178        /// The key to find nodes near
179        key: Key, 
180        /// ID of the requesting peer
181        requester: PeerId 
182    },
183    /// Find value for a key
184    FindValue { 
185        /// The key to find value for
186        key: Key, 
187        /// ID of the requesting peer
188        requester: PeerId 
189    },
190    /// Store a record
191    Store { 
192        /// The record to store
193        record: Record, 
194        /// ID of the requesting peer
195        requester: PeerId 
196    },
197    /// Ping to check node availability
198    Ping { 
199        /// ID of the requesting peer
200        requester: PeerId 
201    },
202}
203
204/// DHT response types
205#[derive(Debug, Clone, Serialize, Deserialize)]
206pub enum DHTResponse {
207    /// Response to FindNode query
208    Nodes { 
209        /// List of nodes near the requested key
210        nodes: Vec<SerializableDHTNode> 
211    },
212    /// Response to FindValue query
213    Value { 
214        /// The found record
215        record: Record 
216    },
217    /// Response to Store query
218    Stored { 
219        /// Whether storage was successful
220        success: bool 
221    },
222    /// Response to Ping query
223    Pong { 
224        /// ID of the responding peer
225        responder: PeerId 
226    },
227    /// Error response
228    Error { 
229        /// Error message describing what went wrong
230        message: String 
231    },
232}
233
234/// DHT lookup state for iterative queries
235#[derive(Debug)]
236pub struct LookupState {
237    /// Target key
238    pub target: Key,
239    /// Nodes queried so far
240    pub queried: HashMap<PeerId, Instant>,
241    /// Nodes to query next
242    pub to_query: VecDeque<DHTNode>,
243    /// Closest nodes found
244    pub closest: Vec<DHTNode>,
245    /// Lookup start time
246    pub started_at: Instant,
247    /// Maximum nodes to query in parallel
248    pub alpha: usize,
249}
250
251impl Default for DHTConfig {
252    fn default() -> Self {
253        Self {
254            replication_factor: 20,     // k = 20 (standard Kademlia)
255            bucket_size: 20,            // k = 20 nodes per bucket
256            alpha: 3,                   // α = 3 concurrent lookups
257            record_ttl: Duration::from_secs(24 * 60 * 60), // 24 hours
258            bucket_refresh_interval: Duration::from_secs(60 * 60), // 1 hour
259            republish_interval: Duration::from_secs(24 * 60 * 60), // 24 hours
            max_distance: 255,          // full 256-bit key space (bucket indices 0..=255)
261        }
262    }
263}
264
265impl Key {
266    /// Create a new key from raw data
267    pub fn new(data: &[u8]) -> Self {
268        let mut hasher = Sha256::new();
269        hasher.update(data);
270        let hash: [u8; 32] = hasher.finalize().into();
271        Self { hash }
272    }
273    
274    /// Create a key from existing hash
275    pub fn from_hash(hash: [u8; 32]) -> Self {
276        Self { hash }
277    }
278    
279    /// Create a random key
280    pub fn random() -> Self {
281        use rand::RngCore;
282        let mut hash = [0u8; 32];
283        rand::thread_rng().fill_bytes(&mut hash);
284        Self { hash }
285    }
286    
287    /// Get key as bytes
288    pub fn as_bytes(&self) -> &[u8] {
289        &self.hash
290    }
291    
292    /// Get key as hex string
293    pub fn to_hex(&self) -> String {
294        hex::encode(self.hash)
295    }
296    
297    /// Calculate XOR distance between two keys (Kademlia distance metric)
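    ///
    /// A short illustrative check of the metric's basic properties (crate
    /// path assumed): the distance from a key to itself is zero, and the
    /// metric is symmetric.
    ///
    /// ```no_run
    /// use ant_core::dht::Key;
    ///
    /// let a = Key::new(b"alpha");
    /// let b = Key::new(b"beta");
    /// assert_eq!(a.distance(&a).leading_zeros(), 256); // zero distance
    /// assert_eq!(a.distance(&b), b.distance(&a));      // symmetry
    /// ```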
298    pub fn distance(&self, other: &Key) -> Key {
299        let mut result = [0u8; 32];
300        for i in 0..32 {
301            result[i] = self.hash[i] ^ other.hash[i];
302        }
303        Key { hash: result }
304    }
305    
    /// Count the leading zero bits of the key (returns 256 if all bits are zero)
307    pub fn leading_zeros(&self) -> u32 {
308        for (i, &byte) in self.hash.iter().enumerate() {
309            if byte != 0 {
310                return (i * 8) as u32 + byte.leading_zeros();
311            }
312        }
313        256 // All bits are zero
314    }
315    
316    /// Get bucket index for this key relative to local node
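    ///
    /// Illustrative sketch (crate path assumed): a key that differs from the
    /// local ID in its most-significant bit lands in the farthest bucket.
    ///
    /// ```no_run
    /// use ant_core::dht::Key;
    ///
    /// let local = Key::from_hash([0u8; 32]);
    /// let mut far = [0u8; 32];
    /// far[0] = 0x80; // differs in the most-significant bit
    /// assert_eq!(Key::from_hash(far).bucket_index(&local), 255);
    /// ```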
317    pub fn bucket_index(&self, local_id: &Key) -> usize {
318        let distance = self.distance(local_id);
319        let leading_zeros = distance.leading_zeros();
        if leading_zeros >= 256 {
            0 // Identical keys (zero distance) map to the closest bucket
        } else {
            (255 - leading_zeros) as usize
        }
325    }
326}
327
328impl Record {
329    /// Create a new record
330    pub fn new(key: Key, value: Vec<u8>, publisher: PeerId) -> Self {
331        let now = SystemTime::now();
332        let ttl = Duration::from_secs(24 * 60 * 60); // 24 hours default
333        
334        Self {
335            key,
336            value,
337            publisher,
338            created_at: now,
339            expires_at: now + ttl,
340            signature: None,
341        }
342    }
343    
344    /// Create a record with custom TTL
345    pub fn with_ttl(key: Key, value: Vec<u8>, publisher: PeerId, ttl: Duration) -> Self {
346        let now = SystemTime::now();
347        
348        Self {
349            key,
350            value,
351            publisher,
352            created_at: now,
353            expires_at: now + ttl,
354            signature: None,
355        }
356    }
357    
358    /// Check if record has expired
359    pub fn is_expired(&self) -> bool {
360        SystemTime::now() > self.expires_at
361    }
362    
363    /// Get record age
364    pub fn age(&self) -> Duration {
365        SystemTime::now()
366            .duration_since(self.created_at)
367            .unwrap_or(Duration::ZERO)
368    }
369    
370    /// Sign the record (placeholder for future cryptographic verification)
371    pub fn sign(&mut self, _private_key: &[u8]) -> Result<()> {
372        // Placeholder implementation
373        // In real implementation, this would create a cryptographic signature
374        self.signature = Some(vec![0u8; 64]); // Dummy signature
375        Ok(())
376    }
377    
378    /// Verify record signature (placeholder)
379    pub fn verify(&self, _public_key: &[u8]) -> bool {
380        // Placeholder implementation
381        // In real implementation, this would verify the cryptographic signature
382        self.signature.is_some()
383    }
384}
385
386impl DHTNode {
387    /// Create a new DHT node
388    pub fn new(peer_id: PeerId, addresses: Vec<Multiaddr>, local_id: &Key) -> Self {
389        let node_key = Key::new(peer_id.as_bytes());
390        let distance = node_key.distance(local_id);
391        
392        Self {
393            peer_id,
394            addresses,
395            last_seen: Instant::now(),
396            distance,
397            is_connected: false,
398        }
399    }
400    
401    /// Create a new DHT node with explicit key (for testing)
402    pub fn new_with_key(peer_id: PeerId, addresses: Vec<Multiaddr>, key: Key) -> Self {
403        Self {
404            peer_id,
405            addresses,
406            last_seen: Instant::now(),
407            distance: key,
408            is_connected: false,
409        }
410    }
411    
412    /// Update last seen timestamp
413    pub fn touch(&mut self) {
414        self.last_seen = Instant::now();
415    }
416    
417    /// Check if node is stale
418    pub fn is_stale(&self, timeout: Duration) -> bool {
419        self.last_seen.elapsed() > timeout
420    }
421    
422    /// Get node key
423    pub fn key(&self) -> Key {
424        Key::new(self.peer_id.as_bytes())
425    }
426    
427    /// Convert to serializable form
428    pub fn to_serializable(&self) -> SerializableDHTNode {
429        SerializableDHTNode {
430            peer_id: self.peer_id.clone(),
431            addresses: self.addresses.clone(),
432            last_seen_secs: self.last_seen.elapsed().as_secs(),
433            distance: self.distance.clone(),
434            is_connected: self.is_connected,
435        }
436    }
437}
438
439impl SerializableDHTNode {
440    /// Convert from serializable form to DHTNode
441    pub fn to_dht_node(&self) -> DHTNode {
442        DHTNode {
443            peer_id: self.peer_id.clone(),
444            addresses: self.addresses.clone(),
            last_seen: Instant::now()
                .checked_sub(Duration::from_secs(self.last_seen_secs))
                .unwrap_or_else(Instant::now),
446            distance: self.distance.clone(),
447            is_connected: self.is_connected,
448        }
449    }
450}
451
452impl KBucket {
453    /// Create a new k-bucket
454    fn new(capacity: usize) -> Self {
455        Self {
456            nodes: VecDeque::new(),
457            capacity,
458            last_refresh: Instant::now(),
459        }
460    }
461    
462    /// Add a node to the bucket
463    fn add_node(&mut self, node: DHTNode) -> bool {
464        // Check if node already exists
465        if let Some(pos) = self.nodes.iter().position(|n| n.peer_id == node.peer_id) {
466            // Move to front (most recently seen)
467            let mut existing = self.nodes.remove(pos).unwrap();
468            existing.touch();
469            existing.is_connected = node.is_connected;
470            self.nodes.push_front(existing);
471            return true;
472        }
473        
474        if self.nodes.len() < self.capacity {
475            // Add new node to front
476            self.nodes.push_front(node);
477            true
478        } else {
479            // Bucket is full - could implement replacement strategy here
480            false
481        }
482    }
483    
484    /// Remove a node from the bucket
485    fn remove_node(&mut self, peer_id: &PeerId) -> bool {
486        if let Some(pos) = self.nodes.iter().position(|n| n.peer_id == *peer_id) {
487            self.nodes.remove(pos);
488            true
489        } else {
490            false
491        }
492    }
493    
494    /// Get nodes closest to a target
495    fn closest_nodes(&self, target: &Key, count: usize) -> Vec<DHTNode> {
496        let mut nodes: Vec<_> = self.nodes.iter().cloned().collect();
497        nodes.sort_by_key(|node| node.key().distance(target).as_bytes().to_vec());
498        nodes.into_iter().take(count).collect()
499    }
500    
501    /// Check if bucket needs refresh
502    fn needs_refresh(&self, interval: Duration) -> bool {
503        self.last_refresh.elapsed() > interval
504    }
505}
506
507impl RoutingTable {
508    /// Create a new routing table
509    pub fn new(local_id: Key, config: DHTConfig) -> Self {
510        let mut buckets = Vec::new();
511        for _ in 0..256 {
512            buckets.push(RwLock::new(KBucket::new(config.bucket_size)));
513        }
514        
515        Self {
516            local_id,
517            buckets,
518            config,
519        }
520    }
521    
522    /// Add a node to the routing table
523    pub async fn add_node(&self, node: DHTNode) -> Result<()> {
524        let bucket_index = node.key().bucket_index(&self.local_id);
525        let mut bucket = self.buckets[bucket_index].write().await;
526        
527        if bucket.add_node(node.clone()) {
528            debug!("Added node {} to bucket {}", node.peer_id, bucket_index);
529        } else {
530            debug!("Bucket {} full, could not add node {}", bucket_index, node.peer_id);
531        }
532        
533        Ok(())
534    }
535    
536    /// Remove a node from the routing table
537    pub async fn remove_node(&self, peer_id: &PeerId) -> Result<()> {
538        let node_key = Key::new(peer_id.as_bytes());
539        let bucket_index = node_key.bucket_index(&self.local_id);
540        let mut bucket = self.buckets[bucket_index].write().await;
541        
542        if bucket.remove_node(peer_id) {
543            debug!("Removed node {} from bucket {}", peer_id, bucket_index);
544        }
545        
546        Ok(())
547    }
548    
549    /// Find nodes closest to a target key
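    ///
    /// Illustrative sketch (crate path assumed; `PeerId` treated as a string,
    /// addresses left empty for brevity):
    ///
    /// ```no_run
    /// use ant_core::dht::{DHTConfig, DHTNode, Key, RoutingTable};
    ///
    /// # async fn demo() -> ant_core::Result<()> {
    /// let local_id = Key::random();
    /// let table = RoutingTable::new(local_id.clone(), DHTConfig::default());
    /// table.add_node(DHTNode::new("peer-1".to_string(), Vec::new(), &local_id)).await?;
    /// let nearest = table.closest_nodes(&Key::new(b"target"), 3).await;
    /// assert!(nearest.len() <= 3);
    /// # Ok(())
    /// # }
    /// ```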
550    pub async fn closest_nodes(&self, target: &Key, count: usize) -> Vec<DHTNode> {
551        let mut all_nodes = Vec::new();
552        
        // Walk buckets outward from the target's bucket (approximately in order of distance)
554        let target_bucket = target.bucket_index(&self.local_id);
555        
556        // Start with the target bucket and expand outward
557        let mut checked = vec![false; 256];
558        let mut to_check = VecDeque::new();
559        to_check.push_back(target_bucket);
560        
561        while let Some(bucket_idx) = to_check.pop_front() {
562            if checked[bucket_idx] {
563                continue;
564            }
565            checked[bucket_idx] = true;
566            
567            let bucket = self.buckets[bucket_idx].read().await;
568            all_nodes.extend(bucket.closest_nodes(target, bucket.nodes.len()));
569            
570            // Add adjacent buckets to check
571            if bucket_idx > 0 && !checked[bucket_idx - 1] {
572                to_check.push_back(bucket_idx - 1);
573            }
574            if bucket_idx < 255 && !checked[bucket_idx + 1] {
575                to_check.push_back(bucket_idx + 1);
576            }
577            
578            // Stop if we have enough nodes
579            if all_nodes.len() >= count * 2 {
580                break;
581            }
582        }
583        
584        // Sort by distance and return closest
585        all_nodes.sort_by_key(|node| node.key().distance(target).as_bytes().to_vec());
586        all_nodes.into_iter().take(count).collect()
587    }
588    
589    /// Get routing table statistics
590    pub async fn stats(&self) -> (usize, usize) {
591        let mut total_nodes = 0;
592        let mut active_buckets = 0;
593        
594        for bucket in &self.buckets {
595            let bucket_guard = bucket.read().await;
596            let node_count = bucket_guard.nodes.len();
597            total_nodes += node_count;
598            if node_count > 0 {
599                active_buckets += 1;
600            }
601        }
602        
603        (total_nodes, active_buckets)
604    }
605}
606
607impl DHTStorage {
608    /// Create new DHT storage
609    pub fn new(config: DHTConfig) -> Self {
610        Self {
611            records: RwLock::new(HashMap::new()),
612            config,
613        }
614    }
615    
616    /// Store a record
617    pub async fn store(&self, record: Record) -> Result<()> {
618        let mut records = self.records.write().await;
619        records.insert(record.key.clone(), record);
620        Ok(())
621    }
622    
623    /// Retrieve a record
624    pub async fn get(&self, key: &Key) -> Option<Record> {
625        let records = self.records.read().await;
626        records.get(key).cloned()
627    }
628    
629    /// Remove expired records
630    pub async fn cleanup_expired(&self) -> usize {
631        let mut records = self.records.write().await;
632        let initial_count = records.len();
633        records.retain(|_, record| !record.is_expired());
634        initial_count - records.len()
635    }
636    
637    /// Get all stored records (for republishing)
638    pub async fn all_records(&self) -> Vec<Record> {
639        let records = self.records.read().await;
640        records.values().cloned().collect()
641    }
642    
643    /// Get storage statistics
644    pub async fn stats(&self) -> (usize, usize) {
645        let records = self.records.read().await;
646        let total = records.len();
647        let expired = records.values().filter(|r| r.is_expired()).count();
648        (total, expired)
649    }
650}
651
652impl DHT {
653    /// Create a new DHT instance
654    pub fn new(local_id: Key, config: DHTConfig) -> Self {
655        let routing_table = RoutingTable::new(local_id.clone(), config.clone());
656        let storage = DHTStorage::new(config.clone());
657        
658        Self {
659            local_id,
660            routing_table,
661            storage,
662            config,
663            skademlia: None,
664            ipv6_identity_manager: None,
665        }
666    }
667    
668    /// Create a new DHT instance with S/Kademlia security extensions
669    pub fn new_with_security(local_id: Key, config: DHTConfig, skademlia_config: skademlia::SKademliaConfig) -> Self {
670        let routing_table = RoutingTable::new(local_id.clone(), config.clone());
671        let storage = DHTStorage::new(config.clone());
672        let skademlia = skademlia::SKademlia::new(skademlia_config);
673        
674        Self {
675            local_id,
676            routing_table,
677            storage,
678            config,
679            skademlia: Some(skademlia),
680            ipv6_identity_manager: None,
681        }
682    }
683
684    /// Create a new DHT instance with full IPv6 security integration
685    pub fn new_with_ipv6_security(
686        local_id: Key, 
687        config: DHTConfig, 
688        skademlia_config: skademlia::SKademliaConfig,
689        ipv6_config: ipv6_identity::IPv6DHTConfig
690    ) -> Self {
691        let routing_table = RoutingTable::new(local_id.clone(), config.clone());
692        let storage = DHTStorage::new(config.clone());
693        let skademlia = skademlia::SKademlia::new(skademlia_config);
694        let ipv6_identity_manager = ipv6_identity::IPv6DHTIdentityManager::new(ipv6_config);
695        
696        Self {
697            local_id,
698            routing_table,
699            storage,
700            config,
701            skademlia: Some(skademlia),
702            ipv6_identity_manager: Some(ipv6_identity_manager),
703        }
704    }
705
706    /// Initialize local IPv6 identity
707    pub fn set_local_ipv6_identity(&mut self, identity: crate::security::IPv6NodeID) -> Result<()> {
708        if let Some(ref mut manager) = self.ipv6_identity_manager {
709            // Update local_id to match IPv6 identity
710            self.local_id = ipv6_identity::IPv6DHTIdentityManager::generate_dht_key(&identity);
711            manager.set_local_identity(identity)?;
712            info!("Local IPv6 identity set and DHT key updated");
713            Ok(())
714        } else {
715            Err(P2PError::Security("IPv6 identity manager not enabled".to_string()).into())
716        }
717    }
718    
719    /// Add a bootstrap node to the DHT
720    pub async fn add_bootstrap_node(&self, peer_id: PeerId, addresses: Vec<Multiaddr>) -> Result<()> {
721        let node = DHTNode::new(peer_id, addresses, &self.local_id);
722        self.routing_table.add_node(node).await
723    }
724
725    /// Add an IPv6-verified node to the DHT
726    pub async fn add_ipv6_node(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>, ipv6_identity: crate::security::IPv6NodeID) -> Result<()> {
727        if let Some(ref mut manager) = self.ipv6_identity_manager {
728            // Validate the node join with IPv6 security checks
729            let base_node = DHTNode::new(peer_id.clone(), addresses, &self.local_id);
730            let security_event = manager.validate_node_join(&base_node, &ipv6_identity).await?;
731            
732            match security_event {
733                ipv6_identity::IPv6SecurityEvent::NodeJoined { verification_confidence, .. } => {
734                    // Enhance the node with IPv6 identity
735                    let ipv6_node = manager.enhance_dht_node(base_node.clone(), ipv6_identity).await?;
736                    
737                    // Update the DHT key to match IPv6 identity
738                    let mut enhanced_base_node = base_node;
739                    enhanced_base_node.distance = ipv6_node.get_dht_key().distance(&self.local_id);
740                    
741                    // Add to routing table
742                    self.routing_table.add_node(enhanced_base_node).await?;
743                    
744                    info!("Added IPv6-verified node {} with confidence {:.2}", peer_id, verification_confidence);
745                    Ok(())
746                }
747                ipv6_identity::IPv6SecurityEvent::VerificationFailed { reason, .. } => {
748                    Err(P2PError::Security(format!("IPv6 verification failed: {}", reason)).into())
749                }
750                ipv6_identity::IPv6SecurityEvent::DiversityViolation { subnet_type, .. } => {
751                    Err(P2PError::Security(format!("IP diversity violation: {}", subnet_type)).into())
752                }
753                ipv6_identity::IPv6SecurityEvent::NodeBanned { reason, .. } => {
754                    Err(P2PError::Security(format!("Node banned: {}", reason)).into())
755                }
756                _ => {
757                    Err(P2PError::Security("Unexpected security event".to_string()).into())
758                }
759            }
760        } else {
761            // Fallback to regular node addition if IPv6 manager not enabled
762            self.add_bootstrap_node(peer_id, addresses).await
763        }
764    }
765
766    /// Remove node with IPv6 cleanup
767    pub async fn remove_ipv6_node(&mut self, peer_id: &PeerId) -> Result<()> {
768        // Remove from routing table
769        self.routing_table.remove_node(peer_id).await?;
770        
771        // Clean up IPv6 tracking
772        if let Some(ref mut manager) = self.ipv6_identity_manager {
773            manager.remove_node(peer_id);
774        }
775        
776        Ok(())
777    }
778
779    /// Check if node is banned due to IPv6 security violations
780    pub fn is_node_banned(&self, peer_id: &PeerId) -> bool {
781        if let Some(ref manager) = self.ipv6_identity_manager {
782            manager.is_node_banned(peer_id)
783        } else {
784            false
785        }
786    }
787    
788    /// Store a record in the DHT with replication
789    pub async fn put(&self, key: Key, value: Vec<u8>) -> Result<()> {
790        let record = Record::new(key.clone(), value, self.local_id.to_hex());
791        
792        // Store locally first
793        self.storage.store(record.clone()).await?;
794        
795        // Find nodes closest to the key for replication
796        let closest_nodes = self.routing_table
797            .closest_nodes(&key, self.config.replication_factor)
798            .await;
799        
800        info!("Storing record with key {} on {} nodes", key.to_hex(), closest_nodes.len());
801        
802        // If no other nodes available, just store locally (single node scenario)
803        if closest_nodes.is_empty() {
804            info!("No other nodes available for replication, storing only locally");
805            return Ok(());
806        }
807        
808        // Replicate to closest nodes (simulated for now)
809        let mut successful_replications = 0;
810        for node in &closest_nodes {
811            if self.replicate_record(&record, node).await.is_ok() {
812                successful_replications += 1;
813            }
814        }
815        
816        info!("Successfully replicated record {} to {}/{} nodes", 
817              key.to_hex(), successful_replications, closest_nodes.len());
818        
        // Require at least half of the target nodes (minimum one) to have stored the record
        let required_replications = std::cmp::max(1, closest_nodes.len() / 2);
825        
826        if successful_replications >= required_replications {
827            Ok(())
828        } else {
829            Err(P2PError::DHT(format!(
830                "Insufficient replication: only {}/{} nodes stored the record (required: {})", 
831                successful_replications, closest_nodes.len(), required_replications
832            )).into())
833        }
834    }
835    
836    /// Retrieve a record from the DHT with consistency checks
837    pub async fn get(&self, key: &Key) -> Option<Record> {
838        // Check local storage first
839        if let Some(record) = self.storage.get(key).await {
840            if !record.is_expired() {
841                return Some(record);
842            }
843        }
844        
845        // Perform iterative lookup to find the record
846        if let Some(record) = self.iterative_find_value(key).await {
847            // Store locally for future access (caching)
848            if self.storage.store(record.clone()).await.is_ok() {
849                debug!("Cached retrieved record with key {}", key.to_hex());
850            }
851            return Some(record);
852        }
853        
854        None
855    }
856    
857    /// Find nodes close to a key
858    pub async fn find_node(&self, key: &Key) -> Vec<DHTNode> {
859        self.routing_table.closest_nodes(key, self.config.replication_factor).await
860    }
861    
862    /// Handle incoming DHT query
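    ///
    /// Sketch of answering a ping locally (crate path assumed; `PeerId`
    /// treated as a string):
    ///
    /// ```no_run
    /// use ant_core::dht::{DHT, DHTQuery, DHTResponse};
    ///
    /// # async fn demo(dht: &DHT) {
    /// let reply = dht
    ///     .handle_query(DHTQuery::Ping { requester: "peer-1".to_string() })
    ///     .await;
    /// assert!(matches!(reply, DHTResponse::Pong { .. }));
    /// # }
    /// ```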
863    pub async fn handle_query(&self, query: DHTQuery) -> DHTResponse {
864        match query {
865            DHTQuery::FindNode { key, requester: _ } => {
866                let nodes = self.find_node(&key).await;
867                let serializable_nodes = nodes.into_iter().map(|n| n.to_serializable()).collect();
868                DHTResponse::Nodes { nodes: serializable_nodes }
869            }
870            DHTQuery::FindValue { key, requester: _ } => {
871                if let Some(record) = self.storage.get(&key).await {
872                    if !record.is_expired() {
873                        return DHTResponse::Value { record };
874                    }
875                }
876                let nodes = self.find_node(&key).await;
877                let serializable_nodes = nodes.into_iter().map(|n| n.to_serializable()).collect();
878                DHTResponse::Nodes { nodes: serializable_nodes }
879            }
880            DHTQuery::Store { record, requester: _ } => {
881                match self.storage.store(record).await {
882                    Ok(()) => DHTResponse::Stored { success: true },
883                    Err(_) => DHTResponse::Stored { success: false },
884                }
885            }
886            DHTQuery::Ping { requester: _ } => {
887                DHTResponse::Pong { responder: self.local_id.to_hex() }
888            }
889        }
890    }
891    
892    /// Get DHT statistics
893    pub async fn stats(&self) -> DHTStats {
894        let (total_nodes, active_buckets) = self.routing_table.stats().await;
895        let (stored_records, expired_records) = self.storage.stats().await;
896        
897        DHTStats {
898            local_id: self.local_id.clone(),
899            total_nodes,
900            active_buckets,
901            stored_records,
902            expired_records,
903        }
904    }
905    
906    /// Perform periodic maintenance
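    ///
    /// A sketch of driving maintenance from a periodic task (interval chosen
    /// arbitrarily; crate path assumed):
    ///
    /// ```no_run
    /// use std::time::Duration;
    /// use ant_core::dht::DHT;
    ///
    /// # async fn run(dht: DHT) -> ant_core::Result<()> {
    /// let mut ticker = tokio::time::interval(Duration::from_secs(600));
    /// loop {
    ///     ticker.tick().await;
    ///     dht.maintenance().await?;
    /// }
    /// # }
    /// ```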
907    pub async fn maintenance(&self) -> Result<()> {
908        // Clean up expired records
909        let expired_count = self.storage.cleanup_expired().await;
910        if expired_count > 0 {
911            debug!("Cleaned up {} expired records", expired_count);
912        }
913        
914        // Republish records that are close to expiration
915        self.republish_records().await?;
916        
917        // Refresh buckets that haven't been active
918        self.refresh_buckets().await?;
919        
920        // Note: S/Kademlia cleanup would happen here in a mutable context
921        
922        Ok(())
923    }
924    
925    /// Perform secure lookup using S/Kademlia disjoint paths with distance verification
926    pub async fn secure_get(&mut self, key: &Key) -> Result<Option<Record>> {
927        // Check local storage first
928        if let Some(record) = self.storage.get(key).await {
929            if !record.is_expired() {
930                return Ok(Some(record));
931            }
932        }
933        
934        // Check if S/Kademlia is enabled and extract configuration
935        let (enable_distance_verification, disjoint_path_count, min_reputation) = if let Some(ref skademlia) = self.skademlia {
936            (skademlia.config.enable_distance_verification, 
937             skademlia.config.disjoint_path_count,
938             skademlia.config.min_routing_reputation)
939        } else {
940            // Fallback to regular get if S/Kademlia not enabled
941            return Ok(self.get(key).await);
942        };
943        
944        // Get initial nodes for disjoint path lookup
945        let initial_nodes = self.routing_table
946            .closest_nodes(key, disjoint_path_count * 3)
947            .await;
948        
949        if initial_nodes.is_empty() {
950            return Ok(None);
951        }
952        
953        // Perform secure lookup with disjoint paths
954        let secure_nodes = if let Some(ref mut skademlia) = self.skademlia {
955            skademlia.secure_lookup(key.clone(), initial_nodes).await?
956        } else {
957            return Ok(None);
958        };
959        
960        // Query the securely found nodes with distance verification
961        for node in &secure_nodes {
962            // Verify node distance before querying if enabled
963            if enable_distance_verification {
964                let witness_nodes = self.select_witness_nodes(&node.peer_id, 3).await;
965                
966                let consensus = if let Some(ref mut skademlia) = self.skademlia {
967                    skademlia.verify_distance_consensus(&node.peer_id, key, witness_nodes).await?
968                } else {
969                    continue;
970                };
971                
972                if consensus.confidence < min_reputation {
973                    debug!("Skipping node {} due to low distance verification confidence", node.peer_id);
974                    continue;
975                }
976            }
977            
978            let query = DHTQuery::FindValue { 
979                key: key.clone(), 
980                requester: self.local_id.to_hex() 
981            };
982            if let Ok(DHTResponse::Value { record }) = self.simulate_query(node, query).await {
983                // Store locally for future access
984                let _ = self.storage.store(record.clone()).await;
985                return Ok(Some(record));
986            }
987        }
988        
989        Ok(None)
990    }
991    
992    /// Store a record using S/Kademlia security-aware node selection
993    pub async fn secure_put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
994        let record = Record::new(key.clone(), value, self.local_id.to_hex());
995        
996        // Store locally first
997        self.storage.store(record.clone()).await?;
998        
999        // Get secure nodes and perform replications
1000        let secure_nodes = if let Some(ref skademlia) = self.skademlia {
1001            // Get candidate nodes
1002            let candidate_nodes = self.routing_table
1003                .closest_nodes(&key, self.config.replication_factor * 2)
1004                .await;
1005            
1006            // Use S/Kademlia to select secure nodes based on reputation
1007            skademlia.select_secure_nodes(
1008                &candidate_nodes, 
1009                &key, 
1010                self.config.replication_factor
1011            )
1012        } else {
1013            // Fallback to regular closest nodes
1014            self.routing_table.closest_nodes(&key, self.config.replication_factor).await
1015        };
1016        
1017        info!("Storing record with key {} on {} secure nodes", key.to_hex(), secure_nodes.len());
1018        
1019        // Perform replications and collect results
1020        let mut replication_results = Vec::new();
1021        let mut successful_replications = 0;
1022        
1023        for node in &secure_nodes {
1024            let success = self.replicate_record(&record, node).await.is_ok();
1025            replication_results.push((node.peer_id.clone(), success));
1026            if success {
1027                successful_replications += 1;
1028            }
1029        }
1030        
1031        // Update reputations if S/Kademlia enabled
1032        if let Some(ref mut skademlia) = self.skademlia {
1033            for (peer_id, success) in replication_results {
1034                skademlia.reputation_manager.update_reputation(
1035                    &peer_id, 
1036                    success, 
1037                    Duration::from_millis(100)
1038                );
1039            }
1040        }
1041        
1042        if successful_replications > 0 {
1043            info!("Successfully replicated to {}/{} secure nodes", 
1044                 successful_replications, secure_nodes.len());
1045        }
1046        
1047        Ok(())
1048    }
1049    
1050    /// Update sibling lists for a key range
1051    pub async fn update_sibling_list(&mut self, key: Key) -> Result<()> {
1052        if let Some(ref mut skademlia) = self.skademlia {
1053            let nodes = self.routing_table.closest_nodes(&key, skademlia.config.sibling_list_size).await;
1054            skademlia.update_sibling_list(key, nodes);
1055        }
1056        Ok(())
1057    }
1058    
1059    /// Validate routing table consistency using S/Kademlia
1060    pub async fn validate_routing_consistency(&self) -> Result<skademlia::ConsistencyReport> {
1061        if let Some(ref skademlia) = self.skademlia {
1062            // Get sample of nodes for validation (using closest_nodes as a proxy)
1063            let sample_key = Key::random();
1064            let sample_nodes = self.routing_table.closest_nodes(&sample_key, 100).await;
1065            skademlia.validate_routing_consistency(&sample_nodes).await
1066        } else {
1067            Err(P2PError::DHT("S/Kademlia not enabled".to_string()).into())
1068        }
1069    }
1070    
1071    /// Create a distance verification challenge for a peer
1072    pub fn create_distance_challenge(&mut self, peer_id: &PeerId, key: &Key) -> Option<skademlia::DistanceChallenge> {
1073        self.skademlia.as_mut()
1074            .map(|skademlia| skademlia.create_distance_challenge(peer_id, key))
1075    }
1076    
1077    /// Verify a distance proof
1078    pub fn verify_distance_proof(&self, proof: &skademlia::DistanceProof) -> Result<bool> {
1079        if let Some(ref skademlia) = self.skademlia {
1080            skademlia.verify_distance_proof(proof)
1081        } else {
1082            Err(P2PError::DHT("S/Kademlia not enabled".to_string()).into())
1083        }
1084    }
1085
1086    /// Verify distances of multiple nodes using enhanced consensus
1087    #[allow(dead_code)]
1088    async fn verify_node_distances(&self, nodes: &[DHTNode], _target_key: &Key, min_reputation: f64) -> Result<Vec<DHTNode>> {
1089        let mut verified_nodes = Vec::new();
1090        
1091        for node in nodes {
1092            let witness_nodes = self.select_witness_nodes(&node.peer_id, 3).await;
1093            
1094            // Only proceed if we have enough witness nodes
1095            if witness_nodes.len() >= 2 {
1096                // Simulate consensus verification (simplified for now)
1097                let consensus_confidence = 0.8; // Placeholder
1098                
1099                if consensus_confidence >= min_reputation {
1100                    verified_nodes.push(node.clone());
1101                } else {
1102                    debug!("Node {} failed distance verification with confidence {}", 
1103                           node.peer_id, consensus_confidence);
1104                }
1105            }
1106        }
1107        
1108        Ok(verified_nodes)
1109    }
1110
1111    /// Select witness nodes for distance verification  
1112    async fn select_witness_nodes(&self, target_peer: &PeerId, count: usize) -> Vec<PeerId> {
1113        // Get nodes that are close to the target but not the target itself
1114        let target_key = Key::new(target_peer.as_bytes());
1115        let candidate_nodes = self.routing_table.closest_nodes(&target_key, count * 2).await;
1116        
1117        candidate_nodes.into_iter()
1118            .filter(|node| node.peer_id != *target_peer)
1119            .take(count)
1120            .map(|node| node.peer_id)
1121            .collect()
1122    }
1123
1124    /// Create enhanced distance challenge with adaptive difficulty
1125    pub fn create_enhanced_distance_challenge(&mut self, peer_id: &PeerId, key: &Key, suspected_attack: bool) -> Option<skademlia::EnhancedDistanceChallenge> {
1126        if let Some(ref mut skademlia) = self.skademlia {
1127            Some(skademlia.create_adaptive_distance_challenge(peer_id, key, suspected_attack))
1128        } else {
1129            None
1130        }
1131    }
1132
1133    /// Verify distance using multi-round challenge protocol  
1134    pub async fn verify_distance_multi_round(&mut self, challenge: &skademlia::EnhancedDistanceChallenge) -> Result<bool> {
1135        if let Some(ref mut skademlia) = self.skademlia {
1136            skademlia.verify_distance_multi_round(challenge).await
1137        } else {
1138            Err(P2PError::DHT("S/Kademlia not enabled".to_string()).into())
1139        }
1140    }
1141    
1142    /// Get security bucket for a key range
1143    pub fn get_security_bucket(&mut self, key: &Key) -> Option<&mut skademlia::SecurityBucket> {
1144        self.skademlia.as_mut()
1145            .map(|skademlia| skademlia.get_security_bucket(key))
1146    }
1147    
1148    /// Add trusted node to security bucket
1149    pub async fn add_trusted_node(&mut self, key: &Key, peer_id: PeerId, addresses: Vec<Multiaddr>) -> Result<()> {
1150        if let Some(ref mut skademlia) = self.skademlia {
1151            let node = DHTNode::new(peer_id, addresses, &self.local_id);
1152            let security_bucket = skademlia.get_security_bucket(key);
1153            security_bucket.add_trusted_node(node);
1154        }
1155        Ok(())
1156    }
1157
1158    /// Perform IPv6-enhanced secure get operation
1159    pub async fn ipv6_secure_get(&mut self, key: &Key) -> Result<Option<Record>> {
        // Refuse to operate if the local node itself has been banned
1161        if self.is_node_banned(&self.local_id.to_hex()) {
1162            return Err(P2PError::Security("Local node is banned".to_string()).into());
1163        }
1164
1165        // Check local storage first
1166        if let Some(record) = self.storage.get(key).await {
1167            if !record.is_expired() {
1168                return Ok(Some(record));
1169            }
1170        }
1171        
1172        // Get IPv6-verified nodes for secure lookup
1173        let verified_nodes = self.get_ipv6_verified_nodes_for_key(key).await?;
1174        
1175        if verified_nodes.is_empty() {
1176            // Fallback to regular secure_get if no IPv6 nodes available
1177            return self.secure_get(key).await;
1178        }
1179
1180        // Perform S/Kademlia secure lookup with IPv6-verified nodes
1181        if let Some(ref mut skademlia) = self.skademlia {
1182            let secure_nodes = skademlia.secure_lookup(key.clone(), verified_nodes).await?;
1183            
1184            // Query nodes with both distance and IPv6 verification
1185            for node in &secure_nodes {
1186                // Additional IPv6 verification
1187                if let Some(ref manager) = self.ipv6_identity_manager {
1188                    if let Some(ipv6_node) = manager.get_verified_node(&node.peer_id) {
1189                        // Check if IPv6 identity needs refresh
1190                        if ipv6_node.needs_identity_refresh(manager.config.identity_refresh_interval) {
1191                            debug!("Skipping node {} due to stale IPv6 identity", node.peer_id);
1192                            continue;
1193                        }
1194                    } else {
1195                        debug!("Skipping node {} without verified IPv6 identity", node.peer_id);
1196                        continue;
1197                    }
1198                }
1199                
1200                let query = DHTQuery::FindValue { 
1201                    key: key.clone(), 
1202                    requester: self.local_id.to_hex() 
1203                };
1204                if let Ok(DHTResponse::Value { record }) = self.simulate_query(node, query).await {
1205                    // Update IPv6 reputation for successful response
1206                    if let Some(ref mut manager) = self.ipv6_identity_manager {
1207                        manager.update_ipv6_reputation(&node.peer_id, true);
1208                    }
1209                    
1210                    // Store locally for future access
1211                    let _ = self.storage.store(record.clone()).await;
1212                    return Ok(Some(record));
1213                }
1214            }
1215        }
1216        
1217        Ok(None)
1218    }
1219
1220    /// Perform IPv6-enhanced secure put operation
1221    pub async fn ipv6_secure_put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
1222        // Check if local node would be banned
1223        if self.is_node_banned(&self.local_id.to_hex()) {
1224            return Err(P2PError::Security("Local node is banned".to_string()).into());
1225        }
1226
1227        let record = Record::new(key.clone(), value, self.local_id.to_hex());
1228        
1229        // Store locally first
1230        self.storage.store(record.clone()).await?;
1231        
1232        // Get IPv6-verified nodes for secure replication
1233        let verified_nodes = self.get_ipv6_verified_nodes_for_key(&key).await?;
1234        
1235        // Use S/Kademlia for secure node selection among verified nodes
1236        let secure_nodes = if let Some(ref skademlia) = self.skademlia {
1237            skademlia.select_secure_nodes(&verified_nodes, &key, self.config.replication_factor)
1238        } else {
1239            verified_nodes.into_iter().take(self.config.replication_factor).collect()
1240        };
1241
1242        info!("Storing record with key {} on {} IPv6-verified secure nodes", key.to_hex(), secure_nodes.len());
1243        
1244        // Perform replications with IPv6 reputation tracking
1245        let mut successful_replications = 0;
1246        
1247        for node in &secure_nodes {
1248            let success = self.replicate_record(&record, node).await.is_ok();
1249            
1250            // Update IPv6 reputation based on replication success
1251            if let Some(ref mut manager) = self.ipv6_identity_manager {
1252                manager.update_ipv6_reputation(&node.peer_id, success);
1253            }
1254            
1255            if success {
1256                successful_replications += 1;
1257            }
1258        }
1259        
1260        if successful_replications == 0 && !secure_nodes.is_empty() {
1261            return Err(P2PError::DHT("Failed to replicate to any IPv6-verified nodes".to_string()).into());
1262        }
1263        
1264        info!("Successfully replicated to {}/{} IPv6-verified nodes", 
1265              successful_replications, secure_nodes.len());
1266        Ok(())
1267    }
1268
1269    /// Get IPv6-verified nodes suitable for a key
1270    async fn get_ipv6_verified_nodes_for_key(&self, key: &Key) -> Result<Vec<DHTNode>> {
1271        let mut verified_nodes = Vec::new();
1272        
1273        // Get closest nodes from routing table
1274        let candidate_nodes = self.routing_table.closest_nodes(key, self.config.replication_factor * 2).await;
1275        
1276        if let Some(ref manager) = self.ipv6_identity_manager {
1277            for node in candidate_nodes {
1278                // Check if node has verified IPv6 identity
1279                if let Some(ipv6_node) = manager.get_verified_node(&node.peer_id) {
1280                    // Check if identity is still fresh
1281                    if !ipv6_node.needs_identity_refresh(manager.config.identity_refresh_interval) {
1282                        // Check if node is not banned
1283                        if !manager.is_node_banned(&node.peer_id) {
1284                            verified_nodes.push(node);
1285                        }
1286                    }
1287                }
1288            }
1289        } else {
1290            // If no IPv6 manager, return all candidates
1291            verified_nodes = candidate_nodes;
1292        }
1293        
1294        Ok(verified_nodes)
1295    }
1296
1297    /// Get IPv6 diversity statistics
1298    pub fn get_ipv6_diversity_stats(&self) -> Option<crate::security::DiversityStats> {
1299        self.ipv6_identity_manager.as_ref()
1300            .map(|manager| manager.get_ipv6_diversity_stats())
1301    }
1302
1303    /// Cleanup expired IPv6 identities and reputation data
1304    pub fn cleanup_ipv6_data(&mut self) {
1305        if let Some(ref mut manager) = self.ipv6_identity_manager {
1306            manager.cleanup_expired();
1307        }
1308    }
1309
1310    /// Ban a node for IPv6 security violations
1311    pub fn ban_ipv6_node(&mut self, peer_id: &PeerId, reason: &str) {
1312        if let Some(ref mut manager) = self.ipv6_identity_manager {
1313            manager.ban_node(peer_id, reason);
1314        }
1315    }
1316
1317    /// Get local IPv6 identity
1318    pub fn get_local_ipv6_identity(&self) -> Option<&crate::security::IPv6NodeID> {
1319        self.ipv6_identity_manager.as_ref()
1320            .and_then(|manager| manager.get_local_identity())
1321    }
1322    
1323    /// Replicate a record to a specific node
1324    async fn replicate_record(&self, record: &Record, node: &DHTNode) -> Result<()> {
1325        // In a real implementation, this would send a STORE message over the network
1326        // For now, we simulate successful replication to nodes in our routing table
1327        debug!("Replicating record {} to node {}", record.key.to_hex(), node.peer_id);
1328        
1329        // Simulate network delay and occasional failures
1330        tokio::time::sleep(Duration::from_millis(10)).await;
1331        
1332        // Simulate 95% success rate for replication (high success rate for testing)
1333        if rand::random::<f64>() < 0.95 {
1334            Ok(())
1335        } else {
1336            Err(P2PError::Network("Replication failed".to_string()).into())
1337        }
1338    }
1339    
1340    /// Perform iterative lookup to find a value
1341    async fn iterative_find_value(&self, key: &Key) -> Option<Record> {
1342        debug!("Starting iterative lookup for key {}", key.to_hex());
1343        
1344        let mut lookup_state = LookupState::new(key.clone(), self.config.alpha);
1345        
1346        // Start with closest nodes from routing table
1347        let initial_nodes = self.routing_table.closest_nodes(key, self.config.alpha).await;
1348        lookup_state.add_nodes(initial_nodes);
1349        
1350        // Perform iterative queries
1351        let mut iterations = 0;
1352        const MAX_ITERATIONS: usize = 10;
1353        
1354        while !lookup_state.is_complete() && iterations < MAX_ITERATIONS {
1355            let nodes_to_query = lookup_state.next_nodes();
1356            if nodes_to_query.is_empty() {
1357                break;
1358            }
1359            
1360            // Query nodes in parallel
1361            let mut queries = Vec::new();
1362            for node in &nodes_to_query {
1363                let query = DHTQuery::FindValue { 
1364                    key: key.clone(), 
1365                    requester: self.local_id.to_hex() 
1366                };
1367                queries.push(self.simulate_query(node, query));
1368            }
1369            
1370            // Process responses
1371            for query_result in futures::future::join_all(queries).await {
1372                match query_result {
1373                    Ok(DHTResponse::Value { record }) => {
1374                        debug!("Found value for key {} in iteration {}", key.to_hex(), iterations);
1375                        return Some(record);
1376                    }
1377                    Ok(DHTResponse::Nodes { nodes }) => {
1378                        let dht_nodes: Vec<DHTNode> = nodes.into_iter()
1379                            .map(|n| n.to_dht_node())
1380                            .collect();
1381                        lookup_state.add_nodes(dht_nodes);
1382                    }
1383                    _ => {
1384                        // Query failed or returned unexpected response
1385                        debug!("Query failed during iterative lookup");
1386                    }
1387                }
1388            }
1389            
1390            iterations += 1;
1391        }
1392        
1393        debug!("Iterative lookup for key {} completed after {} iterations, value not found", 
1394               key.to_hex(), iterations);
1395        None
1396    }
1397    
1398    /// Simulate a query to a remote node (placeholder for real network implementation)
1399    async fn simulate_query(&self, _node: &DHTNode, query: DHTQuery) -> Result<DHTResponse> {
1400        // Add some realistic delay
1401        tokio::time::sleep(Duration::from_millis(50)).await;
1402        
1403        // Handle the query locally (simulating remote node response)
1404        Ok(self.handle_query(query).await)
1405    }
1406    
1407    /// Republish records that are close to expiration
1408    async fn republish_records(&self) -> Result<()> {
1409        let all_records = self.storage.all_records().await;
1410        let mut republished_count = 0;
1411        
1412        for record in all_records {
1413            // Republish if record has less than 1/4 of its TTL remaining
1414            let remaining_ttl = record.expires_at
1415                .duration_since(SystemTime::now())
1416                .unwrap_or(Duration::ZERO);
1417            
1418            if remaining_ttl < self.config.record_ttl / 4 {
1419                // Find nodes responsible for this key
1420                let closest_nodes = self.routing_table
1421                    .closest_nodes(&record.key, self.config.replication_factor)
1422                    .await;
1423                
1424                // Republish to closest nodes
1425                for node in &closest_nodes {
1426                    if self.replicate_record(&record, node).await.is_ok() {
1427                        republished_count += 1;
1428                    }
1429                }
1430            }
1431        }
1432        
1433        if republished_count > 0 {
1434            debug!("Republished {} records during maintenance", republished_count);
1435        }
1436        
1437        Ok(())
1438    }
1439    
1440    /// Refresh buckets that haven't been active recently
1441    async fn refresh_buckets(&self) -> Result<()> {
1442        let mut refreshed_count = 0;
1443        
1444        for bucket_index in 0..256 {
1445            let needs_refresh = {
1446                let bucket = self.routing_table.buckets[bucket_index].read().await;
1447                bucket.needs_refresh(self.config.bucket_refresh_interval)
1448            };
1449            
1450            if needs_refresh {
1451                // Generate a random key in this bucket's range and perform lookup
1452                let target_key = self.generate_key_for_bucket(bucket_index);
1453                let _nodes = self.iterative_find_node(&target_key).await;
1454                refreshed_count += 1;
1455                
1456                // Update bucket refresh time
1457                {
1458                    let mut bucket = self.routing_table.buckets[bucket_index].write().await;
1459                    bucket.last_refresh = Instant::now();
1460                }
1461            }
1462        }
1463        
1464        if refreshed_count > 0 {
1465            debug!("Refreshed {} buckets during maintenance", refreshed_count);
1466        }
1467        
1468        Ok(())
1469    }
1470    
1471    /// Generate a key that would fall into the specified bucket
1472    fn generate_key_for_bucket(&self, bucket_index: usize) -> Key {
1473        let mut key_bytes = self.local_id.as_bytes().to_vec();
1474        
1475        // Flip the bit at position (255 - bucket_index) to ensure distance
1476        if bucket_index < 256 {
1477            let byte_index = (255 - bucket_index) / 8;
1478            let bit_index = (255 - bucket_index) % 8;
1479            
1480            if byte_index < key_bytes.len() {
1481                key_bytes[byte_index] ^= 1 << bit_index;
1482            }
1483        }
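        // NOTE (assumption): bits are indexed here with `1 << bit_index`, i.e. bit 0 is
        // the least significant bit of the selected byte. If the bucket index elsewhere
        // counts bits from each byte's most significant bit, the shift would need to be
        // `1 << (7 - bit_index)` for the generated key to land in the intended bucket.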
1484        
1485        let mut hash = [0u8; 32];
1486        hash.copy_from_slice(&key_bytes);
1487        Key::from_hash(hash)
1488    }
1489    
1490    /// Perform iterative node lookup
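    /// Up to `alpha` candidates are queried per round; the lookup stops after
    /// `MAX_ITERATIONS` rounds, when no unqueried candidates remain, or when the
    /// 30-second budget in `LookupState::is_complete` is exhausted.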
1491    async fn iterative_find_node(&self, key: &Key) -> Vec<DHTNode> {
1492        debug!("Starting iterative node lookup for key {}", key.to_hex());
1493        
1494        let mut lookup_state = LookupState::new(key.clone(), self.config.alpha);
1495        
1496        // Start with closest nodes from routing table
1497        let initial_nodes = self.routing_table.closest_nodes(key, self.config.alpha).await;
1498        lookup_state.add_nodes(initial_nodes);
1499        
1500        // Perform iterative queries
1501        let mut iterations = 0;
1502        const MAX_ITERATIONS: usize = 10;
1503        
1504        while !lookup_state.is_complete() && iterations < MAX_ITERATIONS {
1505            let nodes_to_query = lookup_state.next_nodes();
1506            if nodes_to_query.is_empty() {
1507                break;
1508            }
1509            
1510            // Query nodes in parallel
1511            let mut queries = Vec::new();
1512            for node in &nodes_to_query {
1513                let query = DHTQuery::FindNode { 
1514                    key: key.clone(), 
1515                    requester: self.local_id.to_hex() 
1516                };
1517                queries.push(self.simulate_query(node, query));
1518            }
1519            
1520            // Process responses
1521            for query_result in futures::future::join_all(queries).await {
1522                if let Ok(DHTResponse::Nodes { nodes }) = query_result {
1523                    let dht_nodes: Vec<DHTNode> = nodes.into_iter()
1524                        .map(|n| n.to_dht_node())
1525                        .collect();
1526                    lookup_state.add_nodes(dht_nodes);
1527                }
1528            }
1529            
1530            iterations += 1;
1531        }
1532        
1533        debug!("Iterative node lookup for key {} completed after {} iterations", 
1534               key.to_hex(), iterations);
1535        
1536        // Return the closest nodes found
1537        lookup_state.closest.into_iter()
1538            .take(self.config.replication_factor)
1539            .collect()
1540    }
1541    
1542    /// Check consistency of a record across multiple nodes
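    ///
    /// Usage sketch (assumes an initialised DHT handle `dht` and a `Key` value `key`):
    ///
    /// ```ignore
    /// let report = dht.check_consistency(&key).await?;
    /// if !report.consistent {
    ///     dht.repair_record(&key).await?;
    /// }
    /// ```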
1543    pub async fn check_consistency(&self, key: &Key) -> Result<ConsistencyReport> {
1544        debug!("Checking consistency for key {}", key.to_hex());
1545        
1546        // Find nodes that should have this record
1547        let closest_nodes = self.routing_table
1548            .closest_nodes(key, self.config.replication_factor)
1549            .await;
1550        
1551        let mut records_found = Vec::new();
1552        let mut nodes_queried = 0;
1553        let mut nodes_responded = 0;
1554        
1555        // Query each node for the record
1556        for node in &closest_nodes {
1557            nodes_queried += 1;
1558            
1559            let query = DHTQuery::FindValue { 
1560                key: key.clone(), 
1561                requester: self.local_id.to_hex() 
1562            };
1563            
1564            match self.simulate_query(node, query).await {
1565                Ok(DHTResponse::Value { record }) => {
1566                    nodes_responded += 1;
1567                    records_found.push((node.peer_id.clone(), record));
1568                }
1569                Ok(DHTResponse::Nodes { .. }) => {
1570                    nodes_responded += 1;
1571                    // Node doesn't have the record
1572                }
1573                _ => {
1574                    // Node didn't respond or error occurred
1575                }
1576            }
1577        }
1578        
1579        // Analyze consistency
1580        let mut consistent = true;
1581        let mut canonical_record: Option<Record> = None;
1582        let mut conflicts = Vec::new();
1583        
1584        for (node_id, record) in &records_found {
1585            if let Some(ref canonical) = canonical_record {
1586                // Check if records match
1587                if record.value != canonical.value || 
1588                   record.created_at != canonical.created_at ||
1589                   record.publisher != canonical.publisher {
1590                    consistent = false;
1591                    conflicts.push((node_id.clone(), record.clone()));
1592                }
1593            } else {
1594                canonical_record = Some(record.clone());
1595            }
1596        }
1597        
1598        let report = ConsistencyReport {
1599            key: key.clone(),
1600            nodes_queried,
1601            nodes_responded,
1602            records_found: records_found.len(),
1603            consistent,
1604            canonical_record,
1605            conflicts,
1606            replication_factor: self.config.replication_factor,
1607        };
1608        
1609        debug!("Consistency check for key {}: {} nodes queried, {} responded, {} records found, consistent: {}", 
1610               key.to_hex(), report.nodes_queried, report.nodes_responded, 
1611               report.records_found, report.consistent);
1612        
1613        Ok(report)
1614    }
1615    
1616    /// Repair inconsistencies for a specific key
1617    pub async fn repair_record(&self, key: &Key) -> Result<RepairResult> {
1618        debug!("Starting repair for key {}", key.to_hex());
1619        
1620        let consistency_report = self.check_consistency(key).await?;
1621        
1622        if consistency_report.consistent {
1623            return Ok(RepairResult {
1624                key: key.clone(),
1625                repairs_needed: false,
1626                repairs_attempted: 0,
1627                repairs_successful: 0,
1628                final_state: "consistent".to_string(),
1629            });
1630        }
1631        
1632        // Start from the first record found; the most recent version is selected below
1633        let canonical_record = if let Some(canonical) = consistency_report.canonical_record {
1634            canonical
1635        } else {
1636            return Ok(RepairResult {
1637                key: key.clone(),
1638                repairs_needed: false,
1639                repairs_attempted: 0,
1640                repairs_successful: 0,
1641                final_state: "no_records_found".to_string(),
1642            });
1643        };
1644        
1645        // Find the most recent version among conflicts
1646        let mut most_recent = canonical_record.clone();
1647        for (_, conflicted_record) in &consistency_report.conflicts {
1648            if conflicted_record.created_at > most_recent.created_at {
1649                most_recent = conflicted_record.clone();
1650            }
1651        }
1652        
1653        // Replicate the canonical version to all responsible nodes
1654        let closest_nodes = self.routing_table
1655            .closest_nodes(key, self.config.replication_factor)
1656            .await;
1657        
1658        let mut repairs_attempted = 0;
1659        let mut repairs_successful = 0;
1660        
1661        for node in &closest_nodes {
1662            repairs_attempted += 1;
1663            if self.replicate_record(&most_recent, node).await.is_ok() {
1664                repairs_successful += 1;
1665            }
1666        }
1667        
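        // Treat the repair as successful when at least half (rounded down) of the
        // replica set accepted the canonical record (a simple majority-style quorum
        // over `replication_factor`).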
1668        let final_state = if repairs_successful >= (self.config.replication_factor / 2) {
1669            "repaired".to_string()
1670        } else {
1671            "repair_failed".to_string()
1672        };
1673        
1674        debug!("Repair for key {} completed: {}/{} repairs successful, final state: {}", 
1675               key.to_hex(), repairs_successful, repairs_attempted, final_state);
1676        
1677        Ok(RepairResult {
1678            key: key.clone(),
1679            repairs_needed: true,
1680            repairs_attempted,
1681            repairs_successful,
1682            final_state,
1683        })
1684    }
1685    
1686    // ===============================
1687    // INBOX SYSTEM IMPLEMENTATION
1688    // ===============================
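    // Usage sketch (hypothetical identifiers; assumes an initialised DHT handle `dht`,
    // a local `PeerId` `me`, and an already constructed `InboxMessage` `msg`):
    //
    //     let info = dht.create_inbox("my-inbox", me.clone()).await?;
    //     dht.send_message_to_inbox("my-inbox", msg).await?;
    //     let recent = dht.get_inbox_messages("my-inbox", Some(10)).await?;
    //     println!("{} now holds {} messages", info.three_word_address, recent.len());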
1689    
1690    /// Create a new inbox for a user with infinite TTL
1691    pub async fn create_inbox(&self, inbox_id: &str, owner_peer_id: PeerId) -> Result<InboxInfo> {
1692        info!("Creating inbox {} for peer {}", inbox_id, owner_peer_id);
1693        
1694        let inbox_key = Key::from_inbox_id(inbox_id);
1695        
1696        // Create inbox metadata record with infinite TTL
1697        let inbox_metadata = InboxMetadata {
1698            inbox_id: inbox_id.to_string(),
1699            owner: owner_peer_id.clone(),
1700            created_at: SystemTime::now(),
1701            message_count: 0,
1702            max_messages: 1000, // Default message limit (hard-coded for now)
1703            is_public: true,
1704            access_keys: vec![owner_peer_id.clone()],
1705        };
1706        
1707        let metadata_value = serde_json::to_vec(&inbox_metadata)
1708            .map_err(|e| P2PError::DHT(format!("Failed to serialize inbox metadata: {}", e)))?;
1709        
1710        let metadata_record = Record {
1711            key: inbox_key.clone(),
1712            value: metadata_value,
1713            publisher: owner_peer_id.clone(),
1714            created_at: SystemTime::now(),
1715            expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(1000 * 365 * 24 * 60 * 60), // Effectively infinite TTL (~1000 years); u64::MAX seconds would overflow SystemTime
1716            signature: None,
1717        };
1718        
1719        // Store metadata with infinite TTL
1720        self.put_record_with_infinite_ttl(metadata_record).await?;
1721        
1722        // Create empty message index
1723        let index_key = Key::from_inbox_index(inbox_id);
1724        let empty_index = InboxMessageIndex {
1725            inbox_id: inbox_id.to_string(),
1726            messages: Vec::new(),
1727            last_updated: SystemTime::now(),
1728        };
1729        
1730        let index_value = serde_json::to_vec(&empty_index)
1731            .map_err(|e| P2PError::DHT(format!("Failed to serialize inbox index: {}", e)))?;
1732        
1733        let index_record = Record {
1734            key: index_key,
1735            value: index_value,
1736            publisher: owner_peer_id.clone(),
1737            created_at: SystemTime::now(),
1738            expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(1000 * 365 * 24 * 60 * 60), // Effectively infinite TTL (~1000 years); u64::MAX seconds would overflow SystemTime
1739            signature: None,
1740        };
1741        
1742        self.put_record_with_infinite_ttl(index_record).await?;
1743        
1744        let inbox_info = InboxInfo {
1745            inbox_id: inbox_id.to_string(),
1746            three_word_address: self.generate_three_word_address(inbox_id),
1747            owner: owner_peer_id,
1748            created_at: SystemTime::now(),
1749            message_count: 0,
1750            is_accessible: true,
1751        };
1752        
1753        info!("Successfully created inbox {} with three-word address: {}", 
1754              inbox_id, inbox_info.three_word_address);
1755        
1756        Ok(inbox_info)
1757    }
1758    
1759    /// Send a message to an inbox
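    /// Note: the message index and inbox metadata are read, modified, and written back
    /// without any locking, so concurrent senders to the same inbox may overwrite each
    /// other's index updates.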
1760    pub async fn send_message_to_inbox(&self, inbox_id: &str, message: InboxMessage) -> Result<()> {
1761        info!("Sending message to inbox {}", inbox_id);
1762        
1763        // Get current inbox metadata
1764        let inbox_key = Key::from_inbox_id(inbox_id);
1765        let metadata_record = self.get(&inbox_key).await
1766            .ok_or_else(|| P2PError::DHT(format!("Inbox {} not found", inbox_id)))?;
1767        
1768        let mut inbox_metadata: InboxMetadata = serde_json::from_slice(&metadata_record.value)
1769            .map_err(|e| P2PError::DHT(format!("Failed to deserialize inbox metadata: {}", e)))?;
1770        
1771        // Check message limit
1772        if inbox_metadata.message_count >= inbox_metadata.max_messages {
1773            return Err(P2PError::DHT(format!("Inbox {} is full", inbox_id)));
1774        }
1775        
1776        // Create message record with infinite TTL
1777        let message_key = Key::from_inbox_message(inbox_id, &message.id);
1778        let message_value = serde_json::to_vec(&message)
1779            .map_err(|e| P2PError::DHT(format!("Failed to serialize message: {}", e)))?;
1780        
1781        let message_record = Record {
1782            key: message_key.clone(),
1783            value: message_value,
1784            publisher: message.sender.clone(),
1785            created_at: message.timestamp,
1786            expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(1000 * 365 * 24 * 60 * 60), // Effectively infinite TTL (~1000 years); u64::MAX seconds would overflow SystemTime
1787            signature: None,
1788        };
1789        
1790        self.put_record_with_infinite_ttl(message_record).await?;
1791        
1792        // Update message index
1793        let index_key = Key::from_inbox_index(inbox_id);
1794        let index_record = self.get(&index_key).await
1795            .ok_or_else(|| P2PError::DHT(format!("Inbox index {} not found", inbox_id)))?;
1796        
1797        let mut message_index: InboxMessageIndex = serde_json::from_slice(&index_record.value)
1798            .map_err(|e| P2PError::DHT(format!("Failed to deserialize message index: {}", e)))?;
1799        
1800        message_index.messages.push(MessageRef {
1801            message_id: message.id.clone(),
1802            sender: message.sender.clone(),
1803            timestamp: message.timestamp,
1804            message_type: message.message_type.clone(),
1805        });
1806        message_index.last_updated = SystemTime::now();
1807        
1808        // Update metadata
1809        inbox_metadata.message_count += 1;
1810        
1811        // Store updated index and metadata
1812        let updated_index_value = serde_json::to_vec(&message_index)
1813            .map_err(|e| P2PError::DHT(format!("Failed to serialize updated index: {}", e)))?;
1814        
1815        let updated_metadata_value = serde_json::to_vec(&inbox_metadata)
1816            .map_err(|e| P2PError::DHT(format!("Failed to serialize updated metadata: {}", e)))?;
1817        
1818        let updated_index_record = Record {
1819            key: index_key,
1820            value: updated_index_value,
1821            publisher: message.sender.clone(),
1822            created_at: SystemTime::now(),
1823            expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(1000 * 365 * 24 * 60 * 60), // Effectively infinite TTL (~1000 years); u64::MAX seconds would overflow SystemTime
1824            signature: None,
1825        };
1826        
1827        let updated_metadata_record = Record {
1828            key: inbox_key,
1829            value: updated_metadata_value,
1830            publisher: message.sender.clone(),
1831            created_at: SystemTime::now(),
1832            expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(1000 * 365 * 24 * 60 * 60), // Effectively infinite TTL (~1000 years); u64::MAX seconds would overflow SystemTime
1833            signature: None,
1834        };
1835        
1836        self.put_record_with_infinite_ttl(updated_index_record).await?;
1837        self.put_record_with_infinite_ttl(updated_metadata_record).await?;
1838        
1839        info!("Successfully sent message {} to inbox {}", message.id, inbox_id);
1840        Ok(())
1841    }
1842    
1843    /// Get messages from an inbox
1844    pub async fn get_inbox_messages(&self, inbox_id: &str, limit: Option<usize>) -> Result<Vec<InboxMessage>> {
1845        info!("Retrieving messages from inbox {}", inbox_id);
1846        
1847        let index_key = Key::from_inbox_index(inbox_id);
1848        let index_record = self.get(&index_key).await
1849            .ok_or_else(|| P2PError::DHT(format!("Inbox {} not found", inbox_id)))?;
1850        
1851        let message_index: InboxMessageIndex = serde_json::from_slice(&index_record.value)
1852            .map_err(|e| P2PError::DHT(format!("Failed to deserialize message index: {}", e)))?;
1853        
1854        let mut messages = Vec::new();
1855        let message_refs: Vec<&MessageRef> = if let Some(limit) = limit {
1856            message_index.messages.iter().rev().take(limit).collect()
1857        } else {
1858            message_index.messages.iter().collect()
1859        };
1860        
1861        for message_ref in message_refs {
1862            let message_key = Key::from_inbox_message(inbox_id, &message_ref.message_id);
1863            if let Some(message_record) = self.get(&message_key).await {
1864                if let Ok(message) = serde_json::from_slice::<InboxMessage>(&message_record.value) {
1865                    messages.push(message);
1866                }
1867            }
1868        }
1869        
1870        // Sort messages by timestamp (newest first)
1871        messages.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
1872        
1873        info!("Retrieved {} messages from inbox {}", messages.len(), inbox_id);
1874        Ok(messages)
1875    }
1876    
1877    /// Get inbox information
1878    pub async fn get_inbox_info(&self, inbox_id: &str) -> Result<Option<InboxInfo>> {
1879        let inbox_key = Key::from_inbox_id(inbox_id);
1880        let metadata_record = self.get(&inbox_key).await;
1881        
1882        if let Some(record) = metadata_record {
1883            let metadata: InboxMetadata = serde_json::from_slice(&record.value)
1884                .map_err(|e| P2PError::DHT(format!("Failed to deserialize inbox metadata: {}", e)))?;
1885            
1886            let inbox_info = InboxInfo {
1887                inbox_id: inbox_id.to_string(),
1888                three_word_address: self.generate_three_word_address(inbox_id),
1889                owner: metadata.owner,
1890                created_at: metadata.created_at,
1891                message_count: metadata.message_count,
1892                is_accessible: true,
1893            };
1894            
1895            Ok(Some(inbox_info))
1896        } else {
1897            Ok(None)
1898        }
1899    }
1900    
1901    /// Store a record with infinite TTL (never expires)
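    /// "Infinite" here means the record carries a far-future `expires_at` sentinel; the
    /// record itself is stored locally and replicated to the closest nodes through the
    /// same paths as any other record.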
1902    async fn put_record_with_infinite_ttl(&self, record: Record) -> Result<()> {
1903        // Store locally first
1904        self.storage.store(record.clone()).await?;
1905        
1906        // Find closest nodes for replication
1907        let closest_nodes = self.routing_table
1908            .closest_nodes(&record.key, self.config.replication_factor)
1909            .await;
1910        
1911        // Replicate to closest nodes
1912        for node in &closest_nodes {
1913            if let Err(e) = self.replicate_record(&record, node).await {
1914                debug!("Failed to replicate infinite TTL record to node {}: {}", node.peer_id, e);
1915            }
1916        }
1917        
1918        Ok(())
1919    }
1920    
1921    /// Generate a three-word address for an inbox
1922    fn generate_three_word_address(&self, inbox_id: &str) -> String {
1923        use crate::bootstrap::words::WordEncoder;
1924        
1925        let encoder = WordEncoder::new();
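        // "/inbox/<id>/dht" is not a registered multiaddr protocol, so this parse is
        // expected to fail and the loopback QUIC fallback below is used instead; note
        // that the fallback does not depend on the inbox id, so the encoded words are
        // only unique when the string-based fallback branch is taken.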
1926        let fake_multiaddr = format!("/inbox/{}/dht", inbox_id).parse().unwrap_or_else(|_| {
1927            "/ip6/::1/udp/9000/quic".parse().unwrap()
1928        });
1929        
1930        if let Ok(words) = encoder.encode_multiaddr(&fake_multiaddr) {
1931            words.to_string()
1932        } else {
1933            format!("inbox.{}.messages", inbox_id.chars().take(8).collect::<String>())
1934        }
1935    }
1936}
1937
1938/// DHT statistics
1939#[derive(Debug, Clone)]
1940pub struct DHTStats {
1941    /// Local node ID
1942    pub local_id: Key,
1943    /// Total nodes in routing table
1944    pub total_nodes: usize,
1945    /// Number of active buckets
1946    pub active_buckets: usize,
1947    /// Number of stored records
1948    pub stored_records: usize,
1949    /// Number of expired records
1950    pub expired_records: usize,
1951}
1952
1953/// Consistency check report
1954#[derive(Debug, Clone)]
1955pub struct ConsistencyReport {
1956    /// Key being checked
1957    pub key: Key,
1958    /// Number of nodes queried
1959    pub nodes_queried: usize,
1960    /// Number of nodes that responded
1961    pub nodes_responded: usize,
1962    /// Number of records found
1963    pub records_found: usize,
1964    /// Whether all records are consistent
1965    pub consistent: bool,
1966    /// The canonical record (if any)
1967    pub canonical_record: Option<Record>,
1968    /// Conflicting records found
1969    pub conflicts: Vec<(PeerId, Record)>,
1970    /// Expected replication factor
1971    pub replication_factor: usize,
1972}
1973
1974/// Result of a repair operation
1975#[derive(Debug, Clone)]
1976pub struct RepairResult {
1977    /// Key that was repaired
1978    pub key: Key,
1979    /// Whether repairs were needed
1980    pub repairs_needed: bool,
1981    /// Number of repair attempts made
1982    pub repairs_attempted: usize,
1983    /// Number of successful repairs
1984    pub repairs_successful: usize,
1985    /// Final state description
1986    pub final_state: String,
1987}
1988
1989impl LookupState {
1990    /// Create a new lookup state
1991    pub fn new(target: Key, alpha: usize) -> Self {
1992        Self {
1993            target,
1994            queried: HashMap::new(),
1995            to_query: VecDeque::new(),
1996            closest: Vec::new(),
1997            started_at: Instant::now(),
1998            alpha,
1999        }
2000    }
2001    
2002    /// Add nodes to query
    pub fn add_nodes(&mut self, nodes: Vec<DHTNode>) {
        for node in nodes {
            // Skip nodes that were already queried or are already pending
            if !self.queried.contains_key(&node.peer_id)
                && !self.to_query.iter().any(|n| n.peer_id == node.peer_id)
            {
                self.to_query.push_back(node.clone());
            }
            // Track every discovered node as a candidate for the final result set
            if !self.closest.iter().any(|n| n.peer_id == node.peer_id) {
                self.closest.push(node);
            }
        }
        
        // Sort pending queries and result candidates by distance to the target
        let target = &self.target;
        self.to_query.make_contiguous().sort_by_key(|node| {
            node.key().distance(target).as_bytes().to_vec()
        });
        self.closest.sort_by_key(|node| {
            node.key().distance(target).as_bytes().to_vec()
        });
    }
2016    
2017    /// Get next nodes to query
2018    pub fn next_nodes(&mut self) -> Vec<DHTNode> {
2019        let mut nodes = Vec::new();
2020        for _ in 0..self.alpha {
2021            if let Some(node) = self.to_query.pop_front() {
2022                self.queried.insert(node.peer_id.clone(), Instant::now());
2023                nodes.push(node);
2024            } else {
2025                break;
2026            }
2027        }
2028        nodes
2029    }
2030    
2031    /// Check if lookup is complete
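    /// The lookup ends once there are no unqueried candidates left or the 30-second
    /// budget has elapsed; classic Kademlia would also stop once the k closest known
    /// nodes have all responded, which this simplified check does not track.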
2032    pub fn is_complete(&self) -> bool {
2033        self.to_query.is_empty() || self.started_at.elapsed() > Duration::from_secs(30)
2034    }
2035}
2036
2037// ===============================
2038// INBOX SYSTEM DATA STRUCTURES
2039// ===============================
2040
2041/// Inbox metadata stored in DHT
2042#[derive(Debug, Clone, Serialize, Deserialize)]
2043pub struct InboxMetadata {
2044    pub inbox_id: String,
2045    pub owner: PeerId,
2046    pub created_at: SystemTime,
2047    pub message_count: usize,
2048    pub max_messages: usize,
2049    pub is_public: bool,
2050    pub access_keys: Vec<PeerId>,
2051}
2052
2053/// Inbox message stored in DHT
2054#[derive(Debug, Clone, Serialize, Deserialize)]
2055pub struct InboxMessage {
2056    pub id: String,
2057    pub sender: PeerId,
2058    pub recipient_inbox: String,
2059    pub content: String,
2060    pub message_type: String,
2061    pub timestamp: SystemTime,
2062    pub attachments: Vec<MessageAttachment>,
2063}
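// Construction sketch (hypothetical values; `sender` is any valid `PeerId`):
//
//     let msg = InboxMessage {
//         id: "msg-0001".to_string(),
//         sender,
//         recipient_inbox: "my-inbox".to_string(),
//         content: "hello".to_string(),
//         message_type: "text".to_string(),
//         timestamp: SystemTime::now(),
//         attachments: Vec::new(),
//     };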
2064
2065/// Message attachment metadata
2066#[derive(Debug, Clone, Serialize, Deserialize)]
2067pub struct MessageAttachment {
2068    pub filename: String,
2069    pub content_type: String,
2070    pub size: u64,
2071    pub hash: String,
2072}
2073
2074/// Message index for efficient inbox querying
2075#[derive(Debug, Clone, Serialize, Deserialize)]
2076pub struct InboxMessageIndex {
2077    pub inbox_id: String,
2078    pub messages: Vec<MessageRef>,
2079    pub last_updated: SystemTime,
2080}
2081
2082/// Reference to a message in the index
2083#[derive(Debug, Clone, Serialize, Deserialize)]
2084pub struct MessageRef {
2085    pub message_id: String,
2086    pub sender: PeerId,
2087    pub timestamp: SystemTime,
2088    pub message_type: String,
2089}
2090
2091/// Inbox information returned to users
2092#[derive(Debug, Clone, Serialize, Deserialize)]
2093pub struct InboxInfo {
2094    pub inbox_id: String,
2095    pub three_word_address: String,
2096    pub owner: PeerId,
2097    pub created_at: SystemTime,
2098    pub message_count: usize,
2099    pub is_accessible: bool,
2100}
2101
2102// ===============================
2103// KEY EXTENSIONS FOR INBOX SYSTEM
2104// ===============================
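// The constructors below prefix the SHA-256 input with a distinct tag, so metadata,
// index, and message keys are derived from disjoint inputs even for the same inbox id.
// Sketch of the resulting derivations:
//
//     Key::from_inbox_id("alice")             // SHA-256("INBOX_METADATA:alice")
//     Key::from_inbox_index("alice")          // SHA-256("INBOX_INDEX:alice")
//     Key::from_inbox_message("alice", "m1")  // SHA-256("INBOX_MESSAGE:alice:m1")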
2105
2106impl Key {
2107    /// Create a key for inbox metadata
2108    pub fn from_inbox_id(inbox_id: &str) -> Self {
2109        let mut hasher = Sha256::new();
2110        hasher.update(b"INBOX_METADATA:");
2111        hasher.update(inbox_id.as_bytes());
2112        let hash = hasher.finalize();
2113        Key { hash: hash.into() }
2114    }
2115    
2116    /// Create a key for inbox message index
2117    pub fn from_inbox_index(inbox_id: &str) -> Self {
2118        let mut hasher = Sha256::new();
2119        hasher.update(b"INBOX_INDEX:");
2120        hasher.update(inbox_id.as_bytes());
2121        let hash = hasher.finalize();
2122        Key { hash: hash.into() }
2123    }
2124    
2125    /// Create a key for a specific message in an inbox
2126    pub fn from_inbox_message(inbox_id: &str, message_id: &str) -> Self {
2127        let mut hasher = Sha256::new();
2128        hasher.update(b"INBOX_MESSAGE:");
2129        hasher.update(inbox_id.as_bytes());
2130        hasher.update(b":");
2131        hasher.update(message_id.as_bytes());
2132        let hash = hasher.finalize();
2133        Key { hash: hash.into() }
2134    }
2135}
2136
2137// NOTE: key display (`Key::to_hex`) relies on the `hex` crate, which must be
2138// listed in Cargo.toml dependencies.