use crate::core::error::{Error, Result};
use crate::storage::traits::{
AccessPattern, DataChunk, Efficiency, PerformanceProfile, Speed, StorageConfig, StorageEngine,
StorageStatistics,
};
use crate::{read_lock_safe, write_lock_safe};
use std::collections::HashMap;
use std::ops::Range;
use std::sync::{Arc, RwLock};
/// Compression strategy applied to a column's raw bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionType {
    /// Bytes stored as-is, with no transformation.
    None,
    /// Consecutive repeated values collapsed into (value, run length) pairs.
    RunLength,
    /// Distinct values stored once; rows reference them by index.
    Dictionary,
    /// Values packed at a reduced bit width. NOTE(review): the encoder in
    /// this file always stores whole bytes with `bits_per_value = 8`, so no
    /// actual packing occurs yet — confirm before relying on this variant.
    BitPacked,
}
/// Descriptive metadata kept alongside each stored column.
#[derive(Debug, Clone)]
pub struct ColumnMetadata {
    /// Column name (the key the column is stored under).
    pub name: String,
    /// Free-form logical type label supplied by the caller (e.g. "bytes").
    pub data_type: String,
    /// Number of rows the column held when it was added.
    pub row_count: usize,
    /// Compression strategy chosen for this column's data.
    pub compression: CompressionType,
    /// Number of null entries. NOTE(review): `add_column` always writes 0;
    /// nulls are not tracked yet.
    pub null_count: usize,
    /// Size of the compressed representation, in bytes.
    pub size_bytes: usize,
    /// Minimum value, if computed. NOTE(review): never populated in this file.
    pub min_value: Option<String>,
    /// Maximum value, if computed. NOTE(review): never populated in this file.
    pub max_value: Option<String>,
}
/// A column's payload in its stored (possibly compressed) form.
#[derive(Debug, Clone)]
pub enum CompressedColumnData {
    /// Uncompressed bytes, concatenated row by row.
    Raw(Vec<u8>),
    /// (value bytes, run length) pairs; runs expand in order on decompression.
    RunLength(Vec<(Vec<u8>, usize)>),
    /// Distinct values stored once in `dictionary`; `indices` holds one
    /// dictionary index (`u32`) per row.
    Dictionary {
        dictionary: Vec<Vec<u8>>,
        indices: Vec<u32>,
    },
    /// Bit-packed payload. NOTE(review): the in-file encoder stores bytes
    /// unpacked with `bits_per_value = 8`, so this is currently raw storage.
    BitPacked { data: Vec<u8>, bits_per_value: u8 },
}

impl CompressedColumnData {
    /// Estimated in-memory size of the stored representation, in bytes.
    ///
    /// Run-length runs are costed at value length + 8 (the `usize` run
    /// counter); dictionary indices at 4 bytes each (`u32`).
    pub fn size_bytes(&self) -> usize {
        match self {
            CompressedColumnData::Raw(data) => data.len(),
            CompressedColumnData::RunLength(runs) => {
                runs.iter().map(|(value, _)| value.len() + 8).sum()
            }
            CompressedColumnData::Dictionary {
                dictionary,
                indices,
            } => dictionary.iter().map(|v| v.len()).sum::<usize>() + indices.len() * 4,
            CompressedColumnData::BitPacked { data, .. } => data.len(),
        }
    }

    /// Expands the stored representation back into the original byte stream.
    ///
    /// Dictionary indices that fall outside the dictionary are silently
    /// skipped rather than panicking.
    pub fn decompress(&self) -> Vec<u8> {
        match self {
            CompressedColumnData::Raw(data) => data.clone(),
            CompressedColumnData::RunLength(runs) => {
                let mut result = Vec::new();
                for (value, count) in runs {
                    for _ in 0..*count {
                        result.extend_from_slice(value);
                    }
                }
                result
            }
            CompressedColumnData::Dictionary {
                dictionary,
                indices,
            } => {
                let mut result = Vec::new();
                for &index in indices {
                    if let Some(value) = dictionary.get(index as usize) {
                        result.extend_from_slice(value);
                    }
                }
                result
            }
            // Data is stored unpacked (bits_per_value is always 8 in this
            // file), so "decompression" is just a copy. Binding with `..`
            // fixes the unused-variable warning the original binding caused.
            CompressedColumnData::BitPacked { data, .. } => data.clone(),
        }
    }
}
/// In-memory column store with automatic per-column compression.
///
/// Every field sits behind `Arc<RwLock<…>>`, so a store shared through an
/// `Arc<ColumnStore>` (see `ColumnStoreHandle`) can be read and written from
/// multiple threads.
#[derive(Debug)]
pub struct ColumnStore {
    /// Column name -> compressed payload.
    columns: Arc<RwLock<HashMap<String, CompressedColumnData>>>,
    /// Column name -> descriptive metadata; kept in lockstep with `columns`.
    metadata: Arc<RwLock<HashMap<String, ColumnMetadata>>>,
    /// Store-wide row count, established by the first column added.
    row_count: Arc<RwLock<usize>>,
    /// Aggregate counters (sizes, operation counts).
    stats: Arc<RwLock<StorageStats>>,
}
/// Aggregate counters describing the store's contents and usage.
#[derive(Debug, Default, Clone)]
pub struct StorageStats {
    /// Number of columns currently stored.
    pub total_columns: usize,
    /// Sum of all columns' compressed sizes, in bytes.
    pub total_size_bytes: usize,
    /// Store-wide row total. NOTE(review): confirm writers keep this in
    /// sync before relying on it.
    pub total_rows: usize,
    /// Uncompressed-to-compressed size ratio snapshot; see
    /// `ColumnStore::compression_ratio()` for the value computed on demand.
    pub compression_ratio: f64,
    /// Number of column reads served.
    pub read_operations: usize,
    /// Number of column writes performed.
    pub write_operations: usize,
}
impl ColumnStore {
pub fn new() -> Self {
Self {
columns: Arc::new(RwLock::new(HashMap::new())),
metadata: Arc::new(RwLock::new(HashMap::new())),
row_count: Arc::new(RwLock::new(0)),
stats: Arc::new(RwLock::new(StorageStats::default())),
}
}
pub fn add_column<T: AsRef<[u8]>>(
&self,
name: String,
data: &[T],
data_type: String,
) -> Result<()> {
if data.is_empty() {
return Err(Error::InvalidInput("Cannot add empty column".into()));
}
let compression = self.select_compression_strategy(data);
let compressed_data = self.compress_data(data, compression)?;
let metadata = ColumnMetadata {
name: name.clone(),
data_type,
row_count: data.len(),
compression,
null_count: 0, size_bytes: compressed_data.size_bytes(),
min_value: None, max_value: None, };
{
let mut columns = write_lock_safe!(self.columns, "column store columns write")?;
let mut metadata_map = write_lock_safe!(self.metadata, "column store metadata write")?;
let mut stats = write_lock_safe!(self.stats, "column store stats write")?;
let size_bytes = metadata.size_bytes; columns.insert(name.clone(), compressed_data);
metadata_map.insert(name, metadata);
stats.total_columns += 1;
stats.total_size_bytes += size_bytes;
stats.write_operations += 1;
}
{
let mut row_count = write_lock_safe!(self.row_count, "column store row count write")?;
if *row_count == 0 {
*row_count = data.len();
} else if *row_count != data.len() {
return Err(Error::DimensionMismatch(
"Column length doesn't match existing row count".into(),
));
}
}
Ok(())
}
pub fn get_column(&self, name: &str) -> Result<Vec<u8>> {
let columns = read_lock_safe!(self.columns, "column store columns read")?;
let mut stats = write_lock_safe!(self.stats, "column store stats write")?;
stats.read_operations += 1;
match columns.get(name) {
Some(compressed_data) => Ok(compressed_data.decompress()),
None => Err(Error::ColumnNotFound(name.to_string())),
}
}
pub fn get_metadata(&self, name: &str) -> Result<ColumnMetadata> {
let metadata = read_lock_safe!(self.metadata, "column store metadata read")?;
match metadata.get(name) {
Some(meta) => Ok(meta.clone()),
None => Err(Error::ColumnNotFound(name.to_string())),
}
}
pub fn column_names(&self) -> Result<Vec<String>> {
let columns = read_lock_safe!(self.columns, "column store columns read")?;
Ok(columns.keys().cloned().collect())
}
pub fn row_count(&self) -> Result<usize> {
Ok(*read_lock_safe!(
self.row_count,
"column store row count read"
)?)
}
pub fn stats(&self) -> Result<StorageStats> {
let stats = read_lock_safe!(self.stats, "column store stats read")?;
Ok((*stats).clone())
}
pub fn remove_column(&self, name: &str) -> Result<()> {
let mut columns = write_lock_safe!(self.columns, "column store columns write")?;
let mut metadata_map = write_lock_safe!(self.metadata, "column store metadata write")?;
let mut stats = write_lock_safe!(self.stats, "column store stats write")?;
if let Some(compressed_data) = columns.remove(name) {
metadata_map.remove(name);
stats.total_columns -= 1;
stats.total_size_bytes -= compressed_data.size_bytes();
Ok(())
} else {
Err(Error::ColumnNotFound(name.to_string()))
}
}
pub fn optimize(&self) -> Result<()> {
let column_names: Vec<String> = self.column_names()?;
for name in column_names {
let data = self.get_column(&name)?;
let metadata = self.get_metadata(&name)?;
self.remove_column(&name)?;
let single_chunk = vec![data.as_slice()];
self.add_column(name, &single_chunk, metadata.data_type)?;
}
Ok(())
}
pub fn compression_ratio(&self) -> Result<f64> {
let columns = read_lock_safe!(self.columns, "column store columns read")?;
if columns.is_empty() {
return Ok(1.0);
}
let compressed_size: usize = columns.values().map(|data| data.size_bytes()).sum();
let uncompressed_size: usize = columns.values().map(|data| data.decompress().len()).sum();
if compressed_size == 0 {
Ok(1.0)
} else {
Ok(uncompressed_size as f64 / compressed_size as f64)
}
}
fn select_compression_strategy<T: AsRef<[u8]>>(&self, data: &[T]) -> CompressionType {
if data.len() < 10 {
return CompressionType::None;
}
let mut consecutive_count = 1;
let mut max_consecutive = 1;
for i in 1..data.len() {
if data[i].as_ref() == data[i - 1].as_ref() {
consecutive_count += 1;
max_consecutive = max_consecutive.max(consecutive_count);
} else {
consecutive_count = 1;
}
}
if max_consecutive > data.len() / 4 {
return CompressionType::RunLength;
}
let unique_count = {
let mut unique = std::collections::HashSet::new();
for item in data {
unique.insert(item.as_ref());
if unique.len() > data.len() / 2 {
break; }
}
unique.len()
};
if unique_count < data.len() / 4 {
CompressionType::Dictionary
} else {
CompressionType::None
}
}
fn compress_data<T: AsRef<[u8]>>(
&self,
data: &[T],
compression: CompressionType,
) -> Result<CompressedColumnData> {
match compression {
CompressionType::None => {
let mut raw_data = Vec::new();
for item in data {
raw_data.extend_from_slice(item.as_ref());
}
Ok(CompressedColumnData::Raw(raw_data))
}
CompressionType::RunLength => {
let mut runs = Vec::new();
if !data.is_empty() {
let mut current_value = data[0].as_ref().to_vec();
let mut count = 1;
for item in data.iter().skip(1) {
if item.as_ref() == current_value {
count += 1;
} else {
runs.push((current_value, count));
current_value = item.as_ref().to_vec();
count = 1;
}
}
runs.push((current_value, count));
}
Ok(CompressedColumnData::RunLength(runs))
}
CompressionType::Dictionary => {
let mut dictionary = Vec::new();
let mut value_to_index = HashMap::new();
let mut indices = Vec::new();
for item in data {
let bytes = item.as_ref().to_vec();
if let Some(&index) = value_to_index.get(&bytes) {
indices.push(index);
} else {
let index = dictionary.len() as u32;
dictionary.push(bytes.clone());
value_to_index.insert(bytes, index);
indices.push(index);
}
}
Ok(CompressedColumnData::Dictionary {
dictionary,
indices,
})
}
CompressionType::BitPacked => {
let mut packed_data = Vec::new();
for item in data {
packed_data.extend_from_slice(item.as_ref());
}
Ok(CompressedColumnData::BitPacked {
data: packed_data,
bits_per_value: 8, })
}
}
}
}
impl Default for ColumnStore {
fn default() -> Self {
Self::new()
}
}
/// Cheap, cloneable reference to a shared [`ColumnStore`], tagged with the
/// id assigned when the storage was created.
#[derive(Debug, Clone)]
pub struct ColumnStoreHandle {
    /// Identifier assigned by `create_storage` (unique within the process).
    pub id: usize,
    /// The shared store this handle points at.
    pub store: Arc<ColumnStore>,
}
impl ColumnStoreHandle {
pub fn new(id: usize, store: Arc<ColumnStore>) -> Self {
Self { id, store }
}
}
impl StorageEngine for ColumnStore {
    type Handle = ColumnStoreHandle;
    type Error = Error;

    /// Creates a fresh, empty backing store and returns a handle to it.
    ///
    /// Handle ids come from a process-wide atomic counter, so every call —
    /// across all `ColumnStore` instances — yields a distinct id.
    fn create_storage(&mut self, _config: &StorageConfig) -> Result<Self::Handle> {
        use std::sync::atomic::{AtomicUsize, Ordering};
        static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
        let id = NEXT_ID.fetch_add(1, Ordering::SeqCst);
        Ok(ColumnStoreHandle::new(id, Arc::new(ColumnStore::new())))
    }

    /// Reads byte range `range` out of every column's decompressed data and
    /// concatenates the pieces into one chunk.
    ///
    /// NOTE(review): columns are visited in `HashMap` iteration order, which
    /// is unspecified — repeated reads over multiple columns may return the
    /// same bytes in different orders. The row estimate divides by 8, i.e.
    /// it assumes fixed 8-byte rows — confirm against the chunk format.
    fn read_chunk(&self, handle: &Self::Handle, range: Range<usize>) -> Result<DataChunk> {
        let columns = read_lock_safe!(handle.store.columns, "storage engine columns read")?;
        let mut chunk_data = Vec::new();
        let mut total_rows = 0;
        for (_name, compressed_data) in columns.iter() {
            let decompressed = compressed_data.decompress();
            // Clamp the requested range to this column's decompressed length.
            let start = range.start.min(decompressed.len());
            let end = range.end.min(decompressed.len());
            if start < end {
                chunk_data.extend_from_slice(&decompressed[start..end]);
                // Overwritten each iteration: holds the LAST column's estimate.
                total_rows = (end - start) / 8;
            }
        }
        let metadata = crate::storage::traits::ChunkMetadata {
            row_count: total_rows,
            column_count: columns.len(),
            compression: crate::storage::traits::CompressionPreference::Auto,
            // Reported as equal: the assembled chunk itself is not compressed.
            uncompressed_size: chunk_data.len(),
            compressed_size: chunk_data.len(),
        };
        Ok(DataChunk::new(chunk_data, metadata))
    }

    /// Stores a chunk as a new column, splitting its bytes into 8-byte rows
    /// (the final row may be shorter if the length is not a multiple of 8).
    ///
    /// NOTE(review): the column name is derived from the chunk's row count,
    /// so two chunks with the same `row_count` collide on the same column
    /// name — verify this is intended.
    fn write_chunk(&mut self, handle: &Self::Handle, chunk: DataChunk) -> Result<()> {
        let column_name = format!("chunk_{}", chunk.metadata.row_count);
        let data: Vec<Vec<u8>> = chunk.data
            .chunks(8)
            .map(|chunk| chunk.to_vec())
            .collect();
        handle
            .store
            .add_column(column_name, &data, "bytes".to_string())?;
        Ok(())
    }

    /// Appending is identical to writing: each chunk becomes its own column.
    fn append_chunk(&mut self, handle: &Self::Handle, chunk: DataChunk) -> Result<()> {
        self.write_chunk(handle, chunk)
    }

    /// No-op: all data lives in memory, so there is nothing to flush.
    fn flush(&mut self, _handle: &Self::Handle) -> Result<()> {
        Ok(())
    }

    /// No-op: dropping the handle's `Arc` releases the store's memory.
    fn delete_storage(&mut self, _handle: &Self::Handle) -> Result<()> {
        Ok(())
    }

    /// Static, advertised performance characteristics (not measured).
    fn performance_profile(&self) -> PerformanceProfile {
        PerformanceProfile {
            read_speed: Speed::Fast,
            write_speed: Speed::Medium,
            memory_efficiency: Efficiency::Good,
            compression_ratio: 0.7,
            random_access_speed: Speed::Fast,
            sequential_access_speed: Speed::VeryFast,
        }
    }

    /// Translates the handle's store counters into `StorageStatistics`.
    ///
    /// NOTE(review): `cache_hit_rate` is a hard-coded placeholder (0.9) —
    /// no cache exists in this engine.
    fn storage_stats(&self, handle: &Self::Handle) -> Result<StorageStatistics> {
        let stats = handle.store.stats()?;
        Ok(StorageStatistics {
            total_size: stats.total_size_bytes,
            chunk_count: stats.total_columns,
            avg_compression_ratio: stats.compression_ratio,
            read_operations: stats.read_operations as u64,
            write_operations: stats.write_operations as u64,
            cache_hit_rate: 0.9,
        })
    }

    fn supports_random_access(&self) -> bool {
        true
    }

    /// Streaming is not supported; chunks must be materialized in memory.
    fn supports_streaming(&self) -> bool {
        false
    }

    fn supports_compression(&self) -> bool {
        true
    }

    /// Preferred chunk size: 64 KiB.
    fn optimal_chunk_size(&self) -> usize {
        64 * 1024
    }

    /// Rough fixed bookkeeping overhead estimate, in bytes.
    fn memory_overhead(&self) -> usize {
        1024
    }

    /// Currently a no-op for every access pattern; the explicit arms mark
    /// the columnar/read-heavy and sequential cases as extension points.
    fn optimize_for_pattern(&mut self, pattern: AccessPattern) -> Result<()> {
        match pattern {
            AccessPattern::Columnar | AccessPattern::ReadHeavy => {
                Ok(())
            }
            AccessPattern::Sequential => {
                Ok(())
            }
            _ => {
                Ok(())
            }
        }
    }

    /// Delegates to `ColumnStore::optimize` on the handle's store.
    fn compact(&mut self, handle: &Self::Handle) -> Result<()> {
        handle.store.optimize()
    }
}