datafusion_catalog/information_schema.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`InformationSchemaProvider`] that implements the SQL [Information Schema] for DataFusion.
//!
//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema
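//!
//! Once the information schema is enabled on a session (for example via
//! `SessionConfig::with_information_schema(true)` in the `datafusion` crate),
//! these virtual tables can be queried with ordinary SQL, e.g.
//! `SELECT table_name FROM information_schema.tables`.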

use crate::streaming::StreamingTable;
use crate::{CatalogProviderList, SchemaProvider, TableProvider};
use arrow::array::builder::{BooleanBuilder, UInt8Builder};
use arrow::{
    array::{StringBuilder, UInt64Builder},
    datatypes::{DataType, Field, Schema, SchemaRef},
    record_batch::RecordBatch,
};
use async_trait::async_trait;
use datafusion_common::DataFusionError;
use datafusion_common::config::{ConfigEntry, ConfigOptions};
use datafusion_common::error::Result;
use datafusion_common::types::NativeType;
use datafusion_execution::TaskContext;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF};
use datafusion_expr::{TableType, Volatility};
use datafusion_physical_plan::SendableRecordBatchStream;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::streaming::PartitionStream;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::Debug;
use std::{any::Any, sync::Arc};

pub const INFORMATION_SCHEMA: &str = "information_schema";
pub(crate) const TABLES: &str = "tables";
pub(crate) const VIEWS: &str = "views";
pub(crate) const COLUMNS: &str = "columns";
pub(crate) const DF_SETTINGS: &str = "df_settings";
pub(crate) const SCHEMATA: &str = "schemata";
pub(crate) const ROUTINES: &str = "routines";
pub(crate) const PARAMETERS: &str = "parameters";

/// All information schema tables
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[
    TABLES,
    VIEWS,
    COLUMNS,
    DF_SETTINGS,
    SCHEMATA,
    ROUTINES,
    PARAMETERS,
];

/// Implements the `information_schema` virtual schema and tables
///
/// The underlying tables in the `information_schema` are created on
/// demand. This means that if more tables are added to the underlying
/// providers, they will appear the next time the `information_schema`
/// table is queried.
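///
/// # Example
///
/// A minimal sketch of wiring the provider to an existing catalog list
/// (assuming the `MemoryCatalogProviderList` from this crate's `memory`
/// module):
///
/// ```ignore
/// use std::sync::Arc;
/// use datafusion_catalog::memory::MemoryCatalogProviderList;
///
/// // The provider reads through to the catalog list on each query, so tables
/// // registered later still show up in `information_schema.tables`.
/// let catalog_list = Arc::new(MemoryCatalogProviderList::new());
/// let provider = InformationSchemaProvider::new(catalog_list);
/// ```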
#[derive(Debug)]
pub struct InformationSchemaProvider {
    config: InformationSchemaConfig,
}

impl InformationSchemaProvider {
    /// Creates a new [`InformationSchemaProvider`] for the provided `catalog_list`
    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
        Self {
            config: InformationSchemaConfig { catalog_list },
        }
    }
}

#[derive(Clone, Debug)]
struct InformationSchemaConfig {
    catalog_list: Arc<dyn CatalogProviderList>,
}

impl InformationSchemaConfig {
    /// Construct the `information_schema.tables` virtual table
    async fn make_tables(
        &self,
        builder: &mut InformationSchemaTablesBuilder,
    ) -> Result<(), DataFusionError> {
        // Build one row for each table in every catalog and schema

        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    // schema name may not exist in the catalog, so we need to check
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table_type) =
                                schema.table_type(&table_name).await?
                            {
                                builder.add_table(
                                    &catalog_name,
                                    &schema_name,
                                    &table_name,
                                    table_type,
                                );
                            }
                        }
                    }
                }
            }

            // Add a final list for the information schema tables themselves
            for table_name in INFORMATION_SCHEMA_TABLES {
                builder.add_table(
                    &catalog_name,
                    INFORMATION_SCHEMA,
                    table_name,
                    TableType::View,
                );
            }
        }

        Ok(())
    }

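    /// Construct the `information_schema.schemata` virtual table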
    async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA
                    && let Some(schema) = catalog.schema(&schema_name)
                {
                    let schema_owner = schema.owner_name();
                    builder.add_schemata(&catalog_name, &schema_name, schema_owner);
                }
            }
        }
    }

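    /// Construct the `information_schema.views` virtual table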
    async fn make_views(
        &self,
        builder: &mut InformationSchemaViewBuilder,
    ) -> Result<(), DataFusionError> {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    // schema name may not exist in the catalog, so we need to check
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table) = schema.table(&table_name).await? {
                                builder.add_view(
                                    &catalog_name,
                                    &schema_name,
                                    &table_name,
                                    table.get_table_definition(),
                                )
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Construct the `information_schema.columns` virtual table
    async fn make_columns(
        &self,
        builder: &mut InformationSchemaColumnsBuilder,
    ) -> Result<(), DataFusionError> {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    // schema name may not exist in the catalog, so we need to check
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table) = schema.table(&table_name).await? {
                                for (field_position, field) in
                                    table.schema().fields().iter().enumerate()
                                {
                                    builder.add_column(
                                        &catalog_name,
                                        &schema_name,
                                        &table_name,
                                        field_position,
                                        field,
                                    )
                                }
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Construct the `information_schema.df_settings` virtual table
    fn make_df_settings(
        &self,
        config_options: &ConfigOptions,
        runtime_env: &Arc<RuntimeEnv>,
        builder: &mut InformationSchemaDfSettingsBuilder,
    ) {
        for entry in config_options.entries() {
            builder.add_setting(entry);
        }
        // Add runtime configuration entries
        for entry in runtime_env.config_entries() {
            builder.add_setting(entry);
        }
    }

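    /// Construct the `information_schema.routines` virtual table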
    fn make_routines(
        &self,
        udfs: &HashMap<String, Arc<ScalarUDF>>,
        udafs: &HashMap<String, Arc<AggregateUDF>>,
        udwfs: &HashMap<String, Arc<WindowUDF>>,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaRoutinesBuilder,
    ) -> Result<()> {
        let catalog_name = &config_options.catalog.default_catalog;
        let schema_name = &config_options.catalog.default_schema;

        for (name, udf) in udfs {
            let return_types = get_udf_args_and_return_types(udf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udf.signature()),
                    return_type.as_ref(),
                    "SCALAR",
                    udf.documentation().map(|d| d.description.to_string()),
                    udf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }

        for (name, udaf) in udafs {
            let return_types = get_udaf_args_and_return_types(udaf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udaf.signature()),
                    return_type.as_ref(),
                    "AGGREGATE",
                    udaf.documentation().map(|d| d.description.to_string()),
                    udaf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }

        for (name, udwf) in udwfs {
            let return_types = get_udwf_args_and_return_types(udwf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udwf.signature()),
                    return_type.as_ref(),
                    "WINDOW",
                    udwf.documentation().map(|d| d.description.to_string()),
                    udwf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }
        Ok(())
    }

    fn is_deterministic(signature: &Signature) -> bool {
        signature.volatility == Volatility::Immutable
    }
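
    /// Construct the `information_schema.parameters` virtual table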
    fn make_parameters(
        &self,
        udfs: &HashMap<String, Arc<ScalarUDF>>,
        udafs: &HashMap<String, Arc<AggregateUDF>>,
        udwfs: &HashMap<String, Arc<WindowUDF>>,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaParametersBuilder,
    ) -> Result<()> {
        let catalog_name = &config_options.catalog.default_catalog;
        let schema_name = &config_options.catalog.default_schema;
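
        // For each example signature (identified by `rid`), emit one IN row per
        // argument (1-based ordinal) and, when the return type is known, a
        // single OUT row.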
        let mut add_parameters = |func_name: &str,
                                  args: Option<&Vec<(String, String)>>,
                                  arg_types: Vec<String>,
                                  return_type: Option<String>,
                                  is_variadic: bool,
                                  rid: u8| {
            for (position, type_name) in arg_types.iter().enumerate() {
                let param_name =
                    args.and_then(|a| a.get(position).map(|arg| arg.0.as_str()));
                builder.add_parameter(
                    catalog_name,
                    schema_name,
                    func_name,
                    position as u64 + 1,
                    "IN",
                    param_name,
                    type_name,
                    None::<&str>,
                    is_variadic,
                    rid,
                );
            }
            if let Some(return_type) = return_type {
                builder.add_parameter(
                    catalog_name,
                    schema_name,
                    func_name,
                    1,
                    "OUT",
                    None::<&str>,
                    return_type.as_str(),
                    None::<&str>,
                    false,
                    rid,
                );
            }
        };

        for (func_name, udf) in udfs {
            let args = udf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udf_args_and_return_types(udf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udf.signature()),
                    rid as u8,
                );
            }
        }

        for (func_name, udaf) in udafs {
            let args = udaf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udaf_args_and_return_types(udaf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udaf.signature()),
                    rid as u8,
                );
            }
        }

        for (func_name, udwf) in udwfs {
            let args = udwf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udwf_args_and_return_types(udwf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udwf.signature()),
                    rid as u8,
                );
            }
        }

        Ok(())
    }

    fn is_variadic(signature: &Signature) -> bool {
        matches!(
            signature.type_signature,
            TypeSignature::Variadic(_) | TypeSignature::VariadicAny
        )
    }
}

/// Get the argument and return types of a `ScalarUDF`.
/// Returns a set of `(arg_types, return_type)` pairs, one per example signature.
fn get_udf_args_and_return_types(
    udf: &Arc<ScalarUDF>,
) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
    let signature = udf.signature();
    let arg_types = signature.type_signature.get_example_types();
    if arg_types.is_empty() {
        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
    } else {
        Ok(arg_types
            .into_iter()
            .map(|arg_types| {
                // Only functions that implement [`ScalarUDFImpl::return_type`]
                // report a return type here; for the rest it is `None`.
                let return_type = udf
                    .return_type(&arg_types)
                    .map(|t| remove_native_type_prefix(&NativeType::from(t)))
                    .ok();
                let arg_types = arg_types
                    .into_iter()
                    .map(|t| remove_native_type_prefix(&NativeType::from(t)))
                    .collect::<Vec<_>>();
                (arg_types, return_type)
            })
            .collect::<BTreeSet<_>>())
    }
}

fn get_udaf_args_and_return_types(
    udaf: &Arc<AggregateUDF>,
) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
    let signature = udaf.signature();
    let arg_types = signature.type_signature.get_example_types();
    if arg_types.is_empty() {
        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
    } else {
        Ok(arg_types
            .into_iter()
            .map(|arg_types| {
                // Only functions that implement [`AggregateUDFImpl::return_type`]
                // report a return type here; for the rest it is `None`.
                let return_type = udaf
                    .return_type(&arg_types)
                    .ok()
                    .map(|t| remove_native_type_prefix(&NativeType::from(t)));
                let arg_types = arg_types
                    .into_iter()
                    .map(|t| remove_native_type_prefix(&NativeType::from(t)))
                    .collect::<Vec<_>>();
                (arg_types, return_type)
            })
            .collect::<BTreeSet<_>>())
    }
}

fn get_udwf_args_and_return_types(
    udwf: &Arc<WindowUDF>,
) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
    let signature = udwf.signature();
    let arg_types = signature.type_signature.get_example_types();
    if arg_types.is_empty() {
        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
    } else {
        Ok(arg_types
            .into_iter()
            .map(|arg_types| {
                // Window functions do not expose a return type through this
                // API, so it is always reported as `None`.
                let arg_types = arg_types
                    .into_iter()
                    .map(|t| remove_native_type_prefix(&NativeType::from(t)))
                    .collect::<Vec<_>>();
                (arg_types, None)
            })
            .collect::<BTreeSet<_>>())
    }
}

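/// Render a [`NativeType`] as its `Display` string; used to report argument
/// and return types in `information_schema.routines` and `parameters`.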
#[inline]
fn remove_native_type_prefix(native_type: &NativeType) -> String {
    format!("{native_type}")
}

#[async_trait]
impl SchemaProvider for InformationSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_names(&self) -> Vec<String> {
        INFORMATION_SCHEMA_TABLES
            .iter()
            .map(|t| (*t).to_string())
            .collect()
    }

    async fn table(
        &self,
        name: &str,
    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
        let config = self.config.clone();
        let table: Arc<dyn PartitionStream> = match name.to_ascii_lowercase().as_str() {
            TABLES => Arc::new(InformationSchemaTables::new(config)),
            COLUMNS => Arc::new(InformationSchemaColumns::new(config)),
            VIEWS => Arc::new(InformationSchemaViews::new(config)),
            DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)),
            SCHEMATA => Arc::new(InformationSchemata::new(config)),
            ROUTINES => Arc::new(InformationSchemaRoutines::new(config)),
            PARAMETERS => Arc::new(InformationSchemaParameters::new(config)),
            _ => return Ok(None),
        };

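        // Each virtual table is exposed as a single-partition StreamingTable,
        // so its contents are recomputed from the catalog list on every scan.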
        Ok(Some(Arc::new(
            StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
        )))
    }

    fn table_exist(&self, name: &str) -> bool {
        INFORMATION_SCHEMA_TABLES.contains(&name.to_ascii_lowercase().as_str())
    }
}

#[derive(Debug)]
struct InformationSchemaTables {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaTables {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("table_catalog", DataType::Utf8, false),
            Field::new("table_schema", DataType::Utf8, false),
            Field::new("table_name", DataType::Utf8, false),
            Field::new("table_type", DataType::Utf8, false),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaTablesBuilder {
        InformationSchemaTablesBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            table_types: StringBuilder::new(),
            schema: Arc::clone(&self.schema),
        }
    }
}

impl PartitionStream for InformationSchemaTables {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            futures::stream::once(async move {
                config.make_tables(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}

/// Builds the `information_schema.TABLES` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-tables.html>
struct InformationSchemaTablesBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    table_types: StringBuilder,
}

impl InformationSchemaTablesBuilder {
    fn add_table(
        &mut self,
        catalog_name: impl AsRef<str>,
        schema_name: impl AsRef<str>,
        table_name: impl AsRef<str>,
        table_type: TableType,
    ) {
        // Note: append_value is actually infallible.
        self.catalog_names.append_value(catalog_name.as_ref());
        self.schema_names.append_value(schema_name.as_ref());
        self.table_names.append_value(table_name.as_ref());
        self.table_types.append_value(match table_type {
            TableType::Base => "BASE TABLE",
            TableType::View => "VIEW",
            TableType::Temporary => "LOCAL TEMPORARY",
        });
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.catalog_names.finish()),
                Arc::new(self.schema_names.finish()),
                Arc::new(self.table_names.finish()),
                Arc::new(self.table_types.finish()),
            ],
        )
        .unwrap()
    }
}

#[derive(Debug)]
struct InformationSchemaViews {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaViews {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("table_catalog", DataType::Utf8, false),
            Field::new("table_schema", DataType::Utf8, false),
            Field::new("table_name", DataType::Utf8, false),
            Field::new("definition", DataType::Utf8, true),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaViewBuilder {
        InformationSchemaViewBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            definitions: StringBuilder::new(),
            schema: Arc::clone(&self.schema),
        }
    }
}

impl PartitionStream for InformationSchemaViews {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            futures::stream::once(async move {
                config.make_views(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}

/// Builds the `information_schema.VIEWS` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-views.html>
struct InformationSchemaViewBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    definitions: StringBuilder,
}

impl InformationSchemaViewBuilder {
    fn add_view(
        &mut self,
        catalog_name: impl AsRef<str>,
        schema_name: impl AsRef<str>,
        table_name: impl AsRef<str>,
        definition: Option<&(impl AsRef<str> + ?Sized)>,
    ) {
        // Note: append_value is actually infallible.
        self.catalog_names.append_value(catalog_name.as_ref());
        self.schema_names.append_value(schema_name.as_ref());
        self.table_names.append_value(table_name.as_ref());
        self.definitions.append_option(definition.as_ref());
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.catalog_names.finish()),
                Arc::new(self.schema_names.finish()),
                Arc::new(self.table_names.finish()),
                Arc::new(self.definitions.finish()),
            ],
        )
        .unwrap()
    }
}

#[derive(Debug)]
struct InformationSchemaColumns {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaColumns {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("table_catalog", DataType::Utf8, false),
            Field::new("table_schema", DataType::Utf8, false),
            Field::new("table_name", DataType::Utf8, false),
            Field::new("column_name", DataType::Utf8, false),
            Field::new("ordinal_position", DataType::UInt64, false),
            Field::new("column_default", DataType::Utf8, true),
            Field::new("is_nullable", DataType::Utf8, false),
            Field::new("data_type", DataType::Utf8, false),
            Field::new("character_maximum_length", DataType::UInt64, true),
            Field::new("character_octet_length", DataType::UInt64, true),
            Field::new("numeric_precision", DataType::UInt64, true),
            Field::new("numeric_precision_radix", DataType::UInt64, true),
            Field::new("numeric_scale", DataType::UInt64, true),
            Field::new("datetime_precision", DataType::UInt64, true),
            Field::new("interval_type", DataType::Utf8, true),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaColumnsBuilder {
        // The numeric builders take an initial capacity; pick 10 here
        // arbitrarily as this is not performance critical code and the
        // number of rows is unknown at this point.
        let default_capacity = 10;

        InformationSchemaColumnsBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            column_names: StringBuilder::new(),
            ordinal_positions: UInt64Builder::with_capacity(default_capacity),
            column_defaults: StringBuilder::new(),
            is_nullables: StringBuilder::new(),
            data_types: StringBuilder::new(),
            character_maximum_lengths: UInt64Builder::with_capacity(default_capacity),
            character_octet_lengths: UInt64Builder::with_capacity(default_capacity),
            numeric_precisions: UInt64Builder::with_capacity(default_capacity),
            numeric_precision_radixes: UInt64Builder::with_capacity(default_capacity),
            numeric_scales: UInt64Builder::with_capacity(default_capacity),
            datetime_precisions: UInt64Builder::with_capacity(default_capacity),
            interval_types: StringBuilder::new(),
            schema: Arc::clone(&self.schema),
        }
    }
}

impl PartitionStream for InformationSchemaColumns {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            futures::stream::once(async move {
                config.make_columns(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}

/// Builds the `information_schema.COLUMNS` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
struct InformationSchemaColumnsBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    column_names: StringBuilder,
    ordinal_positions: UInt64Builder,
    column_defaults: StringBuilder,
    is_nullables: StringBuilder,
    data_types: StringBuilder,
    character_maximum_lengths: UInt64Builder,
    character_octet_lengths: UInt64Builder,
    numeric_precisions: UInt64Builder,
    numeric_precision_radixes: UInt64Builder,
    numeric_scales: UInt64Builder,
    datetime_precisions: UInt64Builder,
    interval_types: StringBuilder,
}

impl InformationSchemaColumnsBuilder {
    fn add_column(
        &mut self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        field_position: usize,
        field: &Field,
    ) {
        use DataType::*;

        // Note: append_value is actually infallible.
        self.catalog_names.append_value(catalog_name);
        self.schema_names.append_value(schema_name);
        self.table_names.append_value(table_name);

        self.column_names.append_value(field.name());

        self.ordinal_positions.append_value(field_position as u64);

        // DataFusion does not support column default values, so null
        self.column_defaults.append_null();

        // "YES if the column is possibly nullable, NO if it is known not nullable. "
        let nullable_str = if field.is_nullable() { "YES" } else { "NO" };
        self.is_nullables.append_value(nullable_str);

        // "System supplied type" --> use the `Display` format of the DataType
        self.data_types.append_value(field.data_type().to_string());

        // "If data_type identifies a character or bit string type, the
        // declared maximum length; null for all other data types or
        // if no maximum length was declared."
        //
        // Arrow has no equivalent of VARCHAR(20), so we leave this as Null
        let max_chars = None;
        self.character_maximum_lengths.append_option(max_chars);

        // "Maximum length, in bytes, for binary data, character data,
        // or text and image data."
        let char_len: Option<u64> = match field.data_type() {
            Utf8 | Binary => Some(i32::MAX as u64),
            LargeBinary | LargeUtf8 => Some(i64::MAX as u64),
            _ => None,
        };
        self.character_octet_lengths.append_option(char_len);

        // numeric_precision: "If data_type identifies a numeric type, this column
        // contains the (declared or implicit) precision of the type
        // for this column. The precision indicates the number of
        // significant digits. It can be expressed in decimal (base
        // 10) or binary (base 2) terms, as specified in the column
        // numeric_precision_radix. For all other data types, this
        // column is null."
        //
        // numeric_radix: If data_type identifies a numeric type, this
        // column indicates in which base the values in the columns
        // numeric_precision and numeric_scale are expressed. The
        // value is either 2 or 10. For all other data types, this
        // column is null.
        //
        // numeric_scale: If data_type identifies an exact numeric
        // type, this column contains the (declared or implicit) scale
        // of the type for this column. The scale indicates the number
        // of significant digits to the right of the decimal point. It
        // can be expressed in decimal (base 10) or binary (base 2)
        // terms, as specified in the column
        // numeric_precision_radix. For all other data types, this
        // column is null.
        let (numeric_precision, numeric_radix, numeric_scale) = match field.data_type() {
            Int8 | UInt8 => (Some(8), Some(2), None),
            Int16 | UInt16 => (Some(16), Some(2), None),
            Int32 | UInt32 => (Some(32), Some(2), None),
            // From max value of 65504 as explained on
            // https://en.wikipedia.org/wiki/Half-precision_floating-point_format#Exponent_encoding
            Float16 => (Some(15), Some(2), None),
            // Numbers from postgres `real` type
            Float32 => (Some(24), Some(2), None),
            // Numbers from postgres `double precision` type (53-bit significand)
            Float64 => (Some(53), Some(2), None),
            Decimal128(precision, scale) => {
                (Some(*precision as u64), Some(10), Some(*scale as u64))
            }
            _ => (None, None, None),
        };

        self.numeric_precisions.append_option(numeric_precision);
        self.numeric_precision_radixes.append_option(numeric_radix);
        self.numeric_scales.append_option(numeric_scale);

        self.datetime_precisions.append_option(None);
        self.interval_types.append_null();
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.catalog_names.finish()),
                Arc::new(self.schema_names.finish()),
                Arc::new(self.table_names.finish()),
                Arc::new(self.column_names.finish()),
                Arc::new(self.ordinal_positions.finish()),
                Arc::new(self.column_defaults.finish()),
                Arc::new(self.is_nullables.finish()),
                Arc::new(self.data_types.finish()),
                Arc::new(self.character_maximum_lengths.finish()),
                Arc::new(self.character_octet_lengths.finish()),
                Arc::new(self.numeric_precisions.finish()),
                Arc::new(self.numeric_precision_radixes.finish()),
                Arc::new(self.numeric_scales.finish()),
                Arc::new(self.datetime_precisions.finish()),
                Arc::new(self.interval_types.finish()),
            ],
        )
        .unwrap()
    }
}

#[derive(Debug)]
struct InformationSchemata {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemata {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("catalog_name", DataType::Utf8, false),
            Field::new("schema_name", DataType::Utf8, false),
            Field::new("schema_owner", DataType::Utf8, true),
            Field::new("default_character_set_catalog", DataType::Utf8, true),
            Field::new("default_character_set_schema", DataType::Utf8, true),
            Field::new("default_character_set_name", DataType::Utf8, true),
            Field::new("sql_path", DataType::Utf8, true),
        ]));
        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemataBuilder {
        InformationSchemataBuilder {
            schema: Arc::clone(&self.schema),
            catalog_name: StringBuilder::new(),
            schema_name: StringBuilder::new(),
            schema_owner: StringBuilder::new(),
            default_character_set_catalog: StringBuilder::new(),
            default_character_set_schema: StringBuilder::new(),
            default_character_set_name: StringBuilder::new(),
            sql_path: StringBuilder::new(),
        }
    }
}

struct InformationSchemataBuilder {
    schema: SchemaRef,
    catalog_name: StringBuilder,
    schema_name: StringBuilder,
    schema_owner: StringBuilder,
    default_character_set_catalog: StringBuilder,
    default_character_set_schema: StringBuilder,
    default_character_set_name: StringBuilder,
    sql_path: StringBuilder,
}

impl InformationSchemataBuilder {
    fn add_schemata(
        &mut self,
        catalog_name: &str,
        schema_name: &str,
        schema_owner: Option<&str>,
    ) {
        self.catalog_name.append_value(catalog_name);
        self.schema_name.append_value(schema_name);
        match schema_owner {
            Some(owner) => self.schema_owner.append_value(owner),
            None => self.schema_owner.append_null(),
        }
        // refer to https://www.postgresql.org/docs/current/infoschema-schemata.html,
        // these rows apply to a feature that is not implemented in DataFusion
        self.default_character_set_catalog.append_null();
        self.default_character_set_schema.append_null();
        self.default_character_set_name.append_null();
        self.sql_path.append_null();
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.catalog_name.finish()),
                Arc::new(self.schema_name.finish()),
                Arc::new(self.schema_owner.finish()),
                Arc::new(self.default_character_set_catalog.finish()),
                Arc::new(self.default_character_set_schema.finish()),
                Arc::new(self.default_character_set_name.finish()),
                Arc::new(self.sql_path.finish()),
            ],
        )
        .unwrap()
    }
}

impl PartitionStream for InformationSchemata {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            futures::stream::once(async move {
                config.make_schemata(&mut builder).await;
                Ok(builder.finish())
            }),
        ))
    }
}

#[derive(Debug)]
struct InformationSchemaDfSettings {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaDfSettings {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
            Field::new("description", DataType::Utf8, true),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaDfSettingsBuilder {
        InformationSchemaDfSettingsBuilder {
            names: StringBuilder::new(),
            values: StringBuilder::new(),
            descriptions: StringBuilder::new(),
            schema: Arc::clone(&self.schema),
        }
    }
}

impl PartitionStream for InformationSchemaDfSettings {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = self.config.clone();
        let mut builder = self.builder();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            futures::stream::once(async move {
                // collect the current session config and runtime settings
                let runtime_env = ctx.runtime_env();
                config.make_df_settings(
                    ctx.session_config().options(),
                    &runtime_env,
                    &mut builder,
                );
                Ok(builder.finish())
            }),
        ))
    }
}

struct InformationSchemaDfSettingsBuilder {
    schema: SchemaRef,
    names: StringBuilder,
    values: StringBuilder,
    descriptions: StringBuilder,
}

impl InformationSchemaDfSettingsBuilder {
    fn add_setting(&mut self, entry: ConfigEntry) {
        self.names.append_value(entry.key);
        self.values.append_option(entry.value);
        self.descriptions.append_value(entry.description);
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.names.finish()),
                Arc::new(self.values.finish()),
                Arc::new(self.descriptions.finish()),
            ],
        )
        .unwrap()
    }
}

#[derive(Debug)]
struct InformationSchemaRoutines {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaRoutines {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("specific_catalog", DataType::Utf8, false),
            Field::new("specific_schema", DataType::Utf8, false),
            Field::new("specific_name", DataType::Utf8, false),
            Field::new("routine_catalog", DataType::Utf8, false),
            Field::new("routine_schema", DataType::Utf8, false),
            Field::new("routine_name", DataType::Utf8, false),
            Field::new("routine_type", DataType::Utf8, false),
            Field::new("is_deterministic", DataType::Boolean, true),
            Field::new("data_type", DataType::Utf8, true),
            Field::new("function_type", DataType::Utf8, true),
            Field::new("description", DataType::Utf8, true),
            Field::new("syntax_example", DataType::Utf8, true),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaRoutinesBuilder {
        InformationSchemaRoutinesBuilder {
            schema: Arc::clone(&self.schema),
            specific_catalog: StringBuilder::new(),
            specific_schema: StringBuilder::new(),
            specific_name: StringBuilder::new(),
            routine_catalog: StringBuilder::new(),
            routine_schema: StringBuilder::new(),
            routine_name: StringBuilder::new(),
            routine_type: StringBuilder::new(),
            is_deterministic: BooleanBuilder::new(),
            data_type: StringBuilder::new(),
            function_type: StringBuilder::new(),
            description: StringBuilder::new(),
            syntax_example: StringBuilder::new(),
        }
    }
}

struct InformationSchemaRoutinesBuilder {
    schema: SchemaRef,
    specific_catalog: StringBuilder,
    specific_schema: StringBuilder,
    specific_name: StringBuilder,
    routine_catalog: StringBuilder,
    routine_schema: StringBuilder,
    routine_name: StringBuilder,
    routine_type: StringBuilder,
    is_deterministic: BooleanBuilder,
    data_type: StringBuilder,
    function_type: StringBuilder,
    description: StringBuilder,
    syntax_example: StringBuilder,
}

impl InformationSchemaRoutinesBuilder {
    #[expect(clippy::too_many_arguments)]
    fn add_routine(
        &mut self,
        catalog_name: impl AsRef<str>,
        schema_name: impl AsRef<str>,
        routine_name: impl AsRef<str>,
        routine_type: impl AsRef<str>,
        is_deterministic: bool,
        data_type: Option<&impl AsRef<str>>,
        function_type: impl AsRef<str>,
        description: Option<impl AsRef<str>>,
        syntax_example: Option<impl AsRef<str>>,
    ) {
        self.specific_catalog.append_value(catalog_name.as_ref());
        self.specific_schema.append_value(schema_name.as_ref());
        self.specific_name.append_value(routine_name.as_ref());
        self.routine_catalog.append_value(catalog_name.as_ref());
        self.routine_schema.append_value(schema_name.as_ref());
        self.routine_name.append_value(routine_name.as_ref());
        self.routine_type.append_value(routine_type.as_ref());
        self.is_deterministic.append_value(is_deterministic);
        self.data_type.append_option(data_type.as_ref());
        self.function_type.append_value(function_type.as_ref());
        self.description.append_option(description);
        self.syntax_example.append_option(syntax_example);
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.specific_catalog.finish()),
                Arc::new(self.specific_schema.finish()),
                Arc::new(self.specific_name.finish()),
                Arc::new(self.routine_catalog.finish()),
                Arc::new(self.routine_schema.finish()),
                Arc::new(self.routine_name.finish()),
                Arc::new(self.routine_type.finish()),
                Arc::new(self.is_deterministic.finish()),
                Arc::new(self.data_type.finish()),
                Arc::new(self.function_type.finish()),
                Arc::new(self.description.finish()),
                Arc::new(self.syntax_example.finish()),
            ],
        )
        .unwrap()
    }
}

impl PartitionStream for InformationSchemaRoutines {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = self.config.clone();
        let mut builder = self.builder();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(async move {
                config.make_routines(
                    ctx.scalar_functions(),
                    ctx.aggregate_functions(),
                    ctx.window_functions(),
                    ctx.session_config().options(),
                    &mut builder,
                )?;
                Ok(builder.finish())
            }),
        ))
    }
}

#[derive(Debug)]
struct InformationSchemaParameters {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaParameters {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("specific_catalog", DataType::Utf8, false),
            Field::new("specific_schema", DataType::Utf8, false),
            Field::new("specific_name", DataType::Utf8, false),
            Field::new("ordinal_position", DataType::UInt64, false),
            Field::new("parameter_mode", DataType::Utf8, false),
            Field::new("parameter_name", DataType::Utf8, true),
            Field::new("data_type", DataType::Utf8, false),
            Field::new("parameter_default", DataType::Utf8, true),
            Field::new("is_variadic", DataType::Boolean, false),
            // `rid` (short for `routine id`) is used to differentiate parameters from different signatures
            // (It serves as the group-by key when generating the `SHOW FUNCTIONS` query).
            // For example, the following signatures have different `rid` values:
            //     - `datetrunc(Utf8, Timestamp(Microsecond, Some("+TZ"))) -> Timestamp(Microsecond, Some("+TZ"))`
            //     - `datetrunc(Utf8View, Timestamp(Nanosecond, None)) -> Timestamp(Nanosecond, None)`
            Field::new("rid", DataType::UInt8, false),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaParametersBuilder {
        InformationSchemaParametersBuilder {
            schema: Arc::clone(&self.schema),
            specific_catalog: StringBuilder::new(),
            specific_schema: StringBuilder::new(),
            specific_name: StringBuilder::new(),
            ordinal_position: UInt64Builder::new(),
            parameter_mode: StringBuilder::new(),
            parameter_name: StringBuilder::new(),
            data_type: StringBuilder::new(),
            parameter_default: StringBuilder::new(),
            is_variadic: BooleanBuilder::new(),
            rid: UInt8Builder::new(),
        }
    }
}

struct InformationSchemaParametersBuilder {
    schema: SchemaRef,
    specific_catalog: StringBuilder,
    specific_schema: StringBuilder,
    specific_name: StringBuilder,
    ordinal_position: UInt64Builder,
    parameter_mode: StringBuilder,
    parameter_name: StringBuilder,
    data_type: StringBuilder,
    parameter_default: StringBuilder,
    is_variadic: BooleanBuilder,
    rid: UInt8Builder,
}

impl InformationSchemaParametersBuilder {
    #[expect(clippy::too_many_arguments)]
    fn add_parameter(
        &mut self,
        specific_catalog: impl AsRef<str>,
        specific_schema: impl AsRef<str>,
        specific_name: impl AsRef<str>,
        ordinal_position: u64,
        parameter_mode: impl AsRef<str>,
        parameter_name: Option<&(impl AsRef<str> + ?Sized)>,
        data_type: impl AsRef<str>,
        parameter_default: Option<impl AsRef<str>>,
        is_variadic: bool,
        rid: u8,
    ) {
        self.specific_catalog
            .append_value(specific_catalog.as_ref());
        self.specific_schema.append_value(specific_schema.as_ref());
        self.specific_name.append_value(specific_name.as_ref());
        self.ordinal_position.append_value(ordinal_position);
        self.parameter_mode.append_value(parameter_mode.as_ref());
        self.parameter_name.append_option(parameter_name.as_ref());
        self.data_type.append_value(data_type.as_ref());
        self.parameter_default.append_option(parameter_default);
        self.is_variadic.append_value(is_variadic);
        self.rid.append_value(rid);
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.specific_catalog.finish()),
                Arc::new(self.specific_schema.finish()),
                Arc::new(self.specific_name.finish()),
                Arc::new(self.ordinal_position.finish()),
                Arc::new(self.parameter_mode.finish()),
                Arc::new(self.parameter_name.finish()),
                Arc::new(self.data_type.finish()),
                Arc::new(self.parameter_default.finish()),
                Arc::new(self.is_variadic.finish()),
                Arc::new(self.rid.finish()),
            ],
        )
        .unwrap()
    }
}

impl PartitionStream for InformationSchemaParameters {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = self.config.clone();
        let mut builder = self.builder();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(async move {
                config.make_parameters(
                    ctx.scalar_functions(),
                    ctx.aggregate_functions(),
                    ctx.window_functions(),
                    ctx.session_config().options(),
                    &mut builder,
                )?;
                Ok(builder.finish())
            }),
        ))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::CatalogProvider;

    #[tokio::test]
    async fn make_tables_uses_table_type() {
        let config = InformationSchemaConfig {
            catalog_list: Arc::new(Fixture),
        };
        let mut builder = InformationSchemaTablesBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            table_types: StringBuilder::new(),
            schema: Arc::new(Schema::empty()),
        };

        assert!(config.make_tables(&mut builder).await.is_ok());

        assert_eq!("BASE TABLE", builder.table_types.finish().value(0));
    }

    #[derive(Debug)]
    struct Fixture;

    #[async_trait]
    impl SchemaProvider for Fixture {
        // InformationSchemaConfig::make_tables should use this.
        async fn table_type(&self, _: &str) -> Result<Option<TableType>> {
            Ok(Some(TableType::Base))
        }

        // InformationSchemaConfig::make_tables used this before `table_type`
        // existed but should not, as it may be expensive.
        async fn table(&self, _: &str) -> Result<Option<Arc<dyn TableProvider>>> {
            panic!(
                "InformationSchemaConfig::make_tables called SchemaProvider::table instead of table_type"
            )
        }

        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn table_names(&self) -> Vec<String> {
            vec!["atable".to_string()]
        }

        fn table_exist(&self, _: &str) -> bool {
            unimplemented!("not required for these tests")
        }
    }

    impl CatalogProviderList for Fixture {
        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn register_catalog(
            &self,
            _: String,
            _: Arc<dyn CatalogProvider>,
        ) -> Option<Arc<dyn CatalogProvider>> {
            unimplemented!("not required for these tests")
        }

        fn catalog_names(&self) -> Vec<String> {
            vec!["acatalog".to_string()]
        }

        fn catalog(&self, _: &str) -> Option<Arc<dyn CatalogProvider>> {
            Some(Arc::new(Self))
        }
    }

    impl CatalogProvider for Fixture {
        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn schema_names(&self) -> Vec<String> {
            vec!["aschema".to_string()]
        }

        fn schema(&self, _: &str) -> Option<Arc<dyn SchemaProvider>> {
            Some(Arc::new(Self))
        }
    }
}