frostbow_cli/
functions.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Functions that are query-able and searchable via the `\h` command
19use arrow::array::{Int64Array, StringArray};
20use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
21use arrow::record_batch::RecordBatch;
22use arrow::util::pretty::pretty_format_batches;
23use async_trait::async_trait;
24
25use datafusion::catalog::Session;
26use datafusion::common::{plan_err, Column};
27use datafusion::datasource::function::TableFunctionImpl;
28use datafusion::datasource::TableProvider;
29use datafusion::error::Result;
30use datafusion::logical_expr::Expr;
31use datafusion::physical_plan::memory::MemoryExec;
32use datafusion::physical_plan::ExecutionPlan;
33use datafusion::scalar::ScalarValue;
34use parquet::basic::ConvertedType;
35use parquet::data_type::{ByteArray, FixedLenByteArray};
36use parquet::file::reader::FileReader;
37use parquet::file::serialized_reader::SerializedFileReader;
38use parquet::file::statistics::Statistics;
39use std::fmt;
40use std::fs::File;
41use std::str::FromStr;
42use std::sync::Arc;
43
/// The SQL statement kinds that the CLI can show help for via the `\h`
/// command. Parsed from user input by the `FromStr` impl below and rendered
/// back to the canonical command name by the `Display` impl.
#[derive(Debug)]
pub enum Function {
    Select,
    Explain,
    Show,
    CreateTable,
    CreateTableAs,
    Insert,
    DropTable,
}
54
/// Every help topic, listed in the alphabetical order used when
/// `display_all_functions` prints the help index.
const ALL_FUNCTIONS: [Function; 7] = [
    Function::CreateTable,
    Function::CreateTableAs,
    Function::DropTable,
    Function::Explain,
    Function::Insert,
    Function::Select,
    Function::Show,
];
64
65impl Function {
66    pub fn function_details(&self) -> Result<&str> {
67        let details = match self {
68            Function::Select => {
69                r#"
70Command:     SELECT
71Description: retrieve rows from a table or view
72Syntax:
73SELECT [ ALL | DISTINCT [ ON ( expression [, ...] ) ] ]
74    [ * | expression [ [ AS ] output_name ] [, ...] ]
75    [ FROM from_item [, ...] ]
76    [ WHERE condition ]
77    [ GROUP BY [ ALL | DISTINCT ] grouping_element [, ...] ]
78    [ HAVING condition ]
79    [ WINDOW window_name AS ( window_definition ) [, ...] ]
80    [ { UNION | INTERSECT | EXCEPT } [ ALL | DISTINCT ] select ]
81    [ ORDER BY expression [ ASC | DESC | USING operator ] [ NULLS { FIRST | LAST } ] [, ...] ]
82    [ LIMIT { count | ALL } ]
83    [ OFFSET start [ ROW | ROWS ] ]
84
85where from_item can be one of:
86
87    [ ONLY ] table_name [ * ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
88                [ TABLESAMPLE sampling_method ( argument [, ...] ) [ REPEATABLE ( seed ) ] ]
89    [ LATERAL ] ( select ) [ AS ] alias [ ( column_alias [, ...] ) ]
90    with_query_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
91    [ LATERAL ] function_name ( [ argument [, ...] ] )
92                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
93    [ LATERAL ] function_name ( [ argument [, ...] ] ) [ AS ] alias ( column_definition [, ...] )
94    [ LATERAL ] function_name ( [ argument [, ...] ] ) AS ( column_definition [, ...] )
95    [ LATERAL ] ROWS FROM( function_name ( [ argument [, ...] ] ) [ AS ( column_definition [, ...] ) ] [, ...] )
96                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
97    from_item [ NATURAL ] join_type from_item [ ON join_condition | USING ( join_column [, ...] ) [ AS join_using_alias ] ]
98
99and grouping_element can be one of:
100
101    ( )
102    expression
103    ( expression [, ...] )
104
105and with_query is:
106
107    with_query_name [ ( column_name [, ...] ) ] AS [ [ NOT ] MATERIALIZED ] ( select | values | insert | update | delete )
108
109TABLE [ ONLY ] table_name [ * ]"#
110            }
111            Function::Explain => {
112                r#"
113Command:     EXPLAIN
114Description: show the execution plan of a statement
115Syntax:
116EXPLAIN [ ANALYZE ] statement
117"#
118            }
119            Function::Show => {
120                r#"
121Command:     SHOW
122Description: show the value of a run-time parameter
123Syntax:
124SHOW name
125"#
126            }
127            Function::CreateTable => {
128                r#"
129Command:     CREATE TABLE
130Description: define a new table
131Syntax:
132CREATE [ EXTERNAL ]  TABLE table_name ( [
133  { column_name data_type }
134    [, ... ]
135] )
136"#
137            }
138            Function::CreateTableAs => {
139                r#"
140Command:     CREATE TABLE AS
141Description: define a new table from the results of a query
142Syntax:
143CREATE TABLE table_name
144    [ (column_name [, ...] ) ]
145    AS query
146    [ WITH [ NO ] DATA ]
147"#
148            }
149            Function::Insert => {
150                r#"
151Command:     INSERT
152Description: create new rows in a table
153Syntax:
154INSERT INTO table_name [ ( column_name [, ...] ) ]
155    { VALUES ( { expression } [, ...] ) [, ...] }
156"#
157            }
158            Function::DropTable => {
159                r#"
160Command:     DROP TABLE
161Description: remove a table
162Syntax:
163DROP TABLE [ IF EXISTS ] name [, ...]
164"#
165            }
166        };
167        Ok(details)
168    }
169}
170
171impl FromStr for Function {
172    type Err = ();
173
174    fn from_str(s: &str) -> Result<Self, Self::Err> {
175        Ok(match s.trim().to_uppercase().as_str() {
176            "SELECT" => Self::Select,
177            "EXPLAIN" => Self::Explain,
178            "SHOW" => Self::Show,
179            "CREATE TABLE" => Self::CreateTable,
180            "CREATE TABLE AS" => Self::CreateTableAs,
181            "INSERT" => Self::Insert,
182            "DROP TABLE" => Self::DropTable,
183            _ => return Err(()),
184        })
185    }
186}
187
188impl fmt::Display for Function {
189    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
190        match *self {
191            Function::Select => write!(f, "SELECT"),
192            Function::Explain => write!(f, "EXPLAIN"),
193            Function::Show => write!(f, "SHOW"),
194            Function::CreateTable => write!(f, "CREATE TABLE"),
195            Function::CreateTableAs => write!(f, "CREATE TABLE AS"),
196            Function::Insert => write!(f, "INSERT"),
197            Function::DropTable => write!(f, "DROP TABLE"),
198        }
199    }
200}
201
202pub fn display_all_functions() -> Result<()> {
203    println!("Available help:");
204    let array = StringArray::from(
205        ALL_FUNCTIONS
206            .iter()
207            .map(|f| format!("{}", f))
208            .collect::<Vec<String>>(),
209    );
210    let schema = Schema::new(vec![Field::new("Function", DataType::Utf8, false)]);
211    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;
212    println!("{}", pretty_format_batches(&[batch]).unwrap());
213    Ok(())
214}
215
/// PARQUET_META table function
///
/// Backing `TableProvider` for `parquet_metadata(...)`: holds the metadata
/// rows pre-computed in `ParquetMetadataFunc::call`.
#[derive(Debug)]
struct ParquetMetadataTable {
    /// Schema of `batch` (one field per metadata column).
    schema: SchemaRef,
    /// One row per (row group, column chunk) of the inspected file.
    batch: RecordBatch,
}
222
223#[async_trait]
224impl TableProvider for ParquetMetadataTable {
225    fn as_any(&self) -> &dyn std::any::Any {
226        self
227    }
228
229    fn schema(&self) -> arrow::datatypes::SchemaRef {
230        self.schema.clone()
231    }
232
233    fn table_type(&self) -> datafusion::logical_expr::TableType {
234        datafusion::logical_expr::TableType::Base
235    }
236
237    async fn scan(
238        &self,
239        _state: &dyn Session,
240        projection: Option<&Vec<usize>>,
241        _filters: &[Expr],
242        _limit: Option<usize>,
243    ) -> Result<Arc<dyn ExecutionPlan>> {
244        Ok(Arc::new(MemoryExec::try_new(
245            &[vec![self.batch.clone()]],
246            TableProvider::schema(self),
247            projection.cloned(),
248        )?))
249    }
250}
251
252fn convert_parquet_statistics(
253    value: &Statistics,
254    converted_type: ConvertedType,
255) -> (Option<String>, Option<String>) {
256    match (value, converted_type) {
257        (Statistics::Boolean(val), _) => (
258            val.min_opt().map(|v| v.to_string()),
259            val.max_opt().map(|v| v.to_string()),
260        ),
261        (Statistics::Int32(val), _) => (
262            val.min_opt().map(|v| v.to_string()),
263            val.max_opt().map(|v| v.to_string()),
264        ),
265        (Statistics::Int64(val), _) => (
266            val.min_opt().map(|v| v.to_string()),
267            val.max_opt().map(|v| v.to_string()),
268        ),
269        (Statistics::Int96(val), _) => (
270            val.min_opt().map(|v| v.to_string()),
271            val.max_opt().map(|v| v.to_string()),
272        ),
273        (Statistics::Float(val), _) => (
274            val.min_opt().map(|v| v.to_string()),
275            val.max_opt().map(|v| v.to_string()),
276        ),
277        (Statistics::Double(val), _) => (
278            val.min_opt().map(|v| v.to_string()),
279            val.max_opt().map(|v| v.to_string()),
280        ),
281        (Statistics::ByteArray(val), ConvertedType::UTF8) => (
282            byte_array_to_string(val.min_opt()),
283            byte_array_to_string(val.max_opt()),
284        ),
285        (Statistics::ByteArray(val), _) => (
286            val.min_opt().map(|v| v.to_string()),
287            val.max_opt().map(|v| v.to_string()),
288        ),
289        (Statistics::FixedLenByteArray(val), ConvertedType::UTF8) => (
290            fixed_len_byte_array_to_string(val.min_opt()),
291            fixed_len_byte_array_to_string(val.max_opt()),
292        ),
293        (Statistics::FixedLenByteArray(val), _) => (
294            val.min_opt().map(|v| v.to_string()),
295            val.max_opt().map(|v| v.to_string()),
296        ),
297    }
298}
299
300/// Convert to a string if it has utf8 encoding, otherwise print bytes directly
301fn byte_array_to_string(val: Option<&ByteArray>) -> Option<String> {
302    val.map(|v| {
303        v.as_utf8()
304            .map(|s| s.to_string())
305            .unwrap_or_else(|_e| v.to_string())
306    })
307}
308
309/// Convert to a string if it has utf8 encoding, otherwise print bytes directly
310fn fixed_len_byte_array_to_string(val: Option<&FixedLenByteArray>) -> Option<String> {
311    val.map(|v| {
312        v.as_utf8()
313            .map(|s| s.to_string())
314            .unwrap_or_else(|_e| v.to_string())
315    })
316}
317
/// Implementation of the `parquet_metadata(<file>)` table function:
/// exposes the footer metadata of a Parquet file as a queryable table.
#[derive(Debug)]
pub struct ParquetMetadataFunc {}
320
321impl TableFunctionImpl for ParquetMetadataFunc {
322    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
323        let filename = match exprs.first() {
324            Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet')
325            Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
326            _ => {
327                return plan_err!(
328                    "parquet_metadata requires string argument as its input"
329                );
330            }
331        };
332
333        let file = File::open(filename.clone())?;
334        let reader = SerializedFileReader::new(file)?;
335        let metadata = reader.metadata();
336
337        let schema = Arc::new(Schema::new(vec![
338            Field::new("filename", DataType::Utf8, true),
339            Field::new("row_group_id", DataType::Int64, true),
340            Field::new("row_group_num_rows", DataType::Int64, true),
341            Field::new("row_group_num_columns", DataType::Int64, true),
342            Field::new("row_group_bytes", DataType::Int64, true),
343            Field::new("column_id", DataType::Int64, true),
344            Field::new("file_offset", DataType::Int64, true),
345            Field::new("num_values", DataType::Int64, true),
346            Field::new("path_in_schema", DataType::Utf8, true),
347            Field::new("type", DataType::Utf8, true),
348            Field::new("stats_min", DataType::Utf8, true),
349            Field::new("stats_max", DataType::Utf8, true),
350            Field::new("stats_null_count", DataType::Int64, true),
351            Field::new("stats_distinct_count", DataType::Int64, true),
352            Field::new("stats_min_value", DataType::Utf8, true),
353            Field::new("stats_max_value", DataType::Utf8, true),
354            Field::new("compression", DataType::Utf8, true),
355            Field::new("encodings", DataType::Utf8, true),
356            Field::new("index_page_offset", DataType::Int64, true),
357            Field::new("dictionary_page_offset", DataType::Int64, true),
358            Field::new("data_page_offset", DataType::Int64, true),
359            Field::new("total_compressed_size", DataType::Int64, true),
360            Field::new("total_uncompressed_size", DataType::Int64, true),
361        ]));
362
363        // construct recordbatch from metadata
364        let mut filename_arr = vec![];
365        let mut row_group_id_arr = vec![];
366        let mut row_group_num_rows_arr = vec![];
367        let mut row_group_num_columns_arr = vec![];
368        let mut row_group_bytes_arr = vec![];
369        let mut column_id_arr = vec![];
370        let mut file_offset_arr = vec![];
371        let mut num_values_arr = vec![];
372        let mut path_in_schema_arr = vec![];
373        let mut type_arr = vec![];
374        let mut stats_min_arr = vec![];
375        let mut stats_max_arr = vec![];
376        let mut stats_null_count_arr = vec![];
377        let mut stats_distinct_count_arr = vec![];
378        let mut stats_min_value_arr = vec![];
379        let mut stats_max_value_arr = vec![];
380        let mut compression_arr = vec![];
381        let mut encodings_arr = vec![];
382        let mut index_page_offset_arr = vec![];
383        let mut dictionary_page_offset_arr = vec![];
384        let mut data_page_offset_arr = vec![];
385        let mut total_compressed_size_arr = vec![];
386        let mut total_uncompressed_size_arr = vec![];
387        for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
388            for (col_idx, column) in row_group.columns().iter().enumerate() {
389                filename_arr.push(filename.clone());
390                row_group_id_arr.push(rg_idx as i64);
391                row_group_num_rows_arr.push(row_group.num_rows());
392                row_group_num_columns_arr.push(row_group.num_columns() as i64);
393                row_group_bytes_arr.push(row_group.total_byte_size());
394                column_id_arr.push(col_idx as i64);
395                file_offset_arr.push(column.file_offset());
396                num_values_arr.push(column.num_values());
397                path_in_schema_arr.push(column.column_path().to_string());
398                type_arr.push(column.column_type().to_string());
399                let converted_type = column.column_descr().converted_type();
400
401                if let Some(s) = column.statistics() {
402                    let (min_val, max_val) =
403                        convert_parquet_statistics(s, converted_type);
404                    stats_min_arr.push(min_val.clone());
405                    stats_max_arr.push(max_val.clone());
406                    stats_null_count_arr.push(s.null_count_opt().map(|c| c as i64));
407                    stats_distinct_count_arr
408                        .push(s.distinct_count_opt().map(|c| c as i64));
409                    stats_min_value_arr.push(min_val);
410                    stats_max_value_arr.push(max_val);
411                } else {
412                    stats_min_arr.push(None);
413                    stats_max_arr.push(None);
414                    stats_null_count_arr.push(None);
415                    stats_distinct_count_arr.push(None);
416                    stats_min_value_arr.push(None);
417                    stats_max_value_arr.push(None);
418                };
419                compression_arr.push(format!("{:?}", column.compression()));
420                encodings_arr.push(format!("{:?}", column.encodings()));
421                index_page_offset_arr.push(column.index_page_offset());
422                dictionary_page_offset_arr.push(column.dictionary_page_offset());
423                data_page_offset_arr.push(column.data_page_offset());
424                total_compressed_size_arr.push(column.compressed_size());
425                total_uncompressed_size_arr.push(column.uncompressed_size());
426            }
427        }
428
429        let rb = RecordBatch::try_new(
430            schema.clone(),
431            vec![
432                Arc::new(StringArray::from(filename_arr)),
433                Arc::new(Int64Array::from(row_group_id_arr)),
434                Arc::new(Int64Array::from(row_group_num_rows_arr)),
435                Arc::new(Int64Array::from(row_group_num_columns_arr)),
436                Arc::new(Int64Array::from(row_group_bytes_arr)),
437                Arc::new(Int64Array::from(column_id_arr)),
438                Arc::new(Int64Array::from(file_offset_arr)),
439                Arc::new(Int64Array::from(num_values_arr)),
440                Arc::new(StringArray::from(path_in_schema_arr)),
441                Arc::new(StringArray::from(type_arr)),
442                Arc::new(StringArray::from(stats_min_arr)),
443                Arc::new(StringArray::from(stats_max_arr)),
444                Arc::new(Int64Array::from(stats_null_count_arr)),
445                Arc::new(Int64Array::from(stats_distinct_count_arr)),
446                Arc::new(StringArray::from(stats_min_value_arr)),
447                Arc::new(StringArray::from(stats_max_value_arr)),
448                Arc::new(StringArray::from(compression_arr)),
449                Arc::new(StringArray::from(encodings_arr)),
450                Arc::new(Int64Array::from(index_page_offset_arr)),
451                Arc::new(Int64Array::from(dictionary_page_offset_arr)),
452                Arc::new(Int64Array::from(data_page_offset_arr)),
453                Arc::new(Int64Array::from(total_compressed_size_arr)),
454                Arc::new(Int64Array::from(total_uncompressed_size_arr)),
455            ],
456        )?;
457
458        let parquet_metadata = ParquetMetadataTable { schema, batch: rb };
459        Ok(Arc::new(parquet_metadata))
460    }
461}