1use crate::streaming::StreamingTable;
23use crate::{CatalogProviderList, SchemaProvider, TableProvider};
24use arrow::array::builder::{BooleanBuilder, UInt8Builder};
25use arrow::{
26 array::{StringBuilder, UInt64Builder},
27 datatypes::{DataType, Field, Schema, SchemaRef},
28 record_batch::RecordBatch,
29};
30use async_trait::async_trait;
31use datafusion_common::config::{ConfigEntry, ConfigOptions};
32use datafusion_common::error::Result;
33use datafusion_common::DataFusionError;
34use datafusion_execution::TaskContext;
35use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF};
36use datafusion_expr::{TableType, Volatility};
37use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
38use datafusion_physical_plan::streaming::PartitionStream;
39use datafusion_physical_plan::SendableRecordBatchStream;
40use std::collections::{HashMap, HashSet};
41use std::fmt::Debug;
42use std::{any::Any, sync::Arc};
43
/// Name of the virtual schema that exposes SQL-standard metadata tables.
pub const INFORMATION_SCHEMA: &str = "information_schema";
// Names of the individual virtual tables served under `information_schema`.
pub(crate) const TABLES: &str = "tables";
pub(crate) const VIEWS: &str = "views";
pub(crate) const COLUMNS: &str = "columns";
pub(crate) const DF_SETTINGS: &str = "df_settings";
pub(crate) const SCHEMATA: &str = "schemata";
pub(crate) const ROUTINES: &str = "routines";
pub(crate) const PARAMETERS: &str = "parameters";

/// All table names served by [`InformationSchemaProvider`]; used both for
/// name lookup and to list the schema's own tables in `tables`.
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[
    TABLES,
    VIEWS,
    COLUMNS,
    DF_SETTINGS,
    SCHEMATA,
    ROUTINES,
    PARAMETERS,
];
63
/// Schema provider that serves the `information_schema` virtual tables,
/// built on demand from the catalogs reachable through the wrapped
/// [`CatalogProviderList`].
#[derive(Debug)]
pub struct InformationSchemaProvider {
    // Cloneable handle shared with each per-table `PartitionStream`.
    config: InformationSchemaConfig,
}
74
75impl InformationSchemaProvider {
76 pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
78 Self {
79 config: InformationSchemaConfig { catalog_list },
80 }
81 }
82}
83
/// Shared, cheaply-cloneable state used to materialize each
/// `information_schema` table from the live catalog list.
#[derive(Clone, Debug)]
struct InformationSchemaConfig {
    catalog_list: Arc<dyn CatalogProviderList>,
}
88
impl InformationSchemaConfig {
    /// Fills `builder` with one row per table/view in every catalog,
    /// plus one `VIEW` row per `information_schema` table itself.
    async fn make_tables(
        &self,
        builder: &mut InformationSchemaTablesBuilder,
    ) -> Result<(), DataFusionError> {
        for catalog_name in self.catalog_list.catalog_names() {
            // unwrap: the name was just returned by `catalog_names()`.
            // NOTE(review): assumes the list is not mutated concurrently.
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                // Skip information_schema itself; its tables are added below.
                if schema_name != INFORMATION_SCHEMA {
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table) = schema.table(&table_name).await? {
                                builder.add_table(
                                    &catalog_name,
                                    &schema_name,
                                    &table_name,
                                    table.table_type(),
                                );
                            }
                        }
                    }
                }
            }

            // Every catalog also exposes the information_schema views.
            for table_name in INFORMATION_SCHEMA_TABLES {
                builder.add_table(
                    &catalog_name,
                    INFORMATION_SCHEMA,
                    table_name,
                    TableType::View,
                );
            }
        }

        Ok(())
    }

    /// Fills `builder` with one row per (catalog, schema) pair, excluding
    /// the information_schema schema itself.
    async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
        for catalog_name in self.catalog_list.catalog_names() {
            // unwrap: name came from `catalog_names()` (see make_tables).
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    if let Some(schema) = catalog.schema(&schema_name) {
                        let schema_owner = schema.owner_name();
                        builder.add_schemata(&catalog_name, &schema_name, schema_owner);
                    }
                }
            }
        }
    }

    /// Fills `builder` with one row per table, carrying the table's SQL
    /// definition when the provider exposes one (None otherwise).
    async fn make_views(
        &self,
        builder: &mut InformationSchemaViewBuilder,
    ) -> Result<(), DataFusionError> {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table) = schema.table(&table_name).await? {
                                builder.add_view(
                                    &catalog_name,
                                    &schema_name,
                                    &table_name,
                                    table.get_table_definition(),
                                )
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Fills `builder` with one row per column of every table in every
    /// catalog (information_schema tables excluded).
    async fn make_columns(
        &self,
        builder: &mut InformationSchemaColumnsBuilder,
    ) -> Result<(), DataFusionError> {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table) = schema.table(&table_name).await? {
                                for (field_position, field) in
                                    table.schema().fields().iter().enumerate()
                                {
                                    builder.add_column(
                                        &catalog_name,
                                        &schema_name,
                                        &table_name,
                                        field_position,
                                        field,
                                    )
                                }
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Fills `builder` with one row per configuration entry (df_settings).
    fn make_df_settings(
        &self,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaDfSettingsBuilder,
    ) {
        for entry in config_options.entries() {
            builder.add_setting(entry);
        }
    }

    /// Fills `builder` with one row per (function, distinct return type)
    /// for all registered scalar, aggregate and window UDFs. Functions are
    /// attributed to the session's default catalog/schema.
    fn make_routines(
        &self,
        udfs: &HashMap<String, Arc<ScalarUDF>>,
        udafs: &HashMap<String, Arc<AggregateUDF>>,
        udwfs: &HashMap<String, Arc<WindowUDF>>,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaRoutinesBuilder,
    ) -> Result<()> {
        let catalog_name = &config_options.catalog.default_catalog;
        let schema_name = &config_options.catalog.default_schema;

        for (name, udf) in udfs {
            // De-duplicate: one row per distinct return type, not per
            // example signature.
            let return_types = get_udf_args_and_return_types(udf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udf.signature()),
                    return_type,
                    "SCALAR",
                    udf.documentation().map(|d| d.description.to_string()),
                    udf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }

        for (name, udaf) in udafs {
            let return_types = get_udaf_args_and_return_types(udaf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udaf.signature()),
                    return_type,
                    "AGGREGATE",
                    udaf.documentation().map(|d| d.description.to_string()),
                    udaf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }

        for (name, udwf) in udwfs {
            let return_types = get_udwf_args_and_return_types(udwf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udwf.signature()),
                    return_type,
                    "WINDOW",
                    udwf.documentation().map(|d| d.description.to_string()),
                    udwf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }
        Ok(())
    }

    /// A routine is reported deterministic iff its signature is `Immutable`.
    fn is_deterministic(signature: &Signature) -> bool {
        signature.volatility == Volatility::Immutable
    }
    /// Fills `builder` with one row per parameter (IN) and return value
    /// (OUT) for every example signature of every registered UDF. `rid`
    /// distinguishes rows of different signatures of the same function.
    fn make_parameters(
        &self,
        udfs: &HashMap<String, Arc<ScalarUDF>>,
        udafs: &HashMap<String, Arc<AggregateUDF>>,
        udwfs: &HashMap<String, Arc<WindowUDF>>,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaParametersBuilder,
    ) -> Result<()> {
        let catalog_name = &config_options.catalog.default_catalog;
        let schema_name = &config_options.catalog.default_schema;
        // Shared helper closure: emits IN rows (1-based positions) for the
        // argument list, then a single OUT row when a return type is known.
        let mut add_parameters = |func_name: &str,
                                  args: Option<&Vec<(String, String)>>,
                                  arg_types: Vec<String>,
                                  return_type: Option<String>,
                                  is_variadic: bool,
                                  rid: u8| {
            for (position, type_name) in arg_types.iter().enumerate() {
                // Parameter name comes from the docs, when available.
                let param_name =
                    args.and_then(|a| a.get(position).map(|arg| arg.0.as_str()));
                builder.add_parameter(
                    catalog_name,
                    schema_name,
                    func_name,
                    position as u64 + 1,
                    "IN",
                    param_name,
                    type_name,
                    None::<&str>,
                    is_variadic,
                    rid,
                );
            }
            if let Some(return_type) = return_type {
                builder.add_parameter(
                    catalog_name,
                    schema_name,
                    func_name,
                    1,
                    "OUT",
                    None::<&str>,
                    return_type.as_str(),
                    None::<&str>,
                    false,
                    rid,
                );
            }
        };

        for (func_name, udf) in udfs {
            let args = udf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udf_args_and_return_types(udf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udf.signature()),
                    rid as u8,
                );
            }
        }

        for (func_name, udaf) in udafs {
            let args = udaf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udaf_args_and_return_types(udaf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udaf.signature()),
                    rid as u8,
                );
            }
        }

        for (func_name, udwf) in udwfs {
            let args = udwf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udwf_args_and_return_types(udwf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udwf.signature()),
                    rid as u8,
                );
            }
        }

        Ok(())
    }

    /// True when the signature accepts a variable number of arguments.
    fn is_variadic(signature: &Signature) -> bool {
        matches!(
            signature.type_signature,
            TypeSignature::Variadic(_) | TypeSignature::VariadicAny
        )
    }
}
401
402fn get_udf_args_and_return_types(
405 udf: &Arc<ScalarUDF>,
406) -> Result<Vec<(Vec<String>, Option<String>)>> {
407 let signature = udf.signature();
408 let arg_types = signature.type_signature.get_example_types();
409 if arg_types.is_empty() {
410 Ok(vec![(vec![], None)])
411 } else {
412 Ok(arg_types
413 .into_iter()
414 .map(|arg_types| {
415 let return_type = udf.return_type(&arg_types).ok().map(|t| t.to_string());
417 let arg_types = arg_types
418 .into_iter()
419 .map(|t| t.to_string())
420 .collect::<Vec<_>>();
421 (arg_types, return_type)
422 })
423 .collect::<Vec<_>>())
424 }
425}
426
427fn get_udaf_args_and_return_types(
428 udaf: &Arc<AggregateUDF>,
429) -> Result<Vec<(Vec<String>, Option<String>)>> {
430 let signature = udaf.signature();
431 let arg_types = signature.type_signature.get_example_types();
432 if arg_types.is_empty() {
433 Ok(vec![(vec![], None)])
434 } else {
435 Ok(arg_types
436 .into_iter()
437 .map(|arg_types| {
438 let return_type =
440 udaf.return_type(&arg_types).ok().map(|t| t.to_string());
441 let arg_types = arg_types
442 .into_iter()
443 .map(|t| t.to_string())
444 .collect::<Vec<_>>();
445 (arg_types, return_type)
446 })
447 .collect::<Vec<_>>())
448 }
449}
450
451fn get_udwf_args_and_return_types(
452 udwf: &Arc<WindowUDF>,
453) -> Result<Vec<(Vec<String>, Option<String>)>> {
454 let signature = udwf.signature();
455 let arg_types = signature.type_signature.get_example_types();
456 if arg_types.is_empty() {
457 Ok(vec![(vec![], None)])
458 } else {
459 Ok(arg_types
460 .into_iter()
461 .map(|arg_types| {
462 let arg_types = arg_types
464 .into_iter()
465 .map(|t| t.to_string())
466 .collect::<Vec<_>>();
467 (arg_types, None)
468 })
469 .collect::<Vec<_>>())
470 }
471}
472
473#[async_trait]
474impl SchemaProvider for InformationSchemaProvider {
475 fn as_any(&self) -> &dyn Any {
476 self
477 }
478
479 fn table_names(&self) -> Vec<String> {
480 INFORMATION_SCHEMA_TABLES
481 .iter()
482 .map(|t| t.to_string())
483 .collect()
484 }
485
486 async fn table(
487 &self,
488 name: &str,
489 ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
490 let config = self.config.clone();
491 let table: Arc<dyn PartitionStream> = match name.to_ascii_lowercase().as_str() {
492 TABLES => Arc::new(InformationSchemaTables::new(config)),
493 COLUMNS => Arc::new(InformationSchemaColumns::new(config)),
494 VIEWS => Arc::new(InformationSchemaViews::new(config)),
495 DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)),
496 SCHEMATA => Arc::new(InformationSchemata::new(config)),
497 ROUTINES => Arc::new(InformationSchemaRoutines::new(config)),
498 PARAMETERS => Arc::new(InformationSchemaParameters::new(config)),
499 _ => return Ok(None),
500 };
501
502 Ok(Some(Arc::new(
503 StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
504 )))
505 }
506
507 fn table_exist(&self, name: &str) -> bool {
508 INFORMATION_SCHEMA_TABLES.contains(&name.to_ascii_lowercase().as_str())
509 }
510}
511
/// Partition stream backing the `information_schema.tables` view.
#[derive(Debug)]
struct InformationSchemaTables {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}
517
518impl InformationSchemaTables {
519 fn new(config: InformationSchemaConfig) -> Self {
520 let schema = Arc::new(Schema::new(vec![
521 Field::new("table_catalog", DataType::Utf8, false),
522 Field::new("table_schema", DataType::Utf8, false),
523 Field::new("table_name", DataType::Utf8, false),
524 Field::new("table_type", DataType::Utf8, false),
525 ]));
526
527 Self { schema, config }
528 }
529
530 fn builder(&self) -> InformationSchemaTablesBuilder {
531 InformationSchemaTablesBuilder {
532 catalog_names: StringBuilder::new(),
533 schema_names: StringBuilder::new(),
534 table_names: StringBuilder::new(),
535 table_types: StringBuilder::new(),
536 schema: Arc::clone(&self.schema),
537 }
538 }
539}
540
541impl PartitionStream for InformationSchemaTables {
542 fn schema(&self) -> &SchemaRef {
543 &self.schema
544 }
545
546 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
547 let mut builder = self.builder();
548 let config = self.config.clone();
549 Box::pin(RecordBatchStreamAdapter::new(
550 Arc::clone(&self.schema),
551 futures::stream::once(async move {
553 config.make_tables(&mut builder).await?;
554 Ok(builder.finish())
555 }),
556 ))
557 }
558}
559
/// Column-wise accumulator for `information_schema.tables` rows.
struct InformationSchemaTablesBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    table_types: StringBuilder,
}
570
571impl InformationSchemaTablesBuilder {
572 fn add_table(
573 &mut self,
574 catalog_name: impl AsRef<str>,
575 schema_name: impl AsRef<str>,
576 table_name: impl AsRef<str>,
577 table_type: TableType,
578 ) {
579 self.catalog_names.append_value(catalog_name.as_ref());
581 self.schema_names.append_value(schema_name.as_ref());
582 self.table_names.append_value(table_name.as_ref());
583 self.table_types.append_value(match table_type {
584 TableType::Base => "BASE TABLE",
585 TableType::View => "VIEW",
586 TableType::Temporary => "LOCAL TEMPORARY",
587 });
588 }
589
590 fn finish(&mut self) -> RecordBatch {
591 RecordBatch::try_new(
592 Arc::clone(&self.schema),
593 vec![
594 Arc::new(self.catalog_names.finish()),
595 Arc::new(self.schema_names.finish()),
596 Arc::new(self.table_names.finish()),
597 Arc::new(self.table_types.finish()),
598 ],
599 )
600 .unwrap()
601 }
602}
603
/// Partition stream backing the `information_schema.views` view.
#[derive(Debug)]
struct InformationSchemaViews {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}
609
610impl InformationSchemaViews {
611 fn new(config: InformationSchemaConfig) -> Self {
612 let schema = Arc::new(Schema::new(vec![
613 Field::new("table_catalog", DataType::Utf8, false),
614 Field::new("table_schema", DataType::Utf8, false),
615 Field::new("table_name", DataType::Utf8, false),
616 Field::new("definition", DataType::Utf8, true),
617 ]));
618
619 Self { schema, config }
620 }
621
622 fn builder(&self) -> InformationSchemaViewBuilder {
623 InformationSchemaViewBuilder {
624 catalog_names: StringBuilder::new(),
625 schema_names: StringBuilder::new(),
626 table_names: StringBuilder::new(),
627 definitions: StringBuilder::new(),
628 schema: Arc::clone(&self.schema),
629 }
630 }
631}
632
633impl PartitionStream for InformationSchemaViews {
634 fn schema(&self) -> &SchemaRef {
635 &self.schema
636 }
637
638 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
639 let mut builder = self.builder();
640 let config = self.config.clone();
641 Box::pin(RecordBatchStreamAdapter::new(
642 Arc::clone(&self.schema),
643 futures::stream::once(async move {
645 config.make_views(&mut builder).await?;
646 Ok(builder.finish())
647 }),
648 ))
649 }
650}
651
/// Column-wise accumulator for `information_schema.views` rows.
struct InformationSchemaViewBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    definitions: StringBuilder,
}
662
663impl InformationSchemaViewBuilder {
664 fn add_view(
665 &mut self,
666 catalog_name: impl AsRef<str>,
667 schema_name: impl AsRef<str>,
668 table_name: impl AsRef<str>,
669 definition: Option<impl AsRef<str>>,
670 ) {
671 self.catalog_names.append_value(catalog_name.as_ref());
673 self.schema_names.append_value(schema_name.as_ref());
674 self.table_names.append_value(table_name.as_ref());
675 self.definitions.append_option(definition.as_ref());
676 }
677
678 fn finish(&mut self) -> RecordBatch {
679 RecordBatch::try_new(
680 Arc::clone(&self.schema),
681 vec![
682 Arc::new(self.catalog_names.finish()),
683 Arc::new(self.schema_names.finish()),
684 Arc::new(self.table_names.finish()),
685 Arc::new(self.definitions.finish()),
686 ],
687 )
688 .unwrap()
689 }
690}
691
/// Partition stream backing the `information_schema.columns` view.
#[derive(Debug)]
struct InformationSchemaColumns {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}
697
698impl InformationSchemaColumns {
699 fn new(config: InformationSchemaConfig) -> Self {
700 let schema = Arc::new(Schema::new(vec![
701 Field::new("table_catalog", DataType::Utf8, false),
702 Field::new("table_schema", DataType::Utf8, false),
703 Field::new("table_name", DataType::Utf8, false),
704 Field::new("column_name", DataType::Utf8, false),
705 Field::new("ordinal_position", DataType::UInt64, false),
706 Field::new("column_default", DataType::Utf8, true),
707 Field::new("is_nullable", DataType::Utf8, false),
708 Field::new("data_type", DataType::Utf8, false),
709 Field::new("character_maximum_length", DataType::UInt64, true),
710 Field::new("character_octet_length", DataType::UInt64, true),
711 Field::new("numeric_precision", DataType::UInt64, true),
712 Field::new("numeric_precision_radix", DataType::UInt64, true),
713 Field::new("numeric_scale", DataType::UInt64, true),
714 Field::new("datetime_precision", DataType::UInt64, true),
715 Field::new("interval_type", DataType::Utf8, true),
716 ]));
717
718 Self { schema, config }
719 }
720
721 fn builder(&self) -> InformationSchemaColumnsBuilder {
722 let default_capacity = 10;
726
727 InformationSchemaColumnsBuilder {
728 catalog_names: StringBuilder::new(),
729 schema_names: StringBuilder::new(),
730 table_names: StringBuilder::new(),
731 column_names: StringBuilder::new(),
732 ordinal_positions: UInt64Builder::with_capacity(default_capacity),
733 column_defaults: StringBuilder::new(),
734 is_nullables: StringBuilder::new(),
735 data_types: StringBuilder::new(),
736 character_maximum_lengths: UInt64Builder::with_capacity(default_capacity),
737 character_octet_lengths: UInt64Builder::with_capacity(default_capacity),
738 numeric_precisions: UInt64Builder::with_capacity(default_capacity),
739 numeric_precision_radixes: UInt64Builder::with_capacity(default_capacity),
740 numeric_scales: UInt64Builder::with_capacity(default_capacity),
741 datetime_precisions: UInt64Builder::with_capacity(default_capacity),
742 interval_types: StringBuilder::new(),
743 schema: Arc::clone(&self.schema),
744 }
745 }
746}
747
748impl PartitionStream for InformationSchemaColumns {
749 fn schema(&self) -> &SchemaRef {
750 &self.schema
751 }
752
753 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
754 let mut builder = self.builder();
755 let config = self.config.clone();
756 Box::pin(RecordBatchStreamAdapter::new(
757 Arc::clone(&self.schema),
758 futures::stream::once(async move {
760 config.make_columns(&mut builder).await?;
761 Ok(builder.finish())
762 }),
763 ))
764 }
765}
766
/// Column-wise accumulator for `information_schema.columns` rows.
struct InformationSchemaColumnsBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    column_names: StringBuilder,
    ordinal_positions: UInt64Builder,
    column_defaults: StringBuilder,
    is_nullables: StringBuilder,
    data_types: StringBuilder,
    character_maximum_lengths: UInt64Builder,
    character_octet_lengths: UInt64Builder,
    numeric_precisions: UInt64Builder,
    numeric_precision_radixes: UInt64Builder,
    numeric_scales: UInt64Builder,
    datetime_precisions: UInt64Builder,
    interval_types: StringBuilder,
}
788
789impl InformationSchemaColumnsBuilder {
790 fn add_column(
791 &mut self,
792 catalog_name: &str,
793 schema_name: &str,
794 table_name: &str,
795 field_position: usize,
796 field: &Field,
797 ) {
798 use DataType::*;
799
800 self.catalog_names.append_value(catalog_name);
802 self.schema_names.append_value(schema_name);
803 self.table_names.append_value(table_name);
804
805 self.column_names.append_value(field.name());
806
807 self.ordinal_positions.append_value(field_position as u64);
808
809 self.column_defaults.append_null();
811
812 let nullable_str = if field.is_nullable() { "YES" } else { "NO" };
814 self.is_nullables.append_value(nullable_str);
815
816 self.data_types
818 .append_value(format!("{:?}", field.data_type()));
819
820 let max_chars = None;
826 self.character_maximum_lengths.append_option(max_chars);
827
828 let char_len: Option<u64> = match field.data_type() {
831 Utf8 | Binary => Some(i32::MAX as u64),
832 LargeBinary | LargeUtf8 => Some(i64::MAX as u64),
833 _ => None,
834 };
835 self.character_octet_lengths.append_option(char_len);
836
837 let (numeric_precision, numeric_radix, numeric_scale) = match field.data_type() {
860 Int8 | UInt8 => (Some(8), Some(2), None),
861 Int16 | UInt16 => (Some(16), Some(2), None),
862 Int32 | UInt32 => (Some(32), Some(2), None),
863 Float16 => (Some(15), Some(2), None),
866 Float32 => (Some(24), Some(2), None),
868 Float64 => (Some(24), Some(2), None),
870 Decimal128(precision, scale) => {
871 (Some(*precision as u64), Some(10), Some(*scale as u64))
872 }
873 _ => (None, None, None),
874 };
875
876 self.numeric_precisions.append_option(numeric_precision);
877 self.numeric_precision_radixes.append_option(numeric_radix);
878 self.numeric_scales.append_option(numeric_scale);
879
880 self.datetime_precisions.append_option(None);
881 self.interval_types.append_null();
882 }
883
884 fn finish(&mut self) -> RecordBatch {
885 RecordBatch::try_new(
886 Arc::clone(&self.schema),
887 vec![
888 Arc::new(self.catalog_names.finish()),
889 Arc::new(self.schema_names.finish()),
890 Arc::new(self.table_names.finish()),
891 Arc::new(self.column_names.finish()),
892 Arc::new(self.ordinal_positions.finish()),
893 Arc::new(self.column_defaults.finish()),
894 Arc::new(self.is_nullables.finish()),
895 Arc::new(self.data_types.finish()),
896 Arc::new(self.character_maximum_lengths.finish()),
897 Arc::new(self.character_octet_lengths.finish()),
898 Arc::new(self.numeric_precisions.finish()),
899 Arc::new(self.numeric_precision_radixes.finish()),
900 Arc::new(self.numeric_scales.finish()),
901 Arc::new(self.datetime_precisions.finish()),
902 Arc::new(self.interval_types.finish()),
903 ],
904 )
905 .unwrap()
906 }
907}
908
/// Partition stream backing the `information_schema.schemata` view.
#[derive(Debug)]
struct InformationSchemata {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}
914
915impl InformationSchemata {
916 fn new(config: InformationSchemaConfig) -> Self {
917 let schema = Arc::new(Schema::new(vec![
918 Field::new("catalog_name", DataType::Utf8, false),
919 Field::new("schema_name", DataType::Utf8, false),
920 Field::new("schema_owner", DataType::Utf8, true),
921 Field::new("default_character_set_catalog", DataType::Utf8, true),
922 Field::new("default_character_set_schema", DataType::Utf8, true),
923 Field::new("default_character_set_name", DataType::Utf8, true),
924 Field::new("sql_path", DataType::Utf8, true),
925 ]));
926 Self { schema, config }
927 }
928
929 fn builder(&self) -> InformationSchemataBuilder {
930 InformationSchemataBuilder {
931 schema: Arc::clone(&self.schema),
932 catalog_name: StringBuilder::new(),
933 schema_name: StringBuilder::new(),
934 schema_owner: StringBuilder::new(),
935 default_character_set_catalog: StringBuilder::new(),
936 default_character_set_schema: StringBuilder::new(),
937 default_character_set_name: StringBuilder::new(),
938 sql_path: StringBuilder::new(),
939 }
940 }
941}
942
/// Column-wise accumulator for `information_schema.schemata` rows.
struct InformationSchemataBuilder {
    schema: SchemaRef,
    catalog_name: StringBuilder,
    schema_name: StringBuilder,
    schema_owner: StringBuilder,
    default_character_set_catalog: StringBuilder,
    default_character_set_schema: StringBuilder,
    default_character_set_name: StringBuilder,
    sql_path: StringBuilder,
}
953
954impl InformationSchemataBuilder {
955 fn add_schemata(
956 &mut self,
957 catalog_name: &str,
958 schema_name: &str,
959 schema_owner: Option<&str>,
960 ) {
961 self.catalog_name.append_value(catalog_name);
962 self.schema_name.append_value(schema_name);
963 match schema_owner {
964 Some(owner) => self.schema_owner.append_value(owner),
965 None => self.schema_owner.append_null(),
966 }
967 self.default_character_set_catalog.append_null();
970 self.default_character_set_schema.append_null();
971 self.default_character_set_name.append_null();
972 self.sql_path.append_null();
973 }
974
975 fn finish(&mut self) -> RecordBatch {
976 RecordBatch::try_new(
977 Arc::clone(&self.schema),
978 vec![
979 Arc::new(self.catalog_name.finish()),
980 Arc::new(self.schema_name.finish()),
981 Arc::new(self.schema_owner.finish()),
982 Arc::new(self.default_character_set_catalog.finish()),
983 Arc::new(self.default_character_set_schema.finish()),
984 Arc::new(self.default_character_set_name.finish()),
985 Arc::new(self.sql_path.finish()),
986 ],
987 )
988 .unwrap()
989 }
990}
991
992impl PartitionStream for InformationSchemata {
993 fn schema(&self) -> &SchemaRef {
994 &self.schema
995 }
996
997 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
998 let mut builder = self.builder();
999 let config = self.config.clone();
1000 Box::pin(RecordBatchStreamAdapter::new(
1001 Arc::clone(&self.schema),
1002 futures::stream::once(async move {
1004 config.make_schemata(&mut builder).await;
1005 Ok(builder.finish())
1006 }),
1007 ))
1008 }
1009}
1010
/// Partition stream backing the DataFusion-specific
/// `information_schema.df_settings` view (session configuration).
#[derive(Debug)]
struct InformationSchemaDfSettings {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}
1016
1017impl InformationSchemaDfSettings {
1018 fn new(config: InformationSchemaConfig) -> Self {
1019 let schema = Arc::new(Schema::new(vec![
1020 Field::new("name", DataType::Utf8, false),
1021 Field::new("value", DataType::Utf8, true),
1022 Field::new("description", DataType::Utf8, true),
1023 ]));
1024
1025 Self { schema, config }
1026 }
1027
1028 fn builder(&self) -> InformationSchemaDfSettingsBuilder {
1029 InformationSchemaDfSettingsBuilder {
1030 names: StringBuilder::new(),
1031 values: StringBuilder::new(),
1032 descriptions: StringBuilder::new(),
1033 schema: Arc::clone(&self.schema),
1034 }
1035 }
1036}
1037
1038impl PartitionStream for InformationSchemaDfSettings {
1039 fn schema(&self) -> &SchemaRef {
1040 &self.schema
1041 }
1042
1043 fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1044 let config = self.config.clone();
1045 let mut builder = self.builder();
1046 Box::pin(RecordBatchStreamAdapter::new(
1047 Arc::clone(&self.schema),
1048 futures::stream::once(async move {
1050 config.make_df_settings(ctx.session_config().options(), &mut builder);
1052 Ok(builder.finish())
1053 }),
1054 ))
1055 }
1056}
1057
/// Column-wise accumulator for `information_schema.df_settings` rows.
struct InformationSchemaDfSettingsBuilder {
    schema: SchemaRef,
    names: StringBuilder,
    values: StringBuilder,
    descriptions: StringBuilder,
}
1064
1065impl InformationSchemaDfSettingsBuilder {
1066 fn add_setting(&mut self, entry: ConfigEntry) {
1067 self.names.append_value(entry.key);
1068 self.values.append_option(entry.value);
1069 self.descriptions.append_value(entry.description);
1070 }
1071
1072 fn finish(&mut self) -> RecordBatch {
1073 RecordBatch::try_new(
1074 Arc::clone(&self.schema),
1075 vec![
1076 Arc::new(self.names.finish()),
1077 Arc::new(self.values.finish()),
1078 Arc::new(self.descriptions.finish()),
1079 ],
1080 )
1081 .unwrap()
1082 }
1083}
1084
/// Partition stream backing the `information_schema.routines` view
/// (registered scalar/aggregate/window functions).
#[derive(Debug)]
struct InformationSchemaRoutines {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}
1090
1091impl InformationSchemaRoutines {
1092 fn new(config: InformationSchemaConfig) -> Self {
1093 let schema = Arc::new(Schema::new(vec![
1094 Field::new("specific_catalog", DataType::Utf8, false),
1095 Field::new("specific_schema", DataType::Utf8, false),
1096 Field::new("specific_name", DataType::Utf8, false),
1097 Field::new("routine_catalog", DataType::Utf8, false),
1098 Field::new("routine_schema", DataType::Utf8, false),
1099 Field::new("routine_name", DataType::Utf8, false),
1100 Field::new("routine_type", DataType::Utf8, false),
1101 Field::new("is_deterministic", DataType::Boolean, true),
1102 Field::new("data_type", DataType::Utf8, true),
1103 Field::new("function_type", DataType::Utf8, true),
1104 Field::new("description", DataType::Utf8, true),
1105 Field::new("syntax_example", DataType::Utf8, true),
1106 ]));
1107
1108 Self { schema, config }
1109 }
1110
1111 fn builder(&self) -> InformationSchemaRoutinesBuilder {
1112 InformationSchemaRoutinesBuilder {
1113 schema: Arc::clone(&self.schema),
1114 specific_catalog: StringBuilder::new(),
1115 specific_schema: StringBuilder::new(),
1116 specific_name: StringBuilder::new(),
1117 routine_catalog: StringBuilder::new(),
1118 routine_schema: StringBuilder::new(),
1119 routine_name: StringBuilder::new(),
1120 routine_type: StringBuilder::new(),
1121 is_deterministic: BooleanBuilder::new(),
1122 data_type: StringBuilder::new(),
1123 function_type: StringBuilder::new(),
1124 description: StringBuilder::new(),
1125 syntax_example: StringBuilder::new(),
1126 }
1127 }
1128}
1129
/// Column-wise accumulator for `information_schema.routines` rows.
struct InformationSchemaRoutinesBuilder {
    schema: SchemaRef,
    specific_catalog: StringBuilder,
    specific_schema: StringBuilder,
    specific_name: StringBuilder,
    routine_catalog: StringBuilder,
    routine_schema: StringBuilder,
    routine_name: StringBuilder,
    routine_type: StringBuilder,
    is_deterministic: BooleanBuilder,
    data_type: StringBuilder,
    function_type: StringBuilder,
    description: StringBuilder,
    syntax_example: StringBuilder,
}
1145
1146impl InformationSchemaRoutinesBuilder {
1147 #[allow(clippy::too_many_arguments)]
1148 fn add_routine(
1149 &mut self,
1150 catalog_name: impl AsRef<str>,
1151 schema_name: impl AsRef<str>,
1152 routine_name: impl AsRef<str>,
1153 routine_type: impl AsRef<str>,
1154 is_deterministic: bool,
1155 data_type: Option<impl AsRef<str>>,
1156 function_type: impl AsRef<str>,
1157 description: Option<impl AsRef<str>>,
1158 syntax_example: Option<impl AsRef<str>>,
1159 ) {
1160 self.specific_catalog.append_value(catalog_name.as_ref());
1161 self.specific_schema.append_value(schema_name.as_ref());
1162 self.specific_name.append_value(routine_name.as_ref());
1163 self.routine_catalog.append_value(catalog_name.as_ref());
1164 self.routine_schema.append_value(schema_name.as_ref());
1165 self.routine_name.append_value(routine_name.as_ref());
1166 self.routine_type.append_value(routine_type.as_ref());
1167 self.is_deterministic.append_value(is_deterministic);
1168 self.data_type.append_option(data_type.as_ref());
1169 self.function_type.append_value(function_type.as_ref());
1170 self.description.append_option(description);
1171 self.syntax_example.append_option(syntax_example);
1172 }
1173
1174 fn finish(&mut self) -> RecordBatch {
1175 RecordBatch::try_new(
1176 Arc::clone(&self.schema),
1177 vec![
1178 Arc::new(self.specific_catalog.finish()),
1179 Arc::new(self.specific_schema.finish()),
1180 Arc::new(self.specific_name.finish()),
1181 Arc::new(self.routine_catalog.finish()),
1182 Arc::new(self.routine_schema.finish()),
1183 Arc::new(self.routine_name.finish()),
1184 Arc::new(self.routine_type.finish()),
1185 Arc::new(self.is_deterministic.finish()),
1186 Arc::new(self.data_type.finish()),
1187 Arc::new(self.function_type.finish()),
1188 Arc::new(self.description.finish()),
1189 Arc::new(self.syntax_example.finish()),
1190 ],
1191 )
1192 .unwrap()
1193 }
1194}
1195
1196impl PartitionStream for InformationSchemaRoutines {
1197 fn schema(&self) -> &SchemaRef {
1198 &self.schema
1199 }
1200
1201 fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1202 let config = self.config.clone();
1203 let mut builder = self.builder();
1204 Box::pin(RecordBatchStreamAdapter::new(
1205 Arc::clone(&self.schema),
1206 futures::stream::once(async move {
1207 config.make_routines(
1208 ctx.scalar_functions(),
1209 ctx.aggregate_functions(),
1210 ctx.window_functions(),
1211 ctx.session_config().options(),
1212 &mut builder,
1213 )?;
1214 Ok(builder.finish())
1215 }),
1216 ))
1217 }
1218}
1219
/// Virtual table provider for `information_schema.parameters`.
#[derive(Debug)]
struct InformationSchemaParameters {
    /// Fixed output schema, built once in `new`.
    schema: SchemaRef,
    /// Access to the catalog list and registered functions.
    config: InformationSchemaConfig,
}
1225
1226impl InformationSchemaParameters {
1227 fn new(config: InformationSchemaConfig) -> Self {
1228 let schema = Arc::new(Schema::new(vec![
1229 Field::new("specific_catalog", DataType::Utf8, false),
1230 Field::new("specific_schema", DataType::Utf8, false),
1231 Field::new("specific_name", DataType::Utf8, false),
1232 Field::new("ordinal_position", DataType::UInt64, false),
1233 Field::new("parameter_mode", DataType::Utf8, false),
1234 Field::new("parameter_name", DataType::Utf8, true),
1235 Field::new("data_type", DataType::Utf8, false),
1236 Field::new("parameter_default", DataType::Utf8, true),
1237 Field::new("is_variadic", DataType::Boolean, false),
1238 Field::new("rid", DataType::UInt8, false),
1244 ]));
1245
1246 Self { schema, config }
1247 }
1248
1249 fn builder(&self) -> InformationSchemaParametersBuilder {
1250 InformationSchemaParametersBuilder {
1251 schema: Arc::clone(&self.schema),
1252 specific_catalog: StringBuilder::new(),
1253 specific_schema: StringBuilder::new(),
1254 specific_name: StringBuilder::new(),
1255 ordinal_position: UInt64Builder::new(),
1256 parameter_mode: StringBuilder::new(),
1257 parameter_name: StringBuilder::new(),
1258 data_type: StringBuilder::new(),
1259 parameter_default: StringBuilder::new(),
1260 is_variadic: BooleanBuilder::new(),
1261 rid: UInt8Builder::new(),
1262 }
1263 }
1264}
1265
/// Accumulates rows for the `information_schema.parameters` table,
/// one Arrow array builder per output column.
struct InformationSchemaParametersBuilder {
    /// Target schema the finished `RecordBatch` must conform to.
    schema: SchemaRef,
    // Identity of the routine this parameter belongs to.
    specific_catalog: StringBuilder,
    specific_schema: StringBuilder,
    specific_name: StringBuilder,
    // 1-based position of the parameter within the routine signature
    // (u64 per the schema; exact numbering set by the caller).
    ordinal_position: UInt64Builder,
    parameter_mode: StringBuilder,
    // Nullable: unnamed parameters are appended as NULL.
    parameter_name: StringBuilder,
    data_type: StringBuilder,
    // Nullable default value, rendered as text.
    parameter_default: StringBuilder,
    is_variadic: BooleanBuilder,
    // NOTE(review): presumably a row discriminator keeping otherwise-identical
    // rows distinct — confirm against the population code.
    rid: UInt8Builder,
}
1279
1280impl InformationSchemaParametersBuilder {
1281 #[allow(clippy::too_many_arguments)]
1282 fn add_parameter(
1283 &mut self,
1284 specific_catalog: impl AsRef<str>,
1285 specific_schema: impl AsRef<str>,
1286 specific_name: impl AsRef<str>,
1287 ordinal_position: u64,
1288 parameter_mode: impl AsRef<str>,
1289 parameter_name: Option<impl AsRef<str>>,
1290 data_type: impl AsRef<str>,
1291 parameter_default: Option<impl AsRef<str>>,
1292 is_variadic: bool,
1293 rid: u8,
1294 ) {
1295 self.specific_catalog
1296 .append_value(specific_catalog.as_ref());
1297 self.specific_schema.append_value(specific_schema.as_ref());
1298 self.specific_name.append_value(specific_name.as_ref());
1299 self.ordinal_position.append_value(ordinal_position);
1300 self.parameter_mode.append_value(parameter_mode.as_ref());
1301 self.parameter_name.append_option(parameter_name.as_ref());
1302 self.data_type.append_value(data_type.as_ref());
1303 self.parameter_default.append_option(parameter_default);
1304 self.is_variadic.append_value(is_variadic);
1305 self.rid.append_value(rid);
1306 }
1307
1308 fn finish(&mut self) -> RecordBatch {
1309 RecordBatch::try_new(
1310 Arc::clone(&self.schema),
1311 vec![
1312 Arc::new(self.specific_catalog.finish()),
1313 Arc::new(self.specific_schema.finish()),
1314 Arc::new(self.specific_name.finish()),
1315 Arc::new(self.ordinal_position.finish()),
1316 Arc::new(self.parameter_mode.finish()),
1317 Arc::new(self.parameter_name.finish()),
1318 Arc::new(self.data_type.finish()),
1319 Arc::new(self.parameter_default.finish()),
1320 Arc::new(self.is_variadic.finish()),
1321 Arc::new(self.rid.finish()),
1322 ],
1323 )
1324 .unwrap()
1325 }
1326}
1327
1328impl PartitionStream for InformationSchemaParameters {
1329 fn schema(&self) -> &SchemaRef {
1330 &self.schema
1331 }
1332
1333 fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1334 let config = self.config.clone();
1335 let mut builder = self.builder();
1336 Box::pin(RecordBatchStreamAdapter::new(
1337 Arc::clone(&self.schema),
1338 futures::stream::once(async move {
1339 config.make_parameters(
1340 ctx.scalar_functions(),
1341 ctx.aggregate_functions(),
1342 ctx.window_functions(),
1343 ctx.session_config().options(),
1344 &mut builder,
1345 )?;
1346 Ok(builder.finish())
1347 }),
1348 ))
1349 }
1350}