#![warn(missing_docs)]
#![cfg_attr(not(doctest), doc = include_str!(concat!("../", std::env!("CARGO_PKG_README"))))]
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;
mod client_exec;
mod metrics;
mod optimizer;
pub use client_exec::LiquidCacheClientExec;
use datafusion::{
error::{DataFusionError, Result},
execution::{SessionStateBuilder, object_store::ObjectStoreUrl, runtime_env::RuntimeEnv},
prelude::*,
};
use fastrace_tonic::FastraceClientService;
pub use optimizer::PushdownOptimizer;
use tonic::transport::Channel;
pub use liquid_cache_common as common;
#[cfg(test)]
mod tests;
/// Builder for a DataFusion [`SessionContext`] wired up to a LiquidCache server.
///
/// Collects the cache server location and the object stores to register, then
/// [`LiquidCacheBuilder::build`] produces a session that registers those stores and
/// installs [`PushdownOptimizer`] as a physical optimizer rule.
#[derive(Debug, Clone)]
pub struct LiquidCacheBuilder {
    /// Object stores to register, each paired with the options handed to
    /// `object_store::parse_url_opts` when the session is built.
    object_stores: Vec<(ObjectStoreUrl, HashMap<String, String>)>,
    /// Cache server location exactly as given to [`LiquidCacheBuilder::new`];
    /// forwarded to [`PushdownOptimizer`].
    cache_server: String,
}
impl LiquidCacheBuilder {
pub fn new(cache_server: impl AsRef<str>) -> Self {
Self {
object_stores: vec![],
cache_server: cache_server.as_ref().to_string(),
}
}
pub fn with_object_store(
mut self,
url: ObjectStoreUrl,
object_store_options: Option<HashMap<String, String>>,
) -> Self {
self.object_stores
.push((url, object_store_options.unwrap_or_default()));
self
}
pub fn build(self, config: SessionConfig) -> Result<SessionContext> {
let mut session_config = config;
session_config
.options_mut()
.execution
.parquet
.pushdown_filters = true;
session_config
.options_mut()
.execution
.parquet
.schema_force_view_types = false;
session_config
.options_mut()
.execution
.parquet
.binary_as_string = true;
session_config.options_mut().execution.batch_size = 8192 * 2;
let runtime_env = Arc::new(RuntimeEnv::default());
for (object_store_url, options) in &self.object_stores {
let (object_store, _path) =
object_store::parse_url_opts(object_store_url.as_ref(), options.clone())
.map_err(|e| DataFusionError::External(Box::new(e)))?;
runtime_env.register_object_store(object_store_url.as_ref(), Arc::new(object_store));
}
let session_state = SessionStateBuilder::new()
.with_config(session_config)
.with_runtime_env(runtime_env)
.with_default_features()
.with_physical_optimizer_rule(Arc::new(PushdownOptimizer::new(
self.cache_server.clone(),
self.object_stores.clone(),
)))
.build();
Ok(SessionContext::new_with_state(session_state))
}
}
/// Wraps any thread-safe, `'static` error into [`DataFusionError::External`].
///
/// Designed to be passed directly to `map_err`, e.g. `.map_err(to_df_err)?`.
pub(crate) fn to_df_err<E>(err: E) -> DataFusionError
where
    E: Error + Send + Sync + 'static,
{
    let boxed: Box<dyn Error + Send + Sync> = Box::new(err);
    DataFusionError::External(boxed)
}
/// Connects to the gRPC endpoint at `source` and wraps the channel with
/// `FastraceClientLayer` (fastrace tracing for tonic clients).
///
/// The connection is established eagerly: this awaits `Endpoint::connect`. TCP
/// keepalive is enabled with a 10-second interval.
///
/// # Errors
///
/// Returns a [`DataFusionError::External`] (via `to_df_err`) if `source` is not a
/// valid URI or if connecting fails.
pub(crate) async fn flight_channel(
    source: impl Into<String>,
) -> Result<FastraceClientService<Channel>> {
    use fastrace_tonic::FastraceClientLayer;
    use tower::ServiceBuilder;

    let keepalive = Some(Duration::from_secs(10));
    let connected = Channel::from_shared(source.into())
        .map_err(to_df_err)?
        .tcp_keepalive(keepalive)
        .connect()
        .await
        .map_err(to_df_err)?;

    Ok(ServiceBuilder::new()
        .layer(FastraceClientLayer)
        .service(connected))
}