// ella-engine 0.1.5
//
// Core engine implementation for the ella datastore.
// Documentation
use std::sync::Arc;

use dashmap::DashMap;
use datafusion::{catalog::CatalogList, error::DataFusionError};

use crate::{
    catalog::EllaCatalog,
    engine::EllaState,
    registry::{
        snapshot::Snapshot,
        transactions::{CreateCatalog, DropCatalog},
        Id, TransactionLog,
    },
    Path,
};

/// A set of [`EllaCatalog`]s keyed by catalog [`Id`], bundled with the
/// transaction log and root path that are handed to the catalogs it creates.
#[derive(Debug)]
pub struct EllaCluster {
    /// Catalogs currently held in memory, keyed by their id.
    catalogs: DashMap<Id<'static>, Arc<EllaCatalog>>,
    /// Transaction log; catalog creation and removal are committed to it.
    log: Arc<TransactionLog>,
    /// Root path passed to newly created catalogs and to `CreateCatalog`
    /// transactions.
    root: Path,
}

impl EllaCluster {
    /// Builds a cluster that holds no catalogs yet.
    ///
    /// `log` is the transaction log that later catalog changes are committed
    /// to, and `root` is the path forwarded to catalogs created by this
    /// cluster.
    pub fn new(log: Arc<TransactionLog>, root: Path) -> Self {
        let catalogs = DashMap::new();
        Self { catalogs, log, root }
    }

    /// Returns a shared handle to every catalog currently held by the cluster.
    ///
    /// The handles are cloned `Arc`s, so the returned vector does not borrow
    /// from the cluster's internal map.
    pub fn catalogs(&self) -> Vec<Arc<EllaCatalog>> {
        let mut handles = Vec::new();
        for entry in self.catalogs.iter() {
            handles.push(Arc::clone(entry.value()));
        }
        handles
    }

    /// Looks up a catalog by id.
    ///
    /// Returns a cloned handle to the catalog, or `None` when no catalog is
    /// stored under `id`.
    pub fn catalog<'a>(&self, id: impl Into<Id<'a>>) -> Option<Arc<EllaCatalog>> {
        let key: Id<'a> = id.into();
        let entry = self.catalogs.get(key.as_ref())?;
        Some(Arc::clone(entry.value()))
    }

    /// Creates a catalog named `id` and registers it with the cluster.
    ///
    /// When a catalog with that id is already present, the outcome depends on
    /// `if_not_exists`: `true` returns the existing catalog untouched, `false`
    /// fails.
    ///
    /// # Errors
    ///
    /// Returns `EngineError::CatalogExists` if the catalog exists and
    /// `if_not_exists` is `false`, or whatever error [`Self::register`]
    /// reports while recording the new catalog.
    pub async fn create_catalog<'a>(
        &self,
        id: impl Into<Id<'a>>,
        if_not_exists: bool,
    ) -> crate::Result<Arc<EllaCatalog>> {
        let id: Id<'static> = id.into().into_owned();

        // An existing catalog is either handed back or reported as a conflict;
        // nothing new is created in that case.
        if let Some(existing) = self.catalog(id.as_ref()) {
            return if if_not_exists {
                Ok(existing)
            } else {
                Err(crate::EngineError::CatalogExists(id.to_string()).into())
            };
        }

        let created = Arc::new(EllaCatalog::new(
            id.clone().into(),
            self.log.clone(),
            self.root.clone(),
        ));
        self.register(id, Arc::clone(&created)).await?;
        Ok(created)
    }

    /// Commits a `CreateCatalog` transaction for `id` and then stores
    /// `catalog` under that id.
    ///
    /// The log commit happens first; if it fails the in-memory map is left
    /// unchanged. On success, returns the catalog previously stored under
    /// `id`, if there was one.
    pub(crate) async fn register(
        &self,
        id: Id<'static>,
        catalog: Arc<EllaCatalog>,
    ) -> crate::Result<Option<Arc<EllaCatalog>>> {
        let transaction = CreateCatalog::new(id.clone().into(), &self.root);
        self.log.commit(transaction).await?;

        let previous = self.catalogs.insert(id, catalog);
        Ok(previous)
    }

    /// Removes the catalog named `id` from the cluster.
    ///
    /// A non-empty catalog is only removed when `cascade` is `true`. On
    /// removal the catalog's schemas are dropped and a `DropCatalog`
    /// transaction is committed to the log.
    ///
    /// # Errors
    ///
    /// * `EngineError::CatalogNotFound` if no catalog is stored under `id`.
    /// * `DataFusionError::Execution` if the catalog is non-empty and
    ///   `cascade` is `false`.
    /// * Any error from dropping the schemas or committing the transaction.
    ///
    /// NOTE(review): the catalog is taken out of the in-memory map before its
    /// schemas are dropped and before the drop is logged, so a failure in
    /// either step returns an error with the catalog already gone from the
    /// map.
    pub async fn deregister<'a>(&self, id: impl Into<Id<'a>>, cascade: bool) -> crate::Result<()> {
        let id: Id<'a> = id.into();

        let catalog = match self.catalog(id.as_ref()) {
            Some(found) => found,
            None => return Err(crate::EngineError::CatalogNotFound(id.to_string()).into()),
        };

        // Emptiness is checked unconditionally, regardless of `cascade`.
        let is_empty = catalog.is_empty();
        if !cascade && !is_empty {
            return Err(DataFusionError::Execution(format!(
                "cannot remove non-empty catalog {}",
                id,
            ))
            .into());
        }

        // The entry may have disappeared since the lookup above; report that
        // the same way as a catalog that never existed.
        let (_, removed) = self
            .catalogs
            .remove(id.as_ref())
            .ok_or_else(|| crate::EngineError::CatalogNotFound(id.to_string()))?;
        removed.drop_schemas().await?;

        let transaction = DropCatalog::new(id.into_owned().into());
        self.log.commit(transaction).await?;
        Ok(())
    }

    /// Closes every catalog held by the cluster.
    ///
    /// All close futures are driven together with `join_all`, so every
    /// catalog gets to finish closing even if another one fails.
    ///
    /// # Errors
    ///
    /// Returns the first error among the results, in the order the catalogs
    /// were listed; later errors are discarded.
    pub(crate) async fn close(&self) -> crate::Result<()> {
        let pending = self
            .catalogs()
            .into_iter()
            .map(|catalog| async move { catalog.close().await });

        for outcome in futures::future::join_all(pending).await {
            outcome?;
        }
        Ok(())
    }

    pub(crate) fn load(&self, snapshot: &Snapshot, state: &EllaState) -> crate::Result<()> {
        for catalog in &snapshot.catalogs {
            self.catalogs.insert(
                catalog.id.clone().into(),
                Arc::new(EllaCatalog::load(catalog, state)?),
            );
        }
        self.resolve(state)?;
        Ok(())
    }

    /// Calls `resolve` with `state` on every catalog in the cluster.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error a catalog reports.
    pub(crate) fn resolve(&self, state: &EllaState) -> crate::Result<()> {
        self.catalogs
            .iter()
            .try_for_each(|entry| -> crate::Result<()> {
                entry.value().resolve(state)?;
                Ok(())
            })
    }
}

impl CatalogList for EllaCluster {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn register_catalog(
        &self,
        _name: String,
        _catalog: std::sync::Arc<dyn datafusion::catalog::CatalogProvider>,
    ) -> Option<std::sync::Arc<dyn datafusion::catalog::CatalogProvider>> {
        unimplemented!()
    }

    fn catalog_names(&self) -> Vec<String> {
        self.catalogs.iter().map(|c| c.key().to_string()).collect()
    }

    fn catalog(
        &self,
        name: &str,
    ) -> Option<std::sync::Arc<dyn datafusion::catalog::CatalogProvider>> {
        self.catalog(name).map(|c| c as Arc<_>)
    }
}