1use crate::streaming::StreamingTable;
23use crate::{CatalogProviderList, SchemaProvider, TableProvider};
24use arrow::array::builder::{BooleanBuilder, UInt8Builder};
25use arrow::{
26 array::{StringBuilder, UInt64Builder},
27 datatypes::{DataType, Field, Schema, SchemaRef},
28 record_batch::RecordBatch,
29};
30use async_trait::async_trait;
31use datafusion_common::config::{ConfigEntry, ConfigOptions};
32use datafusion_common::error::Result;
33use datafusion_common::types::NativeType;
34use datafusion_common::DataFusionError;
35use datafusion_execution::TaskContext;
36use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF};
37use datafusion_expr::{TableType, Volatility};
38use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
39use datafusion_physical_plan::streaming::PartitionStream;
40use datafusion_physical_plan::SendableRecordBatchStream;
41use std::collections::{BTreeSet, HashMap, HashSet};
42use std::fmt::Debug;
43use std::{any::Any, sync::Arc};
44
/// Name of the virtual schema exposed by [`InformationSchemaProvider`].
pub const INFORMATION_SCHEMA: &str = "information_schema";
// Names of the individual virtual tables served under `information_schema`.
pub(crate) const TABLES: &str = "tables";
pub(crate) const VIEWS: &str = "views";
pub(crate) const COLUMNS: &str = "columns";
pub(crate) const DF_SETTINGS: &str = "df_settings";
pub(crate) const SCHEMATA: &str = "schemata";
pub(crate) const ROUTINES: &str = "routines";
pub(crate) const PARAMETERS: &str = "parameters";

/// All table names served under `information_schema`.
///
/// Keep in sync with the `match` in `InformationSchemaProvider::table`.
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[
    TABLES,
    VIEWS,
    COLUMNS,
    DF_SETTINGS,
    SCHEMATA,
    ROUTINES,
    PARAMETERS,
];
64
/// A [`SchemaProvider`] that serves the SQL-standard `information_schema`
/// virtual tables (tables, views, columns, settings, schemata, routines,
/// parameters), computed on demand from a [`CatalogProviderList`].
#[derive(Debug)]
pub struct InformationSchemaProvider {
    // Cloneable handle handed to each virtual table's stream.
    config: InformationSchemaConfig,
}

impl InformationSchemaProvider {
    /// Creates a new [`InformationSchemaProvider`] reflecting the catalogs in
    /// `catalog_list`.
    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
        Self {
            config: InformationSchemaConfig { catalog_list },
        }
    }
}

/// Cheaply cloneable handle used by each virtual table to walk the catalog
/// hierarchy when its stream is executed.
#[derive(Clone, Debug)]
struct InformationSchemaConfig {
    catalog_list: Arc<dyn CatalogProviderList>,
}
89
90impl InformationSchemaConfig {
91 async fn make_tables(
93 &self,
94 builder: &mut InformationSchemaTablesBuilder,
95 ) -> Result<(), DataFusionError> {
96 for catalog_name in self.catalog_list.catalog_names() {
99 let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
100
101 for schema_name in catalog.schema_names() {
102 if schema_name != INFORMATION_SCHEMA {
103 if let Some(schema) = catalog.schema(&schema_name) {
105 for table_name in schema.table_names() {
106 if let Some(table) = schema.table(&table_name).await? {
107 builder.add_table(
108 &catalog_name,
109 &schema_name,
110 &table_name,
111 table.table_type(),
112 );
113 }
114 }
115 }
116 }
117 }
118
119 for table_name in INFORMATION_SCHEMA_TABLES {
121 builder.add_table(
122 &catalog_name,
123 INFORMATION_SCHEMA,
124 table_name,
125 TableType::View,
126 );
127 }
128 }
129
130 Ok(())
131 }
132
133 async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
134 for catalog_name in self.catalog_list.catalog_names() {
135 let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
136
137 for schema_name in catalog.schema_names() {
138 if schema_name != INFORMATION_SCHEMA {
139 if let Some(schema) = catalog.schema(&schema_name) {
140 let schema_owner = schema.owner_name();
141 builder.add_schemata(&catalog_name, &schema_name, schema_owner);
142 }
143 }
144 }
145 }
146 }
147
148 async fn make_views(
149 &self,
150 builder: &mut InformationSchemaViewBuilder,
151 ) -> Result<(), DataFusionError> {
152 for catalog_name in self.catalog_list.catalog_names() {
153 let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
154
155 for schema_name in catalog.schema_names() {
156 if schema_name != INFORMATION_SCHEMA {
157 if let Some(schema) = catalog.schema(&schema_name) {
159 for table_name in schema.table_names() {
160 if let Some(table) = schema.table(&table_name).await? {
161 builder.add_view(
162 &catalog_name,
163 &schema_name,
164 &table_name,
165 table.get_table_definition(),
166 )
167 }
168 }
169 }
170 }
171 }
172 }
173
174 Ok(())
175 }
176
177 async fn make_columns(
179 &self,
180 builder: &mut InformationSchemaColumnsBuilder,
181 ) -> Result<(), DataFusionError> {
182 for catalog_name in self.catalog_list.catalog_names() {
183 let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
184
185 for schema_name in catalog.schema_names() {
186 if schema_name != INFORMATION_SCHEMA {
187 if let Some(schema) = catalog.schema(&schema_name) {
189 for table_name in schema.table_names() {
190 if let Some(table) = schema.table(&table_name).await? {
191 for (field_position, field) in
192 table.schema().fields().iter().enumerate()
193 {
194 builder.add_column(
195 &catalog_name,
196 &schema_name,
197 &table_name,
198 field_position,
199 field,
200 )
201 }
202 }
203 }
204 }
205 }
206 }
207 }
208
209 Ok(())
210 }
211
212 fn make_df_settings(
214 &self,
215 config_options: &ConfigOptions,
216 builder: &mut InformationSchemaDfSettingsBuilder,
217 ) {
218 for entry in config_options.entries() {
219 builder.add_setting(entry);
220 }
221 }
222
    /// Appends one `information_schema.routines` row per (function, distinct
    /// return type) for every registered scalar, aggregate, and window UDF.
    ///
    /// Functions are not registered per-schema, so every row is attributed to
    /// the session's default catalog and schema from `config_options`.
    fn make_routines(
        &self,
        udfs: &HashMap<String, Arc<ScalarUDF>>,
        udafs: &HashMap<String, Arc<AggregateUDF>>,
        udwfs: &HashMap<String, Arc<WindowUDF>>,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaRoutinesBuilder,
    ) -> Result<()> {
        let catalog_name = &config_options.catalog.default_catalog;
        let schema_name = &config_options.catalog.default_schema;

        for (name, udf) in udfs {
            // several argument-type combinations may share a return type;
            // collect into a set so each return type is reported once
            let return_types = get_udf_args_and_return_types(udf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udf.signature()),
                    return_type,
                    "SCALAR",
                    udf.documentation().map(|d| d.description.to_string()),
                    udf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }

        for (name, udaf) in udafs {
            let return_types = get_udaf_args_and_return_types(udaf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udaf.signature()),
                    return_type,
                    "AGGREGATE",
                    udaf.documentation().map(|d| d.description.to_string()),
                    udaf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }

        for (name, udwf) in udwfs {
            let return_types = get_udwf_args_and_return_types(udwf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udwf.signature()),
                    return_type,
                    "WINDOW",
                    udwf.documentation().map(|d| d.description.to_string()),
                    udwf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }
        Ok(())
    }
295
296 fn is_deterministic(signature: &Signature) -> bool {
297 signature.volatility == Volatility::Immutable
298 }
    /// Appends one `information_schema.parameters` row per argument
    /// (direction `IN`) and per known return type (direction `OUT`) of every
    /// registered UDF, UDAF, and UDWF.
    ///
    /// Each signature combination of a function gets its own `rid`, so
    /// parameter rows of different overloads can be told apart.
    fn make_parameters(
        &self,
        udfs: &HashMap<String, Arc<ScalarUDF>>,
        udafs: &HashMap<String, Arc<AggregateUDF>>,
        udwfs: &HashMap<String, Arc<WindowUDF>>,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaParametersBuilder,
    ) -> Result<()> {
        let catalog_name = &config_options.catalog.default_catalog;
        let schema_name = &config_options.catalog.default_schema;
        // Shared helper: emits one IN row per argument and one OUT row for
        // the return type when it is known.
        let mut add_parameters = |func_name: &str,
                                  args: Option<&Vec<(String, String)>>,
                                  arg_types: Vec<String>,
                                  return_type: Option<String>,
                                  is_variadic: bool,
                                  rid: u8| {
            for (position, type_name) in arg_types.iter().enumerate() {
                // the parameter name comes from the documentation, if present
                let param_name =
                    args.and_then(|a| a.get(position).map(|arg| arg.0.as_str()));
                builder.add_parameter(
                    catalog_name,
                    schema_name,
                    func_name,
                    // ordinal_position of parameters is 1-based
                    position as u64 + 1,
                    "IN",
                    param_name,
                    type_name,
                    None::<&str>,
                    is_variadic,
                    rid,
                );
            }
            if let Some(return_type) = return_type {
                builder.add_parameter(
                    catalog_name,
                    schema_name,
                    func_name,
                    1,
                    "OUT",
                    None::<&str>,
                    return_type.as_str(),
                    None::<&str>,
                    false,
                    rid,
                );
            }
        };

        for (func_name, udf) in udfs {
            let args = udf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udf_args_and_return_types(udf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udf.signature()),
                    rid as u8,
                );
            }
        }

        for (func_name, udaf) in udafs {
            let args = udaf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udaf_args_and_return_types(udaf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udaf.signature()),
                    rid as u8,
                );
            }
        }

        for (func_name, udwf) in udwfs {
            let args = udwf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udwf_args_and_return_types(udwf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udwf.signature()),
                    rid as u8,
                );
            }
        }

        Ok(())
    }
394
395 fn is_variadic(signature: &Signature) -> bool {
396 matches!(
397 signature.type_signature,
398 TypeSignature::Variadic(_) | TypeSignature::VariadicAny
399 )
400 }
401}
402
403fn get_udf_args_and_return_types(
406 udf: &Arc<ScalarUDF>,
407) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
408 let signature = udf.signature();
409 let arg_types = signature.type_signature.get_example_types();
410 if arg_types.is_empty() {
411 Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
412 } else {
413 Ok(arg_types
414 .into_iter()
415 .map(|arg_types| {
416 let return_type = udf
418 .return_type(&arg_types)
419 .map(|t| remove_native_type_prefix(NativeType::from(t)))
420 .ok();
421 let arg_types = arg_types
422 .into_iter()
423 .map(|t| remove_native_type_prefix(NativeType::from(t)))
424 .collect::<Vec<_>>();
425 (arg_types, return_type)
426 })
427 .collect::<BTreeSet<_>>())
428 }
429}
430
431fn get_udaf_args_and_return_types(
432 udaf: &Arc<AggregateUDF>,
433) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
434 let signature = udaf.signature();
435 let arg_types = signature.type_signature.get_example_types();
436 if arg_types.is_empty() {
437 Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
438 } else {
439 Ok(arg_types
440 .into_iter()
441 .map(|arg_types| {
442 let return_type = udaf
444 .return_type(&arg_types)
445 .ok()
446 .map(|t| remove_native_type_prefix(NativeType::from(t)));
447 let arg_types = arg_types
448 .into_iter()
449 .map(|t| remove_native_type_prefix(NativeType::from(t)))
450 .collect::<Vec<_>>();
451 (arg_types, return_type)
452 })
453 .collect::<BTreeSet<_>>())
454 }
455}
456
457fn get_udwf_args_and_return_types(
458 udwf: &Arc<WindowUDF>,
459) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
460 let signature = udwf.signature();
461 let arg_types = signature.type_signature.get_example_types();
462 if arg_types.is_empty() {
463 Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
464 } else {
465 Ok(arg_types
466 .into_iter()
467 .map(|arg_types| {
468 let arg_types = arg_types
470 .into_iter()
471 .map(|t| remove_native_type_prefix(NativeType::from(t)))
472 .collect::<Vec<_>>();
473 (arg_types, None)
474 })
475 .collect::<BTreeSet<_>>())
476 }
477}
478
/// Renders a [`NativeType`] as the type string shown in the routines and
/// parameters tables.
///
/// NOTE(review): despite the name, this only formats via `Debug`; it relies
/// on `NativeType`'s `Debug` output not carrying an enum-path prefix —
/// confirm against that `Debug` impl if it changes.
#[inline]
fn remove_native_type_prefix(native_type: NativeType) -> String {
    format!("{native_type:?}")
}

#[async_trait]
impl SchemaProvider for InformationSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    // The fixed list of virtual table names.
    fn table_names(&self) -> Vec<String> {
        INFORMATION_SCHEMA_TABLES
            .iter()
            .map(|t| t.to_string())
            .collect()
    }

    /// Returns the requested virtual table, or `Ok(None)` for unknown names.
    ///
    /// Lookup is case-insensitive. The returned table is lazy: its rows are
    /// computed from the catalog list only when its stream is executed.
    async fn table(
        &self,
        name: &str,
    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
        let config = self.config.clone();
        let table: Arc<dyn PartitionStream> = match name.to_ascii_lowercase().as_str() {
            TABLES => Arc::new(InformationSchemaTables::new(config)),
            COLUMNS => Arc::new(InformationSchemaColumns::new(config)),
            VIEWS => Arc::new(InformationSchemaViews::new(config)),
            DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)),
            SCHEMATA => Arc::new(InformationSchemata::new(config)),
            ROUTINES => Arc::new(InformationSchemaRoutines::new(config)),
            PARAMETERS => Arc::new(InformationSchemaParameters::new(config)),
            _ => return Ok(None),
        };

        // try_new only fails on an empty partition list; we always pass one
        Ok(Some(Arc::new(
            StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
        )))
    }

    // Case-insensitive membership test against the fixed table list.
    fn table_exist(&self, name: &str) -> bool {
        INFORMATION_SCHEMA_TABLES.contains(&name.to_ascii_lowercase().as_str())
    }
}
522
/// Virtual `information_schema.tables` table.
#[derive(Debug)]
struct InformationSchemaTables {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaTables {
    fn new(config: InformationSchemaConfig) -> Self {
        // column layout of information_schema.tables
        let schema = Arc::new(Schema::new(vec![
            Field::new("table_catalog", DataType::Utf8, false),
            Field::new("table_schema", DataType::Utf8, false),
            Field::new("table_name", DataType::Utf8, false),
            Field::new("table_type", DataType::Utf8, false),
        ]));

        Self { schema, config }
    }

    /// Creates an empty row accumulator matching this table's schema.
    fn builder(&self) -> InformationSchemaTablesBuilder {
        InformationSchemaTablesBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            table_types: StringBuilder::new(),
            schema: Arc::clone(&self.schema),
        }
    }
}

impl PartitionStream for InformationSchemaTables {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    // Single-batch stream; rows are gathered only when the stream is polled.
    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(async move {
                config.make_tables(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}

/// Column-wise accumulator for `information_schema.tables` rows.
struct InformationSchemaTablesBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    table_types: StringBuilder,
}
581
582impl InformationSchemaTablesBuilder {
583 fn add_table(
584 &mut self,
585 catalog_name: impl AsRef<str>,
586 schema_name: impl AsRef<str>,
587 table_name: impl AsRef<str>,
588 table_type: TableType,
589 ) {
590 self.catalog_names.append_value(catalog_name.as_ref());
592 self.schema_names.append_value(schema_name.as_ref());
593 self.table_names.append_value(table_name.as_ref());
594 self.table_types.append_value(match table_type {
595 TableType::Base => "BASE TABLE",
596 TableType::View => "VIEW",
597 TableType::Temporary => "LOCAL TEMPORARY",
598 });
599 }
600
601 fn finish(&mut self) -> RecordBatch {
602 RecordBatch::try_new(
603 Arc::clone(&self.schema),
604 vec![
605 Arc::new(self.catalog_names.finish()),
606 Arc::new(self.schema_names.finish()),
607 Arc::new(self.table_names.finish()),
608 Arc::new(self.table_types.finish()),
609 ],
610 )
611 .unwrap()
612 }
613}
614
/// Virtual `information_schema.views` table.
#[derive(Debug)]
struct InformationSchemaViews {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaViews {
    fn new(config: InformationSchemaConfig) -> Self {
        // column layout of information_schema.views; `definition` is NULL for
        // tables that do not expose a SQL definition
        let schema = Arc::new(Schema::new(vec![
            Field::new("table_catalog", DataType::Utf8, false),
            Field::new("table_schema", DataType::Utf8, false),
            Field::new("table_name", DataType::Utf8, false),
            Field::new("definition", DataType::Utf8, true),
        ]));

        Self { schema, config }
    }

    /// Creates an empty row accumulator matching this table's schema.
    fn builder(&self) -> InformationSchemaViewBuilder {
        InformationSchemaViewBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            definitions: StringBuilder::new(),
            schema: Arc::clone(&self.schema),
        }
    }
}

impl PartitionStream for InformationSchemaViews {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    // Single-batch stream; rows are gathered only when the stream is polled.
    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(async move {
                config.make_views(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}

/// Column-wise accumulator for `information_schema.views` rows.
struct InformationSchemaViewBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    definitions: StringBuilder,
}
673
674impl InformationSchemaViewBuilder {
675 fn add_view(
676 &mut self,
677 catalog_name: impl AsRef<str>,
678 schema_name: impl AsRef<str>,
679 table_name: impl AsRef<str>,
680 definition: Option<impl AsRef<str>>,
681 ) {
682 self.catalog_names.append_value(catalog_name.as_ref());
684 self.schema_names.append_value(schema_name.as_ref());
685 self.table_names.append_value(table_name.as_ref());
686 self.definitions.append_option(definition.as_ref());
687 }
688
689 fn finish(&mut self) -> RecordBatch {
690 RecordBatch::try_new(
691 Arc::clone(&self.schema),
692 vec![
693 Arc::new(self.catalog_names.finish()),
694 Arc::new(self.schema_names.finish()),
695 Arc::new(self.table_names.finish()),
696 Arc::new(self.definitions.finish()),
697 ],
698 )
699 .unwrap()
700 }
701}
702
/// Virtual `information_schema.columns` table.
#[derive(Debug)]
struct InformationSchemaColumns {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaColumns {
    fn new(config: InformationSchemaConfig) -> Self {
        // column layout of information_schema.columns; nullable fields are
        // only populated for the data types they apply to
        let schema = Arc::new(Schema::new(vec![
            Field::new("table_catalog", DataType::Utf8, false),
            Field::new("table_schema", DataType::Utf8, false),
            Field::new("table_name", DataType::Utf8, false),
            Field::new("column_name", DataType::Utf8, false),
            Field::new("ordinal_position", DataType::UInt64, false),
            Field::new("column_default", DataType::Utf8, true),
            Field::new("is_nullable", DataType::Utf8, false),
            Field::new("data_type", DataType::Utf8, false),
            Field::new("character_maximum_length", DataType::UInt64, true),
            Field::new("character_octet_length", DataType::UInt64, true),
            Field::new("numeric_precision", DataType::UInt64, true),
            Field::new("numeric_precision_radix", DataType::UInt64, true),
            Field::new("numeric_scale", DataType::UInt64, true),
            Field::new("datetime_precision", DataType::UInt64, true),
            Field::new("interval_type", DataType::Utf8, true),
        ]));

        Self { schema, config }
    }

    /// Creates an empty row accumulator matching this table's schema.
    fn builder(&self) -> InformationSchemaColumnsBuilder {
        // initial capacity hint for the numeric builders; they grow as needed
        let default_capacity = 10;

        InformationSchemaColumnsBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            column_names: StringBuilder::new(),
            ordinal_positions: UInt64Builder::with_capacity(default_capacity),
            column_defaults: StringBuilder::new(),
            is_nullables: StringBuilder::new(),
            data_types: StringBuilder::new(),
            character_maximum_lengths: UInt64Builder::with_capacity(default_capacity),
            character_octet_lengths: UInt64Builder::with_capacity(default_capacity),
            numeric_precisions: UInt64Builder::with_capacity(default_capacity),
            numeric_precision_radixes: UInt64Builder::with_capacity(default_capacity),
            numeric_scales: UInt64Builder::with_capacity(default_capacity),
            datetime_precisions: UInt64Builder::with_capacity(default_capacity),
            interval_types: StringBuilder::new(),
            schema: Arc::clone(&self.schema),
        }
    }
}

impl PartitionStream for InformationSchemaColumns {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    // Single-batch stream; rows are gathered only when the stream is polled.
    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(async move {
                config.make_columns(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}

/// Column-wise accumulator for `information_schema.columns` rows.
struct InformationSchemaColumnsBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    column_names: StringBuilder,
    ordinal_positions: UInt64Builder,
    column_defaults: StringBuilder,
    is_nullables: StringBuilder,
    data_types: StringBuilder,
    character_maximum_lengths: UInt64Builder,
    character_octet_lengths: UInt64Builder,
    numeric_precisions: UInt64Builder,
    numeric_precision_radixes: UInt64Builder,
    numeric_scales: UInt64Builder,
    datetime_precisions: UInt64Builder,
    interval_types: StringBuilder,
}
799
800impl InformationSchemaColumnsBuilder {
801 fn add_column(
802 &mut self,
803 catalog_name: &str,
804 schema_name: &str,
805 table_name: &str,
806 field_position: usize,
807 field: &Field,
808 ) {
809 use DataType::*;
810
811 self.catalog_names.append_value(catalog_name);
813 self.schema_names.append_value(schema_name);
814 self.table_names.append_value(table_name);
815
816 self.column_names.append_value(field.name());
817
818 self.ordinal_positions.append_value(field_position as u64);
819
820 self.column_defaults.append_null();
822
823 let nullable_str = if field.is_nullable() { "YES" } else { "NO" };
825 self.is_nullables.append_value(nullable_str);
826
827 self.data_types
829 .append_value(format!("{:?}", field.data_type()));
830
831 let max_chars = None;
837 self.character_maximum_lengths.append_option(max_chars);
838
839 let char_len: Option<u64> = match field.data_type() {
842 Utf8 | Binary => Some(i32::MAX as u64),
843 LargeBinary | LargeUtf8 => Some(i64::MAX as u64),
844 _ => None,
845 };
846 self.character_octet_lengths.append_option(char_len);
847
848 let (numeric_precision, numeric_radix, numeric_scale) = match field.data_type() {
871 Int8 | UInt8 => (Some(8), Some(2), None),
872 Int16 | UInt16 => (Some(16), Some(2), None),
873 Int32 | UInt32 => (Some(32), Some(2), None),
874 Float16 => (Some(15), Some(2), None),
877 Float32 => (Some(24), Some(2), None),
879 Float64 => (Some(24), Some(2), None),
881 Decimal128(precision, scale) => {
882 (Some(*precision as u64), Some(10), Some(*scale as u64))
883 }
884 _ => (None, None, None),
885 };
886
887 self.numeric_precisions.append_option(numeric_precision);
888 self.numeric_precision_radixes.append_option(numeric_radix);
889 self.numeric_scales.append_option(numeric_scale);
890
891 self.datetime_precisions.append_option(None);
892 self.interval_types.append_null();
893 }
894
895 fn finish(&mut self) -> RecordBatch {
896 RecordBatch::try_new(
897 Arc::clone(&self.schema),
898 vec![
899 Arc::new(self.catalog_names.finish()),
900 Arc::new(self.schema_names.finish()),
901 Arc::new(self.table_names.finish()),
902 Arc::new(self.column_names.finish()),
903 Arc::new(self.ordinal_positions.finish()),
904 Arc::new(self.column_defaults.finish()),
905 Arc::new(self.is_nullables.finish()),
906 Arc::new(self.data_types.finish()),
907 Arc::new(self.character_maximum_lengths.finish()),
908 Arc::new(self.character_octet_lengths.finish()),
909 Arc::new(self.numeric_precisions.finish()),
910 Arc::new(self.numeric_precision_radixes.finish()),
911 Arc::new(self.numeric_scales.finish()),
912 Arc::new(self.datetime_precisions.finish()),
913 Arc::new(self.interval_types.finish()),
914 ],
915 )
916 .unwrap()
917 }
918}
919
/// Virtual `information_schema.schemata` table.
#[derive(Debug)]
struct InformationSchemata {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemata {
    fn new(config: InformationSchemaConfig) -> Self {
        // column layout of information_schema.schemata; character-set and
        // path columns are always NULL (not tracked)
        let schema = Arc::new(Schema::new(vec![
            Field::new("catalog_name", DataType::Utf8, false),
            Field::new("schema_name", DataType::Utf8, false),
            Field::new("schema_owner", DataType::Utf8, true),
            Field::new("default_character_set_catalog", DataType::Utf8, true),
            Field::new("default_character_set_schema", DataType::Utf8, true),
            Field::new("default_character_set_name", DataType::Utf8, true),
            Field::new("sql_path", DataType::Utf8, true),
        ]));
        Self { schema, config }
    }

    /// Creates an empty row accumulator matching this table's schema.
    fn builder(&self) -> InformationSchemataBuilder {
        InformationSchemataBuilder {
            schema: Arc::clone(&self.schema),
            catalog_name: StringBuilder::new(),
            schema_name: StringBuilder::new(),
            schema_owner: StringBuilder::new(),
            default_character_set_catalog: StringBuilder::new(),
            default_character_set_schema: StringBuilder::new(),
            default_character_set_name: StringBuilder::new(),
            sql_path: StringBuilder::new(),
        }
    }
}

/// Column-wise accumulator for `information_schema.schemata` rows.
struct InformationSchemataBuilder {
    schema: SchemaRef,
    catalog_name: StringBuilder,
    schema_name: StringBuilder,
    schema_owner: StringBuilder,
    default_character_set_catalog: StringBuilder,
    default_character_set_schema: StringBuilder,
    default_character_set_name: StringBuilder,
    sql_path: StringBuilder,
}
964
965impl InformationSchemataBuilder {
966 fn add_schemata(
967 &mut self,
968 catalog_name: &str,
969 schema_name: &str,
970 schema_owner: Option<&str>,
971 ) {
972 self.catalog_name.append_value(catalog_name);
973 self.schema_name.append_value(schema_name);
974 match schema_owner {
975 Some(owner) => self.schema_owner.append_value(owner),
976 None => self.schema_owner.append_null(),
977 }
978 self.default_character_set_catalog.append_null();
981 self.default_character_set_schema.append_null();
982 self.default_character_set_name.append_null();
983 self.sql_path.append_null();
984 }
985
986 fn finish(&mut self) -> RecordBatch {
987 RecordBatch::try_new(
988 Arc::clone(&self.schema),
989 vec![
990 Arc::new(self.catalog_name.finish()),
991 Arc::new(self.schema_name.finish()),
992 Arc::new(self.schema_owner.finish()),
993 Arc::new(self.default_character_set_catalog.finish()),
994 Arc::new(self.default_character_set_schema.finish()),
995 Arc::new(self.default_character_set_name.finish()),
996 Arc::new(self.sql_path.finish()),
997 ],
998 )
999 .unwrap()
1000 }
1001}
1002
impl PartitionStream for InformationSchemata {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    // Single-batch stream; rows are gathered only when the stream is polled.
    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(async move {
                config.make_schemata(&mut builder).await;
                Ok(builder.finish())
            }),
        ))
    }
}

/// Virtual `information_schema.df_settings` table, listing the session's
/// configuration entries.
#[derive(Debug)]
struct InformationSchemaDfSettings {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaDfSettings {
    fn new(config: InformationSchemaConfig) -> Self {
        // column layout of information_schema.df_settings
        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
            Field::new("description", DataType::Utf8, true),
        ]));

        Self { schema, config }
    }

    /// Creates an empty row accumulator matching this table's schema.
    fn builder(&self) -> InformationSchemaDfSettingsBuilder {
        InformationSchemaDfSettingsBuilder {
            names: StringBuilder::new(),
            values: StringBuilder::new(),
            descriptions: StringBuilder::new(),
            schema: Arc::clone(&self.schema),
        }
    }
}

impl PartitionStream for InformationSchemaDfSettings {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    // Unlike the catalog-backed tables, this reads the session's config
    // options from the task context at execution time.
    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = self.config.clone();
        let mut builder = self.builder();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(async move {
                config.make_df_settings(ctx.session_config().options(), &mut builder);
                Ok(builder.finish())
            }),
        ))
    }
}

/// Column-wise accumulator for `information_schema.df_settings` rows.
struct InformationSchemaDfSettingsBuilder {
    schema: SchemaRef,
    names: StringBuilder,
    values: StringBuilder,
    descriptions: StringBuilder,
}

impl InformationSchemaDfSettingsBuilder {
    /// Appends one row for a single configuration entry; an unset value is
    /// reported as NULL.
    fn add_setting(&mut self, entry: ConfigEntry) {
        self.names.append_value(entry.key);
        self.values.append_option(entry.value);
        self.descriptions.append_value(entry.description);
    }

    /// Drains the accumulated columns into a single [`RecordBatch`].
    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.names.finish()),
                Arc::new(self.values.finish()),
                Arc::new(self.descriptions.finish()),
            ],
        )
        .unwrap()
    }
}
1095
1096#[derive(Debug)]
1097struct InformationSchemaRoutines {
1098 schema: SchemaRef,
1099 config: InformationSchemaConfig,
1100}
1101
1102impl InformationSchemaRoutines {
1103 fn new(config: InformationSchemaConfig) -> Self {
1104 let schema = Arc::new(Schema::new(vec![
1105 Field::new("specific_catalog", DataType::Utf8, false),
1106 Field::new("specific_schema", DataType::Utf8, false),
1107 Field::new("specific_name", DataType::Utf8, false),
1108 Field::new("routine_catalog", DataType::Utf8, false),
1109 Field::new("routine_schema", DataType::Utf8, false),
1110 Field::new("routine_name", DataType::Utf8, false),
1111 Field::new("routine_type", DataType::Utf8, false),
1112 Field::new("is_deterministic", DataType::Boolean, true),
1113 Field::new("data_type", DataType::Utf8, true),
1114 Field::new("function_type", DataType::Utf8, true),
1115 Field::new("description", DataType::Utf8, true),
1116 Field::new("syntax_example", DataType::Utf8, true),
1117 ]));
1118
1119 Self { schema, config }
1120 }
1121
1122 fn builder(&self) -> InformationSchemaRoutinesBuilder {
1123 InformationSchemaRoutinesBuilder {
1124 schema: Arc::clone(&self.schema),
1125 specific_catalog: StringBuilder::new(),
1126 specific_schema: StringBuilder::new(),
1127 specific_name: StringBuilder::new(),
1128 routine_catalog: StringBuilder::new(),
1129 routine_schema: StringBuilder::new(),
1130 routine_name: StringBuilder::new(),
1131 routine_type: StringBuilder::new(),
1132 is_deterministic: BooleanBuilder::new(),
1133 data_type: StringBuilder::new(),
1134 function_type: StringBuilder::new(),
1135 description: StringBuilder::new(),
1136 syntax_example: StringBuilder::new(),
1137 }
1138 }
1139}
1140
1141struct InformationSchemaRoutinesBuilder {
1142 schema: SchemaRef,
1143 specific_catalog: StringBuilder,
1144 specific_schema: StringBuilder,
1145 specific_name: StringBuilder,
1146 routine_catalog: StringBuilder,
1147 routine_schema: StringBuilder,
1148 routine_name: StringBuilder,
1149 routine_type: StringBuilder,
1150 is_deterministic: BooleanBuilder,
1151 data_type: StringBuilder,
1152 function_type: StringBuilder,
1153 description: StringBuilder,
1154 syntax_example: StringBuilder,
1155}
1156
1157impl InformationSchemaRoutinesBuilder {
1158 #[allow(clippy::too_many_arguments)]
1159 fn add_routine(
1160 &mut self,
1161 catalog_name: impl AsRef<str>,
1162 schema_name: impl AsRef<str>,
1163 routine_name: impl AsRef<str>,
1164 routine_type: impl AsRef<str>,
1165 is_deterministic: bool,
1166 data_type: Option<impl AsRef<str>>,
1167 function_type: impl AsRef<str>,
1168 description: Option<impl AsRef<str>>,
1169 syntax_example: Option<impl AsRef<str>>,
1170 ) {
1171 self.specific_catalog.append_value(catalog_name.as_ref());
1172 self.specific_schema.append_value(schema_name.as_ref());
1173 self.specific_name.append_value(routine_name.as_ref());
1174 self.routine_catalog.append_value(catalog_name.as_ref());
1175 self.routine_schema.append_value(schema_name.as_ref());
1176 self.routine_name.append_value(routine_name.as_ref());
1177 self.routine_type.append_value(routine_type.as_ref());
1178 self.is_deterministic.append_value(is_deterministic);
1179 self.data_type.append_option(data_type.as_ref());
1180 self.function_type.append_value(function_type.as_ref());
1181 self.description.append_option(description);
1182 self.syntax_example.append_option(syntax_example);
1183 }
1184
1185 fn finish(&mut self) -> RecordBatch {
1186 RecordBatch::try_new(
1187 Arc::clone(&self.schema),
1188 vec![
1189 Arc::new(self.specific_catalog.finish()),
1190 Arc::new(self.specific_schema.finish()),
1191 Arc::new(self.specific_name.finish()),
1192 Arc::new(self.routine_catalog.finish()),
1193 Arc::new(self.routine_schema.finish()),
1194 Arc::new(self.routine_name.finish()),
1195 Arc::new(self.routine_type.finish()),
1196 Arc::new(self.is_deterministic.finish()),
1197 Arc::new(self.data_type.finish()),
1198 Arc::new(self.function_type.finish()),
1199 Arc::new(self.description.finish()),
1200 Arc::new(self.syntax_example.finish()),
1201 ],
1202 )
1203 .unwrap()
1204 }
1205}
1206
1207impl PartitionStream for InformationSchemaRoutines {
1208 fn schema(&self) -> &SchemaRef {
1209 &self.schema
1210 }
1211
1212 fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1213 let config = self.config.clone();
1214 let mut builder = self.builder();
1215 Box::pin(RecordBatchStreamAdapter::new(
1216 Arc::clone(&self.schema),
1217 futures::stream::once(async move {
1218 config.make_routines(
1219 ctx.scalar_functions(),
1220 ctx.aggregate_functions(),
1221 ctx.window_functions(),
1222 ctx.session_config().options(),
1223 &mut builder,
1224 )?;
1225 Ok(builder.finish())
1226 }),
1227 ))
1228 }
1229}
1230
/// Virtual table backing `information_schema.parameters`: one row per
/// parameter (and return value) of each registered function.
#[derive(Debug)]
struct InformationSchemaParameters {
    schema: SchemaRef,
    // Gives access to the catalog list and registered functions at scan time.
    config: InformationSchemaConfig,
}
1236
1237impl InformationSchemaParameters {
1238 fn new(config: InformationSchemaConfig) -> Self {
1239 let schema = Arc::new(Schema::new(vec![
1240 Field::new("specific_catalog", DataType::Utf8, false),
1241 Field::new("specific_schema", DataType::Utf8, false),
1242 Field::new("specific_name", DataType::Utf8, false),
1243 Field::new("ordinal_position", DataType::UInt64, false),
1244 Field::new("parameter_mode", DataType::Utf8, false),
1245 Field::new("parameter_name", DataType::Utf8, true),
1246 Field::new("data_type", DataType::Utf8, false),
1247 Field::new("parameter_default", DataType::Utf8, true),
1248 Field::new("is_variadic", DataType::Boolean, false),
1249 Field::new("rid", DataType::UInt8, false),
1255 ]));
1256
1257 Self { schema, config }
1258 }
1259
1260 fn builder(&self) -> InformationSchemaParametersBuilder {
1261 InformationSchemaParametersBuilder {
1262 schema: Arc::clone(&self.schema),
1263 specific_catalog: StringBuilder::new(),
1264 specific_schema: StringBuilder::new(),
1265 specific_name: StringBuilder::new(),
1266 ordinal_position: UInt64Builder::new(),
1267 parameter_mode: StringBuilder::new(),
1268 parameter_name: StringBuilder::new(),
1269 data_type: StringBuilder::new(),
1270 parameter_default: StringBuilder::new(),
1271 is_variadic: BooleanBuilder::new(),
1272 rid: UInt8Builder::new(),
1273 }
1274 }
1275}
1276
/// Row-at-a-time builder for the `information_schema.parameters` table.
///
/// Holds one Arrow array builder per output column; rows are appended with
/// `add_parameter` and materialized into a `RecordBatch` by `finish`.
struct InformationSchemaParametersBuilder {
    // Schema shared with the parent table; reused when assembling the batch.
    schema: SchemaRef,
    specific_catalog: StringBuilder,
    specific_schema: StringBuilder,
    specific_name: StringBuilder,
    ordinal_position: UInt64Builder,
    parameter_mode: StringBuilder,
    // Nullable column (see the schema declared in `InformationSchemaParameters::new`).
    parameter_name: StringBuilder,
    data_type: StringBuilder,
    // Nullable column.
    parameter_default: StringBuilder,
    is_variadic: BooleanBuilder,
    rid: UInt8Builder,
}
1290
1291impl InformationSchemaParametersBuilder {
1292 #[allow(clippy::too_many_arguments)]
1293 fn add_parameter(
1294 &mut self,
1295 specific_catalog: impl AsRef<str>,
1296 specific_schema: impl AsRef<str>,
1297 specific_name: impl AsRef<str>,
1298 ordinal_position: u64,
1299 parameter_mode: impl AsRef<str>,
1300 parameter_name: Option<impl AsRef<str>>,
1301 data_type: impl AsRef<str>,
1302 parameter_default: Option<impl AsRef<str>>,
1303 is_variadic: bool,
1304 rid: u8,
1305 ) {
1306 self.specific_catalog
1307 .append_value(specific_catalog.as_ref());
1308 self.specific_schema.append_value(specific_schema.as_ref());
1309 self.specific_name.append_value(specific_name.as_ref());
1310 self.ordinal_position.append_value(ordinal_position);
1311 self.parameter_mode.append_value(parameter_mode.as_ref());
1312 self.parameter_name.append_option(parameter_name.as_ref());
1313 self.data_type.append_value(data_type.as_ref());
1314 self.parameter_default.append_option(parameter_default);
1315 self.is_variadic.append_value(is_variadic);
1316 self.rid.append_value(rid);
1317 }
1318
1319 fn finish(&mut self) -> RecordBatch {
1320 RecordBatch::try_new(
1321 Arc::clone(&self.schema),
1322 vec![
1323 Arc::new(self.specific_catalog.finish()),
1324 Arc::new(self.specific_schema.finish()),
1325 Arc::new(self.specific_name.finish()),
1326 Arc::new(self.ordinal_position.finish()),
1327 Arc::new(self.parameter_mode.finish()),
1328 Arc::new(self.parameter_name.finish()),
1329 Arc::new(self.data_type.finish()),
1330 Arc::new(self.parameter_default.finish()),
1331 Arc::new(self.is_variadic.finish()),
1332 Arc::new(self.rid.finish()),
1333 ],
1334 )
1335 .unwrap()
1336 }
1337}
1338
1339impl PartitionStream for InformationSchemaParameters {
1340 fn schema(&self) -> &SchemaRef {
1341 &self.schema
1342 }
1343
1344 fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1345 let config = self.config.clone();
1346 let mut builder = self.builder();
1347 Box::pin(RecordBatchStreamAdapter::new(
1348 Arc::clone(&self.schema),
1349 futures::stream::once(async move {
1350 config.make_parameters(
1351 ctx.scalar_functions(),
1352 ctx.aggregate_functions(),
1353 ctx.window_functions(),
1354 ctx.session_config().options(),
1355 &mut builder,
1356 )?;
1357 Ok(builder.finish())
1358 }),
1359 ))
1360 }
1361}