// datafusion_cli/functions.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Functions that are query-able and searchable via the `\h` command
19
20use std::fmt;
21use std::fs::File;
22use std::str::FromStr;
23use std::sync::Arc;
24
25use arrow::array::{Int64Array, StringArray};
26use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
27use arrow::record_batch::RecordBatch;
28use arrow::util::pretty::pretty_format_batches;
29use datafusion::catalog::{Session, TableFunctionImpl};
30use datafusion::common::{plan_err, Column};
31use datafusion::datasource::memory::MemorySourceConfig;
32use datafusion::datasource::TableProvider;
33use datafusion::error::Result;
34use datafusion::logical_expr::Expr;
35use datafusion::physical_plan::ExecutionPlan;
36use datafusion::scalar::ScalarValue;
37
38use async_trait::async_trait;
39use parquet::basic::ConvertedType;
40use parquet::data_type::{ByteArray, FixedLenByteArray};
41use parquet::file::reader::FileReader;
42use parquet::file::serialized_reader::SerializedFileReader;
43use parquet::file::statistics::Statistics;
44
/// SQL commands for which built-in help text is available via `\h <command>`.
#[derive(Debug)]
pub enum Function {
    Select,
    Explain,
    Show,
    CreateTable,
    CreateTableAs,
    Insert,
    DropTable,
}
55
/// Every help topic, in alphabetical order, as listed by a bare `\h`.
const ALL_FUNCTIONS: [Function; 7] = [
    Function::CreateTable,
    Function::CreateTableAs,
    Function::DropTable,
    Function::Explain,
    Function::Insert,
    Function::Select,
    Function::Show,
];
65
impl Function {
    /// Return the static help text (command name, description and syntax
    /// summary) for this command, as printed by `\h <command>`.
    ///
    /// The text is a compile-time string; this never actually fails, but the
    /// `Result` keeps the signature uniform with the rest of the CLI helpers.
    pub fn function_details(&self) -> Result<&str> {
        let details = match self {
            Function::Select => {
                r#"
Command:     SELECT
Description: retrieve rows from a table or view
Syntax:
SELECT [ ALL | DISTINCT [ ON ( expression [, ...] ) ] ]
    [ * | expression [ [ AS ] output_name ] [, ...] ]
    [ FROM from_item [, ...] ]
    [ WHERE condition ]
    [ GROUP BY [ ALL | DISTINCT ] grouping_element [, ...] ]
    [ HAVING condition ]
    [ WINDOW window_name AS ( window_definition ) [, ...] ]
    [ { UNION | INTERSECT | EXCEPT } [ ALL | DISTINCT ] select ]
    [ ORDER BY expression [ ASC | DESC | USING operator ] [ NULLS { FIRST | LAST } ] [, ...] ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start [ ROW | ROWS ] ]

where from_item can be one of:

    [ ONLY ] table_name [ * ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
                [ TABLESAMPLE sampling_method ( argument [, ...] ) [ REPEATABLE ( seed ) ] ]
    [ LATERAL ] ( select ) [ AS ] alias [ ( column_alias [, ...] ) ]
    with_query_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] ) [ AS ] alias ( column_definition [, ...] )
    [ LATERAL ] function_name ( [ argument [, ...] ] ) AS ( column_definition [, ...] )
    [ LATERAL ] ROWS FROM( function_name ( [ argument [, ...] ] ) [ AS ( column_definition [, ...] ) ] [, ...] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    from_item [ NATURAL ] join_type from_item [ ON join_condition | USING ( join_column [, ...] ) [ AS join_using_alias ] ]

and grouping_element can be one of:

    ( )
    expression
    ( expression [, ...] )

and with_query is:

    with_query_name [ ( column_name [, ...] ) ] AS [ [ NOT ] MATERIALIZED ] ( select | values | insert | update | delete )

TABLE [ ONLY ] table_name [ * ]"#
            }
            Function::Explain => {
                r#"
Command:     EXPLAIN
Description: show the execution plan of a statement
Syntax:
EXPLAIN [ ANALYZE ] statement
"#
            }
            Function::Show => {
                r#"
Command:     SHOW
Description: show the value of a run-time parameter
Syntax:
SHOW name
"#
            }
            Function::CreateTable => {
                r#"
Command:     CREATE TABLE
Description: define a new table
Syntax:
CREATE [ EXTERNAL ]  TABLE table_name ( [
  { column_name data_type }
    [, ... ]
] )
"#
            }
            Function::CreateTableAs => {
                r#"
Command:     CREATE TABLE AS
Description: define a new table from the results of a query
Syntax:
CREATE TABLE table_name
    [ (column_name [, ...] ) ]
    AS query
    [ WITH [ NO ] DATA ]
"#
            }
            Function::Insert => {
                r#"
Command:     INSERT
Description: create new rows in a table
Syntax:
INSERT INTO table_name [ ( column_name [, ...] ) ]
    { VALUES ( { expression } [, ...] ) [, ...] }
"#
            }
            Function::DropTable => {
                r#"
Command:     DROP TABLE
Description: remove a table
Syntax:
DROP TABLE [ IF EXISTS ] name [, ...]
"#
            }
        };
        Ok(details)
    }
}
171
172impl FromStr for Function {
173    type Err = ();
174
175    fn from_str(s: &str) -> Result<Self, Self::Err> {
176        Ok(match s.trim().to_uppercase().as_str() {
177            "SELECT" => Self::Select,
178            "EXPLAIN" => Self::Explain,
179            "SHOW" => Self::Show,
180            "CREATE TABLE" => Self::CreateTable,
181            "CREATE TABLE AS" => Self::CreateTableAs,
182            "INSERT" => Self::Insert,
183            "DROP TABLE" => Self::DropTable,
184            _ => return Err(()),
185        })
186    }
187}
188
189impl fmt::Display for Function {
190    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
191        match *self {
192            Function::Select => write!(f, "SELECT"),
193            Function::Explain => write!(f, "EXPLAIN"),
194            Function::Show => write!(f, "SHOW"),
195            Function::CreateTable => write!(f, "CREATE TABLE"),
196            Function::CreateTableAs => write!(f, "CREATE TABLE AS"),
197            Function::Insert => write!(f, "INSERT"),
198            Function::DropTable => write!(f, "DROP TABLE"),
199        }
200    }
201}
202
203pub fn display_all_functions() -> Result<()> {
204    println!("Available help:");
205    let array = StringArray::from(
206        ALL_FUNCTIONS
207            .iter()
208            .map(|f| format!("{f}"))
209            .collect::<Vec<String>>(),
210    );
211    let schema = Schema::new(vec![Field::new("Function", DataType::Utf8, false)]);
212    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;
213    println!("{}", pretty_format_batches(&[batch]).unwrap());
214    Ok(())
215}
216
/// Table provider backing the `parquet_metadata` table function: holds the
/// pre-computed metadata rows for a single Parquet file.
#[derive(Debug)]
struct ParquetMetadataTable {
    // Schema of the metadata batch (one row per row-group/column-chunk pair).
    schema: SchemaRef,
    // Fully materialized metadata rows, scanned as one in-memory partition.
    batch: RecordBatch,
}
223
224#[async_trait]
225impl TableProvider for ParquetMetadataTable {
226    fn as_any(&self) -> &dyn std::any::Any {
227        self
228    }
229
230    fn schema(&self) -> arrow::datatypes::SchemaRef {
231        self.schema.clone()
232    }
233
234    fn table_type(&self) -> datafusion::logical_expr::TableType {
235        datafusion::logical_expr::TableType::Base
236    }
237
238    async fn scan(
239        &self,
240        _state: &dyn Session,
241        projection: Option<&Vec<usize>>,
242        _filters: &[Expr],
243        _limit: Option<usize>,
244    ) -> Result<Arc<dyn ExecutionPlan>> {
245        Ok(MemorySourceConfig::try_new_exec(
246            &[vec![self.batch.clone()]],
247            TableProvider::schema(self),
248            projection.cloned(),
249        )?)
250    }
251}
252
253fn convert_parquet_statistics(
254    value: &Statistics,
255    converted_type: ConvertedType,
256) -> (Option<String>, Option<String>) {
257    match (value, converted_type) {
258        (Statistics::Boolean(val), _) => (
259            val.min_opt().map(|v| v.to_string()),
260            val.max_opt().map(|v| v.to_string()),
261        ),
262        (Statistics::Int32(val), _) => (
263            val.min_opt().map(|v| v.to_string()),
264            val.max_opt().map(|v| v.to_string()),
265        ),
266        (Statistics::Int64(val), _) => (
267            val.min_opt().map(|v| v.to_string()),
268            val.max_opt().map(|v| v.to_string()),
269        ),
270        (Statistics::Int96(val), _) => (
271            val.min_opt().map(|v| v.to_string()),
272            val.max_opt().map(|v| v.to_string()),
273        ),
274        (Statistics::Float(val), _) => (
275            val.min_opt().map(|v| v.to_string()),
276            val.max_opt().map(|v| v.to_string()),
277        ),
278        (Statistics::Double(val), _) => (
279            val.min_opt().map(|v| v.to_string()),
280            val.max_opt().map(|v| v.to_string()),
281        ),
282        (Statistics::ByteArray(val), ConvertedType::UTF8) => (
283            byte_array_to_string(val.min_opt()),
284            byte_array_to_string(val.max_opt()),
285        ),
286        (Statistics::ByteArray(val), _) => (
287            val.min_opt().map(|v| v.to_string()),
288            val.max_opt().map(|v| v.to_string()),
289        ),
290        (Statistics::FixedLenByteArray(val), ConvertedType::UTF8) => (
291            fixed_len_byte_array_to_string(val.min_opt()),
292            fixed_len_byte_array_to_string(val.max_opt()),
293        ),
294        (Statistics::FixedLenByteArray(val), _) => (
295            val.min_opt().map(|v| v.to_string()),
296            val.max_opt().map(|v| v.to_string()),
297        ),
298    }
299}
300
301/// Convert to a string if it has utf8 encoding, otherwise print bytes directly
302fn byte_array_to_string(val: Option<&ByteArray>) -> Option<String> {
303    val.map(|v| {
304        v.as_utf8()
305            .map(|s| s.to_string())
306            .unwrap_or_else(|_e| v.to_string())
307    })
308}
309
310/// Convert to a string if it has utf8 encoding, otherwise print bytes directly
311fn fixed_len_byte_array_to_string(val: Option<&FixedLenByteArray>) -> Option<String> {
312    val.map(|v| {
313        v.as_utf8()
314            .map(|s| s.to_string())
315            .unwrap_or_else(|_e| v.to_string())
316    })
317}
318
/// Implementation of the `parquet_metadata(<path>)` table function, which
/// exposes row-group and column-chunk metadata of a Parquet file as a table.
#[derive(Debug)]
pub struct ParquetMetadataFunc {}
321
322impl TableFunctionImpl for ParquetMetadataFunc {
323    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
324        let filename = match exprs.first() {
325            Some(Expr::Literal(ScalarValue::Utf8(Some(s)), _)) => s, // single quote: parquet_metadata('x.parquet')
326            Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
327            _ => {
328                return plan_err!(
329                    "parquet_metadata requires string argument as its input"
330                );
331            }
332        };
333
334        let file = File::open(filename.clone())?;
335        let reader = SerializedFileReader::new(file)?;
336        let metadata = reader.metadata();
337
338        let schema = Arc::new(Schema::new(vec![
339            Field::new("filename", DataType::Utf8, true),
340            Field::new("row_group_id", DataType::Int64, true),
341            Field::new("row_group_num_rows", DataType::Int64, true),
342            Field::new("row_group_num_columns", DataType::Int64, true),
343            Field::new("row_group_bytes", DataType::Int64, true),
344            Field::new("column_id", DataType::Int64, true),
345            Field::new("file_offset", DataType::Int64, true),
346            Field::new("num_values", DataType::Int64, true),
347            Field::new("path_in_schema", DataType::Utf8, true),
348            Field::new("type", DataType::Utf8, true),
349            Field::new("stats_min", DataType::Utf8, true),
350            Field::new("stats_max", DataType::Utf8, true),
351            Field::new("stats_null_count", DataType::Int64, true),
352            Field::new("stats_distinct_count", DataType::Int64, true),
353            Field::new("stats_min_value", DataType::Utf8, true),
354            Field::new("stats_max_value", DataType::Utf8, true),
355            Field::new("compression", DataType::Utf8, true),
356            Field::new("encodings", DataType::Utf8, true),
357            Field::new("index_page_offset", DataType::Int64, true),
358            Field::new("dictionary_page_offset", DataType::Int64, true),
359            Field::new("data_page_offset", DataType::Int64, true),
360            Field::new("total_compressed_size", DataType::Int64, true),
361            Field::new("total_uncompressed_size", DataType::Int64, true),
362        ]));
363
364        // construct record batch from metadata
365        let mut filename_arr = vec![];
366        let mut row_group_id_arr = vec![];
367        let mut row_group_num_rows_arr = vec![];
368        let mut row_group_num_columns_arr = vec![];
369        let mut row_group_bytes_arr = vec![];
370        let mut column_id_arr = vec![];
371        let mut file_offset_arr = vec![];
372        let mut num_values_arr = vec![];
373        let mut path_in_schema_arr = vec![];
374        let mut type_arr = vec![];
375        let mut stats_min_arr = vec![];
376        let mut stats_max_arr = vec![];
377        let mut stats_null_count_arr = vec![];
378        let mut stats_distinct_count_arr = vec![];
379        let mut stats_min_value_arr = vec![];
380        let mut stats_max_value_arr = vec![];
381        let mut compression_arr = vec![];
382        let mut encodings_arr = vec![];
383        let mut index_page_offset_arr = vec![];
384        let mut dictionary_page_offset_arr = vec![];
385        let mut data_page_offset_arr = vec![];
386        let mut total_compressed_size_arr = vec![];
387        let mut total_uncompressed_size_arr = vec![];
388        for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
389            for (col_idx, column) in row_group.columns().iter().enumerate() {
390                filename_arr.push(filename.clone());
391                row_group_id_arr.push(rg_idx as i64);
392                row_group_num_rows_arr.push(row_group.num_rows());
393                row_group_num_columns_arr.push(row_group.num_columns() as i64);
394                row_group_bytes_arr.push(row_group.total_byte_size());
395                column_id_arr.push(col_idx as i64);
396                file_offset_arr.push(column.file_offset());
397                num_values_arr.push(column.num_values());
398                path_in_schema_arr.push(column.column_path().to_string());
399                type_arr.push(column.column_type().to_string());
400                let converted_type = column.column_descr().converted_type();
401
402                if let Some(s) = column.statistics() {
403                    let (min_val, max_val) =
404                        convert_parquet_statistics(s, converted_type);
405                    stats_min_arr.push(min_val.clone());
406                    stats_max_arr.push(max_val.clone());
407                    stats_null_count_arr.push(s.null_count_opt().map(|c| c as i64));
408                    stats_distinct_count_arr
409                        .push(s.distinct_count_opt().map(|c| c as i64));
410                    stats_min_value_arr.push(min_val);
411                    stats_max_value_arr.push(max_val);
412                } else {
413                    stats_min_arr.push(None);
414                    stats_max_arr.push(None);
415                    stats_null_count_arr.push(None);
416                    stats_distinct_count_arr.push(None);
417                    stats_min_value_arr.push(None);
418                    stats_max_value_arr.push(None);
419                };
420                compression_arr.push(format!("{:?}", column.compression()));
421                encodings_arr.push(format!("{:?}", column.encodings()));
422                index_page_offset_arr.push(column.index_page_offset());
423                dictionary_page_offset_arr.push(column.dictionary_page_offset());
424                data_page_offset_arr.push(column.data_page_offset());
425                total_compressed_size_arr.push(column.compressed_size());
426                total_uncompressed_size_arr.push(column.uncompressed_size());
427            }
428        }
429
430        let rb = RecordBatch::try_new(
431            schema.clone(),
432            vec![
433                Arc::new(StringArray::from(filename_arr)),
434                Arc::new(Int64Array::from(row_group_id_arr)),
435                Arc::new(Int64Array::from(row_group_num_rows_arr)),
436                Arc::new(Int64Array::from(row_group_num_columns_arr)),
437                Arc::new(Int64Array::from(row_group_bytes_arr)),
438                Arc::new(Int64Array::from(column_id_arr)),
439                Arc::new(Int64Array::from(file_offset_arr)),
440                Arc::new(Int64Array::from(num_values_arr)),
441                Arc::new(StringArray::from(path_in_schema_arr)),
442                Arc::new(StringArray::from(type_arr)),
443                Arc::new(StringArray::from(stats_min_arr)),
444                Arc::new(StringArray::from(stats_max_arr)),
445                Arc::new(Int64Array::from(stats_null_count_arr)),
446                Arc::new(Int64Array::from(stats_distinct_count_arr)),
447                Arc::new(StringArray::from(stats_min_value_arr)),
448                Arc::new(StringArray::from(stats_max_value_arr)),
449                Arc::new(StringArray::from(compression_arr)),
450                Arc::new(StringArray::from(encodings_arr)),
451                Arc::new(Int64Array::from(index_page_offset_arr)),
452                Arc::new(Int64Array::from(dictionary_page_offset_arr)),
453                Arc::new(Int64Array::from(data_page_offset_arr)),
454                Arc::new(Int64Array::from(total_compressed_size_arr)),
455                Arc::new(Int64Array::from(total_uncompressed_size_arr)),
456            ],
457        )?;
458
459        let parquet_metadata = ParquetMetadataTable { schema, batch: rb };
460        Ok(Arc::new(parquet_metadata))
461    }
462}