use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;

use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use datafusion::catalog::{Session, TableFunctionImpl};
use datafusion::common::{plan_err, Column};
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;

use async_trait::async_trait;
use parquet::basic::ConvertedType;
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::statistics::Statistics;

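/// SQL statements for which built-in help text is available.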
#[derive(Debug)]
pub enum Function {
    Select,
    Explain,
    Show,
    CreateTable,
    CreateTableAs,
    Insert,
    DropTable,
}

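/// All help topics, listed alphabetically.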
const ALL_FUNCTIONS: [Function; 7] = [
    Function::CreateTable,
    Function::CreateTableAs,
    Function::DropTable,
    Function::Explain,
    Function::Insert,
    Function::Select,
    Function::Show,
];

impl Function {
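    /// Returns the help text for this statement: command name, short
    /// description, and a syntax summary.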
    pub fn function_details(&self) -> Result<&str> {
        let details = match self {
            Function::Select => {
                r#"
Command: SELECT
Description: retrieve rows from a table or view
Syntax:
SELECT [ ALL | DISTINCT [ ON ( expression [, ...] ) ] ]
    [ * | expression [ [ AS ] output_name ] [, ...] ]
    [ FROM from_item [, ...] ]
    [ WHERE condition ]
    [ GROUP BY [ ALL | DISTINCT ] grouping_element [, ...] ]
    [ HAVING condition ]
    [ WINDOW window_name AS ( window_definition ) [, ...] ]
    [ { UNION | INTERSECT | EXCEPT } [ ALL | DISTINCT ] select ]
    [ ORDER BY expression [ ASC | DESC | USING operator ] [ NULLS { FIRST | LAST } ] [, ...] ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start [ ROW | ROWS ] ]

where from_item can be one of:

    [ ONLY ] table_name [ * ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
                [ TABLESAMPLE sampling_method ( argument [, ...] ) [ REPEATABLE ( seed ) ] ]
    [ LATERAL ] ( select ) [ AS ] alias [ ( column_alias [, ...] ) ]
    with_query_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] ) [ AS ] alias ( column_definition [, ...] )
    [ LATERAL ] function_name ( [ argument [, ...] ] ) AS ( column_definition [, ...] )
    [ LATERAL ] ROWS FROM( function_name ( [ argument [, ...] ] ) [ AS ( column_definition [, ...] ) ] [, ...] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    from_item [ NATURAL ] join_type from_item [ ON join_condition | USING ( join_column [, ...] ) [ AS join_using_alias ] ]

and grouping_element can be one of:

    ( )
    expression
    ( expression [, ...] )

and with_query is:

    with_query_name [ ( column_name [, ...] ) ] AS [ [ NOT ] MATERIALIZED ] ( select | values | insert | update | delete )

TABLE [ ONLY ] table_name [ * ]"#
            }
            Function::Explain => {
                r#"
Command: EXPLAIN
Description: show the execution plan of a statement
Syntax:
EXPLAIN [ ANALYZE ] statement
"#
            }
            Function::Show => {
                r#"
Command: SHOW
Description: show the value of a run-time parameter
Syntax:
SHOW name
"#
            }
            Function::CreateTable => {
                r#"
Command: CREATE TABLE
Description: define a new table
Syntax:
CREATE [ EXTERNAL ] TABLE table_name ( [
    { column_name data_type }
    [, ... ]
] )
"#
            }
            Function::CreateTableAs => {
                r#"
Command: CREATE TABLE AS
Description: define a new table from the results of a query
Syntax:
CREATE TABLE table_name
    [ (column_name [, ...] ) ]
    AS query
    [ WITH [ NO ] DATA ]
"#
            }
            Function::Insert => {
                r#"
Command: INSERT
Description: create new rows in a table
Syntax:
INSERT INTO table_name [ ( column_name [, ...] ) ]
    { VALUES ( { expression } [, ...] ) [, ...] }
"#
            }
            Function::DropTable => {
                r#"
Command: DROP TABLE
Description: remove a table
Syntax:
DROP TABLE [ IF EXISTS ] name [, ...]
"#
            }
        };
        Ok(details)
    }
}

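// Parsing is case-insensitive and ignores surrounding whitespace.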
impl FromStr for Function {
    type Err = ();

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s.trim().to_uppercase().as_str() {
            "SELECT" => Self::Select,
            "EXPLAIN" => Self::Explain,
            "SHOW" => Self::Show,
            "CREATE TABLE" => Self::CreateTable,
            "CREATE TABLE AS" => Self::CreateTableAs,
            "INSERT" => Self::Insert,
            "DROP TABLE" => Self::DropTable,
            _ => return Err(()),
        })
    }
}

impl fmt::Display for Function {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            Function::Select => write!(f, "SELECT"),
            Function::Explain => write!(f, "EXPLAIN"),
            Function::Show => write!(f, "SHOW"),
            Function::CreateTable => write!(f, "CREATE TABLE"),
            Function::CreateTableAs => write!(f, "CREATE TABLE AS"),
            Function::Insert => write!(f, "INSERT"),
            Function::DropTable => write!(f, "DROP TABLE"),
        }
    }
}

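/// Prints the list of available help topics as a one-column table.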
pub fn display_all_functions() -> Result<()> {
    println!("Available help:");
    let array = StringArray::from(
        ALL_FUNCTIONS
            .iter()
            .map(|f| format!("{f}"))
            .collect::<Vec<String>>(),
    );
    let schema = Schema::new(vec![Field::new("Function", DataType::Utf8, false)]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;
    println!("{}", pretty_format_batches(&[batch]).unwrap());
    Ok(())
}

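/// In-memory table holding one pre-computed batch of Parquet file metadata.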
#[derive(Debug)]
struct ParquetMetadataTable {
    schema: SchemaRef,
    batch: RecordBatch,
}

#[async_trait]
impl TableProvider for ParquetMetadataTable {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow::datatypes::SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> datafusion::logical_expr::TableType {
        datafusion::logical_expr::TableType::Base
    }

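    // Serve the single pre-built batch from memory. Only the projection is
    // applied here; DataFusion evaluates filters and limit above this node.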
    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(MemorySourceConfig::try_new_exec(
            &[vec![self.batch.clone()]],
            TableProvider::schema(self),
            projection.cloned(),
        )?)
    }
}

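/// Converts the min/max values of a column chunk's statistics to display
/// strings, decoding byte arrays as UTF-8 when the converted type marks
/// them as strings.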
fn convert_parquet_statistics(
    value: &Statistics,
    converted_type: ConvertedType,
) -> (Option<String>, Option<String>) {
    match (value, converted_type) {
        (Statistics::Boolean(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int32(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int64(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int96(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Float(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Double(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::ByteArray(val), ConvertedType::UTF8) => (
            byte_array_to_string(val.min_opt()),
            byte_array_to_string(val.max_opt()),
        ),
        (Statistics::ByteArray(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::FixedLenByteArray(val), ConvertedType::UTF8) => (
            fixed_len_byte_array_to_string(val.min_opt()),
            fixed_len_byte_array_to_string(val.max_opt()),
        ),
        (Statistics::FixedLenByteArray(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
    }
}

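/// Renders a byte-array statistic as UTF-8 when it is valid, falling back
/// to the value's default string form otherwise.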
fn byte_array_to_string(val: Option<&ByteArray>) -> Option<String> {
    val.map(|v| {
        v.as_utf8()
            .map(|s| s.to_string())
            .unwrap_or_else(|_e| v.to_string())
    })
}

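/// Same as [`byte_array_to_string`], for fixed-length byte arrays.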
fn fixed_len_byte_array_to_string(val: Option<&FixedLenByteArray>) -> Option<String> {
    val.map(|v| {
        v.as_utf8()
            .map(|s| s.to_string())
            .unwrap_or_else(|_e| v.to_string())
    })
}

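/// Table function exposing Parquet footer metadata, one row per column chunk.
///
/// Typically registered with, e.g.,
/// `ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}))`
/// and then queried as `SELECT * FROM parquet_metadata('data.parquet')`.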
#[derive(Debug)]
pub struct ParquetMetadataFunc {}

impl TableFunctionImpl for ParquetMetadataFunc {
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
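        // Accept the file path either as a quoted string literal or as a
        // bare identifier (which the parser hands us as a column reference).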
        let filename = match exprs.first() {
            Some(Expr::Literal(ScalarValue::Utf8(Some(s)), _)) => s,
            Some(Expr::Column(Column { name, .. })) => name,
            _ => {
                return plan_err!(
                    "parquet_metadata requires a string argument as its input"
                );
            }
        };

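        // Open the file and read its footer metadata with the synchronous
        // serialized reader.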
        let file = File::open(filename.clone())?;
        let reader = SerializedFileReader::new(file)?;
        let metadata = reader.metadata();

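        // Output schema: one row per (row group, column chunk) pair.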
        let schema = Arc::new(Schema::new(vec![
            Field::new("filename", DataType::Utf8, true),
            Field::new("row_group_id", DataType::Int64, true),
            Field::new("row_group_num_rows", DataType::Int64, true),
            Field::new("row_group_num_columns", DataType::Int64, true),
            Field::new("row_group_bytes", DataType::Int64, true),
            Field::new("column_id", DataType::Int64, true),
            Field::new("file_offset", DataType::Int64, true),
            Field::new("num_values", DataType::Int64, true),
            Field::new("path_in_schema", DataType::Utf8, true),
            Field::new("type", DataType::Utf8, true),
            Field::new("stats_min", DataType::Utf8, true),
            Field::new("stats_max", DataType::Utf8, true),
            Field::new("stats_null_count", DataType::Int64, true),
            Field::new("stats_distinct_count", DataType::Int64, true),
            Field::new("stats_min_value", DataType::Utf8, true),
            Field::new("stats_max_value", DataType::Utf8, true),
            Field::new("compression", DataType::Utf8, true),
            Field::new("encodings", DataType::Utf8, true),
            Field::new("index_page_offset", DataType::Int64, true),
            Field::new("dictionary_page_offset", DataType::Int64, true),
            Field::new("data_page_offset", DataType::Int64, true),
            Field::new("total_compressed_size", DataType::Int64, true),
            Field::new("total_uncompressed_size", DataType::Int64, true),
        ]));

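        // Column-wise accumulators, filled in lockstep by the loop below.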
        let mut filename_arr = vec![];
        let mut row_group_id_arr = vec![];
        let mut row_group_num_rows_arr = vec![];
        let mut row_group_num_columns_arr = vec![];
        let mut row_group_bytes_arr = vec![];
        let mut column_id_arr = vec![];
        let mut file_offset_arr = vec![];
        let mut num_values_arr = vec![];
        let mut path_in_schema_arr = vec![];
        let mut type_arr = vec![];
        let mut stats_min_arr = vec![];
        let mut stats_max_arr = vec![];
        let mut stats_null_count_arr = vec![];
        let mut stats_distinct_count_arr = vec![];
        let mut stats_min_value_arr = vec![];
        let mut stats_max_value_arr = vec![];
        let mut compression_arr = vec![];
        let mut encodings_arr = vec![];
        let mut index_page_offset_arr = vec![];
        let mut dictionary_page_offset_arr = vec![];
        let mut data_page_offset_arr = vec![];
        let mut total_compressed_size_arr = vec![];
        let mut total_uncompressed_size_arr = vec![];
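        // Walk every column chunk of every row group and record its metadata.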
        for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
            for (col_idx, column) in row_group.columns().iter().enumerate() {
                filename_arr.push(filename.clone());
                row_group_id_arr.push(rg_idx as i64);
                row_group_num_rows_arr.push(row_group.num_rows());
                row_group_num_columns_arr.push(row_group.num_columns() as i64);
                row_group_bytes_arr.push(row_group.total_byte_size());
                column_id_arr.push(col_idx as i64);
                file_offset_arr.push(column.file_offset());
                num_values_arr.push(column.num_values());
                path_in_schema_arr.push(column.column_path().to_string());
                type_arr.push(column.column_type().to_string());
                let converted_type = column.column_descr().converted_type();

                if let Some(s) = column.statistics() {
                    let (min_val, max_val) =
                        convert_parquet_statistics(s, converted_type);
                    stats_min_arr.push(min_val.clone());
                    stats_max_arr.push(max_val.clone());
                    stats_null_count_arr.push(s.null_count_opt().map(|c| c as i64));
                    stats_distinct_count_arr
                        .push(s.distinct_count_opt().map(|c| c as i64));
                    stats_min_value_arr.push(min_val);
                    stats_max_value_arr.push(max_val);
                } else {
                    stats_min_arr.push(None);
                    stats_max_arr.push(None);
                    stats_null_count_arr.push(None);
                    stats_distinct_count_arr.push(None);
                    stats_min_value_arr.push(None);
                    stats_max_value_arr.push(None);
                }
                compression_arr.push(format!("{:?}", column.compression()));
                encodings_arr.push(format!("{:?}", column.encodings()));
                index_page_offset_arr.push(column.index_page_offset());
                dictionary_page_offset_arr.push(column.dictionary_page_offset());
                data_page_offset_arr.push(column.data_page_offset());
                total_compressed_size_arr.push(column.compressed_size());
                total_uncompressed_size_arr.push(column.uncompressed_size());
            }
        }

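        // Assemble the collected values into a single record batch matching
        // the schema above.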
        let rb = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(filename_arr)),
                Arc::new(Int64Array::from(row_group_id_arr)),
                Arc::new(Int64Array::from(row_group_num_rows_arr)),
                Arc::new(Int64Array::from(row_group_num_columns_arr)),
                Arc::new(Int64Array::from(row_group_bytes_arr)),
                Arc::new(Int64Array::from(column_id_arr)),
                Arc::new(Int64Array::from(file_offset_arr)),
                Arc::new(Int64Array::from(num_values_arr)),
                Arc::new(StringArray::from(path_in_schema_arr)),
                Arc::new(StringArray::from(type_arr)),
                Arc::new(StringArray::from(stats_min_arr)),
                Arc::new(StringArray::from(stats_max_arr)),
                Arc::new(Int64Array::from(stats_null_count_arr)),
                Arc::new(Int64Array::from(stats_distinct_count_arr)),
                Arc::new(StringArray::from(stats_min_value_arr)),
                Arc::new(StringArray::from(stats_max_value_arr)),
                Arc::new(StringArray::from(compression_arr)),
                Arc::new(StringArray::from(encodings_arr)),
                Arc::new(Int64Array::from(index_page_offset_arr)),
                Arc::new(Int64Array::from(dictionary_page_offset_arr)),
                Arc::new(Int64Array::from(data_page_offset_arr)),
                Arc::new(Int64Array::from(total_compressed_size_arr)),
                Arc::new(Int64Array::from(total_uncompressed_size_arr)),
            ],
        )?;

        let parquet_metadata = ParquetMetadataTable { schema, batch: rb };
        Ok(Arc::new(parquet_metadata))
    }
}