use std::io::{self, Write};
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use futures_util::stream::{self, BoxStream, StreamExt};
use serde::Serialize;
use tokio::sync::mpsc;
use crate::errors::AppError;
use crate::models::{CountRequest, QueryRequest};
use crate::schema::DatasetSchema;
/// Boxed, `Send`, `'static` stream of response chunks used for Arrow IPC output.
///
/// Each item is either a [`Bytes`] chunk or an [`AppError`]. This is the type
/// returned by [`arrow_ipc_stream_channel`] and by the streaming methods of
/// [`Backend`].
pub type ArrowIpcStream = BoxStream<'static, Result<Bytes, AppError>>;
/// Producer side of a chunk channel created by [`arrow_ipc_stream_channel`].
///
/// It implements [`std::io::Write`]: every `write` call is forwarded as one
/// [`Bytes`] chunk over the channel, and [`ArrowIpcChunkWriter::send_error`]
/// forwards an [`AppError`] instead.
pub struct ArrowIpcChunkWriter {
/// Sending half of the channel; items are either a data chunk or an error.
tx: mpsc::Sender<Result<Bytes, AppError>>,
}
impl ArrowIpcChunkWriter {
    /// Pushes `err` onto the channel as an `Err` item so the consuming stream
    /// observes the failure.
    ///
    /// Delivery is best-effort: if the send fails (the receiving side is gone)
    /// the error is silently discarded.
    ///
    /// # Panics
    ///
    /// This goes through tokio's `Sender::blocking_send`, which is documented to
    /// panic when called from within an asynchronous execution context. Call it
    /// from a blocking thread only.
    pub fn send_error(&self, err: AppError) {
        // A failed send means nobody is listening any more, so there is no one
        // left to report to; dropping the result is intentional.
        self.tx.blocking_send(Err(err)).ok();
    }
}
impl Write for ArrowIpcChunkWriter {
    /// Copies `buf` into a fresh [`Bytes`] chunk and sends it over the channel,
    /// blocking the current thread until the channel accepts it.
    ///
    /// On success the whole buffer is reported as written.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::BrokenPipe`] when the send fails, i.e. the
    /// receiving side of the channel is no longer available.
    ///
    /// # Panics
    ///
    /// Relies on tokio's `Sender::blocking_send`, which is documented to panic
    /// when called from within an asynchronous execution context.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // The caller keeps ownership of `buf`, so the chunk must be an owned copy.
        let chunk = Bytes::copy_from_slice(buf);
        match self.tx.blocking_send(Ok(chunk)) {
            Ok(()) => Ok(buf.len()),
            Err(_) => Err(io::Error::new(
                io::ErrorKind::BrokenPipe,
                "response stream closed",
            )),
        }
    }

    /// No-op: every `write` hands its chunk to the channel immediately, so
    /// nothing is buffered inside the writer.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
/// Creates a connected writer/stream pair backed by a bounded tokio channel.
///
/// Chunks (or errors) pushed through the returned [`ArrowIpcChunkWriter`] come
/// out of the returned [`ArrowIpcStream`] in the order they were sent. The
/// stream finishes once the channel's `recv` yields `None`.
///
/// `capacity` is the maximum number of chunks buffered in the channel. A value
/// of `0` is treated as `1`: tokio's `mpsc::channel` panics on a zero capacity,
/// so the value is clamped instead of crashing the caller.
pub fn arrow_ipc_stream_channel(capacity: usize) -> (ArrowIpcChunkWriter, ArrowIpcStream) {
    // Guard against the documented zero-capacity panic in `mpsc::channel`.
    let (tx, rx) = mpsc::channel(capacity.max(1));
    let writer = ArrowIpcChunkWriter { tx };
    // Adapt the receiver into a `Stream`: each step awaits the next item and
    // threads the receiver through as the unfold state; `None` ends the stream.
    let stream = stream::unfold(rx, |mut rx| async move {
        let next = rx.recv().await;
        next.map(|item| (item, rx))
    })
    .boxed();
    (writer, stream)
}
/// Statistics returned by [`Backend::reload`].
#[derive(Debug, Clone, Copy, Serialize)]
pub struct ReloadStats {
/// Row count reported for the reload (presumably the rows loaded — confirm
/// against the backend implementations).
pub rows: usize,
/// Elapsed time of the reload; the field name indicates milliseconds.
pub elapsed_ms: u128,
}
/// Summary of a dataset, as returned by [`Backend::summary`].
#[derive(Debug, Clone, Serialize)]
pub struct DatasetSummary {
/// Dataset name.
pub name: String,
/// Number of columns.
pub columns: usize,
/// Number of rows.
pub rows: usize,
}
/// Storage/query backend abstraction.
///
/// Implementors must be `Send + Sync + 'static`. Methods that take a `name`
/// operate on the entry identified by that name (the structs in this file call
/// these entries datasets).
#[async_trait]
pub trait Backend: Send + Sync + 'static {
    /// Returns the names known to this backend (presumably the dataset names
    /// accepted by the other methods — confirm against implementors).
    fn names(&self) -> Vec<String>;

    /// Returns the [`DatasetSummary`] for `name`.
    fn summary(&self, name: &str) -> Result<DatasetSummary, AppError>;

    /// Returns the shared [`DatasetSchema`] for `name`.
    fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError>;

    /// Returns the indexed column names for the given dataset.
    ///
    /// The default implementation reports no indexed columns.
    fn indexed_columns(&self, _name: &str) -> Result<Vec<String>, AppError> {
        Ok(vec![])
    }

    /// Produces a sample for `name` as a string; the encoding is defined by the
    /// implementation.
    async fn sample(&self, name: &str) -> Result<String, AppError>;

    /// Runs `req` against `name` and returns the result as a string; the
    /// encoding is defined by the implementation.
    async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError>;

    /// Runs a query and returns the result as a single in-memory byte buffer
    /// (Arrow IPC, going by the method name and the default error message).
    ///
    /// # Errors
    ///
    /// The default implementation always fails with [`AppError::InvalidValue`],
    /// signalling that the backend does not support this response format.
    async fn query_arrow(&self, _name: &str, _req: &QueryRequest) -> Result<Vec<u8>, AppError> {
        let reason = "Arrow IPC response format is not supported by this backend";
        Err(AppError::InvalidValue(reason.into()))
    }

    /// Runs a query and returns the result as an [`ArrowIpcStream`].
    ///
    /// The default implementation is not incremental: it awaits the complete
    /// [`Backend::query_arrow`] buffer and yields it as one chunk.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Backend::query_arrow`].
    async fn query_arrow_stream(
        &self,
        name: &str,
        req: &QueryRequest,
    ) -> Result<ArrowIpcStream, AppError> {
        let payload = self.query_arrow(name, req).await?;
        // One-item stream; the Vec -> Bytes conversion runs when it is polled.
        let single_chunk = stream::once(async move { Ok::<_, AppError>(Bytes::from(payload)) });
        Ok(single_chunk.boxed())
    }

    /// Streaming variant whose default is identical to
    /// [`Backend::query_arrow_stream`]: the full [`Backend::query_arrow`] buffer
    /// is yielded as one chunk.
    ///
    /// NOTE(review): how implementors are expected to differentiate this from
    /// `query_arrow_stream` is not visible in this file — confirm with callers.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Backend::query_arrow`].
    async fn query_arrow_stream_all(
        &self,
        name: &str,
        req: &QueryRequest,
    ) -> Result<ArrowIpcStream, AppError> {
        let payload = self.query_arrow(name, req).await?;
        // One-item stream; the Vec -> Bytes conversion runs when it is polled.
        let single_chunk = stream::once(async move { Ok::<_, AppError>(Bytes::from(payload)) });
        Ok(single_chunk.boxed())
    }

    /// Evaluates the count request `req` against `name`.
    async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError>;

    /// Reloads `name` and returns the resulting [`ReloadStats`].
    async fn reload(&self, name: &str) -> Result<ReloadStats, AppError>;
}