datafusion_cli/functions.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Functions that are query-able and searchable via the `\h` command

use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;

use arrow::array::{Int64Array, StringArray, TimestampMillisecondArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use datafusion::catalog::{Session, TableFunctionImpl};
use datafusion::common::{plan_err, Column};
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::cache::cache_manager::CacheManager;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;

use async_trait::async_trait;
use parquet::basic::ConvertedType;
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::statistics::Statistics;

#[derive(Debug)]
pub enum Function {
    Select,
    Explain,
    Show,
    CreateTable,
    CreateTableAs,
    Insert,
    DropTable,
}

const ALL_FUNCTIONS: [Function; 7] = [
    Function::CreateTable,
    Function::CreateTableAs,
    Function::DropTable,
    Function::Explain,
    Function::Insert,
    Function::Select,
    Function::Show,
];

impl Function {
    pub fn function_details(&self) -> Result<&str> {
        let details = match self {
            Function::Select => {
                r#"
Command:     SELECT
Description: retrieve rows from a table or view
Syntax:
SELECT [ ALL | DISTINCT [ ON ( expression [, ...] ) ] ]
    [ * | expression [ [ AS ] output_name ] [, ...] ]
    [ FROM from_item [, ...] ]
    [ WHERE condition ]
    [ GROUP BY [ ALL | DISTINCT ] grouping_element [, ...] ]
    [ HAVING condition ]
    [ WINDOW window_name AS ( window_definition ) [, ...] ]
    [ { UNION | INTERSECT | EXCEPT } [ ALL | DISTINCT ] select ]
    [ ORDER BY expression [ ASC | DESC | USING operator ] [ NULLS { FIRST | LAST } ] [, ...] ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start [ ROW | ROWS ] ]

where from_item can be one of:

    [ ONLY ] table_name [ * ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
                [ TABLESAMPLE sampling_method ( argument [, ...] ) [ REPEATABLE ( seed ) ] ]
    [ LATERAL ] ( select ) [ AS ] alias [ ( column_alias [, ...] ) ]
    with_query_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] ) [ AS ] alias ( column_definition [, ...] )
    [ LATERAL ] function_name ( [ argument [, ...] ] ) AS ( column_definition [, ...] )
    [ LATERAL ] ROWS FROM( function_name ( [ argument [, ...] ] ) [ AS ( column_definition [, ...] ) ] [, ...] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    from_item [ NATURAL ] join_type from_item [ ON join_condition | USING ( join_column [, ...] ) [ AS join_using_alias ] ]

and grouping_element can be one of:

    ( )
    expression
    ( expression [, ...] )

and with_query is:

    with_query_name [ ( column_name [, ...] ) ] AS [ [ NOT ] MATERIALIZED ] ( select | values | insert | update | delete )

TABLE [ ONLY ] table_name [ * ]"#
            }
            Function::Explain => {
                r#"
Command:     EXPLAIN
Description: show the execution plan of a statement
Syntax:
EXPLAIN [ ANALYZE ] statement
"#
            }
            Function::Show => {
                r#"
Command:     SHOW
Description: show the value of a run-time parameter
Syntax:
SHOW name
"#
            }
            Function::CreateTable => {
                r#"
Command:     CREATE TABLE
Description: define a new table
Syntax:
CREATE [ EXTERNAL ] TABLE table_name ( [
  { column_name data_type }
    [, ... ]
] )
"#
            }
            Function::CreateTableAs => {
                r#"
Command:     CREATE TABLE AS
Description: define a new table from the results of a query
Syntax:
CREATE TABLE table_name
    [ (column_name [, ...] ) ]
    AS query
    [ WITH [ NO ] DATA ]
"#
            }
            Function::Insert => {
                r#"
Command:     INSERT
Description: create new rows in a table
Syntax:
INSERT INTO table_name [ ( column_name [, ...] ) ]
    { VALUES ( { expression } [, ...] ) [, ...] }
"#
            }
            Function::DropTable => {
                r#"
Command:     DROP TABLE
Description: remove a table
Syntax:
DROP TABLE [ IF EXISTS ] name [, ...]
"#
            }
        };
        Ok(details)
    }
}

impl FromStr for Function {
    type Err = ();

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s.trim().to_uppercase().as_str() {
            "SELECT" => Self::Select,
            "EXPLAIN" => Self::Explain,
            "SHOW" => Self::Show,
            "CREATE TABLE" => Self::CreateTable,
            "CREATE TABLE AS" => Self::CreateTableAs,
            "INSERT" => Self::Insert,
            "DROP TABLE" => Self::DropTable,
            _ => return Err(()),
        })
    }
}

impl fmt::Display for Function {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            Function::Select => write!(f, "SELECT"),
            Function::Explain => write!(f, "EXPLAIN"),
            Function::Show => write!(f, "SHOW"),
            Function::CreateTable => write!(f, "CREATE TABLE"),
            Function::CreateTableAs => write!(f, "CREATE TABLE AS"),
            Function::Insert => write!(f, "INSERT"),
            Function::DropTable => write!(f, "DROP TABLE"),
        }
    }
}

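// A minimal sketch (not part of the original file) of how the `\h` lookup above
// is expected to behave: the command name typed after `\h` is trimmed, parsed
// case-insensitively into a `Function`, and its help text is then shown.
#[cfg(test)]
mod function_help_example {
    use super::*;

    #[test]
    fn lookup_select_help() {
        // "select" in any case (and with surrounding whitespace) resolves to Function::Select ...
        let func = Function::from_str("  select ").expect("known command");
        assert_eq!(func.to_string(), "SELECT");
        // ... and its syntax summary is available through function_details().
        assert!(func.function_details().unwrap().contains("SELECT [ ALL | DISTINCT"));
        // Unknown commands are rejected rather than guessed.
        assert!(Function::from_str("no such command").is_err());
    }
}
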
pub fn display_all_functions() -> Result<()> {
    println!("Available help:");
    let array = StringArray::from(
        ALL_FUNCTIONS
            .iter()
            .map(|f| format!("{f}"))
            .collect::<Vec<String>>(),
    );
    let schema = Schema::new(vec![Field::new("Function", DataType::Utf8, false)]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;
    println!("{}", pretty_format_batches(&[batch]).unwrap());
    Ok(())
}

/// PARQUET_META table function
#[derive(Debug)]
struct ParquetMetadataTable {
    schema: SchemaRef,
    batch: RecordBatch,
}

#[async_trait]
impl TableProvider for ParquetMetadataTable {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow::datatypes::SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> datafusion::logical_expr::TableType {
        datafusion::logical_expr::TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(MemorySourceConfig::try_new_exec(
            &[vec![self.batch.clone()]],
            TableProvider::schema(self),
            projection.cloned(),
        )?)
    }
}

fn convert_parquet_statistics(
    value: &Statistics,
    converted_type: ConvertedType,
) -> (Option<String>, Option<String>) {
    match (value, converted_type) {
        (Statistics::Boolean(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int32(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int64(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int96(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Float(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Double(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::ByteArray(val), ConvertedType::UTF8) => (
            byte_array_to_string(val.min_opt()),
            byte_array_to_string(val.max_opt()),
        ),
        (Statistics::ByteArray(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::FixedLenByteArray(val), ConvertedType::UTF8) => (
            fixed_len_byte_array_to_string(val.min_opt()),
            fixed_len_byte_array_to_string(val.max_opt()),
        ),
        (Statistics::FixedLenByteArray(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
    }
}

/// Convert to a string if it has utf8 encoding, otherwise print bytes directly
fn byte_array_to_string(val: Option<&ByteArray>) -> Option<String> {
    val.map(|v| {
        v.as_utf8()
            .map(|s| s.to_string())
            .unwrap_or_else(|_e| v.to_string())
    })
}

/// Convert to a string if it has utf8 encoding, otherwise print bytes directly
fn fixed_len_byte_array_to_string(val: Option<&FixedLenByteArray>) -> Option<String> {
    val.map(|v| {
        v.as_utf8()
            .map(|s| s.to_string())
            .unwrap_or_else(|_e| v.to_string())
    })
}

#[derive(Debug)]
pub struct ParquetMetadataFunc {}

impl TableFunctionImpl for ParquetMetadataFunc {
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        let filename = match exprs.first() {
            Some(Expr::Literal(ScalarValue::Utf8(Some(s)), _)) => s, // single quote: parquet_metadata('x.parquet')
            Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
            _ => {
                return plan_err!(
                    "parquet_metadata requires a string argument as its input"
                );
            }
        };

        let file = File::open(filename.clone())?;
        let reader = SerializedFileReader::new(file)?;
        let metadata = reader.metadata();

        let schema = Arc::new(Schema::new(vec![
            Field::new("filename", DataType::Utf8, true),
            Field::new("row_group_id", DataType::Int64, true),
            Field::new("row_group_num_rows", DataType::Int64, true),
            Field::new("row_group_num_columns", DataType::Int64, true),
            Field::new("row_group_bytes", DataType::Int64, true),
            Field::new("column_id", DataType::Int64, true),
            Field::new("file_offset", DataType::Int64, true),
            Field::new("num_values", DataType::Int64, true),
            Field::new("path_in_schema", DataType::Utf8, true),
            Field::new("type", DataType::Utf8, true),
            Field::new("stats_min", DataType::Utf8, true),
            Field::new("stats_max", DataType::Utf8, true),
            Field::new("stats_null_count", DataType::Int64, true),
            Field::new("stats_distinct_count", DataType::Int64, true),
            Field::new("stats_min_value", DataType::Utf8, true),
            Field::new("stats_max_value", DataType::Utf8, true),
            Field::new("compression", DataType::Utf8, true),
            Field::new("encodings", DataType::Utf8, true),
            Field::new("index_page_offset", DataType::Int64, true),
            Field::new("dictionary_page_offset", DataType::Int64, true),
            Field::new("data_page_offset", DataType::Int64, true),
            Field::new("total_compressed_size", DataType::Int64, true),
            Field::new("total_uncompressed_size", DataType::Int64, true),
        ]));

        // construct record batch from metadata
        let mut filename_arr = vec![];
        let mut row_group_id_arr = vec![];
        let mut row_group_num_rows_arr = vec![];
        let mut row_group_num_columns_arr = vec![];
        let mut row_group_bytes_arr = vec![];
        let mut column_id_arr = vec![];
        let mut file_offset_arr = vec![];
        let mut num_values_arr = vec![];
        let mut path_in_schema_arr = vec![];
        let mut type_arr = vec![];
        let mut stats_min_arr = vec![];
        let mut stats_max_arr = vec![];
        let mut stats_null_count_arr = vec![];
        let mut stats_distinct_count_arr = vec![];
        let mut stats_min_value_arr = vec![];
        let mut stats_max_value_arr = vec![];
        let mut compression_arr = vec![];
        let mut encodings_arr = vec![];
        let mut index_page_offset_arr = vec![];
        let mut dictionary_page_offset_arr = vec![];
        let mut data_page_offset_arr = vec![];
        let mut total_compressed_size_arr = vec![];
        let mut total_uncompressed_size_arr = vec![];
        for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
            for (col_idx, column) in row_group.columns().iter().enumerate() {
                filename_arr.push(filename.clone());
                row_group_id_arr.push(rg_idx as i64);
                row_group_num_rows_arr.push(row_group.num_rows());
                row_group_num_columns_arr.push(row_group.num_columns() as i64);
                row_group_bytes_arr.push(row_group.total_byte_size());
                column_id_arr.push(col_idx as i64);
                file_offset_arr.push(column.file_offset());
                num_values_arr.push(column.num_values());
                path_in_schema_arr.push(column.column_path().to_string());
                type_arr.push(column.column_type().to_string());
                let converted_type = column.column_descr().converted_type();

                if let Some(s) = column.statistics() {
                    let (min_val, max_val) =
                        convert_parquet_statistics(s, converted_type);
                    stats_min_arr.push(min_val.clone());
                    stats_max_arr.push(max_val.clone());
                    stats_null_count_arr.push(s.null_count_opt().map(|c| c as i64));
                    stats_distinct_count_arr
                        .push(s.distinct_count_opt().map(|c| c as i64));
                    stats_min_value_arr.push(min_val);
                    stats_max_value_arr.push(max_val);
                } else {
                    stats_min_arr.push(None);
                    stats_max_arr.push(None);
                    stats_null_count_arr.push(None);
                    stats_distinct_count_arr.push(None);
                    stats_min_value_arr.push(None);
                    stats_max_value_arr.push(None);
                };
                compression_arr.push(format!("{:?}", column.compression()));
                // need to collect into Vec to format
                let encodings: Vec<_> = column.encodings().collect();
                encodings_arr.push(format!("{:?}", encodings));
                index_page_offset_arr.push(column.index_page_offset());
                dictionary_page_offset_arr.push(column.dictionary_page_offset());
                data_page_offset_arr.push(column.data_page_offset());
                total_compressed_size_arr.push(column.compressed_size());
                total_uncompressed_size_arr.push(column.uncompressed_size());
            }
        }

        let rb = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(filename_arr)),
                Arc::new(Int64Array::from(row_group_id_arr)),
                Arc::new(Int64Array::from(row_group_num_rows_arr)),
                Arc::new(Int64Array::from(row_group_num_columns_arr)),
                Arc::new(Int64Array::from(row_group_bytes_arr)),
                Arc::new(Int64Array::from(column_id_arr)),
                Arc::new(Int64Array::from(file_offset_arr)),
                Arc::new(Int64Array::from(num_values_arr)),
                Arc::new(StringArray::from(path_in_schema_arr)),
                Arc::new(StringArray::from(type_arr)),
                Arc::new(StringArray::from(stats_min_arr)),
                Arc::new(StringArray::from(stats_max_arr)),
                Arc::new(Int64Array::from(stats_null_count_arr)),
                Arc::new(Int64Array::from(stats_distinct_count_arr)),
                Arc::new(StringArray::from(stats_min_value_arr)),
                Arc::new(StringArray::from(stats_max_value_arr)),
                Arc::new(StringArray::from(compression_arr)),
                Arc::new(StringArray::from(encodings_arr)),
                Arc::new(Int64Array::from(index_page_offset_arr)),
                Arc::new(Int64Array::from(dictionary_page_offset_arr)),
                Arc::new(Int64Array::from(data_page_offset_arr)),
                Arc::new(Int64Array::from(total_compressed_size_arr)),
                Arc::new(Int64Array::from(total_uncompressed_size_arr)),
            ],
        )?;

        let parquet_metadata = ParquetMetadataTable { schema, batch: rb };
        Ok(Arc::new(parquet_metadata))
    }
}

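// A minimal sketch (not part of the original file) of how `ParquetMetadataFunc`
// might be wired up and queried. `SessionContext::register_udtf` is DataFusion's
// API for user-defined table functions; the table name "parquet_metadata" mirrors
// the error message in `call` above, and the file path in the query is hypothetical.
#[cfg(test)]
mod parquet_metadata_example {
    use super::*;
    use datafusion::prelude::SessionContext;

    #[tokio::test]
    async fn query_parquet_metadata() -> Result<()> {
        let ctx = SessionContext::new();
        ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));
        // One output row per column chunk per row group of the named file.
        let df = ctx
            .sql(
                "SELECT row_group_id, path_in_schema, stats_min, stats_max \
                 FROM parquet_metadata('data/example.parquet')",
            )
            .await?;
        df.show().await?;
        Ok(())
    }
}
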
/// METADATA_CACHE table function
#[derive(Debug)]
struct MetadataCacheTable {
    schema: SchemaRef,
    batch: RecordBatch,
}

#[async_trait]
impl TableProvider for MetadataCacheTable {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow::datatypes::SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> datafusion::logical_expr::TableType {
        datafusion::logical_expr::TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(MemorySourceConfig::try_new_exec(
            &[vec![self.batch.clone()]],
            TableProvider::schema(self),
            projection.cloned(),
        )?)
    }
}

#[derive(Debug)]
pub struct MetadataCacheFunc {
    cache_manager: Arc<CacheManager>,
}

impl MetadataCacheFunc {
    pub fn new(cache_manager: Arc<CacheManager>) -> Self {
        Self { cache_manager }
    }
}

impl TableFunctionImpl for MetadataCacheFunc {
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        if !exprs.is_empty() {
            return plan_err!("metadata_cache should have no arguments");
        }

        let schema = Arc::new(Schema::new(vec![
            Field::new("path", DataType::Utf8, false),
            Field::new(
                "file_modified",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("file_size_bytes", DataType::UInt64, false),
            Field::new("e_tag", DataType::Utf8, true),
            Field::new("version", DataType::Utf8, true),
            Field::new("metadata_size_bytes", DataType::UInt64, false),
            Field::new("hits", DataType::UInt64, false),
            Field::new("extra", DataType::Utf8, true),
        ]));

        // construct record batch from metadata
        let mut path_arr = vec![];
        let mut file_modified_arr = vec![];
        let mut file_size_bytes_arr = vec![];
        let mut e_tag_arr = vec![];
        let mut version_arr = vec![];
        let mut metadata_size_bytes = vec![];
        let mut hits_arr = vec![];
        let mut extra_arr = vec![];

        let cached_entries = self.cache_manager.get_file_metadata_cache().list_entries();

        for (path, entry) in cached_entries {
            path_arr.push(path.to_string());
            file_modified_arr
                .push(Some(entry.object_meta.last_modified.timestamp_millis()));
            file_size_bytes_arr.push(entry.object_meta.size);
            e_tag_arr.push(entry.object_meta.e_tag);
            version_arr.push(entry.object_meta.version);
            metadata_size_bytes.push(entry.size_bytes as u64);
            hits_arr.push(entry.hits as u64);

            let mut extra = entry
                .extra
                .iter()
                .map(|(k, v)| format!("{k}={v}"))
                .collect::<Vec<_>>();
            extra.sort();
            extra_arr.push(extra.join(" "));
        }

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(path_arr)),
                Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
                Arc::new(UInt64Array::from(file_size_bytes_arr)),
                Arc::new(StringArray::from(e_tag_arr)),
                Arc::new(StringArray::from(version_arr)),
                Arc::new(UInt64Array::from(metadata_size_bytes)),
                Arc::new(UInt64Array::from(hits_arr)),
                Arc::new(StringArray::from(extra_arr)),
            ],
        )?;

        let metadata_cache = MetadataCacheTable { schema, batch };
        Ok(Arc::new(metadata_cache))
    }
}
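
// A minimal sketch (not part of the original file) of exposing `MetadataCacheFunc`,
// assuming the cache manager is taken from the session's RuntimeEnv; the table
// name "metadata_cache" mirrors the error message in `call` above.
#[cfg(test)]
mod metadata_cache_example {
    use super::*;
    use datafusion::prelude::SessionContext;

    #[tokio::test]
    async fn query_metadata_cache() -> Result<()> {
        let ctx = SessionContext::new();
        let cache_manager = Arc::clone(&ctx.runtime_env().cache_manager);
        ctx.register_udtf(
            "metadata_cache",
            Arc::new(MetadataCacheFunc::new(cache_manager)),
        );
        // Each row describes one cached file-metadata entry; before any Parquet
        // file has been read, the listing is simply empty.
        let df = ctx.sql("SELECT path, hits FROM metadata_cache()").await?;
        df.show().await?;
        Ok(())
    }
}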