liquid_cache_server/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#![warn(missing_docs)]
19#![doc = include_str!("../README.md")]
20
21use arrow::ipc::writer::IpcWriteOptions;
22use arrow_flight::{
23    Action, HandshakeRequest, HandshakeResponse, Ticket,
24    encode::{DictionaryHandling, FlightDataEncoderBuilder},
25    flight_service_server::FlightService,
26    sql::{
27        Any, CommandPreparedStatementUpdate, SqlInfo,
28        server::{FlightSqlService, PeekableFlightDataStream},
29    },
30};
31use datafusion::{
32    error::DataFusionError,
33    execution::{SessionStateBuilder, object_store::ObjectStoreUrl},
34    prelude::{SessionConfig, SessionContext},
35};
36use datafusion_proto::bytes::physical_plan_from_bytes;
37use fastrace::prelude::SpanContext;
38use futures::{Stream, TryStreamExt};
39use liquid_cache_common::{
40    CacheMode,
41    rpc::{FetchResults, LiquidCacheActions},
42};
43use liquid_cache_parquet::cache::LiquidCacheRef;
44use log::info;
45use prost::bytes::Bytes;
46use service::LiquidCacheServiceInner;
47use std::pin::Pin;
48use std::{path::PathBuf, sync::Arc};
49use tonic::{Request, Response, Status, Streaming};
50use url::Url;
51use uuid::Uuid;
52mod service;
53mod utils;
54use utils::FinalStream;
55mod admin_server;
56mod errors;
57pub use admin_server::{models::*, run_admin_server};
58pub use errors::{
59    LiquidCacheErrorExt, LiquidCacheResult, anyhow_to_status, df_error_to_status_with_trace,
60};
61pub use liquid_cache_common as common;
62pub use liquid_cache_storage as storage;
63use liquid_cache_storage::policies::{CachePolicy, LruPolicy};
64use object_store::path::Path;
65use object_store::{GetOptions, GetRange};
66
67#[cfg(test)]
68mod tests;
69
70/// The LiquidCache server.
71///
72/// # Example
73///
74/// ```rust
75/// use arrow_flight::flight_service_server::FlightServiceServer;
76/// use datafusion::prelude::SessionContext;
77/// use liquid_cache_server::LiquidCacheService;
78/// use tonic::transport::Server;
79/// use liquid_cache_server::storage::policies::LruPolicy;
80/// let liquid_cache = LiquidCacheService::new(SessionContext::new(), None, None, Default::default(), Box::new(LruPolicy::new())).unwrap();
81/// let flight = FlightServiceServer::new(liquid_cache);
82/// Server::builder()
83///     .add_service(flight)
84///     .serve("0.0.0.0:15214".parse().unwrap());
85/// ```
86pub struct LiquidCacheService {
87    inner: LiquidCacheServiceInner,
88}
89
90impl Default for LiquidCacheService {
91    fn default() -> Self {
92        Self::try_new().unwrap()
93    }
94}
95
96impl LiquidCacheService {
97    /// Create a new [LiquidCacheService] with a default [SessionContext]
98    /// With no disk cache and unbounded memory usage.
99    pub fn try_new() -> anyhow::Result<Self> {
100        let ctx = Self::context()?;
101        Self::new(
102            ctx,
103            None,
104            None,
105            CacheMode::LiquidEagerTranscode,
106            Box::new(LruPolicy::new()),
107        )
108    }
109
110    /// Create a new [LiquidCacheService] with a custom [SessionContext]
111    ///
112    /// # Arguments
113    ///
114    /// * `ctx` - The [SessionContext] to use
115    /// * `max_cache_bytes` - The maximum number of bytes to cache in memory
116    /// * `disk_cache_dir` - The directory to store the disk cache
117    /// * `cache_mode` - The [CacheMode] to use
118    pub fn new(
119        ctx: SessionContext,
120        max_cache_bytes: Option<usize>,
121        disk_cache_dir: Option<PathBuf>,
122        cache_mode: CacheMode,
123        cache_policy: Box<dyn CachePolicy>,
124    ) -> anyhow::Result<Self> {
125        let disk_cache_dir = match disk_cache_dir {
126            Some(dir) => dir,
127            None => {
128                let dir = tempfile::tempdir()?.keep();
129                info!("Using temporary directory for disk cache: {dir:?}");
130                dir
131            }
132        };
133        Ok(Self {
134            inner: LiquidCacheServiceInner::new(
135                Arc::new(ctx),
136                max_cache_bytes,
137                disk_cache_dir,
138                cache_mode,
139                cache_policy,
140            ),
141        })
142    }
143
144    /// Get a reference to the cache
145    pub fn cache(&self) -> &Option<LiquidCacheRef> {
146        self.inner.cache()
147    }
148
149    /// Create a new [SessionContext] with good defaults
150    /// This is the recommended way to create a [SessionContext] for LiquidCache
151    pub fn context() -> Result<SessionContext, DataFusionError> {
152        let mut session_config = SessionConfig::from_env()?;
153        let options_mut = session_config.options_mut();
154        options_mut.execution.parquet.pushdown_filters = true;
155        options_mut.execution.batch_size = 8192 * 2;
156
157        {
158            // view types cause excessive memory usage because they are not gced.
159            // For Arrow memory mode, we need to read as UTF-8
160            // For Liquid cache, we have our own way of handling string columns
161            options_mut.execution.parquet.schema_force_view_types = false;
162        }
163
164        let object_store_url = ObjectStoreUrl::parse("file://")?;
165        let object_store = object_store::local::LocalFileSystem::new();
166
167        let state = SessionStateBuilder::new()
168            .with_config(session_config)
169            .with_default_features()
170            .with_object_store(object_store_url.as_ref(), Arc::new(object_store))
171            .build();
172
173        let ctx = SessionContext::new_with_state(state);
174        Ok(ctx)
175    }
176
177    /// Get the parquet cache directory
178    pub fn get_parquet_cache_dir(&self) -> &PathBuf {
179        self.inner.get_parquet_cache_dir()
180    }
181
182    pub(crate) fn inner(&self) -> &LiquidCacheServiceInner {
183        &self.inner
184    }
185
186    async fn do_get_fallback_inner(
187        &self,
188        message: Any,
189    ) -> anyhow::Result<Response<<Self as FlightService>::DoGetStream>> {
190        if !message.is::<FetchResults>() {
191            return Err(anyhow::anyhow!(
192                "do_get: The defined request is invalid: {}",
193                message.type_url
194            ));
195        }
196
197        let fetch_results: FetchResults = message
198            .unpack()?
199            .ok_or_else(|| anyhow::anyhow!("Expected FetchResults but got None!"))?;
200
201        let span_context = SpanContext::decode_w3c_traceparent(&fetch_results.traceparent)
202            .unwrap_or_else(SpanContext::random);
203        let span = fastrace::Span::root("poll_stream", span_context);
204
205        let handle = Uuid::from_bytes_ref(fetch_results.handle.as_ref().try_into()?);
206        let partition = fetch_results.partition as usize;
207        let stream = self.inner.execute_plan(handle, partition).await?;
208        let stream = FinalStream::new(stream, self.inner.batch_size(), span).map_err(|e| {
209            let status = anyhow_to_status(anyhow::Error::from(e).context("Error executing plan"));
210            arrow_flight::error::FlightError::Tonic(Box::new(status))
211        });
212
213        let ipc_options = IpcWriteOptions::default();
214        let stream = FlightDataEncoderBuilder::new()
215            .with_options(ipc_options)
216            .with_dictionary_handling(DictionaryHandling::Resend)
217            .build(stream)
218            .map_err(Status::from);
219
220        Ok(Response::new(Box::pin(stream)))
221    }
222
223    async fn do_action_inner(
224        &self,
225        action: LiquidCacheActions,
226    ) -> anyhow::Result<Response<<Self as FlightService>::DoActionStream>> {
227        match action {
228            LiquidCacheActions::RegisterObjectStore(cmd) => {
229                let url = Url::parse(&cmd.url)?;
230                self.inner.register_object_store(&url, cmd.options).await?;
231
232                let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
233                    body: Bytes::default(),
234                })]);
235                Ok(Response::new(Box::pin(output)))
236            }
237            LiquidCacheActions::RegisterPlan(cmd) => {
238                let plan = cmd.plan;
239                let plan = physical_plan_from_bytes(&plan, self.inner.get_ctx())?;
240                let handle = Uuid::from_bytes_ref(cmd.handle.as_ref().try_into()?);
241                self.inner.register_plan(*handle, plan);
242                let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
243                    body: Bytes::default(),
244                })]);
245                Ok(Response::new(Box::pin(output)))
246            }
247            LiquidCacheActions::PrefetchFromObjectStore(cmd) => {
248                // Parse the object store URL (e.g., s3://bucket)
249                let url = Url::parse(&cmd.url)?;
250
251                // Register the object store if not already registered
252                self.inner
253                    .register_object_store(&url, cmd.store_options)
254                    .await?;
255
256                // Get the local cache wrapper for this object store
257                let local_cache = self.inner.get_object_store(&url)?;
258
259                // Parse the path to the object within the store (e.g., path/to/file.parquet)
260                let path = Path::from(cmd.location);
261
262                // Create a range for the specific chunk we want to prefetch, if specified
263                let get_options = if cmd.range_start.is_some() && cmd.range_end.is_some() {
264                    let chunk_range =
265                        GetRange::Bounded(cmd.range_start.unwrap()..cmd.range_end.unwrap());
266                    GetOptions {
267                        range: Some(chunk_range),
268                        ..GetOptions::default()
269                    }
270                } else {
271                    GetOptions::default()
272                };
273
274                // Call the underlying object store to get the data and cache it
275                // The LocalCache wrapper will handle caching the fetched data
276                local_cache.get_opts(&path, get_options).await?;
277
278                // Return empty response to indicate successful prefetch
279                let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
280                    body: Bytes::default(),
281                })]);
282                Ok(Response::new(Box::pin(output)))
283            }
284        }
285    }
286}
287
288#[tonic::async_trait]
289impl FlightSqlService for LiquidCacheService {
290    type FlightService = LiquidCacheService;
291
292    async fn do_handshake(
293        &self,
294        _request: Request<Streaming<HandshakeRequest>>,
295    ) -> Result<
296        Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
297        Status,
298    > {
299        unimplemented!("We don't do handshake")
300    }
301
302    #[fastrace::trace]
303    async fn do_get_fallback(
304        &self,
305        _request: Request<Ticket>,
306        message: Any,
307    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
308        self.do_get_fallback_inner(message)
309            .await
310            .map_err(anyhow_to_status)
311    }
312
313    async fn do_put_prepared_statement_update(
314        &self,
315        _handle: CommandPreparedStatementUpdate,
316        _request: Request<PeekableFlightDataStream>,
317    ) -> Result<i64, Status> {
318        info!("do_put_prepared_statement_update");
319        // statements like "CREATE TABLE.." or "SET datafusion.nnn.." call this function
320        // and we are required to return some row count here
321        Ok(-1)
322    }
323
324    #[fastrace::trace]
325    async fn do_action_fallback(
326        &self,
327        request: Request<Action>,
328    ) -> Result<Response<<Self as FlightService>::DoActionStream>, Status> {
329        let action = LiquidCacheActions::from(request.into_inner());
330        self.do_action_inner(action).await.map_err(anyhow_to_status)
331    }
332
333    async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
334}
335
#[cfg(test)]
mod server_actions_tests {
    use super::*;
    use liquid_cache_common::mock_store::MockStore;
    use liquid_cache_common::rpc::PrefetchFromObjectStoreRequest;
    use liquid_cache_common::utils::sanitize_object_store_url_for_dirname;
    use liquid_cache_storage::ByteCache;
    use std::collections::HashMap;
    use tempfile::TempDir;
    use tokio::fs::File;
    use tokio::io::AsyncWriteExt;

    /// Create a service whose disk cache lives in a [TempDir] owned by the caller.
    ///
    /// `LiquidCacheService::default()` keeps its temporary cache directory after
    /// the service is dropped, so every prefetching test would leave the cached
    /// bytes behind on disk. Keep the returned [TempDir] bound (e.g. to
    /// `_cache_dir`, not `_`) for as long as the service is used. Dropping it
    /// removes the cache directory.
    fn test_service() -> (LiquidCacheService, TempDir) {
        let cache_dir = tempfile::tempdir().expect("failed to create cache dir");
        let ctx = LiquidCacheService::context().expect("failed to create session context");
        let service = LiquidCacheService::new(
            ctx,
            None,
            Some(cache_dir.path().to_path_buf()),
            CacheMode::LiquidEagerTranscode,
            Box::new(LruPolicy::new()),
        )
        .expect("failed to create service");
        (service, cache_dir)
    }

    /// Write a file of `size_mb` MiB of zeros, in 64 KiB chunks.
    async fn create_test_file(file_path: &str, size_mb: usize) -> anyhow::Result<()> {
        let mut file = File::create(file_path).await?;
        let size_bytes = size_mb * 1024 * 1024;

        let chunk_size = 64 * 1024;
        let chunk_data = vec![0u8; chunk_size];
        let mut written = 0;

        while written < size_bytes {
            let write_size = std::cmp::min(chunk_size, size_bytes - written);
            file.write_all(&chunk_data[..write_size]).await?;
            written += write_size;
        }

        file.flush().await?;
        Ok(())
    }

    /// Build a prefetch request with no store options and an optional
    /// `(start, end)` byte range.
    fn prefetch_request(
        url: &str,
        location: &str,
        range: Option<(u64, u64)>,
    ) -> PrefetchFromObjectStoreRequest {
        PrefetchFromObjectStoreRequest {
            url: url.to_string(),
            store_options: HashMap::new(),
            location: location.to_string(),
            range_start: range.map(|(start, _)| start),
            range_end: range.map(|(_, end)| end),
        }
    }

    /// Send a prefetch request and assert it is acknowledged with an empty result.
    async fn assert_prefetch_ok(
        service: &LiquidCacheService,
        request: PrefetchFromObjectStoreRequest,
    ) {
        let action = LiquidCacheActions::PrefetchFromObjectStore(request);
        let mut stream = service.do_action_inner(action).await.unwrap().into_inner();
        let result = stream.try_next().await.unwrap().unwrap();
        assert!(result.body.is_empty());
    }

    /// Send a prefetch request and assert that the action fails.
    async fn assert_prefetch_err(
        service: &LiquidCacheService,
        request: PrefetchFromObjectStoreRequest,
    ) {
        let action = LiquidCacheActions::PrefetchFromObjectStore(request);
        assert!(service.do_action_inner(action).await.is_err());
    }

    #[tokio::test]
    async fn test_prefetch_from_object_store() {
        let (service, _cache_dir) = test_service();

        // Source file lives in its own temp dir, removed on drop.
        let temp_dir = tempfile::tempdir().unwrap();
        let file_path = temp_dir.path().join("test_prefetch_data.bin");
        let file_path_str = file_path.to_string_lossy().to_string();
        create_test_file(&file_path_str, 16).await.unwrap();

        let url = Url::parse("file:///").unwrap();
        assert_prefetch_ok(
            &service,
            prefetch_request(url.as_str(), &file_path_str, None),
        )
        .await;
    }

    #[tokio::test]
    async fn test_prefetch_from_object_store_with_range() {
        let (service, _cache_dir) = test_service();

        let temp_dir = tempfile::tempdir().unwrap();
        let file_path = temp_dir.path().join("test_prefetch_data_range.bin");
        let file_path_str = file_path.to_string_lossy().to_string();
        create_test_file(&file_path_str, 16).await.unwrap();

        let url = Url::parse("file:///").unwrap();

        // range from 1 MiB to 10 MiB
        let range = (1024 * 1024, 10 * 1024 * 1024);
        assert_prefetch_ok(
            &service,
            prefetch_request(url.as_str(), &file_path_str, Some(range)),
        )
        .await;
    }

    #[tokio::test]
    async fn test_prefetch_invalid_object_store() {
        let (service, _cache_dir) = test_service();
        assert_prefetch_err(
            &service,
            prefetch_request("invalid://url", "test.parquet", Some((0, 1024))),
        )
        .await;
    }

    #[tokio::test]
    async fn test_prefetch_invalid_location() {
        let (service, _cache_dir) = test_service();
        let url = Url::parse("file:///").unwrap();
        assert_prefetch_err(
            &service,
            prefetch_request(url.as_str(), "non_existent_file.parquet", Some((0, 1024))),
        )
        .await;
    }

    #[tokio::test]
    async fn test_prefetch_with_mock_store_metrics() {
        const BLOCK_SIZE: u64 = 1024 * 1024 * 4;

        let (service, _cache_dir) = test_service();
        let url = Url::parse("s3://mock").unwrap();

        // Wrap a mock store (which records accessed ranges) in the byte cache and
        // register it directly, so the prefetch goes through the mock.
        let inner = Arc::new(MockStore::new_with_files(1, (BLOCK_SIZE * 3) as usize));
        let cache_dir = service
            .get_parquet_cache_dir()
            .join(sanitize_object_store_url_for_dirname(&url));
        let local_cache = ByteCache::new(inner.clone(), cache_dir);

        let object_store_url = ObjectStoreUrl::parse(url.as_str()).unwrap();
        service
            .inner()
            .get_ctx()
            .runtime_env()
            .register_object_store(object_store_url.as_ref(), Arc::new(local_cache));

        // A range straddling the first block boundary fetches two whole blocks.
        let start = BLOCK_SIZE / 2;
        let end = BLOCK_SIZE + start;
        assert_prefetch_ok(
            &service,
            prefetch_request(url.as_str(), "0.parquet", Some((start, end))),
        )
        .await;

        let path = Path::from("0.parquet");
        let ranges = inner.get_access_ranges(&path).unwrap();
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges[0], 0..BLOCK_SIZE);
        assert_eq!(ranges[1], BLOCK_SIZE..BLOCK_SIZE * 2);

        // A sub-range of already-cached blocks must not hit the inner store again.
        assert_prefetch_ok(
            &service,
            prefetch_request(
                url.as_str(),
                "0.parquet",
                Some((start + 1024, end - 1024)),
            ),
        )
        .await;

        let ranges_after = inner.get_access_ranges(&path).unwrap();
        assert_eq!(ranges_after.len(), 2);
    }
}