datafusion_postgres/
pg_catalog.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU32, Ordering};
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use datafusion::arrow::array::{
7    as_boolean_array, ArrayRef, BooleanArray, BooleanBuilder, Float32Array, Float64Array,
8    Int16Array, Int32Array, RecordBatch, StringArray, StringBuilder,
9};
10use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
11use datafusion::catalog::streaming::StreamingTable;
12use datafusion::catalog::{CatalogProviderList, MemTable, SchemaProvider};
13use datafusion::common::utils::SingleRowListArrayBuilder;
14use datafusion::datasource::{TableProvider, ViewTable};
15use datafusion::error::{DataFusionError, Result};
16use datafusion::execution::{SendableRecordBatchStream, TaskContext};
17use datafusion::logical_expr::{ColumnarValue, ScalarUDF, Volatility};
18use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
19use datafusion::physical_plan::streaming::PartitionStream;
20use datafusion::prelude::{create_udf, SessionContext};
21use postgres_types::Oid;
22use tokio::sync::RwLock;
23
// Names of the emulated PostgreSQL system catalog tables served under the
// `pg_catalog` schema. Each constant maps to one table resolved in
// `PgCatalogSchemaProvider::table`.
const PG_CATALOG_TABLE_PG_TYPE: &str = "pg_type";
const PG_CATALOG_TABLE_PG_CLASS: &str = "pg_class";
const PG_CATALOG_TABLE_PG_ATTRIBUTE: &str = "pg_attribute";
const PG_CATALOG_TABLE_PG_NAMESPACE: &str = "pg_namespace";
const PG_CATALOG_TABLE_PG_PROC: &str = "pg_proc";
const PG_CATALOG_TABLE_PG_DATABASE: &str = "pg_database";
const PG_CATALOG_TABLE_PG_AM: &str = "pg_am";
const PG_CATALOG_TABLE_PG_RANGE: &str = "pg_range";
const PG_CATALOG_TABLE_PG_ENUM: &str = "pg_enum";
const PG_CATALOG_TABLE_PG_DESCRIPTION: &str = "pg_description";
34
35/// Determine PostgreSQL table type (relkind) from DataFusion TableProvider
36fn get_table_type(table: &Arc<dyn TableProvider>) -> &'static str {
37    // Use Any trait to determine the actual table provider type
38    if table.as_any().is::<ViewTable>() {
39        "v" // view
40    } else {
41        "r" // All other table types (StreamingTable, MemTable, etc.) are treated as regular tables
42    }
43}
44
45/// Determine PostgreSQL table type (relkind) with table name context
46fn get_table_type_with_name(
47    table: &Arc<dyn TableProvider>,
48    table_name: &str,
49    schema_name: &str,
50) -> &'static str {
51    // Check if this is a system catalog table
52    if schema_name == "pg_catalog" || schema_name == "information_schema" {
53        if table_name.starts_with("pg_")
54            || table_name.contains("_table")
55            || table_name.contains("_column")
56        {
57            "r" // System tables are still regular tables in PostgreSQL
58        } else {
59            "v" // Some system objects might be views
60        }
61    } else {
62        get_table_type(table)
63    }
64}
65
/// All pg_catalog table names this provider emulates; used both for
/// `table_names`/`table_exist` and by callers that need the full list.
pub const PG_CATALOG_TABLES: &[&str] = &[
    PG_CATALOG_TABLE_PG_TYPE,
    PG_CATALOG_TABLE_PG_CLASS,
    PG_CATALOG_TABLE_PG_ATTRIBUTE,
    PG_CATALOG_TABLE_PG_NAMESPACE,
    PG_CATALOG_TABLE_PG_PROC,
    PG_CATALOG_TABLE_PG_DATABASE,
    PG_CATALOG_TABLE_PG_AM,
    PG_CATALOG_TABLE_PG_RANGE,
    PG_CATALOG_TABLE_PG_ENUM,
    PG_CATALOG_TABLE_PG_DESCRIPTION,
];
78
// Data structure to hold pg_type table data
//
// Column-oriented buffers for building the `pg_type` RecordBatch: each Vec
// holds one column, and `add_type` appends one row across all of them, so
// all vectors always have equal length.
#[derive(Debug)]
struct PgTypesData {
    oids: Vec<i32>,
    typnames: Vec<String>,
    typnamespaces: Vec<i32>,
    typowners: Vec<i32>,
    typlens: Vec<i16>,
    typbyvals: Vec<bool>,
    typtypes: Vec<String>,
    typcategories: Vec<String>,
    typispreferreds: Vec<bool>,
    typisdefineds: Vec<bool>,
    typdelims: Vec<String>,
    typrelids: Vec<i32>,
    typelems: Vec<i32>,
    typarrays: Vec<i32>,
    typinputs: Vec<String>,
    typoutputs: Vec<String>,
    typreceives: Vec<String>,
    typsends: Vec<String>,
    typmodins: Vec<String>,
    typmodouts: Vec<String>,
    typanalyzes: Vec<String>,
    typaligns: Vec<String>,
    typstorages: Vec<String>,
    typnotnulls: Vec<bool>,
    typbasetypes: Vec<i32>,
    // NOTE(review): spelled `typtymods` here but the pg_type column is
    // `typtypmod` — the name is only internal, the schema field is correct.
    typtymods: Vec<i32>,
    typndimss: Vec<i32>,
    typcollations: Vec<i32>,
    typdefaultbins: Vec<Option<String>>,
    typdefaults: Vec<Option<String>>,
}
113
impl PgTypesData {
    /// Create an empty set of column buffers.
    fn new() -> Self {
        Self {
            oids: Vec::new(),
            typnames: Vec::new(),
            typnamespaces: Vec::new(),
            typowners: Vec::new(),
            typlens: Vec::new(),
            typbyvals: Vec::new(),
            typtypes: Vec::new(),
            typcategories: Vec::new(),
            typispreferreds: Vec::new(),
            typisdefineds: Vec::new(),
            typdelims: Vec::new(),
            typrelids: Vec::new(),
            typelems: Vec::new(),
            typarrays: Vec::new(),
            typinputs: Vec::new(),
            typoutputs: Vec::new(),
            typreceives: Vec::new(),
            typsends: Vec::new(),
            typmodins: Vec::new(),
            typmodouts: Vec::new(),
            typanalyzes: Vec::new(),
            typaligns: Vec::new(),
            typstorages: Vec::new(),
            typnotnulls: Vec::new(),
            typbasetypes: Vec::new(),
            typtymods: Vec::new(),
            typndimss: Vec::new(),
            typcollations: Vec::new(),
            typdefaultbins: Vec::new(),
            typdefaults: Vec::new(),
        }
    }

    /// Append one pg_type row: pushes exactly one value onto every column
    /// buffer, keeping all vectors the same length. Parameters follow the
    /// column order of PostgreSQL's `pg_type` catalog.
    #[allow(clippy::too_many_arguments)]
    fn add_type(
        &mut self,
        oid: i32,
        typname: &str,
        typnamespace: i32,
        typowner: i32,
        typlen: i16,
        typbyval: bool,
        typtype: &str,
        typcategory: &str,
        typispreferred: bool,
        typisdefined: bool,
        typdelim: &str,
        typrelid: i32,
        typelem: i32,
        typarray: i32,
        typinput: &str,
        typoutput: &str,
        typreceive: &str,
        typsend: &str,
        typmodin: &str,
        typmodout: &str,
        typanalyze: &str,
        typalign: &str,
        typstorage: &str,
        typnotnull: bool,
        typbasetype: i32,
        typtypmod: i32,
        typndims: i32,
        typcollation: i32,
        typdefaultbin: Option<String>,
        typdefault: Option<String>,
    ) {
        self.oids.push(oid);
        self.typnames.push(typname.to_string());
        self.typnamespaces.push(typnamespace);
        self.typowners.push(typowner);
        self.typlens.push(typlen);
        self.typbyvals.push(typbyval);
        self.typtypes.push(typtype.to_string());
        self.typcategories.push(typcategory.to_string());
        self.typispreferreds.push(typispreferred);
        self.typisdefineds.push(typisdefined);
        self.typdelims.push(typdelim.to_string());
        self.typrelids.push(typrelid);
        self.typelems.push(typelem);
        self.typarrays.push(typarray);
        self.typinputs.push(typinput.to_string());
        self.typoutputs.push(typoutput.to_string());
        self.typreceives.push(typreceive.to_string());
        self.typsends.push(typsend.to_string());
        self.typmodins.push(typmodin.to_string());
        self.typmodouts.push(typmodout.to_string());
        self.typanalyzes.push(typanalyze.to_string());
        self.typaligns.push(typalign.to_string());
        self.typstorages.push(typstorage.to_string());
        self.typnotnulls.push(typnotnull);
        self.typbasetypes.push(typbasetype);
        self.typtymods.push(typtypmod);
        self.typndimss.push(typndims);
        self.typcollations.push(typcollation);
        self.typdefaultbins.push(typdefaultbin);
        self.typdefaults.push(typdefault);
    }
}
216
/// Key identifying a catalog object whose assigned OID is cached.
#[derive(Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
enum OidCacheKey {
    /// Catalog by name
    Catalog(String),
    /// Schema by catalog and schema name
    Schema(String, String),
    /// Table by catalog, schema and table name
    Table(String, String, String),
}
224
// Create custom schema provider for pg_catalog
#[derive(Debug)]
pub struct PgCatalogSchemaProvider {
    // Root of all registered catalogs; scanned to synthesize catalog rows.
    catalog_list: Arc<dyn CatalogProviderList>,
    // Monotonic source of fresh OIDs; starts at 16384 (see `new`), the
    // first non-reserved OID in PostgreSQL.
    oid_counter: Arc<AtomicU32>,
    // OIDs already handed out, keyed by object, so the same catalog,
    // schema, or table keeps a stable OID across lookups.
    oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
}
232
233#[async_trait]
234impl SchemaProvider for PgCatalogSchemaProvider {
235    fn as_any(&self) -> &dyn std::any::Any {
236        self
237    }
238
239    fn table_names(&self) -> Vec<String> {
240        PG_CATALOG_TABLES.iter().map(ToString::to_string).collect()
241    }
242
243    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
244        match name.to_ascii_lowercase().as_str() {
245            PG_CATALOG_TABLE_PG_TYPE => Ok(Some(self.create_pg_type_table())),
246            PG_CATALOG_TABLE_PG_AM => Ok(Some(self.create_pg_am_table())),
247            PG_CATALOG_TABLE_PG_CLASS => {
248                let table = Arc::new(PgClassTable::new(
249                    self.catalog_list.clone(),
250                    self.oid_counter.clone(),
251                    self.oid_cache.clone(),
252                ));
253                Ok(Some(Arc::new(
254                    StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
255                )))
256            }
257            PG_CATALOG_TABLE_PG_NAMESPACE => {
258                let table = Arc::new(PgNamespaceTable::new(
259                    self.catalog_list.clone(),
260                    self.oid_counter.clone(),
261                    self.oid_cache.clone(),
262                ));
263                Ok(Some(Arc::new(
264                    StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
265                )))
266            }
267            PG_CATALOG_TABLE_PG_DATABASE => {
268                let table = Arc::new(PgDatabaseTable::new(
269                    self.catalog_list.clone(),
270                    self.oid_counter.clone(),
271                    self.oid_cache.clone(),
272                ));
273                Ok(Some(Arc::new(
274                    StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
275                )))
276            }
277            PG_CATALOG_TABLE_PG_ATTRIBUTE => {
278                let table = Arc::new(PgAttributeTable::new(self.catalog_list.clone()));
279                Ok(Some(Arc::new(
280                    StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
281                )))
282            }
283            PG_CATALOG_TABLE_PG_PROC => Ok(Some(self.create_pg_proc_table())),
284            PG_CATALOG_TABLE_PG_RANGE => Ok(Some(self.create_pg_range_table())),
285            PG_CATALOG_TABLE_PG_ENUM => Ok(Some(self.create_pg_enum_table())),
286            PG_CATALOG_TABLE_PG_DESCRIPTION => Ok(Some(self.create_pg_description_table())),
287            _ => Ok(None),
288        }
289    }
290
291    fn table_exist(&self, name: &str) -> bool {
292        PG_CATALOG_TABLES.contains(&name.to_ascii_lowercase().as_str())
293    }
294}
295
296impl PgCatalogSchemaProvider {
297    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> PgCatalogSchemaProvider {
298        Self {
299            catalog_list,
300            oid_counter: Arc::new(AtomicU32::new(16384)),
301            oid_cache: Arc::new(RwLock::new(HashMap::new())),
302        }
303    }
304
305    /// Create a populated pg_type table with standard PostgreSQL data types
306    fn create_pg_type_table(&self) -> Arc<dyn TableProvider> {
307        // Define complete schema for pg_type (matching PostgreSQL)
308        let schema = Arc::new(Schema::new(vec![
309            Field::new("oid", DataType::Int32, false),
310            Field::new("typname", DataType::Utf8, false),
311            Field::new("typnamespace", DataType::Int32, false),
312            Field::new("typowner", DataType::Int32, false),
313            Field::new("typlen", DataType::Int16, false),
314            Field::new("typbyval", DataType::Boolean, false),
315            Field::new("typtype", DataType::Utf8, false),
316            Field::new("typcategory", DataType::Utf8, false),
317            Field::new("typispreferred", DataType::Boolean, false),
318            Field::new("typisdefined", DataType::Boolean, false),
319            Field::new("typdelim", DataType::Utf8, false),
320            Field::new("typrelid", DataType::Int32, false),
321            Field::new("typelem", DataType::Int32, false),
322            Field::new("typarray", DataType::Int32, false),
323            Field::new("typinput", DataType::Utf8, false),
324            Field::new("typoutput", DataType::Utf8, false),
325            Field::new("typreceive", DataType::Utf8, false),
326            Field::new("typsend", DataType::Utf8, false),
327            Field::new("typmodin", DataType::Utf8, false),
328            Field::new("typmodout", DataType::Utf8, false),
329            Field::new("typanalyze", DataType::Utf8, false),
330            Field::new("typalign", DataType::Utf8, false),
331            Field::new("typstorage", DataType::Utf8, false),
332            Field::new("typnotnull", DataType::Boolean, false),
333            Field::new("typbasetype", DataType::Int32, false),
334            Field::new("typtypmod", DataType::Int32, false),
335            Field::new("typndims", DataType::Int32, false),
336            Field::new("typcollation", DataType::Int32, false),
337            Field::new("typdefaultbin", DataType::Utf8, true),
338            Field::new("typdefault", DataType::Utf8, true),
339        ]));
340
341        // Create standard PostgreSQL data types
342        let pg_types_data = Self::get_standard_pg_types();
343
344        // Create RecordBatch from the data
345        let arrays: Vec<ArrayRef> = vec![
346            Arc::new(Int32Array::from(pg_types_data.oids)),
347            Arc::new(StringArray::from(pg_types_data.typnames)),
348            Arc::new(Int32Array::from(pg_types_data.typnamespaces)),
349            Arc::new(Int32Array::from(pg_types_data.typowners)),
350            Arc::new(Int16Array::from(pg_types_data.typlens)),
351            Arc::new(BooleanArray::from(pg_types_data.typbyvals)),
352            Arc::new(StringArray::from(pg_types_data.typtypes)),
353            Arc::new(StringArray::from(pg_types_data.typcategories)),
354            Arc::new(BooleanArray::from(pg_types_data.typispreferreds)),
355            Arc::new(BooleanArray::from(pg_types_data.typisdefineds)),
356            Arc::new(StringArray::from(pg_types_data.typdelims)),
357            Arc::new(Int32Array::from(pg_types_data.typrelids)),
358            Arc::new(Int32Array::from(pg_types_data.typelems)),
359            Arc::new(Int32Array::from(pg_types_data.typarrays)),
360            Arc::new(StringArray::from(pg_types_data.typinputs)),
361            Arc::new(StringArray::from(pg_types_data.typoutputs)),
362            Arc::new(StringArray::from(pg_types_data.typreceives)),
363            Arc::new(StringArray::from(pg_types_data.typsends)),
364            Arc::new(StringArray::from(pg_types_data.typmodins)),
365            Arc::new(StringArray::from(pg_types_data.typmodouts)),
366            Arc::new(StringArray::from(pg_types_data.typanalyzes)),
367            Arc::new(StringArray::from(pg_types_data.typaligns)),
368            Arc::new(StringArray::from(pg_types_data.typstorages)),
369            Arc::new(BooleanArray::from(pg_types_data.typnotnulls)),
370            Arc::new(Int32Array::from(pg_types_data.typbasetypes)),
371            Arc::new(Int32Array::from(pg_types_data.typtymods)),
372            Arc::new(Int32Array::from(pg_types_data.typndimss)),
373            Arc::new(Int32Array::from(pg_types_data.typcollations)),
374            Arc::new(StringArray::from_iter(
375                pg_types_data.typdefaultbins.into_iter(),
376            )),
377            Arc::new(StringArray::from_iter(
378                pg_types_data.typdefaults.into_iter(),
379            )),
380        ];
381
382        let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();
383
384        // Create memory table with populated data
385        let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
386
387        Arc::new(provider)
388    }
389
    /// Generate standard PostgreSQL data types for pg_type table
    ///
    /// Hard-codes one row per common built-in type. Argument order follows
    /// `PgTypesData::add_type`: oid, typname, typnamespace (11 =
    /// pg_catalog), typowner, typlen, typbyval, typtype, typcategory,
    /// typispreferred, typisdefined, typdelim, typrelid, typelem, typarray
    /// (OID of the corresponding array type), in/out/recv/send functions,
    /// typmodin/typmodout/typanalyze, typalign, typstorage, typnotnull,
    /// typbasetype, typtypmod, typndims, typcollation, defaults.
    fn get_standard_pg_types() -> PgTypesData {
        let mut data = PgTypesData::new();

        // Basic data types commonly used
        // oid 16: bool
        data.add_type(
            16, "bool", 11, 10, 1, true, "b", "B", true, true, ",", 0, 0, 1000, "boolin",
            "boolout", "boolrecv", "boolsend", "-", "-", "-", "c", "p", false, 0, -1, 0, 0, None,
            None,
        );
        // oid 17: bytea (variable length, extended storage)
        data.add_type(
            17,
            "bytea",
            11,
            10,
            -1,
            false,
            "b",
            "U",
            false,
            true,
            ",",
            0,
            0,
            1001,
            "byteain",
            "byteaout",
            "bytearecv",
            "byteasend",
            "-",
            "-",
            "-",
            "i",
            "x",
            false,
            0,
            -1,
            0,
            0,
            None,
            None,
        );
        // oid 18: "char" (single byte)
        data.add_type(
            18, "char", 11, 10, 1, true, "b", "S", false, true, ",", 0, 0, 1002, "charin",
            "charout", "charrecv", "charsend", "-", "-", "-", "c", "p", false, 0, -1, 0, 0, None,
            None,
        );
        // oid 19: name (fixed 64-byte identifier)
        data.add_type(
            19, "name", 11, 10, 64, false, "b", "S", false, true, ",", 0, 0, 1003, "namein",
            "nameout", "namerecv", "namesend", "-", "-", "-", "i", "p", false, 0, -1, 0, 0, None,
            None,
        );
        // oid 20: int8 / bigint
        data.add_type(
            20, "int8", 11, 10, 8, true, "b", "N", false, true, ",", 0, 0, 1016, "int8in",
            "int8out", "int8recv", "int8send", "-", "-", "-", "d", "p", false, 0, -1, 0, 0, None,
            None,
        );
        // oid 21: int2 / smallint
        data.add_type(
            21, "int2", 11, 10, 2, true, "b", "N", false, true, ",", 0, 0, 1005, "int2in",
            "int2out", "int2recv", "int2send", "-", "-", "-", "s", "p", false, 0, -1, 0, 0, None,
            None,
        );
        // oid 23: int4 / integer (preferred numeric)
        data.add_type(
            23, "int4", 11, 10, 4, true, "b", "N", true, true, ",", 0, 0, 1007, "int4in",
            "int4out", "int4recv", "int4send", "-", "-", "-", "i", "p", false, 0, -1, 0, 0, None,
            None,
        );
        // oid 25: text (preferred string; default collation 100)
        data.add_type(
            25, "text", 11, 10, -1, false, "b", "S", true, true, ",", 0, 0, 1009, "textin",
            "textout", "textrecv", "textsend", "-", "-", "-", "i", "x", false, 0, -1, 0, 100, None,
            None,
        );
        // oid 700: float4 / real
        data.add_type(
            700,
            "float4",
            11,
            10,
            4,
            true,
            "b",
            "N",
            false,
            true,
            ",",
            0,
            0,
            1021,
            "float4in",
            "float4out",
            "float4recv",
            "float4send",
            "-",
            "-",
            "-",
            "i",
            "p",
            false,
            0,
            -1,
            0,
            0,
            None,
            None,
        );
        // oid 701: float8 / double precision (preferred)
        data.add_type(
            701,
            "float8",
            11,
            10,
            8,
            true,
            "b",
            "N",
            true,
            true,
            ",",
            0,
            0,
            1022,
            "float8in",
            "float8out",
            "float8recv",
            "float8send",
            "-",
            "-",
            "-",
            "d",
            "p",
            false,
            0,
            -1,
            0,
            0,
            None,
            None,
        );
        // oid 1043: varchar (typmod-aware, default collation 100)
        data.add_type(
            1043,
            "varchar",
            11,
            10,
            -1,
            false,
            "b",
            "S",
            false,
            true,
            ",",
            0,
            0,
            1015,
            "varcharin",
            "varcharout",
            "varcharrecv",
            "varcharsend",
            "varchartypmodin",
            "varchartypmodout",
            "-",
            "i",
            "x",
            false,
            0,
            -1,
            0,
            100,
            None,
            None,
        );
        // oid 1082: date
        data.add_type(
            1082,
            "date",
            11,
            10,
            4,
            true,
            "b",
            "D",
            false,
            true,
            ",",
            0,
            0,
            1182,
            "date_in",
            "date_out",
            "date_recv",
            "date_send",
            "-",
            "-",
            "-",
            "i",
            "p",
            false,
            0,
            -1,
            0,
            0,
            None,
            None,
        );
        // oid 1083: time (typmod-aware)
        data.add_type(
            1083,
            "time",
            11,
            10,
            8,
            true,
            "b",
            "D",
            false,
            true,
            ",",
            0,
            0,
            1183,
            "time_in",
            "time_out",
            "time_recv",
            "time_send",
            "timetypmodin",
            "timetypmodout",
            "-",
            "d",
            "p",
            false,
            0,
            -1,
            0,
            0,
            None,
            None,
        );
        // oid 1114: timestamp (without time zone)
        data.add_type(
            1114,
            "timestamp",
            11,
            10,
            8,
            true,
            "b",
            "D",
            false,
            true,
            ",",
            0,
            0,
            1115,
            "timestamp_in",
            "timestamp_out",
            "timestamp_recv",
            "timestamp_send",
            "timestamptypmodin",
            "timestamptypmodout",
            "-",
            "d",
            "p",
            false,
            0,
            -1,
            0,
            0,
            None,
            None,
        );
        // oid 1184: timestamptz (preferred date/time type)
        data.add_type(
            1184,
            "timestamptz",
            11,
            10,
            8,
            true,
            "b",
            "D",
            true,
            true,
            ",",
            0,
            0,
            1185,
            "timestamptz_in",
            "timestamptz_out",
            "timestamptz_recv",
            "timestamptz_send",
            "timestamptztypmodin",
            "timestamptztypmodout",
            "-",
            "d",
            "p",
            false,
            0,
            -1,
            0,
            0,
            None,
            None,
        );
        // oid 1700: numeric (main storage, typmod-aware)
        data.add_type(
            1700,
            "numeric",
            11,
            10,
            -1,
            false,
            "b",
            "N",
            false,
            true,
            ",",
            0,
            0,
            1231,
            "numeric_in",
            "numeric_out",
            "numeric_recv",
            "numeric_send",
            "numerictypmodin",
            "numerictypmodout",
            "-",
            "i",
            "m",
            false,
            0,
            -1,
            0,
            0,
            None,
            None,
        );

        data
    }
721
722    /// Create a mock empty table for pg_am
723    fn create_pg_am_table(&self) -> Arc<dyn TableProvider> {
724        // Define the schema for pg_am
725        // This matches PostgreSQL's pg_am table columns
726        let schema = Arc::new(Schema::new(vec![
727            Field::new("oid", DataType::Int32, false), // Object identifier
728            Field::new("amname", DataType::Utf8, false), // Name of the access method
729            Field::new("amhandler", DataType::Int32, false), // OID of handler function
730            Field::new("amtype", DataType::Utf8, false), // Type of access method (i=index, t=table)
731            Field::new("amstrategies", DataType::Int32, false), // Number of operator strategies
732            Field::new("amsupport", DataType::Int32, false), // Number of support routines
733            Field::new("amcanorder", DataType::Boolean, false), // Does AM support ordered scans?
734            Field::new("amcanorderbyop", DataType::Boolean, false), // Does AM support order by operator result?
735            Field::new("amcanbackward", DataType::Boolean, false), // Does AM support backward scanning?
736            Field::new("amcanunique", DataType::Boolean, false), // Does AM support unique indexes?
737            Field::new("amcanmulticol", DataType::Boolean, false), // Does AM support multi-column indexes?
738            Field::new("amoptionalkey", DataType::Boolean, false), // Can first index column be omitted in search?
739            Field::new("amsearcharray", DataType::Boolean, false), // Does AM support ScalarArrayOpExpr searches?
740            Field::new("amsearchnulls", DataType::Boolean, false), // Does AM support searching for NULL/NOT NULL?
741            Field::new("amstorage", DataType::Boolean, false), // Can storage type differ from column type?
742            Field::new("amclusterable", DataType::Boolean, false), // Can index be clustered on?
743            Field::new("ampredlocks", DataType::Boolean, false), // Does AM manage fine-grained predicate locks?
744            Field::new("amcanparallel", DataType::Boolean, false), // Does AM support parallel scan?
745            Field::new("amcanbeginscan", DataType::Boolean, false), // Does AM support BRIN index scans?
746            Field::new("amcanmarkpos", DataType::Boolean, false), // Does AM support mark/restore positions?
747            Field::new("amcanfetch", DataType::Boolean, false), // Does AM support fetching specific tuples?
748            Field::new("amkeytype", DataType::Int32, false),    // Type of data in index
749        ]));
750
751        // Create memory table with schema
752        let provider = MemTable::try_new(schema, vec![vec![]]).unwrap();
753
754        Arc::new(provider)
755    }
756
757    /// Create a mock empty table for pg_range
758    fn create_pg_range_table(&self) -> Arc<dyn TableProvider> {
759        // Define the schema for pg_range
760        // This matches PostgreSQL's pg_range table columns
761        let schema = Arc::new(Schema::new(vec![
762            Field::new("rngtypid", DataType::Int32, false), // OID of the range type
763            Field::new("rngsubtype", DataType::Int32, false), // OID of the element type (subtype) of this range type
764            Field::new("rngmultitypid", DataType::Int32, false), // OID of the multirange type for this range type
765            Field::new("rngcollation", DataType::Int32, false), // OID of the collation used for range comparisons, or zero if none
766            Field::new("rngsubopc", DataType::Int32, false), // OID of the subtype's operator class used for range comparisons
767            Field::new("rngcanonical", DataType::Int32, false), // OID of the function to convert a range value into canonical form, or zero if none
768            Field::new("rngsubdiff", DataType::Int32, false), // OID of the function to return the difference between two element values as double precision, or zero if none
769        ]));
770
771        // Create memory table with schema
772        let provider = MemTable::try_new(schema, vec![vec![]]).unwrap();
773        Arc::new(provider)
774    }
775
776    /// Create a mock empty table for pg_enum
777    fn create_pg_enum_table(&self) -> Arc<dyn TableProvider> {
778        let schema = Arc::new(Schema::new(vec![
779            Field::new("oid", DataType::Int32, false), // Row identifier
780            Field::new("enumtypid", DataType::Int32, false), // The OID of the pg_type entry owning this enum value
781            Field::new("enumsortorder", DataType::Float32, false), // The sort position of this enum value within its enum type
782            Field::new("enumlabel", DataType::Utf8, false), // The textual label for this enum value
783        ]));
784        let provider = MemTable::try_new(schema, vec![vec![]]).unwrap();
785        Arc::new(provider)
786    }
787
788    /// Create a mock empty table for pg_description
789    fn create_pg_description_table(&self) -> Arc<dyn TableProvider> {
790        let schema = Arc::new(Schema::new(vec![
791            Field::new("objoid", DataType::Int32, false),   // Oid
792            Field::new("classoid", DataType::Int32, false), // Oid of the obj class
793            Field::new("objsubid", DataType::Int32, false), // subid
794            Field::new("description", DataType::Utf8, false),
795        ]));
796        let provider = MemTable::try_new(schema, vec![vec![]]).unwrap();
797        Arc::new(provider)
798    }
799
800    /// Create a populated pg_proc table with standard PostgreSQL functions
801    fn create_pg_proc_table(&self) -> Arc<dyn TableProvider> {
802        // Define complete schema for pg_proc (matching PostgreSQL)
803        let schema = Arc::new(Schema::new(vec![
804            Field::new("oid", DataType::Int32, false), // Object identifier
805            Field::new("proname", DataType::Utf8, false), // Function name
806            Field::new("pronamespace", DataType::Int32, false), // OID of namespace containing function
807            Field::new("proowner", DataType::Int32, false),     // Owner of the function
808            Field::new("prolang", DataType::Int32, false),      // Implementation language
809            Field::new("procost", DataType::Float32, false),    // Estimated execution cost
810            Field::new("prorows", DataType::Float32, false), // Estimated result size for set-returning functions
811            Field::new("provariadic", DataType::Int32, false), // Element type of variadic array
812            Field::new("prosupport", DataType::Int32, false), // Support function OID
813            Field::new("prokind", DataType::Utf8, false), // f=function, p=procedure, a=aggregate, w=window
814            Field::new("prosecdef", DataType::Boolean, false), // Security definer flag
815            Field::new("proleakproof", DataType::Boolean, false), // Leak-proof flag
816            Field::new("proisstrict", DataType::Boolean, false), // Returns null if any argument is null
817            Field::new("proretset", DataType::Boolean, false),   // Returns a set (vs scalar)
818            Field::new("provolatile", DataType::Utf8, false), // i=immutable, s=stable, v=volatile
819            Field::new("proparallel", DataType::Utf8, false), // s=safe, r=restricted, u=unsafe
820            Field::new("pronargs", DataType::Int16, false),   // Number of input arguments
821            Field::new("pronargdefaults", DataType::Int16, false), // Number of arguments with defaults
822            Field::new("prorettype", DataType::Int32, false),      // OID of return type
823            Field::new("proargtypes", DataType::Utf8, false),      // Array of argument type OIDs
824            Field::new("proallargtypes", DataType::Utf8, true), // Array of all argument type OIDs
825            Field::new("proargmodes", DataType::Utf8, true),    // Array of argument modes
826            Field::new("proargnames", DataType::Utf8, true),    // Array of argument names
827            Field::new("proargdefaults", DataType::Utf8, true), // Expression for argument defaults
828            Field::new("protrftypes", DataType::Utf8, true),    // Transform types
829            Field::new("prosrc", DataType::Utf8, false),        // Function source code
830            Field::new("probin", DataType::Utf8, true),         // Binary file containing function
831            Field::new("prosqlbody", DataType::Utf8, true),     // SQL function body
832            Field::new("proconfig", DataType::Utf8, true),      // Configuration variables
833            Field::new("proacl", DataType::Utf8, true),         // Access privileges
834        ]));
835
836        // Create standard PostgreSQL functions
837        let pg_proc_data = Self::get_standard_pg_functions();
838
839        // Create RecordBatch from the data
840        let arrays: Vec<ArrayRef> = vec![
841            Arc::new(Int32Array::from(pg_proc_data.oids)),
842            Arc::new(StringArray::from(pg_proc_data.pronames)),
843            Arc::new(Int32Array::from(pg_proc_data.pronamespaces)),
844            Arc::new(Int32Array::from(pg_proc_data.proowners)),
845            Arc::new(Int32Array::from(pg_proc_data.prolangs)),
846            Arc::new(Float32Array::from(pg_proc_data.procosts)),
847            Arc::new(Float32Array::from(pg_proc_data.prorows)),
848            Arc::new(Int32Array::from(pg_proc_data.provariadics)),
849            Arc::new(Int32Array::from(pg_proc_data.prosupports)),
850            Arc::new(StringArray::from(pg_proc_data.prokinds)),
851            Arc::new(BooleanArray::from(pg_proc_data.prosecdefs)),
852            Arc::new(BooleanArray::from(pg_proc_data.proleakproofs)),
853            Arc::new(BooleanArray::from(pg_proc_data.proisstricts)),
854            Arc::new(BooleanArray::from(pg_proc_data.proretsets)),
855            Arc::new(StringArray::from(pg_proc_data.provolatiles)),
856            Arc::new(StringArray::from(pg_proc_data.proparallels)),
857            Arc::new(Int16Array::from(pg_proc_data.pronargs)),
858            Arc::new(Int16Array::from(pg_proc_data.pronargdefaults)),
859            Arc::new(Int32Array::from(pg_proc_data.prorettypes)),
860            Arc::new(StringArray::from(pg_proc_data.proargtypes)),
861            Arc::new(StringArray::from_iter(
862                pg_proc_data.proallargtypes.into_iter(),
863            )),
864            Arc::new(StringArray::from_iter(pg_proc_data.proargmodes.into_iter())),
865            Arc::new(StringArray::from_iter(pg_proc_data.proargnames.into_iter())),
866            Arc::new(StringArray::from_iter(
867                pg_proc_data.proargdefaults.into_iter(),
868            )),
869            Arc::new(StringArray::from_iter(pg_proc_data.protrftypes.into_iter())),
870            Arc::new(StringArray::from(pg_proc_data.prosrcs)),
871            Arc::new(StringArray::from_iter(pg_proc_data.probins.into_iter())),
872            Arc::new(StringArray::from_iter(pg_proc_data.prosqlbodys.into_iter())),
873            Arc::new(StringArray::from_iter(pg_proc_data.proconfigs.into_iter())),
874            Arc::new(StringArray::from_iter(pg_proc_data.proacls.into_iter())),
875        ];
876
877        let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();
878
879        // Create memory table with populated data
880        let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
881
882        Arc::new(provider)
883    }
884
885    /// Generate standard PostgreSQL functions for pg_proc table
886    fn get_standard_pg_functions() -> PgProcData {
887        let mut data = PgProcData::new();
888
889        // Essential PostgreSQL functions that many tools expect
890        data.add_function(
891            1242, "boolin", 11, 10, 12, 1.0, 0.0, 0, 0, "f", false, true, true, false, "i", "s", 1,
892            0, 16, "2275", None, None, None, None, None, "boolin", None, None, None, None,
893        );
894        data.add_function(
895            1243, "boolout", 11, 10, 12, 1.0, 0.0, 0, 0, "f", false, true, true, false, "i", "s",
896            1, 0, 2275, "16", None, None, None, None, None, "boolout", None, None, None, None,
897        );
898        data.add_function(
899            1564, "textin", 11, 10, 12, 1.0, 0.0, 0, 0, "f", false, true, true, false, "i", "s", 1,
900            0, 25, "2275", None, None, None, None, None, "textin", None, None, None, None,
901        );
902        data.add_function(
903            1565, "textout", 11, 10, 12, 1.0, 0.0, 0, 0, "f", false, true, true, false, "i", "s",
904            1, 0, 2275, "25", None, None, None, None, None, "textout", None, None, None, None,
905        );
906        data.add_function(
907            1242,
908            "version",
909            11,
910            10,
911            12,
912            1.0,
913            0.0,
914            0,
915            0,
916            "f",
917            false,
918            true,
919            false,
920            false,
921            "s",
922            "s",
923            0,
924            0,
925            25,
926            "",
927            None,
928            None,
929            None,
930            None,
931            None,
932            "SELECT 'DataFusion PostgreSQL 48.0.0 on x86_64-pc-linux-gnu'",
933            None,
934            None,
935            None,
936            None,
937        );
938
939        data
940    }
941}
942
/// Columnar builder for `pg_proc` rows: one `Vec` per catalog column, all
/// kept in lock-step by [`PgProcData::add_function`]. `Option` vectors map to
/// nullable columns.
#[derive(Debug)]
struct PgProcData {
    oids: Vec<i32>,                        // Object identifiers
    pronames: Vec<String>,                 // Function names
    pronamespaces: Vec<i32>,               // Namespace OIDs
    proowners: Vec<i32>,                   // Owner OIDs
    prolangs: Vec<i32>,                    // Implementation language OIDs
    procosts: Vec<f32>,                    // Estimated execution costs
    prorows: Vec<f32>,                     // Estimated result sizes (set-returning)
    provariadics: Vec<i32>,                // Variadic element type OIDs
    prosupports: Vec<i32>,                 // Support function OIDs
    prokinds: Vec<String>,                 // f=function, p=procedure, a=aggregate, w=window
    prosecdefs: Vec<bool>,                 // Security-definer flags
    proleakproofs: Vec<bool>,              // Leak-proof flags
    proisstricts: Vec<bool>,               // Strict (null-in, null-out) flags
    proretsets: Vec<bool>,                 // Returns-a-set flags
    provolatiles: Vec<String>,             // i=immutable, s=stable, v=volatile
    proparallels: Vec<String>,             // s=safe, r=restricted, u=unsafe
    pronargs: Vec<i16>,                    // Input argument counts
    pronargdefaults: Vec<i16>,             // Defaulted argument counts
    prorettypes: Vec<i32>,                 // Return type OIDs
    proargtypes: Vec<String>,              // Argument type OID lists
    proallargtypes: Vec<Option<String>>,   // All argument type OID lists (nullable)
    proargmodes: Vec<Option<String>>,      // Argument modes (nullable)
    proargnames: Vec<Option<String>>,      // Argument names (nullable)
    proargdefaults: Vec<Option<String>>,   // Argument default expressions (nullable)
    protrftypes: Vec<Option<String>>,      // Transform types (nullable)
    prosrcs: Vec<String>,                  // Function source code
    probins: Vec<Option<String>>,          // Binary file names (nullable)
    prosqlbodys: Vec<Option<String>>,      // SQL function bodies (nullable)
    proconfigs: Vec<Option<String>>,       // Configuration variables (nullable)
    proacls: Vec<Option<String>>,          // Access privileges (nullable)
}
977
978impl PgProcData {
979    fn new() -> Self {
980        Self {
981            oids: Vec::new(),
982            pronames: Vec::new(),
983            pronamespaces: Vec::new(),
984            proowners: Vec::new(),
985            prolangs: Vec::new(),
986            procosts: Vec::new(),
987            prorows: Vec::new(),
988            provariadics: Vec::new(),
989            prosupports: Vec::new(),
990            prokinds: Vec::new(),
991            prosecdefs: Vec::new(),
992            proleakproofs: Vec::new(),
993            proisstricts: Vec::new(),
994            proretsets: Vec::new(),
995            provolatiles: Vec::new(),
996            proparallels: Vec::new(),
997            pronargs: Vec::new(),
998            pronargdefaults: Vec::new(),
999            prorettypes: Vec::new(),
1000            proargtypes: Vec::new(),
1001            proallargtypes: Vec::new(),
1002            proargmodes: Vec::new(),
1003            proargnames: Vec::new(),
1004            proargdefaults: Vec::new(),
1005            protrftypes: Vec::new(),
1006            prosrcs: Vec::new(),
1007            probins: Vec::new(),
1008            prosqlbodys: Vec::new(),
1009            proconfigs: Vec::new(),
1010            proacls: Vec::new(),
1011        }
1012    }
1013
1014    #[allow(clippy::too_many_arguments)]
1015    fn add_function(
1016        &mut self,
1017        oid: i32,
1018        proname: &str,
1019        pronamespace: i32,
1020        proowner: i32,
1021        prolang: i32,
1022        procost: f32,
1023        prorows: f32,
1024        provariadic: i32,
1025        prosupport: i32,
1026        prokind: &str,
1027        prosecdef: bool,
1028        proleakproof: bool,
1029        proisstrict: bool,
1030        proretset: bool,
1031        provolatile: &str,
1032        proparallel: &str,
1033        pronargs: i16,
1034        pronargdefaults: i16,
1035        prorettype: i32,
1036        proargtypes: &str,
1037        proallargtypes: Option<String>,
1038        proargmodes: Option<String>,
1039        proargnames: Option<String>,
1040        proargdefaults: Option<String>,
1041        protrftypes: Option<String>,
1042        prosrc: &str,
1043        probin: Option<String>,
1044        prosqlbody: Option<String>,
1045        proconfig: Option<String>,
1046        proacl: Option<String>,
1047    ) {
1048        self.oids.push(oid);
1049        self.pronames.push(proname.to_string());
1050        self.pronamespaces.push(pronamespace);
1051        self.proowners.push(proowner);
1052        self.prolangs.push(prolang);
1053        self.procosts.push(procost);
1054        self.prorows.push(prorows);
1055        self.provariadics.push(provariadic);
1056        self.prosupports.push(prosupport);
1057        self.prokinds.push(prokind.to_string());
1058        self.prosecdefs.push(prosecdef);
1059        self.proleakproofs.push(proleakproof);
1060        self.proisstricts.push(proisstrict);
1061        self.proretsets.push(proretset);
1062        self.provolatiles.push(provolatile.to_string());
1063        self.proparallels.push(proparallel.to_string());
1064        self.pronargs.push(pronargs);
1065        self.pronargdefaults.push(pronargdefaults);
1066        self.prorettypes.push(prorettype);
1067        self.proargtypes.push(proargtypes.to_string());
1068        self.proallargtypes.push(proallargtypes);
1069        self.proargmodes.push(proargmodes);
1070        self.proargnames.push(proargnames);
1071        self.proargdefaults.push(proargdefaults);
1072        self.protrftypes.push(protrftypes);
1073        self.prosrcs.push(prosrc.to_string());
1074        self.probins.push(probin);
1075        self.prosqlbodys.push(prosqlbody);
1076        self.proconfigs.push(proconfig);
1077        self.proacls.push(proacl);
1078    }
1079}
1080
/// Streaming `pg_class` table: rows are derived on demand from the live
/// DataFusion catalog list, so newly registered tables show up without any
/// explicit refresh.
#[derive(Debug, Clone)]
struct PgClassTable {
    schema: SchemaRef,                                  // Arrow schema of pg_class
    catalog_list: Arc<dyn CatalogProviderList>,         // Source of catalogs/schemas/tables
    oid_counter: Arc<AtomicU32>,                        // Shared allocator for new OIDs
    oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,  // Keeps OIDs stable across queries
}
1088
impl PgClassTable {
    /// Build a `pg_class` table over the given catalog list.
    ///
    /// `oid_counter` hands out fresh OIDs; `oid_cache` maps catalog/schema/
    /// table keys to previously assigned OIDs so identifiers stay stable
    /// across repeated queries. Both are shared with the other pg_catalog
    /// tables.
    fn new(
        catalog_list: Arc<dyn CatalogProviderList>,
        oid_counter: Arc<AtomicU32>,
        oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
    ) -> PgClassTable {
        // Define the schema for pg_class
        // This matches key columns from PostgreSQL's pg_class
        let schema = Arc::new(Schema::new(vec![
            Field::new("oid", DataType::Int32, false), // Object identifier
            Field::new("relname", DataType::Utf8, false), // Name of the table, index, view, etc.
            Field::new("relnamespace", DataType::Int32, false), // OID of the namespace that contains this relation
            Field::new("reltype", DataType::Int32, false), // OID of the data type (composite type) this table describes
            Field::new("reloftype", DataType::Int32, true), // OID of the composite type for typed table, 0 otherwise
            Field::new("relowner", DataType::Int32, false), // Owner of the relation
            Field::new("relam", DataType::Int32, false), // If this is an index, the access method used
            Field::new("relfilenode", DataType::Int32, false), // Name of the on-disk file of this relation
            Field::new("reltablespace", DataType::Int32, false), // Tablespace OID for this relation
            Field::new("relpages", DataType::Int32, false), // Size of the on-disk representation in pages
            Field::new("reltuples", DataType::Float64, false), // Number of tuples
            Field::new("relallvisible", DataType::Int32, false), // Number of all-visible pages
            Field::new("reltoastrelid", DataType::Int32, false), // OID of the TOAST table
            Field::new("relhasindex", DataType::Boolean, false), // True if this is a table and it has (or recently had) any indexes
            Field::new("relisshared", DataType::Boolean, false), // True if this table is shared across all databases
            Field::new("relpersistence", DataType::Utf8, false), // p=permanent table, u=unlogged table, t=temporary table
            Field::new("relkind", DataType::Utf8, false), // r=ordinary table, i=index, S=sequence, v=view, etc.
            Field::new("relnatts", DataType::Int16, false), // Number of user columns
            Field::new("relchecks", DataType::Int16, false), // Number of CHECK constraints
            Field::new("relhasrules", DataType::Boolean, false), // True if table has (or once had) rules
            Field::new("relhastriggers", DataType::Boolean, false), // True if table has (or once had) triggers
            Field::new("relhassubclass", DataType::Boolean, false), // True if table or index has (or once had) any inheritance children
            Field::new("relrowsecurity", DataType::Boolean, false), // True if row security is enabled
            Field::new("relforcerowsecurity", DataType::Boolean, false), // True if row security forced for owners
            Field::new("relispopulated", DataType::Boolean, false), // True if relation is populated (not true for some materialized views)
            Field::new("relreplident", DataType::Utf8, false), // Columns used to form "replica identity" for rows
            Field::new("relispartition", DataType::Boolean, false), // True if table is a partition
            Field::new("relrewrite", DataType::Int32, true), // OID of a rule that rewrites this relation
            Field::new("relfrozenxid", DataType::Int32, false), // All transaction IDs before this have been replaced with a permanent ("frozen") transaction ID
            Field::new("relminmxid", DataType::Int32, false), // All Multixact IDs before this have been replaced with a transaction ID
        ]));

        Self {
            schema,
            catalog_list,
            oid_counter,
            oid_cache,
        }
    }

    /// Generate record batches based on the current state of the catalog.
    ///
    /// Takes `this` by value (instead of `&self`) so the resulting future is
    /// `'static` and can be boxed into a record-batch stream by `execute`.
    async fn get_data(this: PgClassTable) -> Result<RecordBatch> {
        // Vectors to store column data; one Vec per pg_class column, kept in
        // lock-step as rows are appended.
        let mut oids = Vec::new();
        let mut relnames = Vec::new();
        let mut relnamespaces = Vec::new();
        let mut reltypes = Vec::new();
        let mut reloftypes = Vec::new();
        let mut relowners = Vec::new();
        let mut relams = Vec::new();
        let mut relfilenodes = Vec::new();
        let mut reltablespaces = Vec::new();
        let mut relpages = Vec::new();
        let mut reltuples = Vec::new();
        let mut relallvisibles = Vec::new();
        let mut reltoastrelids = Vec::new();
        let mut relhasindexes = Vec::new();
        let mut relisshareds = Vec::new();
        let mut relpersistences = Vec::new();
        let mut relkinds = Vec::new();
        let mut relnattses = Vec::new();
        let mut relcheckses = Vec::new();
        let mut relhasruleses = Vec::new();
        let mut relhastriggersses = Vec::new();
        let mut relhassubclasses = Vec::new();
        let mut relrowsecurities = Vec::new();
        let mut relforcerowsecurities = Vec::new();
        let mut relispopulateds = Vec::new();
        let mut relreplidents = Vec::new();
        let mut relispartitions = Vec::new();
        let mut relrewrites = Vec::new();
        let mut relfrozenxids = Vec::new();
        let mut relminmxids = Vec::new();

        // Hold the write lock for the whole scan so OID assignment and the
        // cache swap below are atomic with respect to other catalog tables.
        let mut oid_cache = this.oid_cache.write().await;
        // Every time when call pg_catalog we generate a new cache and drop the
        // original one in case that schemas or tables were dropped.
        let mut swap_cache = HashMap::new();

        // Iterate through all catalogs and schemas
        for catalog_name in this.catalog_list.catalog_names() {
            let cache_key = OidCacheKey::Catalog(catalog_name.clone());
            // Reuse the cached OID if present; otherwise allocate a new one.
            // (fetch_add returns the previous counter value.)
            let catalog_oid = if let Some(oid) = oid_cache.get(&cache_key) {
                *oid
            } else {
                this.oid_counter.fetch_add(1, Ordering::Relaxed)
            };
            swap_cache.insert(cache_key, catalog_oid);

            if let Some(catalog) = this.catalog_list.catalog(&catalog_name) {
                for schema_name in catalog.schema_names() {
                    if let Some(schema) = catalog.schema(&schema_name) {
                        let cache_key =
                            OidCacheKey::Schema(catalog_name.clone(), schema_name.clone());
                        let schema_oid = if let Some(oid) = oid_cache.get(&cache_key) {
                            *oid
                        } else {
                            this.oid_counter.fetch_add(1, Ordering::Relaxed)
                        };
                        swap_cache.insert(cache_key, schema_oid);

                        // Add an entry for the schema itself (as a namespace)
                        // (In a full implementation, this would go in pg_namespace)

                        // Now process all tables in this schema
                        for table_name in schema.table_names() {
                            let cache_key = OidCacheKey::Table(
                                catalog_name.clone(),
                                schema_name.clone(),
                                table_name.clone(),
                            );
                            let table_oid = if let Some(oid) = oid_cache.get(&cache_key) {
                                *oid
                            } else {
                                this.oid_counter.fetch_add(1, Ordering::Relaxed)
                            };
                            swap_cache.insert(cache_key, table_oid);

                            if let Some(table) = schema.table(&table_name).await? {
                                // Determine the correct table type based on the table provider and context
                                let table_type =
                                    get_table_type_with_name(&table, &table_name, &schema_name);

                                // Get column count from schema
                                let column_count = table.schema().fields().len() as i16;

                                // Add table entry. Most physical/statistics
                                // columns are filled with fixed placeholder
                                // values since DataFusion does not track them.
                                oids.push(table_oid as i32);
                                relnames.push(table_name.clone());
                                relnamespaces.push(schema_oid as i32);
                                reltypes.push(0); // Simplified: we're not tracking data types
                                reloftypes.push(None);
                                relowners.push(0); // Simplified: no owner tracking
                                relams.push(0); // Default access method
                                relfilenodes.push(table_oid as i32); // Use OID as filenode
                                reltablespaces.push(0); // Default tablespace
                                relpages.push(1); // Default page count
                                reltuples.push(0.0); // No row count stats
                                relallvisibles.push(0);
                                reltoastrelids.push(0);
                                relhasindexes.push(false);
                                relisshareds.push(false);
                                relpersistences.push("p".to_string()); // Permanent
                                relkinds.push(table_type.to_string());
                                relnattses.push(column_count);
                                relcheckses.push(0);
                                relhasruleses.push(false);
                                relhastriggersses.push(false);
                                relhassubclasses.push(false);
                                relrowsecurities.push(false);
                                relforcerowsecurities.push(false);
                                relispopulateds.push(true);
                                relreplidents.push("d".to_string()); // Default
                                relispartitions.push(false);
                                relrewrites.push(None);
                                relfrozenxids.push(0);
                                relminmxids.push(0);
                            }
                        }
                    }
                }
            }
        }

        // Replace the shared cache wholesale: entries for objects that no
        // longer exist are dropped, surviving ones keep their OIDs.
        *oid_cache = swap_cache;

        // Create Arrow arrays from the collected data.
        // NOTE: the array order must match the schema field order exactly.
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::from(oids)),
            Arc::new(StringArray::from(relnames)),
            Arc::new(Int32Array::from(relnamespaces)),
            Arc::new(Int32Array::from(reltypes)),
            Arc::new(Int32Array::from_iter(reloftypes.into_iter())),
            Arc::new(Int32Array::from(relowners)),
            Arc::new(Int32Array::from(relams)),
            Arc::new(Int32Array::from(relfilenodes)),
            Arc::new(Int32Array::from(reltablespaces)),
            Arc::new(Int32Array::from(relpages)),
            Arc::new(Float64Array::from_iter(reltuples.into_iter())),
            Arc::new(Int32Array::from(relallvisibles)),
            Arc::new(Int32Array::from(reltoastrelids)),
            Arc::new(BooleanArray::from(relhasindexes)),
            Arc::new(BooleanArray::from(relisshareds)),
            Arc::new(StringArray::from(relpersistences)),
            Arc::new(StringArray::from(relkinds)),
            Arc::new(Int16Array::from(relnattses)),
            Arc::new(Int16Array::from(relcheckses)),
            Arc::new(BooleanArray::from(relhasruleses)),
            Arc::new(BooleanArray::from(relhastriggersses)),
            Arc::new(BooleanArray::from(relhassubclasses)),
            Arc::new(BooleanArray::from(relrowsecurities)),
            Arc::new(BooleanArray::from(relforcerowsecurities)),
            Arc::new(BooleanArray::from(relispopulateds)),
            Arc::new(StringArray::from(relreplidents)),
            Arc::new(BooleanArray::from(relispartitions)),
            Arc::new(Int32Array::from_iter(relrewrites.into_iter())),
            Arc::new(Int32Array::from(relfrozenxids)),
            Arc::new(Int32Array::from(relminmxids)),
        ];

        // Create a record batch
        let batch = RecordBatch::try_new(this.schema.clone(), arrays)?;

        Ok(batch)
    }
}
1304
1305impl PartitionStream for PgClassTable {
1306    fn schema(&self) -> &SchemaRef {
1307        &self.schema
1308    }
1309
1310    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1311        let this = self.clone();
1312        Box::pin(RecordBatchStreamAdapter::new(
1313            this.schema.clone(),
1314            futures::stream::once(async move { PgClassTable::get_data(this).await }),
1315        ))
1316    }
1317}
1318
/// Streaming `pg_namespace` table: one row per schema found in the live
/// DataFusion catalog list.
#[derive(Debug, Clone)]
struct PgNamespaceTable {
    schema: SchemaRef,                                  // Arrow schema of pg_namespace
    catalog_list: Arc<dyn CatalogProviderList>,         // Source of catalogs/schemas
    oid_counter: Arc<AtomicU32>,                        // Shared allocator for new OIDs
    oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,  // Keeps OIDs stable across queries
}
1326
impl PgNamespaceTable {
    /// Build a `pg_namespace` table over the given catalog list, sharing the
    /// OID counter and cache with the other pg_catalog tables.
    pub fn new(
        catalog_list: Arc<dyn CatalogProviderList>,
        oid_counter: Arc<AtomicU32>,
        oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
    ) -> Self {
        // Define the schema for pg_namespace
        // This matches the columns from PostgreSQL's pg_namespace
        let schema = Arc::new(Schema::new(vec![
            Field::new("oid", DataType::Int32, false), // Object identifier
            Field::new("nspname", DataType::Utf8, false), // Name of the namespace (schema)
            Field::new("nspowner", DataType::Int32, false), // Owner of the namespace
            Field::new("nspacl", DataType::Utf8, true), // Access privileges
            Field::new("options", DataType::Utf8, true), // Schema-level options
        ]));

        Self {
            schema,
            catalog_list,
            oid_counter,
            oid_cache,
        }
    }

    /// Generate record batches based on the current state of the catalog.
    ///
    /// Takes `this` by value so the resulting future is `'static` and can be
    /// boxed into a record-batch stream by `execute`.
    async fn get_data(this: PgNamespaceTable) -> Result<RecordBatch> {
        // Vectors to store column data; one Vec per pg_namespace column.
        let mut oids = Vec::new();
        let mut nspnames = Vec::new();
        let mut nspowners = Vec::new();
        let mut nspacls: Vec<Option<String>> = Vec::new();
        let mut options: Vec<Option<String>> = Vec::new();

        // to store all schema-oid mapping temporarily before adding to global oid cache
        let mut schema_oid_cache = HashMap::new();

        // Hold the write lock across the scan so OID assignment and the
        // retain/extend below are atomic with respect to other tables.
        let mut oid_cache = this.oid_cache.write().await;

        // Now add all schemas from DataFusion catalogs
        for catalog_name in this.catalog_list.catalog_names() {
            if let Some(catalog) = this.catalog_list.catalog(&catalog_name) {
                for schema_name in catalog.schema_names() {
                    let cache_key = OidCacheKey::Schema(catalog_name.clone(), schema_name.clone());
                    // Reuse the cached OID if present; otherwise allocate a
                    // new one (fetch_add returns the previous counter value).
                    let schema_oid = if let Some(oid) = oid_cache.get(&cache_key) {
                        *oid
                    } else {
                        this.oid_counter.fetch_add(1, Ordering::Relaxed)
                    };
                    schema_oid_cache.insert(cache_key, schema_oid);

                    oids.push(schema_oid as i32);
                    nspnames.push(schema_name.clone());
                    nspowners.push(10); // Default owner
                    nspacls.push(None);
                    options.push(None);
                }
            }
        }

        // Drop every cached schema entry (fresh ones are re-added below) and
        // any table entry whose schema no longer exists; catalog entries are
        // always kept.
        oid_cache.retain(|key, _| match key {
            OidCacheKey::Catalog(..) => true,
            OidCacheKey::Schema(..) => false,
            OidCacheKey::Table(catalog, schema_name, _) => schema_oid_cache
                .contains_key(&OidCacheKey::Schema(catalog.clone(), schema_name.clone())),
        });
        // Re-insert the schema entries observed in this scan.
        oid_cache.extend(schema_oid_cache);

        // Create Arrow arrays from the collected data.
        // NOTE: the array order must match the schema field order exactly.
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::from(oids)),
            Arc::new(StringArray::from(nspnames)),
            Arc::new(Int32Array::from(nspowners)),
            Arc::new(StringArray::from_iter(nspacls.into_iter())),
            Arc::new(StringArray::from_iter(options.into_iter())),
        ];

        // Create a full record batch
        let batch = RecordBatch::try_new(this.schema.clone(), arrays)?;

        Ok(batch)
    }
}
1411
1412impl PartitionStream for PgNamespaceTable {
1413    fn schema(&self) -> &SchemaRef {
1414        &self.schema
1415    }
1416
1417    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1418        let this = self.clone();
1419        Box::pin(RecordBatchStreamAdapter::new(
1420            this.schema.clone(),
1421            futures::stream::once(async move { Self::get_data(this).await }),
1422        ))
1423    }
1424}
1425
/// Streaming `pg_database` table: each DataFusion catalog is presented as a
/// PostgreSQL "database".
#[derive(Debug, Clone)]
struct PgDatabaseTable {
    schema: SchemaRef,                                  // Arrow schema of pg_database
    catalog_list: Arc<dyn CatalogProviderList>,         // Source of catalogs
    oid_counter: Arc<AtomicU32>,                        // Shared allocator for new OIDs
    oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,  // Keeps OIDs stable across queries
}
1433
1434impl PgDatabaseTable {
1435    pub fn new(
1436        catalog_list: Arc<dyn CatalogProviderList>,
1437        oid_counter: Arc<AtomicU32>,
1438        oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
1439    ) -> Self {
1440        // Define the schema for pg_database
1441        // This matches PostgreSQL's pg_database table columns
1442        let schema = Arc::new(Schema::new(vec![
1443            Field::new("oid", DataType::Int32, false), // Object identifier
1444            Field::new("datname", DataType::Utf8, false), // Database name
1445            Field::new("datdba", DataType::Int32, false), // Database owner's user ID
1446            Field::new("encoding", DataType::Int32, false), // Character encoding
1447            Field::new("datcollate", DataType::Utf8, false), // LC_COLLATE for this database
1448            Field::new("datctype", DataType::Utf8, false), // LC_CTYPE for this database
1449            Field::new("datistemplate", DataType::Boolean, false), // If true, database can be used as a template
1450            Field::new("datallowconn", DataType::Boolean, false), // If false, no one can connect to this database
1451            Field::new("datconnlimit", DataType::Int32, false), // Max number of concurrent connections (-1=no limit)
1452            Field::new("datlastsysoid", DataType::Int32, false), // Last system OID in database
1453            Field::new("datfrozenxid", DataType::Int32, false), // Frozen XID for this database
1454            Field::new("datminmxid", DataType::Int32, false),   // Minimum multixact ID
1455            Field::new("dattablespace", DataType::Int32, false), // Default tablespace for this database
1456            Field::new("datacl", DataType::Utf8, true),          // Access privileges
1457        ]));
1458
1459        Self {
1460            schema,
1461            catalog_list,
1462            oid_counter,
1463            oid_cache,
1464        }
1465    }
1466
    /// Generate record batches based on the current state of the catalog
    ///
    /// Each DataFusion catalog is surfaced as one row of `pg_database`.
    /// Catalog OIDs are resolved through the shared `oid_cache` so repeated
    /// scans observe stable OIDs; catalogs not yet in the cache draw a fresh
    /// OID from `oid_counter`. Takes `this` by value because the caller moves
    /// a clone of the table into an async stream (see `PartitionStream`).
    async fn get_data(this: PgDatabaseTable) -> Result<RecordBatch> {
        // Vectors to store column data
        let mut oids = Vec::new();
        let mut datnames = Vec::new();
        let mut datdbas = Vec::new();
        let mut encodings = Vec::new();
        let mut datcollates = Vec::new();
        let mut datctypes = Vec::new();
        let mut datistemplates = Vec::new();
        let mut datallowconns = Vec::new();
        let mut datconnlimits = Vec::new();
        let mut datlastsysoids = Vec::new();
        let mut datfrozenxids = Vec::new();
        let mut datminmxids = Vec::new();
        let mut dattablespaces = Vec::new();
        let mut datacles: Vec<Option<String>> = Vec::new();

        // to store all schema-oid mapping temporarily before adding to global oid cache
        let mut catalog_oid_cache = HashMap::new();

        // Hold the write lock for the whole scan so OID resolution and the
        // cache rebuild at the end are atomic w.r.t. other lock holders.
        let mut oid_cache = this.oid_cache.write().await;

        // Add a record for each catalog (treating catalogs as "databases")
        for catalog_name in this.catalog_list.catalog_names() {
            let cache_key = OidCacheKey::Catalog(catalog_name.clone());
            // Reuse the cached OID if present; otherwise allocate a new one.
            let catalog_oid = if let Some(oid) = oid_cache.get(&cache_key) {
                *oid
            } else {
                this.oid_counter.fetch_add(1, Ordering::Relaxed)
            };
            catalog_oid_cache.insert(cache_key, catalog_oid);

            oids.push(catalog_oid as i32);
            datnames.push(catalog_name.clone());
            datdbas.push(10); // Default owner (assuming 10 = postgres user)
            encodings.push(6); // 6 = UTF8 in PostgreSQL
            datcollates.push("en_US.UTF-8".to_string()); // Default collation
            datctypes.push("en_US.UTF-8".to_string()); // Default ctype
            datistemplates.push(false);
            datallowconns.push(true);
            datconnlimits.push(-1); // No connection limit
            datlastsysoids.push(100000); // Arbitrary last system OID
            datfrozenxids.push(1); // Simplified transaction ID
            datminmxids.push(1); // Simplified multixact ID
            dattablespaces.push(1663); // Default tablespace (1663 = pg_default in PostgreSQL)
            datacles.push(None); // No specific ACLs
        }

        // Always include a "postgres" database entry if not already present
        // (This is for compatibility with tools that expect it)
        let default_datname = "postgres".to_string();
        if !datnames.contains(&default_datname) {
            let cache_key = OidCacheKey::Catalog(default_datname.clone());
            let catalog_oid = if let Some(oid) = oid_cache.get(&cache_key) {
                *oid
            } else {
                this.oid_counter.fetch_add(1, Ordering::Relaxed)
            };
            catalog_oid_cache.insert(cache_key, catalog_oid);

            oids.push(catalog_oid as i32);
            datnames.push(default_datname);
            datdbas.push(10);
            encodings.push(6);
            datcollates.push("en_US.UTF-8".to_string());
            datctypes.push("en_US.UTF-8".to_string());
            datistemplates.push(false);
            datallowconns.push(true);
            datconnlimits.push(-1);
            datlastsysoids.push(100000);
            datfrozenxids.push(1);
            datminmxids.push(1);
            dattablespaces.push(1663);
            datacles.push(None);
        }

        // Create Arrow arrays from the collected data
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::from(oids)),
            Arc::new(StringArray::from(datnames)),
            Arc::new(Int32Array::from(datdbas)),
            Arc::new(Int32Array::from(encodings)),
            Arc::new(StringArray::from(datcollates)),
            Arc::new(StringArray::from(datctypes)),
            Arc::new(BooleanArray::from(datistemplates)),
            Arc::new(BooleanArray::from(datallowconns)),
            Arc::new(Int32Array::from(datconnlimits)),
            Arc::new(Int32Array::from(datlastsysoids)),
            Arc::new(Int32Array::from(datfrozenxids)),
            Arc::new(Int32Array::from(datminmxids)),
            Arc::new(Int32Array::from(dattablespaces)),
            Arc::new(StringArray::from_iter(datacles.into_iter())),
        ];

        // Create a full record batch
        let full_batch = RecordBatch::try_new(this.schema.clone(), arrays)?;

        // update cache
        // remove all schema cache and table of the schema which is no longer exists
        // (catalog keys are dropped unconditionally here and re-added from
        // `catalog_oid_cache` below, so the extend fully refreshes them)
        oid_cache.retain(|key, _| match key {
            OidCacheKey::Catalog(..) => false,
            OidCacheKey::Schema(catalog, ..) => {
                catalog_oid_cache.contains_key(&OidCacheKey::Catalog(catalog.clone()))
            }
            OidCacheKey::Table(catalog, ..) => {
                catalog_oid_cache.contains_key(&OidCacheKey::Catalog(catalog.clone()))
            }
        });
        // add new schema cache
        oid_cache.extend(catalog_oid_cache);

        Ok(full_batch)
    }
}
1581}
1582
1583impl PartitionStream for PgDatabaseTable {
1584    fn schema(&self) -> &SchemaRef {
1585        &self.schema
1586    }
1587
1588    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1589        let this = self.clone();
1590        Box::pin(RecordBatchStreamAdapter::new(
1591            this.schema.clone(),
1592            futures::stream::once(async move { Self::get_data(this).await }),
1593        ))
1594    }
1595}
1596
/// `pg_catalog.pg_attribute` table provider: one row per column of every
/// table reachable through `catalog_list`.
#[derive(Debug)]
struct PgAttributeTable {
    schema: SchemaRef, // Arrow schema of pg_attribute itself
    catalog_list: Arc<dyn CatalogProviderList>, // catalogs/schemas/tables to enumerate
}
1602
1603impl PgAttributeTable {
1604    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
1605        // Define the schema for pg_attribute
1606        // This matches PostgreSQL's pg_attribute table columns
1607        let schema = Arc::new(Schema::new(vec![
1608            Field::new("attrelid", DataType::Int32, false), // OID of the relation this column belongs to
1609            Field::new("attname", DataType::Utf8, false),   // Column name
1610            Field::new("atttypid", DataType::Int32, false), // OID of the column data type
1611            Field::new("attstattarget", DataType::Int32, false), // Statistics target
1612            Field::new("attlen", DataType::Int16, false),   // Length of the type
1613            Field::new("attnum", DataType::Int16, false), // Column number (positive for regular columns)
1614            Field::new("attndims", DataType::Int32, false), // Number of dimensions for array types
1615            Field::new("attcacheoff", DataType::Int32, false), // Cache offset
1616            Field::new("atttypmod", DataType::Int32, false), // Type-specific modifier
1617            Field::new("attbyval", DataType::Boolean, false), // True if the type is pass-by-value
1618            Field::new("attalign", DataType::Utf8, false), // Type alignment
1619            Field::new("attstorage", DataType::Utf8, false), // Storage type
1620            Field::new("attcompression", DataType::Utf8, true), // Compression method
1621            Field::new("attnotnull", DataType::Boolean, false), // True if column cannot be null
1622            Field::new("atthasdef", DataType::Boolean, false), // True if column has a default value
1623            Field::new("atthasmissing", DataType::Boolean, false), // True if column has missing values
1624            Field::new("attidentity", DataType::Utf8, false),      // Identity column type
1625            Field::new("attgenerated", DataType::Utf8, false),     // Generated column type
1626            Field::new("attisdropped", DataType::Boolean, false), // True if column has been dropped
1627            Field::new("attislocal", DataType::Boolean, false), // True if column is local to this relation
1628            Field::new("attinhcount", DataType::Int32, false), // Number of direct inheritance ancestors
1629            Field::new("attcollation", DataType::Int32, false), // OID of collation
1630            Field::new("attacl", DataType::Utf8, true),        // Access privileges
1631            Field::new("attoptions", DataType::Utf8, true),    // Attribute-level options
1632            Field::new("attfdwoptions", DataType::Utf8, true), // Foreign data wrapper options
1633            Field::new("attmissingval", DataType::Utf8, true), // Missing value for added columns
1634        ]));
1635
1636        Self {
1637            schema,
1638            catalog_list,
1639        }
1640    }
1641
1642    /// Generate record batches based on the current state of the catalog
1643    async fn get_data(
1644        schema: SchemaRef,
1645        catalog_list: Arc<dyn CatalogProviderList>,
1646    ) -> Result<RecordBatch> {
1647        // Vectors to store column data
1648        let mut attrelids = Vec::new();
1649        let mut attnames = Vec::new();
1650        let mut atttypids = Vec::new();
1651        let mut attstattargets = Vec::new();
1652        let mut attlens = Vec::new();
1653        let mut attnums = Vec::new();
1654        let mut attndimss = Vec::new();
1655        let mut attcacheoffs = Vec::new();
1656        let mut atttymods = Vec::new();
1657        let mut attbyvals = Vec::new();
1658        let mut attaligns = Vec::new();
1659        let mut attstorages = Vec::new();
1660        let mut attcompressions: Vec<Option<String>> = Vec::new();
1661        let mut attnotnulls = Vec::new();
1662        let mut atthasdefs = Vec::new();
1663        let mut atthasmissings = Vec::new();
1664        let mut attidentitys = Vec::new();
1665        let mut attgenerateds = Vec::new();
1666        let mut attisdroppeds = Vec::new();
1667        let mut attislocals = Vec::new();
1668        let mut attinhcounts = Vec::new();
1669        let mut attcollations = Vec::new();
1670        let mut attacls: Vec<Option<String>> = Vec::new();
1671        let mut attoptions: Vec<Option<String>> = Vec::new();
1672        let mut attfdwoptions: Vec<Option<String>> = Vec::new();
1673        let mut attmissingvals: Vec<Option<String>> = Vec::new();
1674
1675        // Start OID counter (should be consistent with pg_class)
1676        let mut next_oid = 10000;
1677
1678        // Iterate through all catalogs and schemas
1679        for catalog_name in catalog_list.catalog_names() {
1680            if let Some(catalog) = catalog_list.catalog(&catalog_name) {
1681                for schema_name in catalog.schema_names() {
1682                    if let Some(schema_provider) = catalog.schema(&schema_name) {
1683                        // Process all tables in this schema
1684                        for table_name in schema_provider.table_names() {
1685                            let table_oid = next_oid;
1686                            next_oid += 1;
1687
1688                            if let Some(table) = schema_provider.table(&table_name).await? {
1689                                let table_schema = table.schema();
1690
1691                                // Add column entries for this table
1692                                for (column_idx, field) in table_schema.fields().iter().enumerate()
1693                                {
1694                                    let attnum = (column_idx + 1) as i16; // PostgreSQL column numbers start at 1
1695                                    let (pg_type_oid, type_len, by_val, align, storage) =
1696                                        Self::datafusion_to_pg_type(field.data_type());
1697
1698                                    attrelids.push(table_oid);
1699                                    attnames.push(field.name().clone());
1700                                    atttypids.push(pg_type_oid);
1701                                    attstattargets.push(-1); // Default statistics target
1702                                    attlens.push(type_len);
1703                                    attnums.push(attnum);
1704                                    attndimss.push(0); // No array support for now
1705                                    attcacheoffs.push(-1); // Not cached
1706                                    atttymods.push(-1); // No type modifiers
1707                                    attbyvals.push(by_val);
1708                                    attaligns.push(align.to_string());
1709                                    attstorages.push(storage.to_string());
1710                                    attcompressions.push(None); // No compression
1711                                    attnotnulls.push(!field.is_nullable());
1712                                    atthasdefs.push(false); // No default values
1713                                    atthasmissings.push(false); // No missing values
1714                                    attidentitys.push("".to_string()); // No identity columns
1715                                    attgenerateds.push("".to_string()); // No generated columns
1716                                    attisdroppeds.push(false); // Not dropped
1717                                    attislocals.push(true); // Local to this relation
1718                                    attinhcounts.push(0); // No inheritance
1719                                    attcollations.push(0); // Default collation
1720                                    attacls.push(None); // No ACLs
1721                                    attoptions.push(None); // No options
1722                                    attfdwoptions.push(None); // No FDW options
1723                                    attmissingvals.push(None); // No missing values
1724                                }
1725                            }
1726                        }
1727                    }
1728                }
1729            }
1730        }
1731
1732        // Create Arrow arrays from the collected data
1733        let arrays: Vec<ArrayRef> = vec![
1734            Arc::new(Int32Array::from(attrelids)),
1735            Arc::new(StringArray::from(attnames)),
1736            Arc::new(Int32Array::from(atttypids)),
1737            Arc::new(Int32Array::from(attstattargets)),
1738            Arc::new(Int16Array::from(attlens)),
1739            Arc::new(Int16Array::from(attnums)),
1740            Arc::new(Int32Array::from(attndimss)),
1741            Arc::new(Int32Array::from(attcacheoffs)),
1742            Arc::new(Int32Array::from(atttymods)),
1743            Arc::new(BooleanArray::from(attbyvals)),
1744            Arc::new(StringArray::from(attaligns)),
1745            Arc::new(StringArray::from(attstorages)),
1746            Arc::new(StringArray::from_iter(attcompressions.into_iter())),
1747            Arc::new(BooleanArray::from(attnotnulls)),
1748            Arc::new(BooleanArray::from(atthasdefs)),
1749            Arc::new(BooleanArray::from(atthasmissings)),
1750            Arc::new(StringArray::from(attidentitys)),
1751            Arc::new(StringArray::from(attgenerateds)),
1752            Arc::new(BooleanArray::from(attisdroppeds)),
1753            Arc::new(BooleanArray::from(attislocals)),
1754            Arc::new(Int32Array::from(attinhcounts)),
1755            Arc::new(Int32Array::from(attcollations)),
1756            Arc::new(StringArray::from_iter(attacls.into_iter())),
1757            Arc::new(StringArray::from_iter(attoptions.into_iter())),
1758            Arc::new(StringArray::from_iter(attfdwoptions.into_iter())),
1759            Arc::new(StringArray::from_iter(attmissingvals.into_iter())),
1760        ];
1761
1762        // Create a record batch
1763        let batch = RecordBatch::try_new(schema.clone(), arrays)?;
1764        Ok(batch)
1765    }
1766
1767    /// Map DataFusion data types to PostgreSQL type information
1768    fn datafusion_to_pg_type(data_type: &DataType) -> (i32, i16, bool, &'static str, &'static str) {
1769        match data_type {
1770            DataType::Boolean => (16, 1, true, "c", "p"),    // bool
1771            DataType::Int8 => (18, 1, true, "c", "p"),       // char
1772            DataType::Int16 => (21, 2, true, "s", "p"),      // int2
1773            DataType::Int32 => (23, 4, true, "i", "p"),      // int4
1774            DataType::Int64 => (20, 8, true, "d", "p"),      // int8
1775            DataType::UInt8 => (21, 2, true, "s", "p"),      // Treat as int2
1776            DataType::UInt16 => (23, 4, true, "i", "p"),     // Treat as int4
1777            DataType::UInt32 => (20, 8, true, "d", "p"),     // Treat as int8
1778            DataType::UInt64 => (1700, -1, false, "i", "m"), // Treat as numeric
1779            DataType::Float32 => (700, 4, true, "i", "p"),   // float4
1780            DataType::Float64 => (701, 8, true, "d", "p"),   // float8
1781            DataType::Utf8 => (25, -1, false, "i", "x"),     // text
1782            DataType::LargeUtf8 => (25, -1, false, "i", "x"), // text
1783            DataType::Binary => (17, -1, false, "i", "x"),   // bytea
1784            DataType::LargeBinary => (17, -1, false, "i", "x"), // bytea
1785            DataType::Date32 => (1082, 4, true, "i", "p"),   // date
1786            DataType::Date64 => (1082, 4, true, "i", "p"),   // date
1787            DataType::Time32(_) => (1083, 8, true, "d", "p"), // time
1788            DataType::Time64(_) => (1083, 8, true, "d", "p"), // time
1789            DataType::Timestamp(_, _) => (1114, 8, true, "d", "p"), // timestamp
1790            DataType::Decimal128(_, _) => (1700, -1, false, "i", "m"), // numeric
1791            DataType::Decimal256(_, _) => (1700, -1, false, "i", "m"), // numeric
1792            _ => (25, -1, false, "i", "x"),                  // Default to text for unknown types
1793        }
1794    }
1795}
1796
1797impl PartitionStream for PgAttributeTable {
1798    fn schema(&self) -> &SchemaRef {
1799        &self.schema
1800    }
1801
1802    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1803        let catalog_list = self.catalog_list.clone();
1804        let schema = Arc::clone(&self.schema);
1805        Box::pin(RecordBatchStreamAdapter::new(
1806            schema.clone(),
1807            futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
1808        ))
1809    }
1810}
1811
1812pub fn create_current_schemas_udf() -> ScalarUDF {
1813    // Define the function implementation
1814    let func = move |args: &[ColumnarValue]| {
1815        let args = ColumnarValue::values_to_arrays(args)?;
1816        let input = as_boolean_array(&args[0]);
1817
1818        // Create a UTF8 array with a single value
1819        let mut values = vec!["public"];
1820        // include implicit schemas
1821        if input.value(0) {
1822            values.push("information_schema");
1823            values.push("pg_catalog");
1824        }
1825
1826        let list_array = SingleRowListArrayBuilder::new(Arc::new(StringArray::from(values)));
1827
1828        let array: ArrayRef = Arc::new(list_array.build_list_array());
1829
1830        Ok(ColumnarValue::Array(array))
1831    };
1832
1833    // Wrap the implementation in a scalar function
1834    create_udf(
1835        "current_schemas",
1836        vec![DataType::Boolean],
1837        DataType::List(Arc::new(Field::new("schema", DataType::Utf8, false))),
1838        Volatility::Immutable,
1839        Arc::new(func),
1840    )
1841}
1842
1843pub fn create_current_schema_udf() -> ScalarUDF {
1844    // Define the function implementation
1845    let func = move |_args: &[ColumnarValue]| {
1846        // Create a UTF8 array with a single value
1847        let mut builder = StringBuilder::new();
1848        builder.append_value("public");
1849        let array: ArrayRef = Arc::new(builder.finish());
1850
1851        Ok(ColumnarValue::Array(array))
1852    };
1853
1854    // Wrap the implementation in a scalar function
1855    create_udf(
1856        "current_schema",
1857        vec![],
1858        DataType::Utf8,
1859        Volatility::Immutable,
1860        Arc::new(func),
1861    )
1862}
1863
1864pub fn create_version_udf() -> ScalarUDF {
1865    // Define the function implementation
1866    let func = move |_args: &[ColumnarValue]| {
1867        // Create a UTF8 array with version information
1868        let mut builder = StringBuilder::new();
1869        // TODO: improve version string generation
1870        builder
1871            .append_value("DataFusion PostgreSQL 48.0.0 on x86_64-pc-linux-gnu, compiled by Rust");
1872        let array: ArrayRef = Arc::new(builder.finish());
1873
1874        Ok(ColumnarValue::Array(array))
1875    };
1876
1877    // Wrap the implementation in a scalar function
1878    create_udf(
1879        "version",
1880        vec![],
1881        DataType::Utf8,
1882        Volatility::Immutable,
1883        Arc::new(func),
1884    )
1885}
1886
1887pub fn create_pg_get_userbyid_udf() -> ScalarUDF {
1888    // Define the function implementation
1889    let func = move |args: &[ColumnarValue]| {
1890        let args = ColumnarValue::values_to_arrays(args)?;
1891        let input = &args[0]; // User OID, but we'll ignore for now
1892
1893        // Create a UTF8 array with default user name
1894        let mut builder = StringBuilder::new();
1895        for _ in 0..input.len() {
1896            builder.append_value("postgres");
1897        }
1898
1899        let array: ArrayRef = Arc::new(builder.finish());
1900
1901        Ok(ColumnarValue::Array(array))
1902    };
1903
1904    // Wrap the implementation in a scalar function
1905    create_udf(
1906        "pg_catalog.pg_get_userbyid",
1907        vec![DataType::Int32],
1908        DataType::Utf8,
1909        Volatility::Stable,
1910        Arc::new(func),
1911    )
1912}
1913
1914pub fn create_pg_table_is_visible() -> ScalarUDF {
1915    // Define the function implementation
1916    let func = move |args: &[ColumnarValue]| {
1917        let args = ColumnarValue::values_to_arrays(args)?;
1918        let input = &args[0]; // Table OID
1919
1920        // Always return true
1921        let mut builder = BooleanBuilder::new();
1922        for _ in 0..input.len() {
1923            builder.append_value(true);
1924        }
1925
1926        let array: ArrayRef = Arc::new(builder.finish());
1927
1928        Ok(ColumnarValue::Array(array))
1929    };
1930
1931    // Wrap the implementation in a scalar function
1932    create_udf(
1933        "pg_catalog.pg_table_is_visible",
1934        vec![DataType::Int32],
1935        DataType::Boolean,
1936        Volatility::Stable,
1937        Arc::new(func),
1938    )
1939}
1940
1941pub fn create_has_table_privilege_3param_udf() -> ScalarUDF {
1942    // Define the function implementation for 3-parameter version
1943    let func = move |args: &[ColumnarValue]| {
1944        let args = ColumnarValue::values_to_arrays(args)?;
1945        let user = &args[0]; // User (can be name or OID)
1946        let _table = &args[1]; // Table (can be name or OID)
1947        let _privilege = &args[2]; // Privilege type (SELECT, INSERT, etc.)
1948
1949        // For now, always return true (full access)
1950        let mut builder = BooleanArray::builder(user.len());
1951        for _ in 0..user.len() {
1952            builder.append_value(true);
1953        }
1954
1955        let array: ArrayRef = Arc::new(builder.finish());
1956
1957        Ok(ColumnarValue::Array(array))
1958    };
1959
1960    // Wrap the implementation in a scalar function
1961    create_udf(
1962        "has_table_privilege",
1963        vec![DataType::Utf8, DataType::Utf8, DataType::Utf8],
1964        DataType::Boolean,
1965        Volatility::Stable,
1966        Arc::new(func),
1967    )
1968}
1969
1970pub fn create_has_table_privilege_2param_udf() -> ScalarUDF {
1971    // Define the function implementation for 2-parameter version (current user, table, privilege)
1972    let func = move |args: &[ColumnarValue]| {
1973        let args = ColumnarValue::values_to_arrays(args)?;
1974        let table = &args[0]; // Table (can be name or OID)
1975        let _privilege = &args[1]; // Privilege type (SELECT, INSERT, etc.)
1976
1977        // For now, always return true (full access for current user)
1978        let mut builder = BooleanArray::builder(table.len());
1979        for _ in 0..table.len() {
1980            builder.append_value(true);
1981        }
1982        let array: ArrayRef = Arc::new(builder.finish());
1983
1984        Ok(ColumnarValue::Array(array))
1985    };
1986
1987    // Wrap the implementation in a scalar function
1988    create_udf(
1989        "has_table_privilege",
1990        vec![DataType::Utf8, DataType::Utf8],
1991        DataType::Boolean,
1992        Volatility::Stable,
1993        Arc::new(func),
1994    )
1995}
1996
/// Stub of PostgreSQL's `format_type(type_oid, typemod)` function.
pub fn create_format_type_udf() -> ScalarUDF {
    let func = move |args: &[ColumnarValue]| {
        let args = ColumnarValue::values_to_arrays(args)?;
        let type_oids = &args[0]; // Type OID (pg_type.oid)
        let _type_mods = &args[1]; // Type modifier (e.g. varchar length); currently ignored

        // Placeholder implementation: emits "???" for every row instead of
        // the human-readable type name.
        let mut builder = StringBuilder::new();
        for _ in 0..type_oids.len() {
            builder.append_value("???");
        }

        let array: ArrayRef = Arc::new(builder.finish());

        Ok(ColumnarValue::Array(array))
    };

    create_udf(
        "format_type",
        vec![DataType::Int32, DataType::Int32],
        DataType::Utf8,
        Volatility::Stable,
        Arc::new(func),
    )
}
2022
2023/// Install pg_catalog and postgres UDFs to current `SessionContext`
2024pub fn setup_pg_catalog(
2025    session_context: &SessionContext,
2026    catalog_name: &str,
2027) -> Result<(), Box<DataFusionError>> {
2028    let pg_catalog = PgCatalogSchemaProvider::new(session_context.state().catalog_list().clone());
2029    session_context
2030        .catalog(catalog_name)
2031        .ok_or_else(|| {
2032            DataFusionError::Configuration(format!(
2033                "Catalog not found when registering pg_catalog: {catalog_name}"
2034            ))
2035        })?
2036        .register_schema("pg_catalog", Arc::new(pg_catalog))?;
2037
2038    session_context.register_udf(create_current_schema_udf());
2039    session_context.register_udf(create_current_schemas_udf());
2040    session_context.register_udf(create_version_udf());
2041    session_context.register_udf(create_pg_get_userbyid_udf());
2042    session_context.register_udf(create_has_table_privilege_2param_udf());
2043    session_context.register_udf(create_pg_table_is_visible());
2044    session_context.register_udf(create_format_type_udf());
2045
2046    Ok(())
2047}