datafusion_catalog/
information_schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`InformationSchemaProvider`] that implements the SQL [Information Schema] for DataFusion.
19//!
20//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema
21
22use crate::streaming::StreamingTable;
23use crate::{CatalogProviderList, SchemaProvider, TableProvider};
24use arrow::array::builder::{BooleanBuilder, UInt8Builder};
25use arrow::{
26    array::{StringBuilder, UInt64Builder},
27    datatypes::{DataType, Field, Schema, SchemaRef},
28    record_batch::RecordBatch,
29};
30use async_trait::async_trait;
31use datafusion_common::config::{ConfigEntry, ConfigOptions};
32use datafusion_common::error::Result;
33use datafusion_common::DataFusionError;
34use datafusion_execution::TaskContext;
35use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF};
36use datafusion_expr::{TableType, Volatility};
37use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
38use datafusion_physical_plan::streaming::PartitionStream;
39use datafusion_physical_plan::SendableRecordBatchStream;
40use std::collections::{HashMap, HashSet};
41use std::fmt::Debug;
42use std::{any::Any, sync::Arc};
43
/// Name of the virtual schema itself: `information_schema`
pub const INFORMATION_SCHEMA: &str = "information_schema";
// Names of the individual virtual tables exposed under `information_schema`
pub(crate) const TABLES: &str = "tables";
pub(crate) const VIEWS: &str = "views";
pub(crate) const COLUMNS: &str = "columns";
// DataFusion-specific extension exposing the session configuration settings
pub(crate) const DF_SETTINGS: &str = "df_settings";
pub(crate) const SCHEMATA: &str = "schemata";
pub(crate) const ROUTINES: &str = "routines";
pub(crate) const PARAMETERS: &str = "parameters";

/// All information schema tables
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[
    TABLES,
    VIEWS,
    COLUMNS,
    DF_SETTINGS,
    SCHEMATA,
    ROUTINES,
    PARAMETERS,
];
63
/// Implements the `information_schema` virtual schema and tables
///
/// The underlying tables in the `information_schema` are created on
/// demand. This means that if more tables are added to the underlying
/// providers, they will appear the next time the `information_schema`
/// table is queried.
#[derive(Debug)]
pub struct InformationSchemaProvider {
    /// Shared state (the catalog list) from which each virtual table is built
    config: InformationSchemaConfig,
}
74
75impl InformationSchemaProvider {
76    /// Creates a new [`InformationSchemaProvider`] for the provided `catalog_list`
77    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
78        Self {
79            config: InformationSchemaConfig { catalog_list },
80        }
81    }
82}
83
/// Shared state used to populate the `information_schema` tables on demand.
///
/// Cloning is cheap: only the `Arc` to the catalog list is cloned.
#[derive(Clone, Debug)]
struct InformationSchemaConfig {
    /// All catalogs visible to the session; tables/schemas are enumerated from here
    catalog_list: Arc<dyn CatalogProviderList>,
}
88
impl InformationSchemaConfig {
    /// Construct the `information_schema.tables` virtual table
    async fn make_tables(
        &self,
        builder: &mut InformationSchemaTablesBuilder,
    ) -> Result<(), DataFusionError> {
        // create a mem table with the names of tables

        for catalog_name in self.catalog_list.catalog_names() {
            // NOTE(review): the unwrap assumes a name returned by
            // `catalog_names()` is still registered; a concurrent catalog
            // drop between the two calls would panic — confirm invariant.
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                // `information_schema` itself is appended separately below
                if schema_name != INFORMATION_SCHEMA {
                    // schema name may not exist in the catalog, so we need to check
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table) = schema.table(&table_name).await? {
                                builder.add_table(
                                    &catalog_name,
                                    &schema_name,
                                    &table_name,
                                    table.table_type(),
                                );
                            }
                        }
                    }
                }
            }

            // Add a final list for the information schema tables themselves
            for table_name in INFORMATION_SCHEMA_TABLES {
                builder.add_table(
                    &catalog_name,
                    INFORMATION_SCHEMA,
                    table_name,
                    TableType::View,
                );
            }
        }

        Ok(())
    }

    /// Construct the `information_schema.schemata` virtual table: one row
    /// per (catalog, schema) pair, excluding `information_schema` itself
    async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
        for catalog_name in self.catalog_list.catalog_names() {
            // NOTE(review): same unwrap-on-lookup pattern as `make_tables`
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    if let Some(schema) = catalog.schema(&schema_name) {
                        let schema_owner = schema.owner_name();
                        builder.add_schemata(&catalog_name, &schema_name, schema_owner);
                    }
                }
            }
        }
    }

    /// Construct the `information_schema.views` virtual table: one row per
    /// table, carrying its SQL definition when the provider supplies one
    async fn make_views(
        &self,
        builder: &mut InformationSchemaViewBuilder,
    ) -> Result<(), DataFusionError> {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    // schema name may not exist in the catalog, so we need to check
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table) = schema.table(&table_name).await? {
                                builder.add_view(
                                    &catalog_name,
                                    &schema_name,
                                    &table_name,
                                    // `None` for tables without a definition
                                    table.get_table_definition(),
                                )
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Construct the `information_schema.columns` virtual table
    async fn make_columns(
        &self,
        builder: &mut InformationSchemaColumnsBuilder,
    ) -> Result<(), DataFusionError> {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    // schema name may not exist in the catalog, so we need to check
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table) = schema.table(&table_name).await? {
                                // `field_position` is the 0-based index from
                                // `enumerate`; the builder decides how it is
                                // rendered as `ordinal_position`
                                for (field_position, field) in
                                    table.schema().fields().iter().enumerate()
                                {
                                    builder.add_column(
                                        &catalog_name,
                                        &schema_name,
                                        &table_name,
                                        field_position,
                                        field,
                                    )
                                }
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Construct the `information_schema.df_settings` virtual table:
    /// one row per session configuration entry
    fn make_df_settings(
        &self,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaDfSettingsBuilder,
    ) {
        for entry in config_options.entries() {
            builder.add_setting(entry);
        }
    }

    /// Construct the `information_schema.routines` virtual table: one row
    /// per registered UDF/UDAF/UDWF and distinct return type. All functions
    /// are reported under the session's default catalog and schema.
    fn make_routines(
        &self,
        udfs: &HashMap<String, Arc<ScalarUDF>>,
        udafs: &HashMap<String, Arc<AggregateUDF>>,
        udwfs: &HashMap<String, Arc<WindowUDF>>,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaRoutinesBuilder,
    ) -> Result<()> {
        let catalog_name = &config_options.catalog.default_catalog;
        let schema_name = &config_options.catalog.default_schema;

        for (name, udf) in udfs {
            // Deduplicate return types across the example signatures so each
            // (function, return type) pair yields exactly one row
            let return_types = get_udf_args_and_return_types(udf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udf.signature()),
                    return_type,
                    "SCALAR",
                    udf.documentation().map(|d| d.description.to_string()),
                    udf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }

        for (name, udaf) in udafs {
            let return_types = get_udaf_args_and_return_types(udaf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udaf.signature()),
                    return_type,
                    "AGGREGATE",
                    udaf.documentation().map(|d| d.description.to_string()),
                    udaf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }

        for (name, udwf) in udwfs {
            // NOTE: `get_udwf_args_and_return_types` always reports `None`
            // return types, so window functions produce a single row each
            let return_types = get_udwf_args_and_return_types(udwf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udwf.signature()),
                    return_type,
                    "WINDOW",
                    udwf.documentation().map(|d| d.description.to_string()),
                    udwf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }
        Ok(())
    }

    /// A function is reported deterministic iff its volatility is `Immutable`
    fn is_deterministic(signature: &Signature) -> bool {
        signature.volatility == Volatility::Immutable
    }
    /// Construct the `information_schema.parameters` virtual table: one "IN"
    /// row per argument (1-based position) and one "OUT" row for the return
    /// type, for every example signature of every registered function.
    fn make_parameters(
        &self,
        udfs: &HashMap<String, Arc<ScalarUDF>>,
        udafs: &HashMap<String, Arc<AggregateUDF>>,
        udwfs: &HashMap<String, Arc<WindowUDF>>,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaParametersBuilder,
    ) -> Result<()> {
        let catalog_name = &config_options.catalog.default_catalog;
        let schema_name = &config_options.catalog.default_schema;
        // Shared helper: emits the rows for one signature combination.
        // `rid` identifies the combination so rows of different signatures
        // of the same function can be told apart.
        let mut add_parameters = |func_name: &str,
                                  args: Option<&Vec<(String, String)>>,
                                  arg_types: Vec<String>,
                                  return_type: Option<String>,
                                  is_variadic: bool,
                                  rid: u8| {
            for (position, type_name) in arg_types.iter().enumerate() {
                // Parameter names come from the documentation, when present
                let param_name =
                    args.and_then(|a| a.get(position).map(|arg| arg.0.as_str()));
                builder.add_parameter(
                    catalog_name,
                    schema_name,
                    func_name,
                    // SQL positions are 1-based
                    position as u64 + 1,
                    "IN",
                    param_name,
                    type_name,
                    None::<&str>,
                    is_variadic,
                    rid,
                );
            }
            if let Some(return_type) = return_type {
                builder.add_parameter(
                    catalog_name,
                    schema_name,
                    func_name,
                    1,
                    "OUT",
                    None::<&str>,
                    return_type.as_str(),
                    None::<&str>,
                    false,
                    rid,
                );
            }
        };

        for (func_name, udf) in udfs {
            let args = udf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udf_args_and_return_types(udf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udf.signature()),
                    // NOTE(review): assumes fewer than 256 combinations;
                    // `rid as u8` would silently wrap otherwise
                    rid as u8,
                );
            }
        }

        for (func_name, udaf) in udafs {
            let args = udaf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udaf_args_and_return_types(udaf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udaf.signature()),
                    rid as u8,
                );
            }
        }

        for (func_name, udwf) in udwfs {
            let args = udwf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udwf_args_and_return_types(udwf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udwf.signature()),
                    rid as u8,
                );
            }
        }

        Ok(())
    }

    /// True when the signature accepts a variable number of arguments
    fn is_variadic(signature: &Signature) -> bool {
        matches!(
            signature.type_signature,
            TypeSignature::Variadic(_) | TypeSignature::VariadicAny
        )
    }
}
401
402/// get the arguments and return types of a UDF
403/// returns a tuple of (arg_types, return_type)
404fn get_udf_args_and_return_types(
405    udf: &Arc<ScalarUDF>,
406) -> Result<Vec<(Vec<String>, Option<String>)>> {
407    let signature = udf.signature();
408    let arg_types = signature.type_signature.get_example_types();
409    if arg_types.is_empty() {
410        Ok(vec![(vec![], None)])
411    } else {
412        Ok(arg_types
413            .into_iter()
414            .map(|arg_types| {
415                // only handle the function which implemented [`ScalarUDFImpl::return_type`] method
416                let return_type = udf.return_type(&arg_types).ok().map(|t| t.to_string());
417                let arg_types = arg_types
418                    .into_iter()
419                    .map(|t| t.to_string())
420                    .collect::<Vec<_>>();
421                (arg_types, return_type)
422            })
423            .collect::<Vec<_>>())
424    }
425}
426
427fn get_udaf_args_and_return_types(
428    udaf: &Arc<AggregateUDF>,
429) -> Result<Vec<(Vec<String>, Option<String>)>> {
430    let signature = udaf.signature();
431    let arg_types = signature.type_signature.get_example_types();
432    if arg_types.is_empty() {
433        Ok(vec![(vec![], None)])
434    } else {
435        Ok(arg_types
436            .into_iter()
437            .map(|arg_types| {
438                // only handle the function which implemented [`ScalarUDFImpl::return_type`] method
439                let return_type =
440                    udaf.return_type(&arg_types).ok().map(|t| t.to_string());
441                let arg_types = arg_types
442                    .into_iter()
443                    .map(|t| t.to_string())
444                    .collect::<Vec<_>>();
445                (arg_types, return_type)
446            })
447            .collect::<Vec<_>>())
448    }
449}
450
451fn get_udwf_args_and_return_types(
452    udwf: &Arc<WindowUDF>,
453) -> Result<Vec<(Vec<String>, Option<String>)>> {
454    let signature = udwf.signature();
455    let arg_types = signature.type_signature.get_example_types();
456    if arg_types.is_empty() {
457        Ok(vec![(vec![], None)])
458    } else {
459        Ok(arg_types
460            .into_iter()
461            .map(|arg_types| {
462                // only handle the function which implemented [`ScalarUDFImpl::return_type`] method
463                let arg_types = arg_types
464                    .into_iter()
465                    .map(|t| t.to_string())
466                    .collect::<Vec<_>>();
467                (arg_types, None)
468            })
469            .collect::<Vec<_>>())
470    }
471}
472
473#[async_trait]
474impl SchemaProvider for InformationSchemaProvider {
475    fn as_any(&self) -> &dyn Any {
476        self
477    }
478
479    fn table_names(&self) -> Vec<String> {
480        INFORMATION_SCHEMA_TABLES
481            .iter()
482            .map(|t| t.to_string())
483            .collect()
484    }
485
486    async fn table(
487        &self,
488        name: &str,
489    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
490        let config = self.config.clone();
491        let table: Arc<dyn PartitionStream> = match name.to_ascii_lowercase().as_str() {
492            TABLES => Arc::new(InformationSchemaTables::new(config)),
493            COLUMNS => Arc::new(InformationSchemaColumns::new(config)),
494            VIEWS => Arc::new(InformationSchemaViews::new(config)),
495            DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)),
496            SCHEMATA => Arc::new(InformationSchemata::new(config)),
497            ROUTINES => Arc::new(InformationSchemaRoutines::new(config)),
498            PARAMETERS => Arc::new(InformationSchemaParameters::new(config)),
499            _ => return Ok(None),
500        };
501
502        Ok(Some(Arc::new(
503            StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
504        )))
505    }
506
507    fn table_exist(&self, name: &str) -> bool {
508        INFORMATION_SCHEMA_TABLES.contains(&name.to_ascii_lowercase().as_str())
509    }
510}
511
/// Partition stream backing the `information_schema.tables` virtual table
#[derive(Debug)]
struct InformationSchemaTables {
    /// Output schema: (table_catalog, table_schema, table_name, table_type)
    schema: SchemaRef,
    /// Catalog access used to enumerate the tables on demand
    config: InformationSchemaConfig,
}
517
518impl InformationSchemaTables {
519    fn new(config: InformationSchemaConfig) -> Self {
520        let schema = Arc::new(Schema::new(vec![
521            Field::new("table_catalog", DataType::Utf8, false),
522            Field::new("table_schema", DataType::Utf8, false),
523            Field::new("table_name", DataType::Utf8, false),
524            Field::new("table_type", DataType::Utf8, false),
525        ]));
526
527        Self { schema, config }
528    }
529
530    fn builder(&self) -> InformationSchemaTablesBuilder {
531        InformationSchemaTablesBuilder {
532            catalog_names: StringBuilder::new(),
533            schema_names: StringBuilder::new(),
534            table_names: StringBuilder::new(),
535            table_types: StringBuilder::new(),
536            schema: Arc::clone(&self.schema),
537        }
538    }
539}
540
impl PartitionStream for InformationSchemaTables {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Builds the entire table eagerly and wraps it in a one-element stream
    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            futures::stream::once(async move {
                config.make_tables(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}
559
/// Builds the `information_schema.TABLE` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-tables.html>
struct InformationSchemaTablesBuilder {
    /// Schema of the batch produced by `finish`
    schema: SchemaRef,
    // One column builder per output column, appended to in lock-step
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    table_types: StringBuilder,
}
570
571impl InformationSchemaTablesBuilder {
572    fn add_table(
573        &mut self,
574        catalog_name: impl AsRef<str>,
575        schema_name: impl AsRef<str>,
576        table_name: impl AsRef<str>,
577        table_type: TableType,
578    ) {
579        // Note: append_value is actually infallible.
580        self.catalog_names.append_value(catalog_name.as_ref());
581        self.schema_names.append_value(schema_name.as_ref());
582        self.table_names.append_value(table_name.as_ref());
583        self.table_types.append_value(match table_type {
584            TableType::Base => "BASE TABLE",
585            TableType::View => "VIEW",
586            TableType::Temporary => "LOCAL TEMPORARY",
587        });
588    }
589
590    fn finish(&mut self) -> RecordBatch {
591        RecordBatch::try_new(
592            Arc::clone(&self.schema),
593            vec![
594                Arc::new(self.catalog_names.finish()),
595                Arc::new(self.schema_names.finish()),
596                Arc::new(self.table_names.finish()),
597                Arc::new(self.table_types.finish()),
598            ],
599        )
600        .unwrap()
601    }
602}
603
/// Partition stream backing the `information_schema.views` virtual table
#[derive(Debug)]
struct InformationSchemaViews {
    /// Output schema: (table_catalog, table_schema, table_name, definition)
    schema: SchemaRef,
    /// Catalog access used to enumerate the views on demand
    config: InformationSchemaConfig,
}
609
610impl InformationSchemaViews {
611    fn new(config: InformationSchemaConfig) -> Self {
612        let schema = Arc::new(Schema::new(vec![
613            Field::new("table_catalog", DataType::Utf8, false),
614            Field::new("table_schema", DataType::Utf8, false),
615            Field::new("table_name", DataType::Utf8, false),
616            Field::new("definition", DataType::Utf8, true),
617        ]));
618
619        Self { schema, config }
620    }
621
622    fn builder(&self) -> InformationSchemaViewBuilder {
623        InformationSchemaViewBuilder {
624            catalog_names: StringBuilder::new(),
625            schema_names: StringBuilder::new(),
626            table_names: StringBuilder::new(),
627            definitions: StringBuilder::new(),
628            schema: Arc::clone(&self.schema),
629        }
630    }
631}
632
impl PartitionStream for InformationSchemaViews {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Builds the entire table eagerly and wraps it in a one-element stream
    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            futures::stream::once(async move {
                config.make_views(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}
651
/// Builds the `information_schema.VIEWS` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-views.html>
struct InformationSchemaViewBuilder {
    /// Schema of the batch produced by `finish`
    schema: SchemaRef,
    // One column builder per output column, appended to in lock-step
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    definitions: StringBuilder,
}
662
663impl InformationSchemaViewBuilder {
664    fn add_view(
665        &mut self,
666        catalog_name: impl AsRef<str>,
667        schema_name: impl AsRef<str>,
668        table_name: impl AsRef<str>,
669        definition: Option<impl AsRef<str>>,
670    ) {
671        // Note: append_value is actually infallible.
672        self.catalog_names.append_value(catalog_name.as_ref());
673        self.schema_names.append_value(schema_name.as_ref());
674        self.table_names.append_value(table_name.as_ref());
675        self.definitions.append_option(definition.as_ref());
676    }
677
678    fn finish(&mut self) -> RecordBatch {
679        RecordBatch::try_new(
680            Arc::clone(&self.schema),
681            vec![
682                Arc::new(self.catalog_names.finish()),
683                Arc::new(self.schema_names.finish()),
684                Arc::new(self.table_names.finish()),
685                Arc::new(self.definitions.finish()),
686            ],
687        )
688        .unwrap()
689    }
690}
691
/// Partition stream backing the `information_schema.columns` virtual table
#[derive(Debug)]
struct InformationSchemaColumns {
    /// Output schema (see `new` for the full column list)
    schema: SchemaRef,
    /// Catalog access used to enumerate the columns on demand
    config: InformationSchemaConfig,
}
697
impl InformationSchemaColumns {
    /// Create the stream definition for `information_schema.columns`; the
    /// column set mirrors PostgreSQL's `information_schema.columns`
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("table_catalog", DataType::Utf8, false),
            Field::new("table_schema", DataType::Utf8, false),
            Field::new("table_name", DataType::Utf8, false),
            Field::new("column_name", DataType::Utf8, false),
            Field::new("ordinal_position", DataType::UInt64, false),
            Field::new("column_default", DataType::Utf8, true),
            Field::new("is_nullable", DataType::Utf8, false),
            Field::new("data_type", DataType::Utf8, false),
            Field::new("character_maximum_length", DataType::UInt64, true),
            Field::new("character_octet_length", DataType::UInt64, true),
            Field::new("numeric_precision", DataType::UInt64, true),
            Field::new("numeric_precision_radix", DataType::UInt64, true),
            Field::new("numeric_scale", DataType::UInt64, true),
            Field::new("datetime_precision", DataType::UInt64, true),
            Field::new("interval_type", DataType::Utf8, true),
        ]));

        Self { schema, config }
    }

    /// Start an empty row-by-row builder sharing this stream's schema
    fn builder(&self) -> InformationSchemaColumnsBuilder {
        // UInt64Builder::with_capacity requires providing an initial
        // capacity, so pick 10 here arbitrarily as this is not performance
        // critical code and the number of tables is unavailable here.
        let default_capacity = 10;

        InformationSchemaColumnsBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            column_names: StringBuilder::new(),
            ordinal_positions: UInt64Builder::with_capacity(default_capacity),
            column_defaults: StringBuilder::new(),
            is_nullables: StringBuilder::new(),
            data_types: StringBuilder::new(),
            character_maximum_lengths: UInt64Builder::with_capacity(default_capacity),
            character_octet_lengths: UInt64Builder::with_capacity(default_capacity),
            numeric_precisions: UInt64Builder::with_capacity(default_capacity),
            numeric_precision_radixes: UInt64Builder::with_capacity(default_capacity),
            numeric_scales: UInt64Builder::with_capacity(default_capacity),
            datetime_precisions: UInt64Builder::with_capacity(default_capacity),
            interval_types: StringBuilder::new(),
            schema: Arc::clone(&self.schema),
        }
    }
}
747
impl PartitionStream for InformationSchemaColumns {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Builds the entire table eagerly and wraps it in a one-element stream
    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            futures::stream::once(async move {
                config.make_columns(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}
766
/// Builds the `information_schema.COLUMNS` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
struct InformationSchemaColumnsBuilder {
    /// Schema of the batch produced by the builder's finish step
    schema: SchemaRef,
    // One column builder per output column, appended to in lock-step
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    column_names: StringBuilder,
    ordinal_positions: UInt64Builder,
    column_defaults: StringBuilder,
    is_nullables: StringBuilder,
    data_types: StringBuilder,
    character_maximum_lengths: UInt64Builder,
    character_octet_lengths: UInt64Builder,
    numeric_precisions: UInt64Builder,
    numeric_precision_radixes: UInt64Builder,
    numeric_scales: UInt64Builder,
    datetime_precisions: UInt64Builder,
    interval_types: StringBuilder,
}
788
789impl InformationSchemaColumnsBuilder {
790    fn add_column(
791        &mut self,
792        catalog_name: &str,
793        schema_name: &str,
794        table_name: &str,
795        field_position: usize,
796        field: &Field,
797    ) {
798        use DataType::*;
799
800        // Note: append_value is actually infallable.
801        self.catalog_names.append_value(catalog_name);
802        self.schema_names.append_value(schema_name);
803        self.table_names.append_value(table_name);
804
805        self.column_names.append_value(field.name());
806
807        self.ordinal_positions.append_value(field_position as u64);
808
809        // DataFusion does not support column default values, so null
810        self.column_defaults.append_null();
811
812        // "YES if the column is possibly nullable, NO if it is known not nullable. "
813        let nullable_str = if field.is_nullable() { "YES" } else { "NO" };
814        self.is_nullables.append_value(nullable_str);
815
816        // "System supplied type" --> Use debug format of the datatype
817        self.data_types
818            .append_value(format!("{:?}", field.data_type()));
819
820        // "If data_type identifies a character or bit string type, the
821        // declared maximum length; null for all other data types or
822        // if no maximum length was declared."
823        //
824        // Arrow has no equivalent of VARCHAR(20), so we leave this as Null
825        let max_chars = None;
826        self.character_maximum_lengths.append_option(max_chars);
827
828        // "Maximum length, in bytes, for binary data, character data,
829        // or text and image data."
830        let char_len: Option<u64> = match field.data_type() {
831            Utf8 | Binary => Some(i32::MAX as u64),
832            LargeBinary | LargeUtf8 => Some(i64::MAX as u64),
833            _ => None,
834        };
835        self.character_octet_lengths.append_option(char_len);
836
837        // numeric_precision: "If data_type identifies a numeric type, this column
838        // contains the (declared or implicit) precision of the type
839        // for this column. The precision indicates the number of
840        // significant digits. It can be expressed in decimal (base
841        // 10) or binary (base 2) terms, as specified in the column
842        // numeric_precision_radix. For all other data types, this
843        // column is null."
844        //
845        // numeric_radix: If data_type identifies a numeric type, this
846        // column indicates in which base the values in the columns
847        // numeric_precision and numeric_scale are expressed. The
848        // value is either 2 or 10. For all other data types, this
849        // column is null.
850        //
851        // numeric_scale: If data_type identifies an exact numeric
852        // type, this column contains the (declared or implicit) scale
853        // of the type for this column. The scale indicates the number
854        // of significant digits to the right of the decimal point. It
855        // can be expressed in decimal (base 10) or binary (base 2)
856        // terms, as specified in the column
857        // numeric_precision_radix. For all other data types, this
858        // column is null.
859        let (numeric_precision, numeric_radix, numeric_scale) = match field.data_type() {
860            Int8 | UInt8 => (Some(8), Some(2), None),
861            Int16 | UInt16 => (Some(16), Some(2), None),
862            Int32 | UInt32 => (Some(32), Some(2), None),
863            // From max value of 65504 as explained on
864            // https://en.wikipedia.org/wiki/Half-precision_floating-point_format#Exponent_encoding
865            Float16 => (Some(15), Some(2), None),
866            // Numbers from postgres `real` type
867            Float32 => (Some(24), Some(2), None),
868            // Numbers from postgres `double` type
869            Float64 => (Some(24), Some(2), None),
870            Decimal128(precision, scale) => {
871                (Some(*precision as u64), Some(10), Some(*scale as u64))
872            }
873            _ => (None, None, None),
874        };
875
876        self.numeric_precisions.append_option(numeric_precision);
877        self.numeric_precision_radixes.append_option(numeric_radix);
878        self.numeric_scales.append_option(numeric_scale);
879
880        self.datetime_precisions.append_option(None);
881        self.interval_types.append_null();
882    }
883
884    fn finish(&mut self) -> RecordBatch {
885        RecordBatch::try_new(
886            Arc::clone(&self.schema),
887            vec![
888                Arc::new(self.catalog_names.finish()),
889                Arc::new(self.schema_names.finish()),
890                Arc::new(self.table_names.finish()),
891                Arc::new(self.column_names.finish()),
892                Arc::new(self.ordinal_positions.finish()),
893                Arc::new(self.column_defaults.finish()),
894                Arc::new(self.is_nullables.finish()),
895                Arc::new(self.data_types.finish()),
896                Arc::new(self.character_maximum_lengths.finish()),
897                Arc::new(self.character_octet_lengths.finish()),
898                Arc::new(self.numeric_precisions.finish()),
899                Arc::new(self.numeric_precision_radixes.finish()),
900                Arc::new(self.numeric_scales.finish()),
901                Arc::new(self.datetime_precisions.finish()),
902                Arc::new(self.interval_types.finish()),
903            ],
904        )
905        .unwrap()
906    }
907}
908
/// Virtual table implementing `information_schema.schemata`
#[derive(Debug)]
struct InformationSchemata {
    /// Arrow schema of the rows this table produces
    schema: SchemaRef,
    /// Shared config whose `make_schemata` populates the rows (see `execute`)
    config: InformationSchemaConfig,
}
914
915impl InformationSchemata {
916    fn new(config: InformationSchemaConfig) -> Self {
917        let schema = Arc::new(Schema::new(vec![
918            Field::new("catalog_name", DataType::Utf8, false),
919            Field::new("schema_name", DataType::Utf8, false),
920            Field::new("schema_owner", DataType::Utf8, true),
921            Field::new("default_character_set_catalog", DataType::Utf8, true),
922            Field::new("default_character_set_schema", DataType::Utf8, true),
923            Field::new("default_character_set_name", DataType::Utf8, true),
924            Field::new("sql_path", DataType::Utf8, true),
925        ]));
926        Self { schema, config }
927    }
928
929    fn builder(&self) -> InformationSchemataBuilder {
930        InformationSchemataBuilder {
931            schema: Arc::clone(&self.schema),
932            catalog_name: StringBuilder::new(),
933            schema_name: StringBuilder::new(),
934            schema_owner: StringBuilder::new(),
935            default_character_set_catalog: StringBuilder::new(),
936            default_character_set_schema: StringBuilder::new(),
937            default_character_set_name: StringBuilder::new(),
938            sql_path: StringBuilder::new(),
939        }
940    }
941}
942
/// Accumulates rows for the `schemata` table, one Arrow builder per column
struct InformationSchemataBuilder {
    /// Output schema shared with [`InformationSchemata`]
    schema: SchemaRef,
    // One builder per output column; `add_schemata` appends to all of them
    // so the arrays stay equal length
    catalog_name: StringBuilder,
    schema_name: StringBuilder,
    schema_owner: StringBuilder,
    default_character_set_catalog: StringBuilder,
    default_character_set_schema: StringBuilder,
    default_character_set_name: StringBuilder,
    sql_path: StringBuilder,
}
953
954impl InformationSchemataBuilder {
955    fn add_schemata(
956        &mut self,
957        catalog_name: &str,
958        schema_name: &str,
959        schema_owner: Option<&str>,
960    ) {
961        self.catalog_name.append_value(catalog_name);
962        self.schema_name.append_value(schema_name);
963        match schema_owner {
964            Some(owner) => self.schema_owner.append_value(owner),
965            None => self.schema_owner.append_null(),
966        }
967        // refer to https://www.postgresql.org/docs/current/infoschema-schemata.html,
968        // these rows apply to a feature that is not implemented in DataFusion
969        self.default_character_set_catalog.append_null();
970        self.default_character_set_schema.append_null();
971        self.default_character_set_name.append_null();
972        self.sql_path.append_null();
973    }
974
975    fn finish(&mut self) -> RecordBatch {
976        RecordBatch::try_new(
977            Arc::clone(&self.schema),
978            vec![
979                Arc::new(self.catalog_name.finish()),
980                Arc::new(self.schema_name.finish()),
981                Arc::new(self.schema_owner.finish()),
982                Arc::new(self.default_character_set_catalog.finish()),
983                Arc::new(self.default_character_set_schema.finish()),
984                Arc::new(self.default_character_set_name.finish()),
985                Arc::new(self.sql_path.finish()),
986            ],
987        )
988        .unwrap()
989    }
990}
991
992impl PartitionStream for InformationSchemata {
993    fn schema(&self) -> &SchemaRef {
994        &self.schema
995    }
996
997    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
998        let mut builder = self.builder();
999        let config = self.config.clone();
1000        Box::pin(RecordBatchStreamAdapter::new(
1001            Arc::clone(&self.schema),
1002            // TODO: Stream this
1003            futures::stream::once(async move {
1004                config.make_schemata(&mut builder).await;
1005                Ok(builder.finish())
1006            }),
1007        ))
1008    }
1009}
1010
/// Virtual table implementing `information_schema.df_settings`, exposing
/// DataFusion configuration entries (name, value, description)
#[derive(Debug)]
struct InformationSchemaDfSettings {
    /// Arrow schema of the rows this table produces
    schema: SchemaRef,
    /// Shared config whose `make_df_settings` populates the rows
    config: InformationSchemaConfig,
}
1016
1017impl InformationSchemaDfSettings {
1018    fn new(config: InformationSchemaConfig) -> Self {
1019        let schema = Arc::new(Schema::new(vec![
1020            Field::new("name", DataType::Utf8, false),
1021            Field::new("value", DataType::Utf8, true),
1022            Field::new("description", DataType::Utf8, true),
1023        ]));
1024
1025        Self { schema, config }
1026    }
1027
1028    fn builder(&self) -> InformationSchemaDfSettingsBuilder {
1029        InformationSchemaDfSettingsBuilder {
1030            names: StringBuilder::new(),
1031            values: StringBuilder::new(),
1032            descriptions: StringBuilder::new(),
1033            schema: Arc::clone(&self.schema),
1034        }
1035    }
1036}
1037
1038impl PartitionStream for InformationSchemaDfSettings {
1039    fn schema(&self) -> &SchemaRef {
1040        &self.schema
1041    }
1042
1043    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1044        let config = self.config.clone();
1045        let mut builder = self.builder();
1046        Box::pin(RecordBatchStreamAdapter::new(
1047            Arc::clone(&self.schema),
1048            // TODO: Stream this
1049            futures::stream::once(async move {
1050                // create a mem table with the names of tables
1051                config.make_df_settings(ctx.session_config().options(), &mut builder);
1052                Ok(builder.finish())
1053            }),
1054        ))
1055    }
1056}
1057
/// Accumulates rows for the `df_settings` table, one builder per column
struct InformationSchemaDfSettingsBuilder {
    /// Output schema shared with [`InformationSchemaDfSettings`]
    schema: SchemaRef,
    /// Configuration key names
    names: StringBuilder,
    /// Current values; null when the entry has no value
    values: StringBuilder,
    /// Human-readable descriptions of each setting
    descriptions: StringBuilder,
}
1064
1065impl InformationSchemaDfSettingsBuilder {
1066    fn add_setting(&mut self, entry: ConfigEntry) {
1067        self.names.append_value(entry.key);
1068        self.values.append_option(entry.value);
1069        self.descriptions.append_value(entry.description);
1070    }
1071
1072    fn finish(&mut self) -> RecordBatch {
1073        RecordBatch::try_new(
1074            Arc::clone(&self.schema),
1075            vec![
1076                Arc::new(self.names.finish()),
1077                Arc::new(self.values.finish()),
1078                Arc::new(self.descriptions.finish()),
1079            ],
1080        )
1081        .unwrap()
1082    }
1083}
1084
/// Virtual table implementing `information_schema.routines`, describing the
/// registered scalar, aggregate and window functions
#[derive(Debug)]
struct InformationSchemaRoutines {
    /// Arrow schema of the rows this table produces
    schema: SchemaRef,
    /// Shared config whose `make_routines` populates the rows
    config: InformationSchemaConfig,
}
1090
1091impl InformationSchemaRoutines {
1092    fn new(config: InformationSchemaConfig) -> Self {
1093        let schema = Arc::new(Schema::new(vec![
1094            Field::new("specific_catalog", DataType::Utf8, false),
1095            Field::new("specific_schema", DataType::Utf8, false),
1096            Field::new("specific_name", DataType::Utf8, false),
1097            Field::new("routine_catalog", DataType::Utf8, false),
1098            Field::new("routine_schema", DataType::Utf8, false),
1099            Field::new("routine_name", DataType::Utf8, false),
1100            Field::new("routine_type", DataType::Utf8, false),
1101            Field::new("is_deterministic", DataType::Boolean, true),
1102            Field::new("data_type", DataType::Utf8, true),
1103            Field::new("function_type", DataType::Utf8, true),
1104            Field::new("description", DataType::Utf8, true),
1105            Field::new("syntax_example", DataType::Utf8, true),
1106        ]));
1107
1108        Self { schema, config }
1109    }
1110
1111    fn builder(&self) -> InformationSchemaRoutinesBuilder {
1112        InformationSchemaRoutinesBuilder {
1113            schema: Arc::clone(&self.schema),
1114            specific_catalog: StringBuilder::new(),
1115            specific_schema: StringBuilder::new(),
1116            specific_name: StringBuilder::new(),
1117            routine_catalog: StringBuilder::new(),
1118            routine_schema: StringBuilder::new(),
1119            routine_name: StringBuilder::new(),
1120            routine_type: StringBuilder::new(),
1121            is_deterministic: BooleanBuilder::new(),
1122            data_type: StringBuilder::new(),
1123            function_type: StringBuilder::new(),
1124            description: StringBuilder::new(),
1125            syntax_example: StringBuilder::new(),
1126        }
1127    }
1128}
1129
/// Accumulates rows for the `routines` table, one builder per column
struct InformationSchemaRoutinesBuilder {
    /// Output schema shared with [`InformationSchemaRoutines`]
    schema: SchemaRef,
    // specific_* and routine_* columns receive the same values
    // (see `add_routine`)
    specific_catalog: StringBuilder,
    specific_schema: StringBuilder,
    specific_name: StringBuilder,
    routine_catalog: StringBuilder,
    routine_schema: StringBuilder,
    routine_name: StringBuilder,
    routine_type: StringBuilder,
    is_deterministic: BooleanBuilder,
    data_type: StringBuilder,
    function_type: StringBuilder,
    description: StringBuilder,
    syntax_example: StringBuilder,
}
1145
1146impl InformationSchemaRoutinesBuilder {
1147    #[allow(clippy::too_many_arguments)]
1148    fn add_routine(
1149        &mut self,
1150        catalog_name: impl AsRef<str>,
1151        schema_name: impl AsRef<str>,
1152        routine_name: impl AsRef<str>,
1153        routine_type: impl AsRef<str>,
1154        is_deterministic: bool,
1155        data_type: Option<impl AsRef<str>>,
1156        function_type: impl AsRef<str>,
1157        description: Option<impl AsRef<str>>,
1158        syntax_example: Option<impl AsRef<str>>,
1159    ) {
1160        self.specific_catalog.append_value(catalog_name.as_ref());
1161        self.specific_schema.append_value(schema_name.as_ref());
1162        self.specific_name.append_value(routine_name.as_ref());
1163        self.routine_catalog.append_value(catalog_name.as_ref());
1164        self.routine_schema.append_value(schema_name.as_ref());
1165        self.routine_name.append_value(routine_name.as_ref());
1166        self.routine_type.append_value(routine_type.as_ref());
1167        self.is_deterministic.append_value(is_deterministic);
1168        self.data_type.append_option(data_type.as_ref());
1169        self.function_type.append_value(function_type.as_ref());
1170        self.description.append_option(description);
1171        self.syntax_example.append_option(syntax_example);
1172    }
1173
1174    fn finish(&mut self) -> RecordBatch {
1175        RecordBatch::try_new(
1176            Arc::clone(&self.schema),
1177            vec![
1178                Arc::new(self.specific_catalog.finish()),
1179                Arc::new(self.specific_schema.finish()),
1180                Arc::new(self.specific_name.finish()),
1181                Arc::new(self.routine_catalog.finish()),
1182                Arc::new(self.routine_schema.finish()),
1183                Arc::new(self.routine_name.finish()),
1184                Arc::new(self.routine_type.finish()),
1185                Arc::new(self.is_deterministic.finish()),
1186                Arc::new(self.data_type.finish()),
1187                Arc::new(self.function_type.finish()),
1188                Arc::new(self.description.finish()),
1189                Arc::new(self.syntax_example.finish()),
1190            ],
1191        )
1192        .unwrap()
1193    }
1194}
1195
1196impl PartitionStream for InformationSchemaRoutines {
1197    fn schema(&self) -> &SchemaRef {
1198        &self.schema
1199    }
1200
1201    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1202        let config = self.config.clone();
1203        let mut builder = self.builder();
1204        Box::pin(RecordBatchStreamAdapter::new(
1205            Arc::clone(&self.schema),
1206            futures::stream::once(async move {
1207                config.make_routines(
1208                    ctx.scalar_functions(),
1209                    ctx.aggregate_functions(),
1210                    ctx.window_functions(),
1211                    ctx.session_config().options(),
1212                    &mut builder,
1213                )?;
1214                Ok(builder.finish())
1215            }),
1216        ))
1217    }
1218}
1219
/// Virtual table implementing `information_schema.parameters`, with rows
/// describing function signatures (populated via `make_parameters`)
#[derive(Debug)]
struct InformationSchemaParameters {
    /// Arrow schema of the rows this table produces
    schema: SchemaRef,
    /// Shared config whose `make_parameters` populates the rows
    config: InformationSchemaConfig,
}
1225
1226impl InformationSchemaParameters {
1227    fn new(config: InformationSchemaConfig) -> Self {
1228        let schema = Arc::new(Schema::new(vec![
1229            Field::new("specific_catalog", DataType::Utf8, false),
1230            Field::new("specific_schema", DataType::Utf8, false),
1231            Field::new("specific_name", DataType::Utf8, false),
1232            Field::new("ordinal_position", DataType::UInt64, false),
1233            Field::new("parameter_mode", DataType::Utf8, false),
1234            Field::new("parameter_name", DataType::Utf8, true),
1235            Field::new("data_type", DataType::Utf8, false),
1236            Field::new("parameter_default", DataType::Utf8, true),
1237            Field::new("is_variadic", DataType::Boolean, false),
1238            // `rid` (short for `routine id`) is used to differentiate parameters from different signatures
1239            // (It serves as the group-by key when generating the `SHOW FUNCTIONS` query).
1240            // For example, the following signatures have different `rid` values:
1241            //     - `datetrunc(Utf8, Timestamp(Microsecond, Some("+TZ"))) -> Timestamp(Microsecond, Some("+TZ"))`
1242            //     - `datetrunc(Utf8View, Timestamp(Nanosecond, None)) -> Timestamp(Nanosecond, None)`
1243            Field::new("rid", DataType::UInt8, false),
1244        ]));
1245
1246        Self { schema, config }
1247    }
1248
1249    fn builder(&self) -> InformationSchemaParametersBuilder {
1250        InformationSchemaParametersBuilder {
1251            schema: Arc::clone(&self.schema),
1252            specific_catalog: StringBuilder::new(),
1253            specific_schema: StringBuilder::new(),
1254            specific_name: StringBuilder::new(),
1255            ordinal_position: UInt64Builder::new(),
1256            parameter_mode: StringBuilder::new(),
1257            parameter_name: StringBuilder::new(),
1258            data_type: StringBuilder::new(),
1259            parameter_default: StringBuilder::new(),
1260            is_variadic: BooleanBuilder::new(),
1261            rid: UInt8Builder::new(),
1262        }
1263    }
1264}
1265
/// Accumulates rows for the `parameters` table, one builder per column
struct InformationSchemaParametersBuilder {
    /// Output schema shared with [`InformationSchemaParameters`]
    schema: SchemaRef,
    // One builder per output column; `add_parameter` appends to all of them
    specific_catalog: StringBuilder,
    specific_schema: StringBuilder,
    specific_name: StringBuilder,
    ordinal_position: UInt64Builder,
    parameter_mode: StringBuilder,
    parameter_name: StringBuilder,
    data_type: StringBuilder,
    parameter_default: StringBuilder,
    is_variadic: BooleanBuilder,
    /// Routine id distinguishing signatures of the same function
    rid: UInt8Builder,
}
1279
1280impl InformationSchemaParametersBuilder {
1281    #[allow(clippy::too_many_arguments)]
1282    fn add_parameter(
1283        &mut self,
1284        specific_catalog: impl AsRef<str>,
1285        specific_schema: impl AsRef<str>,
1286        specific_name: impl AsRef<str>,
1287        ordinal_position: u64,
1288        parameter_mode: impl AsRef<str>,
1289        parameter_name: Option<impl AsRef<str>>,
1290        data_type: impl AsRef<str>,
1291        parameter_default: Option<impl AsRef<str>>,
1292        is_variadic: bool,
1293        rid: u8,
1294    ) {
1295        self.specific_catalog
1296            .append_value(specific_catalog.as_ref());
1297        self.specific_schema.append_value(specific_schema.as_ref());
1298        self.specific_name.append_value(specific_name.as_ref());
1299        self.ordinal_position.append_value(ordinal_position);
1300        self.parameter_mode.append_value(parameter_mode.as_ref());
1301        self.parameter_name.append_option(parameter_name.as_ref());
1302        self.data_type.append_value(data_type.as_ref());
1303        self.parameter_default.append_option(parameter_default);
1304        self.is_variadic.append_value(is_variadic);
1305        self.rid.append_value(rid);
1306    }
1307
1308    fn finish(&mut self) -> RecordBatch {
1309        RecordBatch::try_new(
1310            Arc::clone(&self.schema),
1311            vec![
1312                Arc::new(self.specific_catalog.finish()),
1313                Arc::new(self.specific_schema.finish()),
1314                Arc::new(self.specific_name.finish()),
1315                Arc::new(self.ordinal_position.finish()),
1316                Arc::new(self.parameter_mode.finish()),
1317                Arc::new(self.parameter_name.finish()),
1318                Arc::new(self.data_type.finish()),
1319                Arc::new(self.parameter_default.finish()),
1320                Arc::new(self.is_variadic.finish()),
1321                Arc::new(self.rid.finish()),
1322            ],
1323        )
1324        .unwrap()
1325    }
1326}
1327
1328impl PartitionStream for InformationSchemaParameters {
1329    fn schema(&self) -> &SchemaRef {
1330        &self.schema
1331    }
1332
1333    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1334        let config = self.config.clone();
1335        let mut builder = self.builder();
1336        Box::pin(RecordBatchStreamAdapter::new(
1337            Arc::clone(&self.schema),
1338            futures::stream::once(async move {
1339                config.make_parameters(
1340                    ctx.scalar_functions(),
1341                    ctx.aggregate_functions(),
1342                    ctx.window_functions(),
1343                    ctx.session_config().options(),
1344                    &mut builder,
1345                )?;
1346                Ok(builder.finish())
1347            }),
1348        ))
1349    }
1350}