1use arrow::array::{Int64Array, StringArray};
20use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
21use arrow::record_batch::RecordBatch;
22use arrow::util::pretty::pretty_format_batches;
23use async_trait::async_trait;
24
25use datafusion::catalog::Session;
26use datafusion::common::{plan_err, Column};
27use datafusion::datasource::function::TableFunctionImpl;
28use datafusion::datasource::TableProvider;
29use datafusion::error::Result;
30use datafusion::logical_expr::Expr;
31use datafusion::physical_plan::memory::MemoryExec;
32use datafusion::physical_plan::ExecutionPlan;
33use datafusion::scalar::ScalarValue;
34use parquet::basic::ConvertedType;
35use parquet::data_type::{ByteArray, FixedLenByteArray};
36use parquet::file::reader::FileReader;
37use parquet::file::serialized_reader::SerializedFileReader;
38use parquet::file::statistics::Statistics;
39use std::fmt;
40use std::fs::File;
41use std::str::FromStr;
42use std::sync::Arc;
43
/// SQL commands for which the CLI can display built-in help text.
///
/// Each variant maps to one `\h`-style help topic; see
/// `function_details` for the per-command description and syntax summary.
#[derive(Debug)]
pub enum Function {
    Select,
    Explain,
    Show,
    CreateTable,
    CreateTableAs,
    Insert,
    DropTable,
}
54
// Every help topic, listed in alphabetical order of its display name so
// that `display_all_functions` prints an already-sorted table.
const ALL_FUNCTIONS: [Function; 7] = [
    Function::CreateTable,
    Function::CreateTableAs,
    Function::DropTable,
    Function::Explain,
    Function::Insert,
    Function::Select,
    Function::Show,
];
64
impl Function {
    /// Return the help text (description plus syntax summary, modeled on
    /// PostgreSQL's `\h` output) for this command.
    ///
    /// Always returns `Ok`; the `Result` wrapper is kept for interface
    /// stability with callers that expect a fallible lookup.
    pub fn function_details(&self) -> Result<&str> {
        let details = match self {
            Function::Select => {
                r#"
Command: SELECT
Description: retrieve rows from a table or view
Syntax:
SELECT [ ALL | DISTINCT [ ON ( expression [, ...] ) ] ]
    [ * | expression [ [ AS ] output_name ] [, ...] ]
    [ FROM from_item [, ...] ]
    [ WHERE condition ]
    [ GROUP BY [ ALL | DISTINCT ] grouping_element [, ...] ]
    [ HAVING condition ]
    [ WINDOW window_name AS ( window_definition ) [, ...] ]
    [ { UNION | INTERSECT | EXCEPT } [ ALL | DISTINCT ] select ]
    [ ORDER BY expression [ ASC | DESC | USING operator ] [ NULLS { FIRST | LAST } ] [, ...] ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start [ ROW | ROWS ] ]

where from_item can be one of:

    [ ONLY ] table_name [ * ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
                [ TABLESAMPLE sampling_method ( argument [, ...] ) [ REPEATABLE ( seed ) ] ]
    [ LATERAL ] ( select ) [ AS ] alias [ ( column_alias [, ...] ) ]
    with_query_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] ) [ AS ] alias ( column_definition [, ...] )
    [ LATERAL ] function_name ( [ argument [, ...] ] ) AS ( column_definition [, ...] )
    [ LATERAL ] ROWS FROM( function_name ( [ argument [, ...] ] ) [ AS ( column_definition [, ...] ) ] [, ...] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    from_item [ NATURAL ] join_type from_item [ ON join_condition | USING ( join_column [, ...] ) [ AS join_using_alias ] ]

and grouping_element can be one of:

    ( )
    expression
    ( expression [, ...] )

and with_query is:

    with_query_name [ ( column_name [, ...] ) ] AS [ [ NOT ] MATERIALIZED ] ( select | values | insert | update | delete )

TABLE [ ONLY ] table_name [ * ]"#
            }
            Function::Explain => {
                r#"
Command: EXPLAIN
Description: show the execution plan of a statement
Syntax:
EXPLAIN [ ANALYZE ] statement
"#
            }
            Function::Show => {
                r#"
Command: SHOW
Description: show the value of a run-time parameter
Syntax:
SHOW name
"#
            }
            Function::CreateTable => {
                r#"
Command: CREATE TABLE
Description: define a new table
Syntax:
CREATE [ EXTERNAL ] TABLE table_name ( [
  { column_name data_type }
    [, ... ]
] )
"#
            }
            Function::CreateTableAs => {
                r#"
Command: CREATE TABLE AS
Description: define a new table from the results of a query
Syntax:
CREATE TABLE table_name
    [ (column_name [, ...] ) ]
    AS query
    [ WITH [ NO ] DATA ]
"#
            }
            Function::Insert => {
                r#"
Command: INSERT
Description: create new rows in a table
Syntax:
INSERT INTO table_name [ ( column_name [, ...] ) ]
    { VALUES ( { expression } [, ...] ) [, ...] }
"#
            }
            Function::DropTable => {
                r#"
Command: DROP TABLE
Description: remove a table
Syntax:
DROP TABLE [ IF EXISTS ] name [, ...]
"#
            }
        };
        Ok(details)
    }
}
170
171impl FromStr for Function {
172 type Err = ();
173
174 fn from_str(s: &str) -> Result<Self, Self::Err> {
175 Ok(match s.trim().to_uppercase().as_str() {
176 "SELECT" => Self::Select,
177 "EXPLAIN" => Self::Explain,
178 "SHOW" => Self::Show,
179 "CREATE TABLE" => Self::CreateTable,
180 "CREATE TABLE AS" => Self::CreateTableAs,
181 "INSERT" => Self::Insert,
182 "DROP TABLE" => Self::DropTable,
183 _ => return Err(()),
184 })
185 }
186}
187
188impl fmt::Display for Function {
189 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
190 match *self {
191 Function::Select => write!(f, "SELECT"),
192 Function::Explain => write!(f, "EXPLAIN"),
193 Function::Show => write!(f, "SHOW"),
194 Function::CreateTable => write!(f, "CREATE TABLE"),
195 Function::CreateTableAs => write!(f, "CREATE TABLE AS"),
196 Function::Insert => write!(f, "INSERT"),
197 Function::DropTable => write!(f, "DROP TABLE"),
198 }
199 }
200}
201
202pub fn display_all_functions() -> Result<()> {
203 println!("Available help:");
204 let array = StringArray::from(
205 ALL_FUNCTIONS
206 .iter()
207 .map(|f| format!("{}", f))
208 .collect::<Vec<String>>(),
209 );
210 let schema = Schema::new(vec![Field::new("Function", DataType::Utf8, false)]);
211 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;
212 println!("{}", pretty_format_batches(&[batch]).unwrap());
213 Ok(())
214}
215
/// In-memory table provider returned by the `parquet_metadata` table
/// function: holds one pre-computed batch of metadata rows.
#[derive(Debug)]
struct ParquetMetadataTable {
    schema: SchemaRef,  // schema of `batch`
    batch: RecordBatch, // one row per (row group, column chunk)
}
222
223#[async_trait]
224impl TableProvider for ParquetMetadataTable {
225 fn as_any(&self) -> &dyn std::any::Any {
226 self
227 }
228
229 fn schema(&self) -> arrow::datatypes::SchemaRef {
230 self.schema.clone()
231 }
232
233 fn table_type(&self) -> datafusion::logical_expr::TableType {
234 datafusion::logical_expr::TableType::Base
235 }
236
237 async fn scan(
238 &self,
239 _state: &dyn Session,
240 projection: Option<&Vec<usize>>,
241 _filters: &[Expr],
242 _limit: Option<usize>,
243 ) -> Result<Arc<dyn ExecutionPlan>> {
244 Ok(Arc::new(MemoryExec::try_new(
245 &[vec![self.batch.clone()]],
246 TableProvider::schema(self),
247 projection.cloned(),
248 )?))
249 }
250}
251
/// Convert parquet column-chunk statistics into display strings
/// `(min, max)`.
///
/// Byte-array columns whose converted type is UTF8 are decoded as text;
/// every other (variant, converted-type) combination falls back to the
/// value's `Display` form. Either side is `None` when the corresponding
/// statistic is absent from the file.
fn convert_parquet_statistics(
    value: &Statistics,
    converted_type: ConvertedType,
) -> (Option<String>, Option<String>) {
    match (value, converted_type) {
        (Statistics::Boolean(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int32(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int64(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int96(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Float(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Double(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        // UTF8-annotated byte arrays: decode to text (lossy fallback to
        // the raw Display form is handled inside the helper).
        (Statistics::ByteArray(val), ConvertedType::UTF8) => (
            byte_array_to_string(val.min_opt()),
            byte_array_to_string(val.max_opt()),
        ),
        (Statistics::ByteArray(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::FixedLenByteArray(val), ConvertedType::UTF8) => (
            fixed_len_byte_array_to_string(val.min_opt()),
            fixed_len_byte_array_to_string(val.max_opt()),
        ),
        (Statistics::FixedLenByteArray(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
    }
}
299
300fn byte_array_to_string(val: Option<&ByteArray>) -> Option<String> {
302 val.map(|v| {
303 v.as_utf8()
304 .map(|s| s.to_string())
305 .unwrap_or_else(|_e| v.to_string())
306 })
307}
308
309fn fixed_len_byte_array_to_string(val: Option<&FixedLenByteArray>) -> Option<String> {
311 val.map(|v| {
312 v.as_utf8()
313 .map(|s| s.to_string())
314 .unwrap_or_else(|_e| v.to_string())
315 })
316}
317
/// Table function backing `parquet_metadata('file.parquet')`.
/// Stateless, so the struct carries no fields.
#[derive(Debug)]
pub struct ParquetMetadataFunc {}
320
321impl TableFunctionImpl for ParquetMetadataFunc {
322 fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
323 let filename = match exprs.first() {
324 Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, Some(Expr::Column(Column { name, .. })) => name, _ => {
327 return plan_err!(
328 "parquet_metadata requires string argument as its input"
329 );
330 }
331 };
332
333 let file = File::open(filename.clone())?;
334 let reader = SerializedFileReader::new(file)?;
335 let metadata = reader.metadata();
336
337 let schema = Arc::new(Schema::new(vec![
338 Field::new("filename", DataType::Utf8, true),
339 Field::new("row_group_id", DataType::Int64, true),
340 Field::new("row_group_num_rows", DataType::Int64, true),
341 Field::new("row_group_num_columns", DataType::Int64, true),
342 Field::new("row_group_bytes", DataType::Int64, true),
343 Field::new("column_id", DataType::Int64, true),
344 Field::new("file_offset", DataType::Int64, true),
345 Field::new("num_values", DataType::Int64, true),
346 Field::new("path_in_schema", DataType::Utf8, true),
347 Field::new("type", DataType::Utf8, true),
348 Field::new("stats_min", DataType::Utf8, true),
349 Field::new("stats_max", DataType::Utf8, true),
350 Field::new("stats_null_count", DataType::Int64, true),
351 Field::new("stats_distinct_count", DataType::Int64, true),
352 Field::new("stats_min_value", DataType::Utf8, true),
353 Field::new("stats_max_value", DataType::Utf8, true),
354 Field::new("compression", DataType::Utf8, true),
355 Field::new("encodings", DataType::Utf8, true),
356 Field::new("index_page_offset", DataType::Int64, true),
357 Field::new("dictionary_page_offset", DataType::Int64, true),
358 Field::new("data_page_offset", DataType::Int64, true),
359 Field::new("total_compressed_size", DataType::Int64, true),
360 Field::new("total_uncompressed_size", DataType::Int64, true),
361 ]));
362
363 let mut filename_arr = vec![];
365 let mut row_group_id_arr = vec![];
366 let mut row_group_num_rows_arr = vec![];
367 let mut row_group_num_columns_arr = vec![];
368 let mut row_group_bytes_arr = vec![];
369 let mut column_id_arr = vec![];
370 let mut file_offset_arr = vec![];
371 let mut num_values_arr = vec![];
372 let mut path_in_schema_arr = vec![];
373 let mut type_arr = vec![];
374 let mut stats_min_arr = vec![];
375 let mut stats_max_arr = vec![];
376 let mut stats_null_count_arr = vec![];
377 let mut stats_distinct_count_arr = vec![];
378 let mut stats_min_value_arr = vec![];
379 let mut stats_max_value_arr = vec![];
380 let mut compression_arr = vec![];
381 let mut encodings_arr = vec![];
382 let mut index_page_offset_arr = vec![];
383 let mut dictionary_page_offset_arr = vec![];
384 let mut data_page_offset_arr = vec![];
385 let mut total_compressed_size_arr = vec![];
386 let mut total_uncompressed_size_arr = vec![];
387 for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
388 for (col_idx, column) in row_group.columns().iter().enumerate() {
389 filename_arr.push(filename.clone());
390 row_group_id_arr.push(rg_idx as i64);
391 row_group_num_rows_arr.push(row_group.num_rows());
392 row_group_num_columns_arr.push(row_group.num_columns() as i64);
393 row_group_bytes_arr.push(row_group.total_byte_size());
394 column_id_arr.push(col_idx as i64);
395 file_offset_arr.push(column.file_offset());
396 num_values_arr.push(column.num_values());
397 path_in_schema_arr.push(column.column_path().to_string());
398 type_arr.push(column.column_type().to_string());
399 let converted_type = column.column_descr().converted_type();
400
401 if let Some(s) = column.statistics() {
402 let (min_val, max_val) =
403 convert_parquet_statistics(s, converted_type);
404 stats_min_arr.push(min_val.clone());
405 stats_max_arr.push(max_val.clone());
406 stats_null_count_arr.push(s.null_count_opt().map(|c| c as i64));
407 stats_distinct_count_arr
408 .push(s.distinct_count_opt().map(|c| c as i64));
409 stats_min_value_arr.push(min_val);
410 stats_max_value_arr.push(max_val);
411 } else {
412 stats_min_arr.push(None);
413 stats_max_arr.push(None);
414 stats_null_count_arr.push(None);
415 stats_distinct_count_arr.push(None);
416 stats_min_value_arr.push(None);
417 stats_max_value_arr.push(None);
418 };
419 compression_arr.push(format!("{:?}", column.compression()));
420 encodings_arr.push(format!("{:?}", column.encodings()));
421 index_page_offset_arr.push(column.index_page_offset());
422 dictionary_page_offset_arr.push(column.dictionary_page_offset());
423 data_page_offset_arr.push(column.data_page_offset());
424 total_compressed_size_arr.push(column.compressed_size());
425 total_uncompressed_size_arr.push(column.uncompressed_size());
426 }
427 }
428
429 let rb = RecordBatch::try_new(
430 schema.clone(),
431 vec![
432 Arc::new(StringArray::from(filename_arr)),
433 Arc::new(Int64Array::from(row_group_id_arr)),
434 Arc::new(Int64Array::from(row_group_num_rows_arr)),
435 Arc::new(Int64Array::from(row_group_num_columns_arr)),
436 Arc::new(Int64Array::from(row_group_bytes_arr)),
437 Arc::new(Int64Array::from(column_id_arr)),
438 Arc::new(Int64Array::from(file_offset_arr)),
439 Arc::new(Int64Array::from(num_values_arr)),
440 Arc::new(StringArray::from(path_in_schema_arr)),
441 Arc::new(StringArray::from(type_arr)),
442 Arc::new(StringArray::from(stats_min_arr)),
443 Arc::new(StringArray::from(stats_max_arr)),
444 Arc::new(Int64Array::from(stats_null_count_arr)),
445 Arc::new(Int64Array::from(stats_distinct_count_arr)),
446 Arc::new(StringArray::from(stats_min_value_arr)),
447 Arc::new(StringArray::from(stats_max_value_arr)),
448 Arc::new(StringArray::from(compression_arr)),
449 Arc::new(StringArray::from(encodings_arr)),
450 Arc::new(Int64Array::from(index_page_offset_arr)),
451 Arc::new(Int64Array::from(dictionary_page_offset_arr)),
452 Arc::new(Int64Array::from(data_page_offset_arr)),
453 Arc::new(Int64Array::from(total_compressed_size_arr)),
454 Arc::new(Int64Array::from(total_uncompressed_size_arr)),
455 ],
456 )?;
457
458 let parquet_metadata = ParquetMetadataTable { schema, batch: rb };
459 Ok(Arc::new(parquet_metadata))
460 }
461}