Skip to main content

velesdb_core/column_store/
batch.rs

1//! Batch operations for `ColumnStore`.
2//!
3//! This module provides batch update, TTL expiration, and upsert operations.
4
5use std::collections::HashMap;
6
7use super::types::{
8    BatchUpdate, BatchUpdateResult, BatchUpsertResult, ColumnStoreError, ColumnValue, ExpireResult,
9    TypedColumn, UpsertResult,
10};
11use super::ColumnStore;
12
13impl ColumnStore {
14    /// Performs batch updates with optimized cache locality.
15    pub fn batch_update(&mut self, updates: &[BatchUpdate]) -> BatchUpdateResult {
16        let mut result = BatchUpdateResult::default();
17        let mut by_column: HashMap<&str, Vec<(usize, ColumnValue)>> = HashMap::new();
18
19        for update in updates {
20            if self
21                .primary_key_column
22                .as_ref()
23                .is_some_and(|pk_col| pk_col == &update.column)
24            {
25                result
26                    .failed
27                    .push((update.pk, ColumnStoreError::PrimaryKeyUpdate));
28                continue;
29            }
30
31            if let Some(&row_idx) = self.primary_index.get(&update.pk) {
32                if self.deleted_rows.contains(&row_idx) {
33                    result
34                        .failed
35                        .push((update.pk, ColumnStoreError::RowNotFound(update.pk)));
36                    continue;
37                }
38                by_column
39                    .entry(update.column.as_str())
40                    .or_default()
41                    .push((row_idx, update.value.clone()));
42            } else {
43                result
44                    .failed
45                    .push((update.pk, ColumnStoreError::RowNotFound(update.pk)));
46            }
47        }
48
49        let mut row_to_pk: HashMap<usize, i64> = HashMap::new();
50        for update in updates {
51            if let Some(&row_idx) = self.primary_index.get(&update.pk) {
52                row_to_pk.insert(row_idx, update.pk);
53            }
54        }
55
56        for (col_name, col_updates) in by_column {
57            if let Some(col) = self.columns.get_mut(col_name) {
58                for (row_idx, value) in col_updates {
59                    let actual_type = Self::value_type_name(&value);
60                    if Self::set_column_value(col, row_idx, value).is_ok() {
61                        result.successful += 1;
62                    } else {
63                        let pk = row_to_pk.get(&row_idx).copied().unwrap_or(0);
64                        result.failed.push((
65                            pk,
66                            ColumnStoreError::TypeMismatch {
67                                expected: Self::column_type_name(col),
68                                actual: actual_type,
69                            },
70                        ));
71                    }
72                }
73            } else {
74                for (row_idx, _) in col_updates {
75                    let pk = row_to_pk.get(&row_idx).copied().unwrap_or(0);
76                    result
77                        .failed
78                        .push((pk, ColumnStoreError::ColumnNotFound(col_name.to_string())));
79                }
80            }
81        }
82
83        result
84    }
85
86    /// Batch update with same value for multiple primary keys.
87    pub fn batch_update_same_value(
88        &mut self,
89        pks: &[i64],
90        column: &str,
91        value: &ColumnValue,
92    ) -> BatchUpdateResult {
93        let updates: Vec<BatchUpdate> = pks
94            .iter()
95            .map(|&pk| BatchUpdate {
96                pk,
97                column: column.to_string(),
98                value: value.clone(),
99            })
100            .collect();
101        self.batch_update(&updates)
102    }
103
104    /// Sets a TTL (Time To Live) on a row.
105    ///
106    /// # Errors
107    ///
108    /// Returns an error when `pk` is missing or points to a deleted row.
109    pub fn set_ttl(&mut self, pk: i64, ttl_seconds: u64) -> Result<(), ColumnStoreError> {
110        let row_idx = *self
111            .primary_index
112            .get(&pk)
113            .ok_or(ColumnStoreError::RowNotFound(pk))?;
114
115        if self.deleted_rows.contains(&row_idx) {
116            return Err(ColumnStoreError::RowNotFound(pk));
117        }
118
119        let expiry_ts = Self::now_timestamp() + ttl_seconds;
120        self.row_expiry.insert(row_idx, expiry_ts);
121        Ok(())
122    }
123
124    /// Expires all rows that have passed their TTL.
125    pub fn expire_rows(&mut self) -> ExpireResult {
126        let now = Self::now_timestamp();
127        let mut result = ExpireResult::default();
128
129        let expired_rows: Vec<usize> = self
130            .row_expiry
131            .iter()
132            .filter(|(_, &expiry)| expiry <= now)
133            .map(|(&row_idx, _)| row_idx)
134            .collect();
135
136        for row_idx in expired_rows {
137            if let Some(&pk) = self.row_idx_to_pk.get(&row_idx) {
138                self.deleted_rows.insert(row_idx);
139                // BUG-2 FIX: Also update RoaringBitmap to keep both in sync
140                if let Ok(idx) = u32::try_from(row_idx) {
141                    self.deletion_bitmap.insert(idx);
142                }
143                self.row_expiry.remove(&row_idx);
144                result.pks.push(pk);
145                result.expired_count += 1;
146            }
147        }
148
149        result
150    }
151
152    /// Upsert: inserts a new row or updates an existing one.
153    ///
154    /// # Errors
155    ///
156    /// Returns an error when primary key constraints are violated,
157    /// when a referenced column does not exist, or when types mismatch.
158    pub fn upsert(
159        &mut self,
160        values: &[(&str, ColumnValue)],
161    ) -> Result<UpsertResult, ColumnStoreError> {
162        let Some(ref pk_col) = self.primary_key_column else {
163            return Err(ColumnStoreError::MissingPrimaryKey);
164        };
165
166        let pk_value = values
167            .iter()
168            .find(|(name, _)| *name == pk_col.as_str())
169            .and_then(|(_, value)| {
170                if let ColumnValue::Int(v) = value {
171                    Some(*v)
172                } else {
173                    None
174                }
175            })
176            .ok_or(ColumnStoreError::MissingPrimaryKey)?;
177
178        for (col_name, _) in values {
179            if *col_name != pk_col.as_str() && !self.columns.contains_key(*col_name) {
180                return Err(ColumnStoreError::ColumnNotFound((*col_name).to_string()));
181            }
182        }
183
184        if let Some(&row_idx) = self.primary_index.get(&pk_value) {
185            if self.deleted_rows.contains(&row_idx) {
186                for (col_name, value) in values {
187                    if *col_name != pk_col.as_str() {
188                        if let Some(col) = self.columns.get(*col_name) {
189                            if !matches!(value, ColumnValue::Null) {
190                                Self::validate_type_match(col, value)?;
191                            }
192                        }
193                    }
194                }
195                self.deleted_rows.remove(&row_idx);
196                self.row_expiry.remove(&row_idx);
197                let value_map: std::collections::HashMap<&str, &ColumnValue> =
198                    values.iter().map(|(k, v)| (*k, v)).collect();
199                let col_names: Vec<String> = self.columns.keys().cloned().collect();
200                for col_name in col_names {
201                    if col_name != *pk_col {
202                        if let Some(col) = self.columns.get_mut(&col_name) {
203                            if let Some(value) = value_map.get(col_name.as_str()) {
204                                Self::set_column_value(col, row_idx, (*value).clone())?;
205                            } else {
206                                Self::set_column_value(col, row_idx, ColumnValue::Null)?;
207                            }
208                        }
209                    }
210                }
211                return Ok(UpsertResult::Inserted);
212            }
213
214            for (col_name, value) in values {
215                if *col_name != pk_col.as_str() {
216                    if let Some(col) = self.columns.get(*col_name) {
217                        if !matches!(value, ColumnValue::Null) {
218                            Self::validate_type_match(col, value)?;
219                        }
220                    }
221                }
222            }
223            for (col_name, value) in values {
224                if *col_name != pk_col.as_str() {
225                    if let Some(col) = self.columns.get_mut(*col_name) {
226                        Self::set_column_value(col, row_idx, value.clone())?;
227                    }
228                }
229            }
230            Ok(UpsertResult::Updated)
231        } else {
232            self.insert_row(values)?;
233            Ok(UpsertResult::Inserted)
234        }
235    }
236
237    /// Batch upsert: inserts or updates multiple rows.
238    pub fn batch_upsert(&mut self, rows: &[Vec<(&str, ColumnValue)>]) -> BatchUpsertResult {
239        let mut result = BatchUpsertResult::default();
240
241        for row in rows {
242            match self.upsert(row) {
243                Ok(UpsertResult::Inserted) => result.inserted += 1,
244                Ok(UpsertResult::Updated) => result.updated += 1,
245                Err(e) => {
246                    let pk = row
247                        .iter()
248                        .find(|(name, _)| {
249                            self.primary_key_column
250                                .as_ref()
251                                .is_some_and(|pk| pk.as_str() == *name)
252                        })
253                        .and_then(|(_, v)| {
254                            if let ColumnValue::Int(pk) = v {
255                                Some(*pk)
256                            } else {
257                                None
258                            }
259                        })
260                        .unwrap_or(0);
261                    result.failed.push((pk, e));
262                }
263            }
264        }
265
266        result
267    }
268
269    pub(super) fn validate_type_match(
270        col: &TypedColumn,
271        value: &ColumnValue,
272    ) -> Result<(), ColumnStoreError> {
273        let type_matches = matches!(
274            (col, value),
275            (TypedColumn::Int(_), ColumnValue::Int(_))
276                | (TypedColumn::Float(_), ColumnValue::Float(_))
277                | (TypedColumn::String(_), ColumnValue::String(_))
278                | (TypedColumn::Bool(_), ColumnValue::Bool(_))
279                | (_, ColumnValue::Null)
280        );
281
282        if type_matches {
283            Ok(())
284        } else {
285            Err(ColumnStoreError::TypeMismatch {
286                expected: Self::column_type_name(col),
287                actual: Self::value_type_name(value),
288            })
289        }
290    }
291
292    pub(super) fn set_column_value(
293        col: &mut TypedColumn,
294        row_idx: usize,
295        value: ColumnValue,
296    ) -> Result<(), ColumnStoreError> {
297        if matches!(value, ColumnValue::Null) {
298            match col {
299                TypedColumn::Int(vec) => {
300                    if row_idx >= vec.len() {
301                        return Err(ColumnStoreError::IndexOutOfBounds(row_idx));
302                    }
303                    vec[row_idx] = None;
304                }
305                TypedColumn::Float(vec) => {
306                    if row_idx >= vec.len() {
307                        return Err(ColumnStoreError::IndexOutOfBounds(row_idx));
308                    }
309                    vec[row_idx] = None;
310                }
311                TypedColumn::String(vec) => {
312                    if row_idx >= vec.len() {
313                        return Err(ColumnStoreError::IndexOutOfBounds(row_idx));
314                    }
315                    vec[row_idx] = None;
316                }
317                TypedColumn::Bool(vec) => {
318                    if row_idx >= vec.len() {
319                        return Err(ColumnStoreError::IndexOutOfBounds(row_idx));
320                    }
321                    vec[row_idx] = None;
322                }
323            }
324            return Ok(());
325        }
326
327        match (col, value) {
328            (TypedColumn::Int(vec), ColumnValue::Int(v)) => {
329                if row_idx >= vec.len() {
330                    return Err(ColumnStoreError::IndexOutOfBounds(row_idx));
331                }
332                vec[row_idx] = Some(v);
333                Ok(())
334            }
335            (TypedColumn::Float(vec), ColumnValue::Float(v)) => {
336                if row_idx >= vec.len() {
337                    return Err(ColumnStoreError::IndexOutOfBounds(row_idx));
338                }
339                vec[row_idx] = Some(v);
340                Ok(())
341            }
342            (TypedColumn::String(vec), ColumnValue::String(v)) => {
343                if row_idx >= vec.len() {
344                    return Err(ColumnStoreError::IndexOutOfBounds(row_idx));
345                }
346                vec[row_idx] = Some(v);
347                Ok(())
348            }
349            (TypedColumn::Bool(vec), ColumnValue::Bool(v)) => {
350                if row_idx >= vec.len() {
351                    return Err(ColumnStoreError::IndexOutOfBounds(row_idx));
352                }
353                vec[row_idx] = Some(v);
354                Ok(())
355            }
356            (col, value) => Err(ColumnStoreError::TypeMismatch {
357                expected: Self::column_type_name(col),
358                actual: Self::value_type_name(&value),
359            }),
360        }
361    }
362
363    pub(super) fn column_type_name(col: &TypedColumn) -> String {
364        match col {
365            TypedColumn::Int(_) => "Int".to_string(),
366            TypedColumn::Float(_) => "Float".to_string(),
367            TypedColumn::String(_) => "String".to_string(),
368            TypedColumn::Bool(_) => "Bool".to_string(),
369        }
370    }
371
372    pub(super) fn value_type_name(value: &ColumnValue) -> String {
373        match value {
374            ColumnValue::Int(_) => "Int".to_string(),
375            ColumnValue::Float(_) => "Float".to_string(),
376            ColumnValue::String(_) => "String".to_string(),
377            ColumnValue::Bool(_) => "Bool".to_string(),
378            ColumnValue::Null => "Null".to_string(),
379        }
380    }
381
382    pub(super) fn now_timestamp() -> u64 {
383        std::time::SystemTime::now()
384            .duration_since(std::time::UNIX_EPOCH)
385            .map(|d| d.as_secs())
386            .unwrap_or(0)
387    }
388}
389
390// Tests moved to batch_tests.rs per project rules