1use crate::{PeerId, Multiaddr, Result, P2PError};
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, VecDeque};
13use std::time::{Duration, Instant, SystemTime};
14use sha2::{Digest, Sha256};
15use tokio::sync::RwLock;
16use tracing::{debug, info};
17use futures;
18
19pub mod skademlia;
21
22pub mod ipv6_identity;
24
/// Tunable parameters for the Kademlia-style DHT.
#[derive(Debug, Clone)]
pub struct DHTConfig {
    /// Number of peers a record is replicated to (Kademlia `k`).
    pub replication_factor: usize,
    /// Maximum nodes held per routing-table bucket.
    pub bucket_size: usize,
    /// Parallelism factor for iterative lookups (Kademlia `alpha`).
    pub alpha: usize,
    /// Lifetime of a stored record before it expires.
    pub record_ttl: Duration,
    /// How often stale buckets should be refreshed.
    pub bucket_refresh_interval: Duration,
    /// How often locally held records are republished.
    pub republish_interval: Duration,
    /// Maximum allowed distance prefix, in bits.
    /// NOTE(review): not referenced anywhere in this file — confirm intended use.
    pub max_distance: u8,
}
43
/// 256-bit DHT key: a SHA-256 digest compared via XOR distance.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Key {
    // Raw SHA-256 digest bytes.
    hash: [u8; 32],
}
50
/// A value stored in the DHT together with provenance and lifetime metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Record {
    /// Key under which the value is stored.
    pub key: Key,
    /// Opaque payload bytes.
    pub value: Vec<u8>,
    /// Peer that published the record.
    pub publisher: PeerId,
    /// Wall-clock creation time.
    pub created_at: SystemTime,
    /// Wall-clock expiry; the record is considered dead after this.
    pub expires_at: SystemTime,
    /// Optional signature over the record (placeholder scheme — see `Record::sign`).
    pub signature: Option<Vec<u8>>,
}
67
/// Routing-table entry describing a known peer.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DHTNode {
    /// The peer's identifier.
    pub peer_id: PeerId,
    /// Known network addresses for the peer.
    pub addresses: Vec<Multiaddr>,
    /// Last time the peer was heard from. Serialized as "seconds elapsed",
    /// so serialize/deserialize round-trips are only approximate.
    #[serde(with = "instant_as_secs")]
    pub last_seen: Instant,
    /// XOR distance between this peer's key and the local node's key.
    pub distance: Key,
    /// Whether a live connection to the peer currently exists.
    pub is_connected: bool,
}
83
84mod instant_as_secs {
86 use serde::{Deserializer, Serializer, Deserialize, Serialize};
87 use std::time::Instant;
88
89 pub fn serialize<S>(instant: &Instant, serializer: S) -> Result<S::Ok, S::Error>
90 where
91 S: Serializer,
92 {
93 instant.elapsed().as_secs().serialize(serializer)
95 }
96
97 pub fn deserialize<'de, D>(deserializer: D) -> Result<Instant, D::Error>
98 where
99 D: Deserializer<'de>,
100 {
101 let secs = u64::deserialize(deserializer)?;
102 Ok(Instant::now() - std::time::Duration::from_secs(secs))
104 }
105}
106
/// Wire-friendly mirror of [`DHTNode`] with `last_seen` flattened to a
/// "seconds ago" count so it serializes without custom adapters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDHTNode {
    /// The peer's identifier.
    pub peer_id: PeerId,
    /// Known network addresses for the peer.
    pub addresses: Vec<Multiaddr>,
    /// Seconds elapsed since the peer was last seen (at serialization time).
    pub last_seen_secs: u64,
    /// XOR distance recorded by the originating node.
    pub distance: Key,
    /// Whether a live connection existed when serialized.
    pub is_connected: bool,
}
121
/// One Kademlia k-bucket: a bounded, recency-ordered list of peers.
#[derive(Debug)]
struct KBucket {
    // Front = most recently seen, back = least recently seen.
    nodes: VecDeque<DHTNode>,
    // Maximum number of nodes held (Kademlia `k`).
    capacity: usize,
    // When this bucket was last refreshed.
    last_refresh: Instant,
}
132
/// Kademlia routing table: 256 buckets indexed by XOR-distance prefix length.
#[derive(Debug)]
pub struct RoutingTable {
    // Key of the local node; bucket indices are relative to it.
    local_id: Key,
    // One bucket per possible shared-prefix length, individually locked.
    buckets: Vec<RwLock<KBucket>>,
    #[allow(dead_code)]
    config: DHTConfig,
}
144
/// Local record store for values this node holds.
#[derive(Debug)]
pub struct DHTStorage {
    // Keyed record map behind an async read/write lock.
    records: RwLock<HashMap<Key, Record>>,
    #[allow(dead_code)]
    config: DHTConfig,
}
154
/// Top-level DHT node: routing, storage, and optional hardening layers.
#[derive(Debug)]
pub struct DHT {
    // This node's own DHT key.
    local_id: Key,
    // Peer routing state.
    routing_table: RoutingTable,
    // Locally stored records.
    storage: DHTStorage,
    #[allow(dead_code)]
    config: DHTConfig,
    /// Optional S/Kademlia security extension (secure lookups, reputation).
    pub skademlia: Option<skademlia::SKademlia>,
    /// Optional IPv6-identity-based admission/ban manager.
    pub ipv6_identity_manager: Option<ipv6_identity::IPv6DHTIdentityManager>,
}
172
/// Requests exchanged between DHT peers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DHTQuery {
    /// Ask for the peers closest to `key`.
    FindNode {
        key: Key,
        requester: PeerId
    },
    /// Ask for the value at `key`, or the closest peers if absent.
    FindValue {
        key: Key,
        requester: PeerId
    },
    /// Ask the receiver to store `record`.
    Store {
        record: Record,
        requester: PeerId
    },
    /// Liveness probe.
    Ping {
        requester: PeerId
    },
}
203
/// Replies to [`DHTQuery`] messages.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DHTResponse {
    /// Referral: peers closer to the requested key.
    Nodes {
        nodes: Vec<SerializableDHTNode>
    },
    /// The requested value was found.
    Value {
        record: Record
    },
    /// Outcome of a `Store` request.
    Stored {
        success: bool
    },
    /// Reply to `Ping`.
    Pong {
        responder: PeerId
    },
    /// Query-level failure description.
    Error {
        message: String
    },
}
233
/// Bookkeeping for one iterative lookup.
#[derive(Debug)]
pub struct LookupState {
    /// Key being searched for.
    pub target: Key,
    /// Peers already queried, with the time each query was sent.
    pub queried: HashMap<PeerId, Instant>,
    /// Candidates not yet queried.
    pub to_query: VecDeque<DHTNode>,
    /// Best (closest) nodes discovered so far.
    pub closest: Vec<DHTNode>,
    /// When the lookup began.
    pub started_at: Instant,
    /// Number of parallel in-flight queries allowed.
    pub alpha: usize,
}
250
251impl Default for DHTConfig {
252 fn default() -> Self {
253 Self {
254 replication_factor: 20, bucket_size: 20, alpha: 3, record_ttl: Duration::from_secs(24 * 60 * 60), bucket_refresh_interval: Duration::from_secs(60 * 60), republish_interval: Duration::from_secs(24 * 60 * 60), max_distance: 160, }
262 }
263}
264
265impl Key {
266 pub fn new(data: &[u8]) -> Self {
268 let mut hasher = Sha256::new();
269 hasher.update(data);
270 let hash: [u8; 32] = hasher.finalize().into();
271 Self { hash }
272 }
273
274 pub fn from_hash(hash: [u8; 32]) -> Self {
276 Self { hash }
277 }
278
279 pub fn random() -> Self {
281 use rand::RngCore;
282 let mut hash = [0u8; 32];
283 rand::thread_rng().fill_bytes(&mut hash);
284 Self { hash }
285 }
286
287 pub fn as_bytes(&self) -> &[u8] {
289 &self.hash
290 }
291
292 pub fn to_hex(&self) -> String {
294 hex::encode(self.hash)
295 }
296
297 pub fn distance(&self, other: &Key) -> Key {
299 let mut result = [0u8; 32];
300 for i in 0..32 {
301 result[i] = self.hash[i] ^ other.hash[i];
302 }
303 Key { hash: result }
304 }
305
306 pub fn leading_zeros(&self) -> u32 {
308 for (i, &byte) in self.hash.iter().enumerate() {
309 if byte != 0 {
310 return (i * 8) as u32 + byte.leading_zeros();
311 }
312 }
313 256 }
315
316 pub fn bucket_index(&self, local_id: &Key) -> usize {
318 let distance = self.distance(local_id);
319 let leading_zeros = distance.leading_zeros();
320 if leading_zeros >= 255 {
321 255 } else {
323 (255 - leading_zeros) as usize
324 }
325 }
326}
327
328impl Record {
329 pub fn new(key: Key, value: Vec<u8>, publisher: PeerId) -> Self {
331 let now = SystemTime::now();
332 let ttl = Duration::from_secs(24 * 60 * 60); Self {
335 key,
336 value,
337 publisher,
338 created_at: now,
339 expires_at: now + ttl,
340 signature: None,
341 }
342 }
343
344 pub fn with_ttl(key: Key, value: Vec<u8>, publisher: PeerId, ttl: Duration) -> Self {
346 let now = SystemTime::now();
347
348 Self {
349 key,
350 value,
351 publisher,
352 created_at: now,
353 expires_at: now + ttl,
354 signature: None,
355 }
356 }
357
358 pub fn is_expired(&self) -> bool {
360 SystemTime::now() > self.expires_at
361 }
362
363 pub fn age(&self) -> Duration {
365 SystemTime::now()
366 .duration_since(self.created_at)
367 .unwrap_or(Duration::ZERO)
368 }
369
370 pub fn sign(&mut self, _private_key: &[u8]) -> Result<()> {
372 self.signature = Some(vec![0u8; 64]); Ok(())
376 }
377
378 pub fn verify(&self, _public_key: &[u8]) -> bool {
380 self.signature.is_some()
383 }
384}
385
impl DHTNode {
    /// Builds a node entry, deriving its XOR distance from `local_id`.
    pub fn new(peer_id: PeerId, addresses: Vec<Multiaddr>, local_id: &Key) -> Self {
        // The node's DHT key is the SHA-256 of its peer id.
        let node_key = Key::new(peer_id.as_bytes());
        let distance = node_key.distance(local_id);

        Self {
            peer_id,
            addresses,
            last_seen: Instant::now(),
            distance,
            is_connected: false,
        }
    }

    /// Builds a node entry with a caller-supplied (precomputed) distance key.
    pub fn new_with_key(peer_id: PeerId, addresses: Vec<Multiaddr>, key: Key) -> Self {
        Self {
            peer_id,
            addresses,
            last_seen: Instant::now(),
            distance: key,
            is_connected: false,
        }
    }

    /// Marks the node as just seen.
    pub fn touch(&mut self) {
        self.last_seen = Instant::now();
    }

    /// True when the node has not been seen within `timeout`.
    pub fn is_stale(&self, timeout: Duration) -> bool {
        self.last_seen.elapsed() > timeout
    }

    /// The node's DHT key (SHA-256 of its peer id), recomputed on each call.
    pub fn key(&self) -> Key {
        Key::new(self.peer_id.as_bytes())
    }

    /// Converts to the wire-friendly form, flattening `last_seen` into an
    /// elapsed-seconds count.
    pub fn to_serializable(&self) -> SerializableDHTNode {
        SerializableDHTNode {
            peer_id: self.peer_id.clone(),
            addresses: self.addresses.clone(),
            last_seen_secs: self.last_seen.elapsed().as_secs(),
            distance: self.distance.clone(),
            is_connected: self.is_connected,
        }
    }
}
438
439impl SerializableDHTNode {
440 pub fn to_dht_node(&self) -> DHTNode {
442 DHTNode {
443 peer_id: self.peer_id.clone(),
444 addresses: self.addresses.clone(),
445 last_seen: Instant::now() - Duration::from_secs(self.last_seen_secs),
446 distance: self.distance.clone(),
447 is_connected: self.is_connected,
448 }
449 }
450}
451
452impl KBucket {
453 fn new(capacity: usize) -> Self {
455 Self {
456 nodes: VecDeque::new(),
457 capacity,
458 last_refresh: Instant::now(),
459 }
460 }
461
462 fn add_node(&mut self, node: DHTNode) -> bool {
464 if let Some(pos) = self.nodes.iter().position(|n| n.peer_id == node.peer_id) {
466 let mut existing = self.nodes.remove(pos).unwrap();
468 existing.touch();
469 existing.is_connected = node.is_connected;
470 self.nodes.push_front(existing);
471 return true;
472 }
473
474 if self.nodes.len() < self.capacity {
475 self.nodes.push_front(node);
477 true
478 } else {
479 false
481 }
482 }
483
484 fn remove_node(&mut self, peer_id: &PeerId) -> bool {
486 if let Some(pos) = self.nodes.iter().position(|n| n.peer_id == *peer_id) {
487 self.nodes.remove(pos);
488 true
489 } else {
490 false
491 }
492 }
493
494 fn closest_nodes(&self, target: &Key, count: usize) -> Vec<DHTNode> {
496 let mut nodes: Vec<_> = self.nodes.iter().cloned().collect();
497 nodes.sort_by_key(|node| node.key().distance(target).as_bytes().to_vec());
498 nodes.into_iter().take(count).collect()
499 }
500
501 fn needs_refresh(&self, interval: Duration) -> bool {
503 self.last_refresh.elapsed() > interval
504 }
505}
506
507impl RoutingTable {
508 pub fn new(local_id: Key, config: DHTConfig) -> Self {
510 let mut buckets = Vec::new();
511 for _ in 0..256 {
512 buckets.push(RwLock::new(KBucket::new(config.bucket_size)));
513 }
514
515 Self {
516 local_id,
517 buckets,
518 config,
519 }
520 }
521
522 pub async fn add_node(&self, node: DHTNode) -> Result<()> {
524 let bucket_index = node.key().bucket_index(&self.local_id);
525 let mut bucket = self.buckets[bucket_index].write().await;
526
527 if bucket.add_node(node.clone()) {
528 debug!("Added node {} to bucket {}", node.peer_id, bucket_index);
529 } else {
530 debug!("Bucket {} full, could not add node {}", bucket_index, node.peer_id);
531 }
532
533 Ok(())
534 }
535
536 pub async fn remove_node(&self, peer_id: &PeerId) -> Result<()> {
538 let node_key = Key::new(peer_id.as_bytes());
539 let bucket_index = node_key.bucket_index(&self.local_id);
540 let mut bucket = self.buckets[bucket_index].write().await;
541
542 if bucket.remove_node(peer_id) {
543 debug!("Removed node {} from bucket {}", peer_id, bucket_index);
544 }
545
546 Ok(())
547 }
548
549 pub async fn closest_nodes(&self, target: &Key, count: usize) -> Vec<DHTNode> {
551 let mut all_nodes = Vec::new();
552
553 let target_bucket = target.bucket_index(&self.local_id);
555
556 let mut checked = vec![false; 256];
558 let mut to_check = VecDeque::new();
559 to_check.push_back(target_bucket);
560
561 while let Some(bucket_idx) = to_check.pop_front() {
562 if checked[bucket_idx] {
563 continue;
564 }
565 checked[bucket_idx] = true;
566
567 let bucket = self.buckets[bucket_idx].read().await;
568 all_nodes.extend(bucket.closest_nodes(target, bucket.nodes.len()));
569
570 if bucket_idx > 0 && !checked[bucket_idx - 1] {
572 to_check.push_back(bucket_idx - 1);
573 }
574 if bucket_idx < 255 && !checked[bucket_idx + 1] {
575 to_check.push_back(bucket_idx + 1);
576 }
577
578 if all_nodes.len() >= count * 2 {
580 break;
581 }
582 }
583
584 all_nodes.sort_by_key(|node| node.key().distance(target).as_bytes().to_vec());
586 all_nodes.into_iter().take(count).collect()
587 }
588
589 pub async fn stats(&self) -> (usize, usize) {
591 let mut total_nodes = 0;
592 let mut active_buckets = 0;
593
594 for bucket in &self.buckets {
595 let bucket_guard = bucket.read().await;
596 let node_count = bucket_guard.nodes.len();
597 total_nodes += node_count;
598 if node_count > 0 {
599 active_buckets += 1;
600 }
601 }
602
603 (total_nodes, active_buckets)
604 }
605}
606
607impl DHTStorage {
608 pub fn new(config: DHTConfig) -> Self {
610 Self {
611 records: RwLock::new(HashMap::new()),
612 config,
613 }
614 }
615
616 pub async fn store(&self, record: Record) -> Result<()> {
618 let mut records = self.records.write().await;
619 records.insert(record.key.clone(), record);
620 Ok(())
621 }
622
623 pub async fn get(&self, key: &Key) -> Option<Record> {
625 let records = self.records.read().await;
626 records.get(key).cloned()
627 }
628
629 pub async fn cleanup_expired(&self) -> usize {
631 let mut records = self.records.write().await;
632 let initial_count = records.len();
633 records.retain(|_, record| !record.is_expired());
634 initial_count - records.len()
635 }
636
637 pub async fn all_records(&self) -> Vec<Record> {
639 let records = self.records.read().await;
640 records.values().cloned().collect()
641 }
642
643 pub async fn stats(&self) -> (usize, usize) {
645 let records = self.records.read().await;
646 let total = records.len();
647 let expired = records.values().filter(|r| r.is_expired()).count();
648 (total, expired)
649 }
650}
651
652impl DHT {
653 pub fn new(local_id: Key, config: DHTConfig) -> Self {
655 let routing_table = RoutingTable::new(local_id.clone(), config.clone());
656 let storage = DHTStorage::new(config.clone());
657
658 Self {
659 local_id,
660 routing_table,
661 storage,
662 config,
663 skademlia: None,
664 ipv6_identity_manager: None,
665 }
666 }
667
668 pub fn new_with_security(local_id: Key, config: DHTConfig, skademlia_config: skademlia::SKademliaConfig) -> Self {
670 let routing_table = RoutingTable::new(local_id.clone(), config.clone());
671 let storage = DHTStorage::new(config.clone());
672 let skademlia = skademlia::SKademlia::new(skademlia_config);
673
674 Self {
675 local_id,
676 routing_table,
677 storage,
678 config,
679 skademlia: Some(skademlia),
680 ipv6_identity_manager: None,
681 }
682 }
683
684 pub fn new_with_ipv6_security(
686 local_id: Key,
687 config: DHTConfig,
688 skademlia_config: skademlia::SKademliaConfig,
689 ipv6_config: ipv6_identity::IPv6DHTConfig
690 ) -> Self {
691 let routing_table = RoutingTable::new(local_id.clone(), config.clone());
692 let storage = DHTStorage::new(config.clone());
693 let skademlia = skademlia::SKademlia::new(skademlia_config);
694 let ipv6_identity_manager = ipv6_identity::IPv6DHTIdentityManager::new(ipv6_config);
695
696 Self {
697 local_id,
698 routing_table,
699 storage,
700 config,
701 skademlia: Some(skademlia),
702 ipv6_identity_manager: Some(ipv6_identity_manager),
703 }
704 }
705
    /// Installs the local node's IPv6 identity and re-derives the local DHT
    /// key from it. Fails when the IPv6 identity manager is not enabled.
    pub fn set_local_ipv6_identity(&mut self, identity: crate::security::IPv6NodeID) -> Result<()> {
        if let Some(ref mut manager) = self.ipv6_identity_manager {
            // Bind the DHT key to the identity so routing position cannot be
            // chosen independently of the verified address.
            self.local_id = ipv6_identity::IPv6DHTIdentityManager::generate_dht_key(&identity);
            manager.set_local_identity(identity)?;
            info!("Local IPv6 identity set and DHT key updated");
            Ok(())
        } else {
            Err(P2PError::Security("IPv6 identity manager not enabled".to_string()).into())
        }
    }
718
719 pub async fn add_bootstrap_node(&self, peer_id: PeerId, addresses: Vec<Multiaddr>) -> Result<()> {
721 let node = DHTNode::new(peer_id, addresses, &self.local_id);
722 self.routing_table.add_node(node).await
723 }
724
    /// Admits a peer only after its IPv6 identity passes verification,
    /// diversity, and ban checks. Falls back to plain admission when the
    /// IPv6 identity manager is disabled.
    pub async fn add_ipv6_node(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>, ipv6_identity: crate::security::IPv6NodeID) -> Result<()> {
        if let Some(ref mut manager) = self.ipv6_identity_manager {
            let base_node = DHTNode::new(peer_id.clone(), addresses, &self.local_id);
            // The manager decides join/deny and reports it as a security event.
            let security_event = manager.validate_node_join(&base_node, &ipv6_identity).await?;

            match security_event {
                ipv6_identity::IPv6SecurityEvent::NodeJoined { verification_confidence, .. } => {
                    let ipv6_node = manager.enhance_dht_node(base_node.clone(), ipv6_identity).await?;

                    // Re-derive the routing distance from the IPv6-bound DHT
                    // key so placement reflects the verified identity.
                    let mut enhanced_base_node = base_node;
                    enhanced_base_node.distance = ipv6_node.get_dht_key().distance(&self.local_id);

                    self.routing_table.add_node(enhanced_base_node).await?;

                    info!("Added IPv6-verified node {} with confidence {:.2}", peer_id, verification_confidence);
                    Ok(())
                }
                // Every rejection path is surfaced as a security error.
                ipv6_identity::IPv6SecurityEvent::VerificationFailed { reason, .. } => {
                    Err(P2PError::Security(format!("IPv6 verification failed: {}", reason)).into())
                }
                ipv6_identity::IPv6SecurityEvent::DiversityViolation { subnet_type, .. } => {
                    Err(P2PError::Security(format!("IP diversity violation: {}", subnet_type)).into())
                }
                ipv6_identity::IPv6SecurityEvent::NodeBanned { reason, .. } => {
                    Err(P2PError::Security(format!("Node banned: {}", reason)).into())
                }
                _ => {
                    Err(P2PError::Security("Unexpected security event".to_string()).into())
                }
            }
        } else {
            // No IPv6 manager configured: behave like a normal bootstrap add.
            self.add_bootstrap_node(peer_id, addresses).await
        }
    }
765
766 pub async fn remove_ipv6_node(&mut self, peer_id: &PeerId) -> Result<()> {
768 self.routing_table.remove_node(peer_id).await?;
770
771 if let Some(ref mut manager) = self.ipv6_identity_manager {
773 manager.remove_node(peer_id);
774 }
775
776 Ok(())
777 }
778
779 pub fn is_node_banned(&self, peer_id: &PeerId) -> bool {
781 if let Some(ref manager) = self.ipv6_identity_manager {
782 manager.is_node_banned(peer_id)
783 } else {
784 false
785 }
786 }
787
788 pub async fn put(&self, key: Key, value: Vec<u8>) -> Result<()> {
790 let record = Record::new(key.clone(), value, self.local_id.to_hex());
791
792 self.storage.store(record.clone()).await?;
794
795 let closest_nodes = self.routing_table
797 .closest_nodes(&key, self.config.replication_factor)
798 .await;
799
800 info!("Storing record with key {} on {} nodes", key.to_hex(), closest_nodes.len());
801
802 if closest_nodes.is_empty() {
804 info!("No other nodes available for replication, storing only locally");
805 return Ok(());
806 }
807
808 let mut successful_replications = 0;
810 for node in &closest_nodes {
811 if self.replicate_record(&record, node).await.is_ok() {
812 successful_replications += 1;
813 }
814 }
815
816 info!("Successfully replicated record {} to {}/{} nodes",
817 key.to_hex(), successful_replications, closest_nodes.len());
818
819 let required_replications = if closest_nodes.len() == 1 {
821 1
822 } else {
823 std::cmp::max(1, closest_nodes.len() / 2)
824 };
825
826 if successful_replications >= required_replications {
827 Ok(())
828 } else {
829 Err(P2PError::DHT(format!(
830 "Insufficient replication: only {}/{} nodes stored the record (required: {})",
831 successful_replications, closest_nodes.len(), required_replications
832 )).into())
833 }
834 }
835
836 pub async fn get(&self, key: &Key) -> Option<Record> {
838 if let Some(record) = self.storage.get(key).await {
840 if !record.is_expired() {
841 return Some(record);
842 }
843 }
844
845 if let Some(record) = self.iterative_find_value(key).await {
847 if self.storage.store(record.clone()).await.is_ok() {
849 debug!("Cached retrieved record with key {}", key.to_hex());
850 }
851 return Some(record);
852 }
853
854 None
855 }
856
    /// Returns the `replication_factor` peers closest to `key` from the local
    /// routing table (no network traffic).
    pub async fn find_node(&self, key: &Key) -> Vec<DHTNode> {
        self.routing_table.closest_nodes(key, self.config.replication_factor).await
    }
861
862 pub async fn handle_query(&self, query: DHTQuery) -> DHTResponse {
864 match query {
865 DHTQuery::FindNode { key, requester: _ } => {
866 let nodes = self.find_node(&key).await;
867 let serializable_nodes = nodes.into_iter().map(|n| n.to_serializable()).collect();
868 DHTResponse::Nodes { nodes: serializable_nodes }
869 }
870 DHTQuery::FindValue { key, requester: _ } => {
871 if let Some(record) = self.storage.get(&key).await {
872 if !record.is_expired() {
873 return DHTResponse::Value { record };
874 }
875 }
876 let nodes = self.find_node(&key).await;
877 let serializable_nodes = nodes.into_iter().map(|n| n.to_serializable()).collect();
878 DHTResponse::Nodes { nodes: serializable_nodes }
879 }
880 DHTQuery::Store { record, requester: _ } => {
881 match self.storage.store(record).await {
882 Ok(()) => DHTResponse::Stored { success: true },
883 Err(_) => DHTResponse::Stored { success: false },
884 }
885 }
886 DHTQuery::Ping { requester: _ } => {
887 DHTResponse::Pong { responder: self.local_id.to_hex() }
888 }
889 }
890 }
891
    /// Snapshot of routing-table and storage counters.
    pub async fn stats(&self) -> DHTStats {
        let (total_nodes, active_buckets) = self.routing_table.stats().await;
        let (stored_records, expired_records) = self.storage.stats().await;

        DHTStats {
            local_id: self.local_id.clone(),
            total_nodes,
            active_buckets,
            stored_records,
            expired_records,
        }
    }
905
906 pub async fn maintenance(&self) -> Result<()> {
908 let expired_count = self.storage.cleanup_expired().await;
910 if expired_count > 0 {
911 debug!("Cleaned up {} expired records", expired_count);
912 }
913
914 self.republish_records().await?;
916
917 self.refresh_buckets().await?;
919
920 Ok(())
923 }
924
    /// S/Kademlia-hardened read: after checking local storage, performs a
    /// secure lookup and (optionally) distance-verifies each candidate
    /// against witness consensus before querying it. Falls back to the plain
    /// `get` path when S/Kademlia is disabled.
    pub async fn secure_get(&mut self, key: &Key) -> Result<Option<Record>> {
        // Fresh local copy wins outright.
        if let Some(record) = self.storage.get(key).await {
            if !record.is_expired() {
                return Ok(Some(record));
            }
        }

        // Copy the needed config scalars out first so later `&mut self`
        // borrows of `self.skademlia` do not conflict.
        let (enable_distance_verification, disjoint_path_count, min_reputation) = if let Some(ref skademlia) = self.skademlia {
            (skademlia.config.enable_distance_verification,
             skademlia.config.disjoint_path_count,
             skademlia.config.min_routing_reputation)
        } else {
            return Ok(self.get(key).await);
        };

        // Over-fetch so disjoint-path selection has enough material.
        let initial_nodes = self.routing_table
            .closest_nodes(key, disjoint_path_count * 3)
            .await;

        if initial_nodes.is_empty() {
            return Ok(None);
        }

        let secure_nodes = if let Some(ref mut skademlia) = self.skademlia {
            skademlia.secure_lookup(key.clone(), initial_nodes).await?
        } else {
            // Unreachable in practice (checked above); kept for borrow scoping.
            return Ok(None);
        };

        for node in &secure_nodes {
            if enable_distance_verification {
                let witness_nodes = self.select_witness_nodes(&node.peer_id, 3).await;

                let consensus = if let Some(ref mut skademlia) = self.skademlia {
                    skademlia.verify_distance_consensus(&node.peer_id, key, witness_nodes).await?
                } else {
                    continue;
                };

                // Skip peers whose claimed distance lacks witness support.
                if consensus.confidence < min_reputation {
                    debug!("Skipping node {} due to low distance verification confidence", node.peer_id);
                    continue;
                }
            }

            let query = DHTQuery::FindValue {
                key: key.clone(),
                requester: self.local_id.to_hex()
            };
            if let Ok(DHTResponse::Value { record }) = self.simulate_query(node, query).await {
                // Cache locally; best-effort.
                let _ = self.storage.store(record.clone()).await;
                return Ok(Some(record));
            }
        }

        Ok(None)
    }
991
    /// Stores a record using S/Kademlia-vetted replica selection when the
    /// extension is enabled; otherwise plain closest-node replication.
    /// Replication outcomes feed the peers' routing reputation.
    pub async fn secure_put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
        let record = Record::new(key.clone(), value, self.local_id.to_hex());

        // Keep a local copy regardless of replication outcome.
        self.storage.store(record.clone()).await?;

        let secure_nodes = if let Some(ref skademlia) = self.skademlia {
            // Over-fetch candidates, then let S/Kademlia pick trustworthy ones.
            let candidate_nodes = self.routing_table
                .closest_nodes(&key, self.config.replication_factor * 2)
                .await;

            skademlia.select_secure_nodes(
                &candidate_nodes,
                &key,
                self.config.replication_factor
            )
        } else {
            self.routing_table.closest_nodes(&key, self.config.replication_factor).await
        };

        info!("Storing record with key {} on {} secure nodes", key.to_hex(), secure_nodes.len());

        let mut replication_results = Vec::new();
        let mut successful_replications = 0;

        for node in &secure_nodes {
            let success = self.replicate_record(&record, node).await.is_ok();
            replication_results.push((node.peer_id.clone(), success));
            if success {
                successful_replications += 1;
            }
        }

        // Reward/penalize peers according to whether they accepted the record.
        if let Some(ref mut skademlia) = self.skademlia {
            for (peer_id, success) in replication_results {
                skademlia.reputation_manager.update_reputation(
                    &peer_id,
                    success,
                    Duration::from_millis(100)
                );
            }
        }

        // NOTE(review): unlike `put`, this method succeeds even with zero
        // replications — confirm best-effort semantics are intended.
        if successful_replications > 0 {
            info!("Successfully replicated to {}/{} secure nodes",
                  successful_replications, secure_nodes.len());
        }

        Ok(())
    }
1049
    /// Refreshes the S/Kademlia sibling list for `key` from the routing
    /// table. No-op when S/Kademlia is disabled.
    pub async fn update_sibling_list(&mut self, key: Key) -> Result<()> {
        if let Some(ref mut skademlia) = self.skademlia {
            let nodes = self.routing_table.closest_nodes(&key, skademlia.config.sibling_list_size).await;
            skademlia.update_sibling_list(key, nodes);
        }
        Ok(())
    }
1058
1059 pub async fn validate_routing_consistency(&self) -> Result<skademlia::ConsistencyReport> {
1061 if let Some(ref skademlia) = self.skademlia {
1062 let sample_key = Key::random();
1064 let sample_nodes = self.routing_table.closest_nodes(&sample_key, 100).await;
1065 skademlia.validate_routing_consistency(&sample_nodes).await
1066 } else {
1067 Err(P2PError::DHT("S/Kademlia not enabled".to_string()).into())
1068 }
1069 }
1070
    /// Issues a distance challenge for `peer_id`/`key`; `None` when
    /// S/Kademlia is disabled.
    pub fn create_distance_challenge(&mut self, peer_id: &PeerId, key: &Key) -> Option<skademlia::DistanceChallenge> {
        self.skademlia.as_mut()
            .map(|skademlia| skademlia.create_distance_challenge(peer_id, key))
    }
1076
1077 pub fn verify_distance_proof(&self, proof: &skademlia::DistanceProof) -> Result<bool> {
1079 if let Some(ref skademlia) = self.skademlia {
1080 skademlia.verify_distance_proof(proof)
1081 } else {
1082 Err(P2PError::DHT("S/Kademlia not enabled".to_string()).into())
1083 }
1084 }
1085
    /// Filters `nodes` to those passing witness-based distance verification.
    ///
    /// WARNING: placeholder — the consensus confidence is hard-coded to 0.8
    /// rather than computed from witness responses, so the filter currently
    /// only requires that at least two witnesses exist.
    #[allow(dead_code)]
    async fn verify_node_distances(&self, nodes: &[DHTNode], _target_key: &Key, min_reputation: f64) -> Result<Vec<DHTNode>> {
        let mut verified_nodes = Vec::new();

        for node in nodes {
            let witness_nodes = self.select_witness_nodes(&node.peer_id, 3).await;

            // Require at least two witnesses before trusting a verdict.
            if witness_nodes.len() >= 2 {
                let consensus_confidence = 0.8; if consensus_confidence >= min_reputation {
                    verified_nodes.push(node.clone());
                } else {
                    debug!("Node {} failed distance verification with confidence {}",
                           node.peer_id, consensus_confidence);
                }
            }
        }

        Ok(verified_nodes)
    }
1110
1111 async fn select_witness_nodes(&self, target_peer: &PeerId, count: usize) -> Vec<PeerId> {
1113 let target_key = Key::new(target_peer.as_bytes());
1115 let candidate_nodes = self.routing_table.closest_nodes(&target_key, count * 2).await;
1116
1117 candidate_nodes.into_iter()
1118 .filter(|node| node.peer_id != *target_peer)
1119 .take(count)
1120 .map(|node| node.peer_id)
1121 .collect()
1122 }
1123
1124 pub fn create_enhanced_distance_challenge(&mut self, peer_id: &PeerId, key: &Key, suspected_attack: bool) -> Option<skademlia::EnhancedDistanceChallenge> {
1126 if let Some(ref mut skademlia) = self.skademlia {
1127 Some(skademlia.create_adaptive_distance_challenge(peer_id, key, suspected_attack))
1128 } else {
1129 None
1130 }
1131 }
1132
1133 pub async fn verify_distance_multi_round(&mut self, challenge: &skademlia::EnhancedDistanceChallenge) -> Result<bool> {
1135 if let Some(ref mut skademlia) = self.skademlia {
1136 skademlia.verify_distance_multi_round(challenge).await
1137 } else {
1138 Err(P2PError::DHT("S/Kademlia not enabled".to_string()).into())
1139 }
1140 }
1141
    /// Mutable access to the S/Kademlia security bucket covering `key`;
    /// `None` when S/Kademlia is disabled.
    pub fn get_security_bucket(&mut self, key: &Key) -> Option<&mut skademlia::SecurityBucket> {
        self.skademlia.as_mut()
            .map(|skademlia| skademlia.get_security_bucket(key))
    }
1147
    /// Registers a peer as trusted in the security bucket for `key`.
    /// Silently does nothing when S/Kademlia is disabled.
    pub async fn add_trusted_node(&mut self, key: &Key, peer_id: PeerId, addresses: Vec<Multiaddr>) -> Result<()> {
        if let Some(ref mut skademlia) = self.skademlia {
            let node = DHTNode::new(peer_id, addresses, &self.local_id);
            let security_bucket = skademlia.get_security_bucket(key);
            security_bucket.add_trusted_node(node);
        }
        Ok(())
    }
1157
    /// Hardened read path: serves fresh local data, then queries only peers
    /// whose IPv6 identities are verified and current, updating their IPv6
    /// reputation on success. Falls back to `secure_get` when no verified
    /// peers are known.
    pub async fn ipv6_secure_get(&mut self, key: &Key) -> Result<Option<Record>> {
        // Refuse to operate if the local node itself has been banned.
        if self.is_node_banned(&self.local_id.to_hex()) {
            return Err(P2PError::Security("Local node is banned".to_string()).into());
        }

        // Fresh local copy wins outright.
        if let Some(record) = self.storage.get(key).await {
            if !record.is_expired() {
                return Ok(Some(record));
            }
        }

        let verified_nodes = self.get_ipv6_verified_nodes_for_key(key).await?;

        if verified_nodes.is_empty() {
            // No IPv6-verified peers: degrade to the S/Kademlia-only path.
            return self.secure_get(key).await;
        }

        if let Some(ref mut skademlia) = self.skademlia {
            let secure_nodes = skademlia.secure_lookup(key.clone(), verified_nodes).await?;

            for node in &secure_nodes {
                // Re-check each node's identity right before querying it.
                if let Some(ref manager) = self.ipv6_identity_manager {
                    if let Some(ipv6_node) = manager.get_verified_node(&node.peer_id) {
                        if ipv6_node.needs_identity_refresh(manager.config.identity_refresh_interval) {
                            debug!("Skipping node {} due to stale IPv6 identity", node.peer_id);
                            continue;
                        }
                    } else {
                        debug!("Skipping node {} without verified IPv6 identity", node.peer_id);
                        continue;
                    }
                }

                let query = DHTQuery::FindValue {
                    key: key.clone(),
                    requester: self.local_id.to_hex()
                };
                if let Ok(DHTResponse::Value { record }) = self.simulate_query(node, query).await {
                    // A successful answer boosts the peer's IPv6 reputation.
                    if let Some(ref mut manager) = self.ipv6_identity_manager {
                        manager.update_ipv6_reputation(&node.peer_id, true);
                    }

                    // Cache locally; best-effort.
                    let _ = self.storage.store(record.clone()).await;
                    return Ok(Some(record));
                }
            }
        }

        Ok(None)
    }
1219
    /// Hardened write path: replicates only to IPv6-verified peers (optionally
    /// further filtered by S/Kademlia) and records per-peer outcomes in their
    /// IPv6 reputation. Fails when verified candidates exist but none accept.
    pub async fn ipv6_secure_put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
        // Refuse to operate if the local node itself has been banned.
        if self.is_node_banned(&self.local_id.to_hex()) {
            return Err(P2PError::Security("Local node is banned".to_string()).into());
        }

        let record = Record::new(key.clone(), value, self.local_id.to_hex());

        // Always keep a local copy first.
        self.storage.store(record.clone()).await?;

        let verified_nodes = self.get_ipv6_verified_nodes_for_key(&key).await?;

        let secure_nodes = if let Some(ref skademlia) = self.skademlia {
            skademlia.select_secure_nodes(&verified_nodes, &key, self.config.replication_factor)
        } else {
            verified_nodes.into_iter().take(self.config.replication_factor).collect()
        };

        info!("Storing record with key {} on {} IPv6-verified secure nodes", key.to_hex(), secure_nodes.len());

        let mut successful_replications = 0;

        for node in &secure_nodes {
            let success = self.replicate_record(&record, node).await.is_ok();

            // Every attempt (success or failure) feeds the peer's reputation.
            if let Some(ref mut manager) = self.ipv6_identity_manager {
                manager.update_ipv6_reputation(&node.peer_id, success);
            }

            if success {
                successful_replications += 1;
            }
        }

        // Zero acceptances with candidates available is a hard failure; with
        // no candidates at all, the local copy alone is accepted.
        if successful_replications == 0 && !secure_nodes.is_empty() {
            return Err(P2PError::DHT("Failed to replicate to any IPv6-verified nodes".to_string()).into());
        }

        info!("Successfully replicated to {}/{} IPv6-verified nodes",
              successful_replications, secure_nodes.len());
        Ok(())
    }
1268
1269 async fn get_ipv6_verified_nodes_for_key(&self, key: &Key) -> Result<Vec<DHTNode>> {
1271 let mut verified_nodes = Vec::new();
1272
1273 let candidate_nodes = self.routing_table.closest_nodes(key, self.config.replication_factor * 2).await;
1275
1276 if let Some(ref manager) = self.ipv6_identity_manager {
1277 for node in candidate_nodes {
1278 if let Some(ipv6_node) = manager.get_verified_node(&node.peer_id) {
1280 if !ipv6_node.needs_identity_refresh(manager.config.identity_refresh_interval) {
1282 if !manager.is_node_banned(&node.peer_id) {
1284 verified_nodes.push(node);
1285 }
1286 }
1287 }
1288 }
1289 } else {
1290 verified_nodes = candidate_nodes;
1292 }
1293
1294 Ok(verified_nodes)
1295 }
1296
    /// Subnet-diversity statistics from the IPv6 identity manager, if enabled.
    pub fn get_ipv6_diversity_stats(&self) -> Option<crate::security::DiversityStats> {
        self.ipv6_identity_manager.as_ref()
            .map(|manager| manager.get_ipv6_diversity_stats())
    }
1302
1303 pub fn cleanup_ipv6_data(&mut self) {
1305 if let Some(ref mut manager) = self.ipv6_identity_manager {
1306 manager.cleanup_expired();
1307 }
1308 }
1309
1310 pub fn ban_ipv6_node(&mut self, peer_id: &PeerId, reason: &str) {
1312 if let Some(ref mut manager) = self.ipv6_identity_manager {
1313 manager.ban_node(peer_id, reason);
1314 }
1315 }
1316
    /// The local node's IPv6 identity, when the manager is enabled and one
    /// has been set.
    pub fn get_local_ipv6_identity(&self) -> Option<&crate::security::IPv6NodeID> {
        self.ipv6_identity_manager.as_ref()
            .and_then(|manager| manager.get_local_identity())
    }
1322
    /// Stand-in for sending a STORE to `node`: sleeps 10 ms and succeeds with
    /// 95% probability. Replace with real transport before production use.
    async fn replicate_record(&self, record: &Record, node: &DHTNode) -> Result<()> {
        debug!("Replicating record {} to node {}", record.key.to_hex(), node.peer_id);

        tokio::time::sleep(Duration::from_millis(10)).await;

        // Simulated 5% failure rate.
        if rand::random::<f64>() < 0.95 {
            Ok(())
        } else {
            Err(P2PError::Network("Replication failed".to_string()).into())
        }
    }
1339
    /// Iterative Kademlia value lookup: repeatedly queries batches of `alpha`
    /// closest unqueried peers until the value is found, candidates run out,
    /// or `MAX_ITERATIONS` is reached. Returns `None` when the value cannot
    /// be located.
    async fn iterative_find_value(&self, key: &Key) -> Option<Record> {
        debug!("Starting iterative lookup for key {}", key.to_hex());

        let mut lookup_state = LookupState::new(key.clone(), self.config.alpha);

        // Seed with our own closest known peers.
        let initial_nodes = self.routing_table.closest_nodes(key, self.config.alpha).await;
        lookup_state.add_nodes(initial_nodes);

        let mut iterations = 0;
        // Hard cap to bound lookup latency even on pathological topologies.
        const MAX_ITERATIONS: usize = 10;

        while !lookup_state.is_complete() && iterations < MAX_ITERATIONS {
            let nodes_to_query = lookup_state.next_nodes();
            if nodes_to_query.is_empty() {
                break;
            }

            // Fire the whole batch of queries concurrently.
            let mut queries = Vec::new();
            for node in &nodes_to_query {
                let query = DHTQuery::FindValue {
                    key: key.clone(),
                    requester: self.local_id.to_hex()
                };
                queries.push(self.simulate_query(node, query));
            }

            for query_result in futures::future::join_all(queries).await {
                match query_result {
                    Ok(DHTResponse::Value { record }) => {
                        // First peer returning the value ends the lookup.
                        debug!("Found value for key {} in iteration {}", key.to_hex(), iterations);
                        return Some(record);
                    }
                    Ok(DHTResponse::Nodes { nodes }) => {
                        // Referrals widen the candidate set for later rounds.
                        let dht_nodes: Vec<DHTNode> = nodes.into_iter()
                            .map(|n| n.to_dht_node())
                            .collect();
                        lookup_state.add_nodes(dht_nodes);
                    }
                    _ => {
                        // Individual failures are tolerated; other peers may
                        // still answer this round.
                        debug!("Query failed during iterative lookup");
                    }
                }
            }

            iterations += 1;
        }

        debug!("Iterative lookup for key {} completed after {} iterations, value not found",
               key.to_hex(), iterations);
        None
    }
1397
1398 async fn simulate_query(&self, _node: &DHTNode, query: DHTQuery) -> Result<DHTResponse> {
1400 tokio::time::sleep(Duration::from_millis(50)).await;
1402
1403 Ok(self.handle_query(query).await)
1405 }
1406
1407 async fn republish_records(&self) -> Result<()> {
1409 let all_records = self.storage.all_records().await;
1410 let mut republished_count = 0;
1411
1412 for record in all_records {
1413 let remaining_ttl = record.expires_at
1415 .duration_since(SystemTime::now())
1416 .unwrap_or(Duration::ZERO);
1417
1418 if remaining_ttl < self.config.record_ttl / 4 {
1419 let closest_nodes = self.routing_table
1421 .closest_nodes(&record.key, self.config.replication_factor)
1422 .await;
1423
1424 for node in &closest_nodes {
1426 if self.replicate_record(&record, node).await.is_ok() {
1427 republished_count += 1;
1428 }
1429 }
1430 }
1431 }
1432
1433 if republished_count > 0 {
1434 debug!("Republished {} records during maintenance", republished_count);
1435 }
1436
1437 Ok(())
1438 }
1439
    /// Refresh stale routing-table buckets.
    ///
    /// For each of the 256 buckets whose last refresh is older than
    /// `bucket_refresh_interval`, performs an iterative node lookup toward a
    /// key inside that bucket's distance range, then stamps the bucket.
    async fn refresh_buckets(&self) -> Result<()> {
        let mut refreshed_count = 0;

        for bucket_index in 0..256 {
            // Hold the read lock only for the staleness check — the lookup
            // below must not run while a bucket lock is held.
            let needs_refresh = {
                let bucket = self.routing_table.buckets[bucket_index].read().await;
                bucket.needs_refresh(self.config.bucket_refresh_interval)
            };

            if needs_refresh {
                // A key whose XOR distance from the local ID lands in this bucket.
                let target_key = self.generate_key_for_bucket(bucket_index);
                let _nodes = self.iterative_find_node(&target_key).await;
                refreshed_count += 1;

                // Re-acquire as a short-lived write lock just to stamp the bucket.
                {
                    let mut bucket = self.routing_table.buckets[bucket_index].write().await;
                    bucket.last_refresh = Instant::now();
                }
            }
        }

        if refreshed_count > 0 {
            debug!("Refreshed {} buckets during maintenance", refreshed_count);
        }

        Ok(())
    }
1470
    /// Build a lookup key that falls within the distance range of the given
    /// routing-table bucket.
    ///
    /// Starts from the local ID and flips exactly one bit, chosen from
    /// `255 - bucket_index`, so the XOR distance to the local ID has its
    /// leading set bit at the position that bucket covers.
    fn generate_key_for_bucket(&self, bucket_index: usize) -> Key {
        let mut key_bytes = self.local_id.as_bytes().to_vec();

        // Guard is defensive: callers iterate 0..256 so this always holds.
        if bucket_index < 256 {
            let byte_index = (255 - bucket_index) / 8;
            let bit_index = (255 - bucket_index) % 8;

            if byte_index < key_bytes.len() {
                key_bytes[byte_index] ^= 1 << bit_index;
            }
        }

        // NOTE(review): `copy_from_slice` panics unless the local ID is
        // exactly 32 bytes — confirm `Key::as_bytes` always yields 32 bytes.
        let mut hash = [0u8; 32];
        hash.copy_from_slice(&key_bytes);
        Key::from_hash(hash)
    }
1489
    /// Kademlia-style iterative node lookup.
    ///
    /// Like `iterative_find_value`, but issues FindNode queries and returns
    /// up to `replication_factor` of the closest nodes discovered, rather
    /// than a stored value.
    ///
    /// NOTE(review): the result comes from `lookup_state.closest`, so
    /// `LookupState::add_nodes` must populate `closest` for this to return
    /// anything — verify.
    async fn iterative_find_node(&self, key: &Key) -> Vec<DHTNode> {
        debug!("Starting iterative node lookup for key {}", key.to_hex());

        let mut lookup_state = LookupState::new(key.clone(), self.config.alpha);

        // Seed with our own routing table's closest nodes.
        let initial_nodes = self.routing_table.closest_nodes(key, self.config.alpha).await;
        lookup_state.add_nodes(initial_nodes);

        let mut iterations = 0;
        // Hard cap on rounds so the lookup always terminates.
        const MAX_ITERATIONS: usize = 10;

        while !lookup_state.is_complete() && iterations < MAX_ITERATIONS {
            let nodes_to_query = lookup_state.next_nodes();
            if nodes_to_query.is_empty() {
                break;
            }

            // Up to `alpha` concurrent FindNode queries per round.
            let mut queries = Vec::new();
            for node in &nodes_to_query {
                let query = DHTQuery::FindNode {
                    key: key.clone(),
                    requester: self.local_id.to_hex()
                };
                queries.push(self.simulate_query(node, query));
            }

            // Fold every successful response's nodes back into the frontier;
            // failures are silently tolerated.
            for query_result in futures::future::join_all(queries).await {
                if let Ok(DHTResponse::Nodes { nodes }) = query_result {
                    let dht_nodes: Vec<DHTNode> = nodes.into_iter()
                        .map(|n| n.to_dht_node())
                        .collect();
                    lookup_state.add_nodes(dht_nodes);
                }
            }

            iterations += 1;
        }

        debug!("Iterative node lookup for key {} completed after {} iterations",
               key.to_hex(), iterations);

        lookup_state.closest.into_iter()
            .take(self.config.replication_factor)
            .collect()
    }
1541
1542 pub async fn check_consistency(&self, key: &Key) -> Result<ConsistencyReport> {
1544 debug!("Checking consistency for key {}", key.to_hex());
1545
1546 let closest_nodes = self.routing_table
1548 .closest_nodes(key, self.config.replication_factor)
1549 .await;
1550
1551 let mut records_found = Vec::new();
1552 let mut nodes_queried = 0;
1553 let mut nodes_responded = 0;
1554
1555 for node in &closest_nodes {
1557 nodes_queried += 1;
1558
1559 let query = DHTQuery::FindValue {
1560 key: key.clone(),
1561 requester: self.local_id.to_hex()
1562 };
1563
1564 match self.simulate_query(node, query).await {
1565 Ok(DHTResponse::Value { record }) => {
1566 nodes_responded += 1;
1567 records_found.push((node.peer_id.clone(), record));
1568 }
1569 Ok(DHTResponse::Nodes { .. }) => {
1570 nodes_responded += 1;
1571 }
1573 _ => {
1574 }
1576 }
1577 }
1578
1579 let mut consistent = true;
1581 let mut canonical_record: Option<Record> = None;
1582 let mut conflicts = Vec::new();
1583
1584 for (node_id, record) in &records_found {
1585 if let Some(ref canonical) = canonical_record {
1586 if record.value != canonical.value ||
1588 record.created_at != canonical.created_at ||
1589 record.publisher != canonical.publisher {
1590 consistent = false;
1591 conflicts.push((node_id.clone(), record.clone()));
1592 }
1593 } else {
1594 canonical_record = Some(record.clone());
1595 }
1596 }
1597
1598 let report = ConsistencyReport {
1599 key: key.clone(),
1600 nodes_queried,
1601 nodes_responded,
1602 records_found: records_found.len(),
1603 consistent,
1604 canonical_record,
1605 conflicts,
1606 replication_factor: self.config.replication_factor,
1607 };
1608
1609 debug!("Consistency check for key {}: {} nodes queried, {} responded, {} records found, consistent: {}",
1610 key.to_hex(), report.nodes_queried, report.nodes_responded,
1611 report.records_found, report.consistent);
1612
1613 Ok(report)
1614 }
1615
1616 pub async fn repair_record(&self, key: &Key) -> Result<RepairResult> {
1618 debug!("Starting repair for key {}", key.to_hex());
1619
1620 let consistency_report = self.check_consistency(key).await?;
1621
1622 if consistency_report.consistent {
1623 return Ok(RepairResult {
1624 key: key.clone(),
1625 repairs_needed: false,
1626 repairs_attempted: 0,
1627 repairs_successful: 0,
1628 final_state: "consistent".to_string(),
1629 });
1630 }
1631
1632 let canonical_record = if let Some(canonical) = consistency_report.canonical_record {
1634 canonical
1635 } else {
1636 return Ok(RepairResult {
1637 key: key.clone(),
1638 repairs_needed: false,
1639 repairs_attempted: 0,
1640 repairs_successful: 0,
1641 final_state: "no_records_found".to_string(),
1642 });
1643 };
1644
1645 let mut most_recent = canonical_record.clone();
1647 for (_, conflicted_record) in &consistency_report.conflicts {
1648 if conflicted_record.created_at > most_recent.created_at {
1649 most_recent = conflicted_record.clone();
1650 }
1651 }
1652
1653 let closest_nodes = self.routing_table
1655 .closest_nodes(key, self.config.replication_factor)
1656 .await;
1657
1658 let mut repairs_attempted = 0;
1659 let mut repairs_successful = 0;
1660
1661 for node in &closest_nodes {
1662 repairs_attempted += 1;
1663 if self.replicate_record(&most_recent, node).await.is_ok() {
1664 repairs_successful += 1;
1665 }
1666 }
1667
1668 let final_state = if repairs_successful >= (self.config.replication_factor / 2) {
1669 "repaired".to_string()
1670 } else {
1671 "repair_failed".to_string()
1672 };
1673
1674 debug!("Repair for key {} completed: {}/{} repairs successful, final state: {}",
1675 key.to_hex(), repairs_successful, repairs_attempted, final_state);
1676
1677 Ok(RepairResult {
1678 key: key.clone(),
1679 repairs_needed: true,
1680 repairs_attempted,
1681 repairs_successful,
1682 final_state,
1683 })
1684 }
1685
1686 pub async fn create_inbox(&self, inbox_id: &str, owner_peer_id: PeerId) -> Result<InboxInfo> {
1692 info!("Creating inbox {} for peer {}", inbox_id, owner_peer_id);
1693
1694 let inbox_key = Key::from_inbox_id(inbox_id);
1695
1696 let inbox_metadata = InboxMetadata {
1698 inbox_id: inbox_id.to_string(),
1699 owner: owner_peer_id.clone(),
1700 created_at: SystemTime::now(),
1701 message_count: 0,
1702 max_messages: 1000, is_public: true,
1704 access_keys: vec![owner_peer_id.clone()],
1705 };
1706
1707 let metadata_value = serde_json::to_vec(&inbox_metadata)
1708 .map_err(|e| P2PError::DHT(format!("Failed to serialize inbox metadata: {}", e)))?;
1709
1710 let metadata_record = Record {
1711 key: inbox_key.clone(),
1712 value: metadata_value,
1713 publisher: owner_peer_id.clone(),
1714 created_at: SystemTime::now(),
1715 expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(u64::MAX), signature: None,
1717 };
1718
1719 self.put_record_with_infinite_ttl(metadata_record).await?;
1721
1722 let index_key = Key::from_inbox_index(inbox_id);
1724 let empty_index = InboxMessageIndex {
1725 inbox_id: inbox_id.to_string(),
1726 messages: Vec::new(),
1727 last_updated: SystemTime::now(),
1728 };
1729
1730 let index_value = serde_json::to_vec(&empty_index)
1731 .map_err(|e| P2PError::DHT(format!("Failed to serialize inbox index: {}", e)))?;
1732
1733 let index_record = Record {
1734 key: index_key,
1735 value: index_value,
1736 publisher: owner_peer_id.clone(),
1737 created_at: SystemTime::now(),
1738 expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(u64::MAX), signature: None,
1740 };
1741
1742 self.put_record_with_infinite_ttl(index_record).await?;
1743
1744 let inbox_info = InboxInfo {
1745 inbox_id: inbox_id.to_string(),
1746 three_word_address: self.generate_three_word_address(inbox_id),
1747 owner: owner_peer_id,
1748 created_at: SystemTime::now(),
1749 message_count: 0,
1750 is_accessible: true,
1751 };
1752
1753 info!("Successfully created inbox {} with three-word address: {}",
1754 inbox_id, inbox_info.three_word_address);
1755
1756 Ok(inbox_info)
1757 }
1758
1759 pub async fn send_message_to_inbox(&self, inbox_id: &str, message: InboxMessage) -> Result<()> {
1761 info!("Sending message to inbox {}", inbox_id);
1762
1763 let inbox_key = Key::from_inbox_id(inbox_id);
1765 let metadata_record = self.get(&inbox_key).await
1766 .ok_or_else(|| P2PError::DHT(format!("Inbox {} not found", inbox_id)))?;
1767
1768 let mut inbox_metadata: InboxMetadata = serde_json::from_slice(&metadata_record.value)
1769 .map_err(|e| P2PError::DHT(format!("Failed to deserialize inbox metadata: {}", e)))?;
1770
1771 if inbox_metadata.message_count >= inbox_metadata.max_messages {
1773 return Err(P2PError::DHT(format!("Inbox {} is full", inbox_id)));
1774 }
1775
1776 let message_key = Key::from_inbox_message(inbox_id, &message.id);
1778 let message_value = serde_json::to_vec(&message)
1779 .map_err(|e| P2PError::DHT(format!("Failed to serialize message: {}", e)))?;
1780
1781 let message_record = Record {
1782 key: message_key.clone(),
1783 value: message_value,
1784 publisher: message.sender.clone(),
1785 created_at: message.timestamp,
1786 expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(u64::MAX), signature: None,
1788 };
1789
1790 self.put_record_with_infinite_ttl(message_record).await?;
1791
1792 let index_key = Key::from_inbox_index(inbox_id);
1794 let index_record = self.get(&index_key).await
1795 .ok_or_else(|| P2PError::DHT(format!("Inbox index {} not found", inbox_id)))?;
1796
1797 let mut message_index: InboxMessageIndex = serde_json::from_slice(&index_record.value)
1798 .map_err(|e| P2PError::DHT(format!("Failed to deserialize message index: {}", e)))?;
1799
1800 message_index.messages.push(MessageRef {
1801 message_id: message.id.clone(),
1802 sender: message.sender.clone(),
1803 timestamp: message.timestamp,
1804 message_type: message.message_type.clone(),
1805 });
1806 message_index.last_updated = SystemTime::now();
1807
1808 inbox_metadata.message_count += 1;
1810
1811 let updated_index_value = serde_json::to_vec(&message_index)
1813 .map_err(|e| P2PError::DHT(format!("Failed to serialize updated index: {}", e)))?;
1814
1815 let updated_metadata_value = serde_json::to_vec(&inbox_metadata)
1816 .map_err(|e| P2PError::DHT(format!("Failed to serialize updated metadata: {}", e)))?;
1817
1818 let updated_index_record = Record {
1819 key: index_key,
1820 value: updated_index_value,
1821 publisher: message.sender.clone(),
1822 created_at: SystemTime::now(),
1823 expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(u64::MAX),
1824 signature: None,
1825 };
1826
1827 let updated_metadata_record = Record {
1828 key: inbox_key,
1829 value: updated_metadata_value,
1830 publisher: message.sender.clone(),
1831 created_at: SystemTime::now(),
1832 expires_at: SystemTime::UNIX_EPOCH + Duration::from_secs(u64::MAX),
1833 signature: None,
1834 };
1835
1836 self.put_record_with_infinite_ttl(updated_index_record).await?;
1837 self.put_record_with_infinite_ttl(updated_metadata_record).await?;
1838
1839 info!("Successfully sent message {} to inbox {}", message.id, inbox_id);
1840 Ok(())
1841 }
1842
1843 pub async fn get_inbox_messages(&self, inbox_id: &str, limit: Option<usize>) -> Result<Vec<InboxMessage>> {
1845 info!("Retrieving messages from inbox {}", inbox_id);
1846
1847 let index_key = Key::from_inbox_index(inbox_id);
1848 let index_record = self.get(&index_key).await
1849 .ok_or_else(|| P2PError::DHT(format!("Inbox {} not found", inbox_id)))?;
1850
1851 let message_index: InboxMessageIndex = serde_json::from_slice(&index_record.value)
1852 .map_err(|e| P2PError::DHT(format!("Failed to deserialize message index: {}", e)))?;
1853
1854 let mut messages = Vec::new();
1855 let message_refs: Vec<&MessageRef> = if let Some(limit) = limit {
1856 message_index.messages.iter().rev().take(limit).collect()
1857 } else {
1858 message_index.messages.iter().collect()
1859 };
1860
1861 for message_ref in message_refs {
1862 let message_key = Key::from_inbox_message(inbox_id, &message_ref.message_id);
1863 if let Some(message_record) = self.get(&message_key).await {
1864 if let Ok(message) = serde_json::from_slice::<InboxMessage>(&message_record.value) {
1865 messages.push(message);
1866 }
1867 }
1868 }
1869
1870 messages.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
1872
1873 info!("Retrieved {} messages from inbox {}", messages.len(), inbox_id);
1874 Ok(messages)
1875 }
1876
1877 pub async fn get_inbox_info(&self, inbox_id: &str) -> Result<Option<InboxInfo>> {
1879 let inbox_key = Key::from_inbox_id(inbox_id);
1880 let metadata_record = self.get(&inbox_key).await;
1881
1882 if let Some(record) = metadata_record {
1883 let metadata: InboxMetadata = serde_json::from_slice(&record.value)
1884 .map_err(|e| P2PError::DHT(format!("Failed to deserialize inbox metadata: {}", e)))?;
1885
1886 let inbox_info = InboxInfo {
1887 inbox_id: inbox_id.to_string(),
1888 three_word_address: self.generate_three_word_address(inbox_id),
1889 owner: metadata.owner,
1890 created_at: metadata.created_at,
1891 message_count: metadata.message_count,
1892 is_accessible: true,
1893 };
1894
1895 Ok(Some(inbox_info))
1896 } else {
1897 Ok(None)
1898 }
1899 }
1900
1901 async fn put_record_with_infinite_ttl(&self, record: Record) -> Result<()> {
1903 self.storage.store(record.clone()).await?;
1905
1906 let closest_nodes = self.routing_table
1908 .closest_nodes(&record.key, self.config.replication_factor)
1909 .await;
1910
1911 for node in &closest_nodes {
1913 if let Err(e) = self.replicate_record(&record, node).await {
1914 debug!("Failed to replicate infinite TTL record to node {}: {}", node.peer_id, e);
1915 }
1916 }
1917
1918 Ok(())
1919 }
1920
1921 fn generate_three_word_address(&self, inbox_id: &str) -> String {
1923 use crate::bootstrap::words::WordEncoder;
1924
1925 let encoder = WordEncoder::new();
1926 let fake_multiaddr = format!("/inbox/{}/dht", inbox_id).parse().unwrap_or_else(|_| {
1927 "/ip6/::1/udp/9000/quic".parse().unwrap()
1928 });
1929
1930 if let Ok(words) = encoder.encode_multiaddr(&fake_multiaddr) {
1931 words.to_string()
1932 } else {
1933 format!("inbox.{}.messages", inbox_id.chars().take(8).collect::<String>())
1934 }
1935 }
1936}
1937
/// Snapshot of DHT health counters.
#[derive(Debug, Clone)]
pub struct DHTStats {
    /// This node's own key in the DHT keyspace.
    pub local_id: Key,
    /// Total nodes known to the routing table.
    pub total_nodes: usize,
    /// Buckets counted as active (presumably non-empty — confirm against producer).
    pub active_buckets: usize,
    /// Records currently held in local storage.
    pub stored_records: usize,
    /// Stored records whose TTL has elapsed.
    pub expired_records: usize,
}
1952
/// Result of `check_consistency`: how a key's replica set responded and
/// whether all returned copies agree.
#[derive(Debug, Clone)]
pub struct ConsistencyReport {
    /// Key that was checked.
    pub key: Key,
    /// Replicas asked for the value.
    pub nodes_queried: usize,
    /// Replicas that answered (with either a value or closer nodes).
    pub nodes_responded: usize,
    /// Number of record copies returned.
    pub records_found: usize,
    /// True when every copy matched the canonical one in value,
    /// creation time, and publisher.
    pub consistent: bool,
    /// First copy observed; the baseline other copies were compared against.
    pub canonical_record: Option<Record>,
    /// Copies that differed from the canonical record, with their holders.
    pub conflicts: Vec<(PeerId, Record)>,
    /// Replication factor in effect when the check ran.
    pub replication_factor: usize,
}
1973
/// Outcome of `repair_record` for a single key.
#[derive(Debug, Clone)]
pub struct RepairResult {
    /// Key that was repaired (or found consistent).
    pub key: Key,
    /// False when the replica set was already consistent or held no records.
    pub repairs_needed: bool,
    /// Replicas the winning record was pushed to.
    pub repairs_attempted: usize,
    /// Pushes that succeeded.
    pub repairs_successful: usize,
    /// One of "consistent", "no_records_found", "repaired", "repair_failed".
    pub final_state: String,
}
1988
1989impl LookupState {
1990 pub fn new(target: Key, alpha: usize) -> Self {
1992 Self {
1993 target,
1994 queried: HashMap::new(),
1995 to_query: VecDeque::new(),
1996 closest: Vec::new(),
1997 started_at: Instant::now(),
1998 alpha,
1999 }
2000 }
2001
2002 pub fn add_nodes(&mut self, nodes: Vec<DHTNode>) {
2004 for node in nodes {
2005 if !self.queried.contains_key(&node.peer_id) {
2006 self.to_query.push_back(node);
2007 }
2008 }
2009
2010 let target = &self.target;
2012 self.to_query.make_contiguous().sort_by_key(|node| {
2013 node.key().distance(target).as_bytes().to_vec()
2014 });
2015 }
2016
2017 pub fn next_nodes(&mut self) -> Vec<DHTNode> {
2019 let mut nodes = Vec::new();
2020 for _ in 0..self.alpha {
2021 if let Some(node) = self.to_query.pop_front() {
2022 self.queried.insert(node.peer_id.clone(), Instant::now());
2023 nodes.push(node);
2024 } else {
2025 break;
2026 }
2027 }
2028 nodes
2029 }
2030
2031 pub fn is_complete(&self) -> bool {
2033 self.to_query.is_empty() || self.started_at.elapsed() > Duration::from_secs(30)
2034 }
2035}
2036
/// Metadata record stored under `Key::from_inbox_id`, describing one inbox.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxMetadata {
    /// Caller-supplied inbox identifier.
    pub inbox_id: String,
    /// Peer that created (and owns) the inbox.
    pub owner: PeerId,
    /// When the inbox was created.
    pub created_at: SystemTime,
    /// Messages stored so far; incremented on each send.
    pub message_count: usize,
    /// Capacity; sends fail once `message_count` reaches this.
    pub max_messages: usize,
    /// Whether the inbox is publicly accessible (always set true at creation).
    pub is_public: bool,
    /// Peers granted access; initialized to just the owner.
    pub access_keys: Vec<PeerId>,
}
2052
/// A single message stored in an inbox, serialized as its own DHT record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxMessage {
    /// Unique message identifier (used to derive the message's DHT key).
    pub id: String,
    /// Sending peer.
    pub sender: PeerId,
    /// Identifier of the destination inbox.
    pub recipient_inbox: String,
    /// Message body.
    pub content: String,
    /// Free-form message type tag.
    pub message_type: String,
    /// When the message was sent; also used for newest-first ordering.
    pub timestamp: SystemTime,
    /// Attached file descriptors (metadata only).
    pub attachments: Vec<MessageAttachment>,
}
2064
/// Descriptor for a file attached to an `InboxMessage`. Carries metadata
/// only; the attachment bytes are not stored in this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageAttachment {
    /// Original file name.
    pub filename: String,
    /// MIME content type.
    pub content_type: String,
    /// Size in bytes.
    pub size: u64,
    /// Content hash (format not specified here — confirm against producer).
    pub hash: String,
}
2073
/// Index record stored under `Key::from_inbox_index`, listing every message
/// in an inbox so messages can be enumerated without scanning the DHT.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxMessageIndex {
    /// Inbox this index belongs to.
    pub inbox_id: String,
    /// References to stored messages, in insertion (send) order.
    pub messages: Vec<MessageRef>,
    /// When the index was last modified.
    pub last_updated: SystemTime,
}
2081
/// Lightweight entry in an `InboxMessageIndex` pointing at one stored message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageRef {
    /// Id of the referenced message (used to rebuild its DHT key).
    pub message_id: String,
    /// Peer that sent the message.
    pub sender: PeerId,
    /// Send time of the message.
    pub timestamp: SystemTime,
    /// Message type tag, copied from the message.
    pub message_type: String,
}
2090
/// Public, caller-facing summary of an inbox, returned by `create_inbox`
/// and `get_inbox_info`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxInfo {
    /// Inbox identifier.
    pub inbox_id: String,
    /// Human-readable three-word address derived from the inbox id.
    pub three_word_address: String,
    /// Owning peer.
    pub owner: PeerId,
    /// Creation time (from metadata, or "now" at creation).
    pub created_at: SystemTime,
    /// Number of messages currently recorded in the metadata.
    pub message_count: usize,
    /// Accessibility flag (always set true by the current producers).
    pub is_accessible: bool,
}
2101
2102impl Key {
2107 pub fn from_inbox_id(inbox_id: &str) -> Self {
2109 let mut hasher = Sha256::new();
2110 hasher.update(b"INBOX_METADATA:");
2111 hasher.update(inbox_id.as_bytes());
2112 let hash = hasher.finalize();
2113 Key { hash: hash.into() }
2114 }
2115
2116 pub fn from_inbox_index(inbox_id: &str) -> Self {
2118 let mut hasher = Sha256::new();
2119 hasher.update(b"INBOX_INDEX:");
2120 hasher.update(inbox_id.as_bytes());
2121 let hash = hasher.finalize();
2122 Key { hash: hash.into() }
2123 }
2124
2125 pub fn from_inbox_message(inbox_id: &str, message_id: &str) -> Self {
2127 let mut hasher = Sha256::new();
2128 hasher.update(b"INBOX_MESSAGE:");
2129 hasher.update(inbox_id.as_bytes());
2130 hasher.update(b":");
2131 hasher.update(message_id.as_bytes());
2132 let hash = hasher.finalize();
2133 Key { hash: hash.into() }
2134 }
2135}
2136
2137