// chie_core/tiered_storage.rs

1//! Tiered storage for CHIE nodes.
2//!
3//! This module provides automatic content placement across different storage tiers:
4//! - Hot tier (SSD): Frequently accessed content
5//! - Warm tier (HDD): Moderately accessed content
6//! - Cold tier (Archive): Rarely accessed content
7//!
8//! Content is automatically promoted/demoted based on access patterns.
9
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, VecDeque};
12use std::path::PathBuf;
13use std::sync::RwLock;
14use std::time::Instant;
15use tracing::{debug, info};
16
/// Storage tier types.
///
/// Content migrates between tiers based on access patterns:
/// `Cold` <-> `Warm` <-> `Hot`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub enum StorageTier {
    /// Fast storage (SSD) for hot content.
    Hot,
    /// Medium storage (HDD) for warm content. This is the default tier.
    #[default]
    Warm,
    /// Archive storage for cold content.
    Cold,
}
28
29/// Configuration for a single storage tier.
30#[derive(Debug, Clone)]
31pub struct TierConfig {
32    /// Tier type.
33    pub tier: StorageTier,
34    /// Storage path for this tier.
35    pub path: PathBuf,
36    /// Maximum capacity (bytes).
37    pub capacity: u64,
38    /// Read speed (MB/s) for planning.
39    pub read_speed_mbps: u32,
40    /// Write speed (MB/s) for planning.
41    pub write_speed_mbps: u32,
42    /// Whether this tier is enabled.
43    pub enabled: bool,
44}
45
46impl TierConfig {
47    /// Create a new tier config.
48    #[must_use]
49    pub fn new(tier: StorageTier, path: impl Into<PathBuf>, capacity: u64) -> Self {
50        let (read_speed, write_speed) = match tier {
51            StorageTier::Hot => (500, 400),  // SSD
52            StorageTier::Warm => (150, 100), // HDD
53            StorageTier::Cold => (50, 30),   // Archive
54        };
55
56        Self {
57            tier,
58            path: path.into(),
59            capacity,
60            read_speed_mbps: read_speed,
61            write_speed_mbps: write_speed,
62            enabled: true,
63        }
64    }
65}
66
/// Configuration for tiered storage.
///
/// Only the warm tier is mandatory; hot and cold tiers are optional and
/// absent by default (see the `Default` impl).
#[derive(Debug, Clone)]
pub struct TieredStorageConfig {
    /// Hot tier configuration (`None` means no hot tier).
    pub hot: Option<TierConfig>,
    /// Warm tier configuration (always present).
    pub warm: TierConfig,
    /// Cold tier configuration (`None` means no cold tier).
    pub cold: Option<TierConfig>,
    /// Minimum access count to promote to hot.
    pub hot_promotion_threshold: u32,
    /// Maximum inactivity to demote from hot (seconds).
    pub hot_demotion_inactive_secs: u64,
    /// Maximum inactivity to demote to cold (seconds).
    pub cold_demotion_inactive_secs: u64,
    /// How often to run tier rebalancing (seconds).
    /// NOTE(review): not read anywhere in this module — presumably
    /// consumed by the scheduler that calls `rebalance`; confirm callers.
    pub rebalance_interval_secs: u64,
    /// Maximum bytes to move per rebalance cycle.
    pub max_move_per_cycle: u64,
}
87
88impl Default for TieredStorageConfig {
89    fn default() -> Self {
90        Self {
91            hot: None, // SSD optional
92            warm: TierConfig::new(
93                StorageTier::Warm,
94                "/var/chie/warm",
95                100 * 1024 * 1024 * 1024,
96            ),
97            cold: None, // Archive optional
98            hot_promotion_threshold: 10,
99            hot_demotion_inactive_secs: 3600,           // 1 hour
100            cold_demotion_inactive_secs: 7 * 24 * 3600, // 1 week
101            rebalance_interval_secs: 300,               // 5 minutes
102            max_move_per_cycle: 1024 * 1024 * 1024,     // 1 GB
103        }
104    }
105}
106
/// Content location in tiered storage.
///
/// Serializable record of where one content item currently lives, plus
/// the access statistics that drive promotion/demotion decisions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContentLocation {
    /// Content CID.
    pub cid: String,
    /// Current storage tier.
    pub tier: StorageTier,
    /// Size in bytes.
    pub size: u64,
    /// Access count since registration.
    pub access_count: u32,
    /// Last access time (Unix timestamp, seconds).
    pub last_accessed: u64,
    /// When content was placed in current tier (Unix timestamp, seconds).
    pub tier_placed_at: u64,
}
123
/// A single content-access event kept in the bounded in-memory history.
#[derive(Debug, Clone)]
#[allow(dead_code)] // fields are currently write-only; presumably kept for future access-pattern analysis — TODO confirm
struct AccessRecord {
    // When the access happened (process-local monotonic clock).
    timestamp: Instant,
    // CID of the content that was accessed.
    cid: String,
}
131
/// Tiered storage manager.
///
/// Tracks where each content item lives, how often it is accessed, and
/// per-tier byte usage, and proposes promotions/demotions between tiers.
/// All state is behind `RwLock`s, so a shared reference can be used from
/// multiple threads. NOTE(review): no lock-ordering convention is
/// documented — confirm before adding new paths that take two locks.
pub struct TieredStorageManager {
    /// Configuration.
    config: TieredStorageConfig,
    /// Content locations, keyed by CID.
    locations: RwLock<HashMap<String, ContentLocation>>,
    /// Tier usage (tier -> used bytes).
    tier_usage: RwLock<HashMap<StorageTier, u64>>,
    /// Recent access history (bounded to 10k entries by `record_access`).
    access_history: RwLock<VecDeque<AccessRecord>>,
    /// Moves deferred by `rebalance` because they exceeded the per-cycle
    /// byte budget; retried on the next cycle.
    pending_moves: RwLock<Vec<PendingMove>>,
}
145
/// A pending content move between tiers.
///
/// Produced by `analyze_tier_changes`; carries enough information for the
/// caller to perform the data move and then record it via `execute_move`.
#[derive(Debug, Clone)]
pub struct PendingMove {
    /// Content CID.
    pub cid: String,
    /// Source tier.
    pub from: StorageTier,
    /// Target tier.
    pub to: StorageTier,
    /// Content size in bytes.
    pub size: u64,
    /// Priority (higher = sooner). Derived from the access count for
    /// promotions/hot-demotions; 0 for warm-to-cold demotions.
    pub priority: u32,
}
160
161impl TieredStorageManager {
162    /// Create a new tiered storage manager.
163    #[must_use]
164    pub fn new(config: TieredStorageConfig) -> Self {
165        let mut tier_usage = HashMap::new();
166        tier_usage.insert(StorageTier::Warm, 0);
167        if config.hot.is_some() {
168            tier_usage.insert(StorageTier::Hot, 0);
169        }
170        if config.cold.is_some() {
171            tier_usage.insert(StorageTier::Cold, 0);
172        }
173
174        Self {
175            config,
176            locations: RwLock::new(HashMap::new()),
177            tier_usage: RwLock::new(tier_usage),
178            access_history: RwLock::new(VecDeque::with_capacity(10000)),
179            pending_moves: RwLock::new(Vec::new()),
180        }
181    }
182
183    /// Register new content with initial tier placement.
184    #[must_use]
185    pub fn register_content(&self, cid: &str, size: u64) -> StorageTier {
186        let initial_tier = self.determine_initial_tier(size);
187        let now = current_timestamp();
188
189        let location = ContentLocation {
190            cid: cid.to_string(),
191            tier: initial_tier,
192            size,
193            access_count: 0,
194            last_accessed: now,
195            tier_placed_at: now,
196        };
197
198        {
199            let mut locations = self.locations.write().unwrap();
200            locations.insert(cid.to_string(), location);
201        }
202
203        {
204            let mut usage = self.tier_usage.write().unwrap();
205            *usage.entry(initial_tier).or_insert(0) += size;
206        }
207
208        info!(
209            "Registered content {} ({} bytes) in {:?} tier",
210            cid, size, initial_tier
211        );
212        initial_tier
213    }
214
215    /// Record content access.
216    pub fn record_access(&self, cid: &str) {
217        let mut locations = self.locations.write().unwrap();
218        if let Some(location) = locations.get_mut(cid) {
219            location.access_count += 1;
220            location.last_accessed = current_timestamp();
221        }
222
223        // Record in history
224        let mut history = self.access_history.write().unwrap();
225        history.push_back(AccessRecord {
226            timestamp: Instant::now(),
227            cid: cid.to_string(),
228        });
229
230        // Limit history size
231        while history.len() > 10000 {
232            history.pop_front();
233        }
234    }
235
236    /// Get content location.
237    #[must_use]
238    #[inline]
239    pub fn get_location(&self, cid: &str) -> Option<ContentLocation> {
240        let locations = self.locations.read().unwrap();
241        locations.get(cid).cloned()
242    }
243
244    /// Get path for content based on its tier.
245    #[must_use]
246    #[inline]
247    pub fn get_content_path(&self, cid: &str) -> Option<PathBuf> {
248        let locations = self.locations.read().unwrap();
249        let location = locations.get(cid)?;
250
251        let tier_config = match location.tier {
252            StorageTier::Hot => self.config.hot.as_ref(),
253            StorageTier::Warm => Some(&self.config.warm),
254            StorageTier::Cold => self.config.cold.as_ref(),
255        };
256
257        tier_config.map(|c| c.path.join(cid))
258    }
259
260    /// Determine initial tier for new content.
261    #[inline]
262    fn determine_initial_tier(&self, size: u64) -> StorageTier {
263        // Small content goes to hot tier if available
264        if size < 10 * 1024 * 1024 {
265            if let Some(hot) = &self.config.hot {
266                if hot.enabled && self.has_space(StorageTier::Hot, size) {
267                    return StorageTier::Hot;
268                }
269            }
270        }
271
272        // Default to warm tier
273        if self.has_space(StorageTier::Warm, size) {
274            return StorageTier::Warm;
275        }
276
277        // Fall back to cold tier
278        if let Some(cold) = &self.config.cold {
279            if cold.enabled && self.has_space(StorageTier::Cold, size) {
280                return StorageTier::Cold;
281            }
282        }
283
284        // Default to warm even if over capacity
285        StorageTier::Warm
286    }
287
288    /// Check if tier has space for content.
289    #[inline]
290    fn has_space(&self, tier: StorageTier, size: u64) -> bool {
291        let capacity = match tier {
292            StorageTier::Hot => self.config.hot.as_ref().map(|c| c.capacity).unwrap_or(0),
293            StorageTier::Warm => self.config.warm.capacity,
294            StorageTier::Cold => self.config.cold.as_ref().map(|c| c.capacity).unwrap_or(0),
295        };
296
297        let usage = self.tier_usage.read().unwrap();
298        let used = *usage.get(&tier).unwrap_or(&0);
299
300        used + size <= capacity
301    }
302
303    /// Analyze content for tier changes.
304    #[must_use]
305    #[inline]
306    pub fn analyze_tier_changes(&self) -> Vec<PendingMove> {
307        let mut moves = Vec::new();
308        let now = current_timestamp();
309        let locations = self.locations.read().unwrap();
310
311        for location in locations.values() {
312            // Check for promotion to hot
313            if location.tier != StorageTier::Hot
314                && location.access_count >= self.config.hot_promotion_threshold
315                && self.config.hot.is_some()
316                && self.has_space(StorageTier::Hot, location.size)
317            {
318                moves.push(PendingMove {
319                    cid: location.cid.clone(),
320                    from: location.tier,
321                    to: StorageTier::Hot,
322                    size: location.size,
323                    priority: location.access_count,
324                });
325                continue;
326            }
327
328            // Check for demotion from hot
329            if location.tier == StorageTier::Hot {
330                let inactive_secs = now.saturating_sub(location.last_accessed);
331                if inactive_secs > self.config.hot_demotion_inactive_secs {
332                    moves.push(PendingMove {
333                        cid: location.cid.clone(),
334                        from: StorageTier::Hot,
335                        to: StorageTier::Warm,
336                        size: location.size,
337                        priority: 100 - location.access_count.min(100),
338                    });
339                    continue;
340                }
341            }
342
343            // Check for demotion to cold
344            if location.tier == StorageTier::Warm && self.config.cold.is_some() {
345                let inactive_secs = now.saturating_sub(location.last_accessed);
346                if inactive_secs > self.config.cold_demotion_inactive_secs
347                    && self.has_space(StorageTier::Cold, location.size)
348                {
349                    moves.push(PendingMove {
350                        cid: location.cid.clone(),
351                        from: StorageTier::Warm,
352                        to: StorageTier::Cold,
353                        size: location.size,
354                        priority: 0,
355                    });
356                }
357            }
358        }
359
360        // Sort by priority (highest first)
361        moves.sort_by(|a, b| b.priority.cmp(&a.priority));
362
363        moves
364    }
365
366    /// Execute a tier move (call after actually moving the data).
367    pub fn execute_move(&self, cid: &str, new_tier: StorageTier) {
368        let mut locations = self.locations.write().unwrap();
369        let mut usage = self.tier_usage.write().unwrap();
370
371        if let Some(location) = locations.get_mut(cid) {
372            let old_tier = location.tier;
373            let size = location.size;
374
375            // Update usage
376            if let Some(old_usage) = usage.get_mut(&old_tier) {
377                *old_usage = old_usage.saturating_sub(size);
378            }
379            *usage.entry(new_tier).or_insert(0) += size;
380
381            // Update location
382            location.tier = new_tier;
383            location.tier_placed_at = current_timestamp();
384
385            debug!("Moved {} from {:?} to {:?}", cid, old_tier, new_tier);
386        }
387    }
388
389    /// Remove content from tracking.
390    pub fn remove_content(&self, cid: &str) {
391        let mut locations = self.locations.write().unwrap();
392        let mut usage = self.tier_usage.write().unwrap();
393
394        if let Some(location) = locations.remove(cid) {
395            if let Some(tier_usage) = usage.get_mut(&location.tier) {
396                *tier_usage = tier_usage.saturating_sub(location.size);
397            }
398        }
399    }
400
401    /// Get tier statistics.
402    #[must_use]
403    pub fn tier_stats(&self) -> TierStats {
404        let usage = self.tier_usage.read().unwrap();
405        let locations = self.locations.read().unwrap();
406
407        let hot_used = *usage.get(&StorageTier::Hot).unwrap_or(&0);
408        let warm_used = *usage.get(&StorageTier::Warm).unwrap_or(&0);
409        let cold_used = *usage.get(&StorageTier::Cold).unwrap_or(&0);
410
411        let hot_capacity = self.config.hot.as_ref().map(|c| c.capacity).unwrap_or(0);
412        let warm_capacity = self.config.warm.capacity;
413        let cold_capacity = self.config.cold.as_ref().map(|c| c.capacity).unwrap_or(0);
414
415        let content_by_tier = locations.values().fold(HashMap::new(), |mut acc, loc| {
416            *acc.entry(loc.tier).or_insert(0) += 1;
417            acc
418        });
419
420        TierStats {
421            hot_used,
422            hot_capacity,
423            hot_content_count: *content_by_tier.get(&StorageTier::Hot).unwrap_or(&0),
424            warm_used,
425            warm_capacity,
426            warm_content_count: *content_by_tier.get(&StorageTier::Warm).unwrap_or(&0),
427            cold_used,
428            cold_capacity,
429            cold_content_count: *content_by_tier.get(&StorageTier::Cold).unwrap_or(&0),
430            total_content: locations.len(),
431        }
432    }
433
434    /// Get pending moves.
435    #[must_use]
436    #[inline]
437    pub fn get_pending_moves(&self) -> Vec<PendingMove> {
438        self.pending_moves.read().unwrap().clone()
439    }
440
441    /// Get the storage path for a specific tier.
442    #[must_use]
443    #[inline]
444    pub fn get_tier_path(&self, tier: StorageTier) -> Option<PathBuf> {
445        match tier {
446            StorageTier::Hot => self.config.hot.as_ref().map(|c| c.path.clone()),
447            StorageTier::Warm => Some(self.config.warm.path.clone()),
448            StorageTier::Cold => self.config.cold.as_ref().map(|c| c.path.clone()),
449        }
450    }
451
452    /// Get the tier configuration.
453    #[must_use]
454    #[inline]
455    pub fn get_tier_config(&self, tier: StorageTier) -> Option<&TierConfig> {
456        match tier {
457            StorageTier::Hot => self.config.hot.as_ref(),
458            StorageTier::Warm => Some(&self.config.warm),
459            StorageTier::Cold => self.config.cold.as_ref(),
460        }
461    }
462
463    /// Run a rebalance cycle.
464    #[must_use]
465    pub fn rebalance(&self) -> RebalanceResult {
466        let moves = self.analyze_tier_changes();
467        let mut bytes_moved = 0u64;
468        let mut moves_executed = 0;
469
470        let mut pending = self.pending_moves.write().unwrap();
471        pending.clear();
472
473        for m in moves {
474            if bytes_moved + m.size > self.config.max_move_per_cycle {
475                // Queue for next cycle
476                pending.push(m);
477            } else {
478                // Would execute move here in real implementation
479                bytes_moved += m.size;
480                moves_executed += 1;
481            }
482        }
483
484        RebalanceResult {
485            moves_executed,
486            bytes_moved,
487            pending_moves: pending.len(),
488        }
489    }
490}
491
/// Tier statistics.
///
/// Point-in-time snapshot produced by `TieredStorageManager::tier_stats`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TierStats {
    /// Hot tier used bytes.
    pub hot_used: u64,
    /// Hot tier capacity (0 when no hot tier is configured).
    pub hot_capacity: u64,
    /// Hot tier content count.
    pub hot_content_count: usize,
    /// Warm tier used bytes.
    pub warm_used: u64,
    /// Warm tier capacity.
    pub warm_capacity: u64,
    /// Warm tier content count.
    pub warm_content_count: usize,
    /// Cold tier used bytes.
    pub cold_used: u64,
    /// Cold tier capacity (0 when no cold tier is configured).
    pub cold_capacity: u64,
    /// Cold tier content count.
    pub cold_content_count: usize,
    /// Total content items tracked.
    pub total_content: usize,
}
516
/// Result of a rebalance operation.
///
/// NOTE(review): `rebalance` currently only counts moves accepted within
/// the byte budget — actual data movement is not yet implemented there.
#[derive(Debug, Clone)]
pub struct RebalanceResult {
    /// Number of moves executed (accepted within the per-cycle budget).
    pub moves_executed: usize,
    /// Total bytes moved.
    pub bytes_moved: u64,
    /// Number of pending moves for next cycle.
    pub pending_moves: usize,
}
527
/// Get current Unix timestamp.
///
/// Returns whole seconds since the Unix epoch, or 0 if the system clock
/// reads earlier than the epoch.
fn current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
535
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_tiered_storage_default_config() {
        // Hot and cold tiers are opt-in; only warm is mandatory.
        let cfg = TieredStorageConfig::default();
        assert!(cfg.hot.is_none());
        assert!(cfg.cold.is_none());
    }

    #[test]
    fn test_register_content() {
        let mgr = TieredStorageManager::new(TieredStorageConfig::default());

        assert_eq!(
            mgr.register_content("QmTest123", 1024 * 1024),
            StorageTier::Warm
        );

        let loc = mgr.get_location("QmTest123").expect("just registered");
        assert_eq!(loc.tier, StorageTier::Warm);
        assert_eq!(loc.size, 1024 * 1024);
    }

    #[test]
    fn test_record_access() {
        let mgr = TieredStorageManager::new(TieredStorageConfig::default());
        let _ = mgr.register_content("QmTest123", 1024);

        (0..5).for_each(|_| mgr.record_access("QmTest123"));

        assert_eq!(mgr.get_location("QmTest123").unwrap().access_count, 5);
    }

    #[test]
    fn test_tier_stats() {
        let mgr = TieredStorageManager::new(TieredStorageConfig::default());
        let _ = mgr.register_content("QmTest1", 1024);
        let _ = mgr.register_content("QmTest2", 2048);

        let stats = mgr.tier_stats();
        assert_eq!(stats.warm_used, 3072);
        assert_eq!(stats.total_content, 2);
    }

    #[test]
    fn test_content_removal() {
        let mgr = TieredStorageManager::new(TieredStorageConfig::default());
        let _ = mgr.register_content("QmTest123", 1024);
        assert!(mgr.get_location("QmTest123").is_some());

        mgr.remove_content("QmTest123");
        assert!(mgr.get_location("QmTest123").is_none());
    }

    #[test]
    fn test_hot_tier_placement() {
        let cfg = TieredStorageConfig {
            hot: Some(TierConfig::new(
                StorageTier::Hot,
                "/tmp/hot",
                100 * 1024 * 1024,
            )),
            ..Default::default()
        };
        let mgr = TieredStorageManager::new(cfg);

        // Content under the 10 MiB threshold lands on the hot tier...
        assert_eq!(mgr.register_content("QmSmall", 1024), StorageTier::Hot);
        // ...while larger content goes warm.
        assert_eq!(
            mgr.register_content("QmLarge", 50 * 1024 * 1024),
            StorageTier::Warm
        );
    }
}
621}