ipfrs_storage/
tiering.rs

1//! Hot/cold storage tiering with access tracking.
2//!
3//! Manages automatic migration of blocks between fast (hot) and
4//! slow (cold) storage tiers based on access patterns.
5//!
6//! # Access Tracking
7//!
8//! Uses a combination of access frequency and recency to determine
9//! block temperature. Blocks with high access frequency stay hot,
10//! while rarely accessed blocks become cold over time.
11//!
12//! # Example
13//!
14//! ```rust,ignore
15//! use ipfrs_storage::tiering::{AccessTracker, TierConfig};
16//!
17//! let tracker = AccessTracker::new(TierConfig::default());
18//! tracker.record_access(&cid);
19//!
20//! if tracker.is_hot(&cid) {
21//!     // Block is frequently accessed
22//! }
23//! ```
24
25use dashmap::DashMap;
26use ipfrs_core::{Cid, Error, Result};
27use parking_lot::RwLock;
28use serde::{Deserialize, Serialize};
29use std::sync::atomic::{AtomicU64, Ordering};
30use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
31
/// Storage tier classification.
///
/// Variants are declared from hottest to coldest; [`Tier::colder`] and
/// [`Tier::hotter`] step through them one at a time in that order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Tier {
    /// Hot tier - frequently accessed, kept in fast storage.
    /// Newly tracked blocks start here.
    Hot,
    /// Warm tier - occasionally accessed.
    Warm,
    /// Cold tier - rarely accessed, can be moved to slow storage.
    Cold,
    /// Archive tier - very rarely accessed, cheapest storage.
    /// This is the coldest tier; there is nothing colder to step to.
    Archive,
}
44
45impl Tier {
46    /// Get the next colder tier
47    pub fn colder(self) -> Option<Tier> {
48        match self {
49            Tier::Hot => Some(Tier::Warm),
50            Tier::Warm => Some(Tier::Cold),
51            Tier::Cold => Some(Tier::Archive),
52            Tier::Archive => None,
53        }
54    }
55
56    /// Get the next hotter tier
57    pub fn hotter(self) -> Option<Tier> {
58        match self {
59            Tier::Archive => Some(Tier::Cold),
60            Tier::Cold => Some(Tier::Warm),
61            Tier::Warm => Some(Tier::Hot),
62            Tier::Hot => None,
63        }
64    }
65}
66
/// Configuration for tiering behavior.
///
/// The three thresholds are compared against a block's access rate
/// (accesses per hour) from hottest to coldest; a block whose rate is below
/// `cold_threshold` is classified as [`Tier::Archive`].
#[derive(Debug, Clone)]
pub struct TierConfig {
    /// Minimum access rate (accesses per hour) for a block to be classified hot.
    pub hot_threshold: f64,
    /// Minimum access rate (accesses per hour) for a block to be classified warm.
    pub warm_threshold: f64,
    /// Minimum access rate (accesses per hour) for a block to be classified cold.
    pub cold_threshold: f64,
    /// Time window for calculating access rate (in seconds).
    ///
    /// Caps the elapsed time since a block's first access that is used as the
    /// divisor of the rate calculation.
    pub time_window_secs: u64,
    /// Decay factor for old accesses (0.0 - 1.0).
    ///
    /// Each cleanup pass multiplies every block's weighted access count by
    /// this value.
    pub decay_factor: f64,
    /// How often to run cleanup/decay (in seconds).
    ///
    /// Cleanup is checked opportunistically whenever an access is recorded.
    pub cleanup_interval_secs: u64,
}
83
84impl Default for TierConfig {
85    fn default() -> Self {
86        Self {
87            hot_threshold: 10.0,        // 10+ accesses/hour = hot
88            warm_threshold: 1.0,        // 1-10 accesses/hour = warm
89            cold_threshold: 0.1,        // 0.1-1 accesses/hour = cold
90            time_window_secs: 3600,     // 1 hour window
91            decay_factor: 0.9,          // 10% decay per period
92            cleanup_interval_secs: 300, // Cleanup every 5 minutes
93        }
94    }
95}
96
/// Access statistics for a single block.
///
/// Timestamps are whole seconds since the Unix epoch, taken from the system
/// wall clock.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccessStats {
    /// Total access count. Starts at 1 and is never decayed.
    pub total_accesses: u64,
    /// Weighted access count (with time decay).
    ///
    /// Incremented by 1.0 per access and multiplied by the configured decay
    /// factor on every cleanup pass; this is the value the access rate is
    /// derived from.
    pub weighted_accesses: f64,
    /// Last access timestamp (Unix timestamp, seconds).
    pub last_access: u64,
    /// First access timestamp (Unix timestamp, seconds).
    pub first_access: u64,
    /// Current tier. New entries start in [`Tier::Hot`].
    pub tier: Tier,
}
111
112impl AccessStats {
113    fn new() -> Self {
114        let now = SystemTime::now()
115            .duration_since(UNIX_EPOCH)
116            .unwrap_or_default()
117            .as_secs();
118
119        Self {
120            total_accesses: 1,
121            weighted_accesses: 1.0,
122            last_access: now,
123            first_access: now,
124            tier: Tier::Hot, // New blocks start hot
125        }
126    }
127
128    fn record_access(&mut self) {
129        let now = SystemTime::now()
130            .duration_since(UNIX_EPOCH)
131            .unwrap_or_default()
132            .as_secs();
133
134        self.total_accesses += 1;
135        self.weighted_accesses += 1.0;
136        self.last_access = now;
137    }
138
139    /// Calculate access rate (accesses per hour)
140    fn access_rate(&self, time_window_secs: u64) -> f64 {
141        let now = SystemTime::now()
142            .duration_since(UNIX_EPOCH)
143            .unwrap_or_default()
144            .as_secs();
145
146        let elapsed = now.saturating_sub(self.first_access).max(1);
147        let window = elapsed.min(time_window_secs) as f64;
148
149        // Accesses per hour
150        self.weighted_accesses * 3600.0 / window
151    }
152
153    /// Apply time decay to weighted accesses
154    fn apply_decay(&mut self, decay_factor: f64) {
155        self.weighted_accesses *= decay_factor;
156    }
157}
158
/// Access tracker for monitoring block access patterns.
///
/// All methods take `&self`; per-block state lives in a concurrent map and
/// the aggregate counters are atomics.
pub struct AccessTracker {
    /// Access statistics per CID, keyed by the CID's byte encoding
    /// (`Cid::to_bytes`).
    stats: DashMap<Vec<u8>, AccessStats>,
    /// Configuration (thresholds, rate window, decay and cleanup interval).
    config: TierConfig,
    /// Monotonic time of the last cleanup/decay pass.
    last_cleanup: RwLock<Instant>,
    /// Global statistics (total accesses and per-tier block counts).
    global_stats: GlobalAccessStats,
}
170
/// Global access statistics.
///
/// Counters are updated with relaxed atomics alongside changes to the
/// per-block map; all of them start at zero.
#[derive(Default)]
struct GlobalAccessStats {
    /// Number of accesses recorded since creation or the last `clear`.
    total_accesses: AtomicU64,
    /// Number of tracked blocks currently in the hot tier.
    hot_blocks: AtomicU64,
    /// Number of tracked blocks currently in the warm tier.
    warm_blocks: AtomicU64,
    /// Number of tracked blocks currently in the cold tier.
    cold_blocks: AtomicU64,
    /// Number of tracked blocks currently in the archive tier.
    archive_blocks: AtomicU64,
}
180
181impl AccessTracker {
182    /// Create a new access tracker
183    pub fn new(config: TierConfig) -> Self {
184        Self {
185            stats: DashMap::new(),
186            config,
187            last_cleanup: RwLock::new(Instant::now()),
188            global_stats: GlobalAccessStats::default(),
189        }
190    }
191
192    /// Record an access to a block
193    pub fn record_access(&self, cid: &Cid) {
194        let key = cid.to_bytes();
195        self.global_stats
196            .total_accesses
197            .fetch_add(1, Ordering::Relaxed);
198
199        self.stats
200            .entry(key)
201            .and_modify(|stats| {
202                let old_tier = stats.tier;
203                stats.record_access();
204                let new_tier = self.classify_tier(stats);
205                if old_tier != new_tier {
206                    self.update_tier_counts(old_tier, new_tier);
207                    stats.tier = new_tier;
208                }
209            })
210            .or_insert_with(|| {
211                self.global_stats.hot_blocks.fetch_add(1, Ordering::Relaxed);
212                AccessStats::new()
213            });
214
215        // Periodic cleanup
216        self.maybe_cleanup();
217    }
218
219    /// Get the current tier for a block
220    pub fn get_tier(&self, cid: &Cid) -> Option<Tier> {
221        self.stats.get(&cid.to_bytes()).map(|s| s.tier)
222    }
223
224    /// Check if a block is in the hot tier
225    pub fn is_hot(&self, cid: &Cid) -> bool {
226        self.get_tier(cid) == Some(Tier::Hot)
227    }
228
229    /// Check if a block is cold (cold or archive tier)
230    pub fn is_cold(&self, cid: &Cid) -> bool {
231        matches!(self.get_tier(cid), Some(Tier::Cold) | Some(Tier::Archive))
232    }
233
234    /// Get access statistics for a block
235    pub fn get_stats(&self, cid: &Cid) -> Option<AccessStats> {
236        self.stats.get(&cid.to_bytes()).map(|s| s.clone())
237    }
238
239    /// List all blocks in a specific tier
240    pub fn list_by_tier(&self, tier: Tier) -> Result<Vec<Cid>> {
241        let mut result = Vec::new();
242        for entry in self.stats.iter() {
243            if entry.value().tier == tier {
244                let cid = Cid::try_from(entry.key().clone())
245                    .map_err(|e| Error::Cid(format!("Invalid CID: {e}")))?;
246                result.push(cid);
247            }
248        }
249        Ok(result)
250    }
251
252    /// Get candidates for migration to a colder tier
253    pub fn get_cold_candidates(&self, max_count: usize) -> Result<Vec<(Cid, Tier)>> {
254        let mut candidates: Vec<_> = self
255            .stats
256            .iter()
257            .filter_map(|entry| {
258                let stats = entry.value();
259                if let Some(colder_tier) = stats.tier.colder() {
260                    let rate = stats.access_rate(self.config.time_window_secs);
261                    let threshold = self.tier_threshold(colder_tier);
262                    if rate < threshold {
263                        let cid = Cid::try_from(entry.key().clone()).ok()?;
264                        return Some((cid, colder_tier, rate));
265                    }
266                }
267                None
268            })
269            .collect();
270
271        // Sort by access rate (lowest first)
272        candidates.sort_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal));
273
274        Ok(candidates
275            .into_iter()
276            .take(max_count)
277            .map(|(cid, tier, _)| (cid, tier))
278            .collect())
279    }
280
281    /// Manually set the tier for a block
282    pub fn set_tier(&self, cid: &Cid, tier: Tier) {
283        let key = cid.to_bytes();
284        if let Some(mut entry) = self.stats.get_mut(&key) {
285            let old_tier = entry.tier;
286            if old_tier != tier {
287                self.update_tier_counts(old_tier, tier);
288                entry.tier = tier;
289            }
290        }
291    }
292
293    /// Get global statistics
294    pub fn global_stats(&self) -> TierStatsSnapshot {
295        TierStatsSnapshot {
296            total_accesses: self.global_stats.total_accesses.load(Ordering::Relaxed),
297            tracked_blocks: self.stats.len() as u64,
298            hot_blocks: self.global_stats.hot_blocks.load(Ordering::Relaxed),
299            warm_blocks: self.global_stats.warm_blocks.load(Ordering::Relaxed),
300            cold_blocks: self.global_stats.cold_blocks.load(Ordering::Relaxed),
301            archive_blocks: self.global_stats.archive_blocks.load(Ordering::Relaxed),
302        }
303    }
304
305    /// Force a cleanup/decay pass
306    pub fn run_cleanup(&self) {
307        for mut entry in self.stats.iter_mut() {
308            let stats = entry.value_mut();
309            let old_tier = stats.tier;
310
311            // Apply decay
312            stats.apply_decay(self.config.decay_factor);
313
314            // Reclassify tier
315            let new_tier = self.classify_tier(stats);
316            if old_tier != new_tier {
317                self.update_tier_counts(old_tier, new_tier);
318                stats.tier = new_tier;
319            }
320        }
321
322        *self.last_cleanup.write() = Instant::now();
323    }
324
325    /// Classify a block into a tier based on its access rate
326    fn classify_tier(&self, stats: &AccessStats) -> Tier {
327        let rate = stats.access_rate(self.config.time_window_secs);
328
329        if rate >= self.config.hot_threshold {
330            Tier::Hot
331        } else if rate >= self.config.warm_threshold {
332            Tier::Warm
333        } else if rate >= self.config.cold_threshold {
334            Tier::Cold
335        } else {
336            Tier::Archive
337        }
338    }
339
340    /// Get the threshold for a tier
341    fn tier_threshold(&self, tier: Tier) -> f64 {
342        match tier {
343            Tier::Hot => self.config.hot_threshold,
344            Tier::Warm => self.config.warm_threshold,
345            Tier::Cold => self.config.cold_threshold,
346            Tier::Archive => 0.0,
347        }
348    }
349
350    /// Update tier counts when a block changes tiers
351    fn update_tier_counts(&self, old_tier: Tier, new_tier: Tier) {
352        match old_tier {
353            Tier::Hot => self.global_stats.hot_blocks.fetch_sub(1, Ordering::Relaxed),
354            Tier::Warm => self
355                .global_stats
356                .warm_blocks
357                .fetch_sub(1, Ordering::Relaxed),
358            Tier::Cold => self
359                .global_stats
360                .cold_blocks
361                .fetch_sub(1, Ordering::Relaxed),
362            Tier::Archive => self
363                .global_stats
364                .archive_blocks
365                .fetch_sub(1, Ordering::Relaxed),
366        };
367        match new_tier {
368            Tier::Hot => self.global_stats.hot_blocks.fetch_add(1, Ordering::Relaxed),
369            Tier::Warm => self
370                .global_stats
371                .warm_blocks
372                .fetch_add(1, Ordering::Relaxed),
373            Tier::Cold => self
374                .global_stats
375                .cold_blocks
376                .fetch_add(1, Ordering::Relaxed),
377            Tier::Archive => self
378                .global_stats
379                .archive_blocks
380                .fetch_add(1, Ordering::Relaxed),
381        };
382    }
383
384    /// Check if cleanup is needed and run it
385    fn maybe_cleanup(&self) {
386        let should_cleanup = {
387            let last = self.last_cleanup.read();
388            last.elapsed() > Duration::from_secs(self.config.cleanup_interval_secs)
389        };
390
391        if should_cleanup {
392            self.run_cleanup();
393        }
394    }
395
396    /// Remove tracking for a block
397    pub fn remove(&self, cid: &Cid) {
398        if let Some((_, stats)) = self.stats.remove(&cid.to_bytes()) {
399            match stats.tier {
400                Tier::Hot => self.global_stats.hot_blocks.fetch_sub(1, Ordering::Relaxed),
401                Tier::Warm => self
402                    .global_stats
403                    .warm_blocks
404                    .fetch_sub(1, Ordering::Relaxed),
405                Tier::Cold => self
406                    .global_stats
407                    .cold_blocks
408                    .fetch_sub(1, Ordering::Relaxed),
409                Tier::Archive => self
410                    .global_stats
411                    .archive_blocks
412                    .fetch_sub(1, Ordering::Relaxed),
413            };
414        }
415    }
416
417    /// Clear all tracking data
418    pub fn clear(&self) {
419        self.stats.clear();
420        self.global_stats.total_accesses.store(0, Ordering::Relaxed);
421        self.global_stats.hot_blocks.store(0, Ordering::Relaxed);
422        self.global_stats.warm_blocks.store(0, Ordering::Relaxed);
423        self.global_stats.cold_blocks.store(0, Ordering::Relaxed);
424        self.global_stats.archive_blocks.store(0, Ordering::Relaxed);
425    }
426}
427
/// Snapshot of tier statistics.
///
/// A plain-value copy of the tracker's global counters taken at one point in
/// time; it does not update after it has been created.
#[derive(Debug, Clone)]
pub struct TierStatsSnapshot {
    /// Total accesses recorded
    pub total_accesses: u64,
    /// Number of blocks being tracked (size of the per-block map)
    pub tracked_blocks: u64,
    /// Number of blocks in hot tier
    pub hot_blocks: u64,
    /// Number of blocks in warm tier
    pub warm_blocks: u64,
    /// Number of blocks in cold tier
    pub cold_blocks: u64,
    /// Number of blocks in archive tier
    pub archive_blocks: u64,
}
444
use crate::traits::BlockStore;
use async_trait::async_trait;
use ipfrs_core::Block;

/// Tiered block store that tracks access patterns and supports migration.
///
/// Wraps two [`BlockStore`] implementations, a hot one and a cold one, and an
/// [`AccessTracker`] that records accesses made through this store.
pub struct TieredStore<H: BlockStore, C: BlockStore> {
    /// Hot storage (fast, expensive)
    hot_store: H,
    /// Cold storage (slow, cheap)
    cold_store: C,
    /// Access tracker
    tracker: AccessTracker,
    /// Configuration (a copy of what the tracker was built with)
    config: TierConfig,
}
460
461impl<H: BlockStore, C: BlockStore> TieredStore<H, C> {
462    /// Create a new tiered store
463    pub fn new(hot_store: H, cold_store: C, config: TierConfig) -> Self {
464        Self {
465            hot_store,
466            cold_store,
467            tracker: AccessTracker::new(config.clone()),
468            config,
469        }
470    }
471
472    /// Get the access tracker
473    pub fn tracker(&self) -> &AccessTracker {
474        &self.tracker
475    }
476
477    /// Get the tier configuration
478    pub fn config(&self) -> &TierConfig {
479        &self.config
480    }
481
482    /// Migrate cold blocks from hot to cold storage
483    pub async fn migrate_cold_blocks(&self, max_count: usize) -> Result<usize> {
484        let candidates = self.tracker.get_cold_candidates(max_count)?;
485        let mut migrated = 0;
486
487        for (cid, _new_tier) in candidates {
488            // Get from hot storage
489            if let Some(block) = self.hot_store.get(&cid).await? {
490                // Store in cold storage
491                self.cold_store.put(&block).await?;
492                // Remove from hot storage
493                self.hot_store.delete(&cid).await?;
494                migrated += 1;
495            }
496        }
497
498        Ok(migrated)
499    }
500
501    /// Promote a block from cold to hot storage
502    pub async fn promote_block(&self, cid: &Cid) -> Result<bool> {
503        if let Some(block) = self.cold_store.get(cid).await? {
504            self.hot_store.put(&block).await?;
505            self.cold_store.delete(cid).await?;
506            self.tracker.set_tier(cid, Tier::Hot);
507            Ok(true)
508        } else {
509            Ok(false)
510        }
511    }
512}
513
514#[async_trait]
515impl<H: BlockStore, C: BlockStore> BlockStore for TieredStore<H, C> {
516    async fn put(&self, block: &Block) -> Result<()> {
517        // New blocks go to hot storage
518        self.tracker.record_access(block.cid());
519        self.hot_store.put(block).await
520    }
521
522    async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
523        self.tracker.record_access(cid);
524
525        // Try hot storage first
526        if let Some(block) = self.hot_store.get(cid).await? {
527            return Ok(Some(block));
528        }
529
530        // Fall back to cold storage
531        if let Some(block) = self.cold_store.get(cid).await? {
532            // Optionally promote to hot storage on access
533            if self.tracker.is_hot(cid) {
534                // Block is now hot, migrate it
535                self.hot_store.put(&block).await?;
536                self.cold_store.delete(cid).await?;
537            }
538            return Ok(Some(block));
539        }
540
541        Ok(None)
542    }
543
544    async fn has(&self, cid: &Cid) -> Result<bool> {
545        if self.hot_store.has(cid).await? {
546            return Ok(true);
547        }
548        self.cold_store.has(cid).await
549    }
550
551    async fn delete(&self, cid: &Cid) -> Result<()> {
552        self.tracker.remove(cid);
553        // Delete from both stores
554        let _ = self.hot_store.delete(cid).await;
555        let _ = self.cold_store.delete(cid).await;
556        Ok(())
557    }
558
559    fn list_cids(&self) -> Result<Vec<Cid>> {
560        // Combine CIDs from both stores
561        let mut cids = self.hot_store.list_cids()?;
562        let cold_cids = self.cold_store.list_cids()?;
563        cids.extend(cold_cids);
564        // Remove duplicates
565        cids.sort_by_key(|a| a.to_bytes());
566        cids.dedup_by(|a, b| a.to_bytes() == b.to_bytes());
567        Ok(cids)
568    }
569
570    fn len(&self) -> usize {
571        self.hot_store.len() + self.cold_store.len()
572    }
573
574    fn is_empty(&self) -> bool {
575        self.hot_store.is_empty() && self.cold_store.is_empty()
576    }
577
578    async fn flush(&self) -> Result<()> {
579        self.hot_store.flush().await?;
580        self.cold_store.flush().await
581    }
582
583    async fn close(&self) -> Result<()> {
584        self.hot_store.close().await?;
585        self.cold_store.close().await
586    }
587}
588
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use ipfrs_core::Block;

    /// Derives a CID from a block built over `data`.
    fn make_test_cid(data: &[u8]) -> Cid {
        let payload = Bytes::copy_from_slice(data);
        let block = Block::new(payload).unwrap();
        *block.cid()
    }

    /// A block is classified hot right after its first recorded access.
    #[test]
    fn test_tier_classification() {
        let tracker = AccessTracker::new(TierConfig::default());
        let cid = make_test_cid(b"test");

        // First access - should be hot
        tracker.record_access(&cid);
        assert!(tracker.is_hot(&cid));
    }

    /// Every recorded access is reflected in the block's total count.
    #[test]
    fn test_access_stats() {
        let tracker = AccessTracker::new(TierConfig::default());
        let cid = make_test_cid(b"test");

        (0..10).for_each(|_| tracker.record_access(&cid));

        let recorded = tracker.get_stats(&cid).unwrap();
        assert_eq!(recorded.total_accesses, 10);
    }

    /// Distinct blocks are tracked separately and all start in the hot tier.
    #[test]
    fn test_tier_stats() {
        let tracker = AccessTracker::new(TierConfig::default());

        (0u8..5)
            .map(|byte| make_test_cid(&[byte]))
            .for_each(|cid| tracker.record_access(&cid));

        let snapshot = tracker.global_stats();
        assert_eq!(snapshot.tracked_blocks, 5);
        assert_eq!(snapshot.hot_blocks, 5);
    }

    /// Stepping colder/hotter walks the tier ladder and stops at the ends.
    #[test]
    fn test_tier_transitions() {
        // Colder direction, ending at the archive tier.
        assert_eq!(Tier::Hot.colder(), Some(Tier::Warm));
        assert_eq!(Tier::Warm.colder(), Some(Tier::Cold));
        assert_eq!(Tier::Cold.colder(), Some(Tier::Archive));
        assert_eq!(Tier::Archive.colder(), None);

        // Hotter direction, ending at the hot tier.
        assert_eq!(Tier::Archive.hotter(), Some(Tier::Cold));
        assert_eq!(Tier::Hot.hotter(), None);
    }
}