Skip to main content

liquid_cache_datafusion_server/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#![doc = include_str!("../README.md")]
19
20use arrow::ipc::writer::IpcWriteOptions;
21use arrow_flight::{
22    Action, HandshakeRequest, HandshakeResponse, Ticket,
23    encode::{DictionaryHandling, FlightDataEncoderBuilder},
24    flight_service_server::FlightService,
25    sql::{
26        Any, CommandPreparedStatementUpdate, SqlInfo,
27        server::{FlightSqlService, PeekableFlightDataStream},
28    },
29};
30use datafusion::{
31    error::DataFusionError,
32    execution::{SessionStateBuilder, object_store::ObjectStoreUrl},
33    prelude::{SessionConfig, SessionContext},
34};
35use datafusion_proto::bytes::physical_plan_from_bytes;
36use fastrace::prelude::SpanContext;
37use futures::{Stream, TryStreamExt};
38use liquid_cache_common::{
39    IoMode,
40    rpc::{FetchResults, LiquidCacheActions},
41};
42use liquid_cache_datafusion::cache::LiquidCacheParquetRef;
43use log::info;
44use prost::bytes::Bytes;
45use service::LiquidCacheServiceInner;
46use std::pin::Pin;
47use std::{path::PathBuf, sync::Arc};
48use tonic::{Request, Response, Status, Streaming};
49use url::Url;
50use uuid::Uuid;
51mod service;
52mod utils;
53use utils::FinalStream;
54mod admin_server;
55mod errors;
56pub use admin_server::{models::*, run_admin_server};
57pub use errors::{
58    LiquidCacheErrorExt, LiquidCacheResult, anyhow_to_status, df_error_to_status_with_trace,
59};
60pub use liquid_cache as storage;
61use liquid_cache::{
62    cache::{
63        AlwaysHydrate, HydrationPolicy,
64        squeeze_policies::{SqueezePolicy, TranscodeSqueezeEvict},
65    },
66    cache_policies::{CachePolicy, LiquidPolicy},
67};
68pub use liquid_cache_common as common;
69use object_store::path::Path;
70use object_store::{GetOptions, GetRange};
71
72#[cfg(test)]
73mod tests;
74
75/// The LiquidCache server.
76///
77/// # Example
78///
79/// ```rust
80/// use arrow_flight::flight_service_server::FlightServiceServer;
81/// use datafusion::prelude::SessionContext;
82/// use liquid_cache_datafusion_server::LiquidCacheService;
83/// use liquid_cache_datafusion_server::storage::cache::squeeze_policies::TranscodeSqueezeEvict;
84/// use liquid_cache_datafusion_server::storage::cache::AlwaysHydrate;
85/// use liquid_cache_datafusion_server::storage::cache_policies::LiquidPolicy;
86/// use tonic::transport::Server;
87/// let liquid_cache = LiquidCacheService::new(
88///     SessionContext::new(),
89///     None,
90///     None,
91///     Box::new(LiquidPolicy::new()),
92///     Box::new(TranscodeSqueezeEvict),
93///     Box::new(AlwaysHydrate::new()),
94///     None,
95/// )
96/// .unwrap();
97/// let flight = FlightServiceServer::new(liquid_cache);
98/// Server::builder()
99///     .add_service(flight)
100///     .serve("0.0.0.0:15214".parse().unwrap());
101/// ```
102pub struct LiquidCacheService {
103    inner: LiquidCacheServiceInner,
104}
105
106impl Default for LiquidCacheService {
107    fn default() -> Self {
108        Self::try_new().unwrap()
109    }
110}
111
112impl LiquidCacheService {
113    /// Create a new [LiquidCacheService] with a default [SessionContext]
114    /// With no disk cache and unbounded memory usage.
115    pub fn try_new() -> anyhow::Result<Self> {
116        let ctx = Self::context()?;
117        Self::new(
118            ctx,
119            None,
120            None,
121            Box::new(LiquidPolicy::new()),
122            Box::new(TranscodeSqueezeEvict),
123            Box::new(AlwaysHydrate::new()),
124            None,
125        )
126    }
127
128    /// Create a new [LiquidCacheService] with a custom [SessionContext]
129    ///
130    /// # Arguments
131    ///
132    /// * `ctx` - The [SessionContext] to use
133    /// * `max_cache_bytes` - The maximum number of bytes to cache in memory
134    /// * `disk_cache_dir` - The directory to store the disk cache
135    /// * `io_mode` - Whether reads and writes should go through the OS page cache
136    pub fn new(
137        ctx: SessionContext,
138        max_cache_bytes: Option<usize>,
139        disk_cache_dir: Option<PathBuf>,
140        cache_policy: Box<dyn CachePolicy>,
141        squeeze_policy: Box<dyn SqueezePolicy>,
142        hydration_policy: Box<dyn HydrationPolicy>,
143        io_mode: Option<IoMode>,
144    ) -> anyhow::Result<Self> {
145        let disk_cache_dir = match disk_cache_dir {
146            Some(dir) => dir,
147            None => {
148                let dir = tempfile::tempdir()?.keep();
149                info!("Using temporary directory for disk cache: {dir:?}");
150                dir
151            }
152        };
153        let io_mode = match io_mode {
154            Some(io) => io,
155            None => IoMode::Uring,
156        };
157        Ok(Self {
158            inner: LiquidCacheServiceInner::new(
159                Arc::new(ctx),
160                max_cache_bytes,
161                disk_cache_dir,
162                cache_policy,
163                squeeze_policy,
164                hydration_policy,
165                io_mode,
166            ),
167        })
168    }
169
170    /// Get a reference to the cache
171    pub fn cache(&self) -> &LiquidCacheParquetRef {
172        self.inner.cache()
173    }
174
175    /// Create a new [SessionContext] with good defaults
176    /// This is the recommended way to create a [SessionContext] for LiquidCache
177    pub fn context() -> Result<SessionContext, DataFusionError> {
178        let mut session_config = SessionConfig::from_env()?;
179        let options_mut = session_config.options_mut();
180        options_mut.execution.parquet.pushdown_filters = true;
181        options_mut.execution.batch_size = 8192 * 2;
182
183        {
184            // view types cause excessive memory usage because they are not gced.
185            // For Arrow memory mode, we need to read as UTF-8
186            // For Liquid cache, we have our own way of handling string columns
187            options_mut.execution.parquet.schema_force_view_types = false;
188        }
189
190        let object_store_url = ObjectStoreUrl::parse("file://")?;
191        let object_store = object_store::local::LocalFileSystem::new();
192
193        let state = SessionStateBuilder::new()
194            .with_config(session_config)
195            .with_default_features()
196            .with_object_store(object_store_url.as_ref(), Arc::new(object_store))
197            .build();
198
199        let ctx = SessionContext::new_with_state(state);
200        Ok(ctx)
201    }
202
203    /// Get the parquet cache directory
204    pub fn get_parquet_cache_dir(&self) -> &PathBuf {
205        self.inner.get_parquet_cache_dir()
206    }
207
208    pub(crate) fn inner(&self) -> &LiquidCacheServiceInner {
209        &self.inner
210    }
211
212    async fn do_get_fallback_inner(
213        &self,
214        message: Any,
215    ) -> anyhow::Result<Response<<Self as FlightService>::DoGetStream>> {
216        if !message.is::<FetchResults>() {
217            return Err(anyhow::anyhow!(
218                "do_get: The defined request is invalid: {}",
219                message.type_url
220            ));
221        }
222
223        let fetch_results: FetchResults = message
224            .unpack()?
225            .ok_or_else(|| anyhow::anyhow!("Expected FetchResults but got None!"))?;
226
227        let span_context = SpanContext::decode_w3c_traceparent(&fetch_results.traceparent)
228            .unwrap_or_else(SpanContext::random);
229        let span = fastrace::Span::root("poll_stream", span_context);
230
231        let handle = Uuid::from_bytes_ref(fetch_results.handle.as_ref().try_into()?);
232        let partition = fetch_results.partition as usize;
233        let stream = self.inner.execute_plan(handle, partition).await?;
234        let stream = FinalStream::new(stream, self.inner.batch_size(), span).map_err(|e| {
235            let status = anyhow_to_status(anyhow::Error::from(e).context("Error executing plan"));
236            arrow_flight::error::FlightError::Tonic(Box::new(status))
237        });
238
239        let ipc_options = IpcWriteOptions::default();
240        let stream = FlightDataEncoderBuilder::new()
241            .with_options(ipc_options)
242            .with_dictionary_handling(DictionaryHandling::Resend)
243            .build(stream)
244            .map_err(Status::from);
245
246        Ok(Response::new(Box::pin(stream)))
247    }
248
249    async fn do_action_inner(
250        &self,
251        action: LiquidCacheActions,
252    ) -> anyhow::Result<Response<<Self as FlightService>::DoActionStream>> {
253        match action {
254            LiquidCacheActions::RegisterObjectStore(cmd) => {
255                let url = Url::parse(&cmd.url)?;
256                self.inner.register_object_store(&url, cmd.options).await?;
257
258                let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
259                    body: Bytes::default(),
260                })]);
261                Ok(Response::new(Box::pin(output)))
262            }
263            LiquidCacheActions::RegisterPlan(cmd) => {
264                let plan = cmd.plan;
265                let plan = physical_plan_from_bytes(&plan, &self.inner.get_ctx().task_ctx())?;
266                let handle = Uuid::from_bytes_ref(cmd.handle.as_ref().try_into()?);
267                self.inner.register_plan(*handle, plan);
268                let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
269                    body: Bytes::default(),
270                })]);
271                Ok(Response::new(Box::pin(output)))
272            }
273            LiquidCacheActions::PrefetchFromObjectStore(cmd) => {
274                // Parse the object store URL (e.g., s3://bucket)
275                let url = Url::parse(&cmd.url)?;
276
277                // Register the object store if not already registered
278                self.inner
279                    .register_object_store(&url, cmd.store_options)
280                    .await?;
281
282                // Get the local cache wrapper for this object store
283                let local_cache = self.inner.get_object_store(&url)?;
284
285                // Parse the path to the object within the store (e.g., path/to/file.parquet)
286                let path = Path::from(cmd.location);
287
288                // Create a range for the specific chunk we want to prefetch, if specified
289                let get_options = if let (Some(range_start), Some(range_end)) =
290                    (cmd.range_start, cmd.range_end)
291                {
292                    let chunk_range = GetRange::Bounded(range_start..range_end);
293                    GetOptions {
294                        range: Some(chunk_range),
295                        ..GetOptions::default()
296                    }
297                } else {
298                    GetOptions::default()
299                };
300
301                // Call the underlying object store to get the data and cache it
302                // The LocalCache wrapper will handle caching the fetched data
303                local_cache.get_opts(&path, get_options).await?;
304
305                // Return empty response to indicate successful prefetch
306                let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
307                    body: Bytes::default(),
308                })]);
309                Ok(Response::new(Box::pin(output)))
310            }
311        }
312    }
313}
314
315#[tonic::async_trait]
316impl FlightSqlService for LiquidCacheService {
317    type FlightService = LiquidCacheService;
318
319    async fn do_handshake(
320        &self,
321        _request: Request<Streaming<HandshakeRequest>>,
322    ) -> Result<
323        Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
324        Status,
325    > {
326        unimplemented!("We don't do handshake")
327    }
328
329    #[fastrace::trace]
330    async fn do_get_fallback(
331        &self,
332        _request: Request<Ticket>,
333        message: Any,
334    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
335        self.do_get_fallback_inner(message)
336            .await
337            .map_err(anyhow_to_status)
338    }
339
340    async fn do_put_prepared_statement_update(
341        &self,
342        _handle: CommandPreparedStatementUpdate,
343        _request: Request<PeekableFlightDataStream>,
344    ) -> Result<i64, Status> {
345        info!("do_put_prepared_statement_update");
346        // statements like "CREATE TABLE.." or "SET datafusion.nnn.." call this function
347        // and we are required to return some row count here
348        Ok(-1)
349    }
350
351    #[fastrace::trace]
352    async fn do_action_fallback(
353        &self,
354        request: Request<Action>,
355    ) -> Result<Response<<Self as FlightService>::DoActionStream>, Status> {
356        let action = LiquidCacheActions::from(request.into_inner());
357        self.do_action_inner(action).await.map_err(anyhow_to_status)
358    }
359
360    async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
361}
362
363#[cfg(test)]
364mod server_actions_tests {
365    use super::*;
366    use liquid_cache::ByteCache;
367    use liquid_cache_common::rpc::PrefetchFromObjectStoreRequest;
368    use std::collections::HashMap;
369    use tokio::fs::File;
370    use tokio::io::AsyncWriteExt;
371
372    async fn create_test_file(file_path: &str, size_mb: usize) -> anyhow::Result<()> {
373        let mut file = File::create(file_path).await?;
374        let size_bytes = size_mb * 1024 * 1024;
375
376        let chunk_size = 64 * 1024;
377        let chunk_data = vec![0u8; chunk_size];
378        let mut written = 0;
379
380        while written < size_bytes {
381            let remaining = size_bytes - written;
382            let write_size = std::cmp::min(chunk_size, remaining);
383            file.write_all(&chunk_data[..write_size]).await?;
384            written += write_size;
385        }
386
387        file.flush().await?;
388        Ok(())
389    }
390
391    #[tokio::test]
392    async fn test_prefetch_from_object_store() {
393        let service = LiquidCacheService::default();
394
395        // Create temporary test file
396        let temp_dir = tempfile::tempdir().unwrap();
397        let file_path = temp_dir.path().join("test_prefetch_data.bin");
398        let file_path_str = file_path.to_string_lossy().to_string();
399
400        // Generate 16MB test file
401        create_test_file(&file_path_str, 16).await.unwrap();
402
403        // Test with local file system
404        let url = Url::parse("file:///").unwrap();
405
406        let request = PrefetchFromObjectStoreRequest {
407            url: url.to_string(),
408            store_options: HashMap::new(),
409            location: file_path_str.clone(),
410            range_start: None,
411            range_end: None,
412        };
413
414        let action = LiquidCacheActions::PrefetchFromObjectStore(request);
415        let result = service.do_action_inner(action).await.unwrap();
416
417        let mut stream = result.into_inner();
418        let result = stream.try_next().await.unwrap().unwrap();
419        assert!(result.body.is_empty());
420
421        // Cleanup is handled by tempdir drop
422    }
423
424    #[tokio::test]
425    async fn test_prefetch_from_object_store_with_range() {
426        let service = LiquidCacheService::default();
427
428        // Create temporary test file
429        let temp_dir = tempfile::tempdir().unwrap();
430        let file_path = temp_dir.path().join("test_prefetch_data_range.bin");
431        let file_path_str = file_path.to_string_lossy().to_string();
432
433        // Generate 16MB test file
434        create_test_file(&file_path_str, 16).await.unwrap();
435
436        // Test with local file system
437        let url = Url::parse("file:///").unwrap();
438
439        // range from 1mb to 10mb
440        let range_start = 1024 * 1024;
441        let range_end = 10 * 1024 * 1024;
442
443        let request = PrefetchFromObjectStoreRequest {
444            url: url.to_string(),
445            store_options: HashMap::new(),
446            location: file_path_str.clone(),
447            range_start: Some(range_start),
448            range_end: Some(range_end),
449        };
450
451        let action = LiquidCacheActions::PrefetchFromObjectStore(request);
452        let result = service.do_action_inner(action).await.unwrap();
453
454        let mut stream = result.into_inner();
455        let result = stream.try_next().await.unwrap().unwrap();
456        assert!(result.body.is_empty());
457
458        // Cleanup is handled by tempdir drop
459    }
460
461    #[tokio::test]
462    async fn test_prefetch_invalid_object_store() {
463        let service = LiquidCacheService::default();
464
465        let request = PrefetchFromObjectStoreRequest {
466            url: "invalid://url".to_string(),
467            store_options: HashMap::new(),
468            location: "test.parquet".to_string(),
469            range_start: Some(0),
470            range_end: Some(1024),
471        };
472
473        let action = LiquidCacheActions::PrefetchFromObjectStore(request);
474        let result = service.do_action_inner(action).await;
475        assert!(result.is_err());
476    }
477
478    #[tokio::test]
479    async fn test_prefetch_invalid_location() {
480        let service = LiquidCacheService::default();
481
482        let url = Url::parse("file:///").unwrap();
483        let request = PrefetchFromObjectStoreRequest {
484            url: url.to_string(),
485            store_options: HashMap::new(),
486            location: "non_existent_file.parquet".to_string(),
487            range_start: Some(0),
488            range_end: Some(1024),
489        };
490
491        let action = LiquidCacheActions::PrefetchFromObjectStore(request);
492        let result = service.do_action_inner(action).await;
493        assert!(result.is_err());
494    }
495
496    #[tokio::test]
497    async fn test_prefetch_with_mock_store_metrics() {
498        use datafusion::execution::object_store::ObjectStoreUrl;
499        use liquid_cache_common::mock_store::MockStore;
500        use liquid_cache_common::utils::sanitize_object_store_url_for_dirname;
501
502        const BLOCK_SIZE: u64 = 1024 * 1024 * 4;
503
504        let service = LiquidCacheService::default();
505
506        let url = Url::parse("s3://mock").unwrap();
507
508        let inner = Arc::new(MockStore::new_with_files(1, (BLOCK_SIZE * 3) as usize));
509
510        let cache_dir = service
511            .get_parquet_cache_dir()
512            .join(sanitize_object_store_url_for_dirname(&url));
513        let local_cache = ByteCache::new(inner.clone(), cache_dir);
514
515        let object_store_url = ObjectStoreUrl::parse(url.as_str()).unwrap();
516        service
517            .inner()
518            .get_ctx()
519            .runtime_env()
520            .register_object_store(object_store_url.as_ref(), Arc::new(local_cache));
521
522        let start = BLOCK_SIZE / 2;
523        let end = BLOCK_SIZE + start;
524
525        let request = PrefetchFromObjectStoreRequest {
526            url: url.to_string(),
527            store_options: HashMap::new(),
528            location: "0.parquet".to_string(),
529            range_start: Some(start),
530            range_end: Some(end),
531        };
532
533        let action = LiquidCacheActions::PrefetchFromObjectStore(request);
534        let result = service.do_action_inner(action).await.unwrap();
535        let mut stream = result.into_inner();
536        let _ = stream.try_next().await.unwrap().unwrap();
537
538        let path = Path::from("0.parquet");
539        let ranges = inner.get_access_ranges(&path).unwrap();
540        assert_eq!(ranges.len(), 2);
541        assert_eq!(ranges[0], 0..BLOCK_SIZE);
542        assert_eq!(ranges[1], BLOCK_SIZE..BLOCK_SIZE * 2);
543
544        let request2 = PrefetchFromObjectStoreRequest {
545            url: url.to_string(),
546            store_options: HashMap::new(),
547            location: "0.parquet".to_string(),
548            range_start: Some(start + 1024),
549            range_end: Some(end - 1024),
550        };
551
552        let action = LiquidCacheActions::PrefetchFromObjectStore(request2);
553        let result = service.do_action_inner(action).await.unwrap();
554        let mut stream = result.into_inner();
555        let _ = stream.try_next().await.unwrap().unwrap();
556
557        let ranges_after = inner.get_access_ranges(&path).unwrap();
558        assert_eq!(ranges_after.len(), 2);
559    }
560}