dbx_core/engine/
delta_variant.rs1use crate::error::DbxResult;
4use crate::storage::StorageBackend;
5use crate::storage::columnar_delta::ColumnarDelta;
6use crate::storage::delta_store::DeltaStore;
7use std::sync::Arc;
8
9pub enum DeltaVariant {
11 RowBased(Arc<DeltaStore>),
12 Columnar(Arc<ColumnarDelta>),
13}
14
15impl DeltaVariant {
16 pub fn should_flush(&self) -> bool {
17 match self {
18 Self::RowBased(delta) => delta.should_flush(),
19 Self::Columnar(delta) => delta.should_flush(),
20 }
21 }
22
23 pub fn entry_count(&self) -> usize {
24 match self {
25 Self::RowBased(delta) => delta.entry_count(),
26 Self::Columnar(delta) => delta.row_count(),
27 }
28 }
29
30 #[allow(clippy::type_complexity)]
33 pub fn drain_all(&self) -> Vec<(String, Vec<(Vec<u8>, Vec<u8>)>)> {
34 match self {
35 Self::RowBased(delta) => delta.drain_all(),
36 Self::Columnar(delta) => {
37 use crate::storage::kv_adapter::{batch_to_kv, merge_batches};
38
39 let table_names = delta.table_names();
41 let mut result = Vec::new();
42
43 for table in table_names {
44 let batches = delta.drain_table(&table);
46 if !batches.is_empty() {
47 let batch_refs: Vec<Arc<arrow::record_batch::RecordBatch>> =
49 batches.iter().map(|vb| Arc::clone(&vb.data)).collect();
50
51 if let Ok(merged) = merge_batches(batch_refs) {
52 if let Ok(rows) = batch_to_kv(&merged) {
54 result.push((table, rows));
55 }
56 }
57 }
58 }
59
60 result
61 }
62 }
63 }
64}
65
66impl StorageBackend for DeltaVariant {
67 fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
68 match self {
69 Self::RowBased(delta) => delta.insert(table, key, value),
70 Self::Columnar(delta) => delta.insert(table, key, value),
71 }
72 }
73
74 fn insert_batch(&self, table: &str, rows: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
75 match self {
76 Self::RowBased(delta) => delta.insert_batch(table, rows),
77 Self::Columnar(delta) => delta.insert_batch(table, rows),
78 }
79 }
80
81 fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
82 match self {
83 Self::RowBased(delta) => delta.get(table, key),
84 Self::Columnar(delta) => delta.get(table, key),
85 }
86 }
87
88 fn delete(&self, table: &str, key: &[u8]) -> DbxResult<bool> {
89 match self {
90 Self::RowBased(delta) => delta.delete(table, key),
91 Self::Columnar(delta) => delta.delete(table, key),
92 }
93 }
94
95 fn scan<R: std::ops::RangeBounds<Vec<u8>> + Clone>(
96 &self,
97 table: &str,
98 range: R,
99 ) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
100 match self {
101 Self::RowBased(delta) => delta.scan(table, range),
102 Self::Columnar(delta) => delta.scan(table, range),
103 }
104 }
105
106 fn scan_one<R: std::ops::RangeBounds<Vec<u8>> + Clone>(
107 &self,
108 table: &str,
109 range: R,
110 ) -> DbxResult<Option<(Vec<u8>, Vec<u8>)>> {
111 match self {
112 Self::RowBased(delta) => delta.scan_one(table, range),
113 Self::Columnar(delta) => delta.scan_one(table, range),
114 }
115 }
116
117 fn flush(&self) -> DbxResult<()> {
118 match self {
119 Self::RowBased(delta) => delta.flush(),
120 Self::Columnar(delta) => delta.flush(),
121 }
122 }
123
124 fn count(&self, table: &str) -> DbxResult<usize> {
125 match self {
126 Self::RowBased(delta) => delta.count(table),
127 Self::Columnar(delta) => delta.count(table),
128 }
129 }
130
131 fn table_names(&self) -> DbxResult<Vec<String>> {
132 match self {
133 Self::RowBased(delta) => delta.table_names(),
134 Self::Columnar(delta) => Ok(delta.table_names()),
135 }
136 }
137}