ella-engine 0.1.5

Core engine implementation for the ella datastore.
Documentation
mod channel;
mod rw;
pub(crate) mod shard;

pub use channel::{Publisher, Subscriber, TopicChannel};
use futures::{stream::BoxStream, Stream, StreamExt};
pub(crate) use rw::RwBuffer;
pub(crate) use shard::compact_shards;
pub use shard::ShardInfo;
pub(crate) use shard::ShardManager;

use std::{sync::Arc, task::Poll};

use datafusion::{
    arrow::{datatypes::SchemaRef, record_batch::RecordBatch},
    datasource::{provider_as_source, TableProvider},
    error::{DataFusionError, Result},
    execution::context::SessionState,
    logical_expr::{LogicalPlanBuilder, TableProviderFilterPushDown, TableType, UNNAMED_TABLE},
    parquet::format::SortingColumn,
    physical_plan::{
        insert::InsertExec, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
        RecordBatchStream, Statistics,
    },
    prelude::Expr,
};

use crate::{engine::EllaState, registry::TableId, table::TableConfig, Path};

use self::shard::ShardSet;

use super::info::{EllaTableInfo, TopicInfo};

#[derive(Debug)]
pub struct EllaTopic {
    info: TopicInfo,
    table_info: EllaTableInfo,
    config: TableConfig,
    channel: Arc<TopicChannel>,
    rw: Option<Arc<RwBuffer>>,
    shards: Option<Arc<ShardManager>>,
}

impl EllaTopic {
    pub(crate) fn new(
        id: TableId<'static>,
        info: TopicInfo,
        state: &EllaState,
    ) -> crate::Result<Self> {
        let table_info = info.table_info(id, state)?;
        let config = table_info.config().clone();

        let (shards, rw) = if !info.temporary() {
            let shards = Arc::new(ShardManager::new(
                state.log().clone(),
                state.store().clone(),
                table_info.clone(),
                config.shard_config(),
            ));
            let rw = Arc::new(RwBuffer::new(
                table_info.clone(),
                shards.clone(),
                config.rw_buffer_config(),
            ));
            (Some(shards), Some(rw))
        } else {
            (None, None)
        };

        let channel = Arc::new(TopicChannel::new(
            table_info.clone(),
            rw.clone(),
            config.channel_config(),
        ));

        Ok(Self {
            info,
            table_info,
            channel,
            rw,
            shards,
            config,
        })
    }

    pub fn publish(&self) -> Publisher {
        self.channel.publish()
    }

    pub fn table(&self) -> &TableId<'static> {
        self.table_info.id()
    }

    pub fn config(&self) -> &TableConfig {
        &self.config
    }

    pub fn path(&self) -> &Path {
        self.table_info.path()
    }

    pub fn temporary(&self) -> bool {
        self.info.temporary()
    }

    pub async fn close(&self) -> crate::Result<()> {
        if let Some(rw) = &self.rw {
            rw.close().await;
        }
        if let Some(shards) = &self.shards {
            shards.close().await?;
        }
        Ok(())
    }

    pub(crate) async fn drop_shards(&self) -> crate::Result<()> {
        self.close().await?;
        if let Some(shards) = &self.shards {
            shards.delete_all().await?;
        }
        Ok(())
    }

    pub fn info(&self) -> &TopicInfo {
        &self.info
    }

    pub(crate) fn shards(&self) -> Option<&Arc<ShardSet>> {
        self.shards.as_ref().map(|s| s.shards())
    }

    pub(crate) fn file_schema(&self) -> SchemaRef {
        self.table_info
            .parquet_schema()
            .cloned()
            .unwrap_or_else(|| self.table_info.arrow_schema().clone())
    }

    pub(crate) fn sort(&self) -> Option<Vec<SortingColumn>> {
        self.table_info.sorting_cols().cloned()
    }
}

#[async_trait::async_trait]
impl TableProvider for EllaTopic {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.table_info.arrow_schema().clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
    }

    async fn scan(
        &self,
        state: &SessionState,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let filter = filters.iter().cloned().reduce(|acc, f| acc.and(f));
        let child_plan = |provider: Arc<dyn TableProvider>| async {
            let mut plan = LogicalPlanBuilder::scan(
                UNNAMED_TABLE,
                provider_as_source(provider),
                projection.cloned(),
            )?;
            if let Some(filter) = &filter {
                plan = plan.filter(filter.clone())?;
            }
            if let Some(limit) = limit {
                plan = plan.limit(0, Some(limit))?;
            }
            state.create_physical_plan(&plan.build()?).await
        };
        let shards = match self.shards.clone() {
            Some(shards) => Some(child_plan(shards).await?),
            None => None,
        };
        let rw = match self.rw.clone() {
            Some(rw) => Some(child_plan(rw).await?),
            None => None,
        };
        let channel = child_plan(self.channel.clone()).await?;

        Ok(Arc::new(TopicExec {
            table: self.table().clone(),
            shards,
            rw,
            channel,
        }))
    }

    async fn insert_into(
        &self,
        _state: &SessionState,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let sink = Arc::new(self.publish());
        Ok(Arc::new(InsertExec::new(input, sink)))
    }
}

#[derive(Debug, Clone)]
struct TopicExec {
    table: TableId<'static>,
    shards: Option<Arc<dyn ExecutionPlan>>,
    rw: Option<Arc<dyn ExecutionPlan>>,
    channel: Arc<dyn ExecutionPlan>,
}

impl ExecutionPlan for TopicExec {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.channel.schema()
    }

    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
        Ok(children.iter().any(|c| *c))
    }

    fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
        Partitioning::UnknownPartitioning(1)
    }

    fn output_ordering(&self) -> Option<&[datafusion::physical_expr::PhysicalSortExpr]> {
        self.channel.output_ordering()
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        self.children().into_iter().map(|_| true).collect()
    }

    fn benefits_from_input_partitioning(&self) -> bool {
        false
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        let mut children = Vec::with_capacity(3);
        if let Some(shards) = self.shards.clone() {
            children.push(shards);
        }
        if let Some(rw) = self.rw.clone() {
            children.push(rw);
        }
        children.push(self.channel.clone());
        children
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let num_children = self.children().len();
        if children.len() != num_children {
            return Err(DataFusionError::Execution(format!(
                "TopicExec expects exactly {} children, but got {}",
                num_children,
                children.len()
            )));
        }
        let mut iter = children.into_iter();

        let shards = self.shards.is_some().then(|| iter.next().unwrap());
        let rw = self.rw.is_some().then(|| iter.next().unwrap());
        let channel = iter.next().unwrap();
        Ok(Arc::new(Self {
            table: self.table.clone(),
            shards,
            rw,
            channel,
        }))
    }

    fn execute(
        &self,
        _partition: usize,
        context: Arc<datafusion::execution::TaskContext>,
    ) -> Result<datafusion::physical_plan::SendableRecordBatchStream> {
        let exec_child =
            |child: &Arc<dyn ExecutionPlan>| -> Result<BoxStream<'static, Result<RecordBatch>>> {
                let streams = (0..child.output_partitioning().partition_count())
                    .map(|i| child.execute(i, context.clone()))
                    .collect::<Result<Vec<_>>>()?;
                Ok(futures::stream::iter(streams).flatten().boxed())
            };

        let shards = match self.shards.as_ref() {
            Some(shards) => Some(exec_child(shards)?),
            None => None,
        };
        let rw = match self.rw.as_ref() {
            Some(rw) => Some(exec_child(rw)?),
            None => None,
        };

        let channel = (0..self.channel.output_partitioning().partition_count())
            .map(|i| self.channel.execute(i, context.clone()))
            .collect::<Result<Vec<_>>>()?;
        let channel = futures::stream::iter(channel).flatten().boxed();

        Ok(Box::pin(TopicStream {
            schema: self.schema(),
            shards,
            rw,
            channel: Some(channel),
        }))
    }

    fn statistics(&self) -> Statistics {
        Statistics::default()
    }

    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "TopicExec: table={}", self.table)
    }
}

impl DisplayAs for TopicExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "TopicExec: table={}", self.table)
            }
        }
    }
}

struct TopicStream {
    schema: SchemaRef,
    shards: Option<BoxStream<'static, Result<RecordBatch>>>,
    rw: Option<BoxStream<'static, Result<RecordBatch>>>,
    channel: Option<BoxStream<'static, Result<RecordBatch>>>,
}

impl Stream for TopicStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        if let Some(shards) = &mut self.shards {
            match futures::ready!(shards.poll_next_unpin(cx)) {
                Some(item) => return Poll::Ready(Some(item)),
                None => {
                    self.shards = None;
                }
            }
        }
        if let Some(rw) = &mut self.rw {
            match futures::ready!(rw.poll_next_unpin(cx)) {
                Some(item) => return Poll::Ready(Some(item)),
                None => {
                    self.rw = None;
                }
            }
        }
        if let Some(channel) = &mut self.channel {
            match futures::ready!(channel.poll_next_unpin(cx)) {
                Some(item) => return Poll::Ready(Some(item)),
                None => {
                    self.channel = None;
                }
            }
        }
        Poll::Ready(None)
    }
}

impl RecordBatchStream for TopicStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}