1pub mod checker;
2pub(crate) mod executor;
3pub(crate) mod iterators;
4pub(in crate::idx) mod knn;
5pub(crate) mod plan;
6pub(in crate::idx) mod rewriter;
7pub(in crate::idx) mod tree;
8
9use crate::ctx::Context;
10use crate::dbs::{Iterable, Iterator, Options, Statement};
11use crate::err::Error;
12use crate::idx::planner::executor::{InnerQueryExecutor, IteratorEntry, QueryExecutor};
13use crate::idx::planner::iterators::IteratorRef;
14use crate::idx::planner::knn::KnnBruteForceResults;
15use crate::idx::planner::plan::{Plan, PlanBuilder, PlanBuilderParameters};
16use crate::idx::planner::tree::Tree;
17use crate::sql::with::With;
18use crate::sql::{order::Ordering, Cond, Fields, Groups, Table};
19use reblessive::tree::Stk;
20use std::collections::HashMap;
21use std::fmt::{Display, Formatter};
22use std::sync::atomic::{self, AtomicU8};
23
24pub(crate) struct StatementContext<'a> {
28 pub(crate) ctx: &'a Context,
29 pub(crate) opt: &'a Options,
30 pub(crate) ns: &'a str,
31 pub(crate) db: &'a str,
32 pub(crate) stm: &'a Statement<'a>,
33 pub(crate) fields: Option<&'a Fields>,
34 pub(crate) with: Option<&'a With>,
35 pub(crate) order: Option<&'a Ordering>,
36 pub(crate) cond: Option<&'a Cond>,
37 pub(crate) group: Option<&'a Groups>,
38 is_perm: bool,
39}
40
41#[derive(Clone, Copy, Debug)]
42pub(crate) enum RecordStrategy {
43 Count,
44 KeysOnly,
45 KeysAndValues,
46}
47
48#[derive(Clone, Copy, Debug)]
49pub(crate) enum ScanDirection {
50 Forward,
51 #[cfg(any(feature = "kv-rocksdb", feature = "kv-tikv"))]
52 Backward,
53}
54
55impl Display for ScanDirection {
56 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
57 match self {
58 ScanDirection::Forward => f.write_str("forward"),
59 #[cfg(any(feature = "kv-rocksdb", feature = "kv-tikv"))]
60 ScanDirection::Backward => f.write_str("backward"),
61 }
62 }
63}
64
65#[derive(Clone, Copy, Debug)]
66pub(crate) enum GrantedPermission {
67 None,
68 Full,
69 Specific,
70}
71
72impl<'a> StatementContext<'a> {
73 pub(crate) fn new(
74 ctx: &'a Context,
75 opt: &'a Options,
76 stm: &'a Statement<'a>,
77 ) -> Result<Self, Error> {
78 let is_perm = opt.check_perms(stm.into())?;
79 let (ns, db) = opt.ns_db()?;
80 Ok(Self {
81 ctx,
82 opt,
83 stm,
84 ns,
85 db,
86 fields: stm.expr(),
87 with: stm.with(),
88 order: stm.order(),
89 cond: stm.cond(),
90 group: stm.group(),
91 is_perm,
92 })
93 }
94
95 pub(crate) async fn check_table_permission(
96 &self,
97 tb: &str,
98 ) -> Result<GrantedPermission, Error> {
99 if !self.is_perm {
100 return Ok(GrantedPermission::Full);
101 }
102 match self.ctx.tx().get_tb(self.ns, self.db, tb).await {
104 Ok(table) => {
105 let perms = self.stm.permissions(&table, self.stm.is_create());
111 if perms.is_specific() {
114 return Ok(GrantedPermission::Specific);
115 }
116 if perms.is_none() {
119 return Ok(GrantedPermission::None);
120 }
121 }
122 Err(Error::TbNotFound {
123 ..
124 }) => {
125 }
129 Err(e) => return Err(e),
130 }
131 Ok(GrantedPermission::Full)
132 }
133
134 pub(crate) fn check_record_strategy(
135 &self,
136 all_expressions_with_index: bool,
137 granted_permission: GrantedPermission,
138 ) -> Result<RecordStrategy, Error> {
139 if matches!(self.stm, Statement::Update(_) | Statement::Upsert(_) | Statement::Delete(_)) {
143 return Ok(RecordStrategy::KeysAndValues);
144 }
145 if !all_expressions_with_index && self.cond.is_some() {
149 return Ok(RecordStrategy::KeysAndValues);
150 }
151
152 let is_group_all = if let Some(g) = self.group {
156 if !g.is_empty() {
157 return Ok(RecordStrategy::KeysAndValues);
158 }
159 true
160 } else {
161 false
162 };
163
164 if let Some(p) = self.order {
168 match p {
169 Ordering::Random => {}
170 Ordering::Order(x) => {
171 if !x.is_empty() {
172 return Ok(RecordStrategy::KeysAndValues);
173 }
174 }
175 }
176 }
177
178 let is_count_all = if let Some(fields) = self.fields {
182 if !fields.is_count_all_only() {
183 return Ok(RecordStrategy::KeysAndValues);
184 }
185 true
186 } else {
187 false
188 };
189
190 if matches!(granted_permission, GrantedPermission::Specific) {
194 return Ok(RecordStrategy::KeysAndValues);
195 }
196
197 if is_count_all && is_group_all {
199 return Ok(RecordStrategy::Count);
200 }
201 Ok(RecordStrategy::KeysOnly)
203 }
204
205 pub(crate) fn check_scan_direction(&self) -> ScanDirection {
210 #[cfg(any(feature = "kv-rocksdb", feature = "kv-tikv"))]
211 if let Some(Ordering::Order(o)) = self.order {
212 if let Some(o) = o.first() {
213 if !o.direction && o.value.is_id() {
214 return ScanDirection::Backward;
215 }
216 }
217 }
218 ScanDirection::Forward
219 }
220}
221
222pub(crate) struct QueryPlanner {
223 executors: HashMap<String, QueryExecutor>,
225 requires_distinct: bool,
226 fallbacks: Vec<String>,
227 iteration_workflow: Vec<IterationStage>,
228 iteration_index: AtomicU8,
229 orders: Vec<IteratorRef>,
230 granted_permissions: HashMap<String, GrantedPermission>,
231 any_specific_permission: bool,
232}
233
234impl QueryPlanner {
235 pub(crate) fn new() -> Self {
236 Self {
237 executors: HashMap::default(),
238 requires_distinct: false,
239 fallbacks: vec![],
240 iteration_workflow: Vec::default(),
241 iteration_index: AtomicU8::new(0),
242 orders: vec![],
243 granted_permissions: HashMap::default(),
244 any_specific_permission: false,
245 }
246 }
247
248 pub(crate) async fn check_table_permission(
251 &mut self,
252 ctx: &StatementContext<'_>,
253 tb: &str,
254 ) -> Result<GrantedPermission, Error> {
255 if ctx.is_perm {
256 if let Some(p) = self.granted_permissions.get(tb) {
257 return Ok(*p);
258 }
259 let p = ctx.check_table_permission(tb).await?;
260 self.granted_permissions.insert(tb.to_string(), p);
261 if matches!(p, GrantedPermission::Specific) {
262 self.any_specific_permission = true;
263 }
264 return Ok(p);
265 }
266 Ok(GrantedPermission::Full)
267 }
268
269 pub(crate) async fn add_iterables(
270 &mut self,
271 stk: &mut Stk,
272 ctx: &StatementContext<'_>,
273 t: Table,
274 gp: GrantedPermission,
275 it: &mut Iterator,
276 ) -> Result<(), Error> {
277 let mut is_table_iterator = false;
278
279 let tree = Tree::build(stk, ctx, &t).await?;
280
281 let is_knn = !tree.knn_expressions.is_empty();
282 let mut exe = InnerQueryExecutor::new(
283 stk,
284 ctx.ctx,
285 ctx.opt,
286 &t,
287 tree.index_map.options,
288 tree.knn_expressions,
289 tree.knn_brute_force_expressions,
290 tree.knn_condition,
291 )
292 .await?;
293 let p = PlanBuilderParameters {
294 root: tree.root,
295 gp,
296 compound_indexes: tree.index_map.compound_indexes,
297 order_limit: tree.index_map.order_limit,
298 with_indexes: tree.with_indexes,
299 all_and: tree.all_and,
300 all_expressions_with_index: tree.all_expressions_with_index,
301 all_and_groups: tree.all_and_groups,
302 reverse_scan: ctx.ctx.tx().reverse_scan(),
303 };
304 match PlanBuilder::build(ctx, p).await? {
305 Plan::SingleIndex(exp, io, rs) => {
306 if io.require_distinct() {
307 self.requires_distinct = true;
308 }
309 let is_order = exp.is_none();
310 let ir = exe.add_iterator(IteratorEntry::Single(exp, io));
311 self.add(t.clone(), Some(ir), exe, it, rs);
312 if is_order {
313 self.orders.push(ir);
314 }
315 }
316 Plan::MultiIndex(non_range_indexes, ranges_indexes, rs) => {
317 for (exp, io) in non_range_indexes {
318 let ie = IteratorEntry::Single(Some(exp), io);
319 let ir = exe.add_iterator(ie);
320 it.ingest(Iterable::Index(t.clone(), ir, rs));
321 }
322 for (ixr, rq) in ranges_indexes {
323 let ie = IteratorEntry::Range(rq.exps, ixr, rq.from, rq.to);
324 let ir = exe.add_iterator(ie);
325 it.ingest(Iterable::Index(t.clone(), ir, rs));
326 }
327 self.requires_distinct = true;
328 self.add(t.clone(), None, exe, it, rs);
329 }
330 Plan::SingleIndexRange(ixn, rq, keys_only) => {
331 let ir = exe.add_iterator(IteratorEntry::Range(rq.exps, ixn, rq.from, rq.to));
332 self.add(t.clone(), Some(ir), exe, it, keys_only);
333 }
334 Plan::TableIterator(reason, rs, sc) => {
335 if let Some(reason) = reason {
336 self.fallbacks.push(reason);
337 }
338 self.add(t.clone(), None, exe, it, rs);
339 it.ingest(Iterable::Table(t, rs, sc));
340 is_table_iterator = true;
341 }
342 }
343 if is_knn && is_table_iterator {
344 self.iteration_workflow = vec![IterationStage::CollectKnn, IterationStage::BuildKnn];
345 } else {
346 self.iteration_workflow = vec![IterationStage::Iterate(None)];
347 }
348 Ok(())
349 }
350
351 fn add(
352 &mut self,
353 tb: Table,
354 irf: Option<IteratorRef>,
355 exe: InnerQueryExecutor,
356 it: &mut Iterator,
357 rs: RecordStrategy,
358 ) {
359 self.executors.insert(tb.0.clone(), exe.into());
360 if let Some(irf) = irf {
361 it.ingest(Iterable::Index(tb, irf, rs));
362 }
363 }
364 pub(crate) fn has_executors(&self) -> bool {
365 !self.executors.is_empty()
366 }
367
368 pub(crate) fn get_query_executor(&self, tb: &str) -> Option<&QueryExecutor> {
369 self.executors.get(tb)
370 }
371
372 pub(crate) fn requires_distinct(&self) -> bool {
373 self.requires_distinct
374 }
375
376 pub(crate) fn fallbacks(&self) -> &Vec<String> {
377 &self.fallbacks
378 }
379
380 pub(crate) fn is_order(&self, irf: &IteratorRef) -> bool {
381 self.orders.contains(irf)
382 }
383
384 pub(crate) fn is_any_specific_permission(&self) -> bool {
385 self.any_specific_permission
386 }
387
388 pub(crate) async fn next_iteration_stage(&self) -> Option<IterationStage> {
389 let pos = self.iteration_index.fetch_add(1, atomic::Ordering::Relaxed);
390 match self.iteration_workflow.get(pos as usize) {
391 Some(IterationStage::BuildKnn) => {
392 Some(IterationStage::Iterate(Some(self.build_bruteforce_knn_results().await)))
393 }
394 is => is.cloned(),
395 }
396 }
397
398 async fn build_bruteforce_knn_results(&self) -> KnnBruteForceResults {
399 let mut results = HashMap::with_capacity(self.executors.len());
400 for (tb, exe) in &self.executors {
401 results.insert(tb.clone(), exe.build_bruteforce_knn_result().await);
402 }
403 results.into()
404 }
405}
406
407#[derive(Clone)]
408pub(crate) enum IterationStage {
409 Iterate(Option<KnnBruteForceResults>),
410 CollectKnn,
411 BuildKnn,
412}