// saorsa_core/adaptive/dht_integration.rs
use super::*;
21use crate::dht::skademlia::{SKademlia, SKademliaConfig};
22use crate::dht::{DHT, DHTConfig, DhtKey, Key as DHTKey};
23use crate::{Multiaddr, PeerId};
24use async_trait::async_trait;
25use sha2::Digest;
26use std::str::FromStr;
27use std::sync::Arc;
28use tokio::sync::RwLock;
29
30pub struct AdaptiveDHT {
32 _skademlia: Arc<RwLock<SKademlia>>,
34
35 base_dht: Arc<RwLock<DHT>>,
37
38 trust_provider: Arc<dyn TrustProvider>,
40
41 router: Arc<AdaptiveRouter>,
43
44 _identity: Arc<NodeIdentity>,
46
47 metrics: Arc<RwLock<DHTMetrics>>,
49}
50
/// Operation counters for an [`AdaptiveDHT`].
///
/// All fields start at zero via `Default`.
#[derive(Clone, Debug, Default)]
pub struct DHTMetrics {
    /// Number of retrievals attempted.
    pub lookups_total: u64,
    /// Number of retrievals that returned a record.
    pub lookups_successful: u64,
    /// Number of store operations attempted.
    pub stores_total: u64,
    /// Number of store operations that completed without error.
    pub stores_successful: u64,
    /// Average hop count per lookup. Not updated by any code visible in this
    /// file.
    pub average_lookup_hops: f64,
    /// Count of trust-based rejections. Not updated by any code visible in
    /// this file.
    pub trust_rejections: u64,
}
61
62impl AdaptiveDHT {
63 pub async fn new(
65 _config: DHTConfig,
66 identity: Arc<NodeIdentity>,
67 trust_provider: Arc<dyn TrustProvider>,
68 router: Arc<AdaptiveRouter>,
69 ) -> Result<Self> {
70 let skademlia_config = SKademliaConfig {
71 min_routing_reputation: 0.3,
72 enable_distance_verification: true,
73 enable_routing_validation: true,
74 ..Default::default()
75 };
76
77 let local_key = Self::node_id_to_key(&identity.to_user_id());
79 let node_id = crate::dht::core_engine::NodeId::from_key(DhtKey::from_bytes(local_key));
81 let base_dht = Arc::new(RwLock::new(DHT::new(node_id)?));
82 let _reputation_manager = crate::security::ReputationManager::new(0.99, 0.1);
84 let skademlia = Arc::new(RwLock::new(SKademlia::new(skademlia_config)));
85
86 Ok(Self {
87 _skademlia: skademlia,
88 base_dht,
89 trust_provider,
90 router,
91 _identity: identity,
92 metrics: Arc::new(RwLock::new(DHTMetrics::default())),
93 })
94 }
95
96 fn node_id_to_key(node_id: &NodeId) -> DHTKey {
98 node_id.hash
99 }
100
101 pub async fn store(&self, key: Vec<u8>, value: Vec<u8>) -> Result<ContentHash> {
103 let mut metrics = self.metrics.write().await;
104 metrics.stores_total += 1;
105
106 let hash = blake3::hash(&key);
108 let dht_key = *hash.as_bytes();
109
110 let mut dht = self.base_dht.write().await;
112 dht.store(&DhtKey::from_bytes(dht_key), value.clone())
113 .await
114 .map_err(|e| AdaptiveNetworkError::Other(e.to_string()))?;
115
116 metrics.stores_successful += 1;
117
118 let mut hasher = sha2::Sha256::new();
120 hasher.update(&key);
121 hasher.update(&value);
122 let result = hasher.finalize();
123 let mut hash = [0u8; 32];
124 hash.copy_from_slice(&result);
125
126 Ok(ContentHash(hash))
127 }
128
129 pub async fn retrieve(&self, hash: &ContentHash) -> Result<Vec<u8>> {
131 let mut metrics = self.metrics.write().await;
132 metrics.lookups_total += 1;
133
134 let target_id = NodeId::from_bytes(hash.0);
136 let _path = self
137 .router
138 .route(&target_id, ContentType::DataRetrieval)
139 .await?;
140
141 let dht_key = DhtKey::from_bytes(hash.0);
143
144 let dht = self.base_dht.read().await;
146 match dht.retrieve(&dht_key).await {
147 Ok(Some(value)) => {
148 metrics.lookups_successful += 1;
149 Ok(value)
150 }
151 Ok(None) => Err(AdaptiveNetworkError::Other("Record not found".to_string())),
152 Err(e) => Err(AdaptiveNetworkError::Other(e.to_string())),
153 }
154 }
155
156 pub async fn find_closest_nodes(
158 &self,
159 target: &NodeId,
160 count: usize,
161 ) -> Result<Vec<NodeDescriptor>> {
162 let dht_key = DhtKey::from_bytes(Self::node_id_to_key(target));
163 let dht = self.base_dht.read().await;
164
165 let nodes = dht
167 .find_nodes(&dht_key, 8)
168 .await
169 .unwrap_or_else(|_| Vec::new());
170
171 let sorted_nodes: Vec<_> = nodes
173 .into_iter()
174 .filter_map(|node| {
175 let mut hash = [0u8; 32];
178 let peer_bytes = node.id.as_bytes();
179 if peer_bytes.len() >= 32 {
180 hash.copy_from_slice(&peer_bytes[..32]);
181 } else {
182 let hashed = blake3::hash(peer_bytes);
184 hash.copy_from_slice(hashed.as_bytes());
185 }
186 let node_id = NodeId::from_bytes(hash);
187 let trust = self.trust_provider.get_trust(&node_id);
188
189 if trust < 0.3 {
191 return None;
192 }
193
194 Some((node, trust))
195 })
196 .collect();
197
198 let mut sorted_nodes = sorted_nodes;
200 sorted_nodes.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
201
202 let nodes: Vec<NodeDescriptor> = sorted_nodes
204 .into_iter()
205 .take(count)
206 .filter_map(|(node, trust)| {
207 let mut hash = [0u8; 32];
209 let peer_bytes = node.id.as_bytes();
210 if peer_bytes.len() >= 32 {
211 hash.copy_from_slice(&peer_bytes[..32]);
212 } else {
213 let hashed = blake3::hash(peer_bytes);
214 hash.copy_from_slice(hashed.as_bytes());
215 }
216 let node_id = NodeId::from_bytes(hash);
217
218 use crate::quantum_crypto::generate_ml_dsa_keypair;
220 let public_key = match generate_ml_dsa_keypair() {
221 Ok((public_key, _)) => Some(public_key),
222 Err(_) => {
223 let dummy_bytes = [1u8; 1952];
224 crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey::from_bytes(
225 &dummy_bytes,
226 )
227 .ok()
228 }
229 }?;
230
231 Some(NodeDescriptor {
232 id: node_id,
233 public_key,
234 addresses: vec![node.address.clone()],
235 hyperbolic: None,
236 som_position: None,
237 trust,
238 capabilities: NodeCapabilities {
239 storage: 0,
240 compute: 0,
241 bandwidth: 0,
242 },
243 })
244 })
245 .collect();
246
247 Ok(nodes)
248 }
249
250 pub async fn update_routing(&self, node: NodeDescriptor) -> Result<()> {
252 let _peer_id = PeerId::from_str(&node.id.to_string())
254 .map_err(|e| AdaptiveNetworkError::Other(format!("Invalid peer ID: {e}")))?;
255
256 let addresses: Vec<Multiaddr> = node
258 .addresses
259 .iter()
260 .filter_map(|a| Multiaddr::from_str(a).ok())
261 .collect();
262
263 if addresses.is_empty() {
264 return Err(AdaptiveNetworkError::Other(
265 "No valid addresses".to_string(),
266 ));
267 }
268
269 Ok(())
273 }
274
275 pub async fn get_metrics(&self) -> DHTMetrics {
277 self.metrics.read().await.clone()
278 }
279}
280
281pub struct KademliaRoutingStrategy {
283 dht: Arc<AdaptiveDHT>,
284}
285
286impl KademliaRoutingStrategy {
287 pub fn new(dht: Arc<AdaptiveDHT>) -> Self {
288 Self { dht }
289 }
290}
291
292#[async_trait]
293impl RoutingStrategy for KademliaRoutingStrategy {
294 async fn find_path(&self, target: &NodeId) -> Result<Vec<NodeId>> {
295 let nodes = self.dht.find_closest_nodes(target, 3).await?;
296 Ok(nodes.into_iter().map(|n| n.id).collect())
297 }
298
299 fn route_score(&self, neighbor: &NodeId, target: &NodeId) -> f64 {
300 let neighbor_bytes = &neighbor.hash;
302 let target_bytes = &target.hash;
303 let mut distance = 0u32;
304
305 for i in 0..32 {
306 distance += (neighbor_bytes[i] ^ target_bytes[i]).count_ones();
307 }
308
309 1.0 / (1.0 + distance as f64)
311 }
312
313 fn update_metrics(&mut self, _path: &[NodeId], _success: bool) {
314 }
316}
317
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Trust provider stub that reports a fixed score of 0.5 for every node
    /// and ignores all updates.
    struct MockTrustProvider;

    impl TrustProvider for MockTrustProvider {
        fn get_trust(&self, _node: &NodeId) -> f64 {
            0.5
        }

        fn update_trust(&self, _from: &NodeId, _to: &NodeId, _success: bool) {}

        fn get_global_trust(&self) -> HashMap<NodeId, f64> {
            HashMap::new()
        }

        fn remove_node(&self, _node: &NodeId) {}
    }

    /// A freshly constructed DHT reports zeroed lookup and store counters.
    #[tokio::test]
    async fn test_adaptive_dht_creation() {
        let config = DHTConfig::default();
        let identity = Arc::new(NodeIdentity::generate().unwrap());
        let trust_provider = Arc::new(MockTrustProvider);

        let local_id = NodeId::from_bytes(*identity.node_id().to_bytes());
        let router = Arc::new(AdaptiveRouter::new_with_id(
            local_id,
            trust_provider.clone(),
        ));

        let dht = AdaptiveDHT::new(config, identity, trust_provider, router)
            .await
            .unwrap();

        let snapshot = dht.get_metrics().await;
        assert_eq!(snapshot.lookups_total, 0);
        assert_eq!(snapshot.stores_total, 0);
    }

    /// Converting a node id to a DHT key yields a 32-byte key.
    #[tokio::test]
    async fn test_node_id_to_key_conversion() {
        use rand::RngCore;

        let mut random_bytes = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut random_bytes);

        let key = AdaptiveDHT::node_id_to_key(&NodeId::from_bytes(random_bytes));

        assert_eq!(key.len(), 32);
    }
}