1#![warn(missing_docs)]
19#![doc = include_str!("../README.md")]
20
21use arrow::ipc::writer::IpcWriteOptions;
22use arrow_flight::{
23 Action, HandshakeRequest, HandshakeResponse, Ticket,
24 encode::{DictionaryHandling, FlightDataEncoderBuilder},
25 flight_service_server::FlightService,
26 sql::{
27 Any, CommandPreparedStatementUpdate, SqlInfo,
28 server::{FlightSqlService, PeekableFlightDataStream},
29 },
30};
31use datafusion::{
32 error::DataFusionError,
33 execution::{SessionStateBuilder, object_store::ObjectStoreUrl},
34 prelude::{SessionConfig, SessionContext},
35};
36use datafusion_proto::bytes::physical_plan_from_bytes;
37use fastrace::prelude::SpanContext;
38use futures::{Stream, TryStreamExt};
39use liquid_cache_common::{
40 CacheMode,
41 rpc::{FetchResults, LiquidCacheActions},
42};
43use liquid_cache_parquet::cache::LiquidCacheRef;
44use log::info;
45use prost::bytes::Bytes;
46use service::LiquidCacheServiceInner;
47use std::pin::Pin;
48use std::{path::PathBuf, sync::Arc};
49use tonic::{Request, Response, Status, Streaming};
50use url::Url;
51use uuid::Uuid;
52mod service;
53mod utils;
54use utils::FinalStream;
55mod admin_server;
56mod errors;
57pub use admin_server::{models::*, run_admin_server};
58pub use errors::{
59 LiquidCacheErrorExt, LiquidCacheResult, anyhow_to_status, df_error_to_status_with_trace,
60};
61pub use liquid_cache_common as common;
62pub use liquid_cache_storage as storage;
63use liquid_cache_storage::policies::{CachePolicy, LruPolicy};
64use object_store::path::Path;
65use object_store::{GetOptions, GetRange};
66
67#[cfg(test)]
68mod tests;
69
70pub struct LiquidCacheService {
87 inner: LiquidCacheServiceInner,
88}
89
90impl Default for LiquidCacheService {
91 fn default() -> Self {
92 Self::try_new().unwrap()
93 }
94}
95
96impl LiquidCacheService {
97 pub fn try_new() -> anyhow::Result<Self> {
100 let ctx = Self::context()?;
101 Self::new(
102 ctx,
103 None,
104 None,
105 CacheMode::LiquidEagerTranscode,
106 Box::new(LruPolicy::new()),
107 )
108 }
109
110 pub fn new(
119 ctx: SessionContext,
120 max_cache_bytes: Option<usize>,
121 disk_cache_dir: Option<PathBuf>,
122 cache_mode: CacheMode,
123 cache_policy: Box<dyn CachePolicy>,
124 ) -> anyhow::Result<Self> {
125 let disk_cache_dir = match disk_cache_dir {
126 Some(dir) => dir,
127 None => {
128 let dir = tempfile::tempdir()?.keep();
129 info!("Using temporary directory for disk cache: {dir:?}");
130 dir
131 }
132 };
133 Ok(Self {
134 inner: LiquidCacheServiceInner::new(
135 Arc::new(ctx),
136 max_cache_bytes,
137 disk_cache_dir,
138 cache_mode,
139 cache_policy,
140 ),
141 })
142 }
143
144 pub fn cache(&self) -> &Option<LiquidCacheRef> {
146 self.inner.cache()
147 }
148
149 pub fn context() -> Result<SessionContext, DataFusionError> {
152 let mut session_config = SessionConfig::from_env()?;
153 let options_mut = session_config.options_mut();
154 options_mut.execution.parquet.pushdown_filters = true;
155 options_mut.execution.batch_size = 8192 * 2;
156
157 {
158 options_mut.execution.parquet.schema_force_view_types = false;
162 }
163
164 let object_store_url = ObjectStoreUrl::parse("file://")?;
165 let object_store = object_store::local::LocalFileSystem::new();
166
167 let state = SessionStateBuilder::new()
168 .with_config(session_config)
169 .with_default_features()
170 .with_object_store(object_store_url.as_ref(), Arc::new(object_store))
171 .build();
172
173 let ctx = SessionContext::new_with_state(state);
174 Ok(ctx)
175 }
176
177 pub fn get_parquet_cache_dir(&self) -> &PathBuf {
179 self.inner.get_parquet_cache_dir()
180 }
181
182 pub(crate) fn inner(&self) -> &LiquidCacheServiceInner {
183 &self.inner
184 }
185
186 async fn do_get_fallback_inner(
187 &self,
188 message: Any,
189 ) -> anyhow::Result<Response<<Self as FlightService>::DoGetStream>> {
190 if !message.is::<FetchResults>() {
191 return Err(anyhow::anyhow!(
192 "do_get: The defined request is invalid: {}",
193 message.type_url
194 ));
195 }
196
197 let fetch_results: FetchResults = message
198 .unpack()?
199 .ok_or_else(|| anyhow::anyhow!("Expected FetchResults but got None!"))?;
200
201 let span_context = SpanContext::decode_w3c_traceparent(&fetch_results.traceparent)
202 .unwrap_or_else(SpanContext::random);
203 let span = fastrace::Span::root("poll_stream", span_context);
204
205 let handle = Uuid::from_bytes_ref(fetch_results.handle.as_ref().try_into()?);
206 let partition = fetch_results.partition as usize;
207 let stream = self.inner.execute_plan(handle, partition).await?;
208 let stream = FinalStream::new(stream, self.inner.batch_size(), span).map_err(|e| {
209 let status = anyhow_to_status(anyhow::Error::from(e).context("Error executing plan"));
210 arrow_flight::error::FlightError::Tonic(Box::new(status))
211 });
212
213 let ipc_options = IpcWriteOptions::default();
214 let stream = FlightDataEncoderBuilder::new()
215 .with_options(ipc_options)
216 .with_dictionary_handling(DictionaryHandling::Resend)
217 .build(stream)
218 .map_err(Status::from);
219
220 Ok(Response::new(Box::pin(stream)))
221 }
222
223 async fn do_action_inner(
224 &self,
225 action: LiquidCacheActions,
226 ) -> anyhow::Result<Response<<Self as FlightService>::DoActionStream>> {
227 match action {
228 LiquidCacheActions::RegisterObjectStore(cmd) => {
229 let url = Url::parse(&cmd.url)?;
230 self.inner.register_object_store(&url, cmd.options).await?;
231
232 let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
233 body: Bytes::default(),
234 })]);
235 Ok(Response::new(Box::pin(output)))
236 }
237 LiquidCacheActions::RegisterPlan(cmd) => {
238 let plan = cmd.plan;
239 let plan = physical_plan_from_bytes(&plan, self.inner.get_ctx())?;
240 let handle = Uuid::from_bytes_ref(cmd.handle.as_ref().try_into()?);
241 self.inner.register_plan(*handle, plan);
242 let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
243 body: Bytes::default(),
244 })]);
245 Ok(Response::new(Box::pin(output)))
246 }
247 LiquidCacheActions::PrefetchFromObjectStore(cmd) => {
248 let url = Url::parse(&cmd.url)?;
250
251 self.inner
253 .register_object_store(&url, cmd.store_options)
254 .await?;
255
256 let local_cache = self.inner.get_object_store(&url)?;
258
259 let path = Path::from(cmd.location);
261
262 let get_options = if cmd.range_start.is_some() && cmd.range_end.is_some() {
264 let chunk_range =
265 GetRange::Bounded(cmd.range_start.unwrap()..cmd.range_end.unwrap());
266 GetOptions {
267 range: Some(chunk_range),
268 ..GetOptions::default()
269 }
270 } else {
271 GetOptions::default()
272 };
273
274 local_cache.get_opts(&path, get_options).await?;
277
278 let output = futures::stream::iter(vec![Ok(arrow_flight::Result {
280 body: Bytes::default(),
281 })]);
282 Ok(Response::new(Box::pin(output)))
283 }
284 }
285 }
286}
287
288#[tonic::async_trait]
289impl FlightSqlService for LiquidCacheService {
290 type FlightService = LiquidCacheService;
291
292 async fn do_handshake(
293 &self,
294 _request: Request<Streaming<HandshakeRequest>>,
295 ) -> Result<
296 Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
297 Status,
298 > {
299 unimplemented!("We don't do handshake")
300 }
301
302 #[fastrace::trace]
303 async fn do_get_fallback(
304 &self,
305 _request: Request<Ticket>,
306 message: Any,
307 ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
308 self.do_get_fallback_inner(message)
309 .await
310 .map_err(anyhow_to_status)
311 }
312
313 async fn do_put_prepared_statement_update(
314 &self,
315 _handle: CommandPreparedStatementUpdate,
316 _request: Request<PeekableFlightDataStream>,
317 ) -> Result<i64, Status> {
318 info!("do_put_prepared_statement_update");
319 Ok(-1)
322 }
323
324 #[fastrace::trace]
325 async fn do_action_fallback(
326 &self,
327 request: Request<Action>,
328 ) -> Result<Response<<Self as FlightService>::DoActionStream>, Status> {
329 let action = LiquidCacheActions::from(request.into_inner());
330 self.do_action_inner(action).await.map_err(anyhow_to_status)
331 }
332
333 async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
334}
335
#[cfg(test)]
mod server_actions_tests {
    use super::*;
    use liquid_cache_common::rpc::PrefetchFromObjectStoreRequest;
    use liquid_cache_storage::ByteCache;
    use std::collections::HashMap;
    use tokio::fs::File;
    use tokio::io::AsyncWriteExt;

    /// Creates `file_path` and fills it with `size_mb` MiB of zero bytes, written in
    /// 64 KiB chunks.
    async fn create_test_file(file_path: &str, size_mb: usize) -> anyhow::Result<()> {
        const CHUNK_SIZE: usize = 64 * 1024;

        let mut file = File::create(file_path).await?;
        let zeros = vec![0u8; CHUNK_SIZE];
        let mut remaining = size_mb * 1024 * 1024;

        while remaining > 0 {
            let len = remaining.min(CHUNK_SIZE);
            file.write_all(&zeros[..len]).await?;
            remaining -= len;
        }

        file.flush().await?;
        Ok(())
    }

    /// Builds a prefetch request with empty store options and an optional `(start, end)`
    /// byte range.
    fn prefetch_request(
        url: &str,
        location: &str,
        range: Option<(u64, u64)>,
    ) -> PrefetchFromObjectStoreRequest {
        PrefetchFromObjectStoreRequest {
            url: url.to_string(),
            store_options: HashMap::new(),
            location: location.to_string(),
            range_start: range.map(|(start, _)| start),
            range_end: range.map(|(_, end)| end),
        }
    }

    /// Runs the prefetch action, requiring it to succeed, and returns the first message of
    /// the response stream.
    async fn first_result(
        service: &LiquidCacheService,
        request: PrefetchFromObjectStoreRequest,
    ) -> arrow_flight::Result {
        let action = LiquidCacheActions::PrefetchFromObjectStore(request);
        let response = service.do_action_inner(action).await.unwrap();
        let mut stream = response.into_inner();
        let first = stream.try_next().await.unwrap().unwrap();
        first
    }

    /// Prefetching a whole local file succeeds and is acknowledged with an empty body.
    #[tokio::test]
    async fn test_prefetch_from_object_store() {
        let service = LiquidCacheService::default();

        let temp_dir = tempfile::tempdir().unwrap();
        let location = temp_dir
            .path()
            .join("test_prefetch_data.bin")
            .to_string_lossy()
            .to_string();
        create_test_file(&location, 16).await.unwrap();

        let url = Url::parse("file:///").unwrap();
        let request = prefetch_request(url.as_str(), &location, None);

        let result = first_result(&service, request).await;
        assert!(result.body.is_empty());
    }

    /// Prefetching a bounded byte range of a local file succeeds as well.
    #[tokio::test]
    async fn test_prefetch_from_object_store_with_range() {
        let service = LiquidCacheService::default();

        let temp_dir = tempfile::tempdir().unwrap();
        let location = temp_dir
            .path()
            .join("test_prefetch_data_range.bin")
            .to_string_lossy()
            .to_string();
        create_test_file(&location, 16).await.unwrap();

        let url = Url::parse("file:///").unwrap();
        let range = (1024 * 1024, 10 * 1024 * 1024);
        let request = prefetch_request(url.as_str(), &location, Some(range));

        let result = first_result(&service, request).await;
        assert!(result.body.is_empty());
    }

    /// The action fails for a URL with the `invalid` scheme.
    #[tokio::test]
    async fn test_prefetch_invalid_object_store() {
        let service = LiquidCacheService::default();
        let request = prefetch_request("invalid://url", "test.parquet", Some((0, 1024)));

        let outcome = service
            .do_action_inner(LiquidCacheActions::PrefetchFromObjectStore(request))
            .await;
        assert!(outcome.is_err());
    }

    /// A location that does not exist in a valid store makes the action fail.
    #[tokio::test]
    async fn test_prefetch_invalid_location() {
        let service = LiquidCacheService::default();

        let url = Url::parse("file:///").unwrap();
        let request = prefetch_request(url.as_str(), "non_existent_file.parquet", Some((0, 1024)));

        let outcome = service
            .do_action_inner(LiquidCacheActions::PrefetchFromObjectStore(request))
            .await;
        assert!(outcome.is_err());
    }

    /// Uses a mock backing store to check which ranges a prefetch actually requests, and
    /// that a second prefetch inside the same blocks issues no further requests.
    #[tokio::test]
    async fn test_prefetch_with_mock_store_metrics() {
        use datafusion::execution::object_store::ObjectStoreUrl;
        use liquid_cache_common::mock_store::MockStore;
        use liquid_cache_common::utils::sanitize_object_store_url_for_dirname;

        const BLOCK_SIZE: u64 = 1024 * 1024 * 4;

        let service = LiquidCacheService::default();
        let url = Url::parse("s3://mock").unwrap();

        // Mock store sized at three blocks (presumably one file of that size), wrapped in a
        // `ByteCache` located under the service's Parquet cache directory.
        let mock_store = Arc::new(MockStore::new_with_files(1, (BLOCK_SIZE * 3) as usize));
        let cache_dir = service
            .get_parquet_cache_dir()
            .join(sanitize_object_store_url_for_dirname(&url));
        let byte_cache = ByteCache::new(mock_store.clone(), cache_dir);

        // Register the cache up front so the action resolves the URL to it.
        let store_url = ObjectStoreUrl::parse(url.as_str()).unwrap();
        service
            .inner()
            .get_ctx()
            .runtime_env()
            .register_object_store(store_url.as_ref(), Arc::new(byte_cache));

        // The requested range straddles the boundary between the first two blocks.
        let start = BLOCK_SIZE / 2;
        let end = BLOCK_SIZE + start;

        let first = prefetch_request(url.as_str(), "0.parquet", Some((start, end)));
        let _ = first_result(&service, first).await;

        let path = Path::from("0.parquet");
        let ranges = mock_store.get_access_ranges(&path).unwrap();
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges[0], 0..BLOCK_SIZE);
        assert_eq!(ranges[1], BLOCK_SIZE..BLOCK_SIZE * 2);

        // A narrower range inside the same two blocks must not reach the backing store.
        let second = prefetch_request(
            url.as_str(),
            "0.parquet",
            Some((start + 1024, end - 1024)),
        );
        let _ = first_result(&service, second).await;

        let ranges_after = mock_store.get_access_ranges(&path).unwrap();
        assert_eq!(ranges_after.len(), 2);
    }
}
533}