liquid_cache_local/
lib.rs1#![warn(missing_docs)]
2#![doc = include_str!("../README.md")]
3
4#[cfg(test)]
5mod tests;
6
7use std::path::PathBuf;
8use std::sync::Arc;
9
10use datafusion::error::Result;
11use datafusion::prelude::{SessionConfig, SessionContext};
12use liquid_cache_common::IoMode;
13use liquid_cache_parquet::optimizers::{DateExtractOptimizer, LocalModeOptimizer};
14use liquid_cache_parquet::{LiquidCache, LiquidCacheRef};
15use liquid_cache_storage::cache::squeeze_policies::{SqueezePolicy, TranscodeSqueezeEvict};
16use liquid_cache_storage::cache_policies::CachePolicy;
17use liquid_cache_storage::cache_policies::LiquidPolicy;
18
19pub use liquid_cache_common as common;
20pub use liquid_cache_storage as storage;
21
22pub struct LiquidCacheLocalBuilder {
55 batch_size: usize,
57 max_cache_bytes: usize,
59 cache_dir: PathBuf,
61 cache_policy: Box<dyn CachePolicy>,
63 squeeze_policy: Box<dyn SqueezePolicy>,
65
66 span: fastrace::Span,
67
68 io_mode: IoMode,
69}
70
71impl Default for LiquidCacheLocalBuilder {
72 fn default() -> Self {
73 Self {
74 batch_size: 8192,
75 max_cache_bytes: 1024 * 1024 * 1024, cache_dir: std::env::temp_dir().join("liquid_cache"),
77 cache_policy: Box::new(LiquidPolicy::new()),
78 squeeze_policy: Box::new(TranscodeSqueezeEvict),
79 span: fastrace::Span::enter_with_local_parent("liquid_cache_local_builder"),
80 io_mode: IoMode::StdBlocking,
81 }
82 }
83}
84
85impl LiquidCacheLocalBuilder {
86 pub fn new() -> Self {
88 Self::default()
89 }
90
91 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
93 self.batch_size = batch_size;
94 self
95 }
96
97 pub fn with_max_cache_bytes(mut self, max_cache_bytes: usize) -> Self {
99 self.max_cache_bytes = max_cache_bytes;
100 self
101 }
102
103 pub fn with_cache_dir(mut self, cache_dir: PathBuf) -> Self {
105 self.cache_dir = cache_dir;
106 self
107 }
108
109 pub fn with_squeeze_policy(mut self, squeeze_policy: Box<dyn SqueezePolicy>) -> Self {
111 self.squeeze_policy = squeeze_policy;
112 self
113 }
114
115 pub fn with_cache_policy(mut self, cache_policy: Box<dyn CachePolicy>) -> Self {
117 self.cache_policy = cache_policy;
118 self
119 }
120
121 pub fn with_span(mut self, span: fastrace::Span) -> Self {
123 self.span = span;
124 self
125 }
126
127 pub fn with_io_mode(mut self, io_mode: IoMode) -> Self {
129 self.io_mode = io_mode;
130 self
131 }
132
133 pub fn build(self, mut config: SessionConfig) -> Result<(SessionContext, LiquidCacheRef)> {
136 config.options_mut().execution.parquet.pushdown_filters = true;
137 config
138 .options_mut()
139 .execution
140 .parquet
141 .schema_force_view_types = false;
142 config.options_mut().execution.batch_size = self.batch_size;
143
144 let cache = LiquidCache::new(
145 self.batch_size,
146 self.max_cache_bytes,
147 self.cache_dir,
148 self.cache_policy,
149 self.squeeze_policy,
150 self.io_mode,
151 );
152 let cache_ref = Arc::new(cache);
153
154 let date_extract_optimizer = Arc::new(DateExtractOptimizer::new());
155
156 let optimizer = LocalModeOptimizer::with_cache(cache_ref.clone());
157
158 let state = datafusion::execution::SessionStateBuilder::new()
159 .with_config(config)
160 .with_default_features()
161 .with_optimizer_rule(date_extract_optimizer)
162 .with_physical_optimizer_rule(Arc::new(optimizer))
163 .build();
164
165 Ok((SessionContext::new_with_state(state), cache_ref))
166 }
167}
168
#[cfg(test)]
mod local_tests {
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::datasource::{
        file_format::parquet::ParquetFormat,
        listing::{ListingOptions, ListingTableUrl},
    };

    use super::*;

    // Parquet fixture used by every test in this module.
    const NANO_HITS_PATH: &str = "../../examples/nano_hits.parquet";

    // Listing options for `.parquet` files with pruning enabled.
    fn parquet_listing_options() -> ListingOptions {
        let format = ParquetFormat::default().with_enable_pruning(true);
        ListingOptions::new(Arc::new(format)).with_file_extension(".parquet")
    }

    // Registers the fixture as a listing table without an explicit schema and
    // runs a filtered query against it.
    #[tokio::test]
    async fn register_with_listing_table() -> Result<()> {
        let options = parquet_listing_options();
        let (ctx, _) = LiquidCacheLocalBuilder::new().build(SessionConfig::new())?;
        let table_path = ListingTableUrl::parse(NANO_HITS_PATH)?;

        ctx.register_listing_table("hits", &table_path, options, None, None)
            .await?;

        let frame = ctx
            .sql("SELECT * FROM hits where \"URL\" like '%google%'")
            .await?;
        frame.show().await?;
        Ok(())
    }

    // Registers the fixture with a caller-provided single-column schema and
    // reads one row of that column.
    #[tokio::test]
    async fn test_provide_schema() -> Result<()> {
        let (ctx, _) = LiquidCacheLocalBuilder::new().build(SessionConfig::new())?;
        let options = parquet_listing_options();
        let table_path = ListingTableUrl::parse(NANO_HITS_PATH)?;

        let watch_id = Field::new("WatchID", DataType::Int64, false);
        let schema = Arc::new(Schema::new(vec![watch_id]));

        ctx.register_listing_table("hits", &table_path, options, Some(schema), None)
            .await?;

        let frame = ctx.sql("SELECT \"WatchID\" FROM hits limit 1").await?;
        frame.show().await?;
        Ok(())
    }
}