1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU32, Ordering};
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use datafusion::arrow::array::{
7 as_boolean_array, ArrayRef, BooleanArray, BooleanBuilder, Float32Array, Float64Array,
8 Int16Array, Int32Array, RecordBatch, StringArray, StringBuilder,
9};
10use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
11use datafusion::catalog::streaming::StreamingTable;
12use datafusion::catalog::{CatalogProviderList, MemTable, SchemaProvider};
13use datafusion::common::utils::SingleRowListArrayBuilder;
14use datafusion::datasource::{TableProvider, ViewTable};
15use datafusion::error::{DataFusionError, Result};
16use datafusion::execution::{SendableRecordBatchStream, TaskContext};
17use datafusion::logical_expr::{ColumnarValue, ScalarUDF, Volatility};
18use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
19use datafusion::physical_plan::streaming::PartitionStream;
20use datafusion::prelude::{create_udf, SessionContext};
21use postgres_types::Oid;
22use tokio::sync::RwLock;
23
// Names of the PostgreSQL system-catalog tables emulated under the
// `pg_catalog` schema. Clients (psql, JDBC drivers, ORMs) introspect these
// catalogs to discover types, relations, columns, namespaces, functions,
// databases, access methods, ranges, enums, and object descriptions.
const PG_CATALOG_TABLE_PG_TYPE: &str = "pg_type";
const PG_CATALOG_TABLE_PG_CLASS: &str = "pg_class";
const PG_CATALOG_TABLE_PG_ATTRIBUTE: &str = "pg_attribute";
const PG_CATALOG_TABLE_PG_NAMESPACE: &str = "pg_namespace";
const PG_CATALOG_TABLE_PG_PROC: &str = "pg_proc";
const PG_CATALOG_TABLE_PG_DATABASE: &str = "pg_database";
const PG_CATALOG_TABLE_PG_AM: &str = "pg_am";
const PG_CATALOG_TABLE_PG_RANGE: &str = "pg_range";
const PG_CATALOG_TABLE_PG_ENUM: &str = "pg_enum";
const PG_CATALOG_TABLE_PG_DESCRIPTION: &str = "pg_description";
34
35fn get_table_type(table: &Arc<dyn TableProvider>) -> &'static str {
37 if table.as_any().is::<ViewTable>() {
39 "v" } else {
41 "r" }
43}
44
45fn get_table_type_with_name(
47 table: &Arc<dyn TableProvider>,
48 table_name: &str,
49 schema_name: &str,
50) -> &'static str {
51 if schema_name == "pg_catalog" || schema_name == "information_schema" {
53 if table_name.starts_with("pg_")
54 || table_name.contains("_table")
55 || table_name.contains("_column")
56 {
57 "r" } else {
59 "v" }
61 } else {
62 get_table_type(table)
63 }
64}
65
/// Complete list of pg_catalog table names this provider serves; used by
/// `PgCatalogSchemaProvider::table_names` and `table_exist`.
pub const PG_CATALOG_TABLES: &[&str] = &[
    PG_CATALOG_TABLE_PG_TYPE,
    PG_CATALOG_TABLE_PG_CLASS,
    PG_CATALOG_TABLE_PG_ATTRIBUTE,
    PG_CATALOG_TABLE_PG_NAMESPACE,
    PG_CATALOG_TABLE_PG_PROC,
    PG_CATALOG_TABLE_PG_DATABASE,
    PG_CATALOG_TABLE_PG_AM,
    PG_CATALOG_TABLE_PG_RANGE,
    PG_CATALOG_TABLE_PG_ENUM,
    PG_CATALOG_TABLE_PG_DESCRIPTION,
];
78
/// Column-oriented buffers for building the `pg_type` record batch.
///
/// Each vector holds one pg_type column; `add_type` pushes exactly one value
/// into every vector, so all vectors always have equal length (one entry per
/// type row). Note two field names diverge slightly from their pg_type
/// column names: `typtymods` receives the `typtypmod` values and `typndimss`
/// the `typndims` values (see `add_type`).
#[derive(Debug)]
struct PgTypesData {
    oids: Vec<i32>,
    typnames: Vec<String>,
    typnamespaces: Vec<i32>,
    typowners: Vec<i32>,
    typlens: Vec<i16>,
    typbyvals: Vec<bool>,
    typtypes: Vec<String>,
    typcategories: Vec<String>,
    typispreferreds: Vec<bool>,
    typisdefineds: Vec<bool>,
    typdelims: Vec<String>,
    typrelids: Vec<i32>,
    typelems: Vec<i32>,
    typarrays: Vec<i32>,
    typinputs: Vec<String>,
    typoutputs: Vec<String>,
    typreceives: Vec<String>,
    typsends: Vec<String>,
    typmodins: Vec<String>,
    typmodouts: Vec<String>,
    typanalyzes: Vec<String>,
    typaligns: Vec<String>,
    typstorages: Vec<String>,
    typnotnulls: Vec<bool>,
    typbasetypes: Vec<i32>,
    typtymods: Vec<i32>,
    typndimss: Vec<i32>,
    typcollations: Vec<i32>,
    // Nullable columns are modeled as Option<String>.
    typdefaultbins: Vec<Option<String>>,
    typdefaults: Vec<Option<String>>,
}
113
114impl PgTypesData {
115 fn new() -> Self {
116 Self {
117 oids: Vec::new(),
118 typnames: Vec::new(),
119 typnamespaces: Vec::new(),
120 typowners: Vec::new(),
121 typlens: Vec::new(),
122 typbyvals: Vec::new(),
123 typtypes: Vec::new(),
124 typcategories: Vec::new(),
125 typispreferreds: Vec::new(),
126 typisdefineds: Vec::new(),
127 typdelims: Vec::new(),
128 typrelids: Vec::new(),
129 typelems: Vec::new(),
130 typarrays: Vec::new(),
131 typinputs: Vec::new(),
132 typoutputs: Vec::new(),
133 typreceives: Vec::new(),
134 typsends: Vec::new(),
135 typmodins: Vec::new(),
136 typmodouts: Vec::new(),
137 typanalyzes: Vec::new(),
138 typaligns: Vec::new(),
139 typstorages: Vec::new(),
140 typnotnulls: Vec::new(),
141 typbasetypes: Vec::new(),
142 typtymods: Vec::new(),
143 typndimss: Vec::new(),
144 typcollations: Vec::new(),
145 typdefaultbins: Vec::new(),
146 typdefaults: Vec::new(),
147 }
148 }
149
150 #[allow(clippy::too_many_arguments)]
151 fn add_type(
152 &mut self,
153 oid: i32,
154 typname: &str,
155 typnamespace: i32,
156 typowner: i32,
157 typlen: i16,
158 typbyval: bool,
159 typtype: &str,
160 typcategory: &str,
161 typispreferred: bool,
162 typisdefined: bool,
163 typdelim: &str,
164 typrelid: i32,
165 typelem: i32,
166 typarray: i32,
167 typinput: &str,
168 typoutput: &str,
169 typreceive: &str,
170 typsend: &str,
171 typmodin: &str,
172 typmodout: &str,
173 typanalyze: &str,
174 typalign: &str,
175 typstorage: &str,
176 typnotnull: bool,
177 typbasetype: i32,
178 typtypmod: i32,
179 typndims: i32,
180 typcollation: i32,
181 typdefaultbin: Option<String>,
182 typdefault: Option<String>,
183 ) {
184 self.oids.push(oid);
185 self.typnames.push(typname.to_string());
186 self.typnamespaces.push(typnamespace);
187 self.typowners.push(typowner);
188 self.typlens.push(typlen);
189 self.typbyvals.push(typbyval);
190 self.typtypes.push(typtype.to_string());
191 self.typcategories.push(typcategory.to_string());
192 self.typispreferreds.push(typispreferred);
193 self.typisdefineds.push(typisdefined);
194 self.typdelims.push(typdelim.to_string());
195 self.typrelids.push(typrelid);
196 self.typelems.push(typelem);
197 self.typarrays.push(typarray);
198 self.typinputs.push(typinput.to_string());
199 self.typoutputs.push(typoutput.to_string());
200 self.typreceives.push(typreceive.to_string());
201 self.typsends.push(typsend.to_string());
202 self.typmodins.push(typmodin.to_string());
203 self.typmodouts.push(typmodout.to_string());
204 self.typanalyzes.push(typanalyze.to_string());
205 self.typaligns.push(typalign.to_string());
206 self.typstorages.push(typstorage.to_string());
207 self.typnotnulls.push(typnotnull);
208 self.typbasetypes.push(typbasetype);
209 self.typtymods.push(typtypmod);
210 self.typndimss.push(typndims);
211 self.typcollations.push(typcollation);
212 self.typdefaultbins.push(typdefaultbin);
213 self.typdefaults.push(typdefault);
214 }
215}
216
/// Key into the shared OID cache. Identifies a catalog, a schema within a
/// catalog, or a table within a schema, so that once an OID is assigned to
/// an object it stays stable across repeated catalog scans.
#[derive(Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
enum OidCacheKey {
    // Catalog name.
    Catalog(String),
    // (catalog name, schema name).
    Schema(String, String),
    // (catalog name, schema name, table name).
    Table(String, String, String),
}
224
/// A `SchemaProvider` that emulates PostgreSQL's `pg_catalog` schema on top
/// of a DataFusion catalog list.
#[derive(Debug)]
pub struct PgCatalogSchemaProvider {
    // The catalogs/schemas/tables reflected into pg_catalog views.
    catalog_list: Arc<dyn CatalogProviderList>,
    // Monotonic source of fresh OIDs; initialized to 16384 in `new`
    // (PostgreSQL's first non-reserved object OID).
    oid_counter: Arc<AtomicU32>,
    // Cache of already-assigned OIDs, shared with the streaming tables so
    // OIDs remain consistent across pg_class/pg_namespace/pg_database scans.
    oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
}
232
233#[async_trait]
234impl SchemaProvider for PgCatalogSchemaProvider {
235 fn as_any(&self) -> &dyn std::any::Any {
236 self
237 }
238
239 fn table_names(&self) -> Vec<String> {
240 PG_CATALOG_TABLES.iter().map(ToString::to_string).collect()
241 }
242
243 async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
244 match name.to_ascii_lowercase().as_str() {
245 PG_CATALOG_TABLE_PG_TYPE => Ok(Some(self.create_pg_type_table())),
246 PG_CATALOG_TABLE_PG_AM => Ok(Some(self.create_pg_am_table())),
247 PG_CATALOG_TABLE_PG_CLASS => {
248 let table = Arc::new(PgClassTable::new(
249 self.catalog_list.clone(),
250 self.oid_counter.clone(),
251 self.oid_cache.clone(),
252 ));
253 Ok(Some(Arc::new(
254 StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
255 )))
256 }
257 PG_CATALOG_TABLE_PG_NAMESPACE => {
258 let table = Arc::new(PgNamespaceTable::new(
259 self.catalog_list.clone(),
260 self.oid_counter.clone(),
261 self.oid_cache.clone(),
262 ));
263 Ok(Some(Arc::new(
264 StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
265 )))
266 }
267 PG_CATALOG_TABLE_PG_DATABASE => {
268 let table = Arc::new(PgDatabaseTable::new(
269 self.catalog_list.clone(),
270 self.oid_counter.clone(),
271 self.oid_cache.clone(),
272 ));
273 Ok(Some(Arc::new(
274 StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
275 )))
276 }
277 PG_CATALOG_TABLE_PG_ATTRIBUTE => {
278 let table = Arc::new(PgAttributeTable::new(self.catalog_list.clone()));
279 Ok(Some(Arc::new(
280 StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
281 )))
282 }
283 PG_CATALOG_TABLE_PG_PROC => Ok(Some(self.create_pg_proc_table())),
284 PG_CATALOG_TABLE_PG_RANGE => Ok(Some(self.create_pg_range_table())),
285 PG_CATALOG_TABLE_PG_ENUM => Ok(Some(self.create_pg_enum_table())),
286 PG_CATALOG_TABLE_PG_DESCRIPTION => Ok(Some(self.create_pg_description_table())),
287 _ => Ok(None),
288 }
289 }
290
291 fn table_exist(&self, name: &str) -> bool {
292 PG_CATALOG_TABLES.contains(&name.to_ascii_lowercase().as_str())
293 }
294}
295
296impl PgCatalogSchemaProvider {
297 pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> PgCatalogSchemaProvider {
298 Self {
299 catalog_list,
300 oid_counter: Arc::new(AtomicU32::new(16384)),
301 oid_cache: Arc::new(RwLock::new(HashMap::new())),
302 }
303 }
304
305 fn create_pg_type_table(&self) -> Arc<dyn TableProvider> {
307 let schema = Arc::new(Schema::new(vec![
309 Field::new("oid", DataType::Int32, false),
310 Field::new("typname", DataType::Utf8, false),
311 Field::new("typnamespace", DataType::Int32, false),
312 Field::new("typowner", DataType::Int32, false),
313 Field::new("typlen", DataType::Int16, false),
314 Field::new("typbyval", DataType::Boolean, false),
315 Field::new("typtype", DataType::Utf8, false),
316 Field::new("typcategory", DataType::Utf8, false),
317 Field::new("typispreferred", DataType::Boolean, false),
318 Field::new("typisdefined", DataType::Boolean, false),
319 Field::new("typdelim", DataType::Utf8, false),
320 Field::new("typrelid", DataType::Int32, false),
321 Field::new("typelem", DataType::Int32, false),
322 Field::new("typarray", DataType::Int32, false),
323 Field::new("typinput", DataType::Utf8, false),
324 Field::new("typoutput", DataType::Utf8, false),
325 Field::new("typreceive", DataType::Utf8, false),
326 Field::new("typsend", DataType::Utf8, false),
327 Field::new("typmodin", DataType::Utf8, false),
328 Field::new("typmodout", DataType::Utf8, false),
329 Field::new("typanalyze", DataType::Utf8, false),
330 Field::new("typalign", DataType::Utf8, false),
331 Field::new("typstorage", DataType::Utf8, false),
332 Field::new("typnotnull", DataType::Boolean, false),
333 Field::new("typbasetype", DataType::Int32, false),
334 Field::new("typtypmod", DataType::Int32, false),
335 Field::new("typndims", DataType::Int32, false),
336 Field::new("typcollation", DataType::Int32, false),
337 Field::new("typdefaultbin", DataType::Utf8, true),
338 Field::new("typdefault", DataType::Utf8, true),
339 ]));
340
341 let pg_types_data = Self::get_standard_pg_types();
343
344 let arrays: Vec<ArrayRef> = vec![
346 Arc::new(Int32Array::from(pg_types_data.oids)),
347 Arc::new(StringArray::from(pg_types_data.typnames)),
348 Arc::new(Int32Array::from(pg_types_data.typnamespaces)),
349 Arc::new(Int32Array::from(pg_types_data.typowners)),
350 Arc::new(Int16Array::from(pg_types_data.typlens)),
351 Arc::new(BooleanArray::from(pg_types_data.typbyvals)),
352 Arc::new(StringArray::from(pg_types_data.typtypes)),
353 Arc::new(StringArray::from(pg_types_data.typcategories)),
354 Arc::new(BooleanArray::from(pg_types_data.typispreferreds)),
355 Arc::new(BooleanArray::from(pg_types_data.typisdefineds)),
356 Arc::new(StringArray::from(pg_types_data.typdelims)),
357 Arc::new(Int32Array::from(pg_types_data.typrelids)),
358 Arc::new(Int32Array::from(pg_types_data.typelems)),
359 Arc::new(Int32Array::from(pg_types_data.typarrays)),
360 Arc::new(StringArray::from(pg_types_data.typinputs)),
361 Arc::new(StringArray::from(pg_types_data.typoutputs)),
362 Arc::new(StringArray::from(pg_types_data.typreceives)),
363 Arc::new(StringArray::from(pg_types_data.typsends)),
364 Arc::new(StringArray::from(pg_types_data.typmodins)),
365 Arc::new(StringArray::from(pg_types_data.typmodouts)),
366 Arc::new(StringArray::from(pg_types_data.typanalyzes)),
367 Arc::new(StringArray::from(pg_types_data.typaligns)),
368 Arc::new(StringArray::from(pg_types_data.typstorages)),
369 Arc::new(BooleanArray::from(pg_types_data.typnotnulls)),
370 Arc::new(Int32Array::from(pg_types_data.typbasetypes)),
371 Arc::new(Int32Array::from(pg_types_data.typtymods)),
372 Arc::new(Int32Array::from(pg_types_data.typndimss)),
373 Arc::new(Int32Array::from(pg_types_data.typcollations)),
374 Arc::new(StringArray::from_iter(
375 pg_types_data.typdefaultbins.into_iter(),
376 )),
377 Arc::new(StringArray::from_iter(
378 pg_types_data.typdefaults.into_iter(),
379 )),
380 ];
381
382 let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();
383
384 let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
386
387 Arc::new(provider)
388 }
389
390 fn get_standard_pg_types() -> PgTypesData {
392 let mut data = PgTypesData::new();
393
394 data.add_type(
396 16, "bool", 11, 10, 1, true, "b", "B", true, true, ",", 0, 0, 1000, "boolin",
397 "boolout", "boolrecv", "boolsend", "-", "-", "-", "c", "p", false, 0, -1, 0, 0, None,
398 None,
399 );
400 data.add_type(
401 17,
402 "bytea",
403 11,
404 10,
405 -1,
406 false,
407 "b",
408 "U",
409 false,
410 true,
411 ",",
412 0,
413 0,
414 1001,
415 "byteain",
416 "byteaout",
417 "bytearecv",
418 "byteasend",
419 "-",
420 "-",
421 "-",
422 "i",
423 "x",
424 false,
425 0,
426 -1,
427 0,
428 0,
429 None,
430 None,
431 );
432 data.add_type(
433 18, "char", 11, 10, 1, true, "b", "S", false, true, ",", 0, 0, 1002, "charin",
434 "charout", "charrecv", "charsend", "-", "-", "-", "c", "p", false, 0, -1, 0, 0, None,
435 None,
436 );
437 data.add_type(
438 19, "name", 11, 10, 64, false, "b", "S", false, true, ",", 0, 0, 1003, "namein",
439 "nameout", "namerecv", "namesend", "-", "-", "-", "i", "p", false, 0, -1, 0, 0, None,
440 None,
441 );
442 data.add_type(
443 20, "int8", 11, 10, 8, true, "b", "N", false, true, ",", 0, 0, 1016, "int8in",
444 "int8out", "int8recv", "int8send", "-", "-", "-", "d", "p", false, 0, -1, 0, 0, None,
445 None,
446 );
447 data.add_type(
448 21, "int2", 11, 10, 2, true, "b", "N", false, true, ",", 0, 0, 1005, "int2in",
449 "int2out", "int2recv", "int2send", "-", "-", "-", "s", "p", false, 0, -1, 0, 0, None,
450 None,
451 );
452 data.add_type(
453 23, "int4", 11, 10, 4, true, "b", "N", true, true, ",", 0, 0, 1007, "int4in",
454 "int4out", "int4recv", "int4send", "-", "-", "-", "i", "p", false, 0, -1, 0, 0, None,
455 None,
456 );
457 data.add_type(
458 25, "text", 11, 10, -1, false, "b", "S", true, true, ",", 0, 0, 1009, "textin",
459 "textout", "textrecv", "textsend", "-", "-", "-", "i", "x", false, 0, -1, 0, 100, None,
460 None,
461 );
462 data.add_type(
463 700,
464 "float4",
465 11,
466 10,
467 4,
468 true,
469 "b",
470 "N",
471 false,
472 true,
473 ",",
474 0,
475 0,
476 1021,
477 "float4in",
478 "float4out",
479 "float4recv",
480 "float4send",
481 "-",
482 "-",
483 "-",
484 "i",
485 "p",
486 false,
487 0,
488 -1,
489 0,
490 0,
491 None,
492 None,
493 );
494 data.add_type(
495 701,
496 "float8",
497 11,
498 10,
499 8,
500 true,
501 "b",
502 "N",
503 true,
504 true,
505 ",",
506 0,
507 0,
508 1022,
509 "float8in",
510 "float8out",
511 "float8recv",
512 "float8send",
513 "-",
514 "-",
515 "-",
516 "d",
517 "p",
518 false,
519 0,
520 -1,
521 0,
522 0,
523 None,
524 None,
525 );
526 data.add_type(
527 1043,
528 "varchar",
529 11,
530 10,
531 -1,
532 false,
533 "b",
534 "S",
535 false,
536 true,
537 ",",
538 0,
539 0,
540 1015,
541 "varcharin",
542 "varcharout",
543 "varcharrecv",
544 "varcharsend",
545 "varchartypmodin",
546 "varchartypmodout",
547 "-",
548 "i",
549 "x",
550 false,
551 0,
552 -1,
553 0,
554 100,
555 None,
556 None,
557 );
558 data.add_type(
559 1082,
560 "date",
561 11,
562 10,
563 4,
564 true,
565 "b",
566 "D",
567 false,
568 true,
569 ",",
570 0,
571 0,
572 1182,
573 "date_in",
574 "date_out",
575 "date_recv",
576 "date_send",
577 "-",
578 "-",
579 "-",
580 "i",
581 "p",
582 false,
583 0,
584 -1,
585 0,
586 0,
587 None,
588 None,
589 );
590 data.add_type(
591 1083,
592 "time",
593 11,
594 10,
595 8,
596 true,
597 "b",
598 "D",
599 false,
600 true,
601 ",",
602 0,
603 0,
604 1183,
605 "time_in",
606 "time_out",
607 "time_recv",
608 "time_send",
609 "timetypmodin",
610 "timetypmodout",
611 "-",
612 "d",
613 "p",
614 false,
615 0,
616 -1,
617 0,
618 0,
619 None,
620 None,
621 );
622 data.add_type(
623 1114,
624 "timestamp",
625 11,
626 10,
627 8,
628 true,
629 "b",
630 "D",
631 false,
632 true,
633 ",",
634 0,
635 0,
636 1115,
637 "timestamp_in",
638 "timestamp_out",
639 "timestamp_recv",
640 "timestamp_send",
641 "timestamptypmodin",
642 "timestamptypmodout",
643 "-",
644 "d",
645 "p",
646 false,
647 0,
648 -1,
649 0,
650 0,
651 None,
652 None,
653 );
654 data.add_type(
655 1184,
656 "timestamptz",
657 11,
658 10,
659 8,
660 true,
661 "b",
662 "D",
663 true,
664 true,
665 ",",
666 0,
667 0,
668 1185,
669 "timestamptz_in",
670 "timestamptz_out",
671 "timestamptz_recv",
672 "timestamptz_send",
673 "timestamptztypmodin",
674 "timestamptztypmodout",
675 "-",
676 "d",
677 "p",
678 false,
679 0,
680 -1,
681 0,
682 0,
683 None,
684 None,
685 );
686 data.add_type(
687 1700,
688 "numeric",
689 11,
690 10,
691 -1,
692 false,
693 "b",
694 "N",
695 false,
696 true,
697 ",",
698 0,
699 0,
700 1231,
701 "numeric_in",
702 "numeric_out",
703 "numeric_recv",
704 "numeric_send",
705 "numerictypmodin",
706 "numerictypmodout",
707 "-",
708 "i",
709 "m",
710 false,
711 0,
712 -1,
713 0,
714 0,
715 None,
716 None,
717 );
718
719 data
720 }
721
722 fn create_pg_am_table(&self) -> Arc<dyn TableProvider> {
724 let schema = Arc::new(Schema::new(vec![
727 Field::new("oid", DataType::Int32, false), Field::new("amname", DataType::Utf8, false), Field::new("amhandler", DataType::Int32, false), Field::new("amtype", DataType::Utf8, false), Field::new("amstrategies", DataType::Int32, false), Field::new("amsupport", DataType::Int32, false), Field::new("amcanorder", DataType::Boolean, false), Field::new("amcanorderbyop", DataType::Boolean, false), Field::new("amcanbackward", DataType::Boolean, false), Field::new("amcanunique", DataType::Boolean, false), Field::new("amcanmulticol", DataType::Boolean, false), Field::new("amoptionalkey", DataType::Boolean, false), Field::new("amsearcharray", DataType::Boolean, false), Field::new("amsearchnulls", DataType::Boolean, false), Field::new("amstorage", DataType::Boolean, false), Field::new("amclusterable", DataType::Boolean, false), Field::new("ampredlocks", DataType::Boolean, false), Field::new("amcanparallel", DataType::Boolean, false), Field::new("amcanbeginscan", DataType::Boolean, false), Field::new("amcanmarkpos", DataType::Boolean, false), Field::new("amcanfetch", DataType::Boolean, false), Field::new("amkeytype", DataType::Int32, false), ]));
750
751 let provider = MemTable::try_new(schema, vec![vec![]]).unwrap();
753
754 Arc::new(provider)
755 }
756
757 fn create_pg_range_table(&self) -> Arc<dyn TableProvider> {
759 let schema = Arc::new(Schema::new(vec![
762 Field::new("rngtypid", DataType::Int32, false), Field::new("rngsubtype", DataType::Int32, false), Field::new("rngmultitypid", DataType::Int32, false), Field::new("rngcollation", DataType::Int32, false), Field::new("rngsubopc", DataType::Int32, false), Field::new("rngcanonical", DataType::Int32, false), Field::new("rngsubdiff", DataType::Int32, false), ]));
770
771 let provider = MemTable::try_new(schema, vec![vec![]]).unwrap();
773 Arc::new(provider)
774 }
775
776 fn create_pg_enum_table(&self) -> Arc<dyn TableProvider> {
778 let schema = Arc::new(Schema::new(vec![
779 Field::new("oid", DataType::Int32, false), Field::new("enumtypid", DataType::Int32, false), Field::new("enumsortorder", DataType::Float32, false), Field::new("enumlabel", DataType::Utf8, false), ]));
784 let provider = MemTable::try_new(schema, vec![vec![]]).unwrap();
785 Arc::new(provider)
786 }
787
788 fn create_pg_description_table(&self) -> Arc<dyn TableProvider> {
790 let schema = Arc::new(Schema::new(vec![
791 Field::new("objoid", DataType::Int32, false), Field::new("classoid", DataType::Int32, false), Field::new("objsubid", DataType::Int32, false), Field::new("description", DataType::Utf8, false),
795 ]));
796 let provider = MemTable::try_new(schema, vec![vec![]]).unwrap();
797 Arc::new(provider)
798 }
799
800 fn create_pg_proc_table(&self) -> Arc<dyn TableProvider> {
802 let schema = Arc::new(Schema::new(vec![
804 Field::new("oid", DataType::Int32, false), Field::new("proname", DataType::Utf8, false), Field::new("pronamespace", DataType::Int32, false), Field::new("proowner", DataType::Int32, false), Field::new("prolang", DataType::Int32, false), Field::new("procost", DataType::Float32, false), Field::new("prorows", DataType::Float32, false), Field::new("provariadic", DataType::Int32, false), Field::new("prosupport", DataType::Int32, false), Field::new("prokind", DataType::Utf8, false), Field::new("prosecdef", DataType::Boolean, false), Field::new("proleakproof", DataType::Boolean, false), Field::new("proisstrict", DataType::Boolean, false), Field::new("proretset", DataType::Boolean, false), Field::new("provolatile", DataType::Utf8, false), Field::new("proparallel", DataType::Utf8, false), Field::new("pronargs", DataType::Int16, false), Field::new("pronargdefaults", DataType::Int16, false), Field::new("prorettype", DataType::Int32, false), Field::new("proargtypes", DataType::Utf8, false), Field::new("proallargtypes", DataType::Utf8, true), Field::new("proargmodes", DataType::Utf8, true), Field::new("proargnames", DataType::Utf8, true), Field::new("proargdefaults", DataType::Utf8, true), Field::new("protrftypes", DataType::Utf8, true), Field::new("prosrc", DataType::Utf8, false), Field::new("probin", DataType::Utf8, true), Field::new("prosqlbody", DataType::Utf8, true), Field::new("proconfig", DataType::Utf8, true), Field::new("proacl", DataType::Utf8, true), ]));
835
836 let pg_proc_data = Self::get_standard_pg_functions();
838
839 let arrays: Vec<ArrayRef> = vec![
841 Arc::new(Int32Array::from(pg_proc_data.oids)),
842 Arc::new(StringArray::from(pg_proc_data.pronames)),
843 Arc::new(Int32Array::from(pg_proc_data.pronamespaces)),
844 Arc::new(Int32Array::from(pg_proc_data.proowners)),
845 Arc::new(Int32Array::from(pg_proc_data.prolangs)),
846 Arc::new(Float32Array::from(pg_proc_data.procosts)),
847 Arc::new(Float32Array::from(pg_proc_data.prorows)),
848 Arc::new(Int32Array::from(pg_proc_data.provariadics)),
849 Arc::new(Int32Array::from(pg_proc_data.prosupports)),
850 Arc::new(StringArray::from(pg_proc_data.prokinds)),
851 Arc::new(BooleanArray::from(pg_proc_data.prosecdefs)),
852 Arc::new(BooleanArray::from(pg_proc_data.proleakproofs)),
853 Arc::new(BooleanArray::from(pg_proc_data.proisstricts)),
854 Arc::new(BooleanArray::from(pg_proc_data.proretsets)),
855 Arc::new(StringArray::from(pg_proc_data.provolatiles)),
856 Arc::new(StringArray::from(pg_proc_data.proparallels)),
857 Arc::new(Int16Array::from(pg_proc_data.pronargs)),
858 Arc::new(Int16Array::from(pg_proc_data.pronargdefaults)),
859 Arc::new(Int32Array::from(pg_proc_data.prorettypes)),
860 Arc::new(StringArray::from(pg_proc_data.proargtypes)),
861 Arc::new(StringArray::from_iter(
862 pg_proc_data.proallargtypes.into_iter(),
863 )),
864 Arc::new(StringArray::from_iter(pg_proc_data.proargmodes.into_iter())),
865 Arc::new(StringArray::from_iter(pg_proc_data.proargnames.into_iter())),
866 Arc::new(StringArray::from_iter(
867 pg_proc_data.proargdefaults.into_iter(),
868 )),
869 Arc::new(StringArray::from_iter(pg_proc_data.protrftypes.into_iter())),
870 Arc::new(StringArray::from(pg_proc_data.prosrcs)),
871 Arc::new(StringArray::from_iter(pg_proc_data.probins.into_iter())),
872 Arc::new(StringArray::from_iter(pg_proc_data.prosqlbodys.into_iter())),
873 Arc::new(StringArray::from_iter(pg_proc_data.proconfigs.into_iter())),
874 Arc::new(StringArray::from_iter(pg_proc_data.proacls.into_iter())),
875 ];
876
877 let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();
878
879 let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
881
882 Arc::new(provider)
883 }
884
885 fn get_standard_pg_functions() -> PgProcData {
887 let mut data = PgProcData::new();
888
889 data.add_function(
891 1242, "boolin", 11, 10, 12, 1.0, 0.0, 0, 0, "f", false, true, true, false, "i", "s", 1,
892 0, 16, "2275", None, None, None, None, None, "boolin", None, None, None, None,
893 );
894 data.add_function(
895 1243, "boolout", 11, 10, 12, 1.0, 0.0, 0, 0, "f", false, true, true, false, "i", "s",
896 1, 0, 2275, "16", None, None, None, None, None, "boolout", None, None, None, None,
897 );
898 data.add_function(
899 1564, "textin", 11, 10, 12, 1.0, 0.0, 0, 0, "f", false, true, true, false, "i", "s", 1,
900 0, 25, "2275", None, None, None, None, None, "textin", None, None, None, None,
901 );
902 data.add_function(
903 1565, "textout", 11, 10, 12, 1.0, 0.0, 0, 0, "f", false, true, true, false, "i", "s",
904 1, 0, 2275, "25", None, None, None, None, None, "textout", None, None, None, None,
905 );
906 data.add_function(
907 1242,
908 "version",
909 11,
910 10,
911 12,
912 1.0,
913 0.0,
914 0,
915 0,
916 "f",
917 false,
918 true,
919 false,
920 false,
921 "s",
922 "s",
923 0,
924 0,
925 25,
926 "",
927 None,
928 None,
929 None,
930 None,
931 None,
932 "SELECT 'DataFusion PostgreSQL 48.0.0 on x86_64-pc-linux-gnu'",
933 None,
934 None,
935 None,
936 None,
937 );
938
939 data
940 }
941}
942
/// Column-oriented buffers for building the `pg_proc` record batch.
///
/// Each vector holds one pg_proc column; `add_function` pushes exactly one
/// value into every vector, so all vectors always have equal length (one
/// entry per function row).
#[derive(Debug)]
struct PgProcData {
    oids: Vec<i32>,
    pronames: Vec<String>,
    pronamespaces: Vec<i32>,
    proowners: Vec<i32>,
    prolangs: Vec<i32>,
    procosts: Vec<f32>,
    prorows: Vec<f32>,
    provariadics: Vec<i32>,
    prosupports: Vec<i32>,
    prokinds: Vec<String>,
    prosecdefs: Vec<bool>,
    proleakproofs: Vec<bool>,
    proisstricts: Vec<bool>,
    proretsets: Vec<bool>,
    provolatiles: Vec<String>,
    proparallels: Vec<String>,
    pronargs: Vec<i16>,
    pronargdefaults: Vec<i16>,
    prorettypes: Vec<i32>,
    proargtypes: Vec<String>,
    // Nullable columns are modeled as Option<String>.
    proallargtypes: Vec<Option<String>>,
    proargmodes: Vec<Option<String>>,
    proargnames: Vec<Option<String>>,
    proargdefaults: Vec<Option<String>>,
    protrftypes: Vec<Option<String>>,
    prosrcs: Vec<String>,
    probins: Vec<Option<String>>,
    prosqlbodys: Vec<Option<String>>,
    proconfigs: Vec<Option<String>>,
    proacls: Vec<Option<String>>,
}
977
978impl PgProcData {
979 fn new() -> Self {
980 Self {
981 oids: Vec::new(),
982 pronames: Vec::new(),
983 pronamespaces: Vec::new(),
984 proowners: Vec::new(),
985 prolangs: Vec::new(),
986 procosts: Vec::new(),
987 prorows: Vec::new(),
988 provariadics: Vec::new(),
989 prosupports: Vec::new(),
990 prokinds: Vec::new(),
991 prosecdefs: Vec::new(),
992 proleakproofs: Vec::new(),
993 proisstricts: Vec::new(),
994 proretsets: Vec::new(),
995 provolatiles: Vec::new(),
996 proparallels: Vec::new(),
997 pronargs: Vec::new(),
998 pronargdefaults: Vec::new(),
999 prorettypes: Vec::new(),
1000 proargtypes: Vec::new(),
1001 proallargtypes: Vec::new(),
1002 proargmodes: Vec::new(),
1003 proargnames: Vec::new(),
1004 proargdefaults: Vec::new(),
1005 protrftypes: Vec::new(),
1006 prosrcs: Vec::new(),
1007 probins: Vec::new(),
1008 prosqlbodys: Vec::new(),
1009 proconfigs: Vec::new(),
1010 proacls: Vec::new(),
1011 }
1012 }
1013
1014 #[allow(clippy::too_many_arguments)]
1015 fn add_function(
1016 &mut self,
1017 oid: i32,
1018 proname: &str,
1019 pronamespace: i32,
1020 proowner: i32,
1021 prolang: i32,
1022 procost: f32,
1023 prorows: f32,
1024 provariadic: i32,
1025 prosupport: i32,
1026 prokind: &str,
1027 prosecdef: bool,
1028 proleakproof: bool,
1029 proisstrict: bool,
1030 proretset: bool,
1031 provolatile: &str,
1032 proparallel: &str,
1033 pronargs: i16,
1034 pronargdefaults: i16,
1035 prorettype: i32,
1036 proargtypes: &str,
1037 proallargtypes: Option<String>,
1038 proargmodes: Option<String>,
1039 proargnames: Option<String>,
1040 proargdefaults: Option<String>,
1041 protrftypes: Option<String>,
1042 prosrc: &str,
1043 probin: Option<String>,
1044 prosqlbody: Option<String>,
1045 proconfig: Option<String>,
1046 proacl: Option<String>,
1047 ) {
1048 self.oids.push(oid);
1049 self.pronames.push(proname.to_string());
1050 self.pronamespaces.push(pronamespace);
1051 self.proowners.push(proowner);
1052 self.prolangs.push(prolang);
1053 self.procosts.push(procost);
1054 self.prorows.push(prorows);
1055 self.provariadics.push(provariadic);
1056 self.prosupports.push(prosupport);
1057 self.prokinds.push(prokind.to_string());
1058 self.prosecdefs.push(prosecdef);
1059 self.proleakproofs.push(proleakproof);
1060 self.proisstricts.push(proisstrict);
1061 self.proretsets.push(proretset);
1062 self.provolatiles.push(provolatile.to_string());
1063 self.proparallels.push(proparallel.to_string());
1064 self.pronargs.push(pronargs);
1065 self.pronargdefaults.push(pronargdefaults);
1066 self.prorettypes.push(prorettype);
1067 self.proargtypes.push(proargtypes.to_string());
1068 self.proallargtypes.push(proallargtypes);
1069 self.proargmodes.push(proargmodes);
1070 self.proargnames.push(proargnames);
1071 self.proargdefaults.push(proargdefaults);
1072 self.protrftypes.push(protrftypes);
1073 self.prosrcs.push(prosrc.to_string());
1074 self.probins.push(probin);
1075 self.prosqlbodys.push(prosqlbody);
1076 self.proconfigs.push(proconfig);
1077 self.proacls.push(proacl);
1078 }
1079}
1080
/// Streaming implementation of `pg_class`: rows are derived from the live
/// catalog list every time the table is scanned.
#[derive(Debug, Clone)]
struct PgClassTable {
    // Arrow schema of the pg_class relation (built once in `new`).
    schema: SchemaRef,
    // Source of catalogs/schemas/tables to reflect as pg_class rows.
    catalog_list: Arc<dyn CatalogProviderList>,
    // Shared with PgCatalogSchemaProvider so OIDs stay consistent across
    // the different pg_catalog tables.
    oid_counter: Arc<AtomicU32>,
    oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
}
1088
1089impl PgClassTable {
1090 fn new(
1091 catalog_list: Arc<dyn CatalogProviderList>,
1092 oid_counter: Arc<AtomicU32>,
1093 oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
1094 ) -> PgClassTable {
1095 let schema = Arc::new(Schema::new(vec![
1098 Field::new("oid", DataType::Int32, false), Field::new("relname", DataType::Utf8, false), Field::new("relnamespace", DataType::Int32, false), Field::new("reltype", DataType::Int32, false), Field::new("reloftype", DataType::Int32, true), Field::new("relowner", DataType::Int32, false), Field::new("relam", DataType::Int32, false), Field::new("relfilenode", DataType::Int32, false), Field::new("reltablespace", DataType::Int32, false), Field::new("relpages", DataType::Int32, false), Field::new("reltuples", DataType::Float64, false), Field::new("relallvisible", DataType::Int32, false), Field::new("reltoastrelid", DataType::Int32, false), Field::new("relhasindex", DataType::Boolean, false), Field::new("relisshared", DataType::Boolean, false), Field::new("relpersistence", DataType::Utf8, false), Field::new("relkind", DataType::Utf8, false), Field::new("relnatts", DataType::Int16, false), Field::new("relchecks", DataType::Int16, false), Field::new("relhasrules", DataType::Boolean, false), Field::new("relhastriggers", DataType::Boolean, false), Field::new("relhassubclass", DataType::Boolean, false), Field::new("relrowsecurity", DataType::Boolean, false), Field::new("relforcerowsecurity", DataType::Boolean, false), Field::new("relispopulated", DataType::Boolean, false), Field::new("relreplident", DataType::Utf8, false), Field::new("relispartition", DataType::Boolean, false), Field::new("relrewrite", DataType::Int32, true), Field::new("relfrozenxid", DataType::Int32, false), Field::new("relminmxid", DataType::Int32, false), ]));
1129
1130 Self {
1131 schema,
1132 catalog_list,
1133 oid_counter,
1134 oid_cache,
1135 }
1136 }
1137
    /// Build the complete `pg_class` batch by enumerating every table in
    /// every schema of every catalog known to `catalog_list`.
    ///
    /// OID assignment is made stable across calls through the shared
    /// `oid_cache`: known objects reuse their cached OID, unknown ones get a
    /// fresh value from `oid_counter`. At the end the cache is *replaced*
    /// wholesale with the entries observed during this scan, so OIDs of
    /// dropped objects are forgotten.
    async fn get_data(this: PgClassTable) -> Result<RecordBatch> {
        // One column builder per pg_class field; every push below must stay
        // in lock-step so each row is aligned across all 30 vectors.
        let mut oids = Vec::new();
        let mut relnames = Vec::new();
        let mut relnamespaces = Vec::new();
        let mut reltypes = Vec::new();
        let mut reloftypes = Vec::new();
        let mut relowners = Vec::new();
        let mut relams = Vec::new();
        let mut relfilenodes = Vec::new();
        let mut reltablespaces = Vec::new();
        let mut relpages = Vec::new();
        let mut reltuples = Vec::new();
        let mut relallvisibles = Vec::new();
        let mut reltoastrelids = Vec::new();
        let mut relhasindexes = Vec::new();
        let mut relisshareds = Vec::new();
        let mut relpersistences = Vec::new();
        let mut relkinds = Vec::new();
        let mut relnattses = Vec::new();
        let mut relcheckses = Vec::new();
        let mut relhasruleses = Vec::new();
        let mut relhastriggersses = Vec::new();
        let mut relhassubclasses = Vec::new();
        let mut relrowsecurities = Vec::new();
        let mut relforcerowsecurities = Vec::new();
        let mut relispopulateds = Vec::new();
        let mut relreplidents = Vec::new();
        let mut relispartitions = Vec::new();
        let mut relrewrites = Vec::new();
        let mut relfrozenxids = Vec::new();
        let mut relminmxids = Vec::new();

        // Hold the write lock for the whole scan; `swap_cache` collects the
        // OIDs observed in this pass and replaces the shared cache at the end.
        let mut oid_cache = this.oid_cache.write().await;
        let mut swap_cache = HashMap::new();

        for catalog_name in this.catalog_list.catalog_names() {
            // Reuse the catalog's cached OID, or mint a new one.
            let cache_key = OidCacheKey::Catalog(catalog_name.clone());
            let catalog_oid = if let Some(oid) = oid_cache.get(&cache_key) {
                *oid
            } else {
                this.oid_counter.fetch_add(1, Ordering::Relaxed)
            };
            swap_cache.insert(cache_key, catalog_oid);

            if let Some(catalog) = this.catalog_list.catalog(&catalog_name) {
                for schema_name in catalog.schema_names() {
                    if let Some(schema) = catalog.schema(&schema_name) {
                        let cache_key =
                            OidCacheKey::Schema(catalog_name.clone(), schema_name.clone());
                        let schema_oid = if let Some(oid) = oid_cache.get(&cache_key) {
                            *oid
                        } else {
                            this.oid_counter.fetch_add(1, Ordering::Relaxed)
                        };
                        swap_cache.insert(cache_key, schema_oid);

                        for table_name in schema.table_names() {
                            // The table's OID is assigned (and carried over
                            // to the new cache) even if the provider lookup
                            // below yields no table.
                            let cache_key = OidCacheKey::Table(
                                catalog_name.clone(),
                                schema_name.clone(),
                                table_name.clone(),
                            );
                            let table_oid = if let Some(oid) = oid_cache.get(&cache_key) {
                                *oid
                            } else {
                                this.oid_counter.fetch_add(1, Ordering::Relaxed)
                            };
                            swap_cache.insert(cache_key, table_oid);

                            if let Some(table) = schema.table(&table_name).await? {
                                // relkind: 'r' for tables, 'v' for views (see
                                // get_table_type_with_name above).
                                let table_type =
                                    get_table_type_with_name(&table, &table_name, &schema_name);

                                let column_count = table.schema().fields().len() as i16;

                                oids.push(table_oid as i32);
                                relnames.push(table_name.clone());
                                relnamespaces.push(schema_oid as i32);
                                reltypes.push(0); // no composite row type tracked
                                reloftypes.push(None);
                                relowners.push(0);
                                relams.push(0);
                                relfilenodes.push(table_oid as i32);
                                reltablespaces.push(0);
                                relpages.push(1);
                                reltuples.push(0.0); // statistics are not collected
                                relallvisibles.push(0);
                                reltoastrelids.push(0);
                                relhasindexes.push(false);
                                relisshareds.push(false);
                                relpersistences.push("p".to_string()); // 'p' = permanent
                                relkinds.push(table_type.to_string());
                                relnattses.push(column_count);
                                relcheckses.push(0);
                                relhasruleses.push(false);
                                relhastriggersses.push(false);
                                relhassubclasses.push(false);
                                relrowsecurities.push(false);
                                relforcerowsecurities.push(false);
                                relispopulateds.push(true);
                                relreplidents.push("d".to_string()); // 'd' = default replica identity
                                relispartitions.push(false);
                                relrewrites.push(None);
                                relfrozenxids.push(0);
                                relminmxids.push(0);
                            }
                        }
                    }
                }
            }
        }

        // Replace the shared cache wholesale with this scan's entries.
        *oid_cache = swap_cache;

        // Column order must match the schema declared in `new`.
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::from(oids)),
            Arc::new(StringArray::from(relnames)),
            Arc::new(Int32Array::from(relnamespaces)),
            Arc::new(Int32Array::from(reltypes)),
            Arc::new(Int32Array::from_iter(reloftypes.into_iter())),
            Arc::new(Int32Array::from(relowners)),
            Arc::new(Int32Array::from(relams)),
            Arc::new(Int32Array::from(relfilenodes)),
            Arc::new(Int32Array::from(reltablespaces)),
            Arc::new(Int32Array::from(relpages)),
            Arc::new(Float64Array::from_iter(reltuples.into_iter())),
            Arc::new(Int32Array::from(relallvisibles)),
            Arc::new(Int32Array::from(reltoastrelids)),
            Arc::new(BooleanArray::from(relhasindexes)),
            Arc::new(BooleanArray::from(relisshareds)),
            Arc::new(StringArray::from(relpersistences)),
            Arc::new(StringArray::from(relkinds)),
            Arc::new(Int16Array::from(relnattses)),
            Arc::new(Int16Array::from(relcheckses)),
            Arc::new(BooleanArray::from(relhasruleses)),
            Arc::new(BooleanArray::from(relhastriggersses)),
            Arc::new(BooleanArray::from(relhassubclasses)),
            Arc::new(BooleanArray::from(relrowsecurities)),
            Arc::new(BooleanArray::from(relforcerowsecurities)),
            Arc::new(BooleanArray::from(relispopulateds)),
            Arc::new(StringArray::from(relreplidents)),
            Arc::new(BooleanArray::from(relispartitions)),
            Arc::new(Int32Array::from_iter(relrewrites.into_iter())),
            Arc::new(Int32Array::from(relfrozenxids)),
            Arc::new(Int32Array::from(relminmxids)),
        ];

        let batch = RecordBatch::try_new(this.schema.clone(), arrays)?;

        Ok(batch)
    }
1303}
1304
1305impl PartitionStream for PgClassTable {
1306 fn schema(&self) -> &SchemaRef {
1307 &self.schema
1308 }
1309
1310 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1311 let this = self.clone();
1312 Box::pin(RecordBatchStreamAdapter::new(
1313 this.schema.clone(),
1314 futures::stream::once(async move { PgClassTable::get_data(this).await }),
1315 ))
1316 }
1317}
1318
/// Streaming backing table for `pg_catalog.pg_namespace` (one row per schema).
#[derive(Debug, Clone)]
struct PgNamespaceTable {
    // Arrow schema describing the pg_namespace columns.
    schema: SchemaRef,
    // Source of catalogs/schemas to enumerate.
    catalog_list: Arc<dyn CatalogProviderList>,
    // Shared monotonically increasing OID generator.
    oid_counter: Arc<AtomicU32>,
    // Shared OID assignments, kept stable across scans of the pg_* tables.
    oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
}
1326
1327impl PgNamespaceTable {
1328 pub fn new(
1329 catalog_list: Arc<dyn CatalogProviderList>,
1330 oid_counter: Arc<AtomicU32>,
1331 oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
1332 ) -> Self {
1333 let schema = Arc::new(Schema::new(vec![
1336 Field::new("oid", DataType::Int32, false), Field::new("nspname", DataType::Utf8, false), Field::new("nspowner", DataType::Int32, false), Field::new("nspacl", DataType::Utf8, true), Field::new("options", DataType::Utf8, true), ]));
1342
1343 Self {
1344 schema,
1345 catalog_list,
1346 oid_counter,
1347 oid_cache,
1348 }
1349 }
1350
1351 async fn get_data(this: PgNamespaceTable) -> Result<RecordBatch> {
1353 let mut oids = Vec::new();
1355 let mut nspnames = Vec::new();
1356 let mut nspowners = Vec::new();
1357 let mut nspacls: Vec<Option<String>> = Vec::new();
1358 let mut options: Vec<Option<String>> = Vec::new();
1359
1360 let mut schema_oid_cache = HashMap::new();
1362
1363 let mut oid_cache = this.oid_cache.write().await;
1364
1365 for catalog_name in this.catalog_list.catalog_names() {
1367 if let Some(catalog) = this.catalog_list.catalog(&catalog_name) {
1368 for schema_name in catalog.schema_names() {
1369 let cache_key = OidCacheKey::Schema(catalog_name.clone(), schema_name.clone());
1370 let schema_oid = if let Some(oid) = oid_cache.get(&cache_key) {
1371 *oid
1372 } else {
1373 this.oid_counter.fetch_add(1, Ordering::Relaxed)
1374 };
1375 schema_oid_cache.insert(cache_key, schema_oid);
1376
1377 oids.push(schema_oid as i32);
1378 nspnames.push(schema_name.clone());
1379 nspowners.push(10); nspacls.push(None);
1381 options.push(None);
1382 }
1383 }
1384 }
1385
1386 oid_cache.retain(|key, _| match key {
1388 OidCacheKey::Catalog(..) => true,
1389 OidCacheKey::Schema(..) => false,
1390 OidCacheKey::Table(catalog, schema_name, _) => schema_oid_cache
1391 .contains_key(&OidCacheKey::Schema(catalog.clone(), schema_name.clone())),
1392 });
1393 oid_cache.extend(schema_oid_cache);
1395
1396 let arrays: Vec<ArrayRef> = vec![
1398 Arc::new(Int32Array::from(oids)),
1399 Arc::new(StringArray::from(nspnames)),
1400 Arc::new(Int32Array::from(nspowners)),
1401 Arc::new(StringArray::from_iter(nspacls.into_iter())),
1402 Arc::new(StringArray::from_iter(options.into_iter())),
1403 ];
1404
1405 let batch = RecordBatch::try_new(this.schema.clone(), arrays)?;
1407
1408 Ok(batch)
1409 }
1410}
1411
1412impl PartitionStream for PgNamespaceTable {
1413 fn schema(&self) -> &SchemaRef {
1414 &self.schema
1415 }
1416
1417 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1418 let this = self.clone();
1419 Box::pin(RecordBatchStreamAdapter::new(
1420 this.schema.clone(),
1421 futures::stream::once(async move { Self::get_data(this).await }),
1422 ))
1423 }
1424}
1425
/// Streaming backing table for `pg_catalog.pg_database` (one row per catalog).
#[derive(Debug, Clone)]
struct PgDatabaseTable {
    // Arrow schema describing the pg_database columns.
    schema: SchemaRef,
    // Catalogs to expose as databases.
    catalog_list: Arc<dyn CatalogProviderList>,
    // Shared monotonically increasing OID generator.
    oid_counter: Arc<AtomicU32>,
    // Shared OID assignments, kept stable across scans of the pg_* tables.
    oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
}
1433
1434impl PgDatabaseTable {
1435 pub fn new(
1436 catalog_list: Arc<dyn CatalogProviderList>,
1437 oid_counter: Arc<AtomicU32>,
1438 oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
1439 ) -> Self {
1440 let schema = Arc::new(Schema::new(vec![
1443 Field::new("oid", DataType::Int32, false), Field::new("datname", DataType::Utf8, false), Field::new("datdba", DataType::Int32, false), Field::new("encoding", DataType::Int32, false), Field::new("datcollate", DataType::Utf8, false), Field::new("datctype", DataType::Utf8, false), Field::new("datistemplate", DataType::Boolean, false), Field::new("datallowconn", DataType::Boolean, false), Field::new("datconnlimit", DataType::Int32, false), Field::new("datlastsysoid", DataType::Int32, false), Field::new("datfrozenxid", DataType::Int32, false), Field::new("datminmxid", DataType::Int32, false), Field::new("dattablespace", DataType::Int32, false), Field::new("datacl", DataType::Utf8, true), ]));
1458
1459 Self {
1460 schema,
1461 catalog_list,
1462 oid_counter,
1463 oid_cache,
1464 }
1465 }
1466
1467 async fn get_data(this: PgDatabaseTable) -> Result<RecordBatch> {
1469 let mut oids = Vec::new();
1471 let mut datnames = Vec::new();
1472 let mut datdbas = Vec::new();
1473 let mut encodings = Vec::new();
1474 let mut datcollates = Vec::new();
1475 let mut datctypes = Vec::new();
1476 let mut datistemplates = Vec::new();
1477 let mut datallowconns = Vec::new();
1478 let mut datconnlimits = Vec::new();
1479 let mut datlastsysoids = Vec::new();
1480 let mut datfrozenxids = Vec::new();
1481 let mut datminmxids = Vec::new();
1482 let mut dattablespaces = Vec::new();
1483 let mut datacles: Vec<Option<String>> = Vec::new();
1484
1485 let mut catalog_oid_cache = HashMap::new();
1487
1488 let mut oid_cache = this.oid_cache.write().await;
1489
1490 for catalog_name in this.catalog_list.catalog_names() {
1492 let cache_key = OidCacheKey::Catalog(catalog_name.clone());
1493 let catalog_oid = if let Some(oid) = oid_cache.get(&cache_key) {
1494 *oid
1495 } else {
1496 this.oid_counter.fetch_add(1, Ordering::Relaxed)
1497 };
1498 catalog_oid_cache.insert(cache_key, catalog_oid);
1499
1500 oids.push(catalog_oid as i32);
1501 datnames.push(catalog_name.clone());
1502 datdbas.push(10); encodings.push(6); datcollates.push("en_US.UTF-8".to_string()); datctypes.push("en_US.UTF-8".to_string()); datistemplates.push(false);
1507 datallowconns.push(true);
1508 datconnlimits.push(-1); datlastsysoids.push(100000); datfrozenxids.push(1); datminmxids.push(1); dattablespaces.push(1663); datacles.push(None); }
1515
1516 let default_datname = "postgres".to_string();
1519 if !datnames.contains(&default_datname) {
1520 let cache_key = OidCacheKey::Catalog(default_datname.clone());
1521 let catalog_oid = if let Some(oid) = oid_cache.get(&cache_key) {
1522 *oid
1523 } else {
1524 this.oid_counter.fetch_add(1, Ordering::Relaxed)
1525 };
1526 catalog_oid_cache.insert(cache_key, catalog_oid);
1527
1528 oids.push(catalog_oid as i32);
1529 datnames.push(default_datname);
1530 datdbas.push(10);
1531 encodings.push(6);
1532 datcollates.push("en_US.UTF-8".to_string());
1533 datctypes.push("en_US.UTF-8".to_string());
1534 datistemplates.push(false);
1535 datallowconns.push(true);
1536 datconnlimits.push(-1);
1537 datlastsysoids.push(100000);
1538 datfrozenxids.push(1);
1539 datminmxids.push(1);
1540 dattablespaces.push(1663);
1541 datacles.push(None);
1542 }
1543
1544 let arrays: Vec<ArrayRef> = vec![
1546 Arc::new(Int32Array::from(oids)),
1547 Arc::new(StringArray::from(datnames)),
1548 Arc::new(Int32Array::from(datdbas)),
1549 Arc::new(Int32Array::from(encodings)),
1550 Arc::new(StringArray::from(datcollates)),
1551 Arc::new(StringArray::from(datctypes)),
1552 Arc::new(BooleanArray::from(datistemplates)),
1553 Arc::new(BooleanArray::from(datallowconns)),
1554 Arc::new(Int32Array::from(datconnlimits)),
1555 Arc::new(Int32Array::from(datlastsysoids)),
1556 Arc::new(Int32Array::from(datfrozenxids)),
1557 Arc::new(Int32Array::from(datminmxids)),
1558 Arc::new(Int32Array::from(dattablespaces)),
1559 Arc::new(StringArray::from_iter(datacles.into_iter())),
1560 ];
1561
1562 let full_batch = RecordBatch::try_new(this.schema.clone(), arrays)?;
1564
1565 oid_cache.retain(|key, _| match key {
1568 OidCacheKey::Catalog(..) => false,
1569 OidCacheKey::Schema(catalog, ..) => {
1570 catalog_oid_cache.contains_key(&OidCacheKey::Catalog(catalog.clone()))
1571 }
1572 OidCacheKey::Table(catalog, ..) => {
1573 catalog_oid_cache.contains_key(&OidCacheKey::Catalog(catalog.clone()))
1574 }
1575 });
1576 oid_cache.extend(catalog_oid_cache);
1578
1579 Ok(full_batch)
1580 }
1581}
1582
1583impl PartitionStream for PgDatabaseTable {
1584 fn schema(&self) -> &SchemaRef {
1585 &self.schema
1586 }
1587
1588 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1589 let this = self.clone();
1590 Box::pin(RecordBatchStreamAdapter::new(
1591 this.schema.clone(),
1592 futures::stream::once(async move { Self::get_data(this).await }),
1593 ))
1594 }
1595}
1596
/// Streaming backing table for `pg_catalog.pg_attribute` (one row per column
/// of every table).
#[derive(Debug)]
struct PgAttributeTable {
    // Arrow schema describing the pg_attribute columns.
    schema: SchemaRef,
    // Catalogs whose tables are enumerated.
    // NOTE(review): unlike the other pg_* tables this one holds no shared
    // OID cache, so its attrelid values are generated locally — confirm
    // whether callers expect them to match pg_class.oid.
    catalog_list: Arc<dyn CatalogProviderList>,
}
1602
1603impl PgAttributeTable {
1604 pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
1605 let schema = Arc::new(Schema::new(vec![
1608 Field::new("attrelid", DataType::Int32, false), Field::new("attname", DataType::Utf8, false), Field::new("atttypid", DataType::Int32, false), Field::new("attstattarget", DataType::Int32, false), Field::new("attlen", DataType::Int16, false), Field::new("attnum", DataType::Int16, false), Field::new("attndims", DataType::Int32, false), Field::new("attcacheoff", DataType::Int32, false), Field::new("atttypmod", DataType::Int32, false), Field::new("attbyval", DataType::Boolean, false), Field::new("attalign", DataType::Utf8, false), Field::new("attstorage", DataType::Utf8, false), Field::new("attcompression", DataType::Utf8, true), Field::new("attnotnull", DataType::Boolean, false), Field::new("atthasdef", DataType::Boolean, false), Field::new("atthasmissing", DataType::Boolean, false), Field::new("attidentity", DataType::Utf8, false), Field::new("attgenerated", DataType::Utf8, false), Field::new("attisdropped", DataType::Boolean, false), Field::new("attislocal", DataType::Boolean, false), Field::new("attinhcount", DataType::Int32, false), Field::new("attcollation", DataType::Int32, false), Field::new("attacl", DataType::Utf8, true), Field::new("attoptions", DataType::Utf8, true), Field::new("attfdwoptions", DataType::Utf8, true), Field::new("attmissingval", DataType::Utf8, true), ]));
1635
1636 Self {
1637 schema,
1638 catalog_list,
1639 }
1640 }
1641
1642 async fn get_data(
1644 schema: SchemaRef,
1645 catalog_list: Arc<dyn CatalogProviderList>,
1646 ) -> Result<RecordBatch> {
1647 let mut attrelids = Vec::new();
1649 let mut attnames = Vec::new();
1650 let mut atttypids = Vec::new();
1651 let mut attstattargets = Vec::new();
1652 let mut attlens = Vec::new();
1653 let mut attnums = Vec::new();
1654 let mut attndimss = Vec::new();
1655 let mut attcacheoffs = Vec::new();
1656 let mut atttymods = Vec::new();
1657 let mut attbyvals = Vec::new();
1658 let mut attaligns = Vec::new();
1659 let mut attstorages = Vec::new();
1660 let mut attcompressions: Vec<Option<String>> = Vec::new();
1661 let mut attnotnulls = Vec::new();
1662 let mut atthasdefs = Vec::new();
1663 let mut atthasmissings = Vec::new();
1664 let mut attidentitys = Vec::new();
1665 let mut attgenerateds = Vec::new();
1666 let mut attisdroppeds = Vec::new();
1667 let mut attislocals = Vec::new();
1668 let mut attinhcounts = Vec::new();
1669 let mut attcollations = Vec::new();
1670 let mut attacls: Vec<Option<String>> = Vec::new();
1671 let mut attoptions: Vec<Option<String>> = Vec::new();
1672 let mut attfdwoptions: Vec<Option<String>> = Vec::new();
1673 let mut attmissingvals: Vec<Option<String>> = Vec::new();
1674
1675 let mut next_oid = 10000;
1677
1678 for catalog_name in catalog_list.catalog_names() {
1680 if let Some(catalog) = catalog_list.catalog(&catalog_name) {
1681 for schema_name in catalog.schema_names() {
1682 if let Some(schema_provider) = catalog.schema(&schema_name) {
1683 for table_name in schema_provider.table_names() {
1685 let table_oid = next_oid;
1686 next_oid += 1;
1687
1688 if let Some(table) = schema_provider.table(&table_name).await? {
1689 let table_schema = table.schema();
1690
1691 for (column_idx, field) in table_schema.fields().iter().enumerate()
1693 {
1694 let attnum = (column_idx + 1) as i16; let (pg_type_oid, type_len, by_val, align, storage) =
1696 Self::datafusion_to_pg_type(field.data_type());
1697
1698 attrelids.push(table_oid);
1699 attnames.push(field.name().clone());
1700 atttypids.push(pg_type_oid);
1701 attstattargets.push(-1); attlens.push(type_len);
1703 attnums.push(attnum);
1704 attndimss.push(0); attcacheoffs.push(-1); atttymods.push(-1); attbyvals.push(by_val);
1708 attaligns.push(align.to_string());
1709 attstorages.push(storage.to_string());
1710 attcompressions.push(None); attnotnulls.push(!field.is_nullable());
1712 atthasdefs.push(false); atthasmissings.push(false); attidentitys.push("".to_string()); attgenerateds.push("".to_string()); attisdroppeds.push(false); attislocals.push(true); attinhcounts.push(0); attcollations.push(0); attacls.push(None); attoptions.push(None); attfdwoptions.push(None); attmissingvals.push(None); }
1725 }
1726 }
1727 }
1728 }
1729 }
1730 }
1731
1732 let arrays: Vec<ArrayRef> = vec![
1734 Arc::new(Int32Array::from(attrelids)),
1735 Arc::new(StringArray::from(attnames)),
1736 Arc::new(Int32Array::from(atttypids)),
1737 Arc::new(Int32Array::from(attstattargets)),
1738 Arc::new(Int16Array::from(attlens)),
1739 Arc::new(Int16Array::from(attnums)),
1740 Arc::new(Int32Array::from(attndimss)),
1741 Arc::new(Int32Array::from(attcacheoffs)),
1742 Arc::new(Int32Array::from(atttymods)),
1743 Arc::new(BooleanArray::from(attbyvals)),
1744 Arc::new(StringArray::from(attaligns)),
1745 Arc::new(StringArray::from(attstorages)),
1746 Arc::new(StringArray::from_iter(attcompressions.into_iter())),
1747 Arc::new(BooleanArray::from(attnotnulls)),
1748 Arc::new(BooleanArray::from(atthasdefs)),
1749 Arc::new(BooleanArray::from(atthasmissings)),
1750 Arc::new(StringArray::from(attidentitys)),
1751 Arc::new(StringArray::from(attgenerateds)),
1752 Arc::new(BooleanArray::from(attisdroppeds)),
1753 Arc::new(BooleanArray::from(attislocals)),
1754 Arc::new(Int32Array::from(attinhcounts)),
1755 Arc::new(Int32Array::from(attcollations)),
1756 Arc::new(StringArray::from_iter(attacls.into_iter())),
1757 Arc::new(StringArray::from_iter(attoptions.into_iter())),
1758 Arc::new(StringArray::from_iter(attfdwoptions.into_iter())),
1759 Arc::new(StringArray::from_iter(attmissingvals.into_iter())),
1760 ];
1761
1762 let batch = RecordBatch::try_new(schema.clone(), arrays)?;
1764 Ok(batch)
1765 }
1766
1767 fn datafusion_to_pg_type(data_type: &DataType) -> (i32, i16, bool, &'static str, &'static str) {
1769 match data_type {
1770 DataType::Boolean => (16, 1, true, "c", "p"), DataType::Int8 => (18, 1, true, "c", "p"), DataType::Int16 => (21, 2, true, "s", "p"), DataType::Int32 => (23, 4, true, "i", "p"), DataType::Int64 => (20, 8, true, "d", "p"), DataType::UInt8 => (21, 2, true, "s", "p"), DataType::UInt16 => (23, 4, true, "i", "p"), DataType::UInt32 => (20, 8, true, "d", "p"), DataType::UInt64 => (1700, -1, false, "i", "m"), DataType::Float32 => (700, 4, true, "i", "p"), DataType::Float64 => (701, 8, true, "d", "p"), DataType::Utf8 => (25, -1, false, "i", "x"), DataType::LargeUtf8 => (25, -1, false, "i", "x"), DataType::Binary => (17, -1, false, "i", "x"), DataType::LargeBinary => (17, -1, false, "i", "x"), DataType::Date32 => (1082, 4, true, "i", "p"), DataType::Date64 => (1082, 4, true, "i", "p"), DataType::Time32(_) => (1083, 8, true, "d", "p"), DataType::Time64(_) => (1083, 8, true, "d", "p"), DataType::Timestamp(_, _) => (1114, 8, true, "d", "p"), DataType::Decimal128(_, _) => (1700, -1, false, "i", "m"), DataType::Decimal256(_, _) => (1700, -1, false, "i", "m"), _ => (25, -1, false, "i", "x"), }
1794 }
1795}
1796
1797impl PartitionStream for PgAttributeTable {
1798 fn schema(&self) -> &SchemaRef {
1799 &self.schema
1800 }
1801
1802 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1803 let catalog_list = self.catalog_list.clone();
1804 let schema = Arc::clone(&self.schema);
1805 Box::pin(RecordBatchStreamAdapter::new(
1806 schema.clone(),
1807 futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
1808 ))
1809 }
1810}
1811
1812pub fn create_current_schemas_udf() -> ScalarUDF {
1813 let func = move |args: &[ColumnarValue]| {
1815 let args = ColumnarValue::values_to_arrays(args)?;
1816 let input = as_boolean_array(&args[0]);
1817
1818 let mut values = vec!["public"];
1820 if input.value(0) {
1822 values.push("information_schema");
1823 values.push("pg_catalog");
1824 }
1825
1826 let list_array = SingleRowListArrayBuilder::new(Arc::new(StringArray::from(values)));
1827
1828 let array: ArrayRef = Arc::new(list_array.build_list_array());
1829
1830 Ok(ColumnarValue::Array(array))
1831 };
1832
1833 create_udf(
1835 "current_schemas",
1836 vec![DataType::Boolean],
1837 DataType::List(Arc::new(Field::new("schema", DataType::Utf8, false))),
1838 Volatility::Immutable,
1839 Arc::new(func),
1840 )
1841}
1842
1843pub fn create_current_schema_udf() -> ScalarUDF {
1844 let func = move |_args: &[ColumnarValue]| {
1846 let mut builder = StringBuilder::new();
1848 builder.append_value("public");
1849 let array: ArrayRef = Arc::new(builder.finish());
1850
1851 Ok(ColumnarValue::Array(array))
1852 };
1853
1854 create_udf(
1856 "current_schema",
1857 vec![],
1858 DataType::Utf8,
1859 Volatility::Immutable,
1860 Arc::new(func),
1861 )
1862}
1863
1864pub fn create_version_udf() -> ScalarUDF {
1865 let func = move |_args: &[ColumnarValue]| {
1867 let mut builder = StringBuilder::new();
1869 builder
1871 .append_value("DataFusion PostgreSQL 48.0.0 on x86_64-pc-linux-gnu, compiled by Rust");
1872 let array: ArrayRef = Arc::new(builder.finish());
1873
1874 Ok(ColumnarValue::Array(array))
1875 };
1876
1877 create_udf(
1879 "version",
1880 vec![],
1881 DataType::Utf8,
1882 Volatility::Immutable,
1883 Arc::new(func),
1884 )
1885}
1886
1887pub fn create_pg_get_userbyid_udf() -> ScalarUDF {
1888 let func = move |args: &[ColumnarValue]| {
1890 let args = ColumnarValue::values_to_arrays(args)?;
1891 let input = &args[0]; let mut builder = StringBuilder::new();
1895 for _ in 0..input.len() {
1896 builder.append_value("postgres");
1897 }
1898
1899 let array: ArrayRef = Arc::new(builder.finish());
1900
1901 Ok(ColumnarValue::Array(array))
1902 };
1903
1904 create_udf(
1906 "pg_catalog.pg_get_userbyid",
1907 vec![DataType::Int32],
1908 DataType::Utf8,
1909 Volatility::Stable,
1910 Arc::new(func),
1911 )
1912}
1913
1914pub fn create_pg_table_is_visible() -> ScalarUDF {
1915 let func = move |args: &[ColumnarValue]| {
1917 let args = ColumnarValue::values_to_arrays(args)?;
1918 let input = &args[0]; let mut builder = BooleanBuilder::new();
1922 for _ in 0..input.len() {
1923 builder.append_value(true);
1924 }
1925
1926 let array: ArrayRef = Arc::new(builder.finish());
1927
1928 Ok(ColumnarValue::Array(array))
1929 };
1930
1931 create_udf(
1933 "pg_catalog.pg_table_is_visible",
1934 vec![DataType::Int32],
1935 DataType::Boolean,
1936 Volatility::Stable,
1937 Arc::new(func),
1938 )
1939}
1940
1941pub fn create_has_table_privilege_3param_udf() -> ScalarUDF {
1942 let func = move |args: &[ColumnarValue]| {
1944 let args = ColumnarValue::values_to_arrays(args)?;
1945 let user = &args[0]; let _table = &args[1]; let _privilege = &args[2]; let mut builder = BooleanArray::builder(user.len());
1951 for _ in 0..user.len() {
1952 builder.append_value(true);
1953 }
1954
1955 let array: ArrayRef = Arc::new(builder.finish());
1956
1957 Ok(ColumnarValue::Array(array))
1958 };
1959
1960 create_udf(
1962 "has_table_privilege",
1963 vec![DataType::Utf8, DataType::Utf8, DataType::Utf8],
1964 DataType::Boolean,
1965 Volatility::Stable,
1966 Arc::new(func),
1967 )
1968}
1969
1970pub fn create_has_table_privilege_2param_udf() -> ScalarUDF {
1971 let func = move |args: &[ColumnarValue]| {
1973 let args = ColumnarValue::values_to_arrays(args)?;
1974 let table = &args[0]; let _privilege = &args[1]; let mut builder = BooleanArray::builder(table.len());
1979 for _ in 0..table.len() {
1980 builder.append_value(true);
1981 }
1982 let array: ArrayRef = Arc::new(builder.finish());
1983
1984 Ok(ColumnarValue::Array(array))
1985 };
1986
1987 create_udf(
1989 "has_table_privilege",
1990 vec![DataType::Utf8, DataType::Utf8],
1991 DataType::Boolean,
1992 Volatility::Stable,
1993 Arc::new(func),
1994 )
1995}
1996
1997pub fn create_format_type_udf() -> ScalarUDF {
1998 let func = move |args: &[ColumnarValue]| {
1999 let args = ColumnarValue::values_to_arrays(args)?;
2000 let type_oids = &args[0]; let _type_mods = &args[1]; let mut builder = StringBuilder::new();
2005 for _ in 0..type_oids.len() {
2006 builder.append_value("???");
2007 }
2008
2009 let array: ArrayRef = Arc::new(builder.finish());
2010
2011 Ok(ColumnarValue::Array(array))
2012 };
2013
2014 create_udf(
2015 "format_type",
2016 vec![DataType::Int32, DataType::Int32],
2017 DataType::Utf8,
2018 Volatility::Stable,
2019 Arc::new(func),
2020 )
2021}
2022
2023pub fn setup_pg_catalog(
2025 session_context: &SessionContext,
2026 catalog_name: &str,
2027) -> Result<(), Box<DataFusionError>> {
2028 let pg_catalog = PgCatalogSchemaProvider::new(session_context.state().catalog_list().clone());
2029 session_context
2030 .catalog(catalog_name)
2031 .ok_or_else(|| {
2032 DataFusionError::Configuration(format!(
2033 "Catalog not found when registering pg_catalog: {catalog_name}"
2034 ))
2035 })?
2036 .register_schema("pg_catalog", Arc::new(pg_catalog))?;
2037
2038 session_context.register_udf(create_current_schema_udf());
2039 session_context.register_udf(create_current_schemas_udf());
2040 session_context.register_udf(create_version_udf());
2041 session_context.register_udf(create_pg_get_userbyid_udf());
2042 session_context.register_udf(create_has_table_privilege_2param_udf());
2043 session_context.register_udf(create_pg_table_is_visible());
2044 session_context.register_udf(create_format_type_udf());
2045
2046 Ok(())
2047}