liquid_cache_local/
lib.rs

1#![warn(missing_docs)]
2#![doc = include_str!("../README.md")]
3
4#[cfg(test)]
5mod tests;
6
7use std::path::PathBuf;
8use std::sync::Arc;
9
10use datafusion::error::Result;
11use datafusion::prelude::{SessionConfig, SessionContext};
12use liquid_cache_common::IoMode;
13use liquid_cache_parquet::optimizers::{DateExtractOptimizer, LocalModeOptimizer};
14use liquid_cache_parquet::{LiquidCache, LiquidCacheRef};
15use liquid_cache_storage::cache::squeeze_policies::{SqueezePolicy, TranscodeSqueezeEvict};
16use liquid_cache_storage::cache_policies::CachePolicy;
17use liquid_cache_storage::cache_policies::LiquidPolicy;
18
19pub use liquid_cache_common as common;
20pub use liquid_cache_storage as storage;
21
22/// Builder for in-process liquid cache session context
23///
24/// This allows you to use liquid cache within the same process,
25/// instead of using the client-server architecture as in the default mode.
26///
27/// # Example
28/// ```rust
29/// use liquid_cache_local::{
30///     storage::cache_policies::LiquidPolicy,
31///     LiquidCacheLocalBuilder,
32/// };
33/// use datafusion::prelude::{SessionConfig, SessionContext};
34/// use tempfile::TempDir;
35///
36/// #[tokio::main]
37/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
38///     let temp_dir = TempDir::new().unwrap();
39///
40///     let (ctx, _) = LiquidCacheLocalBuilder::new()
41///         .with_max_cache_bytes(1024 * 1024 * 1024) // 1GB
42///         .with_cache_dir(temp_dir.path().to_path_buf())
43///         .with_cache_policy(Box::new(LiquidPolicy::new()))
44///         .build(SessionConfig::new())?;
45///
46///     // Register the test parquet file
47///     ctx.register_parquet("hits", "../../examples/nano_hits.parquet", Default::default())
48///         .await?;
49///
50///     ctx.sql("SELECT COUNT(*) FROM hits").await?.show().await?;
51///     Ok(())
52/// }
53/// ```
54pub struct LiquidCacheLocalBuilder {
55    /// Size of batches for caching
56    batch_size: usize,
57    /// Maximum cache size in bytes
58    max_cache_bytes: usize,
59    /// Directory for disk cache
60    cache_dir: PathBuf,
61    /// Cache policy
62    cache_policy: Box<dyn CachePolicy>,
63    /// Squeeze policy
64    squeeze_policy: Box<dyn SqueezePolicy>,
65
66    span: fastrace::Span,
67
68    io_mode: IoMode,
69}
70
71impl Default for LiquidCacheLocalBuilder {
72    fn default() -> Self {
73        Self {
74            batch_size: 8192,
75            max_cache_bytes: 1024 * 1024 * 1024, // 1GB
76            cache_dir: std::env::temp_dir().join("liquid_cache"),
77            cache_policy: Box::new(LiquidPolicy::new()),
78            squeeze_policy: Box::new(TranscodeSqueezeEvict),
79            span: fastrace::Span::enter_with_local_parent("liquid_cache_local_builder"),
80            io_mode: IoMode::StdBlocking,
81        }
82    }
83}
84
85impl LiquidCacheLocalBuilder {
86    /// Create a new builder with defaults
87    pub fn new() -> Self {
88        Self::default()
89    }
90
91    /// Set batch size
92    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
93        self.batch_size = batch_size;
94        self
95    }
96
97    /// Set maximum cache size in bytes
98    pub fn with_max_cache_bytes(mut self, max_cache_bytes: usize) -> Self {
99        self.max_cache_bytes = max_cache_bytes;
100        self
101    }
102
103    /// Set cache directory
104    pub fn with_cache_dir(mut self, cache_dir: PathBuf) -> Self {
105        self.cache_dir = cache_dir;
106        self
107    }
108
109    /// Set squeeze policy
110    pub fn with_squeeze_policy(mut self, squeeze_policy: Box<dyn SqueezePolicy>) -> Self {
111        self.squeeze_policy = squeeze_policy;
112        self
113    }
114
115    /// Set cache strategy
116    pub fn with_cache_policy(mut self, cache_policy: Box<dyn CachePolicy>) -> Self {
117        self.cache_policy = cache_policy;
118        self
119    }
120
121    /// Set fastrace span
122    pub fn with_span(mut self, span: fastrace::Span) -> Self {
123        self.span = span;
124        self
125    }
126
127    /// Set IO mode
128    pub fn with_io_mode(mut self, io_mode: IoMode) -> Self {
129        self.io_mode = io_mode;
130        self
131    }
132
133    /// Build a SessionContext with liquid cache configured
134    /// Returns the SessionContext and the liquid cache reference
135    pub fn build(self, mut config: SessionConfig) -> Result<(SessionContext, LiquidCacheRef)> {
136        config.options_mut().execution.parquet.pushdown_filters = true;
137        config
138            .options_mut()
139            .execution
140            .parquet
141            .schema_force_view_types = false;
142        config.options_mut().execution.batch_size = self.batch_size;
143
144        let cache = LiquidCache::new(
145            self.batch_size,
146            self.max_cache_bytes,
147            self.cache_dir,
148            self.cache_policy,
149            self.squeeze_policy,
150            self.io_mode,
151        );
152        let cache_ref = Arc::new(cache);
153
154        let date_extract_optimizer = Arc::new(DateExtractOptimizer::new());
155
156        let optimizer = LocalModeOptimizer::with_cache(cache_ref.clone());
157
158        let state = datafusion::execution::SessionStateBuilder::new()
159            .with_config(config)
160            .with_default_features()
161            .with_optimizer_rule(date_extract_optimizer)
162            .with_physical_optimizer_rule(Arc::new(optimizer))
163            .build();
164
165        Ok((SessionContext::new_with_state(state), cache_ref))
166    }
167}
168
#[cfg(test)]
mod local_tests {
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::datasource::{
        file_format::parquet::ParquetFormat,
        listing::{ListingOptions, ListingTableUrl},
    };

    use super::*;

    /// Sample parquet file used by the tests in this module, relative to the crate root.
    const NANO_HITS: &str = "../../examples/nano_hits.parquet";

    /// Listing options for `.parquet` files with pruning enabled on the parquet format.
    fn parquet_listing_options() -> ListingOptions {
        let file_format = ParquetFormat::default().with_enable_pruning(true);
        ListingOptions::new(Arc::new(file_format)).with_file_extension(".parquet")
    }

    /// Registers the sample file as a listing table (schema left unspecified)
    /// and runs a `LIKE` filter query against it.
    #[tokio::test]
    async fn register_with_listing_table() -> Result<()> {
        let listing_options = parquet_listing_options();
        let (ctx, _) = LiquidCacheLocalBuilder::new().build(SessionConfig::new())?;
        let table_path = ListingTableUrl::parse(NANO_HITS)?;
        // `listing_options` is not needed afterwards, so it is moved rather than cloned.
        ctx.register_listing_table("hits", &table_path, listing_options, None, None)
            .await?;

        ctx.sql("SELECT * FROM hits where \"URL\" like '%google%'")
            .await?
            .show()
            .await?;
        Ok(())
    }

    /// Registers the sample file with an explicitly provided single-column
    /// schema and selects that column.
    #[tokio::test]
    async fn test_provide_schema() -> Result<()> {
        let (ctx, _) = LiquidCacheLocalBuilder::new().build(SessionConfig::new())?;

        let listing_options = parquet_listing_options();

        let table_path = ListingTableUrl::parse(NANO_HITS)?;
        let schema = Schema::new(vec![Field::new("WatchID", DataType::Int64, false)]);

        ctx.register_listing_table(
            "hits",
            &table_path,
            listing_options,
            Some(Arc::new(schema)),
            None,
        )
        .await?;

        ctx.sql("SELECT \"WatchID\" FROM hits limit 1")
            .await?
            .show()
            .await?;
        Ok(())
    }
}