1use dashmap::DashMap;
26use ipfrs_core::{Cid, Error, Result};
27use parking_lot::RwLock;
28use serde::{Deserialize, Serialize};
29use std::sync::atomic::{AtomicU64, Ordering};
30use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
34pub enum Tier {
35 Hot,
37 Warm,
39 Cold,
41 Archive,
43}
44
45impl Tier {
46 pub fn colder(self) -> Option<Tier> {
48 match self {
49 Tier::Hot => Some(Tier::Warm),
50 Tier::Warm => Some(Tier::Cold),
51 Tier::Cold => Some(Tier::Archive),
52 Tier::Archive => None,
53 }
54 }
55
56 pub fn hotter(self) -> Option<Tier> {
58 match self {
59 Tier::Archive => Some(Tier::Cold),
60 Tier::Cold => Some(Tier::Warm),
61 Tier::Warm => Some(Tier::Hot),
62 Tier::Hot => None,
63 }
64 }
65}
66
/// Tunables for access-rate based tier classification.
///
/// Rates are the per-block weighted access count scaled to a per-hour
/// figure (see `AccessStats::access_rate`).
#[derive(Debug, Clone)]
pub struct TierConfig {
    /// Minimum access rate for a block to be classified [`Tier::Hot`].
    pub hot_threshold: f64,
    /// Minimum access rate for a block to be classified [`Tier::Warm`].
    pub warm_threshold: f64,
    /// Minimum access rate for a block to be classified [`Tier::Cold`];
    /// anything below it is classified [`Tier::Archive`].
    pub cold_threshold: f64,
    /// Upper bound, in seconds, on the elapsed time used as the denominator
    /// of the access rate.
    pub time_window_secs: u64,
    /// Multiplier applied to every block's weighted access count on each
    /// cleanup pass.
    pub decay_factor: f64,
    /// Seconds that must pass since the last cleanup before recording an
    /// access triggers another one.
    pub cleanup_interval_secs: u64,
}
83
84impl Default for TierConfig {
85 fn default() -> Self {
86 Self {
87 hot_threshold: 10.0, warm_threshold: 1.0, cold_threshold: 0.1, time_window_secs: 3600, decay_factor: 0.9, cleanup_interval_secs: 300, }
94 }
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct AccessStats {
100 pub total_accesses: u64,
102 pub weighted_accesses: f64,
104 pub last_access: u64,
106 pub first_access: u64,
108 pub tier: Tier,
110}
111
112impl AccessStats {
113 fn new() -> Self {
114 let now = SystemTime::now()
115 .duration_since(UNIX_EPOCH)
116 .unwrap_or_default()
117 .as_secs();
118
119 Self {
120 total_accesses: 1,
121 weighted_accesses: 1.0,
122 last_access: now,
123 first_access: now,
124 tier: Tier::Hot, }
126 }
127
128 fn record_access(&mut self) {
129 let now = SystemTime::now()
130 .duration_since(UNIX_EPOCH)
131 .unwrap_or_default()
132 .as_secs();
133
134 self.total_accesses += 1;
135 self.weighted_accesses += 1.0;
136 self.last_access = now;
137 }
138
139 fn access_rate(&self, time_window_secs: u64) -> f64 {
141 let now = SystemTime::now()
142 .duration_since(UNIX_EPOCH)
143 .unwrap_or_default()
144 .as_secs();
145
146 let elapsed = now.saturating_sub(self.first_access).max(1);
147 let window = elapsed.min(time_window_secs) as f64;
148
149 self.weighted_accesses * 3600.0 / window
151 }
152
153 fn apply_decay(&mut self, decay_factor: f64) {
155 self.weighted_accesses *= decay_factor;
156 }
157}
158
159pub struct AccessTracker {
161 stats: DashMap<Vec<u8>, AccessStats>,
163 config: TierConfig,
165 last_cleanup: RwLock<Instant>,
167 global_stats: GlobalAccessStats,
169}
170
/// Aggregate counters maintained alongside the per-block statistics.
///
/// All counters start at zero.
#[derive(Default)]
struct GlobalAccessStats {
    /// Number of accesses recorded across all blocks.
    total_accesses: AtomicU64,
    /// Number of tracked blocks currently in `Tier::Hot`.
    hot_blocks: AtomicU64,
    /// Number of tracked blocks currently in `Tier::Warm`.
    warm_blocks: AtomicU64,
    /// Number of tracked blocks currently in `Tier::Cold`.
    cold_blocks: AtomicU64,
    /// Number of tracked blocks currently in `Tier::Archive`.
    archive_blocks: AtomicU64,
}
180
181impl AccessTracker {
182 pub fn new(config: TierConfig) -> Self {
184 Self {
185 stats: DashMap::new(),
186 config,
187 last_cleanup: RwLock::new(Instant::now()),
188 global_stats: GlobalAccessStats::default(),
189 }
190 }
191
192 pub fn record_access(&self, cid: &Cid) {
194 let key = cid.to_bytes();
195 self.global_stats
196 .total_accesses
197 .fetch_add(1, Ordering::Relaxed);
198
199 self.stats
200 .entry(key)
201 .and_modify(|stats| {
202 let old_tier = stats.tier;
203 stats.record_access();
204 let new_tier = self.classify_tier(stats);
205 if old_tier != new_tier {
206 self.update_tier_counts(old_tier, new_tier);
207 stats.tier = new_tier;
208 }
209 })
210 .or_insert_with(|| {
211 self.global_stats.hot_blocks.fetch_add(1, Ordering::Relaxed);
212 AccessStats::new()
213 });
214
215 self.maybe_cleanup();
217 }
218
219 pub fn get_tier(&self, cid: &Cid) -> Option<Tier> {
221 self.stats.get(&cid.to_bytes()).map(|s| s.tier)
222 }
223
224 pub fn is_hot(&self, cid: &Cid) -> bool {
226 self.get_tier(cid) == Some(Tier::Hot)
227 }
228
229 pub fn is_cold(&self, cid: &Cid) -> bool {
231 matches!(self.get_tier(cid), Some(Tier::Cold) | Some(Tier::Archive))
232 }
233
234 pub fn get_stats(&self, cid: &Cid) -> Option<AccessStats> {
236 self.stats.get(&cid.to_bytes()).map(|s| s.clone())
237 }
238
239 pub fn list_by_tier(&self, tier: Tier) -> Result<Vec<Cid>> {
241 let mut result = Vec::new();
242 for entry in self.stats.iter() {
243 if entry.value().tier == tier {
244 let cid = Cid::try_from(entry.key().clone())
245 .map_err(|e| Error::Cid(format!("Invalid CID: {e}")))?;
246 result.push(cid);
247 }
248 }
249 Ok(result)
250 }
251
252 pub fn get_cold_candidates(&self, max_count: usize) -> Result<Vec<(Cid, Tier)>> {
254 let mut candidates: Vec<_> = self
255 .stats
256 .iter()
257 .filter_map(|entry| {
258 let stats = entry.value();
259 if let Some(colder_tier) = stats.tier.colder() {
260 let rate = stats.access_rate(self.config.time_window_secs);
261 let threshold = self.tier_threshold(colder_tier);
262 if rate < threshold {
263 let cid = Cid::try_from(entry.key().clone()).ok()?;
264 return Some((cid, colder_tier, rate));
265 }
266 }
267 None
268 })
269 .collect();
270
271 candidates.sort_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal));
273
274 Ok(candidates
275 .into_iter()
276 .take(max_count)
277 .map(|(cid, tier, _)| (cid, tier))
278 .collect())
279 }
280
281 pub fn set_tier(&self, cid: &Cid, tier: Tier) {
283 let key = cid.to_bytes();
284 if let Some(mut entry) = self.stats.get_mut(&key) {
285 let old_tier = entry.tier;
286 if old_tier != tier {
287 self.update_tier_counts(old_tier, tier);
288 entry.tier = tier;
289 }
290 }
291 }
292
293 pub fn global_stats(&self) -> TierStatsSnapshot {
295 TierStatsSnapshot {
296 total_accesses: self.global_stats.total_accesses.load(Ordering::Relaxed),
297 tracked_blocks: self.stats.len() as u64,
298 hot_blocks: self.global_stats.hot_blocks.load(Ordering::Relaxed),
299 warm_blocks: self.global_stats.warm_blocks.load(Ordering::Relaxed),
300 cold_blocks: self.global_stats.cold_blocks.load(Ordering::Relaxed),
301 archive_blocks: self.global_stats.archive_blocks.load(Ordering::Relaxed),
302 }
303 }
304
305 pub fn run_cleanup(&self) {
307 for mut entry in self.stats.iter_mut() {
308 let stats = entry.value_mut();
309 let old_tier = stats.tier;
310
311 stats.apply_decay(self.config.decay_factor);
313
314 let new_tier = self.classify_tier(stats);
316 if old_tier != new_tier {
317 self.update_tier_counts(old_tier, new_tier);
318 stats.tier = new_tier;
319 }
320 }
321
322 *self.last_cleanup.write() = Instant::now();
323 }
324
325 fn classify_tier(&self, stats: &AccessStats) -> Tier {
327 let rate = stats.access_rate(self.config.time_window_secs);
328
329 if rate >= self.config.hot_threshold {
330 Tier::Hot
331 } else if rate >= self.config.warm_threshold {
332 Tier::Warm
333 } else if rate >= self.config.cold_threshold {
334 Tier::Cold
335 } else {
336 Tier::Archive
337 }
338 }
339
340 fn tier_threshold(&self, tier: Tier) -> f64 {
342 match tier {
343 Tier::Hot => self.config.hot_threshold,
344 Tier::Warm => self.config.warm_threshold,
345 Tier::Cold => self.config.cold_threshold,
346 Tier::Archive => 0.0,
347 }
348 }
349
350 fn update_tier_counts(&self, old_tier: Tier, new_tier: Tier) {
352 match old_tier {
353 Tier::Hot => self.global_stats.hot_blocks.fetch_sub(1, Ordering::Relaxed),
354 Tier::Warm => self
355 .global_stats
356 .warm_blocks
357 .fetch_sub(1, Ordering::Relaxed),
358 Tier::Cold => self
359 .global_stats
360 .cold_blocks
361 .fetch_sub(1, Ordering::Relaxed),
362 Tier::Archive => self
363 .global_stats
364 .archive_blocks
365 .fetch_sub(1, Ordering::Relaxed),
366 };
367 match new_tier {
368 Tier::Hot => self.global_stats.hot_blocks.fetch_add(1, Ordering::Relaxed),
369 Tier::Warm => self
370 .global_stats
371 .warm_blocks
372 .fetch_add(1, Ordering::Relaxed),
373 Tier::Cold => self
374 .global_stats
375 .cold_blocks
376 .fetch_add(1, Ordering::Relaxed),
377 Tier::Archive => self
378 .global_stats
379 .archive_blocks
380 .fetch_add(1, Ordering::Relaxed),
381 };
382 }
383
384 fn maybe_cleanup(&self) {
386 let should_cleanup = {
387 let last = self.last_cleanup.read();
388 last.elapsed() > Duration::from_secs(self.config.cleanup_interval_secs)
389 };
390
391 if should_cleanup {
392 self.run_cleanup();
393 }
394 }
395
396 pub fn remove(&self, cid: &Cid) {
398 if let Some((_, stats)) = self.stats.remove(&cid.to_bytes()) {
399 match stats.tier {
400 Tier::Hot => self.global_stats.hot_blocks.fetch_sub(1, Ordering::Relaxed),
401 Tier::Warm => self
402 .global_stats
403 .warm_blocks
404 .fetch_sub(1, Ordering::Relaxed),
405 Tier::Cold => self
406 .global_stats
407 .cold_blocks
408 .fetch_sub(1, Ordering::Relaxed),
409 Tier::Archive => self
410 .global_stats
411 .archive_blocks
412 .fetch_sub(1, Ordering::Relaxed),
413 };
414 }
415 }
416
417 pub fn clear(&self) {
419 self.stats.clear();
420 self.global_stats.total_accesses.store(0, Ordering::Relaxed);
421 self.global_stats.hot_blocks.store(0, Ordering::Relaxed);
422 self.global_stats.warm_blocks.store(0, Ordering::Relaxed);
423 self.global_stats.cold_blocks.store(0, Ordering::Relaxed);
424 self.global_stats.archive_blocks.store(0, Ordering::Relaxed);
425 }
426}
427
/// Point-in-time copy of the tracker's aggregate counters.
#[derive(Debug, Clone)]
pub struct TierStatsSnapshot {
    /// Number of accesses recorded across all blocks.
    pub total_accesses: u64,
    /// Number of blocks the tracker currently holds statistics for.
    pub tracked_blocks: u64,
    /// Number of blocks counted in the hot tier.
    pub hot_blocks: u64,
    /// Number of blocks counted in the warm tier.
    pub warm_blocks: u64,
    /// Number of blocks counted in the cold tier.
    pub cold_blocks: u64,
    /// Number of blocks counted in the archive tier.
    pub archive_blocks: u64,
}
444
445use crate::traits::BlockStore;
447use async_trait::async_trait;
448use ipfrs_core::Block;
449
450pub struct TieredStore<H: BlockStore, C: BlockStore> {
451 hot_store: H,
453 cold_store: C,
455 tracker: AccessTracker,
457 config: TierConfig,
459}
460
461impl<H: BlockStore, C: BlockStore> TieredStore<H, C> {
462 pub fn new(hot_store: H, cold_store: C, config: TierConfig) -> Self {
464 Self {
465 hot_store,
466 cold_store,
467 tracker: AccessTracker::new(config.clone()),
468 config,
469 }
470 }
471
472 pub fn tracker(&self) -> &AccessTracker {
474 &self.tracker
475 }
476
477 pub fn config(&self) -> &TierConfig {
479 &self.config
480 }
481
482 pub async fn migrate_cold_blocks(&self, max_count: usize) -> Result<usize> {
484 let candidates = self.tracker.get_cold_candidates(max_count)?;
485 let mut migrated = 0;
486
487 for (cid, _new_tier) in candidates {
488 if let Some(block) = self.hot_store.get(&cid).await? {
490 self.cold_store.put(&block).await?;
492 self.hot_store.delete(&cid).await?;
494 migrated += 1;
495 }
496 }
497
498 Ok(migrated)
499 }
500
501 pub async fn promote_block(&self, cid: &Cid) -> Result<bool> {
503 if let Some(block) = self.cold_store.get(cid).await? {
504 self.hot_store.put(&block).await?;
505 self.cold_store.delete(cid).await?;
506 self.tracker.set_tier(cid, Tier::Hot);
507 Ok(true)
508 } else {
509 Ok(false)
510 }
511 }
512}
513
514#[async_trait]
515impl<H: BlockStore, C: BlockStore> BlockStore for TieredStore<H, C> {
516 async fn put(&self, block: &Block) -> Result<()> {
517 self.tracker.record_access(block.cid());
519 self.hot_store.put(block).await
520 }
521
522 async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
523 self.tracker.record_access(cid);
524
525 if let Some(block) = self.hot_store.get(cid).await? {
527 return Ok(Some(block));
528 }
529
530 if let Some(block) = self.cold_store.get(cid).await? {
532 if self.tracker.is_hot(cid) {
534 self.hot_store.put(&block).await?;
536 self.cold_store.delete(cid).await?;
537 }
538 return Ok(Some(block));
539 }
540
541 Ok(None)
542 }
543
544 async fn has(&self, cid: &Cid) -> Result<bool> {
545 if self.hot_store.has(cid).await? {
546 return Ok(true);
547 }
548 self.cold_store.has(cid).await
549 }
550
551 async fn delete(&self, cid: &Cid) -> Result<()> {
552 self.tracker.remove(cid);
553 let _ = self.hot_store.delete(cid).await;
555 let _ = self.cold_store.delete(cid).await;
556 Ok(())
557 }
558
559 fn list_cids(&self) -> Result<Vec<Cid>> {
560 let mut cids = self.hot_store.list_cids()?;
562 let cold_cids = self.cold_store.list_cids()?;
563 cids.extend(cold_cids);
564 cids.sort_by_key(|a| a.to_bytes());
566 cids.dedup_by(|a, b| a.to_bytes() == b.to_bytes());
567 Ok(cids)
568 }
569
570 fn len(&self) -> usize {
571 self.hot_store.len() + self.cold_store.len()
572 }
573
574 fn is_empty(&self) -> bool {
575 self.hot_store.is_empty() && self.cold_store.is_empty()
576 }
577
578 async fn flush(&self) -> Result<()> {
579 self.hot_store.flush().await?;
580 self.cold_store.flush().await
581 }
582
583 async fn close(&self) -> Result<()> {
584 self.hot_store.close().await?;
585 self.cold_store.close().await
586 }
587}
588
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use ipfrs_core::Block;

    /// Derives a CID by wrapping `data` in a block.
    fn make_test_cid(data: &[u8]) -> Cid {
        let payload = Bytes::copy_from_slice(data);
        *Block::new(payload).unwrap().cid()
    }

    /// Tracker with the default configuration.
    fn default_tracker() -> AccessTracker {
        AccessTracker::new(TierConfig::default())
    }

    #[test]
    fn test_tier_classification() {
        let tracker = default_tracker();
        let cid = make_test_cid(b"test");

        // A block seen for the first time starts out hot.
        tracker.record_access(&cid);
        assert!(tracker.is_hot(&cid));
    }

    #[test]
    fn test_access_stats() {
        let tracker = default_tracker();
        let cid = make_test_cid(b"test");

        (0..10).for_each(|_| tracker.record_access(&cid));

        let recorded = tracker.get_stats(&cid).unwrap();
        assert_eq!(recorded.total_accesses, 10);
    }

    #[test]
    fn test_tier_stats() {
        let tracker = default_tracker();

        (0u8..5)
            .map(|seed| make_test_cid(&[seed]))
            .for_each(|cid| tracker.record_access(&cid));

        let snapshot = tracker.global_stats();
        assert_eq!(snapshot.tracked_blocks, 5);
        assert_eq!(snapshot.hot_blocks, 5);
    }

    #[test]
    fn test_tier_transitions() {
        let colder_steps = [
            (Tier::Hot, Some(Tier::Warm)),
            (Tier::Warm, Some(Tier::Cold)),
            (Tier::Cold, Some(Tier::Archive)),
            (Tier::Archive, None),
        ];
        for (tier, expected) in colder_steps {
            assert_eq!(tier.colder(), expected);
        }

        let hotter_steps = [(Tier::Archive, Some(Tier::Cold)), (Tier::Hot, None)];
        for (tier, expected) in hotter_steps {
            assert_eq!(tier.hotter(), expected);
        }
    }
}