1use super::*;
21use crate::dht::skademlia::{SKademlia, SKademliaConfig};
22use crate::dht::{DHT, DHTConfig, DhtKey, Key as DHTKey};
23use crate::{Multiaddr, PeerId};
24use async_trait::async_trait;
25use sha2::Digest;
26use std::str::FromStr;
27use std::sync::Arc;
28use tokio::sync::RwLock;
29
30pub struct AdaptiveDHT {
32 _skademlia: Arc<RwLock<SKademlia>>,
34
35 base_dht: Arc<RwLock<DHT>>,
37
38 trust_provider: Arc<dyn TrustProvider>,
40
41 router: Arc<AdaptiveRouter>,
43
44 _identity: Arc<NodeIdentity>,
46
47 metrics: Arc<RwLock<DHTMetrics>>,
49}
50
51#[derive(Debug, Default, Clone)]
53pub struct DHTMetrics {
54 pub lookups_total: u64,
55 pub lookups_successful: u64,
56 pub stores_total: u64,
57 pub stores_successful: u64,
58 pub average_lookup_hops: f64,
59 pub trust_rejections: u64,
60}
61
62impl AdaptiveDHT {
63 pub async fn new(
65 _config: DHTConfig,
66 identity: Arc<NodeIdentity>,
67 trust_provider: Arc<dyn TrustProvider>,
68 router: Arc<AdaptiveRouter>,
69 ) -> Result<Self> {
70 let skademlia_config = SKademliaConfig {
71 min_routing_reputation: 0.3,
72 enable_distance_verification: true,
73 enable_routing_validation: true,
74 ..Default::default()
75 };
76
77 let local_key = Self::node_id_to_key(identity.node_id());
79 let node_id = crate::dht::core_engine::NodeId::from_key(DhtKey::from_bytes(local_key));
81 let base_dht = Arc::new(RwLock::new(DHT::new(node_id)?));
82 let _reputation_manager = crate::security::ReputationManager::new(0.99, 0.1);
84 let skademlia = Arc::new(RwLock::new(SKademlia::new(skademlia_config)));
85
86 Ok(Self {
87 _skademlia: skademlia,
88 base_dht,
89 trust_provider,
90 router,
91 _identity: identity,
92 metrics: Arc::new(RwLock::new(DHTMetrics::default())),
93 })
94 }
95
96 fn node_id_to_key(node_id: &NodeId) -> DHTKey {
98 node_id.hash
99 }
100
101 pub async fn store(&self, key: Vec<u8>, value: Vec<u8>) -> Result<ContentHash> {
103 let mut metrics = self.metrics.write().await;
104 metrics.stores_total += 1;
105
106 let hash = blake3::hash(&key);
108 let dht_key = *hash.as_bytes();
109
110 let mut dht = self.base_dht.write().await;
112 dht.store(&DhtKey::from_bytes(dht_key), value.clone())
113 .await
114 .map_err(|e| AdaptiveNetworkError::Other(e.to_string()))?;
115
116 metrics.stores_successful += 1;
117
118 let mut hasher = sha2::Sha256::new();
120 hasher.update(&key);
121 hasher.update(&value);
122 let result = hasher.finalize();
123 let mut hash = [0u8; 32];
124 hash.copy_from_slice(&result);
125
126 Ok(ContentHash(hash))
127 }
128
129 pub async fn retrieve(&self, hash: &ContentHash) -> Result<Vec<u8>> {
131 let mut metrics = self.metrics.write().await;
132 metrics.lookups_total += 1;
133
134 let target_id = NodeId::from_bytes(hash.0);
136 let _path = self
137 .router
138 .route(&target_id, ContentType::DataRetrieval)
139 .await?;
140
141 let dht_key = DhtKey::from_bytes(hash.0);
143
144 let dht = self.base_dht.read().await;
146 match dht.retrieve(&dht_key).await {
147 Ok(Some(value)) => {
148 metrics.lookups_successful += 1;
149 Ok(value)
150 }
151 Ok(None) => Err(AdaptiveNetworkError::Other("Record not found".to_string())),
152 Err(e) => Err(AdaptiveNetworkError::Other(e.to_string())),
153 }
154 }
155
156 pub async fn find_closest_nodes(
158 &self,
159 target: &NodeId,
160 count: usize,
161 ) -> Result<Vec<NodeDescriptor>> {
162 let dht_key = DhtKey::from_bytes(Self::node_id_to_key(target));
163 let dht = self.base_dht.read().await;
164
165 let nodes = dht
167 .find_nodes(&dht_key, 8)
168 .await
169 .unwrap_or_else(|_| Vec::new());
170
171 let sorted_nodes: Vec<_> = nodes
173 .into_iter()
174 .filter_map(|node| {
175 let mut hash = [0u8; 32];
178 let peer_bytes = node.id.as_bytes();
179 if peer_bytes.len() >= 32 {
180 hash.copy_from_slice(&peer_bytes[..32]);
181 } else {
182 let hashed = blake3::hash(peer_bytes);
184 hash.copy_from_slice(hashed.as_bytes());
185 }
186 let node_id = NodeId::from_bytes(hash);
187 let trust = self.trust_provider.get_trust(&node_id);
188
189 if trust < 0.3 {
191 return None;
192 }
193
194 Some((node, trust))
195 })
196 .collect();
197
198 let mut sorted_nodes = sorted_nodes;
200 sorted_nodes.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
201
202 Ok(sorted_nodes
204 .into_iter()
205 .take(count)
206 .map(|(node, trust)| {
207 let mut hash = [0u8; 32];
209 let peer_bytes = node.id.as_bytes();
210 if peer_bytes.len() >= 32 {
211 hash.copy_from_slice(&peer_bytes[..32]);
212 } else {
213 let hashed = blake3::hash(peer_bytes);
214 hash.copy_from_slice(hashed.as_bytes());
215 }
216 let node_id = NodeId::from_bytes(hash);
217
218 NodeDescriptor {
219 id: node_id,
220 public_key: {
223 let placeholder = [
225 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
226 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32,
227 ];
228
229 const ED25519_BASEPOINT_BYTES: [u8; 32] = [
231 0x58, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66,
232 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66,
233 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66,
234 ];
235
236 ed25519_dalek::VerifyingKey::from_bytes(&placeholder).unwrap_or_else(|_| {
237 ed25519_dalek::VerifyingKey::from_bytes(&ED25519_BASEPOINT_BYTES)
239 .ok()
240 .unwrap_or_else(|| {
241 use ed25519_dalek::SigningKey;
244 let signing_key = SigningKey::from_bytes(&[1u8; 32]);
245 signing_key.verifying_key()
246 })
247 })
248 },
249 addresses: vec![node.address.clone()],
250 hyperbolic: None,
251 som_position: None,
252 trust,
253 capabilities: NodeCapabilities {
254 storage: 0,
255 compute: 0,
256 bandwidth: 0,
257 },
258 }
259 })
260 .collect())
261 }
262
263 pub async fn update_routing(&self, node: NodeDescriptor) -> Result<()> {
265 let _peer_id = PeerId::from_str(&node.id.to_string())
267 .map_err(|e| AdaptiveNetworkError::Other(format!("Invalid peer ID: {e}")))?;
268
269 let addresses: Vec<Multiaddr> = node
271 .addresses
272 .iter()
273 .filter_map(|a| Multiaddr::from_str(a).ok())
274 .collect();
275
276 if addresses.is_empty() {
277 return Err(AdaptiveNetworkError::Other(
278 "No valid addresses".to_string(),
279 ));
280 }
281
282 Ok(())
286 }
287
288 pub async fn get_metrics(&self) -> DHTMetrics {
290 self.metrics.read().await.clone()
291 }
292}
293
294pub struct KademliaRoutingStrategy {
296 dht: Arc<AdaptiveDHT>,
297}
298
299impl KademliaRoutingStrategy {
300 pub fn new(dht: Arc<AdaptiveDHT>) -> Self {
301 Self { dht }
302 }
303}
304
305#[async_trait]
306impl RoutingStrategy for KademliaRoutingStrategy {
307 async fn find_path(&self, target: &NodeId) -> Result<Vec<NodeId>> {
308 let nodes = self.dht.find_closest_nodes(target, 3).await?;
309 Ok(nodes.into_iter().map(|n| n.id).collect())
310 }
311
312 fn route_score(&self, neighbor: &NodeId, target: &NodeId) -> f64 {
313 let neighbor_bytes = &neighbor.hash;
315 let target_bytes = &target.hash;
316 let mut distance = 0u32;
317
318 for i in 0..32 {
319 distance += (neighbor_bytes[i] ^ target_bytes[i]).count_ones();
320 }
321
322 1.0 / (1.0 + distance as f64)
324 }
325
326 fn update_metrics(&mut self, _path: &[NodeId], _success: bool) {
327 }
329}
330
331#[cfg(test)]
332mod tests {
333 use super::*;
334 use std::collections::HashMap;
335
336 #[tokio::test]
337 async fn test_adaptive_dht_creation() {
338 struct MockTrustProvider;
339 impl TrustProvider for MockTrustProvider {
340 fn get_trust(&self, _node: &NodeId) -> f64 {
341 0.5
342 }
343 fn update_trust(&self, _from: &NodeId, _to: &NodeId, _success: bool) {}
344 fn get_global_trust(&self) -> HashMap<NodeId, f64> {
345 HashMap::new()
346 }
347 fn remove_node(&self, _node: &NodeId) {}
348 }
349
350 let config = DHTConfig::default();
351 let identity = Arc::new(NodeIdentity::generate().unwrap());
352 let trust_provider = Arc::new(MockTrustProvider);
353 let router = Arc::new(AdaptiveRouter::new_with_id(
354 identity.node_id().clone(),
355 trust_provider.clone(),
356 ));
357
358 let dht = AdaptiveDHT::new(config, identity, trust_provider, router)
359 .await
360 .unwrap();
361 let metrics = dht.get_metrics().await;
362
363 assert_eq!(metrics.lookups_total, 0);
364 assert_eq!(metrics.stores_total, 0);
365 }
366
367 #[tokio::test]
368 async fn test_node_id_to_key_conversion() {
369 use rand::RngCore;
370
371 let mut hash = [0u8; 32];
373 rand::thread_rng().fill_bytes(&mut hash);
374 let node_id = NodeId::from_bytes(hash);
375
376 let key = AdaptiveDHT::node_id_to_key(&node_id);
377
378 assert_eq!(key.len(), 32);
380 }
381}