// datafusion_catalog — information_schema.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`InformationSchemaProvider`] that implements the SQL [Information Schema] for DataFusion.
19//!
20//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema
21
22use crate::streaming::StreamingTable;
23use crate::{CatalogProviderList, SchemaProvider, TableProvider};
24use arrow::array::builder::{BooleanBuilder, UInt8Builder};
25use arrow::{
26    array::{StringBuilder, UInt64Builder},
27    datatypes::{DataType, Field, FieldRef, Schema, SchemaRef},
28    record_batch::RecordBatch,
29};
30use async_trait::async_trait;
31use datafusion_common::DataFusionError;
32use datafusion_common::config::{ConfigEntry, ConfigOptions};
33use datafusion_common::error::Result;
34use datafusion_common::types::NativeType;
35use datafusion_execution::TaskContext;
36use datafusion_execution::runtime_env::RuntimeEnv;
37use datafusion_expr::function::WindowUDFFieldArgs;
38use datafusion_expr::{
39    AggregateUDF, ReturnFieldArgs, ScalarUDF, Signature, TypeSignature, WindowUDF,
40};
41use datafusion_expr::{TableType, Volatility};
42use datafusion_physical_plan::SendableRecordBatchStream;
43use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
44use datafusion_physical_plan::streaming::PartitionStream;
45use std::collections::{BTreeSet, HashMap, HashSet};
46use std::fmt::Debug;
47use std::{any::Any, sync::Arc};
48
/// Name of the virtual schema that exposes catalog metadata.
pub const INFORMATION_SCHEMA: &str = "information_schema";
// Names of the individual virtual tables served by this schema.
pub(crate) const TABLES: &str = "tables";
pub(crate) const VIEWS: &str = "views";
pub(crate) const COLUMNS: &str = "columns";
// DataFusion-specific extension exposing session + runtime settings.
pub(crate) const DF_SETTINGS: &str = "df_settings";
pub(crate) const SCHEMATA: &str = "schemata";
pub(crate) const ROUTINES: &str = "routines";
pub(crate) const PARAMETERS: &str = "parameters";

/// All information schema tables
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[
    TABLES,
    VIEWS,
    COLUMNS,
    DF_SETTINGS,
    SCHEMATA,
    ROUTINES,
    PARAMETERS,
];
68
69/// Implements the `information_schema` virtual schema and tables
70///
71/// The underlying tables in the `information_schema` are created on
72/// demand. This means that if more tables are added to the underlying
73/// providers, they will appear the next time the `information_schema`
74/// table is queried.
75#[derive(Debug)]
76pub struct InformationSchemaProvider {
77    config: InformationSchemaConfig,
78}
79
80impl InformationSchemaProvider {
81    /// Creates a new [`InformationSchemaProvider`] for the provided `catalog_list`
82    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
83        Self {
84            config: InformationSchemaConfig { catalog_list },
85        }
86    }
87}
88
/// Shared handle to the catalog list from which all virtual tables are built.
///
/// Cloning is cheap: the catalog list is behind an `Arc`.
#[derive(Clone, Debug)]
struct InformationSchemaConfig {
    catalog_list: Arc<dyn CatalogProviderList>,
}
93
impl InformationSchemaConfig {
    /// Construct the `information_schema.tables` virtual table
    ///
    /// Emits one row per table in every catalog/schema pair (skipping the
    /// `information_schema` schema itself), then one `VIEW` row per
    /// information-schema table for each catalog.
    async fn make_tables(
        &self,
        builder: &mut InformationSchemaTablesBuilder,
    ) -> Result<(), DataFusionError> {
        // create a mem table with the names of tables

        for catalog_name in self.catalog_list.catalog_names() {
            // The name was just returned by `catalog_names`, so this lookup
            // is expected to succeed.
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    // schema name may not exist in the catalog, so we need to check
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            // Tables reporting no type are omitted.
                            if let Some(table_type) =
                                schema.table_type(&table_name).await?
                            {
                                builder.add_table(
                                    &catalog_name,
                                    &schema_name,
                                    &table_name,
                                    table_type,
                                );
                            }
                        }
                    }
                }
            }

            // Add a final list for the information schema tables themselves
            for table_name in INFORMATION_SCHEMA_TABLES {
                builder.add_table(
                    &catalog_name,
                    INFORMATION_SCHEMA,
                    table_name,
                    TableType::View,
                );
            }
        }

        Ok(())
    }

    /// Construct the `information_schema.schemata` virtual table: one row
    /// per (catalog, schema) pair, excluding `information_schema` itself.
    async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA
                    && let Some(schema) = catalog.schema(&schema_name)
                {
                    let schema_owner = schema.owner_name();
                    builder.add_schemata(&catalog_name, &schema_name, schema_owner);
                }
            }
        }
    }

    /// Construct the `information_schema.views` virtual table: one row per
    /// table, carrying its SQL definition when the provider supplies one.
    async fn make_views(
        &self,
        builder: &mut InformationSchemaViewBuilder,
    ) -> Result<(), DataFusionError> {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    // schema name may not exist in the catalog, so we need to check
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table) = schema.table(&table_name).await? {
                                builder.add_view(
                                    &catalog_name,
                                    &schema_name,
                                    &table_name,
                                    table.get_table_definition(),
                                )
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Construct the `information_schema.columns` virtual table
    ///
    /// Emits one row per field of every table's schema, recording each
    /// field's ordinal position within its table.
    async fn make_columns(
        &self,
        builder: &mut InformationSchemaColumnsBuilder,
    ) -> Result<(), DataFusionError> {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    // schema name may not exist in the catalog, so we need to check
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table) = schema.table(&table_name).await? {
                                for (field_position, field) in
                                    table.schema().fields().iter().enumerate()
                                {
                                    builder.add_column(
                                        &catalog_name,
                                        &schema_name,
                                        &table_name,
                                        field_position,
                                        field,
                                    )
                                }
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Construct the `information_schema.df_settings` virtual table
    ///
    /// Combines the session-level `ConfigOptions` entries with the runtime
    /// environment's configuration entries.
    fn make_df_settings(
        &self,
        config_options: &ConfigOptions,
        runtime_env: &Arc<RuntimeEnv>,
        builder: &mut InformationSchemaDfSettingsBuilder,
    ) {
        for entry in config_options.entries() {
            builder.add_setting(entry);
        }
        // Add runtime configuration entries
        for entry in runtime_env.config_entries() {
            builder.add_setting(entry);
        }
    }

    /// Construct the `information_schema.routines` virtual table
    ///
    /// Emits one row per distinct example return type of each registered
    /// scalar, aggregate, and window function. All routines are reported
    /// under the session's default catalog and schema.
    fn make_routines(
        &self,
        udfs: &HashMap<String, Arc<ScalarUDF>>,
        udafs: &HashMap<String, Arc<AggregateUDF>>,
        udwfs: &HashMap<String, Arc<WindowUDF>>,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaRoutinesBuilder,
    ) -> Result<()> {
        let catalog_name = &config_options.catalog.default_catalog;
        let schema_name = &config_options.catalog.default_schema;

        for (name, udf) in udfs {
            // Deduplicate return types across the example signatures.
            let return_types = get_udf_args_and_return_types(udf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udf.signature()),
                    return_type.as_ref(),
                    "SCALAR",
                    udf.documentation().map(|d| d.description.to_string()),
                    udf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }

        for (name, udaf) in udafs {
            let return_types = get_udaf_args_and_return_types(udaf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udaf.signature()),
                    return_type.as_ref(),
                    "AGGREGATE",
                    udaf.documentation().map(|d| d.description.to_string()),
                    udaf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }

        for (name, udwf) in udwfs {
            let return_types = get_udwf_args_and_return_types(udwf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udwf.signature()),
                    return_type.as_ref(),
                    "WINDOW",
                    udwf.documentation().map(|d| d.description.to_string()),
                    udwf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }
        Ok(())
    }

    /// A routine is reported as deterministic only when its signature is
    /// declared `Immutable`.
    fn is_deterministic(signature: &Signature) -> bool {
        signature.volatility == Volatility::Immutable
    }
    /// Construct the `information_schema.parameters` virtual table
    ///
    /// For each example signature combination (`rid` distinguishes them) of
    /// every registered function, emits one "IN" row per argument position
    /// and, when a return type could be resolved, a single "OUT" row.
    fn make_parameters(
        &self,
        udfs: &HashMap<String, Arc<ScalarUDF>>,
        udafs: &HashMap<String, Arc<AggregateUDF>>,
        udwfs: &HashMap<String, Arc<WindowUDF>>,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaParametersBuilder,
    ) -> Result<()> {
        let catalog_name = &config_options.catalog.default_catalog;
        let schema_name = &config_options.catalog.default_schema;
        let mut add_parameters = |func_name: &str,
                                  args: Option<&Vec<(String, String)>>,
                                  arg_types: Vec<String>,
                                  return_type: Option<String>,
                                  is_variadic: bool,
                                  rid: u8| {
            for (position, type_name) in arg_types.iter().enumerate() {
                // Parameter names come from the documentation, when present.
                let param_name =
                    args.and_then(|a| a.get(position).map(|arg| arg.0.as_str()));
                builder.add_parameter(
                    catalog_name,
                    schema_name,
                    func_name,
                    position as u64 + 1,
                    "IN",
                    param_name,
                    type_name,
                    None::<&str>,
                    is_variadic,
                    rid,
                );
            }
            if let Some(return_type) = return_type {
                builder.add_parameter(
                    catalog_name,
                    schema_name,
                    func_name,
                    1,
                    "OUT",
                    None::<&str>,
                    return_type.as_str(),
                    None::<&str>,
                    false,
                    rid,
                );
            }
        };

        for (func_name, udf) in udfs {
            let args = udf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udf_args_and_return_types(udf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udf.signature()),
                    rid as u8,
                );
            }
        }

        for (func_name, udaf) in udafs {
            let args = udaf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udaf_args_and_return_types(udaf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udaf.signature()),
                    rid as u8,
                );
            }
        }

        for (func_name, udwf) in udwfs {
            let args = udwf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udwf_args_and_return_types(udwf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udwf.signature()),
                    rid as u8,
                );
            }
        }

        Ok(())
    }

    /// Whether the signature accepts a variable number of arguments.
    fn is_variadic(signature: &Signature) -> bool {
        matches!(
            signature.type_signature,
            TypeSignature::Variadic(_) | TypeSignature::VariadicAny
        )
    }
}
413
414/// get the arguments and return types of a UDF
415/// returns a tuple of (arg_types, return_type)
416fn get_udf_args_and_return_types(
417    udf: &Arc<ScalarUDF>,
418) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
419    let signature = udf.signature();
420    let arg_types = signature.type_signature.get_example_types();
421    if arg_types.is_empty() {
422        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
423    } else {
424        Ok(arg_types
425            .into_iter()
426            .map(|arg_types| {
427                let arg_fields: Vec<FieldRef> = arg_types
428                    .iter()
429                    .enumerate()
430                    .map(|(i, t)| {
431                        Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
432                    })
433                    .collect();
434                let scalar_arguments = vec![None; arg_fields.len()];
435                let return_type = udf
436                    .return_field_from_args(ReturnFieldArgs {
437                        arg_fields: &arg_fields,
438                        scalar_arguments: &scalar_arguments,
439                    })
440                    .map(|f| {
441                        remove_native_type_prefix(&NativeType::from(
442                            f.data_type().clone(),
443                        ))
444                    })
445                    .ok();
446                let arg_types = arg_types
447                    .into_iter()
448                    .map(|t| remove_native_type_prefix(&NativeType::from(t)))
449                    .collect::<Vec<_>>();
450                (arg_types, return_type)
451            })
452            .collect::<BTreeSet<_>>())
453    }
454}
455
456fn get_udaf_args_and_return_types(
457    udaf: &Arc<AggregateUDF>,
458) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
459    let signature = udaf.signature();
460    let arg_types = signature.type_signature.get_example_types();
461    if arg_types.is_empty() {
462        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
463    } else {
464        Ok(arg_types
465            .into_iter()
466            .map(|arg_types| {
467                let arg_fields: Vec<FieldRef> = arg_types
468                    .iter()
469                    .enumerate()
470                    .map(|(i, t)| {
471                        Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
472                    })
473                    .collect();
474                let return_type = udaf
475                    .return_field(&arg_fields)
476                    .map(|f| {
477                        remove_native_type_prefix(&NativeType::from(
478                            f.data_type().clone(),
479                        ))
480                    })
481                    .ok();
482                let arg_types = arg_types
483                    .into_iter()
484                    .map(|t| remove_native_type_prefix(&NativeType::from(t)))
485                    .collect::<Vec<_>>();
486                (arg_types, return_type)
487            })
488            .collect::<BTreeSet<_>>())
489    }
490}
491
492fn get_udwf_args_and_return_types(
493    udwf: &Arc<WindowUDF>,
494) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
495    let signature = udwf.signature();
496    let arg_types = signature.type_signature.get_example_types();
497    if arg_types.is_empty() {
498        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
499    } else {
500        Ok(arg_types
501            .into_iter()
502            .map(|arg_types| {
503                let arg_fields: Vec<FieldRef> = arg_types
504                    .iter()
505                    .enumerate()
506                    .map(|(i, t)| {
507                        Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
508                    })
509                    .collect();
510                let return_type = udwf
511                    .field(WindowUDFFieldArgs::new(&arg_fields, udwf.name()))
512                    .map(|f| {
513                        remove_native_type_prefix(&NativeType::from(
514                            f.data_type().clone(),
515                        ))
516                    })
517                    .ok();
518                let arg_types = arg_types
519                    .into_iter()
520                    .map(|t| remove_native_type_prefix(&NativeType::from(t)))
521                    .collect::<Vec<_>>();
522                (arg_types, return_type)
523            })
524            .collect::<BTreeSet<_>>())
525    }
526}
527
528#[inline]
529fn remove_native_type_prefix(native_type: &NativeType) -> String {
530    format!("{native_type}")
531}
532
533#[async_trait]
534impl SchemaProvider for InformationSchemaProvider {
535    fn as_any(&self) -> &dyn Any {
536        self
537    }
538
539    fn table_names(&self) -> Vec<String> {
540        INFORMATION_SCHEMA_TABLES
541            .iter()
542            .map(|t| (*t).to_string())
543            .collect()
544    }
545
546    async fn table(
547        &self,
548        name: &str,
549    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
550        let config = self.config.clone();
551        let table: Arc<dyn PartitionStream> = match name.to_ascii_lowercase().as_str() {
552            TABLES => Arc::new(InformationSchemaTables::new(config)),
553            COLUMNS => Arc::new(InformationSchemaColumns::new(config)),
554            VIEWS => Arc::new(InformationSchemaViews::new(config)),
555            DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)),
556            SCHEMATA => Arc::new(InformationSchemata::new(config)),
557            ROUTINES => Arc::new(InformationSchemaRoutines::new(config)),
558            PARAMETERS => Arc::new(InformationSchemaParameters::new(config)),
559            _ => return Ok(None),
560        };
561
562        Ok(Some(Arc::new(
563            StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
564        )))
565    }
566
567    fn table_exist(&self, name: &str) -> bool {
568        INFORMATION_SCHEMA_TABLES.contains(&name.to_ascii_lowercase().as_str())
569    }
570}
571
572#[derive(Debug)]
573struct InformationSchemaTables {
574    schema: SchemaRef,
575    config: InformationSchemaConfig,
576}
577
578impl InformationSchemaTables {
579    fn new(config: InformationSchemaConfig) -> Self {
580        let schema = Arc::new(Schema::new(vec![
581            Field::new("table_catalog", DataType::Utf8, false),
582            Field::new("table_schema", DataType::Utf8, false),
583            Field::new("table_name", DataType::Utf8, false),
584            Field::new("table_type", DataType::Utf8, false),
585        ]));
586
587        Self { schema, config }
588    }
589
590    fn builder(&self) -> InformationSchemaTablesBuilder {
591        InformationSchemaTablesBuilder {
592            catalog_names: StringBuilder::new(),
593            schema_names: StringBuilder::new(),
594            table_names: StringBuilder::new(),
595            table_types: StringBuilder::new(),
596            schema: Arc::clone(&self.schema),
597        }
598    }
599}
600
601impl PartitionStream for InformationSchemaTables {
602    fn schema(&self) -> &SchemaRef {
603        &self.schema
604    }
605
606    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
607        let mut builder = self.builder();
608        let config = self.config.clone();
609        Box::pin(RecordBatchStreamAdapter::new(
610            Arc::clone(&self.schema),
611            // TODO: Stream this
612            futures::stream::once(async move {
613                config.make_tables(&mut builder).await?;
614                Ok(builder.finish())
615            }),
616        ))
617    }
618}
619
/// Builds the `information_schema.TABLE` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
struct InformationSchemaTablesBuilder {
    /// Output schema shared with [`InformationSchemaTables`].
    schema: SchemaRef,
    // One column builder per output column; rows are appended in lockstep
    // by `add_table`, so all builders always hold the same row count.
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    table_types: StringBuilder,
}
630
631impl InformationSchemaTablesBuilder {
632    fn add_table(
633        &mut self,
634        catalog_name: impl AsRef<str>,
635        schema_name: impl AsRef<str>,
636        table_name: impl AsRef<str>,
637        table_type: TableType,
638    ) {
639        // Note: append_value is actually infallible.
640        self.catalog_names.append_value(catalog_name.as_ref());
641        self.schema_names.append_value(schema_name.as_ref());
642        self.table_names.append_value(table_name.as_ref());
643        self.table_types.append_value(match table_type {
644            TableType::Base => "BASE TABLE",
645            TableType::View => "VIEW",
646            TableType::Temporary => "LOCAL TEMPORARY",
647        });
648    }
649
650    fn finish(&mut self) -> RecordBatch {
651        RecordBatch::try_new(
652            Arc::clone(&self.schema),
653            vec![
654                Arc::new(self.catalog_names.finish()),
655                Arc::new(self.schema_names.finish()),
656                Arc::new(self.table_names.finish()),
657                Arc::new(self.table_types.finish()),
658            ],
659        )
660        .unwrap()
661    }
662}
663
664#[derive(Debug)]
665struct InformationSchemaViews {
666    schema: SchemaRef,
667    config: InformationSchemaConfig,
668}
669
670impl InformationSchemaViews {
671    fn new(config: InformationSchemaConfig) -> Self {
672        let schema = Arc::new(Schema::new(vec![
673            Field::new("table_catalog", DataType::Utf8, false),
674            Field::new("table_schema", DataType::Utf8, false),
675            Field::new("table_name", DataType::Utf8, false),
676            Field::new("definition", DataType::Utf8, true),
677        ]));
678
679        Self { schema, config }
680    }
681
682    fn builder(&self) -> InformationSchemaViewBuilder {
683        InformationSchemaViewBuilder {
684            catalog_names: StringBuilder::new(),
685            schema_names: StringBuilder::new(),
686            table_names: StringBuilder::new(),
687            definitions: StringBuilder::new(),
688            schema: Arc::clone(&self.schema),
689        }
690    }
691}
692
693impl PartitionStream for InformationSchemaViews {
694    fn schema(&self) -> &SchemaRef {
695        &self.schema
696    }
697
698    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
699        let mut builder = self.builder();
700        let config = self.config.clone();
701        Box::pin(RecordBatchStreamAdapter::new(
702            Arc::clone(&self.schema),
703            // TODO: Stream this
704            futures::stream::once(async move {
705                config.make_views(&mut builder).await?;
706                Ok(builder.finish())
707            }),
708        ))
709    }
710}
711
/// Builds the `information_schema.VIEWS` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
struct InformationSchemaViewBuilder {
    /// Output schema shared with [`InformationSchemaViews`].
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    /// Nullable column: tables without a SQL definition append a null.
    definitions: StringBuilder,
}
722
723impl InformationSchemaViewBuilder {
724    fn add_view(
725        &mut self,
726        catalog_name: impl AsRef<str>,
727        schema_name: impl AsRef<str>,
728        table_name: impl AsRef<str>,
729        definition: Option<&(impl AsRef<str> + ?Sized)>,
730    ) {
731        // Note: append_value is actually infallible.
732        self.catalog_names.append_value(catalog_name.as_ref());
733        self.schema_names.append_value(schema_name.as_ref());
734        self.table_names.append_value(table_name.as_ref());
735        self.definitions.append_option(definition.as_ref());
736    }
737
738    fn finish(&mut self) -> RecordBatch {
739        RecordBatch::try_new(
740            Arc::clone(&self.schema),
741            vec![
742                Arc::new(self.catalog_names.finish()),
743                Arc::new(self.schema_names.finish()),
744                Arc::new(self.table_names.finish()),
745                Arc::new(self.definitions.finish()),
746            ],
747        )
748        .unwrap()
749    }
750}
751
752#[derive(Debug)]
753struct InformationSchemaColumns {
754    schema: SchemaRef,
755    config: InformationSchemaConfig,
756}
757
758impl InformationSchemaColumns {
759    fn new(config: InformationSchemaConfig) -> Self {
760        let schema = Arc::new(Schema::new(vec![
761            Field::new("table_catalog", DataType::Utf8, false),
762            Field::new("table_schema", DataType::Utf8, false),
763            Field::new("table_name", DataType::Utf8, false),
764            Field::new("column_name", DataType::Utf8, false),
765            Field::new("ordinal_position", DataType::UInt64, false),
766            Field::new("column_default", DataType::Utf8, true),
767            Field::new("is_nullable", DataType::Utf8, false),
768            Field::new("data_type", DataType::Utf8, false),
769            Field::new("character_maximum_length", DataType::UInt64, true),
770            Field::new("character_octet_length", DataType::UInt64, true),
771            Field::new("numeric_precision", DataType::UInt64, true),
772            Field::new("numeric_precision_radix", DataType::UInt64, true),
773            Field::new("numeric_scale", DataType::UInt64, true),
774            Field::new("datetime_precision", DataType::UInt64, true),
775            Field::new("interval_type", DataType::Utf8, true),
776        ]));
777
778        Self { schema, config }
779    }
780
781    fn builder(&self) -> InformationSchemaColumnsBuilder {
782        // StringBuilder requires providing an initial capacity, so
783        // pick 10 here arbitrarily as this is not performance
784        // critical code and the number of tables is unavailable here.
785        let default_capacity = 10;
786
787        InformationSchemaColumnsBuilder {
788            catalog_names: StringBuilder::new(),
789            schema_names: StringBuilder::new(),
790            table_names: StringBuilder::new(),
791            column_names: StringBuilder::new(),
792            ordinal_positions: UInt64Builder::with_capacity(default_capacity),
793            column_defaults: StringBuilder::new(),
794            is_nullables: StringBuilder::new(),
795            data_types: StringBuilder::new(),
796            character_maximum_lengths: UInt64Builder::with_capacity(default_capacity),
797            character_octet_lengths: UInt64Builder::with_capacity(default_capacity),
798            numeric_precisions: UInt64Builder::with_capacity(default_capacity),
799            numeric_precision_radixes: UInt64Builder::with_capacity(default_capacity),
800            numeric_scales: UInt64Builder::with_capacity(default_capacity),
801            datetime_precisions: UInt64Builder::with_capacity(default_capacity),
802            interval_types: StringBuilder::new(),
803            schema: Arc::clone(&self.schema),
804        }
805    }
806}
807
impl PartitionStream for InformationSchemaColumns {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Builds the whole `columns` table eagerly inside a single future and
    /// returns it as a one-batch stream.
    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        // Clone so the async block owns the config independently of `self`.
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            futures::stream::once(async move {
                config.make_columns(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}
826
/// Builds the `information_schema.COLUMNS` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
struct InformationSchemaColumnsBuilder {
    /// Schema of the batch produced by `finish`; array order below must match it.
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    column_names: StringBuilder,
    ordinal_positions: UInt64Builder,
    column_defaults: StringBuilder,
    is_nullables: StringBuilder,
    data_types: StringBuilder,
    character_maximum_lengths: UInt64Builder,
    character_octet_lengths: UInt64Builder,
    numeric_precisions: UInt64Builder,
    numeric_precision_radixes: UInt64Builder,
    numeric_scales: UInt64Builder,
    datetime_precisions: UInt64Builder,
    interval_types: StringBuilder,
}
848
849impl InformationSchemaColumnsBuilder {
850    fn add_column(
851        &mut self,
852        catalog_name: &str,
853        schema_name: &str,
854        table_name: &str,
855        field_position: usize,
856        field: &Field,
857    ) {
858        use DataType::*;
859
860        // Note: append_value is actually infallible.
861        self.catalog_names.append_value(catalog_name);
862        self.schema_names.append_value(schema_name);
863        self.table_names.append_value(table_name);
864
865        self.column_names.append_value(field.name());
866
867        self.ordinal_positions.append_value(field_position as u64);
868
869        // DataFusion does not support column default values, so null
870        self.column_defaults.append_null();
871
872        // "YES if the column is possibly nullable, NO if it is known not nullable. "
873        let nullable_str = if field.is_nullable() { "YES" } else { "NO" };
874        self.is_nullables.append_value(nullable_str);
875
876        // "System supplied type" --> Use debug format of the datatype
877        self.data_types.append_value(field.data_type().to_string());
878
879        // "If data_type identifies a character or bit string type, the
880        // declared maximum length; null for all other data types or
881        // if no maximum length was declared."
882        //
883        // Arrow has no equivalent of VARCHAR(20), so we leave this as Null
884        let max_chars = None;
885        self.character_maximum_lengths.append_option(max_chars);
886
887        // "Maximum length, in bytes, for binary data, character data,
888        // or text and image data."
889        let char_len: Option<u64> = match field.data_type() {
890            Utf8 | Binary => Some(i32::MAX as u64),
891            LargeBinary | LargeUtf8 => Some(i64::MAX as u64),
892            _ => None,
893        };
894        self.character_octet_lengths.append_option(char_len);
895
896        // numeric_precision: "If data_type identifies a numeric type, this column
897        // contains the (declared or implicit) precision of the type
898        // for this column. The precision indicates the number of
899        // significant digits. It can be expressed in decimal (base
900        // 10) or binary (base 2) terms, as specified in the column
901        // numeric_precision_radix. For all other data types, this
902        // column is null."
903        //
904        // numeric_radix: If data_type identifies a numeric type, this
905        // column indicates in which base the values in the columns
906        // numeric_precision and numeric_scale are expressed. The
907        // value is either 2 or 10. For all other data types, this
908        // column is null.
909        //
910        // numeric_scale: If data_type identifies an exact numeric
911        // type, this column contains the (declared or implicit) scale
912        // of the type for this column. The scale indicates the number
913        // of significant digits to the right of the decimal point. It
914        // can be expressed in decimal (base 10) or binary (base 2)
915        // terms, as specified in the column
916        // numeric_precision_radix. For all other data types, this
917        // column is null.
918        let (numeric_precision, numeric_radix, numeric_scale) = match field.data_type() {
919            Int8 | UInt8 => (Some(8), Some(2), None),
920            Int16 | UInt16 => (Some(16), Some(2), None),
921            Int32 | UInt32 => (Some(32), Some(2), None),
922            // From max value of 65504 as explained on
923            // https://en.wikipedia.org/wiki/Half-precision_floating-point_format#Exponent_encoding
924            Float16 => (Some(15), Some(2), None),
925            // Numbers from postgres `real` type
926            Float32 => (Some(24), Some(2), None),
927            // Numbers from postgres `double` type
928            Float64 => (Some(24), Some(2), None),
929            Decimal128(precision, scale) => {
930                (Some(*precision as u64), Some(10), Some(*scale as u64))
931            }
932            _ => (None, None, None),
933        };
934
935        self.numeric_precisions.append_option(numeric_precision);
936        self.numeric_precision_radixes.append_option(numeric_radix);
937        self.numeric_scales.append_option(numeric_scale);
938
939        self.datetime_precisions.append_option(None);
940        self.interval_types.append_null();
941    }
942
943    fn finish(&mut self) -> RecordBatch {
944        RecordBatch::try_new(
945            Arc::clone(&self.schema),
946            vec![
947                Arc::new(self.catalog_names.finish()),
948                Arc::new(self.schema_names.finish()),
949                Arc::new(self.table_names.finish()),
950                Arc::new(self.column_names.finish()),
951                Arc::new(self.ordinal_positions.finish()),
952                Arc::new(self.column_defaults.finish()),
953                Arc::new(self.is_nullables.finish()),
954                Arc::new(self.data_types.finish()),
955                Arc::new(self.character_maximum_lengths.finish()),
956                Arc::new(self.character_octet_lengths.finish()),
957                Arc::new(self.numeric_precisions.finish()),
958                Arc::new(self.numeric_precision_radixes.finish()),
959                Arc::new(self.numeric_scales.finish()),
960                Arc::new(self.datetime_precisions.finish()),
961                Arc::new(self.interval_types.finish()),
962            ],
963        )
964        .unwrap()
965    }
966}
967
/// Virtual `information_schema.schemata` table.
#[derive(Debug)]
struct InformationSchemata {
    // Arrow schema of the rows this table produces.
    schema: SchemaRef,
    // Access to the catalog list whose schemas are listed.
    config: InformationSchemaConfig,
}
973
974impl InformationSchemata {
975    fn new(config: InformationSchemaConfig) -> Self {
976        let schema = Arc::new(Schema::new(vec![
977            Field::new("catalog_name", DataType::Utf8, false),
978            Field::new("schema_name", DataType::Utf8, false),
979            Field::new("schema_owner", DataType::Utf8, true),
980            Field::new("default_character_set_catalog", DataType::Utf8, true),
981            Field::new("default_character_set_schema", DataType::Utf8, true),
982            Field::new("default_character_set_name", DataType::Utf8, true),
983            Field::new("sql_path", DataType::Utf8, true),
984        ]));
985        Self { schema, config }
986    }
987
988    fn builder(&self) -> InformationSchemataBuilder {
989        InformationSchemataBuilder {
990            schema: Arc::clone(&self.schema),
991            catalog_name: StringBuilder::new(),
992            schema_name: StringBuilder::new(),
993            schema_owner: StringBuilder::new(),
994            default_character_set_catalog: StringBuilder::new(),
995            default_character_set_schema: StringBuilder::new(),
996            default_character_set_name: StringBuilder::new(),
997            sql_path: StringBuilder::new(),
998        }
999    }
1000}
1001
/// Accumulates rows for `information_schema.schemata`, one column builder
/// per output column.
struct InformationSchemataBuilder {
    // Schema of the batch produced by `finish`.
    schema: SchemaRef,
    catalog_name: StringBuilder,
    schema_name: StringBuilder,
    schema_owner: StringBuilder,
    default_character_set_catalog: StringBuilder,
    default_character_set_schema: StringBuilder,
    default_character_set_name: StringBuilder,
    sql_path: StringBuilder,
}
1012
1013impl InformationSchemataBuilder {
1014    fn add_schemata(
1015        &mut self,
1016        catalog_name: &str,
1017        schema_name: &str,
1018        schema_owner: Option<&str>,
1019    ) {
1020        self.catalog_name.append_value(catalog_name);
1021        self.schema_name.append_value(schema_name);
1022        match schema_owner {
1023            Some(owner) => self.schema_owner.append_value(owner),
1024            None => self.schema_owner.append_null(),
1025        }
1026        // refer to https://www.postgresql.org/docs/current/infoschema-schemata.html,
1027        // these rows apply to a feature that is not implemented in DataFusion
1028        self.default_character_set_catalog.append_null();
1029        self.default_character_set_schema.append_null();
1030        self.default_character_set_name.append_null();
1031        self.sql_path.append_null();
1032    }
1033
1034    fn finish(&mut self) -> RecordBatch {
1035        RecordBatch::try_new(
1036            Arc::clone(&self.schema),
1037            vec![
1038                Arc::new(self.catalog_name.finish()),
1039                Arc::new(self.schema_name.finish()),
1040                Arc::new(self.schema_owner.finish()),
1041                Arc::new(self.default_character_set_catalog.finish()),
1042                Arc::new(self.default_character_set_schema.finish()),
1043                Arc::new(self.default_character_set_name.finish()),
1044                Arc::new(self.sql_path.finish()),
1045            ],
1046        )
1047        .unwrap()
1048    }
1049}
1050
impl PartitionStream for InformationSchemata {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Builds the whole `schemata` table eagerly inside a single future and
    /// returns it as a one-batch stream.
    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        // Clone so the async block owns the config independently of `self`.
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            futures::stream::once(async move {
                // NOTE: unlike make_columns, make_schemata is awaited
                // without `?` — it does not report errors here.
                config.make_schemata(&mut builder).await;
                Ok(builder.finish())
            }),
        ))
    }
}
1069
/// Virtual `information_schema.df_settings` table listing the session's
/// configuration entries (name, value, description).
#[derive(Debug)]
struct InformationSchemaDfSettings {
    // Arrow schema of the rows this table produces.
    schema: SchemaRef,
    // Access to the session whose settings are listed.
    config: InformationSchemaConfig,
}
1075
1076impl InformationSchemaDfSettings {
1077    fn new(config: InformationSchemaConfig) -> Self {
1078        let schema = Arc::new(Schema::new(vec![
1079            Field::new("name", DataType::Utf8, false),
1080            Field::new("value", DataType::Utf8, true),
1081            Field::new("description", DataType::Utf8, true),
1082        ]));
1083
1084        Self { schema, config }
1085    }
1086
1087    fn builder(&self) -> InformationSchemaDfSettingsBuilder {
1088        InformationSchemaDfSettingsBuilder {
1089            names: StringBuilder::new(),
1090            values: StringBuilder::new(),
1091            descriptions: StringBuilder::new(),
1092            schema: Arc::clone(&self.schema),
1093        }
1094    }
1095}
1096
impl PartitionStream for InformationSchemaDfSettings {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Builds the settings table eagerly inside a single future and returns
    /// it as a one-batch stream.
    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = self.config.clone();
        let mut builder = self.builder();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            futures::stream::once(async move {
                // Snapshot the session options and runtime environment into
                // rows of the settings table.
                let runtime_env = ctx.runtime_env();
                config.make_df_settings(
                    ctx.session_config().options(),
                    &runtime_env,
                    &mut builder,
                );
                Ok(builder.finish())
            }),
        ))
    }
}
1121
/// Accumulates rows for `information_schema.df_settings`, one column
/// builder per output column.
struct InformationSchemaDfSettingsBuilder {
    // Schema of the batch produced by `finish`.
    schema: SchemaRef,
    names: StringBuilder,
    values: StringBuilder,
    descriptions: StringBuilder,
}
1128
1129impl InformationSchemaDfSettingsBuilder {
1130    fn add_setting(&mut self, entry: ConfigEntry) {
1131        self.names.append_value(entry.key);
1132        self.values.append_option(entry.value);
1133        self.descriptions.append_value(entry.description);
1134    }
1135
1136    fn finish(&mut self) -> RecordBatch {
1137        RecordBatch::try_new(
1138            Arc::clone(&self.schema),
1139            vec![
1140                Arc::new(self.names.finish()),
1141                Arc::new(self.values.finish()),
1142                Arc::new(self.descriptions.finish()),
1143            ],
1144        )
1145        .unwrap()
1146    }
1147}
1148
/// Virtual `information_schema.routines` table listing registered
/// scalar/aggregate/window functions.
#[derive(Debug)]
struct InformationSchemaRoutines {
    // Arrow schema of the rows this table produces.
    schema: SchemaRef,
    // Access to the session whose functions are listed.
    config: InformationSchemaConfig,
}
1154
1155impl InformationSchemaRoutines {
1156    fn new(config: InformationSchemaConfig) -> Self {
1157        let schema = Arc::new(Schema::new(vec![
1158            Field::new("specific_catalog", DataType::Utf8, false),
1159            Field::new("specific_schema", DataType::Utf8, false),
1160            Field::new("specific_name", DataType::Utf8, false),
1161            Field::new("routine_catalog", DataType::Utf8, false),
1162            Field::new("routine_schema", DataType::Utf8, false),
1163            Field::new("routine_name", DataType::Utf8, false),
1164            Field::new("routine_type", DataType::Utf8, false),
1165            Field::new("is_deterministic", DataType::Boolean, true),
1166            Field::new("data_type", DataType::Utf8, true),
1167            Field::new("function_type", DataType::Utf8, true),
1168            Field::new("description", DataType::Utf8, true),
1169            Field::new("syntax_example", DataType::Utf8, true),
1170        ]));
1171
1172        Self { schema, config }
1173    }
1174
1175    fn builder(&self) -> InformationSchemaRoutinesBuilder {
1176        InformationSchemaRoutinesBuilder {
1177            schema: Arc::clone(&self.schema),
1178            specific_catalog: StringBuilder::new(),
1179            specific_schema: StringBuilder::new(),
1180            specific_name: StringBuilder::new(),
1181            routine_catalog: StringBuilder::new(),
1182            routine_schema: StringBuilder::new(),
1183            routine_name: StringBuilder::new(),
1184            routine_type: StringBuilder::new(),
1185            is_deterministic: BooleanBuilder::new(),
1186            data_type: StringBuilder::new(),
1187            function_type: StringBuilder::new(),
1188            description: StringBuilder::new(),
1189            syntax_example: StringBuilder::new(),
1190        }
1191    }
1192}
1193
/// Accumulates rows for `information_schema.routines`, one column builder
/// per output column.
struct InformationSchemaRoutinesBuilder {
    // Schema of the batch produced by `finish`.
    schema: SchemaRef,
    specific_catalog: StringBuilder,
    specific_schema: StringBuilder,
    specific_name: StringBuilder,
    routine_catalog: StringBuilder,
    routine_schema: StringBuilder,
    routine_name: StringBuilder,
    routine_type: StringBuilder,
    is_deterministic: BooleanBuilder,
    data_type: StringBuilder,
    function_type: StringBuilder,
    description: StringBuilder,
    syntax_example: StringBuilder,
}
1209
1210impl InformationSchemaRoutinesBuilder {
1211    #[expect(clippy::too_many_arguments)]
1212    fn add_routine(
1213        &mut self,
1214        catalog_name: impl AsRef<str>,
1215        schema_name: impl AsRef<str>,
1216        routine_name: impl AsRef<str>,
1217        routine_type: impl AsRef<str>,
1218        is_deterministic: bool,
1219        data_type: Option<&impl AsRef<str>>,
1220        function_type: impl AsRef<str>,
1221        description: Option<impl AsRef<str>>,
1222        syntax_example: Option<impl AsRef<str>>,
1223    ) {
1224        self.specific_catalog.append_value(catalog_name.as_ref());
1225        self.specific_schema.append_value(schema_name.as_ref());
1226        self.specific_name.append_value(routine_name.as_ref());
1227        self.routine_catalog.append_value(catalog_name.as_ref());
1228        self.routine_schema.append_value(schema_name.as_ref());
1229        self.routine_name.append_value(routine_name.as_ref());
1230        self.routine_type.append_value(routine_type.as_ref());
1231        self.is_deterministic.append_value(is_deterministic);
1232        self.data_type.append_option(data_type.as_ref());
1233        self.function_type.append_value(function_type.as_ref());
1234        self.description.append_option(description);
1235        self.syntax_example.append_option(syntax_example);
1236    }
1237
1238    fn finish(&mut self) -> RecordBatch {
1239        RecordBatch::try_new(
1240            Arc::clone(&self.schema),
1241            vec![
1242                Arc::new(self.specific_catalog.finish()),
1243                Arc::new(self.specific_schema.finish()),
1244                Arc::new(self.specific_name.finish()),
1245                Arc::new(self.routine_catalog.finish()),
1246                Arc::new(self.routine_schema.finish()),
1247                Arc::new(self.routine_name.finish()),
1248                Arc::new(self.routine_type.finish()),
1249                Arc::new(self.is_deterministic.finish()),
1250                Arc::new(self.data_type.finish()),
1251                Arc::new(self.function_type.finish()),
1252                Arc::new(self.description.finish()),
1253                Arc::new(self.syntax_example.finish()),
1254            ],
1255        )
1256        .unwrap()
1257    }
1258}
1259
impl PartitionStream for InformationSchemaRoutines {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Builds the routines table eagerly inside a single future and returns
    /// it as a one-batch stream.
    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = self.config.clone();
        let mut builder = self.builder();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(async move {
                // Collect scalar, aggregate, and window functions registered
                // on the task context into rows.
                config.make_routines(
                    ctx.scalar_functions(),
                    ctx.aggregate_functions(),
                    ctx.window_functions(),
                    ctx.session_config().options(),
                    &mut builder,
                )?;
                Ok(builder.finish())
            }),
        ))
    }
}
1283
/// Virtual `information_schema.parameters` table listing the parameters of
/// registered function signatures.
#[derive(Debug)]
struct InformationSchemaParameters {
    // Arrow schema of the rows this table produces.
    schema: SchemaRef,
    // Access to the session whose functions are listed.
    config: InformationSchemaConfig,
}
1289
1290impl InformationSchemaParameters {
1291    fn new(config: InformationSchemaConfig) -> Self {
1292        let schema = Arc::new(Schema::new(vec![
1293            Field::new("specific_catalog", DataType::Utf8, false),
1294            Field::new("specific_schema", DataType::Utf8, false),
1295            Field::new("specific_name", DataType::Utf8, false),
1296            Field::new("ordinal_position", DataType::UInt64, false),
1297            Field::new("parameter_mode", DataType::Utf8, false),
1298            Field::new("parameter_name", DataType::Utf8, true),
1299            Field::new("data_type", DataType::Utf8, false),
1300            Field::new("parameter_default", DataType::Utf8, true),
1301            Field::new("is_variadic", DataType::Boolean, false),
1302            // `rid` (short for `routine id`) is used to differentiate parameters from different signatures
1303            // (It serves as the group-by key when generating the `SHOW FUNCTIONS` query).
1304            // For example, the following signatures have different `rid` values:
1305            //     - `datetrunc(Utf8, Timestamp(Microsecond, Some("+TZ"))) -> Timestamp(Microsecond, Some("+TZ"))`
1306            //     - `datetrunc(Utf8View, Timestamp(Nanosecond, None)) -> Timestamp(Nanosecond, None)`
1307            Field::new("rid", DataType::UInt8, false),
1308        ]));
1309
1310        Self { schema, config }
1311    }
1312
1313    fn builder(&self) -> InformationSchemaParametersBuilder {
1314        InformationSchemaParametersBuilder {
1315            schema: Arc::clone(&self.schema),
1316            specific_catalog: StringBuilder::new(),
1317            specific_schema: StringBuilder::new(),
1318            specific_name: StringBuilder::new(),
1319            ordinal_position: UInt64Builder::new(),
1320            parameter_mode: StringBuilder::new(),
1321            parameter_name: StringBuilder::new(),
1322            data_type: StringBuilder::new(),
1323            parameter_default: StringBuilder::new(),
1324            is_variadic: BooleanBuilder::new(),
1325            rid: UInt8Builder::new(),
1326        }
1327    }
1328}
1329
/// Accumulates rows for `information_schema.parameters`, one column builder
/// per output column.
struct InformationSchemaParametersBuilder {
    // Schema of the batch produced by `finish`.
    schema: SchemaRef,
    specific_catalog: StringBuilder,
    specific_schema: StringBuilder,
    specific_name: StringBuilder,
    ordinal_position: UInt64Builder,
    parameter_mode: StringBuilder,
    parameter_name: StringBuilder,
    data_type: StringBuilder,
    parameter_default: StringBuilder,
    is_variadic: BooleanBuilder,
    rid: UInt8Builder,
}
1343
1344impl InformationSchemaParametersBuilder {
1345    #[expect(clippy::too_many_arguments)]
1346    fn add_parameter(
1347        &mut self,
1348        specific_catalog: impl AsRef<str>,
1349        specific_schema: impl AsRef<str>,
1350        specific_name: impl AsRef<str>,
1351        ordinal_position: u64,
1352        parameter_mode: impl AsRef<str>,
1353        parameter_name: Option<&(impl AsRef<str> + ?Sized)>,
1354        data_type: impl AsRef<str>,
1355        parameter_default: Option<impl AsRef<str>>,
1356        is_variadic: bool,
1357        rid: u8,
1358    ) {
1359        self.specific_catalog
1360            .append_value(specific_catalog.as_ref());
1361        self.specific_schema.append_value(specific_schema.as_ref());
1362        self.specific_name.append_value(specific_name.as_ref());
1363        self.ordinal_position.append_value(ordinal_position);
1364        self.parameter_mode.append_value(parameter_mode.as_ref());
1365        self.parameter_name.append_option(parameter_name.as_ref());
1366        self.data_type.append_value(data_type.as_ref());
1367        self.parameter_default.append_option(parameter_default);
1368        self.is_variadic.append_value(is_variadic);
1369        self.rid.append_value(rid);
1370    }
1371
1372    fn finish(&mut self) -> RecordBatch {
1373        RecordBatch::try_new(
1374            Arc::clone(&self.schema),
1375            vec![
1376                Arc::new(self.specific_catalog.finish()),
1377                Arc::new(self.specific_schema.finish()),
1378                Arc::new(self.specific_name.finish()),
1379                Arc::new(self.ordinal_position.finish()),
1380                Arc::new(self.parameter_mode.finish()),
1381                Arc::new(self.parameter_name.finish()),
1382                Arc::new(self.data_type.finish()),
1383                Arc::new(self.parameter_default.finish()),
1384                Arc::new(self.is_variadic.finish()),
1385                Arc::new(self.rid.finish()),
1386            ],
1387        )
1388        .unwrap()
1389    }
1390}
1391
impl PartitionStream for InformationSchemaParameters {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Builds the parameters table eagerly inside a single future and
    /// returns it as a one-batch stream.
    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = self.config.clone();
        let mut builder = self.builder();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(async move {
                // Collect parameter rows for every scalar, aggregate, and
                // window function registered on the task context.
                config.make_parameters(
                    ctx.scalar_functions(),
                    ctx.aggregate_functions(),
                    ctx.window_functions(),
                    ctx.session_config().options(),
                    &mut builder,
                )?;
                Ok(builder.finish())
            }),
        ))
    }
}
1415
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CatalogProvider;

    // Regression test: make_tables must obtain the table type via the cheap
    // `table_type` hook rather than instantiating the full TableProvider.
    #[tokio::test]
    async fn make_tables_uses_table_type() {
        let config = InformationSchemaConfig {
            catalog_list: Arc::new(Fixture),
        };
        let mut builder = InformationSchemaTablesBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            table_types: StringBuilder::new(),
            schema: Arc::new(Schema::empty()),
        };

        assert!(config.make_tables(&mut builder).await.is_ok());

        assert_eq!("BASE TABLE", builder.table_types.finish().value(0));
    }

    /// Minimal fixture acting as catalog list, catalog, and schema at once,
    /// exposing exactly one catalog, one schema, and one base table.
    #[derive(Debug)]
    struct Fixture;

    #[async_trait]
    impl SchemaProvider for Fixture {
        // InformationSchemaConfig::make_tables should use this.
        async fn table_type(&self, _: &str) -> Result<Option<TableType>> {
            Ok(Some(TableType::Base))
        }

        // InformationSchemaConfig::make_tables used this before `table_type`
        // existed but should not, as it may be expensive.
        async fn table(&self, _: &str) -> Result<Option<Arc<dyn TableProvider>>> {
            panic!(
                "InformationSchemaConfig::make_tables called SchemaProvider::table instead of table_type"
            )
        }

        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn table_names(&self) -> Vec<String> {
            vec!["atable".to_string()]
        }

        fn table_exist(&self, _: &str) -> bool {
            unimplemented!("not required for these tests")
        }
    }

    impl CatalogProviderList for Fixture {
        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn register_catalog(
            &self,
            _: String,
            _: Arc<dyn CatalogProvider>,
        ) -> Option<Arc<dyn CatalogProvider>> {
            unimplemented!("not required for these tests")
        }

        fn catalog_names(&self) -> Vec<String> {
            vec!["acatalog".to_string()]
        }

        fn catalog(&self, _: &str) -> Option<Arc<dyn CatalogProvider>> {
            Some(Arc::new(Self))
        }
    }

    impl CatalogProvider for Fixture {
        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn schema_names(&self) -> Vec<String> {
            vec!["aschema".to_string()]
        }

        fn schema(&self, _: &str) -> Option<Arc<dyn SchemaProvider>> {
            Some(Arc::new(Self))
        }
    }
}