Skip to main content

dbx_core/engine/
delta_variant.rs

1//! Delta Store Variant — supports both row-based and columnar implementations
2
3use crate::error::DbxResult;
4use crate::storage::StorageBackend;
5use crate::storage::columnar_delta::ColumnarDelta;
6use crate::storage::delta_store::DeltaStore;
7use std::sync::Arc;
8
/// Delta Store variant — supports both row-based and columnar implementations.
///
/// Each variant holds an `Arc` to its backing store; the methods implemented
/// on this type match on the variant and forward the call to that store.
pub enum DeltaVariant {
    /// Row-based implementation, backed by a shared [`DeltaStore`].
    RowBased(Arc<DeltaStore>),
    /// Columnar implementation, backed by a shared [`ColumnarDelta`].
    Columnar(Arc<ColumnarDelta>),
}
14
15impl DeltaVariant {
16    pub fn should_flush(&self) -> bool {
17        match self {
18            Self::RowBased(delta) => delta.should_flush(),
19            Self::Columnar(delta) => delta.should_flush(),
20        }
21    }
22
23    pub fn entry_count(&self) -> usize {
24        match self {
25            Self::RowBased(delta) => delta.entry_count(),
26            Self::Columnar(delta) => delta.row_count(),
27        }
28    }
29
30    /// Drain all data from the store.
31    /// Returns table→entries mapping for flushing to WOS.
32    #[allow(clippy::type_complexity)]
33    pub fn drain_all(&self) -> Vec<(String, Vec<(Vec<u8>, Vec<u8>)>)> {
34        match self {
35            Self::RowBased(delta) => delta.drain_all(),
36            Self::Columnar(delta) => {
37                use crate::storage::kv_adapter::{batch_to_kv, merge_batches};
38
39                // Get all table names
40                let table_names = delta.table_names();
41                let mut result = Vec::new();
42
43                for table in table_names {
44                    // Drain all batches from this table
45                    let batches = delta.drain_table(&table);
46                    if !batches.is_empty() {
47                        // Merge all batches
48                        let batch_refs: Vec<Arc<arrow::record_batch::RecordBatch>> =
49                            batches.iter().map(|vb| Arc::clone(&vb.data)).collect();
50
51                        if let Ok(merged) = merge_batches(batch_refs) {
52                            // Convert to key-value pairs
53                            if let Ok(rows) = batch_to_kv(&merged) {
54                                result.push((table, rows));
55                            }
56                        }
57                    }
58                }
59
60                result
61            }
62        }
63    }
64}
65
66impl StorageBackend for DeltaVariant {
67    fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
68        match self {
69            Self::RowBased(delta) => delta.insert(table, key, value),
70            Self::Columnar(delta) => delta.insert(table, key, value),
71        }
72    }
73
74    fn insert_batch(&self, table: &str, rows: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
75        match self {
76            Self::RowBased(delta) => delta.insert_batch(table, rows),
77            Self::Columnar(delta) => delta.insert_batch(table, rows),
78        }
79    }
80
81    fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
82        match self {
83            Self::RowBased(delta) => delta.get(table, key),
84            Self::Columnar(delta) => delta.get(table, key),
85        }
86    }
87
88    fn delete(&self, table: &str, key: &[u8]) -> DbxResult<bool> {
89        match self {
90            Self::RowBased(delta) => delta.delete(table, key),
91            Self::Columnar(delta) => delta.delete(table, key),
92        }
93    }
94
95    fn scan<R: std::ops::RangeBounds<Vec<u8>> + Clone>(
96        &self,
97        table: &str,
98        range: R,
99    ) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
100        match self {
101            Self::RowBased(delta) => delta.scan(table, range),
102            Self::Columnar(delta) => delta.scan(table, range),
103        }
104    }
105
106    fn scan_one<R: std::ops::RangeBounds<Vec<u8>> + Clone>(
107        &self,
108        table: &str,
109        range: R,
110    ) -> DbxResult<Option<(Vec<u8>, Vec<u8>)>> {
111        match self {
112            Self::RowBased(delta) => delta.scan_one(table, range),
113            Self::Columnar(delta) => delta.scan_one(table, range),
114        }
115    }
116
117    fn flush(&self) -> DbxResult<()> {
118        match self {
119            Self::RowBased(delta) => delta.flush(),
120            Self::Columnar(delta) => delta.flush(),
121        }
122    }
123
124    fn count(&self, table: &str) -> DbxResult<usize> {
125        match self {
126            Self::RowBased(delta) => delta.count(table),
127            Self::Columnar(delta) => delta.count(table),
128        }
129    }
130
131    fn table_names(&self) -> DbxResult<Vec<String>> {
132        match self {
133            Self::RowBased(delta) => delta.table_names(),
134            Self::Columnar(delta) => Ok(delta.table_names()),
135        }
136    }
137}