// dbx_core/storage/columnar_delta.rs

//! Columnar Delta Store — MVCC-aware in-memory columnar buffer.
//!
//! Replaces the row-based Delta Store with a columnar implementation
//! using Arrow RecordBatch and VersionedBatch for MVCC Snapshot Isolation.

use crate::error::{DbxError, DbxResult};
use crate::storage::StorageBackend;
use crate::storage::kv_adapter::{batch_to_kv, kv_to_batch, merge_batches};
use crate::storage::versioned_batch::VersionedBatch;
use arrow::array::{Array, BinaryArray, BooleanArray};
use arrow::compute::{filter, sort_to_indices, take};
use arrow::record_batch::RecordBatch;
use dashmap::DashMap;
use std::ops::{Bound, RangeBounds};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

/// Columnar Delta Store with MVCC support.
///
/// Stores versioned RecordBatches for each table, enabling:
/// - Snapshot Isolation via VersionedBatch visibility checks
/// - Efficient columnar scans via Arrow compute kernels
/// - Memory sharing via `Arc<RecordBatch>` (readers clone the Arc, not the data)
pub struct ColumnarDelta {
    /// Table name → list of versioned batches, kept in insertion
    /// (sequence) order.
    tables: DashMap<String, Vec<VersionedBatch>>,

    /// Global, monotonically increasing sequence counter used to order
    /// batches across all tables.
    sequence: AtomicU64,

    /// Flush threshold (number of rows across all tables).
    flush_threshold: usize,

    /// Current total row count across all tables; incremented by
    /// `insert_versioned_batch` and decremented by `drain_table`.
    row_count: AtomicU64,
}

38impl ColumnarDelta {
39    /// Create a new ColumnarDelta with the given flush threshold.
40    pub fn new(flush_threshold: usize) -> Self {
41        Self {
42            tables: DashMap::new(),
43            sequence: AtomicU64::new(0),
44            flush_threshold,
45            row_count: AtomicU64::new(0),
46        }
47    }
48
49    /// Insert a versioned batch for a table.
50    ///
51    /// The batch will be assigned a sequence number and begin_ts.
52    pub fn insert_versioned_batch(
53        &self,
54        table: &str,
55        batch: RecordBatch,
56        begin_ts: u64,
57    ) -> DbxResult<()> {
58        let sequence = self.sequence.fetch_add(1, Ordering::SeqCst);
59        let versioned = VersionedBatch::new(Arc::new(batch.clone()), begin_ts, sequence);
60
61        let row_count = batch.num_rows();
62        self.row_count.fetch_add(row_count as u64, Ordering::SeqCst);
63
64        self.tables
65            .entry(table.to_string())
66            .or_default()
67            .push(versioned);
68
69        Ok(())
70    }
71
72    /// Get all batches visible to a snapshot at the given read_ts.
73    pub fn get_visible_batches(&self, table: &str, read_ts: u64) -> Vec<Arc<RecordBatch>> {
74        if let Some(batches) = self.tables.get(table) {
75            batches
76                .iter()
77                .filter(|b| b.is_visible(read_ts))
78                .map(|b| Arc::clone(&b.data))
79                .collect()
80        } else {
81            Vec::new()
82        }
83    }
84
85    /// Check if flush is needed based on row count threshold.
86    pub fn should_flush(&self) -> bool {
87        self.row_count.load(Ordering::SeqCst) as usize >= self.flush_threshold
88    }
89
90    /// Get the current row count across all tables.
91    pub fn row_count(&self) -> usize {
92        self.row_count.load(Ordering::SeqCst) as usize
93    }
94
95    /// Drain all batches from a table (for flushing to WOS/Parquet).
96    ///
97    /// Returns all batches and clears the table.
98    pub fn drain_table(&self, table: &str) -> Vec<VersionedBatch> {
99        if let Some((_, batches)) = self.tables.remove(table) {
100            let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
101            self.row_count.fetch_sub(row_count as u64, Ordering::SeqCst);
102            batches
103        } else {
104            Vec::new()
105        }
106    }
107
108    /// Get all table names.
109    pub fn table_names(&self) -> Vec<String> {
110        self.tables
111            .iter()
112            .map(|entry| entry.key().clone())
113            .collect()
114    }
115
116    /// Clear all data (for testing).
117    #[cfg(test)]
118    pub fn clear(&self) {
119        self.tables.clear();
120        self.row_count.store(0, Ordering::SeqCst);
121    }
122}
123
// ============================================================================
// Arrow Compute Kernel Helpers
// ============================================================================

128/// Find a key in a RecordBatch using direct Binary array access.
129///
130/// This is much faster than converting to key-value pairs and searching.
131/// Uses Arrow's zero-copy Binary array access for optimal performance.
132fn find_key_in_batch(batch: &RecordBatch, target_key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
133    if batch.num_rows() == 0 {
134        return Ok(None);
135    }
136
137    // Extract key column (column 0)
138    let key_array = batch
139        .column(0)
140        .as_any()
141        .downcast_ref::<BinaryArray>()
142        .ok_or_else(|| DbxError::Storage("Key column is not BinaryArray".into()))?;
143
144    // Search for matching key
145    for i in 0..key_array.len() {
146        if !key_array.is_null(i) && key_array.value(i) == target_key {
147            // Found! Extract value from column 1
148            let value_array = batch
149                .column(1)
150                .as_any()
151                .downcast_ref::<BinaryArray>()
152                .ok_or_else(|| DbxError::Storage("Value column is not BinaryArray".into()))?;
153
154            if !value_array.is_null(i) {
155                return Ok(Some(value_array.value(i).to_vec()));
156            }
157        }
158    }
159
160    Ok(None)
161}
162
163/// Apply range filter to RecordBatch using Arrow compute.
164fn apply_range_filter<R: RangeBounds<Vec<u8>>>(
165    batch: &RecordBatch,
166    range: R,
167) -> DbxResult<RecordBatch> {
168    if batch.num_rows() == 0 {
169        return Ok(batch.clone());
170    }
171
172    let key_array = batch
173        .column(0)
174        .as_any()
175        .downcast_ref::<BinaryArray>()
176        .ok_or_else(|| DbxError::Storage("Key column is not BinaryArray".into()))?;
177
178    // Build filter mask
179    let mut mask = vec![true; batch.num_rows()];
180
181    for (i, mask_val) in mask.iter_mut().enumerate().take(key_array.len()) {
182        if !key_array.is_null(i) {
183            let key = key_array.value(i).to_vec();
184            *mask_val = range.contains(&key);
185        } else {
186            *mask_val = false;
187        }
188    }
189
190    // Apply filter using Arrow compute
191    let mask_array = BooleanArray::from(mask);
192
193    // Filter each column
194    let filtered_columns: Vec<Arc<dyn Array>> = batch
195        .columns()
196        .iter()
197        .map(|col| filter(col.as_ref(), &mask_array))
198        .collect::<Result<Vec<_>, _>>()?;
199
200    // Create new batch with filtered columns
201    let filtered = RecordBatch::try_new(batch.schema(), filtered_columns)?;
202
203    Ok(filtered)
204}
205
206/// Sort RecordBatch by key column using Arrow compute.
207fn sort_batch_by_key(batch: &RecordBatch) -> DbxResult<RecordBatch> {
208    if batch.num_rows() == 0 {
209        return Ok(batch.clone());
210    }
211
212    // Get sort indices for key column (column 0)
213    let indices = sort_to_indices(batch.column(0), None, None)?;
214
215    // Apply indices to all columns
216    let sorted_columns: Vec<Arc<dyn Array>> = batch
217        .columns()
218        .iter()
219        .map(|col| take(col.as_ref(), &indices, None))
220        .collect::<Result<Vec<_>, _>>()?;
221
222    // Create new batch with sorted columns
223    let sorted_batch = RecordBatch::try_new(batch.schema(), sorted_columns)?;
224
225    Ok(sorted_batch)
226}
227
228impl StorageBackend for ColumnarDelta {
229    fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
230        // Convert single key-value pair to RecordBatch
231        let batch = kv_to_batch(vec![(key.to_vec(), value.to_vec())])?;
232
233        // Use current timestamp (0 for now, will be set by Database)
234        self.insert_versioned_batch(table, batch, 0)?;
235        Ok(())
236    }
237
238    fn insert_batch(&self, table: &str, rows: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
239        if rows.is_empty() {
240            return Ok(());
241        }
242
243        // Convert key-value pairs to RecordBatch
244        let batch = kv_to_batch(rows)?;
245
246        // Insert with timestamp 0 (will be overridden by Database)
247        self.insert_versioned_batch(table, batch, 0)?;
248        Ok(())
249    }
250
251    fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
252        // Get all visible batches (using max timestamp for now)
253        let batches = self.get_visible_batches(table, u64::MAX);
254
255        // Search through batches using Arrow operations (no conversion needed)
256        for batch in batches {
257            if let Some(value) = find_key_in_batch(&batch, key)? {
258                return Ok(Some(value));
259            }
260        }
261
262        Ok(None)
263    }
264
265    fn delete(&self, table: &str, key: &[u8]) -> DbxResult<bool> {
266        // Tombstone support: insert a batch with an empty value as a deletion marker
267        // The key exists check is done first
268        if self.get(table, key)?.is_none() {
269            return Ok(false);
270        }
271
272        // Insert tombstone: key with empty value signals deletion
273        let tombstone_batch = kv_to_batch(vec![(key.to_vec(), Vec::new())])?;
274        self.insert_versioned_batch(table, tombstone_batch, 0)?;
275        Ok(true)
276    }
277
278    fn scan<R: RangeBounds<Vec<u8>> + Clone>(
279        &self,
280        table: &str,
281        range: R,
282    ) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
283        // Get all visible batches
284        let batches = self.get_visible_batches(table, u64::MAX);
285
286        if batches.is_empty() {
287            return Ok(Vec::new());
288        }
289
290        // 1. Merge all batches
291        let merged = merge_batches(batches)?;
292
293        // 2. Apply range filter using Arrow compute
294        let filtered = apply_range_filter(&merged, range)?;
295
296        // 3. Sort by key using Arrow compute
297        let sorted = sort_batch_by_key(&filtered)?;
298
299        // 4. Convert only the filtered/sorted results
300        batch_to_kv(&sorted)
301    }
302
303    fn scan_one<R: RangeBounds<Vec<u8>> + Clone>(
304        &self,
305        table: &str,
306        range: R,
307    ) -> DbxResult<Option<(Vec<u8>, Vec<u8>)>> {
308        let results = self.scan(table, range)?;
309        Ok(results.into_iter().next())
310    }
311
312    fn flush(&self) -> DbxResult<()> {
313        // Flushing is handled by Database, not by ColumnarDelta itself
314        Ok(())
315    }
316
317    fn count(&self, table: &str) -> DbxResult<usize> {
318        let batches = self.get_visible_batches(table, u64::MAX);
319        let total: usize = batches.iter().map(|b| b.num_rows()).sum();
320        Ok(total)
321    }
322
323    fn table_names(&self) -> DbxResult<Vec<String>> {
324        Ok(ColumnarDelta::table_names(self))
325    }
326}
327
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};

    /// Build a two-column (id: Int32, name: Utf8) RecordBatch for tests.
    fn create_test_batch(ids: Vec<i32>, names: Vec<&str>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(ids)),
                Arc::new(StringArray::from(names)),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_insert_and_retrieve() {
        let store = ColumnarDelta::new(1000);
        let users = create_test_batch(vec![1, 2], vec!["Alice", "Bob"]);
        store.insert_versioned_batch("users", users, 10).unwrap();

        let seen = store.get_visible_batches("users", 15);
        assert_eq!(seen.len(), 1);
        assert_eq!(seen[0].num_rows(), 2);
    }

    #[test]
    fn test_snapshot_isolation() {
        let store = ColumnarDelta::new(1000);

        // Two writes, committed at ts=10 and ts=20.
        store
            .insert_versioned_batch("users", create_test_batch(vec![1], vec!["Alice"]), 10)
            .unwrap();
        store
            .insert_versioned_batch("users", create_test_batch(vec![2], vec!["Bob"]), 20)
            .unwrap();

        // A snapshot between the writes sees only the first batch.
        let early = store.get_visible_batches("users", 15);
        assert_eq!(early.len(), 1);
        assert_eq!(early[0].num_rows(), 1);

        // A snapshot after both writes sees both batches.
        let late = store.get_visible_batches("users", 25);
        assert_eq!(late.len(), 2);
    }

    #[test]
    fn test_flush_threshold() {
        let store = ColumnarDelta::new(5);

        let three = create_test_batch(vec![1, 2, 3], vec!["A", "B", "C"]);
        store.insert_versioned_batch("users", three, 10).unwrap();
        assert!(!store.should_flush(), "3 rows < 5");

        let two = create_test_batch(vec![4, 5], vec!["D", "E"]);
        store.insert_versioned_batch("users", two, 20).unwrap();
        assert!(store.should_flush(), "5 rows >= 5");
    }

    #[test]
    fn test_drain_table() {
        let store = ColumnarDelta::new(1000);
        let users = create_test_batch(vec![1, 2], vec!["Alice", "Bob"]);
        store.insert_versioned_batch("users", users, 10).unwrap();
        assert_eq!(store.row_count(), 2);

        let drained = store.drain_table("users");
        assert_eq!(drained.len(), 1);
        assert_eq!(store.row_count(), 0);

        // Draining removes the table entirely.
        assert_eq!(store.get_visible_batches("users", 15).len(), 0);
    }

    #[test]
    fn test_multiple_tables() {
        let store = ColumnarDelta::new(1000);
        store
            .insert_versioned_batch("users", create_test_batch(vec![1], vec!["Alice"]), 10)
            .unwrap();
        store
            .insert_versioned_batch("orders", create_test_batch(vec![100], vec!["Order1"]), 10)
            .unwrap();

        let names = store.table_names();
        assert_eq!(names.len(), 2);
        assert!(names.contains(&"users".to_string()));
        assert!(names.contains(&"orders".to_string()));
    }

    #[test]
    fn test_arc_sharing() {
        let store = ColumnarDelta::new(1000);
        let users = create_test_batch(vec![1, 2], vec!["Alice", "Bob"]);
        store.insert_versioned_batch("users", users, 10).unwrap();

        // Repeated reads hand out the same Arc, not copies of the data.
        let first = store.get_visible_batches("users", 15);
        let second = store.get_visible_batches("users", 15);
        assert!(Arc::ptr_eq(&first[0], &second[0]));
    }

    // StorageBackend trait tests

    #[test]
    fn test_storage_backend_insert_get() {
        use crate::storage::StorageBackend;

        let store = ColumnarDelta::new(1000);
        store.insert("users", b"key1", b"value1").unwrap();
        store.insert("users", b"key2", b"value2").unwrap();

        assert_eq!(store.get("users", b"key1").unwrap(), Some(b"value1".to_vec()));
        assert_eq!(store.get("users", b"key2").unwrap(), Some(b"value2".to_vec()));
        assert_eq!(store.get("users", b"key3").unwrap(), None);
    }

    #[test]
    fn test_storage_backend_batch_insert() {
        use crate::storage::StorageBackend;

        let store = ColumnarDelta::new(1000);
        let rows: Vec<(Vec<u8>, Vec<u8>)> = (1..=3)
            .map(|i| {
                (
                    format!("key{}", i).into_bytes(),
                    format!("value{}", i).into_bytes(),
                )
            })
            .collect();

        StorageBackend::insert_batch(&store, "users", rows).unwrap();

        assert_eq!(store.count("users").unwrap(), 3);
        assert_eq!(store.get("users", b"key2").unwrap(), Some(b"value2".to_vec()));
    }

    #[test]
    fn test_storage_backend_scan() {
        use crate::storage::StorageBackend;

        let store = ColumnarDelta::new(1000);
        for i in 1..=3 {
            let key = format!("key{}", i);
            let value = format!("value{}", i);
            store.insert("users", key.as_bytes(), value.as_bytes()).unwrap();
        }

        let rows = store.scan("users", Vec::<u8>::new()..).unwrap();
        assert_eq!(rows.len(), 3);

        // scan returns rows sorted ascending by key.
        assert_eq!(rows[0].0, b"key1");
        assert_eq!(rows[1].0, b"key2");
        assert_eq!(rows[2].0, b"key3");
    }

    #[test]
    fn test_storage_backend_count() {
        use crate::storage::StorageBackend;

        let store = ColumnarDelta::new(1000);
        assert_eq!(store.count("users").unwrap(), 0);

        store.insert("users", b"key1", b"value1").unwrap();
        assert_eq!(store.count("users").unwrap(), 1);

        store.insert("users", b"key2", b"value2").unwrap();
        assert_eq!(store.count("users").unwrap(), 2);
    }

    #[test]
    fn test_storage_backend_table_names() {
        use crate::storage::StorageBackend;

        let store = ColumnarDelta::new(1000);
        store.insert("users", b"key1", b"value1").unwrap();
        store.insert("orders", b"key2", b"value2").unwrap();

        let names = ColumnarDelta::table_names(&store);
        assert_eq!(names.len(), 2);
        assert!(names.contains(&"users".to_string()));
        assert!(names.contains(&"orders".to_string()));
    }
}