liquid_cache_client/
lib.rs1#![warn(missing_docs)]
2#![cfg_attr(not(doctest), doc = include_str!(concat!("../", std::env!("CARGO_PKG_README"))))]
3use std::collections::HashMap;
4use std::error::Error;
5use std::sync::Arc;
6use std::time::Duration;
7mod client_exec;
8mod metrics;
9mod optimizer;
10pub use client_exec::LiquidCacheClientExec;
11use datafusion::{
12 error::{DataFusionError, Result},
13 execution::{SessionStateBuilder, object_store::ObjectStoreUrl, runtime_env::RuntimeEnv},
14 prelude::*,
15};
16use fastrace_tonic::FastraceClientService;
17use liquid_cache_common::CacheMode;
18pub use optimizer::PushdownOptimizer;
19use tonic::transport::Channel;
20
21pub struct LiquidCacheBuilder {
39 object_stores: Vec<(ObjectStoreUrl, HashMap<String, String>)>,
40 cache_mode: CacheMode,
41 cache_server: String,
42}
43
44impl LiquidCacheBuilder {
45 pub fn new(cache_server: impl AsRef<str>) -> Self {
47 Self {
48 object_stores: vec![],
49 cache_mode: CacheMode::Liquid,
50 cache_server: cache_server.as_ref().to_string(),
51 }
52 }
53
54 pub fn with_object_store(
56 mut self,
57 url: ObjectStoreUrl,
58 object_store_options: Option<HashMap<String, String>>,
59 ) -> Self {
60 self.object_stores
61 .push((url, object_store_options.unwrap_or_default()));
62 self
63 }
64
65 pub fn with_cache_mode(mut self, cache_mode: CacheMode) -> Self {
67 self.cache_mode = cache_mode;
68 self
69 }
70
71 pub fn build(self, config: SessionConfig) -> Result<SessionContext> {
73 let mut session_config = config;
74 session_config
75 .options_mut()
76 .execution
77 .parquet
78 .pushdown_filters = true;
79 session_config
80 .options_mut()
81 .execution
82 .parquet
83 .schema_force_view_types = false;
84 session_config
85 .options_mut()
86 .execution
87 .parquet
88 .binary_as_string = true;
89 session_config.options_mut().execution.batch_size = 8192 * 2;
90 let session_state = SessionStateBuilder::new()
91 .with_config(session_config)
92 .with_runtime_env(Arc::new(RuntimeEnv::default()))
93 .with_default_features()
94 .with_physical_optimizer_rule(Arc::new(PushdownOptimizer::new(
95 self.cache_server.clone(),
96 self.cache_mode,
97 self.object_stores.clone(),
98 )))
99 .build();
100 Ok(SessionContext::new_with_state(session_state))
101 }
102}
103
104pub(crate) fn to_df_err<E: Error + Send + Sync + 'static>(err: E) -> DataFusionError {
105 DataFusionError::External(Box::new(err))
106}
107
108pub(crate) async fn flight_channel(
109 source: impl Into<String>,
110) -> Result<FastraceClientService<Channel>> {
111 use fastrace_tonic::FastraceClientLayer;
112 use tower::ServiceBuilder;
113
114 let endpoint = Channel::from_shared(source.into())
117 .map_err(to_df_err)?
118 .tcp_keepalive(Some(Duration::from_secs(10)));
119
120 let channel = endpoint.connect().await.map_err(to_df_err)?;
121 let channel = ServiceBuilder::new()
122 .layer(FastraceClientLayer)
123 .service(channel);
124 Ok(channel)
125}