1use crate::dht::optimized_storage::OptimizedDHTStorage;
7use crate::dht::{Key, Record};
8use crate::error::{P2PError, P2pResult as Result, StorageError};
9use crate::{Multiaddr, PeerId};
10use saorsa_rsps::{
11 CachePolicy, Cid, RootAnchoredCache, RootCid, Rsps, RspsConfig, TtlConfig, TtlEngine, TtlStats,
12 WitnessKey, WitnessReceipt, witness::ReceiptMetadata,
13};
14use std::collections::{HashMap, HashSet};
15use std::sync::Arc;
16use std::time::{Duration, SystemTime};
17use tokio::sync::RwLock;
18use tracing::{debug, info, warn};
19
20pub struct RspsDhtStorage {
22 base_storage: Arc<OptimizedDHTStorage>,
24 cache: Arc<RootAnchoredCache>,
26 provider_summaries: Arc<RwLock<HashMap<RootCid, ProviderRecord>>>,
28 ttl_manager: Arc<TtlEngine>,
30 witness_key: Arc<WitnessKey>,
32 local_peer: PeerId,
34 config: RspsDhtConfig,
36}
37
38#[derive(Debug, Clone)]
40pub struct RspsDhtConfig {
41 pub max_cache_size: usize,
43 pub max_items_per_root: usize,
45 pub base_ttl: Duration,
47 pub min_receipts_for_extension: usize,
49 pub max_ttl_multiplier: f64,
51 pub pseudonym_refresh_interval: Duration,
53 pub summary_update_interval: Duration,
55}
56
57impl Default for RspsDhtConfig {
58 fn default() -> Self {
59 Self {
60 max_cache_size: 100 * 1024 * 1024, max_items_per_root: 1000,
62 base_ttl: Duration::from_secs(3600), min_receipts_for_extension: 3,
64 max_ttl_multiplier: 8.0,
65 pseudonym_refresh_interval: Duration::from_secs(86400), summary_update_interval: Duration::from_secs(300), }
68 }
69}
70
71impl From<RspsDhtConfig> for RspsConfig {
72 fn from(_config: RspsDhtConfig) -> Self {
73 RspsConfig::default()
75 }
76}
77
78#[derive(Debug, Clone)]
80pub struct ProviderRecord {
81 pub provider: PeerId,
83 pub addresses: Vec<Multiaddr>,
85 pub rsps: Arc<Rsps>,
87 pub last_updated: SystemTime,
89 pub receipts: Vec<WitnessReceipt>,
91}
92
93impl RspsDhtStorage {
94 pub async fn new(
96 base_storage: Arc<OptimizedDHTStorage>,
97 local_peer: PeerId,
98 config: RspsDhtConfig,
99 ) -> Result<Self> {
100 let cache_policy = CachePolicy {
102 max_size: config.max_cache_size,
103 max_items_per_root: config.max_items_per_root,
104 min_root_depth: 2,
105 pledge_ratio: 1.5,
106 };
107 let cache = Arc::new(RootAnchoredCache::new(cache_policy));
108
109 let ttl_config = TtlConfig {
111 base_ttl: config.base_ttl,
112 ttl_per_hit: Duration::from_secs(30 * 60), max_hit_ttl: Duration::from_secs(12 * 3600), ttl_per_receipt: Duration::from_secs(10 * 60), max_receipt_ttl: Duration::from_secs(2 * 3600), bucket_window: Duration::from_secs(5 * 60), };
118 let ttl_manager = Arc::new(TtlEngine::new(ttl_config));
119
120 let witness_key = Arc::new(WitnessKey::generate());
122
123 Ok(Self {
124 base_storage,
125 cache,
126 provider_summaries: Arc::new(RwLock::new(HashMap::new())),
127 ttl_manager,
128 witness_key,
129 local_peer,
130 config,
131 })
132 }
133
134 pub async fn store_provider(
136 &self,
137 root_cid: RootCid,
138 provider: PeerId,
139 addresses: Vec<Multiaddr>,
140 rsps: Rsps,
141 ) -> Result<()> {
142 info!(
143 "Storing provider record for root {:?} from peer {:?}",
144 root_cid, provider
145 );
146
147 self.cache.register_rsps(rsps.clone());
149
150 let record = ProviderRecord {
152 provider: provider.clone(),
153 addresses,
154 rsps: Arc::new(rsps),
155 last_updated: SystemTime::now(),
156 receipts: Vec::new(),
157 };
158
159 let mut summaries = self.provider_summaries.write().await;
161 summaries.insert(root_cid, record.clone());
162
163 let key = self.provider_key(&root_cid, &provider);
165 let value = self.serialize_provider_record(&record)?;
166
167 let dht_record = Record {
168 key,
169 value,
170 publisher: {
171 let peer_bytes = self.local_peer.as_bytes();
173 let mut node_id_bytes = [0u8; 32];
174 let len = peer_bytes.len().min(32);
175 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
176 crate::identity::node_identity::NodeId::from_bytes(node_id_bytes)
177 },
178 expires_at: SystemTime::now() + self.config.summary_update_interval,
179 created_at: SystemTime::now(),
180 signature: Some(Vec::new()),
181 };
182
183 self.base_storage.store(dht_record).await?;
185
186 debug!("Provider record stored successfully");
187 Ok(())
188 }
189
190 pub async fn find_providers(&self, root_cid: &RootCid) -> Result<Vec<ProviderRecord>> {
192 let summaries = self.provider_summaries.read().await;
193
194 if let Some(record) = summaries.get(root_cid) {
196 debug!("Found provider in local cache");
197 return Ok(vec![record.clone()]);
198 }
199
200 let pattern = self.provider_key_pattern(root_cid);
202 let records = self
203 .base_storage
204 .get_records_by_publisher(&pattern, None)
205 .await
206 .iter()
207 .filter_map(|record| {
208 let key_str = std::str::from_utf8(&record.key).ok()?;
209 if key_str.starts_with(&pattern) {
210 Some(record.clone())
211 } else {
212 None
213 }
214 })
215 .collect::<Vec<_>>();
216
217 let mut providers = Vec::new();
218 for record in records {
219 if let Ok(provider_record) = self.deserialize_provider_record(&record.value) {
220 providers.push(provider_record);
221 }
222 }
223
224 info!(
225 "Found {} providers for root {:?}",
226 providers.len(),
227 root_cid
228 );
229 Ok(providers)
230 }
231
232 pub async fn cache_if_allowed(
234 &self,
235 root_cid: RootCid,
236 cid: Cid,
237 data: Vec<u8>,
238 ) -> Result<bool> {
239 let summaries = self.provider_summaries.read().await;
241 let _provider_record =
242 summaries
243 .get(&root_cid)
244 .ok_or(P2PError::Storage(StorageError::Database(
245 std::borrow::Cow::Borrowed("No RSPS for root"),
246 )))?;
247
248 let admitted = self.cache.admit(root_cid, cid, data.clone()).map_err(|e| {
250 P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
251 "Cache admission failed: {}",
252 e
253 ))))
254 })?;
255
256 if admitted {
257 let ttl = self.ttl_manager.record_hit(&cid).map_err(|e| {
259 P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
260 "TTL record failed: {}",
261 e
262 ))))
263 })?;
264
265 info!("Cached CID {:?} with TTL {:?}", cid, ttl);
266 } else {
267 debug!("CID {:?} not admitted to cache", cid);
268 }
269
270 Ok(admitted)
271 }
272
273 pub async fn generate_receipt(&self, cid: &Cid) -> Result<WitnessReceipt> {
275 let epoch = std::time::SystemTime::now()
277 .duration_since(std::time::UNIX_EPOCH)
278 .unwrap_or_default()
279 .as_secs();
280
281 let metadata = ReceiptMetadata {
283 latency_ms: 0,
284 content_size: 0,
285 valid: true,
286 error: None,
287 };
288
289 let receipt = self.witness_key.create_receipt(*cid, epoch, metadata);
291
292 let witness_id = self.witness_key.public_key();
294 self.ttl_manager
295 .record_receipt(cid, witness_id)
296 .map_err(|e| {
297 P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
298 "Failed to record receipt: {}",
299 e
300 ))))
301 })?;
302
303 receipt.map_err(|e| {
304 P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
305 "RSPS receipt creation failed: {}",
306 e
307 ))))
308 })
309 }
310
311 pub async fn generate_receipt_batch(&self, cids: &[Cid]) -> Result<Vec<WitnessReceipt>> {
313 let mut batch = Vec::new();
314
315 for cid in cids {
316 let receipt = self.generate_receipt(cid).await?;
317 batch.push(receipt);
318 }
319
320 Ok(batch)
321 }
322
323 pub async fn verify_receipt(&self, receipt: &WitnessReceipt) -> Result<bool> {
325 Ok(!receipt.signature.is_empty())
328 }
329
330 pub async fn update_rsps(&self, root_cid: &RootCid, new_cids: Vec<Cid>) -> Result<()> {
332 let mut summaries = self.provider_summaries.write().await;
333
334 if let Some(record) = summaries.get_mut(root_cid) {
335 let mut all_cids = HashSet::new();
337
338 for cid in new_cids {
344 all_cids.insert(cid);
345 }
346
347 let cid_vec: Vec<Cid> = all_cids.into_iter().collect();
349 let new_rsps =
350 Rsps::new(*root_cid, 1, &cid_vec, &RspsConfig::default()).map_err(|e| {
351 P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
352 "RSPS creation failed: {}",
353 e
354 ))))
355 })?;
356
357 record.rsps = Arc::new(new_rsps);
359 record.last_updated = SystemTime::now();
360
361 info!("Updated RSPS for root {:?}", root_cid);
362 } else {
363 warn!("No existing RSPS for root {:?}", root_cid);
364 }
365
366 Ok(())
367 }
368
369 pub async fn cleanup_expired(&self) -> Result<()> {
371 debug!("Cache cleanup managed automatically");
374
375 let expired_cids = self.ttl_manager.cleanup_expired();
377 debug!("Removed {} expired TTL entries", expired_cids.len());
378
379 let mut summaries = self.provider_summaries.write().await;
381 let now = SystemTime::now();
382 let expired_roots: Vec<RootCid> = summaries
383 .iter()
384 .filter(|(_, record)| {
385 now.duration_since(record.last_updated)
386 .unwrap_or(Duration::ZERO)
387 > self.config.summary_update_interval * 2
388 })
389 .map(|(root, _)| *root)
390 .collect();
391
392 for root in expired_roots {
393 summaries.remove(&root);
394 debug!("Removed expired provider summary for root {:?}", root);
395 }
396
397 Ok(())
398 }
399
400 pub async fn get_cache_stats(&self) -> CacheStats {
402 CacheStats {
403 total_cached_items: self.cache.stats().total_items,
404 total_cache_size: self.cache.stats().total_size,
405 roots_tracked: self.provider_summaries.read().await.len(),
406 ttl_stats: TtlStats {
407 hit_count: 0,
408 receipt_count: 0,
409 active_buckets: 0,
410 remaining_ttl: std::time::Duration::ZERO,
411 total_ttl: std::time::Duration::ZERO,
412 },
413 }
414 }
415
416 fn provider_key(&self, root_cid: &RootCid, provider: &PeerId) -> Key {
419 let key_str = format!("/rsps/provider/{}/{}", hex::encode(root_cid), provider);
420 let hash = blake3::hash(key_str.as_bytes());
421 *hash.as_bytes()
422 }
423
424 fn provider_key_pattern(&self, root_cid: &RootCid) -> String {
425 format!("/rsps/provider/{}/", hex::encode(root_cid))
426 }
427
428 fn serialize_provider_record(&self, record: &ProviderRecord) -> Result<Vec<u8>> {
429 Ok(format!("{:?}", record).into_bytes())
431 }
432
433 fn deserialize_provider_record(&self, _data: &[u8]) -> Result<ProviderRecord> {
434 Err(P2PError::Serialization("Not implemented".into()))
436 }
437}
438
439#[derive(Debug, Clone)]
441pub struct CacheStats {
442 pub total_cached_items: usize,
443 pub total_cache_size: usize,
444 pub roots_tracked: usize,
445 pub ttl_stats: saorsa_rsps::TtlStats,
446}
447
448#[cfg(test)]
449mod tests {
450
451 #[tokio::test]
452 async fn test_rsps_integration() {
453 }
455}