Skip to main content

quill_sql/execution/physical_plan/
create_index.rs

1use crate::catalog::{SchemaRef, EMPTY_SCHEMA_REF};
2use crate::error::QuillSQLError;
3use crate::expression::{ColumnExpr, Expr};
4use crate::plan::logical_plan::OrderByExpr;
5use crate::storage::tuple::Tuple;
6use crate::utils::table_ref::TableReference;
7use crate::{
8    error::QuillSQLResult,
9    execution::{ExecutionContext, VolcanoExecutor},
10};
11use std::sync::Arc;
12
13#[derive(Debug, derive_new::new)]
14pub struct PhysicalCreateIndex {
15    pub name: String,
16    pub table: TableReference,
17    pub table_schema: SchemaRef,
18    pub columns: Vec<OrderByExpr>,
19}
20
21impl VolcanoExecutor for PhysicalCreateIndex {
22    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
23        let mut key_indices = vec![];
24        for col in self.columns.iter() {
25            match col.expr.as_ref() {
26                Expr::Column(ColumnExpr { name, .. }) => {
27                    key_indices.push(self.table_schema.index_of(None, name)?);
28                }
29                _ => {
30                    return Err(QuillSQLError::Execution(format!(
31                        "The expr should be column instead of {}",
32                        col.expr
33                    )))
34                }
35            }
36        }
37        let key_schema = Arc::new(self.table_schema.project(&key_indices)?);
38        context
39            .catalog
40            .create_index(self.name.clone(), &self.table, key_schema)?;
41        backfill_index(context, &self.table, &self.name)?;
42        Ok(None)
43    }
44    fn output_schema(&self) -> SchemaRef {
45        EMPTY_SCHEMA_REF.clone()
46    }
47}
48
49fn backfill_index(
50    context: &mut ExecutionContext,
51    table: &TableReference,
52    index_name: &str,
53) -> QuillSQLResult<()> {
54    let binding = context.table(table)?;
55    let index = binding
56        .indexes()
57        .iter()
58        .find(|index| index.name() == index_name)
59        .cloned()
60        .ok_or_else(|| QuillSQLError::Execution(format!("index {index_name} not found")))?;
61    let mut stream = binding.scan()?;
62    while let Some((rid, meta, tuple)) = stream.next()? {
63        if meta.is_deleted || !context.txn_ctx().is_visible(&meta) {
64            continue;
65        }
66        if let Ok(key) = tuple.project_with_schema(index.key_schema()) {
67            index.insert(&key, rid, context.txn_ctx().txn_id())?;
68        }
69    }
70    Ok(())
71}
72
73impl std::fmt::Display for PhysicalCreateIndex {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        write!(f, "CreateIndex: {}", self.name)
76    }
77}