use std::pin::Pin;
use std::task::{Context, Poll};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
/// Pinned, heap-allocated, `Send` stream of record batches.
///
/// Each item is either a [`RecordBatch`] or a boxed error that is itself
/// `Send + Sync`, so the error type is erased at this boundary.
pub type RecordBatchBoxStream = Pin<
Box<
dyn futures_core::Stream<
Item = Result<RecordBatch, Box<dyn std::error::Error + Send + Sync>>,
> + Send,
>,
>;
/// Turns an in-memory list of record batches into a [`RecordBatchBoxStream`].
///
/// The vector is consumed. The resulting stream hands out each batch in its
/// original order wrapped in `Ok`, then terminates; it never produces an error.
pub fn vec_to_stream(batches: Vec<RecordBatch>) -> RecordBatchBoxStream {
    let stream = VecStream {
        batches: batches.into_iter(),
    };
    // Pinning on the heap lets the concrete type be erased behind the alias.
    Box::pin(stream)
}
/// Private stream adapter backed by an already-materialized list of batches.
struct VecStream {
/// Owning iterator over the batches that have not been yielded yet.
batches: std::vec::IntoIter<RecordBatch>,
}
/// Stream implementation that drains the wrapped iterator one batch per poll.
impl futures_core::Stream for VecStream {
    type Item = Result<RecordBatch, Box<dyn std::error::Error + Send + Sync>>;

    /// Yields the next batch as `Ok(batch)`, or `None` once the iterator is
    /// exhausted.
    ///
    /// The data is already in memory, so this always returns `Poll::Ready`
    /// and never touches the waker in `_cx`.
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        match this.batches.next() {
            Some(batch) => Poll::Ready(Some(Ok(batch))),
            None => Poll::Ready(None),
        }
    }

    /// Forwards the size hint of the underlying iterator.
    fn size_hint(&self) -> (usize, Option<usize>) {
        Iterator::size_hint(&self.batches)
    }
}
/// Asynchronous interface to an analytical engine that is driven by SQL text
/// and exchanges data as Arrow [`RecordBatch`]es.
///
/// Implementors must be `Send + Sync`, and every asynchronous method returns a
/// `Send` future.
pub trait OlapEngine: Send + Sync {
    /// Error type returned by every fallible operation of this engine.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Runs `sql` and resolves to the complete, materialized list of result
    /// batches.
    fn query(
        &self,
        sql: &str,
    ) -> impl std::future::Future<Output = Result<Vec<RecordBatch>, Self::Error>> + Send;

    /// Runs `sql` and resolves to a stream of result batches.
    ///
    /// The default implementation awaits [`query`](Self::query) and wraps the
    /// materialized batches with [`vec_to_stream`], so the whole result is
    /// held in memory before the first item is yielded. Implementors may
    /// override this method.
    ///
    /// # Errors
    ///
    /// The default implementation fails with whatever error
    /// [`query`](Self::query) returns.
    fn query_stream(
        &self,
        sql: &str,
    ) -> impl std::future::Future<Output = Result<RecordBatchBoxStream, Self::Error>> + Send {
        // The returned future may borrow `sql` directly: return-position
        // `impl Trait` in a trait captures every lifetime of the signature,
        // so an owned copy of the query text is not needed.
        async move {
            let batches = self.query(sql).await?;
            Ok(vec_to_stream(batches))
        }
    }

    /// Runs the statement `sql` and resolves to a `u64`.
    ///
    /// NOTE(review): the `u64` is presumably the number of affected rows;
    /// confirm against the implementations.
    fn execute(
        &self,
        sql: &str,
    ) -> impl std::future::Future<Output = Result<u64, Self::Error>> + Send;

    /// Loads `batches` into `table` and resolves to a `u64`.
    ///
    /// NOTE(review): the `u64` is presumably the number of rows loaded;
    /// confirm against the implementations.
    fn load_arrow(
        &self,
        table: &str,
        batches: &[RecordBatch],
    ) -> impl std::future::Future<Output = Result<u64, Self::Error>> + Send;

    /// Creates `table_name` from the Arrow `schema`, with `primary_key`
    /// naming the key columns.
    ///
    /// NOTE(review): behavior when the table already exists, or when
    /// `primary_key` is empty, is implementation-defined from this trait's
    /// point of view.
    fn create_table(
        &self,
        table_name: &str,
        schema: &SchemaRef,
        primary_key: &[String],
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;

    /// Resolves to whether a table named `table_name` exists.
    fn table_exists(
        &self,
        table_name: &str,
    ) -> impl std::future::Future<Output = Result<bool, Self::Error>> + Send;

    /// Adds column `column_name` with Arrow type `data_type` to `table_name`.
    fn add_column(
        &self,
        table_name: &str,
        column_name: &str,
        data_type: &arrow::datatypes::DataType,
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;

    /// Removes column `column_name` from `table_name`.
    fn drop_column(
        &self,
        table_name: &str,
        column_name: &str,
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;

    /// Reports whether the engine supports transactions.
    ///
    /// Defaults to `false`; implementors override it to opt in.
    fn supports_transactions(&self) -> bool {
        false
    }
}