1use crate::error::{P2PError, P2pResult as Result};
24use crate::validation::{
25 Validate, ValidationContext, validate_dht_key, validate_dht_value, validate_peer_id,
26};
27use crate::{Multiaddr, PeerId};
28use futures;
29use serde::{Deserialize, Serialize};
30use sha2::{Digest, Sha256};
31use std::collections::{HashMap, VecDeque};
32use std::fmt;
33use std::time::{Duration, Instant, SystemTime};
34use tokio::sync::RwLock;
35use tracing::{debug, info};
36
37pub mod skademlia;
39
40pub mod ipv6_identity;
42
43pub mod enhanced_storage;
45
46pub mod optimized_storage;
48
49#[allow(unused)]
51pub mod content_addressing;
52#[allow(unused)]
53pub mod core_engine;
54#[allow(unused)]
55pub mod network_integration;
56#[allow(unused)]
57pub mod reed_solomon;
58#[allow(unused)]
59pub mod witness;
60
61pub mod client;
63
64pub mod rsps_integration;
66
67pub mod geographic_network_integration;
69pub mod geographic_routing;
70pub mod geographic_routing_table;
71pub mod latency_aware_selection;
72
73#[cfg(test)]
75mod content_addressing_test;
76#[cfg(test)]
77mod core_engine_test;
78#[cfg(test)]
79mod reed_solomon_test;
80
/// Tunable parameters for the Kademlia-style DHT.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTConfig {
    /// Number of nodes each record is replicated to (Kademlia `k`).
    pub replication_factor: usize,
    /// Maximum number of nodes kept per routing-table bucket.
    pub bucket_size: usize,
    /// Lookup parallelism factor (Kademlia `alpha`).
    pub alpha: usize,
    /// How long a stored record stays valid before expiring.
    pub record_ttl: Duration,
    /// Interval between routing-table bucket refreshes.
    pub bucket_refresh_interval: Duration,
    /// Interval between re-publishing locally held records.
    pub republish_interval: Duration,
    /// Maximum XOR distance considered, in bits.
    /// NOTE(review): not referenced in this part of the file — confirm semantics.
    pub max_distance: u8,
}
99
/// 256-bit DHT key: a SHA-256 digest, compared bytewise for XOR distance.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Key {
    // Raw SHA-256 digest bytes; private so all construction goes through
    // `new` / `from_hash` / `random`.
    hash: [u8; 32],
}
106
107impl Validate for Key {
108 fn validate(&self, ctx: &ValidationContext) -> Result<()> {
109 validate_dht_key(&self.hash, ctx)?;
111 Ok(())
112 }
113}
114
/// A key/value record stored in the DHT.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Record {
    /// Key the record is stored under.
    pub key: Key,
    /// Opaque value bytes.
    pub value: Vec<u8>,
    /// Peer that published the record.
    pub publisher: PeerId,
    /// Wall-clock creation time.
    pub created_at: SystemTime,
    /// Wall-clock expiry time; must not precede `created_at`.
    pub expires_at: SystemTime,
    /// Optional signature over the record (1..=512 bytes when present).
    pub signature: Option<Vec<u8>>,
}
131
132impl Validate for Record {
133 fn validate(&self, ctx: &ValidationContext) -> Result<()> {
134 self.key.validate(ctx)?;
136
137 validate_dht_value(&self.value, ctx)?;
139
140 validate_peer_id(&self.publisher)?;
142
143 let now = SystemTime::now();
145 if self.created_at > now {
146 return Err(P2PError::validation(
147 "Record creation time is in the future",
148 ));
149 }
150
151 if self.expires_at < self.created_at {
152 return Err(P2PError::validation(
153 "Record expiration time is before creation time",
154 ));
155 }
156
157 if let Some(sig) = &self.signature
159 && (sig.is_empty() || sig.len() > 512)
160 {
161 return Err(P2PError::validation("Invalid signature size"));
162 }
163
164 Ok(())
165 }
166}
167
/// A peer entry as held in the routing table.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DHTNode {
    /// Identity of the peer.
    pub peer_id: PeerId,
    /// Known network addresses for this peer.
    pub addresses: Vec<Multiaddr>,
    /// When the peer was last heard from; serialized as elapsed seconds.
    #[serde(with = "instant_as_secs")]
    pub last_seen: Instant,
    /// XOR distance from the local node's key (precomputed at insert time).
    pub distance: Key,
    /// Connection flag; set false by constructors and updated by callers.
    /// NOTE(review): assumed to mean "an active transport connection exists" — confirm.
    pub is_connected: bool,
}
183
184mod instant_as_secs {
186 use serde::{Deserialize, Deserializer, Serialize, Serializer};
187 use std::time::Instant;
188
189 pub fn serialize<S>(instant: &Instant, serializer: S) -> Result<S::Ok, S::Error>
190 where
191 S: Serializer,
192 {
193 instant.elapsed().as_secs().serialize(serializer)
195 }
196
197 pub fn deserialize<'de, D>(deserializer: D) -> Result<Instant, D::Error>
198 where
199 D: Deserializer<'de>,
200 {
201 let secs = u64::deserialize(deserializer)?;
202 Ok(Instant::now() - std::time::Duration::from_secs(secs))
204 }
205}
206
/// Wire-friendly snapshot of a `DHTNode` (`Instant` replaced by an age).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDHTNode {
    /// Identity of the peer.
    pub peer_id: PeerId,
    /// Known network addresses for this peer.
    pub addresses: Vec<Multiaddr>,
    /// Seconds elapsed since the peer was last seen, at snapshot time.
    pub last_seen_secs: u64,
    /// XOR distance from the snapshotting node's key.
    pub distance: Key,
    /// Connection flag copied from the live node.
    pub is_connected: bool,
}
221
/// A single Kademlia routing bucket holding up to `capacity` nodes.
#[derive(Debug)]
struct KBucket {
    // Front of the deque = most recently seen (see `add_node`'s push_front).
    nodes: VecDeque<DHTNode>,
    // Maximum number of nodes retained (typically `DHTConfig::bucket_size`).
    capacity: usize,
    // Last refresh time, compared against an interval in `needs_refresh`.
    last_refresh: Instant,
}
232
/// Kademlia routing table: 256 buckets indexed by XOR-distance prefix.
#[derive(Debug)]
pub struct RoutingTable {
    // Key of the local node; all bucket indices are relative to it.
    local_id: Key,
    // One bucket per possible distance prefix; each independently locked so
    // distant buckets can be read/written concurrently.
    buckets: Vec<RwLock<KBucket>>,
    #[allow(dead_code)]
    config: DHTConfig,
}
244
/// Record storage facade; all operations delegate to the optimized backend.
#[derive(Debug)]
pub struct DHTStorage {
    // Backing store that handles TTLs, eviction, and memory limits.
    optimized_storage: optimized_storage::OptimizedDHTStorage,
    #[allow(dead_code)]
    config: DHTConfig,
}
254
/// Kademlia-style DHT node: routing table plus record storage, with optional
/// S/Kademlia hardening and IPv6 identity verification layered on top.
#[derive(Debug)]
pub struct DHT {
    // This node's own key; all XOR distances are measured against it.
    local_id: Key,
    routing_table: RoutingTable,
    storage: DHTStorage,
    #[allow(dead_code)]
    config: DHTConfig,
    /// Optional S/Kademlia security extensions (secure lookup, reputation).
    pub skademlia: Option<skademlia::SKademlia>,
    /// Optional IPv6-based identity verification manager.
    pub ipv6_identity_manager: Option<ipv6_identity::IPv6DHTIdentityManager>,
}
272
/// DHT request messages handled by `DHT::handle_query`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DHTQuery {
    /// Ask for the nodes closest to `key`.
    FindNode {
        key: Key,
        requester: PeerId,
    },
    /// Ask for the value under `key`; falls back to closest nodes if absent.
    FindValue {
        key: Key,
        requester: PeerId,
    },
    /// Ask the receiver to store `record`.
    Store {
        record: Record,
        requester: PeerId,
    },
    /// Liveness check; answered with `DHTResponse::Pong`.
    Ping {
        requester: PeerId,
    },
}
303
/// DHT response messages paired with `DHTQuery` variants.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DHTResponse {
    /// Closest known nodes (for `FindNode`, or `FindValue` misses).
    Nodes {
        nodes: Vec<SerializableDHTNode>,
    },
    /// The record found for a `FindValue` hit.
    Value {
        record: Record,
    },
    /// Outcome of a `Store` request.
    Stored {
        success: bool,
    },
    /// Reply to `Ping`.
    Pong {
        responder: PeerId,
    },
    /// Generic failure carrying a human-readable message.
    Error {
        message: String,
    },
}
333
/// Book-keeping for an iterative Kademlia lookup.
/// NOTE(review): not referenced in this part of the file — presumably driven
/// by the lookup engine elsewhere; confirm before relying on field semantics.
#[derive(Debug)]
pub struct LookupState {
    /// Key being looked up.
    pub target: Key,
    /// Peers already queried, with the time each query was issued.
    pub queried: HashMap<PeerId, Instant>,
    /// Candidate nodes still to be queried.
    pub to_query: VecDeque<DHTNode>,
    /// Closest nodes discovered so far.
    pub closest: Vec<DHTNode>,
    /// When this lookup started.
    pub started_at: Instant,
    /// Parallelism factor for in-flight queries.
    pub alpha: usize,
}
350
351impl Default for DHTConfig {
352 fn default() -> Self {
353 Self {
354 replication_factor: 20, bucket_size: 20, alpha: 3, record_ttl: Duration::from_secs(24 * 60 * 60), bucket_refresh_interval: Duration::from_secs(60 * 60), republish_interval: Duration::from_secs(24 * 60 * 60), max_distance: 160, }
362 }
363}
364
365impl Key {
366 pub fn new(data: &[u8]) -> Self {
368 let mut hasher = Sha256::new();
369 hasher.update(data);
370 let hash: [u8; 32] = hasher.finalize().into();
371 Self { hash }
372 }
373
374 pub fn from_hash(hash: [u8; 32]) -> Self {
376 Self { hash }
377 }
378
379 pub fn random() -> Self {
381 use rand::RngCore;
382 let mut hash = [0u8; 32];
383 rand::thread_rng().fill_bytes(&mut hash);
384 Self { hash }
385 }
386
387 pub fn as_bytes(&self) -> &[u8] {
389 &self.hash
390 }
391
392 pub fn hash_bytes(&self) -> [u8; 32] {
394 self.hash
395 }
396
397 pub fn to_hex(&self) -> String {
399 hex::encode(self.hash)
400 }
401
402 pub fn distance(&self, other: &Key) -> Key {
404 let mut result = [0u8; 32];
405 for (i, out) in result.iter_mut().enumerate() {
406 *out = self.hash[i] ^ other.hash[i];
407 }
408 Key { hash: result }
409 }
410
411 pub fn leading_zeros(&self) -> u32 {
413 for (i, &byte) in self.hash.iter().enumerate() {
414 if byte != 0 {
415 return (i * 8) as u32 + byte.leading_zeros();
416 }
417 }
418 256 }
420
421 pub fn bucket_index(&self, local_id: &Key) -> usize {
423 let distance = self.distance(local_id);
424 let leading_zeros = distance.leading_zeros();
425 if leading_zeros >= 255 {
426 255 } else {
428 (255 - leading_zeros) as usize
429 }
430 }
431}
432
433impl fmt::Display for Key {
434 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
435 write!(f, "{}", hex::encode(&self.hash[..8]))
436 }
437}
438
439impl Record {
440 pub fn new(key: Key, value: Vec<u8>, publisher: PeerId) -> Self {
442 let now = SystemTime::now();
443 let ttl = Duration::from_secs(24 * 60 * 60); Self {
446 key,
447 value,
448 publisher,
449 created_at: now,
450 expires_at: now + ttl,
451 signature: None,
452 }
453 }
454
455 pub fn with_ttl(key: Key, value: Vec<u8>, publisher: PeerId, ttl: Duration) -> Self {
457 let now = SystemTime::now();
458
459 Self {
460 key,
461 value,
462 publisher,
463 created_at: now,
464 expires_at: now + ttl,
465 signature: None,
466 }
467 }
468
469 pub fn is_expired(&self) -> bool {
471 SystemTime::now() > self.expires_at
472 }
473
474 pub fn age(&self) -> Duration {
476 SystemTime::now()
477 .duration_since(self.created_at)
478 .unwrap_or(Duration::ZERO)
479 }
480
481 pub fn sign(&mut self, _private_key: &[u8]) -> Result<()> {
483 self.signature = Some(vec![0u8; 64]); Ok(())
487 }
488
489 pub fn verify(&self, _public_key: &[u8]) -> bool {
491 self.signature.is_some()
494 }
495}
496
497impl DHTNode {
498 pub fn new(peer_id: PeerId, addresses: Vec<Multiaddr>, local_id: &Key) -> Self {
500 let node_key = Key::new(peer_id.as_bytes());
501 let distance = node_key.distance(local_id);
502
503 Self {
504 peer_id,
505 addresses,
506 last_seen: Instant::now(),
507 distance,
508 is_connected: false,
509 }
510 }
511
512 pub fn new_with_key(peer_id: PeerId, addresses: Vec<Multiaddr>, key: Key) -> Self {
514 Self {
515 peer_id,
516 addresses,
517 last_seen: Instant::now(),
518 distance: key,
519 is_connected: false,
520 }
521 }
522
523 pub fn touch(&mut self) {
525 self.last_seen = Instant::now();
526 }
527
528 pub fn is_stale(&self, timeout: Duration) -> bool {
530 self.last_seen.elapsed() > timeout
531 }
532
533 pub fn key(&self) -> Key {
535 Key::new(self.peer_id.as_bytes())
536 }
537
538 pub fn to_serializable(&self) -> SerializableDHTNode {
540 SerializableDHTNode {
541 peer_id: self.peer_id.clone(),
542 addresses: self.addresses.clone(),
543 last_seen_secs: self.last_seen.elapsed().as_secs(),
544 distance: self.distance.clone(),
545 is_connected: self.is_connected,
546 }
547 }
548}
549
550impl SerializableDHTNode {
551 pub fn to_dht_node(&self) -> DHTNode {
553 DHTNode {
554 peer_id: self.peer_id.clone(),
555 addresses: self.addresses.clone(),
556 last_seen: Instant::now() - Duration::from_secs(self.last_seen_secs),
557 distance: self.distance.clone(),
558 is_connected: self.is_connected,
559 }
560 }
561}
562
563impl KBucket {
564 fn new(capacity: usize) -> Self {
566 Self {
567 nodes: VecDeque::new(),
568 capacity,
569 last_refresh: Instant::now(),
570 }
571 }
572
573 fn add_node(&mut self, node: DHTNode) -> bool {
575 if let Some(pos) = self.nodes.iter().position(|n| n.peer_id == node.peer_id) {
577 let updated_node = if let Some(existing_node) = self.nodes.get_mut(pos) {
580 existing_node.touch();
581 existing_node.is_connected = node.is_connected;
582 existing_node.clone()
583 } else {
584 return false; };
586 self.nodes.remove(pos);
588 self.nodes.push_front(updated_node);
589 return true;
590 }
591 if self.nodes.len() < self.capacity {
592 self.nodes.push_front(node);
594 true
595 } else {
596 false
598 }
599 }
600
601 fn remove_node(&mut self, peer_id: &PeerId) -> bool {
603 if let Some(pos) = self.nodes.iter().position(|n| n.peer_id == *peer_id) {
604 self.nodes.remove(pos);
605 true
606 } else {
607 false
608 }
609 }
610
611 fn closest_nodes(&self, target: &Key, count: usize) -> Vec<DHTNode> {
613 let mut nodes: Vec<_> = self.nodes.iter().cloned().collect();
614 nodes.sort_by_key(|node| node.key().distance(target).as_bytes().to_vec());
615 nodes.into_iter().take(count).collect()
616 }
617
618 fn needs_refresh(&self, interval: Duration) -> bool {
620 self.last_refresh.elapsed() > interval
621 }
622}
623
624impl RoutingTable {
625 pub fn new(local_id: Key, config: DHTConfig) -> Self {
627 let mut buckets = Vec::new();
628 for _ in 0..256 {
629 buckets.push(RwLock::new(KBucket::new(config.bucket_size)));
630 }
631
632 Self {
633 local_id,
634 buckets,
635 config,
636 }
637 }
638
639 pub async fn add_node(&self, node: DHTNode) -> Result<()> {
641 let bucket_index = node.key().bucket_index(&self.local_id);
642 let mut bucket = self.buckets[bucket_index].write().await;
643
644 if bucket.add_node(node.clone()) {
645 debug!("Added node {} to bucket {}", node.peer_id, bucket_index);
646 } else {
647 debug!(
648 "Bucket {} full, could not add node {}",
649 bucket_index, node.peer_id
650 );
651 }
652
653 Ok(())
654 }
655
656 pub async fn remove_node(&self, peer_id: &PeerId) -> Result<()> {
658 let node_key = Key::new(peer_id.as_bytes());
659 let bucket_index = node_key.bucket_index(&self.local_id);
660 let mut bucket = self.buckets[bucket_index].write().await;
661
662 if bucket.remove_node(peer_id) {
663 debug!("Removed node {} from bucket {}", peer_id, bucket_index);
664 }
665
666 Ok(())
667 }
668
669 pub async fn closest_nodes(&self, target: &Key, count: usize) -> Vec<DHTNode> {
671 let mut all_nodes = Vec::new();
672
673 let target_bucket = target.bucket_index(&self.local_id);
675
676 let mut checked = vec![false; 256];
678 let mut to_check = VecDeque::new();
679 to_check.push_back(target_bucket);
680
681 while let Some(bucket_idx) = to_check.pop_front() {
682 if checked[bucket_idx] {
683 continue;
684 }
685 checked[bucket_idx] = true;
686
687 let bucket = self.buckets[bucket_idx].read().await;
688 all_nodes.extend(bucket.closest_nodes(target, bucket.nodes.len()));
689
690 if bucket_idx > 0 && !checked[bucket_idx - 1] {
692 to_check.push_back(bucket_idx - 1);
693 }
694 if bucket_idx < 255 && !checked[bucket_idx + 1] {
695 to_check.push_back(bucket_idx + 1);
696 }
697
698 if all_nodes.len() >= count * 2 {
700 break;
701 }
702 }
703
704 all_nodes.sort_by_key(|node| node.key().distance(target).as_bytes().to_vec());
706 all_nodes.into_iter().take(count).collect()
707 }
708
709 pub async fn stats(&self) -> (usize, usize) {
711 let mut total_nodes = 0;
712 let mut active_buckets = 0;
713
714 for bucket in &self.buckets {
715 let bucket_guard = bucket.read().await;
716 let node_count = bucket_guard.nodes.len();
717 total_nodes += node_count;
718 if node_count > 0 {
719 active_buckets += 1;
720 }
721 }
722
723 (total_nodes, active_buckets)
724 }
725}
726
impl DHTStorage {
    /// Thin facade over `OptimizedDHTStorage`: keeps the public storage API
    /// stable while delegating all real work to the optimized backend.
    pub fn new(config: DHTConfig) -> Self {
        Self {
            optimized_storage: optimized_storage::OptimizedDHTStorage::new(config.clone()),
            config,
        }
    }

    /// Stores a record (delegates to the backend).
    pub async fn store(&self, record: Record) -> Result<()> {
        self.optimized_storage.store(record).await
    }

    /// Fetches the record under `key`, if present.
    pub async fn get(&self, key: &Key) -> Option<Record> {
        self.optimized_storage.get(key).await
    }

    /// Drops expired records; returns how many were removed (0 on error).
    pub async fn cleanup_expired(&self) -> usize {
        self.optimized_storage.cleanup_expired().await.unwrap_or(0)
    }

    /// Deliberately returns an empty Vec: enumerating every record would
    /// defeat the optimized backend's memory strategy. Use the targeted
    /// query methods below instead.
    pub async fn all_records(&self) -> Vec<Record> {
        let stats = self.optimized_storage.get_stats().await;

        debug!(
            "all_records() called on optimized storage with {} records - returning empty for performance",
            stats.total_records
        );
        Vec::new()
    }

    /// Returns (total_records, expired_records).
    pub async fn stats(&self) -> (usize, usize) {
        let stats = self.optimized_storage.get_stats().await;
        (stats.total_records, stats.expired_records)
    }

    /// Records published by `publisher`, optionally capped at `limit`.
    pub async fn get_records_by_publisher(
        &self,
        publisher: &str,
        limit: Option<usize>,
    ) -> Vec<Record> {
        self.optimized_storage
            .get_records_by_publisher(publisher, limit)
            .await
    }

    /// Records that will expire within the next `within` duration.
    pub async fn get_expiring_records(&self, within: Duration) -> Vec<Record> {
        self.optimized_storage.get_expiring_records(within).await
    }

    /// Whether the backend is close to its configured memory budget.
    pub async fn is_near_memory_limit(&self) -> bool {
        self.optimized_storage.is_near_memory_limit().await
    }

    /// Evicts records down toward `target_count`; returns the evicted count.
    pub async fn force_eviction(&self, target_count: usize) -> Result<usize> {
        self.optimized_storage.force_eviction(target_count).await
    }

    /// Full statistics snapshot from the backend.
    pub async fn get_detailed_stats(&self) -> optimized_storage::StorageStats {
        self.optimized_storage.get_stats().await
    }
}
804
805impl DHT {
806 pub fn new(local_id: Key, config: DHTConfig) -> Self {
808 let routing_table = RoutingTable::new(local_id.clone(), config.clone());
809 let storage = DHTStorage::new(config.clone());
810
811 Self {
812 local_id,
813 routing_table,
814 storage,
815 config,
816 skademlia: None,
817 ipv6_identity_manager: None,
818 }
819 }
820
821 pub fn new_with_security(
823 local_id: Key,
824 config: DHTConfig,
825 skademlia_config: skademlia::SKademliaConfig,
826 ) -> Self {
827 let routing_table = RoutingTable::new(local_id.clone(), config.clone());
828 let storage = DHTStorage::new(config.clone());
829 let skademlia = skademlia::SKademlia::new(skademlia_config);
830
831 Self {
832 local_id,
833 routing_table,
834 storage,
835 config,
836 skademlia: Some(skademlia),
837 ipv6_identity_manager: None,
838 }
839 }
840
841 pub fn new_with_ipv6_security(
843 local_id: Key,
844 config: DHTConfig,
845 skademlia_config: skademlia::SKademliaConfig,
846 ipv6_config: ipv6_identity::IPv6DHTConfig,
847 ) -> Self {
848 let routing_table = RoutingTable::new(local_id.clone(), config.clone());
849 let storage = DHTStorage::new(config.clone());
850 let skademlia = skademlia::SKademlia::new(skademlia_config);
851 let ipv6_identity_manager = ipv6_identity::IPv6DHTIdentityManager::new(ipv6_config);
852
853 Self {
854 local_id,
855 routing_table,
856 storage,
857 config,
858 skademlia: Some(skademlia),
859 ipv6_identity_manager: Some(ipv6_identity_manager),
860 }
861 }
862
863 pub fn set_local_ipv6_identity(&mut self, identity: crate::security::IPv6NodeID) -> Result<()> {
865 if let Some(ref mut manager) = self.ipv6_identity_manager {
866 self.local_id = ipv6_identity::IPv6DHTIdentityManager::generate_dht_key(&identity);
868 manager.set_local_identity(identity)?;
869 info!("Local IPv6 identity set and DHT key updated");
870 Ok(())
871 } else {
872 Err(P2PError::Security(
873 crate::error::SecurityError::AuthorizationFailed(
874 "IPv6 identity manager not enabled".into(),
875 ),
876 ))
877 }
878 }
879
880 pub async fn add_bootstrap_node(
882 &self,
883 peer_id: PeerId,
884 addresses: Vec<Multiaddr>,
885 ) -> Result<()> {
886 let node = DHTNode::new(peer_id, addresses, &self.local_id);
887 self.routing_table.add_node(node).await
888 }
889
    /// Adds a peer after IPv6 identity verification.
    ///
    /// With the identity manager enabled, the join is validated first; only a
    /// `NodeJoined` outcome results in routing-table insertion (with the
    /// node's distance re-derived from its IPv6-based DHT key). Verification
    /// failure, diversity violation, or a ban are mapped to security errors.
    /// Without the manager, falls back to plain `add_bootstrap_node`.
    pub async fn add_ipv6_node(
        &mut self,
        peer_id: PeerId,
        addresses: Vec<Multiaddr>,
        ipv6_identity: crate::security::IPv6NodeID,
    ) -> Result<()> {
        if let Some(ref mut manager) = self.ipv6_identity_manager {
            let base_node = DHTNode::new(peer_id.clone(), addresses, &self.local_id);
            // The manager decides whether this peer may join at all.
            let security_event = manager
                .validate_node_join(&base_node, &ipv6_identity)
                .await?;

            match security_event {
                ipv6_identity::IPv6SecurityEvent::NodeJoined {
                    verification_confidence,
                    ..
                } => {
                    let ipv6_node = manager
                        .enhance_dht_node(base_node.clone(), ipv6_identity)
                        .await?;

                    // Replace the default distance with one derived from the
                    // verified IPv6 DHT key.
                    let mut enhanced_base_node = base_node;
                    enhanced_base_node.distance = ipv6_node.get_dht_key().distance(&self.local_id);

                    self.routing_table.add_node(enhanced_base_node).await?;

                    info!(
                        "Added IPv6-verified node {} with confidence {:.2}",
                        peer_id, verification_confidence
                    );
                    Ok(())
                }
                ipv6_identity::IPv6SecurityEvent::VerificationFailed { .. } => Err(
                    P2PError::Security(crate::error::SecurityError::AuthenticationFailed),
                ),
                ipv6_identity::IPv6SecurityEvent::DiversityViolation { subnet_type, .. } => Err(
                    P2PError::Security(crate::error::SecurityError::AuthorizationFailed(
                        format!("IP diversity violation: {subnet_type}").into(),
                    )),
                ),
                ipv6_identity::IPv6SecurityEvent::NodeBanned { reason, .. } => Err(
                    P2PError::Security(crate::error::SecurityError::AuthorizationFailed(
                        format!("Node banned: {reason}").into(),
                    )),
                ),
                // Any other event is treated as an authentication failure.
                _ => Err(P2PError::Security(
                    crate::error::SecurityError::AuthenticationFailed,
                )),
            }
        } else {
            self.add_bootstrap_node(peer_id, addresses).await
        }
    }
949
950 pub async fn remove_ipv6_node(&mut self, peer_id: &PeerId) -> Result<()> {
952 self.routing_table.remove_node(peer_id).await?;
954
955 if let Some(ref mut manager) = self.ipv6_identity_manager {
957 manager.remove_node(peer_id);
958 }
959
960 Ok(())
961 }
962
963 pub fn is_node_banned(&self, peer_id: &PeerId) -> bool {
965 if let Some(ref manager) = self.ipv6_identity_manager {
966 manager.is_node_banned(peer_id)
967 } else {
968 false
969 }
970 }
971
    /// Stores `value` under `key` locally and replicates it to the closest
    /// known nodes.
    ///
    /// Succeeds when no peers are known (local-only store) or when at least
    /// half of the targeted peers (minimum 1) accepted the record; otherwise
    /// returns `DhtError::ReplicationFailed`.
    pub async fn put(&self, key: Key, value: Vec<u8>) -> Result<()> {
        // Publisher is the local key's hex form.
        let record = Record::new(key.clone(), value, self.local_id.to_hex());

        // Always store locally first, so the record survives even if
        // replication fails entirely.
        self.storage.store(record.clone()).await?;

        let closest_nodes = self
            .routing_table
            .closest_nodes(&key, self.config.replication_factor)
            .await;

        info!(
            "Storing record with key {} on {} nodes",
            key.to_hex(),
            closest_nodes.len()
        );

        if closest_nodes.is_empty() {
            info!("No other nodes available for replication, storing only locally");
            return Ok(());
        }

        let mut successful_replications = 0;
        for node in &closest_nodes {
            if self.replicate_record(&record, node).await.is_ok() {
                successful_replications += 1;
            }
        }

        info!(
            "Successfully replicated record {} to {}/{} nodes",
            key.to_hex(),
            successful_replications,
            closest_nodes.len()
        );

        // Require a majority-ish quorum: half the targets, but at least one.
        let required_replications = if closest_nodes.len() == 1 {
            1
        } else {
            std::cmp::max(1, closest_nodes.len() / 2)
        };

        if successful_replications >= required_replications {
            Ok(())
        } else {
            Err(P2PError::Dht(crate::error::DhtError::ReplicationFailed(
                format!(
                    "Insufficient replication for key {}: only {}/{} nodes stored the record (required: {})",
                    key,
                    successful_replications, closest_nodes.len(), required_replications
                ).into()
            )))
        }
    }
1031
1032 pub async fn get(&self, key: &Key) -> Option<Record> {
1034 if let Some(record) = self.storage.get(key).await
1036 && !record.is_expired()
1037 {
1038 return Some(record);
1039 }
1040
1041 if let Some(record) = self.iterative_find_value(key).await {
1043 if self.storage.store(record.clone()).await.is_ok() {
1045 debug!("Cached retrieved record with key {}", key.to_hex());
1046 }
1047 return Some(record);
1048 }
1049
1050 None
1051 }
1052
1053 pub async fn find_node(&self, key: &Key) -> Vec<DHTNode> {
1055 self.routing_table
1056 .closest_nodes(key, self.config.replication_factor)
1057 .await
1058 }
1059
1060 pub async fn handle_query(&self, query: DHTQuery) -> DHTResponse {
1062 match query {
1063 DHTQuery::FindNode { key, requester: _ } => {
1064 let nodes = self.find_node(&key).await;
1065 let serializable_nodes = nodes.into_iter().map(|n| n.to_serializable()).collect();
1066 DHTResponse::Nodes {
1067 nodes: serializable_nodes,
1068 }
1069 }
1070 DHTQuery::FindValue { key, requester: _ } => {
1071 if let Some(record) = self.storage.get(&key).await
1072 && !record.is_expired()
1073 {
1074 return DHTResponse::Value { record };
1075 }
1076 let nodes = self.find_node(&key).await;
1077 let serializable_nodes = nodes.into_iter().map(|n| n.to_serializable()).collect();
1078 DHTResponse::Nodes {
1079 nodes: serializable_nodes,
1080 }
1081 }
1082 DHTQuery::Store {
1083 record,
1084 requester: _,
1085 } => match self.storage.store(record).await {
1086 Ok(()) => DHTResponse::Stored { success: true },
1087 Err(_) => DHTResponse::Stored { success: false },
1088 },
1089 DHTQuery::Ping { requester: _ } => DHTResponse::Pong {
1090 responder: self.local_id.to_hex(),
1091 },
1092 }
1093 }
1094
1095 pub async fn stats(&self) -> DHTStats {
1097 let (total_nodes, active_buckets) = self.routing_table.stats().await;
1098 let (stored_records, expired_records) = self.storage.stats().await;
1099
1100 DHTStats {
1101 local_id: self.local_id.clone(),
1102 total_nodes,
1103 active_buckets,
1104 stored_records,
1105 expired_records,
1106 }
1107 }
1108
1109 pub async fn maintenance(&self) -> Result<()> {
1111 let expired_count = self.storage.cleanup_expired().await;
1113 if expired_count > 0 {
1114 debug!("Cleaned up {} expired records", expired_count);
1115 }
1116
1117 self.republish_records().await?;
1119
1120 self.refresh_buckets().await?;
1122
1123 Ok(())
1126 }
1127
    /// S/Kademlia-hardened lookup: local storage first, then a secure
    /// lookup over disjoint paths with optional witness-based distance
    /// verification of each candidate before querying it.
    ///
    /// Falls back to plain `get` when S/Kademlia is not enabled. The first
    /// node that returns the value wins; the result is cached locally.
    pub async fn secure_get(&mut self, key: &Key) -> Result<Option<Record>> {
        // Fast path: fresh local copy.
        if let Some(record) = self.storage.get(key).await
            && !record.is_expired()
        {
            return Ok(Some(record));
        }

        // Copy the needed config values out first so the later `&mut`
        // borrows of `self.skademlia` don't conflict with `&self` calls.
        let (enable_distance_verification, disjoint_path_count, min_reputation) =
            if let Some(ref skademlia) = self.skademlia {
                (
                    skademlia.config.enable_distance_verification,
                    skademlia.config.disjoint_path_count,
                    skademlia.config.min_routing_reputation,
                )
            } else {
                return Ok(self.get(key).await);
            };

        // Over-provision candidates: three per disjoint path.
        let initial_nodes = self
            .routing_table
            .closest_nodes(key, disjoint_path_count * 3)
            .await;

        if initial_nodes.is_empty() {
            return Ok(None);
        }

        let secure_nodes = if let Some(ref mut skademlia) = self.skademlia {
            skademlia.secure_lookup(key.clone(), initial_nodes).await?
        } else {
            return Ok(None);
        };

        for node in &secure_nodes {
            if enable_distance_verification {
                // Ask up to 3 witnesses to corroborate the node's claimed
                // distance; skip nodes with low consensus confidence.
                let witness_nodes = self.select_witness_nodes(&node.peer_id, 3).await;

                let consensus = if let Some(ref mut skademlia) = self.skademlia {
                    skademlia
                        .verify_distance_consensus(&node.peer_id, key, witness_nodes)
                        .await?
                } else {
                    continue;
                };

                if consensus.confidence < min_reputation {
                    debug!(
                        "Skipping node {} due to low distance verification confidence",
                        node.peer_id
                    );
                    continue;
                }
            }

            let query = DHTQuery::FindValue {
                key: key.clone(),
                requester: self.local_id.to_hex(),
            };
            if let Ok(DHTResponse::Value { record }) = self.simulate_query(node, query).await {
                // Cache best-effort; a store failure shouldn't drop the hit.
                let _ = self.storage.store(record.clone()).await;
                return Ok(Some(record));
            }
        }

        Ok(None)
    }
1203
    /// S/Kademlia-hardened store: replicates to nodes selected by the
    /// security layer (reputation-aware) and feeds each replication outcome
    /// back into the reputation manager.
    ///
    /// Unlike `put`, this never fails on low replication — it logs success
    /// counts and returns `Ok(())` once the local store succeeded.
    pub async fn secure_put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
        let record = Record::new(key.clone(), value, self.local_id.to_hex());

        // Local store first; replication is best-effort below.
        self.storage.store(record.clone()).await?;

        let secure_nodes = if let Some(ref skademlia) = self.skademlia {
            // Over-provision candidates (2x) and let the security layer
            // pick the final replication set.
            let candidate_nodes = self
                .routing_table
                .closest_nodes(&key, self.config.replication_factor * 2)
                .await;

            skademlia.select_secure_nodes(&candidate_nodes, &key, self.config.replication_factor)
        } else {
            self.routing_table
                .closest_nodes(&key, self.config.replication_factor)
                .await
        };

        info!(
            "Storing record with key {} on {} secure nodes",
            key.to_hex(),
            secure_nodes.len()
        );

        // Collect per-node outcomes so reputations can be updated after the
        // loop (avoids borrowing `self.skademlia` inside it).
        let mut replication_results = Vec::new();
        let mut successful_replications = 0;

        for node in &secure_nodes {
            let success = self.replicate_record(&record, node).await.is_ok();
            replication_results.push((node.peer_id.clone(), success));
            if success {
                successful_replications += 1;
            }
        }

        if let Some(ref mut skademlia) = self.skademlia {
            for (peer_id, success) in replication_results {
                // NOTE(review): 100 ms is a placeholder latency, not measured.
                skademlia.reputation_manager.update_reputation(
                    &peer_id,
                    success,
                    Duration::from_millis(100),
                );
            }
        }

        if successful_replications > 0 {
            info!(
                "Successfully replicated to {}/{} secure nodes",
                successful_replications,
                secure_nodes.len()
            );
        }

        Ok(())
    }
1267
1268 pub async fn update_sibling_list(&mut self, key: Key) -> Result<()> {
1270 if let Some(ref mut skademlia) = self.skademlia {
1271 let nodes = self
1272 .routing_table
1273 .closest_nodes(&key, skademlia.config.sibling_list_size)
1274 .await;
1275 skademlia.update_sibling_list(key, nodes);
1276 }
1277 Ok(())
1278 }
1279
1280 pub async fn validate_routing_consistency(&self) -> Result<skademlia::ConsistencyReport> {
1282 if let Some(ref skademlia) = self.skademlia {
1283 let sample_key = Key::random();
1285 let sample_nodes = self.routing_table.closest_nodes(&sample_key, 100).await;
1286 skademlia.validate_routing_consistency(&sample_nodes).await
1287 } else {
1288 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1289 "S/Kademlia not enabled".to_string().into(),
1290 )))
1291 }
1292 }
1293
1294 pub fn create_distance_challenge(
1296 &mut self,
1297 peer_id: &PeerId,
1298 key: &Key,
1299 ) -> Option<skademlia::DistanceChallenge> {
1300 self.skademlia
1301 .as_mut()
1302 .map(|skademlia| skademlia.create_distance_challenge(peer_id, key))
1303 }
1304
1305 pub fn verify_distance_proof(&self, proof: &skademlia::DistanceProof) -> Result<bool> {
1307 if let Some(ref skademlia) = self.skademlia {
1308 skademlia.verify_distance_proof(proof)
1309 } else {
1310 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1311 "S/Kademlia not enabled".to_string().into(),
1312 )))
1313 }
1314 }
1315
1316 #[allow(dead_code)]
1318 async fn verify_node_distances(
1319 &self,
1320 nodes: &[DHTNode],
1321 _target_key: &Key,
1322 min_reputation: f64,
1323 ) -> Result<Vec<DHTNode>> {
1324 let mut verified_nodes = Vec::new();
1325
1326 for node in nodes {
1327 let witness_nodes = self.select_witness_nodes(&node.peer_id, 3).await;
1328
1329 if witness_nodes.len() >= 2 {
1331 let consensus_confidence = 0.8; if consensus_confidence >= min_reputation {
1335 verified_nodes.push(node.clone());
1336 } else {
1337 debug!(
1338 "Node {} failed distance verification with confidence {}",
1339 node.peer_id, consensus_confidence
1340 );
1341 }
1342 }
1343 }
1344
1345 Ok(verified_nodes)
1346 }
1347
1348 async fn select_witness_nodes(&self, target_peer: &PeerId, count: usize) -> Vec<PeerId> {
1350 let target_key = Key::new(target_peer.as_bytes());
1352 let candidate_nodes = self
1353 .routing_table
1354 .closest_nodes(&target_key, count * 2)
1355 .await;
1356
1357 candidate_nodes
1358 .into_iter()
1359 .filter(|node| node.peer_id != *target_peer)
1360 .take(count)
1361 .map(|node| node.peer_id)
1362 .collect()
1363 }
1364
1365 pub fn create_enhanced_distance_challenge(
1367 &mut self,
1368 peer_id: &PeerId,
1369 key: &Key,
1370 suspected_attack: bool,
1371 ) -> Option<skademlia::EnhancedDistanceChallenge> {
1372 self.skademlia.as_mut().map(|skademlia| {
1373 skademlia.create_adaptive_distance_challenge(peer_id, key, suspected_attack)
1374 })
1375 }
1376
1377 pub async fn verify_distance_multi_round(
1379 &mut self,
1380 challenge: &skademlia::EnhancedDistanceChallenge,
1381 ) -> Result<bool> {
1382 if let Some(ref mut skademlia) = self.skademlia {
1383 skademlia.verify_distance_multi_round(challenge).await
1384 } else {
1385 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1386 "S/Kademlia not enabled".to_string().into(),
1387 )))
1388 }
1389 }
1390
1391 pub fn get_security_bucket(&mut self, key: &Key) -> Option<&mut skademlia::SecurityBucket> {
1393 self.skademlia
1394 .as_mut()
1395 .map(|skademlia| skademlia.get_security_bucket(key))
1396 }
1397
1398 pub async fn add_trusted_node(
1400 &mut self,
1401 key: &Key,
1402 peer_id: PeerId,
1403 addresses: Vec<Multiaddr>,
1404 ) -> Result<()> {
1405 if let Some(ref mut skademlia) = self.skademlia {
1406 let node = DHTNode::new(peer_id, addresses, &self.local_id);
1407 let security_bucket = skademlia.get_security_bucket(key);
1408 security_bucket.add_trusted_node(node);
1409 }
1410 Ok(())
1411 }
1412
    /// IPv6-hardened lookup: local storage first, then a secure lookup
    /// restricted to IPv6-verified peers, skipping stale or unverified
    /// identities. Falls back to `secure_get` when no verified peers exist.
    ///
    /// NOTE(review): the ban check passes the local key's hex as a peer id —
    /// confirm the local PeerId and `local_id.to_hex()` actually coincide.
    pub async fn ipv6_secure_get(&mut self, key: &Key) -> Result<Option<Record>> {
        // Refuse to operate if this node itself has been banned.
        if self.is_node_banned(&self.local_id.to_hex()) {
            return Err(P2PError::Security(
                crate::error::SecurityError::AuthorizationFailed("Local node is banned".into()),
            ));
        }

        // Fast path: fresh local copy.
        if let Some(record) = self.storage.get(key).await
            && !record.is_expired()
        {
            return Ok(Some(record));
        }

        let verified_nodes = self.get_ipv6_verified_nodes_for_key(key).await?;

        if verified_nodes.is_empty() {
            // No IPv6-verified peers: degrade to S/Kademlia-only lookup.
            return self.secure_get(key).await;
        }

        if let Some(ref mut skademlia) = self.skademlia {
            let secure_nodes = skademlia.secure_lookup(key.clone(), verified_nodes).await?;

            for node in &secure_nodes {
                if let Some(ref manager) = self.ipv6_identity_manager {
                    if let Some(ipv6_node) = manager.get_verified_node(&node.peer_id) {
                        // Skip identities past their refresh deadline.
                        if ipv6_node
                            .needs_identity_refresh(manager.config.identity_refresh_interval)
                        {
                            debug!("Skipping node {} due to stale IPv6 identity", node.peer_id);
                            continue;
                        }
                    } else {
                        debug!(
                            "Skipping node {} without verified IPv6 identity",
                            node.peer_id
                        );
                        continue;
                    }
                }

                let query = DHTQuery::FindValue {
                    key: key.clone(),
                    requester: self.local_id.to_hex(),
                };
                if let Ok(DHTResponse::Value { record }) = self.simulate_query(node, query).await {
                    // Reward the responding peer's IPv6 reputation.
                    if let Some(ref mut manager) = self.ipv6_identity_manager {
                        manager.update_ipv6_reputation(&node.peer_id, true);
                    }

                    // Cache best-effort; a store failure shouldn't drop the hit.
                    let _ = self.storage.store(record.clone()).await;
                    return Ok(Some(record));
                }
            }
        }

        Ok(None)
    }
1481
    /// IPv6-identity-aware secure PUT.
    ///
    /// Stores the record locally, then replicates it to IPv6-verified nodes
    /// (filtered through S/Kademlia secure node selection when enabled).
    /// Every replication attempt updates the target's IPv6 reputation.
    ///
    /// Fails with `AuthorizationFailed` when the local node is banned, or
    /// `ReplicationFailed` when verified nodes exist but none accepted the
    /// record. Succeeds (local-only) when no verified nodes are available.
    pub async fn ipv6_secure_put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
        // A banned local node must not inject records into the network.
        if self.is_node_banned(&self.local_id.to_hex()) {
            return Err(P2PError::Security(
                crate::error::SecurityError::AuthorizationFailed("Local node is banned".into()),
            ));
        }

        let record = Record::new(key.clone(), value, self.local_id.to_hex());

        // Local store must succeed before any remote replication.
        self.storage.store(record.clone()).await?;

        let verified_nodes = self.get_ipv6_verified_nodes_for_key(&key).await?;

        // Prefer S/Kademlia's secure selection; otherwise just take the first
        // `replication_factor` verified candidates.
        let secure_nodes = if let Some(ref skademlia) = self.skademlia {
            skademlia.select_secure_nodes(&verified_nodes, &key, self.config.replication_factor)
        } else {
            verified_nodes
                .into_iter()
                .take(self.config.replication_factor)
                .collect()
        };

        info!(
            "Storing record with key {} on {} IPv6-verified secure nodes",
            key.to_hex(),
            secure_nodes.len()
        );

        let mut successful_replications = 0;

        for node in &secure_nodes {
            let success = self.replicate_record(&record, node).await.is_ok();

            // Feed the outcome back into the node's IPv6 reputation.
            if let Some(ref mut manager) = self.ipv6_identity_manager {
                manager.update_ipv6_reputation(&node.peer_id, success);
            }

            if success {
                successful_replications += 1;
            }
        }

        // Only an outright total failure (with candidates available) is fatal;
        // partial replication is accepted.
        if successful_replications == 0 && !secure_nodes.is_empty() {
            return Err(P2PError::Dht(crate::error::DhtError::ReplicationFailed(
                format!("Failed to replicate key {} to any IPv6-verified nodes", key).into(),
            )));
        }

        info!(
            "Successfully replicated to {}/{} IPv6-verified nodes",
            successful_replications,
            secure_nodes.len()
        );
        Ok(())
    }
1544
1545 async fn get_ipv6_verified_nodes_for_key(&self, key: &Key) -> Result<Vec<DHTNode>> {
1547 let mut verified_nodes = Vec::new();
1548
1549 let candidate_nodes = self
1551 .routing_table
1552 .closest_nodes(key, self.config.replication_factor * 2)
1553 .await;
1554
1555 if let Some(ref manager) = self.ipv6_identity_manager {
1556 for node in candidate_nodes {
1557 if let Some(ipv6_node) = manager.get_verified_node(&node.peer_id) {
1559 if !ipv6_node.needs_identity_refresh(manager.config.identity_refresh_interval) {
1561 if !manager.is_node_banned(&node.peer_id) {
1563 verified_nodes.push(node);
1564 }
1565 }
1566 }
1567 }
1568 } else {
1569 verified_nodes = candidate_nodes;
1571 }
1572
1573 Ok(verified_nodes)
1574 }
1575
1576 pub fn get_ipv6_diversity_stats(&self) -> Option<crate::security::DiversityStats> {
1578 self.ipv6_identity_manager
1579 .as_ref()
1580 .map(|manager| manager.get_ipv6_diversity_stats())
1581 }
1582
1583 pub fn cleanup_ipv6_data(&mut self) {
1585 if let Some(ref mut manager) = self.ipv6_identity_manager {
1586 manager.cleanup_expired();
1587 }
1588 }
1589
1590 pub fn ban_ipv6_node(&mut self, peer_id: &PeerId, reason: &str) {
1592 if let Some(ref mut manager) = self.ipv6_identity_manager {
1593 manager.ban_node(peer_id, reason);
1594 }
1595 }
1596
1597 pub fn get_local_ipv6_identity(&self) -> Option<&crate::security::IPv6NodeID> {
1599 self.ipv6_identity_manager
1600 .as_ref()
1601 .and_then(|manager| manager.get_local_identity())
1602 }
1603
1604 async fn replicate_record(&self, record: &Record, node: &DHTNode) -> Result<()> {
1606 debug!(
1609 "Replicating record {} to node {}",
1610 record.key.to_hex(),
1611 node.peer_id
1612 );
1613
1614 tokio::time::sleep(Duration::from_millis(10)).await;
1616
1617 if rand::random::<f64>() < 0.95 {
1619 Ok(())
1620 } else {
1621 Err(P2PError::Network(
1622 crate::error::NetworkError::ProtocolError("Replication failed".to_string().into()),
1623 ))
1624 }
1625 }
1626
    /// Iterative Kademlia value lookup: queries batches of up to `alpha`
    /// closest known nodes per round until a value is found, candidates run
    /// out, or an iteration cap is reached. Returns `None` when not found.
    async fn iterative_find_value(&self, key: &Key) -> Option<Record> {
        debug!("Starting iterative lookup for key {}", key.to_hex());

        let mut lookup_state = LookupState::new(key.clone(), self.config.alpha);

        // Seed the lookup with the alpha closest nodes we already know.
        let initial_nodes = self
            .routing_table
            .closest_nodes(key, self.config.alpha)
            .await;
        lookup_state.add_nodes(initial_nodes);

        let mut iterations = 0;
        // Hard cap bounds lookup latency even on pathological topologies.
        const MAX_ITERATIONS: usize = 10;

        while !lookup_state.is_complete() && iterations < MAX_ITERATIONS {
            let nodes_to_query = lookup_state.next_nodes();
            if nodes_to_query.is_empty() {
                break;
            }

            // Fire this round's queries concurrently.
            let mut queries = Vec::new();
            for node in &nodes_to_query {
                let query = DHTQuery::FindValue {
                    key: key.clone(),
                    requester: self.local_id.to_hex(),
                };
                queries.push(self.simulate_query(node, query));
            }

            for query_result in futures::future::join_all(queries).await {
                match query_result {
                    Ok(DHTResponse::Value { record }) => {
                        // First value found wins; abort the whole lookup.
                        debug!(
                            "Found value for key {} in iteration {}",
                            key.to_hex(),
                            iterations
                        );
                        return Some(record);
                    }
                    Ok(DHTResponse::Nodes { nodes }) => {
                        // No value held there; fold the returned contacts into
                        // the candidate set and keep iterating.
                        let dht_nodes: Vec<DHTNode> =
                            nodes.into_iter().map(|n| n.to_dht_node()).collect();
                        lookup_state.add_nodes(dht_nodes);
                    }
                    _ => {
                        // Failed or unexpected responses are non-fatal.
                        debug!("Query failed during iterative lookup");
                    }
                }
            }

            iterations += 1;
        }

        debug!(
            "Iterative lookup for key {} completed after {} iterations, value not found",
            key.to_hex(),
            iterations
        );
        None
    }
1693
1694 async fn simulate_query(&self, _node: &DHTNode, query: DHTQuery) -> Result<DHTResponse> {
1696 tokio::time::sleep(Duration::from_millis(50)).await;
1698
1699 Ok(self.handle_query(query).await)
1701 }
1702
1703 async fn republish_records(&self) -> Result<()> {
1705 let all_records = self.storage.all_records().await;
1706 let mut republished_count = 0;
1707
1708 for record in all_records {
1709 let remaining_ttl = record
1711 .expires_at
1712 .duration_since(SystemTime::now())
1713 .unwrap_or(Duration::ZERO);
1714
1715 if remaining_ttl < self.config.record_ttl / 4 {
1716 let closest_nodes = self
1718 .routing_table
1719 .closest_nodes(&record.key, self.config.replication_factor)
1720 .await;
1721
1722 for node in &closest_nodes {
1724 if self.replicate_record(&record, node).await.is_ok() {
1725 republished_count += 1;
1726 }
1727 }
1728 }
1729 }
1730
1731 if republished_count > 0 {
1732 debug!(
1733 "Republished {} records during maintenance",
1734 republished_count
1735 );
1736 }
1737
1738 Ok(())
1739 }
1740
    /// Refreshes stale routing-table buckets by running a node lookup for a
    /// synthetic key inside each bucket that has not been refreshed within
    /// `bucket_refresh_interval`.
    async fn refresh_buckets(&self) -> Result<()> {
        let mut refreshed_count = 0;

        // One bucket per possible shared-prefix length of a 256-bit key.
        for bucket_index in 0..256 {
            // Read lock is scoped so it is released before the lookup below.
            let needs_refresh = {
                let bucket = self.routing_table.buckets[bucket_index].read().await;
                bucket.needs_refresh(self.config.bucket_refresh_interval)
            };

            if needs_refresh {
                // Looking up a key that falls in this bucket repopulates it.
                let target_key = self.generate_key_for_bucket(bucket_index);
                let _nodes = self.iterative_find_node(&target_key).await;
                refreshed_count += 1;

                // Write lock scoped to just the timestamp update.
                {
                    let mut bucket = self.routing_table.buckets[bucket_index].write().await;
                    bucket.last_refresh = Instant::now();
                }
            }
        }

        if refreshed_count > 0 {
            debug!("Refreshed {} buckets during maintenance", refreshed_count);
        }

        Ok(())
    }
1771
1772 fn generate_key_for_bucket(&self, bucket_index: usize) -> Key {
1774 let mut key_bytes = self.local_id.as_bytes().to_vec();
1775
1776 if bucket_index < 256 {
1778 let byte_index = (255 - bucket_index) / 8;
1779 let bit_index = (255 - bucket_index) % 8;
1780
1781 if byte_index < key_bytes.len() {
1782 key_bytes[byte_index] ^= 1 << bit_index;
1783 }
1784 }
1785
1786 let mut hash = [0u8; 32];
1787 hash.copy_from_slice(&key_bytes);
1788 Key::from_hash(hash)
1789 }
1790
    /// Iterative Kademlia node lookup: converges on the nodes closest to
    /// `key`, returning up to `replication_factor` of them.
    ///
    /// NOTE(review): the return value reads `lookup_state.closest`, but in
    /// this file `LookupState::add_nodes` never pushes into `closest` — as
    /// written the lookup may always return an empty vec; verify.
    async fn iterative_find_node(&self, key: &Key) -> Vec<DHTNode> {
        debug!("Starting iterative node lookup for key {}", key.to_hex());

        let mut lookup_state = LookupState::new(key.clone(), self.config.alpha);

        // Seed with the alpha closest locally known nodes.
        let initial_nodes = self
            .routing_table
            .closest_nodes(key, self.config.alpha)
            .await;
        lookup_state.add_nodes(initial_nodes);

        let mut iterations = 0;
        // Hard cap bounds lookup latency.
        const MAX_ITERATIONS: usize = 10;

        while !lookup_state.is_complete() && iterations < MAX_ITERATIONS {
            let nodes_to_query = lookup_state.next_nodes();
            if nodes_to_query.is_empty() {
                break;
            }

            // Fire this round's FindNode queries concurrently.
            let mut queries = Vec::new();
            for node in &nodes_to_query {
                let query = DHTQuery::FindNode {
                    key: key.clone(),
                    requester: self.local_id.to_hex(),
                };
                queries.push(self.simulate_query(node, query));
            }

            // Fold every returned contact list back into the candidate set.
            for query_result in futures::future::join_all(queries).await {
                if let Ok(DHTResponse::Nodes { nodes }) = query_result {
                    let dht_nodes: Vec<DHTNode> =
                        nodes.into_iter().map(|n| n.to_dht_node()).collect();
                    lookup_state.add_nodes(dht_nodes);
                }
            }

            iterations += 1;
        }

        debug!(
            "Iterative node lookup for key {} completed after {} iterations",
            key.to_hex(),
            iterations
        );

        lookup_state
            .closest
            .into_iter()
            .take(self.config.replication_factor)
            .collect()
    }
1849
    /// Queries the key's replica set and reports whether all stored copies
    /// agree. The first record retrieved is treated as canonical; a record
    /// differing in value, creation time, or publisher is a conflict.
    pub async fn check_consistency(&self, key: &Key) -> Result<ConsistencyReport> {
        debug!("Checking consistency for key {}", key.to_hex());

        let closest_nodes = self
            .routing_table
            .closest_nodes(key, self.config.replication_factor)
            .await;

        let mut records_found = Vec::new();
        let mut nodes_queried = 0;
        let mut nodes_responded = 0;

        for node in &closest_nodes {
            nodes_queried += 1;

            let query = DHTQuery::FindValue {
                key: key.clone(),
                requester: self.local_id.to_hex(),
            };

            match self.simulate_query(node, query).await {
                Ok(DHTResponse::Value { record }) => {
                    nodes_responded += 1;
                    records_found.push((node.peer_id.clone(), record));
                }
                Ok(DHTResponse::Nodes { .. }) => {
                    // Node answered but does not hold the value.
                    nodes_responded += 1;
                }
                _ => {
                    // Failed queries count as queried but not responded.
                }
            }
        }

        // Compare every retrieved record against the first one seen.
        let mut consistent = true;
        let mut canonical_record: Option<Record> = None;
        let mut conflicts = Vec::new();

        for (node_id, record) in &records_found {
            if let Some(ref canonical) = canonical_record {
                if record.value != canonical.value
                    || record.created_at != canonical.created_at
                    || record.publisher != canonical.publisher
                {
                    consistent = false;
                    conflicts.push((node_id.clone(), record.clone()));
                }
            } else {
                canonical_record = Some(record.clone());
            }
        }

        let report = ConsistencyReport {
            key: key.clone(),
            nodes_queried,
            nodes_responded,
            records_found: records_found.len(),
            consistent,
            canonical_record,
            conflicts,
            replication_factor: self.config.replication_factor,
        };

        debug!(
            "Consistency check for key {}: {} nodes queried, {} responded, {} records found, consistent: {}",
            key.to_hex(),
            report.nodes_queried,
            report.nodes_responded,
            report.records_found,
            report.consistent
        );

        Ok(report)
    }
1930
    /// Repairs an inconsistent key by re-replicating the most recent record
    /// (highest `created_at` among canonical + conflicting copies) to the
    /// key's closest nodes.
    ///
    /// Returns early (no repairs) when replicas already agree or when no
    /// record exists anywhere. The repair is considered successful when at
    /// least half the replication factor accepted the record.
    pub async fn repair_record(&self, key: &Key) -> Result<RepairResult> {
        debug!("Starting repair for key {}", key.to_hex());

        let consistency_report = self.check_consistency(key).await?;

        // Nothing to do when the replica set already agrees.
        if consistency_report.consistent {
            return Ok(RepairResult {
                key: key.clone(),
                repairs_needed: false,
                repairs_attempted: 0,
                repairs_successful: 0,
                final_state: "consistent".to_string(),
            });
        }

        // No record anywhere means there is nothing to propagate.
        let canonical_record = if let Some(canonical) = consistency_report.canonical_record {
            canonical
        } else {
            return Ok(RepairResult {
                key: key.clone(),
                repairs_needed: false,
                repairs_attempted: 0,
                repairs_successful: 0,
                final_state: "no_records_found".to_string(),
            });
        };

        // Conflict resolution policy: newest creation time wins.
        let mut most_recent = canonical_record.clone();
        for (_, conflicted_record) in &consistency_report.conflicts {
            if conflicted_record.created_at > most_recent.created_at {
                most_recent = conflicted_record.clone();
            }
        }

        let closest_nodes = self
            .routing_table
            .closest_nodes(key, self.config.replication_factor)
            .await;

        let mut repairs_attempted = 0;
        let mut repairs_successful = 0;

        // Push the winning record back to the whole replica set.
        for node in &closest_nodes {
            repairs_attempted += 1;
            if self.replicate_record(&most_recent, node).await.is_ok() {
                repairs_successful += 1;
            }
        }

        // Majority-ish threshold: at least half the replication factor.
        let final_state = if repairs_successful >= (self.config.replication_factor / 2) {
            "repaired".to_string()
        } else {
            "repair_failed".to_string()
        };

        debug!(
            "Repair for key {} completed: {}/{} repairs successful, final state: {}",
            key.to_hex(),
            repairs_successful,
            repairs_attempted,
            final_state
        );

        Ok(RepairResult {
            key: key.clone(),
            repairs_needed: true,
            repairs_attempted,
            repairs_successful,
            final_state,
        })
    }
2006
2007 pub async fn create_inbox(&self, inbox_id: &str, owner_peer_id: PeerId) -> Result<InboxInfo> {
2013 info!("Creating inbox {} for peer {}", inbox_id, owner_peer_id);
2014
2015 let inbox_key = Key::from_inbox_id(inbox_id);
2016
2017 let inbox_metadata = InboxMetadata {
2019 inbox_id: inbox_id.to_string(),
2020 owner: owner_peer_id.clone(),
2021 created_at: SystemTime::now(),
2022 message_count: 0,
2023 max_messages: 1000, is_public: true,
2025 access_keys: vec![owner_peer_id.clone()],
2026 };
2027
2028 let metadata_value = serde_json::to_vec(&inbox_metadata)
2029 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2030
2031 let metadata_record = Record {
2032 key: inbox_key.clone(),
2033 value: metadata_value,
2034 publisher: owner_peer_id.clone(),
2035 created_at: SystemTime::now(),
2036 expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(u64::MAX), signature: None,
2038 };
2039
2040 self.put_record_with_infinite_ttl(metadata_record).await?;
2042
2043 let index_key = Key::from_inbox_index(inbox_id);
2045 let empty_index = InboxMessageIndex {
2046 inbox_id: inbox_id.to_string(),
2047 messages: Vec::new(),
2048 last_updated: SystemTime::now(),
2049 };
2050
2051 let index_value = serde_json::to_vec(&empty_index)
2052 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2053
2054 let index_record = Record {
2055 key: index_key,
2056 value: index_value,
2057 publisher: owner_peer_id.clone(),
2058 created_at: SystemTime::now(),
2059 expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(u64::MAX), signature: None,
2061 };
2062
2063 self.put_record_with_infinite_ttl(index_record).await?;
2064
2065 let inbox_info = InboxInfo {
2066 inbox_id: inbox_id.to_string(),
2067 four_word_address: self.generate_four_word_address(inbox_id),
2068 owner: owner_peer_id,
2069 created_at: SystemTime::now(),
2070 message_count: 0,
2071 is_accessible: true,
2072 };
2073
2074 info!(
2075 "Successfully created inbox {} with four-word address: {}",
2076 inbox_id, inbox_info.four_word_address
2077 );
2078
2079 Ok(inbox_info)
2080 }
2081
2082 pub async fn send_message_to_inbox(&self, inbox_id: &str, message: InboxMessage) -> Result<()> {
2084 info!("Sending message to inbox {}", inbox_id);
2085
2086 let inbox_key = Key::from_inbox_id(inbox_id);
2088 let metadata_record = self.get(&inbox_key).await.ok_or_else(|| {
2089 P2PError::Dht(crate::error::DhtError::KeyNotFound(
2090 format!("inbox:{inbox_id}").into(),
2091 ))
2092 })?;
2093
2094 let mut inbox_metadata: InboxMetadata = serde_json::from_slice(&metadata_record.value)
2095 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2096
2097 if inbox_metadata.message_count >= inbox_metadata.max_messages {
2099 return Err(P2PError::Dht(crate::error::DhtError::StoreFailed(
2100 format!("Inbox {} is full", inbox_id).into(),
2101 )));
2102 }
2103
2104 let message_key = Key::from_inbox_message(inbox_id, &message.id);
2106 let message_value = serde_json::to_vec(&message)
2107 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2108
2109 let message_record = Record {
2110 key: message_key.clone(),
2111 value: message_value,
2112 publisher: message.sender.clone(),
2113 created_at: message.timestamp,
2114 expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(u64::MAX), signature: None,
2116 };
2117
2118 self.put_record_with_infinite_ttl(message_record).await?;
2119
2120 let index_key = Key::from_inbox_index(inbox_id);
2122 let index_record = self.get(&index_key).await.ok_or_else(|| {
2123 P2PError::Dht(crate::error::DhtError::KeyNotFound(
2124 format!("inbox_index:{inbox_id}").into(),
2125 ))
2126 })?;
2127
2128 let mut message_index: InboxMessageIndex = serde_json::from_slice(&index_record.value)
2129 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2130
2131 message_index.messages.push(MessageRef {
2132 message_id: message.id.clone(),
2133 sender: message.sender.clone(),
2134 timestamp: message.timestamp,
2135 message_type: message.message_type.clone(),
2136 });
2137 message_index.last_updated = SystemTime::now();
2138
2139 inbox_metadata.message_count += 1;
2141
2142 let updated_index_value = serde_json::to_vec(&message_index)
2144 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2145
2146 let updated_metadata_value = serde_json::to_vec(&inbox_metadata)
2147 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2148
2149 let updated_index_record = Record {
2150 key: index_key,
2151 value: updated_index_value,
2152 publisher: message.sender.clone(),
2153 created_at: SystemTime::now(),
2154 expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(u64::MAX),
2155 signature: None,
2156 };
2157
2158 let updated_metadata_record = Record {
2159 key: inbox_key,
2160 value: updated_metadata_value,
2161 publisher: message.sender.clone(),
2162 created_at: SystemTime::now(),
2163 expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(u64::MAX),
2164 signature: None,
2165 };
2166
2167 self.put_record_with_infinite_ttl(updated_index_record)
2168 .await?;
2169 self.put_record_with_infinite_ttl(updated_metadata_record)
2170 .await?;
2171
2172 info!(
2173 "Successfully sent message {} to inbox {}",
2174 message.id, inbox_id
2175 );
2176 Ok(())
2177 }
2178
2179 pub async fn get_inbox_messages(
2181 &self,
2182 inbox_id: &str,
2183 limit: Option<usize>,
2184 ) -> Result<Vec<InboxMessage>> {
2185 info!("Retrieving messages from inbox {}", inbox_id);
2186
2187 let index_key = Key::from_inbox_index(inbox_id);
2188 let index_record = self.get(&index_key).await.ok_or_else(|| {
2189 P2PError::Dht(crate::error::DhtError::KeyNotFound(
2190 format!("inbox:{inbox_id}").into(),
2191 ))
2192 })?;
2193
2194 let message_index: InboxMessageIndex = serde_json::from_slice(&index_record.value)
2195 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2196
2197 let mut messages = Vec::new();
2198 let message_refs: Vec<&MessageRef> = if let Some(limit) = limit {
2199 message_index.messages.iter().rev().take(limit).collect()
2200 } else {
2201 message_index.messages.iter().collect()
2202 };
2203
2204 for message_ref in message_refs {
2205 let message_key = Key::from_inbox_message(inbox_id, &message_ref.message_id);
2206 if let Some(message_record) = self.get(&message_key).await
2207 && let Ok(message) = serde_json::from_slice::<InboxMessage>(&message_record.value)
2208 {
2209 messages.push(message);
2210 }
2211 }
2212
2213 messages.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
2215
2216 info!(
2217 "Retrieved {} messages from inbox {}",
2218 messages.len(),
2219 inbox_id
2220 );
2221 Ok(messages)
2222 }
2223
2224 pub async fn get_inbox_info(&self, inbox_id: &str) -> Result<Option<InboxInfo>> {
2226 let inbox_key = Key::from_inbox_id(inbox_id);
2227 let metadata_record = self.get(&inbox_key).await;
2228
2229 if let Some(record) = metadata_record {
2230 let metadata: InboxMetadata = serde_json::from_slice(&record.value)
2231 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2232
2233 let inbox_info = InboxInfo {
2234 inbox_id: inbox_id.to_string(),
2235 four_word_address: self.generate_four_word_address(inbox_id),
2236 owner: metadata.owner,
2237 created_at: metadata.created_at,
2238 message_count: metadata.message_count,
2239 is_accessible: true,
2240 };
2241
2242 Ok(Some(inbox_info))
2243 } else {
2244 Ok(None)
2245 }
2246 }
2247
2248 async fn put_record_with_infinite_ttl(&self, record: Record) -> Result<()> {
2250 self.storage.store(record.clone()).await?;
2252
2253 let closest_nodes = self
2255 .routing_table
2256 .closest_nodes(&record.key, self.config.replication_factor)
2257 .await;
2258
2259 for node in &closest_nodes {
2261 if let Err(e) = self.replicate_record(&record, node).await {
2262 debug!(
2263 "Failed to replicate infinite TTL record to node {}: {}",
2264 node.peer_id, e
2265 );
2266 }
2267 }
2268
2269 Ok(())
2270 }
2271
2272 fn generate_four_word_address(&self, inbox_id: &str) -> String {
2274 let words = [
2277 "alpha", "beta", "gamma", "delta", "epsilon", "zeta", "eta", "theta",
2278 ];
2279 let mut hash = 0u32;
2280
2281 for byte in inbox_id.as_bytes() {
2282 hash = hash.wrapping_mul(31).wrapping_add(*byte as u32);
2283 }
2284
2285 let word1 = words[hash as usize % words.len()];
2286 let word2 = words[(hash >> 8) as usize % words.len()];
2287 let word3 = words[(hash >> 16) as usize % words.len()];
2288
2289 format!("{word1}.{word2}.{word3}")
2290 }
2291
    /// Inserts `node` into the routing table (delegates to the table's own
    /// add logic, which decides bucket placement and eviction).
    pub async fn add_node(&self, node: DHTNode) -> Result<()> {
        self.routing_table.add_node(node).await
    }
2296
    /// Removes the peer identified by `peer_id` from the routing table.
    pub async fn remove_node(&self, peer_id: &PeerId) -> Result<()> {
        self.routing_table.remove_node(peer_id).await
    }
2301}
2302
/// Snapshot of a DHT node's routing and storage counters.
#[derive(Debug, Clone)]
pub struct DHTStats {
    /// This node's 256-bit DHT identifier.
    pub local_id: Key,
    /// Total peers across all routing-table buckets.
    pub total_nodes: usize,
    /// Buckets currently holding at least one peer.
    pub active_buckets: usize,
    /// Records held in local storage.
    pub stored_records: usize,
    /// Locally stored records that are past their TTL.
    pub expired_records: usize,
}
2317
/// Result of querying a key's replica set and comparing the returned records
/// (produced by `check_consistency`).
#[derive(Debug, Clone)]
pub struct ConsistencyReport {
    /// Key that was checked.
    pub key: Key,
    /// Replica nodes queried.
    pub nodes_queried: usize,
    /// Nodes that returned any response.
    pub nodes_responded: usize,
    /// Records actually retrieved.
    pub records_found: usize,
    /// True when every retrieved record agreed with the canonical one.
    pub consistent: bool,
    /// Reference record (the first one retrieved), if any.
    pub canonical_record: Option<Record>,
    /// Records (with their holders) that disagreed with the canonical record.
    pub conflicts: Vec<(PeerId, Record)>,
    /// Replication factor configured at check time.
    pub replication_factor: usize,
}
2338
/// Outcome of a `repair_record` pass for a single key.
#[derive(Debug, Clone)]
pub struct RepairResult {
    /// Key that was repaired.
    pub key: Key,
    /// Whether an inconsistency requiring repair was detected.
    pub repairs_needed: bool,
    /// Replication attempts made during the repair.
    pub repairs_attempted: usize,
    /// Replication attempts that succeeded.
    pub repairs_successful: usize,
    /// One of "consistent", "no_records_found", "repaired", "repair_failed".
    pub final_state: String,
}
2353
2354impl LookupState {
2355 pub fn new(target: Key, alpha: usize) -> Self {
2357 Self {
2358 target,
2359 queried: HashMap::new(),
2360 to_query: VecDeque::new(),
2361 closest: Vec::new(),
2362 started_at: Instant::now(),
2363 alpha,
2364 }
2365 }
2366
2367 pub fn add_nodes(&mut self, nodes: Vec<DHTNode>) {
2369 for node in nodes {
2370 if !self.queried.contains_key(&node.peer_id) {
2371 self.to_query.push_back(node);
2372 }
2373 }
2374
2375 let target = &self.target;
2377 self.to_query
2378 .make_contiguous()
2379 .sort_by_key(|node| node.key().distance(target).as_bytes().to_vec());
2380 }
2381
2382 pub fn next_nodes(&mut self) -> Vec<DHTNode> {
2384 let mut nodes = Vec::new();
2385 for _ in 0..self.alpha {
2386 if let Some(node) = self.to_query.pop_front() {
2387 self.queried.insert(node.peer_id.clone(), Instant::now());
2388 nodes.push(node);
2389 } else {
2390 break;
2391 }
2392 }
2393 nodes
2394 }
2395
2396 pub fn is_complete(&self) -> bool {
2398 self.to_query.is_empty() || self.started_at.elapsed() > Duration::from_secs(30)
2399 }
2400}
2401
/// Persistent metadata record describing a DHT-hosted inbox.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxMetadata {
    /// Human-chosen inbox identifier (also used to derive the DHT key).
    pub inbox_id: String,
    /// Peer that created and owns the inbox.
    pub owner: PeerId,
    /// Creation time of the inbox.
    pub created_at: SystemTime,
    /// Number of messages currently recorded.
    pub message_count: usize,
    /// Capacity cap; sends are rejected once `message_count` reaches this.
    pub max_messages: usize,
    /// Whether the inbox is publicly readable.
    pub is_public: bool,
    /// Peers granted access. NOTE(review): not visibly enforced in this
    /// file — confirm where access control is applied.
    pub access_keys: Vec<PeerId>,
}
2417
/// A single message stored in an inbox.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxMessage {
    /// Unique message identifier (used to derive the message's DHT key).
    pub id: String,
    /// Peer that sent the message.
    pub sender: PeerId,
    /// Identifier of the destination inbox.
    pub recipient_inbox: String,
    /// Message body.
    pub content: String,
    /// Free-form message type tag.
    pub message_type: String,
    /// Send time as reported by the sender.
    pub timestamp: SystemTime,
    /// Attached file descriptors (content is referenced by hash, not inlined).
    pub attachments: Vec<MessageAttachment>,
}
2429
/// Descriptor for a file attached to an inbox message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageAttachment {
    /// Original file name.
    pub filename: String,
    /// MIME content type.
    pub content_type: String,
    /// Size in bytes.
    pub size: u64,
    /// Content hash identifying the attachment data.
    pub hash: String,
}
2438
/// Append-only index of the messages in an inbox, stored as its own DHT
/// record (separate from the metadata record).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxMessageIndex {
    /// Inbox this index belongs to.
    pub inbox_id: String,
    /// References to the stored messages, in insertion order.
    pub messages: Vec<MessageRef>,
    /// Last time this index record was rewritten.
    pub last_updated: SystemTime,
}
2446
/// Lightweight reference to a message, stored in the inbox index so the
/// full message record can be fetched on demand.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageRef {
    /// Identifier of the referenced message.
    pub message_id: String,
    /// Peer that sent the message.
    pub sender: PeerId,
    /// Send time as recorded in the message.
    pub timestamp: SystemTime,
    /// Free-form message type tag (copied from the message).
    pub message_type: String,
}
2455
/// Caller-facing summary of an inbox, derived from its metadata record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxInfo {
    /// Inbox identifier.
    pub inbox_id: String,
    /// Human-readable dotted-word address derived from the inbox id.
    pub four_word_address: String,
    /// Owning peer.
    pub owner: PeerId,
    /// Creation time.
    pub created_at: SystemTime,
    /// Number of messages currently recorded.
    pub message_count: usize,
    /// Whether the inbox could be reached when this summary was built.
    pub is_accessible: bool,
}
2466
2467impl Key {
2472 pub fn from_inbox_id(inbox_id: &str) -> Self {
2474 let mut hasher = Sha256::new();
2475 hasher.update(b"INBOX_METADATA:");
2476 hasher.update(inbox_id.as_bytes());
2477 let hash = hasher.finalize();
2478 Key { hash: hash.into() }
2479 }
2480
2481 pub fn from_inbox_index(inbox_id: &str) -> Self {
2483 let mut hasher = Sha256::new();
2484 hasher.update(b"INBOX_INDEX:");
2485 hasher.update(inbox_id.as_bytes());
2486 let hash = hasher.finalize();
2487 Key { hash: hash.into() }
2488 }
2489
2490 pub fn from_inbox_message(inbox_id: &str, message_id: &str) -> Self {
2492 let mut hasher = Sha256::new();
2493 hasher.update(b"INBOX_MESSAGE:");
2494 hasher.update(inbox_id.as_bytes());
2495 hasher.update(b":");
2496 hasher.update(message_id.as_bytes());
2497 let hash = hasher.finalize();
2498 Key { hash: hash.into() }
2499 }
2500}
2501
2502