1use crate::dht::optimized_storage::OptimizedDHTStorage;
7use crate::dht::{Key, Record};
8use crate::error::{P2PError, P2pResult as Result, StorageError};
9use crate::{Multiaddr, PeerId};
10use saorsa_rsps::{
11 CachePolicy, Cid, RootAnchoredCache, RootCid, Rsps, RspsConfig, TtlConfig, TtlEngine, TtlStats,
12 WitnessKey, WitnessReceipt, witness::ReceiptMetadata,
13};
14use std::collections::{HashMap, HashSet};
15use std::sync::Arc;
16use std::time::{Duration, SystemTime};
17use tokio::sync::RwLock;
18use tracing::{debug, info, warn};
19
20pub struct RspsDhtStorage {
22 base_storage: Arc<OptimizedDHTStorage>,
24 cache: Arc<RootAnchoredCache>,
26 provider_summaries: Arc<RwLock<HashMap<RootCid, ProviderRecord>>>,
28 ttl_manager: Arc<TtlEngine>,
30 witness_key: Arc<WitnessKey>,
32 local_peer: PeerId,
34 config: RspsDhtConfig,
36}
37
38#[derive(Debug, Clone)]
40pub struct RspsDhtConfig {
41 pub max_cache_size: usize,
43 pub max_items_per_root: usize,
45 pub base_ttl: Duration,
47 pub min_receipts_for_extension: usize,
49 pub max_ttl_multiplier: f64,
51 pub pseudonym_refresh_interval: Duration,
53 pub summary_update_interval: Duration,
55}
56
57impl Default for RspsDhtConfig {
58 fn default() -> Self {
59 Self {
60 max_cache_size: 100 * 1024 * 1024, max_items_per_root: 1000,
62 base_ttl: Duration::from_secs(3600), min_receipts_for_extension: 3,
64 max_ttl_multiplier: 8.0,
65 pseudonym_refresh_interval: Duration::from_secs(86400), summary_update_interval: Duration::from_secs(300), }
68 }
69}
70
71impl From<RspsDhtConfig> for RspsConfig {
72 fn from(_config: RspsDhtConfig) -> Self {
73 RspsConfig::default()
75 }
76}
77
78#[derive(Debug, Clone)]
80pub struct ProviderRecord {
81 pub provider: PeerId,
83 pub addresses: Vec<Multiaddr>,
85 pub rsps: Arc<Rsps>,
87 pub last_updated: SystemTime,
89 pub receipts: Vec<WitnessReceipt>,
91}
92
93impl RspsDhtStorage {
94 pub async fn new(
96 base_storage: Arc<OptimizedDHTStorage>,
97 local_peer: PeerId,
98 config: RspsDhtConfig,
99 ) -> Result<Self> {
100 let cache_policy = CachePolicy {
102 max_size: config.max_cache_size,
103 max_items_per_root: config.max_items_per_root,
104 min_root_depth: 2,
105 pledge_ratio: 1.5,
106 };
107 let cache = Arc::new(RootAnchoredCache::new(cache_policy));
108
109 let ttl_config = TtlConfig {
111 base_ttl: config.base_ttl,
112 ttl_per_hit: Duration::from_secs(30 * 60), max_hit_ttl: Duration::from_secs(12 * 3600), ttl_per_receipt: Duration::from_secs(10 * 60), max_receipt_ttl: Duration::from_secs(2 * 3600), bucket_window: Duration::from_secs(5 * 60), };
118 let ttl_manager = Arc::new(TtlEngine::new(ttl_config));
119
120 let witness_key = Arc::new(WitnessKey::generate());
122
123 Ok(Self {
124 base_storage,
125 cache,
126 provider_summaries: Arc::new(RwLock::new(HashMap::new())),
127 ttl_manager,
128 witness_key,
129 local_peer,
130 config,
131 })
132 }
133
134 pub async fn store_provider(
136 &self,
137 root_cid: RootCid,
138 provider: PeerId,
139 addresses: Vec<Multiaddr>,
140 rsps: Rsps,
141 ) -> Result<()> {
142 info!(
143 "Storing provider record for root {:?} from peer {:?}",
144 root_cid, provider
145 );
146
147 let record = ProviderRecord {
149 provider: provider.clone(),
150 addresses,
151 rsps: Arc::new(rsps),
152 last_updated: SystemTime::now(),
153 receipts: Vec::new(),
154 };
155
156 let mut summaries = self.provider_summaries.write().await;
158 summaries.insert(root_cid, record.clone());
159
160 let key = self.provider_key(&root_cid, &provider);
162 let value = self.serialize_provider_record(&record)?;
163
164 let dht_record = Record {
165 key,
166 value,
167 publisher: {
168 let peer_bytes = self.local_peer.as_bytes();
170 let mut node_id_bytes = [0u8; 32];
171 let len = peer_bytes.len().min(32);
172 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
173 crate::identity::node_identity::NodeId::from_bytes(node_id_bytes)
174 },
175 expires_at: SystemTime::now() + self.config.summary_update_interval,
176 created_at: SystemTime::now(),
177 signature: Some(Vec::new()),
178 };
179
180 self.base_storage.store(dht_record).await?;
182
183 debug!("Provider record stored successfully");
184 Ok(())
185 }
186
187 pub async fn find_providers(&self, root_cid: &RootCid) -> Result<Vec<ProviderRecord>> {
189 let summaries = self.provider_summaries.read().await;
190
191 if let Some(record) = summaries.get(root_cid) {
193 debug!("Found provider in local cache");
194 return Ok(vec![record.clone()]);
195 }
196
197 let pattern = self.provider_key_pattern(root_cid);
199 let records = self
200 .base_storage
201 .get_records_by_publisher(&pattern, None)
202 .await
203 .iter()
204 .filter_map(|record| {
205 let key_str = std::str::from_utf8(&record.key).ok()?;
206 if key_str.starts_with(&pattern) {
207 Some(record.clone())
208 } else {
209 None
210 }
211 })
212 .collect::<Vec<_>>();
213
214 let mut providers = Vec::new();
215 for record in records {
216 if let Ok(provider_record) = self.deserialize_provider_record(&record.value) {
217 providers.push(provider_record);
218 }
219 }
220
221 info!(
222 "Found {} providers for root {:?}",
223 providers.len(),
224 root_cid
225 );
226 Ok(providers)
227 }
228
229 pub async fn cache_if_allowed(
231 &self,
232 root_cid: RootCid,
233 cid: Cid,
234 data: Vec<u8>,
235 ) -> Result<bool> {
236 let summaries = self.provider_summaries.read().await;
238 let _provider_record =
239 summaries
240 .get(&root_cid)
241 .ok_or(P2PError::Storage(StorageError::Database(
242 std::borrow::Cow::Borrowed("No RSPS for root"),
243 )))?;
244
245 let admitted = self.cache.admit(root_cid, cid, data.clone()).map_err(|e| {
247 P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
248 "Cache admission failed: {}",
249 e
250 ))))
251 })?;
252
253 if admitted {
254 let ttl = self.ttl_manager.record_hit(&cid).map_err(|e| {
256 P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
257 "TTL record failed: {}",
258 e
259 ))))
260 })?;
261
262 info!("Cached CID {:?} with TTL {:?}", cid, ttl);
263 } else {
264 debug!("CID {:?} not admitted to cache", cid);
265 }
266
267 Ok(admitted)
268 }
269
270 pub async fn generate_receipt(&self, cid: &Cid) -> Result<WitnessReceipt> {
272 let epoch = std::time::SystemTime::now()
274 .duration_since(std::time::UNIX_EPOCH)
275 .unwrap_or_default()
276 .as_secs();
277
278 let metadata = ReceiptMetadata {
280 latency_ms: 0,
281 content_size: 0,
282 valid: true,
283 error: None,
284 };
285
286 let receipt = self.witness_key.create_receipt(*cid, epoch, metadata);
288
289 let witness_id = self.witness_key.public_key();
291 self.ttl_manager
292 .record_receipt(cid, witness_id)
293 .map_err(|e| {
294 P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
295 "Failed to record receipt: {}",
296 e
297 ))))
298 })?;
299
300 receipt.map_err(|e| {
301 P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
302 "RSPS receipt creation failed: {}",
303 e
304 ))))
305 })
306 }
307
308 pub async fn generate_receipt_batch(&self, cids: &[Cid]) -> Result<Vec<WitnessReceipt>> {
310 let mut batch = Vec::new();
311
312 for cid in cids {
313 let receipt = self.generate_receipt(cid).await?;
314 batch.push(receipt);
315 }
316
317 Ok(batch)
318 }
319
320 pub async fn verify_receipt(&self, receipt: &WitnessReceipt) -> Result<bool> {
322 Ok(!receipt.signature.is_empty())
325 }
326
327 pub async fn update_rsps(&self, root_cid: &RootCid, new_cids: Vec<Cid>) -> Result<()> {
329 let mut summaries = self.provider_summaries.write().await;
330
331 if let Some(record) = summaries.get_mut(root_cid) {
332 let mut all_cids = HashSet::new();
334
335 for cid in new_cids {
341 all_cids.insert(cid);
342 }
343
344 let cid_vec: Vec<Cid> = all_cids.into_iter().collect();
346 let new_rsps =
347 Rsps::new(*root_cid, 1, &cid_vec, &RspsConfig::default()).map_err(|e| {
348 P2PError::Storage(StorageError::Database(std::borrow::Cow::Owned(format!(
349 "RSPS creation failed: {}",
350 e
351 ))))
352 })?;
353
354 record.rsps = Arc::new(new_rsps);
356 record.last_updated = SystemTime::now();
357
358 info!("Updated RSPS for root {:?}", root_cid);
359 } else {
360 warn!("No existing RSPS for root {:?}", root_cid);
361 }
362
363 Ok(())
364 }
365
366 pub async fn cleanup_expired(&self) -> Result<()> {
368 debug!("Cache cleanup managed automatically");
371
372 let expired_cids = self.ttl_manager.cleanup_expired();
374 debug!("Removed {} expired TTL entries", expired_cids.len());
375
376 let mut summaries = self.provider_summaries.write().await;
378 let now = SystemTime::now();
379 let expired_roots: Vec<RootCid> = summaries
380 .iter()
381 .filter(|(_, record)| {
382 now.duration_since(record.last_updated)
383 .unwrap_or(Duration::ZERO)
384 > self.config.summary_update_interval * 2
385 })
386 .map(|(root, _)| *root)
387 .collect();
388
389 for root in expired_roots {
390 summaries.remove(&root);
391 debug!("Removed expired provider summary for root {:?}", root);
392 }
393
394 Ok(())
395 }
396
397 pub async fn get_cache_stats(&self) -> CacheStats {
399 CacheStats {
400 total_cached_items: self.cache.stats().total_items,
401 total_cache_size: self.cache.stats().total_size,
402 roots_tracked: self.provider_summaries.read().await.len(),
403 ttl_stats: TtlStats {
404 hit_count: 0,
405 receipt_count: 0,
406 active_buckets: 0,
407 remaining_ttl: std::time::Duration::ZERO,
408 total_ttl: std::time::Duration::ZERO,
409 },
410 }
411 }
412
413 fn provider_key(&self, root_cid: &RootCid, provider: &PeerId) -> Key {
416 let key_str = format!("/rsps/provider/{}/{}", hex::encode(root_cid), provider);
417 let hash = blake3::hash(key_str.as_bytes());
418 *hash.as_bytes()
419 }
420
421 fn provider_key_pattern(&self, root_cid: &RootCid) -> String {
422 format!("/rsps/provider/{}/", hex::encode(root_cid))
423 }
424
425 fn serialize_provider_record(&self, record: &ProviderRecord) -> Result<Vec<u8>> {
426 Ok(format!("{:?}", record).into_bytes())
428 }
429
430 fn deserialize_provider_record(&self, _data: &[u8]) -> Result<ProviderRecord> {
431 Err(P2PError::Serialization("Not implemented".into()))
433 }
434}
435
436#[derive(Debug, Clone)]
438pub struct CacheStats {
439 pub total_cached_items: usize,
440 pub total_cache_size: usize,
441 pub roots_tracked: usize,
442 pub ttl_stats: saorsa_rsps::TtlStats,
443}
444
445#[cfg(test)]
446mod tests {
447
448 #[tokio::test]
449 async fn test_rsps_integration() {
450 }
452}