ipfrs_storage/
lifecycle.rs

//! Lifecycle policies for automatic data management.
//!
//! This module provides lifecycle management for storage blocks:
//! - Age-based tiering (move to cold storage after N days)
//! - Access-based tiering (archive rarely accessed data)
//! - Size-based policies (different rules for different block sizes)
//! - Automatic expiration and deletion
//! - Policy evaluation and enforcement
//! - Lifecycle statistics and reporting
10
11use crate::traits::BlockStore;
12use dashmap::DashMap;
13use ipfrs_core::Cid;
14use serde::{Deserialize, Serialize};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::{Duration, SystemTime};
17use tracing::{debug, info};
18
19/// Lifecycle action
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
21pub enum LifecycleAction {
22    /// Move to a different storage tier
23    Transition(StorageTier),
24    /// Delete the block permanently
25    Delete,
26    /// Archive (compress and move to cold storage)
27    Archive,
28    /// Mark for manual review
29    Review,
30}
31
32/// Storage tier
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
34pub enum StorageTier {
35    /// Hot storage (fast, expensive)
36    Hot,
37    /// Warm storage (balanced)
38    Warm,
39    /// Cold storage (slow, cheap)
40    Cold,
41    /// Archive storage (very slow, very cheap)
42    Archive,
43}
44
45/// Lifecycle rule condition
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub enum LifecycleCondition {
48    /// Age in days since creation
49    AgeDays(u32),
50    /// Number of days since last access
51    DaysSinceLastAccess(u32),
52    /// Access count below threshold
53    AccessCountBelow(u64),
54    /// Block size in bytes
55    SizeBytes { min: Option<u64>, max: Option<u64> },
56    /// Current storage tier
57    CurrentTier(StorageTier),
58    /// Multiple conditions (AND)
59    And(Vec<LifecycleCondition>),
60    /// Multiple conditions (OR)
61    Or(Vec<LifecycleCondition>),
62}
63
64/// Lifecycle rule
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct LifecycleRule {
67    /// Rule identifier
68    pub id: String,
69    /// Rule description
70    pub description: String,
71    /// Condition to trigger this rule
72    pub condition: LifecycleCondition,
73    /// Action to take when condition is met
74    pub action: LifecycleAction,
75    /// Rule priority (higher = evaluated first)
76    pub priority: u32,
77    /// Whether the rule is enabled
78    pub enabled: bool,
79}
80
81impl LifecycleRule {
82    /// Create a new lifecycle rule
83    pub fn new(
84        id: String,
85        description: String,
86        condition: LifecycleCondition,
87        action: LifecycleAction,
88    ) -> Self {
89        Self {
90            id,
91            description,
92            condition,
93            action,
94            priority: 100,
95            enabled: true,
96        }
97    }
98
99    /// Check if the rule applies to a block
100    #[allow(dead_code)]
101    fn matches(&self, metadata: &BlockMetadata) -> bool {
102        if !self.enabled {
103            return false;
104        }
105        self.evaluate_condition(&self.condition, metadata)
106    }
107
108    fn evaluate_condition(&self, condition: &LifecycleCondition, metadata: &BlockMetadata) -> bool {
109        match condition {
110            LifecycleCondition::AgeDays(days) => {
111                let age = SystemTime::now()
112                    .duration_since(metadata.created_at)
113                    .unwrap_or_default();
114                age >= Duration::from_secs(*days as u64 * 86400)
115            }
116            LifecycleCondition::DaysSinceLastAccess(days) => {
117                if let Some(last_access) = metadata.last_accessed {
118                    let duration = SystemTime::now()
119                        .duration_since(last_access)
120                        .unwrap_or_default();
121                    duration >= Duration::from_secs(*days as u64 * 86400)
122                } else {
123                    // Never accessed = treat as very old
124                    *days == 0
125                }
126            }
127            LifecycleCondition::AccessCountBelow(threshold) => metadata.access_count < *threshold,
128            LifecycleCondition::SizeBytes { min, max } => {
129                if let Some(min_size) = min {
130                    if metadata.size < *min_size {
131                        return false;
132                    }
133                }
134                if let Some(max_size) = max {
135                    if metadata.size > *max_size {
136                        return false;
137                    }
138                }
139                true
140            }
141            LifecycleCondition::CurrentTier(tier) => metadata.tier == *tier,
142            LifecycleCondition::And(conditions) => conditions
143                .iter()
144                .all(|c| self.evaluate_condition(c, metadata)),
145            LifecycleCondition::Or(conditions) => conditions
146                .iter()
147                .any(|c| self.evaluate_condition(c, metadata)),
148        }
149    }
150}
151
152/// Block metadata for lifecycle management
153#[derive(Debug, Clone)]
154pub struct BlockMetadata {
155    /// Block CID
156    pub cid: Cid,
157    /// Block size in bytes
158    pub size: u64,
159    /// Creation time
160    pub created_at: SystemTime,
161    /// Last access time
162    pub last_accessed: Option<SystemTime>,
163    /// Number of accesses
164    pub access_count: u64,
165    /// Current storage tier
166    pub tier: StorageTier,
167}
168
/// Configuration knobs for lifecycle policy evaluation.
#[derive(Clone, Debug)]
pub struct LifecyclePolicyConfig {
    /// Interval between evaluations.
    pub evaluation_interval: Duration,
    /// Upper bound on the number of actions produced by one evaluation.
    pub max_actions_per_evaluation: usize,
    /// When set, actions are reported but not actually performed.
    pub dry_run: bool,
}
179
180impl Default for LifecyclePolicyConfig {
181    fn default() -> Self {
182        Self {
183            evaluation_interval: Duration::from_secs(3600), // 1 hour
184            max_actions_per_evaluation: 1000,
185            dry_run: false,
186        }
187    }
188}
189
190/// Lifecycle action result
191#[derive(Debug, Clone)]
192pub struct LifecycleActionResult {
193    /// Block CID
194    pub cid: Cid,
195    /// Rule that triggered the action
196    pub rule_id: String,
197    /// Action taken
198    pub action: LifecycleAction,
199    /// Whether the action succeeded
200    pub success: bool,
201    /// Error message if failed
202    pub error: Option<String>,
203}
204
/// Live lifecycle counters, updated atomically.
#[derive(Default, Debug)]
pub struct LifecycleStats {
    /// Number of evaluations run.
    pub evaluations_run: AtomicU64,
    /// Number of blocks evaluated, summed over all evaluations.
    pub blocks_evaluated: AtomicU64,
    /// Successful tier transitions.
    pub transitions: AtomicU64,
    /// Successful deletions.
    pub deletions: AtomicU64,
    /// Successful archive actions.
    pub archives: AtomicU64,
    /// Successful review markings.
    pub reviews: AtomicU64,
    /// Actions that did not succeed.
    pub failures: AtomicU64,
}
220
221impl LifecycleStats {
222    fn record_evaluation(&self, blocks_count: u64) {
223        self.evaluations_run.fetch_add(1, Ordering::Relaxed);
224        self.blocks_evaluated
225            .fetch_add(blocks_count, Ordering::Relaxed);
226    }
227
228    fn record_action(&self, action: LifecycleAction, success: bool) {
229        if success {
230            match action {
231                LifecycleAction::Transition(_) => {
232                    self.transitions.fetch_add(1, Ordering::Relaxed);
233                }
234                LifecycleAction::Delete => {
235                    self.deletions.fetch_add(1, Ordering::Relaxed);
236                }
237                LifecycleAction::Archive => {
238                    self.archives.fetch_add(1, Ordering::Relaxed);
239                }
240                LifecycleAction::Review => {
241                    self.reviews.fetch_add(1, Ordering::Relaxed);
242                }
243            }
244        } else {
245            self.failures.fetch_add(1, Ordering::Relaxed);
246        }
247    }
248}
249
250/// Lifecycle policy manager
251pub struct LifecyclePolicyManager {
252    rules: parking_lot::RwLock<Vec<LifecycleRule>>,
253    metadata: DashMap<Cid, BlockMetadata>,
254    config: parking_lot::RwLock<LifecyclePolicyConfig>,
255    stats: LifecycleStats,
256}
257
258impl LifecyclePolicyManager {
259    /// Create a new lifecycle policy manager
260    pub fn new(config: LifecyclePolicyConfig) -> Self {
261        Self {
262            rules: parking_lot::RwLock::new(Vec::new()),
263            metadata: DashMap::new(),
264            config: parking_lot::RwLock::new(config),
265            stats: LifecycleStats::default(),
266        }
267    }
268
269    /// Add a lifecycle rule
270    pub fn add_rule(&self, rule: LifecycleRule) {
271        let mut rules = self.rules.write();
272        rules.push(rule.clone());
273        rules.sort_by(|a, b| b.priority.cmp(&a.priority));
274        debug!("Added lifecycle rule: {}", rule.id);
275    }
276
277    /// Remove a lifecycle rule
278    pub fn remove_rule(&self, rule_id: &str) -> bool {
279        let mut rules = self.rules.write();
280        if let Some(pos) = rules.iter().position(|r| r.id == rule_id) {
281            rules.remove(pos);
282            debug!("Removed lifecycle rule: {}", rule_id);
283            true
284        } else {
285            false
286        }
287    }
288
289    /// Get all rules
290    pub fn get_rules(&self) -> Vec<LifecycleRule> {
291        self.rules.read().clone()
292    }
293
294    /// Register block metadata
295    pub fn register_block(&self, metadata: BlockMetadata) {
296        self.metadata.insert(metadata.cid, metadata);
297    }
298
299    /// Update block access
300    pub fn record_access(&self, cid: &Cid) {
301        if let Some(mut entry) = self.metadata.get_mut(cid) {
302            entry.last_accessed = Some(SystemTime::now());
303            entry.access_count += 1;
304        }
305    }
306
307    /// Evaluate policies and return actions to take
308    pub fn evaluate(&self) -> Vec<LifecycleActionResult> {
309        let rules = self.rules.read();
310        let config = self.config.read();
311        let mut results = Vec::new();
312
313        let blocks_count = self.metadata.len() as u64;
314        self.stats.record_evaluation(blocks_count);
315
316        for entry in self.metadata.iter() {
317            if results.len() >= config.max_actions_per_evaluation {
318                break;
319            }
320
321            let metadata = entry.value();
322
323            // Find first matching rule
324            for rule in rules.iter() {
325                if rule.matches(metadata) {
326                    let result = LifecycleActionResult {
327                        cid: metadata.cid,
328                        rule_id: rule.id.clone(),
329                        action: rule.action,
330                        success: !config.dry_run,
331                        error: if config.dry_run {
332                            Some("Dry run mode".to_string())
333                        } else {
334                            None
335                        },
336                    };
337
338                    self.stats.record_action(rule.action, !config.dry_run);
339                    results.push(result);
340                    break; // Only apply first matching rule
341                }
342            }
343        }
344
345        if !results.is_empty() {
346            info!(
347                "Lifecycle evaluation completed: {} actions recommended",
348                results.len()
349            );
350        }
351
352        results
353    }
354
355    /// Apply lifecycle actions to a block store
356    pub async fn apply_actions<S: BlockStore>(
357        &self,
358        store: &S,
359        actions: Vec<LifecycleActionResult>,
360    ) -> Vec<LifecycleActionResult> {
361        let mut results = Vec::new();
362
363        for action in actions {
364            if self.config.read().dry_run {
365                results.push(action);
366                continue;
367            }
368
369            let success = match action.action {
370                LifecycleAction::Delete => store.delete(&action.cid).await.is_ok(),
371                LifecycleAction::Transition(tier) => {
372                    // Update metadata
373                    if let Some(mut entry) = self.metadata.get_mut(&action.cid) {
374                        entry.tier = tier;
375                        true
376                    } else {
377                        false
378                    }
379                }
380                LifecycleAction::Archive | LifecycleAction::Review => {
381                    // These would typically involve external systems
382                    true
383                }
384            };
385
386            results.push(LifecycleActionResult {
387                success,
388                error: if success {
389                    None
390                } else {
391                    Some("Action failed".to_string())
392                },
393                ..action
394            });
395
396            self.stats.record_action(action.action, success);
397        }
398
399        results
400    }
401
402    /// Get lifecycle statistics
403    pub fn get_stats(&self) -> LifecycleStatsSnapshot {
404        LifecycleStatsSnapshot {
405            evaluations_run: self.stats.evaluations_run.load(Ordering::Relaxed),
406            blocks_evaluated: self.stats.blocks_evaluated.load(Ordering::Relaxed),
407            transitions: self.stats.transitions.load(Ordering::Relaxed),
408            deletions: self.stats.deletions.load(Ordering::Relaxed),
409            archives: self.stats.archives.load(Ordering::Relaxed),
410            reviews: self.stats.reviews.load(Ordering::Relaxed),
411            failures: self.stats.failures.load(Ordering::Relaxed),
412        }
413    }
414
415    /// Get blocks by tier
416    pub fn get_blocks_by_tier(&self, tier: StorageTier) -> Vec<Cid> {
417        self.metadata
418            .iter()
419            .filter_map(|entry| {
420                if entry.value().tier == tier {
421                    Some(entry.key().clone())
422                } else {
423                    None
424                }
425            })
426            .collect()
427    }
428}
429
430/// Snapshot of lifecycle statistics
431#[derive(Debug, Clone, Serialize, Deserialize)]
432pub struct LifecycleStatsSnapshot {
433    pub evaluations_run: u64,
434    pub blocks_evaluated: u64,
435    pub transitions: u64,
436    pub deletions: u64,
437    pub archives: u64,
438    pub reviews: u64,
439    pub failures: u64,
440}
441
442/// Common lifecycle rule presets
443impl LifecycleRule {
444    /// Move to cold storage after 30 days
445    pub fn archive_old_blocks() -> Self {
446        Self::new(
447            "archive_old".to_string(),
448            "Move blocks older than 30 days to cold storage".to_string(),
449            LifecycleCondition::AgeDays(30),
450            LifecycleAction::Transition(StorageTier::Cold),
451        )
452    }
453
454    /// Delete blocks not accessed in 90 days
455    pub fn delete_unused() -> Self {
456        Self::new(
457            "delete_unused".to_string(),
458            "Delete blocks not accessed in 90 days".to_string(),
459            LifecycleCondition::DaysSinceLastAccess(90),
460            LifecycleAction::Delete,
461        )
462    }
463
464    /// Archive large blocks after 7 days
465    pub fn archive_large_blocks() -> Self {
466        Self::new(
467            "archive_large".to_string(),
468            "Archive blocks larger than 10MB after 7 days".to_string(),
469            LifecycleCondition::And(vec![
470                LifecycleCondition::AgeDays(7),
471                LifecycleCondition::SizeBytes {
472                    min: Some(10 * 1024 * 1024),
473                    max: None,
474                },
475            ]),
476            LifecycleAction::Archive,
477        )
478    }
479
480    /// Move rarely accessed hot storage to warm
481    pub fn demote_cold_hot_storage() -> Self {
482        Self::new(
483            "demote_hot".to_string(),
484            "Move rarely accessed hot storage blocks to warm tier".to_string(),
485            LifecycleCondition::And(vec![
486                LifecycleCondition::CurrentTier(StorageTier::Hot),
487                LifecycleCondition::DaysSinceLastAccess(7),
488                LifecycleCondition::AccessCountBelow(10),
489            ]),
490            LifecycleAction::Transition(StorageTier::Warm),
491        )
492    }
493}
494
#[cfg(test)]
mod tests {
    use super::*;

    /// Seconds in one day, for building creation timestamps.
    const DAY_SECS: u64 = 86400;

    /// Build metadata for a 100-byte, never-accessed hot-tier block that was
    /// created `age_days` days ago.
    fn hot_block_aged(age_days: u64) -> BlockMetadata {
        BlockMetadata {
            cid: Cid::default(),
            size: 100,
            created_at: SystemTime::now() - Duration::from_secs(age_days * DAY_SECS),
            last_accessed: None,
            access_count: 0,
            tier: StorageTier::Hot,
        }
    }

    /// A one-day age rule matches a two-day-old block but not a fresh one.
    #[test]
    fn test_age_condition() {
        let rule = LifecycleRule::new(
            "test".to_string(),
            "Test rule".to_string(),
            LifecycleCondition::AgeDays(1),
            LifecycleAction::Delete,
        );

        let old_metadata = hot_block_aged(2);
        assert!(rule.matches(&old_metadata));

        let new_metadata = hot_block_aged(0);
        assert!(!rule.matches(&new_metadata));
    }

    /// A 31-day-old block triggers the "archive old blocks" preset.
    #[test]
    fn test_lifecycle_manager() {
        let manager = LifecyclePolicyManager::new(LifecyclePolicyConfig::default());
        manager.add_rule(LifecycleRule::archive_old_blocks());
        manager.register_block(hot_block_aged(31));

        let actions = manager.evaluate();

        assert_eq!(actions.len(), 1);
        let expected = LifecycleAction::Transition(StorageTier::Cold);
        assert_eq!(actions[0].action, expected);
    }

    /// Presets carry the expected ids and actions.
    #[test]
    fn test_rule_presets() {
        let delete_rule = LifecycleRule::delete_unused();
        assert_eq!(delete_rule.id, "delete_unused");
        assert_eq!(delete_rule.action, LifecycleAction::Delete);

        let archive_rule = LifecycleRule::archive_large_blocks();
        assert_eq!(archive_rule.action, LifecycleAction::Archive);
    }
}