liquid_cache_common/
lib.rs1#![doc = include_str!("../README.md")]
2
3use std::str::FromStr;
4use std::{fmt::Display, sync::Arc};
5
6use arrow::array::ArrayRef;
7use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
8pub mod mock_store;
9pub mod rpc;
10pub mod utils;
/// Top-level cache mode, selectable by name.
///
/// The textual form used by both [`Display`] and [`FromStr`] is the snake_case
/// variant name (e.g. `liquid_eager_transcode`), so `mode.to_string().parse()`
/// round-trips back to `mode`.
#[derive(Clone, Debug, Default, Copy, PartialEq, Eq)]
pub enum CacheMode {
    /// Textual form `parquet`. Has no [`LiquidCacheMode`] equivalent.
    Parquet,
    /// Textual form `liquid`. This is the default mode; converts to
    /// [`LiquidCacheMode::Liquid`].
    #[default]
    Liquid,
    /// Textual form `liquid_eager_transcode`; converts to
    /// [`LiquidCacheMode::LiquidBlocking`].
    LiquidEagerTranscode,
    /// Textual form `arrow`; converts to [`LiquidCacheMode::Arrow`].
    Arrow,
    /// Textual form `static_file_server`. Has no [`LiquidCacheMode`] equivalent.
    StaticFileServer,
}

impl Display for CacheMode {
    /// Writes the canonical snake_case name of the mode.
    ///
    /// `Formatter::pad` is used (rather than `write!`) so width, fill and
    /// alignment flags are honored: `format!("{:>8}", CacheMode::Arrow)` yields
    /// `"   arrow"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(match self {
            CacheMode::Parquet => "parquet",
            CacheMode::Liquid => "liquid",
            CacheMode::LiquidEagerTranscode => "liquid_eager_transcode",
            CacheMode::Arrow => "arrow",
            CacheMode::StaticFileServer => "static_file_server",
        })
    }
}

impl FromStr for CacheMode {
    type Err = String;

    /// Parses the exact (case-sensitive) snake_case name produced by `Display`.
    ///
    /// # Errors
    ///
    /// Returns a message listing the accepted names when `s` is not one of them.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "parquet" => Ok(CacheMode::Parquet),
            "liquid" => Ok(CacheMode::Liquid),
            "liquid_eager_transcode" => Ok(CacheMode::LiquidEagerTranscode),
            "arrow" => Ok(CacheMode::Arrow),
            "static_file_server" => Ok(CacheMode::StaticFileServer),
            _ => Err(format!(
                "Invalid cache mode: {s}, must be one of: parquet, liquid, liquid_eager_transcode, arrow, static_file_server"
            )),
        }
    }
}
61
62#[derive(Debug, Copy, Clone)]
64pub enum LiquidCacheMode {
65 Arrow,
67 Liquid,
69 LiquidBlocking,
71}
72
73impl Default for LiquidCacheMode {
74 fn default() -> Self {
75 Self::Liquid
76 }
77}
78
79impl From<CacheMode> for LiquidCacheMode {
80 fn from(value: CacheMode) -> Self {
81 match value {
82 CacheMode::Liquid => LiquidCacheMode::Liquid,
83 CacheMode::Arrow => LiquidCacheMode::Arrow,
84 CacheMode::LiquidEagerTranscode => LiquidCacheMode::LiquidBlocking,
85 CacheMode::Parquet => unreachable!(),
86 CacheMode::StaticFileServer => unreachable!(),
87 }
88 }
89}
90
91fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
94 Arc::new(field.as_ref().clone().with_data_type(new_type))
95}
96
97pub fn coerce_parquet_type_to_liquid_type(
98 data_type: &DataType,
99 cache_mode: &LiquidCacheMode,
100) -> DataType {
101 match cache_mode {
102 LiquidCacheMode::Arrow => {
103 if data_type.equals_datatype(&DataType::Utf8View) {
104 DataType::Utf8
105 } else if data_type.equals_datatype(&DataType::BinaryView) {
106 DataType::Binary
107 } else {
108 data_type.clone()
109 }
110 }
111 LiquidCacheMode::Liquid | LiquidCacheMode::LiquidBlocking => {
112 if data_type.equals_datatype(&DataType::Utf8View) {
113 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
114 } else if data_type.equals_datatype(&DataType::BinaryView) {
115 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Binary))
116 } else {
117 data_type.clone()
118 }
119 }
120 }
121}
122
123pub fn cast_from_parquet_to_liquid_type(array: ArrayRef, cache_mode: &LiquidCacheMode) -> ArrayRef {
124 match cache_mode {
125 LiquidCacheMode::Arrow => {
126 if array.data_type() == &DataType::Utf8View {
127 arrow::compute::kernels::cast(&array, &DataType::Utf8).unwrap()
128 } else if array.data_type() == &DataType::BinaryView {
129 arrow::compute::kernels::cast(&array, &DataType::Binary).unwrap()
130 } else {
131 array
132 }
133 }
134 LiquidCacheMode::Liquid | LiquidCacheMode::LiquidBlocking => {
135 if array.data_type() == &DataType::Utf8View {
136 arrow::compute::kernels::cast(
137 &array,
138 &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
139 )
140 .unwrap()
141 } else if array.data_type() == &DataType::BinaryView {
142 arrow::compute::kernels::cast(
143 &array,
144 &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Binary)),
145 )
146 .unwrap()
147 } else {
148 array
149 }
150 }
151 }
152}
153pub fn coerce_parquet_schema_to_liquid_schema(
155 schema: &Schema,
156 cache_mode: &LiquidCacheMode,
157) -> Schema {
158 let transformed_fields: Vec<Arc<Field>> = schema
159 .fields
160 .iter()
161 .map(|field| {
162 field_with_new_type(
163 field,
164 coerce_parquet_type_to_liquid_type(field.data_type(), cache_mode),
165 )
166 })
167 .collect();
168 Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
169}
170
171pub struct ParquetReaderSchema {}
173
174impl ParquetReaderSchema {
175 pub fn from(schema: &Schema) -> SchemaRef {
176 let transformed_fields: Vec<Arc<Field>> = schema
177 .fields
178 .iter()
179 .map(|field| {
180 let data_type = field.data_type();
181 match data_type {
182 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
183 field_with_new_type(field, DataType::Utf8View)
184 }
185 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
186 field_with_new_type(field, DataType::BinaryView)
187 }
188 _ => field.clone(),
189 }
190 })
191 .collect();
192 Arc::new(Schema::new_with_metadata(
193 transformed_fields,
194 schema.metadata.clone(),
195 ))
196 }
197}