velesdb_core/column_store/
batch.rs1use std::collections::HashMap;
6
7use super::types::{
8 BatchUpdate, BatchUpdateResult, BatchUpsertResult, ColumnStoreError, ColumnValue, ExpireResult,
9 TypedColumn, UpsertResult,
10};
11use super::ColumnStore;
12
13impl ColumnStore {
14 pub fn batch_update(&mut self, updates: &[BatchUpdate]) -> BatchUpdateResult {
16 let mut result = BatchUpdateResult::default();
17 let mut by_column: HashMap<&str, Vec<(usize, ColumnValue)>> = HashMap::new();
18
19 for update in updates {
20 if self
21 .primary_key_column
22 .as_ref()
23 .is_some_and(|pk_col| pk_col == &update.column)
24 {
25 result
26 .failed
27 .push((update.pk, ColumnStoreError::PrimaryKeyUpdate));
28 continue;
29 }
30
31 if let Some(&row_idx) = self.primary_index.get(&update.pk) {
32 if self.deleted_rows.contains(&row_idx) {
33 result
34 .failed
35 .push((update.pk, ColumnStoreError::RowNotFound(update.pk)));
36 continue;
37 }
38 by_column
39 .entry(update.column.as_str())
40 .or_default()
41 .push((row_idx, update.value.clone()));
42 } else {
43 result
44 .failed
45 .push((update.pk, ColumnStoreError::RowNotFound(update.pk)));
46 }
47 }
48
49 let mut row_to_pk: HashMap<usize, i64> = HashMap::new();
50 for update in updates {
51 if let Some(&row_idx) = self.primary_index.get(&update.pk) {
52 row_to_pk.insert(row_idx, update.pk);
53 }
54 }
55
56 for (col_name, col_updates) in by_column {
57 if let Some(col) = self.columns.get_mut(col_name) {
58 for (row_idx, value) in col_updates {
59 let actual_type = Self::value_type_name(&value);
60 if Self::set_column_value(col, row_idx, value).is_ok() {
61 result.successful += 1;
62 } else {
63 let pk = row_to_pk.get(&row_idx).copied().unwrap_or(0);
64 result.failed.push((
65 pk,
66 ColumnStoreError::TypeMismatch {
67 expected: Self::column_type_name(col),
68 actual: actual_type,
69 },
70 ));
71 }
72 }
73 } else {
74 for (row_idx, _) in col_updates {
75 let pk = row_to_pk.get(&row_idx).copied().unwrap_or(0);
76 result
77 .failed
78 .push((pk, ColumnStoreError::ColumnNotFound(col_name.to_string())));
79 }
80 }
81 }
82
83 result
84 }
85
86 pub fn batch_update_same_value(
88 &mut self,
89 pks: &[i64],
90 column: &str,
91 value: &ColumnValue,
92 ) -> BatchUpdateResult {
93 let updates: Vec<BatchUpdate> = pks
94 .iter()
95 .map(|&pk| BatchUpdate {
96 pk,
97 column: column.to_string(),
98 value: value.clone(),
99 })
100 .collect();
101 self.batch_update(&updates)
102 }
103
104 pub fn set_ttl(&mut self, pk: i64, ttl_seconds: u64) -> Result<(), ColumnStoreError> {
110 let row_idx = *self
111 .primary_index
112 .get(&pk)
113 .ok_or(ColumnStoreError::RowNotFound(pk))?;
114
115 if self.deleted_rows.contains(&row_idx) {
116 return Err(ColumnStoreError::RowNotFound(pk));
117 }
118
119 let expiry_ts = Self::now_timestamp() + ttl_seconds;
120 self.row_expiry.insert(row_idx, expiry_ts);
121 Ok(())
122 }
123
124 pub fn expire_rows(&mut self) -> ExpireResult {
126 let now = Self::now_timestamp();
127 let mut result = ExpireResult::default();
128
129 let expired_rows: Vec<usize> = self
130 .row_expiry
131 .iter()
132 .filter(|(_, &expiry)| expiry <= now)
133 .map(|(&row_idx, _)| row_idx)
134 .collect();
135
136 for row_idx in expired_rows {
137 if let Some(&pk) = self.row_idx_to_pk.get(&row_idx) {
138 self.deleted_rows.insert(row_idx);
139 if let Ok(idx) = u32::try_from(row_idx) {
141 self.deletion_bitmap.insert(idx);
142 }
143 self.row_expiry.remove(&row_idx);
144 result.pks.push(pk);
145 result.expired_count += 1;
146 }
147 }
148
149 result
150 }
151
152 pub fn upsert(
159 &mut self,
160 values: &[(&str, ColumnValue)],
161 ) -> Result<UpsertResult, ColumnStoreError> {
162 let Some(ref pk_col) = self.primary_key_column else {
163 return Err(ColumnStoreError::MissingPrimaryKey);
164 };
165
166 let pk_value = values
167 .iter()
168 .find(|(name, _)| *name == pk_col.as_str())
169 .and_then(|(_, value)| {
170 if let ColumnValue::Int(v) = value {
171 Some(*v)
172 } else {
173 None
174 }
175 })
176 .ok_or(ColumnStoreError::MissingPrimaryKey)?;
177
178 for (col_name, _) in values {
179 if *col_name != pk_col.as_str() && !self.columns.contains_key(*col_name) {
180 return Err(ColumnStoreError::ColumnNotFound((*col_name).to_string()));
181 }
182 }
183
184 if let Some(&row_idx) = self.primary_index.get(&pk_value) {
185 if self.deleted_rows.contains(&row_idx) {
186 for (col_name, value) in values {
187 if *col_name != pk_col.as_str() {
188 if let Some(col) = self.columns.get(*col_name) {
189 if !matches!(value, ColumnValue::Null) {
190 Self::validate_type_match(col, value)?;
191 }
192 }
193 }
194 }
195 self.deleted_rows.remove(&row_idx);
196 self.row_expiry.remove(&row_idx);
197 let value_map: std::collections::HashMap<&str, &ColumnValue> =
198 values.iter().map(|(k, v)| (*k, v)).collect();
199 let col_names: Vec<String> = self.columns.keys().cloned().collect();
200 for col_name in col_names {
201 if col_name != *pk_col {
202 if let Some(col) = self.columns.get_mut(&col_name) {
203 if let Some(value) = value_map.get(col_name.as_str()) {
204 Self::set_column_value(col, row_idx, (*value).clone())?;
205 } else {
206 Self::set_column_value(col, row_idx, ColumnValue::Null)?;
207 }
208 }
209 }
210 }
211 return Ok(UpsertResult::Inserted);
212 }
213
214 for (col_name, value) in values {
215 if *col_name != pk_col.as_str() {
216 if let Some(col) = self.columns.get(*col_name) {
217 if !matches!(value, ColumnValue::Null) {
218 Self::validate_type_match(col, value)?;
219 }
220 }
221 }
222 }
223 for (col_name, value) in values {
224 if *col_name != pk_col.as_str() {
225 if let Some(col) = self.columns.get_mut(*col_name) {
226 Self::set_column_value(col, row_idx, value.clone())?;
227 }
228 }
229 }
230 Ok(UpsertResult::Updated)
231 } else {
232 self.insert_row(values)?;
233 Ok(UpsertResult::Inserted)
234 }
235 }
236
237 pub fn batch_upsert(&mut self, rows: &[Vec<(&str, ColumnValue)>]) -> BatchUpsertResult {
239 let mut result = BatchUpsertResult::default();
240
241 for row in rows {
242 match self.upsert(row) {
243 Ok(UpsertResult::Inserted) => result.inserted += 1,
244 Ok(UpsertResult::Updated) => result.updated += 1,
245 Err(e) => {
246 let pk = row
247 .iter()
248 .find(|(name, _)| {
249 self.primary_key_column
250 .as_ref()
251 .is_some_and(|pk| pk.as_str() == *name)
252 })
253 .and_then(|(_, v)| {
254 if let ColumnValue::Int(pk) = v {
255 Some(*pk)
256 } else {
257 None
258 }
259 })
260 .unwrap_or(0);
261 result.failed.push((pk, e));
262 }
263 }
264 }
265
266 result
267 }
268
269 pub(super) fn validate_type_match(
270 col: &TypedColumn,
271 value: &ColumnValue,
272 ) -> Result<(), ColumnStoreError> {
273 let type_matches = matches!(
274 (col, value),
275 (TypedColumn::Int(_), ColumnValue::Int(_))
276 | (TypedColumn::Float(_), ColumnValue::Float(_))
277 | (TypedColumn::String(_), ColumnValue::String(_))
278 | (TypedColumn::Bool(_), ColumnValue::Bool(_))
279 | (_, ColumnValue::Null)
280 );
281
282 if type_matches {
283 Ok(())
284 } else {
285 Err(ColumnStoreError::TypeMismatch {
286 expected: Self::column_type_name(col),
287 actual: Self::value_type_name(value),
288 })
289 }
290 }
291
292 pub(super) fn set_column_value(
293 col: &mut TypedColumn,
294 row_idx: usize,
295 value: ColumnValue,
296 ) -> Result<(), ColumnStoreError> {
297 if matches!(value, ColumnValue::Null) {
298 match col {
299 TypedColumn::Int(vec) => {
300 if row_idx >= vec.len() {
301 return Err(ColumnStoreError::IndexOutOfBounds(row_idx));
302 }
303 vec[row_idx] = None;
304 }
305 TypedColumn::Float(vec) => {
306 if row_idx >= vec.len() {
307 return Err(ColumnStoreError::IndexOutOfBounds(row_idx));
308 }
309 vec[row_idx] = None;
310 }
311 TypedColumn::String(vec) => {
312 if row_idx >= vec.len() {
313 return Err(ColumnStoreError::IndexOutOfBounds(row_idx));
314 }
315 vec[row_idx] = None;
316 }
317 TypedColumn::Bool(vec) => {
318 if row_idx >= vec.len() {
319 return Err(ColumnStoreError::IndexOutOfBounds(row_idx));
320 }
321 vec[row_idx] = None;
322 }
323 }
324 return Ok(());
325 }
326
327 match (col, value) {
328 (TypedColumn::Int(vec), ColumnValue::Int(v)) => {
329 if row_idx >= vec.len() {
330 return Err(ColumnStoreError::IndexOutOfBounds(row_idx));
331 }
332 vec[row_idx] = Some(v);
333 Ok(())
334 }
335 (TypedColumn::Float(vec), ColumnValue::Float(v)) => {
336 if row_idx >= vec.len() {
337 return Err(ColumnStoreError::IndexOutOfBounds(row_idx));
338 }
339 vec[row_idx] = Some(v);
340 Ok(())
341 }
342 (TypedColumn::String(vec), ColumnValue::String(v)) => {
343 if row_idx >= vec.len() {
344 return Err(ColumnStoreError::IndexOutOfBounds(row_idx));
345 }
346 vec[row_idx] = Some(v);
347 Ok(())
348 }
349 (TypedColumn::Bool(vec), ColumnValue::Bool(v)) => {
350 if row_idx >= vec.len() {
351 return Err(ColumnStoreError::IndexOutOfBounds(row_idx));
352 }
353 vec[row_idx] = Some(v);
354 Ok(())
355 }
356 (col, value) => Err(ColumnStoreError::TypeMismatch {
357 expected: Self::column_type_name(col),
358 actual: Self::value_type_name(&value),
359 }),
360 }
361 }
362
363 pub(super) fn column_type_name(col: &TypedColumn) -> String {
364 match col {
365 TypedColumn::Int(_) => "Int".to_string(),
366 TypedColumn::Float(_) => "Float".to_string(),
367 TypedColumn::String(_) => "String".to_string(),
368 TypedColumn::Bool(_) => "Bool".to_string(),
369 }
370 }
371
372 pub(super) fn value_type_name(value: &ColumnValue) -> String {
373 match value {
374 ColumnValue::Int(_) => "Int".to_string(),
375 ColumnValue::Float(_) => "Float".to_string(),
376 ColumnValue::String(_) => "String".to_string(),
377 ColumnValue::Bool(_) => "Bool".to_string(),
378 ColumnValue::Null => "Null".to_string(),
379 }
380 }
381
382 pub(super) fn now_timestamp() -> u64 {
383 std::time::SystemTime::now()
384 .duration_since(std::time::UNIX_EPOCH)
385 .map(|d| d.as_secs())
386 .unwrap_or(0)
387 }
388}
389
390