use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::path::PathBuf;
use std::sync::RwLock;
use std::time::Instant;
use tracing::{debug, info};
/// Storage tier a piece of content can be assigned to.
///
/// `Warm` is the [`Default`] variant and the only tier that
/// `TieredStorageConfig` always carries; `Hot` and `Cold` are optional there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub enum StorageTier {
    /// Tier given the highest default throughput figures by `TierConfig::new`.
    Hot,
    /// Default tier; also the fallback when no other tier accepts content.
    #[default]
    Warm,
    /// Tier given the lowest default throughput figures by `TierConfig::new`.
    Cold,
}
/// Settings for a single storage tier.
#[derive(Debug, Clone)]
pub struct TierConfig {
    /// The tier these settings describe.
    pub tier: StorageTier,
    /// Base directory of the tier; content paths are built by joining the
    /// content ID onto this path.
    pub path: PathBuf,
    /// Maximum total content size the tier may hold, in the same unit as the
    /// sizes passed to `register_content` (logged there as bytes).
    pub capacity: u64,
    /// Nominal read throughput. NOTE(review): the `_mbps` suffix suggests
    /// megabytes or megabits per second — confirm; not read by code in this file.
    pub read_speed_mbps: u32,
    /// Nominal write throughput; same unit caveat as `read_speed_mbps`.
    pub write_speed_mbps: u32,
    /// Whether the tier may be chosen as a placement target.
    pub enabled: bool,
}
impl TierConfig {
    /// Creates an enabled configuration for `tier` rooted at `path`.
    ///
    /// The throughput fields are filled with per-tier defaults
    /// (read / write): Hot 500 / 400, Warm 150 / 100, Cold 50 / 30.
    #[must_use]
    pub fn new(tier: StorageTier, path: impl Into<PathBuf>, capacity: u64) -> Self {
        let (read_speed_mbps, write_speed_mbps) = match tier {
            StorageTier::Hot => (500, 400),
            StorageTier::Warm => (150, 100),
            StorageTier::Cold => (50, 30),
        };
        let path = path.into();

        Self {
            tier,
            path,
            capacity,
            read_speed_mbps,
            write_speed_mbps,
            enabled: true,
        }
    }
}
/// Configuration for the whole tiered storage setup.
#[derive(Debug, Clone)]
pub struct TieredStorageConfig {
    /// Optional hot tier.
    pub hot: Option<TierConfig>,
    /// Warm tier; always present.
    pub warm: TierConfig,
    /// Optional cold tier.
    pub cold: Option<TierConfig>,
    /// Access count at or above which content becomes a candidate for
    /// promotion to the hot tier.
    pub hot_promotion_threshold: u32,
    /// Hot content not accessed for more than this many seconds becomes a
    /// candidate for demotion to warm.
    pub hot_demotion_inactive_secs: u64,
    /// Warm content not accessed for more than this many seconds becomes a
    /// candidate for demotion to cold.
    pub cold_demotion_inactive_secs: u64,
    /// Intended interval between rebalance cycles, in seconds.
    /// NOTE(review): not read by any code in this file — presumably consumed
    /// by an external scheduler; confirm.
    pub rebalance_interval_secs: u64,
    /// Upper bound on the summed size of moves accepted in one rebalance cycle.
    pub max_move_per_cycle: u64,
}
impl Default for TieredStorageConfig {
    /// Warm-only configuration: 100 GiB at `/var/chie/warm`, no hot or cold
    /// tier, promotion after 10 accesses, hot demotion after one hour of
    /// inactivity, cold demotion after seven days, a 300-second rebalance
    /// interval and at most 1 GiB moved per cycle.
    fn default() -> Self {
        const GIB: u64 = 1024 * 1024 * 1024;
        const HOUR_SECS: u64 = 3600;

        let warm = TierConfig::new(StorageTier::Warm, "/var/chie/warm", 100 * GIB);

        Self {
            hot: None,
            warm,
            cold: None,
            hot_promotion_threshold: 10,
            hot_demotion_inactive_secs: HOUR_SECS,
            cold_demotion_inactive_secs: 7 * 24 * HOUR_SECS,
            rebalance_interval_secs: 300,
            max_move_per_cycle: GIB,
        }
    }
}
/// Bookkeeping entry describing where one piece of content currently lives.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContentLocation {
    /// Content identifier; also the key under which the entry is stored.
    pub cid: String,
    /// Tier the content is currently assigned to.
    pub tier: StorageTier,
    /// Size of the content as supplied at registration.
    pub size: u64,
    /// Number of accesses recorded since registration.
    pub access_count: u32,
    /// Time of the most recent access (or of registration), in seconds since
    /// the Unix epoch.
    pub last_accessed: u64,
    /// Time the content was last assigned to a tier, in seconds since the
    /// Unix epoch.
    pub tier_placed_at: u64,
}
/// One entry of the bounded access history kept by the manager.
#[derive(Debug, Clone)]
// The fields are written when an access is recorded but never read back in
// this file, hence the lint suppression.
#[allow(dead_code)]
struct AccessRecord {
    /// Monotonic instant at which the access was recorded.
    timestamp: Instant,
    /// Identifier of the accessed content.
    cid: String,
}
/// In-memory tracker of content placement across storage tiers.
///
/// Every piece of state sits behind its own [`RwLock`], so all methods take
/// `&self`.
pub struct TieredStorageManager {
    /// Static tier configuration and rebalancing thresholds.
    config: TieredStorageConfig,
    /// Placement metadata keyed by content ID.
    locations: RwLock<HashMap<String, ContentLocation>>,
    /// Sum of registered content sizes per tier.
    tier_usage: RwLock<HashMap<StorageTier, u64>>,
    /// Bounded log of the most recent accesses.
    access_history: RwLock<VecDeque<AccessRecord>>,
    /// Moves deferred by the last rebalance cycle.
    pending_moves: RwLock<Vec<PendingMove>>,
}
/// A proposed relocation of one piece of content between tiers.
#[derive(Debug, Clone)]
pub struct PendingMove {
    /// Identifier of the content to move.
    pub cid: String,
    /// Tier the content is currently in.
    pub from: StorageTier,
    /// Tier the content should be moved to.
    pub to: StorageTier,
    /// Size of the content, counted against the per-cycle move budget.
    pub size: u64,
    /// Ordering key; moves with a higher value are sorted first.
    pub priority: u32,
}
impl TieredStorageManager {
#[must_use]
pub fn new(config: TieredStorageConfig) -> Self {
let mut tier_usage = HashMap::new();
tier_usage.insert(StorageTier::Warm, 0);
if config.hot.is_some() {
tier_usage.insert(StorageTier::Hot, 0);
}
if config.cold.is_some() {
tier_usage.insert(StorageTier::Cold, 0);
}
Self {
config,
locations: RwLock::new(HashMap::new()),
tier_usage: RwLock::new(tier_usage),
access_history: RwLock::new(VecDeque::with_capacity(10000)),
pending_moves: RwLock::new(Vec::new()),
}
}
/// Registers `cid` with the given `size` and returns the tier it was placed in.
///
/// The tier is chosen by `determine_initial_tier`. The new entry starts with
/// an access count of zero and both timestamps set to the current time.
///
/// Registering a `cid` that is already known replaces the previous entry.
/// The bytes accounted for the previous entry are released from its tier
/// first, so repeated registration does not inflate the usage counters.
///
/// # Panics
///
/// Panics if one of the internal locks is poisoned.
#[must_use]
pub fn register_content(&self, cid: &str, size: u64) -> StorageTier {
    let initial_tier = self.determine_initial_tier(size);
    let now = current_timestamp();
    let location = ContentLocation {
        cid: cid.to_string(),
        tier: initial_tier,
        size,
        access_count: 0,
        last_accessed: now,
        tier_placed_at: now,
    };
    // `insert` hands back the entry it displaced, if any. The two locks are
    // taken one after the other (never nested), as before.
    let replaced = {
        let mut locations = self.locations.write().unwrap();
        locations.insert(cid.to_string(), location)
    };
    {
        let mut usage = self.tier_usage.write().unwrap();
        if let Some(previous) = replaced {
            // Release the stale accounting of the overwritten entry.
            if let Some(used) = usage.get_mut(&previous.tier) {
                *used = used.saturating_sub(previous.size);
            }
        }
        let used = usage.entry(initial_tier).or_insert(0);
        *used = used.saturating_add(size);
    }
    info!(
        "Registered content {} ({} bytes) in {:?} tier",
        cid, size, initial_tier
    );
    initial_tier
}
/// Records one access to `cid`.
///
/// If the content is registered, its access count is incremented
/// (saturating at `u32::MAX` instead of overflowing) and its last-access
/// time is set to now. The access is appended to the history regardless of
/// whether `cid` is registered; the history keeps at most the 10000 most
/// recent entries.
///
/// # Panics
///
/// Panics if one of the internal locks is poisoned.
pub fn record_access(&self, cid: &str) {
    // Upper bound on retained history entries.
    const MAX_ACCESS_HISTORY: usize = 10000;

    {
        // Scoped so the `locations` lock is released before the history
        // lock is taken and the history entry is allocated.
        let mut locations = self.locations.write().unwrap();
        if let Some(location) = locations.get_mut(cid) {
            location.access_count = location.access_count.saturating_add(1);
            location.last_accessed = current_timestamp();
        }
    }

    let mut history = self.access_history.write().unwrap();
    history.push_back(AccessRecord {
        timestamp: Instant::now(),
        cid: cid.to_string(),
    });
    while history.len() > MAX_ACCESS_HISTORY {
        history.pop_front();
    }
}
/// Returns a snapshot of the placement metadata for `cid`, or `None` when
/// the content is not registered.
///
/// # Panics
///
/// Panics if the `locations` lock is poisoned.
#[must_use]
#[inline]
pub fn get_location(&self, cid: &str) -> Option<ContentLocation> {
    self.locations.read().unwrap().get(cid).cloned()
}
/// Returns the path of `cid` inside its current tier: the tier's base path
/// joined with the content ID.
///
/// Returns `None` when the content is not registered or when its tier has
/// no configuration.
///
/// # Panics
///
/// Panics if the `locations` lock is poisoned.
#[must_use]
#[inline]
pub fn get_content_path(&self, cid: &str) -> Option<PathBuf> {
    let locations = self.locations.read().unwrap();
    let current_tier = locations.get(cid)?.tier;
    let base = match current_tier {
        StorageTier::Warm => &self.config.warm,
        StorageTier::Hot => self.config.hot.as_ref()?,
        StorageTier::Cold => self.config.cold.as_ref()?,
    };
    Some(base.path.join(cid))
}
#[inline]
fn determine_initial_tier(&self, size: u64) -> StorageTier {
if size < 10 * 1024 * 1024 {
if let Some(hot) = &self.config.hot {
if hot.enabled && self.has_space(StorageTier::Hot, size) {
return StorageTier::Hot;
}
}
}
if self.has_space(StorageTier::Warm, size) {
return StorageTier::Warm;
}
if let Some(cold) = &self.config.cold {
if cold.enabled && self.has_space(StorageTier::Cold, size) {
return StorageTier::Cold;
}
}
StorageTier::Warm
}
/// Reports whether `size` more units fit into `tier` without exceeding its
/// configured capacity.
///
/// A hot or cold tier that is not configured is treated as having capacity
/// zero. If `used + size` would overflow `u64`, the content cannot fit and
/// `false` is returned (an unchecked addition would panic in debug builds
/// and wrap to a small value in release builds).
///
/// # Panics
///
/// Panics if the `tier_usage` lock is poisoned.
#[inline]
fn has_space(&self, tier: StorageTier, size: u64) -> bool {
    let capacity = match tier {
        StorageTier::Hot => self.config.hot.as_ref().map_or(0, |c| c.capacity),
        StorageTier::Warm => self.config.warm.capacity,
        StorageTier::Cold => self.config.cold.as_ref().map_or(0, |c| c.capacity),
    };
    let usage = self.tier_usage.read().unwrap();
    let used = usage.get(&tier).copied().unwrap_or(0);
    matches!(used.checked_add(size), Some(total) if total <= capacity)
}
/// Computes the tier moves suggested by current access statistics, sorted by
/// descending priority. Nothing is modified.
///
/// For every registered item, the first matching rule applies:
/// 1. **Promotion** to Hot: the item is not in Hot, its access count has
///    reached `hot_promotion_threshold`, and the hot tier is configured,
///    enabled and has room. Priority is the access count.
/// 2. **Demotion** Hot → Warm: the item has been inactive for more than
///    `hot_demotion_inactive_secs`. Priority is `100 - min(access_count, 100)`.
/// 3. **Demotion** Warm → Cold: the item has been inactive for more than
///    `cold_demotion_inactive_secs`, and the cold tier is configured, enabled
///    and has room. Priority is 0.
///
/// A tier with `enabled == false` is never proposed as a destination, which
/// matches how `determine_initial_tier` treats disabled tiers.
///
/// Capacity is checked per item against current usage; the combined size of
/// several proposed moves into the same tier is not taken into account.
///
/// # Panics
///
/// Panics if one of the internal locks is poisoned.
#[must_use]
#[inline]
pub fn analyze_tier_changes(&self) -> Vec<PendingMove> {
    let now = current_timestamp();
    // A tier is a valid destination only when it is configured AND enabled.
    let hot_usable = matches!(&self.config.hot, Some(cfg) if cfg.enabled);
    let cold_usable = matches!(&self.config.cold, Some(cfg) if cfg.enabled);

    let mut moves = Vec::new();
    let locations = self.locations.read().unwrap();
    for location in locations.values() {
        if location.tier != StorageTier::Hot
            && hot_usable
            && location.access_count >= self.config.hot_promotion_threshold
            && self.has_space(StorageTier::Hot, location.size)
        {
            moves.push(PendingMove {
                cid: location.cid.clone(),
                from: location.tier,
                to: StorageTier::Hot,
                size: location.size,
                priority: location.access_count,
            });
            continue;
        }

        let inactive_secs = now.saturating_sub(location.last_accessed);
        match location.tier {
            StorageTier::Hot if inactive_secs > self.config.hot_demotion_inactive_secs => {
                moves.push(PendingMove {
                    cid: location.cid.clone(),
                    from: StorageTier::Hot,
                    to: StorageTier::Warm,
                    size: location.size,
                    // Rarely accessed items are demoted first.
                    priority: 100 - location.access_count.min(100),
                });
            }
            StorageTier::Warm
                if cold_usable
                    && inactive_secs > self.config.cold_demotion_inactive_secs
                    && self.has_space(StorageTier::Cold, location.size) =>
            {
                moves.push(PendingMove {
                    cid: location.cid.clone(),
                    from: StorageTier::Warm,
                    to: StorageTier::Cold,
                    size: location.size,
                    priority: 0,
                });
            }
            _ => {}
        }
    }
    // Stable sort, highest priority first.
    moves.sort_by(|a, b| b.priority.cmp(&a.priority));
    moves
}
/// Reassigns `cid` to `new_tier` in the manager's bookkeeping.
///
/// The content's size is subtracted from the usage of its old tier and added
/// to the usage of `new_tier`; the entry's tier and placement time are then
/// updated. Unknown content IDs are ignored.
///
/// # Panics
///
/// Panics if one of the internal locks is poisoned.
pub fn execute_move(&self, cid: &str, new_tier: StorageTier) {
    let mut locations = self.locations.write().unwrap();
    let mut usage = self.tier_usage.write().unwrap();

    let entry = match locations.get_mut(cid) {
        Some(entry) => entry,
        None => return,
    };
    let (previous_tier, bytes) = (entry.tier, entry.size);

    if let Some(used) = usage.get_mut(&previous_tier) {
        *used = used.saturating_sub(bytes);
    }
    *usage.entry(new_tier).or_insert(0) += bytes;

    entry.tier = new_tier;
    entry.tier_placed_at = current_timestamp();
    debug!("Moved {} from {:?} to {:?}", cid, previous_tier, new_tier);
}
/// Forgets `cid` and releases its size from the usage of the tier it was in.
/// Unknown content IDs are ignored.
///
/// # Panics
///
/// Panics if one of the internal locks is poisoned.
pub fn remove_content(&self, cid: &str) {
    let mut locations = self.locations.write().unwrap();
    let mut usage = self.tier_usage.write().unwrap();

    let removed = match locations.remove(cid) {
        Some(removed) => removed,
        None => return,
    };
    if let Some(used) = usage.get_mut(&removed.tier) {
        *used = used.saturating_sub(removed.size);
    }
}
#[must_use]
pub fn tier_stats(&self) -> TierStats {
let usage = self.tier_usage.read().unwrap();
let locations = self.locations.read().unwrap();
let hot_used = *usage.get(&StorageTier::Hot).unwrap_or(&0);
let warm_used = *usage.get(&StorageTier::Warm).unwrap_or(&0);
let cold_used = *usage.get(&StorageTier::Cold).unwrap_or(&0);
let hot_capacity = self.config.hot.as_ref().map(|c| c.capacity).unwrap_or(0);
let warm_capacity = self.config.warm.capacity;
let cold_capacity = self.config.cold.as_ref().map(|c| c.capacity).unwrap_or(0);
let content_by_tier = locations.values().fold(HashMap::new(), |mut acc, loc| {
*acc.entry(loc.tier).or_insert(0) += 1;
acc
});
TierStats {
hot_used,
hot_capacity,
hot_content_count: *content_by_tier.get(&StorageTier::Hot).unwrap_or(&0),
warm_used,
warm_capacity,
warm_content_count: *content_by_tier.get(&StorageTier::Warm).unwrap_or(&0),
cold_used,
cold_capacity,
cold_content_count: *content_by_tier.get(&StorageTier::Cold).unwrap_or(&0),
total_content: locations.len(),
}
}
/// Returns a copy of the moves that the most recent rebalance cycle deferred.
///
/// # Panics
///
/// Panics if the `pending_moves` lock is poisoned.
#[must_use]
#[inline]
pub fn get_pending_moves(&self) -> Vec<PendingMove> {
    let deferred = self.pending_moves.read().unwrap();
    deferred.to_vec()
}
/// Returns the base path of `tier`, or `None` when that tier is not
/// configured.
#[must_use]
#[inline]
pub fn get_tier_path(&self, tier: StorageTier) -> Option<PathBuf> {
    let tier_config = match tier {
        StorageTier::Warm => Some(&self.config.warm),
        StorageTier::Hot => self.config.hot.as_ref(),
        StorageTier::Cold => self.config.cold.as_ref(),
    };
    tier_config.map(|cfg| cfg.path.clone())
}
/// Returns the configuration of `tier`, or `None` when that tier is not
/// configured. The warm tier is always present.
#[must_use]
#[inline]
pub fn get_tier_config(&self, tier: StorageTier) -> Option<&TierConfig> {
    let config = &self.config;
    match tier {
        StorageTier::Warm => Some(&config.warm),
        StorageTier::Hot => config.hot.as_ref(),
        StorageTier::Cold => config.cold.as_ref(),
    }
}
/// Runs one rebalance cycle over the moves proposed by
/// `analyze_tier_changes`.
///
/// Moves are taken in priority order. A move is accepted while the running
/// total of accepted sizes stays within `max_move_per_cycle`; every other
/// move is stored in the pending queue, which is cleared at the start of
/// each cycle. A move whose size would overflow the running total is
/// treated as over budget and deferred.
///
/// NOTE(review): accepted moves are only counted here. This method does not
/// call `execute_move`, so the placement metadata is unchanged afterwards.
/// Confirm whether the caller is expected to apply the moves.
///
/// # Panics
///
/// Panics if one of the internal locks is poisoned.
#[must_use]
pub fn rebalance(&self) -> RebalanceResult {
    let moves = self.analyze_tier_changes();
    let budget = self.config.max_move_per_cycle;
    let mut bytes_moved = 0u64;
    let mut moves_executed = 0;

    let mut pending = self.pending_moves.write().unwrap();
    pending.clear();
    for m in moves {
        // `checked_add` keeps an enormous size from wrapping around and
        // slipping under the budget.
        match bytes_moved.checked_add(m.size) {
            Some(total) if total <= budget => {
                bytes_moved = total;
                moves_executed += 1;
            }
            _ => pending.push(m),
        }
    }

    RebalanceResult {
        moves_executed,
        bytes_moved,
        pending_moves: pending.len(),
    }
}
}
/// Point-in-time usage summary for all tiers.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TierStats {
    /// Summed size of content assigned to the hot tier.
    pub hot_used: u64,
    /// Configured hot-tier capacity; zero when the tier is not configured.
    pub hot_capacity: u64,
    /// Number of items assigned to the hot tier.
    pub hot_content_count: usize,
    /// Summed size of content assigned to the warm tier.
    pub warm_used: u64,
    /// Configured warm-tier capacity.
    pub warm_capacity: u64,
    /// Number of items assigned to the warm tier.
    pub warm_content_count: usize,
    /// Summed size of content assigned to the cold tier.
    pub cold_used: u64,
    /// Configured cold-tier capacity; zero when the tier is not configured.
    pub cold_capacity: u64,
    /// Number of items assigned to the cold tier.
    pub cold_content_count: usize,
    /// Total number of registered items across all tiers.
    pub total_content: usize,
}
/// Outcome of one rebalance cycle.
#[derive(Debug, Clone)]
pub struct RebalanceResult {
    /// Number of moves that fit into the cycle's budget.
    pub moves_executed: usize,
    /// Summed size of the moves that fit into the budget.
    pub bytes_moved: u64,
    /// Number of moves deferred to the pending queue.
    pub pending_moves: usize,
}
/// Returns the current wall-clock time as whole seconds since the Unix
/// epoch, or 0 if the system clock reports a time before the epoch.
fn current_timestamp() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a manager from the default (warm-only) configuration.
    fn default_manager() -> TieredStorageManager {
        TieredStorageManager::new(TieredStorageConfig::default())
    }

    /// The default configuration has neither a hot nor a cold tier.
    #[test]
    fn test_tiered_storage_default_config() {
        let config = TieredStorageConfig::default();
        assert!(config.hot.is_none());
        assert!(config.cold.is_none());
    }

    /// With only a warm tier, new content is placed there and is retrievable.
    #[test]
    fn test_register_content() {
        let manager = default_manager();
        let size = 1024 * 1024;

        let tier = manager.register_content("QmTest123", size);
        assert_eq!(tier, StorageTier::Warm);

        let location = manager.get_location("QmTest123").unwrap();
        assert_eq!(location.tier, StorageTier::Warm);
        assert_eq!(location.size, size);
    }

    /// Each recorded access increments the stored access count.
    #[test]
    fn test_record_access() {
        let manager = default_manager();
        let _ = manager.register_content("QmTest123", 1024);

        (0..5).for_each(|_| manager.record_access("QmTest123"));

        let location = manager.get_location("QmTest123").unwrap();
        assert_eq!(location.access_count, 5);
    }

    /// Usage and item totals reflect all registered content.
    #[test]
    fn test_tier_stats() {
        let manager = default_manager();
        let _ = manager.register_content("QmTest1", 1024);
        let _ = manager.register_content("QmTest2", 2048);

        let stats = manager.tier_stats();
        assert_eq!(stats.warm_used, 3072);
        assert_eq!(stats.total_content, 2);
    }

    /// Removed content can no longer be looked up.
    #[test]
    fn test_content_removal() {
        let manager = default_manager();
        let _ = manager.register_content("QmTest123", 1024);
        assert!(manager.get_location("QmTest123").is_some());

        manager.remove_content("QmTest123");
        assert!(manager.get_location("QmTest123").is_none());
    }

    /// Small content goes to the hot tier when one is configured; content at
    /// or above the 10 MiB limit goes to warm.
    #[test]
    fn test_hot_tier_placement() {
        let hot = TierConfig::new(StorageTier::Hot, "/tmp/hot", 100 * 1024 * 1024);
        let config = TieredStorageConfig {
            hot: Some(hot),
            ..Default::default()
        };
        let manager = TieredStorageManager::new(config);

        let small_tier = manager.register_content("QmSmall", 1024);
        assert_eq!(small_tier, StorageTier::Hot);

        let large_tier = manager.register_content("QmLarge", 50 * 1024 * 1024);
        assert_eq!(large_tier, StorageTier::Warm);
    }
}