1use crate::streaming::StreamingTable;
23use crate::{CatalogProviderList, SchemaProvider, TableProvider};
24use arrow::array::builder::{BooleanBuilder, UInt8Builder};
25use arrow::{
26 array::{StringBuilder, UInt64Builder},
27 datatypes::{DataType, Field, FieldRef, Schema, SchemaRef},
28 record_batch::RecordBatch,
29};
30use async_trait::async_trait;
31use datafusion_common::DataFusionError;
32use datafusion_common::config::{ConfigEntry, ConfigOptions};
33use datafusion_common::error::Result;
34use datafusion_common::types::NativeType;
35use datafusion_execution::TaskContext;
36use datafusion_execution::runtime_env::RuntimeEnv;
37use datafusion_expr::function::WindowUDFFieldArgs;
38use datafusion_expr::{
39 AggregateUDF, ReturnFieldArgs, ScalarUDF, Signature, TypeSignature, WindowUDF,
40};
41use datafusion_expr::{TableType, Volatility};
42use datafusion_physical_plan::SendableRecordBatchStream;
43use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
44use datafusion_physical_plan::streaming::PartitionStream;
45use std::collections::{BTreeSet, HashMap, HashSet};
46use std::fmt::Debug;
47use std::sync::Arc;
48
/// Name of the virtual schema that exposes catalog metadata.
pub const INFORMATION_SCHEMA: &str = "information_schema";

// Names of the individual tables served inside `information_schema`.
pub(crate) const TABLES: &str = "tables";
pub(crate) const VIEWS: &str = "views";
pub(crate) const COLUMNS: &str = "columns";
pub(crate) const DF_SETTINGS: &str = "df_settings";
pub(crate) const SCHEMATA: &str = "schemata";
pub(crate) const ROUTINES: &str = "routines";
pub(crate) const PARAMETERS: &str = "parameters";

/// Every table available in `information_schema`, in the order they are
/// listed by the schema provider.
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[
    TABLES,
    VIEWS,
    COLUMNS,
    DF_SETTINGS,
    SCHEMATA,
    ROUTINES,
    PARAMETERS,
];
68
69#[derive(Debug)]
76pub struct InformationSchemaProvider {
77 config: InformationSchemaConfig,
78}
79
80impl InformationSchemaProvider {
81 pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
83 Self {
84 config: InformationSchemaConfig { catalog_list },
85 }
86 }
87}
88
89#[derive(Clone, Debug)]
90struct InformationSchemaConfig {
91 catalog_list: Arc<dyn CatalogProviderList>,
92}
93
94impl InformationSchemaConfig {
95 async fn make_tables(
97 &self,
98 builder: &mut InformationSchemaTablesBuilder,
99 ) -> Result<(), DataFusionError> {
100 for catalog_name in self.catalog_list.catalog_names() {
103 let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
104
105 for schema_name in catalog.schema_names() {
106 if schema_name != INFORMATION_SCHEMA {
107 if let Some(schema) = catalog.schema(&schema_name) {
109 for table_name in schema.table_names() {
110 if let Some(table_type) =
111 schema.table_type(&table_name).await?
112 {
113 builder.add_table(
114 &catalog_name,
115 &schema_name,
116 &table_name,
117 table_type,
118 );
119 }
120 }
121 }
122 }
123 }
124
125 for table_name in INFORMATION_SCHEMA_TABLES {
127 builder.add_table(
128 &catalog_name,
129 INFORMATION_SCHEMA,
130 table_name,
131 TableType::View,
132 );
133 }
134 }
135
136 Ok(())
137 }
138
139 async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
140 for catalog_name in self.catalog_list.catalog_names() {
141 let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
142
143 for schema_name in catalog.schema_names() {
144 if schema_name != INFORMATION_SCHEMA
145 && let Some(schema) = catalog.schema(&schema_name)
146 {
147 let schema_owner = schema.owner_name();
148 builder.add_schemata(&catalog_name, &schema_name, schema_owner);
149 }
150 }
151 }
152 }
153
154 async fn make_views(
155 &self,
156 builder: &mut InformationSchemaViewBuilder,
157 ) -> Result<(), DataFusionError> {
158 for catalog_name in self.catalog_list.catalog_names() {
159 let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
160
161 for schema_name in catalog.schema_names() {
162 if schema_name != INFORMATION_SCHEMA {
163 if let Some(schema) = catalog.schema(&schema_name) {
165 for table_name in schema.table_names() {
166 if let Some(table) = schema.table(&table_name).await? {
167 builder.add_view(
168 &catalog_name,
169 &schema_name,
170 &table_name,
171 table.get_table_definition(),
172 )
173 }
174 }
175 }
176 }
177 }
178 }
179
180 Ok(())
181 }
182
183 async fn make_columns(
185 &self,
186 builder: &mut InformationSchemaColumnsBuilder,
187 ) -> Result<(), DataFusionError> {
188 for catalog_name in self.catalog_list.catalog_names() {
189 let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
190
191 for schema_name in catalog.schema_names() {
192 if schema_name != INFORMATION_SCHEMA {
193 if let Some(schema) = catalog.schema(&schema_name) {
195 for table_name in schema.table_names() {
196 if let Some(table) = schema.table(&table_name).await? {
197 for (field_position, field) in
198 table.schema().fields().iter().enumerate()
199 {
200 builder.add_column(
201 &catalog_name,
202 &schema_name,
203 &table_name,
204 field_position,
205 field,
206 )
207 }
208 }
209 }
210 }
211 }
212 }
213 }
214
215 Ok(())
216 }
217
218 fn make_df_settings(
220 &self,
221 config_options: &ConfigOptions,
222 runtime_env: &Arc<RuntimeEnv>,
223 builder: &mut InformationSchemaDfSettingsBuilder,
224 ) {
225 for entry in config_options.entries() {
226 builder.add_setting(entry);
227 }
228 for entry in runtime_env.config_entries() {
230 builder.add_setting(entry);
231 }
232 }
233
234 fn make_routines(
235 &self,
236 udfs: &HashMap<String, Arc<ScalarUDF>>,
237 udafs: &HashMap<String, Arc<AggregateUDF>>,
238 udwfs: &HashMap<String, Arc<WindowUDF>>,
239 config_options: &ConfigOptions,
240 builder: &mut InformationSchemaRoutinesBuilder,
241 ) -> Result<()> {
242 let catalog_name = &config_options.catalog.default_catalog;
243 let schema_name = &config_options.catalog.default_schema;
244
245 for (name, udf) in udfs {
246 let return_types = get_udf_args_and_return_types(udf)?
247 .into_iter()
248 .map(|(_, return_type)| return_type)
249 .collect::<HashSet<_>>();
250 for return_type in return_types {
251 builder.add_routine(
252 catalog_name,
253 schema_name,
254 name,
255 "FUNCTION",
256 Self::is_deterministic(udf.signature()),
257 return_type.as_ref(),
258 "SCALAR",
259 udf.documentation().map(|d| d.description.to_string()),
260 udf.documentation().map(|d| d.syntax_example.to_string()),
261 )
262 }
263 }
264
265 for (name, udaf) in udafs {
266 let return_types = get_udaf_args_and_return_types(udaf)?
267 .into_iter()
268 .map(|(_, return_type)| return_type)
269 .collect::<HashSet<_>>();
270 for return_type in return_types {
271 builder.add_routine(
272 catalog_name,
273 schema_name,
274 name,
275 "FUNCTION",
276 Self::is_deterministic(udaf.signature()),
277 return_type.as_ref(),
278 "AGGREGATE",
279 udaf.documentation().map(|d| d.description.to_string()),
280 udaf.documentation().map(|d| d.syntax_example.to_string()),
281 )
282 }
283 }
284
285 for (name, udwf) in udwfs {
286 let return_types = get_udwf_args_and_return_types(udwf)?
287 .into_iter()
288 .map(|(_, return_type)| return_type)
289 .collect::<HashSet<_>>();
290 for return_type in return_types {
291 builder.add_routine(
292 catalog_name,
293 schema_name,
294 name,
295 "FUNCTION",
296 Self::is_deterministic(udwf.signature()),
297 return_type.as_ref(),
298 "WINDOW",
299 udwf.documentation().map(|d| d.description.to_string()),
300 udwf.documentation().map(|d| d.syntax_example.to_string()),
301 )
302 }
303 }
304 Ok(())
305 }
306
307 fn is_deterministic(signature: &Signature) -> bool {
308 signature.volatility == Volatility::Immutable
309 }
310 fn make_parameters(
311 &self,
312 udfs: &HashMap<String, Arc<ScalarUDF>>,
313 udafs: &HashMap<String, Arc<AggregateUDF>>,
314 udwfs: &HashMap<String, Arc<WindowUDF>>,
315 config_options: &ConfigOptions,
316 builder: &mut InformationSchemaParametersBuilder,
317 ) -> Result<()> {
318 let catalog_name = &config_options.catalog.default_catalog;
319 let schema_name = &config_options.catalog.default_schema;
320 let mut add_parameters = |func_name: &str,
321 args: Option<&Vec<(String, String)>>,
322 arg_types: Vec<String>,
323 return_type: Option<String>,
324 is_variadic: bool,
325 rid: u8| {
326 for (position, type_name) in arg_types.iter().enumerate() {
327 let param_name =
328 args.and_then(|a| a.get(position).map(|arg| arg.0.as_str()));
329 builder.add_parameter(
330 catalog_name,
331 schema_name,
332 func_name,
333 position as u64 + 1,
334 "IN",
335 param_name,
336 type_name,
337 None::<&str>,
338 is_variadic,
339 rid,
340 );
341 }
342 if let Some(return_type) = return_type {
343 builder.add_parameter(
344 catalog_name,
345 schema_name,
346 func_name,
347 1,
348 "OUT",
349 None::<&str>,
350 return_type.as_str(),
351 None::<&str>,
352 false,
353 rid,
354 );
355 }
356 };
357
358 for (func_name, udf) in udfs {
359 let args = udf.documentation().and_then(|d| d.arguments.clone());
360 let combinations = get_udf_args_and_return_types(udf)?;
361 for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
362 add_parameters(
363 func_name,
364 args.as_ref(),
365 arg_types,
366 return_type,
367 Self::is_variadic(udf.signature()),
368 rid as u8,
369 );
370 }
371 }
372
373 for (func_name, udaf) in udafs {
374 let args = udaf.documentation().and_then(|d| d.arguments.clone());
375 let combinations = get_udaf_args_and_return_types(udaf)?;
376 for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
377 add_parameters(
378 func_name,
379 args.as_ref(),
380 arg_types,
381 return_type,
382 Self::is_variadic(udaf.signature()),
383 rid as u8,
384 );
385 }
386 }
387
388 for (func_name, udwf) in udwfs {
389 let args = udwf.documentation().and_then(|d| d.arguments.clone());
390 let combinations = get_udwf_args_and_return_types(udwf)?;
391 for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
392 add_parameters(
393 func_name,
394 args.as_ref(),
395 arg_types,
396 return_type,
397 Self::is_variadic(udwf.signature()),
398 rid as u8,
399 );
400 }
401 }
402
403 Ok(())
404 }
405
406 fn is_variadic(signature: &Signature) -> bool {
407 matches!(
408 signature.type_signature,
409 TypeSignature::Variadic(_) | TypeSignature::VariadicAny
410 )
411 }
412}
413
414fn get_udf_args_and_return_types(
417 udf: &Arc<ScalarUDF>,
418) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
419 let signature = udf.signature();
420 let arg_types = signature.type_signature.get_example_types();
421 if arg_types.is_empty() {
422 Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
423 } else {
424 Ok(arg_types
425 .into_iter()
426 .map(|arg_types| {
427 let arg_fields: Vec<FieldRef> = arg_types
428 .iter()
429 .enumerate()
430 .map(|(i, t)| {
431 Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
432 })
433 .collect();
434 let scalar_arguments = vec![None; arg_fields.len()];
435 let return_type = udf
436 .return_field_from_args(ReturnFieldArgs {
437 arg_fields: &arg_fields,
438 scalar_arguments: &scalar_arguments,
439 })
440 .map(|f| {
441 remove_native_type_prefix(&NativeType::from(
442 f.data_type().clone(),
443 ))
444 })
445 .ok();
446 let arg_types = arg_types
447 .into_iter()
448 .map(|t| remove_native_type_prefix(&NativeType::from(t)))
449 .collect::<Vec<_>>();
450 (arg_types, return_type)
451 })
452 .collect::<BTreeSet<_>>())
453 }
454}
455
456fn get_udaf_args_and_return_types(
457 udaf: &Arc<AggregateUDF>,
458) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
459 let signature = udaf.signature();
460 let arg_types = signature.type_signature.get_example_types();
461 if arg_types.is_empty() {
462 Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
463 } else {
464 Ok(arg_types
465 .into_iter()
466 .map(|arg_types| {
467 let arg_fields: Vec<FieldRef> = arg_types
468 .iter()
469 .enumerate()
470 .map(|(i, t)| {
471 Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
472 })
473 .collect();
474 let return_type = udaf
475 .return_field(&arg_fields)
476 .map(|f| {
477 remove_native_type_prefix(&NativeType::from(
478 f.data_type().clone(),
479 ))
480 })
481 .ok();
482 let arg_types = arg_types
483 .into_iter()
484 .map(|t| remove_native_type_prefix(&NativeType::from(t)))
485 .collect::<Vec<_>>();
486 (arg_types, return_type)
487 })
488 .collect::<BTreeSet<_>>())
489 }
490}
491
492fn get_udwf_args_and_return_types(
493 udwf: &Arc<WindowUDF>,
494) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
495 let signature = udwf.signature();
496 let arg_types = signature.type_signature.get_example_types();
497 if arg_types.is_empty() {
498 Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
499 } else {
500 Ok(arg_types
501 .into_iter()
502 .map(|arg_types| {
503 let arg_fields: Vec<FieldRef> = arg_types
504 .iter()
505 .enumerate()
506 .map(|(i, t)| {
507 Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
508 })
509 .collect();
510 let return_type = udwf
511 .field(WindowUDFFieldArgs::new(&arg_fields, udwf.name()))
512 .map(|f| {
513 remove_native_type_prefix(&NativeType::from(
514 f.data_type().clone(),
515 ))
516 })
517 .ok();
518 let arg_types = arg_types
519 .into_iter()
520 .map(|t| remove_native_type_prefix(&NativeType::from(t)))
521 .collect::<Vec<_>>();
522 (arg_types, return_type)
523 })
524 .collect::<BTreeSet<_>>())
525 }
526}
527
528#[inline]
529fn remove_native_type_prefix(native_type: &NativeType) -> String {
530 format!("{native_type}")
531}
532
533#[async_trait]
534impl SchemaProvider for InformationSchemaProvider {
535 fn table_names(&self) -> Vec<String> {
536 INFORMATION_SCHEMA_TABLES
537 .iter()
538 .map(|t| (*t).to_string())
539 .collect()
540 }
541
542 async fn table(
543 &self,
544 name: &str,
545 ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
546 let config = self.config.clone();
547 let table: Arc<dyn PartitionStream> = match name.to_ascii_lowercase().as_str() {
548 TABLES => Arc::new(InformationSchemaTables::new(config)),
549 COLUMNS => Arc::new(InformationSchemaColumns::new(config)),
550 VIEWS => Arc::new(InformationSchemaViews::new(config)),
551 DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)),
552 SCHEMATA => Arc::new(InformationSchemata::new(config)),
553 ROUTINES => Arc::new(InformationSchemaRoutines::new(config)),
554 PARAMETERS => Arc::new(InformationSchemaParameters::new(config)),
555 _ => return Ok(None),
556 };
557
558 Ok(Some(Arc::new(
559 StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
560 )))
561 }
562
563 fn table_exist(&self, name: &str) -> bool {
564 INFORMATION_SCHEMA_TABLES.contains(&name.to_ascii_lowercase().as_str())
565 }
566}
567
568#[derive(Debug)]
569struct InformationSchemaTables {
570 schema: SchemaRef,
571 config: InformationSchemaConfig,
572}
573
574impl InformationSchemaTables {
575 fn new(config: InformationSchemaConfig) -> Self {
576 let schema = Arc::new(Schema::new(vec![
577 Field::new("table_catalog", DataType::Utf8, false),
578 Field::new("table_schema", DataType::Utf8, false),
579 Field::new("table_name", DataType::Utf8, false),
580 Field::new("table_type", DataType::Utf8, false),
581 ]));
582
583 Self { schema, config }
584 }
585
586 fn builder(&self) -> InformationSchemaTablesBuilder {
587 InformationSchemaTablesBuilder {
588 catalog_names: StringBuilder::new(),
589 schema_names: StringBuilder::new(),
590 table_names: StringBuilder::new(),
591 table_types: StringBuilder::new(),
592 schema: Arc::clone(&self.schema),
593 }
594 }
595}
596
597impl PartitionStream for InformationSchemaTables {
598 fn schema(&self) -> &SchemaRef {
599 &self.schema
600 }
601
602 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
603 let mut builder = self.builder();
604 let config = self.config.clone();
605 Box::pin(RecordBatchStreamAdapter::new(
606 Arc::clone(&self.schema),
607 futures::stream::once(async move {
609 config.make_tables(&mut builder).await?;
610 Ok(builder.finish())
611 }),
612 ))
613 }
614}
615
616struct InformationSchemaTablesBuilder {
620 schema: SchemaRef,
621 catalog_names: StringBuilder,
622 schema_names: StringBuilder,
623 table_names: StringBuilder,
624 table_types: StringBuilder,
625}
626
627impl InformationSchemaTablesBuilder {
628 fn add_table(
629 &mut self,
630 catalog_name: impl AsRef<str>,
631 schema_name: impl AsRef<str>,
632 table_name: impl AsRef<str>,
633 table_type: TableType,
634 ) {
635 self.catalog_names.append_value(catalog_name.as_ref());
637 self.schema_names.append_value(schema_name.as_ref());
638 self.table_names.append_value(table_name.as_ref());
639 self.table_types.append_value(match table_type {
640 TableType::Base => "BASE TABLE",
641 TableType::View => "VIEW",
642 TableType::Temporary => "LOCAL TEMPORARY",
643 });
644 }
645
646 fn finish(&mut self) -> RecordBatch {
647 RecordBatch::try_new(
648 Arc::clone(&self.schema),
649 vec![
650 Arc::new(self.catalog_names.finish()),
651 Arc::new(self.schema_names.finish()),
652 Arc::new(self.table_names.finish()),
653 Arc::new(self.table_types.finish()),
654 ],
655 )
656 .unwrap()
657 }
658}
659
660#[derive(Debug)]
661struct InformationSchemaViews {
662 schema: SchemaRef,
663 config: InformationSchemaConfig,
664}
665
666impl InformationSchemaViews {
667 fn new(config: InformationSchemaConfig) -> Self {
668 let schema = Arc::new(Schema::new(vec![
669 Field::new("table_catalog", DataType::Utf8, false),
670 Field::new("table_schema", DataType::Utf8, false),
671 Field::new("table_name", DataType::Utf8, false),
672 Field::new("definition", DataType::Utf8, true),
673 ]));
674
675 Self { schema, config }
676 }
677
678 fn builder(&self) -> InformationSchemaViewBuilder {
679 InformationSchemaViewBuilder {
680 catalog_names: StringBuilder::new(),
681 schema_names: StringBuilder::new(),
682 table_names: StringBuilder::new(),
683 definitions: StringBuilder::new(),
684 schema: Arc::clone(&self.schema),
685 }
686 }
687}
688
689impl PartitionStream for InformationSchemaViews {
690 fn schema(&self) -> &SchemaRef {
691 &self.schema
692 }
693
694 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
695 let mut builder = self.builder();
696 let config = self.config.clone();
697 Box::pin(RecordBatchStreamAdapter::new(
698 Arc::clone(&self.schema),
699 futures::stream::once(async move {
701 config.make_views(&mut builder).await?;
702 Ok(builder.finish())
703 }),
704 ))
705 }
706}
707
708struct InformationSchemaViewBuilder {
712 schema: SchemaRef,
713 catalog_names: StringBuilder,
714 schema_names: StringBuilder,
715 table_names: StringBuilder,
716 definitions: StringBuilder,
717}
718
719impl InformationSchemaViewBuilder {
720 fn add_view(
721 &mut self,
722 catalog_name: impl AsRef<str>,
723 schema_name: impl AsRef<str>,
724 table_name: impl AsRef<str>,
725 definition: Option<&(impl AsRef<str> + ?Sized)>,
726 ) {
727 self.catalog_names.append_value(catalog_name.as_ref());
729 self.schema_names.append_value(schema_name.as_ref());
730 self.table_names.append_value(table_name.as_ref());
731 self.definitions.append_option(definition.as_ref());
732 }
733
734 fn finish(&mut self) -> RecordBatch {
735 RecordBatch::try_new(
736 Arc::clone(&self.schema),
737 vec![
738 Arc::new(self.catalog_names.finish()),
739 Arc::new(self.schema_names.finish()),
740 Arc::new(self.table_names.finish()),
741 Arc::new(self.definitions.finish()),
742 ],
743 )
744 .unwrap()
745 }
746}
747
748#[derive(Debug)]
749struct InformationSchemaColumns {
750 schema: SchemaRef,
751 config: InformationSchemaConfig,
752}
753
754impl InformationSchemaColumns {
755 fn new(config: InformationSchemaConfig) -> Self {
756 let schema = Arc::new(Schema::new(vec![
757 Field::new("table_catalog", DataType::Utf8, false),
758 Field::new("table_schema", DataType::Utf8, false),
759 Field::new("table_name", DataType::Utf8, false),
760 Field::new("column_name", DataType::Utf8, false),
761 Field::new("ordinal_position", DataType::UInt64, false),
762 Field::new("column_default", DataType::Utf8, true),
763 Field::new("is_nullable", DataType::Utf8, false),
764 Field::new("data_type", DataType::Utf8, false),
765 Field::new("character_maximum_length", DataType::UInt64, true),
766 Field::new("character_octet_length", DataType::UInt64, true),
767 Field::new("numeric_precision", DataType::UInt64, true),
768 Field::new("numeric_precision_radix", DataType::UInt64, true),
769 Field::new("numeric_scale", DataType::UInt64, true),
770 Field::new("datetime_precision", DataType::UInt64, true),
771 Field::new("interval_type", DataType::Utf8, true),
772 ]));
773
774 Self { schema, config }
775 }
776
777 fn builder(&self) -> InformationSchemaColumnsBuilder {
778 let default_capacity = 10;
782
783 InformationSchemaColumnsBuilder {
784 catalog_names: StringBuilder::new(),
785 schema_names: StringBuilder::new(),
786 table_names: StringBuilder::new(),
787 column_names: StringBuilder::new(),
788 ordinal_positions: UInt64Builder::with_capacity(default_capacity),
789 column_defaults: StringBuilder::new(),
790 is_nullables: StringBuilder::new(),
791 data_types: StringBuilder::new(),
792 character_maximum_lengths: UInt64Builder::with_capacity(default_capacity),
793 character_octet_lengths: UInt64Builder::with_capacity(default_capacity),
794 numeric_precisions: UInt64Builder::with_capacity(default_capacity),
795 numeric_precision_radixes: UInt64Builder::with_capacity(default_capacity),
796 numeric_scales: UInt64Builder::with_capacity(default_capacity),
797 datetime_precisions: UInt64Builder::with_capacity(default_capacity),
798 interval_types: StringBuilder::new(),
799 schema: Arc::clone(&self.schema),
800 }
801 }
802}
803
804impl PartitionStream for InformationSchemaColumns {
805 fn schema(&self) -> &SchemaRef {
806 &self.schema
807 }
808
809 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
810 let mut builder = self.builder();
811 let config = self.config.clone();
812 Box::pin(RecordBatchStreamAdapter::new(
813 Arc::clone(&self.schema),
814 futures::stream::once(async move {
816 config.make_columns(&mut builder).await?;
817 Ok(builder.finish())
818 }),
819 ))
820 }
821}
822
823struct InformationSchemaColumnsBuilder {
827 schema: SchemaRef,
828 catalog_names: StringBuilder,
829 schema_names: StringBuilder,
830 table_names: StringBuilder,
831 column_names: StringBuilder,
832 ordinal_positions: UInt64Builder,
833 column_defaults: StringBuilder,
834 is_nullables: StringBuilder,
835 data_types: StringBuilder,
836 character_maximum_lengths: UInt64Builder,
837 character_octet_lengths: UInt64Builder,
838 numeric_precisions: UInt64Builder,
839 numeric_precision_radixes: UInt64Builder,
840 numeric_scales: UInt64Builder,
841 datetime_precisions: UInt64Builder,
842 interval_types: StringBuilder,
843}
844
845impl InformationSchemaColumnsBuilder {
846 fn add_column(
847 &mut self,
848 catalog_name: &str,
849 schema_name: &str,
850 table_name: &str,
851 field_position: usize,
852 field: &Field,
853 ) {
854 use DataType::*;
855
856 self.catalog_names.append_value(catalog_name);
858 self.schema_names.append_value(schema_name);
859 self.table_names.append_value(table_name);
860
861 self.column_names.append_value(field.name());
862
863 self.ordinal_positions.append_value(field_position as u64);
864
865 self.column_defaults.append_null();
867
868 let nullable_str = if field.is_nullable() { "YES" } else { "NO" };
870 self.is_nullables.append_value(nullable_str);
871
872 self.data_types.append_value(field.data_type().to_string());
874
875 let max_chars = None;
881 self.character_maximum_lengths.append_option(max_chars);
882
883 let char_len: Option<u64> = match field.data_type() {
886 Utf8 | Binary => Some(i32::MAX as u64),
887 LargeBinary | LargeUtf8 => Some(i64::MAX as u64),
888 _ => None,
889 };
890 self.character_octet_lengths.append_option(char_len);
891
892 let (numeric_precision, numeric_radix, numeric_scale) = match field.data_type() {
915 Int8 | UInt8 => (Some(8), Some(2), None),
916 Int16 | UInt16 => (Some(16), Some(2), None),
917 Int32 | UInt32 => (Some(32), Some(2), None),
918 Float16 => (Some(15), Some(2), None),
921 Float32 => (Some(24), Some(2), None),
923 Float64 => (Some(24), Some(2), None),
925 Decimal128(precision, scale) => {
926 (Some(*precision as u64), Some(10), Some(*scale as u64))
927 }
928 _ => (None, None, None),
929 };
930
931 self.numeric_precisions.append_option(numeric_precision);
932 self.numeric_precision_radixes.append_option(numeric_radix);
933 self.numeric_scales.append_option(numeric_scale);
934
935 self.datetime_precisions.append_option(None);
936 self.interval_types.append_null();
937 }
938
939 fn finish(&mut self) -> RecordBatch {
940 RecordBatch::try_new(
941 Arc::clone(&self.schema),
942 vec![
943 Arc::new(self.catalog_names.finish()),
944 Arc::new(self.schema_names.finish()),
945 Arc::new(self.table_names.finish()),
946 Arc::new(self.column_names.finish()),
947 Arc::new(self.ordinal_positions.finish()),
948 Arc::new(self.column_defaults.finish()),
949 Arc::new(self.is_nullables.finish()),
950 Arc::new(self.data_types.finish()),
951 Arc::new(self.character_maximum_lengths.finish()),
952 Arc::new(self.character_octet_lengths.finish()),
953 Arc::new(self.numeric_precisions.finish()),
954 Arc::new(self.numeric_precision_radixes.finish()),
955 Arc::new(self.numeric_scales.finish()),
956 Arc::new(self.datetime_precisions.finish()),
957 Arc::new(self.interval_types.finish()),
958 ],
959 )
960 .unwrap()
961 }
962}
963
964#[derive(Debug)]
965struct InformationSchemata {
966 schema: SchemaRef,
967 config: InformationSchemaConfig,
968}
969
970impl InformationSchemata {
971 fn new(config: InformationSchemaConfig) -> Self {
972 let schema = Arc::new(Schema::new(vec![
973 Field::new("catalog_name", DataType::Utf8, false),
974 Field::new("schema_name", DataType::Utf8, false),
975 Field::new("schema_owner", DataType::Utf8, true),
976 Field::new("default_character_set_catalog", DataType::Utf8, true),
977 Field::new("default_character_set_schema", DataType::Utf8, true),
978 Field::new("default_character_set_name", DataType::Utf8, true),
979 Field::new("sql_path", DataType::Utf8, true),
980 ]));
981 Self { schema, config }
982 }
983
984 fn builder(&self) -> InformationSchemataBuilder {
985 InformationSchemataBuilder {
986 schema: Arc::clone(&self.schema),
987 catalog_name: StringBuilder::new(),
988 schema_name: StringBuilder::new(),
989 schema_owner: StringBuilder::new(),
990 default_character_set_catalog: StringBuilder::new(),
991 default_character_set_schema: StringBuilder::new(),
992 default_character_set_name: StringBuilder::new(),
993 sql_path: StringBuilder::new(),
994 }
995 }
996}
997
998struct InformationSchemataBuilder {
999 schema: SchemaRef,
1000 catalog_name: StringBuilder,
1001 schema_name: StringBuilder,
1002 schema_owner: StringBuilder,
1003 default_character_set_catalog: StringBuilder,
1004 default_character_set_schema: StringBuilder,
1005 default_character_set_name: StringBuilder,
1006 sql_path: StringBuilder,
1007}
1008
1009impl InformationSchemataBuilder {
1010 fn add_schemata(
1011 &mut self,
1012 catalog_name: &str,
1013 schema_name: &str,
1014 schema_owner: Option<&str>,
1015 ) {
1016 self.catalog_name.append_value(catalog_name);
1017 self.schema_name.append_value(schema_name);
1018 match schema_owner {
1019 Some(owner) => self.schema_owner.append_value(owner),
1020 None => self.schema_owner.append_null(),
1021 }
1022 self.default_character_set_catalog.append_null();
1025 self.default_character_set_schema.append_null();
1026 self.default_character_set_name.append_null();
1027 self.sql_path.append_null();
1028 }
1029
1030 fn finish(&mut self) -> RecordBatch {
1031 RecordBatch::try_new(
1032 Arc::clone(&self.schema),
1033 vec![
1034 Arc::new(self.catalog_name.finish()),
1035 Arc::new(self.schema_name.finish()),
1036 Arc::new(self.schema_owner.finish()),
1037 Arc::new(self.default_character_set_catalog.finish()),
1038 Arc::new(self.default_character_set_schema.finish()),
1039 Arc::new(self.default_character_set_name.finish()),
1040 Arc::new(self.sql_path.finish()),
1041 ],
1042 )
1043 .unwrap()
1044 }
1045}
1046
1047impl PartitionStream for InformationSchemata {
1048 fn schema(&self) -> &SchemaRef {
1049 &self.schema
1050 }
1051
1052 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1053 let mut builder = self.builder();
1054 let config = self.config.clone();
1055 Box::pin(RecordBatchStreamAdapter::new(
1056 Arc::clone(&self.schema),
1057 futures::stream::once(async move {
1059 config.make_schemata(&mut builder).await;
1060 Ok(builder.finish())
1061 }),
1062 ))
1063 }
1064}
1065
1066#[derive(Debug)]
1067struct InformationSchemaDfSettings {
1068 schema: SchemaRef,
1069 config: InformationSchemaConfig,
1070}
1071
1072impl InformationSchemaDfSettings {
1073 fn new(config: InformationSchemaConfig) -> Self {
1074 let schema = Arc::new(Schema::new(vec![
1075 Field::new("name", DataType::Utf8, false),
1076 Field::new("value", DataType::Utf8, true),
1077 Field::new("description", DataType::Utf8, true),
1078 ]));
1079
1080 Self { schema, config }
1081 }
1082
1083 fn builder(&self) -> InformationSchemaDfSettingsBuilder {
1084 InformationSchemaDfSettingsBuilder {
1085 names: StringBuilder::new(),
1086 values: StringBuilder::new(),
1087 descriptions: StringBuilder::new(),
1088 schema: Arc::clone(&self.schema),
1089 }
1090 }
1091}
1092
1093impl PartitionStream for InformationSchemaDfSettings {
1094 fn schema(&self) -> &SchemaRef {
1095 &self.schema
1096 }
1097
1098 fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1099 let config = self.config.clone();
1100 let mut builder = self.builder();
1101 Box::pin(RecordBatchStreamAdapter::new(
1102 Arc::clone(&self.schema),
1103 futures::stream::once(async move {
1105 let runtime_env = ctx.runtime_env();
1107 config.make_df_settings(
1108 ctx.session_config().options(),
1109 &runtime_env,
1110 &mut builder,
1111 );
1112 Ok(builder.finish())
1113 }),
1114 ))
1115 }
1116}
1117
1118struct InformationSchemaDfSettingsBuilder {
1119 schema: SchemaRef,
1120 names: StringBuilder,
1121 values: StringBuilder,
1122 descriptions: StringBuilder,
1123}
1124
1125impl InformationSchemaDfSettingsBuilder {
1126 fn add_setting(&mut self, entry: ConfigEntry) {
1127 self.names.append_value(entry.key);
1128 self.values.append_option(entry.value);
1129 self.descriptions.append_value(entry.description);
1130 }
1131
1132 fn finish(&mut self) -> RecordBatch {
1133 RecordBatch::try_new(
1134 Arc::clone(&self.schema),
1135 vec![
1136 Arc::new(self.names.finish()),
1137 Arc::new(self.values.finish()),
1138 Arc::new(self.descriptions.finish()),
1139 ],
1140 )
1141 .unwrap()
1142 }
1143}
1144
1145#[derive(Debug)]
1146struct InformationSchemaRoutines {
1147 schema: SchemaRef,
1148 config: InformationSchemaConfig,
1149}
1150
1151impl InformationSchemaRoutines {
1152 fn new(config: InformationSchemaConfig) -> Self {
1153 let schema = Arc::new(Schema::new(vec![
1154 Field::new("specific_catalog", DataType::Utf8, false),
1155 Field::new("specific_schema", DataType::Utf8, false),
1156 Field::new("specific_name", DataType::Utf8, false),
1157 Field::new("routine_catalog", DataType::Utf8, false),
1158 Field::new("routine_schema", DataType::Utf8, false),
1159 Field::new("routine_name", DataType::Utf8, false),
1160 Field::new("routine_type", DataType::Utf8, false),
1161 Field::new("is_deterministic", DataType::Boolean, true),
1162 Field::new("data_type", DataType::Utf8, true),
1163 Field::new("function_type", DataType::Utf8, true),
1164 Field::new("description", DataType::Utf8, true),
1165 Field::new("syntax_example", DataType::Utf8, true),
1166 ]));
1167
1168 Self { schema, config }
1169 }
1170
1171 fn builder(&self) -> InformationSchemaRoutinesBuilder {
1172 InformationSchemaRoutinesBuilder {
1173 schema: Arc::clone(&self.schema),
1174 specific_catalog: StringBuilder::new(),
1175 specific_schema: StringBuilder::new(),
1176 specific_name: StringBuilder::new(),
1177 routine_catalog: StringBuilder::new(),
1178 routine_schema: StringBuilder::new(),
1179 routine_name: StringBuilder::new(),
1180 routine_type: StringBuilder::new(),
1181 is_deterministic: BooleanBuilder::new(),
1182 data_type: StringBuilder::new(),
1183 function_type: StringBuilder::new(),
1184 description: StringBuilder::new(),
1185 syntax_example: StringBuilder::new(),
1186 }
1187 }
1188}
1189
1190struct InformationSchemaRoutinesBuilder {
1191 schema: SchemaRef,
1192 specific_catalog: StringBuilder,
1193 specific_schema: StringBuilder,
1194 specific_name: StringBuilder,
1195 routine_catalog: StringBuilder,
1196 routine_schema: StringBuilder,
1197 routine_name: StringBuilder,
1198 routine_type: StringBuilder,
1199 is_deterministic: BooleanBuilder,
1200 data_type: StringBuilder,
1201 function_type: StringBuilder,
1202 description: StringBuilder,
1203 syntax_example: StringBuilder,
1204}
1205
1206impl InformationSchemaRoutinesBuilder {
1207 #[expect(clippy::too_many_arguments)]
1208 fn add_routine(
1209 &mut self,
1210 catalog_name: impl AsRef<str>,
1211 schema_name: impl AsRef<str>,
1212 routine_name: impl AsRef<str>,
1213 routine_type: impl AsRef<str>,
1214 is_deterministic: bool,
1215 data_type: Option<&impl AsRef<str>>,
1216 function_type: impl AsRef<str>,
1217 description: Option<impl AsRef<str>>,
1218 syntax_example: Option<impl AsRef<str>>,
1219 ) {
1220 self.specific_catalog.append_value(catalog_name.as_ref());
1221 self.specific_schema.append_value(schema_name.as_ref());
1222 self.specific_name.append_value(routine_name.as_ref());
1223 self.routine_catalog.append_value(catalog_name.as_ref());
1224 self.routine_schema.append_value(schema_name.as_ref());
1225 self.routine_name.append_value(routine_name.as_ref());
1226 self.routine_type.append_value(routine_type.as_ref());
1227 self.is_deterministic.append_value(is_deterministic);
1228 self.data_type.append_option(data_type.as_ref());
1229 self.function_type.append_value(function_type.as_ref());
1230 self.description.append_option(description);
1231 self.syntax_example.append_option(syntax_example);
1232 }
1233
1234 fn finish(&mut self) -> RecordBatch {
1235 RecordBatch::try_new(
1236 Arc::clone(&self.schema),
1237 vec![
1238 Arc::new(self.specific_catalog.finish()),
1239 Arc::new(self.specific_schema.finish()),
1240 Arc::new(self.specific_name.finish()),
1241 Arc::new(self.routine_catalog.finish()),
1242 Arc::new(self.routine_schema.finish()),
1243 Arc::new(self.routine_name.finish()),
1244 Arc::new(self.routine_type.finish()),
1245 Arc::new(self.is_deterministic.finish()),
1246 Arc::new(self.data_type.finish()),
1247 Arc::new(self.function_type.finish()),
1248 Arc::new(self.description.finish()),
1249 Arc::new(self.syntax_example.finish()),
1250 ],
1251 )
1252 .unwrap()
1253 }
1254}
1255
1256impl PartitionStream for InformationSchemaRoutines {
1257 fn schema(&self) -> &SchemaRef {
1258 &self.schema
1259 }
1260
1261 fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1262 let config = self.config.clone();
1263 let mut builder = self.builder();
1264 Box::pin(RecordBatchStreamAdapter::new(
1265 Arc::clone(&self.schema),
1266 futures::stream::once(async move {
1267 config.make_routines(
1268 ctx.scalar_functions(),
1269 ctx.aggregate_functions(),
1270 ctx.window_functions(),
1271 ctx.session_config().options(),
1272 &mut builder,
1273 )?;
1274 Ok(builder.finish())
1275 }),
1276 ))
1277 }
1278}
1279
1280#[derive(Debug)]
1281struct InformationSchemaParameters {
1282 schema: SchemaRef,
1283 config: InformationSchemaConfig,
1284}
1285
1286impl InformationSchemaParameters {
1287 fn new(config: InformationSchemaConfig) -> Self {
1288 let schema = Arc::new(Schema::new(vec![
1289 Field::new("specific_catalog", DataType::Utf8, false),
1290 Field::new("specific_schema", DataType::Utf8, false),
1291 Field::new("specific_name", DataType::Utf8, false),
1292 Field::new("ordinal_position", DataType::UInt64, false),
1293 Field::new("parameter_mode", DataType::Utf8, false),
1294 Field::new("parameter_name", DataType::Utf8, true),
1295 Field::new("data_type", DataType::Utf8, false),
1296 Field::new("parameter_default", DataType::Utf8, true),
1297 Field::new("is_variadic", DataType::Boolean, false),
1298 Field::new("rid", DataType::UInt8, false),
1304 ]));
1305
1306 Self { schema, config }
1307 }
1308
1309 fn builder(&self) -> InformationSchemaParametersBuilder {
1310 InformationSchemaParametersBuilder {
1311 schema: Arc::clone(&self.schema),
1312 specific_catalog: StringBuilder::new(),
1313 specific_schema: StringBuilder::new(),
1314 specific_name: StringBuilder::new(),
1315 ordinal_position: UInt64Builder::new(),
1316 parameter_mode: StringBuilder::new(),
1317 parameter_name: StringBuilder::new(),
1318 data_type: StringBuilder::new(),
1319 parameter_default: StringBuilder::new(),
1320 is_variadic: BooleanBuilder::new(),
1321 rid: UInt8Builder::new(),
1322 }
1323 }
1324}
1325
1326struct InformationSchemaParametersBuilder {
1327 schema: SchemaRef,
1328 specific_catalog: StringBuilder,
1329 specific_schema: StringBuilder,
1330 specific_name: StringBuilder,
1331 ordinal_position: UInt64Builder,
1332 parameter_mode: StringBuilder,
1333 parameter_name: StringBuilder,
1334 data_type: StringBuilder,
1335 parameter_default: StringBuilder,
1336 is_variadic: BooleanBuilder,
1337 rid: UInt8Builder,
1338}
1339
1340impl InformationSchemaParametersBuilder {
1341 #[expect(clippy::too_many_arguments)]
1342 fn add_parameter(
1343 &mut self,
1344 specific_catalog: impl AsRef<str>,
1345 specific_schema: impl AsRef<str>,
1346 specific_name: impl AsRef<str>,
1347 ordinal_position: u64,
1348 parameter_mode: impl AsRef<str>,
1349 parameter_name: Option<&(impl AsRef<str> + ?Sized)>,
1350 data_type: impl AsRef<str>,
1351 parameter_default: Option<impl AsRef<str>>,
1352 is_variadic: bool,
1353 rid: u8,
1354 ) {
1355 self.specific_catalog
1356 .append_value(specific_catalog.as_ref());
1357 self.specific_schema.append_value(specific_schema.as_ref());
1358 self.specific_name.append_value(specific_name.as_ref());
1359 self.ordinal_position.append_value(ordinal_position);
1360 self.parameter_mode.append_value(parameter_mode.as_ref());
1361 self.parameter_name.append_option(parameter_name.as_ref());
1362 self.data_type.append_value(data_type.as_ref());
1363 self.parameter_default.append_option(parameter_default);
1364 self.is_variadic.append_value(is_variadic);
1365 self.rid.append_value(rid);
1366 }
1367
1368 fn finish(&mut self) -> RecordBatch {
1369 RecordBatch::try_new(
1370 Arc::clone(&self.schema),
1371 vec![
1372 Arc::new(self.specific_catalog.finish()),
1373 Arc::new(self.specific_schema.finish()),
1374 Arc::new(self.specific_name.finish()),
1375 Arc::new(self.ordinal_position.finish()),
1376 Arc::new(self.parameter_mode.finish()),
1377 Arc::new(self.parameter_name.finish()),
1378 Arc::new(self.data_type.finish()),
1379 Arc::new(self.parameter_default.finish()),
1380 Arc::new(self.is_variadic.finish()),
1381 Arc::new(self.rid.finish()),
1382 ],
1383 )
1384 .unwrap()
1385 }
1386}
1387
1388impl PartitionStream for InformationSchemaParameters {
1389 fn schema(&self) -> &SchemaRef {
1390 &self.schema
1391 }
1392
1393 fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1394 let config = self.config.clone();
1395 let mut builder = self.builder();
1396 Box::pin(RecordBatchStreamAdapter::new(
1397 Arc::clone(&self.schema),
1398 futures::stream::once(async move {
1399 config.make_parameters(
1400 ctx.scalar_functions(),
1401 ctx.aggregate_functions(),
1402 ctx.window_functions(),
1403 ctx.session_config().options(),
1404 &mut builder,
1405 )?;
1406 Ok(builder.finish())
1407 }),
1408 ))
1409 }
1410}
1411
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CatalogProvider;

    /// `make_tables` must obtain the table type through
    /// `SchemaProvider::table_type`. The fixture panics if `table` is used.
    #[tokio::test]
    async fn make_tables_uses_table_type() {
        let catalog_list: Arc<dyn CatalogProviderList> = Arc::new(Fixture);
        let config = InformationSchemaConfig { catalog_list };
        let mut builder = InformationSchemaTablesBuilder {
            schema: Arc::new(Schema::empty()),
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            table_types: StringBuilder::new(),
        };

        let outcome = config.make_tables(&mut builder).await;
        assert!(outcome.is_ok());

        let table_types = builder.table_types.finish();
        assert_eq!("BASE TABLE", table_types.value(0));
    }

    /// Single stand-in used as the catalog list, the catalog and the schema.
    /// It exposes one catalog (`acatalog`), one schema (`aschema`) and one
    /// table (`atable`).
    #[derive(Debug)]
    struct Fixture;

    #[async_trait]
    impl SchemaProvider for Fixture {
        /// Every table reports itself as a base table.
        async fn table_type(&self, _: &str) -> Result<Option<TableType>> {
            Ok(Some(TableType::Base))
        }

        /// Must never be reached: reaching it means `make_tables` loaded the
        /// full provider instead of asking for the table type.
        async fn table(&self, _: &str) -> Result<Option<Arc<dyn TableProvider>>> {
            panic!(
                "InformationSchemaConfig::make_tables called SchemaProvider::table instead of table_type"
            )
        }

        fn table_names(&self) -> Vec<String> {
            vec!["atable".to_string()]
        }

        fn table_exist(&self, _: &str) -> bool {
            unimplemented!("not required for these tests")
        }
    }

    impl CatalogProviderList for Fixture {
        fn register_catalog(
            &self,
            _: String,
            _: Arc<dyn CatalogProvider>,
        ) -> Option<Arc<dyn CatalogProvider>> {
            unimplemented!("not required for these tests")
        }

        fn catalog_names(&self) -> Vec<String> {
            vec!["acatalog".to_string()]
        }

        fn catalog(&self, _: &str) -> Option<Arc<dyn CatalogProvider>> {
            Some(Arc::new(Fixture))
        }
    }

    impl CatalogProvider for Fixture {
        fn schema_names(&self) -> Vec<String> {
            vec!["aschema".to_string()]
        }

        fn schema(&self, _: &str) -> Option<Arc<dyn SchemaProvider>> {
            Some(Arc::new(Fixture))
        }
    }
}