use super::tier::{AccessStats, StorageClass, TIER_MANAGER, TierManager};
use crate::cloud::tier::{
CloudObject, CloudProvider, CloudStorageClass, CloudTierManager, CloudUploadConfig, TierPolicy,
};
use alloc::collections::BTreeMap;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use alloc::{format, vec};
use lazy_static::lazy_static;
use spin::Mutex;
/// Every tier an object can live in, covering both local storage classes and
/// cloud storage classes in a single enumeration.
///
/// Variants are declared in increasing [`UnifiedTier::tier_number`] order, so
/// the derived `Ord` agrees with that numbering. Do not reorder them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum UnifiedTier {
/// Local tier 0, displayed as "Local Gold (NVMe)".
LocalGold,
/// Local tier 1, displayed as "Local Silver (SSD)"; the tier new objects start in.
LocalSilver,
/// Local tier 2, displayed as "Local Bronze (HDD)".
LocalBronze,
/// Cloud tier 3, displayed as "Cloud Hot".
CloudHot,
/// Cloud tier 4, displayed as "Cloud Warm".
CloudWarm,
/// Cloud tier 5, displayed as "Cloud Cold".
CloudCold,
/// Cloud tier 6, displayed as "Cloud Archive".
CloudArchive,
}
impl UnifiedTier {
pub fn tier_number(&self) -> u8 {
match self {
UnifiedTier::LocalGold => 0,
UnifiedTier::LocalSilver => 1,
UnifiedTier::LocalBronze => 2,
UnifiedTier::CloudHot => 3,
UnifiedTier::CloudWarm => 4,
UnifiedTier::CloudCold => 5,
UnifiedTier::CloudArchive => 6,
}
}
pub fn cost_factor(&self) -> f64 {
match self {
UnifiedTier::LocalGold => 100.0, UnifiedTier::LocalSilver => 30.0, UnifiedTier::LocalBronze => 10.0, UnifiedTier::CloudHot => 5.0, UnifiedTier::CloudWarm => 2.5, UnifiedTier::CloudCold => 0.5, UnifiedTier::CloudArchive => 0.02, }
}
pub fn retrieval_latency_sec(&self) -> u64 {
match self {
UnifiedTier::LocalGold => 0,
UnifiedTier::LocalSilver => 0,
UnifiedTier::LocalBronze => 0,
UnifiedTier::CloudHot => 1,
UnifiedTier::CloudWarm => 5,
UnifiedTier::CloudCold => 3600, UnifiedTier::CloudArchive => 43200, }
}
pub fn is_local(&self) -> bool {
matches!(
self,
UnifiedTier::LocalGold | UnifiedTier::LocalSilver | UnifiedTier::LocalBronze
)
}
pub fn is_cloud(&self) -> bool {
!self.is_local()
}
pub fn to_local_class(&self) -> Option<StorageClass> {
match self {
UnifiedTier::LocalGold => Some(StorageClass::Gold),
UnifiedTier::LocalSilver => Some(StorageClass::Silver),
UnifiedTier::LocalBronze => Some(StorageClass::Bronze),
_ => None,
}
}
pub fn to_cloud_class(&self) -> Option<CloudStorageClass> {
match self {
UnifiedTier::CloudHot => Some(CloudStorageClass::Hot),
UnifiedTier::CloudWarm => Some(CloudStorageClass::Warm),
UnifiedTier::CloudCold => Some(CloudStorageClass::Cold),
UnifiedTier::CloudArchive => Some(CloudStorageClass::Archive),
_ => None,
}
}
pub fn from_local(class: StorageClass) -> Self {
match class {
StorageClass::Gold => UnifiedTier::LocalGold,
StorageClass::Silver => UnifiedTier::LocalSilver,
StorageClass::Bronze => UnifiedTier::LocalBronze,
}
}
pub fn from_cloud(class: CloudStorageClass) -> Self {
match class {
CloudStorageClass::Hot => UnifiedTier::CloudHot,
CloudStorageClass::Warm => UnifiedTier::CloudWarm,
CloudStorageClass::Cold => UnifiedTier::CloudCold,
CloudStorageClass::Archive => UnifiedTier::CloudArchive,
}
}
pub fn name(&self) -> &'static str {
match self {
UnifiedTier::LocalGold => "Local Gold (NVMe)",
UnifiedTier::LocalSilver => "Local Silver (SSD)",
UnifiedTier::LocalBronze => "Local Bronze (HDD)",
UnifiedTier::CloudHot => "Cloud Hot",
UnifiedTier::CloudWarm => "Cloud Warm",
UnifiedTier::CloudCold => "Cloud Cold",
UnifiedTier::CloudArchive => "Cloud Archive",
}
}
}
/// A rule describing which objects should be moved to `target_tier`.
///
/// An object qualifies when `matches` returns `true` for its age, access
/// count and size.
#[derive(Debug, Clone)]
pub struct ColdStoragePolicy {
/// Minimum object age, in days, for the policy to match.
pub age_days: u64,
/// Tier that matching objects should be migrated to.
pub target_tier: UnifiedTier,
/// Largest access count an object may have and still match
/// (`u64::MAX` by default, i.e. no limit).
pub max_access_count: u64,
/// Smallest object size that still matches (same unit as `ObjectState::size`).
pub min_size: u64,
/// Largest object size that still matches; `0` disables the upper bound.
pub max_size: u64,
/// Evaluation priority; the engine sorts policies so that larger values come first.
pub priority: u32,
/// Name given to the policy at construction.
pub name: String,
/// Compression flag handed to the migration when this policy triggers one;
/// defaults to `true` when `target_tier` is a cloud tier.
pub use_archive_compression: bool,
}
impl ColdStoragePolicy {
    /// Creates a policy that targets `target_tier` once an object is at least
    /// `age_days` old.
    ///
    /// The remaining constraints start out permissive: any access count, any
    /// size, priority 100. Compression is enabled when the target is a cloud tier.
    pub fn new(name: &str, age_days: u64, target_tier: UnifiedTier) -> Self {
        let compress_by_default = target_tier.is_cloud();
        Self {
            name: String::from(name),
            age_days,
            target_tier,
            priority: 100,
            min_size: 0,
            max_size: 0,
            max_access_count: u64::MAX,
            use_archive_compression: compress_by_default,
        }
    }

    /// Restricts the policy to objects accessed at most `count` times.
    pub fn with_max_access(self, count: u64) -> Self {
        Self {
            max_access_count: count,
            ..self
        }
    }

    /// Restricts the policy to objects of at least `size`.
    pub fn with_min_size(self, size: u64) -> Self {
        Self {
            min_size: size,
            ..self
        }
    }

    /// Restricts the policy to objects of at most `size`; `0` means unbounded.
    pub fn with_max_size(self, size: u64) -> Self {
        Self {
            max_size: size,
            ..self
        }
    }

    /// Sets the evaluation priority.
    pub fn with_priority(self, priority: u32) -> Self {
        Self { priority, ..self }
    }

    /// Overrides the compression flag chosen by `new`.
    pub fn with_compression(self, enable: bool) -> Self {
        Self {
            use_archive_compression: enable,
            ..self
        }
    }

    /// Returns `true` when an object with the given age, access count and
    /// size satisfies every constraint of this policy.
    pub fn matches(&self, age_days: u64, access_count: u64, size: u64) -> bool {
        let old_enough = age_days >= self.age_days;
        let rarely_used = access_count <= self.max_access_count;
        let large_enough = size >= self.min_size;
        // A zero `max_size` means "no upper bound".
        let small_enough = self.max_size == 0 || size <= self.max_size;
        old_enough && rarely_used && large_enough && small_enough
    }
}
/// Everything the engine tracks about one object.
#[derive(Debug, Clone)]
pub struct ObjectState {
/// Identifier the object is registered under.
pub object_id: u64,
/// Tier the engine currently believes the object lives in.
pub current_tier: UnifiedTier,
/// Object size as supplied at registration (unit not stated here; presumably bytes).
pub size: u64,
/// Registration timestamp; `age_days` treats it as milliseconds.
pub created_at: u64,
/// Timestamp of the most recent recorded access; `idle_days` treats it as milliseconds.
pub last_access: u64,
/// Number of accesses recorded since registration.
pub access_count: u64,
/// Compression flag recorded when the object was uploaded to a cloud tier.
pub archive_compressed: bool,
/// Cloud object key, set while the object lives in a cloud tier.
pub cloud_key: Option<String>,
/// Cloud bucket, set while the object lives in a cloud tier.
pub cloud_bucket: Option<String>,
/// Cloud provider, set while the object lives in a cloud tier.
pub cloud_provider: Option<CloudProvider>,
}
impl ObjectState {
    /// Creates the state for a freshly registered object.
    ///
    /// The object starts in `LocalSilver` with no recorded accesses and no
    /// cloud location; `timestamp` seeds both `created_at` and `last_access`.
    pub fn new(object_id: u64, size: u64, timestamp: u64) -> Self {
        Self {
            object_id,
            size,
            current_tier: UnifiedTier::LocalSilver,
            created_at: timestamp,
            last_access: timestamp,
            access_count: 0,
            archive_compressed: false,
            cloud_provider: None,
            cloud_bucket: None,
            cloud_key: None,
        }
    }

    /// Whole days elapsed between creation and `current_time` (milliseconds);
    /// zero if `current_time` precedes creation.
    pub fn age_days(&self, current_time: u64) -> u64 {
        const MS_PER_DAY: u64 = 24 * 3600 * 1000;
        current_time.saturating_sub(self.created_at) / MS_PER_DAY
    }

    /// Whole days elapsed between the last access and `current_time`
    /// (milliseconds); zero if `current_time` precedes the last access.
    pub fn idle_days(&self, current_time: u64) -> u64 {
        const MS_PER_DAY: u64 = 24 * 3600 * 1000;
        current_time.saturating_sub(self.last_access) / MS_PER_DAY
    }

    /// Notes one more access at `timestamp`.
    pub fn record_access(&mut self, timestamp: u64) {
        self.access_count += 1;
        self.last_access = timestamp;
    }
}
// Global engine instance behind a spin mutex. `lazy_static!` builds it with
// `ColdStorageEngine::new()` on first use; the module-level convenience
// functions in this file lock it for the duration of each call.
lazy_static! {
pub static ref COLD_STORAGE_ENGINE: Mutex<ColdStorageEngine> =
Mutex::new(ColdStorageEngine::new());
}
/// Running counters maintained by `ColdStorageEngine`; all start at zero.
#[derive(Debug, Clone, Default)]
pub struct ColdStorageStats {
/// Objects currently tracked by the engine.
pub total_objects: u64,
/// Tracked objects accounted to a local tier.
pub local_objects: u64,
/// Tracked objects accounted to a cloud tier.
pub cloud_objects: u64,
/// Sum of the sizes of objects accounted to local tiers.
pub local_bytes: u64,
/// Sum of the sizes of objects accounted to cloud tiers.
pub cloud_bytes: u64,
/// Migrations counted as moving data toward colder storage.
pub migrations_cold: u64,
/// Migrations counted as moving data toward warmer storage.
pub migrations_warm: u64,
/// Cumulative size of objects uploaded from local to cloud tiers.
pub bytes_to_cloud: u64,
/// Cumulative size of objects brought back from cloud to local tiers.
pub bytes_from_cloud: u64,
/// NOTE(review): never updated by the code visible in this file.
pub compression_savings: u64,
/// Number of objects examined by lifecycle passes (one per object per pass).
pub policy_evaluations: u64,
}
/// Tracks the tier of every registered object and moves objects between
/// tiers according to a prioritized list of `ColdStoragePolicy` rules.
pub struct ColdStorageEngine {
/// Per-object state, keyed by object id.
objects: BTreeMap<u64, ObjectState>,
/// Policies, kept sorted by descending priority by `add_policy`.
policies: Vec<ColdStoragePolicy>,
/// Manager used for uploads, downloads and deletes of cloud objects.
cloud_manager: CloudTierManager,
/// Provider recorded on objects uploaded to the cloud.
default_provider: CloudProvider,
/// Bucket used for objects uploaded to the cloud.
default_bucket: String,
/// Running counters; see `ColdStorageStats`.
stats: ColdStorageStats,
/// Set to `true` by `new`. NOTE(review): not read anywhere in the code visible here.
auto_process: bool,
}
impl Default for ColdStorageEngine {
fn default() -> Self {
Self::new()
}
}
impl ColdStorageEngine {
pub fn new() -> Self {
let mut engine = Self {
objects: BTreeMap::new(),
policies: Vec::new(),
cloud_manager: CloudTierManager::new(),
default_provider: CloudProvider::AwsS3,
default_bucket: "lcpfs-archive".to_string(),
stats: ColdStorageStats::default(),
auto_process: true,
};
engine.add_default_policies();
engine
}
/// Installs the built-in policy ladder, from cloud archive down to local warm.
///
/// Policies are evaluated in descending priority order and the first policy
/// that matches an object decides its target tier. Every colder rule here is
/// strictly more restrictive than the warmer ones (older age, fewer accesses),
/// so the coldest rule must be evaluated first. If the permissive 7-day
/// `local-warm` rule ran first it would match every object the colder rules
/// match, and nothing would ever leave the local silver tier. Priorities are
/// therefore assigned so that colder targets rank higher.
fn add_default_policies(&mut self) {
    // Cloud rules only apply to objects of at least 1 MiB.
    const MIN_CLOUD_SIZE: u64 = 1024 * 1024;

    self.add_policy(
        ColdStoragePolicy::new("cloud-archive", 365, UnifiedTier::CloudArchive)
            .with_max_access(0)
            .with_min_size(MIN_CLOUD_SIZE)
            .with_compression(true)
            .with_priority(200),
    );
    self.add_policy(
        ColdStoragePolicy::new("cloud-cold", 180, UnifiedTier::CloudCold)
            .with_max_access(1)
            .with_min_size(MIN_CLOUD_SIZE)
            .with_priority(150),
    );
    self.add_policy(
        ColdStoragePolicy::new("cloud-warm", 90, UnifiedTier::CloudWarm)
            .with_max_access(2)
            .with_min_size(MIN_CLOUD_SIZE)
            .with_priority(100),
    );
    self.add_policy(
        ColdStoragePolicy::new("cloud-hot", 60, UnifiedTier::CloudHot)
            .with_max_access(3)
            .with_min_size(MIN_CLOUD_SIZE)
            .with_priority(80),
    );
    self.add_policy(
        ColdStoragePolicy::new("local-cold", 30, UnifiedTier::LocalBronze)
            .with_max_access(5)
            .with_priority(60),
    );
    self.add_policy(
        ColdStoragePolicy::new("local-warm", 7, UnifiedTier::LocalSilver)
            .with_max_access(10)
            .with_priority(40),
    );
}
/// Chooses the provider and bucket used for subsequent uploads to the cloud.
/// Objects already uploaded keep the location recorded in their state.
pub fn set_cloud_provider(&mut self, provider: CloudProvider, bucket: &str) {
    self.default_bucket = String::from(bucket);
    self.default_provider = provider;
}
/// Adds a policy and re-sorts the list so that higher priorities come first.
/// The sort is stable: among equal priorities, earlier additions stay first.
pub fn add_policy(&mut self, policy: ColdStoragePolicy) {
    self.policies.push(policy);
    self.policies
        .sort_by_key(|entry| core::cmp::Reverse(entry.priority));
}
/// Removes every policy, including the built-in defaults.
pub fn clear_policies(&mut self) {
    self.policies.truncate(0);
}
/// Starts tracking an object; it begins in the local silver tier.
///
/// Registering an id that is already tracked replaces the old entry. The
/// replaced entry's contribution to the counters is backed out first, so
/// the statistics do not double-count the object.
///
/// NOTE(review): if the replaced entry lived in a cloud tier, its cloud copy
/// is not deleted here; only the bookkeeping is corrected.
pub fn register_object(&mut self, object_id: u64, size: u64, timestamp: u64) {
    let fresh = ObjectState::new(object_id, size, timestamp);
    if let Some(previous) = self.objects.insert(object_id, fresh) {
        if previous.current_tier.is_cloud() {
            self.stats.cloud_objects = self.stats.cloud_objects.saturating_sub(1);
            self.stats.cloud_bytes = self.stats.cloud_bytes.saturating_sub(previous.size);
        } else {
            self.stats.local_objects = self.stats.local_objects.saturating_sub(1);
            self.stats.local_bytes = self.stats.local_bytes.saturating_sub(previous.size);
        }
        self.stats.total_objects = self.stats.total_objects.saturating_sub(1);
    }
    self.stats.total_objects += 1;
    self.stats.local_objects += 1;
    self.stats.local_bytes += size;
}
/// Records an access to a tracked object and forwards it to the global
/// `TIER_MANAGER`. Unknown ids are ignored.
pub fn record_access(&mut self, object_id: u64, timestamp: u64) {
    let size = match self.objects.get_mut(&object_id) {
        Some(entry) => {
            entry.record_access(timestamp);
            entry.size
        }
        None => return,
    };
    TIER_MANAGER
        .lock()
        .record_access(object_id, false, timestamp, size);
}
pub fn get_object(&self, object_id: u64) -> Option<&ObjectState> {
self.objects.get(&object_id)
}
/// Returns the tier the engine currently records for an object, or `None`
/// when the id is not registered.
pub fn get_tier(&self, object_id: u64) -> Option<UnifiedTier> {
    let entry = self.objects.get(&object_id)?;
    Some(entry.current_tier)
}
/// Runs one lifecycle pass over every tracked object.
///
/// For each object the first policy (in priority order) that matches its
/// age, access count and size decides the target tier. A migration is
/// scheduled only when that target has a higher tier number than the
/// object's current tier; this pass never promotes. Scheduled migrations
/// run after the scan, and their errors are discarded.
pub fn process_lifecycle(&mut self, current_time: u64) {
    let mut pending: Vec<(u64, UnifiedTier, bool)> = Vec::new();

    for (&id, entry) in self.objects.iter() {
        self.stats.policy_evaluations += 1;
        let age = entry.age_days(current_time);

        // First match wins; later policies are not consulted.
        let winner = self
            .policies
            .iter()
            .find(|rule| rule.matches(age, entry.access_count, entry.size));

        if let Some(rule) = winner {
            let moves_colder =
                rule.target_tier.tier_number() > entry.current_tier.tier_number();
            if moves_colder {
                pending.push((id, rule.target_tier, rule.use_archive_compression));
            }
        }
    }

    for (id, tier, compress) in pending {
        let _ = self.migrate_object(id, tier, compress, current_time);
    }
}
/// Moves one object to `target_tier` and updates the engine's bookkeeping.
///
/// * local → local: hands a migration decision to the global `TIER_MANAGER`.
/// * local → cloud: uploads through the cloud manager using the default
///   provider and bucket, and records the cloud location on the object.
/// * cloud → cloud: asks the cloud manager to apply its policies.
/// * cloud → local: downloads through the cloud manager and clears the
///   recorded cloud location.
///
/// In every case the object's `current_tier` is updated on success.
/// Requesting the tier the object is already in is a no-op.
///
/// # Errors
///
/// Returns `"Object not found"` for an unknown id, `"Invalid cloud tier"` if
/// the target cannot be mapped to a cloud class, or whatever error the cloud
/// manager reports for an upload or download. On error the object's state
/// and the counters are left untouched.
pub fn migrate_object(
    &mut self,
    object_id: u64,
    target_tier: UnifiedTier,
    use_compression: bool,
    timestamp: u64,
) -> Result<(), &'static str> {
    // Copy out just the scalars we need so the map is free to be mutated below.
    let (source_tier, size, has_cloud_location) = {
        let state = self.objects.get(&object_id).ok_or("Object not found")?;
        (
            state.current_tier,
            state.size,
            state.cloud_bucket.is_some() && state.cloud_key.is_some(),
        )
    };

    // Nothing to move; avoid issuing a spurious X -> X migration.
    if source_tier == target_tier {
        return Ok(());
    }
    let is_demotion = target_tier.tier_number() > source_tier.tier_number();

    crate::lcpfs_println!(
        "[ COLD ] Migrating object {} from {} to {}{}",
        object_id,
        source_tier.name(),
        target_tier.name(),
        if use_compression { " (compressed)" } else { "" }
    );

    match (source_tier.is_local(), target_tier.is_local()) {
        // local -> local
        (true, true) => {
            if let (Some(from), Some(to)) =
                (source_tier.to_local_class(), target_tier.to_local_class())
            {
                // NOTE(review): a `Demote` decision is issued even when the move
                // is a promotion; confirm whether the tier manager has a
                // dedicated variant for that direction.
                let decision = super::tier::MigrationDecision::Demote {
                    object_id,
                    from_tier: from,
                    to_tier: to,
                    estimated_benefit: 0.0,
                };
                TIER_MANAGER.lock().execute_migration(&decision, timestamp);
            }
            if is_demotion {
                self.stats.migrations_cold += 1;
            } else {
                self.stats.migrations_warm += 1;
            }
            // Keep the engine's view in sync; otherwise every lifecycle pass
            // would schedule the same migration again.
            if let Some(obj_state) = self.objects.get_mut(&object_id) {
                obj_state.current_tier = target_tier;
            }
        }
        // local -> cloud
        (true, false) => {
            let cloud_class = target_tier.to_cloud_class().ok_or("Invalid cloud tier")?;
            let key = format!("objects/{}/{}", object_id / 1000, object_id);
            let config = CloudUploadConfig {
                dataset_id: 0,
                block_offset: object_id,
                size,
                provider: self.default_provider,
                bucket: self.default_bucket.clone(),
                key: key.clone(),
                storage_class: cloud_class,
            };
            let object = CloudObject::new(config, timestamp);
            // Upload first: a failure must leave counters and state untouched.
            self.cloud_manager.upload(object)?;

            self.stats.local_objects = self.stats.local_objects.saturating_sub(1);
            self.stats.local_bytes = self.stats.local_bytes.saturating_sub(size);
            self.stats.cloud_objects += 1;
            self.stats.cloud_bytes += size;
            self.stats.bytes_to_cloud += size;
            self.stats.migrations_cold += 1;

            if let Some(obj_state) = self.objects.get_mut(&object_id) {
                obj_state.current_tier = target_tier;
                obj_state.cloud_key = Some(key);
                obj_state.cloud_bucket = Some(self.default_bucket.clone());
                obj_state.cloud_provider = Some(self.default_provider);
                obj_state.archive_compressed = use_compression;
            }
        }
        // cloud -> cloud
        (false, false) => {
            self.cloud_manager.apply_policies(timestamp);
            if let Some(obj_state) = self.objects.get_mut(&object_id) {
                obj_state.current_tier = target_tier;
            }
            if is_demotion {
                self.stats.migrations_cold += 1;
            } else {
                self.stats.migrations_warm += 1;
            }
        }
        // cloud -> local
        (false, true) => {
            // Only attempt the download when a cloud location was recorded.
            if has_cloud_location {
                self.cloud_manager.download(0, object_id, timestamp)?;
            }

            self.stats.cloud_objects = self.stats.cloud_objects.saturating_sub(1);
            self.stats.cloud_bytes = self.stats.cloud_bytes.saturating_sub(size);
            self.stats.local_objects += 1;
            self.stats.local_bytes += size;
            self.stats.bytes_from_cloud += size;
            self.stats.migrations_warm += 1;

            if let Some(obj_state) = self.objects.get_mut(&object_id) {
                obj_state.current_tier = target_tier;
                obj_state.cloud_key = None;
                obj_state.cloud_bucket = None;
                obj_state.cloud_provider = None;
                obj_state.archive_compressed = false;
            }
        }
    }

    Ok(())
}
/// Brings a cloud-resident object back to a local tier.
///
/// Returns the retrieval latency, in seconds, of the tier the object was
/// recalled from.
///
/// # Errors
///
/// Fails when the id is unknown, the object is not in a cloud tier, the
/// target is not a local tier, or the underlying migration fails.
pub fn recall_object(
    &mut self,
    object_id: u64,
    target_tier: UnifiedTier,
    timestamp: u64,
) -> Result<u64, &'static str> {
    let origin = match self.objects.get(&object_id) {
        Some(entry) => entry.current_tier,
        None => return Err("Object not found"),
    };
    if origin.is_local() {
        return Err("Object not in cloud tier");
    }
    if target_tier.is_cloud() {
        return Err("Target must be local tier");
    }

    let latency = origin.retrieval_latency_sec();
    crate::lcpfs_println!(
        "[ COLD ] Recalling object {} from {} (latency: {}s)",
        object_id,
        origin.name(),
        latency
    );

    self.migrate_object(object_id, target_tier, false, timestamp)
        .map(|()| latency)
}
/// Stops tracking an object and adjusts the counters.
///
/// For a cloud-resident object the cloud manager is asked to delete the
/// remote copy as well; a failure of that request is ignored.
///
/// # Errors
///
/// Returns `"Object not found"` when the id is not registered.
pub fn delete_object(&mut self, object_id: u64) -> Result<(), &'static str> {
    let removed = match self.objects.remove(&object_id) {
        Some(entry) => entry,
        None => return Err("Object not found"),
    };
    let tier = removed.current_tier;

    if tier.is_local() {
        self.stats.local_objects = self.stats.local_objects.saturating_sub(1);
        self.stats.local_bytes = self.stats.local_bytes.saturating_sub(removed.size);
    } else {
        // Best effort: the entry is dropped locally even if the remote delete fails.
        let _ = self.cloud_manager.delete(0, object_id);
        self.stats.cloud_objects = self.stats.cloud_objects.saturating_sub(1);
        self.stats.cloud_bytes = self.stats.cloud_bytes.saturating_sub(removed.size);
    }
    self.stats.total_objects = self.stats.total_objects.saturating_sub(1);

    crate::lcpfs_println!(
        "[ COLD ] Deleted object {} from {}",
        object_id,
        tier.name()
    );
    Ok(())
}
pub fn stats(&self) -> ColdStorageStats {
self.stats.clone()
}
/// Summarises the tracked objects per tier as `(object count, total size)`.
/// Tiers holding no objects are absent from the map.
pub fn tier_breakdown(&self) -> BTreeMap<UnifiedTier, (u64, u64)> {
    self.objects
        .values()
        .fold(BTreeMap::new(), |mut per_tier, entry| {
            let slot = per_tier.entry(entry.current_tier).or_insert((0, 0));
            slot.0 += 1;
            slot.1 += entry.size;
            per_tier
        })
}
/// Lists the ids of all objects currently recorded in `tier`, in ascending id order.
pub fn objects_in_tier(&self, tier: UnifiedTier) -> Vec<u64> {
    let mut ids = Vec::new();
    for (&id, entry) in &self.objects {
        if entry.current_tier == tier {
            ids.push(id);
        }
    }
    ids
}
/// Previews what a lifecycle pass at `current_time` would migrate, without
/// changing anything.
///
/// Each entry pairs an object id with the target tier of the first policy
/// that matches it, and is included only when that target has a higher tier
/// number than the object's current tier.
pub fn get_migration_candidates(&self, current_time: u64) -> Vec<(u64, UnifiedTier)> {
    self.objects
        .iter()
        .filter_map(|(&id, entry)| {
            let age = entry.age_days(current_time);
            // First match wins; later policies are not consulted.
            let rule = self
                .policies
                .iter()
                .find(|rule| rule.matches(age, entry.access_count, entry.size))?;
            if rule.target_tier.tier_number() > entry.current_tier.tier_number() {
                Some((id, rule.target_tier))
            } else {
                None
            }
        })
        .collect()
}
/// Estimates, as a percentage, how much cheaper the current placement is
/// than keeping every object in `LocalGold`.
///
/// Each object contributes its size in GiB multiplied by its tier's cost
/// factor. Returns `0.0` when there is nothing to compare against.
pub fn estimate_cost_savings(&self) -> f64 {
    const BYTES_PER_GIB: f64 = 1024.0 * 1024.0 * 1024.0;

    let (actual, baseline) =
        self.objects
            .values()
            .fold((0.0_f64, 0.0_f64), |(actual, baseline), entry| {
                let gib = entry.size as f64 / BYTES_PER_GIB;
                (
                    actual + gib * entry.current_tier.cost_factor(),
                    baseline + gib * UnifiedTier::LocalGold.cost_factor(),
                )
            });

    if baseline > 0.0 {
        (baseline - actual) / baseline * 100.0
    } else {
        0.0
    }
}
}
/// Registers an object with the global `COLD_STORAGE_ENGINE`.
pub fn register_object(object_id: u64, size: u64, timestamp: u64) {
    let mut engine = COLD_STORAGE_ENGINE.lock();
    engine.register_object(object_id, size, timestamp);
}
/// Records an access on the global `COLD_STORAGE_ENGINE`.
pub fn record_access(object_id: u64, timestamp: u64) {
    let mut engine = COLD_STORAGE_ENGINE.lock();
    engine.record_access(object_id, timestamp);
}
/// Looks up an object's tier in the global `COLD_STORAGE_ENGINE`.
pub fn get_tier(object_id: u64) -> Option<UnifiedTier> {
    let engine = COLD_STORAGE_ENGINE.lock();
    engine.get_tier(object_id)
}
/// Runs one lifecycle pass on the global `COLD_STORAGE_ENGINE`.
pub fn process_lifecycle(current_time: u64) {
    let mut engine = COLD_STORAGE_ENGINE.lock();
    engine.process_lifecycle(current_time);
}
/// Recalls a cloud-resident object into `LocalSilver` via the global
/// `COLD_STORAGE_ENGINE`, returning the retrieval latency in seconds.
pub fn recall_object(object_id: u64, timestamp: u64) -> Result<u64, &'static str> {
    let mut engine = COLD_STORAGE_ENGINE.lock();
    engine.recall_object(object_id, UnifiedTier::LocalSilver, timestamp)
}
/// Returns a snapshot of the global engine's counters.
pub fn stats() -> ColdStorageStats {
    let engine = COLD_STORAGE_ENGINE.lock();
    engine.stats()
}
/// Returns the per-tier `(object count, total size)` summary of the global engine.
pub fn tier_breakdown() -> BTreeMap<UnifiedTier, (u64, u64)> {
    let engine = COLD_STORAGE_ENGINE.lock();
    engine.tier_breakdown()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Milliseconds in one day, matching the unit `ObjectState::age_days` expects.
    const MS_PER_DAY: u64 = 24 * 3600 * 1000;

    /// Tier numbers strictly increase from local gold to cloud archive.
    #[test]
    fn test_unified_tier_ordering() {
        let ascending = [
            UnifiedTier::LocalGold,
            UnifiedTier::LocalSilver,
            UnifiedTier::LocalBronze,
            UnifiedTier::CloudHot,
            UnifiedTier::CloudWarm,
            UnifiedTier::CloudCold,
            UnifiedTier::CloudArchive,
        ];
        for pair in ascending.windows(2) {
            assert!(pair[0].tier_number() < pair[1].tier_number());
        }
    }

    /// Cost factors fall as tiers get colder.
    #[test]
    fn test_unified_tier_cost() {
        let gold = UnifiedTier::LocalGold.cost_factor();
        let silver = UnifiedTier::LocalSilver.cost_factor();
        let bronze = UnifiedTier::LocalBronze.cost_factor();
        let hot = UnifiedTier::CloudHot.cost_factor();
        let cold = UnifiedTier::CloudCold.cost_factor();
        let archive = UnifiedTier::CloudArchive.cost_factor();
        assert!(gold > silver);
        assert!(silver > bronze);
        assert!(bronze > hot);
        assert!(archive < cold);
    }

    /// A policy rejects objects that are too young, too busy or too small.
    #[test]
    fn test_cold_storage_policy_matching() {
        let policy = ColdStoragePolicy::new("test", 30, UnifiedTier::CloudWarm)
            .with_max_access(5)
            .with_min_size(1024);
        let cases = [
            (29, 3, 2048, false),
            (30, 3, 2048, true),
            (30, 10, 2048, false),
            (30, 3, 512, false),
        ];
        for &(age, accesses, size, expected) in cases.iter() {
            assert_eq!(policy.matches(age, accesses, size), expected);
        }
    }

    /// Age is reported in whole days from millisecond timestamps.
    #[test]
    fn test_object_state_age() {
        let state = ObjectState::new(1, 1024, 0);
        for &days in [30u64, 365].iter() {
            assert_eq!(state.age_days(days * MS_PER_DAY), days);
        }
    }

    /// Registration updates the counters and accesses update the object.
    #[test]
    fn test_engine_register_and_access() {
        let mut engine = ColdStorageEngine::new();
        engine.clear_policies();
        engine.register_object(1, 1024, 0);
        assert_eq!(engine.stats.total_objects, 1);
        assert_eq!(engine.stats.local_objects, 1);

        engine.record_access(1, 1000);
        let state = engine.get_object(1).unwrap();
        assert_eq!(state.access_count, 1);
        assert_eq!(state.last_access, 1000);
    }

    /// Local and cloud predicates classify the tiers correctly.
    #[test]
    fn test_tier_is_local_cloud() {
        let local = [
            UnifiedTier::LocalGold,
            UnifiedTier::LocalSilver,
            UnifiedTier::LocalBronze,
        ];
        for tier in local.iter() {
            assert!(tier.is_local());
        }
        assert!(!UnifiedTier::CloudHot.is_local());
        assert!(!UnifiedTier::CloudArchive.is_local());
        assert!(!UnifiedTier::LocalGold.is_cloud());
        assert!(UnifiedTier::CloudHot.is_cloud());
        assert!(UnifiedTier::CloudArchive.is_cloud());
    }

    /// Class conversions succeed only within the matching family.
    #[test]
    fn test_tier_conversion() {
        let gold = UnifiedTier::LocalGold;
        let warm = UnifiedTier::CloudWarm;
        assert_eq!(gold.to_local_class(), Some(StorageClass::Gold));
        assert_eq!(warm.to_cloud_class(), Some(CloudStorageClass::Warm));
        assert_eq!(gold.to_cloud_class(), None);
        assert_eq!(warm.to_local_class(), None);
    }

    /// Newly registered objects are all counted under local silver.
    #[test]
    fn test_engine_tier_breakdown() {
        let mut engine = ColdStorageEngine::new();
        engine.clear_policies();
        let sizes = [1024u64, 2048, 4096];
        for (offset, &size) in sizes.iter().enumerate() {
            engine.register_object(offset as u64 + 1, size, 0);
        }
        let breakdown = engine.tier_breakdown();
        let (count, bytes) = breakdown.get(&UnifiedTier::LocalSilver).unwrap();
        assert_eq!(*count, 3);
        assert_eq!(*bytes, 1024 + 2048 + 4096);
    }

    /// A single silver object is cheaper than gold, but not free.
    #[test]
    fn test_cost_savings_estimation() {
        let mut engine = ColdStorageEngine::new();
        engine.clear_policies();
        engine.register_object(1, 1024 * 1024 * 1024, 0);
        let savings = engine.estimate_cost_savings();
        assert!(savings > 0.0);
        assert!(savings < 100.0);
    }
}