liquid_cache_server/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#![doc = include_str!("../README.md")]
19
20use arrow::ipc::writer::IpcWriteOptions;
21use arrow_flight::{
22    Action, HandshakeRequest, HandshakeResponse, Ticket,
23    encode::{DictionaryHandling, FlightDataEncoderBuilder},
24    flight_service_server::FlightService,
25    sql::{
26        Any, CommandPreparedStatementUpdate, SqlInfo,
27        server::{FlightSqlService, PeekableFlightDataStream},
28    },
29};
30use datafusion::{
31    error::DataFusionError,
32    execution::{SessionStateBuilder, object_store::ObjectStoreUrl},
33    prelude::{SessionConfig, SessionContext},
34};
35use datafusion_proto::bytes::physical_plan_from_bytes;
36use fastrace::prelude::SpanContext;
37use futures::{Stream, TryStreamExt};
38use liquid_cache_common::{
39    IoMode,
40    rpc::{FetchResults, LiquidCacheActions},
41};
42use liquid_cache_parquet::cache::LiquidCacheRef;
43use log::info;
44use prost::bytes::Bytes;
45use service::LiquidCacheServiceInner;
46use std::pin::Pin;
47use std::{path::PathBuf, sync::Arc};
48use tonic::{Request, Response, Status, Streaming};
49use url::Url;
50use uuid::Uuid;
51mod service;
52mod utils;
53use utils::FinalStream;
54mod admin_server;
55mod errors;
56pub use admin_server::{models::*, run_admin_server};
57pub use errors::{
58    LiquidCacheErrorExt, LiquidCacheResult, anyhow_to_status, df_error_to_status_with_trace,
59};
60pub use liquid_cache_common as common;
61pub use liquid_cache_storage as storage;
62use liquid_cache_storage::{
63    cache::squeeze_policies::{SqueezePolicy, TranscodeSqueezeEvict},
64    cache_policies::{CachePolicy, LiquidPolicy},
65};
66use object_store::path::Path;
67use object_store::{GetOptions, GetRange};
68
69#[cfg(test)]
70mod tests;
71
72/// The LiquidCache server.
73///
74/// # Example
75///
76/// ```rust
77/// use arrow_flight::flight_service_server::FlightServiceServer;
78/// use datafusion::prelude::SessionContext;
79/// use liquid_cache_server::LiquidCacheService;
80/// use liquid_cache_server::storage::cache::squeeze_policies::TranscodeSqueezeEvict;
81/// use liquid_cache_server::storage::cache_policies::LiquidPolicy;
82/// use tonic::transport::Server;
83/// let liquid_cache = LiquidCacheService::new(
84///     SessionContext::new(),
85///     None,
86///     None,
87///     Box::new(LiquidPolicy::new()),
88///     Box::new(TranscodeSqueezeEvict),
89///     None,
90/// )
91/// .unwrap();
92/// let flight = FlightServiceServer::new(liquid_cache);
93/// Server::builder()
94///     .add_service(flight)
95///     .serve("0.0.0.0:15214".parse().unwrap());
96/// ```
97pub struct LiquidCacheService {
98    inner: LiquidCacheServiceInner,
99}
100
101impl Default for LiquidCacheService {
102    fn default() -> Self {
103        Self::try_new().unwrap()
104    }
105}
106
107impl LiquidCacheService {
108    /// Create a new [LiquidCacheService] with a default [SessionContext]
109    /// With no disk cache and unbounded memory usage.
110    pub fn try_new() -> anyhow::Result<Self> {
111        let ctx = Self::context()?;
112        Self::new(
113            ctx,
114            None,
115            None,
116            Box::new(LiquidPolicy::new()),
117            Box::new(TranscodeSqueezeEvict),
118            None,
119        )
120    }
121
122    /// Create a new [LiquidCacheService] with a custom [SessionContext]
123    ///
124    /// # Arguments
125    ///
126    /// * `ctx` - The [SessionContext] to use
127    /// * `max_cache_bytes` - The maximum number of bytes to cache in memory
128    /// * `disk_cache_dir` - The directory to store the disk cache
129    /// * `io_mode` - Whether reads and writes should go through the OS page cache
130    pub fn new(
131        ctx: SessionContext,
132        max_cache_bytes: Option<usize>,
133        disk_cache_dir: Option<PathBuf>,
134        cache_policy: Box<dyn CachePolicy>,
135        squeeze_policy: Box<dyn SqueezePolicy>,
136        io_mode: Option<IoMode>,
137    ) -> anyhow::Result<Self> {
138        let disk_cache_dir = match disk_cache_dir {
139            Some(dir) => dir,
140            None => {
141                let dir = tempfile::tempdir()?.keep();
142                info!("Using temporary directory for disk cache: {dir:?}");
143                dir
144            }
145        };
146        let io_mode = match io_mode {
147            Some(io) => io,
148            None => IoMode::Uring,
149        };
150        Ok(Self {
151            inner: LiquidCacheServiceInner::new(
152                Arc::new(ctx),
153                max_cache_bytes,
154                disk_cache_dir,
155                cache_policy,
156                squeeze_policy,
157                io_mode,
158            ),
159        })
160    }
161
162    /// Get a reference to the cache
163    pub fn cache(&self) -> &LiquidCacheRef {
164        self.inner.cache()
165    }
166
167    /// Create a new [SessionContext] with good defaults
168    /// This is the recommended way to create a [SessionContext] for LiquidCache
169    pub fn context() -> Result<SessionContext, DataFusionError> {
170        let mut session_config = SessionConfig::from_env()?;
171        let options_mut = session_config.options_mut();
172        options_mut.execution.parquet.pushdown_filters = true;
173        options_mut.execution.batch_size = 8192 * 2;
174
175        {
176            // view types cause excessive memory usage because they are not gced.
177            // For Arrow memory mode, we need to read as UTF-8
178            // For Liquid cache, we have our own way of handling string columns
179            options_mut.execution.parquet.schema_force_view_types = false;
180        }
181
182        let object_store_url = ObjectStoreUrl::parse("file://")?;
183        let object_store = object_store::local::LocalFileSystem::new();
184
185        let state = SessionStateBuilder::new()
186            .with_config(session_config)
187            .with_default_features()
188            .with_object_store(object_store_url.as_ref(), Arc::new(object_store))
189            .build();
190
191        let ctx = SessionContext::new_with_state(state);
192        Ok(ctx)
193    }
194
195    /// Get the parquet cache directory
196    pub fn get_parquet_cache_dir(&self) -> &PathBuf {
197        self.inner.get_parquet_cache_dir()
198    }
199
200    pub(crate) fn inner(&self) -> &LiquidCacheServiceInner {
201        &self.inner
202    }
203
204    async fn do_get_fallback_inner(
205        &self,
206        message: Any,
207    ) -> anyhow::Result<Response<<Self as FlightService>::DoGetStream>> {
208        if !message.is::<FetchResults>() {
209            return Err(anyhow::anyhow!(
210                "do_get: The defined request is invalid: {}",
211                message.type_url
212            ));
213        }
214
215        let fetch_results: FetchResults = message
216            .unpack()?
217            .ok_or_else(|| anyhow::anyhow!("Expected FetchResults but got None!"))?;
218
219        let span_context = SpanContext::decode_w3c_traceparent(&fetch_results.traceparent)
220            .unwrap_or_else(SpanContext::random);
221        let span = fastrace::Span::root("poll_stream", span_context);
222
223        let handle = Uuid::from_bytes_ref(fetch_results.handle.as_ref().try_into()?);
224        let partition = fetch_results.partition as usize;
225        let stream = self.inner.execute_plan(handle, partition).await?;
226        let stream = FinalStream::new(stream, self.inner.batch_size(), span).map_err(|e| {
227            let status = anyhow_to_status(anyhow::Error::from(e).context("Error executing plan"));
228            arrow_flight::error::FlightError::Tonic(Box::new(status))
229        });
230
231        let ipc_options = IpcWriteOptions::default();
232        let stream = FlightDataEncoderBuilder::new()
233            .with_options(ipc_options)
234            .with_dictionary_handling(DictionaryHandling::Resend)
235            .build(stream)
236            .map_err(Status::from);
237
238        Ok(Response::new(Box::pin(stream)))
239    }
240
241    async fn do_action_inner(
242        &self,
243        action: LiquidCacheActions,
244    ) -> anyhow::Result<Response<<Self as FlightService>::DoActionStream>> {
245        match action {
246            LiquidCacheActions::RegisterObjectStore(cmd) => {
247                let url = Url::parse(&cmd.url)?;
248                self.inner.register_object_store(&url, cmd.options).await?;
249
250                let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
251                    body: Bytes::default(),
252                })]);
253                Ok(Response::new(Box::pin(output)))
254            }
255            LiquidCacheActions::RegisterPlan(cmd) => {
256                let plan = cmd.plan;
257                let plan = physical_plan_from_bytes(&plan, self.inner.get_ctx())?;
258                let handle = Uuid::from_bytes_ref(cmd.handle.as_ref().try_into()?);
259                self.inner.register_plan(*handle, plan);
260                let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
261                    body: Bytes::default(),
262                })]);
263                Ok(Response::new(Box::pin(output)))
264            }
265            LiquidCacheActions::PrefetchFromObjectStore(cmd) => {
266                // Parse the object store URL (e.g., s3://bucket)
267                let url = Url::parse(&cmd.url)?;
268
269                // Register the object store if not already registered
270                self.inner
271                    .register_object_store(&url, cmd.store_options)
272                    .await?;
273
274                // Get the local cache wrapper for this object store
275                let local_cache = self.inner.get_object_store(&url)?;
276
277                // Parse the path to the object within the store (e.g., path/to/file.parquet)
278                let path = Path::from(cmd.location);
279
280                // Create a range for the specific chunk we want to prefetch, if specified
281                let get_options = if cmd.range_start.is_some() && cmd.range_end.is_some() {
282                    let chunk_range =
283                        GetRange::Bounded(cmd.range_start.unwrap()..cmd.range_end.unwrap());
284                    GetOptions {
285                        range: Some(chunk_range),
286                        ..GetOptions::default()
287                    }
288                } else {
289                    GetOptions::default()
290                };
291
292                // Call the underlying object store to get the data and cache it
293                // The LocalCache wrapper will handle caching the fetched data
294                local_cache.get_opts(&path, get_options).await?;
295
296                // Return empty response to indicate successful prefetch
297                let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
298                    body: Bytes::default(),
299                })]);
300                Ok(Response::new(Box::pin(output)))
301            }
302        }
303    }
304}
305
306#[tonic::async_trait]
307impl FlightSqlService for LiquidCacheService {
308    type FlightService = LiquidCacheService;
309
310    async fn do_handshake(
311        &self,
312        _request: Request<Streaming<HandshakeRequest>>,
313    ) -> Result<
314        Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
315        Status,
316    > {
317        unimplemented!("We don't do handshake")
318    }
319
320    #[fastrace::trace]
321    async fn do_get_fallback(
322        &self,
323        _request: Request<Ticket>,
324        message: Any,
325    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
326        self.do_get_fallback_inner(message)
327            .await
328            .map_err(anyhow_to_status)
329    }
330
331    async fn do_put_prepared_statement_update(
332        &self,
333        _handle: CommandPreparedStatementUpdate,
334        _request: Request<PeekableFlightDataStream>,
335    ) -> Result<i64, Status> {
336        info!("do_put_prepared_statement_update");
337        // statements like "CREATE TABLE.." or "SET datafusion.nnn.." call this function
338        // and we are required to return some row count here
339        Ok(-1)
340    }
341
342    #[fastrace::trace]
343    async fn do_action_fallback(
344        &self,
345        request: Request<Action>,
346    ) -> Result<Response<<Self as FlightService>::DoActionStream>, Status> {
347        let action = LiquidCacheActions::from(request.into_inner());
348        self.do_action_inner(action).await.map_err(anyhow_to_status)
349    }
350
351    async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
352}
353
354#[cfg(test)]
355mod server_actions_tests {
356    use super::*;
357    use liquid_cache_common::rpc::PrefetchFromObjectStoreRequest;
358    use liquid_cache_storage::ByteCache;
359    use std::collections::HashMap;
360    use tokio::fs::File;
361    use tokio::io::AsyncWriteExt;
362
363    async fn create_test_file(file_path: &str, size_mb: usize) -> anyhow::Result<()> {
364        let mut file = File::create(file_path).await?;
365        let size_bytes = size_mb * 1024 * 1024;
366
367        let chunk_size = 64 * 1024;
368        let chunk_data = vec![0u8; chunk_size];
369        let mut written = 0;
370
371        while written < size_bytes {
372            let remaining = size_bytes - written;
373            let write_size = std::cmp::min(chunk_size, remaining);
374            file.write_all(&chunk_data[..write_size]).await?;
375            written += write_size;
376        }
377
378        file.flush().await?;
379        Ok(())
380    }
381
382    #[tokio::test]
383    async fn test_prefetch_from_object_store() {
384        let service = LiquidCacheService::default();
385
386        // Create temporary test file
387        let temp_dir = tempfile::tempdir().unwrap();
388        let file_path = temp_dir.path().join("test_prefetch_data.bin");
389        let file_path_str = file_path.to_string_lossy().to_string();
390
391        // Generate 16MB test file
392        create_test_file(&file_path_str, 16).await.unwrap();
393
394        // Test with local file system
395        let url = Url::parse("file:///").unwrap();
396
397        let request = PrefetchFromObjectStoreRequest {
398            url: url.to_string(),
399            store_options: HashMap::new(),
400            location: file_path_str.clone(),
401            range_start: None,
402            range_end: None,
403        };
404
405        let action = LiquidCacheActions::PrefetchFromObjectStore(request);
406        let result = service.do_action_inner(action).await.unwrap();
407
408        let mut stream = result.into_inner();
409        let result = stream.try_next().await.unwrap().unwrap();
410        assert!(result.body.is_empty());
411
412        // Cleanup is handled by tempdir drop
413    }
414
415    #[tokio::test]
416    async fn test_prefetch_from_object_store_with_range() {
417        let service = LiquidCacheService::default();
418
419        // Create temporary test file
420        let temp_dir = tempfile::tempdir().unwrap();
421        let file_path = temp_dir.path().join("test_prefetch_data_range.bin");
422        let file_path_str = file_path.to_string_lossy().to_string();
423
424        // Generate 16MB test file
425        create_test_file(&file_path_str, 16).await.unwrap();
426
427        // Test with local file system
428        let url = Url::parse("file:///").unwrap();
429
430        // range from 1mb to 10mb
431        let range_start = 1024 * 1024;
432        let range_end = 10 * 1024 * 1024;
433
434        let request = PrefetchFromObjectStoreRequest {
435            url: url.to_string(),
436            store_options: HashMap::new(),
437            location: file_path_str.clone(),
438            range_start: Some(range_start),
439            range_end: Some(range_end),
440        };
441
442        let action = LiquidCacheActions::PrefetchFromObjectStore(request);
443        let result = service.do_action_inner(action).await.unwrap();
444
445        let mut stream = result.into_inner();
446        let result = stream.try_next().await.unwrap().unwrap();
447        assert!(result.body.is_empty());
448
449        // Cleanup is handled by tempdir drop
450    }
451
452    #[tokio::test]
453    async fn test_prefetch_invalid_object_store() {
454        let service = LiquidCacheService::default();
455
456        let request = PrefetchFromObjectStoreRequest {
457            url: "invalid://url".to_string(),
458            store_options: HashMap::new(),
459            location: "test.parquet".to_string(),
460            range_start: Some(0),
461            range_end: Some(1024),
462        };
463
464        let action = LiquidCacheActions::PrefetchFromObjectStore(request);
465        let result = service.do_action_inner(action).await;
466        assert!(result.is_err());
467    }
468
469    #[tokio::test]
470    async fn test_prefetch_invalid_location() {
471        let service = LiquidCacheService::default();
472
473        let url = Url::parse("file:///").unwrap();
474        let request = PrefetchFromObjectStoreRequest {
475            url: url.to_string(),
476            store_options: HashMap::new(),
477            location: "non_existent_file.parquet".to_string(),
478            range_start: Some(0),
479            range_end: Some(1024),
480        };
481
482        let action = LiquidCacheActions::PrefetchFromObjectStore(request);
483        let result = service.do_action_inner(action).await;
484        assert!(result.is_err());
485    }
486
487    #[tokio::test]
488    async fn test_prefetch_with_mock_store_metrics() {
489        use datafusion::execution::object_store::ObjectStoreUrl;
490        use liquid_cache_common::mock_store::MockStore;
491        use liquid_cache_common::utils::sanitize_object_store_url_for_dirname;
492
493        const BLOCK_SIZE: u64 = 1024 * 1024 * 4;
494
495        let service = LiquidCacheService::default();
496
497        let url = Url::parse("s3://mock").unwrap();
498
499        let inner = Arc::new(MockStore::new_with_files(1, (BLOCK_SIZE * 3) as usize));
500
501        let cache_dir = service
502            .get_parquet_cache_dir()
503            .join(sanitize_object_store_url_for_dirname(&url));
504        let local_cache = ByteCache::new(inner.clone(), cache_dir);
505
506        let object_store_url = ObjectStoreUrl::parse(url.as_str()).unwrap();
507        service
508            .inner()
509            .get_ctx()
510            .runtime_env()
511            .register_object_store(object_store_url.as_ref(), Arc::new(local_cache));
512
513        let start = BLOCK_SIZE / 2;
514        let end = BLOCK_SIZE + start;
515
516        let request = PrefetchFromObjectStoreRequest {
517            url: url.to_string(),
518            store_options: HashMap::new(),
519            location: "0.parquet".to_string(),
520            range_start: Some(start),
521            range_end: Some(end),
522        };
523
524        let action = LiquidCacheActions::PrefetchFromObjectStore(request);
525        let result = service.do_action_inner(action).await.unwrap();
526        let mut stream = result.into_inner();
527        let _ = stream.try_next().await.unwrap().unwrap();
528
529        let path = Path::from("0.parquet");
530        let ranges = inner.get_access_ranges(&path).unwrap();
531        assert_eq!(ranges.len(), 2);
532        assert_eq!(ranges[0], 0..BLOCK_SIZE);
533        assert_eq!(ranges[1], BLOCK_SIZE..BLOCK_SIZE * 2);
534
535        let request2 = PrefetchFromObjectStoreRequest {
536            url: url.to_string(),
537            store_options: HashMap::new(),
538            location: "0.parquet".to_string(),
539            range_start: Some(start + 1024),
540            range_end: Some(end - 1024),
541        };
542
543        let action = LiquidCacheActions::PrefetchFromObjectStore(request2);
544        let result = service.do_action_inner(action).await.unwrap();
545        let mut stream = result.into_inner();
546        let _ = stream.try_next().await.unwrap().unwrap();
547
548        let ranges_after = inner.get_access_ranges(&path).unwrap();
549        assert_eq!(ranges_after.len(), 2);
550    }
551}