// datafusion_postgres/pg_catalog.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use datafusion::arrow::array::{
5    as_boolean_array, ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
6    RecordBatch, StringArray, StringBuilder,
7};
8use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
9use datafusion::catalog::streaming::StreamingTable;
10use datafusion::catalog::{CatalogProviderList, MemTable, SchemaProvider};
11use datafusion::common::utils::SingleRowListArrayBuilder;
12use datafusion::datasource::{TableProvider, ViewTable};
13use datafusion::error::{DataFusionError, Result};
14use datafusion::execution::{SendableRecordBatchStream, TaskContext};
15use datafusion::logical_expr::{ColumnarValue, ScalarUDF, Volatility};
16use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
17use datafusion::physical_plan::streaming::PartitionStream;
18use datafusion::prelude::{create_udf, SessionContext};
19
// Names of the PostgreSQL system-catalog tables emulated by this module.
// These are the keys matched (after ASCII-lowercasing) by
// `PgCatalogSchemaProvider::table`.
const PG_CATALOG_TABLE_PG_TYPE: &str = "pg_type";
const PG_CATALOG_TABLE_PG_CLASS: &str = "pg_class";
const PG_CATALOG_TABLE_PG_ATTRIBUTE: &str = "pg_attribute";
const PG_CATALOG_TABLE_PG_NAMESPACE: &str = "pg_namespace";
const PG_CATALOG_TABLE_PG_PROC: &str = "pg_proc";
const PG_CATALOG_TABLE_PG_DATABASE: &str = "pg_database";
const PG_CATALOG_TABLE_PG_AM: &str = "pg_am";
27
28/// Determine PostgreSQL table type (relkind) from DataFusion TableProvider
29fn get_table_type(table: &Arc<dyn TableProvider>) -> &'static str {
30    // Use Any trait to determine the actual table provider type
31    if table.as_any().is::<ViewTable>() {
32        "v" // view
33    } else {
34        "r" // All other table types (StreamingTable, MemTable, etc.) are treated as regular tables
35    }
36}
37
38/// Determine PostgreSQL table type (relkind) with table name context
39fn get_table_type_with_name(
40    table: &Arc<dyn TableProvider>,
41    table_name: &str,
42    schema_name: &str,
43) -> &'static str {
44    // Check if this is a system catalog table
45    if schema_name == "pg_catalog" || schema_name == "information_schema" {
46        if table_name.starts_with("pg_")
47            || table_name.contains("_table")
48            || table_name.contains("_column")
49        {
50            "r" // System tables are still regular tables in PostgreSQL
51        } else {
52            "v" // Some system objects might be views
53        }
54    } else {
55        get_table_type(table)
56    }
57}
58
/// Every pg_catalog table name served by `PgCatalogSchemaProvider`, in no
/// particular order. Used both for `table_names()` and `table_exist()`.
pub const PG_CATALOG_TABLES: &[&str] = &[
    PG_CATALOG_TABLE_PG_TYPE,
    PG_CATALOG_TABLE_PG_CLASS,
    PG_CATALOG_TABLE_PG_ATTRIBUTE,
    PG_CATALOG_TABLE_PG_NAMESPACE,
    PG_CATALOG_TABLE_PG_PROC,
    PG_CATALOG_TABLE_PG_DATABASE,
    PG_CATALOG_TABLE_PG_AM,
];
68
// Data structure to hold pg_type table data
//
// Column-oriented buffer: each Vec is one pg_type column, and index i across
// all the Vecs forms one catalog row. Rows are appended via `add_type`.
// NOTE(review): `typtymods` / `typndimss` look like misspellings of
// typtypmod/typndims kept for compatibility with existing call sites.
#[derive(Debug)]
struct PgTypesData {
    // Identity and ownership
    oids: Vec<i32>,
    typnames: Vec<String>,
    typnamespaces: Vec<i32>,
    typowners: Vec<i32>,
    // Physical representation (length, pass-by-value, alignment info below)
    typlens: Vec<i16>,
    typbyvals: Vec<bool>,
    // Classification flags
    typtypes: Vec<String>,
    typcategories: Vec<String>,
    typispreferreds: Vec<bool>,
    typisdefineds: Vec<bool>,
    typdelims: Vec<String>,
    // Relationships to other catalog entries (composite, element, array OIDs)
    typrelids: Vec<i32>,
    typelems: Vec<i32>,
    typarrays: Vec<i32>,
    // I/O and support function names
    typinputs: Vec<String>,
    typoutputs: Vec<String>,
    typreceives: Vec<String>,
    typsends: Vec<String>,
    typmodins: Vec<String>,
    typmodouts: Vec<String>,
    typanalyzes: Vec<String>,
    // Storage characteristics
    typaligns: Vec<String>,
    typstorages: Vec<String>,
    typnotnulls: Vec<bool>,
    // Domain/base-type, typmod, dimensions, collation
    typbasetypes: Vec<i32>,
    typtymods: Vec<i32>,
    typndimss: Vec<i32>,
    typcollations: Vec<i32>,
    // Nullable default expressions
    typdefaultbins: Vec<Option<String>>,
    typdefaults: Vec<Option<String>>,
}
103
104impl PgTypesData {
105    fn new() -> Self {
106        Self {
107            oids: Vec::new(),
108            typnames: Vec::new(),
109            typnamespaces: Vec::new(),
110            typowners: Vec::new(),
111            typlens: Vec::new(),
112            typbyvals: Vec::new(),
113            typtypes: Vec::new(),
114            typcategories: Vec::new(),
115            typispreferreds: Vec::new(),
116            typisdefineds: Vec::new(),
117            typdelims: Vec::new(),
118            typrelids: Vec::new(),
119            typelems: Vec::new(),
120            typarrays: Vec::new(),
121            typinputs: Vec::new(),
122            typoutputs: Vec::new(),
123            typreceives: Vec::new(),
124            typsends: Vec::new(),
125            typmodins: Vec::new(),
126            typmodouts: Vec::new(),
127            typanalyzes: Vec::new(),
128            typaligns: Vec::new(),
129            typstorages: Vec::new(),
130            typnotnulls: Vec::new(),
131            typbasetypes: Vec::new(),
132            typtymods: Vec::new(),
133            typndimss: Vec::new(),
134            typcollations: Vec::new(),
135            typdefaultbins: Vec::new(),
136            typdefaults: Vec::new(),
137        }
138    }
139
140    #[allow(clippy::too_many_arguments)]
141    fn add_type(
142        &mut self,
143        oid: i32,
144        typname: &str,
145        typnamespace: i32,
146        typowner: i32,
147        typlen: i16,
148        typbyval: bool,
149        typtype: &str,
150        typcategory: &str,
151        typispreferred: bool,
152        typisdefined: bool,
153        typdelim: &str,
154        typrelid: i32,
155        typelem: i32,
156        typarray: i32,
157        typinput: &str,
158        typoutput: &str,
159        typreceive: &str,
160        typsend: &str,
161        typmodin: &str,
162        typmodout: &str,
163        typanalyze: &str,
164        typalign: &str,
165        typstorage: &str,
166        typnotnull: bool,
167        typbasetype: i32,
168        typtypmod: i32,
169        typndims: i32,
170        typcollation: i32,
171        typdefaultbin: Option<String>,
172        typdefault: Option<String>,
173    ) {
174        self.oids.push(oid);
175        self.typnames.push(typname.to_string());
176        self.typnamespaces.push(typnamespace);
177        self.typowners.push(typowner);
178        self.typlens.push(typlen);
179        self.typbyvals.push(typbyval);
180        self.typtypes.push(typtype.to_string());
181        self.typcategories.push(typcategory.to_string());
182        self.typispreferreds.push(typispreferred);
183        self.typisdefineds.push(typisdefined);
184        self.typdelims.push(typdelim.to_string());
185        self.typrelids.push(typrelid);
186        self.typelems.push(typelem);
187        self.typarrays.push(typarray);
188        self.typinputs.push(typinput.to_string());
189        self.typoutputs.push(typoutput.to_string());
190        self.typreceives.push(typreceive.to_string());
191        self.typsends.push(typsend.to_string());
192        self.typmodins.push(typmodin.to_string());
193        self.typmodouts.push(typmodout.to_string());
194        self.typanalyzes.push(typanalyze.to_string());
195        self.typaligns.push(typalign.to_string());
196        self.typstorages.push(typstorage.to_string());
197        self.typnotnulls.push(typnotnull);
198        self.typbasetypes.push(typbasetype);
199        self.typtymods.push(typtypmod);
200        self.typndimss.push(typndims);
201        self.typcollations.push(typcollation);
202        self.typdefaultbins.push(typdefaultbin);
203        self.typdefaults.push(typdefault);
204    }
205}
206
// Create custom schema provider for pg_catalog
//
// Serves an emulated `pg_catalog` schema: fixed-content tables (pg_type,
// pg_proc, pg_am) are materialized eagerly, while catalog-dependent tables
// (pg_class, pg_namespace, pg_database, pg_attribute) are built over the
// live catalog list.
#[derive(Debug)]
pub struct PgCatalogSchemaProvider {
    // Root of all registered catalogs; cloned into the per-table streams.
    catalog_list: Arc<dyn CatalogProviderList>,
}
212
213#[async_trait]
214impl SchemaProvider for PgCatalogSchemaProvider {
215    fn as_any(&self) -> &dyn std::any::Any {
216        self
217    }
218
219    fn table_names(&self) -> Vec<String> {
220        PG_CATALOG_TABLES.iter().map(ToString::to_string).collect()
221    }
222
223    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
224        match name.to_ascii_lowercase().as_str() {
225            PG_CATALOG_TABLE_PG_TYPE => Ok(Some(self.create_pg_type_table())),
226            PG_CATALOG_TABLE_PG_AM => Ok(Some(self.create_pg_am_table())),
227            PG_CATALOG_TABLE_PG_CLASS => {
228                let table = Arc::new(PgClassTable::new(self.catalog_list.clone()));
229                Ok(Some(Arc::new(
230                    StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
231                )))
232            }
233            PG_CATALOG_TABLE_PG_NAMESPACE => {
234                let table = Arc::new(PgNamespaceTable::new(self.catalog_list.clone()));
235                Ok(Some(Arc::new(
236                    StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
237                )))
238            }
239            PG_CATALOG_TABLE_PG_DATABASE => {
240                let table = Arc::new(PgDatabaseTable::new(self.catalog_list.clone()));
241                Ok(Some(Arc::new(
242                    StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
243                )))
244            }
245            PG_CATALOG_TABLE_PG_ATTRIBUTE => {
246                let table = Arc::new(PgAttributeTable::new(self.catalog_list.clone()));
247                Ok(Some(Arc::new(
248                    StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
249                )))
250            }
251            PG_CATALOG_TABLE_PG_PROC => Ok(Some(self.create_pg_proc_table())),
252            _ => Ok(None),
253        }
254    }
255
256    fn table_exist(&self, name: &str) -> bool {
257        PG_CATALOG_TABLES.contains(&name.to_ascii_lowercase().as_str())
258    }
259}
260
impl PgCatalogSchemaProvider {
    /// Build a provider over the given catalog list.
    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
        Self { catalog_list }
    }
265
266    /// Create a populated pg_type table with standard PostgreSQL data types
267    fn create_pg_type_table(&self) -> Arc<dyn TableProvider> {
268        // Define complete schema for pg_type (matching PostgreSQL)
269        let schema = Arc::new(Schema::new(vec![
270            Field::new("oid", DataType::Int32, false),
271            Field::new("typname", DataType::Utf8, false),
272            Field::new("typnamespace", DataType::Int32, false),
273            Field::new("typowner", DataType::Int32, false),
274            Field::new("typlen", DataType::Int16, false),
275            Field::new("typbyval", DataType::Boolean, false),
276            Field::new("typtype", DataType::Utf8, false),
277            Field::new("typcategory", DataType::Utf8, false),
278            Field::new("typispreferred", DataType::Boolean, false),
279            Field::new("typisdefined", DataType::Boolean, false),
280            Field::new("typdelim", DataType::Utf8, false),
281            Field::new("typrelid", DataType::Int32, false),
282            Field::new("typelem", DataType::Int32, false),
283            Field::new("typarray", DataType::Int32, false),
284            Field::new("typinput", DataType::Utf8, false),
285            Field::new("typoutput", DataType::Utf8, false),
286            Field::new("typreceive", DataType::Utf8, false),
287            Field::new("typsend", DataType::Utf8, false),
288            Field::new("typmodin", DataType::Utf8, false),
289            Field::new("typmodout", DataType::Utf8, false),
290            Field::new("typanalyze", DataType::Utf8, false),
291            Field::new("typalign", DataType::Utf8, false),
292            Field::new("typstorage", DataType::Utf8, false),
293            Field::new("typnotnull", DataType::Boolean, false),
294            Field::new("typbasetype", DataType::Int32, false),
295            Field::new("typtypmod", DataType::Int32, false),
296            Field::new("typndims", DataType::Int32, false),
297            Field::new("typcollation", DataType::Int32, false),
298            Field::new("typdefaultbin", DataType::Utf8, true),
299            Field::new("typdefault", DataType::Utf8, true),
300        ]));
301
302        // Create standard PostgreSQL data types
303        let pg_types_data = Self::get_standard_pg_types();
304
305        // Create RecordBatch from the data
306        let arrays: Vec<ArrayRef> = vec![
307            Arc::new(Int32Array::from(pg_types_data.oids)),
308            Arc::new(StringArray::from(pg_types_data.typnames)),
309            Arc::new(Int32Array::from(pg_types_data.typnamespaces)),
310            Arc::new(Int32Array::from(pg_types_data.typowners)),
311            Arc::new(Int16Array::from(pg_types_data.typlens)),
312            Arc::new(BooleanArray::from(pg_types_data.typbyvals)),
313            Arc::new(StringArray::from(pg_types_data.typtypes)),
314            Arc::new(StringArray::from(pg_types_data.typcategories)),
315            Arc::new(BooleanArray::from(pg_types_data.typispreferreds)),
316            Arc::new(BooleanArray::from(pg_types_data.typisdefineds)),
317            Arc::new(StringArray::from(pg_types_data.typdelims)),
318            Arc::new(Int32Array::from(pg_types_data.typrelids)),
319            Arc::new(Int32Array::from(pg_types_data.typelems)),
320            Arc::new(Int32Array::from(pg_types_data.typarrays)),
321            Arc::new(StringArray::from(pg_types_data.typinputs)),
322            Arc::new(StringArray::from(pg_types_data.typoutputs)),
323            Arc::new(StringArray::from(pg_types_data.typreceives)),
324            Arc::new(StringArray::from(pg_types_data.typsends)),
325            Arc::new(StringArray::from(pg_types_data.typmodins)),
326            Arc::new(StringArray::from(pg_types_data.typmodouts)),
327            Arc::new(StringArray::from(pg_types_data.typanalyzes)),
328            Arc::new(StringArray::from(pg_types_data.typaligns)),
329            Arc::new(StringArray::from(pg_types_data.typstorages)),
330            Arc::new(BooleanArray::from(pg_types_data.typnotnulls)),
331            Arc::new(Int32Array::from(pg_types_data.typbasetypes)),
332            Arc::new(Int32Array::from(pg_types_data.typtymods)),
333            Arc::new(Int32Array::from(pg_types_data.typndimss)),
334            Arc::new(Int32Array::from(pg_types_data.typcollations)),
335            Arc::new(StringArray::from_iter(
336                pg_types_data.typdefaultbins.into_iter(),
337            )),
338            Arc::new(StringArray::from_iter(
339                pg_types_data.typdefaults.into_iter(),
340            )),
341        ];
342
343        let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();
344
345        // Create memory table with populated data
346        let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
347
348        Arc::new(provider)
349    }
350
351    /// Generate standard PostgreSQL data types for pg_type table
352    fn get_standard_pg_types() -> PgTypesData {
353        let mut data = PgTypesData::new();
354
355        // Basic data types commonly used
356        data.add_type(
357            16, "bool", 11, 10, 1, true, "b", "B", true, true, ",", 0, 0, 1000, "boolin",
358            "boolout", "boolrecv", "boolsend", "-", "-", "-", "c", "p", false, 0, -1, 0, 0, None,
359            None,
360        );
361        data.add_type(
362            17,
363            "bytea",
364            11,
365            10,
366            -1,
367            false,
368            "b",
369            "U",
370            false,
371            true,
372            ",",
373            0,
374            0,
375            1001,
376            "byteain",
377            "byteaout",
378            "bytearecv",
379            "byteasend",
380            "-",
381            "-",
382            "-",
383            "i",
384            "x",
385            false,
386            0,
387            -1,
388            0,
389            0,
390            None,
391            None,
392        );
393        data.add_type(
394            18, "char", 11, 10, 1, true, "b", "S", false, true, ",", 0, 0, 1002, "charin",
395            "charout", "charrecv", "charsend", "-", "-", "-", "c", "p", false, 0, -1, 0, 0, None,
396            None,
397        );
398        data.add_type(
399            19, "name", 11, 10, 64, false, "b", "S", false, true, ",", 0, 0, 1003, "namein",
400            "nameout", "namerecv", "namesend", "-", "-", "-", "i", "p", false, 0, -1, 0, 0, None,
401            None,
402        );
403        data.add_type(
404            20, "int8", 11, 10, 8, true, "b", "N", false, true, ",", 0, 0, 1016, "int8in",
405            "int8out", "int8recv", "int8send", "-", "-", "-", "d", "p", false, 0, -1, 0, 0, None,
406            None,
407        );
408        data.add_type(
409            21, "int2", 11, 10, 2, true, "b", "N", false, true, ",", 0, 0, 1005, "int2in",
410            "int2out", "int2recv", "int2send", "-", "-", "-", "s", "p", false, 0, -1, 0, 0, None,
411            None,
412        );
413        data.add_type(
414            23, "int4", 11, 10, 4, true, "b", "N", true, true, ",", 0, 0, 1007, "int4in",
415            "int4out", "int4recv", "int4send", "-", "-", "-", "i", "p", false, 0, -1, 0, 0, None,
416            None,
417        );
418        data.add_type(
419            25, "text", 11, 10, -1, false, "b", "S", true, true, ",", 0, 0, 1009, "textin",
420            "textout", "textrecv", "textsend", "-", "-", "-", "i", "x", false, 0, -1, 0, 100, None,
421            None,
422        );
423        data.add_type(
424            700,
425            "float4",
426            11,
427            10,
428            4,
429            true,
430            "b",
431            "N",
432            false,
433            true,
434            ",",
435            0,
436            0,
437            1021,
438            "float4in",
439            "float4out",
440            "float4recv",
441            "float4send",
442            "-",
443            "-",
444            "-",
445            "i",
446            "p",
447            false,
448            0,
449            -1,
450            0,
451            0,
452            None,
453            None,
454        );
455        data.add_type(
456            701,
457            "float8",
458            11,
459            10,
460            8,
461            true,
462            "b",
463            "N",
464            true,
465            true,
466            ",",
467            0,
468            0,
469            1022,
470            "float8in",
471            "float8out",
472            "float8recv",
473            "float8send",
474            "-",
475            "-",
476            "-",
477            "d",
478            "p",
479            false,
480            0,
481            -1,
482            0,
483            0,
484            None,
485            None,
486        );
487        data.add_type(
488            1043,
489            "varchar",
490            11,
491            10,
492            -1,
493            false,
494            "b",
495            "S",
496            false,
497            true,
498            ",",
499            0,
500            0,
501            1015,
502            "varcharin",
503            "varcharout",
504            "varcharrecv",
505            "varcharsend",
506            "varchartypmodin",
507            "varchartypmodout",
508            "-",
509            "i",
510            "x",
511            false,
512            0,
513            -1,
514            0,
515            100,
516            None,
517            None,
518        );
519        data.add_type(
520            1082,
521            "date",
522            11,
523            10,
524            4,
525            true,
526            "b",
527            "D",
528            false,
529            true,
530            ",",
531            0,
532            0,
533            1182,
534            "date_in",
535            "date_out",
536            "date_recv",
537            "date_send",
538            "-",
539            "-",
540            "-",
541            "i",
542            "p",
543            false,
544            0,
545            -1,
546            0,
547            0,
548            None,
549            None,
550        );
551        data.add_type(
552            1083,
553            "time",
554            11,
555            10,
556            8,
557            true,
558            "b",
559            "D",
560            false,
561            true,
562            ",",
563            0,
564            0,
565            1183,
566            "time_in",
567            "time_out",
568            "time_recv",
569            "time_send",
570            "timetypmodin",
571            "timetypmodout",
572            "-",
573            "d",
574            "p",
575            false,
576            0,
577            -1,
578            0,
579            0,
580            None,
581            None,
582        );
583        data.add_type(
584            1114,
585            "timestamp",
586            11,
587            10,
588            8,
589            true,
590            "b",
591            "D",
592            false,
593            true,
594            ",",
595            0,
596            0,
597            1115,
598            "timestamp_in",
599            "timestamp_out",
600            "timestamp_recv",
601            "timestamp_send",
602            "timestamptypmodin",
603            "timestamptypmodout",
604            "-",
605            "d",
606            "p",
607            false,
608            0,
609            -1,
610            0,
611            0,
612            None,
613            None,
614        );
615        data.add_type(
616            1184,
617            "timestamptz",
618            11,
619            10,
620            8,
621            true,
622            "b",
623            "D",
624            true,
625            true,
626            ",",
627            0,
628            0,
629            1185,
630            "timestamptz_in",
631            "timestamptz_out",
632            "timestamptz_recv",
633            "timestamptz_send",
634            "timestamptztypmodin",
635            "timestamptztypmodout",
636            "-",
637            "d",
638            "p",
639            false,
640            0,
641            -1,
642            0,
643            0,
644            None,
645            None,
646        );
647        data.add_type(
648            1700,
649            "numeric",
650            11,
651            10,
652            -1,
653            false,
654            "b",
655            "N",
656            false,
657            true,
658            ",",
659            0,
660            0,
661            1231,
662            "numeric_in",
663            "numeric_out",
664            "numeric_recv",
665            "numeric_send",
666            "numerictypmodin",
667            "numerictypmodout",
668            "-",
669            "i",
670            "m",
671            false,
672            0,
673            -1,
674            0,
675            0,
676            None,
677            None,
678        );
679
680        data
681    }
682
683    /// Create a mock empty table for pg_am
684    fn create_pg_am_table(&self) -> Arc<dyn TableProvider> {
685        // Define the schema for pg_am
686        // This matches PostgreSQL's pg_am table columns
687        let schema = Arc::new(Schema::new(vec![
688            Field::new("oid", DataType::Int32, false), // Object identifier
689            Field::new("amname", DataType::Utf8, false), // Name of the access method
690            Field::new("amhandler", DataType::Int32, false), // OID of handler function
691            Field::new("amtype", DataType::Utf8, false), // Type of access method (i=index, t=table)
692            Field::new("amstrategies", DataType::Int32, false), // Number of operator strategies
693            Field::new("amsupport", DataType::Int32, false), // Number of support routines
694            Field::new("amcanorder", DataType::Boolean, false), // Does AM support ordered scans?
695            Field::new("amcanorderbyop", DataType::Boolean, false), // Does AM support order by operator result?
696            Field::new("amcanbackward", DataType::Boolean, false), // Does AM support backward scanning?
697            Field::new("amcanunique", DataType::Boolean, false), // Does AM support unique indexes?
698            Field::new("amcanmulticol", DataType::Boolean, false), // Does AM support multi-column indexes?
699            Field::new("amoptionalkey", DataType::Boolean, false), // Can first index column be omitted in search?
700            Field::new("amsearcharray", DataType::Boolean, false), // Does AM support ScalarArrayOpExpr searches?
701            Field::new("amsearchnulls", DataType::Boolean, false), // Does AM support searching for NULL/NOT NULL?
702            Field::new("amstorage", DataType::Boolean, false), // Can storage type differ from column type?
703            Field::new("amclusterable", DataType::Boolean, false), // Can index be clustered on?
704            Field::new("ampredlocks", DataType::Boolean, false), // Does AM manage fine-grained predicate locks?
705            Field::new("amcanparallel", DataType::Boolean, false), // Does AM support parallel scan?
706            Field::new("amcanbeginscan", DataType::Boolean, false), // Does AM support BRIN index scans?
707            Field::new("amcanmarkpos", DataType::Boolean, false), // Does AM support mark/restore positions?
708            Field::new("amcanfetch", DataType::Boolean, false), // Does AM support fetching specific tuples?
709            Field::new("amkeytype", DataType::Int32, false),    // Type of data in index
710        ]));
711
712        // Create memory table with schema
713        let provider = MemTable::try_new(schema, vec![]).unwrap();
714
715        Arc::new(provider)
716    }
717
718    /// Create a populated pg_proc table with standard PostgreSQL functions
719    fn create_pg_proc_table(&self) -> Arc<dyn TableProvider> {
720        // Define complete schema for pg_proc (matching PostgreSQL)
721        let schema = Arc::new(Schema::new(vec![
722            Field::new("oid", DataType::Int32, false), // Object identifier
723            Field::new("proname", DataType::Utf8, false), // Function name
724            Field::new("pronamespace", DataType::Int32, false), // OID of namespace containing function
725            Field::new("proowner", DataType::Int32, false),     // Owner of the function
726            Field::new("prolang", DataType::Int32, false),      // Implementation language
727            Field::new("procost", DataType::Float32, false),    // Estimated execution cost
728            Field::new("prorows", DataType::Float32, false), // Estimated result size for set-returning functions
729            Field::new("provariadic", DataType::Int32, false), // Element type of variadic array
730            Field::new("prosupport", DataType::Int32, false), // Support function OID
731            Field::new("prokind", DataType::Utf8, false), // f=function, p=procedure, a=aggregate, w=window
732            Field::new("prosecdef", DataType::Boolean, false), // Security definer flag
733            Field::new("proleakproof", DataType::Boolean, false), // Leak-proof flag
734            Field::new("proisstrict", DataType::Boolean, false), // Returns null if any argument is null
735            Field::new("proretset", DataType::Boolean, false),   // Returns a set (vs scalar)
736            Field::new("provolatile", DataType::Utf8, false), // i=immutable, s=stable, v=volatile
737            Field::new("proparallel", DataType::Utf8, false), // s=safe, r=restricted, u=unsafe
738            Field::new("pronargs", DataType::Int16, false),   // Number of input arguments
739            Field::new("pronargdefaults", DataType::Int16, false), // Number of arguments with defaults
740            Field::new("prorettype", DataType::Int32, false),      // OID of return type
741            Field::new("proargtypes", DataType::Utf8, false),      // Array of argument type OIDs
742            Field::new("proallargtypes", DataType::Utf8, true), // Array of all argument type OIDs
743            Field::new("proargmodes", DataType::Utf8, true),    // Array of argument modes
744            Field::new("proargnames", DataType::Utf8, true),    // Array of argument names
745            Field::new("proargdefaults", DataType::Utf8, true), // Expression for argument defaults
746            Field::new("protrftypes", DataType::Utf8, true),    // Transform types
747            Field::new("prosrc", DataType::Utf8, false),        // Function source code
748            Field::new("probin", DataType::Utf8, true),         // Binary file containing function
749            Field::new("prosqlbody", DataType::Utf8, true),     // SQL function body
750            Field::new("proconfig", DataType::Utf8, true),      // Configuration variables
751            Field::new("proacl", DataType::Utf8, true),         // Access privileges
752        ]));
753
754        // Create standard PostgreSQL functions
755        let pg_proc_data = Self::get_standard_pg_functions();
756
757        // Create RecordBatch from the data
758        let arrays: Vec<ArrayRef> = vec![
759            Arc::new(Int32Array::from(pg_proc_data.oids)),
760            Arc::new(StringArray::from(pg_proc_data.pronames)),
761            Arc::new(Int32Array::from(pg_proc_data.pronamespaces)),
762            Arc::new(Int32Array::from(pg_proc_data.proowners)),
763            Arc::new(Int32Array::from(pg_proc_data.prolangs)),
764            Arc::new(Float32Array::from(pg_proc_data.procosts)),
765            Arc::new(Float32Array::from(pg_proc_data.prorows)),
766            Arc::new(Int32Array::from(pg_proc_data.provariadics)),
767            Arc::new(Int32Array::from(pg_proc_data.prosupports)),
768            Arc::new(StringArray::from(pg_proc_data.prokinds)),
769            Arc::new(BooleanArray::from(pg_proc_data.prosecdefs)),
770            Arc::new(BooleanArray::from(pg_proc_data.proleakproofs)),
771            Arc::new(BooleanArray::from(pg_proc_data.proisstricts)),
772            Arc::new(BooleanArray::from(pg_proc_data.proretsets)),
773            Arc::new(StringArray::from(pg_proc_data.provolatiles)),
774            Arc::new(StringArray::from(pg_proc_data.proparallels)),
775            Arc::new(Int16Array::from(pg_proc_data.pronargs)),
776            Arc::new(Int16Array::from(pg_proc_data.pronargdefaults)),
777            Arc::new(Int32Array::from(pg_proc_data.prorettypes)),
778            Arc::new(StringArray::from(pg_proc_data.proargtypes)),
779            Arc::new(StringArray::from_iter(
780                pg_proc_data.proallargtypes.into_iter(),
781            )),
782            Arc::new(StringArray::from_iter(pg_proc_data.proargmodes.into_iter())),
783            Arc::new(StringArray::from_iter(pg_proc_data.proargnames.into_iter())),
784            Arc::new(StringArray::from_iter(
785                pg_proc_data.proargdefaults.into_iter(),
786            )),
787            Arc::new(StringArray::from_iter(pg_proc_data.protrftypes.into_iter())),
788            Arc::new(StringArray::from(pg_proc_data.prosrcs)),
789            Arc::new(StringArray::from_iter(pg_proc_data.probins.into_iter())),
790            Arc::new(StringArray::from_iter(pg_proc_data.prosqlbodys.into_iter())),
791            Arc::new(StringArray::from_iter(pg_proc_data.proconfigs.into_iter())),
792            Arc::new(StringArray::from_iter(pg_proc_data.proacls.into_iter())),
793        ];
794
795        let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();
796
797        // Create memory table with populated data
798        let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
799
800        Arc::new(provider)
801    }
802
803    /// Generate standard PostgreSQL functions for pg_proc table
804    fn get_standard_pg_functions() -> PgProcData {
805        let mut data = PgProcData::new();
806
807        // Essential PostgreSQL functions that many tools expect
808        data.add_function(
809            1242, "boolin", 11, 10, 12, 1.0, 0.0, 0, 0, "f", false, true, true, false, "i", "s", 1,
810            0, 16, "2275", None, None, None, None, None, "boolin", None, None, None, None,
811        );
812        data.add_function(
813            1243, "boolout", 11, 10, 12, 1.0, 0.0, 0, 0, "f", false, true, true, false, "i", "s",
814            1, 0, 2275, "16", None, None, None, None, None, "boolout", None, None, None, None,
815        );
816        data.add_function(
817            1564, "textin", 11, 10, 12, 1.0, 0.0, 0, 0, "f", false, true, true, false, "i", "s", 1,
818            0, 25, "2275", None, None, None, None, None, "textin", None, None, None, None,
819        );
820        data.add_function(
821            1565, "textout", 11, 10, 12, 1.0, 0.0, 0, 0, "f", false, true, true, false, "i", "s",
822            1, 0, 2275, "25", None, None, None, None, None, "textout", None, None, None, None,
823        );
824        data.add_function(
825            1242,
826            "version",
827            11,
828            10,
829            12,
830            1.0,
831            0.0,
832            0,
833            0,
834            "f",
835            false,
836            true,
837            false,
838            false,
839            "s",
840            "s",
841            0,
842            0,
843            25,
844            "",
845            None,
846            None,
847            None,
848            None,
849            None,
850            "SELECT 'DataFusion PostgreSQL 48.0.0 on x86_64-pc-linux-gnu'",
851            None,
852            None,
853            None,
854            None,
855        );
856
857        data
858    }
859}
860
// Data structure to hold pg_proc table data.
//
// Column-oriented builder for pg_catalog.pg_proc: each `Vec` is one column,
// and index i across all vectors forms one function row. Rows are appended
// via `add_function`, which pushes exactly one value onto every vector, so
// all columns stay the same length. Field names are the pg_proc column
// names, pluralized.
#[derive(Debug)]
struct PgProcData {
    oids: Vec<i32>,           // pg_proc.oid — function identifier
    pronames: Vec<String>,    // proname — function name
    pronamespaces: Vec<i32>,  // pronamespace — OID of the containing schema
    proowners: Vec<i32>,      // proowner — owner OID
    prolangs: Vec<i32>,       // prolang — implementation language OID
    procosts: Vec<f32>,       // procost — estimated execution cost
    prorows: Vec<f32>,        // prorows — estimated result rows (set-returning fns)
    provariadics: Vec<i32>,   // provariadic — variadic array element type OID
    prosupports: Vec<i32>,    // prosupport — planner support function OID
    prokinds: Vec<String>,    // prokind — f=function, p=procedure, a=aggregate, w=window
    prosecdefs: Vec<bool>,    // prosecdef — security-definer flag
    proleakproofs: Vec<bool>, // proleakproof — leakproof flag
    proisstricts: Vec<bool>,  // proisstrict — strict (NULL in → NULL out)
    proretsets: Vec<bool>,    // proretset — returns a set
    provolatiles: Vec<String>, // provolatile — i=immutable, s=stable, v=volatile
    proparallels: Vec<String>, // proparallel — parallel-safety marker
    pronargs: Vec<i16>,       // pronargs — number of input arguments
    pronargdefaults: Vec<i16>, // pronargdefaults — number of args with defaults
    prorettypes: Vec<i32>,    // prorettype — return type OID
    proargtypes: Vec<String>, // proargtypes — input argument type OIDs (text form here)
    proallargtypes: Vec<Option<String>>, // proallargtypes — all argument type OIDs
    proargmodes: Vec<Option<String>>,    // proargmodes — argument modes
    proargnames: Vec<Option<String>>,    // proargnames — argument names
    proargdefaults: Vec<Option<String>>, // proargdefaults — default expressions
    protrftypes: Vec<Option<String>>,    // protrftypes — transform type OIDs
    prosrcs: Vec<String>,     // prosrc — function body or symbol name
    probins: Vec<Option<String>>,     // probin — language-specific extra info
    prosqlbodys: Vec<Option<String>>, // prosqlbody — pre-parsed SQL body
    proconfigs: Vec<Option<String>>,  // proconfig — per-function GUC settings
    proacls: Vec<Option<String>>,     // proacl — access privileges
}
895
896impl PgProcData {
897    fn new() -> Self {
898        Self {
899            oids: Vec::new(),
900            pronames: Vec::new(),
901            pronamespaces: Vec::new(),
902            proowners: Vec::new(),
903            prolangs: Vec::new(),
904            procosts: Vec::new(),
905            prorows: Vec::new(),
906            provariadics: Vec::new(),
907            prosupports: Vec::new(),
908            prokinds: Vec::new(),
909            prosecdefs: Vec::new(),
910            proleakproofs: Vec::new(),
911            proisstricts: Vec::new(),
912            proretsets: Vec::new(),
913            provolatiles: Vec::new(),
914            proparallels: Vec::new(),
915            pronargs: Vec::new(),
916            pronargdefaults: Vec::new(),
917            prorettypes: Vec::new(),
918            proargtypes: Vec::new(),
919            proallargtypes: Vec::new(),
920            proargmodes: Vec::new(),
921            proargnames: Vec::new(),
922            proargdefaults: Vec::new(),
923            protrftypes: Vec::new(),
924            prosrcs: Vec::new(),
925            probins: Vec::new(),
926            prosqlbodys: Vec::new(),
927            proconfigs: Vec::new(),
928            proacls: Vec::new(),
929        }
930    }
931
932    #[allow(clippy::too_many_arguments)]
933    fn add_function(
934        &mut self,
935        oid: i32,
936        proname: &str,
937        pronamespace: i32,
938        proowner: i32,
939        prolang: i32,
940        procost: f32,
941        prorows: f32,
942        provariadic: i32,
943        prosupport: i32,
944        prokind: &str,
945        prosecdef: bool,
946        proleakproof: bool,
947        proisstrict: bool,
948        proretset: bool,
949        provolatile: &str,
950        proparallel: &str,
951        pronargs: i16,
952        pronargdefaults: i16,
953        prorettype: i32,
954        proargtypes: &str,
955        proallargtypes: Option<String>,
956        proargmodes: Option<String>,
957        proargnames: Option<String>,
958        proargdefaults: Option<String>,
959        protrftypes: Option<String>,
960        prosrc: &str,
961        probin: Option<String>,
962        prosqlbody: Option<String>,
963        proconfig: Option<String>,
964        proacl: Option<String>,
965    ) {
966        self.oids.push(oid);
967        self.pronames.push(proname.to_string());
968        self.pronamespaces.push(pronamespace);
969        self.proowners.push(proowner);
970        self.prolangs.push(prolang);
971        self.procosts.push(procost);
972        self.prorows.push(prorows);
973        self.provariadics.push(provariadic);
974        self.prosupports.push(prosupport);
975        self.prokinds.push(prokind.to_string());
976        self.prosecdefs.push(prosecdef);
977        self.proleakproofs.push(proleakproof);
978        self.proisstricts.push(proisstrict);
979        self.proretsets.push(proretset);
980        self.provolatiles.push(provolatile.to_string());
981        self.proparallels.push(proparallel.to_string());
982        self.pronargs.push(pronargs);
983        self.pronargdefaults.push(pronargdefaults);
984        self.prorettypes.push(prorettype);
985        self.proargtypes.push(proargtypes.to_string());
986        self.proallargtypes.push(proallargtypes);
987        self.proargmodes.push(proargmodes);
988        self.proargnames.push(proargnames);
989        self.proargdefaults.push(proargdefaults);
990        self.protrftypes.push(protrftypes);
991        self.prosrcs.push(prosrc.to_string());
992        self.probins.push(probin);
993        self.prosqlbodys.push(prosqlbody);
994        self.proconfigs.push(proconfig);
995        self.proacls.push(proacl);
996    }
997}
998
/// Lazily-computed `pg_catalog.pg_class` table.
///
/// Rows are synthesized from the current DataFusion catalog list on every
/// scan (see the `PartitionStream` impl), so newly registered tables appear
/// without any cache invalidation.
#[derive(Debug)]
struct PgClassTable {
    schema: SchemaRef,                          // Arrow schema of the pg_class columns
    catalog_list: Arc<dyn CatalogProviderList>, // live catalogs/schemas/tables to enumerate
}
1004
1005impl PgClassTable {
1006    fn new(catalog_list: Arc<dyn CatalogProviderList>) -> PgClassTable {
1007        // Define the schema for pg_class
1008        // This matches key columns from PostgreSQL's pg_class
1009        let schema = Arc::new(Schema::new(vec![
1010            Field::new("oid", DataType::Int32, false), // Object identifier
1011            Field::new("relname", DataType::Utf8, false), // Name of the table, index, view, etc.
1012            Field::new("relnamespace", DataType::Int32, false), // OID of the namespace that contains this relation
1013            Field::new("reltype", DataType::Int32, false), // OID of the data type (composite type) this table describes
1014            Field::new("reloftype", DataType::Int32, true), // OID of the composite type for typed table, 0 otherwise
1015            Field::new("relowner", DataType::Int32, false), // Owner of the relation
1016            Field::new("relam", DataType::Int32, false), // If this is an index, the access method used
1017            Field::new("relfilenode", DataType::Int32, false), // Name of the on-disk file of this relation
1018            Field::new("reltablespace", DataType::Int32, false), // Tablespace OID for this relation
1019            Field::new("relpages", DataType::Int32, false), // Size of the on-disk representation in pages
1020            Field::new("reltuples", DataType::Float64, false), // Number of tuples
1021            Field::new("relallvisible", DataType::Int32, false), // Number of all-visible pages
1022            Field::new("reltoastrelid", DataType::Int32, false), // OID of the TOAST table
1023            Field::new("relhasindex", DataType::Boolean, false), // True if this is a table and it has (or recently had) any indexes
1024            Field::new("relisshared", DataType::Boolean, false), // True if this table is shared across all databases
1025            Field::new("relpersistence", DataType::Utf8, false), // p=permanent table, u=unlogged table, t=temporary table
1026            Field::new("relkind", DataType::Utf8, false), // r=ordinary table, i=index, S=sequence, v=view, etc.
1027            Field::new("relnatts", DataType::Int16, false), // Number of user columns
1028            Field::new("relchecks", DataType::Int16, false), // Number of CHECK constraints
1029            Field::new("relhasrules", DataType::Boolean, false), // True if table has (or once had) rules
1030            Field::new("relhastriggers", DataType::Boolean, false), // True if table has (or once had) triggers
1031            Field::new("relhassubclass", DataType::Boolean, false), // True if table or index has (or once had) any inheritance children
1032            Field::new("relrowsecurity", DataType::Boolean, false), // True if row security is enabled
1033            Field::new("relforcerowsecurity", DataType::Boolean, false), // True if row security forced for owners
1034            Field::new("relispopulated", DataType::Boolean, false), // True if relation is populated (not true for some materialized views)
1035            Field::new("relreplident", DataType::Utf8, false), // Columns used to form "replica identity" for rows
1036            Field::new("relispartition", DataType::Boolean, false), // True if table is a partition
1037            Field::new("relrewrite", DataType::Int32, true), // OID of a rule that rewrites this relation
1038            Field::new("relfrozenxid", DataType::Int32, false), // All transaction IDs before this have been replaced with a permanent ("frozen") transaction ID
1039            Field::new("relminmxid", DataType::Int32, false), // All Multixact IDs before this have been replaced with a transaction ID
1040        ]));
1041
1042        Self {
1043            schema,
1044            catalog_list,
1045        }
1046    }
1047
1048    /// Generate record batches based on the current state of the catalog
1049    async fn get_data(
1050        schema: SchemaRef,
1051        catalog_list: Arc<dyn CatalogProviderList>,
1052    ) -> Result<RecordBatch> {
1053        // Vectors to store column data
1054        let mut oids = Vec::new();
1055        let mut relnames = Vec::new();
1056        let mut relnamespaces = Vec::new();
1057        let mut reltypes = Vec::new();
1058        let mut reloftypes = Vec::new();
1059        let mut relowners = Vec::new();
1060        let mut relams = Vec::new();
1061        let mut relfilenodes = Vec::new();
1062        let mut reltablespaces = Vec::new();
1063        let mut relpages = Vec::new();
1064        let mut reltuples = Vec::new();
1065        let mut relallvisibles = Vec::new();
1066        let mut reltoastrelids = Vec::new();
1067        let mut relhasindexes = Vec::new();
1068        let mut relisshareds = Vec::new();
1069        let mut relpersistences = Vec::new();
1070        let mut relkinds = Vec::new();
1071        let mut relnattses = Vec::new();
1072        let mut relcheckses = Vec::new();
1073        let mut relhasruleses = Vec::new();
1074        let mut relhastriggersses = Vec::new();
1075        let mut relhassubclasses = Vec::new();
1076        let mut relrowsecurities = Vec::new();
1077        let mut relforcerowsecurities = Vec::new();
1078        let mut relispopulateds = Vec::new();
1079        let mut relreplidents = Vec::new();
1080        let mut relispartitions = Vec::new();
1081        let mut relrewrites = Vec::new();
1082        let mut relfrozenxids = Vec::new();
1083        let mut relminmxids = Vec::new();
1084
1085        // Start OID counter (this is simplistic and would need to be more robust in practice)
1086        let mut next_oid = 10000;
1087
1088        // Iterate through all catalogs and schemas
1089        for catalog_name in catalog_list.catalog_names() {
1090            if let Some(catalog) = catalog_list.catalog(&catalog_name) {
1091                for schema_name in catalog.schema_names() {
1092                    if let Some(schema) = catalog.schema(&schema_name) {
1093                        let schema_oid = next_oid;
1094                        next_oid += 1;
1095
1096                        // Add an entry for the schema itself (as a namespace)
1097                        // (In a full implementation, this would go in pg_namespace)
1098
1099                        // Now process all tables in this schema
1100                        for table_name in schema.table_names() {
1101                            let table_oid = next_oid;
1102                            next_oid += 1;
1103
1104                            if let Some(table) = schema.table(&table_name).await? {
1105                                // Determine the correct table type based on the table provider and context
1106                                let table_type =
1107                                    get_table_type_with_name(&table, &table_name, &schema_name);
1108
1109                                // Get column count from schema
1110                                let column_count = table.schema().fields().len() as i16;
1111
1112                                // Add table entry
1113                                oids.push(table_oid);
1114                                relnames.push(table_name.clone());
1115                                relnamespaces.push(schema_oid);
1116                                reltypes.push(0); // Simplified: we're not tracking data types
1117                                reloftypes.push(None);
1118                                relowners.push(0); // Simplified: no owner tracking
1119                                relams.push(0); // Default access method
1120                                relfilenodes.push(table_oid); // Use OID as filenode
1121                                reltablespaces.push(0); // Default tablespace
1122                                relpages.push(1); // Default page count
1123                                reltuples.push(0.0); // No row count stats
1124                                relallvisibles.push(0);
1125                                reltoastrelids.push(0);
1126                                relhasindexes.push(false);
1127                                relisshareds.push(false);
1128                                relpersistences.push("p".to_string()); // Permanent
1129                                relkinds.push(table_type.to_string());
1130                                relnattses.push(column_count);
1131                                relcheckses.push(0);
1132                                relhasruleses.push(false);
1133                                relhastriggersses.push(false);
1134                                relhassubclasses.push(false);
1135                                relrowsecurities.push(false);
1136                                relforcerowsecurities.push(false);
1137                                relispopulateds.push(true);
1138                                relreplidents.push("d".to_string()); // Default
1139                                relispartitions.push(false);
1140                                relrewrites.push(None);
1141                                relfrozenxids.push(0);
1142                                relminmxids.push(0);
1143                            }
1144                        }
1145                    }
1146                }
1147            }
1148        }
1149
1150        // Create Arrow arrays from the collected data
1151        let arrays: Vec<ArrayRef> = vec![
1152            Arc::new(Int32Array::from(oids)),
1153            Arc::new(StringArray::from(relnames)),
1154            Arc::new(Int32Array::from(relnamespaces)),
1155            Arc::new(Int32Array::from(reltypes)),
1156            Arc::new(Int32Array::from_iter(reloftypes.into_iter())),
1157            Arc::new(Int32Array::from(relowners)),
1158            Arc::new(Int32Array::from(relams)),
1159            Arc::new(Int32Array::from(relfilenodes)),
1160            Arc::new(Int32Array::from(reltablespaces)),
1161            Arc::new(Int32Array::from(relpages)),
1162            Arc::new(Float64Array::from_iter(reltuples.into_iter())),
1163            Arc::new(Int32Array::from(relallvisibles)),
1164            Arc::new(Int32Array::from(reltoastrelids)),
1165            Arc::new(BooleanArray::from(relhasindexes)),
1166            Arc::new(BooleanArray::from(relisshareds)),
1167            Arc::new(StringArray::from(relpersistences)),
1168            Arc::new(StringArray::from(relkinds)),
1169            Arc::new(Int16Array::from(relnattses)),
1170            Arc::new(Int16Array::from(relcheckses)),
1171            Arc::new(BooleanArray::from(relhasruleses)),
1172            Arc::new(BooleanArray::from(relhastriggersses)),
1173            Arc::new(BooleanArray::from(relhassubclasses)),
1174            Arc::new(BooleanArray::from(relrowsecurities)),
1175            Arc::new(BooleanArray::from(relforcerowsecurities)),
1176            Arc::new(BooleanArray::from(relispopulateds)),
1177            Arc::new(StringArray::from(relreplidents)),
1178            Arc::new(BooleanArray::from(relispartitions)),
1179            Arc::new(Int32Array::from_iter(relrewrites.into_iter())),
1180            Arc::new(Int32Array::from(relfrozenxids)),
1181            Arc::new(Int32Array::from(relminmxids)),
1182        ];
1183
1184        // Create a record batch
1185        let batch = RecordBatch::try_new(schema.clone(), arrays)?;
1186
1187        Ok(batch)
1188    }
1189}
1190
1191impl PartitionStream for PgClassTable {
1192    fn schema(&self) -> &SchemaRef {
1193        &self.schema
1194    }
1195
1196    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1197        let catalog_list = self.catalog_list.clone();
1198        let schema = Arc::clone(&self.schema);
1199        Box::pin(RecordBatchStreamAdapter::new(
1200            schema.clone(),
1201            futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
1202        ))
1203    }
1204}
1205
/// Lazily-computed `pg_catalog.pg_namespace` table (one row per schema).
///
/// Rows are synthesized from the current DataFusion catalog list on every
/// scan (see the `PartitionStream` impl).
#[derive(Debug)]
struct PgNamespaceTable {
    schema: SchemaRef,                          // Arrow schema of the pg_namespace columns
    catalog_list: Arc<dyn CatalogProviderList>, // live catalogs whose schemas are listed
}
1211
1212impl PgNamespaceTable {
1213    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
1214        // Define the schema for pg_namespace
1215        // This matches the columns from PostgreSQL's pg_namespace
1216        let schema = Arc::new(Schema::new(vec![
1217            Field::new("oid", DataType::Int32, false), // Object identifier
1218            Field::new("nspname", DataType::Utf8, false), // Name of the namespace (schema)
1219            Field::new("nspowner", DataType::Int32, false), // Owner of the namespace
1220            Field::new("nspacl", DataType::Utf8, true), // Access privileges
1221            Field::new("options", DataType::Utf8, true), // Schema-level options
1222        ]));
1223
1224        Self {
1225            schema,
1226            catalog_list,
1227        }
1228    }
1229
1230    /// Generate record batches based on the current state of the catalog
1231    async fn get_data(
1232        schema: SchemaRef,
1233        catalog_list: Arc<dyn CatalogProviderList>,
1234    ) -> Result<RecordBatch> {
1235        // Vectors to store column data
1236        let mut oids = Vec::new();
1237        let mut nspnames = Vec::new();
1238        let mut nspowners = Vec::new();
1239        let mut nspacls: Vec<Option<String>> = Vec::new();
1240        let mut options: Vec<Option<String>> = Vec::new();
1241
1242        // Start OID counter (should be consistent with the values used in pg_class)
1243        let mut next_oid = 10000;
1244
1245        // Add standard PostgreSQL system schemas
1246        // pg_catalog schema (OID 11)
1247        oids.push(11);
1248        nspnames.push("pg_catalog".to_string());
1249        nspowners.push(10); // Default superuser
1250        nspacls.push(None);
1251        options.push(None);
1252
1253        // public schema (OID 2200)
1254        oids.push(2200);
1255        nspnames.push("public".to_string());
1256        nspowners.push(10); // Default superuser
1257        nspacls.push(None);
1258        options.push(None);
1259
1260        // information_schema (OID 12)
1261        oids.push(12);
1262        nspnames.push("information_schema".to_string());
1263        nspowners.push(10); // Default superuser
1264        nspacls.push(None);
1265        options.push(None);
1266
1267        // Now add all schemas from DataFusion catalogs
1268        for catalog_name in catalog_list.catalog_names() {
1269            if let Some(catalog) = catalog_list.catalog(&catalog_name) {
1270                for schema_name in catalog.schema_names() {
1271                    // Skip schemas we've already added as system schemas
1272                    if schema_name == "pg_catalog"
1273                        || schema_name == "public"
1274                        || schema_name == "information_schema"
1275                    {
1276                        continue;
1277                    }
1278
1279                    let schema_oid = next_oid;
1280                    next_oid += 1;
1281
1282                    oids.push(schema_oid);
1283                    nspnames.push(schema_name.clone());
1284                    nspowners.push(10); // Default owner
1285                    nspacls.push(None);
1286                    options.push(None);
1287                }
1288            }
1289        }
1290
1291        // Create Arrow arrays from the collected data
1292        let arrays: Vec<ArrayRef> = vec![
1293            Arc::new(Int32Array::from(oids)),
1294            Arc::new(StringArray::from(nspnames)),
1295            Arc::new(Int32Array::from(nspowners)),
1296            Arc::new(StringArray::from_iter(nspacls.into_iter())),
1297            Arc::new(StringArray::from_iter(options.into_iter())),
1298        ];
1299
1300        // Create a full record batch
1301        let batch = RecordBatch::try_new(schema.clone(), arrays)?;
1302
1303        Ok(batch)
1304    }
1305}
1306
1307impl PartitionStream for PgNamespaceTable {
1308    fn schema(&self) -> &SchemaRef {
1309        &self.schema
1310    }
1311
1312    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1313        let catalog_list = self.catalog_list.clone();
1314        let schema = Arc::clone(&self.schema);
1315        Box::pin(RecordBatchStreamAdapter::new(
1316            schema.clone(),
1317            futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
1318        ))
1319    }
1320}
1321
/// Lazily-computed `pg_catalog.pg_database` table.
///
/// Each DataFusion catalog is exposed as one "database" row; rows are
/// synthesized on every scan (see the `PartitionStream` impl).
#[derive(Debug)]
struct PgDatabaseTable {
    schema: SchemaRef,                          // Arrow schema of the pg_database columns
    catalog_list: Arc<dyn CatalogProviderList>, // catalogs reported as databases
}
1327
1328impl PgDatabaseTable {
1329    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
1330        // Define the schema for pg_database
1331        // This matches PostgreSQL's pg_database table columns
1332        let schema = Arc::new(Schema::new(vec![
1333            Field::new("oid", DataType::Int32, false), // Object identifier
1334            Field::new("datname", DataType::Utf8, false), // Database name
1335            Field::new("datdba", DataType::Int32, false), // Database owner's user ID
1336            Field::new("encoding", DataType::Int32, false), // Character encoding
1337            Field::new("datcollate", DataType::Utf8, false), // LC_COLLATE for this database
1338            Field::new("datctype", DataType::Utf8, false), // LC_CTYPE for this database
1339            Field::new("datistemplate", DataType::Boolean, false), // If true, database can be used as a template
1340            Field::new("datallowconn", DataType::Boolean, false), // If false, no one can connect to this database
1341            Field::new("datconnlimit", DataType::Int32, false), // Max number of concurrent connections (-1=no limit)
1342            Field::new("datlastsysoid", DataType::Int32, false), // Last system OID in database
1343            Field::new("datfrozenxid", DataType::Int32, false), // Frozen XID for this database
1344            Field::new("datminmxid", DataType::Int32, false),   // Minimum multixact ID
1345            Field::new("dattablespace", DataType::Int32, false), // Default tablespace for this database
1346            Field::new("datacl", DataType::Utf8, true),          // Access privileges
1347        ]));
1348
1349        Self {
1350            schema,
1351            catalog_list,
1352        }
1353    }
1354
1355    /// Generate record batches based on the current state of the catalog
1356    async fn get_data(
1357        schema: SchemaRef,
1358        catalog_list: Arc<dyn CatalogProviderList>,
1359    ) -> Result<RecordBatch> {
1360        // Vectors to store column data
1361        let mut oids = Vec::new();
1362        let mut datnames = Vec::new();
1363        let mut datdbas = Vec::new();
1364        let mut encodings = Vec::new();
1365        let mut datcollates = Vec::new();
1366        let mut datctypes = Vec::new();
1367        let mut datistemplates = Vec::new();
1368        let mut datallowconns = Vec::new();
1369        let mut datconnlimits = Vec::new();
1370        let mut datlastsysoids = Vec::new();
1371        let mut datfrozenxids = Vec::new();
1372        let mut datminmxids = Vec::new();
1373        let mut dattablespaces = Vec::new();
1374        let mut datacles: Vec<Option<String>> = Vec::new();
1375
1376        // Start OID counter (this is simplistic and would need to be more robust in practice)
1377        let mut next_oid = 16384; // Standard PostgreSQL starting OID for user databases
1378
1379        // Add a record for each catalog (treating catalogs as "databases")
1380        for catalog_name in catalog_list.catalog_names() {
1381            let oid = next_oid;
1382            next_oid += 1;
1383
1384            oids.push(oid);
1385            datnames.push(catalog_name.clone());
1386            datdbas.push(10); // Default owner (assuming 10 = postgres user)
1387            encodings.push(6); // 6 = UTF8 in PostgreSQL
1388            datcollates.push("en_US.UTF-8".to_string()); // Default collation
1389            datctypes.push("en_US.UTF-8".to_string()); // Default ctype
1390            datistemplates.push(false);
1391            datallowconns.push(true);
1392            datconnlimits.push(-1); // No connection limit
1393            datlastsysoids.push(100000); // Arbitrary last system OID
1394            datfrozenxids.push(1); // Simplified transaction ID
1395            datminmxids.push(1); // Simplified multixact ID
1396            dattablespaces.push(1663); // Default tablespace (1663 = pg_default in PostgreSQL)
1397            datacles.push(None); // No specific ACLs
1398        }
1399
1400        // Always include a "postgres" database entry if not already present
1401        // (This is for compatibility with tools that expect it)
1402        if !datnames.contains(&"postgres".to_string()) {
1403            let oid = next_oid;
1404
1405            oids.push(oid);
1406            datnames.push("postgres".to_string());
1407            datdbas.push(10);
1408            encodings.push(6);
1409            datcollates.push("en_US.UTF-8".to_string());
1410            datctypes.push("en_US.UTF-8".to_string());
1411            datistemplates.push(false);
1412            datallowconns.push(true);
1413            datconnlimits.push(-1);
1414            datlastsysoids.push(100000);
1415            datfrozenxids.push(1);
1416            datminmxids.push(1);
1417            dattablespaces.push(1663);
1418            datacles.push(None);
1419        }
1420
1421        // Create Arrow arrays from the collected data
1422        let arrays: Vec<ArrayRef> = vec![
1423            Arc::new(Int32Array::from(oids)),
1424            Arc::new(StringArray::from(datnames)),
1425            Arc::new(Int32Array::from(datdbas)),
1426            Arc::new(Int32Array::from(encodings)),
1427            Arc::new(StringArray::from(datcollates)),
1428            Arc::new(StringArray::from(datctypes)),
1429            Arc::new(BooleanArray::from(datistemplates)),
1430            Arc::new(BooleanArray::from(datallowconns)),
1431            Arc::new(Int32Array::from(datconnlimits)),
1432            Arc::new(Int32Array::from(datlastsysoids)),
1433            Arc::new(Int32Array::from(datfrozenxids)),
1434            Arc::new(Int32Array::from(datminmxids)),
1435            Arc::new(Int32Array::from(dattablespaces)),
1436            Arc::new(StringArray::from_iter(datacles.into_iter())),
1437        ];
1438
1439        // Create a full record batch
1440        let full_batch = RecordBatch::try_new(schema.clone(), arrays)?;
1441        Ok(full_batch)
1442    }
1443}
1444
1445impl PartitionStream for PgDatabaseTable {
1446    fn schema(&self) -> &SchemaRef {
1447        &self.schema
1448    }
1449
1450    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1451        let catalog_list = self.catalog_list.clone();
1452        let schema = Arc::clone(&self.schema);
1453        Box::pin(RecordBatchStreamAdapter::new(
1454            schema.clone(),
1455            futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
1456        ))
1457    }
1458}
1459
/// Lazily-computed `pg_catalog.pg_attribute` table (one row per column of
/// every relation).
///
/// Rows are synthesized from the current DataFusion catalog list on scan.
#[derive(Debug)]
struct PgAttributeTable {
    schema: SchemaRef,                          // Arrow schema of the pg_attribute columns
    catalog_list: Arc<dyn CatalogProviderList>, // catalogs whose table columns are listed
}
1465
1466impl PgAttributeTable {
    /// Create a `pg_attribute` table definition over `catalog_list`.
    ///
    /// The Arrow schema mirrors the columns of PostgreSQL's `pg_attribute`
    /// system catalog; row generation happens elsewhere (see `get_data`).
    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
        // Define the schema for pg_attribute
        // This matches PostgreSQL's pg_attribute table columns
        let schema = Arc::new(Schema::new(vec![
            Field::new("attrelid", DataType::Int32, false), // OID of the relation this column belongs to
            Field::new("attname", DataType::Utf8, false),   // Column name
            Field::new("atttypid", DataType::Int32, false), // OID of the column data type
            Field::new("attstattarget", DataType::Int32, false), // Statistics target
            Field::new("attlen", DataType::Int16, false),   // Length of the type
            Field::new("attnum", DataType::Int16, false), // Column number (positive for regular columns)
            Field::new("attndims", DataType::Int32, false), // Number of dimensions for array types
            Field::new("attcacheoff", DataType::Int32, false), // Cache offset
            Field::new("atttypmod", DataType::Int32, false), // Type-specific modifier
            Field::new("attbyval", DataType::Boolean, false), // True if the type is pass-by-value
            Field::new("attalign", DataType::Utf8, false), // Type alignment
            Field::new("attstorage", DataType::Utf8, false), // Storage type
            Field::new("attcompression", DataType::Utf8, true), // Compression method
            Field::new("attnotnull", DataType::Boolean, false), // True if column cannot be null
            Field::new("atthasdef", DataType::Boolean, false), // True if column has a default value
            Field::new("atthasmissing", DataType::Boolean, false), // True if column has missing values
            Field::new("attidentity", DataType::Utf8, false),      // Identity column type
            Field::new("attgenerated", DataType::Utf8, false),     // Generated column type
            Field::new("attisdropped", DataType::Boolean, false), // True if column has been dropped
            Field::new("attislocal", DataType::Boolean, false), // True if column is local to this relation
            Field::new("attinhcount", DataType::Int32, false), // Number of direct inheritance ancestors
            Field::new("attcollation", DataType::Int32, false), // OID of collation
            Field::new("attacl", DataType::Utf8, true),        // Access privileges
            Field::new("attoptions", DataType::Utf8, true),    // Attribute-level options
            Field::new("attfdwoptions", DataType::Utf8, true), // Foreign data wrapper options
            Field::new("attmissingval", DataType::Utf8, true), // Missing value for added columns
        ]));

        Self {
            schema,
            catalog_list,
        }
    }
1504
1505    /// Generate record batches based on the current state of the catalog
1506    async fn get_data(
1507        schema: SchemaRef,
1508        catalog_list: Arc<dyn CatalogProviderList>,
1509    ) -> Result<RecordBatch> {
1510        // Vectors to store column data
1511        let mut attrelids = Vec::new();
1512        let mut attnames = Vec::new();
1513        let mut atttypids = Vec::new();
1514        let mut attstattargets = Vec::new();
1515        let mut attlens = Vec::new();
1516        let mut attnums = Vec::new();
1517        let mut attndimss = Vec::new();
1518        let mut attcacheoffs = Vec::new();
1519        let mut atttymods = Vec::new();
1520        let mut attbyvals = Vec::new();
1521        let mut attaligns = Vec::new();
1522        let mut attstorages = Vec::new();
1523        let mut attcompressions: Vec<Option<String>> = Vec::new();
1524        let mut attnotnulls = Vec::new();
1525        let mut atthasdefs = Vec::new();
1526        let mut atthasmissings = Vec::new();
1527        let mut attidentitys = Vec::new();
1528        let mut attgenerateds = Vec::new();
1529        let mut attisdroppeds = Vec::new();
1530        let mut attislocals = Vec::new();
1531        let mut attinhcounts = Vec::new();
1532        let mut attcollations = Vec::new();
1533        let mut attacls: Vec<Option<String>> = Vec::new();
1534        let mut attoptions: Vec<Option<String>> = Vec::new();
1535        let mut attfdwoptions: Vec<Option<String>> = Vec::new();
1536        let mut attmissingvals: Vec<Option<String>> = Vec::new();
1537
1538        // Start OID counter (should be consistent with pg_class)
1539        let mut next_oid = 10000;
1540
1541        // Iterate through all catalogs and schemas
1542        for catalog_name in catalog_list.catalog_names() {
1543            if let Some(catalog) = catalog_list.catalog(&catalog_name) {
1544                for schema_name in catalog.schema_names() {
1545                    if let Some(schema_provider) = catalog.schema(&schema_name) {
1546                        // Process all tables in this schema
1547                        for table_name in schema_provider.table_names() {
1548                            let table_oid = next_oid;
1549                            next_oid += 1;
1550
1551                            if let Some(table) = schema_provider.table(&table_name).await? {
1552                                let table_schema = table.schema();
1553
1554                                // Add column entries for this table
1555                                for (column_idx, field) in table_schema.fields().iter().enumerate()
1556                                {
1557                                    let attnum = (column_idx + 1) as i16; // PostgreSQL column numbers start at 1
1558                                    let (pg_type_oid, type_len, by_val, align, storage) =
1559                                        Self::datafusion_to_pg_type(field.data_type());
1560
1561                                    attrelids.push(table_oid);
1562                                    attnames.push(field.name().clone());
1563                                    atttypids.push(pg_type_oid);
1564                                    attstattargets.push(-1); // Default statistics target
1565                                    attlens.push(type_len);
1566                                    attnums.push(attnum);
1567                                    attndimss.push(0); // No array support for now
1568                                    attcacheoffs.push(-1); // Not cached
1569                                    atttymods.push(-1); // No type modifiers
1570                                    attbyvals.push(by_val);
1571                                    attaligns.push(align.to_string());
1572                                    attstorages.push(storage.to_string());
1573                                    attcompressions.push(None); // No compression
1574                                    attnotnulls.push(!field.is_nullable());
1575                                    atthasdefs.push(false); // No default values
1576                                    atthasmissings.push(false); // No missing values
1577                                    attidentitys.push("".to_string()); // No identity columns
1578                                    attgenerateds.push("".to_string()); // No generated columns
1579                                    attisdroppeds.push(false); // Not dropped
1580                                    attislocals.push(true); // Local to this relation
1581                                    attinhcounts.push(0); // No inheritance
1582                                    attcollations.push(0); // Default collation
1583                                    attacls.push(None); // No ACLs
1584                                    attoptions.push(None); // No options
1585                                    attfdwoptions.push(None); // No FDW options
1586                                    attmissingvals.push(None); // No missing values
1587                                }
1588                            }
1589                        }
1590                    }
1591                }
1592            }
1593        }
1594
1595        // Create Arrow arrays from the collected data
1596        let arrays: Vec<ArrayRef> = vec![
1597            Arc::new(Int32Array::from(attrelids)),
1598            Arc::new(StringArray::from(attnames)),
1599            Arc::new(Int32Array::from(atttypids)),
1600            Arc::new(Int32Array::from(attstattargets)),
1601            Arc::new(Int16Array::from(attlens)),
1602            Arc::new(Int16Array::from(attnums)),
1603            Arc::new(Int32Array::from(attndimss)),
1604            Arc::new(Int32Array::from(attcacheoffs)),
1605            Arc::new(Int32Array::from(atttymods)),
1606            Arc::new(BooleanArray::from(attbyvals)),
1607            Arc::new(StringArray::from(attaligns)),
1608            Arc::new(StringArray::from(attstorages)),
1609            Arc::new(StringArray::from_iter(attcompressions.into_iter())),
1610            Arc::new(BooleanArray::from(attnotnulls)),
1611            Arc::new(BooleanArray::from(atthasdefs)),
1612            Arc::new(BooleanArray::from(atthasmissings)),
1613            Arc::new(StringArray::from(attidentitys)),
1614            Arc::new(StringArray::from(attgenerateds)),
1615            Arc::new(BooleanArray::from(attisdroppeds)),
1616            Arc::new(BooleanArray::from(attislocals)),
1617            Arc::new(Int32Array::from(attinhcounts)),
1618            Arc::new(Int32Array::from(attcollations)),
1619            Arc::new(StringArray::from_iter(attacls.into_iter())),
1620            Arc::new(StringArray::from_iter(attoptions.into_iter())),
1621            Arc::new(StringArray::from_iter(attfdwoptions.into_iter())),
1622            Arc::new(StringArray::from_iter(attmissingvals.into_iter())),
1623        ];
1624
1625        // Create a record batch
1626        let batch = RecordBatch::try_new(schema.clone(), arrays)?;
1627        Ok(batch)
1628    }
1629
1630    /// Map DataFusion data types to PostgreSQL type information
1631    fn datafusion_to_pg_type(data_type: &DataType) -> (i32, i16, bool, &'static str, &'static str) {
1632        match data_type {
1633            DataType::Boolean => (16, 1, true, "c", "p"),    // bool
1634            DataType::Int8 => (18, 1, true, "c", "p"),       // char
1635            DataType::Int16 => (21, 2, true, "s", "p"),      // int2
1636            DataType::Int32 => (23, 4, true, "i", "p"),      // int4
1637            DataType::Int64 => (20, 8, true, "d", "p"),      // int8
1638            DataType::UInt8 => (21, 2, true, "s", "p"),      // Treat as int2
1639            DataType::UInt16 => (23, 4, true, "i", "p"),     // Treat as int4
1640            DataType::UInt32 => (20, 8, true, "d", "p"),     // Treat as int8
1641            DataType::UInt64 => (1700, -1, false, "i", "m"), // Treat as numeric
1642            DataType::Float32 => (700, 4, true, "i", "p"),   // float4
1643            DataType::Float64 => (701, 8, true, "d", "p"),   // float8
1644            DataType::Utf8 => (25, -1, false, "i", "x"),     // text
1645            DataType::LargeUtf8 => (25, -1, false, "i", "x"), // text
1646            DataType::Binary => (17, -1, false, "i", "x"),   // bytea
1647            DataType::LargeBinary => (17, -1, false, "i", "x"), // bytea
1648            DataType::Date32 => (1082, 4, true, "i", "p"),   // date
1649            DataType::Date64 => (1082, 4, true, "i", "p"),   // date
1650            DataType::Time32(_) => (1083, 8, true, "d", "p"), // time
1651            DataType::Time64(_) => (1083, 8, true, "d", "p"), // time
1652            DataType::Timestamp(_, _) => (1114, 8, true, "d", "p"), // timestamp
1653            DataType::Decimal128(_, _) => (1700, -1, false, "i", "m"), // numeric
1654            DataType::Decimal256(_, _) => (1700, -1, false, "i", "m"), // numeric
1655            _ => (25, -1, false, "i", "x"),                  // Default to text for unknown types
1656        }
1657    }
1658}
1659
1660impl PartitionStream for PgAttributeTable {
1661    fn schema(&self) -> &SchemaRef {
1662        &self.schema
1663    }
1664
1665    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1666        let catalog_list = self.catalog_list.clone();
1667        let schema = Arc::clone(&self.schema);
1668        Box::pin(RecordBatchStreamAdapter::new(
1669            schema.clone(),
1670            futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
1671        ))
1672    }
1673}
1674
/// Build the `current_schemas(boolean)` scalar UDF.
///
/// Returns a single-row list of schema names: always `public`, plus the
/// implicit `information_schema` and `pg_catalog` when the argument is true.
pub fn create_current_schemas_udf() -> ScalarUDF {
    // Define the function implementation
    let func = move |args: &[ColumnarValue]| {
        let args = ColumnarValue::values_to_arrays(args)?;
        // NOTE(review): `as_boolean_array` panics if the argument is not a
        // Boolean array, and `value(0)` panics on an empty batch; only row 0
        // is inspected, so a multi-row input still yields a single-row
        // result — confirm DataFusion never invokes this with >1 row.
        let input = as_boolean_array(&args[0]);

        // Create a UTF8 array with a single value
        let mut values = vec!["public"];
        // include implicit schemas
        if input.value(0) {
            values.push("information_schema");
            values.push("pg_catalog");
        }

        // Build a one-row ListArray wrapping the schema names.
        let list_array = SingleRowListArrayBuilder::new(Arc::new(StringArray::from(values)));

        // NOTE(review): the builder chooses its own element-field name, which
        // may differ from the "schema" field declared in the return type
        // below — verify DataFusion accepts the mismatch.
        let array: ArrayRef = Arc::new(list_array.build_list_array());

        Ok(ColumnarValue::Array(array))
    };

    // Wrap the implementation in a scalar function
    create_udf(
        "current_schemas",
        vec![DataType::Boolean],
        DataType::List(Arc::new(Field::new("schema", DataType::Utf8, false))),
        Volatility::Immutable,
        Arc::new(func),
    )
}
1705
1706pub fn create_current_schema_udf() -> ScalarUDF {
1707    // Define the function implementation
1708    let func = move |_args: &[ColumnarValue]| {
1709        // Create a UTF8 array with a single value
1710        let mut builder = StringBuilder::new();
1711        builder.append_value("public");
1712        let array: ArrayRef = Arc::new(builder.finish());
1713
1714        Ok(ColumnarValue::Array(array))
1715    };
1716
1717    // Wrap the implementation in a scalar function
1718    create_udf(
1719        "current_schema",
1720        vec![],
1721        DataType::Utf8,
1722        Volatility::Immutable,
1723        Arc::new(func),
1724    )
1725}
1726
1727pub fn create_version_udf() -> ScalarUDF {
1728    // Define the function implementation
1729    let func = move |_args: &[ColumnarValue]| {
1730        // Create a UTF8 array with version information
1731        let mut builder = StringBuilder::new();
1732        builder
1733            .append_value("DataFusion PostgreSQL 48.0.0 on x86_64-pc-linux-gnu, compiled by Rust");
1734        let array: ArrayRef = Arc::new(builder.finish());
1735
1736        Ok(ColumnarValue::Array(array))
1737    };
1738
1739    // Wrap the implementation in a scalar function
1740    create_udf(
1741        "version",
1742        vec![],
1743        DataType::Utf8,
1744        Volatility::Immutable,
1745        Arc::new(func),
1746    )
1747}
1748
1749pub fn create_pg_get_userbyid_udf() -> ScalarUDF {
1750    // Define the function implementation
1751    let func = move |args: &[ColumnarValue]| {
1752        let args = ColumnarValue::values_to_arrays(args)?;
1753        let _input = &args[0]; // User OID, but we'll ignore for now
1754
1755        // Create a UTF8 array with default user name
1756        let mut builder = StringBuilder::new();
1757        builder.append_value("postgres");
1758        let array: ArrayRef = Arc::new(builder.finish());
1759
1760        Ok(ColumnarValue::Array(array))
1761    };
1762
1763    // Wrap the implementation in a scalar function
1764    create_udf(
1765        "pg_get_userbyid",
1766        vec![DataType::Int32],
1767        DataType::Utf8,
1768        Volatility::Stable,
1769        Arc::new(func),
1770    )
1771}
1772
1773pub fn create_has_table_privilege_3param_udf() -> ScalarUDF {
1774    // Define the function implementation for 3-parameter version
1775    let func = move |args: &[ColumnarValue]| {
1776        let args = ColumnarValue::values_to_arrays(args)?;
1777        let _user = &args[0]; // User (can be name or OID)
1778        let _table = &args[1]; // Table (can be name or OID)
1779        let _privilege = &args[2]; // Privilege type (SELECT, INSERT, etc.)
1780
1781        // For now, always return true (full access)
1782        let mut builder = BooleanArray::builder(1);
1783        builder.append_value(true);
1784        let array: ArrayRef = Arc::new(builder.finish());
1785
1786        Ok(ColumnarValue::Array(array))
1787    };
1788
1789    // Wrap the implementation in a scalar function
1790    create_udf(
1791        "has_table_privilege",
1792        vec![DataType::Utf8, DataType::Utf8, DataType::Utf8],
1793        DataType::Boolean,
1794        Volatility::Stable,
1795        Arc::new(func),
1796    )
1797}
1798
1799pub fn create_has_table_privilege_2param_udf() -> ScalarUDF {
1800    // Define the function implementation for 2-parameter version (current user, table, privilege)
1801    let func = move |args: &[ColumnarValue]| {
1802        let args = ColumnarValue::values_to_arrays(args)?;
1803        let _table = &args[0]; // Table (can be name or OID)
1804        let _privilege = &args[1]; // Privilege type (SELECT, INSERT, etc.)
1805
1806        // For now, always return true (full access for current user)
1807        let mut builder = BooleanArray::builder(1);
1808        builder.append_value(true);
1809        let array: ArrayRef = Arc::new(builder.finish());
1810
1811        Ok(ColumnarValue::Array(array))
1812    };
1813
1814    // Wrap the implementation in a scalar function
1815    create_udf(
1816        "has_table_privilege",
1817        vec![DataType::Utf8, DataType::Utf8],
1818        DataType::Boolean,
1819        Volatility::Stable,
1820        Arc::new(func),
1821    )
1822}
1823
1824/// Install pg_catalog and postgres UDFs to current `SessionContext`
1825pub fn setup_pg_catalog(
1826    session_context: &SessionContext,
1827    catalog_name: &str,
1828) -> Result<(), Box<DataFusionError>> {
1829    let pg_catalog = PgCatalogSchemaProvider::new(session_context.state().catalog_list().clone());
1830    session_context
1831        .catalog(catalog_name)
1832        .ok_or_else(|| {
1833            DataFusionError::Configuration(format!(
1834                "Catalog not found when registering pg_catalog: {catalog_name}"
1835            ))
1836        })?
1837        .register_schema("pg_catalog", Arc::new(pg_catalog))?;
1838
1839    session_context.register_udf(create_current_schema_udf());
1840    session_context.register_udf(create_current_schemas_udf());
1841    session_context.register_udf(create_version_udf());
1842    session_context.register_udf(create_pg_get_userbyid_udf());
1843    session_context.register_udf(create_has_table_privilege_2param_udf());
1844
1845    Ok(())
1846}