1use crate::traits::BlockStore;
12use dashmap::DashMap;
13use ipfrs_core::Cid;
14use serde::{Deserialize, Serialize};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::{Duration, SystemTime};
17use tracing::{debug, info};
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
21pub enum LifecycleAction {
22 Transition(StorageTier),
24 Delete,
26 Archive,
28 Review,
30}
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
34pub enum StorageTier {
35 Hot,
37 Warm,
39 Cold,
41 Archive,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub enum LifecycleCondition {
48 AgeDays(u32),
50 DaysSinceLastAccess(u32),
52 AccessCountBelow(u64),
54 SizeBytes { min: Option<u64>, max: Option<u64> },
56 CurrentTier(StorageTier),
58 And(Vec<LifecycleCondition>),
60 Or(Vec<LifecycleCondition>),
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct LifecycleRule {
67 pub id: String,
69 pub description: String,
71 pub condition: LifecycleCondition,
73 pub action: LifecycleAction,
75 pub priority: u32,
77 pub enabled: bool,
79}
80
81impl LifecycleRule {
82 pub fn new(
84 id: String,
85 description: String,
86 condition: LifecycleCondition,
87 action: LifecycleAction,
88 ) -> Self {
89 Self {
90 id,
91 description,
92 condition,
93 action,
94 priority: 100,
95 enabled: true,
96 }
97 }
98
99 #[allow(dead_code)]
101 fn matches(&self, metadata: &BlockMetadata) -> bool {
102 if !self.enabled {
103 return false;
104 }
105 self.evaluate_condition(&self.condition, metadata)
106 }
107
108 fn evaluate_condition(&self, condition: &LifecycleCondition, metadata: &BlockMetadata) -> bool {
109 match condition {
110 LifecycleCondition::AgeDays(days) => {
111 let age = SystemTime::now()
112 .duration_since(metadata.created_at)
113 .unwrap_or_default();
114 age >= Duration::from_secs(*days as u64 * 86400)
115 }
116 LifecycleCondition::DaysSinceLastAccess(days) => {
117 if let Some(last_access) = metadata.last_accessed {
118 let duration = SystemTime::now()
119 .duration_since(last_access)
120 .unwrap_or_default();
121 duration >= Duration::from_secs(*days as u64 * 86400)
122 } else {
123 *days == 0
125 }
126 }
127 LifecycleCondition::AccessCountBelow(threshold) => metadata.access_count < *threshold,
128 LifecycleCondition::SizeBytes { min, max } => {
129 if let Some(min_size) = min {
130 if metadata.size < *min_size {
131 return false;
132 }
133 }
134 if let Some(max_size) = max {
135 if metadata.size > *max_size {
136 return false;
137 }
138 }
139 true
140 }
141 LifecycleCondition::CurrentTier(tier) => metadata.tier == *tier,
142 LifecycleCondition::And(conditions) => conditions
143 .iter()
144 .all(|c| self.evaluate_condition(c, metadata)),
145 LifecycleCondition::Or(conditions) => conditions
146 .iter()
147 .any(|c| self.evaluate_condition(c, metadata)),
148 }
149 }
150}
151
152#[derive(Debug, Clone)]
154pub struct BlockMetadata {
155 pub cid: Cid,
157 pub size: u64,
159 pub created_at: SystemTime,
161 pub last_accessed: Option<SystemTime>,
163 pub access_count: u64,
165 pub tier: StorageTier,
167}
168
169#[derive(Debug, Clone)]
171pub struct LifecyclePolicyConfig {
172 pub evaluation_interval: Duration,
174 pub max_actions_per_evaluation: usize,
176 pub dry_run: bool,
178}
179
180impl Default for LifecyclePolicyConfig {
181 fn default() -> Self {
182 Self {
183 evaluation_interval: Duration::from_secs(3600), max_actions_per_evaluation: 1000,
185 dry_run: false,
186 }
187 }
188}
189
190#[derive(Debug, Clone)]
192pub struct LifecycleActionResult {
193 pub cid: Cid,
195 pub rule_id: String,
197 pub action: LifecycleAction,
199 pub success: bool,
201 pub error: Option<String>,
203}
204
205#[derive(Debug, Default)]
207pub struct LifecycleStats {
208 pub evaluations_run: AtomicU64,
210 pub blocks_evaluated: AtomicU64,
212 pub transitions: AtomicU64,
214 pub deletions: AtomicU64,
215 pub archives: AtomicU64,
216 pub reviews: AtomicU64,
217 pub failures: AtomicU64,
219}
220
221impl LifecycleStats {
222 fn record_evaluation(&self, blocks_count: u64) {
223 self.evaluations_run.fetch_add(1, Ordering::Relaxed);
224 self.blocks_evaluated
225 .fetch_add(blocks_count, Ordering::Relaxed);
226 }
227
228 fn record_action(&self, action: LifecycleAction, success: bool) {
229 if success {
230 match action {
231 LifecycleAction::Transition(_) => {
232 self.transitions.fetch_add(1, Ordering::Relaxed);
233 }
234 LifecycleAction::Delete => {
235 self.deletions.fetch_add(1, Ordering::Relaxed);
236 }
237 LifecycleAction::Archive => {
238 self.archives.fetch_add(1, Ordering::Relaxed);
239 }
240 LifecycleAction::Review => {
241 self.reviews.fetch_add(1, Ordering::Relaxed);
242 }
243 }
244 } else {
245 self.failures.fetch_add(1, Ordering::Relaxed);
246 }
247 }
248}
249
250pub struct LifecyclePolicyManager {
252 rules: parking_lot::RwLock<Vec<LifecycleRule>>,
253 metadata: DashMap<Cid, BlockMetadata>,
254 config: parking_lot::RwLock<LifecyclePolicyConfig>,
255 stats: LifecycleStats,
256}
257
258impl LifecyclePolicyManager {
259 pub fn new(config: LifecyclePolicyConfig) -> Self {
261 Self {
262 rules: parking_lot::RwLock::new(Vec::new()),
263 metadata: DashMap::new(),
264 config: parking_lot::RwLock::new(config),
265 stats: LifecycleStats::default(),
266 }
267 }
268
269 pub fn add_rule(&self, rule: LifecycleRule) {
271 let mut rules = self.rules.write();
272 rules.push(rule.clone());
273 rules.sort_by(|a, b| b.priority.cmp(&a.priority));
274 debug!("Added lifecycle rule: {}", rule.id);
275 }
276
277 pub fn remove_rule(&self, rule_id: &str) -> bool {
279 let mut rules = self.rules.write();
280 if let Some(pos) = rules.iter().position(|r| r.id == rule_id) {
281 rules.remove(pos);
282 debug!("Removed lifecycle rule: {}", rule_id);
283 true
284 } else {
285 false
286 }
287 }
288
289 pub fn get_rules(&self) -> Vec<LifecycleRule> {
291 self.rules.read().clone()
292 }
293
294 pub fn register_block(&self, metadata: BlockMetadata) {
296 self.metadata.insert(metadata.cid, metadata);
297 }
298
299 pub fn record_access(&self, cid: &Cid) {
301 if let Some(mut entry) = self.metadata.get_mut(cid) {
302 entry.last_accessed = Some(SystemTime::now());
303 entry.access_count += 1;
304 }
305 }
306
307 pub fn evaluate(&self) -> Vec<LifecycleActionResult> {
309 let rules = self.rules.read();
310 let config = self.config.read();
311 let mut results = Vec::new();
312
313 let blocks_count = self.metadata.len() as u64;
314 self.stats.record_evaluation(blocks_count);
315
316 for entry in self.metadata.iter() {
317 if results.len() >= config.max_actions_per_evaluation {
318 break;
319 }
320
321 let metadata = entry.value();
322
323 for rule in rules.iter() {
325 if rule.matches(metadata) {
326 let result = LifecycleActionResult {
327 cid: metadata.cid,
328 rule_id: rule.id.clone(),
329 action: rule.action,
330 success: !config.dry_run,
331 error: if config.dry_run {
332 Some("Dry run mode".to_string())
333 } else {
334 None
335 },
336 };
337
338 self.stats.record_action(rule.action, !config.dry_run);
339 results.push(result);
340 break; }
342 }
343 }
344
345 if !results.is_empty() {
346 info!(
347 "Lifecycle evaluation completed: {} actions recommended",
348 results.len()
349 );
350 }
351
352 results
353 }
354
355 pub async fn apply_actions<S: BlockStore>(
357 &self,
358 store: &S,
359 actions: Vec<LifecycleActionResult>,
360 ) -> Vec<LifecycleActionResult> {
361 let mut results = Vec::new();
362
363 for action in actions {
364 if self.config.read().dry_run {
365 results.push(action);
366 continue;
367 }
368
369 let success = match action.action {
370 LifecycleAction::Delete => store.delete(&action.cid).await.is_ok(),
371 LifecycleAction::Transition(tier) => {
372 if let Some(mut entry) = self.metadata.get_mut(&action.cid) {
374 entry.tier = tier;
375 true
376 } else {
377 false
378 }
379 }
380 LifecycleAction::Archive | LifecycleAction::Review => {
381 true
383 }
384 };
385
386 results.push(LifecycleActionResult {
387 success,
388 error: if success {
389 None
390 } else {
391 Some("Action failed".to_string())
392 },
393 ..action
394 });
395
396 self.stats.record_action(action.action, success);
397 }
398
399 results
400 }
401
402 pub fn get_stats(&self) -> LifecycleStatsSnapshot {
404 LifecycleStatsSnapshot {
405 evaluations_run: self.stats.evaluations_run.load(Ordering::Relaxed),
406 blocks_evaluated: self.stats.blocks_evaluated.load(Ordering::Relaxed),
407 transitions: self.stats.transitions.load(Ordering::Relaxed),
408 deletions: self.stats.deletions.load(Ordering::Relaxed),
409 archives: self.stats.archives.load(Ordering::Relaxed),
410 reviews: self.stats.reviews.load(Ordering::Relaxed),
411 failures: self.stats.failures.load(Ordering::Relaxed),
412 }
413 }
414
415 pub fn get_blocks_by_tier(&self, tier: StorageTier) -> Vec<Cid> {
417 self.metadata
418 .iter()
419 .filter_map(|entry| {
420 if entry.value().tier == tier {
421 Some(entry.key().clone())
422 } else {
423 None
424 }
425 })
426 .collect()
427 }
428}
429
430#[derive(Debug, Clone, Serialize, Deserialize)]
432pub struct LifecycleStatsSnapshot {
433 pub evaluations_run: u64,
434 pub blocks_evaluated: u64,
435 pub transitions: u64,
436 pub deletions: u64,
437 pub archives: u64,
438 pub reviews: u64,
439 pub failures: u64,
440}
441
442impl LifecycleRule {
444 pub fn archive_old_blocks() -> Self {
446 Self::new(
447 "archive_old".to_string(),
448 "Move blocks older than 30 days to cold storage".to_string(),
449 LifecycleCondition::AgeDays(30),
450 LifecycleAction::Transition(StorageTier::Cold),
451 )
452 }
453
454 pub fn delete_unused() -> Self {
456 Self::new(
457 "delete_unused".to_string(),
458 "Delete blocks not accessed in 90 days".to_string(),
459 LifecycleCondition::DaysSinceLastAccess(90),
460 LifecycleAction::Delete,
461 )
462 }
463
464 pub fn archive_large_blocks() -> Self {
466 Self::new(
467 "archive_large".to_string(),
468 "Archive blocks larger than 10MB after 7 days".to_string(),
469 LifecycleCondition::And(vec![
470 LifecycleCondition::AgeDays(7),
471 LifecycleCondition::SizeBytes {
472 min: Some(10 * 1024 * 1024),
473 max: None,
474 },
475 ]),
476 LifecycleAction::Archive,
477 )
478 }
479
480 pub fn demote_cold_hot_storage() -> Self {
482 Self::new(
483 "demote_hot".to_string(),
484 "Move rarely accessed hot storage blocks to warm tier".to_string(),
485 LifecycleCondition::And(vec![
486 LifecycleCondition::CurrentTier(StorageTier::Hot),
487 LifecycleCondition::DaysSinceLastAccess(7),
488 LifecycleCondition::AccessCountBelow(10),
489 ]),
490 LifecycleAction::Transition(StorageTier::Warm),
491 )
492 }
493}
494
495#[cfg(test)]
496mod tests {
497 use super::*;
498
499 #[test]
500 fn test_age_condition() {
501 let rule = LifecycleRule::new(
502 "test".to_string(),
503 "Test rule".to_string(),
504 LifecycleCondition::AgeDays(1),
505 LifecycleAction::Delete,
506 );
507
508 let old_metadata = BlockMetadata {
509 cid: Cid::default(),
510 size: 100,
511 created_at: SystemTime::now() - Duration::from_secs(2 * 86400),
512 last_accessed: None,
513 access_count: 0,
514 tier: StorageTier::Hot,
515 };
516
517 assert!(rule.matches(&old_metadata));
518
519 let new_metadata = BlockMetadata {
520 cid: Cid::default(),
521 size: 100,
522 created_at: SystemTime::now(),
523 last_accessed: None,
524 access_count: 0,
525 tier: StorageTier::Hot,
526 };
527
528 assert!(!rule.matches(&new_metadata));
529 }
530
531 #[test]
532 fn test_lifecycle_manager() {
533 let manager = LifecyclePolicyManager::new(LifecyclePolicyConfig::default());
534
535 manager.add_rule(LifecycleRule::archive_old_blocks());
537
538 let metadata = BlockMetadata {
540 cid: Cid::default(),
541 size: 100,
542 created_at: SystemTime::now() - Duration::from_secs(31 * 86400),
543 last_accessed: None,
544 access_count: 0,
545 tier: StorageTier::Hot,
546 };
547
548 manager.register_block(metadata);
549
550 let actions = manager.evaluate();
552 assert_eq!(actions.len(), 1);
553 assert_eq!(
554 actions[0].action,
555 LifecycleAction::Transition(StorageTier::Cold)
556 );
557 }
558
559 #[test]
560 fn test_rule_presets() {
561 let rule = LifecycleRule::delete_unused();
562 assert_eq!(rule.id, "delete_unused");
563 assert_eq!(rule.action, LifecycleAction::Delete);
564
565 let rule = LifecycleRule::archive_large_blocks();
566 assert_eq!(rule.action, LifecycleAction::Archive);
567 }
568}