use crate::streaming::StreamingTable;
use crate::{CatalogProviderList, SchemaProvider, TableProvider};
use arrow::array::builder::{BooleanBuilder, UInt8Builder};
use arrow::{
    array::{StringBuilder, UInt64Builder},
    datatypes::{DataType, Field, Schema, SchemaRef},
    record_batch::RecordBatch,
};
use async_trait::async_trait;
use datafusion_common::DataFusionError;
use datafusion_common::config::{ConfigEntry, ConfigOptions};
use datafusion_common::error::Result;
use datafusion_common::types::NativeType;
use datafusion_execution::TaskContext;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF};
use datafusion_expr::{TableType, Volatility};
use datafusion_physical_plan::SendableRecordBatchStream;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::streaming::PartitionStream;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::Debug;
use std::{any::Any, sync::Arc};

pub const INFORMATION_SCHEMA: &str = "information_schema";
pub(crate) const TABLES: &str = "tables";
pub(crate) const VIEWS: &str = "views";
pub(crate) const COLUMNS: &str = "columns";
pub(crate) const DF_SETTINGS: &str = "df_settings";
pub(crate) const SCHEMATA: &str = "schemata";
pub(crate) const ROUTINES: &str = "routines";
pub(crate) const PARAMETERS: &str = "parameters";

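/// The names of all tables exposed under the `information_schema` schema.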
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[
    TABLES,
    VIEWS,
    COLUMNS,
    DF_SETTINGS,
    SCHEMATA,
    ROUTINES,
    PARAMETERS,
];

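/// Implements the SQL `information_schema` virtual schema and its tables.
///
/// The tables are rebuilt from the wrapped [`CatalogProviderList`] on every
/// query, so catalogs, schemas, and tables registered later still show up.
/// For example (a sketch; assumes the session has the information schema
/// enabled):
///
/// ```sql
/// SELECT table_catalog, table_schema, table_name, table_type
/// FROM information_schema.tables;
/// ```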
#[derive(Debug)]
pub struct InformationSchemaProvider {
    config: InformationSchemaConfig,
}

impl InformationSchemaProvider {
    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
        Self {
            config: InformationSchemaConfig { catalog_list },
        }
    }
}

#[derive(Clone, Debug)]
struct InformationSchemaConfig {
    catalog_list: Arc<dyn CatalogProviderList>,
}

impl InformationSchemaConfig {
    async fn make_tables(
        &self,
        builder: &mut InformationSchemaTablesBuilder,
    ) -> Result<(), DataFusionError> {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    // The schema may have been dropped since schema_names()
                    // was collected, so check it still resolves.
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table_type) =
                                schema.table_type(&table_name).await?
                            {
                                builder.add_table(
                                    &catalog_name,
                                    &schema_name,
                                    &table_name,
                                    table_type,
                                );
                            }
                        }
                    }
                }
            }

            // Add a row for each information_schema table in this catalog.
            for table_name in INFORMATION_SCHEMA_TABLES {
                builder.add_table(
                    &catalog_name,
                    INFORMATION_SCHEMA,
                    table_name,
                    TableType::View,
                );
            }
        }

        Ok(())
    }

    async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA
                    && let Some(schema) = catalog.schema(&schema_name)
                {
                    let schema_owner = schema.owner_name();
                    builder.add_schemata(&catalog_name, &schema_name, schema_owner);
                }
            }
        }
    }

    async fn make_views(
        &self,
        builder: &mut InformationSchemaViewBuilder,
    ) -> Result<(), DataFusionError> {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table) = schema.table(&table_name).await? {
                                builder.add_view(
                                    &catalog_name,
                                    &schema_name,
                                    &table_name,
                                    table.get_table_definition(),
                                )
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    async fn make_columns(
        &self,
        builder: &mut InformationSchemaColumnsBuilder,
    ) -> Result<(), DataFusionError> {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table) = schema.table(&table_name).await? {
                                for (field_position, field) in
                                    table.schema().fields().iter().enumerate()
                                {
                                    builder.add_column(
                                        &catalog_name,
                                        &schema_name,
                                        &table_name,
                                        field_position,
                                        field,
                                    )
                                }
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    fn make_df_settings(
        &self,
        config_options: &ConfigOptions,
        runtime_env: &Arc<RuntimeEnv>,
        builder: &mut InformationSchemaDfSettingsBuilder,
    ) {
        for entry in config_options.entries() {
            builder.add_setting(entry);
        }
        for entry in runtime_env.config_entries() {
            builder.add_setting(entry);
        }
    }

    fn make_routines(
        &self,
        udfs: &HashMap<String, Arc<ScalarUDF>>,
        udafs: &HashMap<String, Arc<AggregateUDF>>,
        udwfs: &HashMap<String, Arc<WindowUDF>>,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaRoutinesBuilder,
    ) -> Result<()> {
        let catalog_name = &config_options.catalog.default_catalog;
        let schema_name = &config_options.catalog.default_schema;

        for (name, udf) in udfs {
            // A function may advertise several example signatures; emit one
            // row per distinct return type.
            let return_types = get_udf_args_and_return_types(udf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udf.signature()),
                    return_type.as_ref(),
                    "SCALAR",
                    udf.documentation().map(|d| d.description.to_string()),
                    udf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }

        for (name, udaf) in udafs {
            let return_types = get_udaf_args_and_return_types(udaf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udaf.signature()),
                    return_type.as_ref(),
                    "AGGREGATE",
                    udaf.documentation().map(|d| d.description.to_string()),
                    udaf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }

        for (name, udwf) in udwfs {
            let return_types = get_udwf_args_and_return_types(udwf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udwf.signature()),
                    return_type.as_ref(),
                    "WINDOW",
                    udwf.documentation().map(|d| d.description.to_string()),
                    udwf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }
        Ok(())
    }

    fn is_deterministic(signature: &Signature) -> bool {
        signature.volatility == Volatility::Immutable
    }

    fn make_parameters(
        &self,
        udfs: &HashMap<String, Arc<ScalarUDF>>,
        udafs: &HashMap<String, Arc<AggregateUDF>>,
        udwfs: &HashMap<String, Arc<WindowUDF>>,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaParametersBuilder,
    ) -> Result<()> {
        let catalog_name = &config_options.catalog.default_catalog;
        let schema_name = &config_options.catalog.default_schema;
        // Emits one IN row per argument plus an OUT row for the return type
        // (if known) of a single example signature, identified by `rid`.
        let mut add_parameters = |func_name: &str,
                                  args: Option<&Vec<(String, String)>>,
                                  arg_types: Vec<String>,
                                  return_type: Option<String>,
                                  is_variadic: bool,
                                  rid: u8| {
            for (position, type_name) in arg_types.iter().enumerate() {
                let param_name =
                    args.and_then(|a| a.get(position).map(|arg| arg.0.as_str()));
                builder.add_parameter(
                    catalog_name,
                    schema_name,
                    func_name,
                    position as u64 + 1,
                    "IN",
                    param_name,
                    type_name,
                    None::<&str>,
                    is_variadic,
                    rid,
                );
            }
            if let Some(return_type) = return_type {
                builder.add_parameter(
                    catalog_name,
                    schema_name,
                    func_name,
                    1,
                    "OUT",
                    None::<&str>,
                    return_type.as_str(),
                    None::<&str>,
                    false,
                    rid,
                );
            }
        };

        for (func_name, udf) in udfs {
            let args = udf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udf_args_and_return_types(udf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udf.signature()),
                    rid as u8,
                );
            }
        }

        for (func_name, udaf) in udafs {
            let args = udaf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udaf_args_and_return_types(udaf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udaf.signature()),
                    rid as u8,
                );
            }
        }

        for (func_name, udwf) in udwfs {
            let args = udwf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udwf_args_and_return_types(udwf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udwf.signature()),
                    rid as u8,
                );
            }
        }

        Ok(())
    }

    fn is_variadic(signature: &Signature) -> bool {
        matches!(
            signature.type_signature,
            TypeSignature::Variadic(_) | TypeSignature::VariadicAny
        )
    }
}

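/// Returns the example `(argument types, return type)` combinations for a
/// scalar UDF, rendered as strings and derived from the example types that
/// its [`Signature`] advertises.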
fn get_udf_args_and_return_types(
    udf: &Arc<ScalarUDF>,
) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
    let signature = udf.signature();
    let arg_types = signature.type_signature.get_example_types();
    if arg_types.is_empty() {
        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
    } else {
        Ok(arg_types
            .into_iter()
            .map(|arg_types| {
                let return_type = udf
                    .return_type(&arg_types)
                    .map(|t| remove_native_type_prefix(&NativeType::from(t)))
                    .ok();
                let arg_types = arg_types
                    .into_iter()
                    .map(|t| remove_native_type_prefix(&NativeType::from(t)))
                    .collect::<Vec<_>>();
                (arg_types, return_type)
            })
            .collect::<BTreeSet<_>>())
    }
}

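/// Same as [`get_udf_args_and_return_types`], but for aggregate UDFs.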
fn get_udaf_args_and_return_types(
    udaf: &Arc<AggregateUDF>,
) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
    let signature = udaf.signature();
    let arg_types = signature.type_signature.get_example_types();
    if arg_types.is_empty() {
        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
    } else {
        Ok(arg_types
            .into_iter()
            .map(|arg_types| {
                let return_type = udaf
                    .return_type(&arg_types)
                    .ok()
                    .map(|t| remove_native_type_prefix(&NativeType::from(t)));
                let arg_types = arg_types
                    .into_iter()
                    .map(|t| remove_native_type_prefix(&NativeType::from(t)))
                    .collect::<Vec<_>>();
                (arg_types, return_type)
            })
            .collect::<BTreeSet<_>>())
    }
}

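/// Same as [`get_udf_args_and_return_types`], but for window UDFs; window
/// functions expose no return type here, so it is always `None`.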
fn get_udwf_args_and_return_types(
    udwf: &Arc<WindowUDF>,
) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
    let signature = udwf.signature();
    let arg_types = signature.type_signature.get_example_types();
    if arg_types.is_empty() {
        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
    } else {
        Ok(arg_types
            .into_iter()
            .map(|arg_types| {
                let arg_types = arg_types
                    .into_iter()
                    .map(|t| remove_native_type_prefix(&NativeType::from(t)))
                    .collect::<Vec<_>>();
                (arg_types, None)
            })
            .collect::<BTreeSet<_>>())
    }
}

/// Renders a [`NativeType`] as the bare type-name string used in the
/// `routines` and `parameters` tables.
#[inline]
fn remove_native_type_prefix(native_type: &NativeType) -> String {
    format!("{native_type}")
}

#[async_trait]
impl SchemaProvider for InformationSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_names(&self) -> Vec<String> {
        INFORMATION_SCHEMA_TABLES
            .iter()
            .map(|t| (*t).to_string())
            .collect()
    }

    async fn table(
        &self,
        name: &str,
    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
        let config = self.config.clone();
        // Table names are matched case-insensitively.
        let table: Arc<dyn PartitionStream> = match name.to_ascii_lowercase().as_str() {
            TABLES => Arc::new(InformationSchemaTables::new(config)),
            COLUMNS => Arc::new(InformationSchemaColumns::new(config)),
            VIEWS => Arc::new(InformationSchemaViews::new(config)),
            DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)),
            SCHEMATA => Arc::new(InformationSchemata::new(config)),
            ROUTINES => Arc::new(InformationSchemaRoutines::new(config)),
            PARAMETERS => Arc::new(InformationSchemaParameters::new(config)),
            _ => return Ok(None),
        };

        Ok(Some(Arc::new(
            StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
        )))
    }

    fn table_exist(&self, name: &str) -> bool {
        INFORMATION_SCHEMA_TABLES.contains(&name.to_ascii_lowercase().as_str())
    }
}

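/// Backs the `information_schema.tables` view: one row per table or view in
/// every catalog and schema.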
#[derive(Debug)]
struct InformationSchemaTables {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaTables {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("table_catalog", DataType::Utf8, false),
            Field::new("table_schema", DataType::Utf8, false),
            Field::new("table_name", DataType::Utf8, false),
            Field::new("table_type", DataType::Utf8, false),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaTablesBuilder {
        InformationSchemaTablesBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            table_types: StringBuilder::new(),
            schema: Arc::clone(&self.schema),
        }
    }
}

impl PartitionStream for InformationSchemaTables {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // The stream yields a single batch containing the whole table.
            futures::stream::once(async move {
                config.make_tables(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}

struct InformationSchemaTablesBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    table_types: StringBuilder,
}

impl InformationSchemaTablesBuilder {
    fn add_table(
        &mut self,
        catalog_name: impl AsRef<str>,
        schema_name: impl AsRef<str>,
        table_name: impl AsRef<str>,
        table_type: TableType,
    ) {
        self.catalog_names.append_value(catalog_name.as_ref());
        self.schema_names.append_value(schema_name.as_ref());
        self.table_names.append_value(table_name.as_ref());
        self.table_types.append_value(match table_type {
            TableType::Base => "BASE TABLE",
            TableType::View => "VIEW",
            TableType::Temporary => "LOCAL TEMPORARY",
        });
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.catalog_names.finish()),
                Arc::new(self.schema_names.finish()),
                Arc::new(self.table_names.finish()),
                Arc::new(self.table_types.finish()),
            ],
        )
        .unwrap()
    }
}

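/// Backs the `information_schema.views` view: one row per table, with the
/// originating `CREATE VIEW` statement (if any) in `definition`.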
#[derive(Debug)]
struct InformationSchemaViews {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaViews {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("table_catalog", DataType::Utf8, false),
            Field::new("table_schema", DataType::Utf8, false),
            Field::new("table_name", DataType::Utf8, false),
            Field::new("definition", DataType::Utf8, true),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaViewBuilder {
        InformationSchemaViewBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            definitions: StringBuilder::new(),
            schema: Arc::clone(&self.schema),
        }
    }
}

impl PartitionStream for InformationSchemaViews {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(async move {
                config.make_views(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}

struct InformationSchemaViewBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    definitions: StringBuilder,
}

impl InformationSchemaViewBuilder {
    fn add_view(
        &mut self,
        catalog_name: impl AsRef<str>,
        schema_name: impl AsRef<str>,
        table_name: impl AsRef<str>,
        definition: Option<&(impl AsRef<str> + ?Sized)>,
    ) {
        self.catalog_names.append_value(catalog_name.as_ref());
        self.schema_names.append_value(schema_name.as_ref());
        self.table_names.append_value(table_name.as_ref());
        self.definitions.append_option(definition.as_ref());
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.catalog_names.finish()),
                Arc::new(self.schema_names.finish()),
                Arc::new(self.table_names.finish()),
                Arc::new(self.definitions.finish()),
            ],
        )
        .unwrap()
    }
}

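/// Backs the `information_schema.columns` view: one row per column of every
/// table, for example:
///
/// ```sql
/// SELECT column_name, data_type, is_nullable
/// FROM information_schema.columns
/// WHERE table_name = 'my_table';
/// ```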
#[derive(Debug)]
struct InformationSchemaColumns {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaColumns {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("table_catalog", DataType::Utf8, false),
            Field::new("table_schema", DataType::Utf8, false),
            Field::new("table_name", DataType::Utf8, false),
            Field::new("column_name", DataType::Utf8, false),
            Field::new("ordinal_position", DataType::UInt64, false),
            Field::new("column_default", DataType::Utf8, true),
            Field::new("is_nullable", DataType::Utf8, false),
            Field::new("data_type", DataType::Utf8, false),
            Field::new("character_maximum_length", DataType::UInt64, true),
            Field::new("character_octet_length", DataType::UInt64, true),
            Field::new("numeric_precision", DataType::UInt64, true),
            Field::new("numeric_precision_radix", DataType::UInt64, true),
            Field::new("numeric_scale", DataType::UInt64, true),
            Field::new("datetime_precision", DataType::UInt64, true),
            Field::new("interval_type", DataType::Utf8, true),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaColumnsBuilder {
        // A small initial capacity for the fixed-width builders; they grow
        // as more columns are appended.
        let default_capacity = 10;

        InformationSchemaColumnsBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            column_names: StringBuilder::new(),
            ordinal_positions: UInt64Builder::with_capacity(default_capacity),
            column_defaults: StringBuilder::new(),
            is_nullables: StringBuilder::new(),
            data_types: StringBuilder::new(),
            character_maximum_lengths: UInt64Builder::with_capacity(default_capacity),
            character_octet_lengths: UInt64Builder::with_capacity(default_capacity),
            numeric_precisions: UInt64Builder::with_capacity(default_capacity),
            numeric_precision_radixes: UInt64Builder::with_capacity(default_capacity),
            numeric_scales: UInt64Builder::with_capacity(default_capacity),
            datetime_precisions: UInt64Builder::with_capacity(default_capacity),
            interval_types: StringBuilder::new(),
            schema: Arc::clone(&self.schema),
        }
    }
}

impl PartitionStream for InformationSchemaColumns {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(async move {
                config.make_columns(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}

struct InformationSchemaColumnsBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    column_names: StringBuilder,
    ordinal_positions: UInt64Builder,
    column_defaults: StringBuilder,
    is_nullables: StringBuilder,
    data_types: StringBuilder,
    character_maximum_lengths: UInt64Builder,
    character_octet_lengths: UInt64Builder,
    numeric_precisions: UInt64Builder,
    numeric_precision_radixes: UInt64Builder,
    numeric_scales: UInt64Builder,
    datetime_precisions: UInt64Builder,
    interval_types: StringBuilder,
}

impl InformationSchemaColumnsBuilder {
    fn add_column(
        &mut self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        field_position: usize,
        field: &Field,
    ) {
        use DataType::*;

        self.catalog_names.append_value(catalog_name);
        self.schema_names.append_value(schema_name);
        self.table_names.append_value(table_name);

        self.column_names.append_value(field.name());

        // NB: field_position is the zero-based index of the field in the schema.
        self.ordinal_positions.append_value(field_position as u64);

        // DataFusion does not support column defaults, so this is always null.
        self.column_defaults.append_null();

        // "YES" if the column is possibly nullable, "NO" otherwise.
        let nullable_str = if field.is_nullable() { "YES" } else { "NO" };
        self.is_nullables.append_value(nullable_str);

        self.data_types.append_value(field.data_type().to_string());

        // Arrow string types declare no maximum length, so
        // character_maximum_length is always null.
        let max_chars = None;
        self.character_maximum_lengths.append_option(max_chars);

        // Maximum possible length in bytes, bounded only by the width of the
        // Arrow offsets (i32 vs i64).
        let char_len: Option<u64> = match field.data_type() {
            Utf8 | Binary => Some(i32::MAX as u64),
            LargeBinary | LargeUtf8 => Some(i64::MAX as u64),
            _ => None,
        };
        self.character_octet_lengths.append_option(char_len);

        // (precision, radix, scale): precision in bits (radix 2) for integer
        // and float types, in decimal digits (radix 10) for decimals.
        let (numeric_precision, numeric_radix, numeric_scale) = match field.data_type() {
            Int8 | UInt8 => (Some(8), Some(2), None),
            Int16 | UInt16 => (Some(16), Some(2), None),
            Int32 | UInt32 => (Some(32), Some(2), None),
            Int64 | UInt64 => (Some(64), Some(2), None),
            // f16's maximum value of 65504 requires 15 bits.
            Float16 => (Some(15), Some(2), None),
            // Significand width of an IEEE 754 single (postgres `real`).
            Float32 => (Some(24), Some(2), None),
            // Significand width of an IEEE 754 double (postgres `double precision`).
            Float64 => (Some(53), Some(2), None),
            Decimal128(precision, scale) => {
                (Some(*precision as u64), Some(10), Some(*scale as u64))
            }
            _ => (None, None, None),
        };

        self.numeric_precisions.append_option(numeric_precision);
        self.numeric_precision_radixes.append_option(numeric_radix);
        self.numeric_scales.append_option(numeric_scale);

        self.datetime_precisions.append_option(None);
        self.interval_types.append_null();
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.catalog_names.finish()),
                Arc::new(self.schema_names.finish()),
                Arc::new(self.table_names.finish()),
                Arc::new(self.column_names.finish()),
                Arc::new(self.ordinal_positions.finish()),
                Arc::new(self.column_defaults.finish()),
                Arc::new(self.is_nullables.finish()),
                Arc::new(self.data_types.finish()),
                Arc::new(self.character_maximum_lengths.finish()),
                Arc::new(self.character_octet_lengths.finish()),
                Arc::new(self.numeric_precisions.finish()),
                Arc::new(self.numeric_precision_radixes.finish()),
                Arc::new(self.numeric_scales.finish()),
                Arc::new(self.datetime_precisions.finish()),
                Arc::new(self.interval_types.finish()),
            ],
        )
        .unwrap()
    }
}

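/// Backs the `information_schema.schemata` view: one row per schema in every
/// catalog.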
#[derive(Debug)]
struct InformationSchemata {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemata {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("catalog_name", DataType::Utf8, false),
            Field::new("schema_name", DataType::Utf8, false),
            Field::new("schema_owner", DataType::Utf8, true),
            Field::new("default_character_set_catalog", DataType::Utf8, true),
            Field::new("default_character_set_schema", DataType::Utf8, true),
            Field::new("default_character_set_name", DataType::Utf8, true),
            Field::new("sql_path", DataType::Utf8, true),
        ]));
        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemataBuilder {
        InformationSchemataBuilder {
            schema: Arc::clone(&self.schema),
            catalog_name: StringBuilder::new(),
            schema_name: StringBuilder::new(),
            schema_owner: StringBuilder::new(),
            default_character_set_catalog: StringBuilder::new(),
            default_character_set_schema: StringBuilder::new(),
            default_character_set_name: StringBuilder::new(),
            sql_path: StringBuilder::new(),
        }
    }
}

struct InformationSchemataBuilder {
    schema: SchemaRef,
    catalog_name: StringBuilder,
    schema_name: StringBuilder,
    schema_owner: StringBuilder,
    default_character_set_catalog: StringBuilder,
    default_character_set_schema: StringBuilder,
    default_character_set_name: StringBuilder,
    sql_path: StringBuilder,
}

impl InformationSchemataBuilder {
    fn add_schemata(
        &mut self,
        catalog_name: &str,
        schema_name: &str,
        schema_owner: Option<&str>,
    ) {
        self.catalog_name.append_value(catalog_name);
        self.schema_name.append_value(schema_name);
        match schema_owner {
            Some(owner) => self.schema_owner.append_value(owner),
            None => self.schema_owner.append_null(),
        }
        // DataFusion does not track character sets or SQL paths, so these
        // standard columns are always null.
        self.default_character_set_catalog.append_null();
        self.default_character_set_schema.append_null();
        self.default_character_set_name.append_null();
        self.sql_path.append_null();
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.catalog_name.finish()),
                Arc::new(self.schema_name.finish()),
                Arc::new(self.schema_owner.finish()),
                Arc::new(self.default_character_set_catalog.finish()),
                Arc::new(self.default_character_set_schema.finish()),
                Arc::new(self.default_character_set_name.finish()),
                Arc::new(self.sql_path.finish()),
            ],
        )
        .unwrap()
    }
}

impl PartitionStream for InformationSchemata {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(async move {
                config.make_schemata(&mut builder).await;
                Ok(builder.finish())
            }),
        ))
    }
}

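/// Backs the DataFusion-specific `information_schema.df_settings` view,
/// which lists every session and runtime configuration entry, for example:
///
/// ```sql
/// SELECT name, value FROM information_schema.df_settings
/// WHERE name = 'datafusion.execution.batch_size';
/// ```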
#[derive(Debug)]
struct InformationSchemaDfSettings {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaDfSettings {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
            Field::new("description", DataType::Utf8, true),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaDfSettingsBuilder {
        InformationSchemaDfSettingsBuilder {
            names: StringBuilder::new(),
            values: StringBuilder::new(),
            descriptions: StringBuilder::new(),
            schema: Arc::clone(&self.schema),
        }
    }
}

impl PartitionStream for InformationSchemaDfSettings {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = self.config.clone();
        let mut builder = self.builder();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(async move {
                let runtime_env = ctx.runtime_env();
                config.make_df_settings(
                    ctx.session_config().options(),
                    &runtime_env,
                    &mut builder,
                );
                Ok(builder.finish())
            }),
        ))
    }
}

struct InformationSchemaDfSettingsBuilder {
    schema: SchemaRef,
    names: StringBuilder,
    values: StringBuilder,
    descriptions: StringBuilder,
}

impl InformationSchemaDfSettingsBuilder {
    fn add_setting(&mut self, entry: ConfigEntry) {
        self.names.append_value(entry.key);
        self.values.append_option(entry.value);
        self.descriptions.append_value(entry.description);
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.names.finish()),
                Arc::new(self.values.finish()),
                Arc::new(self.descriptions.finish()),
            ],
        )
        .unwrap()
    }
}

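/// Backs the `information_schema.routines` view: one row per registered
/// scalar, aggregate, or window function (and per distinct return type).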
#[derive(Debug)]
struct InformationSchemaRoutines {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaRoutines {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("specific_catalog", DataType::Utf8, false),
            Field::new("specific_schema", DataType::Utf8, false),
            Field::new("specific_name", DataType::Utf8, false),
            Field::new("routine_catalog", DataType::Utf8, false),
            Field::new("routine_schema", DataType::Utf8, false),
            Field::new("routine_name", DataType::Utf8, false),
            Field::new("routine_type", DataType::Utf8, false),
            Field::new("is_deterministic", DataType::Boolean, true),
            Field::new("data_type", DataType::Utf8, true),
            Field::new("function_type", DataType::Utf8, true),
            Field::new("description", DataType::Utf8, true),
            Field::new("syntax_example", DataType::Utf8, true),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaRoutinesBuilder {
        InformationSchemaRoutinesBuilder {
            schema: Arc::clone(&self.schema),
            specific_catalog: StringBuilder::new(),
            specific_schema: StringBuilder::new(),
            specific_name: StringBuilder::new(),
            routine_catalog: StringBuilder::new(),
            routine_schema: StringBuilder::new(),
            routine_name: StringBuilder::new(),
            routine_type: StringBuilder::new(),
            is_deterministic: BooleanBuilder::new(),
            data_type: StringBuilder::new(),
            function_type: StringBuilder::new(),
            description: StringBuilder::new(),
            syntax_example: StringBuilder::new(),
        }
    }
}

struct InformationSchemaRoutinesBuilder {
    schema: SchemaRef,
    specific_catalog: StringBuilder,
    specific_schema: StringBuilder,
    specific_name: StringBuilder,
    routine_catalog: StringBuilder,
    routine_schema: StringBuilder,
    routine_name: StringBuilder,
    routine_type: StringBuilder,
    is_deterministic: BooleanBuilder,
    data_type: StringBuilder,
    function_type: StringBuilder,
    description: StringBuilder,
    syntax_example: StringBuilder,
}

impl InformationSchemaRoutinesBuilder {
    #[expect(clippy::too_many_arguments)]
    fn add_routine(
        &mut self,
        catalog_name: impl AsRef<str>,
        schema_name: impl AsRef<str>,
        routine_name: impl AsRef<str>,
        routine_type: impl AsRef<str>,
        is_deterministic: bool,
        data_type: Option<&impl AsRef<str>>,
        function_type: impl AsRef<str>,
        description: Option<impl AsRef<str>>,
        syntax_example: Option<impl AsRef<str>>,
    ) {
        self.specific_catalog.append_value(catalog_name.as_ref());
        self.specific_schema.append_value(schema_name.as_ref());
        self.specific_name.append_value(routine_name.as_ref());
        self.routine_catalog.append_value(catalog_name.as_ref());
        self.routine_schema.append_value(schema_name.as_ref());
        self.routine_name.append_value(routine_name.as_ref());
        self.routine_type.append_value(routine_type.as_ref());
        self.is_deterministic.append_value(is_deterministic);
        self.data_type.append_option(data_type.as_ref());
        self.function_type.append_value(function_type.as_ref());
        self.description.append_option(description);
        self.syntax_example.append_option(syntax_example);
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.specific_catalog.finish()),
                Arc::new(self.specific_schema.finish()),
                Arc::new(self.specific_name.finish()),
                Arc::new(self.routine_catalog.finish()),
                Arc::new(self.routine_schema.finish()),
                Arc::new(self.routine_name.finish()),
                Arc::new(self.routine_type.finish()),
                Arc::new(self.is_deterministic.finish()),
                Arc::new(self.data_type.finish()),
                Arc::new(self.function_type.finish()),
                Arc::new(self.description.finish()),
                Arc::new(self.syntax_example.finish()),
            ],
        )
        .unwrap()
    }
}

impl PartitionStream for InformationSchemaRoutines {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = self.config.clone();
        let mut builder = self.builder();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(async move {
                config.make_routines(
                    ctx.scalar_functions(),
                    ctx.aggregate_functions(),
                    ctx.window_functions(),
                    ctx.session_config().options(),
                    &mut builder,
                )?;
                Ok(builder.finish())
            }),
        ))
    }
}

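/// Backs the `information_schema.parameters` view: one `IN` row per function
/// argument plus one `OUT` row per known return type, grouped by `rid` when
/// a function has several example signatures.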
#[derive(Debug)]
struct InformationSchemaParameters {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaParameters {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("specific_catalog", DataType::Utf8, false),
            Field::new("specific_schema", DataType::Utf8, false),
            Field::new("specific_name", DataType::Utf8, false),
            Field::new("ordinal_position", DataType::UInt64, false),
            Field::new("parameter_mode", DataType::Utf8, false),
            Field::new("parameter_name", DataType::Utf8, true),
            Field::new("data_type", DataType::Utf8, false),
            Field::new("parameter_default", DataType::Utf8, true),
            Field::new("is_variadic", DataType::Boolean, false),
            // `rid` (routine id) distinguishes the multiple example
            // signatures a single function may have: parameter rows that
            // belong to the same signature share the same `rid`.
            Field::new("rid", DataType::UInt8, false),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaParametersBuilder {
        InformationSchemaParametersBuilder {
            schema: Arc::clone(&self.schema),
            specific_catalog: StringBuilder::new(),
            specific_schema: StringBuilder::new(),
            specific_name: StringBuilder::new(),
            ordinal_position: UInt64Builder::new(),
            parameter_mode: StringBuilder::new(),
            parameter_name: StringBuilder::new(),
            data_type: StringBuilder::new(),
            parameter_default: StringBuilder::new(),
            is_variadic: BooleanBuilder::new(),
            rid: UInt8Builder::new(),
        }
    }
}

struct InformationSchemaParametersBuilder {
    schema: SchemaRef,
    specific_catalog: StringBuilder,
    specific_schema: StringBuilder,
    specific_name: StringBuilder,
    ordinal_position: UInt64Builder,
    parameter_mode: StringBuilder,
    parameter_name: StringBuilder,
    data_type: StringBuilder,
    parameter_default: StringBuilder,
    is_variadic: BooleanBuilder,
    rid: UInt8Builder,
}

impl InformationSchemaParametersBuilder {
    #[expect(clippy::too_many_arguments)]
    fn add_parameter(
        &mut self,
        specific_catalog: impl AsRef<str>,
        specific_schema: impl AsRef<str>,
        specific_name: impl AsRef<str>,
        ordinal_position: u64,
        parameter_mode: impl AsRef<str>,
        parameter_name: Option<&(impl AsRef<str> + ?Sized)>,
        data_type: impl AsRef<str>,
        parameter_default: Option<impl AsRef<str>>,
        is_variadic: bool,
        rid: u8,
    ) {
        self.specific_catalog
            .append_value(specific_catalog.as_ref());
        self.specific_schema.append_value(specific_schema.as_ref());
        self.specific_name.append_value(specific_name.as_ref());
        self.ordinal_position.append_value(ordinal_position);
        self.parameter_mode.append_value(parameter_mode.as_ref());
        self.parameter_name.append_option(parameter_name.as_ref());
        self.data_type.append_value(data_type.as_ref());
        self.parameter_default.append_option(parameter_default);
        self.is_variadic.append_value(is_variadic);
        self.rid.append_value(rid);
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.specific_catalog.finish()),
                Arc::new(self.specific_schema.finish()),
                Arc::new(self.specific_name.finish()),
                Arc::new(self.ordinal_position.finish()),
                Arc::new(self.parameter_mode.finish()),
                Arc::new(self.parameter_name.finish()),
                Arc::new(self.data_type.finish()),
                Arc::new(self.parameter_default.finish()),
                Arc::new(self.is_variadic.finish()),
                Arc::new(self.rid.finish()),
            ],
        )
        .unwrap()
    }
}

impl PartitionStream for InformationSchemaParameters {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = self.config.clone();
        let mut builder = self.builder();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(async move {
                config.make_parameters(
                    ctx.scalar_functions(),
                    ctx.aggregate_functions(),
                    ctx.window_functions(),
                    ctx.session_config().options(),
                    &mut builder,
                )?;
                Ok(builder.finish())
            }),
        ))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::CatalogProvider;

    #[tokio::test]
    async fn make_tables_uses_table_type() {
        let config = InformationSchemaConfig {
            catalog_list: Arc::new(Fixture),
        };
        let mut builder = InformationSchemaTablesBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            table_types: StringBuilder::new(),
            schema: Arc::new(Schema::empty()),
        };

        assert!(config.make_tables(&mut builder).await.is_ok());

        assert_eq!("BASE TABLE", builder.table_types.finish().value(0));
    }

    /// A single-catalog, single-schema fixture whose `table` implementation
    /// panics, proving that `make_tables` resolves table types through the
    /// cheaper `table_type` call.
    #[derive(Debug)]
    struct Fixture;

    #[async_trait]
    impl SchemaProvider for Fixture {
        async fn table_type(&self, _: &str) -> Result<Option<TableType>> {
            Ok(Some(TableType::Base))
        }

        async fn table(&self, _: &str) -> Result<Option<Arc<dyn TableProvider>>> {
            panic!(
                "InformationSchemaConfig::make_tables called SchemaProvider::table instead of table_type"
            )
        }

        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn table_names(&self) -> Vec<String> {
            vec!["atable".to_string()]
        }

        fn table_exist(&self, _: &str) -> bool {
            unimplemented!("not required for these tests")
        }
    }

    impl CatalogProviderList for Fixture {
        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn register_catalog(
            &self,
            _: String,
            _: Arc<dyn CatalogProvider>,
        ) -> Option<Arc<dyn CatalogProvider>> {
            unimplemented!("not required for these tests")
        }

        fn catalog_names(&self) -> Vec<String> {
            vec!["acatalog".to_string()]
        }

        fn catalog(&self, _: &str) -> Option<Arc<dyn CatalogProvider>> {
            Some(Arc::new(Self))
        }
    }

    impl CatalogProvider for Fixture {
        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn schema_names(&self) -> Vec<String> {
            vec!["aschema".to_string()]
        }

        fn schema(&self, _: &str) -> Option<Arc<dyn SchemaProvider>> {
            Some(Arc::new(Self))
        }
    }
}