1use crate::{PeerId, Multiaddr, Result, P2PError};
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, VecDeque};
13use std::time::{Duration, Instant, SystemTime};
14use sha2::{Digest, Sha256};
15use tokio::sync::RwLock;
16use tracing::{debug, info};
17use futures;
18
19pub mod skademlia;
21
22pub mod ipv6_identity;
24
25pub mod enhanced_storage;
27
/// Tunable parameters for the Kademlia-style DHT.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTConfig {
    /// Number of nodes a record is replicated to (Kademlia `k`).
    pub replication_factor: usize,
    /// Maximum number of nodes held per k-bucket.
    pub bucket_size: usize,
    /// Parallelism factor for iterative lookups (Kademlia `alpha`).
    pub alpha: usize,
    /// Lifetime of a stored record before it is considered expired.
    pub record_ttl: Duration,
    /// How often stale buckets should be refreshed.
    pub bucket_refresh_interval: Duration,
    /// How often locally held records are republished.
    pub republish_interval: Duration,
    /// Maximum distance bound (unused in the code visible here; presumably
    /// a bit-length cap — confirm at call sites).
    pub max_distance: u8,
}
46
/// 256-bit DHT key: the SHA-256 digest of arbitrary input bytes.
/// XOR of two keys (see [`Key::distance`]) is the Kademlia distance metric.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Key {
    // Raw SHA-256 digest bytes.
    hash: [u8; 32],
}
53
/// A value stored in the DHT together with provenance and lifetime metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Record {
    /// Key the record is stored under.
    pub key: Key,
    /// Opaque payload bytes.
    pub value: Vec<u8>,
    /// Peer that originally published the record.
    pub publisher: PeerId,
    /// Wall-clock creation time.
    pub created_at: SystemTime,
    /// Wall-clock time after which the record counts as expired.
    pub expires_at: SystemTime,
    /// Optional signature; currently a placeholder (see `Record::sign`).
    pub signature: Option<Vec<u8>>,
}
70
/// A remote peer as tracked by the routing table.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DHTNode {
    /// Identifier of the peer.
    pub peer_id: PeerId,
    /// Known network addresses for the peer.
    pub addresses: Vec<Multiaddr>,
    /// Last time we heard from this peer. Serialized as "seconds ago"
    /// (see `instant_as_secs`), so it is approximate after a round trip.
    #[serde(with = "instant_as_secs")]
    pub last_seen: Instant,
    /// XOR distance between this peer's key and the local node's key.
    pub distance: Key,
    /// Whether we currently hold a live connection to the peer.
    pub is_connected: bool,
}
86
87mod instant_as_secs {
89 use serde::{Deserializer, Serializer, Deserialize, Serialize};
90 use std::time::Instant;
91
92 pub fn serialize<S>(instant: &Instant, serializer: S) -> Result<S::Ok, S::Error>
93 where
94 S: Serializer,
95 {
96 instant.elapsed().as_secs().serialize(serializer)
98 }
99
100 pub fn deserialize<'de, D>(deserializer: D) -> Result<Instant, D::Error>
101 where
102 D: Deserializer<'de>,
103 {
104 let secs = u64::deserialize(deserializer)?;
105 Ok(Instant::now() - std::time::Duration::from_secs(secs))
107 }
108}
109
/// Wire-friendly mirror of [`DHTNode`] with `last_seen` expressed as
/// elapsed seconds instead of an opaque `Instant`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDHTNode {
    pub peer_id: PeerId,
    pub addresses: Vec<Multiaddr>,
    /// Seconds since the node was last seen (measured at serialization time).
    pub last_seen_secs: u64,
    pub distance: Key,
    pub is_connected: bool,
}
124
/// One Kademlia k-bucket.
#[derive(Debug)]
struct KBucket {
    // Front = most recently seen; refreshed nodes are moved to the front.
    nodes: VecDeque<DHTNode>,
    // Maximum number of nodes held (the `k` in k-bucket).
    capacity: usize,
    // When this bucket was last refreshed.
    last_refresh: Instant,
}
135
/// 256-bucket Kademlia routing table keyed by XOR distance to `local_id`.
#[derive(Debug)]
pub struct RoutingTable {
    // Key of the local node; bucket indices are derived from distance to it.
    local_id: Key,
    // One bucket per distance prefix (256 total), each individually locked
    // so concurrent reads/writes touch only the bucket they need.
    buckets: Vec<RwLock<KBucket>>,
    #[allow(dead_code)]
    config: DHTConfig,
}
147
/// In-memory record store guarded by an async `RwLock`.
#[derive(Debug)]
pub struct DHTStorage {
    // Records indexed by key; expired entries are pruned by `cleanup_expired`.
    records: RwLock<HashMap<Key, Record>>,
    #[allow(dead_code)]
    config: DHTConfig,
}
157
/// Top-level DHT node: routing table plus local record storage, with
/// optional S/Kademlia and IPv6-identity security extensions.
#[derive(Debug)]
pub struct DHT {
    // Key identifying this node in the keyspace.
    local_id: Key,
    routing_table: RoutingTable,
    storage: DHTStorage,
    #[allow(dead_code)]
    config: DHTConfig,
    /// Optional S/Kademlia hardening (secure lookups, distance proofs).
    pub skademlia: Option<skademlia::SKademlia>,
    /// Optional IPv6-based identity verification and diversity enforcement.
    pub ipv6_identity_manager: Option<ipv6_identity::IPv6DHTIdentityManager>,
}
175
/// Requests exchanged between DHT peers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DHTQuery {
    /// Ask for the nodes closest to `key`.
    FindNode {
        key: Key,
        requester: PeerId
    },
    /// Ask for the value stored under `key`; closest nodes if absent.
    FindValue {
        key: Key,
        requester: PeerId
    },
    /// Ask the receiver to store `record`.
    Store {
        record: Record,
        requester: PeerId
    },
    /// Liveness probe.
    Ping {
        requester: PeerId
    },
}
206
/// Replies to [`DHTQuery`] requests.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DHTResponse {
    /// Closest nodes known for a `FindNode`/`FindValue` miss.
    Nodes {
        nodes: Vec<SerializableDHTNode>
    },
    /// A stored value answering a `FindValue`.
    Value {
        record: Record
    },
    /// Outcome of a `Store` request.
    Stored {
        success: bool
    },
    /// Answer to a `Ping`.
    Pong {
        responder: PeerId
    },
    /// Generic failure with a human-readable description.
    Error {
        message: String
    },
}
236
/// Mutable state of one iterative lookup.
/// (Its methods — `new`, `add_nodes`, `next_nodes`, `is_complete` — are
/// defined elsewhere in this file/module.)
#[derive(Debug)]
pub struct LookupState {
    /// Key being looked up.
    pub target: Key,
    /// Peers already queried, with the time they were queried.
    pub queried: HashMap<PeerId, Instant>,
    /// Candidates not yet queried.
    pub to_query: VecDeque<DHTNode>,
    /// Closest nodes discovered so far.
    pub closest: Vec<DHTNode>,
    /// When the lookup began.
    pub started_at: Instant,
    /// Maximum parallel queries per round.
    pub alpha: usize,
}
253
254impl Default for DHTConfig {
255 fn default() -> Self {
256 Self {
257 replication_factor: 20, bucket_size: 20, alpha: 3, record_ttl: Duration::from_secs(24 * 60 * 60), bucket_refresh_interval: Duration::from_secs(60 * 60), republish_interval: Duration::from_secs(24 * 60 * 60), max_distance: 160, }
265 }
266}
267
268impl Key {
269 pub fn new(data: &[u8]) -> Self {
271 let mut hasher = Sha256::new();
272 hasher.update(data);
273 let hash: [u8; 32] = hasher.finalize().into();
274 Self { hash }
275 }
276
277 pub fn from_hash(hash: [u8; 32]) -> Self {
279 Self { hash }
280 }
281
282 pub fn random() -> Self {
284 use rand::RngCore;
285 let mut hash = [0u8; 32];
286 rand::thread_rng().fill_bytes(&mut hash);
287 Self { hash }
288 }
289
290 pub fn as_bytes(&self) -> &[u8] {
292 &self.hash
293 }
294
295 pub fn to_hex(&self) -> String {
297 hex::encode(self.hash)
298 }
299
300 pub fn distance(&self, other: &Key) -> Key {
302 let mut result = [0u8; 32];
303 for i in 0..32 {
304 result[i] = self.hash[i] ^ other.hash[i];
305 }
306 Key { hash: result }
307 }
308
309 pub fn leading_zeros(&self) -> u32 {
311 for (i, &byte) in self.hash.iter().enumerate() {
312 if byte != 0 {
313 return (i * 8) as u32 + byte.leading_zeros();
314 }
315 }
316 256 }
318
319 pub fn bucket_index(&self, local_id: &Key) -> usize {
321 let distance = self.distance(local_id);
322 let leading_zeros = distance.leading_zeros();
323 if leading_zeros >= 255 {
324 255 } else {
326 (255 - leading_zeros) as usize
327 }
328 }
329}
330
331impl Record {
332 pub fn new(key: Key, value: Vec<u8>, publisher: PeerId) -> Self {
334 let now = SystemTime::now();
335 let ttl = Duration::from_secs(24 * 60 * 60); Self {
338 key,
339 value,
340 publisher,
341 created_at: now,
342 expires_at: now + ttl,
343 signature: None,
344 }
345 }
346
347 pub fn with_ttl(key: Key, value: Vec<u8>, publisher: PeerId, ttl: Duration) -> Self {
349 let now = SystemTime::now();
350
351 Self {
352 key,
353 value,
354 publisher,
355 created_at: now,
356 expires_at: now + ttl,
357 signature: None,
358 }
359 }
360
361 pub fn is_expired(&self) -> bool {
363 SystemTime::now() > self.expires_at
364 }
365
366 pub fn age(&self) -> Duration {
368 SystemTime::now()
369 .duration_since(self.created_at)
370 .unwrap_or(Duration::ZERO)
371 }
372
373 pub fn sign(&mut self, _private_key: &[u8]) -> Result<()> {
375 self.signature = Some(vec![0u8; 64]); Ok(())
379 }
380
381 pub fn verify(&self, _public_key: &[u8]) -> bool {
383 self.signature.is_some()
386 }
387}
388
389impl DHTNode {
390 pub fn new(peer_id: PeerId, addresses: Vec<Multiaddr>, local_id: &Key) -> Self {
392 let node_key = Key::new(peer_id.as_bytes());
393 let distance = node_key.distance(local_id);
394
395 Self {
396 peer_id,
397 addresses,
398 last_seen: Instant::now(),
399 distance,
400 is_connected: false,
401 }
402 }
403
404 pub fn new_with_key(peer_id: PeerId, addresses: Vec<Multiaddr>, key: Key) -> Self {
406 Self {
407 peer_id,
408 addresses,
409 last_seen: Instant::now(),
410 distance: key,
411 is_connected: false,
412 }
413 }
414
415 pub fn touch(&mut self) {
417 self.last_seen = Instant::now();
418 }
419
420 pub fn is_stale(&self, timeout: Duration) -> bool {
422 self.last_seen.elapsed() > timeout
423 }
424
425 pub fn key(&self) -> Key {
427 Key::new(self.peer_id.as_bytes())
428 }
429
430 pub fn to_serializable(&self) -> SerializableDHTNode {
432 SerializableDHTNode {
433 peer_id: self.peer_id.clone(),
434 addresses: self.addresses.clone(),
435 last_seen_secs: self.last_seen.elapsed().as_secs(),
436 distance: self.distance.clone(),
437 is_connected: self.is_connected,
438 }
439 }
440}
441
442impl SerializableDHTNode {
443 pub fn to_dht_node(&self) -> DHTNode {
445 DHTNode {
446 peer_id: self.peer_id.clone(),
447 addresses: self.addresses.clone(),
448 last_seen: Instant::now() - Duration::from_secs(self.last_seen_secs),
449 distance: self.distance.clone(),
450 is_connected: self.is_connected,
451 }
452 }
453}
454
455impl KBucket {
456 fn new(capacity: usize) -> Self {
458 Self {
459 nodes: VecDeque::new(),
460 capacity,
461 last_refresh: Instant::now(),
462 }
463 }
464
465 fn add_node(&mut self, node: DHTNode) -> bool {
467 if let Some(pos) = self.nodes.iter().position(|n| n.peer_id == node.peer_id) {
469 let mut existing = self.nodes.remove(pos).unwrap();
471 existing.touch();
472 existing.is_connected = node.is_connected;
473 self.nodes.push_front(existing);
474 return true;
475 }
476
477 if self.nodes.len() < self.capacity {
478 self.nodes.push_front(node);
480 true
481 } else {
482 false
484 }
485 }
486
487 fn remove_node(&mut self, peer_id: &PeerId) -> bool {
489 if let Some(pos) = self.nodes.iter().position(|n| n.peer_id == *peer_id) {
490 self.nodes.remove(pos);
491 true
492 } else {
493 false
494 }
495 }
496
497 fn closest_nodes(&self, target: &Key, count: usize) -> Vec<DHTNode> {
499 let mut nodes: Vec<_> = self.nodes.iter().cloned().collect();
500 nodes.sort_by_key(|node| node.key().distance(target).as_bytes().to_vec());
501 nodes.into_iter().take(count).collect()
502 }
503
504 fn needs_refresh(&self, interval: Duration) -> bool {
506 self.last_refresh.elapsed() > interval
507 }
508}
509
impl RoutingTable {
    /// Build a table of 256 buckets, each sized per `config.bucket_size`.
    pub fn new(local_id: Key, config: DHTConfig) -> Self {
        let mut buckets = Vec::new();
        for _ in 0..256 {
            buckets.push(RwLock::new(KBucket::new(config.bucket_size)));
        }

        Self {
            local_id,
            buckets,
            config,
        }
    }

    /// Insert (or refresh) `node` in the bucket chosen by its XOR distance
    /// to the local key. A full bucket silently rejects a new node — there
    /// is no eviction policy here.
    pub async fn add_node(&self, node: DHTNode) -> Result<()> {
        let bucket_index = node.key().bucket_index(&self.local_id);
        let mut bucket = self.buckets[bucket_index].write().await;

        if bucket.add_node(node.clone()) {
            debug!("Added node {} to bucket {}", node.peer_id, bucket_index);
        } else {
            debug!("Bucket {} full, could not add node {}", bucket_index, node.peer_id);
        }

        Ok(())
    }

    /// Remove `peer_id` from its bucket, if present. Always returns `Ok`.
    pub async fn remove_node(&self, peer_id: &PeerId) -> Result<()> {
        let node_key = Key::new(peer_id.as_bytes());
        let bucket_index = node_key.bucket_index(&self.local_id);
        let mut bucket = self.buckets[bucket_index].write().await;

        if bucket.remove_node(peer_id) {
            debug!("Removed node {} from bucket {}", peer_id, bucket_index);
        }

        Ok(())
    }

    /// Collect up to `count` nodes closest to `target`.
    ///
    /// Starts at the target's bucket and walks outward over neighbouring
    /// bucket indices (BFS via the queue), pooling each visited bucket's
    /// contents. Collection stops early once `2 * count` candidates are
    /// gathered; the pool is then globally sorted by XOR distance and
    /// truncated to `count`. Only one bucket lock is held at a time.
    pub async fn closest_nodes(&self, target: &Key, count: usize) -> Vec<DHTNode> {
        let mut all_nodes = Vec::new();

        let target_bucket = target.bucket_index(&self.local_id);

        // Visited set over the 256 bucket indices.
        let mut checked = vec![false; 256];
        let mut to_check = VecDeque::new();
        to_check.push_back(target_bucket);

        while let Some(bucket_idx) = to_check.pop_front() {
            if checked[bucket_idx] {
                continue;
            }
            checked[bucket_idx] = true;

            let bucket = self.buckets[bucket_idx].read().await;
            // Take the whole bucket; the global sort below does the ranking.
            all_nodes.extend(bucket.closest_nodes(target, bucket.nodes.len()));

            if bucket_idx > 0 && !checked[bucket_idx - 1] {
                to_check.push_back(bucket_idx - 1);
            }
            if bucket_idx < 255 && !checked[bucket_idx + 1] {
                to_check.push_back(bucket_idx + 1);
            }

            // Over-collect by 2x so the final sort has slack, then stop.
            if all_nodes.len() >= count * 2 {
                break;
            }
        }

        all_nodes.sort_by_key(|node| node.key().distance(target).as_bytes().to_vec());
        all_nodes.into_iter().take(count).collect()
    }

    /// Returns `(total_node_count, non_empty_bucket_count)`.
    pub async fn stats(&self) -> (usize, usize) {
        let mut total_nodes = 0;
        let mut active_buckets = 0;

        for bucket in &self.buckets {
            let bucket_guard = bucket.read().await;
            let node_count = bucket_guard.nodes.len();
            total_nodes += node_count;
            if node_count > 0 {
                active_buckets += 1;
            }
        }

        (total_nodes, active_buckets)
    }
}
609
610impl DHTStorage {
611 pub fn new(config: DHTConfig) -> Self {
613 Self {
614 records: RwLock::new(HashMap::new()),
615 config,
616 }
617 }
618
619 pub async fn store(&self, record: Record) -> Result<()> {
621 let mut records = self.records.write().await;
622 records.insert(record.key.clone(), record);
623 Ok(())
624 }
625
626 pub async fn get(&self, key: &Key) -> Option<Record> {
628 let records = self.records.read().await;
629 records.get(key).cloned()
630 }
631
632 pub async fn cleanup_expired(&self) -> usize {
634 let mut records = self.records.write().await;
635 let initial_count = records.len();
636 records.retain(|_, record| !record.is_expired());
637 initial_count - records.len()
638 }
639
640 pub async fn all_records(&self) -> Vec<Record> {
642 let records = self.records.read().await;
643 records.values().cloned().collect()
644 }
645
646 pub async fn stats(&self) -> (usize, usize) {
648 let records = self.records.read().await;
649 let total = records.len();
650 let expired = records.values().filter(|r| r.is_expired()).count();
651 (total, expired)
652 }
653}
654
655impl DHT {
656 pub fn new(local_id: Key, config: DHTConfig) -> Self {
658 let routing_table = RoutingTable::new(local_id.clone(), config.clone());
659 let storage = DHTStorage::new(config.clone());
660
661 Self {
662 local_id,
663 routing_table,
664 storage,
665 config,
666 skademlia: None,
667 ipv6_identity_manager: None,
668 }
669 }
670
671 pub fn new_with_security(local_id: Key, config: DHTConfig, skademlia_config: skademlia::SKademliaConfig) -> Self {
673 let routing_table = RoutingTable::new(local_id.clone(), config.clone());
674 let storage = DHTStorage::new(config.clone());
675 let skademlia = skademlia::SKademlia::new(skademlia_config);
676
677 Self {
678 local_id,
679 routing_table,
680 storage,
681 config,
682 skademlia: Some(skademlia),
683 ipv6_identity_manager: None,
684 }
685 }
686
687 pub fn new_with_ipv6_security(
689 local_id: Key,
690 config: DHTConfig,
691 skademlia_config: skademlia::SKademliaConfig,
692 ipv6_config: ipv6_identity::IPv6DHTConfig
693 ) -> Self {
694 let routing_table = RoutingTable::new(local_id.clone(), config.clone());
695 let storage = DHTStorage::new(config.clone());
696 let skademlia = skademlia::SKademlia::new(skademlia_config);
697 let ipv6_identity_manager = ipv6_identity::IPv6DHTIdentityManager::new(ipv6_config);
698
699 Self {
700 local_id,
701 routing_table,
702 storage,
703 config,
704 skademlia: Some(skademlia),
705 ipv6_identity_manager: Some(ipv6_identity_manager),
706 }
707 }
708
709 pub fn set_local_ipv6_identity(&mut self, identity: crate::security::IPv6NodeID) -> Result<()> {
711 if let Some(ref mut manager) = self.ipv6_identity_manager {
712 self.local_id = ipv6_identity::IPv6DHTIdentityManager::generate_dht_key(&identity);
714 manager.set_local_identity(identity)?;
715 info!("Local IPv6 identity set and DHT key updated");
716 Ok(())
717 } else {
718 Err(P2PError::Security("IPv6 identity manager not enabled".to_string()).into())
719 }
720 }
721
722 pub async fn add_bootstrap_node(&self, peer_id: PeerId, addresses: Vec<Multiaddr>) -> Result<()> {
724 let node = DHTNode::new(peer_id, addresses, &self.local_id);
725 self.routing_table.add_node(node).await
726 }
727
    /// Add a peer after validating its IPv6 identity.
    ///
    /// With the identity manager enabled, the join is validated first; on a
    /// `NodeJoined` event the node is enhanced with an IPv6-derived DHT key
    /// (its routing distance is recomputed from that key) before insertion.
    /// Any other security event is turned into a `P2PError::Security`.
    /// Without the manager, falls back to `add_bootstrap_node`.
    pub async fn add_ipv6_node(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>, ipv6_identity: crate::security::IPv6NodeID) -> Result<()> {
        if let Some(ref mut manager) = self.ipv6_identity_manager {
            let base_node = DHTNode::new(peer_id.clone(), addresses, &self.local_id);
            let security_event = manager.validate_node_join(&base_node, &ipv6_identity).await?;

            match security_event {
                ipv6_identity::IPv6SecurityEvent::NodeJoined { verification_confidence, .. } => {
                    // Enhance first (consumes the identity), then rewrite the
                    // routing distance from the IPv6-derived DHT key.
                    let ipv6_node = manager.enhance_dht_node(base_node.clone(), ipv6_identity).await?;

                    let mut enhanced_base_node = base_node;
                    enhanced_base_node.distance = ipv6_node.get_dht_key().distance(&self.local_id);

                    self.routing_table.add_node(enhanced_base_node).await?;

                    info!("Added IPv6-verified node {} with confidence {:.2}", peer_id, verification_confidence);
                    Ok(())
                }
                ipv6_identity::IPv6SecurityEvent::VerificationFailed { reason, .. } => {
                    Err(P2PError::Security(format!("IPv6 verification failed: {}", reason)).into())
                }
                ipv6_identity::IPv6SecurityEvent::DiversityViolation { subnet_type, .. } => {
                    Err(P2PError::Security(format!("IP diversity violation: {}", subnet_type)).into())
                }
                ipv6_identity::IPv6SecurityEvent::NodeBanned { reason, .. } => {
                    Err(P2PError::Security(format!("Node banned: {}", reason)).into())
                }
                _ => {
                    Err(P2PError::Security("Unexpected security event".to_string()).into())
                }
            }
        } else {
            // No IPv6 manager configured: plain, unverified insertion.
            self.add_bootstrap_node(peer_id, addresses).await
        }
    }
768
769 pub async fn remove_ipv6_node(&mut self, peer_id: &PeerId) -> Result<()> {
771 self.routing_table.remove_node(peer_id).await?;
773
774 if let Some(ref mut manager) = self.ipv6_identity_manager {
776 manager.remove_node(peer_id);
777 }
778
779 Ok(())
780 }
781
782 pub fn is_node_banned(&self, peer_id: &PeerId) -> bool {
784 if let Some(ref manager) = self.ipv6_identity_manager {
785 manager.is_node_banned(peer_id)
786 } else {
787 false
788 }
789 }
790
791 pub async fn put(&self, key: Key, value: Vec<u8>) -> Result<()> {
793 let record = Record::new(key.clone(), value, self.local_id.to_hex());
794
795 self.storage.store(record.clone()).await?;
797
798 let closest_nodes = self.routing_table
800 .closest_nodes(&key, self.config.replication_factor)
801 .await;
802
803 info!("Storing record with key {} on {} nodes", key.to_hex(), closest_nodes.len());
804
805 if closest_nodes.is_empty() {
807 info!("No other nodes available for replication, storing only locally");
808 return Ok(());
809 }
810
811 let mut successful_replications = 0;
813 for node in &closest_nodes {
814 if self.replicate_record(&record, node).await.is_ok() {
815 successful_replications += 1;
816 }
817 }
818
819 info!("Successfully replicated record {} to {}/{} nodes",
820 key.to_hex(), successful_replications, closest_nodes.len());
821
822 let required_replications = if closest_nodes.len() == 1 {
824 1
825 } else {
826 std::cmp::max(1, closest_nodes.len() / 2)
827 };
828
829 if successful_replications >= required_replications {
830 Ok(())
831 } else {
832 Err(P2PError::DHT(format!(
833 "Insufficient replication: only {}/{} nodes stored the record (required: {})",
834 successful_replications, closest_nodes.len(), required_replications
835 )).into())
836 }
837 }
838
839 pub async fn get(&self, key: &Key) -> Option<Record> {
841 if let Some(record) = self.storage.get(key).await {
843 if !record.is_expired() {
844 return Some(record);
845 }
846 }
847
848 if let Some(record) = self.iterative_find_value(key).await {
850 if self.storage.store(record.clone()).await.is_ok() {
852 debug!("Cached retrieved record with key {}", key.to_hex());
853 }
854 return Some(record);
855 }
856
857 None
858 }
859
860 pub async fn find_node(&self, key: &Key) -> Vec<DHTNode> {
862 self.routing_table.closest_nodes(key, self.config.replication_factor).await
863 }
864
865 pub async fn handle_query(&self, query: DHTQuery) -> DHTResponse {
867 match query {
868 DHTQuery::FindNode { key, requester: _ } => {
869 let nodes = self.find_node(&key).await;
870 let serializable_nodes = nodes.into_iter().map(|n| n.to_serializable()).collect();
871 DHTResponse::Nodes { nodes: serializable_nodes }
872 }
873 DHTQuery::FindValue { key, requester: _ } => {
874 if let Some(record) = self.storage.get(&key).await {
875 if !record.is_expired() {
876 return DHTResponse::Value { record };
877 }
878 }
879 let nodes = self.find_node(&key).await;
880 let serializable_nodes = nodes.into_iter().map(|n| n.to_serializable()).collect();
881 DHTResponse::Nodes { nodes: serializable_nodes }
882 }
883 DHTQuery::Store { record, requester: _ } => {
884 match self.storage.store(record).await {
885 Ok(()) => DHTResponse::Stored { success: true },
886 Err(_) => DHTResponse::Stored { success: false },
887 }
888 }
889 DHTQuery::Ping { requester: _ } => {
890 DHTResponse::Pong { responder: self.local_id.to_hex() }
891 }
892 }
893 }
894
    /// Snapshot of routing-table and storage counters for monitoring.
    pub async fn stats(&self) -> DHTStats {
        let (total_nodes, active_buckets) = self.routing_table.stats().await;
        let (stored_records, expired_records) = self.storage.stats().await;

        DHTStats {
            local_id: self.local_id.clone(),
            total_nodes,
            active_buckets,
            stored_records,
            expired_records,
        }
    }
908
909 pub async fn maintenance(&self) -> Result<()> {
911 let expired_count = self.storage.cleanup_expired().await;
913 if expired_count > 0 {
914 debug!("Cleaned up {} expired records", expired_count);
915 }
916
917 self.republish_records().await?;
919
920 self.refresh_buckets().await?;
922
923 Ok(())
926 }
927
    /// Security-hardened lookup using S/Kademlia.
    ///
    /// Falls back to the plain `get` when S/Kademlia is disabled. Otherwise:
    /// run a secure lookup over the closest candidates, optionally verify
    /// each result node's distance claim via witness consensus, then query
    /// nodes in order and cache/return the first value found.
    ///
    /// The S/Kademlia config values are copied out up front so the `&mut
    /// self.skademlia` borrow can be re-taken inside the loop.
    pub async fn secure_get(&mut self, key: &Key) -> Result<Option<Record>> {
        // Live local copy wins immediately.
        if let Some(record) = self.storage.get(key).await {
            if !record.is_expired() {
                return Ok(Some(record));
            }
        }

        let (enable_distance_verification, disjoint_path_count, min_reputation) = if let Some(ref skademlia) = self.skademlia {
            (skademlia.config.enable_distance_verification,
             skademlia.config.disjoint_path_count,
             skademlia.config.min_routing_reputation)
        } else {
            // No security layer: delegate to the ordinary lookup.
            return Ok(self.get(key).await);
        };

        // Over-fetch candidates (3x the disjoint path count) for the lookup.
        let initial_nodes = self.routing_table
            .closest_nodes(key, disjoint_path_count * 3)
            .await;

        if initial_nodes.is_empty() {
            return Ok(None);
        }

        let secure_nodes = if let Some(ref mut skademlia) = self.skademlia {
            skademlia.secure_lookup(key.clone(), initial_nodes).await?
        } else {
            // Unreachable in practice: skademlia was Some above.
            return Ok(None);
        };

        for node in &secure_nodes {
            if enable_distance_verification {
                // Ask up to 3 nearby witnesses to corroborate the node's
                // claimed distance before trusting its answer.
                let witness_nodes = self.select_witness_nodes(&node.peer_id, 3).await;

                let consensus = if let Some(ref mut skademlia) = self.skademlia {
                    skademlia.verify_distance_consensus(&node.peer_id, key, witness_nodes).await?
                } else {
                    continue;
                };

                if consensus.confidence < min_reputation {
                    debug!("Skipping node {} due to low distance verification confidence", node.peer_id);
                    continue;
                }
            }

            let query = DHTQuery::FindValue {
                key: key.clone(),
                requester: self.local_id.to_hex()
            };
            if let Ok(DHTResponse::Value { record }) = self.simulate_query(node, query).await {
                // Cache best-effort; a store failure does not fail the get.
                let _ = self.storage.store(record.clone()).await;
                return Ok(Some(record));
            }
        }

        Ok(None)
    }
994
    /// Security-hardened store using S/Kademlia node selection.
    ///
    /// Targets are chosen by `select_secure_nodes` over a 2x candidate pool
    /// (or the plain closest nodes without S/Kademlia). Each node's
    /// reputation is updated with its replication outcome.
    ///
    /// NOTE(review): unlike `put`, this returns `Ok` even when *zero*
    /// replications succeed — it only logs on partial success. Confirm that
    /// best-effort semantics are intended here.
    pub async fn secure_put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
        let record = Record::new(key.clone(), value, self.local_id.to_hex());

        // Always keep a local copy first.
        self.storage.store(record.clone()).await?;

        let secure_nodes = if let Some(ref skademlia) = self.skademlia {
            let candidate_nodes = self.routing_table
                .closest_nodes(&key, self.config.replication_factor * 2)
                .await;

            skademlia.select_secure_nodes(
                &candidate_nodes,
                &key,
                self.config.replication_factor
            )
        } else {
            self.routing_table.closest_nodes(&key, self.config.replication_factor).await
        };

        info!("Storing record with key {} on {} secure nodes", key.to_hex(), secure_nodes.len());

        // Record per-node outcomes so reputations can be updated afterwards
        // (avoids holding a &mut skademlia borrow across the await points).
        let mut replication_results = Vec::new();
        let mut successful_replications = 0;

        for node in &secure_nodes {
            let success = self.replicate_record(&record, node).await.is_ok();
            replication_results.push((node.peer_id.clone(), success));
            if success {
                successful_replications += 1;
            }
        }

        if let Some(ref mut skademlia) = self.skademlia {
            for (peer_id, success) in replication_results {
                // Fixed 100 ms latency placeholder for the reputation update.
                skademlia.reputation_manager.update_reputation(
                    &peer_id,
                    success,
                    Duration::from_millis(100)
                );
            }
        }

        if successful_replications > 0 {
            info!("Successfully replicated to {}/{} secure nodes",
                  successful_replications, secure_nodes.len());
        }

        Ok(())
    }
1052
1053 pub async fn update_sibling_list(&mut self, key: Key) -> Result<()> {
1055 if let Some(ref mut skademlia) = self.skademlia {
1056 let nodes = self.routing_table.closest_nodes(&key, skademlia.config.sibling_list_size).await;
1057 skademlia.update_sibling_list(key, nodes);
1058 }
1059 Ok(())
1060 }
1061
1062 pub async fn validate_routing_consistency(&self) -> Result<skademlia::ConsistencyReport> {
1064 if let Some(ref skademlia) = self.skademlia {
1065 let sample_key = Key::random();
1067 let sample_nodes = self.routing_table.closest_nodes(&sample_key, 100).await;
1068 skademlia.validate_routing_consistency(&sample_nodes).await
1069 } else {
1070 Err(P2PError::DHT("S/Kademlia not enabled".to_string()).into())
1071 }
1072 }
1073
1074 pub fn create_distance_challenge(&mut self, peer_id: &PeerId, key: &Key) -> Option<skademlia::DistanceChallenge> {
1076 self.skademlia.as_mut()
1077 .map(|skademlia| skademlia.create_distance_challenge(peer_id, key))
1078 }
1079
1080 pub fn verify_distance_proof(&self, proof: &skademlia::DistanceProof) -> Result<bool> {
1082 if let Some(ref skademlia) = self.skademlia {
1083 skademlia.verify_distance_proof(proof)
1084 } else {
1085 Err(P2PError::DHT("S/Kademlia not enabled".to_string()).into())
1086 }
1087 }
1088
    /// Filter `nodes` down to those passing distance verification.
    ///
    /// NOTE(review): the witness-consensus step is stubbed out — any node
    /// with at least two witnesses receives a hard-coded confidence of 0.8,
    /// so with `min_reputation <= 0.8` every such node is accepted. The
    /// `_target_key` parameter is unused pending a real implementation.
    #[allow(dead_code)]
    async fn verify_node_distances(&self, nodes: &[DHTNode], _target_key: &Key, min_reputation: f64) -> Result<Vec<DHTNode>> {
        let mut verified_nodes = Vec::new();

        for node in nodes {
            let witness_nodes = self.select_witness_nodes(&node.peer_id, 3).await;

            // Require at least two witnesses before attempting verification.
            if witness_nodes.len() >= 2 {
                let consensus_confidence = 0.8; // placeholder: no real consensus performed
                if consensus_confidence >= min_reputation {
                    verified_nodes.push(node.clone());
                } else {
                    debug!("Node {} failed distance verification with confidence {}",
                           node.peer_id, consensus_confidence);
                }
            }
        }

        Ok(verified_nodes)
    }
1113
1114 async fn select_witness_nodes(&self, target_peer: &PeerId, count: usize) -> Vec<PeerId> {
1116 let target_key = Key::new(target_peer.as_bytes());
1118 let candidate_nodes = self.routing_table.closest_nodes(&target_key, count * 2).await;
1119
1120 candidate_nodes.into_iter()
1121 .filter(|node| node.peer_id != *target_peer)
1122 .take(count)
1123 .map(|node| node.peer_id)
1124 .collect()
1125 }
1126
1127 pub fn create_enhanced_distance_challenge(&mut self, peer_id: &PeerId, key: &Key, suspected_attack: bool) -> Option<skademlia::EnhancedDistanceChallenge> {
1129 if let Some(ref mut skademlia) = self.skademlia {
1130 Some(skademlia.create_adaptive_distance_challenge(peer_id, key, suspected_attack))
1131 } else {
1132 None
1133 }
1134 }
1135
1136 pub async fn verify_distance_multi_round(&mut self, challenge: &skademlia::EnhancedDistanceChallenge) -> Result<bool> {
1138 if let Some(ref mut skademlia) = self.skademlia {
1139 skademlia.verify_distance_multi_round(challenge).await
1140 } else {
1141 Err(P2PError::DHT("S/Kademlia not enabled".to_string()).into())
1142 }
1143 }
1144
1145 pub fn get_security_bucket(&mut self, key: &Key) -> Option<&mut skademlia::SecurityBucket> {
1147 self.skademlia.as_mut()
1148 .map(|skademlia| skademlia.get_security_bucket(key))
1149 }
1150
1151 pub async fn add_trusted_node(&mut self, key: &Key, peer_id: PeerId, addresses: Vec<Multiaddr>) -> Result<()> {
1153 if let Some(ref mut skademlia) = self.skademlia {
1154 let node = DHTNode::new(peer_id, addresses, &self.local_id);
1155 let security_bucket = skademlia.get_security_bucket(key);
1156 security_bucket.add_trusted_node(node);
1157 }
1158 Ok(())
1159 }
1160
    /// Lookup restricted to IPv6-verified peers, layered on S/Kademlia.
    ///
    /// Falls back to `secure_get` when no verified candidates exist. Nodes
    /// with stale or missing IPv6 identities are skipped; a successful
    /// answer bumps the answering node's IPv6 reputation and is cached.
    pub async fn ipv6_secure_get(&mut self, key: &Key) -> Result<Option<Record>> {
        // Refuse to operate while the local node itself is banned.
        if self.is_node_banned(&self.local_id.to_hex()) {
            return Err(P2PError::Security("Local node is banned".to_string()).into());
        }

        // Live local copy wins immediately.
        if let Some(record) = self.storage.get(key).await {
            if !record.is_expired() {
                return Ok(Some(record));
            }
        }

        let verified_nodes = self.get_ipv6_verified_nodes_for_key(key).await?;

        if verified_nodes.is_empty() {
            // No verified peers: degrade to the S/Kademlia-only path.
            return self.secure_get(key).await;
        }

        if let Some(ref mut skademlia) = self.skademlia {
            let secure_nodes = skademlia.secure_lookup(key.clone(), verified_nodes).await?;

            for node in &secure_nodes {
                // Re-check identity freshness at query time; the secure
                // lookup may have taken long enough for it to go stale.
                if let Some(ref manager) = self.ipv6_identity_manager {
                    if let Some(ipv6_node) = manager.get_verified_node(&node.peer_id) {
                        if ipv6_node.needs_identity_refresh(manager.config.identity_refresh_interval) {
                            debug!("Skipping node {} due to stale IPv6 identity", node.peer_id);
                            continue;
                        }
                    } else {
                        debug!("Skipping node {} without verified IPv6 identity", node.peer_id);
                        continue;
                    }
                }

                let query = DHTQuery::FindValue {
                    key: key.clone(),
                    requester: self.local_id.to_hex()
                };
                if let Ok(DHTResponse::Value { record }) = self.simulate_query(node, query).await {
                    // Reward the node that answered.
                    if let Some(ref mut manager) = self.ipv6_identity_manager {
                        manager.update_ipv6_reputation(&node.peer_id, true);
                    }

                    // Cache best-effort; store failure does not fail the get.
                    let _ = self.storage.store(record.clone()).await;
                    return Ok(Some(record));
                }
            }
        }

        Ok(None)
    }
1222
    /// Store restricted to IPv6-verified peers, layered on S/Kademlia.
    ///
    /// Each replication attempt updates the target's IPv6 reputation. Fails
    /// only when verified targets existed and *none* accepted the record;
    /// an empty target set succeeds with just the local copy.
    pub async fn ipv6_secure_put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
        // Refuse to operate while the local node itself is banned.
        if self.is_node_banned(&self.local_id.to_hex()) {
            return Err(P2PError::Security("Local node is banned".to_string()).into());
        }

        let record = Record::new(key.clone(), value, self.local_id.to_hex());

        // Always keep a local copy first.
        self.storage.store(record.clone()).await?;

        let verified_nodes = self.get_ipv6_verified_nodes_for_key(&key).await?;

        // S/Kademlia picks the replication set when available; otherwise
        // just take the first `replication_factor` verified candidates.
        let secure_nodes = if let Some(ref skademlia) = self.skademlia {
            skademlia.select_secure_nodes(&verified_nodes, &key, self.config.replication_factor)
        } else {
            verified_nodes.into_iter().take(self.config.replication_factor).collect()
        };

        info!("Storing record with key {} on {} IPv6-verified secure nodes", key.to_hex(), secure_nodes.len());

        let mut successful_replications = 0;

        for node in &secure_nodes {
            let success = self.replicate_record(&record, node).await.is_ok();

            // Reputation reflects every attempt, success or failure.
            if let Some(ref mut manager) = self.ipv6_identity_manager {
                manager.update_ipv6_reputation(&node.peer_id, success);
            }

            if success {
                successful_replications += 1;
            }
        }

        if successful_replications == 0 && !secure_nodes.is_empty() {
            return Err(P2PError::DHT("Failed to replicate to any IPv6-verified nodes".to_string()).into());
        }

        info!("Successfully replicated to {}/{} IPv6-verified nodes",
              successful_replications, secure_nodes.len());
        Ok(())
    }
1271
1272 async fn get_ipv6_verified_nodes_for_key(&self, key: &Key) -> Result<Vec<DHTNode>> {
1274 let mut verified_nodes = Vec::new();
1275
1276 let candidate_nodes = self.routing_table.closest_nodes(key, self.config.replication_factor * 2).await;
1278
1279 if let Some(ref manager) = self.ipv6_identity_manager {
1280 for node in candidate_nodes {
1281 if let Some(ipv6_node) = manager.get_verified_node(&node.peer_id) {
1283 if !ipv6_node.needs_identity_refresh(manager.config.identity_refresh_interval) {
1285 if !manager.is_node_banned(&node.peer_id) {
1287 verified_nodes.push(node);
1288 }
1289 }
1290 }
1291 }
1292 } else {
1293 verified_nodes = candidate_nodes;
1295 }
1296
1297 Ok(verified_nodes)
1298 }
1299
1300 pub fn get_ipv6_diversity_stats(&self) -> Option<crate::security::DiversityStats> {
1302 self.ipv6_identity_manager.as_ref()
1303 .map(|manager| manager.get_ipv6_diversity_stats())
1304 }
1305
1306 pub fn cleanup_ipv6_data(&mut self) {
1308 if let Some(ref mut manager) = self.ipv6_identity_manager {
1309 manager.cleanup_expired();
1310 }
1311 }
1312
1313 pub fn ban_ipv6_node(&mut self, peer_id: &PeerId, reason: &str) {
1315 if let Some(ref mut manager) = self.ipv6_identity_manager {
1316 manager.ban_node(peer_id, reason);
1317 }
1318 }
1319
1320 pub fn get_local_ipv6_identity(&self) -> Option<&crate::security::IPv6NodeID> {
1322 self.ipv6_identity_manager.as_ref()
1323 .and_then(|manager| manager.get_local_identity())
1324 }
1325
    /// "Replicate" `record` to `node`.
    ///
    /// NOTE(review): simulation stub — no network I/O happens. It sleeps
    /// 10 ms and then succeeds with 95% probability so callers observe a
    /// realistic mix of outcomes; `node` is only used for logging.
    async fn replicate_record(&self, record: &Record, node: &DHTNode) -> Result<()> {
        debug!("Replicating record {} to node {}", record.key.to_hex(), node.peer_id);

        tokio::time::sleep(Duration::from_millis(10)).await;

        // Simulated 5% failure rate.
        if rand::random::<f64>() < 0.95 {
            Ok(())
        } else {
            Err(P2PError::Network("Replication failed".to_string()).into())
        }
    }
1342
    /// Kademlia-style iterative value lookup.
    ///
    /// Each round queries up to `alpha` unvisited candidates in parallel
    /// (via `join_all`); `Value` responses end the lookup, `Nodes`
    /// responses feed new candidates into the `LookupState`. Bounded at
    /// 10 rounds, and ends early when the state reports completion or runs
    /// out of candidates. Returns `None` when the value was not found.
    async fn iterative_find_value(&self, key: &Key) -> Option<Record> {
        debug!("Starting iterative lookup for key {}", key.to_hex());

        let mut lookup_state = LookupState::new(key.clone(), self.config.alpha);

        // Seed with our own closest known nodes.
        let initial_nodes = self.routing_table.closest_nodes(key, self.config.alpha).await;
        lookup_state.add_nodes(initial_nodes);

        let mut iterations = 0;
        const MAX_ITERATIONS: usize = 10;

        while !lookup_state.is_complete() && iterations < MAX_ITERATIONS {
            let nodes_to_query = lookup_state.next_nodes();
            if nodes_to_query.is_empty() {
                break;
            }

            // Build the round's query futures, then await them together.
            let mut queries = Vec::new();
            for node in &nodes_to_query {
                let query = DHTQuery::FindValue {
                    key: key.clone(),
                    requester: self.local_id.to_hex()
                };
                queries.push(self.simulate_query(node, query));
            }

            for query_result in futures::future::join_all(queries).await {
                match query_result {
                    Ok(DHTResponse::Value { record }) => {
                        debug!("Found value for key {} in iteration {}", key.to_hex(), iterations);
                        return Some(record);
                    }
                    Ok(DHTResponse::Nodes { nodes }) => {
                        // Feed newly learned nodes back into the lookup.
                        let dht_nodes: Vec<DHTNode> = nodes.into_iter()
                            .map(|n| n.to_dht_node())
                            .collect();
                        lookup_state.add_nodes(dht_nodes);
                    }
                    _ => {
                        // Errors and other response kinds are ignored.
                        debug!("Query failed during iterative lookup");
                    }
                }
            }

            iterations += 1;
        }

        debug!("Iterative lookup for key {} completed after {} iterations, value not found",
               key.to_hex(), iterations);
        None
    }
1400
    /// Simulate sending `query` to a remote node.
    ///
    /// NOTE(review): `_node` is never contacted — this sleeps to mimic network
    /// latency and then answers the query locally via `handle_query`. It is a
    /// placeholder until real transport is wired in.
    async fn simulate_query(&self, _node: &DHTNode, query: DHTQuery) -> Result<DHTResponse> {
        // Simulated round-trip delay.
        tokio::time::sleep(Duration::from_millis(50)).await;

        Ok(self.handle_query(query).await)
    }
1409
1410 async fn republish_records(&self) -> Result<()> {
1412 let all_records = self.storage.all_records().await;
1413 let mut republished_count = 0;
1414
1415 for record in all_records {
1416 let remaining_ttl = record.expires_at
1418 .duration_since(SystemTime::now())
1419 .unwrap_or(Duration::ZERO);
1420
1421 if remaining_ttl < self.config.record_ttl / 4 {
1422 let closest_nodes = self.routing_table
1424 .closest_nodes(&record.key, self.config.replication_factor)
1425 .await;
1426
1427 for node in &closest_nodes {
1429 if self.replicate_record(&record, node).await.is_ok() {
1430 republished_count += 1;
1431 }
1432 }
1433 }
1434 }
1435
1436 if republished_count > 0 {
1437 debug!("Republished {} records during maintenance", republished_count);
1438 }
1439
1440 Ok(())
1441 }
1442
    /// Refresh stale routing-table buckets by running a node lookup for a key
    /// that falls inside each stale bucket's distance range.
    async fn refresh_buckets(&self) -> Result<()> {
        let mut refreshed_count = 0;

        for bucket_index in 0..256 {
            // Check staleness under a short-lived read lock, released before
            // the (long) lookup below.
            let needs_refresh = {
                let bucket = self.routing_table.buckets[bucket_index].read().await;
                bucket.needs_refresh(self.config.bucket_refresh_interval)
            };

            if needs_refresh {
                // Looking up a key in this bucket's range repopulates it as a
                // side effect of the iterative search.
                let target_key = self.generate_key_for_bucket(bucket_index);
                let _nodes = self.iterative_find_node(&target_key).await;
                refreshed_count += 1;

                // Record the refresh time under a write lock.
                {
                    let mut bucket = self.routing_table.buckets[bucket_index].write().await;
                    bucket.last_refresh = Instant::now();
                }
            }
        }

        if refreshed_count > 0 {
            debug!("Refreshed {} buckets during maintenance", refreshed_count);
        }

        Ok(())
    }
1473
1474 fn generate_key_for_bucket(&self, bucket_index: usize) -> Key {
1476 let mut key_bytes = self.local_id.as_bytes().to_vec();
1477
1478 if bucket_index < 256 {
1480 let byte_index = (255 - bucket_index) / 8;
1481 let bit_index = (255 - bucket_index) % 8;
1482
1483 if byte_index < key_bytes.len() {
1484 key_bytes[byte_index] ^= 1 << bit_index;
1485 }
1486 }
1487
1488 let mut hash = [0u8; 32];
1489 hash.copy_from_slice(&key_bytes);
1490 Key::from_hash(hash)
1491 }
1492
    /// Iterative Kademlia node lookup: like `iterative_find_value` but only
    /// collects closer nodes; returns up to `replication_factor` of the
    /// closest nodes discovered.
    ///
    /// NOTE(review): the return value is drawn from `lookup_state.closest`;
    /// verify that `LookupState::add_nodes` actually maintains that field,
    /// otherwise this always returns an empty vec.
    async fn iterative_find_node(&self, key: &Key) -> Vec<DHTNode> {
        debug!("Starting iterative node lookup for key {}", key.to_hex());

        let mut lookup_state = LookupState::new(key.clone(), self.config.alpha);

        // Seed with the closest locally known nodes.
        let initial_nodes = self.routing_table.closest_nodes(key, self.config.alpha).await;
        lookup_state.add_nodes(initial_nodes);

        let mut iterations = 0;
        const MAX_ITERATIONS: usize = 10;

        while !lookup_state.is_complete() && iterations < MAX_ITERATIONS {
            let nodes_to_query = lookup_state.next_nodes();
            if nodes_to_query.is_empty() {
                break;
            }

            // One FindNode query per candidate, awaited concurrently.
            let mut queries = Vec::new();
            for node in &nodes_to_query {
                let query = DHTQuery::FindNode {
                    key: key.clone(),
                    requester: self.local_id.to_hex()
                };
                queries.push(self.simulate_query(node, query));
            }

            // Merge every returned node set; failures are simply ignored.
            for query_result in futures::future::join_all(queries).await {
                if let Ok(DHTResponse::Nodes { nodes }) = query_result {
                    let dht_nodes: Vec<DHTNode> = nodes.into_iter()
                        .map(|n| n.to_dht_node())
                        .collect();
                    lookup_state.add_nodes(dht_nodes);
                }
            }

            iterations += 1;
        }

        debug!("Iterative node lookup for key {} completed after {} iterations",
               key.to_hex(), iterations);

        // Cap the result at the replication factor.
        lookup_state.closest.into_iter()
            .take(self.config.replication_factor)
            .collect()
    }
1544
    /// Query the nodes responsible for `key` and report whether their stored
    /// records agree.
    ///
    /// The first record seen is treated as canonical; any record differing in
    /// value, creation time, or publisher is recorded as a conflict.
    ///
    /// # Errors
    /// Currently always returns `Ok`; the `Result` leaves room for transport
    /// errors once queries are no longer simulated.
    pub async fn check_consistency(&self, key: &Key) -> Result<ConsistencyReport> {
        debug!("Checking consistency for key {}", key.to_hex());

        let closest_nodes = self.routing_table
            .closest_nodes(key, self.config.replication_factor)
            .await;

        let mut records_found = Vec::new();
        let mut nodes_queried = 0;
        let mut nodes_responded = 0;

        for node in &closest_nodes {
            nodes_queried += 1;

            let query = DHTQuery::FindValue {
                key: key.clone(),
                requester: self.local_id.to_hex()
            };

            match self.simulate_query(node, query).await {
                // Node holds the record.
                Ok(DHTResponse::Value { record }) => {
                    nodes_responded += 1;
                    records_found.push((node.peer_id.clone(), record));
                }
                // Node responded but does not hold the record.
                Ok(DHTResponse::Nodes { .. }) => {
                    nodes_responded += 1;
                }
                // Failures count as queried-but-unresponsive.
                _ => {
                }
            }
        }

        // First record seen becomes the canonical baseline; later records
        // that differ in value, timestamp, or publisher are conflicts.
        let mut consistent = true;
        let mut canonical_record: Option<Record> = None;
        let mut conflicts = Vec::new();

        for (node_id, record) in &records_found {
            if let Some(ref canonical) = canonical_record {
                if record.value != canonical.value ||
                   record.created_at != canonical.created_at ||
                   record.publisher != canonical.publisher {
                    consistent = false;
                    conflicts.push((node_id.clone(), record.clone()));
                }
            } else {
                canonical_record = Some(record.clone());
            }
        }

        let report = ConsistencyReport {
            key: key.clone(),
            nodes_queried,
            nodes_responded,
            records_found: records_found.len(),
            consistent,
            canonical_record,
            conflicts,
            replication_factor: self.config.replication_factor,
        };

        debug!("Consistency check for key {}: {} nodes queried, {} responded, {} records found, consistent: {}",
               key.to_hex(), report.nodes_queried, report.nodes_responded,
               report.records_found, report.consistent);

        Ok(report)
    }
1618
    /// Repair an inconsistent record by re-replicating the most recent
    /// version to the responsible nodes.
    ///
    /// Runs a consistency check first; if the replicas already agree (or no
    /// record exists anywhere) no repair is attempted. Otherwise the record
    /// with the newest `created_at` wins and is pushed to the closest nodes.
    ///
    /// # Errors
    /// Propagates any error from the underlying consistency check.
    pub async fn repair_record(&self, key: &Key) -> Result<RepairResult> {
        debug!("Starting repair for key {}", key.to_hex());

        let consistency_report = self.check_consistency(key).await?;

        // Nothing to do if the replicas already agree.
        if consistency_report.consistent {
            return Ok(RepairResult {
                key: key.clone(),
                repairs_needed: false,
                repairs_attempted: 0,
                repairs_successful: 0,
                final_state: "consistent".to_string(),
            });
        }

        // Without any record there is nothing to repair from.
        let canonical_record = if let Some(canonical) = consistency_report.canonical_record {
            canonical
        } else {
            return Ok(RepairResult {
                key: key.clone(),
                repairs_needed: false,
                repairs_attempted: 0,
                repairs_successful: 0,
                final_state: "no_records_found".to_string(),
            });
        };

        // Last-write-wins: pick the newest record among canonical + conflicts.
        let mut most_recent = canonical_record.clone();
        for (_, conflicted_record) in &consistency_report.conflicts {
            if conflicted_record.created_at > most_recent.created_at {
                most_recent = conflicted_record.clone();
            }
        }

        let closest_nodes = self.routing_table
            .closest_nodes(key, self.config.replication_factor)
            .await;

        let mut repairs_attempted = 0;
        let mut repairs_successful = 0;

        for node in &closest_nodes {
            repairs_attempted += 1;
            if self.replicate_record(&most_recent, node).await.is_ok() {
                repairs_successful += 1;
            }
        }

        // Majority success (relative to the replication factor) counts as repaired.
        let final_state = if repairs_successful >= (self.config.replication_factor / 2) {
            "repaired".to_string()
        } else {
            "repair_failed".to_string()
        };

        debug!("Repair for key {} completed: {}/{} repairs successful, final state: {}",
               key.to_hex(), repairs_successful, repairs_attempted, final_state);

        Ok(RepairResult {
            key: key.clone(),
            repairs_needed: true,
            repairs_attempted,
            repairs_successful,
            final_state,
        })
    }
1688
1689 pub async fn create_inbox(&self, inbox_id: &str, owner_peer_id: PeerId) -> Result<InboxInfo> {
1695 info!("Creating inbox {} for peer {}", inbox_id, owner_peer_id);
1696
1697 let inbox_key = Key::from_inbox_id(inbox_id);
1698
1699 let inbox_metadata = InboxMetadata {
1701 inbox_id: inbox_id.to_string(),
1702 owner: owner_peer_id.clone(),
1703 created_at: SystemTime::now(),
1704 message_count: 0,
1705 max_messages: 1000, is_public: true,
1707 access_keys: vec![owner_peer_id.clone()],
1708 };
1709
1710 let metadata_value = serde_json::to_vec(&inbox_metadata)
1711 .map_err(|e| P2PError::DHT(format!("Failed to serialize inbox metadata: {}", e)))?;
1712
1713 let metadata_record = Record {
1714 key: inbox_key.clone(),
1715 value: metadata_value,
1716 publisher: owner_peer_id.clone(),
1717 created_at: SystemTime::now(),
1718 expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(u64::MAX), signature: None,
1720 };
1721
1722 self.put_record_with_infinite_ttl(metadata_record).await?;
1724
1725 let index_key = Key::from_inbox_index(inbox_id);
1727 let empty_index = InboxMessageIndex {
1728 inbox_id: inbox_id.to_string(),
1729 messages: Vec::new(),
1730 last_updated: SystemTime::now(),
1731 };
1732
1733 let index_value = serde_json::to_vec(&empty_index)
1734 .map_err(|e| P2PError::DHT(format!("Failed to serialize inbox index: {}", e)))?;
1735
1736 let index_record = Record {
1737 key: index_key,
1738 value: index_value,
1739 publisher: owner_peer_id.clone(),
1740 created_at: SystemTime::now(),
1741 expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(u64::MAX), signature: None,
1743 };
1744
1745 self.put_record_with_infinite_ttl(index_record).await?;
1746
1747 let inbox_info = InboxInfo {
1748 inbox_id: inbox_id.to_string(),
1749 three_word_address: self.generate_three_word_address(inbox_id),
1750 owner: owner_peer_id,
1751 created_at: SystemTime::now(),
1752 message_count: 0,
1753 is_accessible: true,
1754 };
1755
1756 info!("Successfully created inbox {} with three-word address: {}",
1757 inbox_id, inbox_info.three_word_address);
1758
1759 Ok(inbox_info)
1760 }
1761
1762 pub async fn send_message_to_inbox(&self, inbox_id: &str, message: InboxMessage) -> Result<()> {
1764 info!("Sending message to inbox {}", inbox_id);
1765
1766 let inbox_key = Key::from_inbox_id(inbox_id);
1768 let metadata_record = self.get(&inbox_key).await
1769 .ok_or_else(|| P2PError::DHT(format!("Inbox {} not found", inbox_id)))?;
1770
1771 let mut inbox_metadata: InboxMetadata = serde_json::from_slice(&metadata_record.value)
1772 .map_err(|e| P2PError::DHT(format!("Failed to deserialize inbox metadata: {}", e)))?;
1773
1774 if inbox_metadata.message_count >= inbox_metadata.max_messages {
1776 return Err(P2PError::DHT(format!("Inbox {} is full", inbox_id)));
1777 }
1778
1779 let message_key = Key::from_inbox_message(inbox_id, &message.id);
1781 let message_value = serde_json::to_vec(&message)
1782 .map_err(|e| P2PError::DHT(format!("Failed to serialize message: {}", e)))?;
1783
1784 let message_record = Record {
1785 key: message_key.clone(),
1786 value: message_value,
1787 publisher: message.sender.clone(),
1788 created_at: message.timestamp,
1789 expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(u64::MAX), signature: None,
1791 };
1792
1793 self.put_record_with_infinite_ttl(message_record).await?;
1794
1795 let index_key = Key::from_inbox_index(inbox_id);
1797 let index_record = self.get(&index_key).await
1798 .ok_or_else(|| P2PError::DHT(format!("Inbox index {} not found", inbox_id)))?;
1799
1800 let mut message_index: InboxMessageIndex = serde_json::from_slice(&index_record.value)
1801 .map_err(|e| P2PError::DHT(format!("Failed to deserialize message index: {}", e)))?;
1802
1803 message_index.messages.push(MessageRef {
1804 message_id: message.id.clone(),
1805 sender: message.sender.clone(),
1806 timestamp: message.timestamp,
1807 message_type: message.message_type.clone(),
1808 });
1809 message_index.last_updated = SystemTime::now();
1810
1811 inbox_metadata.message_count += 1;
1813
1814 let updated_index_value = serde_json::to_vec(&message_index)
1816 .map_err(|e| P2PError::DHT(format!("Failed to serialize updated index: {}", e)))?;
1817
1818 let updated_metadata_value = serde_json::to_vec(&inbox_metadata)
1819 .map_err(|e| P2PError::DHT(format!("Failed to serialize updated metadata: {}", e)))?;
1820
1821 let updated_index_record = Record {
1822 key: index_key,
1823 value: updated_index_value,
1824 publisher: message.sender.clone(),
1825 created_at: SystemTime::now(),
1826 expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(u64::MAX),
1827 signature: None,
1828 };
1829
1830 let updated_metadata_record = Record {
1831 key: inbox_key,
1832 value: updated_metadata_value,
1833 publisher: message.sender.clone(),
1834 created_at: SystemTime::now(),
1835 expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(u64::MAX),
1836 signature: None,
1837 };
1838
1839 self.put_record_with_infinite_ttl(updated_index_record).await?;
1840 self.put_record_with_infinite_ttl(updated_metadata_record).await?;
1841
1842 info!("Successfully sent message {} to inbox {}", message.id, inbox_id);
1843 Ok(())
1844 }
1845
1846 pub async fn get_inbox_messages(&self, inbox_id: &str, limit: Option<usize>) -> Result<Vec<InboxMessage>> {
1848 info!("Retrieving messages from inbox {}", inbox_id);
1849
1850 let index_key = Key::from_inbox_index(inbox_id);
1851 let index_record = self.get(&index_key).await
1852 .ok_or_else(|| P2PError::DHT(format!("Inbox {} not found", inbox_id)))?;
1853
1854 let message_index: InboxMessageIndex = serde_json::from_slice(&index_record.value)
1855 .map_err(|e| P2PError::DHT(format!("Failed to deserialize message index: {}", e)))?;
1856
1857 let mut messages = Vec::new();
1858 let message_refs: Vec<&MessageRef> = if let Some(limit) = limit {
1859 message_index.messages.iter().rev().take(limit).collect()
1860 } else {
1861 message_index.messages.iter().collect()
1862 };
1863
1864 for message_ref in message_refs {
1865 let message_key = Key::from_inbox_message(inbox_id, &message_ref.message_id);
1866 if let Some(message_record) = self.get(&message_key).await {
1867 if let Ok(message) = serde_json::from_slice::<InboxMessage>(&message_record.value) {
1868 messages.push(message);
1869 }
1870 }
1871 }
1872
1873 messages.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
1875
1876 info!("Retrieved {} messages from inbox {}", messages.len(), inbox_id);
1877 Ok(messages)
1878 }
1879
1880 pub async fn get_inbox_info(&self, inbox_id: &str) -> Result<Option<InboxInfo>> {
1882 let inbox_key = Key::from_inbox_id(inbox_id);
1883 let metadata_record = self.get(&inbox_key).await;
1884
1885 if let Some(record) = metadata_record {
1886 let metadata: InboxMetadata = serde_json::from_slice(&record.value)
1887 .map_err(|e| P2PError::DHT(format!("Failed to deserialize inbox metadata: {}", e)))?;
1888
1889 let inbox_info = InboxInfo {
1890 inbox_id: inbox_id.to_string(),
1891 three_word_address: self.generate_three_word_address(inbox_id),
1892 owner: metadata.owner,
1893 created_at: metadata.created_at,
1894 message_count: metadata.message_count,
1895 is_accessible: true,
1896 };
1897
1898 Ok(Some(inbox_info))
1899 } else {
1900 Ok(None)
1901 }
1902 }
1903
1904 async fn put_record_with_infinite_ttl(&self, record: Record) -> Result<()> {
1906 self.storage.store(record.clone()).await?;
1908
1909 let closest_nodes = self.routing_table
1911 .closest_nodes(&record.key, self.config.replication_factor)
1912 .await;
1913
1914 for node in &closest_nodes {
1916 if let Err(e) = self.replicate_record(&record, node).await {
1917 debug!("Failed to replicate infinite TTL record to node {}: {}", node.peer_id, e);
1918 }
1919 }
1920
1921 Ok(())
1922 }
1923
1924 fn generate_three_word_address(&self, inbox_id: &str) -> String {
1926 use crate::bootstrap::words::WordEncoder;
1927
1928 let encoder = WordEncoder::new();
1929 let fake_multiaddr = format!("/inbox/{}/dht", inbox_id).parse().unwrap_or_else(|_| {
1930 "/ip6/::1/udp/9000/quic".parse().unwrap()
1931 });
1932
1933 if let Ok(words) = encoder.encode_multiaddr(&fake_multiaddr) {
1934 words.to_string()
1935 } else {
1936 format!("inbox.{}.messages", inbox_id.chars().take(8).collect::<String>())
1937 }
1938 }
1939
    /// Insert `node` into the routing table (delegates to the table's own
    /// bucket-placement logic).
    pub async fn add_node(&self, node: DHTNode) -> Result<()> {
        self.routing_table.add_node(node).await
    }
1944
    /// Remove the node identified by `peer_id` from the routing table.
    pub async fn remove_node(&self, peer_id: &PeerId) -> Result<()> {
        self.routing_table.remove_node(peer_id).await
    }
1949}
1950
/// Point-in-time statistics snapshot for the local DHT node.
#[derive(Debug, Clone)]
pub struct DHTStats {
    /// The local node's key in the DHT keyspace.
    pub local_id: Key,
    /// Total nodes currently tracked across all routing buckets.
    pub total_nodes: usize,
    /// Number of routing buckets that contain at least one node.
    pub active_buckets: usize,
    /// Records currently held in local storage.
    pub stored_records: usize,
    /// Locally stored records whose TTL has elapsed.
    pub expired_records: usize,
}
1965
/// Result of querying a key's replica set and comparing the stored records
/// (produced by `check_consistency`).
#[derive(Debug, Clone)]
pub struct ConsistencyReport {
    /// Key whose replicas were inspected.
    pub key: Key,
    /// How many nodes were sent a FindValue query.
    pub nodes_queried: usize,
    /// How many of those nodes returned any response.
    pub nodes_responded: usize,
    /// Number of replicas that actually held the record.
    pub records_found: usize,
    /// True when every returned record agreed with the first one seen.
    pub consistent: bool,
    /// The first record seen, used as the comparison baseline.
    pub canonical_record: Option<Record>,
    /// Records (with their holders) that disagreed with the canonical one.
    pub conflicts: Vec<(PeerId, Record)>,
    /// Configured replication factor at the time of the check.
    pub replication_factor: usize,
}
1986
/// Outcome of a `repair_record` run for a single key.
#[derive(Debug, Clone)]
pub struct RepairResult {
    /// Key that was checked/repaired.
    pub key: Key,
    /// False when the replicas were already consistent (or no record existed).
    pub repairs_needed: bool,
    /// Number of re-replication attempts made.
    pub repairs_attempted: usize,
    /// Number of re-replication attempts that succeeded.
    pub repairs_successful: usize,
    /// One of "consistent", "no_records_found", "repaired", "repair_failed".
    pub final_state: String,
}
2001
2002impl LookupState {
2003 pub fn new(target: Key, alpha: usize) -> Self {
2005 Self {
2006 target,
2007 queried: HashMap::new(),
2008 to_query: VecDeque::new(),
2009 closest: Vec::new(),
2010 started_at: Instant::now(),
2011 alpha,
2012 }
2013 }
2014
2015 pub fn add_nodes(&mut self, nodes: Vec<DHTNode>) {
2017 for node in nodes {
2018 if !self.queried.contains_key(&node.peer_id) {
2019 self.to_query.push_back(node);
2020 }
2021 }
2022
2023 let target = &self.target;
2025 self.to_query.make_contiguous().sort_by_key(|node| {
2026 node.key().distance(target).as_bytes().to_vec()
2027 });
2028 }
2029
2030 pub fn next_nodes(&mut self) -> Vec<DHTNode> {
2032 let mut nodes = Vec::new();
2033 for _ in 0..self.alpha {
2034 if let Some(node) = self.to_query.pop_front() {
2035 self.queried.insert(node.peer_id.clone(), Instant::now());
2036 nodes.push(node);
2037 } else {
2038 break;
2039 }
2040 }
2041 nodes
2042 }
2043
2044 pub fn is_complete(&self) -> bool {
2046 self.to_query.is_empty() || self.started_at.elapsed() > Duration::from_secs(30)
2047 }
2048}
2049
/// Per-inbox metadata record stored in the DHT under the inbox's metadata key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxMetadata {
    /// Application-level identifier of the inbox.
    pub inbox_id: String,
    /// Peer that created (and owns) the inbox.
    pub owner: PeerId,
    /// When the inbox was created.
    pub created_at: SystemTime,
    /// Number of messages currently recorded; compared against `max_messages`
    /// to reject sends when the inbox is full.
    pub message_count: usize,
    /// Capacity limit (1000 at creation time).
    pub max_messages: usize,
    /// Set to true at creation; semantics beyond that not visible here.
    pub is_public: bool,
    /// Peers with access; initialized to just the owner.
    pub access_keys: Vec<PeerId>,
}
2065
/// A single message stored as its own DHT record under an inbox-message key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxMessage {
    /// Unique message identifier (part of the storage key).
    pub id: String,
    /// Peer that sent the message; also used as the record's publisher.
    pub sender: PeerId,
    /// Identifier of the destination inbox.
    pub recipient_inbox: String,
    /// Message body.
    pub content: String,
    /// Free-form type tag, copied into the index's `MessageRef`.
    pub message_type: String,
    /// Send time; used for newest-first ordering when reading an inbox.
    pub timestamp: SystemTime,
    /// Attached file descriptors (metadata only; payloads not stored here).
    pub attachments: Vec<MessageAttachment>,
}
2077
/// Descriptor for a file attached to an inbox message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageAttachment {
    /// Original file name.
    pub filename: String,
    /// MIME content type.
    pub content_type: String,
    /// Size in bytes.
    pub size: u64,
    /// Content hash (format not visible here — presumably hex; verify at call sites).
    pub hash: String,
}
2086
/// Append-ordered index of all messages in an inbox, stored as one DHT record
/// under the inbox's index key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxMessageIndex {
    /// Inbox this index belongs to.
    pub inbox_id: String,
    /// References to stored messages, in send order (newest at the back).
    pub messages: Vec<MessageRef>,
    /// Last time the index record was rewritten.
    pub last_updated: SystemTime,
}
2094
/// Lightweight pointer to a stored message, kept in the inbox index so the
/// full message record can be fetched on demand.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageRef {
    /// Identifier of the referenced message (forms its storage key).
    pub message_id: String,
    /// Sender of the referenced message.
    pub sender: PeerId,
    /// Send time, copied from the message.
    pub timestamp: SystemTime,
    /// Type tag, copied from the message.
    pub message_type: String,
}
2103
/// Public summary of an inbox, returned by `create_inbox` / `get_inbox_info`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxInfo {
    /// Application-level identifier of the inbox.
    pub inbox_id: String,
    /// Human-readable word-encoded address derived from the inbox id.
    pub three_word_address: String,
    /// Peer that owns the inbox.
    pub owner: PeerId,
    /// Creation time (taken from metadata, or the current time at creation).
    pub created_at: SystemTime,
    /// Message count as recorded in the metadata.
    pub message_count: usize,
    /// Always true where constructed in this file.
    pub is_accessible: bool,
}
2114
2115impl Key {
2120 pub fn from_inbox_id(inbox_id: &str) -> Self {
2122 let mut hasher = Sha256::new();
2123 hasher.update(b"INBOX_METADATA:");
2124 hasher.update(inbox_id.as_bytes());
2125 let hash = hasher.finalize();
2126 Key { hash: hash.into() }
2127 }
2128
2129 pub fn from_inbox_index(inbox_id: &str) -> Self {
2131 let mut hasher = Sha256::new();
2132 hasher.update(b"INBOX_INDEX:");
2133 hasher.update(inbox_id.as_bytes());
2134 let hash = hasher.finalize();
2135 Key { hash: hash.into() }
2136 }
2137
2138 pub fn from_inbox_message(inbox_id: &str, message_id: &str) -> Self {
2140 let mut hasher = Sha256::new();
2141 hasher.update(b"INBOX_MESSAGE:");
2142 hasher.update(inbox_id.as_bytes());
2143 hasher.update(b":");
2144 hasher.update(message_id.as_bytes());
2145 let hash = hasher.finalize();
2146 Key { hash: hash.into() }
2147 }
2148}
2149
2150