// velesdb-core 1.13.1
//
// High-performance vector database engine written in Rust.
// Documentation
//! Column-oriented storage for high-performance metadata filtering.
//!
//! This module provides a columnar storage format for frequently filtered fields,
//! avoiding the overhead of JSON parsing during filter operations.
//!
//! # Performance Goals
//!
//! - Maintain 50M+ items/sec throughput at 100k items (vs 19M/s with JSON)
//! - Cache-friendly sequential memory access
//! - Support for common filter operations: Eq, Gt, Lt, In, Range
//!
//! # Architecture
//!
//! ```text
//! ColumnStore
//! ├── columns: HashMap<field_name, TypedColumn>
//! │   ├── "category" -> StringColumn(Vec<Option<StringId>>)
//! │   ├── "price"    -> IntColumn(Vec<Option<i64>>)
//! │   └── "rating"   -> FloatColumn(Vec<Option<f64>>)
//! ```

// Reason: Numeric casts in column store are intentional:
// - All casts are for columnar data processing and statistics
// - u64/usize conversions for row indices and bitmap operations
// - Values bounded by column cardinality and row count
// - Precision loss acceptable for column statistics
#![allow(clippy::cast_precision_loss)]
#![allow(clippy::cast_possible_truncation)]
#![allow(clippy::doc_markdown)] // Column-store docs include many storage type identifiers.

mod batch;
#[cfg(test)]
mod batch_tests;
mod filter;
mod filter_array;
mod filter_geo;
#[cfg(test)]
mod filter_tests;
pub(crate) mod haversine;
#[cfg(test)]
mod haversine_tests;
mod primary_key_ops;
mod string_table;
mod types;
mod vacuum;
#[cfg(test)]
mod vacuum_tests;

use roaring::RoaringBitmap;
use rustc_hash::FxHashMap;
use std::collections::HashMap;

pub use filter_geo::{CompareOp, GeoBboxParams, GeoDistanceParams};
pub use string_table::StringTable;
pub use types::{
    AutoVacuumConfig, BatchUpdate, BatchUpdateResult, BatchUpsertResult, ColumnStoreError,
    ColumnType, ColumnValue, ExpireResult, StringId, TypedColumn, UpsertResult, VacuumConfig,
    VacuumStats,
};

/// Column store for high-performance filtering.
///
/// Holds one [`TypedColumn`] per indexed field, plus the bookkeeping used for
/// primary-key lookups, tombstoned (deleted) rows and per-row expiry.
#[derive(Debug, Default)]
pub struct ColumnStore {
    /// Columns indexed by field name
    pub(crate) columns: HashMap<String, TypedColumn>,
    /// String interning table (resolves the `StringId`s stored in string columns)
    pub(crate) string_table: StringTable,
    /// Number of rows, including tombstoned rows (see `row_count()` / `active_row_count()`)
    pub(crate) row_count: usize,
    /// Primary key column name (if any)
    pub(crate) primary_key_column: Option<String>,
    /// Primary key index: pk_value → row_idx (O(1) lookup)
    pub(crate) primary_index: HashMap<i64, usize>,
    /// Reverse index: row_idx → pk_value (O(1) reverse lookup for expire_rows)
    pub(crate) row_idx_to_pk: HashMap<usize, i64>,
    /// Deleted row indices (tombstones) - FxHashSet for backward compatibility.
    /// NOTE(review): presumably must be kept in sync with `deletion_bitmap` — confirm
    /// against the delete path.
    pub(crate) deleted_rows: rustc_hash::FxHashSet<usize>,
    /// Deleted row bitmap (EPIC-043 US-002) - RoaringBitmap for O(1) contains
    pub(crate) deletion_bitmap: RoaringBitmap,
    /// Row expiry timestamps: row_idx → expiry_timestamp (US-004 TTL).
    /// NOTE(review): timestamp unit is not visible from this file — confirm.
    pub(crate) row_expiry: HashMap<usize, u64>,
}

impl ColumnStore {
    /// Creates a new empty column store.
    ///
    /// Equivalent to `Self::default()`: no columns, zero rows and no primary key.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Builds a column store whose indexed columns are created up front.
    ///
    /// Every `(name, type)` pair in `fields` is registered through `add_column`,
    /// in slice order. No schema validation is performed here; use
    /// `with_schema_validated` to reject nested arrays.
    #[must_use]
    pub fn with_schema(fields: &[(&str, ColumnType)]) -> Self {
        fields
            .iter()
            .fold(Self::new(), |mut schema_store, (name, col_type)| {
                schema_store.add_column(name, col_type);
                schema_store
            })
    }

    /// Builds a column store after checking that no field is a nested array.
    ///
    /// Validation runs over all of `fields` before any column is created, so a
    /// rejected schema never produces a partially built store.
    ///
    /// # Errors
    ///
    /// Returns `ColumnStoreError::TypeMismatch` for the first field (in slice
    /// order) whose type is an array of arrays.
    pub fn with_schema_validated(fields: &[(&str, ColumnType)]) -> Result<Self, ColumnStoreError> {
        fields
            .iter()
            .try_for_each(|(name, col_type)| Self::reject_nested_array(name, col_type))?;
        Ok(Self::with_schema(fields))
    }

    /// Creates a column store with a primary key for O(1) lookups.
    ///
    /// # Errors
    ///
    /// Returns `Error::ColumnStoreError` if `pk_column` is not found in `fields`
    /// or is not of type `Int`.
    pub fn with_primary_key(
        fields: &[(&str, ColumnType)],
        pk_column: &str,
    ) -> crate::error::Result<Self> {
        let pk_field = fields
            .iter()
            .find(|(name, _)| *name == pk_column)
            .ok_or_else(|| {
                crate::error::Error::ColumnStoreError(format!(
                    "Primary key column '{}' not found in fields: {:?}",
                    pk_column,
                    fields.iter().map(|(n, _)| *n).collect::<Vec<_>>()
                ))
            })?;
        if !matches!(pk_field.1, ColumnType::Int) {
            return Err(crate::error::Error::ColumnStoreError(format!(
                "Primary key column '{}' must be Int type, got {:?}",
                pk_column, pk_field.1
            )));
        }

        let mut store = Self::with_schema(fields);
        store.primary_key_column = Some(pk_column.to_string());
        store.primary_index = HashMap::new();
        Ok(store)
    }

    /// Returns the primary key column name if set.
    ///
    /// Yields `None` for stores without a primary key, such as those built
    /// with `new` or `with_schema`.
    #[must_use]
    pub fn primary_key_column(&self) -> Option<&str> {
        self.primary_key_column.as_deref()
    }

    /// Adds a new column to the store.
    ///
    /// If a column named `name` already exists it is replaced by the new, empty
    /// column. When the store already contains rows, the new column is
    /// back-filled with nulls so that it has one entry per existing row and
    /// stays index-aligned with every other column.
    ///
    /// Nested arrays are not rejected here: the inner type is passed through
    /// as the array's element type. Use `with_schema_validated` (or
    /// `add_column_validated`) for strict schema validation.
    pub fn add_column(&mut self, name: &str, col_type: &ColumnType) {
        let mut column = match col_type {
            ColumnType::Int => TypedColumn::new_int(0),
            ColumnType::Float => TypedColumn::new_float(0),
            ColumnType::String => TypedColumn::new_string(0),
            ColumnType::Bool => TypedColumn::new_bool(0),
            ColumnType::Array(inner) => TypedColumn::new_array((**inner).clone(), 0),
            ColumnType::GeoPoint => TypedColumn::new_geopoint(0),
        };
        // Rows are addressed by position, and `push_row_unchecked` appends exactly
        // one entry per column. A column that starts shorter than `row_count`
        // would therefore store every later value at the wrong row index.
        for _ in 0..self.row_count {
            column.push_null();
        }
        self.columns.insert(name.to_string(), column);
    }

    /// Returns the total number of rows in the store (including deleted/tombstoned rows).
    ///
    /// This is the raw counter that `push_row_unchecked` increments; use
    /// `active_row_count` for the number of live rows.
    #[must_use]
    pub fn row_count(&self) -> usize {
        self.row_count
    }

    /// Returns how many rows are live, i.e. total rows minus tombstoned rows.
    ///
    /// The subtraction saturates at zero, so the result never underflows even
    /// if the tombstone set is larger than the row counter.
    #[must_use]
    pub fn active_row_count(&self) -> usize {
        let tombstoned = self.deleted_rows.len();
        self.row_count.saturating_sub(tombstoned)
    }

    /// Returns the number of deleted (tombstoned) rows.
    ///
    /// This is the size of the `deleted_rows` tombstone set.
    #[must_use]
    pub fn deleted_row_count(&self) -> usize {
        self.deleted_rows.len()
    }

    /// Returns the string table for string interning.
    ///
    /// The table resolves the `StringId`s stored in string columns (and in
    /// string elements of array columns) back to their text.
    #[must_use]
    pub fn string_table(&self) -> &StringTable {
        &self.string_table
    }

    /// Returns a mutable reference to the string table.
    ///
    /// NOTE(review): presumably used by callers to intern strings before pushing
    /// `ColumnValue::String` ids into a row — confirm against call sites.
    pub fn string_table_mut(&mut self) -> &mut StringTable {
        &mut self.string_table
    }

    /// Appends one row to every column without any validation.
    ///
    /// For each column in the store, the value supplied under that column's
    /// name is pushed; columns with no supplied value receive a null. Entries
    /// in `values` naming a column that does not exist are ignored, and if a
    /// name appears more than once the last entry wins.
    pub fn push_row_unchecked(&mut self, values: &[(&str, ColumnValue)]) {
        // Index the supplied values by field name once, up front.
        let supplied: FxHashMap<&str, &ColumnValue> = values
            .iter()
            .map(|(field, value)| (*field, value))
            .collect();

        for (column_name, column) in &mut self.columns {
            match supplied.get(column_name.as_str()) {
                Some(value) => {
                    column.push_typed(value);
                }
                None => {
                    column.push_null();
                }
            }
        }

        self.row_count += 1;
    }

    /// Convenience alias for [`push_row_unchecked()`](Self::push_row_unchecked).
    ///
    /// Forwards `values` unchanged and performs no validation of its own.
    #[inline]
    pub fn push_row(&mut self, values: &[(&str, ColumnValue)]) {
        self.push_row_unchecked(values);
    }

    /// Gets a column by name.
    ///
    /// Returns `None` if no column with that name exists. The returned column
    /// is the raw storage: tombstoned rows are not filtered out here.
    #[must_use]
    pub fn get_column(&self, name: &str) -> Option<&TypedColumn> {
        self.columns.get(name)
    }

    /// Schema-time guard against arrays of arrays (`Array(Array(...))`).
    ///
    /// Returns `Ok(())` for every type except an array whose element type is
    /// itself an array; that case yields `ColumnStoreError::TypeMismatch`
    /// naming the offending column.
    fn reject_nested_array(name: &str, col_type: &ColumnType) -> Result<(), ColumnStoreError> {
        let is_nested = matches!(
            col_type,
            ColumnType::Array(element) if matches!(element.as_ref(), ColumnType::Array(_))
        );
        if !is_nested {
            return Ok(());
        }

        Err(ColumnStoreError::TypeMismatch {
            expected: String::from("scalar element type (Int, Float, String, Bool)"),
            actual: format!("nested Array in column '{name}'"),
        })
    }

    /// Returns an iterator over column names.
    ///
    /// Iteration order is unspecified because columns are kept in a `HashMap`.
    pub fn column_names(&self) -> impl Iterator<Item = &str> {
        self.columns.keys().map(String::as_str)
    }

    /// Reads the cell at (`column`, `row_idx`) and renders it as JSON.
    ///
    /// Returns `None` when the row is tombstoned, the column does not exist,
    /// the row index is out of range, or the cell is null. For string cells
    /// `None` is also returned when the interned id cannot be resolved.
    ///
    /// String cells are resolved through the string table, array cells become
    /// JSON arrays, and geo points become `{"lat": f64, "lng": f64}` objects.
    /// All other column kinds are delegated to the column itself.
    #[must_use]
    pub fn get_value_as_json(&self, column: &str, row_idx: usize) -> Option<serde_json::Value> {
        // Tombstoned rows are invisible to readers.
        let is_tombstoned = self.deleted_rows.contains(&row_idx);
        if is_tombstoned {
            return None;
        }

        match self.columns.get(column)? {
            // Strings are stored as interned ids and must be looked up.
            TypedColumn::String(ids) => ids.get(row_idx).and_then(|slot| {
                slot.and_then(|id| {
                    self.string_table
                        .get(id)
                        .map(|text| serde_json::json!(text))
                })
            }),
            // Array elements may themselves be interned strings.
            TypedColumn::Array { data, .. } => self.get_array_as_json(data, row_idx),
            TypedColumn::GeoPoint(points) => points.get(row_idx).and_then(|slot| {
                slot.map(|(lat, lng)| serde_json::json!({"lat": lat, "lng": lng}))
            }),
            other => other.get_as_json_non_string(row_idx),
        }
    }

    /// Renders one cell of an array column as a JSON array.
    ///
    /// Returns `None` when `row_idx` is out of range or the cell is null.
    /// Each element is converted with `column_value_to_json`, which resolves
    /// interned string ids.
    fn get_array_as_json(
        &self,
        data: &[Option<smallvec::SmallVec<[ColumnValue; 8]>>],
        row_idx: usize,
    ) -> Option<serde_json::Value> {
        data.get(row_idx).and_then(Option::as_ref).map(|cells| {
            serde_json::Value::Array(
                cells
                    .iter()
                    .map(|cell| self.column_value_to_json(cell))
                    .collect(),
            )
        })
    }

    /// Renders a single `ColumnValue` as JSON.
    ///
    /// Interned strings are resolved through the string table; an id that
    /// cannot be resolved becomes JSON `null`. Nested arrays are converted
    /// recursively and geo points become `{"lat": .., "lng": ..}` objects.
    fn column_value_to_json(&self, value: &ColumnValue) -> serde_json::Value {
        match value {
            ColumnValue::Null => serde_json::Value::Null,
            ColumnValue::Bool(flag) => serde_json::json!(flag),
            ColumnValue::Int(number) => serde_json::json!(number),
            ColumnValue::Float(number) => serde_json::json!(number),
            ColumnValue::String(id) => match self.string_table.get(*id) {
                Some(text) => serde_json::json!(text),
                None => serde_json::Value::Null,
            },
            ColumnValue::GeoPoint(lat, lng) => serde_json::json!({"lat": lat, "lng": lng}),
            ColumnValue::Array(items) => serde_json::Value::Array(
                items
                    .iter()
                    .map(|item| self.column_value_to_json(item))
                    .collect(),
            ),
        }
    }
}