1#![cfg_attr(not(doctest), doc = include_str!(concat!("../", std::env!("CARGO_PKG_README"))))]
2
3use std::ops::Deref;
4use std::str::FromStr;
5use std::{fmt::Display, sync::Arc};
6
7use arrow::array::ArrayRef;
8use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
9pub mod rpc;
10pub mod utils;
/// Cache mode selector.
///
/// Every variant has a lowercase snake_case textual name that is produced by
/// the `Display` implementation and accepted by the `FromStr` implementation
/// in this file (for example `"liquid_eager_transcode"`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CacheMode {
    /// Textual name: `"parquet"`.
    Parquet,
    /// Textual name: `"liquid"`. This is the default mode.
    #[default]
    Liquid,
    /// Textual name: `"liquid_eager_transcode"`.
    LiquidEagerTranscode,
    /// Textual name: `"arrow"`.
    Arrow,
    /// Textual name: `"static_file_server"`.
    StaticFileServer,
}
26
/// Cache eviction strategy selector.
///
/// NOTE(review): the behaviour behind each variant is implemented outside this
/// file; the per-variant notes below are inferred from the names only.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CacheEvictionStrategy {
    /// The default strategy.
    #[default]
    Discard,
    /// Presumably "first in, last out" ordering — confirm against the cache implementation.
    Filo,
    /// Presumably "least recently used" ordering — confirm against the cache implementation.
    Lru,
    /// Presumably spills entries to disk — confirm against the cache implementation.
    ToDisk,
}
41
42impl Display for CacheMode {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 write!(
45 f,
46 "{}",
47 match self {
48 CacheMode::Parquet => "parquet",
49 CacheMode::Liquid => "liquid",
50 CacheMode::LiquidEagerTranscode => "liquid_eager_transcode",
51 CacheMode::Arrow => "arrow",
52 CacheMode::StaticFileServer => "static_file_server",
53 }
54 )
55 }
56}
57
58impl FromStr for CacheMode {
59 type Err = String;
60
61 fn from_str(s: &str) -> Result<Self, Self::Err> {
62 Ok(match s {
63 "parquet" => CacheMode::Parquet,
64 "liquid" => CacheMode::Liquid,
65 "liquid_eager_transcode" => CacheMode::LiquidEagerTranscode,
66 "arrow" => CacheMode::Arrow,
67 "static_file_server" => CacheMode::StaticFileServer,
68 _ => {
69 return Err(format!(
70 "Invalid cache mode: {s}, must be one of: parquet, liquid, liquid_eager_transcode, arrow, static_file_server"
71 ));
72 }
73 })
74 }
75}
76
/// In-cache representation selector used by the coercion helpers in this file.
///
/// Derives `PartialEq`/`Eq` so that modes can be compared directly, matching
/// the other mode enums declared in this file.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum LiquidCacheMode {
    /// Strings are kept as plain `Utf8`.
    Arrow,
    /// Strings are kept as `Dictionary(UInt16, Utf8)`.
    Liquid {
        /// Presumably controls whether transcoding into the liquid format runs
        /// in the background — confirm against the cache implementation.
        transcode_in_background: bool,
    },
}
88
89impl Default for LiquidCacheMode {
90 fn default() -> Self {
91 Self::Liquid {
92 transcode_in_background: true,
93 }
94 }
95}
96
97impl LiquidCacheMode {
98 fn string_type(&self) -> DataType {
99 match self {
100 LiquidCacheMode::Arrow => DataType::Utf8,
101 LiquidCacheMode::Liquid { .. } => {
102 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
103 }
104 }
105 }
106}
107
108impl From<CacheMode> for LiquidCacheMode {
109 fn from(value: CacheMode) -> Self {
110 match value {
111 CacheMode::Liquid => LiquidCacheMode::Liquid {
112 transcode_in_background: true,
113 },
114 CacheMode::Arrow => LiquidCacheMode::Arrow,
115 CacheMode::LiquidEagerTranscode => LiquidCacheMode::Liquid {
116 transcode_in_background: false,
117 },
118 CacheMode::Parquet => unreachable!(),
119 CacheMode::StaticFileServer => unreachable!(),
120 }
121 }
122}
123
124fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
127 Arc::new(field.as_ref().clone().with_data_type(new_type))
128}
129
130pub fn coerce_string_to_liquid_type(data_type: &DataType, mode: &LiquidCacheMode) -> DataType {
131 match mode {
132 LiquidCacheMode::Arrow => data_type.clone(),
133 LiquidCacheMode::Liquid { .. } => match data_type {
134 DataType::Utf8
135 | DataType::LargeUtf8
136 | DataType::Utf8View
137 | DataType::Binary
138 | DataType::LargeBinary
139 | DataType::BinaryView => {
140 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
141 }
142 _ => data_type.clone(),
143 },
144 }
145}
146
147pub fn coerce_to_liquid_cache_types(schema: &Schema, mode: &LiquidCacheMode) -> Schema {
148 match mode {
149 LiquidCacheMode::Arrow => schema.clone(),
151 LiquidCacheMode::Liquid { .. } => {
152 let transformed_fields: Vec<Arc<Field>> = schema
153 .fields
154 .iter()
155 .map(|field| {
156 field_with_new_type(
157 field,
158 coerce_string_to_liquid_type(field.data_type(), mode),
159 )
160 })
161 .collect();
162 Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
163 }
164 }
165}
166
167pub fn coerce_from_parquet_to_liquid_type(
168 data_type: &DataType,
169 cache_mode: &LiquidCacheMode,
170) -> DataType {
171 match cache_mode {
172 LiquidCacheMode::Arrow => {
173 if data_type.equals_datatype(&DataType::Utf8View) {
174 DataType::Utf8
175 } else {
176 data_type.clone()
177 }
178 }
179 LiquidCacheMode::Liquid { .. } => {
180 if data_type.equals_datatype(&DataType::Utf8View) {
181 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
182 } else {
183 data_type.clone()
184 }
185 }
186 }
187}
188
189pub fn cast_from_parquet_to_liquid_type(array: ArrayRef, cache_mode: &LiquidCacheMode) -> ArrayRef {
190 match cache_mode {
191 LiquidCacheMode::Arrow => {
192 if array.data_type() == &DataType::Utf8View {
193 arrow::compute::kernels::cast(&array, &DataType::Utf8).unwrap()
194 } else {
195 array
196 }
197 }
198 LiquidCacheMode::Liquid { .. } => {
199 if array.data_type() == &DataType::Utf8View {
200 arrow::compute::kernels::cast(
201 &array,
202 &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
203 )
204 .unwrap()
205 } else {
206 array
207 }
208 }
209 }
210}
211pub fn coerce_from_parquet_reader_to_liquid_types(
213 schema: &Schema,
214 cache_mode: &LiquidCacheMode,
215) -> Schema {
216 let transformed_fields: Vec<Arc<Field>> = schema
217 .fields
218 .iter()
219 .map(|field| {
220 field_with_new_type(
221 field,
222 coerce_from_parquet_to_liquid_type(field.data_type(), cache_mode),
223 )
224 })
225 .collect();
226 Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
227}
228
229pub fn coerce_binary_to_string(schema: &Schema) -> Schema {
230 let transformed_fields: Vec<Arc<Field>> = schema
231 .fields
232 .iter()
233 .map(|field| match field.data_type() {
234 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
235 field_with_new_type(field, DataType::Utf8)
236 }
237 _ => field.clone(),
238 })
239 .collect();
240 Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
241}
242
243pub fn coerce_string_to_view(schema: &Schema) -> Schema {
244 let transformed_fields: Vec<Arc<Field>> = schema
245 .fields
246 .iter()
247 .map(|field| match field.data_type() {
248 DataType::Utf8 | DataType::LargeUtf8 => field_with_new_type(field, DataType::Utf8View),
249 _ => field.clone(),
250 })
251 .collect();
252 Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
253}
254
255pub struct ParquetReaderSchema {}
257
258impl ParquetReaderSchema {
259 pub fn from(schema: &Schema) -> SchemaRef {
260 let transformed_fields: Vec<Arc<Field>> = schema
261 .fields
262 .iter()
263 .map(|field| {
264 let data_type = field.data_type();
265 match data_type {
266 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
267 field_with_new_type(field, DataType::Utf8View)
268 }
269 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
270 field_with_new_type(field, DataType::BinaryView)
271 }
272 _ => field.clone(),
273 }
274 })
275 .collect();
276 Arc::new(Schema::new_with_metadata(
277 transformed_fields,
278 schema.metadata.clone(),
279 ))
280 }
281}
282
283pub struct CacheSchema {
285 schema: SchemaRef,
286}
287
288impl CacheSchema {
289 pub fn new_checked(schema: SchemaRef, mode: &LiquidCacheMode) -> Self {
295 {
296 for field in schema.fields() {
297 match mode {
298 LiquidCacheMode::Arrow => {
299 assert!(
301 !field.data_type().equals_datatype(&DataType::Utf8View),
302 "Field {} must not be a Utf8View",
303 field.name()
304 );
305 assert!(
306 !field.data_type().equals_datatype(&DataType::Dictionary(
307 Box::new(DataType::UInt16),
308 Box::new(DataType::Utf8)
309 )),
310 "Field {} must not be a Dict<UInt16, Utf8>",
311 field.name()
312 );
313 }
314 LiquidCacheMode::Liquid { .. } => {
315 assert!(
317 !field.data_type().equals_datatype(&DataType::Utf8),
318 "Field {} must not be a Utf8",
319 field.name()
320 );
321 assert!(
322 !field.data_type().equals_datatype(&DataType::Utf8View),
323 "Field {} must not be a Utf8View",
324 field.name()
325 );
326 }
327 }
328 }
329 }
330 Self { schema }
331 }
332
333 pub fn from(downstream_full_schema: SchemaRef, mode: &LiquidCacheMode) -> Self {
334 let transformed_fields: Vec<Arc<Field>> = downstream_full_schema
335 .fields
336 .iter()
337 .map(|field| {
338 let data_type = field.data_type();
339 match data_type {
340 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
341 field_with_new_type(field, mode.string_type())
342 }
343 _ => field.clone(),
344 }
345 })
346 .collect();
347 Self {
348 schema: Arc::new(Schema::new_with_metadata(
349 transformed_fields,
350 downstream_full_schema.metadata.clone(),
351 )),
352 }
353 }
354}
355
356impl Deref for CacheSchema {
357 type Target = SchemaRef;
358
359 fn deref(&self) -> &Self::Target {
360 &self.schema
361 }
362}