datafusion_cli/functions.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Functions that are query-able and searchable via the `\h` command

use datafusion_common::instant::Instant;
use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;

use arrow::array::{
    DurationMillisecondArray, GenericListArray, Int64Array, StringArray, StructArray,
    TimestampMillisecondArray, UInt64Array,
};
use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use datafusion::catalog::{Session, TableFunctionImpl};
use datafusion::common::{Column, plan_err};
use datafusion::datasource::TableProvider;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::error::Result;
use datafusion::execution::cache::cache_manager::CacheManager;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;

use async_trait::async_trait;
use parquet::basic::ConvertedType;
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::statistics::Statistics;

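/// SQL statements for which help text is available via the `\h` command.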
#[derive(Debug)]
pub enum Function {
    Select,
    Explain,
    Show,
    CreateTable,
    CreateTableAs,
    Insert,
    DropTable,
}

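/// All help topics, kept in the alphabetical order used for display.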
const ALL_FUNCTIONS: [Function; 7] = [
    Function::CreateTable,
    Function::CreateTableAs,
    Function::DropTable,
    Function::Explain,
    Function::Insert,
    Function::Select,
    Function::Show,
];

impl Function {
    pub fn function_details(&self) -> Result<&str> {
        let details = match self {
            Function::Select => {
                r#"
Command:     SELECT
Description: retrieve rows from a table or view
Syntax:
SELECT [ ALL | DISTINCT [ ON ( expression [, ...] ) ] ]
    [ * | expression [ [ AS ] output_name ] [, ...] ]
    [ FROM from_item [, ...] ]
    [ WHERE condition ]
    [ GROUP BY [ ALL | DISTINCT ] grouping_element [, ...] ]
    [ HAVING condition ]
    [ WINDOW window_name AS ( window_definition ) [, ...] ]
    [ { UNION | INTERSECT | EXCEPT } [ ALL | DISTINCT ] select ]
    [ ORDER BY expression [ ASC | DESC | USING operator ] [ NULLS { FIRST | LAST } ] [, ...] ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start [ ROW | ROWS ] ]

where from_item can be one of:

    [ ONLY ] table_name [ * ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
                [ TABLESAMPLE sampling_method ( argument [, ...] ) [ REPEATABLE ( seed ) ] ]
    [ LATERAL ] ( select ) [ AS ] alias [ ( column_alias [, ...] ) ]
    with_query_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] ) [ AS ] alias ( column_definition [, ...] )
    [ LATERAL ] function_name ( [ argument [, ...] ] ) AS ( column_definition [, ...] )
    [ LATERAL ] ROWS FROM( function_name ( [ argument [, ...] ] ) [ AS ( column_definition [, ...] ) ] [, ...] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    from_item [ NATURAL ] join_type from_item [ ON join_condition | USING ( join_column [, ...] ) [ AS join_using_alias ] ]

and grouping_element can be one of:

    ( )
    expression
    ( expression [, ...] )

and with_query is:

    with_query_name [ ( column_name [, ...] ) ] AS [ [ NOT ] MATERIALIZED ] ( select | values | insert | update | delete )

TABLE [ ONLY ] table_name [ * ]"#
            }
            Function::Explain => {
                r#"
Command:     EXPLAIN
Description: show the execution plan of a statement
Syntax:
EXPLAIN [ ANALYZE ] statement
"#
            }
            Function::Show => {
                r#"
Command:     SHOW
Description: show the value of a run-time parameter
Syntax:
SHOW name
"#
            }
            Function::CreateTable => {
                r#"
Command:     CREATE TABLE
Description: define a new table
Syntax:
CREATE [ EXTERNAL ] TABLE table_name ( [
  { column_name data_type }
    [, ... ]
] )
"#
            }
            Function::CreateTableAs => {
                r#"
Command:     CREATE TABLE AS
Description: define a new table from the results of a query
Syntax:
CREATE TABLE table_name
    [ (column_name [, ...] ) ]
    AS query
    [ WITH [ NO ] DATA ]
"#
            }
            Function::Insert => {
                r#"
Command:     INSERT
Description: create new rows in a table
Syntax:
INSERT INTO table_name [ ( column_name [, ...] ) ]
    { VALUES ( { expression } [, ...] ) [, ...] }
"#
            }
            Function::DropTable => {
                r#"
Command:     DROP TABLE
Description: remove a table
Syntax:
DROP TABLE [ IF EXISTS ] name [, ...]
"#
            }
        };
        Ok(details)
    }
}

impl FromStr for Function {
    type Err = ();

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s.trim().to_uppercase().as_str() {
            "SELECT" => Self::Select,
            "EXPLAIN" => Self::Explain,
            "SHOW" => Self::Show,
            "CREATE TABLE" => Self::CreateTable,
            "CREATE TABLE AS" => Self::CreateTableAs,
            "INSERT" => Self::Insert,
            "DROP TABLE" => Self::DropTable,
            _ => return Err(()),
        })
    }
}

impl fmt::Display for Function {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            Function::Select => write!(f, "SELECT"),
            Function::Explain => write!(f, "EXPLAIN"),
            Function::Show => write!(f, "SHOW"),
            Function::CreateTable => write!(f, "CREATE TABLE"),
            Function::CreateTableAs => write!(f, "CREATE TABLE AS"),
            Function::Insert => write!(f, "INSERT"),
            Function::DropTable => write!(f, "DROP TABLE"),
        }
    }
}

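/// Prints the list of available help topics as a single-column table.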
pub fn display_all_functions() -> Result<()> {
    println!("Available help:");
    let array = StringArray::from(
        ALL_FUNCTIONS
            .iter()
            .map(|f| format!("{f}"))
            .collect::<Vec<String>>(),
    );
    let schema = Schema::new(vec![Field::new("Function", DataType::Utf8, false)]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;
    println!("{}", pretty_format_batches(&[batch]).unwrap());
    Ok(())
}

/// PARQUET_METADATA table function
#[derive(Debug)]
struct ParquetMetadataTable {
    schema: SchemaRef,
    batch: RecordBatch,
}

#[async_trait]
impl TableProvider for ParquetMetadataTable {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow::datatypes::SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> datafusion::logical_expr::TableType {
        datafusion::logical_expr::TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(MemorySourceConfig::try_new_exec(
            &[vec![self.batch.clone()]],
            TableProvider::schema(self),
            projection.cloned(),
        )?)
    }
}

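/// Extracts the min and max statistics as display strings, decoding byte
/// arrays as UTF-8 when the converted type marks them as string data.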
fn convert_parquet_statistics(
    value: &Statistics,
    converted_type: ConvertedType,
) -> (Option<String>, Option<String>) {
    match (value, converted_type) {
        (Statistics::Boolean(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int32(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int64(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int96(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Float(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Double(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::ByteArray(val), ConvertedType::UTF8) => (
            byte_array_to_string(val.min_opt()),
            byte_array_to_string(val.max_opt()),
        ),
        (Statistics::ByteArray(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::FixedLenByteArray(val), ConvertedType::UTF8) => (
            fixed_len_byte_array_to_string(val.min_opt()),
            fixed_len_byte_array_to_string(val.max_opt()),
        ),
        (Statistics::FixedLenByteArray(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
    }
}

/// Convert to a string if it has utf8 encoding, otherwise print bytes directly
fn byte_array_to_string(val: Option<&ByteArray>) -> Option<String> {
    val.map(|v| {
        v.as_utf8()
            .map(|s| s.to_string())
            .unwrap_or_else(|_e| v.to_string())
    })
}

/// Convert to a string if it has utf8 encoding, otherwise print bytes directly
fn fixed_len_byte_array_to_string(val: Option<&FixedLenByteArray>) -> Option<String> {
    val.map(|v| {
        v.as_utf8()
            .map(|s| s.to_string())
            .unwrap_or_else(|_e| v.to_string())
    })
}

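/// Implementation of the `parquet_metadata` table function.
///
/// Returns one row per column chunk per row group of the given Parquet file,
/// e.g. (assuming a local file named `x.parquet`, as in the examples below):
///
/// ```sql
/// SELECT * FROM parquet_metadata('x.parquet');
/// ```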
#[derive(Debug)]
pub struct ParquetMetadataFunc {}

impl TableFunctionImpl for ParquetMetadataFunc {
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        let filename = match exprs.first() {
            Some(Expr::Literal(ScalarValue::Utf8(Some(s)), _)) => s, // single quote: parquet_metadata('x.parquet')
            Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
            _ => {
                return plan_err!(
                    "parquet_metadata requires a string argument as its input"
                );
            }
        };

        let file = File::open(filename.clone())?;
        let reader = SerializedFileReader::new(file)?;
        let metadata = reader.metadata();

        let schema = Arc::new(Schema::new(vec![
            Field::new("filename", DataType::Utf8, true),
            Field::new("row_group_id", DataType::Int64, true),
            Field::new("row_group_num_rows", DataType::Int64, true),
            Field::new("row_group_num_columns", DataType::Int64, true),
            Field::new("row_group_bytes", DataType::Int64, true),
            Field::new("column_id", DataType::Int64, true),
            Field::new("file_offset", DataType::Int64, true),
            Field::new("num_values", DataType::Int64, true),
            Field::new("path_in_schema", DataType::Utf8, true),
            Field::new("type", DataType::Utf8, true),
            Field::new("stats_min", DataType::Utf8, true),
            Field::new("stats_max", DataType::Utf8, true),
            Field::new("stats_null_count", DataType::Int64, true),
            Field::new("stats_distinct_count", DataType::Int64, true),
            Field::new("stats_min_value", DataType::Utf8, true),
            Field::new("stats_max_value", DataType::Utf8, true),
            Field::new("compression", DataType::Utf8, true),
            Field::new("encodings", DataType::Utf8, true),
            Field::new("index_page_offset", DataType::Int64, true),
            Field::new("dictionary_page_offset", DataType::Int64, true),
            Field::new("data_page_offset", DataType::Int64, true),
            Field::new("total_compressed_size", DataType::Int64, true),
            Field::new("total_uncompressed_size", DataType::Int64, true),
        ]));

        // construct record batch from metadata
        let mut filename_arr = vec![];
        let mut row_group_id_arr = vec![];
        let mut row_group_num_rows_arr = vec![];
        let mut row_group_num_columns_arr = vec![];
        let mut row_group_bytes_arr = vec![];
        let mut column_id_arr = vec![];
        let mut file_offset_arr = vec![];
        let mut num_values_arr = vec![];
        let mut path_in_schema_arr = vec![];
        let mut type_arr = vec![];
        let mut stats_min_arr = vec![];
        let mut stats_max_arr = vec![];
        let mut stats_null_count_arr = vec![];
        let mut stats_distinct_count_arr = vec![];
        let mut stats_min_value_arr = vec![];
        let mut stats_max_value_arr = vec![];
        let mut compression_arr = vec![];
        let mut encodings_arr = vec![];
        let mut index_page_offset_arr = vec![];
        let mut dictionary_page_offset_arr = vec![];
        let mut data_page_offset_arr = vec![];
        let mut total_compressed_size_arr = vec![];
        let mut total_uncompressed_size_arr = vec![];
        for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
            for (col_idx, column) in row_group.columns().iter().enumerate() {
                filename_arr.push(filename.clone());
                row_group_id_arr.push(rg_idx as i64);
                row_group_num_rows_arr.push(row_group.num_rows());
                row_group_num_columns_arr.push(row_group.num_columns() as i64);
                row_group_bytes_arr.push(row_group.total_byte_size());
                column_id_arr.push(col_idx as i64);
                file_offset_arr.push(column.file_offset());
                num_values_arr.push(column.num_values());
                path_in_schema_arr.push(column.column_path().to_string());
                type_arr.push(column.column_type().to_string());
                let converted_type = column.column_descr().converted_type();

                if let Some(s) = column.statistics() {
                    let (min_val, max_val) =
                        convert_parquet_statistics(s, converted_type);
                    stats_min_arr.push(min_val.clone());
                    stats_max_arr.push(max_val.clone());
                    stats_null_count_arr.push(s.null_count_opt().map(|c| c as i64));
                    stats_distinct_count_arr
                        .push(s.distinct_count_opt().map(|c| c as i64));
                    stats_min_value_arr.push(min_val);
                    stats_max_value_arr.push(max_val);
                } else {
                    stats_min_arr.push(None);
                    stats_max_arr.push(None);
                    stats_null_count_arr.push(None);
                    stats_distinct_count_arr.push(None);
                    stats_min_value_arr.push(None);
                    stats_max_value_arr.push(None);
                }
                compression_arr.push(format!("{:?}", column.compression()));
                // need to collect into Vec to format
                let encodings: Vec<_> = column.encodings().collect();
                encodings_arr.push(format!("{:?}", encodings));
                index_page_offset_arr.push(column.index_page_offset());
                dictionary_page_offset_arr.push(column.dictionary_page_offset());
                data_page_offset_arr.push(column.data_page_offset());
                total_compressed_size_arr.push(column.compressed_size());
                total_uncompressed_size_arr.push(column.uncompressed_size());
            }
        }

        let rb = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(filename_arr)),
                Arc::new(Int64Array::from(row_group_id_arr)),
                Arc::new(Int64Array::from(row_group_num_rows_arr)),
                Arc::new(Int64Array::from(row_group_num_columns_arr)),
                Arc::new(Int64Array::from(row_group_bytes_arr)),
                Arc::new(Int64Array::from(column_id_arr)),
                Arc::new(Int64Array::from(file_offset_arr)),
                Arc::new(Int64Array::from(num_values_arr)),
                Arc::new(StringArray::from(path_in_schema_arr)),
                Arc::new(StringArray::from(type_arr)),
                Arc::new(StringArray::from(stats_min_arr)),
                Arc::new(StringArray::from(stats_max_arr)),
                Arc::new(Int64Array::from(stats_null_count_arr)),
                Arc::new(Int64Array::from(stats_distinct_count_arr)),
                Arc::new(StringArray::from(stats_min_value_arr)),
                Arc::new(StringArray::from(stats_max_value_arr)),
                Arc::new(StringArray::from(compression_arr)),
                Arc::new(StringArray::from(encodings_arr)),
                Arc::new(Int64Array::from(index_page_offset_arr)),
                Arc::new(Int64Array::from(dictionary_page_offset_arr)),
                Arc::new(Int64Array::from(data_page_offset_arr)),
                Arc::new(Int64Array::from(total_compressed_size_arr)),
                Arc::new(Int64Array::from(total_uncompressed_size_arr)),
            ],
        )?;

        let parquet_metadata = ParquetMetadataTable { schema, batch: rb };
        Ok(Arc::new(parquet_metadata))
    }
}

/// METADATA_CACHE table function
#[derive(Debug)]
struct MetadataCacheTable {
    schema: SchemaRef,
    batch: RecordBatch,
}

#[async_trait]
impl TableProvider for MetadataCacheTable {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow::datatypes::SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> datafusion::logical_expr::TableType {
        datafusion::logical_expr::TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(MemorySourceConfig::try_new_exec(
            &[vec![self.batch.clone()]],
            TableProvider::schema(self),
            projection.cloned(),
        )?)
    }
}

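/// Implementation of the `metadata_cache` table function.
///
/// Takes no arguments and returns one row per entry in the file metadata
/// cache, e.g.:
///
/// ```sql
/// SELECT * FROM metadata_cache();
/// ```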
#[derive(Debug)]
pub struct MetadataCacheFunc {
    cache_manager: Arc<CacheManager>,
}

impl MetadataCacheFunc {
    pub fn new(cache_manager: Arc<CacheManager>) -> Self {
        Self { cache_manager }
    }
}

impl TableFunctionImpl for MetadataCacheFunc {
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        if !exprs.is_empty() {
            return plan_err!("metadata_cache should have no arguments");
        }

        let schema = Arc::new(Schema::new(vec![
            Field::new("path", DataType::Utf8, false),
            Field::new(
                "file_modified",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("file_size_bytes", DataType::UInt64, false),
            Field::new("e_tag", DataType::Utf8, true),
            Field::new("version", DataType::Utf8, true),
            Field::new("metadata_size_bytes", DataType::UInt64, false),
            Field::new("hits", DataType::UInt64, false),
            Field::new("extra", DataType::Utf8, true),
        ]));

        // construct record batch from metadata
        let mut path_arr = vec![];
        let mut file_modified_arr = vec![];
        let mut file_size_bytes_arr = vec![];
        let mut e_tag_arr = vec![];
        let mut version_arr = vec![];
        let mut metadata_size_bytes = vec![];
        let mut hits_arr = vec![];
        let mut extra_arr = vec![];

        let cached_entries = self.cache_manager.get_file_metadata_cache().list_entries();

        for (path, entry) in cached_entries {
            path_arr.push(path.to_string());
            file_modified_arr
                .push(Some(entry.object_meta.last_modified.timestamp_millis()));
            file_size_bytes_arr.push(entry.object_meta.size);
            e_tag_arr.push(entry.object_meta.e_tag);
            version_arr.push(entry.object_meta.version);
            metadata_size_bytes.push(entry.size_bytes as u64);
            hits_arr.push(entry.hits as u64);

            let mut extra = entry
                .extra
                .iter()
                .map(|(k, v)| format!("{k}={v}"))
                .collect::<Vec<_>>();
            extra.sort();
            extra_arr.push(extra.join(" "));
        }

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(path_arr)),
                Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
                Arc::new(UInt64Array::from(file_size_bytes_arr)),
                Arc::new(StringArray::from(e_tag_arr)),
                Arc::new(StringArray::from(version_arr)),
                Arc::new(UInt64Array::from(metadata_size_bytes)),
                Arc::new(UInt64Array::from(hits_arr)),
                Arc::new(StringArray::from(extra_arr)),
            ],
        )?;

        let metadata_cache = MetadataCacheTable { schema, batch };
        Ok(Arc::new(metadata_cache))
    }
}

/// STATISTICS_CACHE table function
#[derive(Debug)]
struct StatisticsCacheTable {
    schema: SchemaRef,
    batch: RecordBatch,
}

#[async_trait]
impl TableProvider for StatisticsCacheTable {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow::datatypes::SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> datafusion::logical_expr::TableType {
        datafusion::logical_expr::TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(MemorySourceConfig::try_new_exec(
            &[vec![self.batch.clone()]],
            TableProvider::schema(self),
            projection.cloned(),
        )?)
    }
}

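/// Implementation of the `statistics_cache` table function.
///
/// Takes no arguments and returns one row per entry in the file statistics
/// cache, or no rows if no statistics cache is configured, e.g.:
///
/// ```sql
/// SELECT * FROM statistics_cache();
/// ```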
#[derive(Debug)]
pub struct StatisticsCacheFunc {
    cache_manager: Arc<CacheManager>,
}

impl StatisticsCacheFunc {
    pub fn new(cache_manager: Arc<CacheManager>) -> Self {
        Self { cache_manager }
    }
}

impl TableFunctionImpl for StatisticsCacheFunc {
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        if !exprs.is_empty() {
            return plan_err!("statistics_cache should have no arguments");
        }

        let schema = Arc::new(Schema::new(vec![
            Field::new("path", DataType::Utf8, false),
            Field::new(
                "file_modified",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("file_size_bytes", DataType::UInt64, false),
            Field::new("e_tag", DataType::Utf8, true),
            Field::new("version", DataType::Utf8, true),
            Field::new("num_rows", DataType::Utf8, false),
            Field::new("num_columns", DataType::UInt64, false),
            Field::new("table_size_bytes", DataType::Utf8, false),
            Field::new("statistics_size_bytes", DataType::UInt64, false),
        ]));

        // construct record batch from metadata
        let mut path_arr = vec![];
        let mut file_modified_arr = vec![];
        let mut file_size_bytes_arr = vec![];
        let mut e_tag_arr = vec![];
        let mut version_arr = vec![];
        let mut num_rows_arr = vec![];
        let mut num_columns_arr = vec![];
        let mut table_size_bytes_arr = vec![];
        let mut statistics_size_bytes_arr = vec![];

        if let Some(file_statistics_cache) = self.cache_manager.get_file_statistic_cache()
        {
            for (path, entry) in file_statistics_cache.list_entries() {
                path_arr.push(path.to_string());
                file_modified_arr
                    .push(Some(entry.object_meta.last_modified.timestamp_millis()));
                file_size_bytes_arr.push(entry.object_meta.size);
                e_tag_arr.push(entry.object_meta.e_tag);
                version_arr.push(entry.object_meta.version);
                num_rows_arr.push(entry.num_rows.to_string());
                num_columns_arr.push(entry.num_columns as u64);
                table_size_bytes_arr.push(entry.table_size_bytes.to_string());
                statistics_size_bytes_arr.push(entry.statistics_size_bytes as u64);
            }
        }

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(path_arr)),
                Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
                Arc::new(UInt64Array::from(file_size_bytes_arr)),
                Arc::new(StringArray::from(e_tag_arr)),
                Arc::new(StringArray::from(version_arr)),
                Arc::new(StringArray::from(num_rows_arr)),
                Arc::new(UInt64Array::from(num_columns_arr)),
                Arc::new(StringArray::from(table_size_bytes_arr)),
                Arc::new(UInt64Array::from(statistics_size_bytes_arr)),
            ],
        )?;

        let statistics_cache = StatisticsCacheTable { schema, batch };
        Ok(Arc::new(statistics_cache))
    }
}

/// Implementation of the `list_files_cache` table function in datafusion-cli.
///
/// This function returns the cached results of running a LIST command on a
/// particular object store path for a table. The object metadata is returned
/// as a List of Structs, with one Struct for each object. DataFusion uses
/// these cached results to plan queries against external tables.
///
/// # Schema
/// ```sql
/// > describe select * from list_files_cache();
/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
/// | column_name         | data_type                                                                                                                                                                | is_nullable |
/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
/// | table               | Utf8                                                                                                                                                                     | NO          |
/// | path                | Utf8                                                                                                                                                                     | NO          |
/// | metadata_size_bytes | UInt64                                                                                                                                                                   | NO          |
/// | expires_in          | Duration(ms)                                                                                                                                                             | YES         |
/// | metadata_list       | List(Struct("file_path": non-null Utf8, "file_modified": non-null Timestamp(ms), "file_size_bytes": non-null UInt64, "e_tag": Utf8, "version": Utf8), field: 'metadata') | YES         |
/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
/// ```
#[derive(Debug)]
struct ListFilesCacheTable {
    schema: SchemaRef,
    batch: RecordBatch,
}

#[async_trait]
impl TableProvider for ListFilesCacheTable {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow::datatypes::SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> datafusion::logical_expr::TableType {
        datafusion::logical_expr::TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(MemorySourceConfig::try_new_exec(
            &[vec![self.batch.clone()]],
            TableProvider::schema(self),
            projection.cloned(),
        )?)
    }
}

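/// Table function that produces a `ListFilesCacheTable` (see its schema
/// above); takes no arguments, e.g.:
///
/// ```sql
/// SELECT * FROM list_files_cache();
/// ```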
#[derive(Debug)]
pub struct ListFilesCacheFunc {
    cache_manager: Arc<CacheManager>,
}

impl ListFilesCacheFunc {
    pub fn new(cache_manager: Arc<CacheManager>) -> Self {
        Self { cache_manager }
    }
}

impl TableFunctionImpl for ListFilesCacheFunc {
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        if !exprs.is_empty() {
            return plan_err!("list_files_cache should have no arguments");
        }

        let nested_fields = Fields::from(vec![
            Field::new("file_path", DataType::Utf8, false),
            Field::new(
                "file_modified",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("file_size_bytes", DataType::UInt64, false),
            Field::new("e_tag", DataType::Utf8, true),
            Field::new("version", DataType::Utf8, true),
        ]);

        let metadata_field =
            Field::new("metadata", DataType::Struct(nested_fields.clone()), true);

        let schema = Arc::new(Schema::new(vec![
            Field::new("table", DataType::Utf8, false),
            Field::new("path", DataType::Utf8, false),
            Field::new("metadata_size_bytes", DataType::UInt64, false),
            // The `expires` field in `ListFilesEntry` is an `Instant`, which has no
            // absolute epoch (no "number of seconds" since a reference point), so the
            // remaining lifetime is reported as a `Duration` rather than a `Timestamp`.
            Field::new(
                "expires_in",
                DataType::Duration(TimeUnit::Millisecond),
                true,
            ),
            Field::new(
                "metadata_list",
                DataType::List(Arc::new(metadata_field.clone())),
                true,
            ),
        ]));

        let mut table_arr = vec![];
        let mut path_arr = vec![];
        let mut metadata_size_bytes_arr = vec![];
        let mut expires_arr = vec![];

        let mut file_path_arr = vec![];
        let mut file_modified_arr = vec![];
        let mut file_size_bytes_arr = vec![];
        let mut etag_arr = vec![];
        let mut version_arr = vec![];
        let mut offsets: Vec<i32> = vec![0];

        if let Some(list_files_cache) = self.cache_manager.get_list_files_cache() {
            let now = Instant::now();
            let mut current_offset: i32 = 0;

            for (path, entry) in list_files_cache.list_entries() {
                table_arr.push(path.table.map_or("NULL".to_string(), |t| t.to_string()));
                path_arr.push(path.path.to_string());
                metadata_size_bytes_arr.push(entry.size_bytes as u64);
                // calculates time left before entry expires
                expires_arr.push(
                    entry
                        .expires
                        .map(|t| t.duration_since(now).as_millis() as i64),
                );

                for meta in entry.metas.iter() {
                    file_path_arr.push(meta.location.to_string());
                    file_modified_arr.push(meta.last_modified.timestamp_millis());
                    file_size_bytes_arr.push(meta.size);
                    etag_arr.push(meta.e_tag.clone());
                    version_arr.push(meta.version.clone());
                }
                current_offset += entry.metas.len() as i32;
                offsets.push(current_offset);
            }
        }

        let struct_arr = StructArray::new(
            nested_fields,
            vec![
                Arc::new(StringArray::from(file_path_arr)),
                Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
                Arc::new(UInt64Array::from(file_size_bytes_arr)),
                Arc::new(StringArray::from(etag_arr)),
                Arc::new(StringArray::from(version_arr)),
            ],
            None,
        );

        let offsets_buffer: OffsetBuffer<i32> =
            OffsetBuffer::new(ScalarBuffer::from(Buffer::from_vec(offsets)));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(table_arr)),
                Arc::new(StringArray::from(path_arr)),
                Arc::new(UInt64Array::from(metadata_size_bytes_arr)),
                Arc::new(DurationMillisecondArray::from(expires_arr)),
                Arc::new(GenericListArray::new(
                    Arc::new(metadata_field),
                    offsets_buffer,
                    Arc::new(struct_arr),
                    None,
                )),
            ],
        )?;

        let list_files_cache = ListFilesCacheTable { schema, batch };
        Ok(Arc::new(list_files_cache))
    }
}