velesdb_core/column_store/
mod.rs1#![allow(clippy::cast_precision_loss)]
28#![allow(clippy::cast_possible_truncation)]
29#![allow(clippy::doc_markdown)] mod batch;
32#[cfg(test)]
33mod batch_tests;
34mod filter;
35mod string_table;
36mod types;
37mod vacuum;
38
39use roaring::RoaringBitmap;
40use rustc_hash::FxHashMap;
41use std::collections::HashMap;
42
43pub use string_table::StringTable;
44pub use types::{
45 AutoVacuumConfig, BatchUpdate, BatchUpdateResult, BatchUpsertResult, ColumnStoreError,
46 ColumnType, ColumnValue, ExpireResult, StringId, TypedColumn, UpsertResult, VacuumConfig,
47 VacuumStats,
48};
49
/// Column-oriented table: named, typed columns plus per-row bookkeeping
/// (primary-key index, soft-deletion markers and per-row expiry values).
#[derive(Debug, Default)]
pub struct ColumnStore {
    /// Columns keyed by column name.
    pub(crate) columns: HashMap<String, TypedColumn>,
    /// String table used to resolve the ids stored in string columns.
    pub(crate) string_table: StringTable,
    /// Number of row slots appended so far; rows marked as deleted are still counted.
    pub(crate) row_count: usize,
    /// Name of the primary-key column, or `None` when the store has no primary key.
    pub(crate) primary_key_column: Option<String>,
    /// Primary-key value -> row index.
    pub(crate) primary_index: HashMap<i64, usize>,
    /// Row index -> primary-key value (reverse direction of `primary_index`).
    pub(crate) row_idx_to_pk: HashMap<usize, i64>,
    /// Indices of rows that are marked as deleted.
    pub(crate) deleted_rows: rustc_hash::FxHashSet<usize>,
    /// Bitmap mirror of `deleted_rows`; only row indices that fit in `u32` are recorded.
    pub(crate) deletion_bitmap: RoaringBitmap,
    /// Row index -> expiry value.
    /// NOTE(review): the unit/epoch of the `u64` is not visible in this file section — confirm.
    pub(crate) row_expiry: HashMap<usize, u64>,
}
72
73impl ColumnStore {
74 #[must_use]
76 pub fn new() -> Self {
77 Self::default()
78 }
79
80 #[must_use]
82 pub fn with_schema(fields: &[(&str, ColumnType)]) -> Self {
83 let mut store = Self::new();
84 for (name, col_type) in fields {
85 store.add_column(name, *col_type);
86 }
87 store
88 }
89
90 pub fn with_primary_key(
97 fields: &[(&str, ColumnType)],
98 pk_column: &str,
99 ) -> crate::error::Result<Self> {
100 let pk_field = fields
101 .iter()
102 .find(|(name, _)| *name == pk_column)
103 .ok_or_else(|| {
104 crate::error::Error::ColumnStoreError(format!(
105 "Primary key column '{}' not found in fields: {:?}",
106 pk_column,
107 fields.iter().map(|(n, _)| *n).collect::<Vec<_>>()
108 ))
109 })?;
110 if !matches!(pk_field.1, ColumnType::Int) {
111 return Err(crate::error::Error::ColumnStoreError(format!(
112 "Primary key column '{}' must be Int type, got {:?}",
113 pk_column, pk_field.1
114 )));
115 }
116
117 let mut store = Self::with_schema(fields);
118 store.primary_key_column = Some(pk_column.to_string());
119 store.primary_index = HashMap::new();
120 Ok(store)
121 }
122
123 #[must_use]
125 pub fn primary_key_column(&self) -> Option<&str> {
126 self.primary_key_column.as_deref()
127 }
128
129 pub fn add_column(&mut self, name: &str, col_type: ColumnType) {
131 let column = match col_type {
132 ColumnType::Int => TypedColumn::new_int(0),
133 ColumnType::Float => TypedColumn::new_float(0),
134 ColumnType::String => TypedColumn::new_string(0),
135 ColumnType::Bool => TypedColumn::new_bool(0),
136 };
137 self.columns.insert(name.to_string(), column);
138 }
139
140 #[must_use]
142 pub fn row_count(&self) -> usize {
143 self.row_count
144 }
145
146 #[must_use]
148 pub fn active_row_count(&self) -> usize {
149 self.row_count.saturating_sub(self.deleted_rows.len())
150 }
151
152 #[must_use]
154 pub fn deleted_row_count(&self) -> usize {
155 self.deleted_rows.len()
156 }
157
158 #[must_use]
160 pub fn string_table(&self) -> &StringTable {
161 &self.string_table
162 }
163
164 pub fn string_table_mut(&mut self) -> &mut StringTable {
166 &mut self.string_table
167 }
168
169 pub fn push_row_unchecked(&mut self, values: &[(&str, ColumnValue)]) {
171 let value_map: FxHashMap<&str, &ColumnValue> =
172 values.iter().map(|(k, v)| (*k, v)).collect();
173
174 for (name, column) in &mut self.columns {
175 if let Some(value) = value_map.get(name.as_str()) {
176 match value {
177 ColumnValue::Null => column.push_null(),
178 ColumnValue::Int(v) => {
179 if let TypedColumn::Int(col) = column {
180 col.push(Some(*v));
181 } else {
182 column.push_null();
183 }
184 }
185 ColumnValue::Float(v) => {
186 if let TypedColumn::Float(col) = column {
187 col.push(Some(*v));
188 } else {
189 column.push_null();
190 }
191 }
192 ColumnValue::String(id) => {
193 if let TypedColumn::String(col) = column {
194 col.push(Some(*id));
195 } else {
196 column.push_null();
197 }
198 }
199 ColumnValue::Bool(v) => {
200 if let TypedColumn::Bool(col) = column {
201 col.push(Some(*v));
202 } else {
203 column.push_null();
204 }
205 }
206 }
207 } else {
208 column.push_null();
209 }
210 }
211 self.row_count += 1;
212 }
213
214 #[inline]
216 pub fn push_row(&mut self, values: &[(&str, ColumnValue)]) {
217 self.push_row_unchecked(values);
218 }
219
220 pub fn insert_row(
227 &mut self,
228 values: &[(&str, ColumnValue)],
229 ) -> Result<usize, ColumnStoreError> {
230 let Some(ref pk_col) = self.primary_key_column else {
231 self.push_row(values);
232 return Ok(self.row_count - 1);
233 };
234
235 let pk_value = values
236 .iter()
237 .find(|(name, _)| *name == pk_col.as_str())
238 .and_then(|(_, value)| {
239 if let ColumnValue::Int(v) = value {
240 Some(*v)
241 } else {
242 None
243 }
244 })
245 .ok_or(ColumnStoreError::MissingPrimaryKey)?;
246
247 if let Some(&existing_idx) = self.primary_index.get(&pk_value) {
248 if self.deleted_rows.contains(&existing_idx) {
249 for (col_name, value) in values {
250 if let Some(col) = self.columns.get(*col_name) {
251 if !matches!(value, ColumnValue::Null) {
252 Self::validate_type_match(col, value)?;
253 }
254 }
255 }
256 self.deleted_rows.remove(&existing_idx);
257 if let Ok(idx) = u32::try_from(existing_idx) {
259 self.deletion_bitmap.remove(idx);
260 }
261 self.row_expiry.remove(&existing_idx);
262 let value_map: std::collections::HashMap<&str, &ColumnValue> =
263 values.iter().map(|(k, v)| (*k, v)).collect();
264 let col_names: Vec<String> = self.columns.keys().cloned().collect();
265 for col_name in col_names {
266 if let Some(col) = self.columns.get_mut(&col_name) {
267 if let Some(value) = value_map.get(col_name.as_str()) {
268 Self::set_column_value(col, existing_idx, (*value).clone())?;
269 } else {
270 Self::set_column_value(col, existing_idx, ColumnValue::Null)?;
271 }
272 }
273 }
274 return Ok(existing_idx);
275 }
276 return Err(ColumnStoreError::DuplicateKey(pk_value));
277 }
278
279 let row_idx = self.row_count;
280 self.push_row(values);
281 self.primary_index.insert(pk_value, row_idx);
282 self.row_idx_to_pk.insert(row_idx, pk_value);
283 Ok(row_idx)
284 }
285
286 #[must_use]
288 pub fn get_row_idx_by_pk(&self, pk: i64) -> Option<usize> {
289 let row_idx = self.primary_index.get(&pk).copied()?;
290 if self.deleted_rows.contains(&row_idx) {
291 return None;
292 }
293 Some(row_idx)
294 }
295
296 pub fn delete_by_pk(&mut self, pk: i64) -> bool {
301 let Some(&row_idx) = self.primary_index.get(&pk) else {
302 return false;
303 };
304 if self.deleted_rows.contains(&row_idx) {
305 return false;
306 }
307 self.deleted_rows.insert(row_idx);
308 if let Ok(idx) = u32::try_from(row_idx) {
310 self.deletion_bitmap.insert(idx);
311 }
312 self.row_expiry.remove(&row_idx);
313 true
314 }
315
316 pub fn update_by_pk(
324 &mut self,
325 pk: i64,
326 column: &str,
327 value: ColumnValue,
328 ) -> Result<(), ColumnStoreError> {
329 if self
330 .primary_key_column
331 .as_ref()
332 .is_some_and(|pk_col| pk_col == column)
333 {
334 return Err(ColumnStoreError::PrimaryKeyUpdate);
335 }
336
337 let row_idx = *self
338 .primary_index
339 .get(&pk)
340 .ok_or(ColumnStoreError::RowNotFound(pk))?;
341
342 if self.deleted_rows.contains(&row_idx) {
343 return Err(ColumnStoreError::RowNotFound(pk));
344 }
345
346 let col = self
347 .columns
348 .get_mut(column)
349 .ok_or_else(|| ColumnStoreError::ColumnNotFound(column.to_string()))?;
350
351 Self::set_column_value(col, row_idx, value)
352 }
353
354 pub fn update_multi_by_pk(
367 &mut self,
368 pk: i64,
369 updates: &[(&str, ColumnValue)],
370 ) -> Result<(), ColumnStoreError> {
371 let row_idx = *self
372 .primary_index
373 .get(&pk)
374 .ok_or(ColumnStoreError::RowNotFound(pk))?;
375
376 if self.deleted_rows.contains(&row_idx) {
377 return Err(ColumnStoreError::RowNotFound(pk));
378 }
379
380 for (col_name, value) in updates {
381 if self
382 .primary_key_column
383 .as_ref()
384 .is_some_and(|pk_col| pk_col == *col_name)
385 {
386 return Err(ColumnStoreError::PrimaryKeyUpdate);
387 }
388
389 let col = self
390 .columns
391 .get(*col_name)
392 .ok_or_else(|| ColumnStoreError::ColumnNotFound((*col_name).to_string()))?;
393
394 if !matches!(value, ColumnValue::Null) {
395 Self::validate_type_match(col, value)?;
396 }
397 }
398
399 for (col_name, value) in updates {
400 let col = self
401 .columns
402 .get_mut(*col_name)
403 .ok_or_else(|| ColumnStoreError::ColumnNotFound((*col_name).to_string()))?;
404 Self::set_column_value(col, row_idx, value.clone())?;
405 }
406
407 Ok(())
408 }
409
410 #[must_use]
412 pub fn get_column(&self, name: &str) -> Option<&TypedColumn> {
413 self.columns.get(name)
414 }
415
416 pub fn column_names(&self) -> impl Iterator<Item = &str> {
418 self.columns.keys().map(String::as_str)
419 }
420
421 #[must_use]
423 pub fn get_value_as_json(&self, column: &str, row_idx: usize) -> Option<serde_json::Value> {
424 if self.deleted_rows.contains(&row_idx) {
425 return None;
426 }
427
428 let col = self.columns.get(column)?;
429 match col {
430 TypedColumn::Int(v) => v
431 .get(row_idx)
432 .and_then(|opt| opt.map(|v| serde_json::json!(v))),
433 TypedColumn::Float(v) => v
434 .get(row_idx)
435 .and_then(|opt| opt.map(|v| serde_json::json!(v))),
436 TypedColumn::String(v) => v.get(row_idx).and_then(|opt| {
437 opt.and_then(|id| self.string_table.get(id).map(|s| serde_json::json!(s)))
438 }),
439 TypedColumn::Bool(v) => v
440 .get(row_idx)
441 .and_then(|opt| opt.map(|v| serde_json::json!(v))),
442 }
443 }
444}