ipfrs_network/
dht_provider.rs

1//! Custom DHT provider interface for pluggable DHT implementations
2//!
3//! This module provides a trait-based abstraction for DHT implementations,
4//! allowing IPFRS to support multiple DHT backends beyond Kademlia.
5//!
6//! ## Features
7//!
8//! - **Pluggable DHT**: Support for alternative DHT implementations
9//! - **Provider Trait**: Common interface for all DHT backends
10//! - **Provider Registry**: Dynamic registration and selection of DHT providers
11//! - **Extensibility**: Easy integration of custom DHT algorithms
12//!
13//! ## Example
14//!
15//! ```rust
16//! use ipfrs_network::dht_provider::{DhtProvider, DhtProviderRegistry, DhtCapabilities};
17//! use ipfrs_network::dht_provider::kademlia::KademliaDhtProvider;
18//! use std::sync::Arc;
19//!
20//! # #[tokio::main]
21//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
22//! // Register DHT providers
23//! let mut registry = DhtProviderRegistry::new();
24//! registry.register("kademlia", Arc::new(KademliaDhtProvider::new()));
25//!
26//! // Select and use a DHT provider
27//! if let Some(provider) = registry.get("kademlia") {
28//!     let caps = provider.capabilities();
29//!     println!("Provider supports content routing: {}", caps.supports_content_routing);
30//! }
31//! # Ok(())
32//! # }
33//! ```
34
35use cid::Cid;
36use libp2p::PeerId;
37use serde::{Deserialize, Serialize};
38use std::collections::HashMap;
39use std::sync::Arc;
40use thiserror::Error;
41
42/// Errors that can occur in DHT provider operations
43#[derive(Debug, Error)]
44pub enum DhtProviderError {
45    /// Provider not found
46    #[error("DHT provider not found: {0}")]
47    ProviderNotFound(String),
48
49    /// Operation not supported by this provider
50    #[error("Operation not supported: {0}")]
51    OperationNotSupported(String),
52
53    /// Configuration error
54    #[error("Configuration error: {0}")]
55    ConfigurationError(String),
56
57    /// Query failed
58    #[error("Query failed: {0}")]
59    QueryFailed(String),
60
61    /// Internal error
62    #[error("Internal error: {0}")]
63    InternalError(String),
64}
65
66/// Capabilities of a DHT provider
67#[derive(Debug, Clone, Default, Serialize, Deserialize)]
68pub struct DhtCapabilities {
69    /// Supports content routing (finding providers)
70    pub supports_content_routing: bool,
71    /// Supports peer routing (finding peers)
72    pub supports_peer_routing: bool,
73    /// Supports key-value storage
74    pub supports_kv_storage: bool,
75    /// Supports range queries
76    pub supports_range_queries: bool,
77    /// Supports semantic queries
78    pub supports_semantic_queries: bool,
79    /// Maximum number of hops for queries
80    pub max_query_hops: Option<usize>,
81    /// Supports custom routing algorithms
82    pub supports_custom_routing: bool,
83}
84
85impl DhtCapabilities {
86    /// Create capabilities for a basic DHT
87    pub fn basic() -> Self {
88        Self {
89            supports_content_routing: true,
90            supports_peer_routing: true,
91            supports_kv_storage: false,
92            supports_range_queries: false,
93            supports_semantic_queries: false,
94            max_query_hops: Some(20),
95            supports_custom_routing: false,
96        }
97    }
98
99    /// Create capabilities for an advanced DHT
100    pub fn advanced() -> Self {
101        Self {
102            supports_content_routing: true,
103            supports_peer_routing: true,
104            supports_kv_storage: true,
105            supports_range_queries: true,
106            supports_semantic_queries: true,
107            max_query_hops: Some(20),
108            supports_custom_routing: true,
109        }
110    }
111}
112
113/// Result of a DHT query
114#[derive(Debug, Clone)]
115pub struct DhtQueryResult {
116    /// Peers that provide the content
117    pub providers: Vec<PeerId>,
118    /// Number of hops taken
119    pub hops: usize,
120    /// Query duration in milliseconds
121    pub duration_ms: u64,
122    /// Whether the query was successful
123    pub success: bool,
124}
125
126/// Peer information from DHT
127#[derive(Debug, Clone)]
128pub struct DhtPeerInfo {
129    /// Peer ID
130    pub peer_id: PeerId,
131    /// Addresses
132    pub addresses: Vec<String>,
133    /// Distance metric (provider-specific)
134    pub distance: Option<u64>,
135}
136
137/// Common trait for DHT providers
138pub trait DhtProvider: Send + Sync {
139    /// Get provider name
140    fn name(&self) -> &str;
141
142    /// Get provider version
143    fn version(&self) -> &str;
144
145    /// Get provider capabilities
146    fn capabilities(&self) -> DhtCapabilities;
147
148    /// Bootstrap the DHT with known peers
149    fn bootstrap(&self, peers: Vec<PeerId>) -> Result<(), DhtProviderError>;
150
151    /// Announce content availability
152    fn provide(&self, cid: &Cid) -> Result<(), DhtProviderError>;
153
154    /// Find providers for content
155    fn find_providers(&self, cid: &Cid) -> Result<DhtQueryResult, DhtProviderError>;
156
157    /// Find a specific peer
158    fn find_peer(&self, peer_id: &PeerId) -> Result<DhtPeerInfo, DhtProviderError>;
159
160    /// Get closest peers to a key
161    fn get_closest_peers(
162        &self,
163        key: &[u8],
164        count: usize,
165    ) -> Result<Vec<DhtPeerInfo>, DhtProviderError>;
166
167    /// Put a key-value pair (if supported)
168    fn put_value(&self, key: &[u8], value: &[u8]) -> Result<(), DhtProviderError> {
169        let _ = (key, value);
170        Err(DhtProviderError::OperationNotSupported(
171            "Key-value storage not supported".to_string(),
172        ))
173    }
174
175    /// Get a value by key (if supported)
176    fn get_value(&self, key: &[u8]) -> Result<Vec<u8>, DhtProviderError> {
177        let _ = key;
178        Err(DhtProviderError::OperationNotSupported(
179            "Key-value storage not supported".to_string(),
180        ))
181    }
182
183    /// Get DHT statistics
184    fn stats(&self) -> DhtProviderStats;
185
186    /// Check if DHT is healthy
187    fn is_healthy(&self) -> bool {
188        let stats = self.stats();
189        stats.routing_table_size > 0 && stats.success_rate > 0.5
190    }
191}
192
/// Counters and derived metrics describing a DHT provider
#[derive(Debug, Clone, Default)]
pub struct DhtProviderStats {
    /// How many peers the routing table holds
    pub routing_table_size: usize,
    /// How many queries have been executed in total
    pub total_queries: u64,
    /// How many queries succeeded
    pub successful_queries: u64,
    /// How many queries failed
    pub failed_queries: u64,
    /// Mean query duration in milliseconds
    pub avg_query_duration_ms: f64,
    /// Fraction of queries that succeeded (0.0 - 1.0)
    pub success_rate: f64,
}

impl DhtProviderStats {
    /// Recompute `success_rate` from the query counters
    ///
    /// The rate is `successful_queries / total_queries`, or `0.0` when no
    /// query has been recorded yet.
    pub fn calculate_success_rate(&mut self) {
        self.success_rate = match self.total_queries {
            // Nothing recorded: avoid dividing by zero.
            0 => 0.0,
            total => self.successful_queries as f64 / total as f64,
        };
    }
}
220
221/// Registry for DHT providers
222pub struct DhtProviderRegistry {
223    /// Registered providers
224    providers: HashMap<String, Arc<dyn DhtProvider>>,
225    /// Active provider name
226    active_provider: Option<String>,
227}
228
229impl DhtProviderRegistry {
230    /// Create a new provider registry
231    pub fn new() -> Self {
232        Self {
233            providers: HashMap::new(),
234            active_provider: None,
235        }
236    }
237
238    /// Register a DHT provider
239    pub fn register(&mut self, name: impl Into<String>, provider: Arc<dyn DhtProvider>) {
240        let name = name.into();
241        self.providers.insert(name.clone(), provider);
242
243        // Set as active if it's the first provider
244        if self.active_provider.is_none() {
245            self.active_provider = Some(name);
246        }
247    }
248
249    /// Get a provider by name
250    pub fn get(&self, name: &str) -> Option<Arc<dyn DhtProvider>> {
251        self.providers.get(name).cloned()
252    }
253
254    /// Get the active provider
255    pub fn get_active(&self) -> Option<Arc<dyn DhtProvider>> {
256        self.active_provider
257            .as_ref()
258            .and_then(|name| self.get(name))
259    }
260
261    /// Set active provider
262    pub fn set_active(&mut self, name: impl Into<String>) -> Result<(), DhtProviderError> {
263        let name = name.into();
264        if self.providers.contains_key(&name) {
265            self.active_provider = Some(name);
266            Ok(())
267        } else {
268            Err(DhtProviderError::ProviderNotFound(name))
269        }
270    }
271
272    /// List all registered providers
273    pub fn list_providers(&self) -> Vec<String> {
274        self.providers.keys().cloned().collect()
275    }
276
277    /// Remove a provider
278    pub fn unregister(&mut self, name: &str) -> Option<Arc<dyn DhtProvider>> {
279        let provider = self.providers.remove(name);
280
281        // Clear active provider if it was removed
282        if self.active_provider.as_deref() == Some(name) {
283            self.active_provider = None;
284        }
285
286        provider
287    }
288
289    /// Get number of registered providers
290    pub fn count(&self) -> usize {
291        self.providers.len()
292    }
293
294    /// Check if a provider is registered
295    pub fn has_provider(&self, name: &str) -> bool {
296        self.providers.contains_key(name)
297    }
298}
299
300impl Default for DhtProviderRegistry {
301    fn default() -> Self {
302        Self::new()
303    }
304}
305
306/// Kademlia DHT provider implementation
307pub mod kademlia {
308    use super::*;
309
310    /// Kademlia DHT provider
311    pub struct KademliaDhtProvider {
312        stats: parking_lot::RwLock<DhtProviderStats>,
313        config: KademliaConfig,
314    }
315
316    /// Configuration for Kademlia DHT
317    #[derive(Debug, Clone)]
318    pub struct KademliaConfig {
319        /// K-bucket size (number of peers per bucket)
320        pub k_bucket_size: usize,
321        /// Alpha parameter (concurrency)
322        pub alpha: usize,
323        /// Replication factor
324        pub replication_factor: usize,
325        /// Query timeout in seconds
326        pub query_timeout_secs: u64,
327    }
328
329    impl Default for KademliaConfig {
330        fn default() -> Self {
331            Self {
332                k_bucket_size: 20,
333                alpha: 3,
334                replication_factor: 20,
335                query_timeout_secs: 60,
336            }
337        }
338    }
339
340    impl KademliaDhtProvider {
341        /// Create a new Kademlia DHT provider
342        pub fn new() -> Self {
343            Self::with_config(KademliaConfig::default())
344        }
345
346        /// Create with custom configuration
347        pub fn with_config(config: KademliaConfig) -> Self {
348            Self {
349                stats: parking_lot::RwLock::new(DhtProviderStats::default()),
350                config,
351            }
352        }
353
354        /// Get configuration
355        #[allow(dead_code)]
356        pub fn config(&self) -> &KademliaConfig {
357            &self.config
358        }
359    }
360
361    impl Default for KademliaDhtProvider {
362        fn default() -> Self {
363            Self::new()
364        }
365    }
366
367    impl DhtProvider for KademliaDhtProvider {
368        fn name(&self) -> &str {
369            "kademlia"
370        }
371
372        fn version(&self) -> &str {
373            "1.0.0"
374        }
375
376        fn capabilities(&self) -> DhtCapabilities {
377            DhtCapabilities {
378                supports_content_routing: true,
379                supports_peer_routing: true,
380                supports_kv_storage: true,
381                supports_range_queries: false,
382                supports_semantic_queries: false,
383                max_query_hops: Some(20),
384                supports_custom_routing: false,
385            }
386        }
387
388        fn bootstrap(&self, peers: Vec<PeerId>) -> Result<(), DhtProviderError> {
389            // Placeholder: In production, this would connect to bootstrap peers
390            let mut stats = self.stats.write();
391            stats.routing_table_size = peers.len();
392            Ok(())
393        }
394
395        fn provide(&self, _cid: &Cid) -> Result<(), DhtProviderError> {
396            // Placeholder: In production, this would announce to DHT
397            Ok(())
398        }
399
400        fn find_providers(&self, _cid: &Cid) -> Result<DhtQueryResult, DhtProviderError> {
401            // Placeholder: In production, this would query DHT
402            let mut stats = self.stats.write();
403            stats.total_queries += 1;
404            stats.successful_queries += 1;
405            stats.calculate_success_rate();
406
407            Ok(DhtQueryResult {
408                providers: vec![],
409                hops: 0,
410                duration_ms: 0,
411                success: true,
412            })
413        }
414
415        fn find_peer(&self, peer_id: &PeerId) -> Result<DhtPeerInfo, DhtProviderError> {
416            // Placeholder: In production, this would query DHT
417            Ok(DhtPeerInfo {
418                peer_id: *peer_id,
419                addresses: vec![],
420                distance: None,
421            })
422        }
423
424        fn get_closest_peers(
425            &self,
426            _key: &[u8],
427            count: usize,
428        ) -> Result<Vec<DhtPeerInfo>, DhtProviderError> {
429            // Placeholder: In production, this would query routing table
430            let _ = count;
431            Ok(vec![])
432        }
433
434        fn put_value(&self, _key: &[u8], _value: &[u8]) -> Result<(), DhtProviderError> {
435            // Placeholder: In production, this would store in DHT
436            Ok(())
437        }
438
439        fn get_value(&self, _key: &[u8]) -> Result<Vec<u8>, DhtProviderError> {
440            // Placeholder: In production, this would retrieve from DHT
441            Err(DhtProviderError::QueryFailed("Not found".to_string()))
442        }
443
444        fn stats(&self) -> DhtProviderStats {
445            self.stats.read().clone()
446        }
447    }
448}
449
#[cfg(test)]
mod tests {
    use super::kademlia::*;
    use super::*;

    /// Provider that implements only the required trait methods, so the
    /// trait's default implementations (key-value storage, health check) can
    /// be exercised directly.
    struct MinimalProvider;

    impl DhtProvider for MinimalProvider {
        fn name(&self) -> &str {
            "minimal"
        }

        fn version(&self) -> &str {
            "0.0.0"
        }

        fn capabilities(&self) -> DhtCapabilities {
            DhtCapabilities::basic()
        }

        fn bootstrap(&self, _peers: Vec<PeerId>) -> Result<(), DhtProviderError> {
            Ok(())
        }

        fn provide(&self, _cid: &Cid) -> Result<(), DhtProviderError> {
            Ok(())
        }

        fn find_providers(&self, _cid: &Cid) -> Result<DhtQueryResult, DhtProviderError> {
            Ok(DhtQueryResult {
                providers: Vec::new(),
                hops: 0,
                duration_ms: 0,
                success: true,
            })
        }

        fn find_peer(&self, peer_id: &PeerId) -> Result<DhtPeerInfo, DhtProviderError> {
            Ok(DhtPeerInfo {
                peer_id: *peer_id,
                addresses: Vec::new(),
                distance: None,
            })
        }

        fn get_closest_peers(
            &self,
            _key: &[u8],
            _count: usize,
        ) -> Result<Vec<DhtPeerInfo>, DhtProviderError> {
            Ok(Vec::new())
        }

        fn stats(&self) -> DhtProviderStats {
            DhtProviderStats::default()
        }
    }

    #[test]
    fn test_dht_capabilities() {
        let basic = DhtCapabilities::basic();
        assert!(basic.supports_content_routing);
        assert!(basic.supports_peer_routing);
        assert!(!basic.supports_kv_storage);
        assert!(!basic.supports_range_queries);
        assert!(!basic.supports_semantic_queries);
        assert!(!basic.supports_custom_routing);
        assert_eq!(basic.max_query_hops, Some(20));

        let advanced = DhtCapabilities::advanced();
        assert!(advanced.supports_content_routing);
        assert!(advanced.supports_peer_routing);
        assert!(advanced.supports_kv_storage);
        assert!(advanced.supports_range_queries);
        assert!(advanced.supports_semantic_queries);
        assert!(advanced.supports_custom_routing);
        assert_eq!(advanced.max_query_hops, Some(20));
    }

    #[test]
    fn test_kademlia_provider() {
        let provider = KademliaDhtProvider::new();
        assert_eq!(provider.name(), "kademlia");
        assert_eq!(provider.version(), "1.0.0");

        let caps = provider.capabilities();
        assert!(caps.supports_content_routing);
        assert!(caps.supports_peer_routing);
        assert!(caps.supports_kv_storage);
    }

    #[test]
    fn test_provider_registry() {
        let mut registry = DhtProviderRegistry::new();
        assert_eq!(registry.count(), 0);
        assert!(!registry.has_provider("kademlia"));
        assert!(registry.get("kademlia").is_none());

        registry.register("kademlia", Arc::new(KademliaDhtProvider::new()));
        assert_eq!(registry.count(), 1);
        assert!(registry.has_provider("kademlia"));
        assert!(registry.get("kademlia").is_some());
    }

    #[test]
    fn test_registry_active_provider() {
        let mut registry = DhtProviderRegistry::new();
        // Nothing is active before the first registration.
        assert!(registry.get_active().is_none());

        registry.register("kademlia", Arc::new(KademliaDhtProvider::new()));

        let active = registry.get_active();
        assert!(active.is_some());
        assert_eq!(active.unwrap().name(), "kademlia");
    }

    #[test]
    fn test_registry_set_active() {
        let mut registry = DhtProviderRegistry::new();
        registry.register("kademlia", Arc::new(KademliaDhtProvider::new()));
        registry.register("minimal", Arc::new(MinimalProvider));

        // The first registration stays active until another one is selected.
        assert_eq!(registry.get_active().unwrap().name(), "kademlia");

        registry.set_active("minimal").unwrap();
        assert_eq!(registry.get_active().unwrap().name(), "minimal");

        registry.set_active("kademlia").unwrap();
        assert_eq!(registry.get_active().unwrap().name(), "kademlia");
    }

    #[test]
    fn test_registry_unregister() {
        let mut registry = DhtProviderRegistry::new();
        registry.register("kademlia", Arc::new(KademliaDhtProvider::new()));
        assert_eq!(registry.count(), 1);

        let removed = registry.unregister("kademlia");
        assert!(removed.is_some());
        assert_eq!(registry.count(), 0);
        assert!(!registry.has_provider("kademlia"));

        // Removing the active provider leaves the registry without one.
        assert!(registry.get_active().is_none());

        // Removing an unknown name is a no-op.
        assert!(registry.unregister("kademlia").is_none());
    }

    #[test]
    fn test_provider_bootstrap() {
        let provider = KademliaDhtProvider::new();
        let peers = vec![PeerId::random(), PeerId::random()];
        let result = provider.bootstrap(peers);
        assert!(result.is_ok());
        assert_eq!(provider.stats().routing_table_size, 2);
    }

    #[test]
    fn test_provider_stats() {
        let provider = KademliaDhtProvider::new();
        let stats = provider.stats();
        assert_eq!(stats.total_queries, 0);
        assert_eq!(stats.successful_queries, 0);

        // A provider lookup is recorded as one successful query.
        provider.find_providers(&Cid::default()).unwrap();
        let stats = provider.stats();
        assert_eq!(stats.total_queries, 1);
        assert_eq!(stats.successful_queries, 1);
        assert_eq!(stats.failed_queries, 0);
    }

    #[test]
    fn test_provider_health() {
        let provider = KademliaDhtProvider::new();
        // Initially unhealthy (no peers)
        assert!(!provider.is_healthy());

        // Bootstrap with peers
        provider.bootstrap(vec![PeerId::random()]).unwrap();

        // Peers alone are not enough: no query has succeeded yet
        assert!(!provider.is_healthy());

        // Perform a query to improve success rate
        let cid = Cid::default();
        provider.find_providers(&cid).unwrap();

        // Should be healthy now
        assert!(provider.is_healthy());
    }

    #[test]
    fn test_list_providers() {
        let mut registry = DhtProviderRegistry::new();
        registry.register("kademlia", Arc::new(KademliaDhtProvider::new()));

        let providers = registry.list_providers();
        assert_eq!(providers.len(), 1);
        assert!(providers.contains(&"kademlia".to_string()));
    }

    #[test]
    fn test_provider_not_found() {
        let mut registry = DhtProviderRegistry::new();
        let result = registry.set_active("nonexistent");
        assert!(result.is_err());
        assert!(matches!(result, Err(DhtProviderError::ProviderNotFound(_))));
        // A failed selection must not invent an active provider.
        assert!(registry.get_active().is_none());
    }

    #[test]
    fn test_stats_success_rate() {
        let mut stats = DhtProviderStats {
            total_queries: 10,
            successful_queries: 7,
            ..Default::default()
        };
        stats.calculate_success_rate();
        assert!((stats.success_rate - 0.7).abs() < 0.01);

        // With no queries recorded the rate falls back to zero.
        let mut idle = DhtProviderStats {
            success_rate: 0.9,
            ..Default::default()
        };
        idle.calculate_success_rate();
        assert!(idle.success_rate.abs() < f64::EPSILON);
    }

    #[test]
    fn test_kademlia_config_default() {
        let config = KademliaConfig::default();
        assert_eq!(config.k_bucket_size, 20);
        assert_eq!(config.alpha, 3);
        assert_eq!(config.replication_factor, 20);
        assert_eq!(config.query_timeout_secs, 60);
    }

    #[test]
    fn test_unsupported_operation() {
        // Kademlia overrides the key-value methods, so storing works...
        let kademlia = KademliaDhtProvider::new();
        assert!(kademlia.put_value(b"key", b"value").is_ok());
        // ...while the placeholder lookup reports a failed query.
        assert!(matches!(
            kademlia.get_value(b"key"),
            Err(DhtProviderError::QueryFailed(_))
        ));

        // A provider that relies on the trait defaults rejects both operations.
        let minimal = MinimalProvider;
        assert!(matches!(
            minimal.put_value(b"key", b"value"),
            Err(DhtProviderError::OperationNotSupported(_))
        ));
        assert!(matches!(
            minimal.get_value(b"key"),
            Err(DhtProviderError::OperationNotSupported(_))
        ));
    }
}