#![allow(clippy::cast_precision_loss)]
#![allow(clippy::cast_possible_truncation)]
#![allow(clippy::doc_markdown)]
mod batch;
#[cfg(test)]
mod batch_tests;
mod filter;
#[cfg(test)]
mod filter_tests;
mod string_table;
mod types;
mod vacuum;
#[cfg(test)]
mod vacuum_tests;
use roaring::RoaringBitmap;
use rustc_hash::FxHashMap;
use std::collections::HashMap;
pub use string_table::StringTable;
pub use types::{
AutoVacuumConfig, BatchUpdate, BatchUpdateResult, BatchUpsertResult, ColumnStoreError,
ColumnType, ColumnValue, ExpireResult, StringId, TypedColumn, UpsertResult, VacuumConfig,
VacuumStats,
};
/// Column-oriented table: every named column is a [`TypedColumn`] and all
/// columns share one row index space.
///
/// Rows are soft-deleted: a deleted row keeps its slot (and its primary-key
/// mapping) and is only marked in `deleted_rows` / `deletion_bitmap`.
#[derive(Debug, Default)]
pub struct ColumnStore {
    /// Column data, keyed by column name.
    pub(crate) columns: HashMap<String, TypedColumn>,
    /// Interning table; string column cells hold ids that resolve through it.
    pub(crate) string_table: StringTable,
    /// Number of row slots ever pushed, soft-deleted rows included.
    pub(crate) row_count: usize,
    /// Name of the `Int` primary-key column, when one was configured.
    pub(crate) primary_key_column: Option<String>,
    /// Primary-key value -> row index (entries survive soft deletion).
    pub(crate) primary_index: HashMap<i64, usize>,
    /// Row index -> primary-key value; the reverse of `primary_index`.
    pub(crate) row_idx_to_pk: HashMap<usize, i64>,
    /// Row indices that are currently soft-deleted.
    pub(crate) deleted_rows: rustc_hash::FxHashSet<usize>,
    /// Bitmap mirror of `deleted_rows`, limited to indices that fit in `u32`.
    pub(crate) deletion_bitmap: RoaringBitmap,
    /// Per-row expiry value keyed by row index.
    /// NOTE(review): presumably a timestamp; its unit is not visible here.
    pub(crate) row_expiry: HashMap<usize, u64>,
}
impl ColumnStore {
    /// Creates an empty store with no columns, no rows and no primary key.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an empty store with one column per `(name, type)` pair.
    ///
    /// Columns are added in order through [`ColumnStore::add_column`], so a
    /// repeated name keeps only its last declared type.
    #[must_use]
    pub fn with_schema(fields: &[(&str, ColumnType)]) -> Self {
        let mut store = Self::new();
        for (name, col_type) in fields {
            store.add_column(name, *col_type);
        }
        store
    }

    /// Creates an empty store from `fields` and designates `pk_column` as its
    /// primary key.
    ///
    /// # Errors
    ///
    /// Returns `Error::ColumnStoreError` if `pk_column` is not one of `fields`,
    /// or if its declared type is not [`ColumnType::Int`].
    pub fn with_primary_key(
        fields: &[(&str, ColumnType)],
        pk_column: &str,
    ) -> crate::error::Result<Self> {
        let pk_field = fields
            .iter()
            .find(|(name, _)| *name == pk_column)
            .ok_or_else(|| {
                crate::error::Error::ColumnStoreError(format!(
                    "Primary key column '{}' not found in fields: {:?}",
                    pk_column,
                    fields.iter().map(|(n, _)| *n).collect::<Vec<_>>()
                ))
            })?;
        if !matches!(pk_field.1, ColumnType::Int) {
            return Err(crate::error::Error::ColumnStoreError(format!(
                "Primary key column '{}' must be Int type, got {:?}",
                pk_column, pk_field.1
            )));
        }
        // `with_schema` starts from `Default`, so `primary_index` is already empty.
        let mut store = Self::with_schema(fields);
        store.primary_key_column = Some(pk_column.to_string());
        Ok(store)
    }

    /// Name of the primary-key column, if the store has one.
    #[must_use]
    pub fn primary_key_column(&self) -> Option<&str> {
        self.primary_key_column.as_deref()
    }

    /// Adds (or replaces) the column `name` with the given type.
    ///
    /// Rows that already exist get a null in the new column, so every column
    /// stays exactly `row_count` long and row indices line up across columns.
    /// An existing column with the same name is replaced and its data dropped.
    pub fn add_column(&mut self, name: &str, col_type: ColumnType) {
        let mut column = match col_type {
            ColumnType::Int => TypedColumn::new_int(0),
            ColumnType::Float => TypedColumn::new_float(0),
            ColumnType::String => TypedColumn::new_string(0),
            ColumnType::Bool => TypedColumn::new_bool(0),
        };
        // Without this backfill a column added after rows were pushed would be
        // shorter than the others, and later pushes would land on the wrong row.
        for _ in 0..self.row_count {
            column.push_null();
        }
        self.columns.insert(name.to_string(), column);
    }

    /// Total number of row slots, soft-deleted rows included.
    #[must_use]
    pub fn row_count(&self) -> usize {
        self.row_count
    }

    /// Number of rows that are not soft-deleted.
    #[must_use]
    pub fn active_row_count(&self) -> usize {
        self.row_count.saturating_sub(self.deleted_rows.len())
    }

    /// Number of soft-deleted rows.
    #[must_use]
    pub fn deleted_row_count(&self) -> usize {
        self.deleted_rows.len()
    }

    /// Shared access to the store's [`StringTable`].
    #[must_use]
    pub fn string_table(&self) -> &StringTable {
        &self.string_table
    }

    /// Mutable access to the store's [`StringTable`].
    pub fn string_table_mut(&mut self) -> &mut StringTable {
        &mut self.string_table
    }

    /// Appends one row without any primary-key bookkeeping.
    ///
    /// Every column receives the matching entry from `values`, or a null if
    /// `values` has none. Entries naming unknown columns are ignored. Values
    /// are handed to `TypedColumn::push_typed` without type validation here.
    /// The primary index is *not* updated; on a store with a primary key use
    /// [`ColumnStore::insert_row`] instead.
    pub fn push_row_unchecked(&mut self, values: &[(&str, ColumnValue)]) {
        let value_map: FxHashMap<&str, &ColumnValue> =
            values.iter().map(|(k, v)| (*k, v)).collect();
        for (name, column) in &mut self.columns {
            if let Some(value) = value_map.get(name.as_str()) {
                column.push_typed(value);
            } else {
                column.push_null();
            }
        }
        self.row_count += 1;
    }

    /// Appends one row; currently identical to [`ColumnStore::push_row_unchecked`].
    #[inline]
    pub fn push_row(&mut self, values: &[(&str, ColumnValue)]) {
        self.push_row_unchecked(values);
    }

    /// Inserts a row and returns its row index.
    ///
    /// Without a primary key this simply appends. With one, the key must be
    /// present in `values` as a `ColumnValue::Int`. A key that maps to a
    /// soft-deleted row reuses (and revives) that row's slot.
    ///
    /// # Errors
    ///
    /// - [`ColumnStoreError::MissingPrimaryKey`] if the key is absent or not an `Int`.
    /// - [`ColumnStoreError::DuplicateKey`] if a live row already has the key.
    /// - Any type-validation or write error when reusing a deleted slot.
    pub fn insert_row(
        &mut self,
        values: &[(&str, ColumnValue)],
    ) -> Result<usize, ColumnStoreError> {
        let Some(ref pk_col) = self.primary_key_column else {
            self.push_row(values);
            return Ok(self.row_count - 1);
        };
        let pk_value = Self::extract_pk_value(values, pk_col)?;
        if let Some(&existing_idx) = self.primary_index.get(&pk_value) {
            return self.reinsert_or_reject(values, existing_idx, pk_value);
        }
        let row_idx = self.row_count;
        self.push_row(values);
        self.primary_index.insert(pk_value, row_idx);
        self.row_idx_to_pk.insert(row_idx, pk_value);
        Ok(row_idx)
    }

    /// Finds the `Int` value supplied for `pk_col` in `values`.
    fn extract_pk_value(
        values: &[(&str, ColumnValue)],
        pk_col: &str,
    ) -> Result<i64, ColumnStoreError> {
        values
            .iter()
            .find(|(name, _)| *name == pk_col)
            .and_then(|(_, value)| {
                if let ColumnValue::Int(v) = value {
                    Some(*v)
                } else {
                    None
                }
            })
            .ok_or(ColumnStoreError::MissingPrimaryKey)
    }

    /// Reuses the slot of a soft-deleted row whose key is `pk_value`, or
    /// rejects the insert if that row is still live.
    fn reinsert_or_reject(
        &mut self,
        values: &[(&str, ColumnValue)],
        existing_idx: usize,
        pk_value: i64,
    ) -> Result<usize, ColumnStoreError> {
        if !self.deleted_rows.contains(&existing_idx) {
            return Err(ColumnStoreError::DuplicateKey(pk_value));
        }
        Self::validate_value_types(&self.columns, values, None)?;
        // Write while the slot is still marked deleted, so a failure part-way
        // through leaves the row hidden instead of reviving a half-written row.
        self.set_row_values(values, existing_idx, None)?;
        self.undelete_row(existing_idx);
        Ok(existing_idx)
    }

    /// Checks that every non-null value in `values` matches the type of its
    /// column. Unknown columns and `skip_col` are ignored.
    pub(super) fn validate_value_types(
        columns: &HashMap<String, TypedColumn>,
        values: &[(&str, ColumnValue)],
        skip_col: Option<&str>,
    ) -> Result<(), ColumnStoreError> {
        for (col_name, value) in values {
            if skip_col.is_some_and(|s| s == *col_name) {
                continue;
            }
            if let Some(col) = columns.get(*col_name) {
                if !matches!(value, ColumnValue::Null) {
                    Self::validate_type_match(col, value)?;
                }
            }
        }
        Ok(())
    }

    /// Clears the soft-delete markers and any expiry entry for `row_idx`.
    fn undelete_row(&mut self, row_idx: usize) {
        self.deleted_rows.remove(&row_idx);
        if let Ok(idx) = u32::try_from(row_idx) {
            self.deletion_bitmap.remove(idx);
        }
        self.row_expiry.remove(&row_idx);
    }

    /// Overwrites every column of `row_idx` (except `skip_col`) from `values`.
    /// Columns with no entry in `values` are set to null.
    pub(super) fn set_row_values(
        &mut self,
        values: &[(&str, ColumnValue)],
        row_idx: usize,
        skip_col: Option<&str>,
    ) -> Result<(), ColumnStoreError> {
        let value_map: FxHashMap<&str, &ColumnValue> =
            values.iter().map(|(k, v)| (*k, v)).collect();
        // `set_column_value` is an associated fn, so the columns can be
        // borrowed mutably in place; no need to clone the key list first.
        for (col_name, col) in &mut self.columns {
            if skip_col.is_some_and(|s| s == col_name.as_str()) {
                continue;
            }
            let val = value_map
                .get(col_name.as_str())
                .map_or(ColumnValue::Null, |v| (*v).clone());
            Self::set_column_value(col, row_idx, val)?;
        }
        Ok(())
    }

    /// Row index of the live row with primary key `pk`, or `None` if the key
    /// is unknown or its row is soft-deleted.
    #[must_use]
    pub fn get_row_idx_by_pk(&self, pk: i64) -> Option<usize> {
        self.resolve_live_row(pk).ok()
    }

    /// Soft-deletes the live row with primary key `pk`.
    ///
    /// Returns `false` if the key is unknown or its row is already deleted.
    /// The key stays in the primary index, so a later insert can reuse the slot.
    pub fn delete_by_pk(&mut self, pk: i64) -> bool {
        let Some(row_idx) = self.get_row_idx_by_pk(pk) else {
            return false;
        };
        self.deleted_rows.insert(row_idx);
        if let Ok(idx) = u32::try_from(row_idx) {
            self.deletion_bitmap.insert(idx);
        }
        self.row_expiry.remove(&row_idx);
        true
    }

    /// Sets one column of the live row with primary key `pk`.
    ///
    /// # Errors
    ///
    /// - [`ColumnStoreError::PrimaryKeyUpdate`] if `column` is the primary key.
    /// - [`ColumnStoreError::RowNotFound`] if no live row has key `pk`.
    /// - [`ColumnStoreError::ColumnNotFound`] if `column` does not exist.
    /// - Any error returned by the column write.
    pub fn update_by_pk(
        &mut self,
        pk: i64,
        column: &str,
        value: ColumnValue,
    ) -> Result<(), ColumnStoreError> {
        if self
            .primary_key_column
            .as_ref()
            .is_some_and(|pk_col| pk_col == column)
        {
            return Err(ColumnStoreError::PrimaryKeyUpdate);
        }
        let row_idx = self.resolve_live_row(pk)?;
        let col = self
            .columns
            .get_mut(column)
            .ok_or_else(|| ColumnStoreError::ColumnNotFound(column.to_string()))?;
        Self::set_column_value(col, row_idx, value)
    }

    /// Sets several columns of the live row with primary key `pk`.
    ///
    /// All updates are validated (column existence, not the primary key,
    /// type match for non-null values) before any column is written.
    ///
    /// # Errors
    ///
    /// [`ColumnStoreError::RowNotFound`], [`ColumnStoreError::PrimaryKeyUpdate`],
    /// [`ColumnStoreError::ColumnNotFound`], type-mismatch errors, or any error
    /// returned by the column write.
    pub fn update_multi_by_pk(
        &mut self,
        pk: i64,
        updates: &[(&str, ColumnValue)],
    ) -> Result<(), ColumnStoreError> {
        let row_idx = self.resolve_live_row(pk)?;
        self.validate_multi_update(updates)?;
        for (col_name, value) in updates {
            let col = self
                .columns
                .get_mut(*col_name)
                .ok_or_else(|| ColumnStoreError::ColumnNotFound((*col_name).to_string()))?;
            Self::set_column_value(col, row_idx, value.clone())?;
        }
        Ok(())
    }

    /// Maps `pk` to the index of a live (not soft-deleted) row.
    fn resolve_live_row(&self, pk: i64) -> Result<usize, ColumnStoreError> {
        let row_idx = *self
            .primary_index
            .get(&pk)
            .ok_or(ColumnStoreError::RowNotFound(pk))?;
        if self.deleted_rows.contains(&row_idx) {
            return Err(ColumnStoreError::RowNotFound(pk));
        }
        Ok(row_idx)
    }

    /// Pre-flight checks for [`ColumnStore::update_multi_by_pk`].
    fn validate_multi_update(
        &self,
        updates: &[(&str, ColumnValue)],
    ) -> Result<(), ColumnStoreError> {
        for (col_name, value) in updates {
            if self
                .primary_key_column
                .as_ref()
                .is_some_and(|pk_col| pk_col == *col_name)
            {
                return Err(ColumnStoreError::PrimaryKeyUpdate);
            }
            let col = self
                .columns
                .get(*col_name)
                .ok_or_else(|| ColumnStoreError::ColumnNotFound((*col_name).to_string()))?;
            if !matches!(value, ColumnValue::Null) {
                Self::validate_type_match(col, value)?;
            }
        }
        Ok(())
    }

    /// Looks up a column by name.
    #[must_use]
    pub fn get_column(&self, name: &str) -> Option<&TypedColumn> {
        self.columns.get(name)
    }

    /// Iterates over column names (in `HashMap` order, i.e. unspecified).
    pub fn column_names(&self) -> impl Iterator<Item = &str> {
        self.columns.keys().map(String::as_str)
    }

    /// Returns the cell at (`column`, `row_idx`) as JSON.
    ///
    /// Returns `None` for soft-deleted rows and unknown columns. String cells
    /// are resolved through the string table; a null or unresolvable string
    /// cell yields `None`. Other column types are delegated to
    /// `TypedColumn::get_as_json_non_string`.
    #[must_use]
    pub fn get_value_as_json(&self, column: &str, row_idx: usize) -> Option<serde_json::Value> {
        if self.deleted_rows.contains(&row_idx) {
            return None;
        }
        let col = self.columns.get(column)?;
        if let TypedColumn::String(v) = col {
            return v.get(row_idx).and_then(|opt| {
                opt.and_then(|id| self.string_table.get(id).map(|s| serde_json::json!(s)))
            });
        }
        col.get_as_json_non_string(row_idx)
    }
}