liquid_cache_client/
lib.rs

1#![warn(missing_docs)]
2#![cfg_attr(not(doctest), doc = include_str!(concat!("../", std::env!("CARGO_PKG_README"))))]
3use std::collections::HashMap;
4use std::error::Error;
5use std::sync::Arc;
6use std::time::Duration;
7mod client_exec;
8mod metrics;
9mod optimizer;
10pub use client_exec::LiquidCacheClientExec;
11use datafusion::{
12    error::{DataFusionError, Result},
13    execution::{SessionStateBuilder, object_store::ObjectStoreUrl, runtime_env::RuntimeEnv},
14    prelude::*,
15};
16use fastrace_tonic::FastraceClientService;
17use liquid_cache_common::CacheMode;
18pub use optimizer::PushdownOptimizer;
19use tonic::transport::Channel;
20
21#[cfg(test)]
22mod tests;
23
24/// The builder for LiquidCache client state.
25///
26/// # Example
27///
28/// ```ignore
29/// use liquid_cache_client::LiquidCacheBuilder;
30/// use std::collections::HashMap;
31///
32/// let mut s3_options = HashMap::new();
33/// s3_options.insert("access_key_id".to_string(), "your-access-key".to_string());
34/// s3_options.insert("secret_access_key".to_string(), "your-secret-key".to_string());
35/// s3_options.insert("region".to_string(), "us-east-1".to_string());
36///
37/// let ctx = LiquidCacheBuilder::new("localhost:15214")
38///     .with_object_store("s3://my_bucket", Some(s3_options))
39///     .with_cache_mode(CacheMode::Liquid)
40///     .build(SessionConfig::from_env().unwrap())
41///     .unwrap();
42///
43/// ctx.register_parquet("my_table", "s3://my_bucket/my_table.parquet", Default::default())
44///     .await?;
45/// let df = ctx.sql("SELECT * FROM my_table").await?.show().await?;
46/// println!("{:?}", df);
47/// ```
48pub struct LiquidCacheBuilder {
49    object_stores: Vec<(ObjectStoreUrl, HashMap<String, String>)>,
50    cache_mode: CacheMode,
51    cache_server: String,
52}
53
54impl LiquidCacheBuilder {
55    /// Create a new builder for LiquidCache client state.
56    pub fn new(cache_server: impl AsRef<str>) -> Self {
57        Self {
58            object_stores: vec![],
59            cache_mode: CacheMode::Liquid,
60            cache_server: cache_server.as_ref().to_string(),
61        }
62    }
63
64    /// Add an object store to the builder.
65    /// Checkout <https://docs.rs/object_store/latest/object_store/fn.parse_url_opts.html> for available options.
66    pub fn with_object_store(
67        mut self,
68        url: ObjectStoreUrl,
69        object_store_options: Option<HashMap<String, String>>,
70    ) -> Self {
71        self.object_stores
72            .push((url, object_store_options.unwrap_or_default()));
73        self
74    }
75
76    /// Set the cache mode for the builder.
77    pub fn with_cache_mode(mut self, cache_mode: CacheMode) -> Self {
78        self.cache_mode = cache_mode;
79        self
80    }
81
82    /// Build the [SessionContext].
83    pub fn build(self, config: SessionConfig) -> Result<SessionContext> {
84        let mut session_config = config;
85        session_config
86            .options_mut()
87            .execution
88            .parquet
89            .pushdown_filters = true;
90        session_config
91            .options_mut()
92            .execution
93            .parquet
94            .schema_force_view_types = false;
95        session_config
96            .options_mut()
97            .execution
98            .parquet
99            .binary_as_string = true;
100        session_config.options_mut().execution.batch_size = 8192 * 2;
101
102        let runtime_env = Arc::new(RuntimeEnv::default());
103
104        // Register object stores
105        for (object_store_url, options) in &self.object_stores {
106            let (object_store, _path) =
107                object_store::parse_url_opts(object_store_url.as_ref(), options.clone())
108                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
109            runtime_env.register_object_store(object_store_url.as_ref(), Arc::new(object_store));
110        }
111
112        let session_state = SessionStateBuilder::new()
113            .with_config(session_config)
114            .with_runtime_env(runtime_env)
115            .with_default_features()
116            .with_physical_optimizer_rule(Arc::new(PushdownOptimizer::new(
117                self.cache_server.clone(),
118                self.cache_mode,
119                self.object_stores.clone(),
120            )))
121            .build();
122        Ok(SessionContext::new_with_state(session_state))
123    }
124}
125
126pub(crate) fn to_df_err<E: Error + Send + Sync + 'static>(err: E) -> DataFusionError {
127    DataFusionError::External(Box::new(err))
128}
129
130pub(crate) async fn flight_channel(
131    source: impl Into<String>,
132) -> Result<FastraceClientService<Channel>> {
133    use fastrace_tonic::FastraceClientLayer;
134    use tower::ServiceBuilder;
135
136    // No tls here, to avoid the overhead of TLS
137    // we assume both server and client are running on the trusted network.
138    let endpoint = Channel::from_shared(source.into())
139        .map_err(to_df_err)?
140        .tcp_keepalive(Some(Duration::from_secs(10)));
141
142    let channel = endpoint.connect().await.map_err(to_df_err)?;
143    let channel = ServiceBuilder::new()
144        .layer(FastraceClientLayer)
145        .service(channel);
146    Ok(channel)
147}