limbo_core/translate/main_loop.rs
1use limbo_sqlite3_parser::ast;
2
3use crate::{
4 schema::Table,
5 translate::result_row::emit_select_result,
6 vdbe::{
7 builder::{CursorType, ProgramBuilder},
8 insn::{CmpInsFlags, Insn},
9 BranchOffset,
10 },
11 Result,
12};
13
14use super::{
15 aggregation::translate_aggregation_step,
16 emitter::{OperationMode, TranslateCtx},
17 expr::{translate_condition_expr, translate_expr, ConditionMetadata},
18 order_by::{order_by_sorter_insert, sorter_insert},
19 plan::{
20 IterationDirection, Operation, Search, SelectPlan, SelectQueryType, TableReference,
21 WhereTerm,
22 },
23};
24
/// Metadata for handling LEFT JOIN operations.
///
/// `init_loop` creates one instance for every table whose join is marked `outer`
/// and stores it in `TranslateCtx::meta_left_joins` at that table's index.
#[derive(Debug)]
pub struct LeftJoinMetadata {
    /// Integer register that holds a flag that is set to true (1) if the current row has a match for the left join.
    /// `open_loop` resets it to 0 before the right table's loop is opened.
    pub reg_match_flag: usize,
    /// Label for the instruction that sets the match flag to true.
    /// It is resolved in `open_loop`, right after the loop's conditions have been emitted.
    pub label_match_flag_set_true: BranchOffset,
    /// Label for the instruction that checks if the match flag is true.
    /// It is resolved in `close_loop`, right after the end of the right table's loop.
    pub label_match_flag_check_value: BranchOffset,
}
35
/// Jump labels for each loop in the query's main execution loop.
///
/// One set of labels exists per table reference; `open_loop` and `close_loop` look them up
/// in `TranslateCtx::labels_main_loop` by table index and resolve them while emitting the loop.
#[derive(Debug, Clone, Copy)]
pub struct LoopLabels {
    /// Jump to the start of the loop body.
    loop_start: BranchOffset,
    /// Jump to the NextAsync instruction (or equivalent).
    /// This is also where a failed condition jumps in order to skip the current row.
    next: BranchOffset,
    /// Jump to the end of the loop, exiting it.
    loop_end: BranchOffset,
}
46
47impl LoopLabels {
48 pub fn new(program: &mut ProgramBuilder) -> Self {
49 Self {
50 loop_start: program.allocate_label(),
51 next: program.allocate_label(),
52 loop_end: program.allocate_label(),
53 }
54 }
55}
56
57/// Initialize resources needed for the source operators (tables, joins, etc)
58pub fn init_loop(
59 program: &mut ProgramBuilder,
60 t_ctx: &mut TranslateCtx,
61 tables: &[TableReference],
62 mode: &OperationMode,
63) -> Result<()> {
64 assert!(
65 t_ctx.meta_left_joins.len() == tables.len(),
66 "meta_left_joins length does not match tables length"
67 );
68 for (table_index, table) in tables.iter().enumerate() {
69 // Initialize bookkeeping for OUTER JOIN
70 if let Some(join_info) = table.join_info.as_ref() {
71 if join_info.outer {
72 let lj_metadata = LeftJoinMetadata {
73 reg_match_flag: program.alloc_register(),
74 label_match_flag_set_true: program.allocate_label(),
75 label_match_flag_check_value: program.allocate_label(),
76 };
77 t_ctx.meta_left_joins[table_index] = Some(lj_metadata);
78 }
79 }
80 match &table.op {
81 Operation::Scan { .. } => {
82 let cursor_id = program.alloc_cursor_id(
83 Some(table.identifier.clone()),
84 match &table.table {
85 Table::BTree(_) => CursorType::BTreeTable(table.btree().unwrap().clone()),
86 Table::Virtual(_) => {
87 CursorType::VirtualTable(table.virtual_table().unwrap().clone())
88 }
89 other => panic!("Invalid table reference type in Scan: {:?}", other),
90 },
91 );
92 match (mode, &table.table) {
93 (OperationMode::SELECT, Table::BTree(_)) => {
94 let root_page = table.btree().unwrap().root_page;
95 program.emit_insn(Insn::OpenReadAsync {
96 cursor_id,
97 root_page,
98 });
99 program.emit_insn(Insn::OpenReadAwait {});
100 }
101 (OperationMode::DELETE, Table::BTree(_)) => {
102 let root_page = table.btree().unwrap().root_page;
103 program.emit_insn(Insn::OpenWriteAsync {
104 cursor_id,
105 root_page,
106 });
107 program.emit_insn(Insn::OpenWriteAwait {});
108 }
109 (OperationMode::SELECT, Table::Virtual(_)) => {
110 program.emit_insn(Insn::VOpenAsync { cursor_id });
111 program.emit_insn(Insn::VOpenAwait {});
112 }
113 (OperationMode::DELETE, Table::Virtual(_)) => {
114 program.emit_insn(Insn::VOpenAsync { cursor_id });
115 program.emit_insn(Insn::VOpenAwait {});
116 }
117 _ => {
118 unimplemented!()
119 }
120 }
121 }
122 Operation::Search(search) => {
123 let table_cursor_id = program.alloc_cursor_id(
124 Some(table.identifier.clone()),
125 CursorType::BTreeTable(table.btree().unwrap().clone()),
126 );
127
128 match mode {
129 OperationMode::SELECT => {
130 program.emit_insn(Insn::OpenReadAsync {
131 cursor_id: table_cursor_id,
132 root_page: table.table.get_root_page(),
133 });
134 program.emit_insn(Insn::OpenReadAwait {});
135 }
136 OperationMode::DELETE => {
137 program.emit_insn(Insn::OpenWriteAsync {
138 cursor_id: table_cursor_id,
139 root_page: table.table.get_root_page(),
140 });
141 program.emit_insn(Insn::OpenWriteAwait {});
142 }
143 _ => {
144 unimplemented!()
145 }
146 }
147
148 if let Search::IndexSearch { index, .. } = search {
149 let index_cursor_id = program.alloc_cursor_id(
150 Some(index.name.clone()),
151 CursorType::BTreeIndex(index.clone()),
152 );
153
154 match mode {
155 OperationMode::SELECT => {
156 program.emit_insn(Insn::OpenReadAsync {
157 cursor_id: index_cursor_id,
158 root_page: index.root_page,
159 });
160 program.emit_insn(Insn::OpenReadAwait);
161 }
162 OperationMode::DELETE => {
163 program.emit_insn(Insn::OpenWriteAsync {
164 cursor_id: index_cursor_id,
165 root_page: index.root_page,
166 });
167 program.emit_insn(Insn::OpenWriteAwait {});
168 }
169 _ => {
170 unimplemented!()
171 }
172 }
173 }
174 }
175 _ => {}
176 }
177 }
178
179 Ok(())
180}
181
182/// Set up the main query execution loop
183/// For example in the case of a nested table scan, this means emitting the RewindAsync instruction
184/// for all tables involved, outermost first.
185pub fn open_loop(
186 program: &mut ProgramBuilder,
187 t_ctx: &mut TranslateCtx,
188 tables: &[TableReference],
189 predicates: &[WhereTerm],
190) -> Result<()> {
191 for (table_index, table) in tables.iter().enumerate() {
192 let LoopLabels {
193 loop_start,
194 loop_end,
195 next,
196 } = *t_ctx
197 .labels_main_loop
198 .get(table_index)
199 .expect("table has no loop labels");
200
201 // Each OUTER JOIN has a "match flag" that is initially set to false,
202 // and is set to true when a match is found for the OUTER JOIN.
203 // This is used to determine whether to emit actual columns or NULLs for the columns of the right table.
204 if let Some(join_info) = table.join_info.as_ref() {
205 if join_info.outer {
206 let lj_meta = t_ctx.meta_left_joins[table_index].as_ref().unwrap();
207 program.emit_insn(Insn::Integer {
208 value: 0,
209 dest: lj_meta.reg_match_flag,
210 });
211 }
212 }
213
214 match &table.op {
215 Operation::Subquery { plan, .. } => {
216 let (yield_reg, coroutine_implementation_start) = match &plan.query_type {
217 SelectQueryType::Subquery {
218 yield_reg,
219 coroutine_implementation_start,
220 } => (*yield_reg, *coroutine_implementation_start),
221 _ => unreachable!("Subquery operator with non-subquery query type"),
222 };
223 // In case the subquery is an inner loop, it needs to be reinitialized on each iteration of the outer loop.
224 program.emit_insn(Insn::InitCoroutine {
225 yield_reg,
226 jump_on_definition: BranchOffset::Offset(0),
227 start_offset: coroutine_implementation_start,
228 });
229 program.resolve_label(loop_start, program.offset());
230 // A subquery within the main loop of a parent query has no cursor, so instead of advancing the cursor,
231 // it emits a Yield which jumps back to the main loop of the subquery itself to retrieve the next row.
232 // When the subquery coroutine completes, this instruction jumps to the label at the top of the termination_label_stack,
233 // which in this case is the end of the Yield-Goto loop in the parent query.
234 program.emit_insn(Insn::Yield {
235 yield_reg,
236 end_offset: loop_end,
237 });
238
239 // These are predicates evaluated outside of the subquery,
240 // so they are translated here.
241 // E.g. SELECT foo FROM (SELECT bar as foo FROM t1) sub WHERE sub.foo > 10
242 for cond in predicates
243 .iter()
244 .filter(|cond| cond.should_eval_at_loop(table_index))
245 {
246 let jump_target_when_true = program.allocate_label();
247 let condition_metadata = ConditionMetadata {
248 jump_if_condition_is_true: false,
249 jump_target_when_true,
250 jump_target_when_false: next,
251 };
252 translate_condition_expr(
253 program,
254 tables,
255 &cond.expr,
256 condition_metadata,
257 &t_ctx.resolver,
258 )?;
259 program.resolve_label(jump_target_when_true, program.offset());
260 }
261 }
262 Operation::Scan { iter_dir } => {
263 let cursor_id = program.resolve_cursor_id(&table.identifier);
264
265 if !matches!(&table.table, Table::Virtual(_)) {
266 if iter_dir
267 .as_ref()
268 .is_some_and(|dir| *dir == IterationDirection::Backwards)
269 {
270 program.emit_insn(Insn::LastAsync { cursor_id });
271 } else {
272 program.emit_insn(Insn::RewindAsync { cursor_id });
273 }
274 }
275 match &table.table {
276 Table::BTree(_) => program.emit_insn(
277 if iter_dir
278 .as_ref()
279 .is_some_and(|dir| *dir == IterationDirection::Backwards)
280 {
281 Insn::LastAwait {
282 cursor_id,
283 pc_if_empty: loop_end,
284 }
285 } else {
286 Insn::RewindAwait {
287 cursor_id,
288 pc_if_empty: loop_end,
289 }
290 },
291 ),
292 Table::Virtual(ref table) => {
293 let start_reg = program
294 .alloc_registers(table.args.as_ref().map(|a| a.len()).unwrap_or(0));
295 let mut cur_reg = start_reg;
296 let args = match table.args.as_ref() {
297 Some(args) => args,
298 None => &vec![],
299 };
300 for arg in args {
301 let reg = cur_reg;
302 cur_reg += 1;
303 let _ =
304 translate_expr(program, Some(tables), arg, reg, &t_ctx.resolver)?;
305 }
306 program.emit_insn(Insn::VFilter {
307 cursor_id,
308 pc_if_empty: loop_end,
309 arg_count: table.args.as_ref().map_or(0, |args| args.len()),
310 args_reg: start_reg,
311 });
312 }
313 other => panic!("Unsupported table reference type: {:?}", other),
314 }
315 program.resolve_label(loop_start, program.offset());
316
317 for cond in predicates
318 .iter()
319 .filter(|cond| cond.should_eval_at_loop(table_index))
320 {
321 let jump_target_when_true = program.allocate_label();
322 let condition_metadata = ConditionMetadata {
323 jump_if_condition_is_true: false,
324 jump_target_when_true,
325 jump_target_when_false: next,
326 };
327 translate_condition_expr(
328 program,
329 tables,
330 &cond.expr,
331 condition_metadata,
332 &t_ctx.resolver,
333 )?;
334 program.resolve_label(jump_target_when_true, program.offset());
335 }
336 }
337 Operation::Search(search) => {
338 let table_cursor_id = program.resolve_cursor_id(&table.identifier);
339 // Open the loop for the index search.
340 // Rowid equality point lookups are handled with a SeekRowid instruction which does not loop, since it is a single row lookup.
341 if !matches!(search, Search::RowidEq { .. }) {
342 let index_cursor_id = if let Search::IndexSearch { index, .. } = search {
343 Some(program.resolve_cursor_id(&index.name))
344 } else {
345 None
346 };
347 let cmp_reg = program.alloc_register();
348 let (cmp_expr, cmp_op) = match search {
349 Search::IndexSearch {
350 cmp_expr, cmp_op, ..
351 } => (cmp_expr, cmp_op),
352 Search::RowidSearch { cmp_expr, cmp_op } => (cmp_expr, cmp_op),
353 Search::RowidEq { .. } => unreachable!(),
354 };
355
356 // TODO this only handles ascending indexes
357 match cmp_op {
358 ast::Operator::Equals
359 | ast::Operator::Greater
360 | ast::Operator::GreaterEquals => {
361 translate_expr(
362 program,
363 Some(tables),
364 &cmp_expr.expr,
365 cmp_reg,
366 &t_ctx.resolver,
367 )?;
368 }
369 ast::Operator::Less | ast::Operator::LessEquals => {
370 program.emit_insn(Insn::Null {
371 dest: cmp_reg,
372 dest_end: None,
373 });
374 }
375 _ => unreachable!(),
376 }
377 // If we try to seek to a key that is not present in the table/index, we exit the loop entirely.
378 program.emit_insn(match cmp_op {
379 ast::Operator::Equals | ast::Operator::GreaterEquals => Insn::SeekGE {
380 is_index: index_cursor_id.is_some(),
381 cursor_id: index_cursor_id.unwrap_or(table_cursor_id),
382 start_reg: cmp_reg,
383 num_regs: 1,
384 target_pc: loop_end,
385 },
386 ast::Operator::Greater
387 | ast::Operator::Less
388 | ast::Operator::LessEquals => Insn::SeekGT {
389 is_index: index_cursor_id.is_some(),
390 cursor_id: index_cursor_id.unwrap_or(table_cursor_id),
391 start_reg: cmp_reg,
392 num_regs: 1,
393 target_pc: loop_end,
394 },
395 _ => unreachable!(),
396 });
397 if *cmp_op == ast::Operator::Less || *cmp_op == ast::Operator::LessEquals {
398 translate_expr(
399 program,
400 Some(tables),
401 &cmp_expr.expr,
402 cmp_reg,
403 &t_ctx.resolver,
404 )?;
405 }
406
407 program.resolve_label(loop_start, program.offset());
408 // TODO: We are currently only handling ascending indexes.
409 // For conditions like index_key > 10, we have already sought to the first key greater than 10, and can just scan forward.
410 // For conditions like index_key < 10, we are at the beginning of the index, and will scan forward and emit IdxGE(10) with a conditional jump to the end.
411 // For conditions like index_key = 10, we have already sought to the first key greater than or equal to 10, and can just scan forward and emit IdxGT(10) with a conditional jump to the end.
412 // For conditions like index_key >= 10, we have already sought to the first key greater than or equal to 10, and can just scan forward.
413 // For conditions like index_key <= 10, we are at the beginning of the index, and will scan forward and emit IdxGT(10) with a conditional jump to the end.
414 // For conditions like index_key != 10, TODO. probably the optimal way is not to use an index at all.
415 //
416 // For primary key searches we emit RowId and then compare it to the seek value.
417
418 match cmp_op {
419 ast::Operator::Equals | ast::Operator::LessEquals => {
420 if let Some(index_cursor_id) = index_cursor_id {
421 program.emit_insn(Insn::IdxGT {
422 cursor_id: index_cursor_id,
423 start_reg: cmp_reg,
424 num_regs: 1,
425 target_pc: loop_end,
426 });
427 } else {
428 let rowid_reg = program.alloc_register();
429 program.emit_insn(Insn::RowId {
430 cursor_id: table_cursor_id,
431 dest: rowid_reg,
432 });
433 program.emit_insn(Insn::Gt {
434 lhs: rowid_reg,
435 rhs: cmp_reg,
436 target_pc: loop_end,
437 flags: CmpInsFlags::default(),
438 });
439 }
440 }
441 ast::Operator::Less => {
442 if let Some(index_cursor_id) = index_cursor_id {
443 program.emit_insn(Insn::IdxGE {
444 cursor_id: index_cursor_id,
445 start_reg: cmp_reg,
446 num_regs: 1,
447 target_pc: loop_end,
448 });
449 } else {
450 let rowid_reg = program.alloc_register();
451 program.emit_insn(Insn::RowId {
452 cursor_id: table_cursor_id,
453 dest: rowid_reg,
454 });
455 program.emit_insn(Insn::Ge {
456 lhs: rowid_reg,
457 rhs: cmp_reg,
458 target_pc: loop_end,
459 flags: CmpInsFlags::default(),
460 });
461 }
462 }
463 _ => {}
464 }
465
466 if let Some(index_cursor_id) = index_cursor_id {
467 program.emit_insn(Insn::DeferredSeek {
468 index_cursor_id,
469 table_cursor_id,
470 });
471 }
472 }
473
474 if let Search::RowidEq { cmp_expr } = search {
475 let src_reg = program.alloc_register();
476 translate_expr(
477 program,
478 Some(tables),
479 &cmp_expr.expr,
480 src_reg,
481 &t_ctx.resolver,
482 )?;
483 program.emit_insn(Insn::SeekRowid {
484 cursor_id: table_cursor_id,
485 src_reg,
486 target_pc: next,
487 });
488 }
489 for cond in predicates
490 .iter()
491 .filter(|cond| cond.should_eval_at_loop(table_index))
492 {
493 let jump_target_when_true = program.allocate_label();
494 let condition_metadata = ConditionMetadata {
495 jump_if_condition_is_true: false,
496 jump_target_when_true,
497 jump_target_when_false: next,
498 };
499 translate_condition_expr(
500 program,
501 tables,
502 &cond.expr,
503 condition_metadata,
504 &t_ctx.resolver,
505 )?;
506 program.resolve_label(jump_target_when_true, program.offset());
507 }
508 }
509 }
510
511 // Set the match flag to true if this is a LEFT JOIN.
512 // At this point of execution we are going to emit columns for the left table,
513 // and either emit columns or NULLs for the right table, depending on whether the null_flag is set
514 // for the right table's cursor.
515 if let Some(join_info) = table.join_info.as_ref() {
516 if join_info.outer {
517 let lj_meta = t_ctx.meta_left_joins[table_index].as_ref().unwrap();
518 program.resolve_label(lj_meta.label_match_flag_set_true, program.offset());
519 program.emit_insn(Insn::Integer {
520 value: 1,
521 dest: lj_meta.reg_match_flag,
522 });
523 }
524 }
525 }
526
527 Ok(())
528}
529
/// SQLite (and so Limbo) processes joins as a nested loop.
/// The loop may emit rows to various destinations depending on the query:
/// - a GROUP BY sorter (grouping is done by sorting based on the GROUP BY keys and aggregating while the GROUP BY keys match)
/// - an ORDER BY sorter (when there is no GROUP BY, but there is an ORDER BY)
/// - an AggStep (the columns are collected for aggregation, which is finished later)
/// - a QueryResult (there is none of the above, so the loop either emits a ResultRow, or if it's a subquery, yields to the parent query)
///
/// `emit_loop` picks the target in exactly this order of precedence.
enum LoopEmitTarget {
    /// The query has a GROUP BY: rows go into the GROUP BY sorter.
    GroupBySorter,
    /// No GROUP BY and no aggregates, but an ORDER BY: rows go into the ORDER BY sorter.
    OrderBySorter,
    /// No GROUP BY, but aggregates: each row is fed to the aggregation steps.
    AggStep,
    /// None of the above: each row is emitted as a query result.
    QueryResult,
}
542
543/// Emits the bytecode for the inner loop of a query.
544/// At this point the cursors for all tables have been opened and rewound.
545pub fn emit_loop(
546 program: &mut ProgramBuilder,
547 t_ctx: &mut TranslateCtx,
548 plan: &mut SelectPlan,
549) -> Result<()> {
550 // if we have a group by, we emit a record into the group by sorter.
551 if plan.group_by.is_some() {
552 return emit_loop_source(program, t_ctx, plan, LoopEmitTarget::GroupBySorter);
553 }
554 // if we DONT have a group by, but we have aggregates, we emit without ResultRow.
555 // we also do not need to sort because we are emitting a single row.
556 if !plan.aggregates.is_empty() {
557 return emit_loop_source(program, t_ctx, plan, LoopEmitTarget::AggStep);
558 }
559 // if we DONT have a group by, but we have an order by, we emit a record into the order by sorter.
560 if plan.order_by.is_some() {
561 return emit_loop_source(program, t_ctx, plan, LoopEmitTarget::OrderBySorter);
562 }
563 // if we have neither, we emit a ResultRow. In that case, if we have a Limit, we handle that with DecrJumpZero.
564 emit_loop_source(program, t_ctx, plan, LoopEmitTarget::QueryResult)
565}
566
567/// This is a helper function for inner_loop_emit,
568/// which does a different thing depending on the emit target.
569/// See the InnerLoopEmitTarget enum for more details.
570fn emit_loop_source(
571 program: &mut ProgramBuilder,
572 t_ctx: &mut TranslateCtx,
573 plan: &SelectPlan,
574 emit_target: LoopEmitTarget,
575) -> Result<()> {
576 match emit_target {
577 LoopEmitTarget::GroupBySorter => {
578 let group_by = plan.group_by.as_ref().unwrap();
579 let aggregates = &plan.aggregates;
580 let sort_keys_count = group_by.exprs.len();
581 let aggregate_arguments_count = plan
582 .aggregates
583 .iter()
584 .map(|agg| agg.args.len())
585 .sum::<usize>();
586 let column_count = sort_keys_count + aggregate_arguments_count;
587 let start_reg = program.alloc_registers(column_count);
588 let mut cur_reg = start_reg;
589
590 // The group by sorter rows will contain the grouping keys first. They are also the sort keys.
591 for expr in group_by.exprs.iter() {
592 let key_reg = cur_reg;
593 cur_reg += 1;
594 translate_expr(
595 program,
596 Some(&plan.table_references),
597 expr,
598 key_reg,
599 &t_ctx.resolver,
600 )?;
601 }
602 // Then we have the aggregate arguments.
603 for agg in aggregates.iter() {
604 // Here we are collecting scalars for the group by sorter, which will include
605 // both the group by expressions and the aggregate arguments.
606 // e.g. in `select u.first_name, sum(u.age) from users group by u.first_name`
607 // the sorter will have two scalars: u.first_name and u.age.
608 // these are then sorted by u.first_name, and for each u.first_name, we sum the u.age.
609 // the actual aggregation is done later.
610 for expr in agg.args.iter() {
611 let agg_reg = cur_reg;
612 cur_reg += 1;
613 translate_expr(
614 program,
615 Some(&plan.table_references),
616 expr,
617 agg_reg,
618 &t_ctx.resolver,
619 )?;
620 }
621 }
622
623 // TODO: although it's less often useful, SQLite does allow for expressions in the SELECT that are not part of a GROUP BY or aggregate.
624 // We currently ignore those and only emit the GROUP BY keys and aggregate arguments. This should be fixed.
625
626 let group_by_metadata = t_ctx.meta_group_by.as_ref().unwrap();
627
628 sorter_insert(
629 program,
630 start_reg,
631 column_count,
632 group_by_metadata.sort_cursor,
633 group_by_metadata.reg_sorter_key,
634 );
635
636 Ok(())
637 }
638 LoopEmitTarget::OrderBySorter => order_by_sorter_insert(program, t_ctx, plan),
639 LoopEmitTarget::AggStep => {
640 let num_aggs = plan.aggregates.len();
641 let start_reg = program.alloc_registers(num_aggs);
642 t_ctx.reg_agg_start = Some(start_reg);
643
644 // In planner.rs, we have collected all aggregates from the SELECT clause, including ones where the aggregate is embedded inside
645 // a more complex expression. Some examples: length(sum(x)), sum(x) + avg(y), sum(x) + 1, etc.
646 // The result of those more complex expressions depends on the final result of the aggregate, so we don't translate the complete expressions here.
647 // Instead, we accumulate the intermediate results of all aggreagates, and evaluate any expressions that do not contain aggregates.
648 for (i, agg) in plan.aggregates.iter().enumerate() {
649 let reg = start_reg + i;
650 translate_aggregation_step(
651 program,
652 &plan.table_references,
653 agg,
654 reg,
655 &t_ctx.resolver,
656 )?;
657 }
658 for (i, rc) in plan.result_columns.iter().enumerate() {
659 if rc.contains_aggregates {
660 // Do nothing, aggregates are computed above
661 // if this result column is e.g. something like sum(x) + 1 or length(sum(x)), we do not want to translate that (+1) or length() yet,
662 // it will be computed after the aggregations are finalized.
663 continue;
664 }
665 let reg = start_reg + num_aggs + i;
666 translate_expr(
667 program,
668 Some(&plan.table_references),
669 &rc.expr,
670 reg,
671 &t_ctx.resolver,
672 )?;
673 }
674 Ok(())
675 }
676 LoopEmitTarget::QueryResult => {
677 assert!(
678 plan.aggregates.is_empty(),
679 "We should not get here with aggregates"
680 );
681 let offset_jump_to = t_ctx
682 .labels_main_loop
683 .first()
684 .map(|l| l.next)
685 .or(t_ctx.label_main_loop_end);
686 emit_select_result(
687 program,
688 t_ctx,
689 plan,
690 t_ctx.label_main_loop_end,
691 offset_jump_to,
692 )?;
693
694 Ok(())
695 }
696 }
697}
698
699/// Closes the loop for a given source operator.
700/// For example in the case of a nested table scan, this means emitting the NextAsync instruction
701/// for all tables involved, innermost first.
702pub fn close_loop(
703 program: &mut ProgramBuilder,
704 t_ctx: &mut TranslateCtx,
705 tables: &[TableReference],
706) -> Result<()> {
707 // We close the loops for all tables in reverse order, i.e. innermost first.
708 // OPEN t1
709 // OPEN t2
710 // OPEN t3
711 // <do stuff>
712 // CLOSE t3
713 // CLOSE t2
714 // CLOSE t1
715 for (idx, table) in tables.iter().rev().enumerate() {
716 let table_index = tables.len() - idx - 1;
717 let loop_labels = *t_ctx
718 .labels_main_loop
719 .get(table_index)
720 .expect("source has no loop labels");
721
722 match &table.op {
723 Operation::Subquery { .. } => {
724 program.resolve_label(loop_labels.next, program.offset());
725 // A subquery has no cursor to call NextAsync on, so it just emits a Goto
726 // to the Yield instruction, which in turn jumps back to the main loop of the subquery,
727 // so that the next row from the subquery can be read.
728 program.emit_insn(Insn::Goto {
729 target_pc: loop_labels.loop_start,
730 });
731 }
732 Operation::Scan { iter_dir, .. } => {
733 program.resolve_label(loop_labels.next, program.offset());
734 let cursor_id = program.resolve_cursor_id(&table.identifier);
735 match &table.table {
736 Table::BTree(_) => {
737 if iter_dir
738 .as_ref()
739 .is_some_and(|dir| *dir == IterationDirection::Backwards)
740 {
741 program.emit_insn(Insn::PrevAsync { cursor_id });
742 } else {
743 program.emit_insn(Insn::NextAsync { cursor_id });
744 }
745 if iter_dir
746 .as_ref()
747 .is_some_and(|dir| *dir == IterationDirection::Backwards)
748 {
749 program.emit_insn(Insn::PrevAwait {
750 cursor_id,
751 pc_if_next: loop_labels.loop_start,
752 });
753 } else {
754 program.emit_insn(Insn::NextAwait {
755 cursor_id,
756 pc_if_next: loop_labels.loop_start,
757 });
758 }
759 }
760 Table::Virtual(_) => {
761 program.emit_insn(Insn::VNext {
762 cursor_id,
763 pc_if_next: loop_labels.loop_start,
764 });
765 }
766 other => unreachable!("Unsupported table reference type: {:?}", other),
767 }
768 }
769 Operation::Search(search) => {
770 program.resolve_label(loop_labels.next, program.offset());
771 // Rowid equality point lookups are handled with a SeekRowid instruction which does not loop, so there is no need to emit a NextAsync instruction.
772 if !matches!(search, Search::RowidEq { .. }) {
773 let cursor_id = match search {
774 Search::IndexSearch { index, .. } => program.resolve_cursor_id(&index.name),
775 Search::RowidSearch { .. } => program.resolve_cursor_id(&table.identifier),
776 Search::RowidEq { .. } => unreachable!(),
777 };
778
779 program.emit_insn(Insn::NextAsync { cursor_id });
780 program.emit_insn(Insn::NextAwait {
781 cursor_id,
782 pc_if_next: loop_labels.loop_start,
783 });
784 }
785 }
786 }
787
788 program.resolve_label(loop_labels.loop_end, program.offset());
789
790 // Handle OUTER JOIN logic. The reason this comes after the "loop end" mark is that we may need to still jump back
791 // and emit a row with NULLs for the right table, and then jump back to the next row of the left table.
792 if let Some(join_info) = table.join_info.as_ref() {
793 if join_info.outer {
794 let lj_meta = t_ctx.meta_left_joins[table_index].as_ref().unwrap();
795 // The left join match flag is set to 1 when there is any match on the right table
796 // (e.g. SELECT * FROM t1 LEFT JOIN t2 ON t1.a = t2.a).
797 // If the left join match flag has been set to 1, we jump to the next row on the outer table,
798 // i.e. continue to the next row of t1 in our example.
799 program.resolve_label(lj_meta.label_match_flag_check_value, program.offset());
800 let jump_offset = program.offset().add(3u32);
801 program.emit_insn(Insn::IfPos {
802 reg: lj_meta.reg_match_flag,
803 target_pc: jump_offset,
804 decrement_by: 0,
805 });
806 // If the left join match flag is still 0, it means there was no match on the right table,
807 // but since it's a LEFT JOIN, we still need to emit a row with NULLs for the right table.
808 // In that case, we now enter the routine that does exactly that.
809 // First we set the right table cursor's "pseudo null bit" on, which means any Insn::Column will return NULL
810 let right_cursor_id = match &table.op {
811 Operation::Scan { .. } => program.resolve_cursor_id(&table.identifier),
812 Operation::Search { .. } => program.resolve_cursor_id(&table.identifier),
813 _ => unreachable!(),
814 };
815 program.emit_insn(Insn::NullRow {
816 cursor_id: right_cursor_id,
817 });
818 // Then we jump to setting the left join match flag to 1 again,
819 // but this time the right table cursor will set everything to null.
820 // This leads to emitting a row with cols from the left + nulls from the right,
821 // and we will end up back in the IfPos instruction above, which will then
822 // check the match flag again, and since it is now 1, we will jump to the
823 // next row in the left table.
824 program.emit_insn(Insn::Goto {
825 target_pc: lj_meta.label_match_flag_set_true,
826 });
827
828 assert_eq!(program.offset(), jump_offset);
829 }
830 }
831 }
832 Ok(())
833}