ella 0.1.5

A streaming time-series datastore for low-latency applications.
Documentation
mod publisher;

pub use publisher::Publisher;

use std::{future::IntoFuture, sync::Arc};

use ella_engine::{
    registry::{TableId, TableRef},
    table::{info::TableInfo, EllaTable},
};
use ella_server::table::RemoteTable;
use futures::{future::BoxFuture, FutureExt};

use crate::Ella;

#[derive(Debug)]
pub struct Table {
    inner: TableInner,
}

#[derive(Debug)]
enum TableInner {
    Local(Arc<EllaTable>),
    Remote(RemoteTable),
}

impl Table {
    pub(crate) fn local(table: Arc<EllaTable>) -> Self {
        Self {
            inner: TableInner::Local(table),
        }
    }

    pub(crate) fn remote(table: RemoteTable) -> Self {
        Self {
            inner: TableInner::Remote(table),
        }
    }

    pub fn publish(&self) -> crate::Result<Publisher> {
        use TableInner::*;
        Ok(match &self.inner {
            Local(table) => match table.as_topic() {
                Some(topic) => Publisher::new(topic.publish(), topic.info().arrow_schema()),
                None => todo!(),
            },
            Remote(table) => Publisher::new(table.publish(), table.arrow_schema()?),
        })
    }

    pub fn id(&self) -> &TableId<'static> {
        use TableInner::*;
        match &self.inner {
            Local(table) => table.id(),
            Remote(table) => table.id(),
        }
    }

    pub fn info(&self) -> TableInfo {
        use TableInner::*;
        match &self.inner {
            Local(table) => table.info(),
            Remote(table) => table.info(),
        }
    }
}

#[must_use]
#[derive(Debug)]
pub struct GetTable<'a> {
    inner: &'a Ella,
    table: TableRef<'a>,
}

impl<'a> GetTable<'a> {
    pub(crate) fn new(inner: &'a Ella, table: TableRef<'a>) -> Self {
        Self { inner, table }
    }

    pub fn or_create(self, info: impl Into<TableInfo>) -> GetOrCreateTable<'a> {
        GetOrCreateTable {
            inner: self.inner,
            table: self.table,
            info: info.into(),
        }
    }

    pub fn replace(self, info: impl Into<TableInfo>) -> CreateOrReplaceTable<'a> {
        CreateOrReplaceTable(CreateTable {
            inner: self.inner,
            table: self.table,
            info: info.into(),
        })
    }

    pub fn drop(self) -> DropTable<'a> {
        DropTable {
            inner: self.inner,
            table: self.table,
            if_exists: false,
        }
    }
}

impl<'a> IntoFuture for GetTable<'a> {
    type Output = crate::Result<Option<crate::Table>>;
    type IntoFuture = BoxFuture<'a, crate::Result<Option<crate::Table>>>;

    fn into_future(self) -> Self::IntoFuture {
        self.inner.get_table(self.table).boxed()
    }
}

#[must_use]
#[derive(Debug)]
pub struct GetOrCreateTable<'a> {
    inner: &'a Ella,
    table: TableRef<'a>,
    info: TableInfo,
}

impl<'a> IntoFuture for GetOrCreateTable<'a> {
    type Output = crate::Result<crate::Table>;
    type IntoFuture = BoxFuture<'a, Self::Output>;

    fn into_future(self) -> Self::IntoFuture {
        async move {
            Ok(match self.inner.get_table(self.table.clone()).await? {
                Some(table) => table,
                None => {
                    self.inner
                        .create_table(self.table, self.info, true, false)
                        .await?
                }
            })
        }
        .boxed()
    }
}

#[must_use]
#[derive(Debug)]
pub struct CreateTable<'a> {
    inner: &'a Ella,
    table: TableRef<'a>,
    info: TableInfo,
}

impl<'a> CreateTable<'a> {
    pub fn if_not_exists(self) -> GetOrCreateTable<'a> {
        GetOrCreateTable {
            inner: self.inner,
            table: self.table,
            info: self.info,
        }
    }

    pub fn or_replace(self) -> CreateOrReplaceTable<'a> {
        CreateOrReplaceTable(self)
    }
}

impl<'a> IntoFuture for CreateTable<'a> {
    type Output = crate::Result<crate::Table>;
    type IntoFuture = BoxFuture<'a, Self::Output>;

    fn into_future(self) -> Self::IntoFuture {
        async move {
            self.inner
                .create_table(self.table, self.info, false, false)
                .await
        }
        .boxed()
    }
}

#[must_use]
#[derive(Debug)]
pub struct CreateOrReplaceTable<'a>(CreateTable<'a>);

impl<'a> IntoFuture for CreateOrReplaceTable<'a> {
    type Output = crate::Result<crate::Table>;
    type IntoFuture = BoxFuture<'a, Self::Output>;

    fn into_future(self) -> Self::IntoFuture {
        async move {
            self.0
                .inner
                .create_table(self.0.table, self.0.info, false, true)
                .await
        }
        .boxed()
    }
}

#[must_use]
#[derive(Debug)]
pub struct DropTable<'a> {
    inner: &'a Ella,
    table: TableRef<'a>,
    if_exists: bool,
}

impl<'a> DropTable<'a> {
    pub fn if_exists(mut self) -> Self {
        self.if_exists = true;
        self
    }
}

impl<'a> IntoFuture for DropTable<'a> {
    type Output = crate::Result<&'a Ella>;
    type IntoFuture = BoxFuture<'a, Self::Output>;

    fn into_future(self) -> Self::IntoFuture {
        async move {
            let if_exists = if self.if_exists { "IF EXISTS " } else { "" };
            self.inner
                .execute(format!("DROP TABLE {if_exists}{}", self.table))
                .await?;

            Ok(self.inner)
        }
        .boxed()
    }
}