1use crate::streaming::StreamingTable;
23use crate::{CatalogProviderList, SchemaProvider, TableProvider};
24use arrow::array::builder::{BooleanBuilder, UInt8Builder};
25use arrow::{
26 array::{StringBuilder, UInt64Builder},
27 datatypes::{DataType, Field, FieldRef, Schema, SchemaRef},
28 record_batch::RecordBatch,
29};
30use async_trait::async_trait;
31use datafusion_common::DataFusionError;
32use datafusion_common::config::{ConfigEntry, ConfigOptions};
33use datafusion_common::error::Result;
34use datafusion_common::types::NativeType;
35use datafusion_execution::TaskContext;
36use datafusion_execution::runtime_env::RuntimeEnv;
37use datafusion_expr::function::WindowUDFFieldArgs;
38use datafusion_expr::{
39 AggregateUDF, ReturnFieldArgs, ScalarUDF, Signature, TypeSignature, WindowUDF,
40};
41use datafusion_expr::{TableType, Volatility};
42use datafusion_physical_plan::SendableRecordBatchStream;
43use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
44use datafusion_physical_plan::streaming::PartitionStream;
45use std::collections::{BTreeSet, HashMap, HashSet};
46use std::fmt::Debug;
47use std::{any::Any, sync::Arc};
48
/// Name of the virtual schema that exposes catalog metadata.
pub const INFORMATION_SCHEMA: &str = "information_schema";
// Names of the individual virtual tables served under `information_schema`.
pub(crate) const TABLES: &str = "tables";
pub(crate) const VIEWS: &str = "views";
pub(crate) const COLUMNS: &str = "columns";
pub(crate) const DF_SETTINGS: &str = "df_settings";
pub(crate) const SCHEMATA: &str = "schemata";
pub(crate) const ROUTINES: &str = "routines";
pub(crate) const PARAMETERS: &str = "parameters";

/// All virtual tables available in `information_schema`; used both to answer
/// `table_names()` and to advertise each entry as a `VIEW` in `make_tables`.
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[
    TABLES,
    VIEWS,
    COLUMNS,
    DF_SETTINGS,
    SCHEMATA,
    ROUTINES,
    PARAMETERS,
];
68
/// Schema provider that serves the virtual `information_schema` tables for
/// every catalog registered in the wrapped [`CatalogProviderList`].
#[derive(Debug)]
pub struct InformationSchemaProvider {
    // Shared state (the catalog list) cloned into each virtual table stream.
    config: InformationSchemaConfig,
}
79
80impl InformationSchemaProvider {
81 pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
83 Self {
84 config: InformationSchemaConfig { catalog_list },
85 }
86 }
87}
88
/// Cheaply-cloneable handle to the catalog list; each virtual table clones it
/// into the async stream that builds its record batch on demand.
#[derive(Clone, Debug)]
struct InformationSchemaConfig {
    catalog_list: Arc<dyn CatalogProviderList>,
}
93
94impl InformationSchemaConfig {
95 async fn make_tables(
97 &self,
98 builder: &mut InformationSchemaTablesBuilder,
99 ) -> Result<(), DataFusionError> {
100 for catalog_name in self.catalog_list.catalog_names() {
103 let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
104
105 for schema_name in catalog.schema_names() {
106 if schema_name != INFORMATION_SCHEMA {
107 if let Some(schema) = catalog.schema(&schema_name) {
109 for table_name in schema.table_names() {
110 if let Some(table_type) =
111 schema.table_type(&table_name).await?
112 {
113 builder.add_table(
114 &catalog_name,
115 &schema_name,
116 &table_name,
117 table_type,
118 );
119 }
120 }
121 }
122 }
123 }
124
125 for table_name in INFORMATION_SCHEMA_TABLES {
127 builder.add_table(
128 &catalog_name,
129 INFORMATION_SCHEMA,
130 table_name,
131 TableType::View,
132 );
133 }
134 }
135
136 Ok(())
137 }
138
139 async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
140 for catalog_name in self.catalog_list.catalog_names() {
141 let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
142
143 for schema_name in catalog.schema_names() {
144 if schema_name != INFORMATION_SCHEMA
145 && let Some(schema) = catalog.schema(&schema_name)
146 {
147 let schema_owner = schema.owner_name();
148 builder.add_schemata(&catalog_name, &schema_name, schema_owner);
149 }
150 }
151 }
152 }
153
154 async fn make_views(
155 &self,
156 builder: &mut InformationSchemaViewBuilder,
157 ) -> Result<(), DataFusionError> {
158 for catalog_name in self.catalog_list.catalog_names() {
159 let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
160
161 for schema_name in catalog.schema_names() {
162 if schema_name != INFORMATION_SCHEMA {
163 if let Some(schema) = catalog.schema(&schema_name) {
165 for table_name in schema.table_names() {
166 if let Some(table) = schema.table(&table_name).await? {
167 builder.add_view(
168 &catalog_name,
169 &schema_name,
170 &table_name,
171 table.get_table_definition(),
172 )
173 }
174 }
175 }
176 }
177 }
178 }
179
180 Ok(())
181 }
182
183 async fn make_columns(
185 &self,
186 builder: &mut InformationSchemaColumnsBuilder,
187 ) -> Result<(), DataFusionError> {
188 for catalog_name in self.catalog_list.catalog_names() {
189 let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
190
191 for schema_name in catalog.schema_names() {
192 if schema_name != INFORMATION_SCHEMA {
193 if let Some(schema) = catalog.schema(&schema_name) {
195 for table_name in schema.table_names() {
196 if let Some(table) = schema.table(&table_name).await? {
197 for (field_position, field) in
198 table.schema().fields().iter().enumerate()
199 {
200 builder.add_column(
201 &catalog_name,
202 &schema_name,
203 &table_name,
204 field_position,
205 field,
206 )
207 }
208 }
209 }
210 }
211 }
212 }
213 }
214
215 Ok(())
216 }
217
218 fn make_df_settings(
220 &self,
221 config_options: &ConfigOptions,
222 runtime_env: &Arc<RuntimeEnv>,
223 builder: &mut InformationSchemaDfSettingsBuilder,
224 ) {
225 for entry in config_options.entries() {
226 builder.add_setting(entry);
227 }
228 for entry in runtime_env.config_entries() {
230 builder.add_setting(entry);
231 }
232 }
233
234 fn make_routines(
235 &self,
236 udfs: &HashMap<String, Arc<ScalarUDF>>,
237 udafs: &HashMap<String, Arc<AggregateUDF>>,
238 udwfs: &HashMap<String, Arc<WindowUDF>>,
239 config_options: &ConfigOptions,
240 builder: &mut InformationSchemaRoutinesBuilder,
241 ) -> Result<()> {
242 let catalog_name = &config_options.catalog.default_catalog;
243 let schema_name = &config_options.catalog.default_schema;
244
245 for (name, udf) in udfs {
246 let return_types = get_udf_args_and_return_types(udf)?
247 .into_iter()
248 .map(|(_, return_type)| return_type)
249 .collect::<HashSet<_>>();
250 for return_type in return_types {
251 builder.add_routine(
252 catalog_name,
253 schema_name,
254 name,
255 "FUNCTION",
256 Self::is_deterministic(udf.signature()),
257 return_type.as_ref(),
258 "SCALAR",
259 udf.documentation().map(|d| d.description.to_string()),
260 udf.documentation().map(|d| d.syntax_example.to_string()),
261 )
262 }
263 }
264
265 for (name, udaf) in udafs {
266 let return_types = get_udaf_args_and_return_types(udaf)?
267 .into_iter()
268 .map(|(_, return_type)| return_type)
269 .collect::<HashSet<_>>();
270 for return_type in return_types {
271 builder.add_routine(
272 catalog_name,
273 schema_name,
274 name,
275 "FUNCTION",
276 Self::is_deterministic(udaf.signature()),
277 return_type.as_ref(),
278 "AGGREGATE",
279 udaf.documentation().map(|d| d.description.to_string()),
280 udaf.documentation().map(|d| d.syntax_example.to_string()),
281 )
282 }
283 }
284
285 for (name, udwf) in udwfs {
286 let return_types = get_udwf_args_and_return_types(udwf)?
287 .into_iter()
288 .map(|(_, return_type)| return_type)
289 .collect::<HashSet<_>>();
290 for return_type in return_types {
291 builder.add_routine(
292 catalog_name,
293 schema_name,
294 name,
295 "FUNCTION",
296 Self::is_deterministic(udwf.signature()),
297 return_type.as_ref(),
298 "WINDOW",
299 udwf.documentation().map(|d| d.description.to_string()),
300 udwf.documentation().map(|d| d.syntax_example.to_string()),
301 )
302 }
303 }
304 Ok(())
305 }
306
307 fn is_deterministic(signature: &Signature) -> bool {
308 signature.volatility == Volatility::Immutable
309 }
310 fn make_parameters(
311 &self,
312 udfs: &HashMap<String, Arc<ScalarUDF>>,
313 udafs: &HashMap<String, Arc<AggregateUDF>>,
314 udwfs: &HashMap<String, Arc<WindowUDF>>,
315 config_options: &ConfigOptions,
316 builder: &mut InformationSchemaParametersBuilder,
317 ) -> Result<()> {
318 let catalog_name = &config_options.catalog.default_catalog;
319 let schema_name = &config_options.catalog.default_schema;
320 let mut add_parameters = |func_name: &str,
321 args: Option<&Vec<(String, String)>>,
322 arg_types: Vec<String>,
323 return_type: Option<String>,
324 is_variadic: bool,
325 rid: u8| {
326 for (position, type_name) in arg_types.iter().enumerate() {
327 let param_name =
328 args.and_then(|a| a.get(position).map(|arg| arg.0.as_str()));
329 builder.add_parameter(
330 catalog_name,
331 schema_name,
332 func_name,
333 position as u64 + 1,
334 "IN",
335 param_name,
336 type_name,
337 None::<&str>,
338 is_variadic,
339 rid,
340 );
341 }
342 if let Some(return_type) = return_type {
343 builder.add_parameter(
344 catalog_name,
345 schema_name,
346 func_name,
347 1,
348 "OUT",
349 None::<&str>,
350 return_type.as_str(),
351 None::<&str>,
352 false,
353 rid,
354 );
355 }
356 };
357
358 for (func_name, udf) in udfs {
359 let args = udf.documentation().and_then(|d| d.arguments.clone());
360 let combinations = get_udf_args_and_return_types(udf)?;
361 for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
362 add_parameters(
363 func_name,
364 args.as_ref(),
365 arg_types,
366 return_type,
367 Self::is_variadic(udf.signature()),
368 rid as u8,
369 );
370 }
371 }
372
373 for (func_name, udaf) in udafs {
374 let args = udaf.documentation().and_then(|d| d.arguments.clone());
375 let combinations = get_udaf_args_and_return_types(udaf)?;
376 for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
377 add_parameters(
378 func_name,
379 args.as_ref(),
380 arg_types,
381 return_type,
382 Self::is_variadic(udaf.signature()),
383 rid as u8,
384 );
385 }
386 }
387
388 for (func_name, udwf) in udwfs {
389 let args = udwf.documentation().and_then(|d| d.arguments.clone());
390 let combinations = get_udwf_args_and_return_types(udwf)?;
391 for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
392 add_parameters(
393 func_name,
394 args.as_ref(),
395 arg_types,
396 return_type,
397 Self::is_variadic(udwf.signature()),
398 rid as u8,
399 );
400 }
401 }
402
403 Ok(())
404 }
405
406 fn is_variadic(signature: &Signature) -> bool {
407 matches!(
408 signature.type_signature,
409 TypeSignature::Variadic(_) | TypeSignature::VariadicAny
410 )
411 }
412}
413
414fn get_udf_args_and_return_types(
417 udf: &Arc<ScalarUDF>,
418) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
419 let signature = udf.signature();
420 let arg_types = signature.type_signature.get_example_types();
421 if arg_types.is_empty() {
422 Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
423 } else {
424 Ok(arg_types
425 .into_iter()
426 .map(|arg_types| {
427 let arg_fields: Vec<FieldRef> = arg_types
428 .iter()
429 .enumerate()
430 .map(|(i, t)| {
431 Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
432 })
433 .collect();
434 let scalar_arguments = vec![None; arg_fields.len()];
435 let return_type = udf
436 .return_field_from_args(ReturnFieldArgs {
437 arg_fields: &arg_fields,
438 scalar_arguments: &scalar_arguments,
439 })
440 .map(|f| {
441 remove_native_type_prefix(&NativeType::from(
442 f.data_type().clone(),
443 ))
444 })
445 .ok();
446 let arg_types = arg_types
447 .into_iter()
448 .map(|t| remove_native_type_prefix(&NativeType::from(t)))
449 .collect::<Vec<_>>();
450 (arg_types, return_type)
451 })
452 .collect::<BTreeSet<_>>())
453 }
454}
455
456fn get_udaf_args_and_return_types(
457 udaf: &Arc<AggregateUDF>,
458) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
459 let signature = udaf.signature();
460 let arg_types = signature.type_signature.get_example_types();
461 if arg_types.is_empty() {
462 Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
463 } else {
464 Ok(arg_types
465 .into_iter()
466 .map(|arg_types| {
467 let arg_fields: Vec<FieldRef> = arg_types
468 .iter()
469 .enumerate()
470 .map(|(i, t)| {
471 Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
472 })
473 .collect();
474 let return_type = udaf
475 .return_field(&arg_fields)
476 .map(|f| {
477 remove_native_type_prefix(&NativeType::from(
478 f.data_type().clone(),
479 ))
480 })
481 .ok();
482 let arg_types = arg_types
483 .into_iter()
484 .map(|t| remove_native_type_prefix(&NativeType::from(t)))
485 .collect::<Vec<_>>();
486 (arg_types, return_type)
487 })
488 .collect::<BTreeSet<_>>())
489 }
490}
491
492fn get_udwf_args_and_return_types(
493 udwf: &Arc<WindowUDF>,
494) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
495 let signature = udwf.signature();
496 let arg_types = signature.type_signature.get_example_types();
497 if arg_types.is_empty() {
498 Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
499 } else {
500 Ok(arg_types
501 .into_iter()
502 .map(|arg_types| {
503 let arg_fields: Vec<FieldRef> = arg_types
504 .iter()
505 .enumerate()
506 .map(|(i, t)| {
507 Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
508 })
509 .collect();
510 let return_type = udwf
511 .field(WindowUDFFieldArgs::new(&arg_fields, udwf.name()))
512 .map(|f| {
513 remove_native_type_prefix(&NativeType::from(
514 f.data_type().clone(),
515 ))
516 })
517 .ok();
518 let arg_types = arg_types
519 .into_iter()
520 .map(|t| remove_native_type_prefix(&NativeType::from(t)))
521 .collect::<Vec<_>>();
522 (arg_types, return_type)
523 })
524 .collect::<BTreeSet<_>>())
525 }
526}
527
528#[inline]
529fn remove_native_type_prefix(native_type: &NativeType) -> String {
530 format!("{native_type}")
531}
532
533#[async_trait]
534impl SchemaProvider for InformationSchemaProvider {
535 fn as_any(&self) -> &dyn Any {
536 self
537 }
538
539 fn table_names(&self) -> Vec<String> {
540 INFORMATION_SCHEMA_TABLES
541 .iter()
542 .map(|t| (*t).to_string())
543 .collect()
544 }
545
546 async fn table(
547 &self,
548 name: &str,
549 ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
550 let config = self.config.clone();
551 let table: Arc<dyn PartitionStream> = match name.to_ascii_lowercase().as_str() {
552 TABLES => Arc::new(InformationSchemaTables::new(config)),
553 COLUMNS => Arc::new(InformationSchemaColumns::new(config)),
554 VIEWS => Arc::new(InformationSchemaViews::new(config)),
555 DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)),
556 SCHEMATA => Arc::new(InformationSchemata::new(config)),
557 ROUTINES => Arc::new(InformationSchemaRoutines::new(config)),
558 PARAMETERS => Arc::new(InformationSchemaParameters::new(config)),
559 _ => return Ok(None),
560 };
561
562 Ok(Some(Arc::new(
563 StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
564 )))
565 }
566
567 fn table_exist(&self, name: &str) -> bool {
568 INFORMATION_SCHEMA_TABLES.contains(&name.to_ascii_lowercase().as_str())
569 }
570}
571
/// Virtual `information_schema.tables` table.
#[derive(Debug)]
struct InformationSchemaTables {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}
577
578impl InformationSchemaTables {
579 fn new(config: InformationSchemaConfig) -> Self {
580 let schema = Arc::new(Schema::new(vec![
581 Field::new("table_catalog", DataType::Utf8, false),
582 Field::new("table_schema", DataType::Utf8, false),
583 Field::new("table_name", DataType::Utf8, false),
584 Field::new("table_type", DataType::Utf8, false),
585 ]));
586
587 Self { schema, config }
588 }
589
590 fn builder(&self) -> InformationSchemaTablesBuilder {
591 InformationSchemaTablesBuilder {
592 catalog_names: StringBuilder::new(),
593 schema_names: StringBuilder::new(),
594 table_names: StringBuilder::new(),
595 table_types: StringBuilder::new(),
596 schema: Arc::clone(&self.schema),
597 }
598 }
599}
600
601impl PartitionStream for InformationSchemaTables {
602 fn schema(&self) -> &SchemaRef {
603 &self.schema
604 }
605
606 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
607 let mut builder = self.builder();
608 let config = self.config.clone();
609 Box::pin(RecordBatchStreamAdapter::new(
610 Arc::clone(&self.schema),
611 futures::stream::once(async move {
613 config.make_tables(&mut builder).await?;
614 Ok(builder.finish())
615 }),
616 ))
617 }
618}
619
/// Column builders for incrementally assembling the
/// `information_schema.tables` record batch.
struct InformationSchemaTablesBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    table_types: StringBuilder,
}
630
631impl InformationSchemaTablesBuilder {
632 fn add_table(
633 &mut self,
634 catalog_name: impl AsRef<str>,
635 schema_name: impl AsRef<str>,
636 table_name: impl AsRef<str>,
637 table_type: TableType,
638 ) {
639 self.catalog_names.append_value(catalog_name.as_ref());
641 self.schema_names.append_value(schema_name.as_ref());
642 self.table_names.append_value(table_name.as_ref());
643 self.table_types.append_value(match table_type {
644 TableType::Base => "BASE TABLE",
645 TableType::View => "VIEW",
646 TableType::Temporary => "LOCAL TEMPORARY",
647 });
648 }
649
650 fn finish(&mut self) -> RecordBatch {
651 RecordBatch::try_new(
652 Arc::clone(&self.schema),
653 vec![
654 Arc::new(self.catalog_names.finish()),
655 Arc::new(self.schema_names.finish()),
656 Arc::new(self.table_names.finish()),
657 Arc::new(self.table_types.finish()),
658 ],
659 )
660 .unwrap()
661 }
662}
663
/// Virtual `information_schema.views` table.
#[derive(Debug)]
struct InformationSchemaViews {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}
669
670impl InformationSchemaViews {
671 fn new(config: InformationSchemaConfig) -> Self {
672 let schema = Arc::new(Schema::new(vec![
673 Field::new("table_catalog", DataType::Utf8, false),
674 Field::new("table_schema", DataType::Utf8, false),
675 Field::new("table_name", DataType::Utf8, false),
676 Field::new("definition", DataType::Utf8, true),
677 ]));
678
679 Self { schema, config }
680 }
681
682 fn builder(&self) -> InformationSchemaViewBuilder {
683 InformationSchemaViewBuilder {
684 catalog_names: StringBuilder::new(),
685 schema_names: StringBuilder::new(),
686 table_names: StringBuilder::new(),
687 definitions: StringBuilder::new(),
688 schema: Arc::clone(&self.schema),
689 }
690 }
691}
692
693impl PartitionStream for InformationSchemaViews {
694 fn schema(&self) -> &SchemaRef {
695 &self.schema
696 }
697
698 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
699 let mut builder = self.builder();
700 let config = self.config.clone();
701 Box::pin(RecordBatchStreamAdapter::new(
702 Arc::clone(&self.schema),
703 futures::stream::once(async move {
705 config.make_views(&mut builder).await?;
706 Ok(builder.finish())
707 }),
708 ))
709 }
710}
711
/// Column builders for incrementally assembling the
/// `information_schema.views` record batch.
struct InformationSchemaViewBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    definitions: StringBuilder,
}
722
723impl InformationSchemaViewBuilder {
724 fn add_view(
725 &mut self,
726 catalog_name: impl AsRef<str>,
727 schema_name: impl AsRef<str>,
728 table_name: impl AsRef<str>,
729 definition: Option<&(impl AsRef<str> + ?Sized)>,
730 ) {
731 self.catalog_names.append_value(catalog_name.as_ref());
733 self.schema_names.append_value(schema_name.as_ref());
734 self.table_names.append_value(table_name.as_ref());
735 self.definitions.append_option(definition.as_ref());
736 }
737
738 fn finish(&mut self) -> RecordBatch {
739 RecordBatch::try_new(
740 Arc::clone(&self.schema),
741 vec![
742 Arc::new(self.catalog_names.finish()),
743 Arc::new(self.schema_names.finish()),
744 Arc::new(self.table_names.finish()),
745 Arc::new(self.definitions.finish()),
746 ],
747 )
748 .unwrap()
749 }
750}
751
/// Virtual `information_schema.columns` table.
#[derive(Debug)]
struct InformationSchemaColumns {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}
757
758impl InformationSchemaColumns {
759 fn new(config: InformationSchemaConfig) -> Self {
760 let schema = Arc::new(Schema::new(vec![
761 Field::new("table_catalog", DataType::Utf8, false),
762 Field::new("table_schema", DataType::Utf8, false),
763 Field::new("table_name", DataType::Utf8, false),
764 Field::new("column_name", DataType::Utf8, false),
765 Field::new("ordinal_position", DataType::UInt64, false),
766 Field::new("column_default", DataType::Utf8, true),
767 Field::new("is_nullable", DataType::Utf8, false),
768 Field::new("data_type", DataType::Utf8, false),
769 Field::new("character_maximum_length", DataType::UInt64, true),
770 Field::new("character_octet_length", DataType::UInt64, true),
771 Field::new("numeric_precision", DataType::UInt64, true),
772 Field::new("numeric_precision_radix", DataType::UInt64, true),
773 Field::new("numeric_scale", DataType::UInt64, true),
774 Field::new("datetime_precision", DataType::UInt64, true),
775 Field::new("interval_type", DataType::Utf8, true),
776 ]));
777
778 Self { schema, config }
779 }
780
781 fn builder(&self) -> InformationSchemaColumnsBuilder {
782 let default_capacity = 10;
786
787 InformationSchemaColumnsBuilder {
788 catalog_names: StringBuilder::new(),
789 schema_names: StringBuilder::new(),
790 table_names: StringBuilder::new(),
791 column_names: StringBuilder::new(),
792 ordinal_positions: UInt64Builder::with_capacity(default_capacity),
793 column_defaults: StringBuilder::new(),
794 is_nullables: StringBuilder::new(),
795 data_types: StringBuilder::new(),
796 character_maximum_lengths: UInt64Builder::with_capacity(default_capacity),
797 character_octet_lengths: UInt64Builder::with_capacity(default_capacity),
798 numeric_precisions: UInt64Builder::with_capacity(default_capacity),
799 numeric_precision_radixes: UInt64Builder::with_capacity(default_capacity),
800 numeric_scales: UInt64Builder::with_capacity(default_capacity),
801 datetime_precisions: UInt64Builder::with_capacity(default_capacity),
802 interval_types: StringBuilder::new(),
803 schema: Arc::clone(&self.schema),
804 }
805 }
806}
807
808impl PartitionStream for InformationSchemaColumns {
809 fn schema(&self) -> &SchemaRef {
810 &self.schema
811 }
812
813 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
814 let mut builder = self.builder();
815 let config = self.config.clone();
816 Box::pin(RecordBatchStreamAdapter::new(
817 Arc::clone(&self.schema),
818 futures::stream::once(async move {
820 config.make_columns(&mut builder).await?;
821 Ok(builder.finish())
822 }),
823 ))
824 }
825}
826
/// Column builders for incrementally assembling the
/// `information_schema.columns` record batch.
struct InformationSchemaColumnsBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    column_names: StringBuilder,
    ordinal_positions: UInt64Builder,
    column_defaults: StringBuilder,
    is_nullables: StringBuilder,
    data_types: StringBuilder,
    character_maximum_lengths: UInt64Builder,
    character_octet_lengths: UInt64Builder,
    numeric_precisions: UInt64Builder,
    numeric_precision_radixes: UInt64Builder,
    numeric_scales: UInt64Builder,
    datetime_precisions: UInt64Builder,
    interval_types: StringBuilder,
}
848
849impl InformationSchemaColumnsBuilder {
850 fn add_column(
851 &mut self,
852 catalog_name: &str,
853 schema_name: &str,
854 table_name: &str,
855 field_position: usize,
856 field: &Field,
857 ) {
858 use DataType::*;
859
860 self.catalog_names.append_value(catalog_name);
862 self.schema_names.append_value(schema_name);
863 self.table_names.append_value(table_name);
864
865 self.column_names.append_value(field.name());
866
867 self.ordinal_positions.append_value(field_position as u64);
868
869 self.column_defaults.append_null();
871
872 let nullable_str = if field.is_nullable() { "YES" } else { "NO" };
874 self.is_nullables.append_value(nullable_str);
875
876 self.data_types.append_value(field.data_type().to_string());
878
879 let max_chars = None;
885 self.character_maximum_lengths.append_option(max_chars);
886
887 let char_len: Option<u64> = match field.data_type() {
890 Utf8 | Binary => Some(i32::MAX as u64),
891 LargeBinary | LargeUtf8 => Some(i64::MAX as u64),
892 _ => None,
893 };
894 self.character_octet_lengths.append_option(char_len);
895
896 let (numeric_precision, numeric_radix, numeric_scale) = match field.data_type() {
919 Int8 | UInt8 => (Some(8), Some(2), None),
920 Int16 | UInt16 => (Some(16), Some(2), None),
921 Int32 | UInt32 => (Some(32), Some(2), None),
922 Float16 => (Some(15), Some(2), None),
925 Float32 => (Some(24), Some(2), None),
927 Float64 => (Some(24), Some(2), None),
929 Decimal128(precision, scale) => {
930 (Some(*precision as u64), Some(10), Some(*scale as u64))
931 }
932 _ => (None, None, None),
933 };
934
935 self.numeric_precisions.append_option(numeric_precision);
936 self.numeric_precision_radixes.append_option(numeric_radix);
937 self.numeric_scales.append_option(numeric_scale);
938
939 self.datetime_precisions.append_option(None);
940 self.interval_types.append_null();
941 }
942
943 fn finish(&mut self) -> RecordBatch {
944 RecordBatch::try_new(
945 Arc::clone(&self.schema),
946 vec![
947 Arc::new(self.catalog_names.finish()),
948 Arc::new(self.schema_names.finish()),
949 Arc::new(self.table_names.finish()),
950 Arc::new(self.column_names.finish()),
951 Arc::new(self.ordinal_positions.finish()),
952 Arc::new(self.column_defaults.finish()),
953 Arc::new(self.is_nullables.finish()),
954 Arc::new(self.data_types.finish()),
955 Arc::new(self.character_maximum_lengths.finish()),
956 Arc::new(self.character_octet_lengths.finish()),
957 Arc::new(self.numeric_precisions.finish()),
958 Arc::new(self.numeric_precision_radixes.finish()),
959 Arc::new(self.numeric_scales.finish()),
960 Arc::new(self.datetime_precisions.finish()),
961 Arc::new(self.interval_types.finish()),
962 ],
963 )
964 .unwrap()
965 }
966}
967
/// Virtual `information_schema.schemata` table.
#[derive(Debug)]
struct InformationSchemata {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}
973
974impl InformationSchemata {
975 fn new(config: InformationSchemaConfig) -> Self {
976 let schema = Arc::new(Schema::new(vec![
977 Field::new("catalog_name", DataType::Utf8, false),
978 Field::new("schema_name", DataType::Utf8, false),
979 Field::new("schema_owner", DataType::Utf8, true),
980 Field::new("default_character_set_catalog", DataType::Utf8, true),
981 Field::new("default_character_set_schema", DataType::Utf8, true),
982 Field::new("default_character_set_name", DataType::Utf8, true),
983 Field::new("sql_path", DataType::Utf8, true),
984 ]));
985 Self { schema, config }
986 }
987
988 fn builder(&self) -> InformationSchemataBuilder {
989 InformationSchemataBuilder {
990 schema: Arc::clone(&self.schema),
991 catalog_name: StringBuilder::new(),
992 schema_name: StringBuilder::new(),
993 schema_owner: StringBuilder::new(),
994 default_character_set_catalog: StringBuilder::new(),
995 default_character_set_schema: StringBuilder::new(),
996 default_character_set_name: StringBuilder::new(),
997 sql_path: StringBuilder::new(),
998 }
999 }
1000}
1001
/// Column builders for incrementally assembling the
/// `information_schema.schemata` record batch.
struct InformationSchemataBuilder {
    schema: SchemaRef,
    catalog_name: StringBuilder,
    schema_name: StringBuilder,
    schema_owner: StringBuilder,
    default_character_set_catalog: StringBuilder,
    default_character_set_schema: StringBuilder,
    default_character_set_name: StringBuilder,
    sql_path: StringBuilder,
}
1012
1013impl InformationSchemataBuilder {
1014 fn add_schemata(
1015 &mut self,
1016 catalog_name: &str,
1017 schema_name: &str,
1018 schema_owner: Option<&str>,
1019 ) {
1020 self.catalog_name.append_value(catalog_name);
1021 self.schema_name.append_value(schema_name);
1022 match schema_owner {
1023 Some(owner) => self.schema_owner.append_value(owner),
1024 None => self.schema_owner.append_null(),
1025 }
1026 self.default_character_set_catalog.append_null();
1029 self.default_character_set_schema.append_null();
1030 self.default_character_set_name.append_null();
1031 self.sql_path.append_null();
1032 }
1033
1034 fn finish(&mut self) -> RecordBatch {
1035 RecordBatch::try_new(
1036 Arc::clone(&self.schema),
1037 vec![
1038 Arc::new(self.catalog_name.finish()),
1039 Arc::new(self.schema_name.finish()),
1040 Arc::new(self.schema_owner.finish()),
1041 Arc::new(self.default_character_set_catalog.finish()),
1042 Arc::new(self.default_character_set_schema.finish()),
1043 Arc::new(self.default_character_set_name.finish()),
1044 Arc::new(self.sql_path.finish()),
1045 ],
1046 )
1047 .unwrap()
1048 }
1049}
1050
1051impl PartitionStream for InformationSchemata {
1052 fn schema(&self) -> &SchemaRef {
1053 &self.schema
1054 }
1055
1056 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1057 let mut builder = self.builder();
1058 let config = self.config.clone();
1059 Box::pin(RecordBatchStreamAdapter::new(
1060 Arc::clone(&self.schema),
1061 futures::stream::once(async move {
1063 config.make_schemata(&mut builder).await;
1064 Ok(builder.finish())
1065 }),
1066 ))
1067 }
1068}
1069
/// Virtual `information_schema.df_settings` table (DataFusion-specific:
/// session configuration and runtime settings).
#[derive(Debug)]
struct InformationSchemaDfSettings {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}
1075
1076impl InformationSchemaDfSettings {
1077 fn new(config: InformationSchemaConfig) -> Self {
1078 let schema = Arc::new(Schema::new(vec![
1079 Field::new("name", DataType::Utf8, false),
1080 Field::new("value", DataType::Utf8, true),
1081 Field::new("description", DataType::Utf8, true),
1082 ]));
1083
1084 Self { schema, config }
1085 }
1086
1087 fn builder(&self) -> InformationSchemaDfSettingsBuilder {
1088 InformationSchemaDfSettingsBuilder {
1089 names: StringBuilder::new(),
1090 values: StringBuilder::new(),
1091 descriptions: StringBuilder::new(),
1092 schema: Arc::clone(&self.schema),
1093 }
1094 }
1095}
1096
1097impl PartitionStream for InformationSchemaDfSettings {
1098 fn schema(&self) -> &SchemaRef {
1099 &self.schema
1100 }
1101
1102 fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1103 let config = self.config.clone();
1104 let mut builder = self.builder();
1105 Box::pin(RecordBatchStreamAdapter::new(
1106 Arc::clone(&self.schema),
1107 futures::stream::once(async move {
1109 let runtime_env = ctx.runtime_env();
1111 config.make_df_settings(
1112 ctx.session_config().options(),
1113 &runtime_env,
1114 &mut builder,
1115 );
1116 Ok(builder.finish())
1117 }),
1118 ))
1119 }
1120}
1121
/// Column builders for incrementally assembling the
/// `information_schema.df_settings` record batch.
struct InformationSchemaDfSettingsBuilder {
    schema: SchemaRef,
    names: StringBuilder,
    values: StringBuilder,
    descriptions: StringBuilder,
}
1128
1129impl InformationSchemaDfSettingsBuilder {
1130 fn add_setting(&mut self, entry: ConfigEntry) {
1131 self.names.append_value(entry.key);
1132 self.values.append_option(entry.value);
1133 self.descriptions.append_value(entry.description);
1134 }
1135
1136 fn finish(&mut self) -> RecordBatch {
1137 RecordBatch::try_new(
1138 Arc::clone(&self.schema),
1139 vec![
1140 Arc::new(self.names.finish()),
1141 Arc::new(self.values.finish()),
1142 Arc::new(self.descriptions.finish()),
1143 ],
1144 )
1145 .unwrap()
1146 }
1147}
1148
/// Virtual `information_schema.routines` table (registered UDFs, UDAFs and
/// UDWFs).
#[derive(Debug)]
struct InformationSchemaRoutines {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}
1154
1155impl InformationSchemaRoutines {
1156 fn new(config: InformationSchemaConfig) -> Self {
1157 let schema = Arc::new(Schema::new(vec![
1158 Field::new("specific_catalog", DataType::Utf8, false),
1159 Field::new("specific_schema", DataType::Utf8, false),
1160 Field::new("specific_name", DataType::Utf8, false),
1161 Field::new("routine_catalog", DataType::Utf8, false),
1162 Field::new("routine_schema", DataType::Utf8, false),
1163 Field::new("routine_name", DataType::Utf8, false),
1164 Field::new("routine_type", DataType::Utf8, false),
1165 Field::new("is_deterministic", DataType::Boolean, true),
1166 Field::new("data_type", DataType::Utf8, true),
1167 Field::new("function_type", DataType::Utf8, true),
1168 Field::new("description", DataType::Utf8, true),
1169 Field::new("syntax_example", DataType::Utf8, true),
1170 ]));
1171
1172 Self { schema, config }
1173 }
1174
1175 fn builder(&self) -> InformationSchemaRoutinesBuilder {
1176 InformationSchemaRoutinesBuilder {
1177 schema: Arc::clone(&self.schema),
1178 specific_catalog: StringBuilder::new(),
1179 specific_schema: StringBuilder::new(),
1180 specific_name: StringBuilder::new(),
1181 routine_catalog: StringBuilder::new(),
1182 routine_schema: StringBuilder::new(),
1183 routine_name: StringBuilder::new(),
1184 routine_type: StringBuilder::new(),
1185 is_deterministic: BooleanBuilder::new(),
1186 data_type: StringBuilder::new(),
1187 function_type: StringBuilder::new(),
1188 description: StringBuilder::new(),
1189 syntax_example: StringBuilder::new(),
1190 }
1191 }
1192}
1193
/// Column-wise accumulator for rows of the `information_schema.routines` view.
///
/// The `specific_*` and `routine_*` columns receive the same values in
/// `add_routine`; nullable columns (`is_deterministic`, `data_type`,
/// `function_type`, `description`, `syntax_example`) accept options.
struct InformationSchemaRoutinesBuilder {
    schema: SchemaRef,
    specific_catalog: StringBuilder,
    specific_schema: StringBuilder,
    specific_name: StringBuilder,
    routine_catalog: StringBuilder,
    routine_schema: StringBuilder,
    routine_name: StringBuilder,
    routine_type: StringBuilder,
    is_deterministic: BooleanBuilder,
    data_type: StringBuilder,
    function_type: StringBuilder,
    description: StringBuilder,
    syntax_example: StringBuilder,
}
1209
1210impl InformationSchemaRoutinesBuilder {
1211 #[expect(clippy::too_many_arguments)]
1212 fn add_routine(
1213 &mut self,
1214 catalog_name: impl AsRef<str>,
1215 schema_name: impl AsRef<str>,
1216 routine_name: impl AsRef<str>,
1217 routine_type: impl AsRef<str>,
1218 is_deterministic: bool,
1219 data_type: Option<&impl AsRef<str>>,
1220 function_type: impl AsRef<str>,
1221 description: Option<impl AsRef<str>>,
1222 syntax_example: Option<impl AsRef<str>>,
1223 ) {
1224 self.specific_catalog.append_value(catalog_name.as_ref());
1225 self.specific_schema.append_value(schema_name.as_ref());
1226 self.specific_name.append_value(routine_name.as_ref());
1227 self.routine_catalog.append_value(catalog_name.as_ref());
1228 self.routine_schema.append_value(schema_name.as_ref());
1229 self.routine_name.append_value(routine_name.as_ref());
1230 self.routine_type.append_value(routine_type.as_ref());
1231 self.is_deterministic.append_value(is_deterministic);
1232 self.data_type.append_option(data_type.as_ref());
1233 self.function_type.append_value(function_type.as_ref());
1234 self.description.append_option(description);
1235 self.syntax_example.append_option(syntax_example);
1236 }
1237
1238 fn finish(&mut self) -> RecordBatch {
1239 RecordBatch::try_new(
1240 Arc::clone(&self.schema),
1241 vec![
1242 Arc::new(self.specific_catalog.finish()),
1243 Arc::new(self.specific_schema.finish()),
1244 Arc::new(self.specific_name.finish()),
1245 Arc::new(self.routine_catalog.finish()),
1246 Arc::new(self.routine_schema.finish()),
1247 Arc::new(self.routine_name.finish()),
1248 Arc::new(self.routine_type.finish()),
1249 Arc::new(self.is_deterministic.finish()),
1250 Arc::new(self.data_type.finish()),
1251 Arc::new(self.function_type.finish()),
1252 Arc::new(self.description.finish()),
1253 Arc::new(self.syntax_example.finish()),
1254 ],
1255 )
1256 .unwrap()
1257 }
1258}
1259
impl PartitionStream for InformationSchemaRoutines {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    // Produces the routines view lazily: the single batch is computed on the
    // first poll of the returned stream, from the functions registered in `ctx`.
    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        // Clone the config and pre-create the builder so the `async move`
        // block owns everything it needs (no borrow of `self`).
        let config = self.config.clone();
        let mut builder = self.builder();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // One-element stream: all rows are emitted as a single batch.
            futures::stream::once(async move {
                config.make_routines(
                    ctx.scalar_functions(),
                    ctx.aggregate_functions(),
                    ctx.window_functions(),
                    ctx.session_config().options(),
                    &mut builder,
                )?;
                Ok(builder.finish())
            }),
        ))
    }
}
1283
/// Backing table for the `information_schema.parameters` view.
///
/// Rows are produced from the session's registered scalar, aggregate, and
/// window functions (see the `PartitionStream` impl, which calls
/// `make_parameters`).
#[derive(Debug)]
struct InformationSchemaParameters {
    /// Schema of the parameters view, built in `InformationSchemaParameters::new`.
    schema: SchemaRef,
    /// Catalog/session access used when populating the rows.
    config: InformationSchemaConfig,
}
1289
1290impl InformationSchemaParameters {
1291 fn new(config: InformationSchemaConfig) -> Self {
1292 let schema = Arc::new(Schema::new(vec![
1293 Field::new("specific_catalog", DataType::Utf8, false),
1294 Field::new("specific_schema", DataType::Utf8, false),
1295 Field::new("specific_name", DataType::Utf8, false),
1296 Field::new("ordinal_position", DataType::UInt64, false),
1297 Field::new("parameter_mode", DataType::Utf8, false),
1298 Field::new("parameter_name", DataType::Utf8, true),
1299 Field::new("data_type", DataType::Utf8, false),
1300 Field::new("parameter_default", DataType::Utf8, true),
1301 Field::new("is_variadic", DataType::Boolean, false),
1302 Field::new("rid", DataType::UInt8, false),
1308 ]));
1309
1310 Self { schema, config }
1311 }
1312
1313 fn builder(&self) -> InformationSchemaParametersBuilder {
1314 InformationSchemaParametersBuilder {
1315 schema: Arc::clone(&self.schema),
1316 specific_catalog: StringBuilder::new(),
1317 specific_schema: StringBuilder::new(),
1318 specific_name: StringBuilder::new(),
1319 ordinal_position: UInt64Builder::new(),
1320 parameter_mode: StringBuilder::new(),
1321 parameter_name: StringBuilder::new(),
1322 data_type: StringBuilder::new(),
1323 parameter_default: StringBuilder::new(),
1324 is_variadic: BooleanBuilder::new(),
1325 rid: UInt8Builder::new(),
1326 }
1327 }
1328}
1329
/// Column-wise accumulator for rows of the `information_schema.parameters` view.
///
/// Nullable columns (`parameter_name`, `parameter_default`) accept options in
/// `add_parameter`.
struct InformationSchemaParametersBuilder {
    schema: SchemaRef,
    specific_catalog: StringBuilder,
    specific_schema: StringBuilder,
    specific_name: StringBuilder,
    ordinal_position: UInt64Builder,
    parameter_mode: StringBuilder,
    parameter_name: StringBuilder,
    data_type: StringBuilder,
    parameter_default: StringBuilder,
    is_variadic: BooleanBuilder,
    // NOTE(review): u8 value supplied per row by `make_parameters`; its exact
    // semantics (likely a per-routine signature discriminator) are defined by
    // the caller — confirm there.
    rid: UInt8Builder,
}
1343
1344impl InformationSchemaParametersBuilder {
1345 #[expect(clippy::too_many_arguments)]
1346 fn add_parameter(
1347 &mut self,
1348 specific_catalog: impl AsRef<str>,
1349 specific_schema: impl AsRef<str>,
1350 specific_name: impl AsRef<str>,
1351 ordinal_position: u64,
1352 parameter_mode: impl AsRef<str>,
1353 parameter_name: Option<&(impl AsRef<str> + ?Sized)>,
1354 data_type: impl AsRef<str>,
1355 parameter_default: Option<impl AsRef<str>>,
1356 is_variadic: bool,
1357 rid: u8,
1358 ) {
1359 self.specific_catalog
1360 .append_value(specific_catalog.as_ref());
1361 self.specific_schema.append_value(specific_schema.as_ref());
1362 self.specific_name.append_value(specific_name.as_ref());
1363 self.ordinal_position.append_value(ordinal_position);
1364 self.parameter_mode.append_value(parameter_mode.as_ref());
1365 self.parameter_name.append_option(parameter_name.as_ref());
1366 self.data_type.append_value(data_type.as_ref());
1367 self.parameter_default.append_option(parameter_default);
1368 self.is_variadic.append_value(is_variadic);
1369 self.rid.append_value(rid);
1370 }
1371
1372 fn finish(&mut self) -> RecordBatch {
1373 RecordBatch::try_new(
1374 Arc::clone(&self.schema),
1375 vec![
1376 Arc::new(self.specific_catalog.finish()),
1377 Arc::new(self.specific_schema.finish()),
1378 Arc::new(self.specific_name.finish()),
1379 Arc::new(self.ordinal_position.finish()),
1380 Arc::new(self.parameter_mode.finish()),
1381 Arc::new(self.parameter_name.finish()),
1382 Arc::new(self.data_type.finish()),
1383 Arc::new(self.parameter_default.finish()),
1384 Arc::new(self.is_variadic.finish()),
1385 Arc::new(self.rid.finish()),
1386 ],
1387 )
1388 .unwrap()
1389 }
1390}
1391
impl PartitionStream for InformationSchemaParameters {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    // Produces the parameters view lazily: the single batch is computed on the
    // first poll of the returned stream, from the functions registered in `ctx`.
    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        // Clone the config and pre-create the builder so the `async move`
        // block owns everything it needs (no borrow of `self`).
        let config = self.config.clone();
        let mut builder = self.builder();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // One-element stream: all rows are emitted as a single batch.
            futures::stream::once(async move {
                config.make_parameters(
                    ctx.scalar_functions(),
                    ctx.aggregate_functions(),
                    ctx.window_functions(),
                    ctx.session_config().options(),
                    &mut builder,
                )?;
                Ok(builder.finish())
            }),
        ))
    }
}
1415
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CatalogProvider;

    // Regression test: `make_tables` must determine each table's type via the
    // cheap `SchemaProvider::table_type` hook and must NOT materialize the
    // provider via `SchemaProvider::table` (the `Fixture` below panics if it
    // does).
    #[tokio::test]
    async fn make_tables_uses_table_type() {
        let config = InformationSchemaConfig {
            catalog_list: Arc::new(Fixture),
        };
        let mut builder = InformationSchemaTablesBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            table_types: StringBuilder::new(),
            schema: Arc::new(Schema::empty()),
        };

        assert!(config.make_tables(&mut builder).await.is_ok());

        // `TableType::Base` is rendered as "BASE TABLE" in the tables view.
        assert_eq!("BASE TABLE", builder.table_types.finish().value(0));
    }

    // Minimal catalog hierarchy for the test above: the same unit struct acts
    // as catalog list, catalog, and schema provider, exposing exactly one
    // catalog ("acatalog"), schema ("aschema"), and table ("atable").
    #[derive(Debug)]
    struct Fixture;

    #[async_trait]
    impl SchemaProvider for Fixture {
        // Answers the table-type query without constructing a TableProvider.
        async fn table_type(&self, _: &str) -> Result<Option<TableType>> {
            Ok(Some(TableType::Base))
        }

        // Deliberately panics: reaching this method means make_tables took the
        // expensive path the test is guarding against.
        async fn table(&self, _: &str) -> Result<Option<Arc<dyn TableProvider>>> {
            panic!(
                "InformationSchemaConfig::make_tables called SchemaProvider::table instead of table_type"
            )
        }

        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn table_names(&self) -> Vec<String> {
            vec!["atable".to_string()]
        }

        fn table_exist(&self, _: &str) -> bool {
            unimplemented!("not required for these tests")
        }
    }

    impl CatalogProviderList for Fixture {
        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn register_catalog(
            &self,
            _: String,
            _: Arc<dyn CatalogProvider>,
        ) -> Option<Arc<dyn CatalogProvider>> {
            unimplemented!("not required for these tests")
        }

        fn catalog_names(&self) -> Vec<String> {
            vec!["acatalog".to_string()]
        }

        fn catalog(&self, _: &str) -> Option<Arc<dyn CatalogProvider>> {
            Some(Arc::new(Self))
        }
    }

    impl CatalogProvider for Fixture {
        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn schema_names(&self) -> Vec<String> {
            vec!["aschema".to_string()]
        }

        fn schema(&self, _: &str) -> Option<Arc<dyn SchemaProvider>> {
            Some(Arc::new(Self))
        }
    }
}