// saorsa_core/dht/rsps_integration.rs

//! RSPS Integration with DHT Storage
//!
//! This module integrates Root-Scoped Provider Summaries (RSPS) with the DHT storage layer,
//! enabling efficient content discovery and cache admission control.

use crate::dht::optimized_storage::OptimizedDHTStorage;
use crate::dht::{Key, Record};
use crate::error::{P2PError, P2pResult as Result, StorageError};
use crate::{Multiaddr, PeerId};
use saorsa_rsps::{
    witness::ReceiptMetadata, CachePolicy, Cid, RootAnchoredCache, RootCid, Rsps, RspsConfig,
    TtlConfig, TtlEngine, TtlStats, WitnessKey, WitnessReceipt,
};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
19
20/// RSPS-enhanced DHT storage that uses provider summaries for efficient routing
21pub struct RspsDhtStorage {
22    /// Base DHT storage layer
23    base_storage: Arc<OptimizedDHTStorage>,
24    /// Root-anchored cache with RSPS admission control
25    cache: Arc<RootAnchoredCache>,
26    /// Map of root CIDs to their provider summaries
27    provider_summaries: Arc<RwLock<HashMap<RootCid, ProviderRecord>>>,
28    /// TTL management engine
29    ttl_manager: Arc<TtlEngine>,
30    /// Witness key for generating receipts
31    witness_key: Arc<WitnessKey>,
32    /// Local peer ID
33    local_peer: PeerId,
34    /// Configuration
35    config: RspsDhtConfig,
36}
37
/// Configuration for RSPS-DHT integration (wrapper around dht_rsps::RspsConfig)
#[derive(Debug, Clone)]
pub struct RspsDhtConfig {
    /// Maximum cache size in bytes
    pub max_cache_size: usize,
    /// Maximum items per root in cache
    pub max_items_per_root: usize,
    /// Base TTL for cached items
    pub base_ttl: Duration,
    /// Minimum receipts for TTL extension
    pub min_receipts_for_extension: usize,
    /// Maximum TTL multiplier
    pub max_ttl_multiplier: f64,
    /// Witness pseudonym refresh interval
    pub pseudonym_refresh_interval: Duration,
    /// Provider summary update interval; also bounds DHT record expiry
    /// and (doubled) local summary staleness in `cleanup_expired`
    pub summary_update_interval: Duration,
}
56
57impl Default for RspsDhtConfig {
58    fn default() -> Self {
59        Self {
60            max_cache_size: 100 * 1024 * 1024, // 100MB
61            max_items_per_root: 1000,
62            base_ttl: Duration::from_secs(3600), // 1 hour
63            min_receipts_for_extension: 3,
64            max_ttl_multiplier: 8.0,
65            pseudonym_refresh_interval: Duration::from_secs(86400), // 24 hours
66            summary_update_interval: Duration::from_secs(300),      // 5 minutes
67        }
68    }
69}
70
71impl From<RspsDhtConfig> for RspsConfig {
72    fn from(_config: RspsDhtConfig) -> Self {
73        // Use defaults for dht_rsps::RspsConfig since the structures don't match
74        RspsConfig::default()
75    }
76}
77
78/// Provider record containing RSPS and metadata
79#[derive(Debug, Clone)]
80pub struct ProviderRecord {
81    /// The provider's peer ID
82    pub provider: PeerId,
83    /// Provider's network addresses
84    pub addresses: Vec<Multiaddr>,
85    /// Root-scoped provider summary
86    pub rsps: Arc<Rsps>,
87    /// Last update timestamp
88    pub last_updated: SystemTime,
89    /// Provider's witness receipts
90    pub receipts: Vec<WitnessReceipt>,
91}
92
93impl RspsDhtStorage {
94    /// Create a new RSPS-enhanced DHT storage
95    pub async fn new(
96        base_storage: Arc<OptimizedDHTStorage>,
97        local_peer: PeerId,
98        config: RspsDhtConfig,
99    ) -> Result<Self> {
100        // Initialize cache with policy
101        let cache_policy = CachePolicy {
102            max_size: config.max_cache_size,
103            max_items_per_root: config.max_items_per_root,
104            min_root_depth: 2,
105            pledge_ratio: 1.5,
106        };
107        let cache = Arc::new(RootAnchoredCache::new(cache_policy));
108
109        // Initialize TTL manager
110        let ttl_config = TtlConfig {
111            base_ttl: config.base_ttl,
112            ttl_per_hit: Duration::from_secs(30 * 60), // 30 minutes per hit
113            max_hit_ttl: Duration::from_secs(12 * 3600), // 12 hours max from hits
114            ttl_per_receipt: Duration::from_secs(10 * 60), // 10 minutes per receipt
115            max_receipt_ttl: Duration::from_secs(2 * 3600), // 2 hours max from receipts
116            bucket_window: Duration::from_secs(5 * 60), // 5 minute buckets
117        };
118        let ttl_manager = Arc::new(TtlEngine::new(ttl_config));
119
120        // Generate witness key
121        let witness_key = Arc::new(WitnessKey::generate());
122
123        Ok(Self {
124            base_storage,
125            cache,
126            provider_summaries: Arc::new(RwLock::new(HashMap::new())),
127            ttl_manager,
128            witness_key,
129            local_peer,
130            config,
131        })
132    }
133
134    /// Store a provider record with RSPS
135    pub async fn store_provider(
136        &self,
137        root_cid: RootCid,
138        provider: PeerId,
139        addresses: Vec<Multiaddr>,
140        rsps: Rsps,
141    ) -> Result<()> {
142        info!(
143            "Storing provider record for root {:?} from peer {:?}",
144            root_cid, provider
145        );
146
147        // Register RSPS with the cache for admission control
148        self.cache.register_rsps(rsps.clone());
149
150        // Create provider record
151        let record = ProviderRecord {
152            provider: provider.clone(),
153            addresses,
154            rsps: Arc::new(rsps),
155            last_updated: SystemTime::now(),
156            receipts: Vec::new(),
157        };
158
159        // Store in provider summaries
160        let mut summaries = self.provider_summaries.write().await;
161        summaries.insert(root_cid, record.clone());
162
163        // Create DHT record for provider announcement
164        let key = self.provider_key(&root_cid, &provider);
165        let value = self.serialize_provider_record(&record)?;
166
167        let dht_record = Record {
168            key,
169            value,
170            publisher: {
171                // Convert PeerId (String) to NodeId
172                let peer_bytes = self.local_peer.as_bytes();
173                let mut node_id_bytes = [0u8; 32];
174                let len = peer_bytes.len().min(32);
175                node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
176                crate::identity::node_identity::NodeId::from_bytes(node_id_bytes)
177            },
178            expires_at: SystemTime::now() + self.config.summary_update_interval,
179            created_at: SystemTime::now(),
180            signature: Some(Vec::new()),
181        };
182
183        // Store in base DHT
184        self.base_storage.store(dht_record).await?;
185
186        debug!("Provider record stored successfully");
187        Ok(())
188    }
189
190    /// Find providers for a root CID
191    pub async fn find_providers(&self, root_cid: &RootCid) -> Result<Vec<ProviderRecord>> {
192        let summaries = self.provider_summaries.read().await;
193
194        // Check local cache first
195        if let Some(record) = summaries.get(root_cid) {
196            debug!("Found provider in local cache");
197            return Ok(vec![record.clone()]);
198        }
199
200        // Query DHT for providers
201        let pattern = self.provider_key_pattern(root_cid);
202        let records = self
203            .base_storage
204            .get_records_by_publisher(&pattern, None)
205            .await
206            .iter()
207            .filter_map(|record| {
208                let key_str = std::str::from_utf8(&record.key).ok()?;
209                if key_str.starts_with(&pattern) {
210                    Some(record.clone())
211                } else {
212                    None
213                }
214            })
215            .collect::<Vec<_>>();
216
217        let mut providers = Vec::new();
218        for record in records {
219            if let Ok(provider_record) = self.deserialize_provider_record(&record.value) {
220                providers.push(provider_record);
221            }
222        }
223
224        info!(
225            "Found {} providers for root {:?}",
226            providers.len(),
227            root_cid
228        );
229        Ok(providers)
230    }
231
232    /// Cache a CID if it's in the RSPS for the root
233    pub async fn cache_if_allowed(
234        &self,
235        root_cid: RootCid,
236        cid: Cid,
237        data: Vec<u8>,
238    ) -> Result<bool> {
239        // Check if we have RSPS for this root
240        let summaries = self.provider_summaries.read().await;
241        let _provider_record =
242            summaries
243                .get(&root_cid)
244                .ok_or(P2PError::Storage(StorageError::Database(
245                    std::borrow::Cow::Borrowed("No RSPS for root"),
246                )))?;
247
248        // Use cache admission control
249        let admitted = self.cache.admit(root_cid, cid, data.clone()).map_err(|e| {
250            P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
251                "Cache admission failed: {}",
252                e
253            ))))
254        })?;
255
256        if admitted {
257            // Record in TTL manager
258            let ttl = self.ttl_manager.record_hit(&cid).map_err(|e| {
259                P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
260                    "TTL record failed: {}",
261                    e
262                ))))
263            })?;
264
265            info!("Cached CID {:?} with TTL {:?}", cid, ttl);
266        } else {
267            debug!("CID {:?} not admitted to cache", cid);
268        }
269
270        Ok(admitted)
271    }
272
273    /// Generate a witness receipt for retrieved content
274    pub async fn generate_receipt(&self, cid: &Cid) -> Result<WitnessReceipt> {
275        // Get current epoch (simplified)
276        let epoch = std::time::SystemTime::now()
277            .duration_since(std::time::UNIX_EPOCH)
278            .unwrap_or_default()
279            .as_secs();
280
281        // Create receipt metadata
282        let metadata = ReceiptMetadata {
283            latency_ms: 0,
284            content_size: 0,
285            valid: true,
286            error: None,
287        };
288
289        // Create receipt using proper API
290        let receipt = self.witness_key.create_receipt(*cid, epoch, metadata);
291
292        // Record receipt in TTL manager for extension logic
293        let witness_id = self.witness_key.public_key();
294        self.ttl_manager
295            .record_receipt(cid, witness_id)
296            .map_err(|e| {
297                P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
298                    "Failed to record receipt: {}",
299                    e
300                ))))
301            })?;
302
303        receipt.map_err(|e| {
304            P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
305                "RSPS receipt creation failed: {}",
306                e
307            ))))
308        })
309    }
310
311    /// Batch generate receipts for multiple CIDs
312    pub async fn generate_receipt_batch(&self, cids: &[Cid]) -> Result<Vec<WitnessReceipt>> {
313        let mut batch = Vec::new();
314
315        for cid in cids {
316            let receipt = self.generate_receipt(cid).await?;
317            batch.push(receipt);
318        }
319
320        Ok(batch)
321    }
322
323    /// Verify a witness receipt
324    pub async fn verify_receipt(&self, receipt: &WitnessReceipt) -> Result<bool> {
325        // In production, this would verify against known witness keys
326        // For now, we just check the signature format
327        Ok(!receipt.signature.is_empty())
328    }
329
330    /// Update RSPS for a root based on new content
331    pub async fn update_rsps(&self, root_cid: &RootCid, new_cids: Vec<Cid>) -> Result<()> {
332        let mut summaries = self.provider_summaries.write().await;
333
334        if let Some(record) = summaries.get_mut(root_cid) {
335            // Create new RSPS with updated CIDs
336            let mut all_cids = HashSet::new();
337
338            // Get existing CIDs from RSPS
339            // Note: This requires iterating through possible CIDs to check membership
340            // In production, we'd maintain a separate index
341
342            // Add new CIDs
343            for cid in new_cids {
344                all_cids.insert(cid);
345            }
346
347            // Create updated RSPS
348            let cid_vec: Vec<Cid> = all_cids.into_iter().collect();
349            let new_rsps =
350                Rsps::new(*root_cid, 1, &cid_vec, &RspsConfig::default()).map_err(|e| {
351                    P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
352                        "RSPS creation failed: {}",
353                        e
354                    ))))
355                })?;
356
357            // Update record
358            record.rsps = Arc::new(new_rsps);
359            record.last_updated = SystemTime::now();
360
361            info!("Updated RSPS for root {:?}", root_cid);
362        } else {
363            warn!("No existing RSPS for root {:?}", root_cid);
364        }
365
366        Ok(())
367    }
368
369    /// Clean up expired entries
370    pub async fn cleanup_expired(&self) -> Result<()> {
371        // Note: RootAnchoredCache doesn't have cleanup_expired method
372        // Cache cleanup happens automatically during eviction
373        debug!("Cache cleanup managed automatically");
374
375        // Clean up TTL manager
376        let expired_cids = self.ttl_manager.cleanup_expired();
377        debug!("Removed {} expired TTL entries", expired_cids.len());
378
379        // Clean up old provider summaries
380        let mut summaries = self.provider_summaries.write().await;
381        let now = SystemTime::now();
382        let expired_roots: Vec<RootCid> = summaries
383            .iter()
384            .filter(|(_, record)| {
385                now.duration_since(record.last_updated)
386                    .unwrap_or(Duration::ZERO)
387                    > self.config.summary_update_interval * 2
388            })
389            .map(|(root, _)| *root)
390            .collect();
391
392        for root in expired_roots {
393            summaries.remove(&root);
394            debug!("Removed expired provider summary for root {:?}", root);
395        }
396
397        Ok(())
398    }
399
400    /// Get cache statistics
401    pub async fn get_cache_stats(&self) -> CacheStats {
402        CacheStats {
403            total_cached_items: self.cache.stats().total_items,
404            total_cache_size: self.cache.stats().total_size,
405            roots_tracked: self.provider_summaries.read().await.len(),
406            ttl_stats: TtlStats {
407                hit_count: 0,
408                receipt_count: 0,
409                active_buckets: 0,
410                remaining_ttl: std::time::Duration::ZERO,
411                total_ttl: std::time::Duration::ZERO,
412            },
413        }
414    }
415
416    // Helper methods
417
418    fn provider_key(&self, root_cid: &RootCid, provider: &PeerId) -> Key {
419        let key_str = format!("/rsps/provider/{}/{}", hex::encode(root_cid), provider);
420        let hash = blake3::hash(key_str.as_bytes());
421        *hash.as_bytes()
422    }
423
424    fn provider_key_pattern(&self, root_cid: &RootCid) -> String {
425        format!("/rsps/provider/{}/", hex::encode(root_cid))
426    }
427
428    fn serialize_provider_record(&self, record: &ProviderRecord) -> Result<Vec<u8>> {
429        // In production, use proper serialization (e.g., protobuf)
430        Ok(format!("{:?}", record).into_bytes())
431    }
432
433    fn deserialize_provider_record(&self, _data: &[u8]) -> Result<ProviderRecord> {
434        // In production, use proper deserialization
435        Err(P2PError::Serialization("Not implemented".into()))
436    }
437}
438
439/// Cache statistics
440#[derive(Debug, Clone)]
441pub struct CacheStats {
442    pub total_cached_items: usize,
443    pub total_cache_size: usize,
444    pub roots_tracked: usize,
445    pub ttl_stats: saorsa_rsps::TtlStats,
446}
447
#[cfg(test)]
mod tests {

    /// Placeholder integration test; real coverage still to be written.
    #[tokio::test]
    async fn test_rsps_integration() {
        // Test will be implemented
    }
}