ella-engine 0.1.5

Core engine implementation for the ella datastore.
Documentation
use arrow_schema::SchemaRef;

use crate::{
    table::{
        info::{TableInfo, TopicInfo, ViewInfo},
        topic::ShardInfo,
    },
    Path,
};

use super::id::*;

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PartitionId {
    pub key: String,
    pub value: String,
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum PartitionValue {
    Int(i64),
    String(String),
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct CreateCatalog {
    pub uuid: TransactionId,
    pub id: CatalogId<'static>,
    pub path: Path,
}

impl CreateCatalog {
    pub fn new(id: CatalogId<'static>, root: &Path) -> Self {
        let path = root.join(id.as_ref());
        Self {
            uuid: TransactionId::new(),
            id,
            path,
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct CreateSchema {
    pub uuid: TransactionId,
    pub id: SchemaId<'static>,
    pub path: Path,
}

impl CreateSchema {
    pub fn new(id: SchemaId<'static>, root: &Path) -> Self {
        let path = root.join(id.catalog.as_ref()).join(id.schema.as_ref());
        Self {
            uuid: TransactionId::new(),
            id,
            path,
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct CreateTable {
    pub uuid: TransactionId,
    pub id: TableId<'static>,
    pub info: TableInfo,
}

impl CreateTable {
    pub fn topic(id: TableId<'static>, info: TopicInfo) -> Self {
        Self {
            uuid: TransactionId::new(),
            id,
            info: info.into(),
        }
    }

    pub fn view(id: TableId<'static>, info: ViewInfo) -> Self {
        Self {
            uuid: TransactionId::new(),
            id,
            info: info.into(),
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TableKind {
    Topic(TopicInfo),
    View(ViewInfo),
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct CreateShard {
    pub uuid: TransactionId,
    pub table: TableId<'static>,
    pub shard: ShardId,
    pub file_schema: SchemaRef,
    pub path: Path,
}

impl CreateShard {
    pub fn new(table: TableId<'static>, file_schema: SchemaRef, root: &Path) -> Self {
        let shard = ShardId::new();
        let path = shard.encode_path(root, "parquet");
        Self {
            uuid: TransactionId::new(),
            table,
            shard,
            file_schema,
            path,
        }
    }
}

impl From<CreateShard> for ShardInfo {
    fn from(tsn: CreateShard) -> Self {
        ShardInfo::new(tsn.shard, tsn.table, tsn.file_schema, tsn.path)
    }
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct CloseShard {
    pub uuid: TransactionId,
    pub table: TableId<'static>,
    pub shard: ShardId,
    pub rows: usize,
}

impl CloseShard {
    pub fn new(table: TableId<'static>, shard: ShardId, rows: usize) -> Self {
        Self {
            uuid: TransactionId::new(),
            table,
            shard,
            rows,
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct DeleteShard {
    pub uuid: TransactionId,
    pub table: TableId<'static>,
    pub shard: ShardId,
}

impl DeleteShard {
    pub fn new(table: TableId<'static>, shard: ShardId) -> Self {
        Self {
            uuid: TransactionId::new(),
            table,
            shard,
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct CompactShards {
    pub uuid: TransactionId,
    pub table: TableId<'static>,
    pub src: Vec<ShardId>,
    pub dst: ShardId,
    pub file_schema: SchemaRef,
    pub path: Path,
}

impl CompactShards {
    pub fn new(
        table: TableId<'static>,
        src: Vec<ShardId>,
        file_schema: SchemaRef,
        root: &Path,
    ) -> Self {
        let dst = ShardId::generate_from(src.first().expect("cannot compact empty shard list"));
        let path = dst.encode_path(root, "parquet");
        Self {
            uuid: TransactionId::new(),
            table,
            src,
            dst,
            file_schema,
            path,
        }
    }
}

impl From<CompactShards> for ShardInfo {
    fn from(tsn: CompactShards) -> Self {
        ShardInfo::new(tsn.dst, tsn.table, tsn.file_schema, tsn.path)
    }
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct DropTable {
    pub uuid: TransactionId,
    pub id: TableId<'static>,
}

impl DropTable {
    pub fn new(id: TableId<'static>) -> Self {
        Self {
            uuid: TransactionId::new(),
            id,
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct DropSchema {
    pub uuid: TransactionId,
    pub id: SchemaId<'static>,
}

impl DropSchema {
    pub fn new(id: SchemaId<'static>) -> Self {
        Self {
            uuid: TransactionId::new(),
            id,
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct DropCatalog {
    pub uuid: TransactionId,
    pub id: CatalogId<'static>,
}

impl DropCatalog {
    pub fn new(id: CatalogId<'static>) -> Self {
        Self {
            uuid: TransactionId::new(),
            id,
        }
    }
}

#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    derive_more::From,
    strum::Display,
)]
#[strum(serialize_all = "snake_case")]
pub enum Transaction {
    CreateCatalog(CreateCatalog),
    CreateSchema(CreateSchema),
    CreateTable(CreateTable),
    CreateShard(CreateShard),
    CloseShard(CloseShard),
    DeleteShard(DeleteShard),
    CompactShards(CompactShards),
    DropTable(DropTable),
    DropSchema(DropSchema),
    DropCatalog(DropCatalog),
}

impl Transaction {
    pub fn uuid(&self) -> TransactionId {
        use Transaction::*;
        match self {
            CreateCatalog(t) => t.uuid,
            CreateSchema(t) => t.uuid,
            CreateTable(t) => t.uuid,
            CreateShard(t) => t.uuid,
            CloseShard(t) => t.uuid,
            DeleteShard(t) => t.uuid,
            CompactShards(t) => t.uuid,
            DropTable(t) => t.uuid,
            DropSchema(t) => t.uuid,
            DropCatalog(t) => t.uuid,
        }
    }

    pub fn kind(&self) -> String {
        self.to_string()
    }
}