saorsa_core/dht/rsps_integration.rs

//! RSPS Integration with DHT Storage
//!
//! This module integrates Root-Scoped Provider Summaries (RSPS) with the DHT storage layer,
//! enabling efficient content discovery and cache admission control.
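//!
//! # Example
//!
//! A minimal usage sketch (marked `ignore` since it assumes a running
//! `OptimizedDHTStorage`, a local peer ID, and an RSPS built elsewhere):
//!
//! ```ignore
//! let storage = RspsDhtStorage::new(base_storage, local_peer, RspsDhtConfig::default()).await?;
//! storage.store_provider(root_cid, provider, addresses, rsps).await?;
//! let providers = storage.find_providers(&root_cid).await?;
//! ```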

use crate::dht::optimized_storage::OptimizedDHTStorage;
use crate::dht::{Key, Record};
use crate::error::{P2PError, P2pResult as Result, StorageError};
use crate::{Multiaddr, PeerId};
use saorsa_rsps::{
    CachePolicy, Cid, RootAnchoredCache, RootCid, Rsps, RspsConfig, TtlConfig, TtlEngine, TtlStats,
    WitnessKey, WitnessReceipt, witness::ReceiptMetadata,
};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};

/// RSPS-enhanced DHT storage that uses provider summaries for efficient routing
pub struct RspsDhtStorage {
    /// Base DHT storage layer
    base_storage: Arc<OptimizedDHTStorage>,
    /// Root-anchored cache with RSPS admission control
    cache: Arc<RootAnchoredCache>,
    /// Map of root CIDs to their provider summaries
    provider_summaries: Arc<RwLock<HashMap<RootCid, ProviderRecord>>>,
    /// TTL management engine
    ttl_manager: Arc<TtlEngine>,
    /// Witness key for generating receipts
    witness_key: Arc<WitnessKey>,
    /// Local peer ID
    local_peer: PeerId,
    /// Configuration
    config: RspsDhtConfig,
}

/// Configuration for RSPS-DHT integration (wrapper around saorsa_rsps::RspsConfig)
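///
/// Individual fields can be overridden via struct update syntax; a small
/// sketch (the 256MB value is illustrative, not a recommendation):
///
/// ```ignore
/// let config = RspsDhtConfig {
///     max_cache_size: 256 * 1024 * 1024,
///     ..Default::default()
/// };
/// ```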
#[derive(Debug, Clone)]
pub struct RspsDhtConfig {
    /// Maximum cache size in bytes
    pub max_cache_size: usize,
    /// Maximum items per root in cache
    pub max_items_per_root: usize,
    /// Base TTL for cached items
    pub base_ttl: Duration,
    /// Minimum receipts for TTL extension
    pub min_receipts_for_extension: usize,
    /// Maximum TTL multiplier
    pub max_ttl_multiplier: f64,
    /// Witness pseudonym refresh interval
    pub pseudonym_refresh_interval: Duration,
    /// Provider summary update interval
    pub summary_update_interval: Duration,
}

impl Default for RspsDhtConfig {
    fn default() -> Self {
        Self {
            max_cache_size: 100 * 1024 * 1024, // 100MB
            max_items_per_root: 1000,
            base_ttl: Duration::from_secs(3600), // 1 hour
            min_receipts_for_extension: 3,
            max_ttl_multiplier: 8.0,
            pseudonym_refresh_interval: Duration::from_secs(86400), // 24 hours
            summary_update_interval: Duration::from_secs(300),      // 5 minutes
        }
    }
}

impl From<RspsDhtConfig> for RspsConfig {
    fn from(_config: RspsDhtConfig) -> Self {
        // Use defaults for saorsa_rsps::RspsConfig since the structures don't match
        RspsConfig::default()
    }
}

/// Provider record containing RSPS and metadata
#[derive(Debug, Clone)]
pub struct ProviderRecord {
    /// The provider's peer ID
    pub provider: PeerId,
    /// Provider's network addresses
    pub addresses: Vec<Multiaddr>,
    /// Root-scoped provider summary
    pub rsps: Arc<Rsps>,
    /// Last update timestamp
    pub last_updated: SystemTime,
    /// Provider's witness receipts
    pub receipts: Vec<WitnessReceipt>,
}

impl RspsDhtStorage {
    /// Create a new RSPS-enhanced DHT storage
    pub async fn new(
        base_storage: Arc<OptimizedDHTStorage>,
        local_peer: PeerId,
        config: RspsDhtConfig,
    ) -> Result<Self> {
        // Initialize cache with policy
        let cache_policy = CachePolicy {
            max_size: config.max_cache_size,
            max_items_per_root: config.max_items_per_root,
            min_root_depth: 2,
            pledge_ratio: 1.5,
        };
        let cache = Arc::new(RootAnchoredCache::new(cache_policy));

        // Initialize TTL manager
        let ttl_config = TtlConfig {
            base_ttl: config.base_ttl,
            ttl_per_hit: Duration::from_secs(30 * 60), // 30 minutes per hit
            max_hit_ttl: Duration::from_secs(12 * 3600), // 12 hours max from hits
            ttl_per_receipt: Duration::from_secs(10 * 60), // 10 minutes per receipt
            max_receipt_ttl: Duration::from_secs(2 * 3600), // 2 hours max from receipts
            bucket_window: Duration::from_secs(5 * 60), // 5 minute buckets
        };
        let ttl_manager = Arc::new(TtlEngine::new(ttl_config));

        // Generate witness key
        let witness_key = Arc::new(WitnessKey::generate());

        Ok(Self {
            base_storage,
            cache,
            provider_summaries: Arc::new(RwLock::new(HashMap::new())),
            ttl_manager,
            witness_key,
            local_peer,
            config,
        })
    }

    /// Store a provider record with RSPS
    pub async fn store_provider(
        &self,
        root_cid: RootCid,
        provider: PeerId,
        addresses: Vec<Multiaddr>,
        rsps: Rsps,
    ) -> Result<()> {
        info!(
            "Storing provider record for root {:?} from peer {:?}",
            root_cid, provider
        );

        // Create provider record
        let record = ProviderRecord {
            provider: provider.clone(),
            addresses,
            rsps: Arc::new(rsps),
            last_updated: SystemTime::now(),
            receipts: Vec::new(),
        };

        // Store in provider summaries
        let mut summaries = self.provider_summaries.write().await;
        summaries.insert(root_cid, record.clone());

        // Create DHT record for provider announcement
        let key = self.provider_key(&root_cid, &provider);
        let value = self.serialize_provider_record(&record)?;

        let dht_record = Record {
            key,
            value,
            publisher: {
                // Convert PeerId (String) to NodeId
                let peer_bytes = self.local_peer.as_bytes();
                let mut node_id_bytes = [0u8; 32];
                let len = peer_bytes.len().min(32);
                node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
                crate::identity::node_identity::NodeId::from_bytes(node_id_bytes)
            },
            expires_at: SystemTime::now() + self.config.summary_update_interval,
            created_at: SystemTime::now(),
            signature: Some(Vec::new()), // placeholder empty signature
        };

        // Store in base DHT
        self.base_storage.store(dht_record).await?;

        debug!("Provider record stored successfully");
        Ok(())
    }

    /// Find providers for a root CID
    pub async fn find_providers(&self, root_cid: &RootCid) -> Result<Vec<ProviderRecord>> {
        let summaries = self.provider_summaries.read().await;

        // Check local cache first
        if let Some(record) = summaries.get(root_cid) {
            debug!("Found provider in local cache");
            return Ok(vec![record.clone()]);
        }

        // Query DHT for providers
        // Note: `provider_key` stores records under a BLAKE3 hash of the path string,
        // so this UTF-8 prefix match only catches records whose keys were stored as
        // raw path strings; a production implementation would keep a provider index.
        let pattern = self.provider_key_pattern(root_cid);
        let records = self
            .base_storage
            .get_records_by_publisher(&pattern, None)
            .await
            .iter()
            .filter_map(|record| {
                let key_str = std::str::from_utf8(&record.key).ok()?;
                if key_str.starts_with(&pattern) {
                    Some(record.clone())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        let mut providers = Vec::new();
        for record in records {
            if let Ok(provider_record) = self.deserialize_provider_record(&record.value) {
                providers.push(provider_record);
            }
        }

        info!(
            "Found {} providers for root {:?}",
            providers.len(),
            root_cid
        );
        Ok(providers)
    }

    /// Cache a CID if it's in the RSPS for the root
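    ///
    /// Returns `Ok(true)` if the cache admitted the item. A small sketch
    /// (assumes `root_cid`, `cid`, and `data` are already in scope):
    ///
    /// ```ignore
    /// if storage.cache_if_allowed(root_cid, cid, data).await? {
    ///     // admitted: the item is cached and a TTL hit was recorded
    /// }
    /// ```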
    pub async fn cache_if_allowed(
        &self,
        root_cid: RootCid,
        cid: Cid,
        data: Vec<u8>,
    ) -> Result<bool> {
        // Check if we have RSPS for this root
        let summaries = self.provider_summaries.read().await;
        let _provider_record =
            summaries
                .get(&root_cid)
                .ok_or(P2PError::Storage(StorageError::Database(
                    std::borrow::Cow::Borrowed("No RSPS for root"),
                )))?;

        // Use cache admission control
        let admitted = self.cache.admit(root_cid, cid, data).map_err(|e| {
            P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
                "Cache admission failed: {}",
                e
            ))))
        })?;

        if admitted {
            // Record in TTL manager
            let ttl = self.ttl_manager.record_hit(&cid).map_err(|e| {
                P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
                    "TTL record failed: {}",
                    e
                ))))
            })?;

            info!("Cached CID {:?} with TTL {:?}", cid, ttl);
        } else {
            debug!("CID {:?} not admitted to cache", cid);
        }

        Ok(admitted)
    }

    /// Generate a witness receipt for retrieved content
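    ///
    /// Sketch of a retrieve-then-witness flow (hypothetical `cid` in scope;
    /// note that `verify_receipt` currently only checks for a non-empty signature):
    ///
    /// ```ignore
    /// let receipt = storage.generate_receipt(&cid).await?;
    /// assert!(storage.verify_receipt(&receipt).await?);
    /// ```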
    pub async fn generate_receipt(&self, cid: &Cid) -> Result<WitnessReceipt> {
        // Get current epoch (simplified)
        let epoch = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        // Create receipt metadata
        let metadata = ReceiptMetadata {
            latency_ms: 0,
            content_size: 0,
            valid: true,
            error: None,
        };

        // Create receipt using proper API
        let receipt = self.witness_key.create_receipt(*cid, epoch, metadata);

        // Record receipt in TTL manager for extension logic
        let witness_id = self.witness_key.public_key();
        self.ttl_manager
            .record_receipt(cid, witness_id)
            .map_err(|e| {
                P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
                    "Failed to record receipt: {}",
                    e
                ))))
            })?;

        receipt.map_err(|e| {
            P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
                "RSPS receipt creation failed: {}",
                e
            ))))
        })
    }

    /// Batch generate receipts for multiple CIDs
    pub async fn generate_receipt_batch(&self, cids: &[Cid]) -> Result<Vec<WitnessReceipt>> {
        let mut batch = Vec::new();

        for cid in cids {
            let receipt = self.generate_receipt(cid).await?;
            batch.push(receipt);
        }

        Ok(batch)
    }

    /// Verify a witness receipt
    pub async fn verify_receipt(&self, receipt: &WitnessReceipt) -> Result<bool> {
        // In production, this would verify against known witness keys
        // For now, we just check the signature format
        Ok(!receipt.signature.is_empty())
    }

    /// Update RSPS for a root based on new content
    pub async fn update_rsps(&self, root_cid: &RootCid, new_cids: Vec<Cid>) -> Result<()> {
        let mut summaries = self.provider_summaries.write().await;

        if let Some(record) = summaries.get_mut(root_cid) {
            // Create new RSPS with updated CIDs
            let mut all_cids = HashSet::new();

            // Existing CIDs cannot be enumerated from the RSPS (it only supports
            // membership checks), so this rebuild covers just the new CIDs.
            // In production, we'd maintain a separate index of announced CIDs.

            // Add new CIDs
            for cid in new_cids {
                all_cids.insert(cid);
            }

            // Create updated RSPS
            let cid_vec: Vec<Cid> = all_cids.into_iter().collect();
            let new_rsps =
                Rsps::new(*root_cid, 1, &cid_vec, &RspsConfig::default()).map_err(|e| {
                    P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
                        "RSPS creation failed: {}",
                        e
                    ))))
                })?;

            // Update record
            record.rsps = Arc::new(new_rsps);
            record.last_updated = SystemTime::now();

            info!("Updated RSPS for root {:?}", root_cid);
        } else {
            warn!("No existing RSPS for root {:?}", root_cid);
        }

        Ok(())
    }

    /// Clean up expired entries
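    ///
    /// Intended to be called periodically; a minimal driver sketch (the
    /// five-minute interval is illustrative):
    ///
    /// ```ignore
    /// let mut interval = tokio::time::interval(std::time::Duration::from_secs(300));
    /// loop {
    ///     interval.tick().await;
    ///     storage.cleanup_expired().await?;
    /// }
    /// ```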
    pub async fn cleanup_expired(&self) -> Result<()> {
        // Note: RootAnchoredCache doesn't have cleanup_expired method
        // Cache cleanup happens automatically during eviction
        debug!("Cache cleanup managed automatically");

        // Clean up TTL manager
        let expired_cids = self.ttl_manager.cleanup_expired();
        debug!("Removed {} expired TTL entries", expired_cids.len());

        // Clean up old provider summaries
        let mut summaries = self.provider_summaries.write().await;
        let now = SystemTime::now();
        let expired_roots: Vec<RootCid> = summaries
            .iter()
            .filter(|(_, record)| {
                now.duration_since(record.last_updated)
                    .unwrap_or(Duration::ZERO)
                    > self.config.summary_update_interval * 2
            })
            .map(|(root, _)| *root)
            .collect();

        for root in expired_roots {
            summaries.remove(&root);
            debug!("Removed expired provider summary for root {:?}", root);
        }

        Ok(())
    }

    /// Get cache statistics
    pub async fn get_cache_stats(&self) -> CacheStats {
        let cache_stats = self.cache.stats();
        CacheStats {
            total_cached_items: cache_stats.total_items,
            total_cache_size: cache_stats.total_size,
            roots_tracked: self.provider_summaries.read().await.len(),
            // Per-CID TTL statistics are not aggregated here yet; report zeros
            ttl_stats: TtlStats {
                hit_count: 0,
                receipt_count: 0,
                active_buckets: 0,
                remaining_ttl: std::time::Duration::ZERO,
                total_ttl: std::time::Duration::ZERO,
            },
        }
    }

    // Helper methods

    fn provider_key(&self, root_cid: &RootCid, provider: &PeerId) -> Key {
        let key_str = format!("/rsps/provider/{}/{}", hex::encode(root_cid), provider);
        let hash = blake3::hash(key_str.as_bytes());
        *hash.as_bytes()
    }

    fn provider_key_pattern(&self, root_cid: &RootCid) -> String {
        format!("/rsps/provider/{}/", hex::encode(root_cid))
    }

    fn serialize_provider_record(&self, record: &ProviderRecord) -> Result<Vec<u8>> {
        // In production, use proper serialization (e.g., protobuf)
        Ok(format!("{:?}", record).into_bytes())
    }

    fn deserialize_provider_record(&self, _data: &[u8]) -> Result<ProviderRecord> {
        // In production, use proper deserialization
        Err(P2PError::Serialization("Not implemented".into()))
    }
}

/// Cache statistics
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of items currently held in the root-anchored cache
    pub total_cached_items: usize,
    /// Total size of cached items in bytes
    pub total_cache_size: usize,
    /// Number of roots with tracked provider summaries
    pub roots_tracked: usize,
    /// TTL engine statistics
    pub ttl_stats: saorsa_rsps::TtlStats,
}

#[cfg(test)]
mod tests {

    #[tokio::test]
    async fn test_rsps_integration() {
        // Test will be implemented
    }
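
    // A minimal, self-contained check of the documented RspsDhtConfig defaults;
    // it needs no DHT or network setup, unlike the integration test above.
    #[test]
    fn test_rsps_dht_config_defaults() {
        let config = super::RspsDhtConfig::default();
        assert_eq!(config.max_cache_size, 100 * 1024 * 1024);
        assert_eq!(config.max_items_per_root, 1000);
        assert_eq!(config.base_ttl, std::time::Duration::from_secs(3600));
        assert_eq!(config.min_receipts_for_extension, 3);
        assert!((config.max_ttl_multiplier - 8.0).abs() < f64::EPSILON);
        assert_eq!(
            config.pseudonym_refresh_interval,
            std::time::Duration::from_secs(86400)
        );
        assert_eq!(
            config.summary_update_interval,
            std::time::Duration::from_secs(300)
        );
    }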
}