Skip to main content

datafusion_catalog/
information_schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`InformationSchemaProvider`] that implements the SQL [Information Schema] for DataFusion.
19//!
20//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema
21
22use crate::streaming::StreamingTable;
23use crate::{CatalogProviderList, SchemaProvider, TableProvider};
24use arrow::array::builder::{BooleanBuilder, UInt8Builder};
25use arrow::{
26    array::{StringBuilder, UInt64Builder},
27    datatypes::{DataType, Field, FieldRef, Schema, SchemaRef},
28    record_batch::RecordBatch,
29};
30use async_trait::async_trait;
31use datafusion_common::DataFusionError;
32use datafusion_common::config::{ConfigEntry, ConfigOptions};
33use datafusion_common::error::Result;
34use datafusion_common::types::NativeType;
35use datafusion_execution::TaskContext;
36use datafusion_execution::runtime_env::RuntimeEnv;
37use datafusion_expr::function::WindowUDFFieldArgs;
38use datafusion_expr::{
39    AggregateUDF, ReturnFieldArgs, ScalarUDF, Signature, TypeSignature, WindowUDF,
40};
41use datafusion_expr::{TableType, Volatility};
42use datafusion_physical_plan::SendableRecordBatchStream;
43use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
44use datafusion_physical_plan::streaming::PartitionStream;
45use std::collections::{BTreeSet, HashMap, HashSet};
46use std::fmt::Debug;
47use std::sync::Arc;
48
/// Name of the virtual schema that exposes the information schema tables
pub const INFORMATION_SCHEMA: &str = "information_schema";
/// Name of the `information_schema.tables` table
pub(crate) const TABLES: &str = "tables";
/// Name of the `information_schema.views` table
pub(crate) const VIEWS: &str = "views";
/// Name of the `information_schema.columns` table
pub(crate) const COLUMNS: &str = "columns";
/// Name of the `information_schema.df_settings` table
pub(crate) const DF_SETTINGS: &str = "df_settings";
/// Name of the `information_schema.schemata` table
pub(crate) const SCHEMATA: &str = "schemata";
/// Name of the `information_schema.routines` table
pub(crate) const ROUTINES: &str = "routines";
/// Name of the `information_schema.parameters` table
pub(crate) const PARAMETERS: &str = "parameters";

/// All information schema tables
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[
    TABLES,
    VIEWS,
    COLUMNS,
    DF_SETTINGS,
    SCHEMATA,
    ROUTINES,
    PARAMETERS,
];
68
/// Implements the `information_schema` virtual schema and tables
///
/// The underlying tables in the `information_schema` are created on
/// demand. This means that if more tables are added to the underlying
/// providers, they will appear the next time the `information_schema`
/// table is queried.
#[derive(Debug)]
pub struct InformationSchemaProvider {
    /// Shared configuration (the catalog list) handed to every virtual table
    config: InformationSchemaConfig,
}
79
80impl InformationSchemaProvider {
81    /// Creates a new [`InformationSchemaProvider`] for the provided `catalog_list`
82    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
83        Self {
84            config: InformationSchemaConfig { catalog_list },
85        }
86    }
87}
88
/// State shared by all `information_schema` virtual tables.
///
/// Cloning is cheap: the only field is a reference-counted pointer.
#[derive(Clone, Debug)]
struct InformationSchemaConfig {
    /// The catalogs whose schemas and tables are enumerated
    catalog_list: Arc<dyn CatalogProviderList>,
}
93
94impl InformationSchemaConfig {
95    /// Construct the `information_schema.tables` virtual table
96    async fn make_tables(
97        &self,
98        builder: &mut InformationSchemaTablesBuilder,
99    ) -> Result<(), DataFusionError> {
100        // create a mem table with the names of tables
101
102        for catalog_name in self.catalog_list.catalog_names() {
103            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
104
105            for schema_name in catalog.schema_names() {
106                if schema_name != INFORMATION_SCHEMA {
107                    // schema name may not exist in the catalog, so we need to check
108                    if let Some(schema) = catalog.schema(&schema_name) {
109                        for table_name in schema.table_names() {
110                            if let Some(table_type) =
111                                schema.table_type(&table_name).await?
112                            {
113                                builder.add_table(
114                                    &catalog_name,
115                                    &schema_name,
116                                    &table_name,
117                                    table_type,
118                                );
119                            }
120                        }
121                    }
122                }
123            }
124
125            // Add a final list for the information schema tables themselves
126            for table_name in INFORMATION_SCHEMA_TABLES {
127                builder.add_table(
128                    &catalog_name,
129                    INFORMATION_SCHEMA,
130                    table_name,
131                    TableType::View,
132                );
133            }
134        }
135
136        Ok(())
137    }
138
139    async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
140        for catalog_name in self.catalog_list.catalog_names() {
141            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
142
143            for schema_name in catalog.schema_names() {
144                if schema_name != INFORMATION_SCHEMA
145                    && let Some(schema) = catalog.schema(&schema_name)
146                {
147                    let schema_owner = schema.owner_name();
148                    builder.add_schemata(&catalog_name, &schema_name, schema_owner);
149                }
150            }
151        }
152    }
153
154    async fn make_views(
155        &self,
156        builder: &mut InformationSchemaViewBuilder,
157    ) -> Result<(), DataFusionError> {
158        for catalog_name in self.catalog_list.catalog_names() {
159            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
160
161            for schema_name in catalog.schema_names() {
162                if schema_name != INFORMATION_SCHEMA {
163                    // schema name may not exist in the catalog, so we need to check
164                    if let Some(schema) = catalog.schema(&schema_name) {
165                        for table_name in schema.table_names() {
166                            if let Some(table) = schema.table(&table_name).await? {
167                                builder.add_view(
168                                    &catalog_name,
169                                    &schema_name,
170                                    &table_name,
171                                    table.get_table_definition(),
172                                )
173                            }
174                        }
175                    }
176                }
177            }
178        }
179
180        Ok(())
181    }
182
183    /// Construct the `information_schema.columns` virtual table
184    async fn make_columns(
185        &self,
186        builder: &mut InformationSchemaColumnsBuilder,
187    ) -> Result<(), DataFusionError> {
188        for catalog_name in self.catalog_list.catalog_names() {
189            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
190
191            for schema_name in catalog.schema_names() {
192                if schema_name != INFORMATION_SCHEMA {
193                    // schema name may not exist in the catalog, so we need to check
194                    if let Some(schema) = catalog.schema(&schema_name) {
195                        for table_name in schema.table_names() {
196                            if let Some(table) = schema.table(&table_name).await? {
197                                for (field_position, field) in
198                                    table.schema().fields().iter().enumerate()
199                                {
200                                    builder.add_column(
201                                        &catalog_name,
202                                        &schema_name,
203                                        &table_name,
204                                        field_position,
205                                        field,
206                                    )
207                                }
208                            }
209                        }
210                    }
211                }
212            }
213        }
214
215        Ok(())
216    }
217
218    /// Construct the `information_schema.df_settings` virtual table
219    fn make_df_settings(
220        &self,
221        config_options: &ConfigOptions,
222        runtime_env: &Arc<RuntimeEnv>,
223        builder: &mut InformationSchemaDfSettingsBuilder,
224    ) {
225        for entry in config_options.entries() {
226            builder.add_setting(entry);
227        }
228        // Add runtime configuration entries
229        for entry in runtime_env.config_entries() {
230            builder.add_setting(entry);
231        }
232    }
233
234    fn make_routines(
235        &self,
236        udfs: &HashMap<String, Arc<ScalarUDF>>,
237        udafs: &HashMap<String, Arc<AggregateUDF>>,
238        udwfs: &HashMap<String, Arc<WindowUDF>>,
239        config_options: &ConfigOptions,
240        builder: &mut InformationSchemaRoutinesBuilder,
241    ) -> Result<()> {
242        let catalog_name = &config_options.catalog.default_catalog;
243        let schema_name = &config_options.catalog.default_schema;
244
245        for (name, udf) in udfs {
246            let return_types = get_udf_args_and_return_types(udf)?
247                .into_iter()
248                .map(|(_, return_type)| return_type)
249                .collect::<HashSet<_>>();
250            for return_type in return_types {
251                builder.add_routine(
252                    catalog_name,
253                    schema_name,
254                    name,
255                    "FUNCTION",
256                    Self::is_deterministic(udf.signature()),
257                    return_type.as_ref(),
258                    "SCALAR",
259                    udf.documentation().map(|d| d.description.to_string()),
260                    udf.documentation().map(|d| d.syntax_example.to_string()),
261                )
262            }
263        }
264
265        for (name, udaf) in udafs {
266            let return_types = get_udaf_args_and_return_types(udaf)?
267                .into_iter()
268                .map(|(_, return_type)| return_type)
269                .collect::<HashSet<_>>();
270            for return_type in return_types {
271                builder.add_routine(
272                    catalog_name,
273                    schema_name,
274                    name,
275                    "FUNCTION",
276                    Self::is_deterministic(udaf.signature()),
277                    return_type.as_ref(),
278                    "AGGREGATE",
279                    udaf.documentation().map(|d| d.description.to_string()),
280                    udaf.documentation().map(|d| d.syntax_example.to_string()),
281                )
282            }
283        }
284
285        for (name, udwf) in udwfs {
286            let return_types = get_udwf_args_and_return_types(udwf)?
287                .into_iter()
288                .map(|(_, return_type)| return_type)
289                .collect::<HashSet<_>>();
290            for return_type in return_types {
291                builder.add_routine(
292                    catalog_name,
293                    schema_name,
294                    name,
295                    "FUNCTION",
296                    Self::is_deterministic(udwf.signature()),
297                    return_type.as_ref(),
298                    "WINDOW",
299                    udwf.documentation().map(|d| d.description.to_string()),
300                    udwf.documentation().map(|d| d.syntax_example.to_string()),
301                )
302            }
303        }
304        Ok(())
305    }
306
307    fn is_deterministic(signature: &Signature) -> bool {
308        signature.volatility == Volatility::Immutable
309    }
310    fn make_parameters(
311        &self,
312        udfs: &HashMap<String, Arc<ScalarUDF>>,
313        udafs: &HashMap<String, Arc<AggregateUDF>>,
314        udwfs: &HashMap<String, Arc<WindowUDF>>,
315        config_options: &ConfigOptions,
316        builder: &mut InformationSchemaParametersBuilder,
317    ) -> Result<()> {
318        let catalog_name = &config_options.catalog.default_catalog;
319        let schema_name = &config_options.catalog.default_schema;
320        let mut add_parameters = |func_name: &str,
321                                  args: Option<&Vec<(String, String)>>,
322                                  arg_types: Vec<String>,
323                                  return_type: Option<String>,
324                                  is_variadic: bool,
325                                  rid: u8| {
326            for (position, type_name) in arg_types.iter().enumerate() {
327                let param_name =
328                    args.and_then(|a| a.get(position).map(|arg| arg.0.as_str()));
329                builder.add_parameter(
330                    catalog_name,
331                    schema_name,
332                    func_name,
333                    position as u64 + 1,
334                    "IN",
335                    param_name,
336                    type_name,
337                    None::<&str>,
338                    is_variadic,
339                    rid,
340                );
341            }
342            if let Some(return_type) = return_type {
343                builder.add_parameter(
344                    catalog_name,
345                    schema_name,
346                    func_name,
347                    1,
348                    "OUT",
349                    None::<&str>,
350                    return_type.as_str(),
351                    None::<&str>,
352                    false,
353                    rid,
354                );
355            }
356        };
357
358        for (func_name, udf) in udfs {
359            let args = udf.documentation().and_then(|d| d.arguments.clone());
360            let combinations = get_udf_args_and_return_types(udf)?;
361            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
362                add_parameters(
363                    func_name,
364                    args.as_ref(),
365                    arg_types,
366                    return_type,
367                    Self::is_variadic(udf.signature()),
368                    rid as u8,
369                );
370            }
371        }
372
373        for (func_name, udaf) in udafs {
374            let args = udaf.documentation().and_then(|d| d.arguments.clone());
375            let combinations = get_udaf_args_and_return_types(udaf)?;
376            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
377                add_parameters(
378                    func_name,
379                    args.as_ref(),
380                    arg_types,
381                    return_type,
382                    Self::is_variadic(udaf.signature()),
383                    rid as u8,
384                );
385            }
386        }
387
388        for (func_name, udwf) in udwfs {
389            let args = udwf.documentation().and_then(|d| d.arguments.clone());
390            let combinations = get_udwf_args_and_return_types(udwf)?;
391            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
392                add_parameters(
393                    func_name,
394                    args.as_ref(),
395                    arg_types,
396                    return_type,
397                    Self::is_variadic(udwf.signature()),
398                    rid as u8,
399                );
400            }
401        }
402
403        Ok(())
404    }
405
406    fn is_variadic(signature: &Signature) -> bool {
407        matches!(
408            signature.type_signature,
409            TypeSignature::Variadic(_) | TypeSignature::VariadicAny
410        )
411    }
412}
413
414/// get the arguments and return types of a UDF
415/// returns a tuple of (arg_types, return_type)
416fn get_udf_args_and_return_types(
417    udf: &Arc<ScalarUDF>,
418) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
419    let signature = udf.signature();
420    let arg_types = signature.type_signature.get_example_types();
421    if arg_types.is_empty() {
422        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
423    } else {
424        Ok(arg_types
425            .into_iter()
426            .map(|arg_types| {
427                let arg_fields: Vec<FieldRef> = arg_types
428                    .iter()
429                    .enumerate()
430                    .map(|(i, t)| {
431                        Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
432                    })
433                    .collect();
434                let scalar_arguments = vec![None; arg_fields.len()];
435                let return_type = udf
436                    .return_field_from_args(ReturnFieldArgs {
437                        arg_fields: &arg_fields,
438                        scalar_arguments: &scalar_arguments,
439                    })
440                    .map(|f| {
441                        remove_native_type_prefix(&NativeType::from(
442                            f.data_type().clone(),
443                        ))
444                    })
445                    .ok();
446                let arg_types = arg_types
447                    .into_iter()
448                    .map(|t| remove_native_type_prefix(&NativeType::from(t)))
449                    .collect::<Vec<_>>();
450                (arg_types, return_type)
451            })
452            .collect::<BTreeSet<_>>())
453    }
454}
455
456fn get_udaf_args_and_return_types(
457    udaf: &Arc<AggregateUDF>,
458) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
459    let signature = udaf.signature();
460    let arg_types = signature.type_signature.get_example_types();
461    if arg_types.is_empty() {
462        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
463    } else {
464        Ok(arg_types
465            .into_iter()
466            .map(|arg_types| {
467                let arg_fields: Vec<FieldRef> = arg_types
468                    .iter()
469                    .enumerate()
470                    .map(|(i, t)| {
471                        Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
472                    })
473                    .collect();
474                let return_type = udaf
475                    .return_field(&arg_fields)
476                    .map(|f| {
477                        remove_native_type_prefix(&NativeType::from(
478                            f.data_type().clone(),
479                        ))
480                    })
481                    .ok();
482                let arg_types = arg_types
483                    .into_iter()
484                    .map(|t| remove_native_type_prefix(&NativeType::from(t)))
485                    .collect::<Vec<_>>();
486                (arg_types, return_type)
487            })
488            .collect::<BTreeSet<_>>())
489    }
490}
491
492fn get_udwf_args_and_return_types(
493    udwf: &Arc<WindowUDF>,
494) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
495    let signature = udwf.signature();
496    let arg_types = signature.type_signature.get_example_types();
497    if arg_types.is_empty() {
498        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
499    } else {
500        Ok(arg_types
501            .into_iter()
502            .map(|arg_types| {
503                let arg_fields: Vec<FieldRef> = arg_types
504                    .iter()
505                    .enumerate()
506                    .map(|(i, t)| {
507                        Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
508                    })
509                    .collect();
510                let return_type = udwf
511                    .field(WindowUDFFieldArgs::new(&arg_fields, udwf.name()))
512                    .map(|f| {
513                        remove_native_type_prefix(&NativeType::from(
514                            f.data_type().clone(),
515                        ))
516                    })
517                    .ok();
518                let arg_types = arg_types
519                    .into_iter()
520                    .map(|t| remove_native_type_prefix(&NativeType::from(t)))
521                    .collect::<Vec<_>>();
522                (arg_types, return_type)
523            })
524            .collect::<BTreeSet<_>>())
525    }
526}
527
528#[inline]
529fn remove_native_type_prefix(native_type: &NativeType) -> String {
530    format!("{native_type}")
531}
532
533#[async_trait]
534impl SchemaProvider for InformationSchemaProvider {
535    fn table_names(&self) -> Vec<String> {
536        INFORMATION_SCHEMA_TABLES
537            .iter()
538            .map(|t| (*t).to_string())
539            .collect()
540    }
541
542    async fn table(
543        &self,
544        name: &str,
545    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
546        let config = self.config.clone();
547        let table: Arc<dyn PartitionStream> = match name.to_ascii_lowercase().as_str() {
548            TABLES => Arc::new(InformationSchemaTables::new(config)),
549            COLUMNS => Arc::new(InformationSchemaColumns::new(config)),
550            VIEWS => Arc::new(InformationSchemaViews::new(config)),
551            DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)),
552            SCHEMATA => Arc::new(InformationSchemata::new(config)),
553            ROUTINES => Arc::new(InformationSchemaRoutines::new(config)),
554            PARAMETERS => Arc::new(InformationSchemaParameters::new(config)),
555            _ => return Ok(None),
556        };
557
558        Ok(Some(Arc::new(
559            StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
560        )))
561    }
562
563    fn table_exist(&self, name: &str) -> bool {
564        INFORMATION_SCHEMA_TABLES.contains(&name.to_ascii_lowercase().as_str())
565    }
566}
567
/// Partition stream backing the `information_schema.tables` table
#[derive(Debug)]
struct InformationSchemaTables {
    /// Arrow schema of the produced record batch
    schema: SchemaRef,
    /// Catalog list used to enumerate tables when the stream is executed
    config: InformationSchemaConfig,
}
573
574impl InformationSchemaTables {
575    fn new(config: InformationSchemaConfig) -> Self {
576        let schema = Arc::new(Schema::new(vec![
577            Field::new("table_catalog", DataType::Utf8, false),
578            Field::new("table_schema", DataType::Utf8, false),
579            Field::new("table_name", DataType::Utf8, false),
580            Field::new("table_type", DataType::Utf8, false),
581        ]));
582
583        Self { schema, config }
584    }
585
586    fn builder(&self) -> InformationSchemaTablesBuilder {
587        InformationSchemaTablesBuilder {
588            catalog_names: StringBuilder::new(),
589            schema_names: StringBuilder::new(),
590            table_names: StringBuilder::new(),
591            table_types: StringBuilder::new(),
592            schema: Arc::clone(&self.schema),
593        }
594    }
595}
596
597impl PartitionStream for InformationSchemaTables {
598    fn schema(&self) -> &SchemaRef {
599        &self.schema
600    }
601
602    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
603        let mut builder = self.builder();
604        let config = self.config.clone();
605        Box::pin(RecordBatchStreamAdapter::new(
606            Arc::clone(&self.schema),
607            // TODO: Stream this
608            futures::stream::once(async move {
609                config.make_tables(&mut builder).await?;
610                Ok(builder.finish())
611            }),
612        ))
613    }
614}
615
/// Builds the `information_schema.TABLES` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-tables.html>
struct InformationSchemaTablesBuilder {
    /// Schema of the record batch produced by `finish`
    schema: SchemaRef,
    /// Values of the `table_catalog` column
    catalog_names: StringBuilder,
    /// Values of the `table_schema` column
    schema_names: StringBuilder,
    /// Values of the `table_name` column
    table_names: StringBuilder,
    /// Values of the `table_type` column
    table_types: StringBuilder,
}
626
627impl InformationSchemaTablesBuilder {
628    fn add_table(
629        &mut self,
630        catalog_name: impl AsRef<str>,
631        schema_name: impl AsRef<str>,
632        table_name: impl AsRef<str>,
633        table_type: TableType,
634    ) {
635        // Note: append_value is actually infallible.
636        self.catalog_names.append_value(catalog_name.as_ref());
637        self.schema_names.append_value(schema_name.as_ref());
638        self.table_names.append_value(table_name.as_ref());
639        self.table_types.append_value(match table_type {
640            TableType::Base => "BASE TABLE",
641            TableType::View => "VIEW",
642            TableType::Temporary => "LOCAL TEMPORARY",
643        });
644    }
645
646    fn finish(&mut self) -> RecordBatch {
647        RecordBatch::try_new(
648            Arc::clone(&self.schema),
649            vec![
650                Arc::new(self.catalog_names.finish()),
651                Arc::new(self.schema_names.finish()),
652                Arc::new(self.table_names.finish()),
653                Arc::new(self.table_types.finish()),
654            ],
655        )
656        .unwrap()
657    }
658}
659
/// Partition stream backing the `information_schema.views` table
#[derive(Debug)]
struct InformationSchemaViews {
    /// Arrow schema of the produced record batch
    schema: SchemaRef,
    /// Catalog list used to enumerate tables when the stream is executed
    config: InformationSchemaConfig,
}
665
666impl InformationSchemaViews {
667    fn new(config: InformationSchemaConfig) -> Self {
668        let schema = Arc::new(Schema::new(vec![
669            Field::new("table_catalog", DataType::Utf8, false),
670            Field::new("table_schema", DataType::Utf8, false),
671            Field::new("table_name", DataType::Utf8, false),
672            Field::new("definition", DataType::Utf8, true),
673        ]));
674
675        Self { schema, config }
676    }
677
678    fn builder(&self) -> InformationSchemaViewBuilder {
679        InformationSchemaViewBuilder {
680            catalog_names: StringBuilder::new(),
681            schema_names: StringBuilder::new(),
682            table_names: StringBuilder::new(),
683            definitions: StringBuilder::new(),
684            schema: Arc::clone(&self.schema),
685        }
686    }
687}
688
689impl PartitionStream for InformationSchemaViews {
690    fn schema(&self) -> &SchemaRef {
691        &self.schema
692    }
693
694    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
695        let mut builder = self.builder();
696        let config = self.config.clone();
697        Box::pin(RecordBatchStreamAdapter::new(
698            Arc::clone(&self.schema),
699            // TODO: Stream this
700            futures::stream::once(async move {
701                config.make_views(&mut builder).await?;
702                Ok(builder.finish())
703            }),
704        ))
705    }
706}
707
/// Builds the `information_schema.VIEWS` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-views.html>
struct InformationSchemaViewBuilder {
    /// Schema of the record batch produced by `finish`
    schema: SchemaRef,
    /// Values of the `table_catalog` column
    catalog_names: StringBuilder,
    /// Values of the `table_schema` column
    schema_names: StringBuilder,
    /// Values of the `table_name` column
    table_names: StringBuilder,
    /// Values of the nullable `definition` column
    definitions: StringBuilder,
}
718
719impl InformationSchemaViewBuilder {
720    fn add_view(
721        &mut self,
722        catalog_name: impl AsRef<str>,
723        schema_name: impl AsRef<str>,
724        table_name: impl AsRef<str>,
725        definition: Option<&(impl AsRef<str> + ?Sized)>,
726    ) {
727        // Note: append_value is actually infallible.
728        self.catalog_names.append_value(catalog_name.as_ref());
729        self.schema_names.append_value(schema_name.as_ref());
730        self.table_names.append_value(table_name.as_ref());
731        self.definitions.append_option(definition.as_ref());
732    }
733
734    fn finish(&mut self) -> RecordBatch {
735        RecordBatch::try_new(
736            Arc::clone(&self.schema),
737            vec![
738                Arc::new(self.catalog_names.finish()),
739                Arc::new(self.schema_names.finish()),
740                Arc::new(self.table_names.finish()),
741                Arc::new(self.definitions.finish()),
742            ],
743        )
744        .unwrap()
745    }
746}
747
/// Partition stream backing the `information_schema.columns` table
#[derive(Debug)]
struct InformationSchemaColumns {
    /// Arrow schema of the produced record batch
    schema: SchemaRef,
    /// Catalog list used to enumerate columns when the stream is executed
    config: InformationSchemaConfig,
}
753
754impl InformationSchemaColumns {
755    fn new(config: InformationSchemaConfig) -> Self {
756        let schema = Arc::new(Schema::new(vec![
757            Field::new("table_catalog", DataType::Utf8, false),
758            Field::new("table_schema", DataType::Utf8, false),
759            Field::new("table_name", DataType::Utf8, false),
760            Field::new("column_name", DataType::Utf8, false),
761            Field::new("ordinal_position", DataType::UInt64, false),
762            Field::new("column_default", DataType::Utf8, true),
763            Field::new("is_nullable", DataType::Utf8, false),
764            Field::new("data_type", DataType::Utf8, false),
765            Field::new("character_maximum_length", DataType::UInt64, true),
766            Field::new("character_octet_length", DataType::UInt64, true),
767            Field::new("numeric_precision", DataType::UInt64, true),
768            Field::new("numeric_precision_radix", DataType::UInt64, true),
769            Field::new("numeric_scale", DataType::UInt64, true),
770            Field::new("datetime_precision", DataType::UInt64, true),
771            Field::new("interval_type", DataType::Utf8, true),
772        ]));
773
774        Self { schema, config }
775    }
776
777    fn builder(&self) -> InformationSchemaColumnsBuilder {
778        // StringBuilder requires providing an initial capacity, so
779        // pick 10 here arbitrarily as this is not performance
780        // critical code and the number of tables is unavailable here.
781        let default_capacity = 10;
782
783        InformationSchemaColumnsBuilder {
784            catalog_names: StringBuilder::new(),
785            schema_names: StringBuilder::new(),
786            table_names: StringBuilder::new(),
787            column_names: StringBuilder::new(),
788            ordinal_positions: UInt64Builder::with_capacity(default_capacity),
789            column_defaults: StringBuilder::new(),
790            is_nullables: StringBuilder::new(),
791            data_types: StringBuilder::new(),
792            character_maximum_lengths: UInt64Builder::with_capacity(default_capacity),
793            character_octet_lengths: UInt64Builder::with_capacity(default_capacity),
794            numeric_precisions: UInt64Builder::with_capacity(default_capacity),
795            numeric_precision_radixes: UInt64Builder::with_capacity(default_capacity),
796            numeric_scales: UInt64Builder::with_capacity(default_capacity),
797            datetime_precisions: UInt64Builder::with_capacity(default_capacity),
798            interval_types: StringBuilder::new(),
799            schema: Arc::clone(&self.schema),
800        }
801    }
802}
803
804impl PartitionStream for InformationSchemaColumns {
805    fn schema(&self) -> &SchemaRef {
806        &self.schema
807    }
808
809    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
810        let mut builder = self.builder();
811        let config = self.config.clone();
812        Box::pin(RecordBatchStreamAdapter::new(
813            Arc::clone(&self.schema),
814            // TODO: Stream this
815            futures::stream::once(async move {
816                config.make_columns(&mut builder).await?;
817                Ok(builder.finish())
818            }),
819        ))
820    }
821}
822
823/// Builds the `information_schema.COLUMNS` table row by row
824///
825/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
826struct InformationSchemaColumnsBuilder {
827    schema: SchemaRef,
828    catalog_names: StringBuilder,
829    schema_names: StringBuilder,
830    table_names: StringBuilder,
831    column_names: StringBuilder,
832    ordinal_positions: UInt64Builder,
833    column_defaults: StringBuilder,
834    is_nullables: StringBuilder,
835    data_types: StringBuilder,
836    character_maximum_lengths: UInt64Builder,
837    character_octet_lengths: UInt64Builder,
838    numeric_precisions: UInt64Builder,
839    numeric_precision_radixes: UInt64Builder,
840    numeric_scales: UInt64Builder,
841    datetime_precisions: UInt64Builder,
842    interval_types: StringBuilder,
843}
844
845impl InformationSchemaColumnsBuilder {
846    fn add_column(
847        &mut self,
848        catalog_name: &str,
849        schema_name: &str,
850        table_name: &str,
851        field_position: usize,
852        field: &Field,
853    ) {
854        use DataType::*;
855
856        // Note: append_value is actually infallible.
857        self.catalog_names.append_value(catalog_name);
858        self.schema_names.append_value(schema_name);
859        self.table_names.append_value(table_name);
860
861        self.column_names.append_value(field.name());
862
863        self.ordinal_positions.append_value(field_position as u64);
864
865        // DataFusion does not support column default values, so null
866        self.column_defaults.append_null();
867
868        // "YES if the column is possibly nullable, NO if it is known not nullable. "
869        let nullable_str = if field.is_nullable() { "YES" } else { "NO" };
870        self.is_nullables.append_value(nullable_str);
871
872        // "System supplied type" --> Use debug format of the datatype
873        self.data_types.append_value(field.data_type().to_string());
874
875        // "If data_type identifies a character or bit string type, the
876        // declared maximum length; null for all other data types or
877        // if no maximum length was declared."
878        //
879        // Arrow has no equivalent of VARCHAR(20), so we leave this as Null
880        let max_chars = None;
881        self.character_maximum_lengths.append_option(max_chars);
882
883        // "Maximum length, in bytes, for binary data, character data,
884        // or text and image data."
885        let char_len: Option<u64> = match field.data_type() {
886            Utf8 | Binary => Some(i32::MAX as u64),
887            LargeBinary | LargeUtf8 => Some(i64::MAX as u64),
888            _ => None,
889        };
890        self.character_octet_lengths.append_option(char_len);
891
892        // numeric_precision: "If data_type identifies a numeric type, this column
893        // contains the (declared or implicit) precision of the type
894        // for this column. The precision indicates the number of
895        // significant digits. It can be expressed in decimal (base
896        // 10) or binary (base 2) terms, as specified in the column
897        // numeric_precision_radix. For all other data types, this
898        // column is null."
899        //
900        // numeric_radix: If data_type identifies a numeric type, this
901        // column indicates in which base the values in the columns
902        // numeric_precision and numeric_scale are expressed. The
903        // value is either 2 or 10. For all other data types, this
904        // column is null.
905        //
906        // numeric_scale: If data_type identifies an exact numeric
907        // type, this column contains the (declared or implicit) scale
908        // of the type for this column. The scale indicates the number
909        // of significant digits to the right of the decimal point. It
910        // can be expressed in decimal (base 10) or binary (base 2)
911        // terms, as specified in the column
912        // numeric_precision_radix. For all other data types, this
913        // column is null.
914        let (numeric_precision, numeric_radix, numeric_scale) = match field.data_type() {
915            Int8 | UInt8 => (Some(8), Some(2), None),
916            Int16 | UInt16 => (Some(16), Some(2), None),
917            Int32 | UInt32 => (Some(32), Some(2), None),
918            // From max value of 65504 as explained on
919            // https://en.wikipedia.org/wiki/Half-precision_floating-point_format#Exponent_encoding
920            Float16 => (Some(15), Some(2), None),
921            // Numbers from postgres `real` type
922            Float32 => (Some(24), Some(2), None),
923            // Numbers from postgres `double` type
924            Float64 => (Some(24), Some(2), None),
925            Decimal128(precision, scale) => {
926                (Some(*precision as u64), Some(10), Some(*scale as u64))
927            }
928            _ => (None, None, None),
929        };
930
931        self.numeric_precisions.append_option(numeric_precision);
932        self.numeric_precision_radixes.append_option(numeric_radix);
933        self.numeric_scales.append_option(numeric_scale);
934
935        self.datetime_precisions.append_option(None);
936        self.interval_types.append_null();
937    }
938
939    fn finish(&mut self) -> RecordBatch {
940        RecordBatch::try_new(
941            Arc::clone(&self.schema),
942            vec![
943                Arc::new(self.catalog_names.finish()),
944                Arc::new(self.schema_names.finish()),
945                Arc::new(self.table_names.finish()),
946                Arc::new(self.column_names.finish()),
947                Arc::new(self.ordinal_positions.finish()),
948                Arc::new(self.column_defaults.finish()),
949                Arc::new(self.is_nullables.finish()),
950                Arc::new(self.data_types.finish()),
951                Arc::new(self.character_maximum_lengths.finish()),
952                Arc::new(self.character_octet_lengths.finish()),
953                Arc::new(self.numeric_precisions.finish()),
954                Arc::new(self.numeric_precision_radixes.finish()),
955                Arc::new(self.numeric_scales.finish()),
956                Arc::new(self.datetime_precisions.finish()),
957                Arc::new(self.interval_types.finish()),
958            ],
959        )
960        .unwrap()
961    }
962}
963
964#[derive(Debug)]
965struct InformationSchemata {
966    schema: SchemaRef,
967    config: InformationSchemaConfig,
968}
969
970impl InformationSchemata {
971    fn new(config: InformationSchemaConfig) -> Self {
972        let schema = Arc::new(Schema::new(vec![
973            Field::new("catalog_name", DataType::Utf8, false),
974            Field::new("schema_name", DataType::Utf8, false),
975            Field::new("schema_owner", DataType::Utf8, true),
976            Field::new("default_character_set_catalog", DataType::Utf8, true),
977            Field::new("default_character_set_schema", DataType::Utf8, true),
978            Field::new("default_character_set_name", DataType::Utf8, true),
979            Field::new("sql_path", DataType::Utf8, true),
980        ]));
981        Self { schema, config }
982    }
983
984    fn builder(&self) -> InformationSchemataBuilder {
985        InformationSchemataBuilder {
986            schema: Arc::clone(&self.schema),
987            catalog_name: StringBuilder::new(),
988            schema_name: StringBuilder::new(),
989            schema_owner: StringBuilder::new(),
990            default_character_set_catalog: StringBuilder::new(),
991            default_character_set_schema: StringBuilder::new(),
992            default_character_set_name: StringBuilder::new(),
993            sql_path: StringBuilder::new(),
994        }
995    }
996}
997
998struct InformationSchemataBuilder {
999    schema: SchemaRef,
1000    catalog_name: StringBuilder,
1001    schema_name: StringBuilder,
1002    schema_owner: StringBuilder,
1003    default_character_set_catalog: StringBuilder,
1004    default_character_set_schema: StringBuilder,
1005    default_character_set_name: StringBuilder,
1006    sql_path: StringBuilder,
1007}
1008
1009impl InformationSchemataBuilder {
1010    fn add_schemata(
1011        &mut self,
1012        catalog_name: &str,
1013        schema_name: &str,
1014        schema_owner: Option<&str>,
1015    ) {
1016        self.catalog_name.append_value(catalog_name);
1017        self.schema_name.append_value(schema_name);
1018        match schema_owner {
1019            Some(owner) => self.schema_owner.append_value(owner),
1020            None => self.schema_owner.append_null(),
1021        }
1022        // refer to https://www.postgresql.org/docs/current/infoschema-schemata.html,
1023        // these rows apply to a feature that is not implemented in DataFusion
1024        self.default_character_set_catalog.append_null();
1025        self.default_character_set_schema.append_null();
1026        self.default_character_set_name.append_null();
1027        self.sql_path.append_null();
1028    }
1029
1030    fn finish(&mut self) -> RecordBatch {
1031        RecordBatch::try_new(
1032            Arc::clone(&self.schema),
1033            vec![
1034                Arc::new(self.catalog_name.finish()),
1035                Arc::new(self.schema_name.finish()),
1036                Arc::new(self.schema_owner.finish()),
1037                Arc::new(self.default_character_set_catalog.finish()),
1038                Arc::new(self.default_character_set_schema.finish()),
1039                Arc::new(self.default_character_set_name.finish()),
1040                Arc::new(self.sql_path.finish()),
1041            ],
1042        )
1043        .unwrap()
1044    }
1045}
1046
1047impl PartitionStream for InformationSchemata {
1048    fn schema(&self) -> &SchemaRef {
1049        &self.schema
1050    }
1051
1052    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1053        let mut builder = self.builder();
1054        let config = self.config.clone();
1055        Box::pin(RecordBatchStreamAdapter::new(
1056            Arc::clone(&self.schema),
1057            // TODO: Stream this
1058            futures::stream::once(async move {
1059                config.make_schemata(&mut builder).await;
1060                Ok(builder.finish())
1061            }),
1062        ))
1063    }
1064}
1065
1066#[derive(Debug)]
1067struct InformationSchemaDfSettings {
1068    schema: SchemaRef,
1069    config: InformationSchemaConfig,
1070}
1071
1072impl InformationSchemaDfSettings {
1073    fn new(config: InformationSchemaConfig) -> Self {
1074        let schema = Arc::new(Schema::new(vec![
1075            Field::new("name", DataType::Utf8, false),
1076            Field::new("value", DataType::Utf8, true),
1077            Field::new("description", DataType::Utf8, true),
1078        ]));
1079
1080        Self { schema, config }
1081    }
1082
1083    fn builder(&self) -> InformationSchemaDfSettingsBuilder {
1084        InformationSchemaDfSettingsBuilder {
1085            names: StringBuilder::new(),
1086            values: StringBuilder::new(),
1087            descriptions: StringBuilder::new(),
1088            schema: Arc::clone(&self.schema),
1089        }
1090    }
1091}
1092
1093impl PartitionStream for InformationSchemaDfSettings {
1094    fn schema(&self) -> &SchemaRef {
1095        &self.schema
1096    }
1097
1098    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1099        let config = self.config.clone();
1100        let mut builder = self.builder();
1101        Box::pin(RecordBatchStreamAdapter::new(
1102            Arc::clone(&self.schema),
1103            // TODO: Stream this
1104            futures::stream::once(async move {
1105                // create a mem table with the names of tables
1106                let runtime_env = ctx.runtime_env();
1107                config.make_df_settings(
1108                    ctx.session_config().options(),
1109                    &runtime_env,
1110                    &mut builder,
1111                );
1112                Ok(builder.finish())
1113            }),
1114        ))
1115    }
1116}
1117
1118struct InformationSchemaDfSettingsBuilder {
1119    schema: SchemaRef,
1120    names: StringBuilder,
1121    values: StringBuilder,
1122    descriptions: StringBuilder,
1123}
1124
1125impl InformationSchemaDfSettingsBuilder {
1126    fn add_setting(&mut self, entry: ConfigEntry) {
1127        self.names.append_value(entry.key);
1128        self.values.append_option(entry.value);
1129        self.descriptions.append_value(entry.description);
1130    }
1131
1132    fn finish(&mut self) -> RecordBatch {
1133        RecordBatch::try_new(
1134            Arc::clone(&self.schema),
1135            vec![
1136                Arc::new(self.names.finish()),
1137                Arc::new(self.values.finish()),
1138                Arc::new(self.descriptions.finish()),
1139            ],
1140        )
1141        .unwrap()
1142    }
1143}
1144
1145#[derive(Debug)]
1146struct InformationSchemaRoutines {
1147    schema: SchemaRef,
1148    config: InformationSchemaConfig,
1149}
1150
1151impl InformationSchemaRoutines {
1152    fn new(config: InformationSchemaConfig) -> Self {
1153        let schema = Arc::new(Schema::new(vec![
1154            Field::new("specific_catalog", DataType::Utf8, false),
1155            Field::new("specific_schema", DataType::Utf8, false),
1156            Field::new("specific_name", DataType::Utf8, false),
1157            Field::new("routine_catalog", DataType::Utf8, false),
1158            Field::new("routine_schema", DataType::Utf8, false),
1159            Field::new("routine_name", DataType::Utf8, false),
1160            Field::new("routine_type", DataType::Utf8, false),
1161            Field::new("is_deterministic", DataType::Boolean, true),
1162            Field::new("data_type", DataType::Utf8, true),
1163            Field::new("function_type", DataType::Utf8, true),
1164            Field::new("description", DataType::Utf8, true),
1165            Field::new("syntax_example", DataType::Utf8, true),
1166        ]));
1167
1168        Self { schema, config }
1169    }
1170
1171    fn builder(&self) -> InformationSchemaRoutinesBuilder {
1172        InformationSchemaRoutinesBuilder {
1173            schema: Arc::clone(&self.schema),
1174            specific_catalog: StringBuilder::new(),
1175            specific_schema: StringBuilder::new(),
1176            specific_name: StringBuilder::new(),
1177            routine_catalog: StringBuilder::new(),
1178            routine_schema: StringBuilder::new(),
1179            routine_name: StringBuilder::new(),
1180            routine_type: StringBuilder::new(),
1181            is_deterministic: BooleanBuilder::new(),
1182            data_type: StringBuilder::new(),
1183            function_type: StringBuilder::new(),
1184            description: StringBuilder::new(),
1185            syntax_example: StringBuilder::new(),
1186        }
1187    }
1188}
1189
1190struct InformationSchemaRoutinesBuilder {
1191    schema: SchemaRef,
1192    specific_catalog: StringBuilder,
1193    specific_schema: StringBuilder,
1194    specific_name: StringBuilder,
1195    routine_catalog: StringBuilder,
1196    routine_schema: StringBuilder,
1197    routine_name: StringBuilder,
1198    routine_type: StringBuilder,
1199    is_deterministic: BooleanBuilder,
1200    data_type: StringBuilder,
1201    function_type: StringBuilder,
1202    description: StringBuilder,
1203    syntax_example: StringBuilder,
1204}
1205
1206impl InformationSchemaRoutinesBuilder {
1207    #[expect(clippy::too_many_arguments)]
1208    fn add_routine(
1209        &mut self,
1210        catalog_name: impl AsRef<str>,
1211        schema_name: impl AsRef<str>,
1212        routine_name: impl AsRef<str>,
1213        routine_type: impl AsRef<str>,
1214        is_deterministic: bool,
1215        data_type: Option<&impl AsRef<str>>,
1216        function_type: impl AsRef<str>,
1217        description: Option<impl AsRef<str>>,
1218        syntax_example: Option<impl AsRef<str>>,
1219    ) {
1220        self.specific_catalog.append_value(catalog_name.as_ref());
1221        self.specific_schema.append_value(schema_name.as_ref());
1222        self.specific_name.append_value(routine_name.as_ref());
1223        self.routine_catalog.append_value(catalog_name.as_ref());
1224        self.routine_schema.append_value(schema_name.as_ref());
1225        self.routine_name.append_value(routine_name.as_ref());
1226        self.routine_type.append_value(routine_type.as_ref());
1227        self.is_deterministic.append_value(is_deterministic);
1228        self.data_type.append_option(data_type.as_ref());
1229        self.function_type.append_value(function_type.as_ref());
1230        self.description.append_option(description);
1231        self.syntax_example.append_option(syntax_example);
1232    }
1233
1234    fn finish(&mut self) -> RecordBatch {
1235        RecordBatch::try_new(
1236            Arc::clone(&self.schema),
1237            vec![
1238                Arc::new(self.specific_catalog.finish()),
1239                Arc::new(self.specific_schema.finish()),
1240                Arc::new(self.specific_name.finish()),
1241                Arc::new(self.routine_catalog.finish()),
1242                Arc::new(self.routine_schema.finish()),
1243                Arc::new(self.routine_name.finish()),
1244                Arc::new(self.routine_type.finish()),
1245                Arc::new(self.is_deterministic.finish()),
1246                Arc::new(self.data_type.finish()),
1247                Arc::new(self.function_type.finish()),
1248                Arc::new(self.description.finish()),
1249                Arc::new(self.syntax_example.finish()),
1250            ],
1251        )
1252        .unwrap()
1253    }
1254}
1255
1256impl PartitionStream for InformationSchemaRoutines {
1257    fn schema(&self) -> &SchemaRef {
1258        &self.schema
1259    }
1260
1261    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1262        let config = self.config.clone();
1263        let mut builder = self.builder();
1264        Box::pin(RecordBatchStreamAdapter::new(
1265            Arc::clone(&self.schema),
1266            futures::stream::once(async move {
1267                config.make_routines(
1268                    ctx.scalar_functions(),
1269                    ctx.aggregate_functions(),
1270                    ctx.window_functions(),
1271                    ctx.session_config().options(),
1272                    &mut builder,
1273                )?;
1274                Ok(builder.finish())
1275            }),
1276        ))
1277    }
1278}
1279
1280#[derive(Debug)]
1281struct InformationSchemaParameters {
1282    schema: SchemaRef,
1283    config: InformationSchemaConfig,
1284}
1285
1286impl InformationSchemaParameters {
1287    fn new(config: InformationSchemaConfig) -> Self {
1288        let schema = Arc::new(Schema::new(vec![
1289            Field::new("specific_catalog", DataType::Utf8, false),
1290            Field::new("specific_schema", DataType::Utf8, false),
1291            Field::new("specific_name", DataType::Utf8, false),
1292            Field::new("ordinal_position", DataType::UInt64, false),
1293            Field::new("parameter_mode", DataType::Utf8, false),
1294            Field::new("parameter_name", DataType::Utf8, true),
1295            Field::new("data_type", DataType::Utf8, false),
1296            Field::new("parameter_default", DataType::Utf8, true),
1297            Field::new("is_variadic", DataType::Boolean, false),
1298            // `rid` (short for `routine id`) is used to differentiate parameters from different signatures
1299            // (It serves as the group-by key when generating the `SHOW FUNCTIONS` query).
1300            // For example, the following signatures have different `rid` values:
1301            //     - `datetrunc(Utf8, Timestamp(Microsecond, Some("+TZ"))) -> Timestamp(Microsecond, Some("+TZ"))`
1302            //     - `datetrunc(Utf8View, Timestamp(Nanosecond, None)) -> Timestamp(Nanosecond, None)`
1303            Field::new("rid", DataType::UInt8, false),
1304        ]));
1305
1306        Self { schema, config }
1307    }
1308
1309    fn builder(&self) -> InformationSchemaParametersBuilder {
1310        InformationSchemaParametersBuilder {
1311            schema: Arc::clone(&self.schema),
1312            specific_catalog: StringBuilder::new(),
1313            specific_schema: StringBuilder::new(),
1314            specific_name: StringBuilder::new(),
1315            ordinal_position: UInt64Builder::new(),
1316            parameter_mode: StringBuilder::new(),
1317            parameter_name: StringBuilder::new(),
1318            data_type: StringBuilder::new(),
1319            parameter_default: StringBuilder::new(),
1320            is_variadic: BooleanBuilder::new(),
1321            rid: UInt8Builder::new(),
1322        }
1323    }
1324}
1325
1326struct InformationSchemaParametersBuilder {
1327    schema: SchemaRef,
1328    specific_catalog: StringBuilder,
1329    specific_schema: StringBuilder,
1330    specific_name: StringBuilder,
1331    ordinal_position: UInt64Builder,
1332    parameter_mode: StringBuilder,
1333    parameter_name: StringBuilder,
1334    data_type: StringBuilder,
1335    parameter_default: StringBuilder,
1336    is_variadic: BooleanBuilder,
1337    rid: UInt8Builder,
1338}
1339
1340impl InformationSchemaParametersBuilder {
1341    #[expect(clippy::too_many_arguments)]
1342    fn add_parameter(
1343        &mut self,
1344        specific_catalog: impl AsRef<str>,
1345        specific_schema: impl AsRef<str>,
1346        specific_name: impl AsRef<str>,
1347        ordinal_position: u64,
1348        parameter_mode: impl AsRef<str>,
1349        parameter_name: Option<&(impl AsRef<str> + ?Sized)>,
1350        data_type: impl AsRef<str>,
1351        parameter_default: Option<impl AsRef<str>>,
1352        is_variadic: bool,
1353        rid: u8,
1354    ) {
1355        self.specific_catalog
1356            .append_value(specific_catalog.as_ref());
1357        self.specific_schema.append_value(specific_schema.as_ref());
1358        self.specific_name.append_value(specific_name.as_ref());
1359        self.ordinal_position.append_value(ordinal_position);
1360        self.parameter_mode.append_value(parameter_mode.as_ref());
1361        self.parameter_name.append_option(parameter_name.as_ref());
1362        self.data_type.append_value(data_type.as_ref());
1363        self.parameter_default.append_option(parameter_default);
1364        self.is_variadic.append_value(is_variadic);
1365        self.rid.append_value(rid);
1366    }
1367
1368    fn finish(&mut self) -> RecordBatch {
1369        RecordBatch::try_new(
1370            Arc::clone(&self.schema),
1371            vec![
1372                Arc::new(self.specific_catalog.finish()),
1373                Arc::new(self.specific_schema.finish()),
1374                Arc::new(self.specific_name.finish()),
1375                Arc::new(self.ordinal_position.finish()),
1376                Arc::new(self.parameter_mode.finish()),
1377                Arc::new(self.parameter_name.finish()),
1378                Arc::new(self.data_type.finish()),
1379                Arc::new(self.parameter_default.finish()),
1380                Arc::new(self.is_variadic.finish()),
1381                Arc::new(self.rid.finish()),
1382            ],
1383        )
1384        .unwrap()
1385    }
1386}
1387
1388impl PartitionStream for InformationSchemaParameters {
1389    fn schema(&self) -> &SchemaRef {
1390        &self.schema
1391    }
1392
1393    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1394        let config = self.config.clone();
1395        let mut builder = self.builder();
1396        Box::pin(RecordBatchStreamAdapter::new(
1397            Arc::clone(&self.schema),
1398            futures::stream::once(async move {
1399                config.make_parameters(
1400                    ctx.scalar_functions(),
1401                    ctx.aggregate_functions(),
1402                    ctx.window_functions(),
1403                    ctx.session_config().options(),
1404                    &mut builder,
1405                )?;
1406                Ok(builder.finish())
1407            }),
1408        ))
1409    }
1410}
1411
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CatalogProvider;

    /// `make_tables` must obtain the table type through
    /// `SchemaProvider::table_type`; the fixture panics if `table` is called.
    #[tokio::test]
    async fn make_tables_uses_table_type() {
        let mut builder = InformationSchemaTablesBuilder {
            schema: Arc::new(Schema::empty()),
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            table_types: StringBuilder::new(),
        };
        let config = InformationSchemaConfig {
            catalog_list: Arc::new(Fixture),
        };

        let outcome = config.make_tables(&mut builder).await;
        assert!(outcome.is_ok());

        let table_types = builder.table_types.finish();
        assert_eq!("BASE TABLE", table_types.value(0));
    }

    /// A single unit type that plays the catalog list, the catalog and the
    /// schema at once, exposing one catalog, one schema and one table.
    #[derive(Debug)]
    struct Fixture;

    #[async_trait]
    impl SchemaProvider for Fixture {
        // InformationSchemaConfig::make_tables should use this.
        async fn table_type(&self, _: &str) -> Result<Option<TableType>> {
            Ok(Some(TableType::Base))
        }

        // InformationSchemaConfig::make_tables used this before `table_type`
        // existed but should not, as it may be expensive.
        async fn table(&self, _: &str) -> Result<Option<Arc<dyn TableProvider>>> {
            panic!(
                "InformationSchemaConfig::make_tables called SchemaProvider::table instead of table_type"
            )
        }

        fn table_names(&self) -> Vec<String> {
            vec![String::from("atable")]
        }

        fn table_exist(&self, _: &str) -> bool {
            unimplemented!("not required for these tests")
        }
    }

    impl CatalogProviderList for Fixture {
        fn register_catalog(
            &self,
            _: String,
            _: Arc<dyn CatalogProvider>,
        ) -> Option<Arc<dyn CatalogProvider>> {
            unimplemented!("not required for these tests")
        }

        fn catalog_names(&self) -> Vec<String> {
            vec![String::from("acatalog")]
        }

        fn catalog(&self, _: &str) -> Option<Arc<dyn CatalogProvider>> {
            Some(Arc::new(Self))
        }
    }

    impl CatalogProvider for Fixture {
        fn schema_names(&self) -> Vec<String> {
            vec![String::from("aschema")]
        }

        fn schema(&self, _: &str) -> Option<Arc<dyn SchemaProvider>> {
            Some(Arc::new(Self))
        }
    }
}