ella-engine 0.1.5

Core engine implementation for the ella datastore.
Documentation
use super::id::*;
use super::transactions::*;
use crate::Path;

use crate::config::EllaConfig;
use crate::table::info::TableInfo;
use crate::table::info::TopicInfo;
use crate::table::info::ViewInfo;

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Snapshot {
    pub uuid: SnapshotId,
    pub last_transaction: Option<TransactionId>,
    pub catalogs: Vec<CatalogState>,
    pub config: EllaConfig,
}

impl Snapshot {
    pub fn empty(config: EllaConfig) -> Self {
        Self {
            uuid: SnapshotId::new(),
            last_transaction: None,
            catalogs: Vec::new(),
            config,
        }
    }

    pub fn commit_many<I>(&mut self, iter: I) -> crate::Result<()>
    where
        I: IntoIterator<Item = Transaction>,
    {
        for t in iter {
            self.commit(t)?;
        }
        self.uuid = SnapshotId::new();
        Ok(())
    }

    pub fn commit_one(&mut self, t: Transaction) -> crate::Result<()> {
        self.commit(t)?;
        self.uuid = SnapshotId::new();
        Ok(())
    }

    fn commit(&mut self, t: Transaction) -> crate::Result<()> {
        use Transaction::*;
        if Some(t.uuid()) < self.last_transaction {
            tracing::warn!(transaction=%t.uuid(), "skipping outdated transaction");
            return Ok(());
        }

        tracing::debug!(uuid=%t.uuid(), kind=t.kind(), "committing transaction");
        self.last_transaction = Some(t.uuid());
        match t {
            CreateCatalog(t) => self.create_catalog(t),
            CreateSchema(t) => self.create_schema(t),
            CreateTable(t) => self.create_table(t),
            CreateShard(t) => self.create_shard(t),
            CloseShard(t) => self.close_shard(t),
            DeleteShard(t) => self.delete_shard(t),
            CompactShards(t) => self.compact_shards(t),
            DropTable(t) => self.drop_table(t),
            DropSchema(t) => self.drop_schema(t),
            DropCatalog(t) => self.drop_catalog(t),
        }
    }

    fn create_catalog(&mut self, tsn: CreateCatalog) -> crate::Result<()> {
        self.catalogs.push(tsn.into());
        Ok(())
    }

    fn create_schema(&mut self, tsn: CreateSchema) -> crate::Result<()> {
        self.catalog_mut(&tsn.id.catalog)?.schemas.push(tsn.into());
        Ok(())
    }

    fn create_table(&mut self, tsn: CreateTable) -> crate::Result<()> {
        self.schema_mut(&tsn.id.clone().into())?
            .tables
            .push(tsn.into());
        Ok(())
    }

    fn create_shard(&mut self, tsn: CreateShard) -> crate::Result<()> {
        self.table_mut(&tsn.table)?
            .topic_mut()?
            .insert_shard(tsn.into())?;
        Ok(())
    }

    fn close_shard(&mut self, tsn: CloseShard) -> crate::Result<()> {
        self.table_mut(&tsn.table)?
            .topic_mut()?
            .shard_mut(&tsn.shard)?
            .close(tsn.rows);
        Ok(())
    }

    fn delete_shard(&mut self, tsn: DeleteShard) -> crate::Result<()> {
        let topic = self.table_mut(&tsn.table)?.topic_mut()?;
        topic.shards_mut().retain(|s| s.id != tsn.shard);
        Ok(())
    }

    fn compact_shards(&mut self, tsn: CompactShards) -> crate::Result<()> {
        let topic = self.table_mut(&tsn.table)?.topic_mut()?;
        topic.insert_shard(tsn.into())?;
        Ok(())
    }

    fn drop_table(&mut self, tsn: DropTable) -> crate::Result<()> {
        self.catalog_mut(&tsn.id.catalog)?
            .schema_mut(&tsn.id.schema)?
            .tables
            .retain(|t| t.id != tsn.id);
        Ok(())
    }

    fn drop_schema(&mut self, tsn: DropSchema) -> crate::Result<()> {
        self.catalog_mut(&tsn.id.catalog)?
            .schemas
            .retain(|s| s.id != tsn.id);

        Ok(())
    }

    fn drop_catalog(&mut self, tsn: DropCatalog) -> crate::Result<()> {
        self.catalogs.retain(|c| c.id == tsn.id);

        Ok(())
    }

    fn catalog_mut(&mut self, id: &Id) -> crate::Result<&mut CatalogState> {
        self.catalogs
            .iter_mut()
            .find(|c| &c.id.0 == id)
            .ok_or_else(|| crate::EngineError::CatalogNotFound(id.to_string()).into())
    }

    fn schema_mut(&mut self, id: &SchemaId) -> crate::Result<&mut SchemaState> {
        self.catalog_mut(&id.catalog)?.schema_mut(&id.schema)
    }

    fn table_mut(&mut self, id: &TableId) -> crate::Result<&mut TableState> {
        self.catalog_mut(&id.catalog)?
            .schema_mut(&id.schema)?
            .table_mut(&id.table)
    }
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct CatalogState {
    pub id: CatalogId<'static>,
    pub path: Path,
    pub schemas: Vec<SchemaState>,
}

impl CatalogState {
    pub fn new(id: CatalogId<'static>, path: Path) -> Self {
        Self {
            id,
            path,
            schemas: Vec::new(),
        }
    }

    pub fn schema_mut(&mut self, id: &Id) -> crate::Result<&mut SchemaState> {
        self.schemas
            .iter_mut()
            .find(|s| &s.id.schema == id)
            .ok_or_else(|| crate::EngineError::SchemaNotFound(id.to_string()).into())
    }
}

impl From<CreateCatalog> for CatalogState {
    fn from(value: CreateCatalog) -> Self {
        Self::new(value.id, value.path)
    }
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SchemaState {
    pub id: SchemaId<'static>,
    pub path: Path,
    pub tables: Vec<TableState>,
}

impl SchemaState {
    pub fn table_mut(&mut self, id: &Id) -> crate::Result<&mut TableState> {
        self.tables
            .iter_mut()
            .find(|t| &t.id.table == id)
            .ok_or_else(|| crate::EngineError::TableNotFound(id.to_string()).into())
    }
}

impl From<CreateSchema> for SchemaState {
    fn from(value: CreateSchema) -> Self {
        Self {
            id: value.id,
            path: value.path,
            tables: Vec::new(),
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct TableState {
    pub id: TableId<'static>,
    pub info: TableInfo,
}

impl TableState {
    pub fn topic(&self) -> crate::Result<&TopicInfo> {
        match &self.info {
            TableInfo::Topic(t) => Ok(t),
            TableInfo::View(_) => Err(crate::EngineError::table_kind("topic", "view").into()),
        }
    }

    pub fn topic_mut(&mut self) -> crate::Result<&mut TopicInfo> {
        match &mut self.info {
            TableInfo::Topic(t) => Ok(t),
            TableInfo::View(_) => Err(crate::EngineError::table_kind("topic", "view").into()),
        }
    }

    pub fn view(&self) -> crate::Result<&ViewInfo> {
        match &self.info {
            TableInfo::Topic(_) => Err(crate::EngineError::table_kind("view", "topic").into()),
            TableInfo::View(v) => Ok(v),
        }
    }
}

impl From<CreateTable> for TableState {
    fn from(value: CreateTable) -> Self {
        Self {
            id: value.id.clone(),
            info: value.info,
        }
    }
}