1use crate::traits::BlockStore;
26use async_trait::async_trait;
27use ipfrs_core::{Block, Cid, Result as IpfsResult};
28use parking_lot::RwLock;
29use serde::{Deserialize, Serialize};
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::time::{Duration, Instant};
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct TtlConfig {
37 pub default_ttl: Duration,
39 pub auto_cleanup: bool,
41 pub cleanup_interval: Duration,
43 pub max_tracked_blocks: usize,
45}
46
47impl TtlConfig {
48 pub fn new(default_ttl: Duration) -> Self {
50 Self {
51 default_ttl,
52 auto_cleanup: true,
53 cleanup_interval: Duration::from_secs(60),
54 max_tracked_blocks: 1_000_000,
55 }
56 }
57
58 pub fn manual_cleanup(default_ttl: Duration) -> Self {
60 Self {
61 default_ttl,
62 auto_cleanup: false,
63 cleanup_interval: Duration::from_secs(60),
64 max_tracked_blocks: 1_000_000,
65 }
66 }
67
68 pub fn with_cleanup_interval(mut self, interval: Duration) -> Self {
70 self.cleanup_interval = interval;
71 self
72 }
73
74 pub fn with_max_tracked_blocks(mut self, max: usize) -> Self {
76 self.max_tracked_blocks = max;
77 self
78 }
79}
80
81impl Default for TtlConfig {
82 fn default() -> Self {
83 Self::new(Duration::from_secs(3600)) }
85}
86
/// Expiry bookkeeping kept for a single tracked block.
#[derive(Debug, Clone)]
struct TtlMetadata {
    /// Instant at which the block was (last) tracked; the TTL counts from here.
    stored_at: Instant,
    /// How long after `stored_at` the block stays valid.
    ttl: Duration,
    /// Size of the block's data in bytes, used for cleanup accounting.
    size: usize,
}

impl TtlMetadata {
    /// Returns `true` once at least `ttl` has elapsed since `stored_at`.
    fn is_expired(&self) -> bool {
        self.time_remaining().is_none()
    }

    /// Returns the time left before expiry, or `None` if the block has
    /// already expired (including the exact moment the TTL runs out).
    fn time_remaining(&self) -> Option<Duration> {
        let age = self.stored_at.elapsed();
        // The guard keeps the subtraction from underflowing.
        (age < self.ttl).then(|| self.ttl - age)
    }
}
114
115#[derive(Debug, Clone, Default, Serialize, Deserialize)]
117pub struct TtlStats {
118 pub total_tracked: usize,
120 pub expired_cleaned: u64,
122 pub bytes_freed: u64,
124 pub last_cleanup: Option<String>,
126 pub avg_ttl_remaining_secs: u64,
128}
129
130pub struct TtlBlockStore<S: BlockStore> {
132 inner: S,
134 config: TtlConfig,
136 metadata: Arc<RwLock<HashMap<Cid, TtlMetadata>>>,
138 stats: Arc<RwLock<TtlStats>>,
140 last_cleanup: Arc<RwLock<Instant>>,
142}
143
144impl<S: BlockStore> TtlBlockStore<S> {
145 pub fn new(inner: S, config: TtlConfig) -> Self {
147 Self {
148 inner,
149 config,
150 metadata: Arc::new(RwLock::new(HashMap::new())),
151 stats: Arc::new(RwLock::new(TtlStats::default())),
152 last_cleanup: Arc::new(RwLock::new(Instant::now())),
153 }
154 }
155
156 pub fn set_ttl(&self, cid: &Cid, ttl: Duration) {
158 if let Some(metadata) = self.metadata.write().get_mut(cid) {
159 metadata.ttl = ttl;
160 }
161 }
162
163 pub fn get_ttl(&self, cid: &Cid) -> Option<Duration> {
165 self.metadata
166 .read()
167 .get(cid)
168 .and_then(|m| m.time_remaining())
169 }
170
171 pub fn is_expired(&self, cid: &Cid) -> bool {
173 self.metadata
174 .read()
175 .get(cid)
176 .map(|m| m.is_expired())
177 .unwrap_or(false)
178 }
179
180 pub fn stats(&self) -> TtlStats {
182 let mut stats = self.stats.read().clone();
183 stats.total_tracked = self.metadata.read().len();
184
185 let metadata = self.metadata.read();
187 if !metadata.is_empty() {
188 let total_remaining: u64 = metadata
189 .values()
190 .filter_map(|m| m.time_remaining())
191 .map(|d| d.as_secs())
192 .sum();
193 stats.avg_ttl_remaining_secs = total_remaining / metadata.len() as u64;
194 }
195
196 stats
197 }
198
199 pub async fn cleanup_expired(&self) -> IpfsResult<TtlCleanupResult> {
201 let mut to_delete = Vec::new();
202 let mut bytes_to_free = 0usize;
203
204 {
206 let metadata = self.metadata.read();
207 for (cid, meta) in metadata.iter() {
208 if meta.is_expired() {
209 to_delete.push(*cid);
210 bytes_to_free += meta.size;
211 }
212 }
213 }
214
215 let mut deleted_count = 0;
217 for cid in &to_delete {
218 if self.inner.delete(cid).await.is_ok() {
219 self.metadata.write().remove(cid);
220 deleted_count += 1;
221 }
222 }
223
224 {
226 let mut stats = self.stats.write();
227 stats.expired_cleaned += deleted_count;
228 stats.bytes_freed += bytes_to_free as u64;
229 stats.last_cleanup = Some(chrono::Utc::now().to_rfc3339());
230 }
231
232 *self.last_cleanup.write() = Instant::now();
233
234 Ok(TtlCleanupResult {
235 blocks_deleted: deleted_count,
236 bytes_freed: bytes_to_free as u64,
237 })
238 }
239
240 async fn auto_cleanup_if_needed(&self) -> IpfsResult<()> {
242 if !self.config.auto_cleanup {
243 return Ok(());
244 }
245
246 let should_cleanup = {
247 let last = *self.last_cleanup.read();
248 last.elapsed() >= self.config.cleanup_interval
249 };
250
251 if should_cleanup {
252 let _ = self.cleanup_expired().await;
253 }
254
255 Ok(())
256 }
257
258 fn track_block(&self, cid: &Cid, size: usize, ttl: Option<Duration>) {
260 let mut metadata = self.metadata.write();
261
262 if metadata.len() >= self.config.max_tracked_blocks {
264 if let Some(oldest_cid) = metadata.keys().next().cloned() {
266 metadata.remove(&oldest_cid);
267 }
268 }
269
270 metadata.insert(
271 *cid,
272 TtlMetadata {
273 stored_at: Instant::now(),
274 ttl: ttl.unwrap_or(self.config.default_ttl),
275 size,
276 },
277 );
278 }
279}
280
281#[derive(Debug, Clone, Serialize, Deserialize)]
283pub struct TtlCleanupResult {
284 pub blocks_deleted: u64,
286 pub bytes_freed: u64,
288}
289
290#[async_trait]
291impl<S: BlockStore> BlockStore for TtlBlockStore<S> {
292 async fn get(&self, cid: &Cid) -> IpfsResult<Option<Block>> {
293 if self.is_expired(cid) {
295 let _ = self.inner.delete(cid).await;
297 self.metadata.write().remove(cid);
298 return Ok(None);
299 }
300
301 let _ = self.auto_cleanup_if_needed().await;
303
304 self.inner.get(cid).await
305 }
306
307 async fn put(&self, block: &Block) -> IpfsResult<()> {
308 let cid = *block.cid();
309 let size = block.data().len();
310
311 self.inner.put(block).await?;
313
314 self.track_block(&cid, size, None);
316
317 let _ = self.auto_cleanup_if_needed().await;
319
320 Ok(())
321 }
322
323 async fn has(&self, cid: &Cid) -> IpfsResult<bool> {
324 if self.is_expired(cid) {
326 return Ok(false);
327 }
328
329 self.inner.has(cid).await
330 }
331
332 async fn delete(&self, cid: &Cid) -> IpfsResult<()> {
333 self.metadata.write().remove(cid);
334 self.inner.delete(cid).await
335 }
336
337 fn list_cids(&self) -> IpfsResult<Vec<Cid>> {
338 let mut cids = self.inner.list_cids()?;
339
340 cids.retain(|cid| !self.is_expired(cid));
342
343 Ok(cids)
344 }
345
346 fn len(&self) -> usize {
347 self.list_cids().unwrap_or_default().len()
348 }
349
350 async fn flush(&self) -> IpfsResult<()> {
351 self.inner.flush().await
352 }
353
354 async fn put_many(&self, blocks: &[Block]) -> IpfsResult<()> {
355 for block in blocks {
357 self.track_block(block.cid(), block.data().len(), None);
358 }
359
360 self.inner.put_many(blocks).await
361 }
362
363 async fn get_many(&self, cids: &[Cid]) -> IpfsResult<Vec<Option<Block>>> {
364 let valid_cids: Vec<_> = cids
366 .iter()
367 .filter(|cid| !self.is_expired(cid))
368 .cloned()
369 .collect();
370
371 self.inner.get_many(&valid_cids).await
372 }
373
374 async fn has_many(&self, cids: &[Cid]) -> IpfsResult<Vec<bool>> {
375 let mut results = Vec::with_capacity(cids.len());
376
377 for cid in cids {
378 if self.is_expired(cid) {
379 results.push(false);
380 } else {
381 results.push(self.inner.has(cid).await?);
382 }
383 }
384
385 Ok(results)
386 }
387
388 async fn delete_many(&self, cids: &[Cid]) -> IpfsResult<()> {
389 {
391 let mut metadata = self.metadata.write();
392 for cid in cids {
393 metadata.remove(cid);
394 }
395 }
396
397 self.inner.delete_many(cids).await
398 }
399}
400
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::MemoryBlockStore;
    use crate::utils::create_block;
    use tokio::time::sleep;

    /// Wraps a fresh in-memory store with the given TTL configuration.
    fn ttl_store(config: TtlConfig) -> TtlBlockStore<MemoryBlockStore> {
        TtlBlockStore::new(MemoryBlockStore::new(), config)
    }

    /// A block is visible before its TTL elapses and hidden afterwards.
    #[tokio::test]
    async fn test_ttl_basic() {
        let ttl_store = ttl_store(TtlConfig::new(Duration::from_millis(100)));

        let block = create_block(b"hello world".to_vec()).unwrap();
        let cid = *block.cid();

        ttl_store.put(&block).await.unwrap();

        // Freshly written: still within its TTL.
        assert!(ttl_store.has(&cid).await.unwrap());

        sleep(Duration::from_millis(150)).await;

        // Past the TTL: flagged as expired and reported as absent.
        assert!(ttl_store.is_expired(&cid));
        assert!(!ttl_store.has(&cid).await.unwrap());
    }

    /// A per-block TTL set via `set_ttl` overrides the configured default.
    #[tokio::test]
    async fn test_ttl_custom_per_block() {
        let ttl_store = ttl_store(TtlConfig::new(Duration::from_secs(3600)));

        let block = create_block(b"test".to_vec()).unwrap();
        let cid = *block.cid();

        ttl_store.put(&block).await.unwrap();

        // Shorten the TTL well below the one-hour default.
        ttl_store.set_ttl(&cid, Duration::from_millis(50));

        sleep(Duration::from_millis(100)).await;

        assert!(ttl_store.is_expired(&cid));
    }

    /// A manual sweep deletes every expired block and updates the counters.
    #[tokio::test]
    async fn test_ttl_cleanup() {
        let ttl_store = ttl_store(TtlConfig::new(Duration::from_millis(50)));

        // Five distinct 100-byte blocks.
        for fill in 0..5 {
            let block = create_block(vec![fill; 100]).unwrap();
            ttl_store.put(&block).await.unwrap();
        }

        sleep(Duration::from_millis(100)).await;

        let outcome = ttl_store.cleanup_expired().await.unwrap();

        assert_eq!(outcome.blocks_deleted, 5);
        assert!(outcome.bytes_freed > 0);

        let snapshot = ttl_store.stats();
        assert_eq!(snapshot.expired_cleaned, 5);
    }

    /// Statistics reflect the tracked block and its remaining TTL.
    #[tokio::test]
    async fn test_ttl_stats() {
        let ttl_store = ttl_store(TtlConfig::new(Duration::from_secs(3600)));

        let block = create_block(b"data".to_vec()).unwrap();
        ttl_store.put(&block).await.unwrap();

        let snapshot = ttl_store.stats();
        assert_eq!(snapshot.total_tracked, 1);
        assert!(snapshot.avg_ttl_remaining_secs > 0);
    }

    /// The number of tracked blocks never exceeds `max_tracked_blocks`.
    #[tokio::test]
    async fn test_ttl_max_tracked_blocks() {
        let config = TtlConfig::new(Duration::from_secs(3600)).with_max_tracked_blocks(3);
        let ttl_store = ttl_store(config);

        // Write more blocks than the tracking limit allows.
        for fill in 0..5 {
            let block = create_block(vec![fill; 10]).unwrap();
            ttl_store.put(&block).await.unwrap();
        }

        let snapshot = ttl_store.stats();
        assert!(snapshot.total_tracked <= 3);
    }
}