ella 0.1.5

A streaming time-series datastore for low-latency applications.
Documentation
use std::{fmt::Debug, pin::Pin};

use datafusion::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use ella_common::row::{RowFormat, RowSink};
use futures::{Sink, SinkExt};

pub struct Publisher {
    inner: Pin<Box<dyn Sink<RecordBatch, Error = crate::Error> + Send + 'static>>,
    arrow_schema: SchemaRef,
}

impl Publisher {
    pub(super) fn new<S>(inner: S, arrow_schema: SchemaRef) -> Self
    where
        S: Sink<RecordBatch, Error = crate::Error> + Send + 'static,
    {
        Self {
            inner: Box::pin(inner),
            arrow_schema,
        }
    }

    pub fn rows<R: RowFormat>(self, buffer: usize) -> crate::Result<RowSink<R>> {
        let schema = self.arrow_schema.clone();
        RowSink::try_new(self, schema, buffer)
    }

    pub fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }
}

impl Debug for Publisher {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Publisher")
            .field("arrow_schema", &self.arrow_schema)
            .finish_non_exhaustive()
    }
}

impl Sink<RecordBatch> for Publisher {
    type Error = crate::Error;

    #[inline]
    fn poll_ready(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.inner.poll_ready_unpin(cx)
    }

    #[inline]
    fn start_send(mut self: Pin<&mut Self>, item: RecordBatch) -> Result<(), Self::Error> {
        self.inner.start_send_unpin(item)
    }

    #[inline]
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.inner.poll_flush_unpin(cx)
    }

    #[inline]
    fn poll_close(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.inner.poll_close_unpin(cx)
    }
}