ipfrs_storage/
ttl.rs

1//! Time-To-Live (TTL) support for automatic block expiration
2//!
3//! Provides automatic expiration of blocks after a specified duration.
4//! Useful for:
5//! - Cache invalidation
6//! - Temporary data storage
7//! - Preventing unbounded storage growth
8//! - Compliance with data retention policies
9//!
10//! ## Example
11//! ```no_run
12//! use ipfrs_storage::{TtlBlockStore, TtlConfig, MemoryBlockStore};
13//! use std::time::Duration;
14//!
15//! #[tokio::main]
16//! async fn main() {
17//!     let store = MemoryBlockStore::new();
18//!     let config = TtlConfig::new(Duration::from_secs(3600)); // 1 hour TTL
19//!     let ttl_store = TtlBlockStore::new(store, config);
20//!
21//!     // Blocks will automatically expire after 1 hour
22//! }
23//! ```
24
25use crate::traits::BlockStore;
26use async_trait::async_trait;
27use ipfrs_core::{Block, Cid, Result as IpfsResult};
28use parking_lot::RwLock;
29use serde::{Deserialize, Serialize};
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::time::{Duration, Instant};
33
34/// TTL configuration
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct TtlConfig {
37    /// Default TTL for blocks
38    pub default_ttl: Duration,
39    /// Enable automatic cleanup of expired blocks
40    pub auto_cleanup: bool,
41    /// Cleanup interval (how often to check for expired blocks)
42    pub cleanup_interval: Duration,
43    /// Maximum number of blocks to track
44    pub max_tracked_blocks: usize,
45}
46
47impl TtlConfig {
48    /// Create a new TTL configuration
49    pub fn new(default_ttl: Duration) -> Self {
50        Self {
51            default_ttl,
52            auto_cleanup: true,
53            cleanup_interval: Duration::from_secs(60),
54            max_tracked_blocks: 1_000_000,
55        }
56    }
57
58    /// Create config with no automatic cleanup
59    pub fn manual_cleanup(default_ttl: Duration) -> Self {
60        Self {
61            default_ttl,
62            auto_cleanup: false,
63            cleanup_interval: Duration::from_secs(60),
64            max_tracked_blocks: 1_000_000,
65        }
66    }
67
68    /// Set cleanup interval
69    pub fn with_cleanup_interval(mut self, interval: Duration) -> Self {
70        self.cleanup_interval = interval;
71        self
72    }
73
74    /// Set maximum tracked blocks
75    pub fn with_max_tracked_blocks(mut self, max: usize) -> Self {
76        self.max_tracked_blocks = max;
77        self
78    }
79}
80
81impl Default for TtlConfig {
82    fn default() -> Self {
83        Self::new(Duration::from_secs(3600)) // 1 hour default
84    }
85}
86
/// Per-block expiration bookkeeping.
#[derive(Debug, Clone)]
struct TtlMetadata {
    /// Moment the block was recorded.
    stored_at: Instant,
    /// Lifetime granted to the block, measured from `stored_at`.
    ttl: Duration,
    /// Block payload size in bytes.
    size: usize,
}

impl TtlMetadata {
    /// `true` once at least `ttl` has elapsed since `stored_at`.
    fn is_expired(&self) -> bool {
        self.time_remaining().is_none()
    }

    /// Remaining lifetime, or `None` when the block has already expired.
    fn time_remaining(&self) -> Option<Duration> {
        // `checked_sub` yields `None` when more than `ttl` has elapsed; the
        // zero filter makes the exact-boundary case count as expired as well.
        self.ttl
            .checked_sub(self.stored_at.elapsed())
            .filter(|left| !left.is_zero())
    }
}
114
115/// TTL statistics
116#[derive(Debug, Clone, Default, Serialize, Deserialize)]
117pub struct TtlStats {
118    /// Total blocks tracked
119    pub total_tracked: usize,
120    /// Expired blocks cleaned up
121    pub expired_cleaned: u64,
122    /// Total bytes freed from cleanup
123    pub bytes_freed: u64,
124    /// Last cleanup time
125    pub last_cleanup: Option<String>,
126    /// Average TTL remaining
127    pub avg_ttl_remaining_secs: u64,
128}
129
130/// Block store with TTL support
131pub struct TtlBlockStore<S: BlockStore> {
132    /// Underlying storage
133    inner: S,
134    /// TTL configuration
135    config: TtlConfig,
136    /// TTL metadata for blocks
137    metadata: Arc<RwLock<HashMap<Cid, TtlMetadata>>>,
138    /// Statistics
139    stats: Arc<RwLock<TtlStats>>,
140    /// Last cleanup time
141    last_cleanup: Arc<RwLock<Instant>>,
142}
143
144impl<S: BlockStore> TtlBlockStore<S> {
145    /// Create a new TTL block store
146    pub fn new(inner: S, config: TtlConfig) -> Self {
147        Self {
148            inner,
149            config,
150            metadata: Arc::new(RwLock::new(HashMap::new())),
151            stats: Arc::new(RwLock::new(TtlStats::default())),
152            last_cleanup: Arc::new(RwLock::new(Instant::now())),
153        }
154    }
155
156    /// Set TTL for a specific block
157    pub fn set_ttl(&self, cid: &Cid, ttl: Duration) {
158        if let Some(metadata) = self.metadata.write().get_mut(cid) {
159            metadata.ttl = ttl;
160        }
161    }
162
163    /// Get TTL for a block
164    pub fn get_ttl(&self, cid: &Cid) -> Option<Duration> {
165        self.metadata
166            .read()
167            .get(cid)
168            .and_then(|m| m.time_remaining())
169    }
170
171    /// Check if a block has expired
172    pub fn is_expired(&self, cid: &Cid) -> bool {
173        self.metadata
174            .read()
175            .get(cid)
176            .map(|m| m.is_expired())
177            .unwrap_or(false)
178    }
179
180    /// Get statistics
181    pub fn stats(&self) -> TtlStats {
182        let mut stats = self.stats.read().clone();
183        stats.total_tracked = self.metadata.read().len();
184
185        // Calculate average TTL remaining
186        let metadata = self.metadata.read();
187        if !metadata.is_empty() {
188            let total_remaining: u64 = metadata
189                .values()
190                .filter_map(|m| m.time_remaining())
191                .map(|d| d.as_secs())
192                .sum();
193            stats.avg_ttl_remaining_secs = total_remaining / metadata.len() as u64;
194        }
195
196        stats
197    }
198
199    /// Manually trigger cleanup of expired blocks
200    pub async fn cleanup_expired(&self) -> IpfsResult<TtlCleanupResult> {
201        let mut to_delete = Vec::new();
202        let mut bytes_to_free = 0usize;
203
204        // Find expired blocks
205        {
206            let metadata = self.metadata.read();
207            for (cid, meta) in metadata.iter() {
208                if meta.is_expired() {
209                    to_delete.push(*cid);
210                    bytes_to_free += meta.size;
211                }
212            }
213        }
214
215        // Delete expired blocks
216        let mut deleted_count = 0;
217        for cid in &to_delete {
218            if self.inner.delete(cid).await.is_ok() {
219                self.metadata.write().remove(cid);
220                deleted_count += 1;
221            }
222        }
223
224        // Update statistics
225        {
226            let mut stats = self.stats.write();
227            stats.expired_cleaned += deleted_count;
228            stats.bytes_freed += bytes_to_free as u64;
229            stats.last_cleanup = Some(chrono::Utc::now().to_rfc3339());
230        }
231
232        *self.last_cleanup.write() = Instant::now();
233
234        Ok(TtlCleanupResult {
235            blocks_deleted: deleted_count,
236            bytes_freed: bytes_to_free as u64,
237        })
238    }
239
240    /// Check and perform auto-cleanup if needed
241    async fn auto_cleanup_if_needed(&self) -> IpfsResult<()> {
242        if !self.config.auto_cleanup {
243            return Ok(());
244        }
245
246        let should_cleanup = {
247            let last = *self.last_cleanup.read();
248            last.elapsed() >= self.config.cleanup_interval
249        };
250
251        if should_cleanup {
252            let _ = self.cleanup_expired().await;
253        }
254
255        Ok(())
256    }
257
258    /// Track a new block
259    fn track_block(&self, cid: &Cid, size: usize, ttl: Option<Duration>) {
260        let mut metadata = self.metadata.write();
261
262        // Enforce max tracked blocks limit
263        if metadata.len() >= self.config.max_tracked_blocks {
264            // Remove oldest block (simple FIFO eviction)
265            if let Some(oldest_cid) = metadata.keys().next().cloned() {
266                metadata.remove(&oldest_cid);
267            }
268        }
269
270        metadata.insert(
271            *cid,
272            TtlMetadata {
273                stored_at: Instant::now(),
274                ttl: ttl.unwrap_or(self.config.default_ttl),
275                size,
276            },
277        );
278    }
279}
280
281/// Result of TTL cleanup operation
282#[derive(Debug, Clone, Serialize, Deserialize)]
283pub struct TtlCleanupResult {
284    /// Number of blocks deleted
285    pub blocks_deleted: u64,
286    /// Bytes freed
287    pub bytes_freed: u64,
288}
289
290#[async_trait]
291impl<S: BlockStore> BlockStore for TtlBlockStore<S> {
292    async fn get(&self, cid: &Cid) -> IpfsResult<Option<Block>> {
293        // Check if expired
294        if self.is_expired(cid) {
295            // Remove expired block
296            let _ = self.inner.delete(cid).await;
297            self.metadata.write().remove(cid);
298            return Ok(None);
299        }
300
301        // Trigger auto-cleanup if needed
302        let _ = self.auto_cleanup_if_needed().await;
303
304        self.inner.get(cid).await
305    }
306
307    async fn put(&self, block: &Block) -> IpfsResult<()> {
308        let cid = *block.cid();
309        let size = block.data().len();
310
311        // Store block
312        self.inner.put(block).await?;
313
314        // Track TTL
315        self.track_block(&cid, size, None);
316
317        // Trigger auto-cleanup if needed
318        let _ = self.auto_cleanup_if_needed().await;
319
320        Ok(())
321    }
322
323    async fn has(&self, cid: &Cid) -> IpfsResult<bool> {
324        // Check if expired
325        if self.is_expired(cid) {
326            return Ok(false);
327        }
328
329        self.inner.has(cid).await
330    }
331
332    async fn delete(&self, cid: &Cid) -> IpfsResult<()> {
333        self.metadata.write().remove(cid);
334        self.inner.delete(cid).await
335    }
336
337    fn list_cids(&self) -> IpfsResult<Vec<Cid>> {
338        let mut cids = self.inner.list_cids()?;
339
340        // Filter out expired blocks
341        cids.retain(|cid| !self.is_expired(cid));
342
343        Ok(cids)
344    }
345
346    fn len(&self) -> usize {
347        self.list_cids().unwrap_or_default().len()
348    }
349
350    async fn flush(&self) -> IpfsResult<()> {
351        self.inner.flush().await
352    }
353
354    async fn put_many(&self, blocks: &[Block]) -> IpfsResult<()> {
355        // Track all blocks
356        for block in blocks {
357            self.track_block(block.cid(), block.data().len(), None);
358        }
359
360        self.inner.put_many(blocks).await
361    }
362
363    async fn get_many(&self, cids: &[Cid]) -> IpfsResult<Vec<Option<Block>>> {
364        // Filter out expired CIDs
365        let valid_cids: Vec<_> = cids
366            .iter()
367            .filter(|cid| !self.is_expired(cid))
368            .cloned()
369            .collect();
370
371        self.inner.get_many(&valid_cids).await
372    }
373
374    async fn has_many(&self, cids: &[Cid]) -> IpfsResult<Vec<bool>> {
375        let mut results = Vec::with_capacity(cids.len());
376
377        for cid in cids {
378            if self.is_expired(cid) {
379                results.push(false);
380            } else {
381                results.push(self.inner.has(cid).await?);
382            }
383        }
384
385        Ok(results)
386    }
387
388    async fn delete_many(&self, cids: &[Cid]) -> IpfsResult<()> {
389        // Remove from metadata
390        {
391            let mut metadata = self.metadata.write();
392            for cid in cids {
393                metadata.remove(cid);
394            }
395        }
396
397        self.inner.delete_many(cids).await
398    }
399}
400
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::MemoryBlockStore;
    use crate::utils::create_block;
    use tokio::time::sleep;

    /// A block is visible right after `put` and hidden once its TTL elapses.
    #[tokio::test]
    async fn test_ttl_basic() {
        let ttl_store = TtlBlockStore::new(
            MemoryBlockStore::new(),
            TtlConfig::new(Duration::from_millis(100)),
        );

        let block = create_block(b"hello world".to_vec()).unwrap();
        let cid = *block.cid();

        ttl_store.put(&block).await.unwrap();

        // Freshly stored: must be reported as present.
        assert!(ttl_store.has(&cid).await.unwrap());

        // Let the 100 ms TTL run out.
        sleep(Duration::from_millis(150)).await;

        assert!(ttl_store.is_expired(&cid));
        assert!(!ttl_store.has(&cid).await.unwrap());
    }

    /// `set_ttl` overrides the default TTL of an already stored block.
    #[tokio::test]
    async fn test_ttl_custom_per_block() {
        let ttl_store = TtlBlockStore::new(
            MemoryBlockStore::new(),
            TtlConfig::new(Duration::from_secs(3600)),
        );

        let block = create_block(b"test".to_vec()).unwrap();
        let cid = *block.cid();

        ttl_store.put(&block).await.unwrap();

        // Shrink the TTL from one hour to 50 ms.
        ttl_store.set_ttl(&cid, Duration::from_millis(50));

        sleep(Duration::from_millis(100)).await;

        assert!(ttl_store.is_expired(&cid));
    }

    /// Manual cleanup removes every expired block and records it in the stats.
    #[tokio::test]
    async fn test_ttl_cleanup() {
        let ttl_store = TtlBlockStore::new(
            MemoryBlockStore::new(),
            TtlConfig::new(Duration::from_millis(50)),
        );

        // Five distinct 100-byte blocks.
        for fill in 0..5 {
            let block = create_block(vec![fill; 100]).unwrap();
            ttl_store.put(&block).await.unwrap();
        }

        // Let every block expire.
        sleep(Duration::from_millis(100)).await;

        let outcome = ttl_store.cleanup_expired().await.unwrap();

        assert_eq!(outcome.blocks_deleted, 5);
        assert!(outcome.bytes_freed > 0);

        let snapshot = ttl_store.stats();
        assert_eq!(snapshot.expired_cleaned, 5);
    }

    /// Stats report the tracked count and a non-zero average remaining TTL.
    #[tokio::test]
    async fn test_ttl_stats() {
        let ttl_store = TtlBlockStore::new(
            MemoryBlockStore::new(),
            TtlConfig::new(Duration::from_secs(3600)),
        );

        let block = create_block(b"data".to_vec()).unwrap();
        ttl_store.put(&block).await.unwrap();

        let snapshot = ttl_store.stats();
        assert_eq!(snapshot.total_tracked, 1);
        assert!(snapshot.avg_ttl_remaining_secs > 0);
    }

    /// The number of tracked blocks never exceeds the configured limit.
    #[tokio::test]
    async fn test_ttl_max_tracked_blocks() {
        let limited = TtlConfig::new(Duration::from_secs(3600)).with_max_tracked_blocks(3);
        let ttl_store = TtlBlockStore::new(MemoryBlockStore::new(), limited);

        // Store more blocks than the limit allows.
        for fill in 0..5 {
            let block = create_block(vec![fill; 10]).unwrap();
            ttl_store.put(&block).await.unwrap();
        }

        let snapshot = ttl_store.stats();
        assert!(snapshot.total_tracked <= 3);
    }
}