1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// SPDX-License-Identifier: BUSL-1.1
//! Dispatch for ColumnarOp variants (scan, insert, update, delete, materialize scan).
use crate::bridge::envelope::Response;
use crate::bridge::physical_plan::ColumnarOp;
use crate::data::executor::core_loop::CoreLoop;
use crate::data::executor::handlers::columnar_read::ColumnarScanParams;
use crate::data::executor::task::ExecutionTask;
impl CoreLoop {
    /// Routes a [`ColumnarOp`] to the matching columnar handler and returns
    /// that handler's [`Response`].
    ///
    /// * `Scan` and `MaterializeScan` are forwarded directly to their handlers.
    /// * `Insert`, `Update` and `Delete` first call `check_engine_pressure`
    ///   for the columnar engine; if that call yields a response, it is
    ///   returned unchanged and the handler is not invoked.
    pub(super) fn dispatch_columnar(&mut self, task: &ExecutionTask, op: &ColumnarOp) -> Response {
        match op {
            ColumnarOp::Scan {
                collection,
                projection,
                limit: row_limit,
                filters,
                rls_filters,
                sort_keys,
                system_as_of_ms: as_of,
                valid_at_ms: valid_at,
                prefilter,
            } => {
                // Bundle the borrowed plan fields into the handler's parameter struct.
                let params = ColumnarScanParams {
                    collection,
                    projection,
                    limit: *row_limit,
                    filters,
                    rls_filters,
                    sort_keys,
                    system_as_of_ms: *as_of,
                    valid_at_ms: *valid_at,
                    prefilter: prefilter.as_ref(),
                };
                self.execute_columnar_scan(task, params)
            }

            ColumnarOp::Insert {
                collection,
                payload,
                format,
                intent,
                on_conflict_updates,
                surrogates,
            } => match self.check_engine_pressure(task, nodedb_mem::EngineId::Columnar) {
                // The pressure check produced a response: hand it back as-is.
                Some(early) => early,
                None => self.execute_columnar_insert(
                    task,
                    collection,
                    payload,
                    format,
                    *intent,
                    on_conflict_updates,
                    surrogates,
                ),
            },

            ColumnarOp::Update {
                collection,
                filters,
                updates,
            } => match self.check_engine_pressure(task, nodedb_mem::EngineId::Columnar) {
                Some(early) => early,
                None => self.execute_columnar_update(task, collection, filters, updates),
            },

            ColumnarOp::Delete {
                collection,
                filters,
            } => match self.check_engine_pressure(task, nodedb_mem::EngineId::Columnar) {
                Some(early) => early,
                None => self.execute_columnar_delete(task, collection, filters),
            },

            ColumnarOp::MaterializeScan {
                collection,
                cursor,
                count,
                system_as_of_ms: as_of,
            } => {
                self.execute_columnar_materialize_scan(task, collection, cursor, *count, *as_of)
            }
        }
    }
}