liquid_cache_datafusion_server/
lib.rs1#![doc = include_str!("../README.md")]
19
20use arrow::ipc::writer::IpcWriteOptions;
21use arrow_flight::{
22 Action, HandshakeRequest, HandshakeResponse, Ticket,
23 encode::{DictionaryHandling, FlightDataEncoderBuilder},
24 flight_service_server::FlightService,
25 sql::{
26 Any, CommandPreparedStatementUpdate, SqlInfo,
27 server::{FlightSqlService, PeekableFlightDataStream},
28 },
29};
30use datafusion::{
31 error::DataFusionError,
32 execution::{SessionStateBuilder, object_store::ObjectStoreUrl},
33 prelude::{SessionConfig, SessionContext},
34};
35use datafusion_proto::bytes::physical_plan_from_bytes;
36use fastrace::prelude::SpanContext;
37use futures::{Stream, TryStreamExt};
38use liquid_cache_common::{
39 IoMode,
40 rpc::{FetchResults, LiquidCacheActions},
41};
42use liquid_cache_datafusion::cache::LiquidCacheParquetRef;
43use log::info;
44use prost::bytes::Bytes;
45use service::LiquidCacheServiceInner;
46use std::pin::Pin;
47use std::{path::PathBuf, sync::Arc};
48use tonic::{Request, Response, Status, Streaming};
49use url::Url;
50use uuid::Uuid;
51mod service;
52mod utils;
53use utils::FinalStream;
54mod admin_server;
55mod errors;
56pub use admin_server::{models::*, run_admin_server};
57pub use errors::{
58 LiquidCacheErrorExt, LiquidCacheResult, anyhow_to_status, df_error_to_status_with_trace,
59};
60pub use liquid_cache as storage;
61use liquid_cache::{
62 cache::{
63 AlwaysHydrate, HydrationPolicy,
64 squeeze_policies::{SqueezePolicy, TranscodeSqueezeEvict},
65 },
66 cache_policies::{CachePolicy, LiquidPolicy},
67};
68pub use liquid_cache_common as common;
69use object_store::path::Path;
70use object_store::{GetOptions, GetRange};
71
72#[cfg(test)]
73mod tests;
74
75pub struct LiquidCacheService {
103 inner: LiquidCacheServiceInner,
104}
105
106impl Default for LiquidCacheService {
107 fn default() -> Self {
108 Self::try_new().unwrap()
109 }
110}
111
112impl LiquidCacheService {
113 pub fn try_new() -> anyhow::Result<Self> {
116 let ctx = Self::context()?;
117 Self::new(
118 ctx,
119 None,
120 None,
121 Box::new(LiquidPolicy::new()),
122 Box::new(TranscodeSqueezeEvict),
123 Box::new(AlwaysHydrate::new()),
124 None,
125 )
126 }
127
128 pub fn new(
137 ctx: SessionContext,
138 max_cache_bytes: Option<usize>,
139 disk_cache_dir: Option<PathBuf>,
140 cache_policy: Box<dyn CachePolicy>,
141 squeeze_policy: Box<dyn SqueezePolicy>,
142 hydration_policy: Box<dyn HydrationPolicy>,
143 io_mode: Option<IoMode>,
144 ) -> anyhow::Result<Self> {
145 let disk_cache_dir = match disk_cache_dir {
146 Some(dir) => dir,
147 None => {
148 let dir = tempfile::tempdir()?.keep();
149 info!("Using temporary directory for disk cache: {dir:?}");
150 dir
151 }
152 };
153 let io_mode = match io_mode {
154 Some(io) => io,
155 None => IoMode::Uring,
156 };
157 Ok(Self {
158 inner: LiquidCacheServiceInner::new(
159 Arc::new(ctx),
160 max_cache_bytes,
161 disk_cache_dir,
162 cache_policy,
163 squeeze_policy,
164 hydration_policy,
165 io_mode,
166 ),
167 })
168 }
169
170 pub fn cache(&self) -> &LiquidCacheParquetRef {
172 self.inner.cache()
173 }
174
175 pub fn context() -> Result<SessionContext, DataFusionError> {
178 let mut session_config = SessionConfig::from_env()?;
179 let options_mut = session_config.options_mut();
180 options_mut.execution.parquet.pushdown_filters = true;
181 options_mut.execution.batch_size = 8192 * 2;
182
183 {
184 options_mut.execution.parquet.schema_force_view_types = false;
188 }
189
190 let object_store_url = ObjectStoreUrl::parse("file://")?;
191 let object_store = object_store::local::LocalFileSystem::new();
192
193 let state = SessionStateBuilder::new()
194 .with_config(session_config)
195 .with_default_features()
196 .with_object_store(object_store_url.as_ref(), Arc::new(object_store))
197 .build();
198
199 let ctx = SessionContext::new_with_state(state);
200 Ok(ctx)
201 }
202
203 pub fn get_parquet_cache_dir(&self) -> &PathBuf {
205 self.inner.get_parquet_cache_dir()
206 }
207
208 pub(crate) fn inner(&self) -> &LiquidCacheServiceInner {
209 &self.inner
210 }
211
212 async fn do_get_fallback_inner(
213 &self,
214 message: Any,
215 ) -> anyhow::Result<Response<<Self as FlightService>::DoGetStream>> {
216 if !message.is::<FetchResults>() {
217 return Err(anyhow::anyhow!(
218 "do_get: The defined request is invalid: {}",
219 message.type_url
220 ));
221 }
222
223 let fetch_results: FetchResults = message
224 .unpack()?
225 .ok_or_else(|| anyhow::anyhow!("Expected FetchResults but got None!"))?;
226
227 let span_context = SpanContext::decode_w3c_traceparent(&fetch_results.traceparent)
228 .unwrap_or_else(SpanContext::random);
229 let span = fastrace::Span::root("poll_stream", span_context);
230
231 let handle = Uuid::from_bytes_ref(fetch_results.handle.as_ref().try_into()?);
232 let partition = fetch_results.partition as usize;
233 let stream = self.inner.execute_plan(handle, partition).await?;
234 let stream = FinalStream::new(stream, self.inner.batch_size(), span).map_err(|e| {
235 let status = anyhow_to_status(anyhow::Error::from(e).context("Error executing plan"));
236 arrow_flight::error::FlightError::Tonic(Box::new(status))
237 });
238
239 let ipc_options = IpcWriteOptions::default();
240 let stream = FlightDataEncoderBuilder::new()
241 .with_options(ipc_options)
242 .with_dictionary_handling(DictionaryHandling::Resend)
243 .build(stream)
244 .map_err(Status::from);
245
246 Ok(Response::new(Box::pin(stream)))
247 }
248
249 async fn do_action_inner(
250 &self,
251 action: LiquidCacheActions,
252 ) -> anyhow::Result<Response<<Self as FlightService>::DoActionStream>> {
253 match action {
254 LiquidCacheActions::RegisterObjectStore(cmd) => {
255 let url = Url::parse(&cmd.url)?;
256 self.inner.register_object_store(&url, cmd.options).await?;
257
258 let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
259 body: Bytes::default(),
260 })]);
261 Ok(Response::new(Box::pin(output)))
262 }
263 LiquidCacheActions::RegisterPlan(cmd) => {
264 let plan = cmd.plan;
265 let plan = physical_plan_from_bytes(&plan, &self.inner.get_ctx().task_ctx())?;
266 let handle = Uuid::from_bytes_ref(cmd.handle.as_ref().try_into()?);
267 self.inner.register_plan(*handle, plan);
268 let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
269 body: Bytes::default(),
270 })]);
271 Ok(Response::new(Box::pin(output)))
272 }
273 LiquidCacheActions::PrefetchFromObjectStore(cmd) => {
274 let url = Url::parse(&cmd.url)?;
276
277 self.inner
279 .register_object_store(&url, cmd.store_options)
280 .await?;
281
282 let local_cache = self.inner.get_object_store(&url)?;
284
285 let path = Path::from(cmd.location);
287
288 let get_options = if let (Some(range_start), Some(range_end)) =
290 (cmd.range_start, cmd.range_end)
291 {
292 let chunk_range = GetRange::Bounded(range_start..range_end);
293 GetOptions {
294 range: Some(chunk_range),
295 ..GetOptions::default()
296 }
297 } else {
298 GetOptions::default()
299 };
300
301 local_cache.get_opts(&path, get_options).await?;
304
305 let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
307 body: Bytes::default(),
308 })]);
309 Ok(Response::new(Box::pin(output)))
310 }
311 }
312 }
313}
314
315#[tonic::async_trait]
316impl FlightSqlService for LiquidCacheService {
317 type FlightService = LiquidCacheService;
318
319 async fn do_handshake(
320 &self,
321 _request: Request<Streaming<HandshakeRequest>>,
322 ) -> Result<
323 Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
324 Status,
325 > {
326 unimplemented!("We don't do handshake")
327 }
328
329 #[fastrace::trace]
330 async fn do_get_fallback(
331 &self,
332 _request: Request<Ticket>,
333 message: Any,
334 ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
335 self.do_get_fallback_inner(message)
336 .await
337 .map_err(anyhow_to_status)
338 }
339
340 async fn do_put_prepared_statement_update(
341 &self,
342 _handle: CommandPreparedStatementUpdate,
343 _request: Request<PeekableFlightDataStream>,
344 ) -> Result<i64, Status> {
345 info!("do_put_prepared_statement_update");
346 Ok(-1)
349 }
350
351 #[fastrace::trace]
352 async fn do_action_fallback(
353 &self,
354 request: Request<Action>,
355 ) -> Result<Response<<Self as FlightService>::DoActionStream>, Status> {
356 let action = LiquidCacheActions::from(request.into_inner());
357 self.do_action_inner(action).await.map_err(anyhow_to_status)
358 }
359
360 async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
361}
362
#[cfg(test)]
mod server_actions_tests {
    use super::*;
    use liquid_cache::ByteCache;
    use liquid_cache_common::rpc::PrefetchFromObjectStoreRequest;
    use std::collections::HashMap;
    use tokio::fs::File;
    use tokio::io::AsyncWriteExt;

    /// Creates `file_path` and fills it with `size_mb` MiB of zero bytes,
    /// written in 64 KiB chunks.
    async fn create_test_file(file_path: &str, size_mb: usize) -> anyhow::Result<()> {
        const CHUNK_SIZE: usize = 64 * 1024;

        let mut file = File::create(file_path).await?;
        let zeros = vec![0u8; CHUNK_SIZE];
        let mut remaining = size_mb * 1024 * 1024;

        while remaining > 0 {
            let len = remaining.min(CHUNK_SIZE);
            file.write_all(&zeros[..len]).await?;
            remaining -= len;
        }

        file.flush().await?;
        Ok(())
    }

    /// Runs `request` as a prefetch action, unwrapping every step, and returns
    /// the first acknowledgement from the response stream.
    async fn prefetch_ok(
        service: &LiquidCacheService,
        request: PrefetchFromObjectStoreRequest,
    ) -> arrow_flight::Result {
        let action = LiquidCacheActions::PrefetchFromObjectStore(request);
        let response = service.do_action_inner(action).await.unwrap();
        let mut acks = response.into_inner();
        acks.try_next().await.unwrap().unwrap()
    }

    /// Prefetching a whole local file succeeds with an empty acknowledgement.
    #[tokio::test]
    async fn test_prefetch_from_object_store() {
        let service = LiquidCacheService::default();

        let temp_dir = tempfile::tempdir().unwrap();
        let file_path_str = temp_dir
            .path()
            .join("test_prefetch_data.bin")
            .to_string_lossy()
            .to_string();
        create_test_file(&file_path_str, 16).await.unwrap();

        let request = PrefetchFromObjectStoreRequest {
            url: Url::parse("file:///").unwrap().to_string(),
            store_options: HashMap::new(),
            location: file_path_str,
            range_start: None,
            range_end: None,
        };

        let ack = prefetch_ok(&service, request).await;
        assert!(ack.body.is_empty());
    }

    /// Prefetching a bounded byte range of a local file succeeds with an
    /// empty acknowledgement.
    #[tokio::test]
    async fn test_prefetch_from_object_store_with_range() {
        let service = LiquidCacheService::default();

        let temp_dir = tempfile::tempdir().unwrap();
        let file_path_str = temp_dir
            .path()
            .join("test_prefetch_data_range.bin")
            .to_string_lossy()
            .to_string();
        create_test_file(&file_path_str, 16).await.unwrap();

        let range_start = 1024 * 1024;
        let range_end = 10 * 1024 * 1024;

        let request = PrefetchFromObjectStoreRequest {
            url: Url::parse("file:///").unwrap().to_string(),
            store_options: HashMap::new(),
            location: file_path_str,
            range_start: Some(range_start),
            range_end: Some(range_end),
        };

        let ack = prefetch_ok(&service, request).await;
        assert!(ack.body.is_empty());
    }

    /// A URL with an unsupported scheme makes the action fail.
    #[tokio::test]
    async fn test_prefetch_invalid_object_store() {
        let service = LiquidCacheService::default();

        let request = PrefetchFromObjectStoreRequest {
            url: "invalid://url".to_string(),
            store_options: HashMap::new(),
            location: "test.parquet".to_string(),
            range_start: Some(0),
            range_end: Some(1024),
        };

        let outcome = service
            .do_action_inner(LiquidCacheActions::PrefetchFromObjectStore(request))
            .await;
        assert!(outcome.is_err());
    }

    /// A location that does not exist makes the action fail.
    #[tokio::test]
    async fn test_prefetch_invalid_location() {
        let service = LiquidCacheService::default();

        let request = PrefetchFromObjectStoreRequest {
            url: Url::parse("file:///").unwrap().to_string(),
            store_options: HashMap::new(),
            location: "non_existent_file.parquet".to_string(),
            range_start: Some(0),
            range_end: Some(1024),
        };

        let outcome = service
            .do_action_inner(LiquidCacheActions::PrefetchFromObjectStore(request))
            .await;
        assert!(outcome.is_err());
    }

    /// A range straddling two blocks reaches the backing store as two
    /// block-aligned reads, and a second prefetch inside the same blocks adds
    /// no further reads.
    #[tokio::test]
    async fn test_prefetch_with_mock_store_metrics() {
        use datafusion::execution::object_store::ObjectStoreUrl;
        use liquid_cache_common::mock_store::MockStore;
        use liquid_cache_common::utils::sanitize_object_store_url_for_dirname;

        const BLOCK_SIZE: u64 = 1024 * 1024 * 4;

        let service = LiquidCacheService::default();
        let url = Url::parse("s3://mock").unwrap();

        // Backing store with one file spanning three blocks.
        let inner = Arc::new(MockStore::new_with_files(1, (BLOCK_SIZE * 3) as usize));

        let cache_dir = service
            .get_parquet_cache_dir()
            .join(sanitize_object_store_url_for_dirname(&url));
        let local_cache = ByteCache::new(inner.clone(), cache_dir);

        let object_store_url = ObjectStoreUrl::parse(url.as_str()).unwrap();
        service
            .inner()
            .get_ctx()
            .runtime_env()
            .register_object_store(object_store_url.as_ref(), Arc::new(local_cache));

        // Half-way into block 0 up to half-way into block 1.
        let start = BLOCK_SIZE / 2;
        let end = BLOCK_SIZE + start;

        let first_request = PrefetchFromObjectStoreRequest {
            url: url.to_string(),
            store_options: HashMap::new(),
            location: "0.parquet".to_string(),
            range_start: Some(start),
            range_end: Some(end),
        };
        let _ = prefetch_ok(&service, first_request).await;

        let path = Path::from("0.parquet");
        let ranges = inner.get_access_ranges(&path).unwrap();
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges[0], 0..BLOCK_SIZE);
        assert_eq!(ranges[1], BLOCK_SIZE..BLOCK_SIZE * 2);

        // A narrower range inside the same two blocks.
        let second_request = PrefetchFromObjectStoreRequest {
            url: url.to_string(),
            store_options: HashMap::new(),
            location: "0.parquet".to_string(),
            range_start: Some(start + 1024),
            range_end: Some(end - 1024),
        };
        let _ = prefetch_ok(&service, second_request).await;

        let ranges_after = inner.get_access_ranges(&path).unwrap();
        assert_eq!(ranges_after.len(), 2);
    }
}