Skip to main content

velesdb_core/column_store/
mod.rs

1//! Column-oriented storage for high-performance metadata filtering.
2//!
3//! This module provides a columnar storage format for frequently filtered fields,
4//! avoiding the overhead of JSON parsing during filter operations.
5//!
6//! # Performance Goals
7//!
8//! - Maintain 50M+ items/sec throughput at 100k items (vs 19M/s with JSON)
9//! - Cache-friendly sequential memory access
10//! - Support for common filter operations: Eq, Gt, Lt, In, Range
11//!
12//! # Architecture
13//!
14//! ```text
15//! ColumnStore
16//! ├── columns: HashMap<field_name, TypedColumn>
17//! │   ├── "category" -> StringColumn(Vec<Option<StringId>>)
18//! │   ├── "price"    -> IntColumn(Vec<Option<i64>>)
19//! │   └── "rating"   -> FloatColumn(Vec<Option<f64>>)
20//! ```
21
// NOTE (lint allowances, not memory safety): numeric casts in the column store are intentional.
// - All casts are for columnar data processing and statistics
// - u64/usize conversions for row indices and bitmap operations
// - Values are bounded by column cardinality and row count
// - Precision loss is acceptable for column statistics
27#![allow(clippy::cast_precision_loss)]
28#![allow(clippy::cast_possible_truncation)]
29#![allow(clippy::doc_markdown)] // Column-store docs include many storage type identifiers.
30
31mod batch;
32#[cfg(test)]
33mod batch_tests;
34mod filter;
35mod string_table;
36mod types;
37mod vacuum;
38
39use roaring::RoaringBitmap;
40use rustc_hash::FxHashMap;
41use std::collections::HashMap;
42
43pub use string_table::StringTable;
44pub use types::{
45    AutoVacuumConfig, BatchUpdate, BatchUpdateResult, BatchUpsertResult, ColumnStoreError,
46    ColumnType, ColumnValue, ExpireResult, StringId, TypedColumn, UpsertResult, VacuumConfig,
47    VacuumStats,
48};
49
/// Column store for high-performance filtering.
///
/// Rows are addressed by a `usize` row index. `push_row_unchecked` appends exactly one
/// entry (a value or NULL) to every column per row, so a row index refers to the same row
/// in every column. Deletion is logical: a deleted row keeps its slot and is recorded as a
/// tombstone in both `deleted_rows` and `deletion_bitmap`.
#[derive(Debug, Default)]
pub struct ColumnStore {
    /// Columns indexed by field name.
    pub(crate) columns: HashMap<String, TypedColumn>,
    /// String interning table; `TypedColumn::String` cells store ids into this table.
    pub(crate) string_table: StringTable,
    /// Number of rows, including deleted (tombstoned) rows.
    pub(crate) row_count: usize,
    /// Primary key column name (if any). Set by `with_primary_key`, which requires an
    /// `Int` column.
    pub(crate) primary_key_column: Option<String>,
    /// Primary key index: pk_value → row_idx (O(1) lookup).
    /// Entries for deleted rows are kept so that `insert_row` can reuse the tombstoned
    /// slot when the same key is inserted again.
    pub(crate) primary_index: HashMap<i64, usize>,
    /// Reverse index: row_idx → pk_value (O(1) reverse lookup for expire_rows).
    pub(crate) row_idx_to_pk: HashMap<usize, i64>,
    /// Deleted row indices (tombstones) - FxHashSet for backward compatibility.
    /// Must be kept in sync with `deletion_bitmap`.
    pub(crate) deleted_rows: rustc_hash::FxHashSet<usize>,
    /// Deleted row bitmap (EPIC-043 US-002) - RoaringBitmap mirror of `deleted_rows`
    /// for fast membership checks. Row indices that do not fit in `u32` are not
    /// recorded here.
    pub(crate) deletion_bitmap: RoaringBitmap,
    /// Row expiry timestamps: row_idx → expiry_timestamp (US-004 TTL).
    /// Entries are dropped when a row is deleted or revived by `insert_row`.
    /// NOTE(review): timestamp units are not visible in this file — confirm.
    pub(crate) row_expiry: HashMap<usize, u64>,
}
72
73impl ColumnStore {
74    /// Creates a new empty column store.
75    #[must_use]
76    pub fn new() -> Self {
77        Self::default()
78    }
79
80    /// Creates a column store with pre-defined indexed fields.
81    #[must_use]
82    pub fn with_schema(fields: &[(&str, ColumnType)]) -> Self {
83        let mut store = Self::new();
84        for (name, col_type) in fields {
85            store.add_column(name, *col_type);
86        }
87        store
88    }
89
90    /// Creates a column store with a primary key for O(1) lookups.
91    ///
92    /// # Errors
93    ///
94    /// Returns `Error::ColumnStoreError` if `pk_column` is not found in `fields`
95    /// or is not of type `Int`.
96    pub fn with_primary_key(
97        fields: &[(&str, ColumnType)],
98        pk_column: &str,
99    ) -> crate::error::Result<Self> {
100        let pk_field = fields
101            .iter()
102            .find(|(name, _)| *name == pk_column)
103            .ok_or_else(|| {
104                crate::error::Error::ColumnStoreError(format!(
105                    "Primary key column '{}' not found in fields: {:?}",
106                    pk_column,
107                    fields.iter().map(|(n, _)| *n).collect::<Vec<_>>()
108                ))
109            })?;
110        if !matches!(pk_field.1, ColumnType::Int) {
111            return Err(crate::error::Error::ColumnStoreError(format!(
112                "Primary key column '{}' must be Int type, got {:?}",
113                pk_column, pk_field.1
114            )));
115        }
116
117        let mut store = Self::with_schema(fields);
118        store.primary_key_column = Some(pk_column.to_string());
119        store.primary_index = HashMap::new();
120        Ok(store)
121    }
122
123    /// Returns the primary key column name if set.
124    #[must_use]
125    pub fn primary_key_column(&self) -> Option<&str> {
126        self.primary_key_column.as_deref()
127    }
128
129    /// Adds a new column to the store.
130    pub fn add_column(&mut self, name: &str, col_type: ColumnType) {
131        let column = match col_type {
132            ColumnType::Int => TypedColumn::new_int(0),
133            ColumnType::Float => TypedColumn::new_float(0),
134            ColumnType::String => TypedColumn::new_string(0),
135            ColumnType::Bool => TypedColumn::new_bool(0),
136        };
137        self.columns.insert(name.to_string(), column);
138    }
139
140    /// Returns the total number of rows in the store (including deleted/tombstoned rows).
141    #[must_use]
142    pub fn row_count(&self) -> usize {
143        self.row_count
144    }
145
146    /// Returns the number of active (non-deleted) rows in the store.
147    #[must_use]
148    pub fn active_row_count(&self) -> usize {
149        self.row_count.saturating_sub(self.deleted_rows.len())
150    }
151
152    /// Returns the number of deleted (tombstoned) rows.
153    #[must_use]
154    pub fn deleted_row_count(&self) -> usize {
155        self.deleted_rows.len()
156    }
157
158    /// Returns the string table for string interning.
159    #[must_use]
160    pub fn string_table(&self) -> &StringTable {
161        &self.string_table
162    }
163
164    /// Returns a mutable reference to the string table.
165    pub fn string_table_mut(&mut self) -> &mut StringTable {
166        &mut self.string_table
167    }
168
169    /// Pushes values for a new row (low-level, no validation).
170    pub fn push_row_unchecked(&mut self, values: &[(&str, ColumnValue)]) {
171        let value_map: FxHashMap<&str, &ColumnValue> =
172            values.iter().map(|(k, v)| (*k, v)).collect();
173
174        for (name, column) in &mut self.columns {
175            if let Some(value) = value_map.get(name.as_str()) {
176                match value {
177                    ColumnValue::Null => column.push_null(),
178                    ColumnValue::Int(v) => {
179                        if let TypedColumn::Int(col) = column {
180                            col.push(Some(*v));
181                        } else {
182                            column.push_null();
183                        }
184                    }
185                    ColumnValue::Float(v) => {
186                        if let TypedColumn::Float(col) = column {
187                            col.push(Some(*v));
188                        } else {
189                            column.push_null();
190                        }
191                    }
192                    ColumnValue::String(id) => {
193                        if let TypedColumn::String(col) = column {
194                            col.push(Some(*id));
195                        } else {
196                            column.push_null();
197                        }
198                    }
199                    ColumnValue::Bool(v) => {
200                        if let TypedColumn::Bool(col) = column {
201                            col.push(Some(*v));
202                        } else {
203                            column.push_null();
204                        }
205                    }
206                }
207            } else {
208                column.push_null();
209            }
210        }
211        self.row_count += 1;
212    }
213
214    /// Convenience alias for [`push_row_unchecked()`](Self::push_row_unchecked).
215    #[inline]
216    pub fn push_row(&mut self, values: &[(&str, ColumnValue)]) {
217        self.push_row_unchecked(values);
218    }
219
220    /// Inserts a row with primary key validation and index update.
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if the primary key is missing, duplicated, or any
225    /// provided value does not match the target column type.
226    pub fn insert_row(
227        &mut self,
228        values: &[(&str, ColumnValue)],
229    ) -> Result<usize, ColumnStoreError> {
230        let Some(ref pk_col) = self.primary_key_column else {
231            self.push_row(values);
232            return Ok(self.row_count - 1);
233        };
234
235        let pk_value = values
236            .iter()
237            .find(|(name, _)| *name == pk_col.as_str())
238            .and_then(|(_, value)| {
239                if let ColumnValue::Int(v) = value {
240                    Some(*v)
241                } else {
242                    None
243                }
244            })
245            .ok_or(ColumnStoreError::MissingPrimaryKey)?;
246
247        if let Some(&existing_idx) = self.primary_index.get(&pk_value) {
248            if self.deleted_rows.contains(&existing_idx) {
249                for (col_name, value) in values {
250                    if let Some(col) = self.columns.get(*col_name) {
251                        if !matches!(value, ColumnValue::Null) {
252                            Self::validate_type_match(col, value)?;
253                        }
254                    }
255                }
256                self.deleted_rows.remove(&existing_idx);
257                // BUG-9 FIX: Also update RoaringBitmap when undeleting a row
258                if let Ok(idx) = u32::try_from(existing_idx) {
259                    self.deletion_bitmap.remove(idx);
260                }
261                self.row_expiry.remove(&existing_idx);
262                let value_map: std::collections::HashMap<&str, &ColumnValue> =
263                    values.iter().map(|(k, v)| (*k, v)).collect();
264                let col_names: Vec<String> = self.columns.keys().cloned().collect();
265                for col_name in col_names {
266                    if let Some(col) = self.columns.get_mut(&col_name) {
267                        if let Some(value) = value_map.get(col_name.as_str()) {
268                            Self::set_column_value(col, existing_idx, (*value).clone())?;
269                        } else {
270                            Self::set_column_value(col, existing_idx, ColumnValue::Null)?;
271                        }
272                    }
273                }
274                return Ok(existing_idx);
275            }
276            return Err(ColumnStoreError::DuplicateKey(pk_value));
277        }
278
279        let row_idx = self.row_count;
280        self.push_row(values);
281        self.primary_index.insert(pk_value, row_idx);
282        self.row_idx_to_pk.insert(row_idx, pk_value);
283        Ok(row_idx)
284    }
285
286    /// Gets the row index by primary key value - O(1) lookup.
287    #[must_use]
288    pub fn get_row_idx_by_pk(&self, pk: i64) -> Option<usize> {
289        let row_idx = self.primary_index.get(&pk).copied()?;
290        if self.deleted_rows.contains(&row_idx) {
291            return None;
292        }
293        Some(row_idx)
294    }
295
296    /// Deletes a row by primary key value.
297    ///
298    /// Also clears any TTL metadata to prevent false-positive expirations.
299    /// Updates both FxHashSet and RoaringBitmap (EPIC-043 US-002).
300    pub fn delete_by_pk(&mut self, pk: i64) -> bool {
301        let Some(&row_idx) = self.primary_index.get(&pk) else {
302            return false;
303        };
304        if self.deleted_rows.contains(&row_idx) {
305            return false;
306        }
307        self.deleted_rows.insert(row_idx);
308        // EPIC-043 US-002: Also update RoaringBitmap for O(1) contains
309        if let Ok(idx) = u32::try_from(row_idx) {
310            self.deletion_bitmap.insert(idx);
311        }
312        self.row_expiry.remove(&row_idx);
313        true
314    }
315
316    /// Updates a single column value for a row identified by primary key - O(1).
317    ///
318    /// # Errors
319    ///
320    /// Returns an error if the row does not exist, the column does not exist,
321    /// the update targets the primary-key column, or the value type mismatches
322    /// the column type.
323    pub fn update_by_pk(
324        &mut self,
325        pk: i64,
326        column: &str,
327        value: ColumnValue,
328    ) -> Result<(), ColumnStoreError> {
329        if self
330            .primary_key_column
331            .as_ref()
332            .is_some_and(|pk_col| pk_col == column)
333        {
334            return Err(ColumnStoreError::PrimaryKeyUpdate);
335        }
336
337        let row_idx = *self
338            .primary_index
339            .get(&pk)
340            .ok_or(ColumnStoreError::RowNotFound(pk))?;
341
342        if self.deleted_rows.contains(&row_idx) {
343            return Err(ColumnStoreError::RowNotFound(pk));
344        }
345
346        let col = self
347            .columns
348            .get_mut(column)
349            .ok_or_else(|| ColumnStoreError::ColumnNotFound(column.to_string()))?;
350
351        Self::set_column_value(col, row_idx, value)
352    }
353
354    /// Updates multiple columns atomically for a row identified by primary key.
355    ///
356    /// # Panics
357    ///
358    /// This function will not panic under normal operation. The internal expect
359    /// is guarded by prior validation that all columns exist.
360    ///
361    /// # Errors
362    ///
363    /// Returns an error if the row does not exist, one of the columns does not
364    /// exist, one update attempts to modify the primary key, or a value type
365    /// mismatches its target column type.
366    pub fn update_multi_by_pk(
367        &mut self,
368        pk: i64,
369        updates: &[(&str, ColumnValue)],
370    ) -> Result<(), ColumnStoreError> {
371        let row_idx = *self
372            .primary_index
373            .get(&pk)
374            .ok_or(ColumnStoreError::RowNotFound(pk))?;
375
376        if self.deleted_rows.contains(&row_idx) {
377            return Err(ColumnStoreError::RowNotFound(pk));
378        }
379
380        for (col_name, value) in updates {
381            if self
382                .primary_key_column
383                .as_ref()
384                .is_some_and(|pk_col| pk_col == *col_name)
385            {
386                return Err(ColumnStoreError::PrimaryKeyUpdate);
387            }
388
389            let col = self
390                .columns
391                .get(*col_name)
392                .ok_or_else(|| ColumnStoreError::ColumnNotFound((*col_name).to_string()))?;
393
394            if !matches!(value, ColumnValue::Null) {
395                Self::validate_type_match(col, value)?;
396            }
397        }
398
399        for (col_name, value) in updates {
400            let col = self
401                .columns
402                .get_mut(*col_name)
403                .ok_or_else(|| ColumnStoreError::ColumnNotFound((*col_name).to_string()))?;
404            Self::set_column_value(col, row_idx, value.clone())?;
405        }
406
407        Ok(())
408    }
409
410    /// Gets a column by name.
411    #[must_use]
412    pub fn get_column(&self, name: &str) -> Option<&TypedColumn> {
413        self.columns.get(name)
414    }
415
416    /// Returns an iterator over column names.
417    pub fn column_names(&self) -> impl Iterator<Item = &str> {
418        self.columns.keys().map(String::as_str)
419    }
420
421    /// Gets a value from a column at a specific row index as JSON.
422    #[must_use]
423    pub fn get_value_as_json(&self, column: &str, row_idx: usize) -> Option<serde_json::Value> {
424        if self.deleted_rows.contains(&row_idx) {
425            return None;
426        }
427
428        let col = self.columns.get(column)?;
429        match col {
430            TypedColumn::Int(v) => v
431                .get(row_idx)
432                .and_then(|opt| opt.map(|v| serde_json::json!(v))),
433            TypedColumn::Float(v) => v
434                .get(row_idx)
435                .and_then(|opt| opt.map(|v| serde_json::json!(v))),
436            TypedColumn::String(v) => v.get(row_idx).and_then(|opt| {
437                opt.and_then(|id| self.string_table.get(id).map(|s| serde_json::json!(s)))
438            }),
439            TypedColumn::Bool(v) => v
440                .get(row_idx)
441                .and_then(|opt| opt.map(|v| serde_json::json!(v))),
442        }
443    }
444}