use alloc::collections::BTreeMap;
use alloc::format;
use alloc::string::String;
use alloc::vec::Vec;
/// Supported cloud object-storage backends.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum CloudProvider {
    AwsS3,
    AzureBlob,
    GoogleGcs,
    Minio,
}

impl CloudProvider {
    /// Human-readable provider label used in log output.
    pub fn name(&self) -> &'static str {
        match self {
            Self::AwsS3 => "AWS S3",
            Self::AzureBlob => "Azure Blob",
            Self::GoogleGcs => "Google GCS",
            Self::Minio => "MinIO",
        }
    }

    /// Builds the service endpoint URL for `region`.
    ///
    /// The meaning of `region` varies by provider: for Azure it is the
    /// storage-account name, for MinIO it is the host, and GCS ignores it
    /// (single global endpoint).
    pub fn endpoint(&self, region: &str) -> String {
        match self {
            Self::AwsS3 => format!("https://s3.{}.amazonaws.com", region),
            Self::AzureBlob => format!("https://{}.blob.core.windows.net", region),
            Self::GoogleGcs => String::from("https://storage.googleapis.com"),
            Self::Minio => format!("http://{}:9000", region),
        }
    }
}
/// Storage temperature tier, from hottest (fast, expensive) to coldest
/// (slow, cheap). The derived `Ord` follows declaration order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum CloudStorageClass {
    Hot,
    Warm,
    Cold,
    Archive,
}

impl CloudStorageClass {
    /// Per-tier attribute table:
    /// (label, cost relative to Hot, retrieval latency in seconds).
    fn attrs(self) -> (&'static str, f64, u64) {
        match self {
            CloudStorageClass::Hot => ("Hot", 1.0, 1),
            CloudStorageClass::Warm => ("Warm", 0.5, 5),
            CloudStorageClass::Cold => ("Cold", 0.1, 3600),
            CloudStorageClass::Archive => ("Archive", 0.04, 43200),
        }
    }

    /// Display label for this tier.
    pub fn name(&self) -> &'static str {
        self.attrs().0
    }

    /// Storage cost relative to the Hot tier (Hot == 1.0).
    pub fn cost_multiplier(&self) -> f64 {
        self.attrs().1
    }

    /// Typical retrieval latency for this tier, in seconds.
    pub fn retrieval_latency_sec(&self) -> u64 {
        self.attrs().2
    }
}

/// A tiering rule: objects at least `age_days` old (and passing the
/// optional size/access filters) should live in `storage_class`.
#[derive(Debug, Clone)]
pub struct TierPolicy {
    /// Minimum object age in days for the rule to apply (inclusive).
    pub age_days: u64,
    /// Target storage class for matching objects.
    pub storage_class: CloudStorageClass,
    /// Lower size bound in bytes; 0 disables this check.
    pub min_size: u64,
    /// Upper size bound in bytes; 0 disables this check.
    pub max_size: u64,
    /// Objects with `access_count >= access_threshold` are excluded
    /// (kept where they are); 0 disables this check.
    pub access_threshold: u64,
}

impl TierPolicy {
    /// Age-only policy: every size/access filter starts disabled.
    pub fn new(age_days: u64, storage_class: CloudStorageClass) -> Self {
        Self {
            age_days,
            storage_class,
            min_size: 0,
            max_size: 0,
            access_threshold: 0,
        }
    }

    /// Returns true when an object with the given age, size, and access
    /// count satisfies every enabled filter of this policy.
    pub fn matches(&self, age_days: u64, size: u64, access_count: u64) -> bool {
        let old_enough = age_days >= self.age_days;
        let above_min = self.min_size == 0 || size >= self.min_size;
        let below_max = self.max_size == 0 || size <= self.max_size;
        let rarely_used = self.access_threshold == 0 || access_count < self.access_threshold;
        old_enough && above_min && below_max && rarely_used
    }
}
/// A block that has been uploaded to cloud storage, plus access metadata
/// maintained by the tier manager.
#[derive(Debug, Clone)]
pub struct CloudObject {
    pub dataset_id: u64,
    pub block_offset: u64,
    pub size: u64,
    pub provider: CloudProvider,
    pub bucket: String,
    pub key: String,
    pub storage_class: CloudStorageClass,
    /// Timestamp of the original upload.
    pub uploaded_at: u64,
    /// Timestamp of the most recent download.
    pub last_access: u64,
    /// Number of downloads recorded for this object.
    pub access_count: u64,
    pub etag: String,
}

/// Parameters describing a single upload; consumed by [`CloudObject::new`].
#[derive(Debug, Clone)]
pub struct CloudUploadConfig {
    pub dataset_id: u64,
    pub block_offset: u64,
    pub size: u64,
    pub provider: CloudProvider,
    pub bucket: String,
    pub key: String,
    pub storage_class: CloudStorageClass,
}

impl CloudObject {
    /// Builds a freshly-uploaded object: `uploaded_at` and `last_access`
    /// are both set to `timestamp`, the access count starts at zero, and a
    /// synthetic etag is derived from the (dataset, offset) pair.
    pub fn new(config: CloudUploadConfig, timestamp: u64) -> Self {
        let CloudUploadConfig {
            dataset_id,
            block_offset,
            size,
            provider,
            bucket,
            key,
            storage_class,
        } = config;
        Self {
            etag: format!("etag-{}-{}", dataset_id, block_offset),
            dataset_id,
            block_offset,
            size,
            provider,
            bucket,
            key,
            storage_class,
            uploaded_at: timestamp,
            last_access: timestamp,
            access_count: 0,
        }
    }
}
/// Cumulative counters for cloud-tier activity.
///
/// NOTE(review): `upload_failures` / `download_failures` are never
/// incremented anywhere in this module — confirm whether callers update
/// them or they are reserved for future use.
#[derive(Debug, Clone, Default)]
pub struct CloudTierStats {
    /// Objects uploaded (count) and their total size in bytes.
    pub uploaded: u64,
    pub uploaded_bytes: u64,
    /// Download operations (count) and total bytes served.
    pub downloaded: u64,
    pub downloaded_bytes: u64,
    /// Objects removed from cloud storage.
    pub deleted: u64,
    /// Storage-class changes performed by `apply_policies`.
    pub migrations: u64,
    pub upload_failures: u64,
    pub download_failures: u64,
}
/// Tracks which blocks live in cloud storage, applies age-based tiering
/// policies, and accumulates transfer statistics.
pub struct CloudTierManager {
    /// Objects keyed by `(dataset_id, block_offset)`.
    objects: BTreeMap<(u64, u64), CloudObject>,
    /// Tiering rules, kept sorted by `age_days` descending (see `add_policy`).
    policies: Vec<TierPolicy>,
    /// Cumulative operation counters.
    stats: CloudTierStats,
}
impl Default for CloudTierManager {
fn default() -> Self {
Self::new()
}
}
impl CloudTierManager {
    /// Creates an empty manager with no objects, no policies, and zeroed stats.
    pub fn new() -> Self {
        Self {
            objects: BTreeMap::new(),
            policies: Vec::new(),
            stats: CloudTierStats::default(),
        }
    }

    /// Registers a tiering policy.
    ///
    /// Policies are kept sorted by `age_days` descending so that
    /// `apply_policies` matches the oldest (coldest) applicable tier first.
    pub fn add_policy(&mut self, policy: TierPolicy) {
        self.policies.push(policy);
        self.policies.sort_by(|a, b| b.age_days.cmp(&a.age_days));
    }

    /// Records `object` as uploaded and updates the upload statistics.
    ///
    /// NOTE(review): re-uploading an existing `(dataset, offset)` pair
    /// silently replaces the previous entry while `uploaded` /
    /// `uploaded_bytes` keep accumulating — confirm that is intended.
    pub fn upload(&mut self, object: CloudObject) -> Result<(), &'static str> {
        // Log while we still own the object by value; this avoids the
        // bucket-string clone that was previously needed to log after
        // moving the object into the map.
        crate::lcpfs_println!(
            "[ CLOUD ] Uploaded dataset {} block 0x{:x} to {} ({} / {})",
            object.dataset_id,
            object.block_offset,
            object.provider.name(),
            object.bucket,
            object.storage_class.name()
        );
        self.stats.uploaded += 1;
        self.stats.uploaded_bytes += object.size;
        self.objects
            .insert((object.dataset_id, object.block_offset), object);
        Ok(())
    }

    /// Records a download of the given block: bumps its `access_count`,
    /// sets `last_access` to `timestamp`, and updates global counters.
    ///
    /// Returns `Err` when the block is not tracked in the cloud tier.
    pub fn download(
        &mut self,
        dataset_id: u64,
        block_offset: u64,
        timestamp: u64,
    ) -> Result<(), &'static str> {
        let object = self
            .objects
            .get_mut(&(dataset_id, block_offset))
            .ok_or("Object not found in cloud")?;
        object.last_access = timestamp;
        object.access_count += 1;
        self.stats.downloaded += 1;
        self.stats.downloaded_bytes += object.size;
        crate::lcpfs_println!(
            "[ CLOUD ] Downloaded dataset {} block 0x{:x} from {} (latency: {}s)",
            dataset_id,
            block_offset,
            object.provider.name(),
            object.storage_class.retrieval_latency_sec()
        );
        Ok(())
    }

    /// Removes a block from the cloud tier. Errors when it is not tracked.
    pub fn delete(&mut self, dataset_id: u64, block_offset: u64) -> Result<(), &'static str> {
        self.objects
            .remove(&(dataset_id, block_offset))
            .ok_or("Object not found")?;
        self.stats.deleted += 1;
        crate::lcpfs_println!(
            "[ CLOUD ] Deleted dataset {} block 0x{:x} from cloud",
            dataset_id,
            block_offset
        );
        Ok(())
    }

    /// Re-evaluates every object against the registered policies and
    /// migrates storage classes where a policy matches.
    ///
    /// Timestamps (`current_time`, `uploaded_at`) are treated as
    /// milliseconds: age is divided by 86_400_000 to get whole days.
    pub fn apply_policies(&mut self, current_time: u64) {
        // First pass: collect migrations without mutating the map, so we
        // can iterate `objects` immutably while consulting `policies`.
        let mut migrations = Vec::new();
        for (key, object) in &self.objects {
            let age_ms = current_time.saturating_sub(object.uploaded_at);
            let age_days = age_ms / (24 * 3600 * 1000);
            // Policies are sorted oldest-first (see `add_policy`), so the
            // first match is the coldest tier the object qualifies for.
            if let Some(policy) = self
                .policies
                .iter()
                .find(|p| p.matches(age_days, object.size, object.access_count))
            {
                if policy.storage_class != object.storage_class {
                    migrations.push((*key, policy.storage_class));
                }
            }
        }
        // Second pass: apply the collected storage-class changes.
        for ((dataset_id, block_offset), new_class) in migrations {
            if let Some(object) = self.objects.get_mut(&(dataset_id, block_offset)) {
                let old_class = object.storage_class;
                object.storage_class = new_class;
                self.stats.migrations += 1;
                crate::lcpfs_println!(
                    "[ CLOUD ] Migrated dataset {} block 0x{:x}: {} -> {}",
                    dataset_id,
                    block_offset,
                    old_class.name(),
                    new_class.name()
                );
            }
        }
    }

    /// Looks up a tracked object by its `(dataset_id, block_offset)` key.
    pub fn get_object(&self, dataset_id: u64, block_offset: u64) -> Option<&CloudObject> {
        self.objects.get(&(dataset_id, block_offset))
    }

    /// Returns every tracked object currently in `storage_class`.
    pub fn list_by_class(&self, storage_class: CloudStorageClass) -> Vec<&CloudObject> {
        self.objects
            .values()
            .filter(|obj| obj.storage_class == storage_class)
            .collect()
    }

    /// Snapshot of the cumulative statistics.
    pub fn get_stats(&self) -> CloudTierStats {
        self.stats.clone()
    }

    /// Total size, in bytes, of all tracked cloud objects.
    pub fn total_cloud_bytes(&self) -> u64 {
        self.objects.values().map(|obj| obj.size).sum()
    }

    /// Total bytes stored per storage class.
    pub fn storage_by_class(&self) -> BTreeMap<CloudStorageClass, u64> {
        let mut breakdown = BTreeMap::new();
        for object in self.objects.values() {
            *breakdown.entry(object.storage_class).or_insert(0) += object.size;
        }
        breakdown
    }
}
// Unit tests for the cloud tier module: endpoint formatting, storage-class
// ordering, policy matching, and the manager's upload/download/delete and
// migration bookkeeping. Timestamps in these tests are milliseconds.
#[cfg(test)]
mod tests {
    use super::*;

    // Convenience constructor so each test can build a CloudObject in one call.
    fn test_obj(
        dataset_id: u64,
        block_offset: u64,
        size: u64,
        provider: CloudProvider,
        bucket: &str,
        key: &str,
        storage_class: CloudStorageClass,
        timestamp: u64,
    ) -> CloudObject {
        CloudObject::new(
            CloudUploadConfig {
                dataset_id,
                block_offset,
                size,
                provider,
                bucket: bucket.into(),
                key: key.into(),
                storage_class,
            },
            timestamp,
        )
    }

    // Endpoint URLs are provider-specific; Azure interprets the argument as
    // the storage-account name, and GCS ignores it entirely.
    #[test]
    fn test_cloud_provider_endpoint() {
        assert_eq!(
            CloudProvider::AwsS3.endpoint("us-east-1"),
            "https://s3.us-east-1.amazonaws.com"
        );
        assert_eq!(
            CloudProvider::AzureBlob.endpoint("mystorageaccount"),
            "https://mystorageaccount.blob.core.windows.net"
        );
        assert_eq!(
            CloudProvider::GoogleGcs.endpoint(""),
            "https://storage.googleapis.com"
        );
    }

    // Cost multipliers strictly decrease as tiers get colder.
    #[test]
    fn test_storage_class_cost() {
        assert!(
            CloudStorageClass::Hot.cost_multiplier() > CloudStorageClass::Warm.cost_multiplier()
        );
        assert!(
            CloudStorageClass::Warm.cost_multiplier() > CloudStorageClass::Cold.cost_multiplier()
        );
        assert!(
            CloudStorageClass::Cold.cost_multiplier()
                > CloudStorageClass::Archive.cost_multiplier()
        );
    }

    // Retrieval latency strictly increases as tiers get colder.
    #[test]
    fn test_storage_class_latency() {
        assert!(
            CloudStorageClass::Hot.retrieval_latency_sec()
                < CloudStorageClass::Warm.retrieval_latency_sec()
        );
        assert!(
            CloudStorageClass::Warm.retrieval_latency_sec()
                < CloudStorageClass::Cold.retrieval_latency_sec()
        );
        assert!(
            CloudStorageClass::Cold.retrieval_latency_sec()
                < CloudStorageClass::Archive.retrieval_latency_sec()
        );
    }

    // The age threshold is inclusive: exactly `age_days` old matches.
    #[test]
    fn test_tier_policy_age() {
        let policy = TierPolicy::new(30, CloudStorageClass::Warm);
        assert!(!policy.matches(29, 1000, 0));
        assert!(policy.matches(30, 1000, 0));
        assert!(policy.matches(31, 1000, 0));
    }

    // Non-zero min_size/max_size bound the matching size range (inclusive).
    #[test]
    fn test_tier_policy_size() {
        let mut policy = TierPolicy::new(30, CloudStorageClass::Cold);
        policy.min_size = 1_000_000;
        policy.max_size = 10_000_000;
        assert!(!policy.matches(30, 500_000, 0));
        assert!(policy.matches(30, 5_000_000, 0));
        assert!(!policy.matches(30, 20_000_000, 0));
    }

    // Objects accessed at or above the threshold are excluded from the tier.
    #[test]
    fn test_tier_policy_access() {
        let mut policy = TierPolicy::new(30, CloudStorageClass::Archive);
        policy.access_threshold = 5;
        assert!(policy.matches(30, 1000, 0));
        assert!(policy.matches(30, 1000, 4));
        assert!(!policy.matches(30, 1000, 5));
        assert!(!policy.matches(30, 1000, 10));
    }

    // Upload then download a single object; byte counters and the object's
    // access metadata must all update.
    #[test]
    fn test_upload_download() {
        let mut manager = CloudTierManager::new();
        let obj = test_obj(
            1,
            0x1000,
            4096,
            CloudProvider::AwsS3,
            "my-bucket",
            "data/block-1000",
            CloudStorageClass::Hot,
            1000,
        );
        manager.upload(obj).expect("test: operation should succeed");
        assert_eq!(manager.stats.uploaded, 1);
        assert_eq!(manager.stats.uploaded_bytes, 4096);
        let object = manager
            .get_object(1, 0x1000)
            .expect("test: operation should succeed");
        assert_eq!(object.provider, CloudProvider::AwsS3);
        assert_eq!(object.bucket, "my-bucket");
        assert_eq!(object.storage_class, CloudStorageClass::Hot);
        manager
            .download(1, 0x1000, 2000)
            .expect("test: operation should succeed");
        assert_eq!(manager.stats.downloaded, 1);
        assert_eq!(manager.stats.downloaded_bytes, 4096);
        let object = manager
            .get_object(1, 0x1000)
            .expect("test: operation should succeed");
        assert_eq!(object.access_count, 1);
        assert_eq!(object.last_access, 2000);
    }

    // Delete removes the object from the map and bumps the deleted counter.
    #[test]
    fn test_delete_object() {
        let mut manager = CloudTierManager::new();
        let obj = test_obj(
            1,
            0x1000,
            4096,
            CloudProvider::AwsS3,
            "bucket",
            "key",
            CloudStorageClass::Hot,
            1000,
        );
        manager.upload(obj).expect("test: operation should succeed");
        assert!(manager.get_object(1, 0x1000).is_some());
        manager
            .delete(1, 0x1000)
            .expect("test: operation should succeed");
        assert_eq!(manager.stats.deleted, 1);
        assert!(manager.get_object(1, 0x1000).is_none());
    }

    // A 30-day policy must not fire at 29 days but must fire at 31 days.
    #[test]
    fn test_policy_migration() {
        let mut manager = CloudTierManager::new();
        let obj = test_obj(
            1,
            0x1000,
            4096,
            CloudProvider::AwsS3,
            "bucket",
            "key",
            CloudStorageClass::Hot,
            1000,
        );
        manager.upload(obj).expect("test: operation should succeed");
        let policy = TierPolicy::new(30, CloudStorageClass::Warm);
        manager.add_policy(policy);
        let time_29_days = 1000 + 29 * 24 * 3600 * 1000;
        manager.apply_policies(time_29_days);
        assert_eq!(
            manager
                .get_object(1, 0x1000)
                .expect("test: operation should succeed")
                .storage_class,
            CloudStorageClass::Hot
        );
        assert_eq!(manager.stats.migrations, 0);
        let time_31_days = 1000 + 31 * 24 * 3600 * 1000;
        manager.apply_policies(time_31_days);
        assert_eq!(
            manager
                .get_object(1, 0x1000)
                .expect("test: operation should succeed")
                .storage_class,
            CloudStorageClass::Warm
        );
        assert_eq!(manager.stats.migrations, 1);
    }

    // With several policies, the object should step through Warm -> Cold ->
    // Archive as it ages past each threshold (oldest policy wins per pass).
    #[test]
    fn test_multi_tier_policies() {
        let mut manager = CloudTierManager::new();
        let obj = test_obj(
            1,
            0x1000,
            4096,
            CloudProvider::AwsS3,
            "bucket",
            "key",
            CloudStorageClass::Hot,
            0,
        );
        manager.upload(obj).expect("test: operation should succeed");
        manager.add_policy(TierPolicy::new(30, CloudStorageClass::Warm));
        manager.add_policy(TierPolicy::new(90, CloudStorageClass::Cold));
        manager.add_policy(TierPolicy::new(365, CloudStorageClass::Archive));
        let time_60_days = 60 * 24 * 3600 * 1000;
        manager.apply_policies(time_60_days);
        assert_eq!(
            manager
                .get_object(1, 0x1000)
                .expect("test: operation should succeed")
                .storage_class,
            CloudStorageClass::Warm
        );
        let time_120_days = 120 * 24 * 3600 * 1000;
        manager.apply_policies(time_120_days);
        assert_eq!(
            manager
                .get_object(1, 0x1000)
                .expect("test: operation should succeed")
                .storage_class,
            CloudStorageClass::Cold
        );
        let time_400_days = 400 * 24 * 3600 * 1000;
        manager.apply_policies(time_400_days);
        assert_eq!(
            manager
                .get_object(1, 0x1000)
                .expect("test: operation should succeed")
                .storage_class,
            CloudStorageClass::Archive
        );
        assert_eq!(manager.stats.migrations, 3);
    }

    // list_by_class filters tracked objects by their current tier.
    #[test]
    fn test_list_by_class() {
        let mut manager = CloudTierManager::new();
        manager
            .upload(test_obj(
                1,
                0x1000,
                4096,
                CloudProvider::AwsS3,
                "b1",
                "k1",
                CloudStorageClass::Hot,
                1000,
            ))
            .expect("test: operation should succeed");
        manager
            .upload(test_obj(
                2,
                0x2000,
                8192,
                CloudProvider::AwsS3,
                "b2",
                "k2",
                CloudStorageClass::Hot,
                1000,
            ))
            .expect("test: operation should succeed");
        manager
            .upload(test_obj(
                3,
                0x3000,
                16384,
                CloudProvider::AwsS3,
                "b3",
                "k3",
                CloudStorageClass::Warm,
                1000,
            ))
            .expect("test: operation should succeed");
        let hot_objects = manager.list_by_class(CloudStorageClass::Hot);
        assert_eq!(hot_objects.len(), 2);
        let warm_objects = manager.list_by_class(CloudStorageClass::Warm);
        assert_eq!(warm_objects.len(), 1);
    }

    // storage_by_class sums sizes per tier; total_cloud_bytes sums them all.
    #[test]
    fn test_storage_breakdown() {
        let mut manager = CloudTierManager::new();
        manager
            .upload(test_obj(
                1,
                0x1000,
                4096,
                CloudProvider::AwsS3,
                "b",
                "k1",
                CloudStorageClass::Hot,
                1000,
            ))
            .expect("test: operation should succeed");
        manager
            .upload(test_obj(
                2,
                0x2000,
                8192,
                CloudProvider::AwsS3,
                "b",
                "k2",
                CloudStorageClass::Hot,
                1000,
            ))
            .expect("test: operation should succeed");
        manager
            .upload(test_obj(
                3,
                0x3000,
                16384,
                CloudProvider::AwsS3,
                "b",
                "k3",
                CloudStorageClass::Warm,
                1000,
            ))
            .expect("test: operation should succeed");
        let breakdown = manager.storage_by_class();
        // Hot = 4096 + 8192; Warm = 16384; grand total = 28672.
        assert_eq!(breakdown.get(&CloudStorageClass::Hot), Some(&12288));
        assert_eq!(breakdown.get(&CloudStorageClass::Warm), Some(&16384));
        assert_eq!(manager.total_cloud_bytes(), 28672);
    }

    // A frequently-downloaded object (10 accesses >= threshold 5) stays Hot;
    // a rarely-downloaded one (1 access) is archived by the same policy.
    #[test]
    fn test_access_based_policy() {
        let mut manager = CloudTierManager::new();
        manager
            .upload(test_obj(
                1,
                0x1000,
                4096,
                CloudProvider::AwsS3,
                "b",
                "k",
                CloudStorageClass::Hot,
                0,
            ))
            .expect("test: operation should succeed");
        let mut policy = TierPolicy::new(30, CloudStorageClass::Archive);
        policy.access_threshold = 5;
        manager.add_policy(policy);
        for _ in 0..10 {
            manager
                .download(1, 0x1000, 1000)
                .expect("test: operation should succeed");
        }
        let time_31_days = 31 * 24 * 3600 * 1000;
        manager.apply_policies(time_31_days);
        assert_eq!(
            manager
                .get_object(1, 0x1000)
                .expect("test: operation should succeed")
                .storage_class,
            CloudStorageClass::Hot
        );
        manager
            .upload(test_obj(
                2,
                0x2000,
                4096,
                CloudProvider::AwsS3,
                "b",
                "k2",
                CloudStorageClass::Hot,
                0,
            ))
            .expect("test: operation should succeed");
        manager
            .download(2, 0x2000, 1000)
            .expect("test: operation should succeed");
        manager.apply_policies(time_31_days);
        assert_eq!(
            manager
                .get_object(2, 0x2000)
                .expect("test: operation should succeed")
                .storage_class,
            CloudStorageClass::Archive
        );
    }
}