use crate::core::error::{Error, Result};
use std::any::Any;
use std::collections::HashMap;
use std::ops::Range;
/// Declarative description of how a piece of data will be stored and
/// accessed. Engines use it to pick layouts; the strategy layer uses it
/// (via `access_pattern`) to pick an engine.
#[derive(Debug, Clone)]
pub struct StorageConfig {
    /// Caller's estimate of the total data size in bytes.
    pub estimated_size: usize,
    /// Expected dominant access pattern; drives engine selection.
    pub access_pattern: AccessPattern,
    /// Whether to favor speed, memory footprint, or a balance.
    pub performance_priority: PerformancePriority,
    /// How long the data must survive.
    pub durability: DurabilityLevel,
    /// Compression preference; engines may treat it as a hint.
    pub compression: CompressionPreference,
    /// Optional memory cap in bytes; `None` means unconstrained.
    pub memory_limit: Option<usize>,
}
/// Dominant access pattern expected for a storage region.
/// `Hash`/`Eq` are derived because this is the lookup key in
/// `DefaultStorageStrategy`'s preference table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AccessPattern {
    /// Front-to-back scans.
    Sequential,
    /// Point lookups at unpredictable offsets.
    Random,
    /// Reads dominate writes.
    ReadHeavy,
    /// Writes dominate reads.
    WriteHeavy,
    /// One-pass streaming access.
    Streaming,
    /// Column-at-a-time (analytical) access.
    Columnar,
}
/// Which resource the caller wants optimized when trade-offs arise.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PerformancePriority {
    Speed,
    Memory,
    Balanced,
}
/// How long stored data must survive.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityLevel {
    Temporary,
    Cached,
    Persistent,
}
/// Caller's compression preference.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionPreference {
    /// Store uncompressed.
    None,
    /// Let the engine decide.
    Auto,
    /// Favor compression speed.
    Fast,
    /// Favor compression ratio.
    Best,
}
/// A unit of data exchanged with storage engines: a raw byte buffer plus
/// its descriptive metadata.
#[derive(Debug, Clone)]
pub struct DataChunk {
    /// Payload bytes (possibly compressed, per `metadata.compression`).
    pub data: Vec<u8>,
    pub metadata: ChunkMetadata,
}
/// Shape and compression bookkeeping for one [`DataChunk`].
#[derive(Debug, Clone)]
pub struct ChunkMetadata {
    /// Number of logical rows encoded in the chunk.
    pub row_count: usize,
    /// Number of columns encoded in the chunk.
    pub column_count: usize,
    /// Compression actually applied to `data`.
    pub compression: CompressionPreference,
    /// Size in bytes before compression.
    pub uncompressed_size: usize,
    /// Size in bytes after compression (equals `uncompressed_size` when
    /// compression is `None`).
    pub compressed_size: usize,
}
impl DataChunk {
pub fn new(data: Vec<u8>, metadata: ChunkMetadata) -> Self {
Self { data, metadata }
}
pub fn new_test_data(size: usize) -> Self {
let data = vec![0u8; size];
let metadata = ChunkMetadata {
row_count: size / 8, column_count: 1,
compression: CompressionPreference::None,
uncompressed_size: size,
compressed_size: size,
};
Self { data, metadata }
}
pub fn size(&self) -> usize {
self.data.len()
}
}
/// Static performance characteristics an engine advertises about itself
/// (see [`StorageEngine::performance_profile`]).
#[derive(Debug, Clone)]
pub struct PerformanceProfile {
    pub read_speed: Speed,
    pub write_speed: Speed,
    pub memory_efficiency: Efficiency,
    /// Typical compression ratio — presumably uncompressed/compressed;
    /// TODO confirm the convention with implementors.
    pub compression_ratio: f64,
    pub random_access_speed: Speed,
    pub sequential_access_speed: Speed,
}
/// Coarse speed grade. Variants are declared slowest → fastest so the
/// derived `Ord` compares meaningfully.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Speed {
    Slow,
    Medium,
    Fast,
    VeryFast,
}
/// Coarse efficiency grade, declared worst → best (derived `Ord` follows
/// declaration order).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Efficiency {
    Poor,
    Fair,
    Good,
    Excellent,
}
/// Point-in-time counters for one storage region
/// (see [`StorageEngine::storage_stats`]).
#[derive(Debug, Clone)]
pub struct StorageStatistics {
    pub total_size: usize,
    pub chunk_count: usize,
    pub avg_compression_ratio: f64,
    pub read_operations: u64,
    pub write_operations: u64,
    /// Cache hit rate — presumably a 0.0–1.0 fraction; TODO confirm scale.
    pub cache_hit_rate: f64,
}
/// Common interface every concrete storage backend implements.
///
/// NOTE(review): the associated `Error` type is declared but no method here
/// references it — every method returns the crate-wide `Result` from
/// `crate::core::error`. Either wire the methods to `Self::Error` or drop
/// the associated type.
pub trait StorageEngine: Send + Sync {
    /// Engine-specific token identifying one storage region.
    type Handle: Clone + Send + Sync;
    /// Engine-specific error type (currently unused by these methods).
    type Error: std::error::Error + Send + Sync + 'static;
    /// Allocates a new storage region described by `config`.
    fn create_storage(&mut self, config: &StorageConfig) -> Result<Self::Handle>;
    /// Reads the bytes covered by `range` from the region behind `handle`.
    fn read_chunk(&self, handle: &Self::Handle, range: Range<usize>) -> Result<DataChunk>;
    /// Writes `chunk` into the region; placement is engine-defined.
    fn write_chunk(&mut self, handle: &Self::Handle, chunk: DataChunk) -> Result<()>;
    /// Appends `chunk` to the end of the region.
    fn append_chunk(&mut self, handle: &Self::Handle, chunk: DataChunk) -> Result<()>;
    /// Flushes any buffered state for the region to the backing store.
    fn flush(&mut self, handle: &Self::Handle) -> Result<()>;
    /// Releases the region and its resources.
    fn delete_storage(&mut self, handle: &Self::Handle) -> Result<()>;
    /// Static performance characteristics of this engine.
    fn performance_profile(&self) -> PerformanceProfile;
    /// Current statistics for the region behind `handle`.
    fn storage_stats(&self, handle: &Self::Handle) -> Result<StorageStatistics>;
    // Capability flags queried by callers before choosing operations.
    fn supports_random_access(&self) -> bool;
    fn supports_streaming(&self) -> bool;
    fn supports_compression(&self) -> bool;
    /// Preferred chunk size in bytes for this engine.
    fn optimal_chunk_size(&self) -> usize;
    /// Fixed per-engine memory overhead in bytes.
    fn memory_overhead(&self) -> usize;
    /// Re-tunes internal structures for the given access pattern.
    fn optimize_for_pattern(&mut self, pattern: AccessPattern) -> Result<()>;
    /// Reclaims fragmented space in the region behind `handle`.
    fn compact(&mut self, handle: &Self::Handle) -> Result<()>;
}
/// Policy object deciding which engine backs a given workload and when data
/// should move between engines.
pub trait StorageStrategy: Send + Sync {
    /// Picks the engine best suited to `requirements`.
    fn select_engine(&self, requirements: &StorageRequirements) -> StorageEngineId;
    /// Returns `true` when the data behind `handle` should be moved given
    /// the updated requirements.
    fn should_migrate(
        &self,
        handle: &StorageHandle,
        new_requirements: &StorageRequirements,
    ) -> bool;
    /// Moves the data behind `from` to engine `to`, returning the new handle.
    fn migrate_storage(
        &mut self,
        from: &StorageHandle,
        to: &StorageEngineId,
    ) -> Result<StorageHandle>;
}
/// Full description of a storage request: static config plus workload and
/// resource hints consumed by a [`StorageStrategy`].
#[derive(Debug, Clone)]
pub struct StorageRequirements {
    pub config: StorageConfig,
    pub workload: WorkloadCharacteristics,
    pub constraints: ResourceConstraints,
}
/// Expected runtime workload shape.
#[derive(Debug, Clone)]
pub struct WorkloadCharacteristics {
    /// Read-to-write ratio — presumably reads/writes; TODO confirm with callers.
    pub read_write_ratio: f64,
    /// Typical query size — presumably bytes; TODO confirm unit.
    pub avg_query_size: usize,
    /// Expected number of concurrent clients.
    pub concurrency_level: u32,
    pub locality_pattern: LocalityPattern,
}
/// Coarse locality classification of the workload's accesses.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LocalityPattern {
    HighTemporal,
    HighSpatial,
    Mixed,
    PoorLocality,
}
/// Resource limits the chosen engine must respect.
#[derive(Debug, Clone)]
pub struct ResourceConstraints {
    /// Maximum memory in bytes; `None` = unconstrained.
    pub max_memory: Option<usize>,
    /// Maximum disk in bytes; `None` = unconstrained.
    pub max_disk: Option<usize>,
    pub cpu_budget: CpuBudget,
    /// Network limits; `None` = unconstrained.
    pub network_budget: Option<NetworkBandwidth>,
}
/// Coarse CPU budget grades.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CpuBudget {
    Low,
    Medium,
    High,
    Unlimited,
}
/// Available network capacity.
#[derive(Debug, Clone, Copy)]
pub struct NetworkBandwidth {
    pub bytes_per_second: u64,
    pub latency_ms: u32,
}
/// Identifies one of the known storage engine implementations. `Hash`/`Eq`
/// are derived because this keys the [`PerformanceMonitor`] metrics map and
/// the strategy preference table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StorageEngineId {
    ColumnStore,
    MemoryMapped,
    DiskStorage,
    StringPool,
    Hybrid,
}
/// Engine-tagged handle plus bookkeeping metadata, as tracked internally by
/// [`UnifiedStorageManager`]. `engine_id` must agree with the
/// `inner_handle` variant; dispatch matches on both.
#[derive(Debug, Clone)]
pub struct StorageHandle {
    pub engine_id: StorageEngineId,
    pub inner_handle: StorageHandleInner,
    pub metadata: StorageMetadata,
}
/// Engine-specific handle payload.
///
/// NOTE(review): the `ColumnStore` variant holds
/// `crate::storage::column_store::ColumnStoreHandle`, which is a different
/// type from the `handles::ColumnStoreHandle` defined and re-exported later
/// in this file — one of the two looks redundant; confirm intent.
#[derive(Debug, Clone)]
pub enum StorageHandleInner {
    ColumnStore(crate::storage::column_store::ColumnStoreHandle),
    MemoryMapped(MemoryMappedHandle),
    DiskStorage(DiskStorageHandle),
    StringPool(StringPoolHandle),
}
/// Bookkeeping the manager attaches to every handle.
#[derive(Debug, Clone)]
pub struct StorageMetadata {
    pub created_at: std::time::SystemTime,
    // NOTE(review): in the current manager only writes update timestamps;
    // read_chunk takes &self and cannot touch last_accessed.
    pub last_accessed: std::time::SystemTime,
    pub last_modified: std::time::SystemTime,
    /// Size in bytes. NOTE(review): set to 0 at creation and never updated
    /// by any code in this file.
    pub size_bytes: usize,
    /// Count of operations performed through this handle (writes only, today).
    pub access_count: u64,
    /// The config this storage was created with.
    pub config: StorageConfig,
}
/// Plain numeric-ID handle types for engines that do not define their own.
///
/// NOTE(review): `ColumnStoreHandle` here duplicates the name of
/// `crate::storage::column_store::ColumnStoreHandle`, which is the type
/// actually stored in `StorageHandleInner::ColumnStore` — confirm which is
/// canonical.
pub mod handles {
    /// Opaque ID into the column-store engine.
    #[derive(Debug, Clone)]
    pub struct ColumnStoreHandle {
        pub id: usize,
    }
    /// Opaque ID into the memory-mapped engine.
    #[derive(Debug, Clone)]
    pub struct MemoryMappedHandle {
        pub id: usize,
    }
    /// Opaque ID into the disk-storage engine.
    #[derive(Debug, Clone)]
    pub struct DiskStorageHandle {
        pub id: usize,
    }
    /// Opaque ID into the string-pool engine.
    #[derive(Debug, Clone)]
    pub struct StringPoolHandle {
        pub id: usize,
    }
}
pub use handles::*;
/// Facade that owns the storage engines, routes operations through opaque
/// handle IDs, and records per-engine performance metrics.
pub struct UnifiedStorageManager {
    /// The only engine wired up so far; `None` means unavailable.
    column_store: Option<crate::storage::ColumnStore>,
    /// Policy used to choose an engine per request.
    strategy: Box<dyn StorageStrategy>,
    /// Live handles keyed by the IDs handed out to callers.
    handles: HashMap<StorageHandleId, StorageHandle>,
    // NOTE(review): nothing in this file feeds the monitor yet —
    // record_operation is never called here.
    monitor: PerformanceMonitor,
    /// Next ID to hand out; starts at 1 and only ever increments.
    next_handle_id: StorageHandleId,
}
/// Opaque identifier callers use in place of engine-specific handles.
pub type StorageHandleId = usize;
/// Accumulates per-engine operation metrics.
#[derive(Debug)]
pub struct PerformanceMonitor {
    engine_metrics: HashMap<StorageEngineId, EngineMetrics>,
}
/// Metrics accumulated for a single engine.
#[derive(Debug, Clone)]
pub struct EngineMetrics {
    /// Running-average read latency in nanoseconds.
    pub avg_read_latency: u64,
    /// Running-average write latency in nanoseconds.
    pub avg_write_latency: u64,
    // NOTE(review): throughput, error_rate and memory_usage are initialized
    // to zero and never updated by code in this file.
    pub throughput: u64,
    pub error_rate: f64,
    pub memory_usage: usize,
}
impl PerformanceMonitor {
    /// Creates a monitor with no recorded metrics.
    pub fn new() -> Self {
        Self {
            engine_metrics: HashMap::new(),
        }
    }

    /// Records one operation's latency for `engine_id`.
    ///
    /// Latencies fold into a running average with weight 1/2
    /// (`avg = (avg + sample) / 2`). Fix: the average used to start from the
    /// zero-initialized field, so the very first sample was halved; a zero
    /// average is now seeded with the sample verbatim.
    pub fn record_operation(
        &mut self,
        engine_id: StorageEngineId,
        operation: &Operation,
        latency: std::time::Duration,
    ) {
        // Seed with the first sample, then fold subsequent ones with weight 1/2.
        fn fold(avg: u64, sample: u64) -> u64 {
            if avg == 0 {
                sample
            } else {
                (avg + sample) / 2
            }
        }
        // u128 -> u64 truncates only for durations over ~584 years.
        let sample = latency.as_nanos() as u64;
        let metrics = self
            .engine_metrics
            .entry(engine_id)
            .or_insert_with(|| EngineMetrics {
                avg_read_latency: 0,
                avg_write_latency: 0,
                throughput: 0,
                error_rate: 0.0,
                memory_usage: 0,
            });
        match operation {
            Operation::Read { .. } => {
                metrics.avg_read_latency = fold(metrics.avg_read_latency, sample);
            }
            Operation::Write { .. } => {
                metrics.avg_write_latency = fold(metrics.avg_write_latency, sample);
            }
        }
    }

    /// Returns the accumulated metrics for `engine_id`, if any were recorded.
    pub fn get_metrics(&self, engine_id: StorageEngineId) -> Option<&EngineMetrics> {
        self.engine_metrics.get(&engine_id)
    }
}
/// Operation descriptor passed to [`PerformanceMonitor::record_operation`];
/// only the Read/Write distinction is inspected there today.
#[derive(Debug, Clone)]
pub enum Operation {
    /// A read of `size` bytes over `range`.
    Read { size: usize, range: Range<usize> },
    /// A write of `size` bytes, flagged if compressed.
    Write { size: usize, compressed: bool },
}
impl Default for PerformanceMonitor {
    /// Equivalent to [`PerformanceMonitor::new`]: an empty metrics table.
    fn default() -> Self {
        Self::new()
    }
}
impl UnifiedStorageManager {
    /// Creates a manager with an active column store, the default engine
    /// selection strategy, an empty handle table, and handle IDs starting
    /// at 1 (0 is never issued).
    pub fn new() -> Self {
        Self {
            column_store: Some(crate::storage::ColumnStore::new()),
            strategy: Box::new(DefaultStorageStrategy::new()),
            handles: HashMap::new(),
            monitor: PerformanceMonitor::new(),
            next_handle_id: 1,
        }
    }
    /// Allocates storage matching `requirements` and returns an opaque
    /// handle ID for subsequent read/write calls.
    ///
    /// The strategy picks an engine, but only `ColumnStore` is wired up:
    /// any other engine ID yields `Error::NotImplemented`, and a missing
    /// column store yields `Error::InvalidOperation`.
    pub fn create_storage(
        &mut self,
        requirements: &StorageRequirements,
    ) -> Result<StorageHandleId> {
        let engine_id = self.strategy.select_engine(requirements);
        let inner_handle = match engine_id {
            StorageEngineId::ColumnStore => {
                if let Some(ref mut engine) = self.column_store {
                    let handle = engine.create_storage(&requirements.config)?;
                    StorageHandleInner::ColumnStore(handle)
                } else {
                    return Err(Error::InvalidOperation(
                        "Column store not available for create_storage".to_string(),
                    ));
                }
            }
            _ => {
                return Err(Error::NotImplemented(format!(
                    "Storage engine {:?}",
                    engine_id
                )));
            }
        };
        // Monotonically increasing IDs; never reused, even after deletion.
        let handle_id = self.next_handle_id;
        self.next_handle_id += 1;
        let handle = StorageHandle {
            engine_id,
            inner_handle,
            metadata: StorageMetadata {
                created_at: std::time::SystemTime::now(),
                last_accessed: std::time::SystemTime::now(),
                last_modified: std::time::SystemTime::now(),
                size_bytes: 0,
                access_count: 0,
                config: requirements.config.clone(),
            },
        };
        self.handles.insert(handle_id, handle);
        Ok(handle_id)
    }
    /// Reads the byte `range` from the storage behind `handle_id`.
    ///
    /// NOTE(review): takes `&self`, so `last_accessed`/`access_count` are
    /// NOT updated on reads — only writes bump handle metadata.
    pub fn read_chunk(&self, handle_id: StorageHandleId, range: Range<usize>) -> Result<DataChunk> {
        let handle = self.handles.get(&handle_id).ok_or_else(|| {
            Error::InvalidOperation("Invalid handle ID for read_chunk".to_string())
        })?;
        // Match on (engine_id, inner_handle) together so a mismatched pair
        // falls through to the NotImplemented arm instead of panicking.
        match (&handle.engine_id, &handle.inner_handle) {
            (StorageEngineId::ColumnStore, StorageHandleInner::ColumnStore(cs_handle)) => {
                if let Some(ref engine) = self.column_store {
                    engine.read_chunk(cs_handle, range)
                } else {
                    Err(Error::InvalidOperation(
                        "Column store not available for read_chunk".to_string(),
                    ))
                }
            }
            _ => Err(Error::NotImplemented(format!(
                "Engine {:?}",
                handle.engine_id
            ))),
        }
    }
    /// Writes `chunk` to the storage behind `handle_id`, updating the
    /// handle's `last_modified` timestamp and `access_count` on success.
    pub fn write_chunk(&mut self, handle_id: StorageHandleId, chunk: DataChunk) -> Result<()> {
        // First (shared) lookup: dispatch to the engine. The handles map is
        // borrowed immutably while column_store is borrowed mutably — legal
        // because they are disjoint fields of self.
        let handle = self.handles.get(&handle_id).ok_or_else(|| {
            Error::InvalidOperation("Invalid handle ID for write_chunk".to_string())
        })?;
        let result = match (&handle.engine_id, &handle.inner_handle) {
            (StorageEngineId::ColumnStore, StorageHandleInner::ColumnStore(cs_handle)) => {
                if let Some(ref mut engine) = self.column_store {
                    engine.write_chunk(cs_handle, chunk)
                } else {
                    Err(Error::InvalidOperation(
                        "Column store not available for write_chunk".to_string(),
                    ))
                }
            }
            _ => Err(Error::NotImplemented(format!(
                "Engine {:?}",
                handle.engine_id
            ))),
        };
        // Second (mutable) lookup: metadata update. Deliberately a separate
        // lookup so the earlier shared borrow has ended; the entry cannot
        // have vanished, but get_mut keeps this panic-free regardless.
        if result.is_ok() {
            if let Some(handle) = self.handles.get_mut(&handle_id) {
                handle.metadata.last_modified = std::time::SystemTime::now();
                handle.metadata.access_count += 1;
            }
        }
        result
    }
}
/// Built-in [`StorageStrategy`]: a static access-pattern → engine table,
/// no migration support.
pub struct DefaultStorageStrategy {
    /// Preferred engine per access pattern; consulted by `select_engine`.
    preferences: HashMap<AccessPattern, StorageEngineId>,
}
impl DefaultStorageStrategy {
pub fn new() -> Self {
let mut preferences = HashMap::new();
preferences.insert(AccessPattern::Sequential, StorageEngineId::ColumnStore);
preferences.insert(AccessPattern::Random, StorageEngineId::MemoryMapped);
preferences.insert(AccessPattern::ReadHeavy, StorageEngineId::ColumnStore);
preferences.insert(AccessPattern::WriteHeavy, StorageEngineId::DiskStorage);
preferences.insert(AccessPattern::Streaming, StorageEngineId::DiskStorage);
preferences.insert(AccessPattern::Columnar, StorageEngineId::ColumnStore);
Self { preferences }
}
}
impl StorageStrategy for DefaultStorageStrategy {
    /// Looks up the configured engine for the request's access pattern,
    /// falling back to the column store when no preference is registered.
    fn select_engine(&self, requirements: &StorageRequirements) -> StorageEngineId {
        match self.preferences.get(&requirements.config.access_pattern) {
            Some(&engine) => engine,
            None => StorageEngineId::ColumnStore,
        }
    }
    /// The default strategy never migrates existing storage.
    fn should_migrate(
        &self,
        _handle: &StorageHandle,
        _new_requirements: &StorageRequirements,
    ) -> bool {
        false
    }
    /// Migration is not supported; always returns `Error::NotImplemented`.
    fn migrate_storage(
        &mut self,
        _from: &StorageHandle,
        _to: &StorageEngineId,
    ) -> Result<StorageHandle> {
        Err(Error::NotImplemented("Storage migration".to_string()))
    }
}
impl Default for DefaultStorageStrategy {
fn default() -> Self {
Self::new()
}
}