liquid_cache_common/
lib.rs

1#![cfg_attr(not(doctest), doc = include_str!(concat!("../", std::env!("CARGO_PKG_README"))))]
2
3use std::ops::Deref;
4use std::str::FromStr;
5use std::{fmt::Display, sync::Arc};
6
7use arrow::array::ArrayRef;
8use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
9pub mod rpc;
10pub mod utils;
/// Selects the representation LiquidCache uses when caching data.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CacheMode {
    /// Cache the parquet files.
    Parquet,
    /// Cache data as LiquidArray; transcoding happens in the background.
    ///
    /// This is the default mode.
    #[default]
    Liquid,
    /// Transcoding blocks query execution.
    LiquidEagerTranscode,
    /// Cache data as Arrow; transcoding happens in the background.
    Arrow,
    /// Static file server mode.
    StaticFileServer,
}
26
/// Selects the eviction logic the cache should use.
///
/// See the [`policies` module](https://docs.rs/liquid-cache-parquet/latest/liquid_cache_parquet/policies/index.html)
/// of `liquid-cache-parquet`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CacheEvictionStrategy {
    /// Do not cache new data.
    ///
    /// This is the default strategy.
    #[default]
    Discard,
    /// First In Last Out.
    Filo,
    /// Least Recently Used.
    Lru,
    /// Write to disk when the cache is full.
    ToDisk,
}
41
42impl Display for CacheMode {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        write!(
45            f,
46            "{}",
47            match self {
48                CacheMode::Parquet => "parquet",
49                CacheMode::Liquid => "liquid",
50                CacheMode::LiquidEagerTranscode => "liquid_eager_transcode",
51                CacheMode::Arrow => "arrow",
52                CacheMode::StaticFileServer => "static_file_server",
53            }
54        )
55    }
56}
57
58impl FromStr for CacheMode {
59    type Err = String;
60
61    fn from_str(s: &str) -> Result<Self, Self::Err> {
62        Ok(match s {
63            "parquet" => CacheMode::Parquet,
64            "liquid" => CacheMode::Liquid,
65            "liquid_eager_transcode" => CacheMode::LiquidEagerTranscode,
66            "arrow" => CacheMode::Arrow,
67            "static_file_server" => CacheMode::StaticFileServer,
68            _ => {
69                return Err(format!(
70                    "Invalid cache mode: {s}, must be one of: parquet, liquid, liquid_eager_transcode, arrow, static_file_server"
71                ));
72            }
73        })
74    }
75}
76
/// The mode of the cache.
///
/// Derives `PartialEq`/`Eq` (like `CacheMode` and `CacheEvictionStrategy`) so that modes
/// can be compared directly.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum LiquidCacheMode {
    /// The baseline that reads the arrays as is.
    Arrow,
    /// Cache the data as liquid arrays.
    Liquid {
        /// Whether to transcode the data into liquid arrays in the background.
        ///
        /// `CacheMode::Liquid` converts to `true` and `CacheMode::LiquidEagerTranscode`
        /// converts to `false`.
        transcode_in_background: bool,
    },
}
88
89impl Default for LiquidCacheMode {
90    fn default() -> Self {
91        Self::Liquid {
92            transcode_in_background: true,
93        }
94    }
95}
96
97impl LiquidCacheMode {
98    fn string_type(&self) -> DataType {
99        match self {
100            LiquidCacheMode::Arrow => DataType::Utf8,
101            LiquidCacheMode::Liquid { .. } => {
102                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
103            }
104        }
105    }
106}
107
108impl From<CacheMode> for LiquidCacheMode {
109    fn from(value: CacheMode) -> Self {
110        match value {
111            CacheMode::Liquid => LiquidCacheMode::Liquid {
112                transcode_in_background: true,
113            },
114            CacheMode::Arrow => LiquidCacheMode::Arrow,
115            CacheMode::LiquidEagerTranscode => LiquidCacheMode::Liquid {
116                transcode_in_background: false,
117            },
118            CacheMode::Parquet => unreachable!(),
119            CacheMode::StaticFileServer => unreachable!(),
120        }
121    }
122}
123
124/// Create a new field with the specified data type, copying the other
125/// properties from the input field
126fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
127    Arc::new(field.as_ref().clone().with_data_type(new_type))
128}
129
130pub fn coerce_string_to_liquid_type(data_type: &DataType, mode: &LiquidCacheMode) -> DataType {
131    match mode {
132        LiquidCacheMode::Arrow => data_type.clone(),
133        LiquidCacheMode::Liquid { .. } => match data_type {
134            DataType::Utf8
135            | DataType::LargeUtf8
136            | DataType::Utf8View
137            | DataType::Binary
138            | DataType::LargeBinary
139            | DataType::BinaryView => {
140                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
141            }
142            _ => data_type.clone(),
143        },
144    }
145}
146
147pub fn coerce_to_liquid_cache_types(schema: &Schema, mode: &LiquidCacheMode) -> Schema {
148    match mode {
149        // if in memory arrow, we cache as utf8 not dict or utf8view
150        LiquidCacheMode::Arrow => schema.clone(),
151        LiquidCacheMode::Liquid { .. } => {
152            let transformed_fields: Vec<Arc<Field>> = schema
153                .fields
154                .iter()
155                .map(|field| {
156                    field_with_new_type(
157                        field,
158                        coerce_string_to_liquid_type(field.data_type(), mode),
159                    )
160                })
161                .collect();
162            Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
163        }
164    }
165}
166
167pub fn coerce_from_parquet_to_liquid_type(
168    data_type: &DataType,
169    cache_mode: &LiquidCacheMode,
170) -> DataType {
171    match cache_mode {
172        LiquidCacheMode::Arrow => {
173            if data_type.equals_datatype(&DataType::Utf8View) {
174                DataType::Utf8
175            } else {
176                data_type.clone()
177            }
178        }
179        LiquidCacheMode::Liquid { .. } => {
180            if data_type.equals_datatype(&DataType::Utf8View) {
181                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
182            } else {
183                data_type.clone()
184            }
185        }
186    }
187}
188
189pub fn cast_from_parquet_to_liquid_type(array: ArrayRef, cache_mode: &LiquidCacheMode) -> ArrayRef {
190    match cache_mode {
191        LiquidCacheMode::Arrow => {
192            if array.data_type() == &DataType::Utf8View {
193                arrow::compute::kernels::cast(&array, &DataType::Utf8).unwrap()
194            } else {
195                array
196            }
197        }
198        LiquidCacheMode::Liquid { .. } => {
199            if array.data_type() == &DataType::Utf8View {
200                arrow::compute::kernels::cast(
201                    &array,
202                    &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
203                )
204                .unwrap()
205            } else {
206                array
207            }
208        }
209    }
210}
211/// Coerce the schema from LiquidParquetReader to LiquidCache types.
212pub fn coerce_from_parquet_reader_to_liquid_types(
213    schema: &Schema,
214    cache_mode: &LiquidCacheMode,
215) -> Schema {
216    let transformed_fields: Vec<Arc<Field>> = schema
217        .fields
218        .iter()
219        .map(|field| {
220            field_with_new_type(
221                field,
222                coerce_from_parquet_to_liquid_type(field.data_type(), cache_mode),
223            )
224        })
225        .collect();
226    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
227}
228
229pub fn coerce_binary_to_string(schema: &Schema) -> Schema {
230    let transformed_fields: Vec<Arc<Field>> = schema
231        .fields
232        .iter()
233        .map(|field| match field.data_type() {
234            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
235                field_with_new_type(field, DataType::Utf8)
236            }
237            _ => field.clone(),
238        })
239        .collect();
240    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
241}
242
243pub fn coerce_string_to_view(schema: &Schema) -> Schema {
244    let transformed_fields: Vec<Arc<Field>> = schema
245        .fields
246        .iter()
247        .map(|field| match field.data_type() {
248            DataType::Utf8 | DataType::LargeUtf8 => field_with_new_type(field, DataType::Utf8View),
249            _ => field.clone(),
250        })
251        .collect();
252    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
253}
254
255/// A schema that where strings are stored as `Utf8View`
256pub struct ParquetReaderSchema {}
257
258impl ParquetReaderSchema {
259    pub fn from(schema: &Schema) -> SchemaRef {
260        let transformed_fields: Vec<Arc<Field>> = schema
261            .fields
262            .iter()
263            .map(|field| {
264                let data_type = field.data_type();
265                match data_type {
266                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
267                        field_with_new_type(field, DataType::Utf8View)
268                    }
269                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
270                        field_with_new_type(field, DataType::BinaryView)
271                    }
272                    _ => field.clone(),
273                }
274            })
275            .collect();
276        Arc::new(Schema::new_with_metadata(
277            transformed_fields,
278            schema.metadata.clone(),
279        ))
280    }
281}
282
283/// A schema that where strings are stored as `Dictionary<UInt16, Utf8>`
284pub struct CacheSchema {
285    schema: SchemaRef,
286}
287
288impl CacheSchema {
289    /// Create a new `DictStringSchema` from a `SchemaRef`.
290    ///
291    /// # Panics
292    ///
293    /// This function will panic if the schema contains a `Utf8` or `Utf8View` field.
294    pub fn new_checked(schema: SchemaRef, mode: &LiquidCacheMode) -> Self {
295        {
296            for field in schema.fields() {
297                match mode {
298                    LiquidCacheMode::Arrow => {
299                        // cache schema is always utf8
300                        assert!(
301                            !field.data_type().equals_datatype(&DataType::Utf8View),
302                            "Field {} must not be a Utf8View",
303                            field.name()
304                        );
305                        assert!(
306                            !field.data_type().equals_datatype(&DataType::Dictionary(
307                                Box::new(DataType::UInt16),
308                                Box::new(DataType::Utf8)
309                            )),
310                            "Field {} must not be a Dict<UInt16, Utf8>",
311                            field.name()
312                        );
313                    }
314                    LiquidCacheMode::Liquid { .. } => {
315                        // cache schema is always dict string
316                        assert!(
317                            !field.data_type().equals_datatype(&DataType::Utf8),
318                            "Field {} must not be a Utf8",
319                            field.name()
320                        );
321                        assert!(
322                            !field.data_type().equals_datatype(&DataType::Utf8View),
323                            "Field {} must not be a Utf8View",
324                            field.name()
325                        );
326                    }
327                }
328            }
329        }
330        Self { schema }
331    }
332
333    pub fn from(downstream_full_schema: SchemaRef, mode: &LiquidCacheMode) -> Self {
334        let transformed_fields: Vec<Arc<Field>> = downstream_full_schema
335            .fields
336            .iter()
337            .map(|field| {
338                let data_type = field.data_type();
339                match data_type {
340                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
341                        field_with_new_type(field, mode.string_type())
342                    }
343                    _ => field.clone(),
344                }
345            })
346            .collect();
347        Self {
348            schema: Arc::new(Schema::new_with_metadata(
349                transformed_fields,
350                downstream_full_schema.metadata.clone(),
351            )),
352        }
353    }
354}
355
356impl Deref for CacheSchema {
357    type Target = SchemaRef;
358
359    fn deref(&self) -> &Self::Target {
360        &self.schema
361    }
362}