saorsa_core/adaptive/
dht_integration.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! DHT Integration for Adaptive Network
15//!
16//! This module bridges the existing S/Kademlia implementation with the adaptive
17//! network, providing trust-weighted routing and integration with other adaptive
18//! components.
19
20use super::*;
21use crate::dht::skademlia::{SKademlia, SKademliaConfig};
22use crate::dht::{DHT, DHTConfig, DhtKey, Key as DHTKey};
23use crate::{Multiaddr, PeerId};
24use async_trait::async_trait;
25use sha2::Digest;
26use std::str::FromStr;
27use std::sync::Arc;
28use tokio::sync::RwLock;
29
30/// Adaptive DHT that integrates S/Kademlia with trust scoring
31pub struct AdaptiveDHT {
32    /// Underlying S/Kademlia implementation
33    _skademlia: Arc<RwLock<SKademlia>>,
34
35    /// Base DHT for standard operations
36    base_dht: Arc<RwLock<DHT>>,
37
38    /// Trust provider for weighted routing
39    trust_provider: Arc<dyn TrustProvider>,
40
41    /// Adaptive router for strategy selection
42    router: Arc<AdaptiveRouter>,
43
44    /// Local node identity
45    _identity: Arc<NodeIdentity>,
46
47    /// Performance metrics
48    metrics: Arc<RwLock<DHTMetrics>>,
49}
50
51/// DHT performance metrics
52#[derive(Debug, Default, Clone)]
53pub struct DHTMetrics {
54    pub lookups_total: u64,
55    pub lookups_successful: u64,
56    pub stores_total: u64,
57    pub stores_successful: u64,
58    pub average_lookup_hops: f64,
59    pub trust_rejections: u64,
60}
61
62impl AdaptiveDHT {
63    /// Create new adaptive DHT instance
64    pub async fn new(
65        _config: DHTConfig,
66        identity: Arc<NodeIdentity>,
67        trust_provider: Arc<dyn TrustProvider>,
68        router: Arc<AdaptiveRouter>,
69    ) -> Result<Self> {
70        let skademlia_config = SKademliaConfig {
71            min_routing_reputation: 0.3,
72            enable_distance_verification: true,
73            enable_routing_validation: true,
74            ..Default::default()
75        };
76
77        // Create DHT with local ID from identity
78        let local_key = Self::node_id_to_key(identity.node_id());
79        // Convert Key to NodeId for DhtCoreEngine
80        let node_id = crate::dht::core_engine::NodeId::from_key(DhtKey::from_bytes(local_key));
81        let base_dht = Arc::new(RwLock::new(DHT::new(node_id)?));
82        // Create reputation manager for S/Kademlia
83        let _reputation_manager = crate::security::ReputationManager::new(0.99, 0.1);
84        let skademlia = Arc::new(RwLock::new(SKademlia::new(skademlia_config)));
85
86        Ok(Self {
87            _skademlia: skademlia,
88            base_dht,
89            trust_provider,
90            router,
91            _identity: identity,
92            metrics: Arc::new(RwLock::new(DHTMetrics::default())),
93        })
94    }
95
96    /// Convert adaptive NodeId to DHT Key
97    fn node_id_to_key(node_id: &NodeId) -> DHTKey {
98        node_id.hash
99    }
100
101    /// Store value in the DHT with trust-based replication
102    pub async fn store(&self, key: Vec<u8>, value: Vec<u8>) -> Result<ContentHash> {
103        let mut metrics = self.metrics.write().await;
104        metrics.stores_total += 1;
105
106        // Create DHT key
107        let hash = blake3::hash(&key);
108        let dht_key = *hash.as_bytes();
109
110        // Store in base DHT using store method
111        let mut dht = self.base_dht.write().await;
112        dht.store(&DhtKey::from_bytes(dht_key), value.clone())
113            .await
114            .map_err(|e| AdaptiveNetworkError::Other(e.to_string()))?;
115
116        metrics.stores_successful += 1;
117
118        // Return content hash
119        let mut hasher = sha2::Sha256::new();
120        hasher.update(&key);
121        hasher.update(&value);
122        let result = hasher.finalize();
123        let mut hash = [0u8; 32];
124        hash.copy_from_slice(&result);
125
126        Ok(ContentHash(hash))
127    }
128
129    /// Retrieve value from DHT using adaptive routing
130    pub async fn retrieve(&self, hash: &ContentHash) -> Result<Vec<u8>> {
131        let mut metrics = self.metrics.write().await;
132        metrics.lookups_total += 1;
133
134        // Use adaptive router to select routing strategy
135        let target_id = NodeId::from_bytes(hash.0);
136        let _path = self
137            .router
138            .route(&target_id, ContentType::DataRetrieval)
139            .await?;
140
141        // Create DHT key from hash
142        let dht_key = DhtKey::from_bytes(hash.0);
143
144        // Lookup in base DHT
145        let dht = self.base_dht.read().await;
146        match dht.retrieve(&dht_key).await {
147            Ok(Some(value)) => {
148                metrics.lookups_successful += 1;
149                Ok(value)
150            }
151            Ok(None) => Err(AdaptiveNetworkError::Other("Record not found".to_string())),
152            Err(e) => Err(AdaptiveNetworkError::Other(e.to_string())),
153        }
154    }
155
156    /// Find nodes close to a key using trust-weighted selection
157    pub async fn find_closest_nodes(
158        &self,
159        target: &NodeId,
160        count: usize,
161    ) -> Result<Vec<NodeDescriptor>> {
162        let dht_key = DhtKey::from_bytes(Self::node_id_to_key(target));
163        let dht = self.base_dht.read().await;
164
165        // Get closest nodes from DHT using find_node
166        let nodes = dht
167            .find_nodes(&dht_key, 8)
168            .await
169            .unwrap_or_else(|_| Vec::new());
170
171        // Sort by trust score
172        let sorted_nodes: Vec<_> = nodes
173            .into_iter()
174            .filter_map(|node| {
175                // Extract node ID from peer_id string
176                // NodeInfo has id field which is NodeId
177                let mut hash = [0u8; 32];
178                let peer_bytes = node.id.as_bytes();
179                if peer_bytes.len() >= 32 {
180                    hash.copy_from_slice(&peer_bytes[..32]);
181                } else {
182                    // If peer_id is shorter, hash it
183                    let hashed = blake3::hash(peer_bytes);
184                    hash.copy_from_slice(hashed.as_bytes());
185                }
186                let node_id = NodeId::from_bytes(hash);
187                let trust = self.trust_provider.get_trust(&node_id);
188
189                // Filter out low-trust nodes
190                if trust < 0.3 {
191                    return None;
192                }
193
194                Some((node, trust))
195            })
196            .collect();
197
198        // Sort by trust descending
199        let mut sorted_nodes = sorted_nodes;
200        sorted_nodes.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
201
202        // Take top nodes and convert to NodeDescriptors
203        Ok(sorted_nodes
204            .into_iter()
205            .take(count)
206            .map(|(node, trust)| {
207                // Convert node.id to array
208                let mut hash = [0u8; 32];
209                let peer_bytes = node.id.as_bytes();
210                if peer_bytes.len() >= 32 {
211                    hash.copy_from_slice(&peer_bytes[..32]);
212                } else {
213                    let hashed = blake3::hash(peer_bytes);
214                    hash.copy_from_slice(hashed.as_bytes());
215                }
216                let node_id = NodeId::from_bytes(hash);
217
218                NodeDescriptor {
219                    id: node_id,
220                    // TODO: Get real key from node - for now use a deterministic dummy key
221                    // This is a known valid ed25519 public key (the generator point)
222                    public_key: {
223                        // Try to use a placeholder key first
224                        let placeholder = [
225                            1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
226                            21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32,
227                        ];
228
229                        // Use the ed25519 base point as fallback (always valid)
230                        const ED25519_BASEPOINT_BYTES: [u8; 32] = [
231                            0x58, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66,
232                            0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66,
233                            0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66,
234                        ];
235
236                        ed25519_dalek::VerifyingKey::from_bytes(&placeholder).unwrap_or_else(|_| {
237                            // Fall back to the known valid base point
238                            ed25519_dalek::VerifyingKey::from_bytes(&ED25519_BASEPOINT_BYTES)
239                                .ok()
240                                .unwrap_or_else(|| {
241                                    // If even the base point fails, something is seriously wrong
242                                    // Create a verifying key from the generator point another way
243                                    use ed25519_dalek::SigningKey;
244                                    let signing_key = SigningKey::from_bytes(&[1u8; 32]);
245                                    signing_key.verifying_key()
246                                })
247                        })
248                    },
249                    addresses: vec![node.address.clone()],
250                    hyperbolic: None,
251                    som_position: None,
252                    trust,
253                    capabilities: NodeCapabilities {
254                        storage: 0,
255                        compute: 0,
256                        bandwidth: 0,
257                    },
258                }
259            })
260            .collect())
261    }
262
263    /// Update routing table with new node information
264    pub async fn update_routing(&self, node: NodeDescriptor) -> Result<()> {
265        // Convert NodeId to PeerId (using the hash as peer ID string)
266        let _peer_id = PeerId::from_str(&node.id.to_string())
267            .map_err(|e| AdaptiveNetworkError::Other(format!("Invalid peer ID: {e}")))?;
268
269        // Parse addresses to Multiaddr
270        let addresses: Vec<Multiaddr> = node
271            .addresses
272            .iter()
273            .filter_map(|a| Multiaddr::from_str(a).ok())
274            .collect();
275
276        if addresses.is_empty() {
277            return Err(AdaptiveNetworkError::Other(
278                "No valid addresses".to_string(),
279            ));
280        }
281
282        // Note: add_node doesn't exist on DhtCoreEngine
283        // DhtCoreEngine manages nodes internally through network operations
284        // For now, just return Ok as nodes are discovered through the network
285        Ok(())
286    }
287
288    /// Get current DHT metrics
289    pub async fn get_metrics(&self) -> DHTMetrics {
290        self.metrics.read().await.clone()
291    }
292}
293
294/// Implement Kademlia routing strategy for adaptive router
295pub struct KademliaRoutingStrategy {
296    dht: Arc<AdaptiveDHT>,
297}
298
299impl KademliaRoutingStrategy {
300    pub fn new(dht: Arc<AdaptiveDHT>) -> Self {
301        Self { dht }
302    }
303}
304
305#[async_trait]
306impl RoutingStrategy for KademliaRoutingStrategy {
307    async fn find_path(&self, target: &NodeId) -> Result<Vec<NodeId>> {
308        let nodes = self.dht.find_closest_nodes(target, 3).await?;
309        Ok(nodes.into_iter().map(|n| n.id).collect())
310    }
311
312    fn route_score(&self, neighbor: &NodeId, target: &NodeId) -> f64 {
313        // XOR distance metric
314        let neighbor_bytes = &neighbor.hash;
315        let target_bytes = &target.hash;
316        let mut distance = 0u32;
317
318        for i in 0..32 {
319            distance += (neighbor_bytes[i] ^ target_bytes[i]).count_ones();
320        }
321
322        // Convert to score (closer = higher score)
323        1.0 / (1.0 + distance as f64)
324    }
325
326    fn update_metrics(&mut self, _path: &[NodeId], _success: bool) {
327        // Metrics updated in AdaptiveDHT
328    }
329}
330
331#[cfg(test)]
332mod tests {
333    use super::*;
334    use std::collections::HashMap;
335
336    #[tokio::test]
337    async fn test_adaptive_dht_creation() {
338        struct MockTrustProvider;
339        impl TrustProvider for MockTrustProvider {
340            fn get_trust(&self, _node: &NodeId) -> f64 {
341                0.5
342            }
343            fn update_trust(&self, _from: &NodeId, _to: &NodeId, _success: bool) {}
344            fn get_global_trust(&self) -> HashMap<NodeId, f64> {
345                HashMap::new()
346            }
347            fn remove_node(&self, _node: &NodeId) {}
348        }
349
350        let config = DHTConfig::default();
351        let identity = Arc::new(NodeIdentity::generate().unwrap());
352        let trust_provider = Arc::new(MockTrustProvider);
353        let router = Arc::new(AdaptiveRouter::new_with_id(
354            identity.node_id().clone(),
355            trust_provider.clone(),
356        ));
357
358        let dht = AdaptiveDHT::new(config, identity, trust_provider, router)
359            .await
360            .unwrap();
361        let metrics = dht.get_metrics().await;
362
363        assert_eq!(metrics.lookups_total, 0);
364        assert_eq!(metrics.stores_total, 0);
365    }
366
367    #[tokio::test]
368    async fn test_node_id_to_key_conversion() {
369        use rand::RngCore;
370
371        // Create a random UserId
372        let mut hash = [0u8; 32];
373        rand::thread_rng().fill_bytes(&mut hash);
374        let node_id = NodeId::from_bytes(hash);
375
376        let key = AdaptiveDHT::node_id_to_key(&node_id);
377
378        // Should create valid key from node ID
379        assert_eq!(key.len(), 32);
380    }
381}