1#![doc = include_str!("../README.md")]
19
20use arrow::ipc::writer::IpcWriteOptions;
21use arrow_flight::{
22 Action, HandshakeRequest, HandshakeResponse, Ticket,
23 encode::{DictionaryHandling, FlightDataEncoderBuilder},
24 flight_service_server::FlightService,
25 sql::{
26 Any, CommandPreparedStatementUpdate, SqlInfo,
27 server::{FlightSqlService, PeekableFlightDataStream},
28 },
29};
30use datafusion::{
31 error::DataFusionError,
32 execution::{SessionStateBuilder, object_store::ObjectStoreUrl},
33 prelude::{SessionConfig, SessionContext},
34};
35use datafusion_proto::bytes::physical_plan_from_bytes;
36use fastrace::prelude::SpanContext;
37use futures::{Stream, TryStreamExt};
38use liquid_cache_common::{
39 IoMode,
40 rpc::{FetchResults, LiquidCacheActions},
41};
42use liquid_cache_parquet::cache::LiquidCacheRef;
43use log::info;
44use prost::bytes::Bytes;
45use service::LiquidCacheServiceInner;
46use std::pin::Pin;
47use std::{path::PathBuf, sync::Arc};
48use tonic::{Request, Response, Status, Streaming};
49use url::Url;
50use uuid::Uuid;
51mod service;
52mod utils;
53use utils::FinalStream;
54mod admin_server;
55mod errors;
56pub use admin_server::{models::*, run_admin_server};
57pub use errors::{
58 LiquidCacheErrorExt, LiquidCacheResult, anyhow_to_status, df_error_to_status_with_trace,
59};
60pub use liquid_cache_common as common;
61pub use liquid_cache_storage as storage;
62use liquid_cache_storage::{
63 cache::squeeze_policies::{SqueezePolicy, TranscodeSqueezeEvict},
64 cache_policies::{CachePolicy, LiquidPolicy},
65};
66use object_store::path::Path;
67use object_store::{GetOptions, GetRange};
68
69#[cfg(test)]
70mod tests;
71
/// Arrow Flight SQL server for LiquidCache.
///
/// Clients register object stores and physical plans through `do_action`
/// (see [`LiquidCacheActions`]), and stream the results of a registered plan
/// partition through `do_get` with a [`FetchResults`] ticket.
///
/// All state (session context, cache, registered plans) lives in the wrapped
/// [`LiquidCacheServiceInner`]; this type only adapts it to [`FlightSqlService`].
pub struct LiquidCacheService {
    inner: LiquidCacheServiceInner,
}
100
101impl Default for LiquidCacheService {
102 fn default() -> Self {
103 Self::try_new().unwrap()
104 }
105}
106
107impl LiquidCacheService {
108 pub fn try_new() -> anyhow::Result<Self> {
111 let ctx = Self::context()?;
112 Self::new(
113 ctx,
114 None,
115 None,
116 Box::new(LiquidPolicy::new()),
117 Box::new(TranscodeSqueezeEvict),
118 None,
119 )
120 }
121
122 pub fn new(
131 ctx: SessionContext,
132 max_cache_bytes: Option<usize>,
133 disk_cache_dir: Option<PathBuf>,
134 cache_policy: Box<dyn CachePolicy>,
135 squeeze_policy: Box<dyn SqueezePolicy>,
136 io_mode: Option<IoMode>,
137 ) -> anyhow::Result<Self> {
138 let disk_cache_dir = match disk_cache_dir {
139 Some(dir) => dir,
140 None => {
141 let dir = tempfile::tempdir()?.keep();
142 info!("Using temporary directory for disk cache: {dir:?}");
143 dir
144 }
145 };
146 let io_mode = match io_mode {
147 Some(io) => io,
148 None => IoMode::Uring,
149 };
150 Ok(Self {
151 inner: LiquidCacheServiceInner::new(
152 Arc::new(ctx),
153 max_cache_bytes,
154 disk_cache_dir,
155 cache_policy,
156 squeeze_policy,
157 io_mode,
158 ),
159 })
160 }
161
162 pub fn cache(&self) -> &LiquidCacheRef {
164 self.inner.cache()
165 }
166
167 pub fn context() -> Result<SessionContext, DataFusionError> {
170 let mut session_config = SessionConfig::from_env()?;
171 let options_mut = session_config.options_mut();
172 options_mut.execution.parquet.pushdown_filters = true;
173 options_mut.execution.batch_size = 8192 * 2;
174
175 {
176 options_mut.execution.parquet.schema_force_view_types = false;
180 }
181
182 let object_store_url = ObjectStoreUrl::parse("file://")?;
183 let object_store = object_store::local::LocalFileSystem::new();
184
185 let state = SessionStateBuilder::new()
186 .with_config(session_config)
187 .with_default_features()
188 .with_object_store(object_store_url.as_ref(), Arc::new(object_store))
189 .build();
190
191 let ctx = SessionContext::new_with_state(state);
192 Ok(ctx)
193 }
194
195 pub fn get_parquet_cache_dir(&self) -> &PathBuf {
197 self.inner.get_parquet_cache_dir()
198 }
199
200 pub(crate) fn inner(&self) -> &LiquidCacheServiceInner {
201 &self.inner
202 }
203
204 async fn do_get_fallback_inner(
205 &self,
206 message: Any,
207 ) -> anyhow::Result<Response<<Self as FlightService>::DoGetStream>> {
208 if !message.is::<FetchResults>() {
209 return Err(anyhow::anyhow!(
210 "do_get: The defined request is invalid: {}",
211 message.type_url
212 ));
213 }
214
215 let fetch_results: FetchResults = message
216 .unpack()?
217 .ok_or_else(|| anyhow::anyhow!("Expected FetchResults but got None!"))?;
218
219 let span_context = SpanContext::decode_w3c_traceparent(&fetch_results.traceparent)
220 .unwrap_or_else(SpanContext::random);
221 let span = fastrace::Span::root("poll_stream", span_context);
222
223 let handle = Uuid::from_bytes_ref(fetch_results.handle.as_ref().try_into()?);
224 let partition = fetch_results.partition as usize;
225 let stream = self.inner.execute_plan(handle, partition).await?;
226 let stream = FinalStream::new(stream, self.inner.batch_size(), span).map_err(|e| {
227 let status = anyhow_to_status(anyhow::Error::from(e).context("Error executing plan"));
228 arrow_flight::error::FlightError::Tonic(Box::new(status))
229 });
230
231 let ipc_options = IpcWriteOptions::default();
232 let stream = FlightDataEncoderBuilder::new()
233 .with_options(ipc_options)
234 .with_dictionary_handling(DictionaryHandling::Resend)
235 .build(stream)
236 .map_err(Status::from);
237
238 Ok(Response::new(Box::pin(stream)))
239 }
240
241 async fn do_action_inner(
242 &self,
243 action: LiquidCacheActions,
244 ) -> anyhow::Result<Response<<Self as FlightService>::DoActionStream>> {
245 match action {
246 LiquidCacheActions::RegisterObjectStore(cmd) => {
247 let url = Url::parse(&cmd.url)?;
248 self.inner.register_object_store(&url, cmd.options).await?;
249
250 let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
251 body: Bytes::default(),
252 })]);
253 Ok(Response::new(Box::pin(output)))
254 }
255 LiquidCacheActions::RegisterPlan(cmd) => {
256 let plan = cmd.plan;
257 let plan = physical_plan_from_bytes(&plan, self.inner.get_ctx())?;
258 let handle = Uuid::from_bytes_ref(cmd.handle.as_ref().try_into()?);
259 self.inner.register_plan(*handle, plan);
260 let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
261 body: Bytes::default(),
262 })]);
263 Ok(Response::new(Box::pin(output)))
264 }
265 LiquidCacheActions::PrefetchFromObjectStore(cmd) => {
266 let url = Url::parse(&cmd.url)?;
268
269 self.inner
271 .register_object_store(&url, cmd.store_options)
272 .await?;
273
274 let local_cache = self.inner.get_object_store(&url)?;
276
277 let path = Path::from(cmd.location);
279
280 let get_options = if cmd.range_start.is_some() && cmd.range_end.is_some() {
282 let chunk_range =
283 GetRange::Bounded(cmd.range_start.unwrap()..cmd.range_end.unwrap());
284 GetOptions {
285 range: Some(chunk_range),
286 ..GetOptions::default()
287 }
288 } else {
289 GetOptions::default()
290 };
291
292 local_cache.get_opts(&path, get_options).await?;
295
296 let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
298 body: Bytes::default(),
299 })]);
300 Ok(Response::new(Box::pin(output)))
301 }
302 }
303 }
304}
305
306#[tonic::async_trait]
307impl FlightSqlService for LiquidCacheService {
308 type FlightService = LiquidCacheService;
309
310 async fn do_handshake(
311 &self,
312 _request: Request<Streaming<HandshakeRequest>>,
313 ) -> Result<
314 Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
315 Status,
316 > {
317 unimplemented!("We don't do handshake")
318 }
319
320 #[fastrace::trace]
321 async fn do_get_fallback(
322 &self,
323 _request: Request<Ticket>,
324 message: Any,
325 ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
326 self.do_get_fallback_inner(message)
327 .await
328 .map_err(anyhow_to_status)
329 }
330
331 async fn do_put_prepared_statement_update(
332 &self,
333 _handle: CommandPreparedStatementUpdate,
334 _request: Request<PeekableFlightDataStream>,
335 ) -> Result<i64, Status> {
336 info!("do_put_prepared_statement_update");
337 Ok(-1)
340 }
341
342 #[fastrace::trace]
343 async fn do_action_fallback(
344 &self,
345 request: Request<Action>,
346 ) -> Result<Response<<Self as FlightService>::DoActionStream>, Status> {
347 let action = LiquidCacheActions::from(request.into_inner());
348 self.do_action_inner(action).await.map_err(anyhow_to_status)
349 }
350
351 async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
352}
353
#[cfg(test)]
mod server_actions_tests {
    use super::*;
    use liquid_cache_common::rpc::PrefetchFromObjectStoreRequest;
    use liquid_cache_storage::ByteCache;
    use std::collections::HashMap;
    use tokio::fs::File;
    use tokio::io::AsyncWriteExt;

    /// Response type produced by `do_action_inner`.
    type ActionResponse = Response<<LiquidCacheService as FlightService>::DoActionStream>;

    /// Writes `size_mb` MiB of zero bytes to `file_path`, in 64 KiB pieces.
    async fn create_test_file(file_path: &str, size_mb: usize) -> anyhow::Result<()> {
        const CHUNK: usize = 64 * 1024;
        let zeros = vec![0u8; CHUNK];
        let mut file = File::create(file_path).await?;

        let mut left = size_mb * 1024 * 1024;
        while left > 0 {
            let n = left.min(CHUNK);
            file.write_all(&zeros[..n]).await?;
            left -= n;
        }

        file.flush().await?;
        Ok(())
    }

    /// Builds a prefetch request with no store options.
    ///
    /// `range`, when given, supplies `(range_start, range_end)`.
    fn prefetch_request(
        url: &str,
        location: &str,
        range: Option<(u64, u64)>,
    ) -> PrefetchFromObjectStoreRequest {
        PrefetchFromObjectStoreRequest {
            url: url.to_string(),
            store_options: HashMap::new(),
            location: location.to_string(),
            range_start: range.map(|(start, _)| start),
            range_end: range.map(|(_, end)| end),
        }
    }

    /// Dispatches `request` as a `PrefetchFromObjectStore` action.
    async fn send_prefetch(
        service: &LiquidCacheService,
        request: PrefetchFromObjectStoreRequest,
    ) -> anyhow::Result<ActionResponse> {
        service
            .do_action_inner(LiquidCacheActions::PrefetchFromObjectStore(request))
            .await
    }

    /// Takes the first item of an action stream.
    ///
    /// Panics if the stream is empty or the item is an error.
    async fn first_result(response: ActionResponse) -> arrow_flight::Result {
        let mut stream = response.into_inner();
        stream.try_next().await.unwrap().unwrap()
    }

    #[tokio::test]
    async fn test_prefetch_from_object_store() {
        let service = LiquidCacheService::default();

        let temp_dir = tempfile::tempdir().unwrap();
        let file_path = temp_dir.path().join("test_prefetch_data.bin");
        let location = file_path.to_string_lossy().to_string();
        create_test_file(&location, 16).await.unwrap();

        let url = Url::parse("file:///").unwrap();
        let response = send_prefetch(&service, prefetch_request(url.as_str(), &location, None))
            .await
            .unwrap();

        assert!(first_result(response).await.body.is_empty());
    }

    #[tokio::test]
    async fn test_prefetch_from_object_store_with_range() {
        let service = LiquidCacheService::default();

        let temp_dir = tempfile::tempdir().unwrap();
        let file_path = temp_dir.path().join("test_prefetch_data_range.bin");
        let location = file_path.to_string_lossy().to_string();
        create_test_file(&location, 16).await.unwrap();

        let url = Url::parse("file:///").unwrap();
        let range_start = 1024 * 1024;
        let range_end = 10 * 1024 * 1024;
        let request = prefetch_request(url.as_str(), &location, Some((range_start, range_end)));
        let response = send_prefetch(&service, request).await.unwrap();

        assert!(first_result(response).await.body.is_empty());
    }

    #[tokio::test]
    async fn test_prefetch_invalid_object_store() {
        let service = LiquidCacheService::default();

        let request = prefetch_request("invalid://url", "test.parquet", Some((0, 1024)));
        let outcome = send_prefetch(&service, request).await;

        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_prefetch_invalid_location() {
        let service = LiquidCacheService::default();

        let url = Url::parse("file:///").unwrap();
        let request = prefetch_request(url.as_str(), "non_existent_file.parquet", Some((0, 1024)));
        let outcome = send_prefetch(&service, request).await;

        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_prefetch_with_mock_store_metrics() {
        use datafusion::execution::object_store::ObjectStoreUrl;
        use liquid_cache_common::mock_store::MockStore;
        use liquid_cache_common::utils::sanitize_object_store_url_for_dirname;

        const BLOCK_SIZE: u64 = 1024 * 1024 * 4;

        let service = LiquidCacheService::default();
        let url = Url::parse("s3://mock").unwrap();
        let inner = Arc::new(MockStore::new_with_files(1, (BLOCK_SIZE * 3) as usize));

        // Put a ByteCache in front of the mock store and register it directly
        // with the session, so prefetches read through it.
        let cache_dir = service
            .get_parquet_cache_dir()
            .join(sanitize_object_store_url_for_dirname(&url));
        let byte_cache = ByteCache::new(inner.clone(), cache_dir);
        let object_store_url = ObjectStoreUrl::parse(url.as_str()).unwrap();
        service
            .inner()
            .get_ctx()
            .runtime_env()
            .register_object_store(object_store_url.as_ref(), Arc::new(byte_cache));

        let start = BLOCK_SIZE / 2;
        let end = BLOCK_SIZE + start;

        let first = prefetch_request(url.as_str(), "0.parquet", Some((start, end)));
        let response = send_prefetch(&service, first).await.unwrap();
        let _ = first_result(response).await;

        // The inner store is expected to see two whole, block-aligned reads.
        let path = Path::from("0.parquet");
        let ranges = inner.get_access_ranges(&path).unwrap();
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges[0], 0..BLOCK_SIZE);
        assert_eq!(ranges[1], BLOCK_SIZE..BLOCK_SIZE * 2);

        // A narrower range inside those blocks must not reach the inner store again.
        let second = prefetch_request(url.as_str(), "0.parquet", Some((start + 1024, end - 1024)));
        let response = send_prefetch(&service, second).await.unwrap();
        let _ = first_result(response).await;

        let ranges_after = inner.get_access_ranges(&path).unwrap();
        assert_eq!(ranges_after.len(), 2);
    }
}