quill_sql/execution/physical_plan/
create_index.rs1use crate::catalog::{SchemaRef, EMPTY_SCHEMA_REF};
2use crate::error::QuillSQLError;
3use crate::expression::{ColumnExpr, Expr};
4use crate::plan::logical_plan::OrderByExpr;
5use crate::storage::tuple::Tuple;
6use crate::utils::table_ref::TableReference;
7use crate::{
8 error::QuillSQLResult,
9 execution::{ExecutionContext, VolcanoExecutor},
10};
11use std::sync::Arc;
12
13#[derive(Debug, derive_new::new)]
14pub struct PhysicalCreateIndex {
15 pub name: String,
16 pub table: TableReference,
17 pub table_schema: SchemaRef,
18 pub columns: Vec<OrderByExpr>,
19}
20
21impl VolcanoExecutor for PhysicalCreateIndex {
22 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
23 let mut key_indices = vec![];
24 for col in self.columns.iter() {
25 match col.expr.as_ref() {
26 Expr::Column(ColumnExpr { name, .. }) => {
27 key_indices.push(self.table_schema.index_of(None, name)?);
28 }
29 _ => {
30 return Err(QuillSQLError::Execution(format!(
31 "The expr should be column instead of {}",
32 col.expr
33 )))
34 }
35 }
36 }
37 let key_schema = Arc::new(self.table_schema.project(&key_indices)?);
38 context
39 .catalog
40 .create_index(self.name.clone(), &self.table, key_schema)?;
41 backfill_index(context, &self.table, &self.name)?;
42 Ok(None)
43 }
44 fn output_schema(&self) -> SchemaRef {
45 EMPTY_SCHEMA_REF.clone()
46 }
47}
48
49fn backfill_index(
50 context: &mut ExecutionContext,
51 table: &TableReference,
52 index_name: &str,
53) -> QuillSQLResult<()> {
54 let binding = context.table(table)?;
55 let index = binding
56 .indexes()
57 .iter()
58 .find(|index| index.name() == index_name)
59 .cloned()
60 .ok_or_else(|| QuillSQLError::Execution(format!("index {index_name} not found")))?;
61 let mut stream = binding.scan()?;
62 while let Some((rid, meta, tuple)) = stream.next()? {
63 if meta.is_deleted || !context.txn_ctx().is_visible(&meta) {
64 continue;
65 }
66 if let Ok(key) = tuple.project_with_schema(index.key_schema()) {
67 index.insert(&key, rid, context.txn_ctx().txn_id())?;
68 }
69 }
70 Ok(())
71}
72
73impl std::fmt::Display for PhysicalCreateIndex {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 write!(f, "CreateIndex: {}", self.name)
76 }
77}