1use std::sync::Arc;
2
3use async_trait::async_trait;
4use datafusion::arrow::array::{
5 as_boolean_array, ArrayRef, BooleanArray, Float64Array, Int16Array, Int32Array, RecordBatch,
6 StringArray, StringBuilder,
7};
8use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
9use datafusion::catalog::streaming::StreamingTable;
10use datafusion::catalog::{CatalogProviderList, MemTable, SchemaProvider};
11use datafusion::common::utils::SingleRowListArrayBuilder;
12use datafusion::datasource::TableProvider;
13use datafusion::error::{DataFusionError, Result};
14use datafusion::execution::{SendableRecordBatchStream, TaskContext};
15use datafusion::logical_expr::{ColumnarValue, ScalarUDF, Volatility};
16use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
17use datafusion::physical_plan::streaming::PartitionStream;
18use datafusion::prelude::{create_udf, SessionContext};
19
/// Names of the PostgreSQL system-catalog tables emulated by this module.
const PG_CATALOG_TABLE_PG_TYPE: &str = "pg_type";
const PG_CATALOG_TABLE_PG_CLASS: &str = "pg_class";
const PG_CATALOG_TABLE_PG_ATTRIBUTE: &str = "pg_attribute";
const PG_CATALOG_TABLE_PG_NAMESPACE: &str = "pg_namespace";
const PG_CATALOG_TABLE_PG_PROC: &str = "pg_proc";
const PG_CATALOG_TABLE_PG_DATABASE: &str = "pg_database";
const PG_CATALOG_TABLE_PG_AM: &str = "pg_am";

/// Every table name advertised under the emulated `pg_catalog` schema.
pub const PG_CATALOG_TABLES: &[&str] = &[
    PG_CATALOG_TABLE_PG_TYPE,
    PG_CATALOG_TABLE_PG_CLASS,
    PG_CATALOG_TABLE_PG_ATTRIBUTE,
    PG_CATALOG_TABLE_PG_NAMESPACE,
    PG_CATALOG_TABLE_PG_PROC,
    PG_CATALOG_TABLE_PG_DATABASE,
    PG_CATALOG_TABLE_PG_AM,
];
37
/// [`SchemaProvider`] that emulates PostgreSQL's `pg_catalog` schema on top
/// of a DataFusion catalog list, so PostgreSQL clients can introspect the
/// session's catalogs/schemas/tables through familiar system tables.
#[derive(Debug)]
pub struct PgCatalogSchemaProvider {
    // Live view of the session's catalogs; the emulated tables are built
    // from it on demand.
    catalog_list: Arc<dyn CatalogProviderList>,
}
43
44#[async_trait]
45impl SchemaProvider for PgCatalogSchemaProvider {
46 fn as_any(&self) -> &dyn std::any::Any {
47 self
48 }
49
50 fn table_names(&self) -> Vec<String> {
51 PG_CATALOG_TABLES.iter().map(ToString::to_string).collect()
52 }
53
54 async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
55 match name.to_ascii_lowercase().as_str() {
56 PG_CATALOG_TABLE_PG_TYPE => Ok(Some(self.create_pg_type_table())),
57 PG_CATALOG_TABLE_PG_AM => Ok(Some(self.create_pg_am_table())),
58 PG_CATALOG_TABLE_PG_CLASS => {
59 let table = Arc::new(PgClassTable::new(self.catalog_list.clone()));
60 Ok(Some(Arc::new(
61 StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
62 )))
63 }
64 PG_CATALOG_TABLE_PG_NAMESPACE => {
65 let table = Arc::new(PgNamespaceTable::new(self.catalog_list.clone()));
66 Ok(Some(Arc::new(
67 StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
68 )))
69 }
70 PG_CATALOG_TABLE_PG_DATABASE => {
71 let table = Arc::new(PgDatabaseTable::new(self.catalog_list.clone()));
72 Ok(Some(Arc::new(
73 StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
74 )))
75 }
76 _ => Ok(None),
77 }
78 }
79
80 fn table_exist(&self, name: &str) -> bool {
81 PG_CATALOG_TABLES.contains(&name.to_ascii_lowercase().as_str())
82 }
83}
84
85impl PgCatalogSchemaProvider {
86 pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> PgCatalogSchemaProvider {
87 Self { catalog_list }
88 }
89
90 fn create_pg_type_table(&self) -> Arc<dyn TableProvider> {
92 let schema = Arc::new(Schema::new(vec![
94 Field::new("oid", DataType::Int32, false),
95 Field::new("typname", DataType::Utf8, false),
96 Field::new("typnamespace", DataType::Int32, false),
97 Field::new("typlen", DataType::Int16, false),
98 ]));
100
101 let provider = MemTable::try_new(schema, vec![]).unwrap();
103
104 Arc::new(provider)
105 }
106
107 fn create_pg_am_table(&self) -> Arc<dyn TableProvider> {
109 let schema = Arc::new(Schema::new(vec![
112 Field::new("oid", DataType::Int32, false), Field::new("amname", DataType::Utf8, false), Field::new("amhandler", DataType::Int32, false), Field::new("amtype", DataType::Utf8, false), Field::new("amstrategies", DataType::Int32, false), Field::new("amsupport", DataType::Int32, false), Field::new("amcanorder", DataType::Boolean, false), Field::new("amcanorderbyop", DataType::Boolean, false), Field::new("amcanbackward", DataType::Boolean, false), Field::new("amcanunique", DataType::Boolean, false), Field::new("amcanmulticol", DataType::Boolean, false), Field::new("amoptionalkey", DataType::Boolean, false), Field::new("amsearcharray", DataType::Boolean, false), Field::new("amsearchnulls", DataType::Boolean, false), Field::new("amstorage", DataType::Boolean, false), Field::new("amclusterable", DataType::Boolean, false), Field::new("ampredlocks", DataType::Boolean, false), Field::new("amcanparallel", DataType::Boolean, false), Field::new("amcanbeginscan", DataType::Boolean, false), Field::new("amcanmarkpos", DataType::Boolean, false), Field::new("amcanfetch", DataType::Boolean, false), Field::new("amkeytype", DataType::Int32, false), ]));
135
136 let provider = MemTable::try_new(schema, vec![]).unwrap();
138
139 Arc::new(provider)
140 }
141}
142
/// Streaming emulation of PostgreSQL's `pg_class` catalog (one row per
/// relation), populated from the session's catalog list at scan time.
#[derive(Debug)]
struct PgClassTable {
    // Arrow schema mirroring pg_class's column layout.
    schema: SchemaRef,
    // Source of catalogs/schemas/tables to report rows for.
    catalog_list: Arc<dyn CatalogProviderList>,
}
148
149impl PgClassTable {
150 fn new(catalog_list: Arc<dyn CatalogProviderList>) -> PgClassTable {
151 let schema = Arc::new(Schema::new(vec![
154 Field::new("oid", DataType::Int32, false), Field::new("relname", DataType::Utf8, false), Field::new("relnamespace", DataType::Int32, false), Field::new("reltype", DataType::Int32, false), Field::new("reloftype", DataType::Int32, true), Field::new("relowner", DataType::Int32, false), Field::new("relam", DataType::Int32, false), Field::new("relfilenode", DataType::Int32, false), Field::new("reltablespace", DataType::Int32, false), Field::new("relpages", DataType::Int32, false), Field::new("reltuples", DataType::Float64, false), Field::new("relallvisible", DataType::Int32, false), Field::new("reltoastrelid", DataType::Int32, false), Field::new("relhasindex", DataType::Boolean, false), Field::new("relisshared", DataType::Boolean, false), Field::new("relpersistence", DataType::Utf8, false), Field::new("relkind", DataType::Utf8, false), Field::new("relnatts", DataType::Int16, false), Field::new("relchecks", DataType::Int16, false), Field::new("relhasrules", DataType::Boolean, false), Field::new("relhastriggers", DataType::Boolean, false), Field::new("relhassubclass", DataType::Boolean, false), Field::new("relrowsecurity", DataType::Boolean, false), Field::new("relforcerowsecurity", DataType::Boolean, false), Field::new("relispopulated", DataType::Boolean, false), Field::new("relreplident", DataType::Utf8, false), Field::new("relispartition", DataType::Boolean, false), Field::new("relrewrite", DataType::Int32, true), Field::new("relfrozenxid", DataType::Int32, false), Field::new("relminmxid", DataType::Int32, false), ]));
185
186 Self {
187 schema,
188 catalog_list,
189 }
190 }
191
192 async fn get_data(
194 schema: SchemaRef,
195 catalog_list: Arc<dyn CatalogProviderList>,
196 ) -> Result<RecordBatch> {
197 let mut oids = Vec::new();
199 let mut relnames = Vec::new();
200 let mut relnamespaces = Vec::new();
201 let mut reltypes = Vec::new();
202 let mut reloftypes = Vec::new();
203 let mut relowners = Vec::new();
204 let mut relams = Vec::new();
205 let mut relfilenodes = Vec::new();
206 let mut reltablespaces = Vec::new();
207 let mut relpages = Vec::new();
208 let mut reltuples = Vec::new();
209 let mut relallvisibles = Vec::new();
210 let mut reltoastrelids = Vec::new();
211 let mut relhasindexes = Vec::new();
212 let mut relisshareds = Vec::new();
213 let mut relpersistences = Vec::new();
214 let mut relkinds = Vec::new();
215 let mut relnattses = Vec::new();
216 let mut relcheckses = Vec::new();
217 let mut relhasruleses = Vec::new();
218 let mut relhastriggersses = Vec::new();
219 let mut relhassubclasses = Vec::new();
220 let mut relrowsecurities = Vec::new();
221 let mut relforcerowsecurities = Vec::new();
222 let mut relispopulateds = Vec::new();
223 let mut relreplidents = Vec::new();
224 let mut relispartitions = Vec::new();
225 let mut relrewrites = Vec::new();
226 let mut relfrozenxids = Vec::new();
227 let mut relminmxids = Vec::new();
228
229 let mut next_oid = 10000;
231
232 for catalog_name in catalog_list.catalog_names() {
234 if let Some(catalog) = catalog_list.catalog(&catalog_name) {
235 for schema_name in catalog.schema_names() {
236 if let Some(schema) = catalog.schema(&schema_name) {
237 let schema_oid = next_oid;
238 next_oid += 1;
239
240 for table_name in schema.table_names() {
245 let table_oid = next_oid;
246 next_oid += 1;
247
248 if let Some(table) = schema.table(&table_name).await? {
249 let table_type = "r";
251
252 let column_count = table.schema().fields().len() as i16;
254
255 oids.push(table_oid);
257 relnames.push(table_name.clone());
258 relnamespaces.push(schema_oid);
259 reltypes.push(0); reloftypes.push(None);
261 relowners.push(0); relams.push(0); relfilenodes.push(table_oid); reltablespaces.push(0); relpages.push(1); reltuples.push(0.0); relallvisibles.push(0);
268 reltoastrelids.push(0);
269 relhasindexes.push(false);
270 relisshareds.push(false);
271 relpersistences.push("p".to_string()); relkinds.push(table_type.to_string());
273 relnattses.push(column_count);
274 relcheckses.push(0);
275 relhasruleses.push(false);
276 relhastriggersses.push(false);
277 relhassubclasses.push(false);
278 relrowsecurities.push(false);
279 relforcerowsecurities.push(false);
280 relispopulateds.push(true);
281 relreplidents.push("d".to_string()); relispartitions.push(false);
283 relrewrites.push(None);
284 relfrozenxids.push(0);
285 relminmxids.push(0);
286 }
287 }
288 }
289 }
290 }
291 }
292
293 let arrays: Vec<ArrayRef> = vec![
295 Arc::new(Int32Array::from(oids)),
296 Arc::new(StringArray::from(relnames)),
297 Arc::new(Int32Array::from(relnamespaces)),
298 Arc::new(Int32Array::from(reltypes)),
299 Arc::new(Int32Array::from_iter(reloftypes.into_iter())),
300 Arc::new(Int32Array::from(relowners)),
301 Arc::new(Int32Array::from(relams)),
302 Arc::new(Int32Array::from(relfilenodes)),
303 Arc::new(Int32Array::from(reltablespaces)),
304 Arc::new(Int32Array::from(relpages)),
305 Arc::new(Float64Array::from_iter(reltuples.into_iter())),
306 Arc::new(Int32Array::from(relallvisibles)),
307 Arc::new(Int32Array::from(reltoastrelids)),
308 Arc::new(BooleanArray::from(relhasindexes)),
309 Arc::new(BooleanArray::from(relisshareds)),
310 Arc::new(StringArray::from(relpersistences)),
311 Arc::new(StringArray::from(relkinds)),
312 Arc::new(Int16Array::from(relnattses)),
313 Arc::new(Int16Array::from(relcheckses)),
314 Arc::new(BooleanArray::from(relhasruleses)),
315 Arc::new(BooleanArray::from(relhastriggersses)),
316 Arc::new(BooleanArray::from(relhassubclasses)),
317 Arc::new(BooleanArray::from(relrowsecurities)),
318 Arc::new(BooleanArray::from(relforcerowsecurities)),
319 Arc::new(BooleanArray::from(relispopulateds)),
320 Arc::new(StringArray::from(relreplidents)),
321 Arc::new(BooleanArray::from(relispartitions)),
322 Arc::new(Int32Array::from_iter(relrewrites.into_iter())),
323 Arc::new(Int32Array::from(relfrozenxids)),
324 Arc::new(Int32Array::from(relminmxids)),
325 ];
326
327 let batch = RecordBatch::try_new(schema.clone(), arrays)?;
329
330 Ok(batch)
331 }
332}
333
334impl PartitionStream for PgClassTable {
335 fn schema(&self) -> &SchemaRef {
336 &self.schema
337 }
338
339 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
340 let catalog_list = self.catalog_list.clone();
341 let schema = Arc::clone(&self.schema);
342 Box::pin(RecordBatchStreamAdapter::new(
343 schema.clone(),
344 futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
345 ))
346 }
347}
348
/// Streaming emulation of PostgreSQL's `pg_namespace` catalog (one row per
/// schema), populated from the session's catalog list at scan time.
#[derive(Debug)]
struct PgNamespaceTable {
    // Arrow schema mirroring pg_namespace's column layout.
    schema: SchemaRef,
    // Source of schemas to report rows for.
    catalog_list: Arc<dyn CatalogProviderList>,
}
354
355impl PgNamespaceTable {
356 pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
357 let schema = Arc::new(Schema::new(vec![
360 Field::new("oid", DataType::Int32, false), Field::new("nspname", DataType::Utf8, false), Field::new("nspowner", DataType::Int32, false), Field::new("nspacl", DataType::Utf8, true), Field::new("options", DataType::Utf8, true), ]));
366
367 Self {
368 schema,
369 catalog_list,
370 }
371 }
372
373 async fn get_data(
375 schema: SchemaRef,
376 catalog_list: Arc<dyn CatalogProviderList>,
377 ) -> Result<RecordBatch> {
378 let mut oids = Vec::new();
380 let mut nspnames = Vec::new();
381 let mut nspowners = Vec::new();
382 let mut nspacls: Vec<Option<String>> = Vec::new();
383 let mut options: Vec<Option<String>> = Vec::new();
384
385 let mut next_oid = 10000;
387
388 oids.push(11);
391 nspnames.push("pg_catalog".to_string());
392 nspowners.push(10); nspacls.push(None);
394 options.push(None);
395
396 oids.push(2200);
398 nspnames.push("public".to_string());
399 nspowners.push(10); nspacls.push(None);
401 options.push(None);
402
403 oids.push(12);
405 nspnames.push("information_schema".to_string());
406 nspowners.push(10); nspacls.push(None);
408 options.push(None);
409
410 for catalog_name in catalog_list.catalog_names() {
412 if let Some(catalog) = catalog_list.catalog(&catalog_name) {
413 for schema_name in catalog.schema_names() {
414 if schema_name == "pg_catalog"
416 || schema_name == "public"
417 || schema_name == "information_schema"
418 {
419 continue;
420 }
421
422 let schema_oid = next_oid;
423 next_oid += 1;
424
425 oids.push(schema_oid);
426 nspnames.push(schema_name.clone());
427 nspowners.push(10); nspacls.push(None);
429 options.push(None);
430 }
431 }
432 }
433
434 let arrays: Vec<ArrayRef> = vec![
436 Arc::new(Int32Array::from(oids)),
437 Arc::new(StringArray::from(nspnames)),
438 Arc::new(Int32Array::from(nspowners)),
439 Arc::new(StringArray::from_iter(nspacls.into_iter())),
440 Arc::new(StringArray::from_iter(options.into_iter())),
441 ];
442
443 let batch = RecordBatch::try_new(schema.clone(), arrays)?;
445
446 Ok(batch)
447 }
448}
449
450impl PartitionStream for PgNamespaceTable {
451 fn schema(&self) -> &SchemaRef {
452 &self.schema
453 }
454
455 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
456 let catalog_list = self.catalog_list.clone();
457 let schema = Arc::clone(&self.schema);
458 Box::pin(RecordBatchStreamAdapter::new(
459 schema.clone(),
460 futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
461 ))
462 }
463}
464
/// Streaming emulation of PostgreSQL's `pg_database` catalog (one row per
/// catalog/database), populated from the session's catalog list at scan time.
#[derive(Debug)]
struct PgDatabaseTable {
    // Arrow schema mirroring pg_database's column layout.
    schema: SchemaRef,
    // Source of catalogs to report as databases.
    catalog_list: Arc<dyn CatalogProviderList>,
}
470
471impl PgDatabaseTable {
472 pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
473 let schema = Arc::new(Schema::new(vec![
476 Field::new("oid", DataType::Int32, false), Field::new("datname", DataType::Utf8, false), Field::new("datdba", DataType::Int32, false), Field::new("encoding", DataType::Int32, false), Field::new("datcollate", DataType::Utf8, false), Field::new("datctype", DataType::Utf8, false), Field::new("datistemplate", DataType::Boolean, false), Field::new("datallowconn", DataType::Boolean, false), Field::new("datconnlimit", DataType::Int32, false), Field::new("datlastsysoid", DataType::Int32, false), Field::new("datfrozenxid", DataType::Int32, false), Field::new("datminmxid", DataType::Int32, false), Field::new("dattablespace", DataType::Int32, false), Field::new("datacl", DataType::Utf8, true), ]));
491
492 Self {
493 schema,
494 catalog_list,
495 }
496 }
497
498 async fn get_data(
500 schema: SchemaRef,
501 catalog_list: Arc<dyn CatalogProviderList>,
502 ) -> Result<RecordBatch> {
503 let mut oids = Vec::new();
505 let mut datnames = Vec::new();
506 let mut datdbas = Vec::new();
507 let mut encodings = Vec::new();
508 let mut datcollates = Vec::new();
509 let mut datctypes = Vec::new();
510 let mut datistemplates = Vec::new();
511 let mut datallowconns = Vec::new();
512 let mut datconnlimits = Vec::new();
513 let mut datlastsysoids = Vec::new();
514 let mut datfrozenxids = Vec::new();
515 let mut datminmxids = Vec::new();
516 let mut dattablespaces = Vec::new();
517 let mut datacles: Vec<Option<String>> = Vec::new();
518
519 let mut next_oid = 16384; for catalog_name in catalog_list.catalog_names() {
524 let oid = next_oid;
525 next_oid += 1;
526
527 oids.push(oid);
528 datnames.push(catalog_name.clone());
529 datdbas.push(10); encodings.push(6); datcollates.push("en_US.UTF-8".to_string()); datctypes.push("en_US.UTF-8".to_string()); datistemplates.push(false);
534 datallowconns.push(true);
535 datconnlimits.push(-1); datlastsysoids.push(100000); datfrozenxids.push(1); datminmxids.push(1); dattablespaces.push(1663); datacles.push(None); }
542
543 if !datnames.contains(&"postgres".to_string()) {
546 let oid = next_oid;
547
548 oids.push(oid);
549 datnames.push("postgres".to_string());
550 datdbas.push(10);
551 encodings.push(6);
552 datcollates.push("en_US.UTF-8".to_string());
553 datctypes.push("en_US.UTF-8".to_string());
554 datistemplates.push(false);
555 datallowconns.push(true);
556 datconnlimits.push(-1);
557 datlastsysoids.push(100000);
558 datfrozenxids.push(1);
559 datminmxids.push(1);
560 dattablespaces.push(1663);
561 datacles.push(None);
562 }
563
564 let arrays: Vec<ArrayRef> = vec![
566 Arc::new(Int32Array::from(oids)),
567 Arc::new(StringArray::from(datnames)),
568 Arc::new(Int32Array::from(datdbas)),
569 Arc::new(Int32Array::from(encodings)),
570 Arc::new(StringArray::from(datcollates)),
571 Arc::new(StringArray::from(datctypes)),
572 Arc::new(BooleanArray::from(datistemplates)),
573 Arc::new(BooleanArray::from(datallowconns)),
574 Arc::new(Int32Array::from(datconnlimits)),
575 Arc::new(Int32Array::from(datlastsysoids)),
576 Arc::new(Int32Array::from(datfrozenxids)),
577 Arc::new(Int32Array::from(datminmxids)),
578 Arc::new(Int32Array::from(dattablespaces)),
579 Arc::new(StringArray::from_iter(datacles.into_iter())),
580 ];
581
582 let full_batch = RecordBatch::try_new(schema.clone(), arrays)?;
584 Ok(full_batch)
585 }
586}
587
588impl PartitionStream for PgDatabaseTable {
589 fn schema(&self) -> &SchemaRef {
590 &self.schema
591 }
592
593 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
594 let catalog_list = self.catalog_list.clone();
595 let schema = Arc::clone(&self.schema);
596 Box::pin(RecordBatchStreamAdapter::new(
597 schema.clone(),
598 futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
599 ))
600 }
601}
602
603pub fn create_current_schemas_udf() -> ScalarUDF {
604 let func = move |args: &[ColumnarValue]| {
606 let args = ColumnarValue::values_to_arrays(args)?;
607 let input = as_boolean_array(&args[0]);
608
609 let mut values = vec!["public"];
611 if input.value(0) {
613 values.push("information_schema");
614 values.push("pg_catalog");
615 }
616
617 let list_array = SingleRowListArrayBuilder::new(Arc::new(StringArray::from(values)));
618
619 let array: ArrayRef = Arc::new(list_array.build_list_array());
620
621 Ok(ColumnarValue::Array(array))
622 };
623
624 create_udf(
626 "current_schemas",
627 vec![DataType::Boolean],
628 DataType::List(Arc::new(Field::new("schema", DataType::Utf8, false))),
629 Volatility::Immutable,
630 Arc::new(func),
631 )
632}
633
634pub fn create_current_schema_udf() -> ScalarUDF {
635 let func = move |_args: &[ColumnarValue]| {
637 let mut builder = StringBuilder::new();
639 builder.append_value("public");
640 let array: ArrayRef = Arc::new(builder.finish());
641
642 Ok(ColumnarValue::Array(array))
643 };
644
645 create_udf(
647 "current_schema",
648 vec![],
649 DataType::Utf8,
650 Volatility::Immutable,
651 Arc::new(func),
652 )
653}
654
655pub fn setup_pg_catalog(
657 session_context: &SessionContext,
658 catalog_name: &str,
659) -> Result<(), Box<DataFusionError>> {
660 let pg_catalog = PgCatalogSchemaProvider::new(session_context.state().catalog_list().clone());
661 session_context
662 .catalog(catalog_name)
663 .ok_or_else(|| {
664 DataFusionError::Configuration(format!(
665 "Catalog not found when registering pg_catalog: {}",
666 catalog_name
667 ))
668 })?
669 .register_schema("pg_catalog", Arc::new(pg_catalog))?;
670
671 session_context.register_udf(create_current_schema_udf());
672 session_context.register_udf(create_current_schemas_udf());
673
674 Ok(())
675}