use crate::core::error::{Error, Result};
use crate::storage::unified_memory::*;
use rayon::prelude::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;
/// Tuning knobs for the unified column store.
#[derive(Debug, Clone)]
pub struct ColumnStoreConfig {
    /// Codec applied to each block after encoding.
    pub compression_type: CompressionType,
    /// Value-encoding scheme applied before compression.
    pub encoding_type: EncodingType,
    /// Target size, in bytes, of each storage block.
    pub block_size: usize,
    /// Whether dictionary encoding may be enabled (see `optimize_for_pattern`).
    pub enable_dictionary: bool,
    /// Whether multi-block reads/writes use rayon worker threads.
    pub enable_parallel: bool,
    /// Budget, in bytes, for cached column metadata.
    /// NOTE(review): not consulted anywhere in this file — confirm whether it
    /// is meant to bound `metadata_cache`.
    pub metadata_cache_size: usize,
}
impl Default for ColumnStoreConfig {
fn default() -> Self {
Self {
compression_type: CompressionType::Zstd,
encoding_type: EncodingType::Auto,
block_size: 64 * 1024, enable_dictionary: true,
enable_parallel: true,
metadata_cache_size: 10 * 1024 * 1024, }
}
}
/// Value-encoding schemes applied to column bytes before compression.
///
/// NOTE(review): only `RunLength` has a registered `EncodingStrategy` in this
/// file (see `UnifiedColumnStoreStrategy::new`); the other variants are
/// selectable but have no implementation here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EncodingType {
    /// Let the store pick an encoding.
    Auto,
    /// Store bytes verbatim.
    None,
    /// `(count, value)` pairs; effective for long runs of repeated bytes.
    RunLength,
    /// Dictionary-compressed values.
    Dictionary,
    /// Deltas between consecutive values.
    Delta,
    /// Bit-level packing.
    BitPacked,
}
/// A byte-level compression codec usable by the column store.
///
/// Implementations must be `Send + Sync` because blocks may be compressed and
/// decompressed from rayon worker threads.
pub trait CompressionEngine: Send + Sync {
    /// Compresses `data` into a self-contained byte buffer.
    fn compress(&self, data: &[u8]) -> Result<Vec<u8>>;
    /// Reverses `compress`, returning the original bytes.
    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>>;
    /// Human-readable codec name.
    fn name(&self) -> &'static str;
    /// original/compressed size ratio (> 1.0 means the codec saved space).
    fn compression_ratio(&self, original_size: usize, compressed_size: usize) -> f64;
}
/// Stand-in for an LZ4 codec: prepends a one-byte length header and stores
/// the payload verbatim (no actual compression is performed).
pub struct Lz4CompressionEngine;

impl CompressionEngine for Lz4CompressionEngine {
    /// Emits `[len as u8, payload...]`. The header byte wraps for inputs
    /// longer than 255 bytes, but `decompress` never reads it.
    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
        let mut out = Vec::with_capacity(data.len() + 1);
        out.push(data.len() as u8);
        out.extend_from_slice(data);
        Ok(out)
    }

    /// Drops the header byte and returns the payload; empty input yields
    /// empty output.
    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>> {
        match data.split_first() {
            None => Ok(Vec::new()),
            Some((_header, payload)) => Ok(payload.to_vec()),
        }
    }

    fn name(&self) -> &'static str {
        "LZ4"
    }

    /// original/compressed ratio; 0.0 guards the division by zero.
    fn compression_ratio(&self, original_size: usize, compressed_size: usize) -> f64 {
        match compressed_size {
            0 => 0.0,
            n => original_size as f64 / n as f64,
        }
    }
}
/// Stand-in for a Zstandard codec.
pub struct ZstdCompressionEngine {
    // Requested compression level.
    // NOTE(review): stored but never read anywhere in this file — presumably
    // intended for a real zstd backend; confirm before removing.
    compression_level: i32,
}

impl ZstdCompressionEngine {
    /// Creates an engine with the given compression level.
    pub fn new(level: i32) -> Self {
        Self {
            compression_level: level,
        }
    }
}
impl CompressionEngine for ZstdCompressionEngine {
    /// Emits a big-endian two-byte length header followed by the payload
    /// verbatim (no actual compression). The header wraps for inputs longer
    /// than 65535 bytes, but `decompress` never reads it.
    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
        let len = data.len();
        let mut out = Vec::with_capacity(len + 2);
        out.push((len >> 8) as u8);
        out.push(len as u8);
        out.extend_from_slice(data);
        Ok(out)
    }

    /// Skips the two header bytes; inputs shorter than the header decode to
    /// an empty buffer.
    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>> {
        let payload = data.get(2..).unwrap_or(&[]);
        Ok(payload.to_vec())
    }

    fn name(&self) -> &'static str {
        "ZSTD"
    }

    /// original/compressed ratio; 0.0 guards the division by zero.
    fn compression_ratio(&self, original_size: usize, compressed_size: usize) -> f64 {
        match compressed_size {
            0 => 0.0,
            n => original_size as f64 / n as f64,
        }
    }
}
/// Identity codec: bytes pass through untouched in both directions.
pub struct NoCompressionEngine;

impl CompressionEngine for NoCompressionEngine {
    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
        Ok(Vec::from(data))
    }

    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>> {
        Ok(Vec::from(data))
    }

    fn name(&self) -> &'static str {
        "None"
    }

    /// Always 1.0 — no space is ever saved by the identity codec.
    fn compression_ratio(&self, _original_size: usize, _compressed_size: usize) -> f64 {
        1.0
    }
}
/// A value-encoding scheme applied to raw column bytes before compression.
pub trait EncodingStrategy: Send + Sync {
    /// Encodes raw bytes, returning the payload plus decode context.
    fn encode(&self, data: &[u8]) -> Result<EncodedData>;
    /// Reverses `encode`.
    fn decode(&self, data: &EncodedData) -> Result<Vec<u8>>;
    /// Human-readable scheme name.
    fn name(&self) -> &'static str;
    /// original/encoded size ratio (> 1.0 means the encoding saved space).
    fn encoding_ratio(&self, original_size: usize, encoded_size: usize) -> f64;
}
/// Output of an `EncodingStrategy`: the encoded bytes plus enough context to
/// decode them later.
#[derive(Debug, Clone)]
pub struct EncodedData {
    /// Encoded payload.
    pub data: Vec<u8>,
    /// Which encoding produced `data`.
    pub encoding_type: EncodingType,
    /// Byte length of the input before encoding (used to pre-size decode buffers).
    pub original_size: usize,
    /// Free-form per-encoding context (unused by the run-length strategy).
    pub metadata: HashMap<String, String>,
}
/// Byte-level run-length encoder emitting `(count, value)` pairs, with runs
/// capped at 255 so the count fits in one byte.
pub struct RunLengthEncodingStrategy;

impl EncodingStrategy for RunLengthEncodingStrategy {
    /// Encodes `data` as a flat sequence of `(run_length, byte)` pairs.
    fn encode(&self, data: &[u8]) -> Result<EncodedData> {
        let mut encoded = Vec::new();
        let mut rest = data;
        while let Some(&value) = rest.first() {
            // Length of the leading run, capped at 255.
            let run = rest.iter().take_while(|&&b| b == value).take(255).count();
            encoded.push(run as u8);
            encoded.push(value);
            rest = &rest[run..];
        }
        Ok(EncodedData {
            data: encoded,
            encoding_type: EncodingType::RunLength,
            original_size: data.len(),
            metadata: HashMap::new(),
        })
    }

    /// Expands `(count, value)` pairs back into raw bytes; a trailing
    /// unpaired byte (never produced by `encode`) is ignored.
    fn decode(&self, encoded: &EncodedData) -> Result<Vec<u8>> {
        let mut decoded = Vec::with_capacity(encoded.original_size);
        for pair in encoded.data.chunks_exact(2) {
            decoded.extend(std::iter::repeat(pair[1]).take(pair[0] as usize));
        }
        Ok(decoded)
    }

    fn name(&self) -> &'static str {
        "RunLength"
    }

    /// original/encoded ratio; 0.0 guards the division by zero.
    fn encoding_ratio(&self, original_size: usize, encoded_size: usize) -> f64 {
        match encoded_size {
            0 => 0.0,
            n => original_size as f64 / n as f64,
        }
    }
}
/// Per-block bookkeeping: identity, physical placement, sizes, integrity
/// checksum, and optional per-block value statistics.
#[derive(Debug, Clone)]
pub struct BlockMetadata {
    /// Identifier assigned by `BlockManager::write_block`.
    pub id: BlockId,
    /// Where the stored bytes live in physical storage.
    pub location: BlockLocation,
    /// Size of the stored (compressed) payload in bytes.
    pub compressed_size: usize,
    /// Size of the payload before compression, in bytes.
    pub uncompressed_size: usize,
    /// Codec the payload was compressed with.
    pub compression_type: CompressionType,
    /// Encoding applied before compression.
    pub encoding_type: EncodingType,
    /// Integrity checksum of the stored bytes; verified by `read_block`.
    pub checksum: u64,
    /// Creation time (monotonic clock — suitable for age diagnostics, not
    /// for persistence).
    pub created_at: Instant,
    /// Smallest value observed in the block, when recorded (unset by default).
    pub min_value: Option<Vec<u8>>,
    /// Largest value observed in the block, when recorded (unset by default).
    pub max_value: Option<Vec<u8>>,
    /// Number of null entries in the block.
    pub null_count: u64,
    /// Number of distinct values, when known.
    pub distinct_count: Option<u64>,
}
/// Opaque identifier for a stored block, unique within one `BlockManager`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BlockId(pub u64);

/// Physical placement of a block inside a `PhysicalStorage` backend.
#[derive(Debug, Clone)]
pub struct BlockLocation {
    /// Byte offset from the start of the backing storage.
    pub offset: u64,
    /// Length of the region in bytes.
    pub size: usize,
}
/// A block's compressed payload together with its bookkeeping metadata.
#[derive(Debug, Clone)]
pub struct CompressedBlock {
    /// Compressed (and possibly encoded) payload bytes.
    pub data: Vec<u8>,
    /// Codec used for `data` (mirrors `metadata.compression_type`).
    pub compression_type: CompressionType,
    /// Encoding applied before compression (mirrors `metadata.encoding_type`).
    pub encoding_type: EncodingType,
    /// Bookkeeping for this block.
    pub metadata: BlockMetadata,
}
impl CompressedBlock {
    /// Wraps already-compressed bytes in a block with freshly initialized
    /// metadata.
    ///
    /// The metadata starts with a zero `BlockId` and a zero offset;
    /// `BlockManager::write_block` assigns the real id/location on write.
    /// `uncompressed_size` defaults to the compressed length — callers that
    /// know the true original size must overwrite it afterwards (as
    /// `compress_and_encode_block` does).
    pub fn new(
        data: Vec<u8>,
        compression_type: CompressionType,
        encoding_type: EncodingType,
    ) -> Self {
        let metadata = BlockMetadata {
            id: BlockId(0),
            location: BlockLocation {
                offset: 0,
                size: data.len(),
            },
            compressed_size: data.len(),
            uncompressed_size: data.len(),
            compression_type,
            encoding_type,
            checksum: Self::compute_checksum(&data),
            created_at: Instant::now(),
            min_value: None,
            max_value: None,
            null_count: 0,
            distinct_count: None,
        };
        Self {
            data,
            compression_type,
            encoding_type,
            metadata,
        }
    }

    /// Size of the stored (compressed) payload in bytes.
    pub fn compressed_size(&self) -> usize {
        self.data.len()
    }

    /// Size of the payload before compression, as recorded in metadata.
    pub fn uncompressed_size(&self) -> usize {
        self.metadata.uncompressed_size
    }

    /// Checksum recorded when the block was created.
    pub fn checksum(&self) -> u64 {
        self.metadata.checksum
    }

    /// Computes the integrity checksum of a payload using 64-bit FNV-1a.
    ///
    /// Fix: the previous implementation summed the bytes, which is
    /// order-insensitive — swapping or relocating bytes produced the same
    /// checksum, so that class of corruption passed `read_block`'s
    /// verification undetected. FNV-1a is position-sensitive and still
    /// dependency-free. Both the producer (`new`) and the verifier
    /// (`BlockManager::read_block`) call this single function, so the change
    /// is internally consistent.
    fn compute_checksum(data: &[u8]) -> u64 {
        const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
        const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
        data.iter().fold(FNV_OFFSET_BASIS, |hash, &byte| {
            (hash ^ byte as u64).wrapping_mul(FNV_PRIME)
        })
    }
}
/// Handle returned by `create_storage`, identifying one stored column.
#[derive(Debug)]
pub struct ColumnStoreHandle {
    /// Physical layout chosen for the column.
    pub layout: ColumnLayout,
    /// Codec used for this column's blocks.
    pub compression_type: CompressionType,
    /// Encoding used for this column's blocks.
    pub encoding_type: EncodingType,
    /// Ids of the blocks that make up the column.
    /// NOTE(review): nothing in this file ever appends to this list after
    /// creation (`write_chunk` drops the ids it gets back) — confirm how
    /// written blocks are meant to be registered.
    pub blocks: Vec<BlockId>,
    /// Per-handle statistics snapshot.
    pub statistics: ColumnStatistics,
}
/// Physical layout description of a stored column.
#[derive(Debug, Clone)]
pub struct ColumnLayout {
    /// Column name; also the key into the strategy's metadata cache.
    pub name: String,
    /// Logical type of the column's values.
    pub data_type: ColumnDataType,
    /// Whether the column may contain nulls.
    pub nullable: bool,
    /// Target block size, in bytes, used when splitting chunks.
    pub block_size: usize,
    /// Estimated total size of the column in bytes.
    pub total_size: usize,
    /// Estimated number of rows (derived as `estimated_size / 8` at creation).
    pub row_count: u64,
}
/// Logical value types a column can hold. Only the mapping from
/// `DataCharacteristics` in `determine_optimal_layout` assigns these; the
/// stored bytes themselves are not interpreted by this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnDataType {
    Int8,
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt16,
    UInt32,
    UInt64,
    Float32,
    Float64,
    String,
    Binary,
    Boolean,
    Timestamp,
    Date,
    Decimal,
}
/// Aggregated per-column statistics maintained by the strategy's metadata
/// cache. Sizes are cumulative across written chunks; compressed sizes are
/// estimates (see `update_column_statistics`), not measured block sizes.
#[derive(Debug, Clone)]
pub struct ColumnStatistics {
    /// Number of null entries observed.
    pub null_count: u64,
    /// Number of distinct values, when known.
    pub distinct_count: Option<u64>,
    /// Sampled minimum value bytes, when recorded.
    pub min_value: Option<Vec<u8>>,
    /// Sampled maximum value bytes, when recorded.
    pub max_value: Option<Vec<u8>>,
    /// Total uncompressed bytes written to the column.
    pub total_size: u64,
    /// Estimated compressed bytes for the column.
    pub compressed_size: u64,
    /// `total_size / compressed_size`.
    pub compression_ratio: f64,
    /// Heuristic per-encoding ratio (see `update_column_statistics`).
    pub encoding_ratio: f64,
}
impl ColumnStatistics {
pub fn new() -> Self {
Self {
null_count: 0,
distinct_count: None,
min_value: None,
max_value: None,
total_size: 0,
compressed_size: 0,
compression_ratio: 1.0,
encoding_ratio: 1.0,
}
}
}
/// Owns the physical storage and all per-block bookkeeping: id assignment,
/// space allocation, an in-memory block cache, a metadata index, and a
/// tracker for freed regions.
pub struct BlockManager {
    // Backend the block bytes are written to / read from.
    storage: Box<dyn PhysicalStorage>,
    // Bump allocator handing out write locations.
    allocator: BlockAllocator,
    // Cache of whole blocks, shared behind a mutex.
    // NOTE(review): unbounded — entries are only removed by delete paths.
    block_cache: Arc<Mutex<HashMap<BlockId, CompressedBlock>>>,
    // Authoritative metadata for every live block.
    metadata_index: HashMap<BlockId, BlockMetadata>,
    // Regions returned by deletions.
    // NOTE(review): never consulted by the allocator, so freed space is
    // tracked but not reused.
    free_space_tracker: FreeSpaceTracker,
    // Monotonic id source; starts at 1 so id 0 can mean "unassigned".
    next_block_id: std::sync::atomic::AtomicU64,
}
impl BlockManager {
    /// Creates a manager over `storage` with an empty cache and metadata
    /// index. Ids start at 1 so the placeholder id 0 written by
    /// `CompressedBlock::new` never collides with a real block.
    pub fn new(storage: Box<dyn PhysicalStorage>) -> Self {
        Self {
            storage,
            allocator: BlockAllocator::new(),
            block_cache: Arc::new(Mutex::new(HashMap::new())),
            metadata_index: HashMap::new(),
            free_space_tracker: FreeSpaceTracker::new(),
            next_block_id: std::sync::atomic::AtomicU64::new(1),
        }
    }

    /// Persists `block`: assigns a fresh id, allocates a storage region,
    /// writes the bytes, and records the metadata. Returns the new id.
    pub fn write_block(&mut self, mut block: CompressedBlock) -> Result<BlockId> {
        let block_id = BlockId(
            self.next_block_id
                .fetch_add(1, std::sync::atomic::Ordering::SeqCst),
        );
        block.metadata.id = block_id;
        // Bump allocation only — freed regions are never handed back out.
        let location = self.allocator.allocate_space(block.compressed_size())?;
        block.metadata.location = location.clone();
        self.storage.write_at_location(&location, &block.data)?;
        self.metadata_index.insert(block_id, block.metadata.clone());
        // Best effort: a poisoned cache lock just means this block is not
        // cached; the write itself has already succeeded.
        if let Ok(mut cache) = self.block_cache.lock() {
            cache.insert(block_id, block);
        }
        Ok(block_id)
    }

    /// Fetches a block by id, preferring the in-memory cache; on a miss the
    /// bytes are read back from storage and verified against the recorded
    /// checksum before being cached and returned.
    ///
    /// # Errors
    /// `InvalidOperation` when the id is unknown or the bytes read back do
    /// not match the recorded checksum.
    pub fn read_block(&self, block_id: BlockId) -> Result<CompressedBlock> {
        // Fast path: cache hit returns a clone of the cached block.
        if let Ok(cache) = self.block_cache.lock() {
            if let Some(block) = cache.get(&block_id) {
                return Ok(block.clone());
            }
        }
        let metadata = self
            .metadata_index
            .get(&block_id)
            .ok_or_else(|| Error::InvalidOperation(format!("Block {:?} not found", block_id)))?;
        let data = self
            .storage
            .read_at_location(&metadata.location, metadata.compressed_size)?;
        // Detect corrupted reads before handing the block out.
        let computed_checksum = CompressedBlock::compute_checksum(&data);
        if computed_checksum != metadata.checksum {
            return Err(Error::InvalidOperation(format!(
                "Checksum mismatch for block {:?}: expected {}, got {}",
                block_id, metadata.checksum, computed_checksum
            )));
        }
        let block = CompressedBlock {
            data,
            compression_type: metadata.compression_type,
            encoding_type: metadata.encoding_type,
            metadata: metadata.clone(),
        };
        // Populate the cache for subsequent reads.
        if let Ok(mut cache) = self.block_cache.lock() {
            cache.insert(block_id, block.clone());
        }
        Ok(block)
    }
}
/// Abstraction over a byte-addressable storage backend.
pub trait PhysicalStorage: Send + Sync {
    /// Writes `data` at the given location.
    fn write_at_location(&mut self, location: &BlockLocation, data: &[u8]) -> Result<()>;
    /// Reads `size` bytes starting at the location's offset.
    fn read_at_location(&self, location: &BlockLocation, size: usize) -> Result<Vec<u8>>;
    /// Releases the region at `location`.
    fn delete_at_location(&mut self, location: &BlockLocation) -> Result<()>;
    /// Total capacity in bytes.
    fn total_size(&self) -> u64;
    /// Capacity not currently allocated, in bytes.
    fn available_space(&self) -> u64;
}
/// Volatile `PhysicalStorage` backend: one flat, fixed-capacity byte buffer
/// plus a list of `(offset, size)` regions recorded on write.
pub struct InMemoryPhysicalStorage {
    // Backing buffer; locations index directly into it.
    data: Vec<u8>,
    // Regions recorded by `write_at_location` and removed (by offset) in
    // `delete_at_location`; used only for `available_space` accounting.
    allocated_regions: Vec<(u64, usize)>,
}

impl InMemoryPhysicalStorage {
    /// Allocates a zero-filled buffer of `capacity` bytes up front.
    pub fn new(capacity: usize) -> Self {
        Self {
            data: vec![0; capacity],
            allocated_regions: Vec::new(),
        }
    }
}
impl PhysicalStorage for InMemoryPhysicalStorage {
    /// Copies `data` into the buffer at `location.offset` and records the
    /// region for space accounting.
    ///
    /// Fix: overwriting an already-recorded offset (which `compact` does when
    /// rewriting a block in place) previously pushed a duplicate region
    /// entry, so `available_space` double-counted the region. Any existing
    /// entry at the offset is now replaced instead.
    ///
    /// # Errors
    /// `InvalidOperation` when the write would run past the end of the buffer.
    fn write_at_location(&mut self, location: &BlockLocation, data: &[u8]) -> Result<()> {
        let start = location.offset as usize;
        let end = start + data.len();
        if end > self.data.len() {
            return Err(Error::InvalidOperation(
                "Not enough space in storage".to_string(),
            ));
        }
        self.data[start..end].copy_from_slice(data);
        // Replace (rather than duplicate) the accounting entry for this offset.
        self.allocated_regions
            .retain(|(offset, _)| *offset != location.offset);
        self.allocated_regions.push((location.offset, data.len()));
        Ok(())
    }

    /// Returns `size` bytes starting at the location's offset.
    ///
    /// # Errors
    /// `InvalidOperation` when the read would run past the end of the buffer.
    fn read_at_location(&self, location: &BlockLocation, size: usize) -> Result<Vec<u8>> {
        let start = location.offset as usize;
        let end = start + size;
        if end > self.data.len() {
            return Err(Error::InvalidOperation(
                "Read beyond storage bounds".to_string(),
            ));
        }
        Ok(self.data[start..end].to_vec())
    }

    /// Drops the accounting entry for the region; the bytes themselves are
    /// left in place.
    fn delete_at_location(&mut self, location: &BlockLocation) -> Result<()> {
        self.allocated_regions
            .retain(|(offset, _)| *offset != location.offset);
        Ok(())
    }

    fn total_size(&self) -> u64 {
        self.data.len() as u64
    }

    /// Capacity minus the sum of recorded regions. `saturating_sub` guards
    /// against stale over-accounting ever underflowing.
    fn available_space(&self) -> u64 {
        let allocated: usize = self.allocated_regions.iter().map(|(_, size)| size).sum();
        self.data.len().saturating_sub(allocated) as u64
    }
}
/// Bump allocator: hands out monotonically increasing offsets and never
/// reuses space.
pub struct BlockAllocator {
    // Offset the next allocation will start at.
    next_offset: u64,
}

impl BlockAllocator {
    /// Starts allocating from offset 0.
    pub fn new() -> Self {
        BlockAllocator { next_offset: 0 }
    }

    /// Reserves `size` bytes at the current watermark and advances it.
    /// Infallible today; the `Result` keeps the signature ready for bounded
    /// backends.
    pub fn allocate_space(&mut self, size: usize) -> Result<BlockLocation> {
        let offset = self.next_offset;
        self.next_offset += size as u64;
        Ok(BlockLocation { offset, size })
    }
}
/// Tracks freed `(offset, size)` regions, kept sorted by offset with
/// adjacent regions coalesced.
pub struct FreeSpaceTracker {
    // Invariant after every public call: sorted by offset, no two entries
    // physically adjacent.
    free_blocks: Vec<(u64, usize)>,
}

impl FreeSpaceTracker {
    /// Starts with no free regions.
    pub fn new() -> Self {
        FreeSpaceTracker {
            free_blocks: Vec::new(),
        }
    }

    /// Records a freed region, then restores the sorted/coalesced invariant.
    pub fn add_free_space(&mut self, offset: u64, size: usize) {
        self.free_blocks.push((offset, size));
        self.free_blocks.sort_by_key(|&(start, _)| start);
        self.merge_adjacent_blocks();
    }

    /// Coalesces physically adjacent regions (assumes the list is sorted).
    pub fn merge_adjacent_blocks(&mut self) {
        if self.free_blocks.len() < 2 {
            return;
        }
        let mut merged: Vec<(u64, usize)> = Vec::with_capacity(self.free_blocks.len());
        for &(start, len) in &self.free_blocks {
            match merged.last_mut() {
                // Previous region ends exactly where this one begins: extend it.
                Some(prev) if prev.0 + prev.1 as u64 == start => prev.1 += len,
                _ => merged.push((start, len)),
            }
        }
        self.free_blocks = merged;
    }

    /// First-fit search: offset of the first free region of at least
    /// `required_size` bytes, or `None`. Does not consume the region.
    pub fn find_space(&self, required_size: usize) -> Option<u64> {
        for &(offset, size) in &self.free_blocks {
            if size >= required_size {
                return Some(offset);
            }
        }
        None
    }
}
/// Column-oriented storage strategy combining pluggable compression engines,
/// encoding strategies, a shared block manager, and cached per-column
/// statistics.
pub struct UnifiedColumnStoreStrategy {
    // Codec registry keyed by codec id; populated with None/LZ4/ZSTD in `new`.
    compression_engines: HashMap<CompressionType, Box<dyn CompressionEngine>>,
    // Encoding registry; only RunLength is populated in `new`.
    encoding_strategies: HashMap<EncodingType, Box<dyn EncodingStrategy>>,
    // Shared behind Arc<Mutex> so &self read paths can still lock and read.
    block_manager: Arc<Mutex<BlockManager>>,
    // Per-column statistics keyed by column (layout) name.
    metadata_cache: Arc<RwLock<HashMap<String, ColumnStatistics>>>,
    // Tuning knobs; mutated by `optimize_for_pattern`.
    config: ColumnStoreConfig,
}
impl UnifiedColumnStoreStrategy {
    /// Builds a strategy with the stock codecs (None/LZ4/ZSTD level 3), the
    /// run-length encoder, and a fresh 100 MiB in-memory backing store.
    pub fn new(config: ColumnStoreConfig) -> Self {
        let mut compression_engines: HashMap<CompressionType, Box<dyn CompressionEngine>> =
            HashMap::new();
        compression_engines.insert(CompressionType::None, Box::new(NoCompressionEngine));
        compression_engines.insert(CompressionType::Lz4, Box::new(Lz4CompressionEngine));
        compression_engines.insert(
            CompressionType::Zstd,
            Box::new(ZstdCompressionEngine::new(3)),
        );
        // NOTE(review): only RunLength is registered, yet
        // `select_encoding_strategy` can hand out Dictionary/Delta. For such
        // handles the write path silently falls back to raw bytes
        // (`compress_and_encode_block`) while the read path errors
        // (`read_and_decompress_block`) — confirm which behavior is intended.
        let mut encoding_strategies: HashMap<EncodingType, Box<dyn EncodingStrategy>> =
            HashMap::new();
        encoding_strategies.insert(EncodingType::RunLength, Box::new(RunLengthEncodingStrategy));
        let storage = Box::new(InMemoryPhysicalStorage::new(100 * 1024 * 1024));
        let block_manager = Arc::new(Mutex::new(BlockManager::new(storage)));
        Self {
            compression_engines,
            encoding_strategies,
            block_manager,
            metadata_cache: Arc::new(RwLock::new(HashMap::new())),
            config,
        }
    }

    /// Derives a column layout from the requested data characteristics.
    fn determine_optimal_layout(&self, config: &StorageConfig) -> Result<ColumnLayout> {
        let data_type = match config.requirements.data_characteristics {
            DataCharacteristics::Numeric => ColumnDataType::Float64,
            DataCharacteristics::Text => ColumnDataType::String,
            DataCharacteristics::TimeSeries => ColumnDataType::Timestamp,
            DataCharacteristics::Categorical => ColumnDataType::String,
            _ => ColumnDataType::Binary,
        };
        Ok(ColumnLayout {
            name: "default".to_string(),
            data_type,
            nullable: true,
            block_size: self.config.block_size,
            total_size: config.requirements.estimated_size,
            // Rough estimate assuming 8-byte values — TODO confirm this is
            // acceptable for non-numeric column types.
            row_count: (config.requirements.estimated_size / 8) as u64,
        })
    }

    /// Picks a codec for the data characteristics, defaulting to the
    /// configured codec for unmatched kinds.
    fn select_compression_strategy(
        &self,
        characteristics: &DataCharacteristics,
    ) -> Result<CompressionType> {
        match characteristics {
            DataCharacteristics::Text => Ok(CompressionType::Zstd),
            DataCharacteristics::Numeric => Ok(CompressionType::Lz4),
            DataCharacteristics::Sparse => Ok(CompressionType::Zstd),
            _ => Ok(self.config.compression_type),
        }
    }

    /// Picks an encoding for the data characteristics, defaulting to the
    /// configured encoding for unmatched kinds.
    fn select_encoding_strategy(
        &self,
        characteristics: &DataCharacteristics,
    ) -> Result<EncodingType> {
        match characteristics {
            DataCharacteristics::Categorical => Ok(EncodingType::Dictionary),
            DataCharacteristics::Sparse => Ok(EncodingType::RunLength),
            DataCharacteristics::TimeSeries => Ok(EncodingType::Delta),
            _ => Ok(self.config.encoding_type),
        }
    }

    /// Returns the blocks relevant to `range`.
    ///
    /// NOTE(review): `range` is currently ignored — every block of the
    /// handle is returned (and the unused parameter is not underscore
    /// prefixed, so this warns).
    fn find_blocks_for_range(
        &self,
        handle: &ColumnStoreHandle,
        range: &ChunkRange,
    ) -> Result<Vec<BlockId>> {
        Ok(handle.blocks.clone())
    }

    /// Loads one block and reverses compression, then encoding.
    ///
    /// # Errors
    /// Fails when the block-manager lock is poisoned, the block is missing
    /// or fails its checksum, or no engine/strategy is registered for the
    /// block's codec/encoding.
    fn read_and_decompress_block(
        &self,
        _handle: &ColumnStoreHandle,
        block_id: BlockId,
    ) -> Result<Vec<u8>> {
        // The manager lock is held for the duration of this block's read.
        let block_manager = self.block_manager.lock().map_err(|_| {
            Error::InvalidOperation("Failed to acquire block manager lock".to_string())
        })?;
        let compressed_block = block_manager.read_block(block_id)?;
        let compression_engine = self
            .compression_engines
            .get(&compressed_block.compression_type)
            .ok_or_else(|| {
                Error::InvalidOperation(format!(
                    "Compression engine {:?} not found",
                    compressed_block.compression_type
                ))
            })?;
        let decompressed_data = compression_engine.decompress(&compressed_block.data)?;
        if compressed_block.encoding_type != EncodingType::None {
            let encoding_strategy = self
                .encoding_strategies
                .get(&compressed_block.encoding_type)
                .ok_or_else(|| {
                    Error::InvalidOperation(format!(
                        "Encoding strategy {:?} not found",
                        compressed_block.encoding_type
                    ))
                })?;
            // Rebuild the wrapper the strategy expects; per-encoding metadata
            // is not persisted with the block, hence the empty map.
            let encoded_data = EncodedData {
                data: decompressed_data,
                encoding_type: compressed_block.encoding_type,
                original_size: compressed_block.uncompressed_size(),
                metadata: HashMap::new(),
            };
            encoding_strategy.decode(&encoded_data)
        } else {
            Ok(decompressed_data)
        }
    }

    /// Concatenates decoded blocks into a single chunk.
    /// NOTE(review): `_range` is ignored — no trimming to the requested range.
    fn merge_blocks_to_chunk(&self, blocks: Vec<Vec<u8>>, _range: ChunkRange) -> Result<DataChunk> {
        let mut merged_data = Vec::new();
        for block in blocks {
            merged_data.extend(block);
        }
        Ok(DataChunk::new(merged_data))
    }

    /// Slices a chunk into block-sized byte vectors; the final block may be
    /// shorter than `layout.block_size`.
    fn split_chunk_into_blocks(
        &self,
        chunk: &DataChunk,
        layout: &ColumnLayout,
    ) -> Result<Vec<Vec<u8>>> {
        let mut blocks = Vec::new();
        let chunk_size = layout.block_size;
        for chunk_data in chunk.data.chunks(chunk_size) {
            blocks.push(chunk_data.to_vec());
        }
        Ok(blocks)
    }

    /// Applies the handle's encoding (if registered) and then its codec (if
    /// registered) to one raw block, recording the true uncompressed size.
    ///
    /// NOTE(review): a missing strategy/engine silently falls back to the
    /// raw bytes here, while `read_and_decompress_block` errors instead —
    /// an asymmetry that makes such blocks unreadable after writing.
    fn compress_and_encode_block(
        &self,
        handle: &ColumnStoreHandle,
        block: &[u8],
    ) -> Result<CompressedBlock> {
        let encoded_data = if handle.encoding_type != EncodingType::None {
            if let Some(encoding_strategy) = self.encoding_strategies.get(&handle.encoding_type) {
                let encoded = encoding_strategy.encode(block)?;
                encoded.data
            } else {
                block.to_vec()
            }
        } else {
            block.to_vec()
        };
        let compressed_data = if handle.compression_type != CompressionType::None {
            if let Some(compression_engine) = self.compression_engines.get(&handle.compression_type)
            {
                compression_engine.compress(&encoded_data)?
            } else {
                encoded_data
            }
        } else {
            encoded_data
        };
        let mut compressed_block = CompressedBlock::new(
            compressed_data,
            handle.compression_type,
            handle.encoding_type,
        );
        // `new()` defaults uncompressed_size to the compressed length;
        // overwrite it with the size of the raw input block.
        compressed_block.metadata.uncompressed_size = block.len();
        Ok(compressed_block)
    }

    /// Folds a freshly written chunk into the cached statistics for the
    /// handle's column.
    ///
    /// Compressed sizes are *estimated* from fixed per-codec ratios and the
    /// encoding ratio is a fixed per-encoding constant — neither is measured
    /// from the actual blocks. min/max are seeded once from the chunk's
    /// first 8 bytes, not maintained as true extrema.
    fn update_column_statistics(
        &mut self,
        handle: &ColumnStoreHandle,
        chunk: &DataChunk,
    ) -> Result<()> {
        if let Ok(mut cache) = self.metadata_cache.write() {
            let stats = cache
                .entry(handle.layout.name.clone())
                .or_insert_with(ColumnStatistics::new);
            stats.total_size += chunk.len() as u64;
            let original_size = chunk.len() as u64;
            // Heuristic expected compression per codec (not measured).
            let estimated_compressed_size = match handle.compression_type {
                CompressionType::None => original_size,
                CompressionType::Auto => (original_size as f64 * 0.5) as u64,
                CompressionType::Lz4 => (original_size as f64 * 0.6) as u64,
                CompressionType::Snappy => (original_size as f64 * 0.7) as u64,
                CompressionType::Gzip => (original_size as f64 * 0.5) as u64,
                CompressionType::Zstd => (original_size as f64 * 0.4) as u64,
            };
            stats.compressed_size += estimated_compressed_size;
            if stats.compressed_size > 0 {
                stats.compression_ratio = stats.total_size as f64 / stats.compressed_size as f64;
            }
            // Fixed heuristic ratios per encoding (not measured).
            stats.encoding_ratio = match handle.encoding_type {
                EncodingType::None => 1.0,
                EncodingType::RunLength => 1.5,
                EncodingType::Dictionary => 2.0,
                EncodingType::Delta => 1.3,
                EncodingType::BitPacked => 2.0,
                EncodingType::Auto => 1.2,
            };
            // For fixed-width numeric columns, seed min/max once with the
            // first 8 bytes of the chunk as a sample.
            if let ColumnDataType::Float64
            | ColumnDataType::Int64
            | ColumnDataType::Float32
            | ColumnDataType::Int32 = handle.layout.data_type
            {
                if chunk.len() >= 8 {
                    let sample_bytes = &chunk.data[0..8.min(chunk.data.len())];
                    if stats.min_value.is_none() {
                        stats.min_value = Some(sample_bytes.to_vec());
                    }
                    if stats.max_value.is_none() {
                        stats.max_value = Some(sample_bytes.to_vec());
                    }
                }
            }
        }
        Ok(())
    }

    /// Hard-coded ratio reported by `performance_profile`; not derived from
    /// observed data.
    fn average_compression_ratio(&self) -> f64 {
        2.5
    }
}
impl StorageStrategy for UnifiedColumnStoreStrategy {
    type Handle = ColumnStoreHandle;
    type Error = Error;
    type Metadata = ColumnStatistics;

    fn name(&self) -> &'static str {
        "UnifiedColumnStore"
    }

    /// Creates a new, empty column handle whose layout, codec and encoding
    /// are derived from the requested data characteristics.
    fn create_storage(&mut self, config: &StorageConfig) -> Result<Self::Handle> {
        let layout = self.determine_optimal_layout(config)?;
        let compression_type =
            self.select_compression_strategy(&config.requirements.data_characteristics)?;
        let encoding_type =
            self.select_encoding_strategy(&config.requirements.data_characteristics)?;
        let handle = ColumnStoreHandle {
            layout,
            compression_type,
            encoding_type,
            blocks: Vec::new(),
            statistics: ColumnStatistics::new(),
        };
        Ok(handle)
    }

    /// Reads, decompresses and concatenates the handle's blocks, in parallel
    /// when enabled and more than one block is involved. Block selection
    /// currently ignores `range` (see `find_blocks_for_range`).
    fn read_chunk(&self, handle: &Self::Handle, range: ChunkRange) -> Result<DataChunk> {
        let relevant_blocks = self.find_blocks_for_range(handle, &range)?;
        let block_data: Result<Vec<_>> = if self.config.enable_parallel && relevant_blocks.len() > 1
        {
            relevant_blocks
                .par_iter()
                .map(|&block_id| self.read_and_decompress_block(handle, block_id))
                .collect()
        } else {
            relevant_blocks
                .iter()
                .map(|&block_id| self.read_and_decompress_block(handle, block_id))
                .collect()
        };
        let blocks = block_data?;
        self.merge_blocks_to_chunk(blocks, range)
    }

    /// Splits `chunk` into blocks, encodes/compresses each (in parallel when
    /// enabled), persists them, then updates cached column statistics.
    ///
    /// NOTE(review): the `BlockId`s returned by `write_block` are discarded
    /// and `handle.blocks` is never extended (`handle` is borrowed
    /// immutably), so data written here is not reachable through
    /// `read_chunk` — confirm how block registration is supposed to happen.
    fn write_chunk(&mut self, handle: &Self::Handle, chunk: DataChunk) -> Result<()> {
        let blocks = self.split_chunk_into_blocks(&chunk, &handle.layout)?;
        let compressed_blocks: Result<Vec<_>> = if self.config.enable_parallel && blocks.len() > 1 {
            blocks
                .par_iter()
                .map(|block| self.compress_and_encode_block(handle, block))
                .collect()
        } else {
            blocks
                .iter()
                .map(|block| self.compress_and_encode_block(handle, block))
                .collect()
        };
        let final_blocks = compressed_blocks?;
        {
            // Scope the manager lock to the write loop only.
            let mut block_manager = self.block_manager.lock().map_err(|_| {
                Error::InvalidOperation("Failed to acquire block manager lock".to_string())
            })?;
            for block in final_blocks {
                block_manager.write_block(block)?;
            }
        }
        self.update_column_statistics(handle, &chunk)?;
        Ok(())
    }

    /// Appending is currently identical to writing a chunk.
    fn append_chunk(&mut self, handle: &Self::Handle, chunk: DataChunk) -> Result<()> {
        self.write_chunk(handle, chunk)
    }

    /// No-op: writes go straight to the (in-memory) backend.
    fn flush(&mut self, _handle: &Self::Handle) -> Result<()> {
        Ok(())
    }

    /// Removes all of a handle's blocks from storage, cache and index,
    /// returns their regions to the free-space tracker, and drops the
    /// column's cached statistics. Backend delete failures are logged and
    /// skipped (best effort).
    fn delete_storage(&mut self, handle: &Self::Handle) -> Result<()> {
        let mut block_manager = self.block_manager.lock().map_err(|_| {
            Error::InvalidOperation("Failed to acquire block manager lock".to_string())
        })?;
        // Snapshot (id, metadata) pairs first so the immutable borrow of the
        // index ends before the index is mutated below.
        let mut blocks_to_delete = Vec::new();
        for &block_id in &handle.blocks {
            if let Some(metadata) = block_manager.metadata_index.get(&block_id) {
                blocks_to_delete.push((block_id, metadata.clone()));
            }
        }
        for (block_id, metadata) in blocks_to_delete {
            if let Err(e) = block_manager.storage.delete_at_location(&metadata.location) {
                eprintln!("Warning: Failed to delete block {:?}: {}", block_id, e);
            }
            block_manager
                .free_space_tracker
                .add_free_space(metadata.location.offset, metadata.location.size);
            if let Ok(mut cache) = block_manager.block_cache.lock() {
                cache.remove(&block_id);
            }
            block_manager.metadata_index.remove(&block_id);
        }
        if let Ok(mut cache) = self.metadata_cache.write() {
            cache.remove(&handle.layout.name);
        }
        Ok(())
    }

    /// Heuristic self-assessment: columnar storage is a confident fit for
    /// numeric/time-series/dense data, and for anything over 1 MiB.
    fn can_handle(&self, requirements: &StorageRequirements) -> StrategyCapability {
        let can_handle = match requirements.data_characteristics {
            DataCharacteristics::Numeric
            | DataCharacteristics::TimeSeries
            | DataCharacteristics::Dense => true,
            // Other data still qualifies once it is large enough (> 1 MiB).
            _ => requirements.estimated_size > 1024 * 1024,
        };
        let confidence = if can_handle { 0.9 } else { 0.3 };
        let performance_score = match requirements.performance_priority {
            PerformancePriority::Speed => 0.8,
            PerformancePriority::Memory => 0.9,
            PerformancePriority::Balanced => 0.85,
            _ => 0.7,
        };
        StrategyCapability {
            can_handle,
            confidence,
            performance_score,
            // Rough cost model: ~half the data resident in memory, a third
            // on disk, moderate CPU, no network.
            resource_cost: ResourceCost {
                memory: requirements.estimated_size / 2,
                cpu: 15.0,
                disk: requirements.estimated_size / 3,
                network: 0,
            },
        }
    }

    /// Static self-description; only the compression ratio comes from a
    /// method, and that method returns a hard-coded value.
    fn performance_profile(&self) -> PerformanceProfile {
        PerformanceProfile {
            read_speed: Speed::VeryFast,
            write_speed: Speed::Fast,
            memory_efficiency: Efficiency::Excellent,
            compression_ratio: self.average_compression_ratio(),
            query_optimization: QueryOptimization::Excellent,
            parallel_scalability: ParallelScalability::Excellent,
        }
    }

    /// Aggregates cached column statistics and block counts into a snapshot.
    ///
    /// NOTE(review): `total_rows`, `total_memory` and `compression_ratio`
    /// are computed but never placed in the returned struct (dead locals),
    /// and the latency / cache-hit figures are hard-coded placeholders.
    fn storage_stats(&self) -> StorageStats {
        let mut total_size = 0u64;
        let mut compressed_size = 0u64;
        let total_rows = 0u64;
        if let Ok(cache) = self.metadata_cache.read() {
            for stats in cache.values() {
                total_size += stats.total_size;
                compressed_size += stats.compressed_size;
            }
        }
        let (total_blocks, total_memory) = if let Ok(block_manager) = self.block_manager.lock() {
            let block_count = block_manager.metadata_index.len();
            let memory_usage = block_manager.storage.total_size();
            (block_count, memory_usage)
        } else {
            (0, 0)
        };
        let compression_ratio = if compressed_size > 0 {
            total_size as f64 / compressed_size as f64
        } else {
            1.0
        };
        StorageStats {
            total_size: total_size.try_into().unwrap_or(usize::MAX),
            used_size: compressed_size.try_into().unwrap_or(usize::MAX),
            // Block count stands in for op counters; real counters are not
            // tracked anywhere in this file.
            read_operations: total_blocks as u64,
            write_operations: total_blocks as u64,
            avg_read_latency_ns: 1000,
            avg_write_latency_ns: 2000,
            cache_hit_rate: 0.85,
        }
    }

    /// Re-tunes block size, parallelism and codec for an access pattern;
    /// unmatched patterns leave the configuration untouched.
    fn optimize_for_pattern(&mut self, pattern: AccessPattern) -> Result<()> {
        match pattern {
            AccessPattern::Sequential => {
                // Larger blocks, parallelism off.
                self.config.block_size = 256 * 1024;
                self.config.enable_parallel = false;
            }
            AccessPattern::Random => {
                // Smaller blocks, parallelism on.
                self.config.block_size = 16 * 1024;
                self.config.enable_parallel = true;
            }
            AccessPattern::Columnar => {
                self.config.enable_dictionary = true;
                self.config.compression_type = CompressionType::Zstd;
            }
            _ => {
                // Keep the current configuration.
            }
        }
        Ok(())
    }

    /// Recompresses each of the handle's blocks, rewriting in place those
    /// that shrank, then updates metadata and cached statistics.
    ///
    /// NOTE(review): every block is decompressed *and* recompressed with the
    /// ZSTD engine regardless of `block.compression_type`, and the result is
    /// tagged `CompressionType::Zstd`. With real codecs, blocks written with
    /// another codec would be mis-decoded here — confirm before wiring in
    /// real compression backends.
    fn compact(&mut self, handle: &Self::Handle) -> Result<CompactionResult> {
        let start_time = Instant::now();
        let mut size_before = 0u64;
        if let Ok(cache) = self.metadata_cache.read() {
            if let Some(stats) = cache.get(&handle.layout.name) {
                size_before = stats.total_size;
            }
        }
        let mut block_manager = self.block_manager.lock().map_err(|_| {
            Error::InvalidOperation("Failed to acquire block manager lock".to_string())
        })?;
        let mut compacted_blocks = Vec::new();
        let mut total_freed_space = 0usize;
        for &block_id in &handle.blocks {
            if let Ok(block) = block_manager.read_block(block_id) {
                // The ZSTD engine is always registered by `new`, so the
                // expect cannot fire in practice.
                let compression_engine = self
                    .compression_engines
                    .get(&CompressionType::Zstd)
                    .expect("operation should succeed");
                let decompressed = compression_engine.decompress(&block.data)?;
                let recompressed = compression_engine.compress(&decompressed)?;
                // Keep the recompressed form only when it actually shrank.
                if recompressed.len() < block.data.len() {
                    let space_saved = block.data.len() - recompressed.len();
                    total_freed_space += space_saved;
                    let mut new_block = CompressedBlock::new(
                        recompressed,
                        CompressionType::Zstd,
                        block.encoding_type,
                    );
                    new_block.metadata.uncompressed_size = block.metadata.uncompressed_size;
                    compacted_blocks.push((block_id, new_block));
                } else {
                    compacted_blocks.push((block_id, block));
                }
            }
        }
        for (block_id, new_block) in compacted_blocks {
            // Two-step lookup so the immutable borrow of the index ends
            // before storage and index are mutated.
            let location = if let Some(metadata) = block_manager.metadata_index.get(&block_id) {
                Some(metadata.location.clone())
            } else {
                None
            };
            if let Some(location) = location {
                block_manager
                    .storage
                    .write_at_location(&location, &new_block.data)?;
                if let Some(metadata) = block_manager.metadata_index.get_mut(&block_id) {
                    metadata.compressed_size = new_block.data.len();
                    metadata.checksum = new_block.checksum();
                }
                if let Ok(mut cache) = block_manager.block_cache.lock() {
                    cache.insert(block_id, new_block);
                }
            }
        }
        block_manager.free_space_tracker.merge_adjacent_blocks();
        // Guard against underflow when the freed space exceeds the recorded
        // pre-compaction size.
        let size_after = if size_before > total_freed_space as u64 {
            size_before - total_freed_space as u64
        } else {
            size_before
        };
        if let Ok(mut cache) = self.metadata_cache.write() {
            if let Some(stats) = cache.get_mut(&handle.layout.name) {
                stats.compressed_size = size_after;
                if stats.compressed_size > 0 {
                    stats.compression_ratio =
                        stats.total_size as f64 / stats.compressed_size as f64;
                }
            }
        }
        Ok(CompactionResult {
            size_before: size_before.try_into().unwrap_or(usize::MAX),
            size_after: size_after.try_into().unwrap_or(usize::MAX),
            duration: start_time.elapsed(),
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The stub LZ4 engine must round-trip arbitrary bytes losslessly.
    #[test]
    fn test_lz4_compression() {
        let engine = Lz4CompressionEngine;
        let data = b"Hello, World! This is a test string for compression.";
        let compressed = engine.compress(data).expect("operation should succeed");
        let decompressed = engine
            .decompress(&compressed)
            .expect("operation should succeed");
        assert_eq!(data.to_vec(), decompressed);
    }

    /// RLE must round-trip and actually shrink run-heavy input.
    #[test]
    fn test_run_length_encoding() {
        let strategy = RunLengthEncodingStrategy;
        let data = b"aaaaabbbbcccccddddd";
        let encoded = strategy.encode(data).expect("operation should succeed");
        let decoded = strategy.decode(&encoded).expect("operation should succeed");
        assert_eq!(data.to_vec(), decoded);
        assert!(encoded.data.len() < data.len());
    }

    /// Numeric characteristics should map to a Float64 column layout.
    #[test]
    fn test_column_store_strategy() {
        let config = ColumnStoreConfig::default();
        let mut strategy = UnifiedColumnStoreStrategy::new(config);
        let storage_config = StorageConfig {
            requirements: StorageRequirements {
                estimated_size: 1024,
                data_characteristics: DataCharacteristics::Numeric,
                ..Default::default()
            },
            ..Default::default()
        };
        let handle = strategy
            .create_storage(&storage_config)
            .expect("operation should succeed");
        assert_eq!(handle.layout.data_type, ColumnDataType::Float64);
    }

    /// Blocks written through the manager must read back verbatim; this also
    /// exercises the checksum-verification path in `read_block`.
    #[test]
    fn test_block_manager() {
        let storage = Box::new(InMemoryPhysicalStorage::new(1024 * 1024));
        let mut manager = BlockManager::new(storage);
        let block = CompressedBlock::new(
            vec![1, 2, 3, 4, 5],
            CompressionType::None,
            EncodingType::None,
        );
        let block_id = manager
            .write_block(block)
            .expect("operation should succeed");
        let read_block = manager
            .read_block(block_id)
            .expect("operation should succeed");
        assert_eq!(read_block.data, vec![1, 2, 3, 4, 5]);
    }

    /// Large numeric data with a speed priority should be a confident match.
    #[test]
    fn test_capability_assessment() {
        let config = ColumnStoreConfig::default();
        let strategy = UnifiedColumnStoreStrategy::new(config);
        let requirements = StorageRequirements {
            estimated_size: 10 * 1024 * 1024,
            data_characteristics: DataCharacteristics::Numeric,
            performance_priority: PerformancePriority::Speed,
            ..Default::default()
        };
        let capability = strategy.can_handle(&requirements);
        assert!(capability.can_handle);
        assert!(capability.confidence > 0.8);
    }
}