limbo_core/translate/
main_loop.rs

1use limbo_sqlite3_parser::ast;
2
3use crate::{
4    schema::Table,
5    translate::result_row::emit_select_result,
6    vdbe::{
7        builder::{CursorType, ProgramBuilder},
8        insn::{CmpInsFlags, Insn},
9        BranchOffset,
10    },
11    Result,
12};
13
14use super::{
15    aggregation::translate_aggregation_step,
16    emitter::{OperationMode, TranslateCtx},
17    expr::{translate_condition_expr, translate_expr, ConditionMetadata},
18    order_by::{order_by_sorter_insert, sorter_insert},
19    plan::{
20        IterationDirection, Operation, Search, SelectPlan, SelectQueryType, TableReference,
21        WhereTerm,
22    },
23};
24
25// Metadata for handling LEFT JOIN operations
26#[derive(Debug)]
27pub struct LeftJoinMetadata {
28    // integer register that holds a flag that is set to true if the current row has a match for the left join
29    pub reg_match_flag: usize,
30    // label for the instruction that sets the match flag to true
31    pub label_match_flag_set_true: BranchOffset,
32    // label for the instruction that checks if the match flag is true
33    pub label_match_flag_check_value: BranchOffset,
34}
35
36/// Jump labels for each loop in the query's main execution loop
37#[derive(Debug, Clone, Copy)]
38pub struct LoopLabels {
39    /// jump to the start of the loop body
40    loop_start: BranchOffset,
41    /// jump to the NextAsync instruction (or equivalent)
42    next: BranchOffset,
43    /// jump to the end of the loop, exiting it
44    loop_end: BranchOffset,
45}
46
47impl LoopLabels {
48    pub fn new(program: &mut ProgramBuilder) -> Self {
49        Self {
50            loop_start: program.allocate_label(),
51            next: program.allocate_label(),
52            loop_end: program.allocate_label(),
53        }
54    }
55}
56
57/// Initialize resources needed for the source operators (tables, joins, etc)
58pub fn init_loop(
59    program: &mut ProgramBuilder,
60    t_ctx: &mut TranslateCtx,
61    tables: &[TableReference],
62    mode: &OperationMode,
63) -> Result<()> {
64    assert!(
65        t_ctx.meta_left_joins.len() == tables.len(),
66        "meta_left_joins length does not match tables length"
67    );
68    for (table_index, table) in tables.iter().enumerate() {
69        // Initialize bookkeeping for OUTER JOIN
70        if let Some(join_info) = table.join_info.as_ref() {
71            if join_info.outer {
72                let lj_metadata = LeftJoinMetadata {
73                    reg_match_flag: program.alloc_register(),
74                    label_match_flag_set_true: program.allocate_label(),
75                    label_match_flag_check_value: program.allocate_label(),
76                };
77                t_ctx.meta_left_joins[table_index] = Some(lj_metadata);
78            }
79        }
80        match &table.op {
81            Operation::Scan { .. } => {
82                let cursor_id = program.alloc_cursor_id(
83                    Some(table.identifier.clone()),
84                    match &table.table {
85                        Table::BTree(_) => CursorType::BTreeTable(table.btree().unwrap().clone()),
86                        Table::Virtual(_) => {
87                            CursorType::VirtualTable(table.virtual_table().unwrap().clone())
88                        }
89                        other => panic!("Invalid table reference type in Scan: {:?}", other),
90                    },
91                );
92                match (mode, &table.table) {
93                    (OperationMode::SELECT, Table::BTree(_)) => {
94                        let root_page = table.btree().unwrap().root_page;
95                        program.emit_insn(Insn::OpenReadAsync {
96                            cursor_id,
97                            root_page,
98                        });
99                        program.emit_insn(Insn::OpenReadAwait {});
100                    }
101                    (OperationMode::DELETE, Table::BTree(_)) => {
102                        let root_page = table.btree().unwrap().root_page;
103                        program.emit_insn(Insn::OpenWriteAsync {
104                            cursor_id,
105                            root_page,
106                        });
107                        program.emit_insn(Insn::OpenWriteAwait {});
108                    }
109                    (OperationMode::SELECT, Table::Virtual(_)) => {
110                        program.emit_insn(Insn::VOpenAsync { cursor_id });
111                        program.emit_insn(Insn::VOpenAwait {});
112                    }
113                    (OperationMode::DELETE, Table::Virtual(_)) => {
114                        program.emit_insn(Insn::VOpenAsync { cursor_id });
115                        program.emit_insn(Insn::VOpenAwait {});
116                    }
117                    _ => {
118                        unimplemented!()
119                    }
120                }
121            }
122            Operation::Search(search) => {
123                let table_cursor_id = program.alloc_cursor_id(
124                    Some(table.identifier.clone()),
125                    CursorType::BTreeTable(table.btree().unwrap().clone()),
126                );
127
128                match mode {
129                    OperationMode::SELECT => {
130                        program.emit_insn(Insn::OpenReadAsync {
131                            cursor_id: table_cursor_id,
132                            root_page: table.table.get_root_page(),
133                        });
134                        program.emit_insn(Insn::OpenReadAwait {});
135                    }
136                    OperationMode::DELETE => {
137                        program.emit_insn(Insn::OpenWriteAsync {
138                            cursor_id: table_cursor_id,
139                            root_page: table.table.get_root_page(),
140                        });
141                        program.emit_insn(Insn::OpenWriteAwait {});
142                    }
143                    _ => {
144                        unimplemented!()
145                    }
146                }
147
148                if let Search::IndexSearch { index, .. } = search {
149                    let index_cursor_id = program.alloc_cursor_id(
150                        Some(index.name.clone()),
151                        CursorType::BTreeIndex(index.clone()),
152                    );
153
154                    match mode {
155                        OperationMode::SELECT => {
156                            program.emit_insn(Insn::OpenReadAsync {
157                                cursor_id: index_cursor_id,
158                                root_page: index.root_page,
159                            });
160                            program.emit_insn(Insn::OpenReadAwait);
161                        }
162                        OperationMode::DELETE => {
163                            program.emit_insn(Insn::OpenWriteAsync {
164                                cursor_id: index_cursor_id,
165                                root_page: index.root_page,
166                            });
167                            program.emit_insn(Insn::OpenWriteAwait {});
168                        }
169                        _ => {
170                            unimplemented!()
171                        }
172                    }
173                }
174            }
175            _ => {}
176        }
177    }
178
179    Ok(())
180}
181
182/// Set up the main query execution loop
183/// For example in the case of a nested table scan, this means emitting the RewindAsync instruction
184/// for all tables involved, outermost first.
185pub fn open_loop(
186    program: &mut ProgramBuilder,
187    t_ctx: &mut TranslateCtx,
188    tables: &[TableReference],
189    predicates: &[WhereTerm],
190) -> Result<()> {
191    for (table_index, table) in tables.iter().enumerate() {
192        let LoopLabels {
193            loop_start,
194            loop_end,
195            next,
196        } = *t_ctx
197            .labels_main_loop
198            .get(table_index)
199            .expect("table has no loop labels");
200
201        // Each OUTER JOIN has a "match flag" that is initially set to false,
202        // and is set to true when a match is found for the OUTER JOIN.
203        // This is used to determine whether to emit actual columns or NULLs for the columns of the right table.
204        if let Some(join_info) = table.join_info.as_ref() {
205            if join_info.outer {
206                let lj_meta = t_ctx.meta_left_joins[table_index].as_ref().unwrap();
207                program.emit_insn(Insn::Integer {
208                    value: 0,
209                    dest: lj_meta.reg_match_flag,
210                });
211            }
212        }
213
214        match &table.op {
215            Operation::Subquery { plan, .. } => {
216                let (yield_reg, coroutine_implementation_start) = match &plan.query_type {
217                    SelectQueryType::Subquery {
218                        yield_reg,
219                        coroutine_implementation_start,
220                    } => (*yield_reg, *coroutine_implementation_start),
221                    _ => unreachable!("Subquery operator with non-subquery query type"),
222                };
223                // In case the subquery is an inner loop, it needs to be reinitialized on each iteration of the outer loop.
224                program.emit_insn(Insn::InitCoroutine {
225                    yield_reg,
226                    jump_on_definition: BranchOffset::Offset(0),
227                    start_offset: coroutine_implementation_start,
228                });
229                program.resolve_label(loop_start, program.offset());
230                // A subquery within the main loop of a parent query has no cursor, so instead of advancing the cursor,
231                // it emits a Yield which jumps back to the main loop of the subquery itself to retrieve the next row.
232                // When the subquery coroutine completes, this instruction jumps to the label at the top of the termination_label_stack,
233                // which in this case is the end of the Yield-Goto loop in the parent query.
234                program.emit_insn(Insn::Yield {
235                    yield_reg,
236                    end_offset: loop_end,
237                });
238
239                // These are predicates evaluated outside of the subquery,
240                // so they are translated here.
241                // E.g. SELECT foo FROM (SELECT bar as foo FROM t1) sub WHERE sub.foo > 10
242                for cond in predicates
243                    .iter()
244                    .filter(|cond| cond.should_eval_at_loop(table_index))
245                {
246                    let jump_target_when_true = program.allocate_label();
247                    let condition_metadata = ConditionMetadata {
248                        jump_if_condition_is_true: false,
249                        jump_target_when_true,
250                        jump_target_when_false: next,
251                    };
252                    translate_condition_expr(
253                        program,
254                        tables,
255                        &cond.expr,
256                        condition_metadata,
257                        &t_ctx.resolver,
258                    )?;
259                    program.resolve_label(jump_target_when_true, program.offset());
260                }
261            }
262            Operation::Scan { iter_dir } => {
263                let cursor_id = program.resolve_cursor_id(&table.identifier);
264
265                if !matches!(&table.table, Table::Virtual(_)) {
266                    if iter_dir
267                        .as_ref()
268                        .is_some_and(|dir| *dir == IterationDirection::Backwards)
269                    {
270                        program.emit_insn(Insn::LastAsync { cursor_id });
271                    } else {
272                        program.emit_insn(Insn::RewindAsync { cursor_id });
273                    }
274                }
275                match &table.table {
276                    Table::BTree(_) => program.emit_insn(
277                        if iter_dir
278                            .as_ref()
279                            .is_some_and(|dir| *dir == IterationDirection::Backwards)
280                        {
281                            Insn::LastAwait {
282                                cursor_id,
283                                pc_if_empty: loop_end,
284                            }
285                        } else {
286                            Insn::RewindAwait {
287                                cursor_id,
288                                pc_if_empty: loop_end,
289                            }
290                        },
291                    ),
292                    Table::Virtual(ref table) => {
293                        let start_reg = program
294                            .alloc_registers(table.args.as_ref().map(|a| a.len()).unwrap_or(0));
295                        let mut cur_reg = start_reg;
296                        let args = match table.args.as_ref() {
297                            Some(args) => args,
298                            None => &vec![],
299                        };
300                        for arg in args {
301                            let reg = cur_reg;
302                            cur_reg += 1;
303                            let _ =
304                                translate_expr(program, Some(tables), arg, reg, &t_ctx.resolver)?;
305                        }
306                        program.emit_insn(Insn::VFilter {
307                            cursor_id,
308                            pc_if_empty: loop_end,
309                            arg_count: table.args.as_ref().map_or(0, |args| args.len()),
310                            args_reg: start_reg,
311                        });
312                    }
313                    other => panic!("Unsupported table reference type: {:?}", other),
314                }
315                program.resolve_label(loop_start, program.offset());
316
317                for cond in predicates
318                    .iter()
319                    .filter(|cond| cond.should_eval_at_loop(table_index))
320                {
321                    let jump_target_when_true = program.allocate_label();
322                    let condition_metadata = ConditionMetadata {
323                        jump_if_condition_is_true: false,
324                        jump_target_when_true,
325                        jump_target_when_false: next,
326                    };
327                    translate_condition_expr(
328                        program,
329                        tables,
330                        &cond.expr,
331                        condition_metadata,
332                        &t_ctx.resolver,
333                    )?;
334                    program.resolve_label(jump_target_when_true, program.offset());
335                }
336            }
337            Operation::Search(search) => {
338                let table_cursor_id = program.resolve_cursor_id(&table.identifier);
339                // Open the loop for the index search.
340                // Rowid equality point lookups are handled with a SeekRowid instruction which does not loop, since it is a single row lookup.
341                if !matches!(search, Search::RowidEq { .. }) {
342                    let index_cursor_id = if let Search::IndexSearch { index, .. } = search {
343                        Some(program.resolve_cursor_id(&index.name))
344                    } else {
345                        None
346                    };
347                    let cmp_reg = program.alloc_register();
348                    let (cmp_expr, cmp_op) = match search {
349                        Search::IndexSearch {
350                            cmp_expr, cmp_op, ..
351                        } => (cmp_expr, cmp_op),
352                        Search::RowidSearch { cmp_expr, cmp_op } => (cmp_expr, cmp_op),
353                        Search::RowidEq { .. } => unreachable!(),
354                    };
355
356                    // TODO this only handles ascending indexes
357                    match cmp_op {
358                        ast::Operator::Equals
359                        | ast::Operator::Greater
360                        | ast::Operator::GreaterEquals => {
361                            translate_expr(
362                                program,
363                                Some(tables),
364                                &cmp_expr.expr,
365                                cmp_reg,
366                                &t_ctx.resolver,
367                            )?;
368                        }
369                        ast::Operator::Less | ast::Operator::LessEquals => {
370                            program.emit_insn(Insn::Null {
371                                dest: cmp_reg,
372                                dest_end: None,
373                            });
374                        }
375                        _ => unreachable!(),
376                    }
377                    // If we try to seek to a key that is not present in the table/index, we exit the loop entirely.
378                    program.emit_insn(match cmp_op {
379                        ast::Operator::Equals | ast::Operator::GreaterEquals => Insn::SeekGE {
380                            is_index: index_cursor_id.is_some(),
381                            cursor_id: index_cursor_id.unwrap_or(table_cursor_id),
382                            start_reg: cmp_reg,
383                            num_regs: 1,
384                            target_pc: loop_end,
385                        },
386                        ast::Operator::Greater
387                        | ast::Operator::Less
388                        | ast::Operator::LessEquals => Insn::SeekGT {
389                            is_index: index_cursor_id.is_some(),
390                            cursor_id: index_cursor_id.unwrap_or(table_cursor_id),
391                            start_reg: cmp_reg,
392                            num_regs: 1,
393                            target_pc: loop_end,
394                        },
395                        _ => unreachable!(),
396                    });
397                    if *cmp_op == ast::Operator::Less || *cmp_op == ast::Operator::LessEquals {
398                        translate_expr(
399                            program,
400                            Some(tables),
401                            &cmp_expr.expr,
402                            cmp_reg,
403                            &t_ctx.resolver,
404                        )?;
405                    }
406
407                    program.resolve_label(loop_start, program.offset());
408                    // TODO: We are currently only handling ascending indexes.
409                    // For conditions like index_key > 10, we have already sought to the first key greater than 10, and can just scan forward.
410                    // For conditions like index_key < 10, we are at the beginning of the index, and will scan forward and emit IdxGE(10) with a conditional jump to the end.
411                    // For conditions like index_key = 10, we have already sought to the first key greater than or equal to 10, and can just scan forward and emit IdxGT(10) with a conditional jump to the end.
412                    // For conditions like index_key >= 10, we have already sought to the first key greater than or equal to 10, and can just scan forward.
413                    // For conditions like index_key <= 10, we are at the beginning of the index, and will scan forward and emit IdxGT(10) with a conditional jump to the end.
414                    // For conditions like index_key != 10, TODO. probably the optimal way is not to use an index at all.
415                    //
416                    // For primary key searches we emit RowId and then compare it to the seek value.
417
418                    match cmp_op {
419                        ast::Operator::Equals | ast::Operator::LessEquals => {
420                            if let Some(index_cursor_id) = index_cursor_id {
421                                program.emit_insn(Insn::IdxGT {
422                                    cursor_id: index_cursor_id,
423                                    start_reg: cmp_reg,
424                                    num_regs: 1,
425                                    target_pc: loop_end,
426                                });
427                            } else {
428                                let rowid_reg = program.alloc_register();
429                                program.emit_insn(Insn::RowId {
430                                    cursor_id: table_cursor_id,
431                                    dest: rowid_reg,
432                                });
433                                program.emit_insn(Insn::Gt {
434                                    lhs: rowid_reg,
435                                    rhs: cmp_reg,
436                                    target_pc: loop_end,
437                                    flags: CmpInsFlags::default(),
438                                });
439                            }
440                        }
441                        ast::Operator::Less => {
442                            if let Some(index_cursor_id) = index_cursor_id {
443                                program.emit_insn(Insn::IdxGE {
444                                    cursor_id: index_cursor_id,
445                                    start_reg: cmp_reg,
446                                    num_regs: 1,
447                                    target_pc: loop_end,
448                                });
449                            } else {
450                                let rowid_reg = program.alloc_register();
451                                program.emit_insn(Insn::RowId {
452                                    cursor_id: table_cursor_id,
453                                    dest: rowid_reg,
454                                });
455                                program.emit_insn(Insn::Ge {
456                                    lhs: rowid_reg,
457                                    rhs: cmp_reg,
458                                    target_pc: loop_end,
459                                    flags: CmpInsFlags::default(),
460                                });
461                            }
462                        }
463                        _ => {}
464                    }
465
466                    if let Some(index_cursor_id) = index_cursor_id {
467                        program.emit_insn(Insn::DeferredSeek {
468                            index_cursor_id,
469                            table_cursor_id,
470                        });
471                    }
472                }
473
474                if let Search::RowidEq { cmp_expr } = search {
475                    let src_reg = program.alloc_register();
476                    translate_expr(
477                        program,
478                        Some(tables),
479                        &cmp_expr.expr,
480                        src_reg,
481                        &t_ctx.resolver,
482                    )?;
483                    program.emit_insn(Insn::SeekRowid {
484                        cursor_id: table_cursor_id,
485                        src_reg,
486                        target_pc: next,
487                    });
488                }
489                for cond in predicates
490                    .iter()
491                    .filter(|cond| cond.should_eval_at_loop(table_index))
492                {
493                    let jump_target_when_true = program.allocate_label();
494                    let condition_metadata = ConditionMetadata {
495                        jump_if_condition_is_true: false,
496                        jump_target_when_true,
497                        jump_target_when_false: next,
498                    };
499                    translate_condition_expr(
500                        program,
501                        tables,
502                        &cond.expr,
503                        condition_metadata,
504                        &t_ctx.resolver,
505                    )?;
506                    program.resolve_label(jump_target_when_true, program.offset());
507                }
508            }
509        }
510
511        // Set the match flag to true if this is a LEFT JOIN.
512        // At this point of execution we are going to emit columns for the left table,
513        // and either emit columns or NULLs for the right table, depending on whether the null_flag is set
514        // for the right table's cursor.
515        if let Some(join_info) = table.join_info.as_ref() {
516            if join_info.outer {
517                let lj_meta = t_ctx.meta_left_joins[table_index].as_ref().unwrap();
518                program.resolve_label(lj_meta.label_match_flag_set_true, program.offset());
519                program.emit_insn(Insn::Integer {
520                    value: 1,
521                    dest: lj_meta.reg_match_flag,
522                });
523            }
524        }
525    }
526
527    Ok(())
528}
529
/// Destination of the rows produced by the main loop.
///
/// SQLite (and so Limbo) processes joins as a nested loop, and what the innermost
/// loop body does with each row depends on the shape of the query.
enum LoopEmitTarget {
    /// Rows go into the GROUP BY sorter (grouping is done by sorting based on the
    /// GROUP BY keys and aggregating while the GROUP BY keys match).
    GroupBySorter,
    /// Rows go into the ORDER BY sorter (there is no GROUP BY, but there is an ORDER BY).
    OrderBySorter,
    /// Columns are collected for aggregation, which is finished later.
    AggStep,
    /// None of the above applies: the loop either emits a ResultRow or,
    /// if it is a subquery, yields to the parent query.
    QueryResult,
}
542
543/// Emits the bytecode for the inner loop of a query.
544/// At this point the cursors for all tables have been opened and rewound.
545pub fn emit_loop(
546    program: &mut ProgramBuilder,
547    t_ctx: &mut TranslateCtx,
548    plan: &mut SelectPlan,
549) -> Result<()> {
550    // if we have a group by, we emit a record into the group by sorter.
551    if plan.group_by.is_some() {
552        return emit_loop_source(program, t_ctx, plan, LoopEmitTarget::GroupBySorter);
553    }
554    // if we DONT have a group by, but we have aggregates, we emit without ResultRow.
555    // we also do not need to sort because we are emitting a single row.
556    if !plan.aggregates.is_empty() {
557        return emit_loop_source(program, t_ctx, plan, LoopEmitTarget::AggStep);
558    }
559    // if we DONT have a group by, but we have an order by, we emit a record into the order by sorter.
560    if plan.order_by.is_some() {
561        return emit_loop_source(program, t_ctx, plan, LoopEmitTarget::OrderBySorter);
562    }
563    // if we have neither, we emit a ResultRow. In that case, if we have a Limit, we handle that with DecrJumpZero.
564    emit_loop_source(program, t_ctx, plan, LoopEmitTarget::QueryResult)
565}
566
567/// This is a helper function for inner_loop_emit,
568/// which does a different thing depending on the emit target.
569/// See the InnerLoopEmitTarget enum for more details.
570fn emit_loop_source(
571    program: &mut ProgramBuilder,
572    t_ctx: &mut TranslateCtx,
573    plan: &SelectPlan,
574    emit_target: LoopEmitTarget,
575) -> Result<()> {
576    match emit_target {
577        LoopEmitTarget::GroupBySorter => {
578            let group_by = plan.group_by.as_ref().unwrap();
579            let aggregates = &plan.aggregates;
580            let sort_keys_count = group_by.exprs.len();
581            let aggregate_arguments_count = plan
582                .aggregates
583                .iter()
584                .map(|agg| agg.args.len())
585                .sum::<usize>();
586            let column_count = sort_keys_count + aggregate_arguments_count;
587            let start_reg = program.alloc_registers(column_count);
588            let mut cur_reg = start_reg;
589
590            // The group by sorter rows will contain the grouping keys first. They are also the sort keys.
591            for expr in group_by.exprs.iter() {
592                let key_reg = cur_reg;
593                cur_reg += 1;
594                translate_expr(
595                    program,
596                    Some(&plan.table_references),
597                    expr,
598                    key_reg,
599                    &t_ctx.resolver,
600                )?;
601            }
602            // Then we have the aggregate arguments.
603            for agg in aggregates.iter() {
604                // Here we are collecting scalars for the group by sorter, which will include
605                // both the group by expressions and the aggregate arguments.
606                // e.g. in `select u.first_name, sum(u.age) from users group by u.first_name`
607                // the sorter will have two scalars: u.first_name and u.age.
608                // these are then sorted by u.first_name, and for each u.first_name, we sum the u.age.
609                // the actual aggregation is done later.
610                for expr in agg.args.iter() {
611                    let agg_reg = cur_reg;
612                    cur_reg += 1;
613                    translate_expr(
614                        program,
615                        Some(&plan.table_references),
616                        expr,
617                        agg_reg,
618                        &t_ctx.resolver,
619                    )?;
620                }
621            }
622
623            // TODO: although it's less often useful, SQLite does allow for expressions in the SELECT that are not part of a GROUP BY or aggregate.
624            // We currently ignore those and only emit the GROUP BY keys and aggregate arguments. This should be fixed.
625
626            let group_by_metadata = t_ctx.meta_group_by.as_ref().unwrap();
627
628            sorter_insert(
629                program,
630                start_reg,
631                column_count,
632                group_by_metadata.sort_cursor,
633                group_by_metadata.reg_sorter_key,
634            );
635
636            Ok(())
637        }
638        LoopEmitTarget::OrderBySorter => order_by_sorter_insert(program, t_ctx, plan),
639        LoopEmitTarget::AggStep => {
640            let num_aggs = plan.aggregates.len();
641            let start_reg = program.alloc_registers(num_aggs);
642            t_ctx.reg_agg_start = Some(start_reg);
643
644            // In planner.rs, we have collected all aggregates from the SELECT clause, including ones where the aggregate is embedded inside
645            // a more complex expression. Some examples: length(sum(x)), sum(x) + avg(y), sum(x) + 1, etc.
646            // The result of those more complex expressions depends on the final result of the aggregate, so we don't translate the complete expressions here.
647            // Instead, we accumulate the intermediate results of all aggreagates, and evaluate any expressions that do not contain aggregates.
648            for (i, agg) in plan.aggregates.iter().enumerate() {
649                let reg = start_reg + i;
650                translate_aggregation_step(
651                    program,
652                    &plan.table_references,
653                    agg,
654                    reg,
655                    &t_ctx.resolver,
656                )?;
657            }
658            for (i, rc) in plan.result_columns.iter().enumerate() {
659                if rc.contains_aggregates {
660                    // Do nothing, aggregates are computed above
661                    // if this result column is e.g. something like sum(x) + 1 or length(sum(x)), we do not want to translate that (+1) or length() yet,
662                    // it will be computed after the aggregations are finalized.
663                    continue;
664                }
665                let reg = start_reg + num_aggs + i;
666                translate_expr(
667                    program,
668                    Some(&plan.table_references),
669                    &rc.expr,
670                    reg,
671                    &t_ctx.resolver,
672                )?;
673            }
674            Ok(())
675        }
676        LoopEmitTarget::QueryResult => {
677            assert!(
678                plan.aggregates.is_empty(),
679                "We should not get here with aggregates"
680            );
681            let offset_jump_to = t_ctx
682                .labels_main_loop
683                .first()
684                .map(|l| l.next)
685                .or(t_ctx.label_main_loop_end);
686            emit_select_result(
687                program,
688                t_ctx,
689                plan,
690                t_ctx.label_main_loop_end,
691                offset_jump_to,
692            )?;
693
694            Ok(())
695        }
696    }
697}
698
699/// Closes the loop for a given source operator.
700/// For example in the case of a nested table scan, this means emitting the NextAsync instruction
701/// for all tables involved, innermost first.
702pub fn close_loop(
703    program: &mut ProgramBuilder,
704    t_ctx: &mut TranslateCtx,
705    tables: &[TableReference],
706) -> Result<()> {
707    // We close the loops for all tables in reverse order, i.e. innermost first.
708    // OPEN t1
709    //   OPEN t2
710    //     OPEN t3
711    //       <do stuff>
712    //     CLOSE t3
713    //   CLOSE t2
714    // CLOSE t1
715    for (idx, table) in tables.iter().rev().enumerate() {
716        let table_index = tables.len() - idx - 1;
717        let loop_labels = *t_ctx
718            .labels_main_loop
719            .get(table_index)
720            .expect("source has no loop labels");
721
722        match &table.op {
723            Operation::Subquery { .. } => {
724                program.resolve_label(loop_labels.next, program.offset());
725                // A subquery has no cursor to call NextAsync on, so it just emits a Goto
726                // to the Yield instruction, which in turn jumps back to the main loop of the subquery,
727                // so that the next row from the subquery can be read.
728                program.emit_insn(Insn::Goto {
729                    target_pc: loop_labels.loop_start,
730                });
731            }
732            Operation::Scan { iter_dir, .. } => {
733                program.resolve_label(loop_labels.next, program.offset());
734                let cursor_id = program.resolve_cursor_id(&table.identifier);
735                match &table.table {
736                    Table::BTree(_) => {
737                        if iter_dir
738                            .as_ref()
739                            .is_some_and(|dir| *dir == IterationDirection::Backwards)
740                        {
741                            program.emit_insn(Insn::PrevAsync { cursor_id });
742                        } else {
743                            program.emit_insn(Insn::NextAsync { cursor_id });
744                        }
745                        if iter_dir
746                            .as_ref()
747                            .is_some_and(|dir| *dir == IterationDirection::Backwards)
748                        {
749                            program.emit_insn(Insn::PrevAwait {
750                                cursor_id,
751                                pc_if_next: loop_labels.loop_start,
752                            });
753                        } else {
754                            program.emit_insn(Insn::NextAwait {
755                                cursor_id,
756                                pc_if_next: loop_labels.loop_start,
757                            });
758                        }
759                    }
760                    Table::Virtual(_) => {
761                        program.emit_insn(Insn::VNext {
762                            cursor_id,
763                            pc_if_next: loop_labels.loop_start,
764                        });
765                    }
766                    other => unreachable!("Unsupported table reference type: {:?}", other),
767                }
768            }
769            Operation::Search(search) => {
770                program.resolve_label(loop_labels.next, program.offset());
771                // Rowid equality point lookups are handled with a SeekRowid instruction which does not loop, so there is no need to emit a NextAsync instruction.
772                if !matches!(search, Search::RowidEq { .. }) {
773                    let cursor_id = match search {
774                        Search::IndexSearch { index, .. } => program.resolve_cursor_id(&index.name),
775                        Search::RowidSearch { .. } => program.resolve_cursor_id(&table.identifier),
776                        Search::RowidEq { .. } => unreachable!(),
777                    };
778
779                    program.emit_insn(Insn::NextAsync { cursor_id });
780                    program.emit_insn(Insn::NextAwait {
781                        cursor_id,
782                        pc_if_next: loop_labels.loop_start,
783                    });
784                }
785            }
786        }
787
788        program.resolve_label(loop_labels.loop_end, program.offset());
789
790        // Handle OUTER JOIN logic. The reason this comes after the "loop end" mark is that we may need to still jump back
791        // and emit a row with NULLs for the right table, and then jump back to the next row of the left table.
792        if let Some(join_info) = table.join_info.as_ref() {
793            if join_info.outer {
794                let lj_meta = t_ctx.meta_left_joins[table_index].as_ref().unwrap();
795                // The left join match flag is set to 1 when there is any match on the right table
796                // (e.g. SELECT * FROM t1 LEFT JOIN t2 ON t1.a = t2.a).
797                // If the left join match flag has been set to 1, we jump to the next row on the outer table,
798                // i.e. continue to the next row of t1 in our example.
799                program.resolve_label(lj_meta.label_match_flag_check_value, program.offset());
800                let jump_offset = program.offset().add(3u32);
801                program.emit_insn(Insn::IfPos {
802                    reg: lj_meta.reg_match_flag,
803                    target_pc: jump_offset,
804                    decrement_by: 0,
805                });
806                // If the left join match flag is still 0, it means there was no match on the right table,
807                // but since it's a LEFT JOIN, we still need to emit a row with NULLs for the right table.
808                // In that case, we now enter the routine that does exactly that.
809                // First we set the right table cursor's "pseudo null bit" on, which means any Insn::Column will return NULL
810                let right_cursor_id = match &table.op {
811                    Operation::Scan { .. } => program.resolve_cursor_id(&table.identifier),
812                    Operation::Search { .. } => program.resolve_cursor_id(&table.identifier),
813                    _ => unreachable!(),
814                };
815                program.emit_insn(Insn::NullRow {
816                    cursor_id: right_cursor_id,
817                });
818                // Then we jump to setting the left join match flag to 1 again,
819                // but this time the right table cursor will set everything to null.
820                // This leads to emitting a row with cols from the left + nulls from the right,
821                // and we will end up back in the IfPos instruction above, which will then
822                // check the match flag again, and since it is now 1, we will jump to the
823                // next row in the left table.
824                program.emit_insn(Insn::Goto {
825                    target_pc: lj_meta.label_match_flag_set_true,
826                });
827
828                assert_eq!(program.offset(), jump_offset);
829            }
830        }
831    }
832    Ok(())
833}