liquid_cache_common/
lib.rs

1#![cfg_attr(not(doctest), doc = include_str!(concat!("../", std::env!("CARGO_PKG_README"))))]
2
3use std::ops::Deref;
4use std::str::FromStr;
5use std::{fmt::Display, sync::Arc};
6
7use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
8pub mod rpc;
9pub mod utils;
/// Specify how LiquidCache should cache the data.
///
/// Every variant has a canonical lowercase name (`parquet`, `liquid`,
/// `liquid_eager_transcode`, `arrow`). [`Display`] prints that name and
/// [`FromStr`] parses it back, so the two round-trip.
#[derive(Clone, Debug, Default, Copy, PartialEq, Eq)]
pub enum CacheMode {
    /// Cache parquet files
    Parquet,
    /// Cache LiquidArray, transcode happens in background.
    ///
    /// This is the default mode.
    #[default]
    Liquid,
    /// Transcode blocks query execution
    LiquidEagerTranscode,
    /// Cache Arrow, transcode happens in background
    Arrow,
}

impl Display for CacheMode {
    /// Write the canonical lowercase name of the mode.
    ///
    /// The name goes through [`std::fmt::Formatter::pad`], so width, fill,
    /// alignment and precision flags (for example `{:>10}`) are honored.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            CacheMode::Parquet => "parquet",
            CacheMode::Liquid => "liquid",
            CacheMode::LiquidEagerTranscode => "liquid_eager_transcode",
            CacheMode::Arrow => "arrow",
        };
        f.pad(name)
    }
}

impl FromStr for CacheMode {
    type Err = String;

    /// Parse a canonical mode name.
    ///
    /// Matching is exact: no trimming and no case folding is performed.
    ///
    /// # Errors
    ///
    /// Returns a message that echoes the rejected input and lists the
    /// accepted names when `s` is not one of them.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "parquet" => Ok(CacheMode::Parquet),
            "liquid" => Ok(CacheMode::Liquid),
            "liquid_eager_transcode" => Ok(CacheMode::LiquidEagerTranscode),
            "arrow" => Ok(CacheMode::Arrow),
            _ => Err(format!(
                "Invalid cache mode: {s}, must be one of: parquet, liquid, liquid_eager_transcode, arrow"
            )),
        }
    }
}
57
58/// Create a new field with the specified data type, copying the other
59/// properties from the input field
60fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
61    Arc::new(field.as_ref().clone().with_data_type(new_type))
62}
63
64pub fn coerce_to_liquid_cache_types(schema: &Schema) -> Schema {
65    let transformed_fields: Vec<Arc<Field>> = schema
66        .fields
67        .iter()
68        .map(|field| match field.data_type() {
69            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => field_with_new_type(
70                field,
71                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
72            ),
73            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => field_with_new_type(
74                field,
75                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
76            ),
77            _ => field.clone(),
78        })
79        .collect();
80    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
81}
82
83/// Coerce the schema from LiquidParquetReader to LiquidCache types.
84pub fn coerce_from_parquet_reader_to_liquid_types(schema: &Schema) -> Schema {
85    let transformed_fields: Vec<Arc<Field>> = schema
86        .fields
87        .iter()
88        .map(|field| {
89            if field.data_type().equals_datatype(&DataType::Utf8View) {
90                field_with_new_type(
91                    field,
92                    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
93                )
94            } else {
95                field.clone()
96            }
97        })
98        .collect();
99    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
100}
101
102pub fn coerce_binary_to_string(schema: &Schema) -> Schema {
103    let transformed_fields: Vec<Arc<Field>> = schema
104        .fields
105        .iter()
106        .map(|field| match field.data_type() {
107            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
108                field_with_new_type(field, DataType::Utf8)
109            }
110            _ => field.clone(),
111        })
112        .collect();
113    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
114}
115
116pub fn coerce_string_to_view(schema: &Schema) -> Schema {
117    let transformed_fields: Vec<Arc<Field>> = schema
118        .fields
119        .iter()
120        .map(|field| match field.data_type() {
121            DataType::Utf8 | DataType::LargeUtf8 => field_with_new_type(field, DataType::Utf8View),
122            _ => field.clone(),
123        })
124        .collect();
125    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
126}
127
128/// A schema that where strings are stored as `Utf8View`
129pub struct StringViewSchema {
130    schema: SchemaRef,
131}
132
133impl Deref for StringViewSchema {
134    type Target = SchemaRef;
135
136    fn deref(&self) -> &Self::Target {
137        &self.schema
138    }
139}
140
141impl From<&DictStringSchema> for StringViewSchema {
142    fn from(schema: &DictStringSchema) -> Self {
143        let dict_string_type =
144            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
145        let transformed_fields: Vec<Arc<Field>> = schema
146            .schema
147            .fields
148            .iter()
149            .map(|field| {
150                if field.data_type().equals_datatype(&dict_string_type) {
151                    field_with_new_type(field, DataType::Utf8View)
152                } else {
153                    field.clone()
154                }
155            })
156            .collect();
157        Self {
158            schema: Arc::new(Schema::new_with_metadata(
159                transformed_fields,
160                schema.schema.metadata.clone(),
161            )),
162        }
163    }
164}
165
166/// A schema that where strings are stored as `Dictionary<UInt16, Utf8>`
167pub struct DictStringSchema {
168    schema: SchemaRef,
169}
170
171impl DictStringSchema {
172    /// Create a new `DictStringSchema` from a `SchemaRef`.
173    ///
174    /// # Panics
175    ///
176    /// This function will panic if the schema contains a `Utf8` or `Utf8View` field.
177    pub fn new(schema: SchemaRef) -> Self {
178        {
179            for field in schema.fields() {
180                assert!(
181                    !field.data_type().equals_datatype(&DataType::Utf8),
182                    "Field {} must not be a Utf8",
183                    field.name()
184                );
185                assert!(
186                    !field.data_type().equals_datatype(&DataType::Utf8View),
187                    "Field {} must not be a Utf8View",
188                    field.name()
189                );
190            }
191        }
192        Self { schema }
193    }
194}
195
196impl Deref for DictStringSchema {
197    type Target = SchemaRef;
198
199    fn deref(&self) -> &Self::Target {
200        &self.schema
201    }
202}
203
204/// A schema that where strings are stored as `Utf8`
205pub struct StringSchema {
206    schema: SchemaRef,
207}
208
209impl Deref for StringSchema {
210    type Target = SchemaRef;
211
212    fn deref(&self) -> &Self::Target {
213        &self.schema
214    }
215}
216
217impl From<&DictStringSchema> for StringSchema {
218    fn from(schema: &DictStringSchema) -> Self {
219        let dict_string_type =
220            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
221        let transformed_fields: Vec<Arc<Field>> = schema
222            .schema
223            .fields
224            .iter()
225            .map(|field| {
226                if field.data_type().equals_datatype(&dict_string_type) {
227                    field_with_new_type(field, DataType::Utf8)
228                } else {
229                    field.clone()
230                }
231            })
232            .collect();
233        Self {
234            schema: Arc::new(Schema::new_with_metadata(
235                transformed_fields,
236                schema.schema.metadata.clone(),
237            )),
238        }
239    }
240}