#![allow(clippy::cast_precision_loss)]
#![allow(clippy::cast_possible_truncation)]
#![allow(clippy::doc_markdown)]
mod batch;
#[cfg(test)]
mod batch_tests;
mod filter;
mod filter_array;
mod filter_geo;
#[cfg(test)]
mod filter_tests;
pub(crate) mod haversine;
#[cfg(test)]
mod haversine_tests;
mod primary_key_ops;
mod string_table;
mod types;
mod vacuum;
#[cfg(test)]
mod vacuum_tests;
use roaring::RoaringBitmap;
use rustc_hash::FxHashMap;
use std::collections::HashMap;
pub use filter_geo::{CompareOp, GeoBboxParams, GeoDistanceParams};
pub use string_table::StringTable;
pub use types::{
AutoVacuumConfig, BatchUpdate, BatchUpdateResult, BatchUpsertResult, ColumnStoreError,
ColumnType, ColumnValue, ExpireResult, StringId, TypedColumn, UpsertResult, VacuumConfig,
VacuumStats,
};
/// In-memory, column-oriented table.
///
/// Each column is stored separately in `columns`. Row `i` of the table is entry `i`
/// of every column. Rows are logically deleted by recording their index rather than
/// being removed immediately. `Default` yields an empty store with no columns, no
/// rows and no primary key.
#[derive(Debug, Default)]
pub struct ColumnStore {
/// Columns keyed by column name.
pub(crate) columns: HashMap<String, TypedColumn>,
/// Interned strings. String columns hold `StringId` handles that are resolved
/// through this table when values are read back.
pub(crate) string_table: StringTable,
/// Number of row slots pushed so far, including rows currently marked deleted.
pub(crate) row_count: usize,
/// Name of the designated primary-key column, if any (must be an `Int` column).
pub(crate) primary_key_column: Option<String>,
/// Primary-key value -> row index.
/// NOTE(review): not maintained in this file; presumably kept up to date by the
/// primary-key operations module — confirm.
pub(crate) primary_index: HashMap<i64, usize>,
/// Row index -> primary-key value; presumably the reverse of `primary_index`
/// and kept in sync with it — confirm.
pub(crate) row_idx_to_pk: HashMap<usize, i64>,
/// Indices of rows marked deleted. Excluded from `active_row_count` and
/// hidden from `get_value_as_json`.
pub(crate) deleted_rows: rustc_hash::FxHashSet<usize>,
/// Bitmap of deleted row indices.
/// NOTE(review): not read in this file; presumably mirrors `deleted_rows` for
/// fast set operations during filtering — confirm they stay in sync.
pub(crate) deletion_bitmap: RoaringBitmap,
/// Per-row expiry value keyed by row index.
/// NOTE(review): the unit/epoch of the `u64` is not visible here — confirm.
pub(crate) row_expiry: HashMap<usize, u64>,
}
impl ColumnStore {
    /// Creates an empty store with no columns, no rows and no primary key.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an empty store with one column per `(name, type)` entry in `fields`.
    ///
    /// No validation is performed: nested array types are accepted (use
    /// [`Self::with_schema_validated`] to reject them), and a name that appears
    /// more than once keeps only the last column declared for it.
    #[must_use]
    pub fn with_schema(fields: &[(&str, ColumnType)]) -> Self {
        let mut store = Self::new();
        for (name, col_type) in fields {
            store.add_column(name, col_type);
        }
        store
    }

    /// Like [`Self::with_schema`], but first rejects nested array column types.
    ///
    /// # Errors
    ///
    /// Returns [`ColumnStoreError::TypeMismatch`] if any field is an `Array` whose
    /// element type is itself an `Array`.
    pub fn with_schema_validated(fields: &[(&str, ColumnType)]) -> Result<Self, ColumnStoreError> {
        for (name, col_type) in fields {
            Self::reject_nested_array(name, col_type)?;
        }
        Ok(Self::with_schema(fields))
    }

    /// Creates a store with the given schema and designates `pk_column` as its
    /// primary key.
    ///
    /// # Errors
    ///
    /// Returns `Error::ColumnStoreError` if `pk_column` is not one of the names in
    /// `fields`, or if that field's type is not [`ColumnType::Int`].
    pub fn with_primary_key(
        fields: &[(&str, ColumnType)],
        pk_column: &str,
    ) -> crate::error::Result<Self> {
        let pk_field = fields
            .iter()
            .find(|(name, _)| *name == pk_column)
            .ok_or_else(|| {
                crate::error::Error::ColumnStoreError(format!(
                    "Primary key column '{}' not found in fields: {:?}",
                    pk_column,
                    fields.iter().map(|(n, _)| *n).collect::<Vec<_>>()
                ))
            })?;
        if !matches!(pk_field.1, ColumnType::Int) {
            return Err(crate::error::Error::ColumnStoreError(format!(
                "Primary key column '{}' must be Int type, got {:?}",
                pk_column, pk_field.1
            )));
        }
        let mut store = Self::with_schema(fields);
        store.primary_key_column = Some(pk_column.to_string());
        // `primary_index` needs no reset: `with_schema` starts from `Default`,
        // so it is already empty.
        Ok(store)
    }

    /// Name of the primary-key column, if one was designated.
    #[must_use]
    pub fn primary_key_column(&self) -> Option<&str> {
        self.primary_key_column.as_deref()
    }

    /// Adds a column named `name` of type `col_type`, replacing (and discarding the
    /// data of) any existing column with the same name.
    ///
    /// If the store already holds rows, the new column is back-filled with one null
    /// per existing row. Without this, the column would be shorter than the others
    /// and the next pushed row would be stored at the wrong index.
    pub fn add_column(&mut self, name: &str, col_type: &ColumnType) {
        let mut column = match col_type {
            ColumnType::Int => TypedColumn::new_int(0),
            ColumnType::Float => TypedColumn::new_float(0),
            ColumnType::String => TypedColumn::new_string(0),
            ColumnType::Bool => TypedColumn::new_bool(0),
            ColumnType::Array(inner) => TypedColumn::new_array((**inner).clone(), 0),
            ColumnType::GeoPoint => TypedColumn::new_geopoint(0),
        };
        // Keep every column exactly `row_count` entries long, so that entry `i`
        // describes the same logical row in every column.
        for _ in 0..self.row_count {
            column.push_null();
        }
        self.columns.insert(name.to_string(), column);
    }

    /// Number of row slots, including rows currently marked deleted.
    #[must_use]
    pub fn row_count(&self) -> usize {
        self.row_count
    }

    /// Number of rows not marked deleted (never underflows below zero).
    #[must_use]
    pub fn active_row_count(&self) -> usize {
        self.row_count.saturating_sub(self.deleted_rows.len())
    }

    /// Number of rows currently marked deleted.
    #[must_use]
    pub fn deleted_row_count(&self) -> usize {
        self.deleted_rows.len()
    }

    /// Shared access to the string interning table.
    #[must_use]
    pub fn string_table(&self) -> &StringTable {
        &self.string_table
    }

    /// Mutable access to the string interning table.
    pub fn string_table_mut(&mut self) -> &mut StringTable {
        &mut self.string_table
    }

    /// Appends one row.
    ///
    /// Each column receives exactly one entry. That entry is the value given for the
    /// column in `values`, or null if the column is not mentioned. Names in `values`
    /// that match no column are ignored. If a name appears more than once, its last
    /// value wins. This method does not check values against column types itself,
    /// and it does not update the primary-key index.
    pub fn push_row_unchecked(&mut self, values: &[(&str, ColumnValue)]) {
        let value_map: FxHashMap<&str, &ColumnValue> =
            values.iter().map(|(k, v)| (*k, v)).collect();
        for (name, column) in &mut self.columns {
            if let Some(value) = value_map.get(name.as_str()) {
                column.push_typed(value);
            } else {
                column.push_null();
            }
        }
        self.row_count += 1;
    }

    /// Appends one row. It currently delegates directly to
    /// [`Self::push_row_unchecked`] and adds no extra validation.
    #[inline]
    pub fn push_row(&mut self, values: &[(&str, ColumnValue)]) {
        self.push_row_unchecked(values);
    }

    /// Looks up a column by name.
    #[must_use]
    pub fn get_column(&self, name: &str) -> Option<&TypedColumn> {
        self.columns.get(name)
    }

    /// Fails if `col_type` is an array whose element type is itself an array.
    fn reject_nested_array(name: &str, col_type: &ColumnType) -> Result<(), ColumnStoreError> {
        if let ColumnType::Array(inner) = col_type {
            if matches!(inner.as_ref(), ColumnType::Array(_)) {
                return Err(ColumnStoreError::TypeMismatch {
                    expected: "scalar element type (Int, Float, String, Bool)".to_string(),
                    actual: format!("nested Array in column '{name}'"),
                });
            }
        }
        Ok(())
    }

    /// Iterates over column names, in unspecified (hash-map) order.
    pub fn column_names(&self) -> impl Iterator<Item = &str> {
        self.columns.keys().map(String::as_str)
    }

    /// Returns the value at (`column`, `row_idx`) as JSON.
    ///
    /// Returns `None` in each of these cases:
    /// - the row is marked deleted;
    /// - the column does not exist;
    /// - `row_idx` is out of range;
    /// - the stored value is null;
    /// - a string id is missing from the string table.
    ///
    /// Strings are resolved through the string table. Arrays become JSON arrays.
    /// Geo points become `{"lat": .., "lng": ..}`. The remaining column types are
    /// delegated to `TypedColumn::get_as_json_non_string`.
    #[must_use]
    pub fn get_value_as_json(&self, column: &str, row_idx: usize) -> Option<serde_json::Value> {
        if self.deleted_rows.contains(&row_idx) {
            return None;
        }
        let col = self.columns.get(column)?;
        match col {
            TypedColumn::String(v) => v.get(row_idx).and_then(|opt| {
                opt.and_then(|id| self.string_table.get(id).map(|s| serde_json::json!(s)))
            }),
            TypedColumn::Array { data, .. } => self.get_array_as_json(data, row_idx),
            TypedColumn::GeoPoint(v) => v
                .get(row_idx)
                .and_then(|opt| opt.map(|(lat, lng)| serde_json::json!({"lat": lat, "lng": lng}))),
            _ => col.get_as_json_non_string(row_idx),
        }
    }

    /// Converts the array stored at `row_idx` to a JSON array. Returns `None` when
    /// the row is out of range or its array is null.
    fn get_array_as_json(
        &self,
        data: &[Option<smallvec::SmallVec<[ColumnValue; 8]>>],
        row_idx: usize,
    ) -> Option<serde_json::Value> {
        let arr = data.get(row_idx)?.as_ref()?;
        let json_arr: Vec<serde_json::Value> =
            arr.iter().map(|v| self.column_value_to_json(v)).collect();
        Some(serde_json::Value::Array(json_arr))
    }

    /// Converts a single `ColumnValue` to JSON. String ids not present in the
    /// string table become `null`, and nested arrays are converted recursively.
    fn column_value_to_json(&self, value: &ColumnValue) -> serde_json::Value {
        match value {
            ColumnValue::Int(v) => serde_json::json!(v),
            ColumnValue::Float(v) => serde_json::json!(v),
            ColumnValue::Bool(v) => serde_json::json!(v),
            ColumnValue::String(id) => self
                .string_table
                .get(*id)
                .map_or(serde_json::Value::Null, |s| serde_json::json!(s)),
            ColumnValue::Null => serde_json::Value::Null,
            ColumnValue::Array(inner) => {
                let arr: Vec<serde_json::Value> =
                    inner.iter().map(|v| self.column_value_to_json(v)).collect();
                serde_json::Value::Array(arr)
            }
            ColumnValue::GeoPoint(lat, lng) => {
                serde_json::json!({"lat": lat, "lng": lng})
            }
        }
    }
}