liquid_cache_common/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use std::str::FromStr;
4use std::{fmt::Display, sync::Arc};
5
6use arrow::array::ArrayRef;
7use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
8pub mod mock_store;
9pub mod rpc;
10pub mod utils;
/// Selects what LiquidCache keeps in its cache and when transcoding happens.
///
/// The textual form (used by both the [`Display`] and [`FromStr`] impls) is the
/// lowercase snake_case name of the variant.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CacheMode {
    /// Cache the Parquet files themselves.
    Parquet,
    /// Cache `LiquidArray`s; transcoding runs in the background. This is the default.
    #[default]
    Liquid,
    /// Cache `LiquidArray`s, but transcoding blocks query execution.
    LiquidEagerTranscode,
    /// Cache Arrow arrays; transcoding runs in the background.
    Arrow,
    /// Run as a static file server.
    StaticFileServer,
}

impl Display for CacheMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Parquet => "parquet",
            Self::Liquid => "liquid",
            Self::LiquidEagerTranscode => "liquid_eager_transcode",
            Self::Arrow => "arrow",
            Self::StaticFileServer => "static_file_server",
        };
        // Emitted verbatim: width/alignment flags from the caller are not applied.
        f.write_str(name)
    }
}

impl FromStr for CacheMode {
    type Err = String;

    /// Parses the exact, case-sensitive name produced by the `Display` impl.
    ///
    /// Any other input yields an error message listing the accepted names.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "parquet" => Ok(Self::Parquet),
            "liquid" => Ok(Self::Liquid),
            "liquid_eager_transcode" => Ok(Self::LiquidEagerTranscode),
            "arrow" => Ok(Self::Arrow),
            "static_file_server" => Ok(Self::StaticFileServer),
            _ => Err(format!(
                "Invalid cache mode: {s}, must be one of: parquet, liquid, liquid_eager_transcode, arrow, static_file_server"
            )),
        }
    }
}
61
/// How LiquidCache stores cached arrays in memory.
///
/// `Default` is derived (rather than hand-written) and yields [`LiquidCacheMode::Liquid`].
/// `PartialEq`/`Eq` are derived for parity with `CacheMode`.
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub enum LiquidCacheMode {
    /// Cache Arrow.
    Arrow,
    /// Cache Liquid: data is initially cached as Arrow and then transcoded to Liquid in the
    /// background. This is the default.
    #[default]
    Liquid,
    /// Cache Liquid, but block the thread while transcoding.
    LiquidBlocking,
}
78
79impl From<CacheMode> for LiquidCacheMode {
80    fn from(value: CacheMode) -> Self {
81        match value {
82            CacheMode::Liquid => LiquidCacheMode::Liquid,
83            CacheMode::Arrow => LiquidCacheMode::Arrow,
84            CacheMode::LiquidEagerTranscode => LiquidCacheMode::LiquidBlocking,
85            CacheMode::Parquet => unreachable!(),
86            CacheMode::StaticFileServer => unreachable!(),
87        }
88    }
89}
90
91/// Create a new field with the specified data type, copying the other
92/// properties from the input field
93fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
94    Arc::new(field.as_ref().clone().with_data_type(new_type))
95}
96
97pub fn coerce_parquet_type_to_liquid_type(
98    data_type: &DataType,
99    cache_mode: &LiquidCacheMode,
100) -> DataType {
101    match cache_mode {
102        LiquidCacheMode::Arrow => {
103            if data_type.equals_datatype(&DataType::Utf8View) {
104                DataType::Utf8
105            } else if data_type.equals_datatype(&DataType::BinaryView) {
106                DataType::Binary
107            } else {
108                data_type.clone()
109            }
110        }
111        LiquidCacheMode::Liquid | LiquidCacheMode::LiquidBlocking => {
112            if data_type.equals_datatype(&DataType::Utf8View) {
113                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
114            } else if data_type.equals_datatype(&DataType::BinaryView) {
115                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Binary))
116            } else {
117                data_type.clone()
118            }
119        }
120    }
121}
122
123pub fn cast_from_parquet_to_liquid_type(array: ArrayRef, cache_mode: &LiquidCacheMode) -> ArrayRef {
124    match cache_mode {
125        LiquidCacheMode::Arrow => {
126            if array.data_type() == &DataType::Utf8View {
127                arrow::compute::kernels::cast(&array, &DataType::Utf8).unwrap()
128            } else if array.data_type() == &DataType::BinaryView {
129                arrow::compute::kernels::cast(&array, &DataType::Binary).unwrap()
130            } else {
131                array
132            }
133        }
134        LiquidCacheMode::Liquid | LiquidCacheMode::LiquidBlocking => {
135            if array.data_type() == &DataType::Utf8View {
136                arrow::compute::kernels::cast(
137                    &array,
138                    &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
139                )
140                .unwrap()
141            } else if array.data_type() == &DataType::BinaryView {
142                arrow::compute::kernels::cast(
143                    &array,
144                    &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Binary)),
145                )
146                .unwrap()
147            } else {
148                array
149            }
150        }
151    }
152}
153/// Coerce the schema from LiquidParquetReader to LiquidCache types.
154pub fn coerce_parquet_schema_to_liquid_schema(
155    schema: &Schema,
156    cache_mode: &LiquidCacheMode,
157) -> Schema {
158    let transformed_fields: Vec<Arc<Field>> = schema
159        .fields
160        .iter()
161        .map(|field| {
162            field_with_new_type(
163                field,
164                coerce_parquet_type_to_liquid_type(field.data_type(), cache_mode),
165            )
166        })
167        .collect();
168    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
169}
170
171/// A schema that where strings are stored as `Utf8View`
172pub struct ParquetReaderSchema {}
173
174impl ParquetReaderSchema {
175    pub fn from(schema: &Schema) -> SchemaRef {
176        let transformed_fields: Vec<Arc<Field>> = schema
177            .fields
178            .iter()
179            .map(|field| {
180                let data_type = field.data_type();
181                match data_type {
182                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
183                        field_with_new_type(field, DataType::Utf8View)
184                    }
185                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
186                        field_with_new_type(field, DataType::BinaryView)
187                    }
188                    _ => field.clone(),
189                }
190            })
191            .collect();
192        Arc::new(Schema::new_with_metadata(
193            transformed_fields,
194            schema.metadata.clone(),
195        ))
196    }
197}