use crate::core::error::{Error, Result};
use crate::storage::unified_memory::*;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::path::PathBuf;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
#[derive(Debug, Clone)]
pub struct HybridConfig {
pub hot_tier: TierConfig,
pub warm_tier: TierConfig,
pub cold_tier: TierConfig,
pub analysis_window: Duration,
pub promotion_threshold: f64,
pub demotion_threshold: Duration,
pub enable_auto_tiering: bool,
pub tiering_interval: Duration,
pub enable_cold_compression: bool,
pub max_hot_memory: usize,
pub enable_deduplication: bool,
}
impl Default for HybridConfig {
fn default() -> Self {
Self {
hot_tier: TierConfig {
name: "hot".to_string(),
storage_type: TierStorageType::InMemory,
max_size: 512 * 1024 * 1024, compression: CompressionType::None,
access_latency: Duration::from_micros(1), throughput_mbps: 10000.0, },
warm_tier: TierConfig {
name: "warm".to_string(),
storage_type: TierStorageType::SSD,
max_size: 10 * 1024 * 1024 * 1024, compression: CompressionType::Lz4,
access_latency: Duration::from_millis(1), throughput_mbps: 500.0, },
cold_tier: TierConfig {
name: "cold".to_string(),
storage_type: TierStorageType::HDD,
max_size: 1024 * 1024 * 1024 * 1024, compression: CompressionType::Zstd,
access_latency: Duration::from_millis(10), throughput_mbps: 100.0, },
analysis_window: Duration::from_secs(3600), promotion_threshold: 10.0, demotion_threshold: Duration::from_secs(24 * 3600), enable_auto_tiering: true,
tiering_interval: Duration::from_secs(5 * 60), enable_cold_compression: true,
max_hot_memory: 1024 * 1024 * 1024, enable_deduplication: true,
}
}
}
#[derive(Debug, Clone)]
pub struct TierConfig {
pub name: String,
pub storage_type: TierStorageType,
pub max_size: usize,
pub compression: CompressionType,
pub access_latency: Duration,
pub throughput_mbps: f64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TierStorageType {
InMemory,
SSD,
HDD,
Network,
Custom,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DataTier {
Hot,
Warm,
Cold,
}
#[derive(Debug, Clone)]
pub struct AccessPattern {
pub access_count: u64,
pub last_access: Instant,
pub first_access: Instant,
pub access_frequency: f64,
pub data_size: usize,
pub pattern_type: AccessPatternType,
}
impl AccessPattern {
pub fn new(data_size: usize) -> Self {
let now = Instant::now();
Self {
access_count: 1,
last_access: now,
first_access: now,
access_frequency: 0.0,
data_size,
pattern_type: AccessPatternType::Unknown,
}
}
pub fn record_access(&mut self) {
self.access_count += 1;
self.last_access = Instant::now();
let time_since_first = self.last_access.duration_since(self.first_access);
if time_since_first.as_secs() > 0 {
self.access_frequency =
self.access_count as f64 / (time_since_first.as_secs_f64() / 3600.0);
}
self.pattern_type = if self.access_frequency > 100.0 {
AccessPatternType::VeryHot
} else if self.access_frequency > 10.0 {
AccessPatternType::Hot
} else if self.access_frequency > 1.0 {
AccessPatternType::Warm
} else {
AccessPatternType::Cold
};
}
pub fn time_since_last_access(&self) -> Duration {
Instant::now().duration_since(self.last_access)
}
pub fn should_promote(&self, threshold: f64) -> bool {
self.access_frequency > threshold
}
pub fn should_demote(&self, threshold: Duration) -> bool {
self.time_since_last_access() > threshold
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AccessPatternType {
VeryHot,
Hot,
Warm,
Cold,
Unknown,
}
#[derive(Debug, Clone)]
pub struct TieredDataEntry {
pub id: DataId,
pub current_tier: DataTier,
pub chunk: DataChunk,
pub access_pattern: AccessPattern,
pub metadata: TieredDataMetadata,
pub compression_state: CompressionState,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct DataId(pub u64);
#[derive(Debug, Clone)]
pub struct TieredDataMetadata {
pub created_at: Instant,
pub original_size: usize,
pub compressed_size: usize,
pub checksum: u64,
pub tier_history: Vec<TierHistoryEntry>,
}
#[derive(Debug, Clone)]
pub struct TierHistoryEntry {
pub tier: DataTier,
pub timestamp: Instant,
pub reason: TierMoveReason,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TierMoveReason {
HighFrequency,
LowFrequency,
CapacityPressure,
InitialPlacement,
Manual,
}
#[derive(Debug, Clone)]
pub struct CompressionState {
pub algorithm: CompressionType,
pub ratio: f64,
pub processing_time: Duration,
}
#[derive(Debug, Clone)]
pub struct TierStatistics {
pub current_usage: usize,
pub max_capacity: usize,
pub chunk_count: u64,
pub total_accesses: u64,
pub avg_access_latency: Duration,
pub hit_rate: f64,
pub promotions: u64,
pub demotions: u64,
}
impl TierStatistics {
pub fn new(max_capacity: usize) -> Self {
Self {
current_usage: 0,
max_capacity,
chunk_count: 0,
total_accesses: 0,
avg_access_latency: Duration::ZERO,
hit_rate: 0.0,
promotions: 0,
demotions: 0,
}
}
pub fn utilization(&self) -> f64 {
if self.max_capacity == 0 {
0.0
} else {
self.current_usage as f64 / self.max_capacity as f64
}
}
pub fn available_space(&self) -> usize {
self.max_capacity.saturating_sub(self.current_usage)
}
}
#[derive(Debug)]
pub struct TierManager {
config: HybridConfig,
tier_backends: HashMap<DataTier, Box<dyn TierBackend>>,
tier_stats: HashMap<DataTier, TierStatistics>,
data_index: HashMap<DataId, DataTier>,
access_tracker: HashMap<DataId, AccessPattern>,
tiering_scheduler: TieringScheduler,
}
impl TierManager {
pub fn new(config: HybridConfig) -> Self {
let mut tier_backends: HashMap<DataTier, Box<dyn TierBackend>> = HashMap::new();
tier_backends.insert(
DataTier::Hot,
Box::new(InMemoryTierBackend::new(&config.hot_tier)),
);
tier_backends.insert(
DataTier::Warm,
Box::new(SSDTierBackend::new(&config.warm_tier)),
);
tier_backends.insert(
DataTier::Cold,
Box::new(HDDTierBackend::new(&config.cold_tier)),
);
let mut tier_stats = HashMap::new();
tier_stats.insert(DataTier::Hot, TierStatistics::new(config.hot_tier.max_size));
tier_stats.insert(
DataTier::Warm,
TierStatistics::new(config.warm_tier.max_size),
);
tier_stats.insert(
DataTier::Cold,
TierStatistics::new(config.cold_tier.max_size),
);
Self {
tiering_scheduler: TieringScheduler::new(config.tiering_interval),
config,
tier_backends,
tier_stats,
data_index: HashMap::new(),
access_tracker: HashMap::new(),
}
}
pub fn store_data(&mut self, data: DataChunk) -> Result<DataId> {
let data_id = DataId(rand::random::<u64>());
let initial_tier = self.determine_initial_tier(&data)?;
if let Some(backend) = self.tier_backends.get_mut(&initial_tier) {
backend.store_chunk(data_id, &data)?;
self.data_index.insert(data_id, initial_tier);
self.access_tracker
.insert(data_id, AccessPattern::new(data.len()));
if let Some(stats) = self.tier_stats.get_mut(&initial_tier) {
stats.current_usage += data.len();
stats.chunk_count += 1;
}
Ok(data_id)
} else {
Err(Error::InvalidOperation(format!(
"Tier backend {:?} not found",
initial_tier
)))
}
}
pub fn retrieve_data(&mut self, data_id: DataId) -> Result<DataChunk> {
if let Some(pattern) = self.access_tracker.get_mut(&data_id) {
pattern.record_access();
}
let current_tier = self
.data_index
.get(&data_id)
.ok_or_else(|| Error::InvalidOperation(format!("Data {:?} not found", data_id)))?;
if let Some(backend) = self.tier_backends.get(current_tier) {
let chunk = backend.retrieve_chunk(data_id)?;
if let Some(stats) = self.tier_stats.get_mut(current_tier) {
stats.total_accesses += 1;
}
if self.config.enable_auto_tiering {
self.check_and_promote(data_id)?;
}
Ok(chunk)
} else {
Err(Error::InvalidOperation(format!(
"Tier backend {:?} not found",
current_tier
)))
}
}
pub fn delete_data(&mut self, data_id: DataId) -> Result<()> {
if let Some(¤t_tier) = self.data_index.get(&data_id) {
if let Some(backend) = self.tier_backends.get_mut(¤t_tier) {
backend.delete_chunk(data_id)?;
self.data_index.remove(&data_id);
if let Some(pattern) = self.access_tracker.remove(&data_id) {
if let Some(stats) = self.tier_stats.get_mut(¤t_tier) {
stats.current_usage = stats.current_usage.saturating_sub(pattern.data_size);
stats.chunk_count = stats.chunk_count.saturating_sub(1);
}
}
Ok(())
} else {
Err(Error::InvalidOperation(format!(
"Tier backend {:?} not found",
current_tier
)))
}
} else {
Err(Error::InvalidOperation(format!(
"Data {:?} not found",
data_id
)))
}
}
fn determine_initial_tier(&self, data: &DataChunk) -> Result<DataTier> {
if data.len() < 1024 * 1024 {
Ok(DataTier::Hot)
} else if data.len() < 100 * 1024 * 1024 {
Ok(DataTier::Warm)
} else {
Ok(DataTier::Cold)
}
}
fn check_and_promote(&mut self, data_id: DataId) -> Result<()> {
if let Some(pattern) = self.access_tracker.get(&data_id) {
if let Some(¤t_tier) = self.data_index.get(&data_id) {
let target_tier = match current_tier {
DataTier::Cold
if pattern.should_promote(self.config.promotion_threshold / 10.0) =>
{
Some(DataTier::Warm)
}
DataTier::Warm if pattern.should_promote(self.config.promotion_threshold) => {
Some(DataTier::Hot)
}
_ => None,
};
if let Some(new_tier) = target_tier {
self.move_data(
data_id,
current_tier,
new_tier,
TierMoveReason::HighFrequency,
)?;
}
}
}
Ok(())
}
pub fn run_background_tiering(&mut self) -> Result<TieringReport> {
let mut report = TieringReport {
promotions: 0,
demotions: 0,
bytes_moved: 0,
duration: Duration::ZERO,
};
let start_time = Instant::now();
let data_to_demote: Vec<_> = self
.access_tracker
.iter()
.filter(|(_, pattern)| pattern.should_demote(self.config.demotion_threshold))
.map(|(&id, _)| id)
.collect();
for data_id in data_to_demote {
if let Some(¤t_tier) = self.data_index.get(&data_id) {
let target_tier = match current_tier {
DataTier::Hot => DataTier::Warm,
DataTier::Warm => DataTier::Cold,
DataTier::Cold => continue, };
let data_size = self
.access_tracker
.get(&data_id)
.map(|p| p.data_size)
.unwrap_or(0);
if self
.move_data(
data_id,
current_tier,
target_tier,
TierMoveReason::LowFrequency,
)
.is_ok()
{
report.demotions += 1;
report.bytes_moved += data_size;
}
}
}
for &tier in &[DataTier::Hot, DataTier::Warm] {
if let Some(stats) = self.tier_stats.get(&tier) {
if stats.utilization() > 0.9 {
self.handle_capacity_pressure(tier, &mut report)?;
}
}
}
report.duration = start_time.elapsed();
Ok(report)
}
fn handle_capacity_pressure(
&mut self,
tier: DataTier,
report: &mut TieringReport,
) -> Result<()> {
let target_tier = match tier {
DataTier::Hot => DataTier::Warm,
DataTier::Warm => DataTier::Cold,
DataTier::Cold => return Ok(()), };
let mut candidates: Vec<_> = self
.data_index
.iter()
.filter(|(_, &t)| t == tier)
.filter_map(|(&id, _)| {
self.access_tracker
.get(&id)
.map(|pattern| (id, pattern.last_access))
})
.collect();
candidates.sort_by_key(|(_, last_access)| *last_access);
let move_count = (candidates.len() / 10).max(1);
for (data_id, _) in candidates.into_iter().take(move_count) {
let data_size = self
.access_tracker
.get(&data_id)
.map(|p| p.data_size)
.unwrap_or(0);
if self
.move_data(data_id, tier, target_tier, TierMoveReason::CapacityPressure)
.is_ok()
{
report.demotions += 1;
report.bytes_moved += data_size;
}
}
Ok(())
}
fn move_data(
&mut self,
data_id: DataId,
from_tier: DataTier,
to_tier: DataTier,
reason: TierMoveReason,
) -> Result<()> {
let chunk = if let Some(backend) = self.tier_backends.get(&from_tier) {
backend.retrieve_chunk(data_id)?
} else {
return Err(Error::InvalidOperation(format!(
"Source tier backend {:?} not found",
from_tier
)));
};
if let Some(backend) = self.tier_backends.get_mut(&to_tier) {
backend.store_chunk(data_id, &chunk)?;
} else {
return Err(Error::InvalidOperation(format!(
"Target tier backend {:?} not found",
to_tier
)));
}
if let Some(backend) = self.tier_backends.get_mut(&from_tier) {
backend.delete_chunk(data_id)?;
}
self.data_index.insert(data_id, to_tier);
let data_size = chunk.len();
if let Some(from_stats) = self.tier_stats.get_mut(&from_tier) {
from_stats.current_usage = from_stats.current_usage.saturating_sub(data_size);
from_stats.chunk_count = from_stats.chunk_count.saturating_sub(1);
match reason {
TierMoveReason::HighFrequency => from_stats.promotions += 1,
_ => from_stats.demotions += 1,
}
}
if let Some(to_stats) = self.tier_stats.get_mut(&to_tier) {
to_stats.current_usage += data_size;
to_stats.chunk_count += 1;
match reason {
TierMoveReason::HighFrequency => to_stats.promotions += 1,
_ => to_stats.demotions += 1,
}
}
Ok(())
}
pub fn get_tier_statistics(&self) -> &HashMap<DataTier, TierStatistics> {
&self.tier_stats
}
pub fn get_data_distribution(&self) -> HashMap<DataTier, usize> {
let mut distribution = HashMap::new();
for &tier in self.data_index.values() {
*distribution.entry(tier).or_insert(0) += 1;
}
distribution
}
}
#[derive(Debug, Clone)]
pub struct TieringReport {
pub promotions: u64,
pub demotions: u64,
pub bytes_moved: usize,
pub duration: Duration,
}
#[derive(Debug)]
pub struct TieringScheduler {
interval: Duration,
last_run: Instant,
}
impl TieringScheduler {
pub fn new(interval: Duration) -> Self {
Self {
interval,
last_run: Instant::now(),
}
}
pub fn should_run(&self) -> bool {
self.last_run.elapsed() >= self.interval
}
pub fn mark_run(&mut self) {
self.last_run = Instant::now();
}
}
pub trait TierBackend: Send + Sync + std::fmt::Debug {
fn store_chunk(&mut self, id: DataId, chunk: &DataChunk) -> Result<()>;
fn retrieve_chunk(&self, id: DataId) -> Result<DataChunk>;
fn delete_chunk(&mut self, id: DataId) -> Result<()>;
fn get_storage_info(&self) -> TierStorageInfo;
}
#[derive(Debug, Clone)]
pub struct TierStorageInfo {
pub backend_type: TierStorageType,
pub usage: usize,
pub capacity: usize,
pub avg_latency: Duration,
}
#[derive(Debug)]
pub struct InMemoryTierBackend {
data: HashMap<DataId, DataChunk>,
config: TierConfig,
}
impl InMemoryTierBackend {
pub fn new(config: &TierConfig) -> Self {
Self {
data: HashMap::new(),
config: config.clone(),
}
}
}
impl TierBackend for InMemoryTierBackend {
fn store_chunk(&mut self, id: DataId, chunk: &DataChunk) -> Result<()> {
self.data.insert(id, chunk.clone());
Ok(())
}
fn retrieve_chunk(&self, id: DataId) -> Result<DataChunk> {
self.data
.get(&id)
.cloned()
.ok_or_else(|| Error::InvalidOperation(format!("Chunk {:?} not found in memory", id)))
}
fn delete_chunk(&mut self, id: DataId) -> Result<()> {
self.data.remove(&id);
Ok(())
}
fn get_storage_info(&self) -> TierStorageInfo {
let usage = self.data.values().map(|chunk| chunk.len()).sum();
TierStorageInfo {
backend_type: TierStorageType::InMemory,
usage,
capacity: self.config.max_size,
avg_latency: self.config.access_latency,
}
}
}
#[derive(Debug)]
pub struct SSDTierBackend {
data: HashMap<DataId, DataChunk>,
config: TierConfig,
}
impl SSDTierBackend {
pub fn new(config: &TierConfig) -> Self {
Self {
data: HashMap::new(),
config: config.clone(),
}
}
}
impl TierBackend for SSDTierBackend {
fn store_chunk(&mut self, id: DataId, chunk: &DataChunk) -> Result<()> {
std::thread::sleep(Duration::from_micros(100)); self.data.insert(id, chunk.clone());
Ok(())
}
fn retrieve_chunk(&self, id: DataId) -> Result<DataChunk> {
std::thread::sleep(Duration::from_micros(50)); self.data
.get(&id)
.cloned()
.ok_or_else(|| Error::InvalidOperation(format!("Chunk {:?} not found in SSD", id)))
}
fn delete_chunk(&mut self, id: DataId) -> Result<()> {
self.data.remove(&id);
Ok(())
}
fn get_storage_info(&self) -> TierStorageInfo {
let usage = self.data.values().map(|chunk| chunk.len()).sum();
TierStorageInfo {
backend_type: TierStorageType::SSD,
usage,
capacity: self.config.max_size,
avg_latency: self.config.access_latency,
}
}
}
#[derive(Debug)]
pub struct HDDTierBackend {
data: HashMap<DataId, DataChunk>,
config: TierConfig,
}
impl HDDTierBackend {
pub fn new(config: &TierConfig) -> Self {
Self {
data: HashMap::new(),
config: config.clone(),
}
}
}
impl TierBackend for HDDTierBackend {
fn store_chunk(&mut self, id: DataId, chunk: &DataChunk) -> Result<()> {
std::thread::sleep(Duration::from_millis(5)); self.data.insert(id, chunk.clone());
Ok(())
}
fn retrieve_chunk(&self, id: DataId) -> Result<DataChunk> {
std::thread::sleep(Duration::from_millis(10)); self.data
.get(&id)
.cloned()
.ok_or_else(|| Error::InvalidOperation(format!("Chunk {:?} not found in HDD", id)))
}
fn delete_chunk(&mut self, id: DataId) -> Result<()> {
self.data.remove(&id);
Ok(())
}
fn get_storage_info(&self) -> TierStorageInfo {
let usage = self.data.values().map(|chunk| chunk.len()).sum();
TierStorageInfo {
backend_type: TierStorageType::HDD,
usage,
capacity: self.config.max_size,
avg_latency: self.config.access_latency,
}
}
}
#[derive(Debug)]
pub struct HybridHandle {
pub config: HybridConfig,
pub tier_manager: Arc<Mutex<TierManager>>,
pub statistics: HybridStatistics,
}
#[derive(Debug, Clone)]
pub struct HybridStatistics {
pub total_data_size: usize,
pub chunk_count: u64,
pub tier_distribution: HashMap<DataTier, usize>,
pub total_movements: u64,
pub avg_access_latency: Duration,
pub tier_hit_rates: HashMap<DataTier, f64>,
}
impl HybridStatistics {
pub fn new() -> Self {
Self {
total_data_size: 0,
chunk_count: 0,
tier_distribution: HashMap::new(),
total_movements: 0,
avg_access_latency: Duration::ZERO,
tier_hit_rates: HashMap::new(),
}
}
}
pub struct HybridLargeScaleStrategy {
config: HybridConfig,
global_stats: Arc<Mutex<HybridStatistics>>,
next_handle_id: std::sync::atomic::AtomicU64,
}
impl HybridLargeScaleStrategy {
pub fn new(config: HybridConfig) -> Self {
Self {
config,
global_stats: Arc::new(Mutex::new(HybridStatistics::new())),
next_handle_id: std::sync::atomic::AtomicU64::new(1),
}
}
fn create_tier_manager(&self) -> TierManager {
TierManager::new(self.config.clone())
}
fn estimate_storage_requirements(&self, config: &StorageConfig) -> Result<HybridConfig> {
let mut hybrid_config = self.config.clone();
let estimated_size = config.requirements.estimated_size;
if estimated_size > 100 * 1024 * 1024 * 1024 {
hybrid_config.hot_tier.max_size = (estimated_size / 100).max(1024 * 1024 * 1024); hybrid_config.warm_tier.max_size = (estimated_size / 10).max(10 * 1024 * 1024 * 1024); hybrid_config.cold_tier.max_size = estimated_size; }
match config.requirements.access_pattern {
crate::storage::unified_memory::AccessPattern::Sequential => {
hybrid_config.promotion_threshold *= 0.5; hybrid_config.demotion_threshold =
Duration::from_secs(hybrid_config.demotion_threshold.as_secs() * 2);
}
crate::storage::unified_memory::AccessPattern::Random => {
hybrid_config.hot_tier.max_size *= 2; hybrid_config.promotion_threshold *= 2.0; }
crate::storage::unified_memory::AccessPattern::Streaming => {
hybrid_config.enable_auto_tiering = false; hybrid_config.warm_tier.max_size = estimated_size; }
_ => {
}
}
Ok(hybrid_config)
}
}
impl StorageStrategy for HybridLargeScaleStrategy {
type Handle = HybridHandle;
type Error = Error;
type Metadata = HybridStatistics;
fn name(&self) -> &'static str {
"HybridLargeScale"
}
fn create_storage(&mut self, config: &StorageConfig) -> Result<Self::Handle> {
let optimized_config = self.estimate_storage_requirements(config)?;
let tier_manager = Arc::new(Mutex::new(self.create_tier_manager()));
let handle = HybridHandle {
config: optimized_config,
tier_manager,
statistics: HybridStatistics::new(),
};
Ok(handle)
}
fn read_chunk(&self, handle: &Self::Handle, range: ChunkRange) -> Result<DataChunk> {
let start_time = Instant::now();
let data_id = DataId(range.start as u64);
let result = if let Ok(mut manager) = handle.tier_manager.lock() {
manager.retrieve_data(data_id)
} else {
Err(Error::InvalidOperation(
"Failed to acquire tier manager lock".to_string(),
))
};
if let Ok(mut stats) = self.global_stats.lock() {
stats.avg_access_latency = start_time.elapsed();
}
result
}
fn write_chunk(&mut self, handle: &Self::Handle, chunk: DataChunk) -> Result<()> {
let data_id = if let Ok(mut manager) = handle.tier_manager.lock() {
manager.store_data(chunk.clone())?
} else {
return Err(Error::InvalidOperation(
"Failed to acquire tier manager lock".to_string(),
));
};
if let Ok(mut stats) = self.global_stats.lock() {
stats.total_data_size += chunk.len();
stats.chunk_count += 1;
}
Ok(())
}
fn append_chunk(&mut self, handle: &Self::Handle, chunk: DataChunk) -> Result<()> {
self.write_chunk(handle, chunk)
}
fn flush(&mut self, handle: &Self::Handle) -> Result<()> {
if handle.config.enable_auto_tiering {
if let Ok(mut manager) = handle.tier_manager.lock() {
let report = manager.run_background_tiering()?;
if let Ok(mut stats) = self.global_stats.lock() {
stats.total_movements += report.promotions + report.demotions;
}
}
}
Ok(())
}
fn delete_storage(&mut self, handle: &Self::Handle) -> Result<()> {
if let Ok(mut manager) = handle.tier_manager.lock() {
let data_ids: Vec<DataId> = manager.data_index.keys().cloned().collect();
for data_id in data_ids {
manager.delete_data(data_id)?;
}
}
Ok(())
}
fn can_handle(&self, requirements: &StorageRequirements) -> StrategyCapability {
let can_handle = requirements.estimated_size > 100 * 1024 * 1024;
let confidence = if can_handle { 0.95 } else { 0.3 };
let performance_score = match requirements.performance_priority {
PerformancePriority::Speed => 0.9, PerformancePriority::Memory => 0.8, PerformancePriority::Balanced => 0.95, PerformancePriority::Throughput => 0.9,
PerformancePriority::Latency => 0.85,
};
StrategyCapability {
can_handle,
confidence,
performance_score,
resource_cost: ResourceCost {
memory: requirements.estimated_size / 20, cpu: 20.0, disk: requirements.estimated_size,
network: 0,
},
}
}
fn performance_profile(&self) -> PerformanceProfile {
PerformanceProfile {
read_speed: Speed::VeryFast, write_speed: Speed::Fast, memory_efficiency: Efficiency::Excellent, compression_ratio: 2.0, query_optimization: QueryOptimization::Good,
parallel_scalability: ParallelScalability::Excellent,
}
}
fn storage_stats(&self) -> StorageStats {
if let Ok(stats) = self.global_stats.lock() {
StorageStats {
total_size: stats.total_data_size,
used_size: stats.total_data_size,
read_operations: stats.chunk_count,
write_operations: stats.chunk_count,
avg_read_latency_ns: stats.avg_access_latency.as_nanos() as u64,
avg_write_latency_ns: stats.avg_access_latency.as_nanos() as u64,
cache_hit_rate: stats.tier_hit_rates.values().sum::<f64>()
/ stats.tier_hit_rates.len().max(1) as f64,
}
} else {
StorageStats::default()
}
}
fn optimize_for_pattern(
&mut self,
pattern: crate::storage::unified_memory::AccessPattern,
) -> Result<()> {
match pattern {
crate::storage::unified_memory::AccessPattern::Sequential => {
self.config.promotion_threshold *= 0.5;
self.config.demotion_threshold = Duration::from_secs(48 * 3600);
}
crate::storage::unified_memory::AccessPattern::Random => {
self.config.hot_tier.max_size *= 2;
self.config.promotion_threshold *= 2.0;
}
crate::storage::unified_memory::AccessPattern::Streaming => {
self.config.enable_auto_tiering = false;
}
crate::storage::unified_memory::AccessPattern::HighLocality => {
self.config.hot_tier.max_size *= 3;
self.config.promotion_threshold *= 0.3;
}
crate::storage::unified_memory::AccessPattern::LowLocality => {
self.config.hot_tier.max_size /= 2;
self.config.demotion_threshold = Duration::from_secs(6 * 3600); }
_ => {
}
}
Ok(())
}
fn compact(&mut self, handle: &Self::Handle) -> Result<CompactionResult> {
let start_time = Instant::now();
let size_before = if let Ok(stats) = self.global_stats.lock() {
stats.total_data_size
} else {
0
};
if let Ok(mut manager) = handle.tier_manager.lock() {
let _ = manager.run_background_tiering();
}
let size_after = size_before;
Ok(CompactionResult {
size_before,
size_after,
duration: start_time.elapsed(),
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_access_pattern_tracking() {
let mut pattern = AccessPattern::new(1024);
assert_eq!(pattern.access_count, 1);
assert_eq!(pattern.data_size, 1024);
std::thread::sleep(Duration::from_millis(1100)); pattern.record_access();
assert_eq!(pattern.access_count, 2);
assert!(pattern.access_frequency > 0.0);
}
#[test]
fn test_tier_statistics() {
let mut stats = TierStatistics::new(1024 * 1024);
assert_eq!(stats.utilization(), 0.0);
stats.current_usage = 512 * 1024;
assert_eq!(stats.utilization(), 0.5);
assert_eq!(stats.available_space(), 512 * 1024);
}
#[test]
fn test_tier_manager() {
let config = HybridConfig::default();
let mut manager = TierManager::new(config);
let test_data = DataChunk::new_test_data(1024);
let data_id = manager
.store_data(test_data.clone())
.expect("operation should succeed");
let retrieved = manager
.retrieve_data(data_id)
.expect("operation should succeed");
assert_eq!(retrieved.len(), test_data.len());
manager
.delete_data(data_id)
.expect("operation should succeed");
}
#[test]
fn test_in_memory_tier_backend() {
let config = TierConfig {
name: "test".to_string(),
storage_type: TierStorageType::InMemory,
max_size: 1024 * 1024,
compression: CompressionType::None,
access_latency: Duration::from_micros(1),
throughput_mbps: 1000.0,
};
let mut backend = InMemoryTierBackend::new(&config);
let data_id = DataId(1);
let chunk = DataChunk::new_test_data(512);
backend
.store_chunk(data_id, &chunk)
.expect("operation should succeed");
let retrieved = backend
.retrieve_chunk(data_id)
.expect("operation should succeed");
assert_eq!(retrieved.len(), chunk.len());
backend
.delete_chunk(data_id)
.expect("operation should succeed");
assert!(backend.retrieve_chunk(data_id).is_err());
}
#[test]
fn test_hybrid_large_scale_strategy() {
let config = HybridConfig::default();
let mut strategy = HybridLargeScaleStrategy::new(config);
let storage_config = StorageConfig {
requirements: StorageRequirements {
estimated_size: 1024 * 1024 * 1024, access_pattern: crate::storage::unified_memory::AccessPattern::Random,
performance_priority: PerformancePriority::Balanced,
..Default::default()
},
..Default::default()
};
let handle = strategy
.create_storage(&storage_config)
.expect("operation should succeed");
let test_chunk = DataChunk::new_test_data(1024);
strategy
.write_chunk(&handle, test_chunk.clone())
.expect("operation should succeed");
let range = ChunkRange::new(0, 1); }
#[test]
fn test_capability_assessment() {
let config = HybridConfig::default();
let strategy = HybridLargeScaleStrategy::new(config);
let requirements = StorageRequirements {
estimated_size: 1024 * 1024 * 1024, access_pattern: crate::storage::unified_memory::AccessPattern::Random,
performance_priority: PerformancePriority::Balanced,
..Default::default()
};
let capability = strategy.can_handle(&requirements);
assert!(capability.can_handle);
assert!(capability.confidence > 0.9);
assert!(capability.performance_score > 0.9);
}
#[test]
fn test_tiering_scheduler() {
let scheduler = TieringScheduler::new(Duration::from_millis(100));
assert!(!scheduler.should_run());
std::thread::sleep(Duration::from_millis(150));
assert!(scheduler.should_run());
}
}