// datafusion_postgres/pg_catalog.rs

use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::array::{
    as_boolean_array, ArrayRef, BooleanArray, Float64Array, Int16Array, Int32Array, RecordBatch,
    StringArray, StringBuilder,
};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::catalog::streaming::StreamingTable;
use datafusion::catalog::{CatalogProviderList, MemTable, SchemaProvider};
use datafusion::common::utils::SingleRowListArrayBuilder;
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::logical_expr::{ColumnarValue, ScalarUDF, Volatility};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion::prelude::{create_udf, SessionContext};

const PG_CATALOG_TABLE_PG_TYPE: &str = "pg_type";
const PG_CATALOG_TABLE_PG_CLASS: &str = "pg_class";
const PG_CATALOG_TABLE_PG_ATTRIBUTE: &str = "pg_attribute";
const PG_CATALOG_TABLE_PG_NAMESPACE: &str = "pg_namespace";
const PG_CATALOG_TABLE_PG_PROC: &str = "pg_proc";
const PG_CATALOG_TABLE_PG_DATABASE: &str = "pg_database";
const PG_CATALOG_TABLE_PG_AM: &str = "pg_am";

pub const PG_CATALOG_TABLES: &[&str] = &[
    PG_CATALOG_TABLE_PG_TYPE,
    PG_CATALOG_TABLE_PG_CLASS,
    PG_CATALOG_TABLE_PG_ATTRIBUTE,
    PG_CATALOG_TABLE_PG_NAMESPACE,
    PG_CATALOG_TABLE_PG_PROC,
    PG_CATALOG_TABLE_PG_DATABASE,
    PG_CATALOG_TABLE_PG_AM,
];
/// Custom schema provider that emulates PostgreSQL's `pg_catalog` system
/// schema, backed by the live state of the session's `CatalogProviderList`.
#[derive(Debug)]
pub struct PgCatalogSchemaProvider {
    catalog_list: Arc<dyn CatalogProviderList>,
}

#[async_trait]
impl SchemaProvider for PgCatalogSchemaProvider {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn table_names(&self) -> Vec<String> {
        PG_CATALOG_TABLES.iter().map(ToString::to_string).collect()
    }

    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
        match name.to_ascii_lowercase().as_str() {
            PG_CATALOG_TABLE_PG_TYPE => Ok(Some(self.create_pg_type_table())),
            PG_CATALOG_TABLE_PG_AM => Ok(Some(self.create_pg_am_table())),
            PG_CATALOG_TABLE_PG_CLASS => {
                let table = Arc::new(PgClassTable::new(self.catalog_list.clone()));
                Ok(Some(Arc::new(StreamingTable::try_new(
                    Arc::clone(table.schema()),
                    vec![table],
                )?)))
            }
            PG_CATALOG_TABLE_PG_NAMESPACE => {
                let table = Arc::new(PgNamespaceTable::new(self.catalog_list.clone()));
                Ok(Some(Arc::new(StreamingTable::try_new(
                    Arc::clone(table.schema()),
                    vec![table],
                )?)))
            }
            PG_CATALOG_TABLE_PG_DATABASE => {
                let table = Arc::new(PgDatabaseTable::new(self.catalog_list.clone()));
                Ok(Some(Arc::new(StreamingTable::try_new(
                    Arc::clone(table.schema()),
                    vec![table],
                )?)))
            }
            // pg_attribute and pg_proc are advertised in PG_CATALOG_TABLES but
            // have no provider yet, so they resolve to None here.
            _ => Ok(None),
        }
    }

    fn table_exist(&self, name: &str) -> bool {
        PG_CATALOG_TABLES.contains(&name.to_ascii_lowercase().as_str())
    }
}

impl PgCatalogSchemaProvider {
    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> PgCatalogSchemaProvider {
        Self { catalog_list }
    }

    /// Create a mock, empty table for pg_type.
    /// (A seeded variant is sketched after this impl block.)
    fn create_pg_type_table(&self) -> Arc<dyn TableProvider> {
        // Define the schema for pg_type
        let schema = Arc::new(Schema::new(vec![
            Field::new("oid", DataType::Int32, false),
            Field::new("typname", DataType::Utf8, false),
            Field::new("typnamespace", DataType::Int32, false),
            Field::new("typlen", DataType::Int16, false),
            // Add other necessary columns
        ]));

        // Create an in-memory table with this schema but no rows
        let provider = MemTable::try_new(schema, vec![]).expect("empty MemTable is always valid");

        Arc::new(provider)
    }

    /// Create a mock, empty table for pg_am.
    fn create_pg_am_table(&self) -> Arc<dyn TableProvider> {
        // Define the schema for pg_am
        // This matches PostgreSQL's pg_am table columns
        let schema = Arc::new(Schema::new(vec![
            Field::new("oid", DataType::Int32, false), // Object identifier
            Field::new("amname", DataType::Utf8, false), // Name of the access method
            Field::new("amhandler", DataType::Int32, false), // OID of handler function
            Field::new("amtype", DataType::Utf8, false), // Type of access method (i=index, t=table)
            Field::new("amstrategies", DataType::Int32, false), // Number of operator strategies
            Field::new("amsupport", DataType::Int32, false), // Number of support routines
            Field::new("amcanorder", DataType::Boolean, false), // Does AM support ordered scans?
            Field::new("amcanorderbyop", DataType::Boolean, false), // Does AM support order by operator result?
            Field::new("amcanbackward", DataType::Boolean, false), // Does AM support backward scanning?
            Field::new("amcanunique", DataType::Boolean, false), // Does AM support unique indexes?
            Field::new("amcanmulticol", DataType::Boolean, false), // Does AM support multi-column indexes?
            Field::new("amoptionalkey", DataType::Boolean, false), // Can first index column be omitted in search?
            Field::new("amsearcharray", DataType::Boolean, false), // Does AM support ScalarArrayOpExpr searches?
            Field::new("amsearchnulls", DataType::Boolean, false), // Does AM support searching for NULL/NOT NULL?
            Field::new("amstorage", DataType::Boolean, false), // Can storage type differ from column type?
            Field::new("amclusterable", DataType::Boolean, false), // Can index be clustered on?
            Field::new("ampredlocks", DataType::Boolean, false), // Does AM manage fine-grained predicate locks?
            Field::new("amcanparallel", DataType::Boolean, false), // Does AM support parallel scan?
            Field::new("amcanbeginscan", DataType::Boolean, false), // Does AM support BRIN index scans?
            Field::new("amcanmarkpos", DataType::Boolean, false), // Does AM support mark/restore positions?
            Field::new("amcanfetch", DataType::Boolean, false), // Does AM support fetching specific tuples?
            Field::new("amkeytype", DataType::Int32, false),    // Type of data in index
        ]));

        // Create an in-memory table with this schema but no rows
        let provider = MemTable::try_new(schema, vec![]).expect("empty MemTable is always valid");

        Arc::new(provider)
    }
}
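
// A sketch, not wired into the provider: how pg_type could be seeded with a
// few rows instead of being served empty. The PostgreSQL type OIDs used below
// (16 = bool, 23 = int4, 25 = text; namespace 11 = pg_catalog) are the real
// built-in values, but this helper itself is hypothetical and currently unused.
#[allow(dead_code)]
fn create_seeded_pg_type_table() -> Arc<dyn TableProvider> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("oid", DataType::Int32, false),
        Field::new("typname", DataType::Utf8, false),
        Field::new("typnamespace", DataType::Int32, false),
        Field::new("typlen", DataType::Int16, false),
    ]));

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![16, 23, 25])),
            Arc::new(StringArray::from(vec!["bool", "int4", "text"])),
            Arc::new(Int32Array::from(vec![11, 11, 11])), // all in pg_catalog
            Arc::new(Int16Array::from(vec![1, 4, -1])),   // -1 = variable length
        ],
    )
    .expect("arrays match the schema");

    Arc::new(MemTable::try_new(schema, vec![vec![batch]]).expect("batch matches the schema"))
}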

/// pg_class-compatible table, materialized on demand from the catalogs,
/// schemas, and tables currently registered with DataFusion.
#[derive(Debug)]
struct PgClassTable {
    schema: SchemaRef,
    catalog_list: Arc<dyn CatalogProviderList>,
}

impl PgClassTable {
    fn new(catalog_list: Arc<dyn CatalogProviderList>) -> PgClassTable {
        // Define the schema for pg_class
        // This matches key columns from PostgreSQL's pg_class
        let schema = Arc::new(Schema::new(vec![
            Field::new("oid", DataType::Int32, false), // Object identifier
            Field::new("relname", DataType::Utf8, false), // Name of the table, index, view, etc.
            Field::new("relnamespace", DataType::Int32, false), // OID of the namespace that contains this relation
            Field::new("reltype", DataType::Int32, false), // OID of the data type (composite type) this table describes
            Field::new("reloftype", DataType::Int32, true), // OID of the composite type for typed table, 0 otherwise
            Field::new("relowner", DataType::Int32, false), // Owner of the relation
            Field::new("relam", DataType::Int32, false), // If this is an index, the access method used
            Field::new("relfilenode", DataType::Int32, false), // Name of the on-disk file of this relation
            Field::new("reltablespace", DataType::Int32, false), // Tablespace OID for this relation
            Field::new("relpages", DataType::Int32, false), // Size of the on-disk representation in pages
            Field::new("reltuples", DataType::Float64, false), // Number of tuples
            Field::new("relallvisible", DataType::Int32, false), // Number of all-visible pages
            Field::new("reltoastrelid", DataType::Int32, false), // OID of the TOAST table
            Field::new("relhasindex", DataType::Boolean, false), // True if this is a table and it has (or recently had) any indexes
            Field::new("relisshared", DataType::Boolean, false), // True if this table is shared across all databases
            Field::new("relpersistence", DataType::Utf8, false), // p=permanent table, u=unlogged table, t=temporary table
            Field::new("relkind", DataType::Utf8, false), // r=ordinary table, i=index, S=sequence, v=view, etc.
            Field::new("relnatts", DataType::Int16, false), // Number of user columns
            Field::new("relchecks", DataType::Int16, false), // Number of CHECK constraints
            Field::new("relhasrules", DataType::Boolean, false), // True if table has (or once had) rules
            Field::new("relhastriggers", DataType::Boolean, false), // True if table has (or once had) triggers
            Field::new("relhassubclass", DataType::Boolean, false), // True if table or index has (or once had) any inheritance children
            Field::new("relrowsecurity", DataType::Boolean, false), // True if row security is enabled
            Field::new("relforcerowsecurity", DataType::Boolean, false), // True if row security is forced for owners
            Field::new("relispopulated", DataType::Boolean, false), // True if relation is populated (not true for some materialized views)
            Field::new("relreplident", DataType::Utf8, false), // Columns used to form "replica identity" for rows
            Field::new("relispartition", DataType::Boolean, false), // True if table is a partition
            Field::new("relrewrite", DataType::Int32, true), // OID of the original relation during a table-rewriting DDL operation; 0 otherwise
            Field::new("relfrozenxid", DataType::Int32, false), // All transaction IDs before this have been replaced with a permanent ("frozen") transaction ID
            Field::new("relminmxid", DataType::Int32, false), // All multixact IDs before this have been replaced with a transaction ID
        ]));

        Self {
            schema,
            catalog_list,
        }
    }

    /// Generate record batches based on the current state of the catalog
    async fn get_data(
        schema: SchemaRef,
        catalog_list: Arc<dyn CatalogProviderList>,
    ) -> Result<RecordBatch> {
        // Vectors to store column data
        let mut oids = Vec::new();
        let mut relnames = Vec::new();
        let mut relnamespaces = Vec::new();
        let mut reltypes = Vec::new();
        let mut reloftypes = Vec::new();
        let mut relowners = Vec::new();
        let mut relams = Vec::new();
        let mut relfilenodes = Vec::new();
        let mut reltablespaces = Vec::new();
        let mut relpages = Vec::new();
        let mut reltuples = Vec::new();
        let mut relallvisibles = Vec::new();
        let mut reltoastrelids = Vec::new();
        let mut relhasindexes = Vec::new();
        let mut relisshareds = Vec::new();
        let mut relpersistences = Vec::new();
        let mut relkinds = Vec::new();
        let mut relnattses = Vec::new();
        let mut relcheckses = Vec::new();
        let mut relhasruleses = Vec::new();
        let mut relhastriggersses = Vec::new();
        let mut relhassubclasses = Vec::new();
        let mut relrowsecurities = Vec::new();
        let mut relforcerowsecurities = Vec::new();
        let mut relispopulateds = Vec::new();
        let mut relreplidents = Vec::new();
        let mut relispartitions = Vec::new();
        let mut relrewrites = Vec::new();
        let mut relfrozenxids = Vec::new();
        let mut relminmxids = Vec::new();

        // Start OID counter (this is simplistic and would need to be more robust in practice)
        let mut next_oid = 10000;

        // Iterate through all catalogs and schemas
        for catalog_name in catalog_list.catalog_names() {
            if let Some(catalog) = catalog_list.catalog(&catalog_name) {
                for schema_name in catalog.schema_names() {
                    if let Some(schema) = catalog.schema(&schema_name) {
                        let schema_oid = next_oid;
                        next_oid += 1;

                        // Add an entry for the schema itself (as a namespace)
                        // (In a full implementation, this would go in pg_namespace)

                        // Now process all tables in this schema
                        for table_name in schema.table_names() {
                            let table_oid = next_oid;
                            next_oid += 1;

                            if let Some(table) = schema.table(&table_name).await? {
                                // TODO: correct table type
                                let table_type = "r";

                                // Get column count from schema
                                let column_count = table.schema().fields().len() as i16;

                                // Add table entry
                                oids.push(table_oid);
                                relnames.push(table_name.clone());
                                relnamespaces.push(schema_oid);
                                reltypes.push(0); // Simplified: we're not tracking data types
                                reloftypes.push(None);
                                relowners.push(0); // Simplified: no owner tracking
                                relams.push(0); // Default access method
                                relfilenodes.push(table_oid); // Use OID as filenode
                                reltablespaces.push(0); // Default tablespace
                                relpages.push(1); // Default page count
                                reltuples.push(0.0); // No row count stats
                                relallvisibles.push(0);
                                reltoastrelids.push(0);
                                relhasindexes.push(false);
                                relisshareds.push(false);
                                relpersistences.push("p".to_string()); // Permanent
                                relkinds.push(table_type.to_string());
                                relnattses.push(column_count);
                                relcheckses.push(0);
                                relhasruleses.push(false);
                                relhastriggersses.push(false);
                                relhassubclasses.push(false);
                                relrowsecurities.push(false);
                                relforcerowsecurities.push(false);
                                relispopulateds.push(true);
                                relreplidents.push("d".to_string()); // Default
                                relispartitions.push(false);
                                relrewrites.push(None);
                                relfrozenxids.push(0);
                                relminmxids.push(0);
                            }
                        }
                    }
                }
            }
        }

        // Create Arrow arrays from the collected data
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::from(oids)),
            Arc::new(StringArray::from(relnames)),
            Arc::new(Int32Array::from(relnamespaces)),
            Arc::new(Int32Array::from(reltypes)),
            Arc::new(Int32Array::from_iter(reloftypes)),
            Arc::new(Int32Array::from(relowners)),
            Arc::new(Int32Array::from(relams)),
            Arc::new(Int32Array::from(relfilenodes)),
            Arc::new(Int32Array::from(reltablespaces)),
            Arc::new(Int32Array::from(relpages)),
            Arc::new(Float64Array::from_iter(reltuples)),
            Arc::new(Int32Array::from(relallvisibles)),
            Arc::new(Int32Array::from(reltoastrelids)),
            Arc::new(BooleanArray::from(relhasindexes)),
            Arc::new(BooleanArray::from(relisshareds)),
            Arc::new(StringArray::from(relpersistences)),
            Arc::new(StringArray::from(relkinds)),
            Arc::new(Int16Array::from(relnattses)),
            Arc::new(Int16Array::from(relcheckses)),
            Arc::new(BooleanArray::from(relhasruleses)),
            Arc::new(BooleanArray::from(relhastriggersses)),
            Arc::new(BooleanArray::from(relhassubclasses)),
            Arc::new(BooleanArray::from(relrowsecurities)),
            Arc::new(BooleanArray::from(relforcerowsecurities)),
            Arc::new(BooleanArray::from(relispopulateds)),
            Arc::new(StringArray::from(relreplidents)),
            Arc::new(BooleanArray::from(relispartitions)),
            Arc::new(Int32Array::from_iter(relrewrites)),
            Arc::new(Int32Array::from(relfrozenxids)),
            Arc::new(Int32Array::from(relminmxids)),
        ];

        // Create a record batch
        let batch = RecordBatch::try_new(schema, arrays)?;

        Ok(batch)
    }
}

impl PartitionStream for PgClassTable {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let catalog_list = self.catalog_list.clone();
        let schema = Arc::clone(&self.schema);
        Box::pin(RecordBatchStreamAdapter::new(
            schema.clone(),
            futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
        ))
    }
}

/// pg_namespace-compatible table listing the standard system schemas plus
/// every schema registered with DataFusion.
#[derive(Debug)]
struct PgNamespaceTable {
    schema: SchemaRef,
    catalog_list: Arc<dyn CatalogProviderList>,
}

impl PgNamespaceTable {
    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
        // Define the schema for pg_namespace
        // This matches the columns from PostgreSQL's pg_namespace
        let schema = Arc::new(Schema::new(vec![
            Field::new("oid", DataType::Int32, false), // Object identifier
            Field::new("nspname", DataType::Utf8, false), // Name of the namespace (schema)
            Field::new("nspowner", DataType::Int32, false), // Owner of the namespace
            Field::new("nspacl", DataType::Utf8, true), // Access privileges
            Field::new("options", DataType::Utf8, true), // Schema-level options
        ]));

        Self {
            schema,
            catalog_list,
        }
    }

    /// Generate record batches based on the current state of the catalog
    async fn get_data(
        schema: SchemaRef,
        catalog_list: Arc<dyn CatalogProviderList>,
    ) -> Result<RecordBatch> {
        // Vectors to store column data
        let mut oids = Vec::new();
        let mut nspnames = Vec::new();
        let mut nspowners = Vec::new();
        let mut nspacls: Vec<Option<String>> = Vec::new();
        let mut options: Vec<Option<String>> = Vec::new();

        // Start OID counter (should be consistent with the values used in pg_class)
        let mut next_oid = 10000;

        // Add standard PostgreSQL system schemas
        // pg_catalog schema (OID 11, as in PostgreSQL)
        oids.push(11);
        nspnames.push("pg_catalog".to_string());
        nspowners.push(10); // Default superuser
        nspacls.push(None);
        options.push(None);

        // public schema (OID 2200, as in PostgreSQL)
        oids.push(2200);
        nspnames.push("public".to_string());
        nspowners.push(10); // Default superuser
        nspacls.push(None);
        options.push(None);

        // information_schema (OID 12 is this mock's choice; real PostgreSQL
        // assigns its OID at initdb time)
        oids.push(12);
        nspnames.push("information_schema".to_string());
        nspowners.push(10); // Default superuser
        nspacls.push(None);
        options.push(None);

        // Now add all schemas from DataFusion catalogs
        for catalog_name in catalog_list.catalog_names() {
            if let Some(catalog) = catalog_list.catalog(&catalog_name) {
                for schema_name in catalog.schema_names() {
                    // Skip schemas we've already added as system schemas
                    if schema_name == "pg_catalog"
                        || schema_name == "public"
                        || schema_name == "information_schema"
                    {
                        continue;
                    }

                    let schema_oid = next_oid;
                    next_oid += 1;

                    oids.push(schema_oid);
                    nspnames.push(schema_name.clone());
                    nspowners.push(10); // Default owner
                    nspacls.push(None);
                    options.push(None);
                }
            }
        }

        // Create Arrow arrays from the collected data
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::from(oids)),
            Arc::new(StringArray::from(nspnames)),
            Arc::new(Int32Array::from(nspowners)),
            Arc::new(StringArray::from_iter(nspacls)),
            Arc::new(StringArray::from_iter(options)),
        ];

        // Create the record batch
        let batch = RecordBatch::try_new(schema, arrays)?;

        Ok(batch)
    }
}

impl PartitionStream for PgNamespaceTable {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let catalog_list = self.catalog_list.clone();
        let schema = Arc::clone(&self.schema);
        Box::pin(RecordBatchStreamAdapter::new(
            schema.clone(),
            futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
        ))
    }
}

/// pg_database-compatible table that exposes each registered DataFusion
/// catalog as a "database".
#[derive(Debug)]
struct PgDatabaseTable {
    schema: SchemaRef,
    catalog_list: Arc<dyn CatalogProviderList>,
}

impl PgDatabaseTable {
    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
        // Define the schema for pg_database
        // This matches PostgreSQL's pg_database table columns
        let schema = Arc::new(Schema::new(vec![
            Field::new("oid", DataType::Int32, false), // Object identifier
            Field::new("datname", DataType::Utf8, false), // Database name
            Field::new("datdba", DataType::Int32, false), // Database owner's user ID
            Field::new("encoding", DataType::Int32, false), // Character encoding
            Field::new("datcollate", DataType::Utf8, false), // LC_COLLATE for this database
            Field::new("datctype", DataType::Utf8, false), // LC_CTYPE for this database
            Field::new("datistemplate", DataType::Boolean, false), // If true, database can be used as a template
            Field::new("datallowconn", DataType::Boolean, false), // If false, no one can connect to this database
            Field::new("datconnlimit", DataType::Int32, false), // Max number of concurrent connections (-1=no limit)
            Field::new("datlastsysoid", DataType::Int32, false), // Last system OID in database
            Field::new("datfrozenxid", DataType::Int32, false), // Frozen XID for this database
            Field::new("datminmxid", DataType::Int32, false),   // Minimum multixact ID
            Field::new("dattablespace", DataType::Int32, false), // Default tablespace for this database
            Field::new("datacl", DataType::Utf8, true),          // Access privileges
        ]));

        Self {
            schema,
            catalog_list,
        }
    }

    /// Generate record batches based on the current state of the catalog
    async fn get_data(
        schema: SchemaRef,
        catalog_list: Arc<dyn CatalogProviderList>,
    ) -> Result<RecordBatch> {
        // Vectors to store column data
        let mut oids = Vec::new();
        let mut datnames = Vec::new();
        let mut datdbas = Vec::new();
        let mut encodings = Vec::new();
        let mut datcollates = Vec::new();
        let mut datctypes = Vec::new();
        let mut datistemplates = Vec::new();
        let mut datallowconns = Vec::new();
        let mut datconnlimits = Vec::new();
        let mut datlastsysoids = Vec::new();
        let mut datfrozenxids = Vec::new();
        let mut datminmxids = Vec::new();
        let mut dattablespaces = Vec::new();
        let mut datacles: Vec<Option<String>> = Vec::new();

        // Start OID counter (this is simplistic and would need to be more robust in practice)
        let mut next_oid = 16384; // Standard PostgreSQL starting OID for user databases

        // Add a record for each catalog (treating catalogs as "databases")
        for catalog_name in catalog_list.catalog_names() {
            let oid = next_oid;
            next_oid += 1;

            oids.push(oid);
            datnames.push(catalog_name.clone());
            datdbas.push(10); // Default owner (assuming 10 = postgres user)
            encodings.push(6); // 6 = UTF8 in PostgreSQL
            datcollates.push("en_US.UTF-8".to_string()); // Default collation
            datctypes.push("en_US.UTF-8".to_string()); // Default ctype
            datistemplates.push(false);
            datallowconns.push(true);
            datconnlimits.push(-1); // No connection limit
            datlastsysoids.push(100000); // Arbitrary last system OID
            datfrozenxids.push(1); // Simplified transaction ID
            datminmxids.push(1); // Simplified multixact ID
            dattablespaces.push(1663); // Default tablespace (1663 = pg_default in PostgreSQL)
            datacles.push(None); // No specific ACLs
        }

        // Always include a "postgres" database entry if not already present
        // (This is for compatibility with tools that expect it)
        if !datnames.contains(&"postgres".to_string()) {
            let oid = next_oid;

            oids.push(oid);
            datnames.push("postgres".to_string());
            datdbas.push(10);
            encodings.push(6);
            datcollates.push("en_US.UTF-8".to_string());
            datctypes.push("en_US.UTF-8".to_string());
            datistemplates.push(false);
            datallowconns.push(true);
            datconnlimits.push(-1);
            datlastsysoids.push(100000);
            datfrozenxids.push(1);
            datminmxids.push(1);
            dattablespaces.push(1663);
            datacles.push(None);
        }

        // Create Arrow arrays from the collected data
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::from(oids)),
            Arc::new(StringArray::from(datnames)),
            Arc::new(Int32Array::from(datdbas)),
            Arc::new(Int32Array::from(encodings)),
            Arc::new(StringArray::from(datcollates)),
            Arc::new(StringArray::from(datctypes)),
            Arc::new(BooleanArray::from(datistemplates)),
            Arc::new(BooleanArray::from(datallowconns)),
            Arc::new(Int32Array::from(datconnlimits)),
            Arc::new(Int32Array::from(datlastsysoids)),
            Arc::new(Int32Array::from(datfrozenxids)),
            Arc::new(Int32Array::from(datminmxids)),
            Arc::new(Int32Array::from(dattablespaces)),
            Arc::new(StringArray::from_iter(datacles)),
        ];

        // Create the record batch
        let batch = RecordBatch::try_new(schema, arrays)?;
        Ok(batch)
    }
}

impl PartitionStream for PgDatabaseTable {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let catalog_list = self.catalog_list.clone();
        let schema = Arc::clone(&self.schema);
        Box::pin(RecordBatchStreamAdapter::new(
            schema.clone(),
            futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
        ))
    }
}

/// Create a `current_schemas(boolean)` UDF; when called with `true`, the
/// implicit system schemas are included in the returned list.
pub fn create_current_schemas_udf() -> ScalarUDF {
    // Define the function implementation
    let func = move |args: &[ColumnarValue]| {
        let args = ColumnarValue::values_to_arrays(args)?;
        // Only the first row of the argument is inspected; the function is
        // expected to be invoked with a single scalar value.
        let input = as_boolean_array(&args[0]);

        // Build the list of schema names, starting with the search path
        let mut values = vec!["public"];
        // Include implicit system schemas when requested
        if input.value(0) {
            values.push("information_schema");
            values.push("pg_catalog");
        }

        let list_array = SingleRowListArrayBuilder::new(Arc::new(StringArray::from(values)));

        let array: ArrayRef = Arc::new(list_array.build_list_array());

        Ok(ColumnarValue::Array(array))
    };

    // Wrap the implementation in a scalar function
    create_udf(
        "current_schemas",
        vec![DataType::Boolean],
        DataType::List(Arc::new(Field::new("schema", DataType::Utf8, false))),
        Volatility::Immutable,
        Arc::new(func),
    )
}

/// Create a zero-argument `current_schema()` UDF that always reports `public`.
pub fn create_current_schema_udf() -> ScalarUDF {
    // Define the function implementation
    let func = move |_args: &[ColumnarValue]| {
        // Create a UTF8 array with a single value
        let mut builder = StringBuilder::new();
        builder.append_value("public");
        let array: ArrayRef = Arc::new(builder.finish());

        Ok(ColumnarValue::Array(array))
    };

    // Wrap the implementation in a scalar function
    create_udf(
        "current_schema",
        vec![],
        DataType::Utf8,
        Volatility::Immutable,
        Arc::new(func),
    )
}
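
// Usage sketch for the two UDFs above; `demo_schema_udfs` is a hypothetical
// helper, not part of this module's API. Clients such as psql issue these
// discovery queries right after connecting. (`setup_pg_catalog` below
// registers both UDFs automatically.)
#[allow(dead_code)]
async fn demo_schema_udfs(ctx: &SessionContext) -> Result<()> {
    ctx.register_udf(create_current_schema_udf());
    ctx.register_udf(create_current_schemas_udf());

    // Expected to yield a single row: "public".
    ctx.sql("SELECT current_schema()").await?.show().await?;
    // With `true`, the implicit schemas are included:
    // ["public", "information_schema", "pg_catalog"].
    ctx.sql("SELECT current_schemas(true)").await?.show().await?;
    Ok(())
}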

/// Install the `pg_catalog` schema and the PostgreSQL-compatibility UDFs into
/// the given `SessionContext`.
pub fn setup_pg_catalog(
    session_context: &SessionContext,
    catalog_name: &str,
) -> Result<(), Box<DataFusionError>> {
    let pg_catalog = PgCatalogSchemaProvider::new(session_context.state().catalog_list().clone());
    session_context
        .catalog(catalog_name)
        .ok_or_else(|| {
            DataFusionError::Configuration(format!(
                "Catalog not found when registering pg_catalog: {catalog_name}"
            ))
        })?
        .register_schema("pg_catalog", Arc::new(pg_catalog))?;

    session_context.register_udf(create_current_schema_udf());
    session_context.register_udf(create_current_schemas_udf());

    Ok(())
}
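
// A minimal smoke-test sketch, assuming a `tokio` dev-dependency with the
// "macros" and "rt" features; "datafusion" is DataFusion's default catalog
// name.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn pg_catalog_is_queryable() {
        let ctx = SessionContext::new();
        setup_pg_catalog(&ctx, "datafusion").unwrap();

        // pg_database always contains at least the "postgres" compatibility row.
        let batches = ctx
            .sql("SELECT datname FROM pg_catalog.pg_database")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();

        let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert!(rows >= 1, "expected at least the postgres compatibility row");
    }
}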