datafusion_catalog/information_schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`InformationSchemaProvider`] that implements the SQL [Information Schema] for DataFusion.
19//!
20//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema
21
22use crate::streaming::StreamingTable;
23use crate::{CatalogProviderList, SchemaProvider, TableProvider};
24use arrow::array::builder::{BooleanBuilder, UInt8Builder};
25use arrow::{
26    array::{StringBuilder, UInt64Builder},
27    datatypes::{DataType, Field, Schema, SchemaRef},
28    record_batch::RecordBatch,
29};
30use async_trait::async_trait;
31use datafusion_common::config::{ConfigEntry, ConfigOptions};
32use datafusion_common::error::Result;
33use datafusion_common::types::NativeType;
34use datafusion_common::DataFusionError;
35use datafusion_execution::TaskContext;
36use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF};
37use datafusion_expr::{TableType, Volatility};
38use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
39use datafusion_physical_plan::streaming::PartitionStream;
40use datafusion_physical_plan::SendableRecordBatchStream;
41use std::collections::{BTreeSet, HashMap, HashSet};
42use std::fmt::Debug;
43use std::{any::Any, sync::Arc};
44
/// Name of the virtual schema that exposes catalog metadata.
pub const INFORMATION_SCHEMA: &str = "information_schema";
// Names of the individual virtual tables served by this schema.
pub(crate) const TABLES: &str = "tables";
pub(crate) const VIEWS: &str = "views";
pub(crate) const COLUMNS: &str = "columns";
// DataFusion-specific: session configuration settings.
pub(crate) const DF_SETTINGS: &str = "df_settings";
pub(crate) const SCHEMATA: &str = "schemata";
// Registered functions and their parameters.
pub(crate) const ROUTINES: &str = "routines";
pub(crate) const PARAMETERS: &str = "parameters";

/// All information schema tables
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[
    TABLES,
    VIEWS,
    COLUMNS,
    DF_SETTINGS,
    SCHEMATA,
    ROUTINES,
    PARAMETERS,
];
64
65/// Implements the `information_schema` virtual schema and tables
66///
67/// The underlying tables in the `information_schema` are created on
68/// demand. This means that if more tables are added to the underlying
69/// providers, they will appear the next time the `information_schema`
70/// table is queried.
71#[derive(Debug)]
72pub struct InformationSchemaProvider {
73    config: InformationSchemaConfig,
74}
75
76impl InformationSchemaProvider {
77    /// Creates a new [`InformationSchemaProvider`] for the provided `catalog_list`
78    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
79        Self {
80            config: InformationSchemaConfig { catalog_list },
81        }
82    }
83}
84
/// Shared state for all information schema virtual tables: the catalog
/// hierarchy they introspect when queried.
#[derive(Clone, Debug)]
struct InformationSchemaConfig {
    /// Root of the catalog/schema/table hierarchy to report on
    catalog_list: Arc<dyn CatalogProviderList>,
}
89
impl InformationSchemaConfig {
    /// Construct the `information_schema.tables` virtual table
    ///
    /// Appends one row per table in every catalog/schema (the information
    /// schema itself is appended separately at the end) to `builder`.
    async fn make_tables(
        &self,
        builder: &mut InformationSchemaTablesBuilder,
    ) -> Result<(), DataFusionError> {
        // create a mem table with the names of tables

        for catalog_name in self.catalog_list.catalog_names() {
            // The name was just returned by catalog_names(); the unwrap
            // assumes the catalog is not removed concurrently.
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    // schema name may not exist in the catalog, so we need to check
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            // table() is async and fallible; a table that has
                            // disappeared since table_names() is skipped
                            if let Some(table) = schema.table(&table_name).await? {
                                builder.add_table(
                                    &catalog_name,
                                    &schema_name,
                                    &table_name,
                                    table.table_type(),
                                );
                            }
                        }
                    }
                }
            }

            // Add a final list for the information schema tables themselves
            for table_name in INFORMATION_SCHEMA_TABLES {
                builder.add_table(
                    &catalog_name,
                    INFORMATION_SCHEMA,
                    table_name,
                    TableType::View,
                );
            }
        }

        Ok(())
    }

    /// Construct the `information_schema.schemata` virtual table: one row
    /// per (catalog, schema) pair with its owner, when one is reported.
    async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    if let Some(schema) = catalog.schema(&schema_name) {
                        let schema_owner = schema.owner_name();
                        builder.add_schemata(&catalog_name, &schema_name, schema_owner);
                    }
                }
            }
        }
    }

    /// Construct the `information_schema.views` virtual table
    ///
    /// Note: every table is visited, not only views; when
    /// `get_table_definition()` returns `None` the `definition` cell is NULL.
    async fn make_views(
        &self,
        builder: &mut InformationSchemaViewBuilder,
    ) -> Result<(), DataFusionError> {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    // schema name may not exist in the catalog, so we need to check
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table) = schema.table(&table_name).await? {
                                builder.add_view(
                                    &catalog_name,
                                    &schema_name,
                                    &table_name,
                                    table.get_table_definition(),
                                )
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Construct the `information_schema.columns` virtual table
    async fn make_columns(
        &self,
        builder: &mut InformationSchemaColumnsBuilder,
    ) -> Result<(), DataFusionError> {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    // schema name may not exist in the catalog, so we need to check
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table) = schema.table(&table_name).await? {
                                // NOTE(review): field_position is the 0-based
                                // enumerate() index, forwarded unchanged as
                                // the ordinal position.
                                for (field_position, field) in
                                    table.schema().fields().iter().enumerate()
                                {
                                    builder.add_column(
                                        &catalog_name,
                                        &schema_name,
                                        &table_name,
                                        field_position,
                                        field,
                                    )
                                }
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Construct the `information_schema.df_settings` virtual table: one row
    /// per DataFusion configuration entry.
    fn make_df_settings(
        &self,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaDfSettingsBuilder,
    ) {
        for entry in config_options.entries() {
            builder.add_setting(entry);
        }
    }

    /// Construct the `information_schema.routines` virtual table from the
    /// registered scalar, aggregate and window UDFs.
    ///
    /// Functions are reported under the session's default catalog/schema;
    /// each distinct return type of a function produces one row.
    fn make_routines(
        &self,
        udfs: &HashMap<String, Arc<ScalarUDF>>,
        udafs: &HashMap<String, Arc<AggregateUDF>>,
        udwfs: &HashMap<String, Arc<WindowUDF>>,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaRoutinesBuilder,
    ) -> Result<()> {
        let catalog_name = &config_options.catalog.default_catalog;
        let schema_name = &config_options.catalog.default_schema;

        for (name, udf) in udfs {
            // Deduplicate return types across the example signatures
            let return_types = get_udf_args_and_return_types(udf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udf.signature()),
                    return_type,
                    "SCALAR",
                    udf.documentation().map(|d| d.description.to_string()),
                    udf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }

        for (name, udaf) in udafs {
            let return_types = get_udaf_args_and_return_types(udaf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udaf.signature()),
                    return_type,
                    "AGGREGATE",
                    udaf.documentation().map(|d| d.description.to_string()),
                    udaf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }

        for (name, udwf) in udwfs {
            let return_types = get_udwf_args_and_return_types(udwf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udwf.signature()),
                    return_type,
                    "WINDOW",
                    udwf.documentation().map(|d| d.description.to_string()),
                    udwf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }
        Ok(())
    }

    /// A routine is reported deterministic iff its volatility is `Immutable`.
    fn is_deterministic(signature: &Signature) -> bool {
        signature.volatility == Volatility::Immutable
    }
    /// Construct the `information_schema.parameters` virtual table.
    ///
    /// For each signature combination (`rid`) of every function: one "IN"
    /// row per argument with a 1-based position, plus one "OUT" row for the
    /// return type when it is known.
    fn make_parameters(
        &self,
        udfs: &HashMap<String, Arc<ScalarUDF>>,
        udafs: &HashMap<String, Arc<AggregateUDF>>,
        udwfs: &HashMap<String, Arc<WindowUDF>>,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaParametersBuilder,
    ) -> Result<()> {
        let catalog_name = &config_options.catalog.default_catalog;
        let schema_name = &config_options.catalog.default_schema;
        // Shared row-emitting closure used for all three function kinds
        let mut add_parameters = |func_name: &str,
                                  args: Option<&Vec<(String, String)>>,
                                  arg_types: Vec<String>,
                                  return_type: Option<String>,
                                  is_variadic: bool,
                                  rid: u8| {
            for (position, type_name) in arg_types.iter().enumerate() {
                // Parameter name comes from the documentation, when present
                let param_name =
                    args.and_then(|a| a.get(position).map(|arg| arg.0.as_str()));
                builder.add_parameter(
                    catalog_name,
                    schema_name,
                    func_name,
                    position as u64 + 1,
                    "IN",
                    param_name,
                    type_name,
                    None::<&str>,
                    is_variadic,
                    rid,
                );
            }
            if let Some(return_type) = return_type {
                builder.add_parameter(
                    catalog_name,
                    schema_name,
                    func_name,
                    1,
                    "OUT",
                    None::<&str>,
                    return_type.as_str(),
                    None::<&str>,
                    false,
                    rid,
                );
            }
        };

        for (func_name, udf) in udfs {
            let args = udf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udf_args_and_return_types(udf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udf.signature()),
                    rid as u8,
                );
            }
        }

        for (func_name, udaf) in udafs {
            let args = udaf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udaf_args_and_return_types(udaf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udaf.signature()),
                    rid as u8,
                );
            }
        }

        for (func_name, udwf) in udwfs {
            let args = udwf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udwf_args_and_return_types(udwf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udwf.signature()),
                    rid as u8,
                );
            }
        }

        Ok(())
    }

    /// True when the signature accepts a variable number of arguments.
    fn is_variadic(signature: &Signature) -> bool {
        matches!(
            signature.type_signature,
            TypeSignature::Variadic(_) | TypeSignature::VariadicAny
        )
    }
}
402
403/// get the arguments and return types of a UDF
404/// returns a tuple of (arg_types, return_type)
405fn get_udf_args_and_return_types(
406    udf: &Arc<ScalarUDF>,
407) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
408    let signature = udf.signature();
409    let arg_types = signature.type_signature.get_example_types();
410    if arg_types.is_empty() {
411        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
412    } else {
413        Ok(arg_types
414            .into_iter()
415            .map(|arg_types| {
416                // only handle the function which implemented [`ScalarUDFImpl::return_type`] method
417                let return_type = udf
418                    .return_type(&arg_types)
419                    .map(|t| remove_native_type_prefix(NativeType::from(t)))
420                    .ok();
421                let arg_types = arg_types
422                    .into_iter()
423                    .map(|t| remove_native_type_prefix(NativeType::from(t)))
424                    .collect::<Vec<_>>();
425                (arg_types, return_type)
426            })
427            .collect::<BTreeSet<_>>())
428    }
429}
430
431fn get_udaf_args_and_return_types(
432    udaf: &Arc<AggregateUDF>,
433) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
434    let signature = udaf.signature();
435    let arg_types = signature.type_signature.get_example_types();
436    if arg_types.is_empty() {
437        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
438    } else {
439        Ok(arg_types
440            .into_iter()
441            .map(|arg_types| {
442                // only handle the function which implemented [`ScalarUDFImpl::return_type`] method
443                let return_type = udaf
444                    .return_type(&arg_types)
445                    .ok()
446                    .map(|t| remove_native_type_prefix(NativeType::from(t)));
447                let arg_types = arg_types
448                    .into_iter()
449                    .map(|t| remove_native_type_prefix(NativeType::from(t)))
450                    .collect::<Vec<_>>();
451                (arg_types, return_type)
452            })
453            .collect::<BTreeSet<_>>())
454    }
455}
456
457fn get_udwf_args_and_return_types(
458    udwf: &Arc<WindowUDF>,
459) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
460    let signature = udwf.signature();
461    let arg_types = signature.type_signature.get_example_types();
462    if arg_types.is_empty() {
463        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
464    } else {
465        Ok(arg_types
466            .into_iter()
467            .map(|arg_types| {
468                // only handle the function which implemented [`ScalarUDFImpl::return_type`] method
469                let arg_types = arg_types
470                    .into_iter()
471                    .map(|t| remove_native_type_prefix(NativeType::from(t)))
472                    .collect::<Vec<_>>();
473                (arg_types, None)
474            })
475            .collect::<BTreeSet<_>>())
476    }
477}
478
479#[inline]
480fn remove_native_type_prefix(native_type: NativeType) -> String {
481    format!("{native_type:?}")
482}
483
484#[async_trait]
485impl SchemaProvider for InformationSchemaProvider {
486    fn as_any(&self) -> &dyn Any {
487        self
488    }
489
490    fn table_names(&self) -> Vec<String> {
491        INFORMATION_SCHEMA_TABLES
492            .iter()
493            .map(|t| t.to_string())
494            .collect()
495    }
496
497    async fn table(
498        &self,
499        name: &str,
500    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
501        let config = self.config.clone();
502        let table: Arc<dyn PartitionStream> = match name.to_ascii_lowercase().as_str() {
503            TABLES => Arc::new(InformationSchemaTables::new(config)),
504            COLUMNS => Arc::new(InformationSchemaColumns::new(config)),
505            VIEWS => Arc::new(InformationSchemaViews::new(config)),
506            DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)),
507            SCHEMATA => Arc::new(InformationSchemata::new(config)),
508            ROUTINES => Arc::new(InformationSchemaRoutines::new(config)),
509            PARAMETERS => Arc::new(InformationSchemaParameters::new(config)),
510            _ => return Ok(None),
511        };
512
513        Ok(Some(Arc::new(
514            StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
515        )))
516    }
517
518    fn table_exist(&self, name: &str) -> bool {
519        INFORMATION_SCHEMA_TABLES.contains(&name.to_ascii_lowercase().as_str())
520    }
521}
522
/// Serves `information_schema.tables` as a single-partition stream.
#[derive(Debug)]
struct InformationSchemaTables {
    /// Fixed Arrow schema of the virtual table
    schema: SchemaRef,
    /// Catalog access used to populate rows at execution time
    config: InformationSchemaConfig,
}
528
529impl InformationSchemaTables {
530    fn new(config: InformationSchemaConfig) -> Self {
531        let schema = Arc::new(Schema::new(vec![
532            Field::new("table_catalog", DataType::Utf8, false),
533            Field::new("table_schema", DataType::Utf8, false),
534            Field::new("table_name", DataType::Utf8, false),
535            Field::new("table_type", DataType::Utf8, false),
536        ]));
537
538        Self { schema, config }
539    }
540
541    fn builder(&self) -> InformationSchemaTablesBuilder {
542        InformationSchemaTablesBuilder {
543            catalog_names: StringBuilder::new(),
544            schema_names: StringBuilder::new(),
545            table_names: StringBuilder::new(),
546            table_types: StringBuilder::new(),
547            schema: Arc::clone(&self.schema),
548        }
549    }
550}
551
impl PartitionStream for InformationSchemaTables {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        // Capture an owned builder and config so the async block below is
        // self-contained and can be moved into the stream.
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            // The whole table is materialized into one RecordBatch when the
            // stream is first polled.
            futures::stream::once(async move {
                config.make_tables(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}
570
/// Builds the `information_schema.TABLE` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
struct InformationSchemaTablesBuilder {
    /// Output schema; shared with [`InformationSchemaTables`]
    schema: SchemaRef,
    /// One column builder per output column; rows are appended in lock-step
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    table_types: StringBuilder,
}
581
582impl InformationSchemaTablesBuilder {
583    fn add_table(
584        &mut self,
585        catalog_name: impl AsRef<str>,
586        schema_name: impl AsRef<str>,
587        table_name: impl AsRef<str>,
588        table_type: TableType,
589    ) {
590        // Note: append_value is actually infallible.
591        self.catalog_names.append_value(catalog_name.as_ref());
592        self.schema_names.append_value(schema_name.as_ref());
593        self.table_names.append_value(table_name.as_ref());
594        self.table_types.append_value(match table_type {
595            TableType::Base => "BASE TABLE",
596            TableType::View => "VIEW",
597            TableType::Temporary => "LOCAL TEMPORARY",
598        });
599    }
600
601    fn finish(&mut self) -> RecordBatch {
602        RecordBatch::try_new(
603            Arc::clone(&self.schema),
604            vec![
605                Arc::new(self.catalog_names.finish()),
606                Arc::new(self.schema_names.finish()),
607                Arc::new(self.table_names.finish()),
608                Arc::new(self.table_types.finish()),
609            ],
610        )
611        .unwrap()
612    }
613}
614
/// Serves `information_schema.views` as a single-partition stream.
#[derive(Debug)]
struct InformationSchemaViews {
    /// Fixed Arrow schema of the virtual table
    schema: SchemaRef,
    /// Catalog access used to populate rows at execution time
    config: InformationSchemaConfig,
}
620
621impl InformationSchemaViews {
622    fn new(config: InformationSchemaConfig) -> Self {
623        let schema = Arc::new(Schema::new(vec![
624            Field::new("table_catalog", DataType::Utf8, false),
625            Field::new("table_schema", DataType::Utf8, false),
626            Field::new("table_name", DataType::Utf8, false),
627            Field::new("definition", DataType::Utf8, true),
628        ]));
629
630        Self { schema, config }
631    }
632
633    fn builder(&self) -> InformationSchemaViewBuilder {
634        InformationSchemaViewBuilder {
635            catalog_names: StringBuilder::new(),
636            schema_names: StringBuilder::new(),
637            table_names: StringBuilder::new(),
638            definitions: StringBuilder::new(),
639            schema: Arc::clone(&self.schema),
640        }
641    }
642}
643
impl PartitionStream for InformationSchemaViews {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        // Capture an owned builder and config so the async block below is
        // self-contained and can be moved into the stream.
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            // The whole table is materialized into one RecordBatch when the
            // stream is first polled.
            futures::stream::once(async move {
                config.make_views(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}
662
/// Builds the `information_schema.VIEWS` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
struct InformationSchemaViewBuilder {
    /// Output schema; shared with [`InformationSchemaViews`]
    schema: SchemaRef,
    /// One column builder per output column; rows are appended in lock-step
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    definitions: StringBuilder,
}
673
674impl InformationSchemaViewBuilder {
675    fn add_view(
676        &mut self,
677        catalog_name: impl AsRef<str>,
678        schema_name: impl AsRef<str>,
679        table_name: impl AsRef<str>,
680        definition: Option<impl AsRef<str>>,
681    ) {
682        // Note: append_value is actually infallible.
683        self.catalog_names.append_value(catalog_name.as_ref());
684        self.schema_names.append_value(schema_name.as_ref());
685        self.table_names.append_value(table_name.as_ref());
686        self.definitions.append_option(definition.as_ref());
687    }
688
689    fn finish(&mut self) -> RecordBatch {
690        RecordBatch::try_new(
691            Arc::clone(&self.schema),
692            vec![
693                Arc::new(self.catalog_names.finish()),
694                Arc::new(self.schema_names.finish()),
695                Arc::new(self.table_names.finish()),
696                Arc::new(self.definitions.finish()),
697            ],
698        )
699        .unwrap()
700    }
701}
702
/// Serves `information_schema.columns` as a single-partition stream.
#[derive(Debug)]
struct InformationSchemaColumns {
    /// Fixed Arrow schema of the virtual table
    schema: SchemaRef,
    /// Catalog access used to populate rows at execution time
    config: InformationSchemaConfig,
}
708
709impl InformationSchemaColumns {
710    fn new(config: InformationSchemaConfig) -> Self {
711        let schema = Arc::new(Schema::new(vec![
712            Field::new("table_catalog", DataType::Utf8, false),
713            Field::new("table_schema", DataType::Utf8, false),
714            Field::new("table_name", DataType::Utf8, false),
715            Field::new("column_name", DataType::Utf8, false),
716            Field::new("ordinal_position", DataType::UInt64, false),
717            Field::new("column_default", DataType::Utf8, true),
718            Field::new("is_nullable", DataType::Utf8, false),
719            Field::new("data_type", DataType::Utf8, false),
720            Field::new("character_maximum_length", DataType::UInt64, true),
721            Field::new("character_octet_length", DataType::UInt64, true),
722            Field::new("numeric_precision", DataType::UInt64, true),
723            Field::new("numeric_precision_radix", DataType::UInt64, true),
724            Field::new("numeric_scale", DataType::UInt64, true),
725            Field::new("datetime_precision", DataType::UInt64, true),
726            Field::new("interval_type", DataType::Utf8, true),
727        ]));
728
729        Self { schema, config }
730    }
731
732    fn builder(&self) -> InformationSchemaColumnsBuilder {
733        // StringBuilder requires providing an initial capacity, so
734        // pick 10 here arbitrarily as this is not performance
735        // critical code and the number of tables is unavailable here.
736        let default_capacity = 10;
737
738        InformationSchemaColumnsBuilder {
739            catalog_names: StringBuilder::new(),
740            schema_names: StringBuilder::new(),
741            table_names: StringBuilder::new(),
742            column_names: StringBuilder::new(),
743            ordinal_positions: UInt64Builder::with_capacity(default_capacity),
744            column_defaults: StringBuilder::new(),
745            is_nullables: StringBuilder::new(),
746            data_types: StringBuilder::new(),
747            character_maximum_lengths: UInt64Builder::with_capacity(default_capacity),
748            character_octet_lengths: UInt64Builder::with_capacity(default_capacity),
749            numeric_precisions: UInt64Builder::with_capacity(default_capacity),
750            numeric_precision_radixes: UInt64Builder::with_capacity(default_capacity),
751            numeric_scales: UInt64Builder::with_capacity(default_capacity),
752            datetime_precisions: UInt64Builder::with_capacity(default_capacity),
753            interval_types: StringBuilder::new(),
754            schema: Arc::clone(&self.schema),
755        }
756    }
757}
758
impl PartitionStream for InformationSchemaColumns {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        // Capture an owned builder and config so the async block below is
        // self-contained and can be moved into the stream.
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            // The whole table is materialized into one RecordBatch when the
            // stream is first polled.
            futures::stream::once(async move {
                config.make_columns(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}
777
/// Builds the `information_schema.COLUMNS` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
struct InformationSchemaColumnsBuilder {
    /// Output schema; shared with [`InformationSchemaColumns`]
    schema: SchemaRef,
    /// One column builder per output column; rows are appended in lock-step
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    column_names: StringBuilder,
    ordinal_positions: UInt64Builder,
    column_defaults: StringBuilder,
    is_nullables: StringBuilder,
    data_types: StringBuilder,
    character_maximum_lengths: UInt64Builder,
    character_octet_lengths: UInt64Builder,
    numeric_precisions: UInt64Builder,
    numeric_precision_radixes: UInt64Builder,
    numeric_scales: UInt64Builder,
    datetime_precisions: UInt64Builder,
    interval_types: StringBuilder,
}
799
800impl InformationSchemaColumnsBuilder {
801    fn add_column(
802        &mut self,
803        catalog_name: &str,
804        schema_name: &str,
805        table_name: &str,
806        field_position: usize,
807        field: &Field,
808    ) {
809        use DataType::*;
810
811        // Note: append_value is actually infallable.
812        self.catalog_names.append_value(catalog_name);
813        self.schema_names.append_value(schema_name);
814        self.table_names.append_value(table_name);
815
816        self.column_names.append_value(field.name());
817
818        self.ordinal_positions.append_value(field_position as u64);
819
820        // DataFusion does not support column default values, so null
821        self.column_defaults.append_null();
822
823        // "YES if the column is possibly nullable, NO if it is known not nullable. "
824        let nullable_str = if field.is_nullable() { "YES" } else { "NO" };
825        self.is_nullables.append_value(nullable_str);
826
827        // "System supplied type" --> Use debug format of the datatype
828        self.data_types
829            .append_value(format!("{:?}", field.data_type()));
830
831        // "If data_type identifies a character or bit string type, the
832        // declared maximum length; null for all other data types or
833        // if no maximum length was declared."
834        //
835        // Arrow has no equivalent of VARCHAR(20), so we leave this as Null
836        let max_chars = None;
837        self.character_maximum_lengths.append_option(max_chars);
838
839        // "Maximum length, in bytes, for binary data, character data,
840        // or text and image data."
841        let char_len: Option<u64> = match field.data_type() {
842            Utf8 | Binary => Some(i32::MAX as u64),
843            LargeBinary | LargeUtf8 => Some(i64::MAX as u64),
844            _ => None,
845        };
846        self.character_octet_lengths.append_option(char_len);
847
848        // numeric_precision: "If data_type identifies a numeric type, this column
849        // contains the (declared or implicit) precision of the type
850        // for this column. The precision indicates the number of
851        // significant digits. It can be expressed in decimal (base
852        // 10) or binary (base 2) terms, as specified in the column
853        // numeric_precision_radix. For all other data types, this
854        // column is null."
855        //
856        // numeric_radix: If data_type identifies a numeric type, this
857        // column indicates in which base the values in the columns
858        // numeric_precision and numeric_scale are expressed. The
859        // value is either 2 or 10. For all other data types, this
860        // column is null.
861        //
862        // numeric_scale: If data_type identifies an exact numeric
863        // type, this column contains the (declared or implicit) scale
864        // of the type for this column. The scale indicates the number
865        // of significant digits to the right of the decimal point. It
866        // can be expressed in decimal (base 10) or binary (base 2)
867        // terms, as specified in the column
868        // numeric_precision_radix. For all other data types, this
869        // column is null.
870        let (numeric_precision, numeric_radix, numeric_scale) = match field.data_type() {
871            Int8 | UInt8 => (Some(8), Some(2), None),
872            Int16 | UInt16 => (Some(16), Some(2), None),
873            Int32 | UInt32 => (Some(32), Some(2), None),
874            // From max value of 65504 as explained on
875            // https://en.wikipedia.org/wiki/Half-precision_floating-point_format#Exponent_encoding
876            Float16 => (Some(15), Some(2), None),
877            // Numbers from postgres `real` type
878            Float32 => (Some(24), Some(2), None),
879            // Numbers from postgres `double` type
880            Float64 => (Some(24), Some(2), None),
881            Decimal128(precision, scale) => {
882                (Some(*precision as u64), Some(10), Some(*scale as u64))
883            }
884            _ => (None, None, None),
885        };
886
887        self.numeric_precisions.append_option(numeric_precision);
888        self.numeric_precision_radixes.append_option(numeric_radix);
889        self.numeric_scales.append_option(numeric_scale);
890
891        self.datetime_precisions.append_option(None);
892        self.interval_types.append_null();
893    }
894
895    fn finish(&mut self) -> RecordBatch {
896        RecordBatch::try_new(
897            Arc::clone(&self.schema),
898            vec![
899                Arc::new(self.catalog_names.finish()),
900                Arc::new(self.schema_names.finish()),
901                Arc::new(self.table_names.finish()),
902                Arc::new(self.column_names.finish()),
903                Arc::new(self.ordinal_positions.finish()),
904                Arc::new(self.column_defaults.finish()),
905                Arc::new(self.is_nullables.finish()),
906                Arc::new(self.data_types.finish()),
907                Arc::new(self.character_maximum_lengths.finish()),
908                Arc::new(self.character_octet_lengths.finish()),
909                Arc::new(self.numeric_precisions.finish()),
910                Arc::new(self.numeric_precision_radixes.finish()),
911                Arc::new(self.numeric_scales.finish()),
912                Arc::new(self.datetime_precisions.finish()),
913                Arc::new(self.interval_types.finish()),
914            ],
915        )
916        .unwrap()
917    }
918}
919
/// Streams the `information_schema.schemata` table.
#[derive(Debug)]
struct InformationSchemata {
    /// Fixed output schema of the schemata table
    schema: SchemaRef,
    /// Shared catalog/session state used to populate the rows
    config: InformationSchemaConfig,
}
925
926impl InformationSchemata {
927    fn new(config: InformationSchemaConfig) -> Self {
928        let schema = Arc::new(Schema::new(vec![
929            Field::new("catalog_name", DataType::Utf8, false),
930            Field::new("schema_name", DataType::Utf8, false),
931            Field::new("schema_owner", DataType::Utf8, true),
932            Field::new("default_character_set_catalog", DataType::Utf8, true),
933            Field::new("default_character_set_schema", DataType::Utf8, true),
934            Field::new("default_character_set_name", DataType::Utf8, true),
935            Field::new("sql_path", DataType::Utf8, true),
936        ]));
937        Self { schema, config }
938    }
939
940    fn builder(&self) -> InformationSchemataBuilder {
941        InformationSchemataBuilder {
942            schema: Arc::clone(&self.schema),
943            catalog_name: StringBuilder::new(),
944            schema_name: StringBuilder::new(),
945            schema_owner: StringBuilder::new(),
946            default_character_set_catalog: StringBuilder::new(),
947            default_character_set_schema: StringBuilder::new(),
948            default_character_set_name: StringBuilder::new(),
949            sql_path: StringBuilder::new(),
950        }
951    }
952}
953
/// Builds the `information_schema.schemata` table row by row
struct InformationSchemataBuilder {
    /// Output schema of the `RecordBatch` produced by `finish`
    schema: SchemaRef,
    /// `catalog_name` column values
    catalog_name: StringBuilder,
    /// `schema_name` column values
    schema_name: StringBuilder,
    /// `schema_owner` column values (nullable)
    schema_owner: StringBuilder,
    /// Always null — character-set feature not implemented in DataFusion
    default_character_set_catalog: StringBuilder,
    /// Always null — character-set feature not implemented in DataFusion
    default_character_set_schema: StringBuilder,
    /// Always null — character-set feature not implemented in DataFusion
    default_character_set_name: StringBuilder,
    /// Always null — not implemented in DataFusion
    sql_path: StringBuilder,
}
964
965impl InformationSchemataBuilder {
966    fn add_schemata(
967        &mut self,
968        catalog_name: &str,
969        schema_name: &str,
970        schema_owner: Option<&str>,
971    ) {
972        self.catalog_name.append_value(catalog_name);
973        self.schema_name.append_value(schema_name);
974        match schema_owner {
975            Some(owner) => self.schema_owner.append_value(owner),
976            None => self.schema_owner.append_null(),
977        }
978        // refer to https://www.postgresql.org/docs/current/infoschema-schemata.html,
979        // these rows apply to a feature that is not implemented in DataFusion
980        self.default_character_set_catalog.append_null();
981        self.default_character_set_schema.append_null();
982        self.default_character_set_name.append_null();
983        self.sql_path.append_null();
984    }
985
986    fn finish(&mut self) -> RecordBatch {
987        RecordBatch::try_new(
988            Arc::clone(&self.schema),
989            vec![
990                Arc::new(self.catalog_name.finish()),
991                Arc::new(self.schema_name.finish()),
992                Arc::new(self.schema_owner.finish()),
993                Arc::new(self.default_character_set_catalog.finish()),
994                Arc::new(self.default_character_set_schema.finish()),
995                Arc::new(self.default_character_set_name.finish()),
996                Arc::new(self.sql_path.finish()),
997            ],
998        )
999        .unwrap()
1000    }
1001}
1002
1003impl PartitionStream for InformationSchemata {
1004    fn schema(&self) -> &SchemaRef {
1005        &self.schema
1006    }
1007
1008    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1009        let mut builder = self.builder();
1010        let config = self.config.clone();
1011        Box::pin(RecordBatchStreamAdapter::new(
1012            Arc::clone(&self.schema),
1013            // TODO: Stream this
1014            futures::stream::once(async move {
1015                config.make_schemata(&mut builder).await;
1016                Ok(builder.finish())
1017            }),
1018        ))
1019    }
1020}
1021
/// Streams the `information_schema.df_settings` table (DataFusion
/// configuration entries).
#[derive(Debug)]
struct InformationSchemaDfSettings {
    /// Fixed output schema: name, value, description
    schema: SchemaRef,
    /// Shared catalog/session state used to populate the rows
    config: InformationSchemaConfig,
}
1027
1028impl InformationSchemaDfSettings {
1029    fn new(config: InformationSchemaConfig) -> Self {
1030        let schema = Arc::new(Schema::new(vec![
1031            Field::new("name", DataType::Utf8, false),
1032            Field::new("value", DataType::Utf8, true),
1033            Field::new("description", DataType::Utf8, true),
1034        ]));
1035
1036        Self { schema, config }
1037    }
1038
1039    fn builder(&self) -> InformationSchemaDfSettingsBuilder {
1040        InformationSchemaDfSettingsBuilder {
1041            names: StringBuilder::new(),
1042            values: StringBuilder::new(),
1043            descriptions: StringBuilder::new(),
1044            schema: Arc::clone(&self.schema),
1045        }
1046    }
1047}
1048
1049impl PartitionStream for InformationSchemaDfSettings {
1050    fn schema(&self) -> &SchemaRef {
1051        &self.schema
1052    }
1053
1054    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1055        let config = self.config.clone();
1056        let mut builder = self.builder();
1057        Box::pin(RecordBatchStreamAdapter::new(
1058            Arc::clone(&self.schema),
1059            // TODO: Stream this
1060            futures::stream::once(async move {
1061                // create a mem table with the names of tables
1062                config.make_df_settings(ctx.session_config().options(), &mut builder);
1063                Ok(builder.finish())
1064            }),
1065        ))
1066    }
1067}
1068
/// Builds the `information_schema.df_settings` table row by row
struct InformationSchemaDfSettingsBuilder {
    /// Output schema of the `RecordBatch` produced by `finish`
    schema: SchemaRef,
    /// `name` column values (config keys)
    names: StringBuilder,
    /// `value` column values (nullable — unset options are null)
    values: StringBuilder,
    /// `description` column values
    descriptions: StringBuilder,
}
1075
1076impl InformationSchemaDfSettingsBuilder {
1077    fn add_setting(&mut self, entry: ConfigEntry) {
1078        self.names.append_value(entry.key);
1079        self.values.append_option(entry.value);
1080        self.descriptions.append_value(entry.description);
1081    }
1082
1083    fn finish(&mut self) -> RecordBatch {
1084        RecordBatch::try_new(
1085            Arc::clone(&self.schema),
1086            vec![
1087                Arc::new(self.names.finish()),
1088                Arc::new(self.values.finish()),
1089                Arc::new(self.descriptions.finish()),
1090            ],
1091        )
1092        .unwrap()
1093    }
1094}
1095
/// Streams the `information_schema.routines` table (registered scalar,
/// aggregate, and window functions).
#[derive(Debug)]
struct InformationSchemaRoutines {
    /// Fixed output schema of the routines table
    schema: SchemaRef,
    /// Shared catalog/session state used to populate the rows
    config: InformationSchemaConfig,
}
1101
1102impl InformationSchemaRoutines {
1103    fn new(config: InformationSchemaConfig) -> Self {
1104        let schema = Arc::new(Schema::new(vec![
1105            Field::new("specific_catalog", DataType::Utf8, false),
1106            Field::new("specific_schema", DataType::Utf8, false),
1107            Field::new("specific_name", DataType::Utf8, false),
1108            Field::new("routine_catalog", DataType::Utf8, false),
1109            Field::new("routine_schema", DataType::Utf8, false),
1110            Field::new("routine_name", DataType::Utf8, false),
1111            Field::new("routine_type", DataType::Utf8, false),
1112            Field::new("is_deterministic", DataType::Boolean, true),
1113            Field::new("data_type", DataType::Utf8, true),
1114            Field::new("function_type", DataType::Utf8, true),
1115            Field::new("description", DataType::Utf8, true),
1116            Field::new("syntax_example", DataType::Utf8, true),
1117        ]));
1118
1119        Self { schema, config }
1120    }
1121
1122    fn builder(&self) -> InformationSchemaRoutinesBuilder {
1123        InformationSchemaRoutinesBuilder {
1124            schema: Arc::clone(&self.schema),
1125            specific_catalog: StringBuilder::new(),
1126            specific_schema: StringBuilder::new(),
1127            specific_name: StringBuilder::new(),
1128            routine_catalog: StringBuilder::new(),
1129            routine_schema: StringBuilder::new(),
1130            routine_name: StringBuilder::new(),
1131            routine_type: StringBuilder::new(),
1132            is_deterministic: BooleanBuilder::new(),
1133            data_type: StringBuilder::new(),
1134            function_type: StringBuilder::new(),
1135            description: StringBuilder::new(),
1136            syntax_example: StringBuilder::new(),
1137        }
1138    }
1139}
1140
/// Builds the `information_schema.routines` table row by row
struct InformationSchemaRoutinesBuilder {
    /// Output schema of the `RecordBatch` produced by `finish`
    schema: SchemaRef,
    /// `specific_catalog` — mirrors `routine_catalog` in `add_routine`
    specific_catalog: StringBuilder,
    /// `specific_schema` — mirrors `routine_schema` in `add_routine`
    specific_schema: StringBuilder,
    /// `specific_name` — mirrors `routine_name` in `add_routine`
    specific_name: StringBuilder,
    /// `routine_catalog` column values
    routine_catalog: StringBuilder,
    /// `routine_schema` column values
    routine_schema: StringBuilder,
    /// `routine_name` column values
    routine_name: StringBuilder,
    /// `routine_type` column values
    routine_type: StringBuilder,
    /// `is_deterministic` column values
    is_deterministic: BooleanBuilder,
    /// `data_type` (return type) column values, nullable
    data_type: StringBuilder,
    /// `function_type` column values
    function_type: StringBuilder,
    /// `description` column values, nullable
    description: StringBuilder,
    /// `syntax_example` column values, nullable
    syntax_example: StringBuilder,
}
1156
1157impl InformationSchemaRoutinesBuilder {
1158    #[allow(clippy::too_many_arguments)]
1159    fn add_routine(
1160        &mut self,
1161        catalog_name: impl AsRef<str>,
1162        schema_name: impl AsRef<str>,
1163        routine_name: impl AsRef<str>,
1164        routine_type: impl AsRef<str>,
1165        is_deterministic: bool,
1166        data_type: Option<impl AsRef<str>>,
1167        function_type: impl AsRef<str>,
1168        description: Option<impl AsRef<str>>,
1169        syntax_example: Option<impl AsRef<str>>,
1170    ) {
1171        self.specific_catalog.append_value(catalog_name.as_ref());
1172        self.specific_schema.append_value(schema_name.as_ref());
1173        self.specific_name.append_value(routine_name.as_ref());
1174        self.routine_catalog.append_value(catalog_name.as_ref());
1175        self.routine_schema.append_value(schema_name.as_ref());
1176        self.routine_name.append_value(routine_name.as_ref());
1177        self.routine_type.append_value(routine_type.as_ref());
1178        self.is_deterministic.append_value(is_deterministic);
1179        self.data_type.append_option(data_type.as_ref());
1180        self.function_type.append_value(function_type.as_ref());
1181        self.description.append_option(description);
1182        self.syntax_example.append_option(syntax_example);
1183    }
1184
1185    fn finish(&mut self) -> RecordBatch {
1186        RecordBatch::try_new(
1187            Arc::clone(&self.schema),
1188            vec![
1189                Arc::new(self.specific_catalog.finish()),
1190                Arc::new(self.specific_schema.finish()),
1191                Arc::new(self.specific_name.finish()),
1192                Arc::new(self.routine_catalog.finish()),
1193                Arc::new(self.routine_schema.finish()),
1194                Arc::new(self.routine_name.finish()),
1195                Arc::new(self.routine_type.finish()),
1196                Arc::new(self.is_deterministic.finish()),
1197                Arc::new(self.data_type.finish()),
1198                Arc::new(self.function_type.finish()),
1199                Arc::new(self.description.finish()),
1200                Arc::new(self.syntax_example.finish()),
1201            ],
1202        )
1203        .unwrap()
1204    }
1205}
1206
1207impl PartitionStream for InformationSchemaRoutines {
1208    fn schema(&self) -> &SchemaRef {
1209        &self.schema
1210    }
1211
1212    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1213        let config = self.config.clone();
1214        let mut builder = self.builder();
1215        Box::pin(RecordBatchStreamAdapter::new(
1216            Arc::clone(&self.schema),
1217            futures::stream::once(async move {
1218                config.make_routines(
1219                    ctx.scalar_functions(),
1220                    ctx.aggregate_functions(),
1221                    ctx.window_functions(),
1222                    ctx.session_config().options(),
1223                    &mut builder,
1224                )?;
1225                Ok(builder.finish())
1226            }),
1227        ))
1228    }
1229}
1230
/// Streams the `information_schema.parameters` table (one row per
/// function parameter, per signature).
#[derive(Debug)]
struct InformationSchemaParameters {
    /// Fixed output schema of the parameters table
    schema: SchemaRef,
    /// Shared catalog/session state used to populate the rows
    config: InformationSchemaConfig,
}
1236
1237impl InformationSchemaParameters {
1238    fn new(config: InformationSchemaConfig) -> Self {
1239        let schema = Arc::new(Schema::new(vec![
1240            Field::new("specific_catalog", DataType::Utf8, false),
1241            Field::new("specific_schema", DataType::Utf8, false),
1242            Field::new("specific_name", DataType::Utf8, false),
1243            Field::new("ordinal_position", DataType::UInt64, false),
1244            Field::new("parameter_mode", DataType::Utf8, false),
1245            Field::new("parameter_name", DataType::Utf8, true),
1246            Field::new("data_type", DataType::Utf8, false),
1247            Field::new("parameter_default", DataType::Utf8, true),
1248            Field::new("is_variadic", DataType::Boolean, false),
1249            // `rid` (short for `routine id`) is used to differentiate parameters from different signatures
1250            // (It serves as the group-by key when generating the `SHOW FUNCTIONS` query).
1251            // For example, the following signatures have different `rid` values:
1252            //     - `datetrunc(Utf8, Timestamp(Microsecond, Some("+TZ"))) -> Timestamp(Microsecond, Some("+TZ"))`
1253            //     - `datetrunc(Utf8View, Timestamp(Nanosecond, None)) -> Timestamp(Nanosecond, None)`
1254            Field::new("rid", DataType::UInt8, false),
1255        ]));
1256
1257        Self { schema, config }
1258    }
1259
1260    fn builder(&self) -> InformationSchemaParametersBuilder {
1261        InformationSchemaParametersBuilder {
1262            schema: Arc::clone(&self.schema),
1263            specific_catalog: StringBuilder::new(),
1264            specific_schema: StringBuilder::new(),
1265            specific_name: StringBuilder::new(),
1266            ordinal_position: UInt64Builder::new(),
1267            parameter_mode: StringBuilder::new(),
1268            parameter_name: StringBuilder::new(),
1269            data_type: StringBuilder::new(),
1270            parameter_default: StringBuilder::new(),
1271            is_variadic: BooleanBuilder::new(),
1272            rid: UInt8Builder::new(),
1273        }
1274    }
1275}
1276
/// Builds the `information_schema.parameters` table row by row
struct InformationSchemaParametersBuilder {
    /// Output schema of the `RecordBatch` produced by `finish`
    schema: SchemaRef,
    /// `specific_catalog` column values
    specific_catalog: StringBuilder,
    /// `specific_schema` column values
    specific_schema: StringBuilder,
    /// `specific_name` column values
    specific_name: StringBuilder,
    /// `ordinal_position` column values
    ordinal_position: UInt64Builder,
    /// `parameter_mode` column values
    parameter_mode: StringBuilder,
    /// `parameter_name` column values, nullable
    parameter_name: StringBuilder,
    /// `data_type` column values
    data_type: StringBuilder,
    /// `parameter_default` column values, nullable
    parameter_default: StringBuilder,
    /// `is_variadic` column values
    is_variadic: BooleanBuilder,
    /// Routine-signature discriminator (see schema comment on `rid`)
    rid: UInt8Builder,
}
1290
1291impl InformationSchemaParametersBuilder {
1292    #[allow(clippy::too_many_arguments)]
1293    fn add_parameter(
1294        &mut self,
1295        specific_catalog: impl AsRef<str>,
1296        specific_schema: impl AsRef<str>,
1297        specific_name: impl AsRef<str>,
1298        ordinal_position: u64,
1299        parameter_mode: impl AsRef<str>,
1300        parameter_name: Option<impl AsRef<str>>,
1301        data_type: impl AsRef<str>,
1302        parameter_default: Option<impl AsRef<str>>,
1303        is_variadic: bool,
1304        rid: u8,
1305    ) {
1306        self.specific_catalog
1307            .append_value(specific_catalog.as_ref());
1308        self.specific_schema.append_value(specific_schema.as_ref());
1309        self.specific_name.append_value(specific_name.as_ref());
1310        self.ordinal_position.append_value(ordinal_position);
1311        self.parameter_mode.append_value(parameter_mode.as_ref());
1312        self.parameter_name.append_option(parameter_name.as_ref());
1313        self.data_type.append_value(data_type.as_ref());
1314        self.parameter_default.append_option(parameter_default);
1315        self.is_variadic.append_value(is_variadic);
1316        self.rid.append_value(rid);
1317    }
1318
1319    fn finish(&mut self) -> RecordBatch {
1320        RecordBatch::try_new(
1321            Arc::clone(&self.schema),
1322            vec![
1323                Arc::new(self.specific_catalog.finish()),
1324                Arc::new(self.specific_schema.finish()),
1325                Arc::new(self.specific_name.finish()),
1326                Arc::new(self.ordinal_position.finish()),
1327                Arc::new(self.parameter_mode.finish()),
1328                Arc::new(self.parameter_name.finish()),
1329                Arc::new(self.data_type.finish()),
1330                Arc::new(self.parameter_default.finish()),
1331                Arc::new(self.is_variadic.finish()),
1332                Arc::new(self.rid.finish()),
1333            ],
1334        )
1335        .unwrap()
1336    }
1337}
1338
1339impl PartitionStream for InformationSchemaParameters {
1340    fn schema(&self) -> &SchemaRef {
1341        &self.schema
1342    }
1343
1344    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1345        let config = self.config.clone();
1346        let mut builder = self.builder();
1347        Box::pin(RecordBatchStreamAdapter::new(
1348            Arc::clone(&self.schema),
1349            futures::stream::once(async move {
1350                config.make_parameters(
1351                    ctx.scalar_functions(),
1352                    ctx.aggregate_functions(),
1353                    ctx.window_functions(),
1354                    ctx.session_config().options(),
1355                    &mut builder,
1356                )?;
1357                Ok(builder.finish())
1358            }),
1359        ))
1360    }
1361}