saorsa_core/adaptive/
dht_integration.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! DHT Integration for Adaptive Network
15//!
16//! This module bridges the existing S/Kademlia implementation with the adaptive
17//! network, providing trust-weighted routing and integration with other adaptive
18//! components.
19
20use super::*;
21use crate::dht::skademlia::{SKademlia, SKademliaConfig};
22use crate::dht::{DHT, DHTConfig, DhtKey, Key as DHTKey};
23use crate::{Multiaddr, PeerId};
24use async_trait::async_trait;
25use sha2::Digest;
26use std::str::FromStr;
27use std::sync::Arc;
28use tokio::sync::RwLock;
29
30/// Adaptive DHT that integrates S/Kademlia with trust scoring
31pub struct AdaptiveDHT {
32    /// Underlying S/Kademlia implementation
33    _skademlia: Arc<RwLock<SKademlia>>,
34
35    /// Base DHT for standard operations
36    base_dht: Arc<RwLock<DHT>>,
37
38    /// Trust provider for weighted routing
39    trust_provider: Arc<dyn TrustProvider>,
40
41    /// Adaptive router for strategy selection
42    router: Arc<AdaptiveRouter>,
43
44    /// Local node identity
45    _identity: Arc<NodeIdentity>,
46
47    /// Performance metrics
48    metrics: Arc<RwLock<DHTMetrics>>,
49}
50
51/// DHT performance metrics
52#[derive(Debug, Default, Clone)]
53pub struct DHTMetrics {
54    pub lookups_total: u64,
55    pub lookups_successful: u64,
56    pub stores_total: u64,
57    pub stores_successful: u64,
58    pub average_lookup_hops: f64,
59    pub trust_rejections: u64,
60}
61
62impl AdaptiveDHT {
63    /// Create new adaptive DHT instance
64    pub async fn new(
65        _config: DHTConfig,
66        identity: Arc<NodeIdentity>,
67        trust_provider: Arc<dyn TrustProvider>,
68        router: Arc<AdaptiveRouter>,
69    ) -> Result<Self> {
70        let skademlia_config = SKademliaConfig {
71            min_routing_reputation: 0.3,
72            enable_distance_verification: true,
73            enable_routing_validation: true,
74            ..Default::default()
75        };
76
77        // Create DHT with local ID from identity
78        let local_key = Self::node_id_to_key(&identity.to_user_id());
79        // Convert Key to NodeId for DhtCoreEngine
80        let node_id = crate::dht::core_engine::NodeId::from_key(DhtKey::from_bytes(local_key));
81        let base_dht = Arc::new(RwLock::new(DHT::new(node_id)?));
82        // Create reputation manager for S/Kademlia
83        let _reputation_manager = crate::security::ReputationManager::new(0.99, 0.1);
84        let skademlia = Arc::new(RwLock::new(SKademlia::new(skademlia_config)));
85
86        Ok(Self {
87            _skademlia: skademlia,
88            base_dht,
89            trust_provider,
90            router,
91            _identity: identity,
92            metrics: Arc::new(RwLock::new(DHTMetrics::default())),
93        })
94    }
95
96    /// Convert adaptive NodeId to DHT Key
97    fn node_id_to_key(node_id: &NodeId) -> DHTKey {
98        node_id.hash
99    }
100
101    /// Store value in the DHT with trust-based replication
102    pub async fn store(&self, key: Vec<u8>, value: Vec<u8>) -> Result<ContentHash> {
103        let mut metrics = self.metrics.write().await;
104        metrics.stores_total += 1;
105
106        // Create DHT key
107        let hash = blake3::hash(&key);
108        let dht_key = *hash.as_bytes();
109
110        // Store in base DHT using store method
111        let mut dht = self.base_dht.write().await;
112        dht.store(&DhtKey::from_bytes(dht_key), value.clone())
113            .await
114            .map_err(|e| AdaptiveNetworkError::Other(e.to_string()))?;
115
116        metrics.stores_successful += 1;
117
118        // Return content hash
119        let mut hasher = sha2::Sha256::new();
120        hasher.update(&key);
121        hasher.update(&value);
122        let result = hasher.finalize();
123        let mut hash = [0u8; 32];
124        hash.copy_from_slice(&result);
125
126        Ok(ContentHash(hash))
127    }
128
129    /// Retrieve value from DHT using adaptive routing
130    pub async fn retrieve(&self, hash: &ContentHash) -> Result<Vec<u8>> {
131        let mut metrics = self.metrics.write().await;
132        metrics.lookups_total += 1;
133
134        // Use adaptive router to select routing strategy
135        let target_id = NodeId::from_bytes(hash.0);
136        let _path = self
137            .router
138            .route(&target_id, ContentType::DataRetrieval)
139            .await?;
140
141        // Create DHT key from hash
142        let dht_key = DhtKey::from_bytes(hash.0);
143
144        // Lookup in base DHT
145        let dht = self.base_dht.read().await;
146        match dht.retrieve(&dht_key).await {
147            Ok(Some(value)) => {
148                metrics.lookups_successful += 1;
149                Ok(value)
150            }
151            Ok(None) => Err(AdaptiveNetworkError::Other("Record not found".to_string())),
152            Err(e) => Err(AdaptiveNetworkError::Other(e.to_string())),
153        }
154    }
155
156    /// Find nodes close to a key using trust-weighted selection
157    pub async fn find_closest_nodes(
158        &self,
159        target: &NodeId,
160        count: usize,
161    ) -> Result<Vec<NodeDescriptor>> {
162        let dht_key = DhtKey::from_bytes(Self::node_id_to_key(target));
163        let dht = self.base_dht.read().await;
164
165        // Get closest nodes from DHT using find_node
166        let nodes = dht
167            .find_nodes(&dht_key, 8)
168            .await
169            .unwrap_or_else(|_| Vec::new());
170
171        // Sort by trust score
172        let sorted_nodes: Vec<_> = nodes
173            .into_iter()
174            .filter_map(|node| {
175                // Extract node ID from peer_id string
176                // NodeInfo has id field which is NodeId
177                let mut hash = [0u8; 32];
178                let peer_bytes = node.id.as_bytes();
179                if peer_bytes.len() >= 32 {
180                    hash.copy_from_slice(&peer_bytes[..32]);
181                } else {
182                    // If peer_id is shorter, hash it
183                    let hashed = blake3::hash(peer_bytes);
184                    hash.copy_from_slice(hashed.as_bytes());
185                }
186                let node_id = NodeId::from_bytes(hash);
187                let trust = self.trust_provider.get_trust(&node_id);
188
189                // Filter out low-trust nodes
190                if trust < 0.3 {
191                    return None;
192                }
193
194                Some((node, trust))
195            })
196            .collect();
197
198        // Sort by trust descending
199        let mut sorted_nodes = sorted_nodes;
200        sorted_nodes.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
201
202        // Take top nodes and convert to NodeDescriptors
203        let nodes: Vec<NodeDescriptor> = sorted_nodes
204            .into_iter()
205            .take(count)
206            .filter_map(|(node, trust)| {
207                // Convert node.id to array
208                let mut hash = [0u8; 32];
209                let peer_bytes = node.id.as_bytes();
210                if peer_bytes.len() >= 32 {
211                    hash.copy_from_slice(&peer_bytes[..32]);
212                } else {
213                    let hashed = blake3::hash(peer_bytes);
214                    hash.copy_from_slice(hashed.as_bytes());
215                }
216                let node_id = NodeId::from_bytes(hash);
217
218                // Attempt to build a public key; skip node on failure
219                use crate::quantum_crypto::generate_ml_dsa_keypair;
220                let public_key = match generate_ml_dsa_keypair() {
221                    Ok((public_key, _)) => Some(public_key),
222                    Err(_) => {
223                        let dummy_bytes = [1u8; 1952];
224                        crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey::from_bytes(
225                            &dummy_bytes,
226                        )
227                        .ok()
228                    }
229                }?;
230
231                Some(NodeDescriptor {
232                    id: node_id,
233                    public_key,
234                    addresses: vec![node.address.clone()],
235                    hyperbolic: None,
236                    som_position: None,
237                    trust,
238                    capabilities: NodeCapabilities {
239                        storage: 0,
240                        compute: 0,
241                        bandwidth: 0,
242                    },
243                })
244            })
245            .collect();
246
247        Ok(nodes)
248    }
249
250    /// Update routing table with new node information
251    pub async fn update_routing(&self, node: NodeDescriptor) -> Result<()> {
252        // Convert NodeId to PeerId (using the hash as peer ID string)
253        let _peer_id = PeerId::from_str(&node.id.to_string())
254            .map_err(|e| AdaptiveNetworkError::Other(format!("Invalid peer ID: {e}")))?;
255
256        // Parse addresses to Multiaddr
257        let addresses: Vec<Multiaddr> = node
258            .addresses
259            .iter()
260            .filter_map(|a| Multiaddr::from_str(a).ok())
261            .collect();
262
263        if addresses.is_empty() {
264            return Err(AdaptiveNetworkError::Other(
265                "No valid addresses".to_string(),
266            ));
267        }
268
269        // Note: add_node doesn't exist on DhtCoreEngine
270        // DhtCoreEngine manages nodes internally through network operations
271        // For now, just return Ok as nodes are discovered through the network
272        Ok(())
273    }
274
275    /// Get current DHT metrics
276    pub async fn get_metrics(&self) -> DHTMetrics {
277        self.metrics.read().await.clone()
278    }
279}
280
281/// Implement Kademlia routing strategy for adaptive router
282pub struct KademliaRoutingStrategy {
283    dht: Arc<AdaptiveDHT>,
284}
285
286impl KademliaRoutingStrategy {
287    pub fn new(dht: Arc<AdaptiveDHT>) -> Self {
288        Self { dht }
289    }
290}
291
292#[async_trait]
293impl RoutingStrategy for KademliaRoutingStrategy {
294    async fn find_path(&self, target: &NodeId) -> Result<Vec<NodeId>> {
295        let nodes = self.dht.find_closest_nodes(target, 3).await?;
296        Ok(nodes.into_iter().map(|n| n.id).collect())
297    }
298
299    fn route_score(&self, neighbor: &NodeId, target: &NodeId) -> f64 {
300        // XOR distance metric
301        let neighbor_bytes = &neighbor.hash;
302        let target_bytes = &target.hash;
303        let mut distance = 0u32;
304
305        for i in 0..32 {
306            distance += (neighbor_bytes[i] ^ target_bytes[i]).count_ones();
307        }
308
309        // Convert to score (closer = higher score)
310        1.0 / (1.0 + distance as f64)
311    }
312
313    fn update_metrics(&mut self, _path: &[NodeId], _success: bool) {
314        // Metrics updated in AdaptiveDHT
315    }
316}
317
318#[cfg(test)]
319mod tests {
320    use super::*;
321    use std::collections::HashMap;
322
323    #[tokio::test]
324    async fn test_adaptive_dht_creation() {
325        struct MockTrustProvider;
326        impl TrustProvider for MockTrustProvider {
327            fn get_trust(&self, _node: &NodeId) -> f64 {
328                0.5
329            }
330            fn update_trust(&self, _from: &NodeId, _to: &NodeId, _success: bool) {}
331            fn get_global_trust(&self) -> HashMap<NodeId, f64> {
332                HashMap::new()
333            }
334            fn remove_node(&self, _node: &NodeId) {}
335        }
336
337        let config = DHTConfig::default();
338        let identity = Arc::new(NodeIdentity::generate().unwrap());
339        let trust_provider = Arc::new(MockTrustProvider);
340        let router = Arc::new(AdaptiveRouter::new_with_id(
341            NodeId::from_bytes(*identity.node_id().to_bytes()),
342            trust_provider.clone(),
343        ));
344
345        let dht = AdaptiveDHT::new(config, identity, trust_provider, router)
346            .await
347            .unwrap();
348        let metrics = dht.get_metrics().await;
349
350        assert_eq!(metrics.lookups_total, 0);
351        assert_eq!(metrics.stores_total, 0);
352    }
353
354    #[tokio::test]
355    async fn test_node_id_to_key_conversion() {
356        use rand::RngCore;
357
358        // Create a random UserId
359        let mut hash = [0u8; 32];
360        rand::thread_rng().fill_bytes(&mut hash);
361        let node_id = NodeId::from_bytes(hash);
362
363        let key = AdaptiveDHT::node_id_to_key(&node_id);
364
365        // Should create valid key from node ID
366        assert_eq!(key.len(), 32);
367    }
368}