liquid_cache_client/
lib.rs1#![warn(missing_docs)]
2#![cfg_attr(not(doctest), doc = include_str!(concat!("../", std::env!("CARGO_PKG_README"))))]
3use std::collections::HashMap;
4use std::error::Error;
5use std::sync::Arc;
6use std::time::Duration;
7mod client_exec;
8mod metrics;
9mod optimizer;
10pub use client_exec::LiquidCacheClientExec;
11use datafusion::{
12 error::{DataFusionError, Result},
13 execution::{SessionStateBuilder, object_store::ObjectStoreUrl, runtime_env::RuntimeEnv},
14 prelude::*,
15};
16use fastrace_tonic::FastraceClientService;
17pub use optimizer::PushdownOptimizer;
18use tonic::transport::Channel;
19
20pub use liquid_cache_common as common;
21
22#[cfg(test)]
23mod tests;
24
/// Builder for a DataFusion [`SessionContext`] that has the
/// [`PushdownOptimizer`] physical optimizer rule installed and pointed at a
/// cache server.
///
/// Create one with [`LiquidCacheBuilder::new`], optionally add object stores
/// with [`LiquidCacheBuilder::with_object_store`], then call
/// [`LiquidCacheBuilder::build`].
pub struct LiquidCacheBuilder {
    /// Object stores to register, each paired with the key/value options used
    /// when the store is constructed from its URL in `build`.
    object_stores: Vec<(ObjectStoreUrl, HashMap<String, String>)>,
    /// Cache server address, handed to the [`PushdownOptimizer`] in `build`.
    cache_server: String,
}
54
55impl LiquidCacheBuilder {
56 pub fn new(cache_server: impl AsRef<str>) -> Self {
58 Self {
59 object_stores: vec![],
60 cache_server: cache_server.as_ref().to_string(),
61 }
62 }
63
64 pub fn with_object_store(
67 mut self,
68 url: ObjectStoreUrl,
69 object_store_options: Option<HashMap<String, String>>,
70 ) -> Self {
71 self.object_stores
72 .push((url, object_store_options.unwrap_or_default()));
73 self
74 }
75
76 pub fn build(self, config: SessionConfig) -> Result<SessionContext> {
78 let mut session_config = config;
79 session_config
80 .options_mut()
81 .execution
82 .parquet
83 .pushdown_filters = true;
84 session_config
85 .options_mut()
86 .execution
87 .parquet
88 .schema_force_view_types = false;
89 session_config
90 .options_mut()
91 .execution
92 .parquet
93 .binary_as_string = true;
94 session_config.options_mut().execution.batch_size = 8192 * 2;
95
96 let runtime_env = Arc::new(RuntimeEnv::default());
97
98 for (object_store_url, options) in &self.object_stores {
100 let (object_store, _path) =
101 object_store::parse_url_opts(object_store_url.as_ref(), options.clone())
102 .map_err(|e| DataFusionError::External(Box::new(e)))?;
103 runtime_env.register_object_store(object_store_url.as_ref(), Arc::new(object_store));
104 }
105
106 let session_state = SessionStateBuilder::new()
107 .with_config(session_config)
108 .with_runtime_env(runtime_env)
109 .with_default_features()
110 .with_physical_optimizer_rule(Arc::new(PushdownOptimizer::new(
111 self.cache_server.clone(),
112 self.object_stores.clone(),
113 )))
114 .build();
115 Ok(SessionContext::new_with_state(session_state))
116 }
117}
118
119pub(crate) fn to_df_err<E: Error + Send + Sync + 'static>(err: E) -> DataFusionError {
120 DataFusionError::External(Box::new(err))
121}
122
123pub(crate) async fn flight_channel(
124 source: impl Into<String>,
125) -> Result<FastraceClientService<Channel>> {
126 use fastrace_tonic::FastraceClientLayer;
127 use tower::ServiceBuilder;
128
129 let endpoint = Channel::from_shared(source.into())
132 .map_err(to_df_err)?
133 .tcp_keepalive(Some(Duration::from_secs(10)));
134
135 let channel = endpoint.connect().await.map_err(to_df_err)?;
136 let channel = ServiceBuilder::new()
137 .layer(FastraceClientLayer)
138 .service(channel);
139 Ok(channel)
140}