saorsa_core/
dht.rs

1//! Distributed Hash Table (DHT) Implementation
2//!
3//! This module provides a Kademlia-based DHT for distributed peer routing and data storage.
4//! It implements the core Kademlia algorithm with proper distance metrics, k-buckets,
5//! and network operations for a fully decentralized P2P system.
6//!
7//! The implementation includes S/Kademlia security extensions for enhanced protection
8//! against various attacks on the DHT infrastructure.
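//!
//! # Example (sketch)
//!
//! A minimal usage sketch for a single local node. Replication is simulated in
//! this module, so the calls below work without any remote peers. The crate
//! path `saorsa_core::dht` is assumed from the file location.
//!
//! ```ignore
//! use saorsa_core::dht::{DHT, DHTConfig, Key};
//!
//! # async fn example() -> saorsa_core::Result<()> {
//! let dht = DHT::new(Key::random(), DHTConfig::default());
//!
//! // Store a value under a content-derived key, then read it back
//! let key = Key::new(b"hello");
//! dht.put(key.clone(), b"world".to_vec()).await?;
//! assert!(dht.get(&key).await.is_some());
//! # Ok(())
//! # }
//! ```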
9
10use crate::{PeerId, Multiaddr, Result, P2PError};
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, VecDeque};
13use std::time::{Duration, Instant, SystemTime};
14use sha2::{Digest, Sha256};
15use tokio::sync::RwLock;
16use tracing::{debug, info};
17use futures;
18
19// S/Kademlia security extensions
20pub mod skademlia;
21
22// IPv6-based node identity system
23pub mod ipv6_identity;
24
25// Enhanced storage with K=8 replication
26pub mod enhanced_storage;
27
28/// DHT configuration parameters
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct DHTConfig {
31    /// Replication parameter (k) - number of nodes to store each record
32    pub replication_factor: usize,
33    /// Maximum nodes per k-bucket
34    pub bucket_size: usize,
35    /// Concurrency parameter for parallel lookups
36    pub alpha: usize,
37    /// Record expiration time
38    pub record_ttl: Duration,
39    /// Refresh interval for buckets
40    pub bucket_refresh_interval: Duration,
41    /// Republish interval for stored records
42    pub republish_interval: Duration,
43    /// Maximum distance for considering nodes "close"
44    pub max_distance: u8,
45}
46
47/// DHT key type with proper Kademlia distance calculation
48#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
49pub struct Key {
50    /// 256-bit key hash
51    hash: [u8; 32],
52}
53
54/// DHT record containing key-value data with metadata
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct Record {
57    /// Record key
58    pub key: Key,
59    /// Record value
60    pub value: Vec<u8>,
61    /// Publisher peer ID
62    pub publisher: PeerId,
63    /// Record creation time
64    pub created_at: SystemTime,
65    /// Record expiration time
66    pub expires_at: SystemTime,
67    /// Signature for verification (optional)
68    pub signature: Option<Vec<u8>>,
69}
70
71/// DHT node information
#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct DHTNode {
74    /// Node peer ID
75    pub peer_id: PeerId,
76    /// Node addresses
77    pub addresses: Vec<Multiaddr>,
    /// Last seen timestamp (serialized as approximate elapsed seconds, not epoch time)
79    #[serde(with = "instant_as_secs")]
80    pub last_seen: Instant,
81    /// Node distance from local node
82    pub distance: Key,
83    /// Connection status
84    pub is_connected: bool,
85}
86
87/// Serde helper for Instant serialization
88mod instant_as_secs {
89    use serde::{Deserializer, Serializer, Deserialize, Serialize};
90    use std::time::Instant;
91    
92    pub fn serialize<S>(instant: &Instant, serializer: S) -> Result<S::Ok, S::Error>
93    where
94        S: Serializer,
95    {
        // Serialize the approximate elapsed seconds since this instant was recorded
97        instant.elapsed().as_secs().serialize(serializer)
98    }
99    
100    pub fn deserialize<'de, D>(deserializer: D) -> Result<Instant, D::Error>
101    where
102        D: Deserializer<'de>,
103    {
        let secs = u64::deserialize(deserializer)?;
        // Return the current instant minus the stored duration (approximate),
        // clamping to "now" if the subtraction would underflow
        Ok(Instant::now()
            .checked_sub(std::time::Duration::from_secs(secs))
            .unwrap_or_else(Instant::now))
107    }
108}
109
110/// Serializable DHT node for network transmission
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct SerializableDHTNode {
113    /// Node peer ID
114    pub peer_id: PeerId,
115    /// Node addresses
116    pub addresses: Vec<Multiaddr>,
    /// Approximate elapsed seconds since the node was last seen
118    pub last_seen_secs: u64,
119    /// Node distance from local node
120    pub distance: Key,
121    /// Connection status
122    pub is_connected: bool,
123}
124
125/// Kademlia routing table bucket
126#[derive(Debug)]
127struct KBucket {
128    /// Nodes in this bucket (up to k nodes)
129    nodes: VecDeque<DHTNode>,
130    /// Bucket capacity
131    capacity: usize,
132    /// Last refresh time
133    last_refresh: Instant,
134}
135
136/// Kademlia routing table
137#[derive(Debug)]
138pub struct RoutingTable {
139    /// Local node ID
140    local_id: Key,
141    /// K-buckets indexed by distance
142    buckets: Vec<RwLock<KBucket>>,
143    /// Configuration (reserved for future use)
144    #[allow(dead_code)]
145    config: DHTConfig,
146}
147
148/// DHT storage for local records
149#[derive(Debug)]
150pub struct DHTStorage {
151    /// Stored records
152    records: RwLock<HashMap<Key, Record>>,
153    /// Configuration (reserved for future use)
154    #[allow(dead_code)]
155    config: DHTConfig,
156}
157
158/// Main DHT implementation with S/Kademlia security extensions
159#[derive(Debug)]
160pub struct DHT {
161    /// Local node ID
162    local_id: Key,
163    /// Routing table
164    routing_table: RoutingTable,
165    /// Local storage
166    storage: DHTStorage,
167    /// Configuration (reserved for future use)
168    #[allow(dead_code)]
169    config: DHTConfig,
170    /// S/Kademlia security extensions
171    pub skademlia: Option<skademlia::SKademlia>,
172    /// IPv6-based identity manager
173    pub ipv6_identity_manager: Option<ipv6_identity::IPv6DHTIdentityManager>,
174}
175
176/// DHT query types
177#[derive(Debug, Clone, Serialize, Deserialize)]
178pub enum DHTQuery {
179    /// Find nodes close to a key
180    FindNode { 
181        /// The key to find nodes near
182        key: Key, 
183        /// ID of the requesting peer
184        requester: PeerId 
185    },
186    /// Find value for a key
187    FindValue { 
188        /// The key to find value for
189        key: Key, 
190        /// ID of the requesting peer
191        requester: PeerId 
192    },
193    /// Store a record
194    Store { 
195        /// The record to store
196        record: Record, 
197        /// ID of the requesting peer
198        requester: PeerId 
199    },
200    /// Ping to check node availability
201    Ping { 
202        /// ID of the requesting peer
203        requester: PeerId 
204    },
205}
206
207/// DHT response types
208#[derive(Debug, Clone, Serialize, Deserialize)]
209pub enum DHTResponse {
210    /// Response to FindNode query
211    Nodes { 
212        /// List of nodes near the requested key
213        nodes: Vec<SerializableDHTNode> 
214    },
215    /// Response to FindValue query
216    Value { 
217        /// The found record
218        record: Record 
219    },
220    /// Response to Store query
221    Stored { 
222        /// Whether storage was successful
223        success: bool 
224    },
225    /// Response to Ping query
226    Pong { 
227        /// ID of the responding peer
228        responder: PeerId 
229    },
230    /// Error response
231    Error { 
232        /// Error message describing what went wrong
233        message: String 
234    },
235}
236
237/// DHT lookup state for iterative queries
238#[derive(Debug)]
239pub struct LookupState {
240    /// Target key
241    pub target: Key,
242    /// Nodes queried so far
243    pub queried: HashMap<PeerId, Instant>,
244    /// Nodes to query next
245    pub to_query: VecDeque<DHTNode>,
246    /// Closest nodes found
247    pub closest: Vec<DHTNode>,
248    /// Lookup start time
249    pub started_at: Instant,
250    /// Maximum nodes to query in parallel
251    pub alpha: usize,
252}
253
254impl Default for DHTConfig {
255    fn default() -> Self {
256        Self {
257            replication_factor: 20,     // k = 20 (standard Kademlia)
258            bucket_size: 20,            // k = 20 nodes per bucket
259            alpha: 3,                   // α = 3 concurrent lookups
260            record_ttl: Duration::from_secs(24 * 60 * 60), // 24 hours
261            bucket_refresh_interval: Duration::from_secs(60 * 60), // 1 hour
262            republish_interval: Duration::from_secs(24 * 60 * 60), // 24 hours
            max_distance: 160,          // maximum "close" distance in bits (keys occupy a 256-bit space)
264        }
265    }
266}
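// Example (sketch): tightening the defaults for a small test network. The field
// names come from `DHTConfig` above; the values themselves are illustrative.
//
// ```ignore
// use std::time::Duration;
//
// let config = DHTConfig {
//     replication_factor: 8,
//     bucket_size: 8,
//     alpha: 2,
//     record_ttl: Duration::from_secs(60 * 60), // 1 hour
//     ..DHTConfig::default()
// };
// ```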
267
268impl Key {
269    /// Create a new key from raw data
270    pub fn new(data: &[u8]) -> Self {
271        let mut hasher = Sha256::new();
272        hasher.update(data);
273        let hash: [u8; 32] = hasher.finalize().into();
274        Self { hash }
275    }
276    
277    /// Create a key from existing hash
278    pub fn from_hash(hash: [u8; 32]) -> Self {
279        Self { hash }
280    }
281    
282    /// Create a random key
283    pub fn random() -> Self {
284        use rand::RngCore;
285        let mut hash = [0u8; 32];
286        rand::thread_rng().fill_bytes(&mut hash);
287        Self { hash }
288    }
289    
290    /// Get key as bytes
291    pub fn as_bytes(&self) -> &[u8] {
292        &self.hash
293    }
294    
295    /// Get key as hex string
296    pub fn to_hex(&self) -> String {
297        hex::encode(self.hash)
298    }
299    
300    /// Calculate XOR distance between two keys (Kademlia distance metric)
301    pub fn distance(&self, other: &Key) -> Key {
302        let mut result = [0u8; 32];
303        for i in 0..32 {
304            result[i] = self.hash[i] ^ other.hash[i];
305        }
306        Key { hash: result }
307    }
308    
    /// Count the number of leading zero bits in the key (256 if all bits are zero)
310    pub fn leading_zeros(&self) -> u32 {
311        for (i, &byte) in self.hash.iter().enumerate() {
312            if byte != 0 {
313                return (i * 8) as u32 + byte.leading_zeros();
314            }
315        }
316        256 // All bits are zero
317    }
318    
319    /// Get bucket index for this key relative to local node
320    pub fn bucket_index(&self, local_id: &Key) -> usize {
321        let distance = self.distance(local_id);
322        let leading_zeros = distance.leading_zeros();
        if leading_zeros >= 255 {
            0 // Distance of zero or one: the key belongs in the closest bucket
325        } else {
326            (255 - leading_zeros) as usize
327        }
328    }
329}
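// Example (sketch): the XOR metric in practice, using only the `Key` API above.
// Identical keys are at distance zero (256 leading zero bits), the metric is
// symmetric, and `bucket_index` always yields a valid bucket in 0..256.
//
// ```ignore
// let local = Key::new(b"local-node");
// let remote = Key::new(b"remote-node");
//
// assert_eq!(local.distance(&local).leading_zeros(), 256); // d(x, x) = 0
// assert_eq!(local.distance(&remote).as_bytes(),
//            remote.distance(&local).as_bytes());          // symmetry
// assert!(remote.bucket_index(&local) < 256);
// ```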
330
331impl Record {
332    /// Create a new record
333    pub fn new(key: Key, value: Vec<u8>, publisher: PeerId) -> Self {
334        let now = SystemTime::now();
335        let ttl = Duration::from_secs(24 * 60 * 60); // 24 hours default
336        
337        Self {
338            key,
339            value,
340            publisher,
341            created_at: now,
342            expires_at: now + ttl,
343            signature: None,
344        }
345    }
346    
347    /// Create a record with custom TTL
348    pub fn with_ttl(key: Key, value: Vec<u8>, publisher: PeerId, ttl: Duration) -> Self {
349        let now = SystemTime::now();
350        
351        Self {
352            key,
353            value,
354            publisher,
355            created_at: now,
356            expires_at: now + ttl,
357            signature: None,
358        }
359    }
360    
361    /// Check if record has expired
362    pub fn is_expired(&self) -> bool {
363        SystemTime::now() > self.expires_at
364    }
365    
366    /// Get record age
367    pub fn age(&self) -> Duration {
368        SystemTime::now()
369            .duration_since(self.created_at)
370            .unwrap_or(Duration::ZERO)
371    }
372    
373    /// Sign the record (placeholder for future cryptographic verification)
374    pub fn sign(&mut self, _private_key: &[u8]) -> Result<()> {
375        // Placeholder implementation
376        // In real implementation, this would create a cryptographic signature
377        self.signature = Some(vec![0u8; 64]); // Dummy signature
378        Ok(())
379    }
380    
381    /// Verify record signature (placeholder)
382    pub fn verify(&self, _public_key: &[u8]) -> bool {
383        // Placeholder implementation
384        // In real implementation, this would verify the cryptographic signature
385        self.signature.is_some()
386    }
387}
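// Example (sketch): a short-lived record and its expiry check. The publisher
// string is a placeholder peer ID; `PeerId` is assumed to be string-like, as in
// the rest of this module.
//
// ```ignore
// use std::time::Duration;
//
// let key = Key::new(b"session-token");
// let record = Record::with_ttl(key, b"abc123".to_vec(),
//                               "peer-1".to_string(), Duration::from_secs(30));
// assert!(!record.is_expired());
// assert!(record.age() < Duration::from_secs(1));
// ```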
388
389impl DHTNode {
390    /// Create a new DHT node
391    pub fn new(peer_id: PeerId, addresses: Vec<Multiaddr>, local_id: &Key) -> Self {
392        let node_key = Key::new(peer_id.as_bytes());
393        let distance = node_key.distance(local_id);
394        
395        Self {
396            peer_id,
397            addresses,
398            last_seen: Instant::now(),
399            distance,
400            is_connected: false,
401        }
402    }
403    
404    /// Create a new DHT node with explicit key (for testing)
405    pub fn new_with_key(peer_id: PeerId, addresses: Vec<Multiaddr>, key: Key) -> Self {
406        Self {
407            peer_id,
408            addresses,
409            last_seen: Instant::now(),
410            distance: key,
411            is_connected: false,
412        }
413    }
414    
415    /// Update last seen timestamp
416    pub fn touch(&mut self) {
417        self.last_seen = Instant::now();
418    }
419    
420    /// Check if node is stale
421    pub fn is_stale(&self, timeout: Duration) -> bool {
422        self.last_seen.elapsed() > timeout
423    }
424    
425    /// Get node key
426    pub fn key(&self) -> Key {
427        Key::new(self.peer_id.as_bytes())
428    }
429    
430    /// Convert to serializable form
431    pub fn to_serializable(&self) -> SerializableDHTNode {
432        SerializableDHTNode {
433            peer_id: self.peer_id.clone(),
434            addresses: self.addresses.clone(),
435            last_seen_secs: self.last_seen.elapsed().as_secs(),
436            distance: self.distance.clone(),
437            is_connected: self.is_connected,
438        }
439    }
440}
441
442impl SerializableDHTNode {
443    /// Convert from serializable form to DHTNode
444    pub fn to_dht_node(&self) -> DHTNode {
445        DHTNode {
446            peer_id: self.peer_id.clone(),
447            addresses: self.addresses.clone(),
            last_seen: Instant::now()
                .checked_sub(Duration::from_secs(self.last_seen_secs))
                .unwrap_or_else(Instant::now),
449            distance: self.distance.clone(),
450            is_connected: self.is_connected,
451        }
452    }
453}
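// Example (sketch): round-tripping a node through its serializable form, as
// would happen when it is sent inside a `DHTResponse::Nodes` message. The peer
// ID literal and empty address list are illustrative.
//
// ```ignore
// let local_id = Key::random();
// let node = DHTNode::new("peer-1".to_string(), vec![], &local_id);
//
// let wire = node.to_serializable();   // Instant -> elapsed seconds
// let restored = wire.to_dht_node();   // approximate Instant reconstructed
// assert_eq!(restored.peer_id, node.peer_id);
// ```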
454
455impl KBucket {
456    /// Create a new k-bucket
457    fn new(capacity: usize) -> Self {
458        Self {
459            nodes: VecDeque::new(),
460            capacity,
461            last_refresh: Instant::now(),
462        }
463    }
464    
465    /// Add a node to the bucket
466    fn add_node(&mut self, node: DHTNode) -> bool {
467        // Check if node already exists
468        if let Some(pos) = self.nodes.iter().position(|n| n.peer_id == node.peer_id) {
469            // Move to front (most recently seen)
470            let mut existing = self.nodes.remove(pos).unwrap();
471            existing.touch();
472            existing.is_connected = node.is_connected;
473            self.nodes.push_front(existing);
474            return true;
475        }
476        
477        if self.nodes.len() < self.capacity {
478            // Add new node to front
479            self.nodes.push_front(node);
480            true
481        } else {
482            // Bucket is full - could implement replacement strategy here
483            false
484        }
485    }
486    
487    /// Remove a node from the bucket
488    fn remove_node(&mut self, peer_id: &PeerId) -> bool {
489        if let Some(pos) = self.nodes.iter().position(|n| n.peer_id == *peer_id) {
490            self.nodes.remove(pos);
491            true
492        } else {
493            false
494        }
495    }
496    
497    /// Get nodes closest to a target
498    fn closest_nodes(&self, target: &Key, count: usize) -> Vec<DHTNode> {
499        let mut nodes: Vec<_> = self.nodes.iter().cloned().collect();
500        nodes.sort_by_key(|node| node.key().distance(target).as_bytes().to_vec());
501        nodes.into_iter().take(count).collect()
502    }
503    
504    /// Check if bucket needs refresh
505    fn needs_refresh(&self, interval: Duration) -> bool {
506        self.last_refresh.elapsed() > interval
507    }
508}
509
510impl RoutingTable {
511    /// Create a new routing table
512    pub fn new(local_id: Key, config: DHTConfig) -> Self {
513        let mut buckets = Vec::new();
514        for _ in 0..256 {
515            buckets.push(RwLock::new(KBucket::new(config.bucket_size)));
516        }
517        
518        Self {
519            local_id,
520            buckets,
521            config,
522        }
523    }
524    
525    /// Add a node to the routing table
526    pub async fn add_node(&self, node: DHTNode) -> Result<()> {
527        let bucket_index = node.key().bucket_index(&self.local_id);
528        let mut bucket = self.buckets[bucket_index].write().await;
529        
530        if bucket.add_node(node.clone()) {
531            debug!("Added node {} to bucket {}", node.peer_id, bucket_index);
532        } else {
533            debug!("Bucket {} full, could not add node {}", bucket_index, node.peer_id);
534        }
535        
536        Ok(())
537    }
538    
539    /// Remove a node from the routing table
540    pub async fn remove_node(&self, peer_id: &PeerId) -> Result<()> {
541        let node_key = Key::new(peer_id.as_bytes());
542        let bucket_index = node_key.bucket_index(&self.local_id);
543        let mut bucket = self.buckets[bucket_index].write().await;
544        
545        if bucket.remove_node(peer_id) {
546            debug!("Removed node {} from bucket {}", peer_id, bucket_index);
547        }
548        
549        Ok(())
550    }
551    
552    /// Find nodes closest to a target key
553    pub async fn closest_nodes(&self, target: &Key, count: usize) -> Vec<DHTNode> {
554        let mut all_nodes = Vec::new();
555        
556        // Check buckets in order of distance from target
557        let target_bucket = target.bucket_index(&self.local_id);
558        
559        // Start with the target bucket and expand outward
560        let mut checked = vec![false; 256];
561        let mut to_check = VecDeque::new();
562        to_check.push_back(target_bucket);
563        
564        while let Some(bucket_idx) = to_check.pop_front() {
565            if checked[bucket_idx] {
566                continue;
567            }
568            checked[bucket_idx] = true;
569            
570            let bucket = self.buckets[bucket_idx].read().await;
571            all_nodes.extend(bucket.closest_nodes(target, bucket.nodes.len()));
572            
573            // Add adjacent buckets to check
574            if bucket_idx > 0 && !checked[bucket_idx - 1] {
575                to_check.push_back(bucket_idx - 1);
576            }
577            if bucket_idx < 255 && !checked[bucket_idx + 1] {
578                to_check.push_back(bucket_idx + 1);
579            }
580            
581            // Stop if we have enough nodes
582            if all_nodes.len() >= count * 2 {
583                break;
584            }
585        }
586        
587        // Sort by distance and return closest
588        all_nodes.sort_by_key(|node| node.key().distance(target).as_bytes().to_vec());
589        all_nodes.into_iter().take(count).collect()
590    }
591    
592    /// Get routing table statistics
593    pub async fn stats(&self) -> (usize, usize) {
594        let mut total_nodes = 0;
595        let mut active_buckets = 0;
596        
597        for bucket in &self.buckets {
598            let bucket_guard = bucket.read().await;
599            let node_count = bucket_guard.nodes.len();
600            total_nodes += node_count;
601            if node_count > 0 {
602                active_buckets += 1;
603            }
604        }
605        
606        (total_nodes, active_buckets)
607    }
608}
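// Example (sketch): populating a routing table and asking for the nodes closest
// to a target key. Runs inside an async context because the buckets are guarded
// by tokio `RwLock`s; the peer IDs and empty address lists are illustrative.
//
// ```ignore
// let local_id = Key::random();
// let table = RoutingTable::new(local_id.clone(), DHTConfig::default());
//
// for i in 0..10 {
//     let node = DHTNode::new(format!("peer-{i}"), vec![], &local_id);
//     table.add_node(node).await?;
// }
//
// let closest = table.closest_nodes(&Key::random(), 3).await;
// assert!(closest.len() <= 3);
// ```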
609
610impl DHTStorage {
611    /// Create new DHT storage
612    pub fn new(config: DHTConfig) -> Self {
613        Self {
614            records: RwLock::new(HashMap::new()),
615            config,
616        }
617    }
618    
619    /// Store a record
620    pub async fn store(&self, record: Record) -> Result<()> {
621        let mut records = self.records.write().await;
622        records.insert(record.key.clone(), record);
623        Ok(())
624    }
625    
626    /// Retrieve a record
627    pub async fn get(&self, key: &Key) -> Option<Record> {
628        let records = self.records.read().await;
629        records.get(key).cloned()
630    }
631    
632    /// Remove expired records
633    pub async fn cleanup_expired(&self) -> usize {
634        let mut records = self.records.write().await;
635        let initial_count = records.len();
636        records.retain(|_, record| !record.is_expired());
637        initial_count - records.len()
638    }
639    
640    /// Get all stored records (for republishing)
641    pub async fn all_records(&self) -> Vec<Record> {
642        let records = self.records.read().await;
643        records.values().cloned().collect()
644    }
645    
646    /// Get storage statistics
647    pub async fn stats(&self) -> (usize, usize) {
648        let records = self.records.read().await;
649        let total = records.len();
650        let expired = records.values().filter(|r| r.is_expired()).count();
651        (total, expired)
652    }
653}
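// Example (sketch): local record storage with expiry cleanup. A zero TTL makes
// the record expire immediately, so `cleanup_expired` removes it.
//
// ```ignore
// let storage = DHTStorage::new(DHTConfig::default());
// let key = Key::new(b"k");
//
// let record = Record::with_ttl(key.clone(), b"v".to_vec(),
//                               "peer-1".to_string(), Duration::from_secs(0));
// storage.store(record).await?;
//
// assert_eq!(storage.cleanup_expired().await, 1);
// assert!(storage.get(&key).await.is_none());
// ```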
654
655impl DHT {
656    /// Create a new DHT instance
657    pub fn new(local_id: Key, config: DHTConfig) -> Self {
658        let routing_table = RoutingTable::new(local_id.clone(), config.clone());
659        let storage = DHTStorage::new(config.clone());
660        
661        Self {
662            local_id,
663            routing_table,
664            storage,
665            config,
666            skademlia: None,
667            ipv6_identity_manager: None,
668        }
669    }
670    
671    /// Create a new DHT instance with S/Kademlia security extensions
672    pub fn new_with_security(local_id: Key, config: DHTConfig, skademlia_config: skademlia::SKademliaConfig) -> Self {
673        let routing_table = RoutingTable::new(local_id.clone(), config.clone());
674        let storage = DHTStorage::new(config.clone());
675        let skademlia = skademlia::SKademlia::new(skademlia_config);
676        
677        Self {
678            local_id,
679            routing_table,
680            storage,
681            config,
682            skademlia: Some(skademlia),
683            ipv6_identity_manager: None,
684        }
685    }
686
687    /// Create a new DHT instance with full IPv6 security integration
688    pub fn new_with_ipv6_security(
689        local_id: Key, 
690        config: DHTConfig, 
691        skademlia_config: skademlia::SKademliaConfig,
692        ipv6_config: ipv6_identity::IPv6DHTConfig
693    ) -> Self {
694        let routing_table = RoutingTable::new(local_id.clone(), config.clone());
695        let storage = DHTStorage::new(config.clone());
696        let skademlia = skademlia::SKademlia::new(skademlia_config);
697        let ipv6_identity_manager = ipv6_identity::IPv6DHTIdentityManager::new(ipv6_config);
698        
699        Self {
700            local_id,
701            routing_table,
702            storage,
703            config,
704            skademlia: Some(skademlia),
705            ipv6_identity_manager: Some(ipv6_identity_manager),
706        }
707    }
708
709    /// Initialize local IPv6 identity
710    pub fn set_local_ipv6_identity(&mut self, identity: crate::security::IPv6NodeID) -> Result<()> {
711        if let Some(ref mut manager) = self.ipv6_identity_manager {
712            // Update local_id to match IPv6 identity
713            self.local_id = ipv6_identity::IPv6DHTIdentityManager::generate_dht_key(&identity);
714            manager.set_local_identity(identity)?;
715            info!("Local IPv6 identity set and DHT key updated");
716            Ok(())
717        } else {
718            Err(P2PError::Security("IPv6 identity manager not enabled".to_string()).into())
719        }
720    }
721    
722    /// Add a bootstrap node to the DHT
723    pub async fn add_bootstrap_node(&self, peer_id: PeerId, addresses: Vec<Multiaddr>) -> Result<()> {
724        let node = DHTNode::new(peer_id, addresses, &self.local_id);
725        self.routing_table.add_node(node).await
726    }
727
728    /// Add an IPv6-verified node to the DHT
729    pub async fn add_ipv6_node(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>, ipv6_identity: crate::security::IPv6NodeID) -> Result<()> {
730        if let Some(ref mut manager) = self.ipv6_identity_manager {
731            // Validate the node join with IPv6 security checks
732            let base_node = DHTNode::new(peer_id.clone(), addresses, &self.local_id);
733            let security_event = manager.validate_node_join(&base_node, &ipv6_identity).await?;
734            
735            match security_event {
736                ipv6_identity::IPv6SecurityEvent::NodeJoined { verification_confidence, .. } => {
737                    // Enhance the node with IPv6 identity
738                    let ipv6_node = manager.enhance_dht_node(base_node.clone(), ipv6_identity).await?;
739                    
740                    // Update the DHT key to match IPv6 identity
741                    let mut enhanced_base_node = base_node;
742                    enhanced_base_node.distance = ipv6_node.get_dht_key().distance(&self.local_id);
743                    
744                    // Add to routing table
745                    self.routing_table.add_node(enhanced_base_node).await?;
746                    
747                    info!("Added IPv6-verified node {} with confidence {:.2}", peer_id, verification_confidence);
748                    Ok(())
749                }
750                ipv6_identity::IPv6SecurityEvent::VerificationFailed { reason, .. } => {
751                    Err(P2PError::Security(format!("IPv6 verification failed: {}", reason)).into())
752                }
753                ipv6_identity::IPv6SecurityEvent::DiversityViolation { subnet_type, .. } => {
754                    Err(P2PError::Security(format!("IP diversity violation: {}", subnet_type)).into())
755                }
756                ipv6_identity::IPv6SecurityEvent::NodeBanned { reason, .. } => {
757                    Err(P2PError::Security(format!("Node banned: {}", reason)).into())
758                }
759                _ => {
760                    Err(P2PError::Security("Unexpected security event".to_string()).into())
761                }
762            }
763        } else {
764            // Fallback to regular node addition if IPv6 manager not enabled
765            self.add_bootstrap_node(peer_id, addresses).await
766        }
767    }
768
769    /// Remove node with IPv6 cleanup
770    pub async fn remove_ipv6_node(&mut self, peer_id: &PeerId) -> Result<()> {
771        // Remove from routing table
772        self.routing_table.remove_node(peer_id).await?;
773        
774        // Clean up IPv6 tracking
775        if let Some(ref mut manager) = self.ipv6_identity_manager {
776            manager.remove_node(peer_id);
777        }
778        
779        Ok(())
780    }
781
782    /// Check if node is banned due to IPv6 security violations
783    pub fn is_node_banned(&self, peer_id: &PeerId) -> bool {
784        if let Some(ref manager) = self.ipv6_identity_manager {
785            manager.is_node_banned(peer_id)
786        } else {
787            false
788        }
789    }
790    
791    /// Store a record in the DHT with replication
792    pub async fn put(&self, key: Key, value: Vec<u8>) -> Result<()> {
793        let record = Record::new(key.clone(), value, self.local_id.to_hex());
794        
795        // Store locally first
796        self.storage.store(record.clone()).await?;
797        
798        // Find nodes closest to the key for replication
799        let closest_nodes = self.routing_table
800            .closest_nodes(&key, self.config.replication_factor)
801            .await;
802        
803        info!("Storing record with key {} on {} nodes", key.to_hex(), closest_nodes.len());
804        
805        // If no other nodes available, just store locally (single node scenario)
806        if closest_nodes.is_empty() {
807            info!("No other nodes available for replication, storing only locally");
808            return Ok(());
809        }
810        
811        // Replicate to closest nodes (simulated for now)
812        let mut successful_replications = 0;
813        for node in &closest_nodes {
814            if self.replicate_record(&record, node).await.is_ok() {
815                successful_replications += 1;
816            }
817        }
818        
819        info!("Successfully replicated record {} to {}/{} nodes", 
820              key.to_hex(), successful_replications, closest_nodes.len());
821        
822        // Consider replication successful if we stored to at least 1 node or have reasonable coverage
823        let required_replications = if closest_nodes.len() == 1 {
824            1
825        } else {
826            std::cmp::max(1, closest_nodes.len() / 2)
827        };
828        
829        if successful_replications >= required_replications {
830            Ok(())
831        } else {
832            Err(P2PError::DHT(format!(
833                "Insufficient replication: only {}/{} nodes stored the record (required: {})", 
834                successful_replications, closest_nodes.len(), required_replications
835            )).into())
836        }
837    }
838    
839    /// Retrieve a record from the DHT with consistency checks
840    pub async fn get(&self, key: &Key) -> Option<Record> {
841        // Check local storage first
842        if let Some(record) = self.storage.get(key).await {
843            if !record.is_expired() {
844                return Some(record);
845            }
846        }
847        
848        // Perform iterative lookup to find the record
849        if let Some(record) = self.iterative_find_value(key).await {
850            // Store locally for future access (caching)
851            if self.storage.store(record.clone()).await.is_ok() {
852                debug!("Cached retrieved record with key {}", key.to_hex());
853            }
854            return Some(record);
855        }
856        
857        None
858    }
859    
860    /// Find nodes close to a key
861    pub async fn find_node(&self, key: &Key) -> Vec<DHTNode> {
862        self.routing_table.closest_nodes(key, self.config.replication_factor).await
863    }
864    
865    /// Handle incoming DHT query
866    pub async fn handle_query(&self, query: DHTQuery) -> DHTResponse {
867        match query {
868            DHTQuery::FindNode { key, requester: _ } => {
869                let nodes = self.find_node(&key).await;
870                let serializable_nodes = nodes.into_iter().map(|n| n.to_serializable()).collect();
871                DHTResponse::Nodes { nodes: serializable_nodes }
872            }
873            DHTQuery::FindValue { key, requester: _ } => {
874                if let Some(record) = self.storage.get(&key).await {
875                    if !record.is_expired() {
876                        return DHTResponse::Value { record };
877                    }
878                }
879                let nodes = self.find_node(&key).await;
880                let serializable_nodes = nodes.into_iter().map(|n| n.to_serializable()).collect();
881                DHTResponse::Nodes { nodes: serializable_nodes }
882            }
883            DHTQuery::Store { record, requester: _ } => {
884                match self.storage.store(record).await {
885                    Ok(()) => DHTResponse::Stored { success: true },
886                    Err(_) => DHTResponse::Stored { success: false },
887                }
888            }
889            DHTQuery::Ping { requester: _ } => {
890                DHTResponse::Pong { responder: self.local_id.to_hex() }
891            }
892        }
893    }
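    // Example (sketch): serving an incoming query locally, given a `dht: DHT` in
    // scope. In a real deployment the query would arrive over the network; here
    // it is constructed directly.
    //
    // ```ignore
    // let query = DHTQuery::FindValue {
    //     key: Key::new(b"photo-123"),
    //     requester: "peer-1".to_string(),
    // };
    // match dht.handle_query(query).await {
    //     DHTResponse::Value { record } => println!("hit: {} bytes", record.value.len()),
    //     DHTResponse::Nodes { nodes } => println!("miss: {} closer nodes", nodes.len()),
    //     other => println!("unexpected: {other:?}"),
    // }
    // ```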
894    
895    /// Get DHT statistics
896    pub async fn stats(&self) -> DHTStats {
897        let (total_nodes, active_buckets) = self.routing_table.stats().await;
898        let (stored_records, expired_records) = self.storage.stats().await;
899        
900        DHTStats {
901            local_id: self.local_id.clone(),
902            total_nodes,
903            active_buckets,
904            stored_records,
905            expired_records,
906        }
907    }
908    
909    /// Perform periodic maintenance
910    pub async fn maintenance(&self) -> Result<()> {
911        // Clean up expired records
912        let expired_count = self.storage.cleanup_expired().await;
913        if expired_count > 0 {
914            debug!("Cleaned up {} expired records", expired_count);
915        }
916        
917        // Republish records that are close to expiration
918        self.republish_records().await?;
919        
920        // Refresh buckets that haven't been active
921        self.refresh_buckets().await?;
922        
923        // Note: S/Kademlia cleanup would happen here in a mutable context
924        
925        Ok(())
926    }
927    
928    /// Perform secure lookup using S/Kademlia disjoint paths with distance verification
929    pub async fn secure_get(&mut self, key: &Key) -> Result<Option<Record>> {
930        // Check local storage first
931        if let Some(record) = self.storage.get(key).await {
932            if !record.is_expired() {
933                return Ok(Some(record));
934            }
935        }
936        
937        // Check if S/Kademlia is enabled and extract configuration
938        let (enable_distance_verification, disjoint_path_count, min_reputation) = if let Some(ref skademlia) = self.skademlia {
939            (skademlia.config.enable_distance_verification, 
940             skademlia.config.disjoint_path_count,
941             skademlia.config.min_routing_reputation)
942        } else {
943            // Fallback to regular get if S/Kademlia not enabled
944            return Ok(self.get(key).await);
945        };
946        
947        // Get initial nodes for disjoint path lookup
948        let initial_nodes = self.routing_table
949            .closest_nodes(key, disjoint_path_count * 3)
950            .await;
951        
952        if initial_nodes.is_empty() {
953            return Ok(None);
954        }
955        
956        // Perform secure lookup with disjoint paths
957        let secure_nodes = if let Some(ref mut skademlia) = self.skademlia {
958            skademlia.secure_lookup(key.clone(), initial_nodes).await?
959        } else {
960            return Ok(None);
961        };
962        
963        // Query the securely found nodes with distance verification
964        for node in &secure_nodes {
965            // Verify node distance before querying if enabled
966            if enable_distance_verification {
967                let witness_nodes = self.select_witness_nodes(&node.peer_id, 3).await;
968                
969                let consensus = if let Some(ref mut skademlia) = self.skademlia {
970                    skademlia.verify_distance_consensus(&node.peer_id, key, witness_nodes).await?
971                } else {
972                    continue;
973                };
974                
975                if consensus.confidence < min_reputation {
976                    debug!("Skipping node {} due to low distance verification confidence", node.peer_id);
977                    continue;
978                }
979            }
980            
981            let query = DHTQuery::FindValue { 
982                key: key.clone(), 
983                requester: self.local_id.to_hex() 
984            };
985            if let Ok(DHTResponse::Value { record }) = self.simulate_query(node, query).await {
986                // Store locally for future access
987                let _ = self.storage.store(record.clone()).await;
988                return Ok(Some(record));
989            }
990        }
991        
992        Ok(None)
993    }
994    
995    /// Store a record using S/Kademlia security-aware node selection
996    pub async fn secure_put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
997        let record = Record::new(key.clone(), value, self.local_id.to_hex());
998        
999        // Store locally first
1000        self.storage.store(record.clone()).await?;
1001        
1002        // Get secure nodes and perform replications
1003        let secure_nodes = if let Some(ref skademlia) = self.skademlia {
1004            // Get candidate nodes
1005            let candidate_nodes = self.routing_table
1006                .closest_nodes(&key, self.config.replication_factor * 2)
1007                .await;
1008            
1009            // Use S/Kademlia to select secure nodes based on reputation
1010            skademlia.select_secure_nodes(
1011                &candidate_nodes, 
1012                &key, 
1013                self.config.replication_factor
1014            )
1015        } else {
1016            // Fallback to regular closest nodes
1017            self.routing_table.closest_nodes(&key, self.config.replication_factor).await
1018        };
1019        
1020        info!("Storing record with key {} on {} secure nodes", key.to_hex(), secure_nodes.len());
1021        
1022        // Perform replications and collect results
1023        let mut replication_results = Vec::new();
1024        let mut successful_replications = 0;
1025        
1026        for node in &secure_nodes {
1027            let success = self.replicate_record(&record, node).await.is_ok();
1028            replication_results.push((node.peer_id.clone(), success));
1029            if success {
1030                successful_replications += 1;
1031            }
1032        }
1033        
1034        // Update reputations if S/Kademlia enabled
1035        if let Some(ref mut skademlia) = self.skademlia {
1036            for (peer_id, success) in replication_results {
1037                skademlia.reputation_manager.update_reputation(
1038                    &peer_id, 
1039                    success, 
1040                    Duration::from_millis(100)
1041                );
1042            }
1043        }
1044        
1045        if successful_replications > 0 {
1046            info!("Successfully replicated to {}/{} secure nodes", 
1047                 successful_replications, secure_nodes.len());
1048        }
1049        
1050        Ok(())
1051    }
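    // Example (sketch): enabling the S/Kademlia extensions and using the secure
    // get/put variants. `SKademliaConfig::default()` is assumed to exist in the
    // `skademlia` submodule; construct the config explicitly if it does not.
    //
    // ```ignore
    // let mut dht = DHT::new_with_security(
    //     Key::random(),
    //     DHTConfig::default(),
    //     skademlia::SKademliaConfig::default(),
    // );
    //
    // let key = Key::new(b"secure-entry");
    // dht.secure_put(key.clone(), b"value".to_vec()).await?;
    // let record = dht.secure_get(&key).await?;
    // ```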
1052    
1053    /// Update sibling lists for a key range
1054    pub async fn update_sibling_list(&mut self, key: Key) -> Result<()> {
1055        if let Some(ref mut skademlia) = self.skademlia {
1056            let nodes = self.routing_table.closest_nodes(&key, skademlia.config.sibling_list_size).await;
1057            skademlia.update_sibling_list(key, nodes);
1058        }
1059        Ok(())
1060    }
1061    
1062    /// Validate routing table consistency using S/Kademlia
1063    pub async fn validate_routing_consistency(&self) -> Result<skademlia::ConsistencyReport> {
1064        if let Some(ref skademlia) = self.skademlia {
1065            // Get sample of nodes for validation (using closest_nodes as a proxy)
1066            let sample_key = Key::random();
1067            let sample_nodes = self.routing_table.closest_nodes(&sample_key, 100).await;
1068            skademlia.validate_routing_consistency(&sample_nodes).await
1069        } else {
1070            Err(P2PError::DHT("S/Kademlia not enabled".to_string()).into())
1071        }
1072    }
1073    
1074    /// Create a distance verification challenge for a peer
1075    pub fn create_distance_challenge(&mut self, peer_id: &PeerId, key: &Key) -> Option<skademlia::DistanceChallenge> {
1076        self.skademlia.as_mut()
1077            .map(|skademlia| skademlia.create_distance_challenge(peer_id, key))
1078    }
1079    
1080    /// Verify a distance proof
1081    pub fn verify_distance_proof(&self, proof: &skademlia::DistanceProof) -> Result<bool> {
1082        if let Some(ref skademlia) = self.skademlia {
1083            skademlia.verify_distance_proof(proof)
1084        } else {
1085            Err(P2PError::DHT("S/Kademlia not enabled".to_string()).into())
1086        }
1087    }
1088
1089    /// Verify distances of multiple nodes using enhanced consensus
1090    #[allow(dead_code)]
1091    async fn verify_node_distances(&self, nodes: &[DHTNode], _target_key: &Key, min_reputation: f64) -> Result<Vec<DHTNode>> {
1092        let mut verified_nodes = Vec::new();
1093        
1094        for node in nodes {
1095            let witness_nodes = self.select_witness_nodes(&node.peer_id, 3).await;
1096            
1097            // Only proceed if we have enough witness nodes
1098            if witness_nodes.len() >= 2 {
1099                // Simulate consensus verification (simplified for now)
1100                let consensus_confidence = 0.8; // Placeholder
1101                
1102                if consensus_confidence >= min_reputation {
1103                    verified_nodes.push(node.clone());
1104                } else {
1105                    debug!("Node {} failed distance verification with confidence {}", 
1106                           node.peer_id, consensus_confidence);
1107                }
1108            }
1109        }
1110        
1111        Ok(verified_nodes)
1112    }
1113
1114    /// Select witness nodes for distance verification  
1115    async fn select_witness_nodes(&self, target_peer: &PeerId, count: usize) -> Vec<PeerId> {
1116        // Get nodes that are close to the target but not the target itself
1117        let target_key = Key::new(target_peer.as_bytes());
1118        let candidate_nodes = self.routing_table.closest_nodes(&target_key, count * 2).await;
1119        
1120        candidate_nodes.into_iter()
1121            .filter(|node| node.peer_id != *target_peer)
1122            .take(count)
1123            .map(|node| node.peer_id)
1124            .collect()
1125    }
1126
1127    /// Create enhanced distance challenge with adaptive difficulty
1128    pub fn create_enhanced_distance_challenge(&mut self, peer_id: &PeerId, key: &Key, suspected_attack: bool) -> Option<skademlia::EnhancedDistanceChallenge> {
1129        if let Some(ref mut skademlia) = self.skademlia {
1130            Some(skademlia.create_adaptive_distance_challenge(peer_id, key, suspected_attack))
1131        } else {
1132            None
1133        }
1134    }
1135
1136    /// Verify distance using multi-round challenge protocol  
1137    pub async fn verify_distance_multi_round(&mut self, challenge: &skademlia::EnhancedDistanceChallenge) -> Result<bool> {
1138        if let Some(ref mut skademlia) = self.skademlia {
1139            skademlia.verify_distance_multi_round(challenge).await
1140        } else {
1141            Err(P2PError::DHT("S/Kademlia not enabled".to_string()).into())
1142        }
1143    }
1144    
1145    /// Get security bucket for a key range
1146    pub fn get_security_bucket(&mut self, key: &Key) -> Option<&mut skademlia::SecurityBucket> {
1147        self.skademlia.as_mut()
1148            .map(|skademlia| skademlia.get_security_bucket(key))
1149    }
1150    
1151    /// Add trusted node to security bucket
1152    pub async fn add_trusted_node(&mut self, key: &Key, peer_id: PeerId, addresses: Vec<Multiaddr>) -> Result<()> {
1153        if let Some(ref mut skademlia) = self.skademlia {
1154            let node = DHTNode::new(peer_id, addresses, &self.local_id);
1155            let security_bucket = skademlia.get_security_bucket(key);
1156            security_bucket.add_trusted_node(node);
1157        }
1158        Ok(())
1159    }
1160
1161    /// Perform IPv6-enhanced secure get operation
1162    pub async fn ipv6_secure_get(&mut self, key: &Key) -> Result<Option<Record>> {
        // Check whether the local node itself is banned
1164        if self.is_node_banned(&self.local_id.to_hex()) {
1165            return Err(P2PError::Security("Local node is banned".to_string()).into());
1166        }
1167
1168        // Check local storage first
1169        if let Some(record) = self.storage.get(key).await {
1170            if !record.is_expired() {
1171                return Ok(Some(record));
1172            }
1173        }
1174        
1175        // Get IPv6-verified nodes for secure lookup
1176        let verified_nodes = self.get_ipv6_verified_nodes_for_key(key).await?;
1177        
1178        if verified_nodes.is_empty() {
1179            // Fallback to regular secure_get if no IPv6 nodes available
1180            return self.secure_get(key).await;
1181        }
1182
1183        // Perform S/Kademlia secure lookup with IPv6-verified nodes
1184        if let Some(ref mut skademlia) = self.skademlia {
1185            let secure_nodes = skademlia.secure_lookup(key.clone(), verified_nodes).await?;
1186            
1187            // Query nodes with both distance and IPv6 verification
1188            for node in &secure_nodes {
1189                // Additional IPv6 verification
1190                if let Some(ref manager) = self.ipv6_identity_manager {
1191                    if let Some(ipv6_node) = manager.get_verified_node(&node.peer_id) {
1192                        // Check if IPv6 identity needs refresh
1193                        if ipv6_node.needs_identity_refresh(manager.config.identity_refresh_interval) {
1194                            debug!("Skipping node {} due to stale IPv6 identity", node.peer_id);
1195                            continue;
1196                        }
1197                    } else {
1198                        debug!("Skipping node {} without verified IPv6 identity", node.peer_id);
1199                        continue;
1200                    }
1201                }
1202                
1203                let query = DHTQuery::FindValue { 
1204                    key: key.clone(), 
1205                    requester: self.local_id.to_hex() 
1206                };
1207                if let Ok(DHTResponse::Value { record }) = self.simulate_query(node, query).await {
1208                    // Update IPv6 reputation for successful response
1209                    if let Some(ref mut manager) = self.ipv6_identity_manager {
1210                        manager.update_ipv6_reputation(&node.peer_id, true);
1211                    }
1212                    
1213                    // Store locally for future access
1214                    let _ = self.storage.store(record.clone()).await;
1215                    return Ok(Some(record));
1216                }
1217            }
1218        }
1219        
1220        Ok(None)
1221    }
1222
1223    /// Perform IPv6-enhanced secure put operation
1224    pub async fn ipv6_secure_put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
1225        // Check if local node would be banned
1226        if self.is_node_banned(&self.local_id.to_hex()) {
1227            return Err(P2PError::Security("Local node is banned".to_string()).into());
1228        }
1229
1230        let record = Record::new(key.clone(), value, self.local_id.to_hex());
1231        
1232        // Store locally first
1233        self.storage.store(record.clone()).await?;
1234        
1235        // Get IPv6-verified nodes for secure replication
1236        let verified_nodes = self.get_ipv6_verified_nodes_for_key(&key).await?;
1237        
1238        // Use S/Kademlia for secure node selection among verified nodes
1239        let secure_nodes = if let Some(ref skademlia) = self.skademlia {
1240            skademlia.select_secure_nodes(&verified_nodes, &key, self.config.replication_factor)
1241        } else {
1242            verified_nodes.into_iter().take(self.config.replication_factor).collect()
1243        };
1244
1245        info!("Storing record with key {} on {} IPv6-verified secure nodes", key.to_hex(), secure_nodes.len());
1246        
1247        // Perform replications with IPv6 reputation tracking
1248        let mut successful_replications = 0;
1249        
1250        for node in &secure_nodes {
1251            let success = self.replicate_record(&record, node).await.is_ok();
1252            
1253            // Update IPv6 reputation based on replication success
1254            if let Some(ref mut manager) = self.ipv6_identity_manager {
1255                manager.update_ipv6_reputation(&node.peer_id, success);
1256            }
1257            
1258            if success {
1259                successful_replications += 1;
1260            }
1261        }
1262        
1263        if successful_replications == 0 && !secure_nodes.is_empty() {
1264            return Err(P2PError::DHT("Failed to replicate to any IPv6-verified nodes".to_string()).into());
1265        }
1266        
1267        info!("Successfully replicated to {}/{} IPv6-verified nodes", 
1268              successful_replications, secure_nodes.len());
1269        Ok(())
1270    }
1271
1272    /// Get IPv6-verified nodes suitable for a key
1273    async fn get_ipv6_verified_nodes_for_key(&self, key: &Key) -> Result<Vec<DHTNode>> {
1274        let mut verified_nodes = Vec::new();
1275        
1276        // Get closest nodes from routing table
1277        let candidate_nodes = self.routing_table.closest_nodes(key, self.config.replication_factor * 2).await;
1278        
1279        if let Some(ref manager) = self.ipv6_identity_manager {
1280            for node in candidate_nodes {
1281                // Check if node has verified IPv6 identity
1282                if let Some(ipv6_node) = manager.get_verified_node(&node.peer_id) {
1283                    // Check if identity is still fresh
1284                    if !ipv6_node.needs_identity_refresh(manager.config.identity_refresh_interval) {
1285                        // Check if node is not banned
1286                        if !manager.is_node_banned(&node.peer_id) {
1287                            verified_nodes.push(node);
1288                        }
1289                    }
1290                }
1291            }
1292        } else {
1293            // If no IPv6 manager, return all candidates
1294            verified_nodes = candidate_nodes;
1295        }
1296        
1297        Ok(verified_nodes)
1298    }
1299
1300    /// Get IPv6 diversity statistics
1301    pub fn get_ipv6_diversity_stats(&self) -> Option<crate::security::DiversityStats> {
1302        self.ipv6_identity_manager.as_ref()
1303            .map(|manager| manager.get_ipv6_diversity_stats())
1304    }
1305
1306    /// Cleanup expired IPv6 identities and reputation data
1307    pub fn cleanup_ipv6_data(&mut self) {
1308        if let Some(ref mut manager) = self.ipv6_identity_manager {
1309            manager.cleanup_expired();
1310        }
1311    }
1312
1313    /// Ban a node for IPv6 security violations
1314    pub fn ban_ipv6_node(&mut self, peer_id: &PeerId, reason: &str) {
1315        if let Some(ref mut manager) = self.ipv6_identity_manager {
1316            manager.ban_node(peer_id, reason);
1317        }
1318    }
1319
1320    /// Get local IPv6 identity
1321    pub fn get_local_ipv6_identity(&self) -> Option<&crate::security::IPv6NodeID> {
1322        self.ipv6_identity_manager.as_ref()
1323            .and_then(|manager| manager.get_local_identity())
1324    }
1325    
1326    /// Replicate a record to a specific node
1327    async fn replicate_record(&self, record: &Record, node: &DHTNode) -> Result<()> {
1328        // In a real implementation, this would send a STORE message over the network
1329        // For now, we simulate successful replication to nodes in our routing table
1330        debug!("Replicating record {} to node {}", record.key.to_hex(), node.peer_id);
1331        
1332        // Simulate network delay and occasional failures
1333        tokio::time::sleep(Duration::from_millis(10)).await;
1334        
1335        // Simulate 95% success rate for replication (high success rate for testing)
1336        if rand::random::<f64>() < 0.95 {
1337            Ok(())
1338        } else {
1339            Err(P2PError::Network("Replication failed".to_string()).into())
1340        }
1341    }
1342    
1343    /// Perform iterative lookup to find a value
1344    async fn iterative_find_value(&self, key: &Key) -> Option<Record> {
1345        debug!("Starting iterative lookup for key {}", key.to_hex());
1346        
1347        let mut lookup_state = LookupState::new(key.clone(), self.config.alpha);
1348        
1349        // Start with closest nodes from routing table
1350        let initial_nodes = self.routing_table.closest_nodes(key, self.config.alpha).await;
1351        lookup_state.add_nodes(initial_nodes);
1352        
1353        // Perform iterative queries
1354        let mut iterations = 0;
1355        const MAX_ITERATIONS: usize = 10;
1356        
1357        while !lookup_state.is_complete() && iterations < MAX_ITERATIONS {
1358            let nodes_to_query = lookup_state.next_nodes();
1359            if nodes_to_query.is_empty() {
1360                break;
1361            }
1362            
1363            // Query nodes in parallel
1364            let mut queries = Vec::new();
1365            for node in &nodes_to_query {
1366                let query = DHTQuery::FindValue { 
1367                    key: key.clone(), 
1368                    requester: self.local_id.to_hex() 
1369                };
1370                queries.push(self.simulate_query(node, query));
1371            }
1372            
1373            // Process responses
1374            for query_result in futures::future::join_all(queries).await {
1375                match query_result {
1376                    Ok(DHTResponse::Value { record }) => {
1377                        debug!("Found value for key {} in iteration {}", key.to_hex(), iterations);
1378                        return Some(record);
1379                    }
1380                    Ok(DHTResponse::Nodes { nodes }) => {
1381                        let dht_nodes: Vec<DHTNode> = nodes.into_iter()
1382                            .map(|n| n.to_dht_node())
1383                            .collect();
1384                        lookup_state.add_nodes(dht_nodes);
1385                    }
1386                    _ => {
1387                        // Query failed or returned unexpected response
1388                        debug!("Query failed during iterative lookup");
1389                    }
1390                }
1391            }
1392            
1393            iterations += 1;
1394        }
1395        
1396        debug!("Iterative lookup for key {} completed after {} iterations, value not found", 
1397               key.to_hex(), iterations);
1398        None
1399    }
1400    
1401    /// Simulate a query to a remote node (placeholder for real network implementation)
1402    async fn simulate_query(&self, _node: &DHTNode, query: DHTQuery) -> Result<DHTResponse> {
1403        // Add some realistic delay
1404        tokio::time::sleep(Duration::from_millis(50)).await;
1405        
1406        // Handle the query locally (simulating remote node response)
1407        Ok(self.handle_query(query).await)
1408    }
1409    
1410    /// Republish records that are close to expiration
1411    async fn republish_records(&self) -> Result<()> {
1412        let all_records = self.storage.all_records().await;
1413        let mut republished_count = 0;
1414        
1415        for record in all_records {
1416            // Republish if record has less than 1/4 of its TTL remaining
1417            let remaining_ttl = record.expires_at
1418                .duration_since(SystemTime::now())
1419                .unwrap_or(Duration::ZERO);
1420            
1421            if remaining_ttl < self.config.record_ttl / 4 {
1422                // Find nodes responsible for this key
1423                let closest_nodes = self.routing_table
1424                    .closest_nodes(&record.key, self.config.replication_factor)
1425                    .await;
1426                
1427                // Republish to closest nodes
1428                for node in &closest_nodes {
1429                    if self.replicate_record(&record, node).await.is_ok() {
1430                        republished_count += 1;
1431                    }
1432                }
1433            }
1434        }
1435        
1436        if republished_count > 0 {
1437            debug!("Republished {} records during maintenance", republished_count);
1438        }
1439        
1440        Ok(())
1441    }
1442    
1443    /// Refresh buckets that haven't been active recently
1444    async fn refresh_buckets(&self) -> Result<()> {
1445        let mut refreshed_count = 0;
1446        
1447        for bucket_index in 0..256 {
1448            let needs_refresh = {
1449                let bucket = self.routing_table.buckets[bucket_index].read().await;
1450                bucket.needs_refresh(self.config.bucket_refresh_interval)
1451            };
1452            
1453            if needs_refresh {
1454                // Generate a random key in this bucket's range and perform lookup
1455                let target_key = self.generate_key_for_bucket(bucket_index);
1456                let _nodes = self.iterative_find_node(&target_key).await;
1457                refreshed_count += 1;
1458                
1459                // Update bucket refresh time
1460                {
1461                    let mut bucket = self.routing_table.buckets[bucket_index].write().await;
1462                    bucket.last_refresh = Instant::now();
1463                }
1464            }
1465        }
1466        
1467        if refreshed_count > 0 {
1468            debug!("Refreshed {} buckets during maintenance", refreshed_count);
1469        }
1470        
1471        Ok(())
1472    }
1473    
1474    /// Generate a key that would fall into the specified bucket
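    ///
    /// A minimal sketch of the index arithmetic used below (illustrative only;
    /// values shown for `bucket_index = 10`):
    ///
    /// ```ignore
    /// let byte_index = (255 - 10) / 8; // 30
    /// let bit_index  = (255 - 10) % 8; // 5
    /// // key_bytes[30] ^= 1 << 5 flips that single bit relative to the local ID
    /// ```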
1475    fn generate_key_for_bucket(&self, bucket_index: usize) -> Key {
1476        let mut key_bytes = self.local_id.as_bytes().to_vec();
1477        
1478        // Flip the bit at position (255 - bucket_index) to ensure distance
1479        if bucket_index < 256 {
1480            let byte_index = (255 - bucket_index) / 8;
1481            let bit_index = (255 - bucket_index) % 8;
1482            
1483            if byte_index < key_bytes.len() {
1484                key_bytes[byte_index] ^= 1 << bit_index;
1485            }
1486        }
1487        
1488        let mut hash = [0u8; 32];
1489        hash.copy_from_slice(&key_bytes);
1490        Key::from_hash(hash)
1491    }
1492    
1493    /// Perform iterative node lookup
1494    async fn iterative_find_node(&self, key: &Key) -> Vec<DHTNode> {
1495        debug!("Starting iterative node lookup for key {}", key.to_hex());
1496        
1497        let mut lookup_state = LookupState::new(key.clone(), self.config.alpha);
1498        
1499        // Start with closest nodes from routing table
1500        let initial_nodes = self.routing_table.closest_nodes(key, self.config.alpha).await;
1501        lookup_state.add_nodes(initial_nodes);
1502        
1503        // Perform iterative queries
1504        let mut iterations = 0;
1505        const MAX_ITERATIONS: usize = 10;
1506        
1507        while !lookup_state.is_complete() && iterations < MAX_ITERATIONS {
1508            let nodes_to_query = lookup_state.next_nodes();
1509            if nodes_to_query.is_empty() {
1510                break;
1511            }
1512            
1513            // Query nodes in parallel
1514            let mut queries = Vec::new();
1515            for node in &nodes_to_query {
1516                let query = DHTQuery::FindNode { 
1517                    key: key.clone(), 
1518                    requester: self.local_id.to_hex() 
1519                };
1520                queries.push(self.simulate_query(node, query));
1521            }
1522            
1523            // Process responses
1524            for query_result in futures::future::join_all(queries).await {
1525                if let Ok(DHTResponse::Nodes { nodes }) = query_result {
1526                    let dht_nodes: Vec<DHTNode> = nodes.into_iter()
1527                        .map(|n| n.to_dht_node())
1528                        .collect();
1529                    lookup_state.add_nodes(dht_nodes);
1530                }
1531            }
1532            
1533            iterations += 1;
1534        }
1535        
1536        debug!("Iterative node lookup for key {} completed after {} iterations", 
1537               key.to_hex(), iterations);
1538        
1539        // Return the closest nodes found
1540        lookup_state.closest.into_iter()
1541            .take(self.config.replication_factor)
1542            .collect()
1543    }
1544    
1545    /// Check consistency of a record across multiple nodes
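    ///
    /// Illustrative usage (a sketch; `dht` stands for the DHT engine defined in
    /// this module and `key` for the key of a previously stored record):
    ///
    /// ```ignore
    /// let report = dht.check_consistency(&key).await?;
    /// if !report.consistent {
    ///     println!("{} conflicting copies out of {} responses",
    ///              report.conflicts.len(), report.nodes_responded);
    /// }
    /// ```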
1546    pub async fn check_consistency(&self, key: &Key) -> Result<ConsistencyReport> {
1547        debug!("Checking consistency for key {}", key.to_hex());
1548        
1549        // Find nodes that should have this record
1550        let closest_nodes = self.routing_table
1551            .closest_nodes(key, self.config.replication_factor)
1552            .await;
1553        
1554        let mut records_found = Vec::new();
1555        let mut nodes_queried = 0;
1556        let mut nodes_responded = 0;
1557        
1558        // Query each node for the record
1559        for node in &closest_nodes {
1560            nodes_queried += 1;
1561            
1562            let query = DHTQuery::FindValue { 
1563                key: key.clone(), 
1564                requester: self.local_id.to_hex() 
1565            };
1566            
1567            match self.simulate_query(node, query).await {
1568                Ok(DHTResponse::Value { record }) => {
1569                    nodes_responded += 1;
1570                    records_found.push((node.peer_id.clone(), record));
1571                }
1572                Ok(DHTResponse::Nodes { .. }) => {
1573                    nodes_responded += 1;
1574                    // Node doesn't have the record
1575                }
1576                _ => {
1577                    // Node didn't respond or error occurred
1578                }
1579            }
1580        }
1581        
1582        // Analyze consistency
1583        let mut consistent = true;
1584        let mut canonical_record: Option<Record> = None;
1585        let mut conflicts = Vec::new();
1586        
1587        for (node_id, record) in &records_found {
1588            if let Some(ref canonical) = canonical_record {
1589                // Check if records match
1590                if record.value != canonical.value || 
1591                   record.created_at != canonical.created_at ||
1592                   record.publisher != canonical.publisher {
1593                    consistent = false;
1594                    conflicts.push((node_id.clone(), record.clone()));
1595                }
1596            } else {
1597                canonical_record = Some(record.clone());
1598            }
1599        }
1600        
1601        let report = ConsistencyReport {
1602            key: key.clone(),
1603            nodes_queried,
1604            nodes_responded,
1605            records_found: records_found.len(),
1606            consistent,
1607            canonical_record,
1608            conflicts,
1609            replication_factor: self.config.replication_factor,
1610        };
1611        
1612        debug!("Consistency check for key {}: {} nodes queried, {} responded, {} records found, consistent: {}", 
1613               key.to_hex(), report.nodes_queried, report.nodes_responded, 
1614               report.records_found, report.consistent);
1615        
1616        Ok(report)
1617    }
1618    
1619    /// Repair inconsistencies for a specific key
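    ///
    /// Illustrative usage (a sketch, with the same assumptions as the
    /// `check_consistency` example above):
    ///
    /// ```ignore
    /// let result = dht.repair_record(&key).await?;
    /// if result.repairs_needed {
    ///     println!("{}/{} replicas repaired ({})",
    ///              result.repairs_successful, result.repairs_attempted, result.final_state);
    /// }
    /// ```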
1620    pub async fn repair_record(&self, key: &Key) -> Result<RepairResult> {
1621        debug!("Starting repair for key {}", key.to_hex());
1622        
1623        let consistency_report = self.check_consistency(key).await?;
1624        
1625        if consistency_report.consistent {
1626            return Ok(RepairResult {
1627                key: key.clone(),
1628                repairs_needed: false,
1629                repairs_attempted: 0,
1630                repairs_successful: 0,
1631                final_state: "consistent".to_string(),
1632            });
1633        }
1634        
1635        // Determine the canonical version (use most recent)
1636        let canonical_record = if let Some(canonical) = consistency_report.canonical_record {
1637            canonical
1638        } else {
1639            return Ok(RepairResult {
1640                key: key.clone(),
1641                repairs_needed: false,
1642                repairs_attempted: 0,
1643                repairs_successful: 0,
1644                final_state: "no_records_found".to_string(),
1645            });
1646        };
1647        
1648        // Find the most recent version among conflicts
1649        let mut most_recent = canonical_record.clone();
1650        for (_, conflicted_record) in &consistency_report.conflicts {
1651            if conflicted_record.created_at > most_recent.created_at {
1652                most_recent = conflicted_record.clone();
1653            }
1654        }
1655        
1656        // Replicate the canonical version to all responsible nodes
1657        let closest_nodes = self.routing_table
1658            .closest_nodes(key, self.config.replication_factor)
1659            .await;
1660        
1661        let mut repairs_attempted = 0;
1662        let mut repairs_successful = 0;
1663        
1664        for node in &closest_nodes {
1665            repairs_attempted += 1;
1666            if self.replicate_record(&most_recent, node).await.is_ok() {
1667                repairs_successful += 1;
1668            }
1669        }
1670        
1671        let final_state = if repairs_successful >= (self.config.replication_factor / 2) {
1672            "repaired".to_string()
1673        } else {
1674            "repair_failed".to_string()
1675        };
1676        
1677        debug!("Repair for key {} completed: {}/{} repairs successful, final state: {}", 
1678               key.to_hex(), repairs_successful, repairs_attempted, final_state);
1679        
1680        Ok(RepairResult {
1681            key: key.clone(),
1682            repairs_needed: true,
1683            repairs_attempted,
1684            repairs_successful,
1685            final_state,
1686        })
1687    }
1688    
1689    // ===============================
1690    // INBOX SYSTEM IMPLEMENTATION
1691    // ===============================
1692    
1693    /// Create a new inbox for a user with infinite TTL
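    ///
    /// Illustrative usage (a sketch; `dht` is the DHT engine and `me` a local `PeerId`):
    ///
    /// ```ignore
    /// let info = dht.create_inbox("alice-mail", me.clone()).await?;
    /// println!("inbox {} reachable at {}", info.inbox_id, info.three_word_address);
    /// ```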
1694    pub async fn create_inbox(&self, inbox_id: &str, owner_peer_id: PeerId) -> Result<InboxInfo> {
1695        info!("Creating inbox {} for peer {}", inbox_id, owner_peer_id);
1696        
1697        let inbox_key = Key::from_inbox_id(inbox_id);
1698        
1699        // Create inbox metadata record with infinite TTL
1700        let inbox_metadata = InboxMetadata {
1701            inbox_id: inbox_id.to_string(),
1702            owner: owner_peer_id.clone(),
1703            created_at: SystemTime::now(),
1704            message_count: 0,
            max_messages: 1000, // Default message cap (hard-coded for now)
1706            is_public: true,
1707            access_keys: vec![owner_peer_id.clone()],
1708        };
1709        
1710        let metadata_value = serde_json::to_vec(&inbox_metadata)
1711            .map_err(|e| P2PError::DHT(format!("Failed to serialize inbox metadata: {}", e)))?;
1712        
1713        let metadata_record = Record {
1714            key: inbox_key.clone(),
1715            value: metadata_value,
1716            publisher: owner_peer_id.clone(),
1717            created_at: SystemTime::now(),
            // Effectively infinite TTL (~100 years). Adding u64::MAX seconds would
            // overflow SystemTime and panic, so use a far-future bound instead.
            expires_at: SystemTime::now() + Duration::from_secs(100 * 365 * 24 * 60 * 60),
1719            signature: None,
1720        };
1721        
1722        // Store metadata with infinite TTL
1723        self.put_record_with_infinite_ttl(metadata_record).await?;
1724        
1725        // Create empty message index
1726        let index_key = Key::from_inbox_index(inbox_id);
1727        let empty_index = InboxMessageIndex {
1728            inbox_id: inbox_id.to_string(),
1729            messages: Vec::new(),
1730            last_updated: SystemTime::now(),
1731        };
1732        
1733        let index_value = serde_json::to_vec(&empty_index)
1734            .map_err(|e| P2PError::DHT(format!("Failed to serialize inbox index: {}", e)))?;
1735        
1736        let index_record = Record {
1737            key: index_key,
1738            value: index_value,
1739            publisher: owner_peer_id.clone(),
1740            created_at: SystemTime::now(),
            expires_at: SystemTime::now() + Duration::from_secs(100 * 365 * 24 * 60 * 60), // Effectively infinite TTL (~100 years; u64::MAX would overflow SystemTime)
1742            signature: None,
1743        };
1744        
1745        self.put_record_with_infinite_ttl(index_record).await?;
1746        
1747        let inbox_info = InboxInfo {
1748            inbox_id: inbox_id.to_string(),
1749            three_word_address: self.generate_three_word_address(inbox_id),
1750            owner: owner_peer_id,
1751            created_at: SystemTime::now(),
1752            message_count: 0,
1753            is_accessible: true,
1754        };
1755        
1756        info!("Successfully created inbox {} with three-word address: {}", 
1757              inbox_id, inbox_info.three_word_address);
1758        
1759        Ok(inbox_info)
1760    }
1761    
1762    /// Send a message to an inbox
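    ///
    /// Illustrative usage (a sketch; the identifiers and field values are placeholders):
    ///
    /// ```ignore
    /// let message = InboxMessage {
    ///     id: "msg-0001".to_string(),
    ///     sender: me.clone(),
    ///     recipient_inbox: "alice-mail".to_string(),
    ///     content: "hello".to_string(),
    ///     message_type: "text".to_string(),
    ///     timestamp: std::time::SystemTime::now(),
    ///     attachments: Vec::new(),
    /// };
    /// dht.send_message_to_inbox("alice-mail", message).await?;
    /// ```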
1763    pub async fn send_message_to_inbox(&self, inbox_id: &str, message: InboxMessage) -> Result<()> {
1764        info!("Sending message to inbox {}", inbox_id);
1765        
1766        // Get current inbox metadata
1767        let inbox_key = Key::from_inbox_id(inbox_id);
1768        let metadata_record = self.get(&inbox_key).await
1769            .ok_or_else(|| P2PError::DHT(format!("Inbox {} not found", inbox_id)))?;
1770        
1771        let mut inbox_metadata: InboxMetadata = serde_json::from_slice(&metadata_record.value)
1772            .map_err(|e| P2PError::DHT(format!("Failed to deserialize inbox metadata: {}", e)))?;
1773        
1774        // Check message limit
1775        if inbox_metadata.message_count >= inbox_metadata.max_messages {
1776            return Err(P2PError::DHT(format!("Inbox {} is full", inbox_id)));
1777        }
1778        
1779        // Create message record with infinite TTL
1780        let message_key = Key::from_inbox_message(inbox_id, &message.id);
1781        let message_value = serde_json::to_vec(&message)
1782            .map_err(|e| P2PError::DHT(format!("Failed to serialize message: {}", e)))?;
1783        
1784        let message_record = Record {
1785            key: message_key.clone(),
1786            value: message_value,
1787            publisher: message.sender.clone(),
1788            created_at: message.timestamp,
            expires_at: SystemTime::now() + Duration::from_secs(100 * 365 * 24 * 60 * 60), // Effectively infinite TTL (~100 years; u64::MAX would overflow SystemTime)
1790            signature: None,
1791        };
1792        
1793        self.put_record_with_infinite_ttl(message_record).await?;
1794        
1795        // Update message index
1796        let index_key = Key::from_inbox_index(inbox_id);
1797        let index_record = self.get(&index_key).await
1798            .ok_or_else(|| P2PError::DHT(format!("Inbox index {} not found", inbox_id)))?;
1799        
1800        let mut message_index: InboxMessageIndex = serde_json::from_slice(&index_record.value)
1801            .map_err(|e| P2PError::DHT(format!("Failed to deserialize message index: {}", e)))?;
1802        
1803        message_index.messages.push(MessageRef {
1804            message_id: message.id.clone(),
1805            sender: message.sender.clone(),
1806            timestamp: message.timestamp,
1807            message_type: message.message_type.clone(),
1808        });
1809        message_index.last_updated = SystemTime::now();
1810        
1811        // Update metadata
1812        inbox_metadata.message_count += 1;
1813        
1814        // Store updated index and metadata
1815        let updated_index_value = serde_json::to_vec(&message_index)
1816            .map_err(|e| P2PError::DHT(format!("Failed to serialize updated index: {}", e)))?;
1817        
1818        let updated_metadata_value = serde_json::to_vec(&inbox_metadata)
1819            .map_err(|e| P2PError::DHT(format!("Failed to serialize updated metadata: {}", e)))?;
1820        
1821        let updated_index_record = Record {
1822            key: index_key,
1823            value: updated_index_value,
1824            publisher: message.sender.clone(),
1825            created_at: SystemTime::now(),
            expires_at: SystemTime::now() + Duration::from_secs(100 * 365 * 24 * 60 * 60), // Effectively infinite TTL
1827            signature: None,
1828        };
1829        
1830        let updated_metadata_record = Record {
1831            key: inbox_key,
1832            value: updated_metadata_value,
1833            publisher: message.sender.clone(),
1834            created_at: SystemTime::now(),
            expires_at: SystemTime::now() + Duration::from_secs(100 * 365 * 24 * 60 * 60), // Effectively infinite TTL
1836            signature: None,
1837        };
1838        
1839        self.put_record_with_infinite_ttl(updated_index_record).await?;
1840        self.put_record_with_infinite_ttl(updated_metadata_record).await?;
1841        
1842        info!("Successfully sent message {} to inbox {}", message.id, inbox_id);
1843        Ok(())
1844    }
1845    
1846    /// Get messages from an inbox
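    ///
    /// Illustrative usage (a sketch): fetch the 20 most recent messages.
    ///
    /// ```ignore
    /// let recent = dht.get_inbox_messages("alice-mail", Some(20)).await?;
    /// for msg in &recent {
    ///     println!("{} from {}: {}", msg.id, msg.sender, msg.content);
    /// }
    /// ```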
1847    pub async fn get_inbox_messages(&self, inbox_id: &str, limit: Option<usize>) -> Result<Vec<InboxMessage>> {
1848        info!("Retrieving messages from inbox {}", inbox_id);
1849        
1850        let index_key = Key::from_inbox_index(inbox_id);
1851        let index_record = self.get(&index_key).await
1852            .ok_or_else(|| P2PError::DHT(format!("Inbox {} not found", inbox_id)))?;
1853        
1854        let message_index: InboxMessageIndex = serde_json::from_slice(&index_record.value)
1855            .map_err(|e| P2PError::DHT(format!("Failed to deserialize message index: {}", e)))?;
1856        
1857        let mut messages = Vec::new();
1858        let message_refs: Vec<&MessageRef> = if let Some(limit) = limit {
1859            message_index.messages.iter().rev().take(limit).collect()
1860        } else {
1861            message_index.messages.iter().collect()
1862        };
1863        
1864        for message_ref in message_refs {
1865            let message_key = Key::from_inbox_message(inbox_id, &message_ref.message_id);
1866            if let Some(message_record) = self.get(&message_key).await {
1867                if let Ok(message) = serde_json::from_slice::<InboxMessage>(&message_record.value) {
1868                    messages.push(message);
1869                }
1870            }
1871        }
1872        
1873        // Sort messages by timestamp (newest first)
1874        messages.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
1875        
1876        info!("Retrieved {} messages from inbox {}", messages.len(), inbox_id);
1877        Ok(messages)
1878    }
1879    
1880    /// Get inbox information
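    ///
    /// Illustrative usage (a sketch):
    ///
    /// ```ignore
    /// match dht.get_inbox_info("alice-mail").await? {
    ///     Some(info) => println!("{} messages, owned by {}", info.message_count, info.owner),
    ///     None => println!("inbox not found"),
    /// }
    /// ```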
1881    pub async fn get_inbox_info(&self, inbox_id: &str) -> Result<Option<InboxInfo>> {
1882        let inbox_key = Key::from_inbox_id(inbox_id);
1883        let metadata_record = self.get(&inbox_key).await;
1884        
1885        if let Some(record) = metadata_record {
1886            let metadata: InboxMetadata = serde_json::from_slice(&record.value)
1887                .map_err(|e| P2PError::DHT(format!("Failed to deserialize inbox metadata: {}", e)))?;
1888            
1889            let inbox_info = InboxInfo {
1890                inbox_id: inbox_id.to_string(),
1891                three_word_address: self.generate_three_word_address(inbox_id),
1892                owner: metadata.owner,
1893                created_at: metadata.created_at,
1894                message_count: metadata.message_count,
1895                is_accessible: true,
1896            };
1897            
1898            Ok(Some(inbox_info))
1899        } else {
1900            Ok(None)
1901        }
1902    }
1903    
    /// Store a long-lived record (effectively never expires) and replicate it to the closest nodes
1905    async fn put_record_with_infinite_ttl(&self, record: Record) -> Result<()> {
1906        // Store locally first
1907        self.storage.store(record.clone()).await?;
1908        
1909        // Find closest nodes for replication
1910        let closest_nodes = self.routing_table
1911            .closest_nodes(&record.key, self.config.replication_factor)
1912            .await;
1913        
1914        // Replicate to closest nodes
1915        for node in &closest_nodes {
1916            if let Err(e) = self.replicate_record(&record, node).await {
1917                debug!("Failed to replicate infinite TTL record to node {}: {}", node.peer_id, e);
1918            }
1919        }
1920        
1921        Ok(())
1922    }
1923    
1924    /// Generate a three-word address for an inbox
    fn generate_three_word_address(&self, inbox_id: &str) -> String {
        use crate::bootstrap::words::WordEncoder;

        let encoder = WordEncoder::new();

        // Only use the word encoder when the synthetic inbox path actually parses
        // as a multiaddr; otherwise every inbox would end up sharing the words of
        // a fixed fallback address. If encoding is not possible, fall back to a
        // deterministic, per-inbox readable string instead.
        if let Ok(fake_multiaddr) = format!("/inbox/{}/dht", inbox_id).parse::<Multiaddr>() {
            if let Ok(words) = encoder.encode_multiaddr(&fake_multiaddr) {
                return words.to_string();
            }
        }

        format!("inbox.{}.messages", inbox_id.chars().take(8).collect::<String>())
    }
1939    
1940    /// Add a node to the DHT routing table
1941    pub async fn add_node(&self, node: DHTNode) -> Result<()> {
1942        self.routing_table.add_node(node).await
1943    }
1944    
1945    /// Remove a node from the DHT routing table
1946    pub async fn remove_node(&self, peer_id: &PeerId) -> Result<()> {
1947        self.routing_table.remove_node(peer_id).await
1948    }
1949}
1950
1951/// DHT statistics
1952#[derive(Debug, Clone)]
1953pub struct DHTStats {
1954    /// Local node ID
1955    pub local_id: Key,
1956    /// Total nodes in routing table
1957    pub total_nodes: usize,
1958    /// Number of active buckets
1959    pub active_buckets: usize,
1960    /// Number of stored records
1961    pub stored_records: usize,
1962    /// Number of expired records
1963    pub expired_records: usize,
1964}
1965
1966/// Consistency check report
1967#[derive(Debug, Clone)]
1968pub struct ConsistencyReport {
1969    /// Key being checked
1970    pub key: Key,
1971    /// Number of nodes queried
1972    pub nodes_queried: usize,
1973    /// Number of nodes that responded
1974    pub nodes_responded: usize,
1975    /// Number of records found
1976    pub records_found: usize,
1977    /// Whether all records are consistent
1978    pub consistent: bool,
1979    /// The canonical record (if any)
1980    pub canonical_record: Option<Record>,
1981    /// Conflicting records found
1982    pub conflicts: Vec<(PeerId, Record)>,
1983    /// Expected replication factor
1984    pub replication_factor: usize,
1985}
1986
1987/// Result of a repair operation
1988#[derive(Debug, Clone)]
1989pub struct RepairResult {
1990    /// Key that was repaired
1991    pub key: Key,
1992    /// Whether repairs were needed
1993    pub repairs_needed: bool,
1994    /// Number of repair attempts made
1995    pub repairs_attempted: usize,
1996    /// Number of successful repairs
1997    pub repairs_successful: usize,
1998    /// Final state description
1999    pub final_state: String,
2000}
2001
2002impl LookupState {
2003    /// Create a new lookup state
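    ///
    /// Sketch of one lookup round as driven by the iterative lookup methods
    /// above (`seed_nodes` / `discovered` are placeholders for routing-table
    /// candidates and query results):
    ///
    /// ```ignore
    /// let mut state = LookupState::new(target_key.clone(), 3 /* alpha */);
    /// state.add_nodes(seed_nodes);
    /// while !state.is_complete() {
    ///     let batch = state.next_nodes(); // up to `alpha` unqueried nodes
    ///     if batch.is_empty() { break; }
    ///     // ... query `batch`, then feed discovered nodes back in:
    ///     state.add_nodes(discovered);
    /// }
    /// ```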
2004    pub fn new(target: Key, alpha: usize) -> Self {
2005        Self {
2006            target,
2007            queried: HashMap::new(),
2008            to_query: VecDeque::new(),
2009            closest: Vec::new(),
2010            started_at: Instant::now(),
2011            alpha,
2012        }
2013    }
2014    
2015    /// Add nodes to query
    pub fn add_nodes(&mut self, nodes: Vec<DHTNode>) {
        for node in nodes {
            let already_known = self.queried.contains_key(&node.peer_id)
                || self.to_query.iter().any(|n| n.peer_id == node.peer_id);
            if !already_known {
                // Track the node both as a pending query target and as a
                // candidate for the final closest-node result set
                self.closest.push(node.clone());
                self.to_query.push_back(node);
            }
        }
        
        // Keep both collections ordered by XOR distance to the target
        let target = &self.target;
        self.to_query.make_contiguous().sort_by_key(|node| {
            node.key().distance(target).as_bytes().to_vec()
        });
        self.closest.sort_by_key(|node| {
            node.key().distance(target).as_bytes().to_vec()
        });
    }
2029    
2030    /// Get next nodes to query
2031    pub fn next_nodes(&mut self) -> Vec<DHTNode> {
2032        let mut nodes = Vec::new();
2033        for _ in 0..self.alpha {
2034            if let Some(node) = self.to_query.pop_front() {
2035                self.queried.insert(node.peer_id.clone(), Instant::now());
2036                nodes.push(node);
2037            } else {
2038                break;
2039            }
2040        }
2041        nodes
2042    }
2043    
2044    /// Check if lookup is complete
2045    pub fn is_complete(&self) -> bool {
2046        self.to_query.is_empty() || self.started_at.elapsed() > Duration::from_secs(30)
2047    }
2048}
2049
2050// ===============================
2051// INBOX SYSTEM DATA STRUCTURES
2052// ===============================
2053
2054/// Inbox metadata stored in DHT
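///
/// Metadata is serialized to JSON before being stored as a DHT record value;
/// a minimal round-trip sketch (error handling elided):
///
/// ```ignore
/// let bytes = serde_json::to_vec(&metadata)?;
/// let parsed: InboxMetadata = serde_json::from_slice(&bytes)?;
/// assert_eq!(parsed.inbox_id, metadata.inbox_id);
/// ```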
2055#[derive(Debug, Clone, Serialize, Deserialize)]
2056pub struct InboxMetadata {
2057    pub inbox_id: String,
2058    pub owner: PeerId,
2059    pub created_at: SystemTime,
2060    pub message_count: usize,
2061    pub max_messages: usize,
2062    pub is_public: bool,
2063    pub access_keys: Vec<PeerId>,
2064}
2065
2066/// Inbox message stored in DHT
2067#[derive(Debug, Clone, Serialize, Deserialize)]
2068pub struct InboxMessage {
2069    pub id: String,
2070    pub sender: PeerId,
2071    pub recipient_inbox: String,
2072    pub content: String,
2073    pub message_type: String,
2074    pub timestamp: SystemTime,
2075    pub attachments: Vec<MessageAttachment>,
2076}
2077
2078/// Message attachment metadata
2079#[derive(Debug, Clone, Serialize, Deserialize)]
2080pub struct MessageAttachment {
2081    pub filename: String,
2082    pub content_type: String,
2083    pub size: u64,
2084    pub hash: String,
2085}
2086
2087/// Message index for efficient inbox querying
2088#[derive(Debug, Clone, Serialize, Deserialize)]
2089pub struct InboxMessageIndex {
2090    pub inbox_id: String,
2091    pub messages: Vec<MessageRef>,
2092    pub last_updated: SystemTime,
2093}
2094
2095/// Reference to a message in the index
2096#[derive(Debug, Clone, Serialize, Deserialize)]
2097pub struct MessageRef {
2098    pub message_id: String,
2099    pub sender: PeerId,
2100    pub timestamp: SystemTime,
2101    pub message_type: String,
2102}
2103
2104/// Inbox information returned to users
2105#[derive(Debug, Clone, Serialize, Deserialize)]
2106pub struct InboxInfo {
2107    pub inbox_id: String,
2108    pub three_word_address: String,
2109    pub owner: PeerId,
2110    pub created_at: SystemTime,
2111    pub message_count: usize,
2112    pub is_accessible: bool,
2113}
2114
2115// ===============================
2116// KEY EXTENSIONS FOR INBOX SYSTEM
2117// ===============================
2118
2119impl Key {
2120    /// Create a key for inbox metadata
2121    pub fn from_inbox_id(inbox_id: &str) -> Self {
2122        let mut hasher = Sha256::new();
2123        hasher.update(b"INBOX_METADATA:");
2124        hasher.update(inbox_id.as_bytes());
2125        let hash = hasher.finalize();
2126        Key { hash: hash.into() }
2127    }
2128    
2129    /// Create a key for inbox message index
2130    pub fn from_inbox_index(inbox_id: &str) -> Self {
2131        let mut hasher = Sha256::new();
2132        hasher.update(b"INBOX_INDEX:");
2133        hasher.update(inbox_id.as_bytes());
2134        let hash = hasher.finalize();
2135        Key { hash: hash.into() }
2136    }
2137    
2138    /// Create a key for a specific message in an inbox
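    ///
    /// Inbox keys are domain-separated and deterministic, so the same identifiers
    /// always map to the same DHT location; a small sketch:
    ///
    /// ```ignore
    /// let a = Key::from_inbox_message("alice-mail", "msg-0001");
    /// let b = Key::from_inbox_message("alice-mail", "msg-0001");
    /// assert_eq!(a, b);
    /// assert_ne!(a, Key::from_inbox_index("alice-mail"));
    /// ```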
2139    pub fn from_inbox_message(inbox_id: &str, message_id: &str) -> Self {
2140        let mut hasher = Sha256::new();
2141        hasher.update(b"INBOX_MESSAGE:");
2142        hasher.update(inbox_id.as_bytes());
2143        hasher.update(b":");
2144        hasher.update(message_id.as_bytes());
2145        let hash = hasher.finalize();
2146        Key { hash: hash.into() }
2147    }
2148}
2149
// NOTE: hex-encoded key display (`Key::to_hex`) relies on the `hex` crate,
// which must be declared as a dependency in Cargo.toml.