use async_trait::async_trait;
use datafusion::error::DataFusionError;
use datafusion::execution::{SessionState, SessionStateBuilder};
use http::HeaderMap;
use std::sync::Arc;
/// Per-query input handed to a [`WorkerSessionBuilder`].
///
/// Bundles the [`SessionStateBuilder`] a session is produced from with a set of
/// HTTP headers. Both fields are public, so an implementation can take the
/// context apart and consume each piece by value.
#[derive(Debug, Default)]
pub struct WorkerQueryContext {
    /// Builder that the resulting [`SessionState`] is built from.
    pub builder: SessionStateBuilder,
    /// Headers accompanying the query.
    ///
    /// NOTE(review): presumably the headers of the request that triggered the
    /// query; confirm against the code that constructs this context.
    pub headers: HeaderMap,
}
/// Strategy for turning a [`WorkerQueryContext`] into a DataFusion
/// [`SessionState`].
///
/// The method is asynchronous (via `async_trait`), so an implementation may
/// await other futures while assembling the state.
#[async_trait]
pub trait WorkerSessionBuilder {
    /// Produces the session state for one query.
    ///
    /// `ctx` is taken by value, so the implementation owns the builder and the
    /// headers it carries.
    ///
    /// # Errors
    ///
    /// Returns a [`DataFusionError`] when the implementation cannot produce a
    /// state.
    async fn build_session_state(
        &self,
        ctx: WorkerQueryContext,
    ) -> Result<SessionState, DataFusionError>;
}
/// [`WorkerSessionBuilder`] that applies no customisation of its own.
///
/// Its implementation simply finalises the builder carried by the
/// [`WorkerQueryContext`] it is given.
#[derive(Clone, Debug)]
pub struct DefaultSessionBuilder;
#[async_trait]
impl WorkerSessionBuilder for DefaultSessionBuilder {
async fn build_session_state(
&self,
ctx: WorkerQueryContext,
) -> Result<SessionState, DataFusionError> {
Ok(ctx.builder.build())
}
}
/// Lets any `Send + Sync + 'static` function or closure of the shape
/// `Fn(WorkerQueryContext) -> impl Future<Output = Result<SessionState, _>>`
/// be used directly as a [`WorkerSessionBuilder`].
#[async_trait]
impl<F, Fut> WorkerSessionBuilder for F
where
    F: Fn(WorkerQueryContext) -> Fut + Send + Sync + 'static,
    Fut: std::future::Future<Output = Result<SessionState, DataFusionError>> + Send + 'static,
{
    /// Calls the closure with `ctx` and awaits the future it returns, passing
    /// the outcome through unchanged.
    async fn build_session_state(
        &self,
        ctx: WorkerQueryContext,
    ) -> Result<SessionState, DataFusionError> {
        let pending = self(ctx);
        pending.await
    }
}
/// Extension trait that adds a `map` combinator to session builders.
pub trait MappedWorkerSessionBuilderExt {
    /// Wraps `self` in a [`MappedWorkerSessionBuilder`] that carries `f`.
    ///
    /// `f` receives a [`SessionStateBuilder`] and returns the final
    /// [`SessionState`] or a [`DataFusionError`].
    ///
    /// Note that the wrapper only implements [`WorkerSessionBuilder`] when the
    /// wrapped builder is `Send + Sync + 'static` and `f` is `Send + Sync`;
    /// this method itself does not require those bounds.
    fn map<F>(self, f: F) -> MappedWorkerSessionBuilder<Self, F>
    where
        Self: Sized,
        F: Fn(SessionStateBuilder) -> Result<SessionState, DataFusionError>;
}
impl<T: WorkerSessionBuilder> MappedWorkerSessionBuilderExt for T {
fn map<F>(self, f: F) -> MappedWorkerSessionBuilder<Self, F>
where
Self: Sized,
{
MappedWorkerSessionBuilder {
inner: self,
f: Arc::new(f),
}
}
}
/// A builder `T` paired with a function `F`.
///
/// Created by `MappedWorkerSessionBuilderExt::map`. Both fields are private.
pub struct MappedWorkerSessionBuilder<T, F> {
    /// The wrapped builder.
    inner: T,
    /// The paired function, reference-counted so it can be shared.
    f: Arc<F>,
}
impl<T: Clone, F> Clone for MappedWorkerSessionBuilder<T, F> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
f: self.f.clone(),
}
}
}
#[async_trait]
impl<T, F> WorkerSessionBuilder for MappedWorkerSessionBuilder<T, F>
where
    T: WorkerSessionBuilder + Send + Sync + 'static,
    F: Fn(SessionStateBuilder) -> Result<SessionState, DataFusionError> + Send + Sync,
{
    /// Runs the wrapped builder, then hands its result to the stored function.
    ///
    /// `ctx` (including its headers) is consumed by the wrapped builder. The
    /// stored function only sees a [`SessionStateBuilder`] re-created from the
    /// state that the wrapped builder produced.
    ///
    /// # Errors
    ///
    /// An error from the wrapped builder is returned as-is and the stored
    /// function is not called. Otherwise the stored function's own result is
    /// returned.
    async fn build_session_state(
        &self,
        ctx: WorkerQueryContext,
    ) -> Result<SessionState, DataFusionError> {
        let inner_state = self.inner.build_session_state(ctx).await?;
        (self.f)(SessionStateBuilder::new_from_existing(inner_state))
    }
}