ella-engine 0.1.5

Core engine implementation for the ella datastore.
Documentation
use std::{fmt::Debug, pin::Pin, sync::Arc};

use arrow_schema::Schema;
use datafusion::{
    arrow::compute::concat_batches,
    datasource::provider_as_source,
    logical_expr::{DdlStatement, LogicalPlan, LogicalPlanBuilder},
    physical_plan::{
        execute_stream, stream::RecordBatchStreamAdapter, RecordBatchStream,
        SendableRecordBatchStream,
    },
};
use ella_tensor::DataFrame;
use futures::TryStreamExt;

use crate::{
    engine::EllaState,
    registry::{SchemaId, TableRef},
    table::info::{ViewBuilder, ViewInfo},
    Plan,
};

/// Abstraction over where and how an ella [`Plan`] gets executed.
///
/// Implementors supply streaming execution and view creation; [`LazyBackend::execute`]
/// is provided on top of [`LazyBackend::stream`] by buffering the full result.
#[async_trait::async_trait]
pub trait LazyBackend: Debug + Send + Sync {
    /// Executes `plan` and hands back its output as a stream of record batches.
    async fn stream(&self, plan: &Plan) -> crate::Result<SendableRecordBatchStream>;

    /// Creates the view `table` from `info` and returns a [`Plan`] for it.
    ///
    /// `if_not_exists` and `or_replace` are forwarded to the implementation; their
    /// exact semantics are backend-specific (NOTE(review): confirm per implementor).
    async fn create_view(
        &self,
        table: TableRef<'static>,
        info: ViewInfo,
        if_not_exists: bool,
        or_replace: bool,
    ) -> crate::Result<Plan>;

    /// Runs `plan` to completion and merges every produced batch into one [`DataFrame`].
    ///
    /// The whole result is collected in memory before concatenation.
    ///
    /// # Errors
    /// Fails if starting the stream fails, if any batch in the stream is an error,
    /// if the batches cannot be concatenated, or if the merged batch cannot be
    /// converted into a [`DataFrame`].
    async fn execute(&self, plan: &Plan) -> crate::Result<DataFrame> {
        let results = self.stream(plan).await?;
        let output_schema = results.schema();
        let collected = results.try_collect::<Vec<_>>().await?;
        let merged = concat_batches(&output_schema, &collected)?;
        merged.try_into()
    }
}

/// A [`LazyBackend`] that plans and executes everything in-process against an [`EllaState`].
#[derive(Debug, Clone)]
pub struct LocalBackend {
    /// Engine state used to resolve names, create views, and run physical plans.
    state: EllaState,
}

impl LocalBackend {
    /// Wraps `state` in a new local backend.
    pub(crate) fn new(state: EllaState) -> Self {
        LocalBackend { state }
    }
}

/// Builds a record-batch stream with an empty schema that yields no batches.
///
/// Used as the result of DDL statements, which produce no rows.
fn empty() -> Pin<Box<dyn RecordBatchStream + Send + 'static>> {
    let schema = Arc::new(Schema::empty());
    let batches = futures::stream::empty();
    let adapter = RecordBatchStreamAdapter::new(schema, batches);
    Box::pin(adapter)
}

#[async_trait::async_trait]
impl LazyBackend for LocalBackend {
    async fn stream(&self, plan: &Plan) -> crate::Result<SendableRecordBatchStream> {
        let plan = plan.resolve(&self.state)?;
        match plan {
            LogicalPlan::Ddl(ddl) => match ddl {
                DdlStatement::CreateView(cmd) => {
                    let name = TableRef::from(cmd.name.clone());
                    let id = self.state.resolve(name.clone());
                    let plan = (*cmd.input).clone();
                    let mut info = ViewBuilder::new(Plan::from_plan(plan));
                    if let Some(definition) = cmd.definition.as_deref() {
                        info = info.definition(definition);
                    }
                    self.state
                        .create_view(id, info.build(), false, cmd.or_replace)
                        .await?;
                    Ok(empty())
                }
                DdlStatement::CreateMemoryTable(_cmd) => {
                    todo!()
                }
                DdlStatement::CreateCatalogSchema(cmd) => {
                    let id =
                        SchemaId::parse(&cmd.schema_name, self.state.default_catalog().clone());
                    self.state
                        .cluster()
                        .catalog(id.catalog.as_ref())
                        .ok_or_else(|| crate::EngineError::CatalogNotFound(id.catalog.to_string()))?
                        .create_schema(id.schema, cmd.if_not_exists)
                        .await?;
                    Ok(empty())
                }
                DdlStatement::CreateCatalog(cmd) => {
                    self.state
                        .cluster()
                        .create_catalog(&cmd.catalog_name, cmd.if_not_exists)
                        .await?;
                    Ok(empty())
                }
                DdlStatement::CreateExternalTable(_cmd) => unimplemented!(),
                DdlStatement::DropTable(cmd) => {
                    let name = TableRef::from(cmd.name.clone());
                    let id = self.state.resolve(name.clone());

                    let schema = self
                        .state
                        .cluster()
                        .catalog(&id.catalog)
                        .and_then(|catalog| catalog.schema(&id.schema));
                    match (cmd.if_exists, schema) {
                        (_, Some(schema)) => {
                            schema.drop_topic(&id.table, cmd.if_exists).await?;
                            Ok(empty())
                        }
                        (true, None) => Ok(empty()),
                        (false, None) => {
                            Err(crate::EngineError::TableNotFound(id.to_string()).into())
                        }
                    }
                }
                DdlStatement::DropView(cmd) => {
                    let name = TableRef::from(cmd.name.clone());
                    let id = self.state.resolve(name.clone());

                    let schema = self
                        .state
                        .cluster()
                        .catalog(&id.catalog)
                        .and_then(|catalog| catalog.schema(&id.schema));
                    match (cmd.if_exists, schema) {
                        (_, Some(schema)) => {
                            schema.drop_view(&id.table, cmd.if_exists).await?;
                            Ok(empty())
                        }
                        (true, None) => Ok(empty()),
                        (false, None) => {
                            Err(crate::EngineError::TableNotFound(id.to_string()).into())
                        }
                    }
                }
                DdlStatement::DropCatalogSchema(cmd) => {
                    let id =
                        SchemaId::resolve(cmd.name.clone(), self.state.default_catalog().clone());

                    let catalog = self.state.cluster().catalog(&id.catalog);
                    match (cmd.if_exists, catalog) {
                        (_, Some(catalog)) => {
                            catalog
                                .deregister(&id.schema, cmd.if_exists, cmd.cascade)
                                .await?;
                            Ok(empty())
                        }
                        (true, None) => Ok(empty()),
                        (false, None) => {
                            Err(crate::EngineError::CatalogNotFound(id.catalog.to_string()).into())
                        }
                    }
                }
            },
            LogicalPlan::Statement(_stmt) => unimplemented!(),
            LogicalPlan::DescribeTable(_desc) => todo!(),
            plan => {
                let plan = self.state.session().create_physical_plan(&plan).await?;

                Ok(execute_stream(plan, self.state.session().task_ctx())?)
            }
        }
    }

    async fn create_view(
        &self,
        table: TableRef<'static>,
        info: ViewInfo,
        if_not_exists: bool,
        or_replace: bool,
    ) -> crate::Result<Plan> {
        let id = self.state.resolve(table);
        let view = self
            .state
            .create_view(id, info, if_not_exists, or_replace)
            .await?;
        let plan = LogicalPlanBuilder::scan(view.table().clone(), provider_as_source(view), None)?
            .build()?;
        Ok(Plan::from_plan(plan))
    }
}