liquid_cache_common/
lib.rs1#![cfg_attr(not(doctest), doc = include_str!(concat!("../", std::env!("CARGO_PKG_README"))))]
2
3use std::ops::Deref;
4use std::str::FromStr;
5use std::{fmt::Display, sync::Arc};
6
7use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
8pub mod rpc;
9pub mod utils;
/// Cache mode selector.
///
/// Each variant has a lowercase textual name that the `Display` and
/// `FromStr` implementations in this file agree on. `Liquid` is the value
/// returned by `CacheMode::default()`.
///
/// NOTE(review): what each mode does at runtime is not visible in this file;
/// the per-variant notes below only record the textual name.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CacheMode {
    /// Textual name: `parquet`.
    Parquet,
    /// Textual name: `liquid`. This is the default mode.
    #[default]
    Liquid,
    /// Textual name: `liquid_eager_transcode`.
    LiquidEagerTranscode,
    /// Textual name: `arrow`.
    Arrow,
}
23
24impl Display for CacheMode {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 write!(
27 f,
28 "{}",
29 match self {
30 CacheMode::Parquet => "parquet",
31 CacheMode::Liquid => "liquid",
32 CacheMode::LiquidEagerTranscode => "liquid_eager_transcode",
33 CacheMode::Arrow => "arrow",
34 }
35 )
36 }
37}
38
39impl FromStr for CacheMode {
40 type Err = String;
41
42 fn from_str(s: &str) -> Result<Self, Self::Err> {
43 Ok(match s {
44 "parquet" => CacheMode::Parquet,
45 "liquid" => CacheMode::Liquid,
46 "liquid_eager_transcode" => CacheMode::LiquidEagerTranscode,
47 "arrow" => CacheMode::Arrow,
48 _ => {
49 return Err(format!(
50 "Invalid cache mode: {}, must be one of: parquet, liquid, liquid_eager_transcode, arrow",
51 s
52 ));
53 }
54 })
55 }
56}
57
58fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
61 Arc::new(field.as_ref().clone().with_data_type(new_type))
62}
63
64pub fn coerce_to_liquid_cache_types(schema: &Schema) -> Schema {
65 let transformed_fields: Vec<Arc<Field>> = schema
66 .fields
67 .iter()
68 .map(|field| match field.data_type() {
69 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => field_with_new_type(
70 field,
71 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
72 ),
73 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => field_with_new_type(
74 field,
75 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
76 ),
77 _ => field.clone(),
78 })
79 .collect();
80 Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
81}
82
83pub fn coerce_from_parquet_reader_to_liquid_types(schema: &Schema) -> Schema {
85 let transformed_fields: Vec<Arc<Field>> = schema
86 .fields
87 .iter()
88 .map(|field| {
89 if field.data_type().equals_datatype(&DataType::Utf8View) {
90 field_with_new_type(
91 field,
92 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
93 )
94 } else {
95 field.clone()
96 }
97 })
98 .collect();
99 Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
100}
101
102pub fn coerce_binary_to_string(schema: &Schema) -> Schema {
103 let transformed_fields: Vec<Arc<Field>> = schema
104 .fields
105 .iter()
106 .map(|field| match field.data_type() {
107 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
108 field_with_new_type(field, DataType::Utf8)
109 }
110 _ => field.clone(),
111 })
112 .collect();
113 Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
114}
115
116pub fn coerce_string_to_view(schema: &Schema) -> Schema {
117 let transformed_fields: Vec<Arc<Field>> = schema
118 .fields
119 .iter()
120 .map(|field| match field.data_type() {
121 DataType::Utf8 | DataType::LargeUtf8 => field_with_new_type(field, DataType::Utf8View),
122 _ => field.clone(),
123 })
124 .collect();
125 Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
126}
127
128pub struct StringViewSchema {
130 schema: SchemaRef,
131}
132
133impl Deref for StringViewSchema {
134 type Target = SchemaRef;
135
136 fn deref(&self) -> &Self::Target {
137 &self.schema
138 }
139}
140
141impl From<&DictStringSchema> for StringViewSchema {
142 fn from(schema: &DictStringSchema) -> Self {
143 let dict_string_type =
144 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
145 let transformed_fields: Vec<Arc<Field>> = schema
146 .schema
147 .fields
148 .iter()
149 .map(|field| {
150 if field.data_type().equals_datatype(&dict_string_type) {
151 field_with_new_type(field, DataType::Utf8View)
152 } else {
153 field.clone()
154 }
155 })
156 .collect();
157 Self {
158 schema: Arc::new(Schema::new_with_metadata(
159 transformed_fields,
160 schema.schema.metadata.clone(),
161 )),
162 }
163 }
164}
165
166pub struct DictStringSchema {
168 schema: SchemaRef,
169}
170
171impl DictStringSchema {
172 pub fn new(schema: SchemaRef) -> Self {
178 {
179 for field in schema.fields() {
180 assert!(
181 !field.data_type().equals_datatype(&DataType::Utf8),
182 "Field {} must not be a Utf8",
183 field.name()
184 );
185 assert!(
186 !field.data_type().equals_datatype(&DataType::Utf8View),
187 "Field {} must not be a Utf8View",
188 field.name()
189 );
190 }
191 }
192 Self { schema }
193 }
194}
195
196impl Deref for DictStringSchema {
197 type Target = SchemaRef;
198
199 fn deref(&self) -> &Self::Target {
200 &self.schema
201 }
202}
203
204pub struct StringSchema {
206 schema: SchemaRef,
207}
208
209impl Deref for StringSchema {
210 type Target = SchemaRef;
211
212 fn deref(&self) -> &Self::Target {
213 &self.schema
214 }
215}
216
217impl From<&DictStringSchema> for StringSchema {
218 fn from(schema: &DictStringSchema) -> Self {
219 let dict_string_type =
220 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
221 let transformed_fields: Vec<Arc<Field>> = schema
222 .schema
223 .fields
224 .iter()
225 .map(|field| {
226 if field.data_type().equals_datatype(&dict_string_type) {
227 field_with_new_type(field, DataType::Utf8)
228 } else {
229 field.clone()
230 }
231 })
232 .collect();
233 Self {
234 schema: Arc::new(Schema::new_with_metadata(
235 transformed_fields,
236 schema.schema.metadata.clone(),
237 )),
238 }
239 }
240}