1use std::sync::Arc;
2
3use async_trait::async_trait;
4use datafusion::arrow::array::{
5 as_boolean_array, ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
6 RecordBatch, StringArray, StringBuilder,
7};
8use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
9use datafusion::catalog::streaming::StreamingTable;
10use datafusion::catalog::{CatalogProviderList, MemTable, SchemaProvider};
11use datafusion::common::utils::SingleRowListArrayBuilder;
12use datafusion::datasource::{TableProvider, ViewTable};
13use datafusion::error::{DataFusionError, Result};
14use datafusion::execution::{SendableRecordBatchStream, TaskContext};
15use datafusion::logical_expr::{ColumnarValue, ScalarUDF, Volatility};
16use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
17use datafusion::physical_plan::streaming::PartitionStream;
18use datafusion::prelude::{create_udf, SessionContext};
19
// Names of the tables served by `PgCatalogSchemaProvider`. Each constant is
// used as a match pattern against the lower-cased lookup name in
// `PgCatalogSchemaProvider::table` and is listed in `PG_CATALOG_TABLES`.
const PG_CATALOG_TABLE_PG_TYPE: &str = "pg_type";
const PG_CATALOG_TABLE_PG_CLASS: &str = "pg_class";
const PG_CATALOG_TABLE_PG_ATTRIBUTE: &str = "pg_attribute";
const PG_CATALOG_TABLE_PG_NAMESPACE: &str = "pg_namespace";
const PG_CATALOG_TABLE_PG_PROC: &str = "pg_proc";
const PG_CATALOG_TABLE_PG_DATABASE: &str = "pg_database";
const PG_CATALOG_TABLE_PG_AM: &str = "pg_am";
27
28fn get_table_type(table: &Arc<dyn TableProvider>) -> &'static str {
30 if table.as_any().is::<ViewTable>() {
32 "v" } else {
34 "r" }
36}
37
38fn get_table_type_with_name(
40 table: &Arc<dyn TableProvider>,
41 table_name: &str,
42 schema_name: &str,
43) -> &'static str {
44 if schema_name == "pg_catalog" || schema_name == "information_schema" {
46 if table_name.starts_with("pg_")
47 || table_name.contains("_table")
48 || table_name.contains("_column")
49 {
50 "r" } else {
52 "v" }
54 } else {
55 get_table_type(table)
56 }
57}
58
/// Every table name the `pg_catalog` schema provider advertises.
///
/// `PgCatalogSchemaProvider::table_names` returns these names and
/// `PgCatalogSchemaProvider::table_exist` checks membership in this list.
pub const PG_CATALOG_TABLES: &[&str] = &[
    PG_CATALOG_TABLE_PG_TYPE,
    PG_CATALOG_TABLE_PG_CLASS,
    PG_CATALOG_TABLE_PG_ATTRIBUTE,
    PG_CATALOG_TABLE_PG_NAMESPACE,
    PG_CATALOG_TABLE_PG_PROC,
    PG_CATALOG_TABLE_PG_DATABASE,
    PG_CATALOG_TABLE_PG_AM,
];
68
/// Column-oriented staging buffer for `pg_type` rows.
///
/// Each field holds the values of one column; index `i` across all vectors
/// forms row `i`. Rows are only ever appended through [`PgTypesData::add_type`],
/// which pushes exactly one value onto every vector, so all vectors always
/// have the same length.
#[derive(Debug, Default)]
struct PgTypesData {
    oids: Vec<i32>,
    typnames: Vec<String>,
    typnamespaces: Vec<i32>,
    typowners: Vec<i32>,
    typlens: Vec<i16>,
    typbyvals: Vec<bool>,
    typtypes: Vec<String>,
    typcategories: Vec<String>,
    typispreferreds: Vec<bool>,
    typisdefineds: Vec<bool>,
    typdelims: Vec<String>,
    typrelids: Vec<i32>,
    typelems: Vec<i32>,
    typarrays: Vec<i32>,
    typinputs: Vec<String>,
    typoutputs: Vec<String>,
    typreceives: Vec<String>,
    typsends: Vec<String>,
    typmodins: Vec<String>,
    typmodouts: Vec<String>,
    typanalyzes: Vec<String>,
    typaligns: Vec<String>,
    typstorages: Vec<String>,
    typnotnulls: Vec<bool>,
    typbasetypes: Vec<i32>,
    // Holds the `typtypmod` column (field name kept as-is for existing users).
    typtymods: Vec<i32>,
    // Holds the `typndims` column (field name kept as-is for existing users).
    typndimss: Vec<i32>,
    typcollations: Vec<i32>,
    typdefaultbins: Vec<Option<String>>,
    typdefaults: Vec<Option<String>>,
}

impl PgTypesData {
    /// Creates an empty buffer (every column vector is empty).
    fn new() -> Self {
        Self::default()
    }

    /// Appends one `pg_type` row.
    ///
    /// The parameters map one-to-one, in order, onto the column vectors of
    /// this struct; string slices are copied into owned `String`s.
    // The argument list mirrors the 30 columns of the row, hence the allow.
    #[allow(clippy::too_many_arguments)]
    fn add_type(
        &mut self,
        oid: i32,
        typname: &str,
        typnamespace: i32,
        typowner: i32,
        typlen: i16,
        typbyval: bool,
        typtype: &str,
        typcategory: &str,
        typispreferred: bool,
        typisdefined: bool,
        typdelim: &str,
        typrelid: i32,
        typelem: i32,
        typarray: i32,
        typinput: &str,
        typoutput: &str,
        typreceive: &str,
        typsend: &str,
        typmodin: &str,
        typmodout: &str,
        typanalyze: &str,
        typalign: &str,
        typstorage: &str,
        typnotnull: bool,
        typbasetype: i32,
        typtypmod: i32,
        typndims: i32,
        typcollation: i32,
        typdefaultbin: Option<String>,
        typdefault: Option<String>,
    ) {
        self.oids.push(oid);
        self.typnames.push(typname.to_string());
        self.typnamespaces.push(typnamespace);
        self.typowners.push(typowner);
        self.typlens.push(typlen);
        self.typbyvals.push(typbyval);
        self.typtypes.push(typtype.to_string());
        self.typcategories.push(typcategory.to_string());
        self.typispreferreds.push(typispreferred);
        self.typisdefineds.push(typisdefined);
        self.typdelims.push(typdelim.to_string());
        self.typrelids.push(typrelid);
        self.typelems.push(typelem);
        self.typarrays.push(typarray);
        self.typinputs.push(typinput.to_string());
        self.typoutputs.push(typoutput.to_string());
        self.typreceives.push(typreceive.to_string());
        self.typsends.push(typsend.to_string());
        self.typmodins.push(typmodin.to_string());
        self.typmodouts.push(typmodout.to_string());
        self.typanalyzes.push(typanalyze.to_string());
        self.typaligns.push(typalign.to_string());
        self.typstorages.push(typstorage.to_string());
        self.typnotnulls.push(typnotnull);
        self.typbasetypes.push(typbasetype);
        self.typtymods.push(typtypmod);
        self.typndimss.push(typndims);
        self.typcollations.push(typcollation);
        self.typdefaultbins.push(typdefaultbin);
        self.typdefaults.push(typdefault);
    }
}
206
/// Schema provider that serves the tables named in `PG_CATALOG_TABLES`.
///
/// The held catalog list is handed to the `pg_class`, `pg_namespace`,
/// `pg_database` and `pg_attribute` table sources each time one of those
/// tables is requested.
#[derive(Debug)]
pub struct PgCatalogSchemaProvider {
    catalog_list: Arc<dyn CatalogProviderList>,
}
212
213#[async_trait]
214impl SchemaProvider for PgCatalogSchemaProvider {
215 fn as_any(&self) -> &dyn std::any::Any {
216 self
217 }
218
219 fn table_names(&self) -> Vec<String> {
220 PG_CATALOG_TABLES.iter().map(ToString::to_string).collect()
221 }
222
223 async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
224 match name.to_ascii_lowercase().as_str() {
225 PG_CATALOG_TABLE_PG_TYPE => Ok(Some(self.create_pg_type_table())),
226 PG_CATALOG_TABLE_PG_AM => Ok(Some(self.create_pg_am_table())),
227 PG_CATALOG_TABLE_PG_CLASS => {
228 let table = Arc::new(PgClassTable::new(self.catalog_list.clone()));
229 Ok(Some(Arc::new(
230 StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
231 )))
232 }
233 PG_CATALOG_TABLE_PG_NAMESPACE => {
234 let table = Arc::new(PgNamespaceTable::new(self.catalog_list.clone()));
235 Ok(Some(Arc::new(
236 StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
237 )))
238 }
239 PG_CATALOG_TABLE_PG_DATABASE => {
240 let table = Arc::new(PgDatabaseTable::new(self.catalog_list.clone()));
241 Ok(Some(Arc::new(
242 StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
243 )))
244 }
245 PG_CATALOG_TABLE_PG_ATTRIBUTE => {
246 let table = Arc::new(PgAttributeTable::new(self.catalog_list.clone()));
247 Ok(Some(Arc::new(
248 StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
249 )))
250 }
251 PG_CATALOG_TABLE_PG_PROC => Ok(Some(self.create_pg_proc_table())),
252 _ => Ok(None),
253 }
254 }
255
256 fn table_exist(&self, name: &str) -> bool {
257 PG_CATALOG_TABLES.contains(&name.to_ascii_lowercase().as_str())
258 }
259}
260
261impl PgCatalogSchemaProvider {
262 pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> PgCatalogSchemaProvider {
263 Self { catalog_list }
264 }
265
266 fn create_pg_type_table(&self) -> Arc<dyn TableProvider> {
268 let schema = Arc::new(Schema::new(vec![
270 Field::new("oid", DataType::Int32, false),
271 Field::new("typname", DataType::Utf8, false),
272 Field::new("typnamespace", DataType::Int32, false),
273 Field::new("typowner", DataType::Int32, false),
274 Field::new("typlen", DataType::Int16, false),
275 Field::new("typbyval", DataType::Boolean, false),
276 Field::new("typtype", DataType::Utf8, false),
277 Field::new("typcategory", DataType::Utf8, false),
278 Field::new("typispreferred", DataType::Boolean, false),
279 Field::new("typisdefined", DataType::Boolean, false),
280 Field::new("typdelim", DataType::Utf8, false),
281 Field::new("typrelid", DataType::Int32, false),
282 Field::new("typelem", DataType::Int32, false),
283 Field::new("typarray", DataType::Int32, false),
284 Field::new("typinput", DataType::Utf8, false),
285 Field::new("typoutput", DataType::Utf8, false),
286 Field::new("typreceive", DataType::Utf8, false),
287 Field::new("typsend", DataType::Utf8, false),
288 Field::new("typmodin", DataType::Utf8, false),
289 Field::new("typmodout", DataType::Utf8, false),
290 Field::new("typanalyze", DataType::Utf8, false),
291 Field::new("typalign", DataType::Utf8, false),
292 Field::new("typstorage", DataType::Utf8, false),
293 Field::new("typnotnull", DataType::Boolean, false),
294 Field::new("typbasetype", DataType::Int32, false),
295 Field::new("typtypmod", DataType::Int32, false),
296 Field::new("typndims", DataType::Int32, false),
297 Field::new("typcollation", DataType::Int32, false),
298 Field::new("typdefaultbin", DataType::Utf8, true),
299 Field::new("typdefault", DataType::Utf8, true),
300 ]));
301
302 let pg_types_data = Self::get_standard_pg_types();
304
305 let arrays: Vec<ArrayRef> = vec![
307 Arc::new(Int32Array::from(pg_types_data.oids)),
308 Arc::new(StringArray::from(pg_types_data.typnames)),
309 Arc::new(Int32Array::from(pg_types_data.typnamespaces)),
310 Arc::new(Int32Array::from(pg_types_data.typowners)),
311 Arc::new(Int16Array::from(pg_types_data.typlens)),
312 Arc::new(BooleanArray::from(pg_types_data.typbyvals)),
313 Arc::new(StringArray::from(pg_types_data.typtypes)),
314 Arc::new(StringArray::from(pg_types_data.typcategories)),
315 Arc::new(BooleanArray::from(pg_types_data.typispreferreds)),
316 Arc::new(BooleanArray::from(pg_types_data.typisdefineds)),
317 Arc::new(StringArray::from(pg_types_data.typdelims)),
318 Arc::new(Int32Array::from(pg_types_data.typrelids)),
319 Arc::new(Int32Array::from(pg_types_data.typelems)),
320 Arc::new(Int32Array::from(pg_types_data.typarrays)),
321 Arc::new(StringArray::from(pg_types_data.typinputs)),
322 Arc::new(StringArray::from(pg_types_data.typoutputs)),
323 Arc::new(StringArray::from(pg_types_data.typreceives)),
324 Arc::new(StringArray::from(pg_types_data.typsends)),
325 Arc::new(StringArray::from(pg_types_data.typmodins)),
326 Arc::new(StringArray::from(pg_types_data.typmodouts)),
327 Arc::new(StringArray::from(pg_types_data.typanalyzes)),
328 Arc::new(StringArray::from(pg_types_data.typaligns)),
329 Arc::new(StringArray::from(pg_types_data.typstorages)),
330 Arc::new(BooleanArray::from(pg_types_data.typnotnulls)),
331 Arc::new(Int32Array::from(pg_types_data.typbasetypes)),
332 Arc::new(Int32Array::from(pg_types_data.typtymods)),
333 Arc::new(Int32Array::from(pg_types_data.typndimss)),
334 Arc::new(Int32Array::from(pg_types_data.typcollations)),
335 Arc::new(StringArray::from_iter(
336 pg_types_data.typdefaultbins.into_iter(),
337 )),
338 Arc::new(StringArray::from_iter(
339 pg_types_data.typdefaults.into_iter(),
340 )),
341 ];
342
343 let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();
344
345 let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
347
348 Arc::new(provider)
349 }
350
351 fn get_standard_pg_types() -> PgTypesData {
353 let mut data = PgTypesData::new();
354
355 data.add_type(
357 16, "bool", 11, 10, 1, true, "b", "B", true, true, ",", 0, 0, 1000, "boolin",
358 "boolout", "boolrecv", "boolsend", "-", "-", "-", "c", "p", false, 0, -1, 0, 0, None,
359 None,
360 );
361 data.add_type(
362 17,
363 "bytea",
364 11,
365 10,
366 -1,
367 false,
368 "b",
369 "U",
370 false,
371 true,
372 ",",
373 0,
374 0,
375 1001,
376 "byteain",
377 "byteaout",
378 "bytearecv",
379 "byteasend",
380 "-",
381 "-",
382 "-",
383 "i",
384 "x",
385 false,
386 0,
387 -1,
388 0,
389 0,
390 None,
391 None,
392 );
393 data.add_type(
394 18, "char", 11, 10, 1, true, "b", "S", false, true, ",", 0, 0, 1002, "charin",
395 "charout", "charrecv", "charsend", "-", "-", "-", "c", "p", false, 0, -1, 0, 0, None,
396 None,
397 );
398 data.add_type(
399 19, "name", 11, 10, 64, false, "b", "S", false, true, ",", 0, 0, 1003, "namein",
400 "nameout", "namerecv", "namesend", "-", "-", "-", "i", "p", false, 0, -1, 0, 0, None,
401 None,
402 );
403 data.add_type(
404 20, "int8", 11, 10, 8, true, "b", "N", false, true, ",", 0, 0, 1016, "int8in",
405 "int8out", "int8recv", "int8send", "-", "-", "-", "d", "p", false, 0, -1, 0, 0, None,
406 None,
407 );
408 data.add_type(
409 21, "int2", 11, 10, 2, true, "b", "N", false, true, ",", 0, 0, 1005, "int2in",
410 "int2out", "int2recv", "int2send", "-", "-", "-", "s", "p", false, 0, -1, 0, 0, None,
411 None,
412 );
413 data.add_type(
414 23, "int4", 11, 10, 4, true, "b", "N", true, true, ",", 0, 0, 1007, "int4in",
415 "int4out", "int4recv", "int4send", "-", "-", "-", "i", "p", false, 0, -1, 0, 0, None,
416 None,
417 );
418 data.add_type(
419 25, "text", 11, 10, -1, false, "b", "S", true, true, ",", 0, 0, 1009, "textin",
420 "textout", "textrecv", "textsend", "-", "-", "-", "i", "x", false, 0, -1, 0, 100, None,
421 None,
422 );
423 data.add_type(
424 700,
425 "float4",
426 11,
427 10,
428 4,
429 true,
430 "b",
431 "N",
432 false,
433 true,
434 ",",
435 0,
436 0,
437 1021,
438 "float4in",
439 "float4out",
440 "float4recv",
441 "float4send",
442 "-",
443 "-",
444 "-",
445 "i",
446 "p",
447 false,
448 0,
449 -1,
450 0,
451 0,
452 None,
453 None,
454 );
455 data.add_type(
456 701,
457 "float8",
458 11,
459 10,
460 8,
461 true,
462 "b",
463 "N",
464 true,
465 true,
466 ",",
467 0,
468 0,
469 1022,
470 "float8in",
471 "float8out",
472 "float8recv",
473 "float8send",
474 "-",
475 "-",
476 "-",
477 "d",
478 "p",
479 false,
480 0,
481 -1,
482 0,
483 0,
484 None,
485 None,
486 );
487 data.add_type(
488 1043,
489 "varchar",
490 11,
491 10,
492 -1,
493 false,
494 "b",
495 "S",
496 false,
497 true,
498 ",",
499 0,
500 0,
501 1015,
502 "varcharin",
503 "varcharout",
504 "varcharrecv",
505 "varcharsend",
506 "varchartypmodin",
507 "varchartypmodout",
508 "-",
509 "i",
510 "x",
511 false,
512 0,
513 -1,
514 0,
515 100,
516 None,
517 None,
518 );
519 data.add_type(
520 1082,
521 "date",
522 11,
523 10,
524 4,
525 true,
526 "b",
527 "D",
528 false,
529 true,
530 ",",
531 0,
532 0,
533 1182,
534 "date_in",
535 "date_out",
536 "date_recv",
537 "date_send",
538 "-",
539 "-",
540 "-",
541 "i",
542 "p",
543 false,
544 0,
545 -1,
546 0,
547 0,
548 None,
549 None,
550 );
551 data.add_type(
552 1083,
553 "time",
554 11,
555 10,
556 8,
557 true,
558 "b",
559 "D",
560 false,
561 true,
562 ",",
563 0,
564 0,
565 1183,
566 "time_in",
567 "time_out",
568 "time_recv",
569 "time_send",
570 "timetypmodin",
571 "timetypmodout",
572 "-",
573 "d",
574 "p",
575 false,
576 0,
577 -1,
578 0,
579 0,
580 None,
581 None,
582 );
583 data.add_type(
584 1114,
585 "timestamp",
586 11,
587 10,
588 8,
589 true,
590 "b",
591 "D",
592 false,
593 true,
594 ",",
595 0,
596 0,
597 1115,
598 "timestamp_in",
599 "timestamp_out",
600 "timestamp_recv",
601 "timestamp_send",
602 "timestamptypmodin",
603 "timestamptypmodout",
604 "-",
605 "d",
606 "p",
607 false,
608 0,
609 -1,
610 0,
611 0,
612 None,
613 None,
614 );
615 data.add_type(
616 1184,
617 "timestamptz",
618 11,
619 10,
620 8,
621 true,
622 "b",
623 "D",
624 true,
625 true,
626 ",",
627 0,
628 0,
629 1185,
630 "timestamptz_in",
631 "timestamptz_out",
632 "timestamptz_recv",
633 "timestamptz_send",
634 "timestamptztypmodin",
635 "timestamptztypmodout",
636 "-",
637 "d",
638 "p",
639 false,
640 0,
641 -1,
642 0,
643 0,
644 None,
645 None,
646 );
647 data.add_type(
648 1700,
649 "numeric",
650 11,
651 10,
652 -1,
653 false,
654 "b",
655 "N",
656 false,
657 true,
658 ",",
659 0,
660 0,
661 1231,
662 "numeric_in",
663 "numeric_out",
664 "numeric_recv",
665 "numeric_send",
666 "numerictypmodin",
667 "numerictypmodout",
668 "-",
669 "i",
670 "m",
671 false,
672 0,
673 -1,
674 0,
675 0,
676 None,
677 None,
678 );
679
680 data
681 }
682
683 fn create_pg_am_table(&self) -> Arc<dyn TableProvider> {
685 let schema = Arc::new(Schema::new(vec![
688 Field::new("oid", DataType::Int32, false), Field::new("amname", DataType::Utf8, false), Field::new("amhandler", DataType::Int32, false), Field::new("amtype", DataType::Utf8, false), Field::new("amstrategies", DataType::Int32, false), Field::new("amsupport", DataType::Int32, false), Field::new("amcanorder", DataType::Boolean, false), Field::new("amcanorderbyop", DataType::Boolean, false), Field::new("amcanbackward", DataType::Boolean, false), Field::new("amcanunique", DataType::Boolean, false), Field::new("amcanmulticol", DataType::Boolean, false), Field::new("amoptionalkey", DataType::Boolean, false), Field::new("amsearcharray", DataType::Boolean, false), Field::new("amsearchnulls", DataType::Boolean, false), Field::new("amstorage", DataType::Boolean, false), Field::new("amclusterable", DataType::Boolean, false), Field::new("ampredlocks", DataType::Boolean, false), Field::new("amcanparallel", DataType::Boolean, false), Field::new("amcanbeginscan", DataType::Boolean, false), Field::new("amcanmarkpos", DataType::Boolean, false), Field::new("amcanfetch", DataType::Boolean, false), Field::new("amkeytype", DataType::Int32, false), ]));
711
712 let provider = MemTable::try_new(schema, vec![]).unwrap();
714
715 Arc::new(provider)
716 }
717
718 fn create_pg_proc_table(&self) -> Arc<dyn TableProvider> {
720 let schema = Arc::new(Schema::new(vec![
722 Field::new("oid", DataType::Int32, false), Field::new("proname", DataType::Utf8, false), Field::new("pronamespace", DataType::Int32, false), Field::new("proowner", DataType::Int32, false), Field::new("prolang", DataType::Int32, false), Field::new("procost", DataType::Float32, false), Field::new("prorows", DataType::Float32, false), Field::new("provariadic", DataType::Int32, false), Field::new("prosupport", DataType::Int32, false), Field::new("prokind", DataType::Utf8, false), Field::new("prosecdef", DataType::Boolean, false), Field::new("proleakproof", DataType::Boolean, false), Field::new("proisstrict", DataType::Boolean, false), Field::new("proretset", DataType::Boolean, false), Field::new("provolatile", DataType::Utf8, false), Field::new("proparallel", DataType::Utf8, false), Field::new("pronargs", DataType::Int16, false), Field::new("pronargdefaults", DataType::Int16, false), Field::new("prorettype", DataType::Int32, false), Field::new("proargtypes", DataType::Utf8, false), Field::new("proallargtypes", DataType::Utf8, true), Field::new("proargmodes", DataType::Utf8, true), Field::new("proargnames", DataType::Utf8, true), Field::new("proargdefaults", DataType::Utf8, true), Field::new("protrftypes", DataType::Utf8, true), Field::new("prosrc", DataType::Utf8, false), Field::new("probin", DataType::Utf8, true), Field::new("prosqlbody", DataType::Utf8, true), Field::new("proconfig", DataType::Utf8, true), Field::new("proacl", DataType::Utf8, true), ]));
753
754 let pg_proc_data = Self::get_standard_pg_functions();
756
757 let arrays: Vec<ArrayRef> = vec![
759 Arc::new(Int32Array::from(pg_proc_data.oids)),
760 Arc::new(StringArray::from(pg_proc_data.pronames)),
761 Arc::new(Int32Array::from(pg_proc_data.pronamespaces)),
762 Arc::new(Int32Array::from(pg_proc_data.proowners)),
763 Arc::new(Int32Array::from(pg_proc_data.prolangs)),
764 Arc::new(Float32Array::from(pg_proc_data.procosts)),
765 Arc::new(Float32Array::from(pg_proc_data.prorows)),
766 Arc::new(Int32Array::from(pg_proc_data.provariadics)),
767 Arc::new(Int32Array::from(pg_proc_data.prosupports)),
768 Arc::new(StringArray::from(pg_proc_data.prokinds)),
769 Arc::new(BooleanArray::from(pg_proc_data.prosecdefs)),
770 Arc::new(BooleanArray::from(pg_proc_data.proleakproofs)),
771 Arc::new(BooleanArray::from(pg_proc_data.proisstricts)),
772 Arc::new(BooleanArray::from(pg_proc_data.proretsets)),
773 Arc::new(StringArray::from(pg_proc_data.provolatiles)),
774 Arc::new(StringArray::from(pg_proc_data.proparallels)),
775 Arc::new(Int16Array::from(pg_proc_data.pronargs)),
776 Arc::new(Int16Array::from(pg_proc_data.pronargdefaults)),
777 Arc::new(Int32Array::from(pg_proc_data.prorettypes)),
778 Arc::new(StringArray::from(pg_proc_data.proargtypes)),
779 Arc::new(StringArray::from_iter(
780 pg_proc_data.proallargtypes.into_iter(),
781 )),
782 Arc::new(StringArray::from_iter(pg_proc_data.proargmodes.into_iter())),
783 Arc::new(StringArray::from_iter(pg_proc_data.proargnames.into_iter())),
784 Arc::new(StringArray::from_iter(
785 pg_proc_data.proargdefaults.into_iter(),
786 )),
787 Arc::new(StringArray::from_iter(pg_proc_data.protrftypes.into_iter())),
788 Arc::new(StringArray::from(pg_proc_data.prosrcs)),
789 Arc::new(StringArray::from_iter(pg_proc_data.probins.into_iter())),
790 Arc::new(StringArray::from_iter(pg_proc_data.prosqlbodys.into_iter())),
791 Arc::new(StringArray::from_iter(pg_proc_data.proconfigs.into_iter())),
792 Arc::new(StringArray::from_iter(pg_proc_data.proacls.into_iter())),
793 ];
794
795 let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();
796
797 let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
799
800 Arc::new(provider)
801 }
802
803 fn get_standard_pg_functions() -> PgProcData {
805 let mut data = PgProcData::new();
806
807 data.add_function(
809 1242, "boolin", 11, 10, 12, 1.0, 0.0, 0, 0, "f", false, true, true, false, "i", "s", 1,
810 0, 16, "2275", None, None, None, None, None, "boolin", None, None, None, None,
811 );
812 data.add_function(
813 1243, "boolout", 11, 10, 12, 1.0, 0.0, 0, 0, "f", false, true, true, false, "i", "s",
814 1, 0, 2275, "16", None, None, None, None, None, "boolout", None, None, None, None,
815 );
816 data.add_function(
817 1564, "textin", 11, 10, 12, 1.0, 0.0, 0, 0, "f", false, true, true, false, "i", "s", 1,
818 0, 25, "2275", None, None, None, None, None, "textin", None, None, None, None,
819 );
820 data.add_function(
821 1565, "textout", 11, 10, 12, 1.0, 0.0, 0, 0, "f", false, true, true, false, "i", "s",
822 1, 0, 2275, "25", None, None, None, None, None, "textout", None, None, None, None,
823 );
824 data.add_function(
825 1242,
826 "version",
827 11,
828 10,
829 12,
830 1.0,
831 0.0,
832 0,
833 0,
834 "f",
835 false,
836 true,
837 false,
838 false,
839 "s",
840 "s",
841 0,
842 0,
843 25,
844 "",
845 None,
846 None,
847 None,
848 None,
849 None,
850 "SELECT 'DataFusion PostgreSQL 48.0.0 on x86_64-pc-linux-gnu'",
851 None,
852 None,
853 None,
854 None,
855 );
856
857 data
858 }
859}
860
/// Column-oriented staging buffer for `pg_proc` rows.
///
/// Each field holds the values of one column; index `i` across all vectors
/// forms row `i`. Rows are only ever appended through
/// [`PgProcData::add_function`], which pushes exactly one value onto every
/// vector, so all vectors always have the same length.
#[derive(Debug, Default)]
struct PgProcData {
    oids: Vec<i32>,
    pronames: Vec<String>,
    pronamespaces: Vec<i32>,
    proowners: Vec<i32>,
    prolangs: Vec<i32>,
    procosts: Vec<f32>,
    prorows: Vec<f32>,
    provariadics: Vec<i32>,
    prosupports: Vec<i32>,
    prokinds: Vec<String>,
    prosecdefs: Vec<bool>,
    proleakproofs: Vec<bool>,
    proisstricts: Vec<bool>,
    proretsets: Vec<bool>,
    provolatiles: Vec<String>,
    proparallels: Vec<String>,
    pronargs: Vec<i16>,
    pronargdefaults: Vec<i16>,
    prorettypes: Vec<i32>,
    proargtypes: Vec<String>,
    proallargtypes: Vec<Option<String>>,
    proargmodes: Vec<Option<String>>,
    proargnames: Vec<Option<String>>,
    proargdefaults: Vec<Option<String>>,
    protrftypes: Vec<Option<String>>,
    prosrcs: Vec<String>,
    probins: Vec<Option<String>>,
    prosqlbodys: Vec<Option<String>>,
    proconfigs: Vec<Option<String>>,
    proacls: Vec<Option<String>>,
}

impl PgProcData {
    /// Creates an empty buffer (every column vector is empty).
    fn new() -> Self {
        Self::default()
    }

    /// Appends one `pg_proc` row.
    ///
    /// The parameters map one-to-one, in order, onto the column vectors of
    /// this struct; string slices are copied into owned `String`s.
    // The argument list mirrors the 30 columns of the row, hence the allow.
    #[allow(clippy::too_many_arguments)]
    fn add_function(
        &mut self,
        oid: i32,
        proname: &str,
        pronamespace: i32,
        proowner: i32,
        prolang: i32,
        procost: f32,
        prorows: f32,
        provariadic: i32,
        prosupport: i32,
        prokind: &str,
        prosecdef: bool,
        proleakproof: bool,
        proisstrict: bool,
        proretset: bool,
        provolatile: &str,
        proparallel: &str,
        pronargs: i16,
        pronargdefaults: i16,
        prorettype: i32,
        proargtypes: &str,
        proallargtypes: Option<String>,
        proargmodes: Option<String>,
        proargnames: Option<String>,
        proargdefaults: Option<String>,
        protrftypes: Option<String>,
        prosrc: &str,
        probin: Option<String>,
        prosqlbody: Option<String>,
        proconfig: Option<String>,
        proacl: Option<String>,
    ) {
        self.oids.push(oid);
        self.pronames.push(proname.to_string());
        self.pronamespaces.push(pronamespace);
        self.proowners.push(proowner);
        self.prolangs.push(prolang);
        self.procosts.push(procost);
        self.prorows.push(prorows);
        self.provariadics.push(provariadic);
        self.prosupports.push(prosupport);
        self.prokinds.push(prokind.to_string());
        self.prosecdefs.push(prosecdef);
        self.proleakproofs.push(proleakproof);
        self.proisstricts.push(proisstrict);
        self.proretsets.push(proretset);
        self.provolatiles.push(provolatile.to_string());
        self.proparallels.push(proparallel.to_string());
        self.pronargs.push(pronargs);
        self.pronargdefaults.push(pronargdefaults);
        self.prorettypes.push(prorettype);
        self.proargtypes.push(proargtypes.to_string());
        self.proallargtypes.push(proallargtypes);
        self.proargmodes.push(proargmodes);
        self.proargnames.push(proargnames);
        self.proargdefaults.push(proargdefaults);
        self.protrftypes.push(protrftypes);
        self.prosrcs.push(prosrc.to_string());
        self.probins.push(probin);
        self.prosqlbodys.push(prosqlbody);
        self.proconfigs.push(proconfig);
        self.proacls.push(proacl);
    }
}
998
/// Source of the `pg_class` table.
///
/// Holds the fixed schema built in `PgClassTable::new` and the catalog list
/// that `PgClassTable::get_data` walks to produce one row per table.
#[derive(Debug)]
struct PgClassTable {
    schema: SchemaRef,
    catalog_list: Arc<dyn CatalogProviderList>,
}
1004
1005impl PgClassTable {
1006 fn new(catalog_list: Arc<dyn CatalogProviderList>) -> PgClassTable {
1007 let schema = Arc::new(Schema::new(vec![
1010 Field::new("oid", DataType::Int32, false), Field::new("relname", DataType::Utf8, false), Field::new("relnamespace", DataType::Int32, false), Field::new("reltype", DataType::Int32, false), Field::new("reloftype", DataType::Int32, true), Field::new("relowner", DataType::Int32, false), Field::new("relam", DataType::Int32, false), Field::new("relfilenode", DataType::Int32, false), Field::new("reltablespace", DataType::Int32, false), Field::new("relpages", DataType::Int32, false), Field::new("reltuples", DataType::Float64, false), Field::new("relallvisible", DataType::Int32, false), Field::new("reltoastrelid", DataType::Int32, false), Field::new("relhasindex", DataType::Boolean, false), Field::new("relisshared", DataType::Boolean, false), Field::new("relpersistence", DataType::Utf8, false), Field::new("relkind", DataType::Utf8, false), Field::new("relnatts", DataType::Int16, false), Field::new("relchecks", DataType::Int16, false), Field::new("relhasrules", DataType::Boolean, false), Field::new("relhastriggers", DataType::Boolean, false), Field::new("relhassubclass", DataType::Boolean, false), Field::new("relrowsecurity", DataType::Boolean, false), Field::new("relforcerowsecurity", DataType::Boolean, false), Field::new("relispopulated", DataType::Boolean, false), Field::new("relreplident", DataType::Utf8, false), Field::new("relispartition", DataType::Boolean, false), Field::new("relrewrite", DataType::Int32, true), Field::new("relfrozenxid", DataType::Int32, false), Field::new("relminmxid", DataType::Int32, false), ]));
1041
1042 Self {
1043 schema,
1044 catalog_list,
1045 }
1046 }
1047
1048 async fn get_data(
1050 schema: SchemaRef,
1051 catalog_list: Arc<dyn CatalogProviderList>,
1052 ) -> Result<RecordBatch> {
1053 let mut oids = Vec::new();
1055 let mut relnames = Vec::new();
1056 let mut relnamespaces = Vec::new();
1057 let mut reltypes = Vec::new();
1058 let mut reloftypes = Vec::new();
1059 let mut relowners = Vec::new();
1060 let mut relams = Vec::new();
1061 let mut relfilenodes = Vec::new();
1062 let mut reltablespaces = Vec::new();
1063 let mut relpages = Vec::new();
1064 let mut reltuples = Vec::new();
1065 let mut relallvisibles = Vec::new();
1066 let mut reltoastrelids = Vec::new();
1067 let mut relhasindexes = Vec::new();
1068 let mut relisshareds = Vec::new();
1069 let mut relpersistences = Vec::new();
1070 let mut relkinds = Vec::new();
1071 let mut relnattses = Vec::new();
1072 let mut relcheckses = Vec::new();
1073 let mut relhasruleses = Vec::new();
1074 let mut relhastriggersses = Vec::new();
1075 let mut relhassubclasses = Vec::new();
1076 let mut relrowsecurities = Vec::new();
1077 let mut relforcerowsecurities = Vec::new();
1078 let mut relispopulateds = Vec::new();
1079 let mut relreplidents = Vec::new();
1080 let mut relispartitions = Vec::new();
1081 let mut relrewrites = Vec::new();
1082 let mut relfrozenxids = Vec::new();
1083 let mut relminmxids = Vec::new();
1084
1085 let mut next_oid = 10000;
1087
1088 for catalog_name in catalog_list.catalog_names() {
1090 if let Some(catalog) = catalog_list.catalog(&catalog_name) {
1091 for schema_name in catalog.schema_names() {
1092 if let Some(schema) = catalog.schema(&schema_name) {
1093 let schema_oid = next_oid;
1094 next_oid += 1;
1095
1096 for table_name in schema.table_names() {
1101 let table_oid = next_oid;
1102 next_oid += 1;
1103
1104 if let Some(table) = schema.table(&table_name).await? {
1105 let table_type =
1107 get_table_type_with_name(&table, &table_name, &schema_name);
1108
1109 let column_count = table.schema().fields().len() as i16;
1111
1112 oids.push(table_oid);
1114 relnames.push(table_name.clone());
1115 relnamespaces.push(schema_oid);
1116 reltypes.push(0); reloftypes.push(None);
1118 relowners.push(0); relams.push(0); relfilenodes.push(table_oid); reltablespaces.push(0); relpages.push(1); reltuples.push(0.0); relallvisibles.push(0);
1125 reltoastrelids.push(0);
1126 relhasindexes.push(false);
1127 relisshareds.push(false);
1128 relpersistences.push("p".to_string()); relkinds.push(table_type.to_string());
1130 relnattses.push(column_count);
1131 relcheckses.push(0);
1132 relhasruleses.push(false);
1133 relhastriggersses.push(false);
1134 relhassubclasses.push(false);
1135 relrowsecurities.push(false);
1136 relforcerowsecurities.push(false);
1137 relispopulateds.push(true);
1138 relreplidents.push("d".to_string()); relispartitions.push(false);
1140 relrewrites.push(None);
1141 relfrozenxids.push(0);
1142 relminmxids.push(0);
1143 }
1144 }
1145 }
1146 }
1147 }
1148 }
1149
1150 let arrays: Vec<ArrayRef> = vec![
1152 Arc::new(Int32Array::from(oids)),
1153 Arc::new(StringArray::from(relnames)),
1154 Arc::new(Int32Array::from(relnamespaces)),
1155 Arc::new(Int32Array::from(reltypes)),
1156 Arc::new(Int32Array::from_iter(reloftypes.into_iter())),
1157 Arc::new(Int32Array::from(relowners)),
1158 Arc::new(Int32Array::from(relams)),
1159 Arc::new(Int32Array::from(relfilenodes)),
1160 Arc::new(Int32Array::from(reltablespaces)),
1161 Arc::new(Int32Array::from(relpages)),
1162 Arc::new(Float64Array::from_iter(reltuples.into_iter())),
1163 Arc::new(Int32Array::from(relallvisibles)),
1164 Arc::new(Int32Array::from(reltoastrelids)),
1165 Arc::new(BooleanArray::from(relhasindexes)),
1166 Arc::new(BooleanArray::from(relisshareds)),
1167 Arc::new(StringArray::from(relpersistences)),
1168 Arc::new(StringArray::from(relkinds)),
1169 Arc::new(Int16Array::from(relnattses)),
1170 Arc::new(Int16Array::from(relcheckses)),
1171 Arc::new(BooleanArray::from(relhasruleses)),
1172 Arc::new(BooleanArray::from(relhastriggersses)),
1173 Arc::new(BooleanArray::from(relhassubclasses)),
1174 Arc::new(BooleanArray::from(relrowsecurities)),
1175 Arc::new(BooleanArray::from(relforcerowsecurities)),
1176 Arc::new(BooleanArray::from(relispopulateds)),
1177 Arc::new(StringArray::from(relreplidents)),
1178 Arc::new(BooleanArray::from(relispartitions)),
1179 Arc::new(Int32Array::from_iter(relrewrites.into_iter())),
1180 Arc::new(Int32Array::from(relfrozenxids)),
1181 Arc::new(Int32Array::from(relminmxids)),
1182 ];
1183
1184 let batch = RecordBatch::try_new(schema.clone(), arrays)?;
1186
1187 Ok(batch)
1188 }
1189}
1190
1191impl PartitionStream for PgClassTable {
1192 fn schema(&self) -> &SchemaRef {
1193 &self.schema
1194 }
1195
1196 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1197 let catalog_list = self.catalog_list.clone();
1198 let schema = Arc::clone(&self.schema);
1199 Box::pin(RecordBatchStreamAdapter::new(
1200 schema.clone(),
1201 futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
1202 ))
1203 }
1204}
1205
/// Data source for a `pg_namespace`-style table (columns `oid`, `nspname`,
/// `nspowner`, `nspacl`, `options`; see `PgNamespaceTable::new`).
#[derive(Debug)]
struct PgNamespaceTable {
    /// Arrow schema of the batches produced for this table.
    schema: SchemaRef,
    /// Catalog list that `get_data` walks to discover schema names.
    catalog_list: Arc<dyn CatalogProviderList>,
}
1211
1212impl PgNamespaceTable {
1213 pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
1214 let schema = Arc::new(Schema::new(vec![
1217 Field::new("oid", DataType::Int32, false), Field::new("nspname", DataType::Utf8, false), Field::new("nspowner", DataType::Int32, false), Field::new("nspacl", DataType::Utf8, true), Field::new("options", DataType::Utf8, true), ]));
1223
1224 Self {
1225 schema,
1226 catalog_list,
1227 }
1228 }
1229
1230 async fn get_data(
1232 schema: SchemaRef,
1233 catalog_list: Arc<dyn CatalogProviderList>,
1234 ) -> Result<RecordBatch> {
1235 let mut oids = Vec::new();
1237 let mut nspnames = Vec::new();
1238 let mut nspowners = Vec::new();
1239 let mut nspacls: Vec<Option<String>> = Vec::new();
1240 let mut options: Vec<Option<String>> = Vec::new();
1241
1242 let mut next_oid = 10000;
1244
1245 oids.push(11);
1248 nspnames.push("pg_catalog".to_string());
1249 nspowners.push(10); nspacls.push(None);
1251 options.push(None);
1252
1253 oids.push(2200);
1255 nspnames.push("public".to_string());
1256 nspowners.push(10); nspacls.push(None);
1258 options.push(None);
1259
1260 oids.push(12);
1262 nspnames.push("information_schema".to_string());
1263 nspowners.push(10); nspacls.push(None);
1265 options.push(None);
1266
1267 for catalog_name in catalog_list.catalog_names() {
1269 if let Some(catalog) = catalog_list.catalog(&catalog_name) {
1270 for schema_name in catalog.schema_names() {
1271 if schema_name == "pg_catalog"
1273 || schema_name == "public"
1274 || schema_name == "information_schema"
1275 {
1276 continue;
1277 }
1278
1279 let schema_oid = next_oid;
1280 next_oid += 1;
1281
1282 oids.push(schema_oid);
1283 nspnames.push(schema_name.clone());
1284 nspowners.push(10); nspacls.push(None);
1286 options.push(None);
1287 }
1288 }
1289 }
1290
1291 let arrays: Vec<ArrayRef> = vec![
1293 Arc::new(Int32Array::from(oids)),
1294 Arc::new(StringArray::from(nspnames)),
1295 Arc::new(Int32Array::from(nspowners)),
1296 Arc::new(StringArray::from_iter(nspacls.into_iter())),
1297 Arc::new(StringArray::from_iter(options.into_iter())),
1298 ];
1299
1300 let batch = RecordBatch::try_new(schema.clone(), arrays)?;
1302
1303 Ok(batch)
1304 }
1305}
1306
1307impl PartitionStream for PgNamespaceTable {
1308 fn schema(&self) -> &SchemaRef {
1309 &self.schema
1310 }
1311
1312 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1313 let catalog_list = self.catalog_list.clone();
1314 let schema = Arc::clone(&self.schema);
1315 Box::pin(RecordBatchStreamAdapter::new(
1316 schema.clone(),
1317 futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
1318 ))
1319 }
1320}
1321
/// Data source for a `pg_database`-style table (columns `oid`, `datname`,
/// `datdba`, ...; see `PgDatabaseTable::new`).
#[derive(Debug)]
struct PgDatabaseTable {
    /// Arrow schema of the batches produced for this table.
    schema: SchemaRef,
    /// Catalog list whose catalog names `get_data` reports as databases.
    catalog_list: Arc<dyn CatalogProviderList>,
}
1327
1328impl PgDatabaseTable {
1329 pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
1330 let schema = Arc::new(Schema::new(vec![
1333 Field::new("oid", DataType::Int32, false), Field::new("datname", DataType::Utf8, false), Field::new("datdba", DataType::Int32, false), Field::new("encoding", DataType::Int32, false), Field::new("datcollate", DataType::Utf8, false), Field::new("datctype", DataType::Utf8, false), Field::new("datistemplate", DataType::Boolean, false), Field::new("datallowconn", DataType::Boolean, false), Field::new("datconnlimit", DataType::Int32, false), Field::new("datlastsysoid", DataType::Int32, false), Field::new("datfrozenxid", DataType::Int32, false), Field::new("datminmxid", DataType::Int32, false), Field::new("dattablespace", DataType::Int32, false), Field::new("datacl", DataType::Utf8, true), ]));
1348
1349 Self {
1350 schema,
1351 catalog_list,
1352 }
1353 }
1354
1355 async fn get_data(
1357 schema: SchemaRef,
1358 catalog_list: Arc<dyn CatalogProviderList>,
1359 ) -> Result<RecordBatch> {
1360 let mut oids = Vec::new();
1362 let mut datnames = Vec::new();
1363 let mut datdbas = Vec::new();
1364 let mut encodings = Vec::new();
1365 let mut datcollates = Vec::new();
1366 let mut datctypes = Vec::new();
1367 let mut datistemplates = Vec::new();
1368 let mut datallowconns = Vec::new();
1369 let mut datconnlimits = Vec::new();
1370 let mut datlastsysoids = Vec::new();
1371 let mut datfrozenxids = Vec::new();
1372 let mut datminmxids = Vec::new();
1373 let mut dattablespaces = Vec::new();
1374 let mut datacles: Vec<Option<String>> = Vec::new();
1375
1376 let mut next_oid = 16384; for catalog_name in catalog_list.catalog_names() {
1381 let oid = next_oid;
1382 next_oid += 1;
1383
1384 oids.push(oid);
1385 datnames.push(catalog_name.clone());
1386 datdbas.push(10); encodings.push(6); datcollates.push("en_US.UTF-8".to_string()); datctypes.push("en_US.UTF-8".to_string()); datistemplates.push(false);
1391 datallowconns.push(true);
1392 datconnlimits.push(-1); datlastsysoids.push(100000); datfrozenxids.push(1); datminmxids.push(1); dattablespaces.push(1663); datacles.push(None); }
1399
1400 if !datnames.contains(&"postgres".to_string()) {
1403 let oid = next_oid;
1404
1405 oids.push(oid);
1406 datnames.push("postgres".to_string());
1407 datdbas.push(10);
1408 encodings.push(6);
1409 datcollates.push("en_US.UTF-8".to_string());
1410 datctypes.push("en_US.UTF-8".to_string());
1411 datistemplates.push(false);
1412 datallowconns.push(true);
1413 datconnlimits.push(-1);
1414 datlastsysoids.push(100000);
1415 datfrozenxids.push(1);
1416 datminmxids.push(1);
1417 dattablespaces.push(1663);
1418 datacles.push(None);
1419 }
1420
1421 let arrays: Vec<ArrayRef> = vec![
1423 Arc::new(Int32Array::from(oids)),
1424 Arc::new(StringArray::from(datnames)),
1425 Arc::new(Int32Array::from(datdbas)),
1426 Arc::new(Int32Array::from(encodings)),
1427 Arc::new(StringArray::from(datcollates)),
1428 Arc::new(StringArray::from(datctypes)),
1429 Arc::new(BooleanArray::from(datistemplates)),
1430 Arc::new(BooleanArray::from(datallowconns)),
1431 Arc::new(Int32Array::from(datconnlimits)),
1432 Arc::new(Int32Array::from(datlastsysoids)),
1433 Arc::new(Int32Array::from(datfrozenxids)),
1434 Arc::new(Int32Array::from(datminmxids)),
1435 Arc::new(Int32Array::from(dattablespaces)),
1436 Arc::new(StringArray::from_iter(datacles.into_iter())),
1437 ];
1438
1439 let full_batch = RecordBatch::try_new(schema.clone(), arrays)?;
1441 Ok(full_batch)
1442 }
1443}
1444
1445impl PartitionStream for PgDatabaseTable {
1446 fn schema(&self) -> &SchemaRef {
1447 &self.schema
1448 }
1449
1450 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1451 let catalog_list = self.catalog_list.clone();
1452 let schema = Arc::clone(&self.schema);
1453 Box::pin(RecordBatchStreamAdapter::new(
1454 schema.clone(),
1455 futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
1456 ))
1457 }
1458}
1459
/// Data source for a `pg_attribute`-style table: one row per column of every
/// table reachable through the catalog list (see `PgAttributeTable::new` for
/// the column layout).
#[derive(Debug)]
struct PgAttributeTable {
    /// Arrow schema of the batches produced for this table.
    schema: SchemaRef,
    /// Catalog list that `get_data` walks to find tables and their columns.
    catalog_list: Arc<dyn CatalogProviderList>,
}
1465
1466impl PgAttributeTable {
1467 pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
1468 let schema = Arc::new(Schema::new(vec![
1471 Field::new("attrelid", DataType::Int32, false), Field::new("attname", DataType::Utf8, false), Field::new("atttypid", DataType::Int32, false), Field::new("attstattarget", DataType::Int32, false), Field::new("attlen", DataType::Int16, false), Field::new("attnum", DataType::Int16, false), Field::new("attndims", DataType::Int32, false), Field::new("attcacheoff", DataType::Int32, false), Field::new("atttypmod", DataType::Int32, false), Field::new("attbyval", DataType::Boolean, false), Field::new("attalign", DataType::Utf8, false), Field::new("attstorage", DataType::Utf8, false), Field::new("attcompression", DataType::Utf8, true), Field::new("attnotnull", DataType::Boolean, false), Field::new("atthasdef", DataType::Boolean, false), Field::new("atthasmissing", DataType::Boolean, false), Field::new("attidentity", DataType::Utf8, false), Field::new("attgenerated", DataType::Utf8, false), Field::new("attisdropped", DataType::Boolean, false), Field::new("attislocal", DataType::Boolean, false), Field::new("attinhcount", DataType::Int32, false), Field::new("attcollation", DataType::Int32, false), Field::new("attacl", DataType::Utf8, true), Field::new("attoptions", DataType::Utf8, true), Field::new("attfdwoptions", DataType::Utf8, true), Field::new("attmissingval", DataType::Utf8, true), ]));
1498
1499 Self {
1500 schema,
1501 catalog_list,
1502 }
1503 }
1504
1505 async fn get_data(
1507 schema: SchemaRef,
1508 catalog_list: Arc<dyn CatalogProviderList>,
1509 ) -> Result<RecordBatch> {
1510 let mut attrelids = Vec::new();
1512 let mut attnames = Vec::new();
1513 let mut atttypids = Vec::new();
1514 let mut attstattargets = Vec::new();
1515 let mut attlens = Vec::new();
1516 let mut attnums = Vec::new();
1517 let mut attndimss = Vec::new();
1518 let mut attcacheoffs = Vec::new();
1519 let mut atttymods = Vec::new();
1520 let mut attbyvals = Vec::new();
1521 let mut attaligns = Vec::new();
1522 let mut attstorages = Vec::new();
1523 let mut attcompressions: Vec<Option<String>> = Vec::new();
1524 let mut attnotnulls = Vec::new();
1525 let mut atthasdefs = Vec::new();
1526 let mut atthasmissings = Vec::new();
1527 let mut attidentitys = Vec::new();
1528 let mut attgenerateds = Vec::new();
1529 let mut attisdroppeds = Vec::new();
1530 let mut attislocals = Vec::new();
1531 let mut attinhcounts = Vec::new();
1532 let mut attcollations = Vec::new();
1533 let mut attacls: Vec<Option<String>> = Vec::new();
1534 let mut attoptions: Vec<Option<String>> = Vec::new();
1535 let mut attfdwoptions: Vec<Option<String>> = Vec::new();
1536 let mut attmissingvals: Vec<Option<String>> = Vec::new();
1537
1538 let mut next_oid = 10000;
1540
1541 for catalog_name in catalog_list.catalog_names() {
1543 if let Some(catalog) = catalog_list.catalog(&catalog_name) {
1544 for schema_name in catalog.schema_names() {
1545 if let Some(schema_provider) = catalog.schema(&schema_name) {
1546 for table_name in schema_provider.table_names() {
1548 let table_oid = next_oid;
1549 next_oid += 1;
1550
1551 if let Some(table) = schema_provider.table(&table_name).await? {
1552 let table_schema = table.schema();
1553
1554 for (column_idx, field) in table_schema.fields().iter().enumerate()
1556 {
1557 let attnum = (column_idx + 1) as i16; let (pg_type_oid, type_len, by_val, align, storage) =
1559 Self::datafusion_to_pg_type(field.data_type());
1560
1561 attrelids.push(table_oid);
1562 attnames.push(field.name().clone());
1563 atttypids.push(pg_type_oid);
1564 attstattargets.push(-1); attlens.push(type_len);
1566 attnums.push(attnum);
1567 attndimss.push(0); attcacheoffs.push(-1); atttymods.push(-1); attbyvals.push(by_val);
1571 attaligns.push(align.to_string());
1572 attstorages.push(storage.to_string());
1573 attcompressions.push(None); attnotnulls.push(!field.is_nullable());
1575 atthasdefs.push(false); atthasmissings.push(false); attidentitys.push("".to_string()); attgenerateds.push("".to_string()); attisdroppeds.push(false); attislocals.push(true); attinhcounts.push(0); attcollations.push(0); attacls.push(None); attoptions.push(None); attfdwoptions.push(None); attmissingvals.push(None); }
1588 }
1589 }
1590 }
1591 }
1592 }
1593 }
1594
1595 let arrays: Vec<ArrayRef> = vec![
1597 Arc::new(Int32Array::from(attrelids)),
1598 Arc::new(StringArray::from(attnames)),
1599 Arc::new(Int32Array::from(atttypids)),
1600 Arc::new(Int32Array::from(attstattargets)),
1601 Arc::new(Int16Array::from(attlens)),
1602 Arc::new(Int16Array::from(attnums)),
1603 Arc::new(Int32Array::from(attndimss)),
1604 Arc::new(Int32Array::from(attcacheoffs)),
1605 Arc::new(Int32Array::from(atttymods)),
1606 Arc::new(BooleanArray::from(attbyvals)),
1607 Arc::new(StringArray::from(attaligns)),
1608 Arc::new(StringArray::from(attstorages)),
1609 Arc::new(StringArray::from_iter(attcompressions.into_iter())),
1610 Arc::new(BooleanArray::from(attnotnulls)),
1611 Arc::new(BooleanArray::from(atthasdefs)),
1612 Arc::new(BooleanArray::from(atthasmissings)),
1613 Arc::new(StringArray::from(attidentitys)),
1614 Arc::new(StringArray::from(attgenerateds)),
1615 Arc::new(BooleanArray::from(attisdroppeds)),
1616 Arc::new(BooleanArray::from(attislocals)),
1617 Arc::new(Int32Array::from(attinhcounts)),
1618 Arc::new(Int32Array::from(attcollations)),
1619 Arc::new(StringArray::from_iter(attacls.into_iter())),
1620 Arc::new(StringArray::from_iter(attoptions.into_iter())),
1621 Arc::new(StringArray::from_iter(attfdwoptions.into_iter())),
1622 Arc::new(StringArray::from_iter(attmissingvals.into_iter())),
1623 ];
1624
1625 let batch = RecordBatch::try_new(schema.clone(), arrays)?;
1627 Ok(batch)
1628 }
1629
1630 fn datafusion_to_pg_type(data_type: &DataType) -> (i32, i16, bool, &'static str, &'static str) {
1632 match data_type {
1633 DataType::Boolean => (16, 1, true, "c", "p"), DataType::Int8 => (18, 1, true, "c", "p"), DataType::Int16 => (21, 2, true, "s", "p"), DataType::Int32 => (23, 4, true, "i", "p"), DataType::Int64 => (20, 8, true, "d", "p"), DataType::UInt8 => (21, 2, true, "s", "p"), DataType::UInt16 => (23, 4, true, "i", "p"), DataType::UInt32 => (20, 8, true, "d", "p"), DataType::UInt64 => (1700, -1, false, "i", "m"), DataType::Float32 => (700, 4, true, "i", "p"), DataType::Float64 => (701, 8, true, "d", "p"), DataType::Utf8 => (25, -1, false, "i", "x"), DataType::LargeUtf8 => (25, -1, false, "i", "x"), DataType::Binary => (17, -1, false, "i", "x"), DataType::LargeBinary => (17, -1, false, "i", "x"), DataType::Date32 => (1082, 4, true, "i", "p"), DataType::Date64 => (1082, 4, true, "i", "p"), DataType::Time32(_) => (1083, 8, true, "d", "p"), DataType::Time64(_) => (1083, 8, true, "d", "p"), DataType::Timestamp(_, _) => (1114, 8, true, "d", "p"), DataType::Decimal128(_, _) => (1700, -1, false, "i", "m"), DataType::Decimal256(_, _) => (1700, -1, false, "i", "m"), _ => (25, -1, false, "i", "x"), }
1657 }
1658}
1659
1660impl PartitionStream for PgAttributeTable {
1661 fn schema(&self) -> &SchemaRef {
1662 &self.schema
1663 }
1664
1665 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1666 let catalog_list = self.catalog_list.clone();
1667 let schema = Arc::clone(&self.schema);
1668 Box::pin(RecordBatchStreamAdapter::new(
1669 schema.clone(),
1670 futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
1671 ))
1672 }
1673}
1674
1675pub fn create_current_schemas_udf() -> ScalarUDF {
1676 let func = move |args: &[ColumnarValue]| {
1678 let args = ColumnarValue::values_to_arrays(args)?;
1679 let input = as_boolean_array(&args[0]);
1680
1681 let mut values = vec!["public"];
1683 if input.value(0) {
1685 values.push("information_schema");
1686 values.push("pg_catalog");
1687 }
1688
1689 let list_array = SingleRowListArrayBuilder::new(Arc::new(StringArray::from(values)));
1690
1691 let array: ArrayRef = Arc::new(list_array.build_list_array());
1692
1693 Ok(ColumnarValue::Array(array))
1694 };
1695
1696 create_udf(
1698 "current_schemas",
1699 vec![DataType::Boolean],
1700 DataType::List(Arc::new(Field::new("schema", DataType::Utf8, false))),
1701 Volatility::Immutable,
1702 Arc::new(func),
1703 )
1704}
1705
1706pub fn create_current_schema_udf() -> ScalarUDF {
1707 let func = move |_args: &[ColumnarValue]| {
1709 let mut builder = StringBuilder::new();
1711 builder.append_value("public");
1712 let array: ArrayRef = Arc::new(builder.finish());
1713
1714 Ok(ColumnarValue::Array(array))
1715 };
1716
1717 create_udf(
1719 "current_schema",
1720 vec![],
1721 DataType::Utf8,
1722 Volatility::Immutable,
1723 Arc::new(func),
1724 )
1725}
1726
1727pub fn create_version_udf() -> ScalarUDF {
1728 let func = move |_args: &[ColumnarValue]| {
1730 let mut builder = StringBuilder::new();
1732 builder
1733 .append_value("DataFusion PostgreSQL 48.0.0 on x86_64-pc-linux-gnu, compiled by Rust");
1734 let array: ArrayRef = Arc::new(builder.finish());
1735
1736 Ok(ColumnarValue::Array(array))
1737 };
1738
1739 create_udf(
1741 "version",
1742 vec![],
1743 DataType::Utf8,
1744 Volatility::Immutable,
1745 Arc::new(func),
1746 )
1747}
1748
1749pub fn create_pg_get_userbyid_udf() -> ScalarUDF {
1750 let func = move |args: &[ColumnarValue]| {
1752 let args = ColumnarValue::values_to_arrays(args)?;
1753 let _input = &args[0]; let mut builder = StringBuilder::new();
1757 builder.append_value("postgres");
1758 let array: ArrayRef = Arc::new(builder.finish());
1759
1760 Ok(ColumnarValue::Array(array))
1761 };
1762
1763 create_udf(
1765 "pg_get_userbyid",
1766 vec![DataType::Int32],
1767 DataType::Utf8,
1768 Volatility::Stable,
1769 Arc::new(func),
1770 )
1771}
1772
1773pub fn create_has_table_privilege_3param_udf() -> ScalarUDF {
1774 let func = move |args: &[ColumnarValue]| {
1776 let args = ColumnarValue::values_to_arrays(args)?;
1777 let _user = &args[0]; let _table = &args[1]; let _privilege = &args[2]; let mut builder = BooleanArray::builder(1);
1783 builder.append_value(true);
1784 let array: ArrayRef = Arc::new(builder.finish());
1785
1786 Ok(ColumnarValue::Array(array))
1787 };
1788
1789 create_udf(
1791 "has_table_privilege",
1792 vec![DataType::Utf8, DataType::Utf8, DataType::Utf8],
1793 DataType::Boolean,
1794 Volatility::Stable,
1795 Arc::new(func),
1796 )
1797}
1798
1799pub fn create_has_table_privilege_2param_udf() -> ScalarUDF {
1800 let func = move |args: &[ColumnarValue]| {
1802 let args = ColumnarValue::values_to_arrays(args)?;
1803 let _table = &args[0]; let _privilege = &args[1]; let mut builder = BooleanArray::builder(1);
1808 builder.append_value(true);
1809 let array: ArrayRef = Arc::new(builder.finish());
1810
1811 Ok(ColumnarValue::Array(array))
1812 };
1813
1814 create_udf(
1816 "has_table_privilege",
1817 vec![DataType::Utf8, DataType::Utf8],
1818 DataType::Boolean,
1819 Volatility::Stable,
1820 Arc::new(func),
1821 )
1822}
1823
1824pub fn setup_pg_catalog(
1826 session_context: &SessionContext,
1827 catalog_name: &str,
1828) -> Result<(), Box<DataFusionError>> {
1829 let pg_catalog = PgCatalogSchemaProvider::new(session_context.state().catalog_list().clone());
1830 session_context
1831 .catalog(catalog_name)
1832 .ok_or_else(|| {
1833 DataFusionError::Configuration(format!(
1834 "Catalog not found when registering pg_catalog: {catalog_name}"
1835 ))
1836 })?
1837 .register_schema("pg_catalog", Arc::new(pg_catalog))?;
1838
1839 session_context.register_udf(create_current_schema_udf());
1840 session_context.register_udf(create_current_schemas_udf());
1841 session_context.register_udf(create_version_udf());
1842 session_context.register_udf(create_pg_get_userbyid_udf());
1843 session_context.register_udf(create_has_table_privilege_2param_udf());
1844
1845 Ok(())
1846}