1use crate::error::{DbxError, DbxResult};
7use crate::storage::StorageBackend;
8use crate::storage::kv_adapter::{batch_to_kv, kv_to_batch, merge_batches};
9use crate::storage::versioned_batch::VersionedBatch;
10use arrow::array::{Array, BinaryArray, BooleanArray};
11use arrow::compute::{filter, sort_to_indices, take};
12use arrow::record_batch::RecordBatch;
13use dashmap::DashMap;
14use std::ops::RangeBounds;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18pub struct ColumnarDelta {
25 tables: DashMap<String, Vec<VersionedBatch>>,
27
28 sequence: AtomicU64,
30
31 flush_threshold: usize,
33
34 row_count: AtomicU64,
36}
37
38impl ColumnarDelta {
39 pub fn new(flush_threshold: usize) -> Self {
41 Self {
42 tables: DashMap::new(),
43 sequence: AtomicU64::new(0),
44 flush_threshold,
45 row_count: AtomicU64::new(0),
46 }
47 }
48
49 pub fn insert_versioned_batch(
53 &self,
54 table: &str,
55 batch: RecordBatch,
56 begin_ts: u64,
57 ) -> DbxResult<()> {
58 let sequence = self.sequence.fetch_add(1, Ordering::SeqCst);
59 let versioned = VersionedBatch::new(Arc::new(batch.clone()), begin_ts, sequence);
60
61 let row_count = batch.num_rows();
62 self.row_count.fetch_add(row_count as u64, Ordering::SeqCst);
63
64 self.tables
65 .entry(table.to_string())
66 .or_default()
67 .push(versioned);
68
69 Ok(())
70 }
71
72 pub fn get_visible_batches(&self, table: &str, read_ts: u64) -> Vec<Arc<RecordBatch>> {
74 if let Some(batches) = self.tables.get(table) {
75 batches
76 .iter()
77 .filter(|b| b.is_visible(read_ts))
78 .map(|b| Arc::clone(&b.data))
79 .collect()
80 } else {
81 Vec::new()
82 }
83 }
84
85 pub fn should_flush(&self) -> bool {
87 self.row_count.load(Ordering::SeqCst) as usize >= self.flush_threshold
88 }
89
90 pub fn row_count(&self) -> usize {
92 self.row_count.load(Ordering::SeqCst) as usize
93 }
94
95 pub fn drain_table(&self, table: &str) -> Vec<VersionedBatch> {
99 if let Some((_, batches)) = self.tables.remove(table) {
100 let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
101 self.row_count.fetch_sub(row_count as u64, Ordering::SeqCst);
102 batches
103 } else {
104 Vec::new()
105 }
106 }
107
108 pub fn table_names(&self) -> Vec<String> {
110 self.tables
111 .iter()
112 .map(|entry| entry.key().clone())
113 .collect()
114 }
115
116 #[cfg(test)]
118 pub fn clear(&self) {
119 self.tables.clear();
120 self.row_count.store(0, Ordering::SeqCst);
121 }
122}
123
124fn find_key_in_batch(batch: &RecordBatch, target_key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
133 if batch.num_rows() == 0 {
134 return Ok(None);
135 }
136
137 let key_array = batch
139 .column(0)
140 .as_any()
141 .downcast_ref::<BinaryArray>()
142 .ok_or_else(|| DbxError::Storage("Key column is not BinaryArray".into()))?;
143
144 for i in 0..key_array.len() {
146 if !key_array.is_null(i) && key_array.value(i) == target_key {
147 let value_array = batch
149 .column(1)
150 .as_any()
151 .downcast_ref::<BinaryArray>()
152 .ok_or_else(|| DbxError::Storage("Value column is not BinaryArray".into()))?;
153
154 if !value_array.is_null(i) {
155 return Ok(Some(value_array.value(i).to_vec()));
156 }
157 }
158 }
159
160 Ok(None)
161}
162
163fn apply_range_filter<R: RangeBounds<Vec<u8>>>(
165 batch: &RecordBatch,
166 range: R,
167) -> DbxResult<RecordBatch> {
168 if batch.num_rows() == 0 {
169 return Ok(batch.clone());
170 }
171
172 let key_array = batch
173 .column(0)
174 .as_any()
175 .downcast_ref::<BinaryArray>()
176 .ok_or_else(|| DbxError::Storage("Key column is not BinaryArray".into()))?;
177
178 let mut mask = vec![true; batch.num_rows()];
180
181 for (i, mask_val) in mask.iter_mut().enumerate().take(key_array.len()) {
182 if !key_array.is_null(i) {
183 let key = key_array.value(i).to_vec();
184 *mask_val = range.contains(&key);
185 } else {
186 *mask_val = false;
187 }
188 }
189
190 let mask_array = BooleanArray::from(mask);
192
193 let filtered_columns: Vec<Arc<dyn Array>> = batch
195 .columns()
196 .iter()
197 .map(|col| filter(col.as_ref(), &mask_array))
198 .collect::<Result<Vec<_>, _>>()?;
199
200 let filtered = RecordBatch::try_new(batch.schema(), filtered_columns)?;
202
203 Ok(filtered)
204}
205
206fn sort_batch_by_key(batch: &RecordBatch) -> DbxResult<RecordBatch> {
208 if batch.num_rows() == 0 {
209 return Ok(batch.clone());
210 }
211
212 let indices = sort_to_indices(batch.column(0), None, None)?;
214
215 let sorted_columns: Vec<Arc<dyn Array>> = batch
217 .columns()
218 .iter()
219 .map(|col| take(col.as_ref(), &indices, None))
220 .collect::<Result<Vec<_>, _>>()?;
221
222 let sorted_batch = RecordBatch::try_new(batch.schema(), sorted_columns)?;
224
225 Ok(sorted_batch)
226}
227
228impl StorageBackend for ColumnarDelta {
229 fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
230 let batch = kv_to_batch(vec![(key.to_vec(), value.to_vec())])?;
232
233 self.insert_versioned_batch(table, batch, 0)?;
235 Ok(())
236 }
237
238 fn insert_batch(&self, table: &str, rows: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
239 if rows.is_empty() {
240 return Ok(());
241 }
242
243 let batch = kv_to_batch(rows)?;
245
246 self.insert_versioned_batch(table, batch, 0)?;
248 Ok(())
249 }
250
251 fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
252 let batches = self.get_visible_batches(table, u64::MAX);
254
255 for batch in batches {
257 if let Some(value) = find_key_in_batch(&batch, key)? {
258 return Ok(Some(value));
259 }
260 }
261
262 Ok(None)
263 }
264
265 fn delete(&self, table: &str, key: &[u8]) -> DbxResult<bool> {
266 if self.get(table, key)?.is_none() {
269 return Ok(false);
270 }
271
272 let tombstone_batch = kv_to_batch(vec![(key.to_vec(), Vec::new())])?;
274 self.insert_versioned_batch(table, tombstone_batch, 0)?;
275 Ok(true)
276 }
277
278 fn scan<R: RangeBounds<Vec<u8>> + Clone>(
279 &self,
280 table: &str,
281 range: R,
282 ) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
283 let batches = self.get_visible_batches(table, u64::MAX);
285
286 if batches.is_empty() {
287 return Ok(Vec::new());
288 }
289
290 let merged = merge_batches(batches)?;
292
293 let filtered = apply_range_filter(&merged, range)?;
295
296 let sorted = sort_batch_by_key(&filtered)?;
298
299 batch_to_kv(&sorted)
301 }
302
303 fn scan_one<R: RangeBounds<Vec<u8>> + Clone>(
304 &self,
305 table: &str,
306 range: R,
307 ) -> DbxResult<Option<(Vec<u8>, Vec<u8>)>> {
308 let results = self.scan(table, range)?;
309 Ok(results.into_iter().next())
310 }
311
312 fn flush(&self) -> DbxResult<()> {
313 Ok(())
315 }
316
317 fn count(&self, table: &str) -> DbxResult<usize> {
318 let batches = self.get_visible_batches(table, u64::MAX);
319 let total: usize = batches.iter().map(|b| b.num_rows()).sum();
320 Ok(total)
321 }
322
323 fn table_names(&self) -> DbxResult<Vec<String>> {
324 Ok(ColumnarDelta::table_names(self))
325 }
326}
327
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};

    /// Builds a two-column (`id: Int32`, `name: Utf8`) batch for the columnar tests.
    fn create_test_batch(ids: Vec<i32>, names: Vec<&str>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let id_array = Int32Array::from(ids);
        let name_array = StringArray::from(names);

        RecordBatch::try_new(schema, vec![Arc::new(id_array), Arc::new(name_array)]).unwrap()
    }

    #[test]
    fn test_insert_and_retrieve() {
        let delta = ColumnarDelta::new(1000);

        let batch1 = create_test_batch(vec![1, 2], vec!["Alice", "Bob"]);
        delta.insert_versioned_batch("users", batch1, 10).unwrap();

        let visible = delta.get_visible_batches("users", 15);
        assert_eq!(visible.len(), 1);
        assert_eq!(visible[0].num_rows(), 2);
    }

    #[test]
    fn test_snapshot_isolation() {
        let delta = ColumnarDelta::new(1000);

        let batch1 = create_test_batch(vec![1], vec!["Alice"]);
        delta.insert_versioned_batch("users", batch1, 10).unwrap();

        let batch2 = create_test_batch(vec![2], vec!["Bob"]);
        delta.insert_versioned_batch("users", batch2, 20).unwrap();

        // A reader at ts=15 sees only the batch written at ts=10.
        let visible = delta.get_visible_batches("users", 15);
        assert_eq!(visible.len(), 1);
        assert_eq!(visible[0].num_rows(), 1);

        // A reader at ts=25 sees both batches.
        let visible = delta.get_visible_batches("users", 25);
        assert_eq!(visible.len(), 2);
    }

    #[test]
    fn test_flush_threshold() {
        let delta = ColumnarDelta::new(5);

        let batch1 = create_test_batch(vec![1, 2, 3], vec!["A", "B", "C"]);
        delta.insert_versioned_batch("users", batch1, 10).unwrap();

        assert!(!delta.should_flush());

        let batch2 = create_test_batch(vec![4, 5], vec!["D", "E"]);
        delta.insert_versioned_batch("users", batch2, 20).unwrap();

        assert!(delta.should_flush());
    }

    #[test]
    fn test_drain_table() {
        let delta = ColumnarDelta::new(1000);

        let batch1 = create_test_batch(vec![1, 2], vec!["Alice", "Bob"]);
        delta.insert_versioned_batch("users", batch1, 10).unwrap();

        assert_eq!(delta.row_count(), 2);

        let drained = delta.drain_table("users");
        assert_eq!(drained.len(), 1);
        assert_eq!(delta.row_count(), 0);

        let visible = delta.get_visible_batches("users", 15);
        assert_eq!(visible.len(), 0);
    }

    #[test]
    fn test_multiple_tables() {
        let delta = ColumnarDelta::new(1000);

        let batch1 = create_test_batch(vec![1], vec!["Alice"]);
        delta.insert_versioned_batch("users", batch1, 10).unwrap();

        let batch2 = create_test_batch(vec![100], vec!["Order1"]);
        delta.insert_versioned_batch("orders", batch2, 10).unwrap();

        let tables = delta.table_names();
        assert_eq!(tables.len(), 2);
        assert!(tables.contains(&"users".to_string()));
        assert!(tables.contains(&"orders".to_string()));
    }

    #[test]
    fn test_arc_sharing() {
        let delta = ColumnarDelta::new(1000);

        let batch = create_test_batch(vec![1, 2], vec!["Alice", "Bob"]);
        delta.insert_versioned_batch("users", batch, 10).unwrap();

        let visible1 = delta.get_visible_batches("users", 15);
        let visible2 = delta.get_visible_batches("users", 15);

        // Both reads hand out the same shared allocation rather than copies.
        assert!(Arc::ptr_eq(&visible1[0], &visible2[0]));
    }

    #[test]
    fn test_storage_backend_insert_get() {
        use crate::storage::StorageBackend;

        let delta = ColumnarDelta::new(1000);

        delta.insert("users", b"key1", b"value1").unwrap();
        delta.insert("users", b"key2", b"value2").unwrap();

        assert_eq!(
            delta.get("users", b"key1").unwrap(),
            Some(b"value1".to_vec())
        );
        assert_eq!(
            delta.get("users", b"key2").unwrap(),
            Some(b"value2".to_vec())
        );
        assert_eq!(delta.get("users", b"key3").unwrap(), None);
    }

    #[test]
    fn test_storage_backend_batch_insert() {
        use crate::storage::StorageBackend;

        let delta = ColumnarDelta::new(1000);

        let rows = vec![
            (b"key1".to_vec(), b"value1".to_vec()),
            (b"key2".to_vec(), b"value2".to_vec()),
            (b"key3".to_vec(), b"value3".to_vec()),
        ];

        StorageBackend::insert_batch(&delta, "users", rows).unwrap();

        assert_eq!(delta.count("users").unwrap(), 3);
        assert_eq!(
            delta.get("users", b"key2").unwrap(),
            Some(b"value2".to_vec())
        );
    }

    #[test]
    fn test_storage_backend_scan() {
        use crate::storage::StorageBackend;

        let delta = ColumnarDelta::new(1000);

        delta.insert("users", b"key1", b"value1").unwrap();
        delta.insert("users", b"key2", b"value2").unwrap();
        delta.insert("users", b"key3", b"value3").unwrap();

        let results = delta.scan("users", Vec::<u8>::new()..).unwrap();
        assert_eq!(results.len(), 3);

        assert_eq!(results[0].0, b"key1");
        assert_eq!(results[1].0, b"key2");
        assert_eq!(results[2].0, b"key3");
    }

    #[test]
    fn test_storage_backend_scan_range() {
        use crate::storage::StorageBackend;

        let delta = ColumnarDelta::new(1000);

        // Inserted out of order to check that scan sorts by key.
        delta.insert("users", b"key3", b"value3").unwrap();
        delta.insert("users", b"key1", b"value1").unwrap();
        delta.insert("users", b"key2", b"value2").unwrap();

        let results = delta.scan("users", b"key2".to_vec()..).unwrap();
        assert_eq!(
            results,
            vec![
                (b"key2".to_vec(), b"value2".to_vec()),
                (b"key3".to_vec(), b"value3".to_vec()),
            ]
        );

        let first = delta
            .scan_one("users", b"key1".to_vec()..b"key3".to_vec())
            .unwrap();
        assert_eq!(first, Some((b"key1".to_vec(), b"value1".to_vec())));
    }

    #[test]
    fn test_storage_backend_count() {
        use crate::storage::StorageBackend;

        let delta = ColumnarDelta::new(1000);

        assert_eq!(delta.count("users").unwrap(), 0);

        delta.insert("users", b"key1", b"value1").unwrap();
        assert_eq!(delta.count("users").unwrap(), 1);

        delta.insert("users", b"key2", b"value2").unwrap();
        assert_eq!(delta.count("users").unwrap(), 2);
    }

    #[test]
    fn test_storage_backend_table_names() {
        use crate::storage::StorageBackend;

        let delta = ColumnarDelta::new(1000);

        delta.insert("users", b"key1", b"value1").unwrap();
        delta.insert("orders", b"key2", b"value2").unwrap();

        let tables = ColumnarDelta::table_names(&delta);
        assert_eq!(tables.len(), 2);
        assert!(tables.contains(&"users".to_string()));
        assert!(tables.contains(&"orders".to_string()));
    }
}