use crate::core::error::{Error, Result};
use std::any::Any;
use std::collections::HashMap;
use std::ops::Range;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex, RwLock,
};
use std::time::Instant;
/// The concrete storage backend kind a strategy implements.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StorageType {
/// Column-oriented store.
ColumnStore,
/// Memory-mapped file storage.
MemoryMapped,
/// Interned/pooled string storage.
StringPool,
/// Hybrid backend for large-scale data sets.
HybridLargeScale,
/// Disk-backed storage.
DiskBased,
/// Plain in-memory storage.
InMemory,
}
/// Expected or observed data access pattern, used by strategy selection
/// and `StorageStrategy::optimize_for_pattern`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AccessPattern {
/// Reads/writes proceed in order.
Sequential,
/// Unpredictable access positions.
Random,
/// One-pass streaming consumption.
Streaming,
/// Whole-column scans.
Columnar,
/// Accesses cluster tightly together.
HighLocality,
/// Moderate clustering of accesses.
MediumLocality,
/// Accesses are widely scattered.
LowLocality,
/// Data contains many duplicate values.
HighDuplication,
/// Data contains few duplicate values.
LowDuplication,
/// Values are predominantly long strings.
LongStrings,
/// Values are predominantly short strings.
ShortStrings,
/// A small subset of the data is hot right now.
TemporalHotSpot,
/// Rarely accessed archival data.
ColdArchival,
/// Fixed-stride access; `stride` units (elements vs. bytes) are not
/// defined here — confirm at the call sites that construct this.
Strided { stride: usize },
}
/// What the caller wants the storage strategy to optimize for.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PerformancePriority {
/// Favor raw speed.
Speed,
/// Favor low memory footprint.
Memory,
/// Trade off speed and memory evenly.
Balanced,
/// Favor sustained throughput.
Throughput,
/// Favor low per-operation latency.
Latency,
}
/// How strongly the stored data must survive process/system failure,
/// ordered from weakest to strongest guarantee.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityLevel {
/// May be discarded at any time.
Temporary,
/// Lives for the duration of a session.
Session,
/// Persisted storage.
Durable,
/// Persisted with extra durability guarantees.
HighDurability,
}
/// Caller preference for whether/how data should be compressed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionPreference {
/// Do not compress.
None,
/// Let the strategy decide.
Auto,
/// Prefer fast, lighter compression.
Fast,
/// Prefer maximum compression ratio.
High,
/// Balance speed against ratio.
Balanced,
}
/// Expected degree of concurrent access to the storage.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConcurrencyLevel {
/// Single-threaded access only.
Single,
Low,
Medium,
High,
VeryHigh,
}
/// Expected mix of read vs. write operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IoPattern {
/// Mostly reads.
ReadHeavy,
/// Mostly writes.
WriteHeavy,
/// Roughly even read/write mix.
Balanced,
/// Writes only ever append.
AppendOnly,
/// Writes modify existing records in place.
UpdateInPlace,
}
/// Coarse description of the kind of data being stored.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DataCharacteristics {
Numeric,
Text,
/// Mixture of types.
Mixed,
TimeSeries,
Categorical,
/// Mostly empty/zero values.
Sparse,
/// Mostly populated values.
Dense,
}
/// Caller-declared workload profile; consumed by
/// `StorageStrategy::can_handle` to score candidate strategies.
#[derive(Debug, Clone)]
pub struct StorageRequirements {
/// Expected data size in bytes.
pub estimated_size: usize,
/// Expected access pattern.
pub access_pattern: AccessPattern,
/// What to optimize for.
pub performance_priority: PerformancePriority,
/// Required durability guarantee.
pub durability: DurabilityLevel,
/// Compression preference.
pub compression: CompressionPreference,
/// Expected concurrent-access level.
pub concurrency: ConcurrencyLevel,
/// Optional memory cap in bytes; `None` means uncapped.
pub memory_limit: Option<usize>,
/// Expected read/write mix.
pub io_pattern: IoPattern,
/// Coarse data-type description.
pub data_characteristics: DataCharacteristics,
}
impl Default for StorageRequirements {
fn default() -> Self {
Self {
estimated_size: 1024 * 1024, access_pattern: AccessPattern::Random,
performance_priority: PerformancePriority::Balanced,
durability: DurabilityLevel::Temporary,
compression: CompressionPreference::Auto,
concurrency: ConcurrencyLevel::Medium,
memory_limit: None,
io_pattern: IoPattern::Balanced,
data_characteristics: DataCharacteristics::Mixed,
}
}
}
/// Full configuration handed to `StorageStrategy::create_storage`.
#[derive(Debug, Clone)]
pub struct StorageConfig {
/// The workload profile to satisfy.
pub requirements: StorageRequirements,
/// Free-form strategy-specific string options.
pub options: HashMap<String, String>,
/// Optional sample of the data — presumably used by strategies for
/// format/compression heuristics; confirm with strategy implementations.
pub data_sample: Option<Vec<u8>>,
/// Access pattern the caller expects after creation.
pub expected_access_pattern: AccessPattern,
/// Hard resource/availability constraints.
pub constraints: StorageConstraints,
}
impl Default for StorageConfig {
fn default() -> Self {
Self {
requirements: StorageRequirements::default(),
options: HashMap::new(),
data_sample: None,
expected_access_pattern: AccessPattern::Random,
constraints: StorageConstraints::default(),
}
}
}
/// Hard limits a chosen storage strategy must respect.
#[derive(Debug, Clone)]
pub struct StorageConstraints {
/// Max memory in bytes; `None` means unlimited.
pub max_memory: Option<usize>,
/// Max disk usage in bytes; `None` means unlimited.
pub max_disk: Option<usize>,
/// Max CPU utilization in percent (0.0–100.0); `None` means unlimited.
pub max_cpu_percent: Option<f64>,
/// Required availability — presumably a fraction in [0, 1]; confirm
/// against whatever enforces it.
pub availability_requirement: f64,
}
impl Default for StorageConstraints {
fn default() -> Self {
Self {
max_memory: None,
max_disk: None,
max_cpu_percent: Some(80.0),
availability_requirement: 0.99,
}
}
}
/// A unit of data transferred to/from a storage strategy: raw bytes plus
/// per-chunk metadata.
#[derive(Debug, Clone)]
pub struct DataChunk {
/// Raw payload bytes.
pub data: Vec<u8>,
/// Size/checksum/compression bookkeeping for the payload.
pub metadata: ChunkMetadata,
}
impl DataChunk {
    /// Wraps raw bytes in a chunk, deriving metadata from the byte length.
    pub fn new(data: Vec<u8>) -> Self {
        Self {
            metadata: ChunkMetadata::new(data.len()),
            data,
        }
    }
    /// Number of payload bytes.
    pub fn len(&self) -> usize {
        self.data.len()
    }
    /// True when the chunk holds no bytes.
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }
    /// Copies a byte slice into a new chunk.
    pub fn from_slice(data: &[u8]) -> Self {
        Self::new(data.to_vec())
    }
    /// Encodes a list of strings as one NUL-separated byte buffer.
    ///
    /// NOTE: strings that themselves contain `'\0'` will not round-trip
    /// through `as_strings`, since NUL is the separator.
    pub fn from_strings(strings: Vec<String>) -> Self {
        let data = strings.join("\0").into_bytes();
        Self::new(data)
    }
    /// Decodes a chunk produced by `from_strings` back into its strings.
    ///
    /// # Errors
    /// Returns `Error::InvalidOperation` when the payload is not valid UTF-8.
    pub fn as_strings(&self) -> Result<Vec<String>> {
        // Empty payload round-trips to an empty list: `from_strings(vec![])`
        // produces zero bytes, which a naive split would decode as `[""]`.
        if self.data.is_empty() {
            return Ok(Vec::new());
        }
        // Validate UTF-8 on a borrow instead of cloning the whole payload.
        let data_str = std::str::from_utf8(&self.data)
            .map_err(|e| Error::InvalidOperation(format!("Invalid UTF-8 data: {}", e)))?;
        Ok(data_str.split('\0').map(str::to_string).collect())
    }
    /// Builds a zero-filled chunk of `size` bytes (test/benchmark helper).
    pub fn new_test_data(size: usize) -> Self {
        Self::new(vec![0u8; size])
    }
}
/// Bookkeeping attached to each `DataChunk`.
#[derive(Debug, Clone)]
pub struct ChunkMetadata {
/// Payload size in bytes at creation time.
pub size: usize,
/// Payload checksum; initialized to 0 (not computed by `new`).
pub checksum: u64,
/// Compression applied to the payload.
pub compression: CompressionType,
/// When this chunk was created.
pub created_at: Instant,
}
impl ChunkMetadata {
fn new(size: usize) -> Self {
Self {
size,
checksum: 0,
compression: CompressionType::None,
created_at: Instant::now(),
}
}
}
/// Concrete compression codec applied to a chunk's payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CompressionType {
/// Uncompressed.
None,
/// Codec chosen automatically by the strategy.
Auto,
Lz4,
Zstd,
Snappy,
Gzip,
}
/// Half-open range `[start, end)` addressing part of a stored chunk.
#[derive(Debug, Clone)]
pub struct ChunkRange {
    /// Inclusive start offset.
    pub start: usize,
    /// Exclusive end offset.
    pub end: usize,
}
impl ChunkRange {
    /// Builds the half-open range `[start, end)`.
    pub fn new(start: usize, end: usize) -> Self {
        ChunkRange { start, end }
    }
    /// Number of elements covered; an inverted range counts as 0.
    pub fn len(&self) -> usize {
        if self.end > self.start {
            self.end - self.start
        } else {
            0
        }
    }
    /// True when the range covers nothing (including inverted ranges).
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// The maximal range, `[0, usize::MAX)` — used to mean "everything".
    pub fn full() -> Self {
        ChunkRange::new(0, usize::MAX)
    }
}
impl From<Range<usize>> for ChunkRange {
    fn from(r: Range<usize>) -> Self {
        ChunkRange::new(r.start, r.end)
    }
}
/// A strategy's self-assessment for a given `StorageRequirements`,
/// returned by `StorageStrategy::can_handle`.
#[derive(Debug, Clone)]
pub struct StrategyCapability {
/// Whether the strategy can serve the requirements at all.
pub can_handle: bool,
/// Confidence in this assessment — presumably in [0, 1]; confirm with
/// the strategy implementations that produce it.
pub confidence: f64,
/// Relative performance score for ranking strategies (scale unspecified
/// here — confirm at the selection site).
pub performance_score: f64,
/// Estimated resources the strategy would consume.
pub resource_cost: ResourceCost,
}
/// Estimated resource consumption of a storage strategy.
#[derive(Debug, Clone)]
pub struct ResourceCost {
/// Memory in bytes.
pub memory: usize,
/// CPU cost (units unspecified here — likely percent or cores; confirm).
pub cpu: f64,
/// Disk usage in bytes.
pub disk: usize,
/// Network usage in bytes.
pub network: usize,
}
/// Static performance characteristics a strategy advertises via
/// `StorageStrategy::performance_profile`.
#[derive(Debug, Clone)]
pub struct PerformanceProfile {
pub read_speed: Speed,
pub write_speed: Speed,
pub memory_efficiency: Efficiency,
/// Typical compression ratio achieved (scale unspecified here — confirm
/// whether this is e.g. original/compressed).
pub compression_ratio: f64,
pub query_optimization: QueryOptimization,
pub parallel_scalability: ParallelScalability,
}
/// Coarse speed rating, slowest to fastest.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Speed {
VerySlow,
Slow,
Medium,
Fast,
VeryFast,
}
/// Coarse efficiency rating, worst to best.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Efficiency {
Poor,
Fair,
Good,
Excellent,
Outstanding,
}
/// Level of query optimization a strategy supports, none to best.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryOptimization {
None,
Basic,
Good,
Excellent,
}
/// How well a strategy scales with parallel access, none to best.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelScalability {
None,
Limited,
Good,
Excellent,
}
/// Runtime statistics reported by `StorageStrategy::storage_stats`.
///
/// All counters start at zero, so `Default` is derived; the derive is
/// identical to the previous hand-written all-zero implementation
/// (clippy `derivable_impls`).
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    /// Total storage capacity in bytes.
    pub total_size: usize,
    /// Bytes currently in use.
    pub used_size: usize,
    /// Number of read operations performed.
    pub read_operations: u64,
    /// Number of write operations performed.
    pub write_operations: u64,
    /// Average read latency in nanoseconds.
    pub avg_read_latency_ns: u64,
    /// Average write latency in nanoseconds.
    pub avg_write_latency_ns: u64,
    /// Cache hit rate — presumably a fraction in [0, 1]; confirm with the
    /// strategies that populate it.
    pub cache_hit_rate: f64,
}
/// Contract every storage backend implements. Associated types let each
/// backend use its own handle, error, and metadata representations while
/// remaining usable across threads (`Send + Sync`).
pub trait StorageStrategy: Send + Sync {
/// Backend-specific handle identifying a created storage.
type Handle;
/// Backend-specific error type.
type Error: std::error::Error + Send + Sync + 'static;
/// Backend-specific metadata attached to stored data.
type Metadata: Clone + Send + Sync;
/// Human-readable strategy name.
fn name(&self) -> &'static str;
/// Creates a new storage instance for the given configuration.
fn create_storage(
&mut self,
config: &StorageConfig,
) -> std::result::Result<Self::Handle, Self::Error>;
/// Reads the bytes covered by `range` from the storage behind `handle`.
fn read_chunk(
&self,
handle: &Self::Handle,
range: ChunkRange,
) -> std::result::Result<DataChunk, Self::Error>;
/// Writes `chunk` to the storage behind `handle`.
fn write_chunk(
&mut self,
handle: &Self::Handle,
chunk: DataChunk,
) -> std::result::Result<(), Self::Error>;
/// Appends `chunk` to the end of the storage behind `handle`.
fn append_chunk(
&mut self,
handle: &Self::Handle,
chunk: DataChunk,
) -> std::result::Result<(), Self::Error>;
/// Flushes buffered data for `handle` to its backing medium.
fn flush(&mut self, handle: &Self::Handle) -> std::result::Result<(), Self::Error>;
/// Deletes the storage behind `handle` and releases its resources.
fn delete_storage(&mut self, handle: &Self::Handle) -> std::result::Result<(), Self::Error>;
/// Scores how well this strategy satisfies `requirements`.
fn can_handle(&self, requirements: &StorageRequirements) -> StrategyCapability;
/// Static performance characteristics of this strategy.
fn performance_profile(&self) -> PerformanceProfile;
/// Current runtime statistics for this strategy.
fn storage_stats(&self) -> StorageStats;
/// Re-tunes the strategy for a new access pattern.
fn optimize_for_pattern(
&mut self,
pattern: AccessPattern,
) -> std::result::Result<(), Self::Error>;
/// Compacts the storage behind `handle`, reporting the size change.
fn compact(
&mut self,
handle: &Self::Handle,
) -> std::result::Result<CompactionResult, Self::Error>;
}
/// Outcome of a `StorageStrategy::compact` run.
#[derive(Debug, Clone)]
pub struct CompactionResult {
/// Storage size in bytes before compaction.
pub size_before: usize,
/// Storage size in bytes after compaction.
pub size_after: usize,
/// Wall-clock time the compaction took.
pub duration: std::time::Duration,
}
/// Strategy-agnostic handle to a storage instance: pairs an opaque
/// backend handle with identity, metadata, refcounting, and per-handle
/// performance tracking.
#[derive(Debug)]
pub struct StorageHandle {
/// Unique identifier for this storage.
pub id: StorageId,
/// Which backend kind created it.
pub strategy_type: StorageType,
/// The backend's own handle, type-erased behind `Any`.
pub inner_handle: Box<dyn Any + Send + Sync>,
/// Creation/access bookkeeping.
pub metadata: StorageMetadata,
/// Shared reference count; decremented in `Drop`.
pub ref_count: Arc<AtomicUsize>,
/// Read/write latency and volume tracking for this handle.
pub performance_tracker: PerformanceTracker,
}
impl StorageHandle {
pub fn new(
id: StorageId,
strategy_type: StorageType,
inner_handle: Box<dyn Any + Send + Sync>,
metadata: StorageMetadata,
) -> Self {
Self {
id,
strategy_type,
inner_handle,
metadata,
ref_count: Arc::new(AtomicUsize::new(1)),
performance_tracker: PerformanceTracker::new(),
}
}
}
impl Drop for StorageHandle {
fn drop(&mut self) {
// `fetch_sub` returns the previous count, so a result of 1 means this
// drop released the last reference.
if self.ref_count.fetch_sub(1, Ordering::SeqCst) == 1 {
// NOTE(review): the last-reference branch is empty — any actual
// resource teardown presumably happens elsewhere (e.g. via the
// owning strategy's `delete_storage`). TODO confirm no cleanup is
// required here.
}
}
}
/// Opaque numeric identifier for a storage instance (newtype over `u64`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct StorageId(pub u64);
/// Creation/access bookkeeping carried by a `StorageHandle`.
#[derive(Debug, Clone)]
pub struct StorageMetadata {
    /// When the storage was created.
    pub created_at: Instant,
    /// When the storage was last touched; starts equal to `created_at`.
    pub last_accessed: Instant,
    /// Storage size in bytes.
    pub size: usize,
    /// How many times the storage has been accessed.
    pub access_count: u64,
}
impl StorageMetadata {
    /// Metadata for a newly created storage of `size` bytes: both
    /// timestamps are the current instant and the access counter is zero.
    pub fn new(size: usize) -> Self {
        let timestamp = Instant::now();
        StorageMetadata {
            created_at: timestamp,
            last_accessed: timestamp,
            access_count: 0,
            size,
        }
    }
}
/// Per-handle record of read/write latencies and byte volumes.
///
/// NOTE(review): the duration vectors grow without bound; a long-lived
/// handle may want a bounded ring buffer — confirm expected lifetimes.
#[derive(Debug, Clone, Default)]
pub struct PerformanceTracker {
    /// Duration of each recorded read.
    pub read_times: Vec<std::time::Duration>,
    /// Duration of each recorded write.
    pub write_times: Vec<std::time::Duration>,
    /// Total bytes read.
    pub bytes_read: u64,
    /// Total bytes written.
    pub bytes_written: u64,
}
impl PerformanceTracker {
    /// Empty tracker. Equivalent to `Default::default()`; the `Default`
    /// derive fixes clippy's `new_without_default`.
    pub fn new() -> Self {
        Self::default()
    }
    /// Records one read of `bytes` bytes that took `duration`.
    pub fn record_read(&mut self, duration: std::time::Duration, bytes: u64) {
        self.read_times.push(duration);
        self.bytes_read += bytes;
    }
    /// Records one write of `bytes` bytes that took `duration`.
    pub fn record_write(&mut self, duration: std::time::Duration, bytes: u64) {
        self.write_times.push(duration);
        self.bytes_written += bytes;
    }
    /// Mean read latency, or `None` when no reads were recorded.
    pub fn average_read_time(&self) -> Option<std::time::Duration> {
        Self::average(&self.read_times)
    }
    /// Mean write latency, or `None` when no writes were recorded.
    pub fn average_write_time(&self) -> Option<std::time::Duration> {
        Self::average(&self.write_times)
    }
    /// Mean of a sample set; `None` for an empty set. Shared by the two
    /// public averages (the original duplicated this logic).
    fn average(samples: &[std::time::Duration]) -> Option<std::time::Duration> {
        if samples.is_empty() {
            None
        } else {
            let total: std::time::Duration = samples.iter().sum();
            // `Duration` division takes a u32 divisor; clamp instead of the
            // original silent `as u32` truncation for absurdly large counts.
            Some(total / u32::try_from(samples.len()).unwrap_or(u32::MAX))
        }
    }
}
/// Lock-free counters tracking allocator activity; safe to update from
/// multiple threads without external synchronization.
#[derive(Debug, Default)]
pub struct AtomicMemoryStats {
    /// Bytes currently allocated.
    pub total_allocated: AtomicUsize,
    /// High-water mark of `total_allocated`.
    pub peak_usage: AtomicUsize,
    /// Number of live (not yet freed) allocations.
    pub active_allocations: AtomicUsize,
    /// Total allocations ever recorded.
    pub allocation_count: AtomicUsize,
    /// Total deallocations ever recorded.
    pub deallocation_count: AtomicUsize,
}
impl AtomicMemoryStats {
    /// All counters start at zero.
    pub fn new() -> Self {
        Self::default()
    }
    /// Records an allocation of `size` bytes and updates the peak watermark.
    pub fn record_allocation(&self, size: usize) {
        // `fetch_add` returns the previous value, so `previous + size` is the
        // exact total immediately after this allocation. The original did a
        // separate `load` afterwards, which could miss a true peak if a
        // concurrent deallocation landed between the add and the load.
        let current = self.total_allocated.fetch_add(size, Ordering::SeqCst) + size;
        self.active_allocations.fetch_add(1, Ordering::SeqCst);
        self.allocation_count.fetch_add(1, Ordering::SeqCst);
        self.peak_usage.fetch_max(current, Ordering::SeqCst);
    }
    /// Records a deallocation of `size` bytes.
    ///
    /// NOTE(review): freeing more than was allocated would wrap the counters;
    /// callers are trusted to pair sizes with their allocations.
    pub fn record_deallocation(&self, size: usize) {
        self.total_allocated.fetch_sub(size, Ordering::SeqCst);
        self.active_allocations.fetch_sub(1, Ordering::SeqCst);
        self.deallocation_count.fetch_add(1, Ordering::SeqCst);
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Defaults: 1 MiB estimate and balanced priority.
#[test]
fn test_storage_requirements_default() {
let req = StorageRequirements::default();
assert_eq!(req.estimated_size, 1024 * 1024);
assert_eq!(req.performance_priority, PerformancePriority::Balanced);
}
// A chunk reports the length of the bytes it wraps.
#[test]
fn test_data_chunk() {
let data = vec![1, 2, 3, 4, 5];
let chunk = DataChunk::new(data.clone());
assert_eq!(chunk.len(), 5);
assert_eq!(chunk.data, data);
}
// Half-open range length, plus inverted ranges counting as empty.
#[test]
fn test_chunk_range() {
let range = ChunkRange::new(10, 20);
assert_eq!(range.len(), 10);
assert!(!range.is_empty());
let empty_range = ChunkRange::new(20, 10);
assert!(empty_range.is_empty());
}
// New handles start with a reference count of one and carry their
// identity/metadata through unchanged.
#[test]
fn test_storage_handle_creation() {
let handle = StorageHandle::new(
StorageId(1),
StorageType::InMemory,
Box::new(42u32),
StorageMetadata::new(1024),
);
assert_eq!(handle.ref_count.load(Ordering::SeqCst), 1);
assert_eq!(handle.id, StorageId(1));
assert_eq!(handle.strategy_type, StorageType::InMemory);
assert_eq!(handle.metadata.size, 1024);
}
// Byte totals accumulate and the average is the mean of recorded reads.
#[test]
fn test_performance_tracker() {
let mut tracker = PerformanceTracker::new();
tracker.record_read(std::time::Duration::from_millis(10), 1024);
tracker.record_read(std::time::Duration::from_millis(20), 2048);
assert_eq!(tracker.bytes_read, 3072);
let avg_time = tracker
.average_read_time()
.expect("operation should succeed");
assert_eq!(avg_time, std::time::Duration::from_millis(15));
}
// Allocation/deallocation counters and the peak high-water mark.
#[test]
fn test_atomic_memory_stats() {
let stats = AtomicMemoryStats::new();
stats.record_allocation(1024);
assert_eq!(stats.total_allocated.load(Ordering::SeqCst), 1024);
assert_eq!(stats.active_allocations.load(Ordering::SeqCst), 1);
stats.record_allocation(2048);
assert_eq!(stats.total_allocated.load(Ordering::SeqCst), 3072);
assert_eq!(stats.peak_usage.load(Ordering::SeqCst), 3072);
stats.record_deallocation(1024);
assert_eq!(stats.total_allocated.load(Ordering::SeqCst), 2048);
assert_eq!(stats.active_allocations.load(Ordering::SeqCst), 1);
}
}