use serde::{Deserialize, Serialize};
use rocksdb::DB;
use crate::{Error, Result, Value};
/// Number of row slots held by one [`ColumnBatch`] (2^10 = 1024).
///
/// Row `r` lives in batch `r / BATCH_SIZE` at offset `r % BATCH_SIZE`, and the
/// batch id is part of the RocksDB key. Changing this value therefore remaps
/// rows to different keys, so existing stored data would be read from the
/// wrong batches.
pub const BATCH_SIZE: usize = 1 << 10;
/// A fixed-size run of one column's values, persisted as a single RocksDB value.
///
/// Slot `i` of `values` holds the value for row `start_row_id + i`. A
/// [`Value::Null`] slot is treated as "no value" by scans and non-null counts.
///
/// Batches are encoded with `bincode`, which serializes fields positionally, so
/// the field order below is part of the on-disk format. Do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnBatch {
/// Name of the column this batch belongs to (the table appears only in the key).
pub column: String,
/// Row id stored in slot 0. Batches created by `ColumnarStore::store` start at
/// a multiple of [`BATCH_SIZE`].
pub start_row_id: u64,
/// Per-row values; batches built by `ColumnBatch::new` hold exactly
/// [`BATCH_SIZE`] entries.
pub values: Vec<Value>,
}
impl ColumnBatch {
    /// Creates a batch for `column` whose slot 0 holds row `start_row_id`.
    ///
    /// All [`BATCH_SIZE`] slots start as [`Value::Null`]. Every row in the
    /// batch's range therefore reads back as `Some(&Value::Null)` until it is set.
    pub fn new(column: &str, start_row_id: u64) -> Self {
        Self {
            column: column.to_string(),
            start_row_id,
            values: vec![Value::Null; BATCH_SIZE],
        }
    }

    /// Maps `row_id` to its slot index, or `None` if it lies outside
    /// `[start_row_id, start_row_id + BATCH_SIZE)`.
    ///
    /// The range check is done in `u64` *before* narrowing to `usize`. Casting
    /// first could wrap a far-away row id into a valid slot on 32-bit targets.
    fn slot_index(&self, row_id: u64) -> Option<usize> {
        let offset = row_id.checked_sub(self.start_row_id)?;
        if offset < BATCH_SIZE as u64 {
            // Lossless: offset < BATCH_SIZE, which is itself a usize.
            Some(offset as usize)
        } else {
            None
        }
    }

    /// Returns the value stored for `row_id`.
    ///
    /// Returns `None` if the row is outside this batch's range, or if its slot
    /// is missing from a shorter-than-full `values` vector.
    pub fn get(&self, row_id: u64) -> Option<&Value> {
        self.values.get(self.slot_index(row_id)?)
    }

    /// Stores `value` for `row_id`, returning `false` (and leaving the batch
    /// untouched) when the row is outside this batch's range.
    ///
    /// A shorter-than-full `values` vector is padded with `Value::Null` up to
    /// the target slot.
    pub fn set(&mut self, row_id: u64, value: Value) -> bool {
        let offset = match self.slot_index(row_id) {
            Some(offset) => offset,
            None => return false,
        };
        if self.values.len() <= offset {
            self.values.resize(offset + 1, Value::Null);
        }
        // In bounds: the resize above guarantees len > offset.
        self.values[offset] = value;
        true
    }

    /// Counts the slots holding anything other than `Value::Null`.
    pub fn count_non_null(&self) -> usize {
        self.values
            .iter()
            .filter(|v| !matches!(v, Value::Null))
            .count()
    }
}
/// Stateless namespace for the columnar storage operations.
///
/// Every method takes the RocksDB handle explicitly. Each `(table, column)`
/// pair is stored as a set of [`ColumnBatch`] values keyed by batch id.
pub struct ColumnarStore;
impl ColumnarStore {
fn batch_key(table: &str, column: &str, batch_id: u64) -> Vec<u8> {
format!("col:{}:{}:{}", table, column, batch_id).into_bytes()
}
fn column_prefix(table: &str, column: &str) -> Vec<u8> {
format!("col:{}:{}:", table, column).into_bytes()
}
fn batch_location(row_id: u64) -> (u64, usize) {
let batch_id = row_id / BATCH_SIZE as u64;
let offset = (row_id % BATCH_SIZE as u64) as usize;
(batch_id, offset)
}
pub fn store(
db: &DB,
table: &str,
column: &str,
row_id: u64,
value: Value,
) -> Result<()> {
let (batch_id, _offset) = Self::batch_location(row_id);
let key = Self::batch_key(table, column, batch_id);
let mut batch = match db.get(&key)
.map_err(|e| Error::storage(format!("Columnar load failed: {}", e)))?
{
Some(data) => bincode::deserialize(&data)
.map_err(|e| Error::storage(format!("Columnar deserialize failed: {}", e)))?,
None => ColumnBatch::new(column, batch_id * BATCH_SIZE as u64),
};
if !batch.set(row_id, value) {
return Err(Error::storage(format!(
"Invalid row_id {} for batch starting at {}",
row_id, batch.start_row_id
)));
}
let data = bincode::serialize(&batch)
.map_err(|e| Error::storage(format!("Columnar serialize failed: {}", e)))?;
db.put(&key, &data)
.map_err(|e| Error::storage(format!("Columnar store failed: {}", e)))?;
Ok(())
}
pub fn get(
db: &DB,
table: &str,
column: &str,
row_id: u64,
) -> Result<Option<Value>> {
let (batch_id, _offset) = Self::batch_location(row_id);
let key = Self::batch_key(table, column, batch_id);
match db.get(&key)
.map_err(|e| Error::storage(format!("Columnar load failed: {}", e)))?
{
Some(data) => {
let batch: ColumnBatch = bincode::deserialize(&data)
.map_err(|e| Error::storage(format!("Columnar deserialize failed: {}", e)))?;
Ok(batch.get(row_id).cloned())
}
None => Ok(None),
}
}
pub fn scan_column(
db: &DB,
table: &str,
column: &str,
) -> Result<Vec<(u64, Value)>> {
let prefix = Self::column_prefix(table, column);
let mut results = Vec::new();
let iter = db.prefix_iterator(&prefix);
for item in iter {
let (key, value) = item
.map_err(|e| Error::storage(format!("Columnar iterator error: {}", e)))?;
if !key.starts_with(&prefix) {
break;
}
let batch: ColumnBatch = bincode::deserialize(&value)
.map_err(|e| Error::storage(format!("Columnar deserialize failed: {}", e)))?;
for (i, val) in batch.values.iter().enumerate() {
if !matches!(val, Value::Null) {
results.push((batch.start_row_id + i as u64, val.clone()));
}
}
}
Ok(results)
}
pub fn delete(
db: &DB,
table: &str,
column: &str,
row_id: u64,
) -> Result<()> {
Self::store(db, table, column, row_id, Value::Null)
}
pub fn drop_column(
db: &DB,
table: &str,
column: &str,
) -> Result<usize> {
let prefix = Self::column_prefix(table, column);
let mut deleted = 0;
let iter = db.prefix_iterator(&prefix);
for item in iter {
let (key, _) = item
.map_err(|e| Error::storage(format!("Columnar iterator error: {}", e)))?;
if !key.starts_with(&prefix) {
break;
}
db.delete(&key)
.map_err(|e| Error::storage(format!("Columnar delete failed: {}", e)))?;
deleted += 1;
}
Ok(deleted)
}
pub fn stats(
db: &DB,
table: &str,
column: &str,
) -> Result<ColumnarStats> {
let prefix = Self::column_prefix(table, column);
let mut total_batches = 0;
let mut total_values = 0;
let mut non_null_values = 0;
let iter = db.prefix_iterator(&prefix);
for item in iter {
let (key, value) = item
.map_err(|e| Error::storage(format!("Columnar iterator error: {}", e)))?;
if !key.starts_with(&prefix) {
break;
}
let batch: ColumnBatch = bincode::deserialize(&value)
.map_err(|e| Error::storage(format!("Columnar deserialize failed: {}", e)))?;
total_batches += 1;
total_values += batch.values.len();
non_null_values += batch.count_non_null();
}
Ok(ColumnarStats {
batch_count: total_batches,
total_slots: total_values,
non_null_values,
batch_size: BATCH_SIZE,
})
}
}
/// Storage summary for one column, as produced by `ColumnarStore::stats`.
///
/// Plain counters, so the type is `Copy` and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ColumnarStats {
    /// Number of stored batches for the column.
    pub batch_count: usize,
    /// Sum of slot counts (`values.len()`) across all batches.
    pub total_slots: usize,
    /// Number of slots holding a non-null value.
    pub non_null_values: usize,
    /// Slot capacity of a single batch (`BATCH_SIZE`).
    pub batch_size: usize,
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Opens a fresh RocksDB in a temp dir; keep the `TempDir` alive for the test.
    fn test_db() -> (TempDir, DB) {
        let dir = TempDir::new().unwrap();
        let db = DB::open_default(dir.path()).unwrap();
        (dir, db)
    }

    #[test]
    fn test_columnar_store_get() {
        let (_dir, db) = test_db();
        ColumnarStore::store(&db, "metrics", "value", 0, Value::Float8(1.5)).unwrap();
        ColumnarStore::store(&db, "metrics", "value", 1, Value::Float8(2.5)).unwrap();
        ColumnarStore::store(&db, "metrics", "value", 2, Value::Float8(3.5)).unwrap();
        assert_eq!(
            ColumnarStore::get(&db, "metrics", "value", 0).unwrap(),
            Some(Value::Float8(1.5))
        );
        assert_eq!(
            ColumnarStore::get(&db, "metrics", "value", 1).unwrap(),
            Some(Value::Float8(2.5))
        );
        assert_eq!(
            ColumnarStore::get(&db, "metrics", "value", 2).unwrap(),
            Some(Value::Float8(3.5))
        );
        // Unset row inside an existing batch reads back as Null, not None.
        assert_eq!(
            ColumnarStore::get(&db, "metrics", "value", 100).unwrap(),
            Some(Value::Null)
        );
    }

    #[test]
    fn test_columnar_scan() {
        let (_dir, db) = test_db();
        ColumnarStore::store(&db, "test", "col", 0, Value::Int4(100)).unwrap();
        ColumnarStore::store(&db, "test", "col", 5, Value::Int4(500)).unwrap();
        ColumnarStore::store(&db, "test", "col", 10, Value::Int4(1000)).unwrap();
        let results = ColumnarStore::scan_column(&db, "test", "col").unwrap();
        assert_eq!(results.len(), 3);
        assert_eq!(results[0], (0, Value::Int4(100)));
        assert_eq!(results[1], (5, Value::Int4(500)));
        assert_eq!(results[2], (10, Value::Int4(1000)));
    }

    #[test]
    fn test_columnar_multiple_batches() {
        let (_dir, db) = test_db();
        ColumnarStore::store(&db, "test", "col", 0, Value::Int4(1)).unwrap();
        ColumnarStore::store(&db, "test", "col", 1023, Value::Int4(2)).unwrap();
        ColumnarStore::store(&db, "test", "col", 1024, Value::Int4(3)).unwrap();
        ColumnarStore::store(&db, "test", "col", 2048, Value::Int4(4)).unwrap();
        assert_eq!(
            ColumnarStore::get(&db, "test", "col", 0).unwrap(),
            Some(Value::Int4(1))
        );
        assert_eq!(
            ColumnarStore::get(&db, "test", "col", 1023).unwrap(),
            Some(Value::Int4(2))
        );
        assert_eq!(
            ColumnarStore::get(&db, "test", "col", 1024).unwrap(),
            Some(Value::Int4(3))
        );
        assert_eq!(
            ColumnarStore::get(&db, "test", "col", 2048).unwrap(),
            Some(Value::Int4(4))
        );
        let stats = ColumnarStore::stats(&db, "test", "col").unwrap();
        assert_eq!(stats.batch_count, 3);
        assert_eq!(stats.non_null_values, 4);
    }

    #[test]
    fn test_columnar_delete() {
        let (_dir, db) = test_db();
        ColumnarStore::store(&db, "test", "col", 5, Value::Int4(100)).unwrap();
        ColumnarStore::delete(&db, "test", "col", 5).unwrap();
        assert_eq!(
            ColumnarStore::get(&db, "test", "col", 5).unwrap(),
            Some(Value::Null)
        );
    }

    #[test]
    fn test_columnar_drop_column() {
        let (_dir, db) = test_db();
        ColumnarStore::store(&db, "test", "col", 0, Value::Int4(1)).unwrap();
        ColumnarStore::store(&db, "test", "col", 1024, Value::Int4(2)).unwrap();
        ColumnarStore::store(&db, "test", "other", 0, Value::Int4(3)).unwrap();
        assert_eq!(ColumnarStore::drop_column(&db, "test", "col").unwrap(), 2);
        assert_eq!(ColumnarStore::get(&db, "test", "col", 0).unwrap(), None);
        assert!(ColumnarStore::scan_column(&db, "test", "col")
            .unwrap()
            .is_empty());
        // Other columns of the same table are untouched.
        assert_eq!(
            ColumnarStore::get(&db, "test", "other", 0).unwrap(),
            Some(Value::Int4(3))
        );
    }

    #[test]
    fn test_column_batch_bounds() {
        let mut batch = ColumnBatch::new("col", 1024);
        assert!(!batch.set(1023, Value::Int4(1)));
        assert!(!batch.set(2048, Value::Int4(1)));
        assert!(batch.set(2047, Value::Int4(9)));
        assert_eq!(batch.get(2047), Some(&Value::Int4(9)));
        assert_eq!(batch.get(1024), Some(&Value::Null));
        assert_eq!(batch.get(1023), None);
        assert_eq!(batch.get(2048), None);
        assert_eq!(batch.count_non_null(), 1);
    }

    #[test]
    fn test_batch_location() {
        assert_eq!(ColumnarStore::batch_location(0), (0, 0));
        assert_eq!(ColumnarStore::batch_location(1023), (0, 1023));
        assert_eq!(ColumnarStore::batch_location(1024), (1, 0));
        assert_eq!(ColumnarStore::batch_location(2047), (1, 1023));
        assert_eq!(ColumnarStore::batch_location(2048), (2, 0));
    }
}