1use datafusion::arrow::array::{ArrayRef, Int32Builder, StringBuilder, UInt32Builder};
4use datafusion::arrow::datatypes::{Field, Schema};
5use datafusion::arrow::record_batch::RecordBatch;
6use datafusion::catalog::memory::MemorySchemaProvider;
7use datafusion::catalog::{CatalogProvider, SchemaProvider};
8use datafusion::datasource::{MemTable, TableProvider};
9use datafusion::error::DataFusionError;
10use std::convert::TryInto;
11use std::sync::Arc;
12
13macro_rules! table_builder {
14 ($type:ident $($field_name:ident: $builder_type:ty, $param_type:ty,)*) => {
15 struct $type {
16 $($field_name: $builder_type,)*
17 }
18
19 impl $type {
20 fn new() -> Self {
21 Self {
22 $($field_name: <$builder_type>::new(),)*
23 }
24 }
25
26 #[allow(dead_code)] fn add_row(
28 &mut self,
29 $($field_name: $param_type,)*
30 ) -> Result<(), DataFusionError> {
31 $(self.$field_name.append_value($field_name);)*
32 Ok(())
33 }
34 }
35
36 impl TryInto<Arc<dyn TableProvider>> for $type {
37 type Error = DataFusionError;
38
39 fn try_into(mut self) -> Result<Arc<dyn TableProvider>, Self::Error> {
40 let columns: Vec<ArrayRef> = vec![
41 $(Arc::new(self.$field_name.finish()),)*
42 ];
43
44 let column_names = &[
45 $(stringify!($field_name),)*
46 ];
47
48 let fields: Vec<_> = columns.iter().zip(column_names).map(|(c, name)| Field::new(name.to_owned(), c.data_type().clone(), true)).collect();
49 let schema = Arc::new(Schema::new(fields));
50 let batch = RecordBatch::try_new(schema, columns)?;
51 Ok(Arc::new(MemTable::try_new(batch.schema(), vec![vec![batch]])?))
52 }
53 }
54 };
55}
56
57table_builder! {
58 PgDatabaseBuilder
59 datname: StringBuilder, &str,
60}
61
62table_builder! {
63 PgTablesBuilder
64 schemaname: StringBuilder, &str,
65 tablename: StringBuilder, &str,
66}
67
68table_builder! {
69 PgNamespaceBuilder
70 oid: UInt32Builder, u32,
71 nspname: StringBuilder, &str,
72}
73
74table_builder! {
75 PgClassBuilder
76 oid: UInt32Builder, u32,
77 relname: StringBuilder, &str,
78 relnamespace: UInt32Builder, u32,
79 relkind: StringBuilder, &str,
80}
81
82table_builder! {
83 PgProc
84 oid: UInt32Builder, u32,
85 proname: StringBuilder, &str,
86 pronamespace: UInt32Builder, u32,
87}
88
89table_builder! {
90 PgDescription
91 objoid: UInt32Builder, u32,
92 classoid: UInt32Builder, u32,
93 objsubid: Int32Builder, i32,
94 description: StringBuilder, &str,
95}
96
97struct MetadataBuilder {
98 next_oid: u32,
99 pg_database: PgDatabaseBuilder,
100 pg_namespace: PgNamespaceBuilder,
101 pg_tables: PgTablesBuilder,
102 pg_class: PgClassBuilder,
103 pg_proc: PgProc,
104 pg_description: PgDescription,
105}
106
107impl MetadataBuilder {
108 fn new() -> Self {
109 Self {
110 next_oid: 0,
111 pg_database: PgDatabaseBuilder::new(),
112 pg_namespace: PgNamespaceBuilder::new(),
113 pg_tables: PgTablesBuilder::new(),
114 pg_class: PgClassBuilder::new(),
115 pg_proc: PgProc::new(),
116 pg_description: PgDescription::new(),
117 }
118 }
119
120 fn alloc_oid(&mut self) -> u32 {
121 self.next_oid += 1;
122 self.next_oid
123 }
124
125 fn add_schema(&mut self, schema_name: &str, schema: &dyn SchemaProvider) -> Result<(), DataFusionError> {
126 let schema_oid = self.alloc_oid();
127
128 for table_name in schema.table_names() {
129 let table_oid = self.alloc_oid();
130
131 self.pg_tables.add_row(schema_name, &table_name)?;
132 self.pg_namespace.add_row(schema_oid, schema_name)?;
133 self.pg_class.add_row(table_oid, &table_name, schema_oid, "r")?;
134 let desc_oid = self.alloc_oid();
135 self.pg_description.add_row(desc_oid, table_oid, 0, "")?;
136 }
137
138 Ok(())
139 }
140
141 fn into_schema(self) -> Result<MemorySchemaProvider, DataFusionError> {
142 let schema = MemorySchemaProvider::new();
143
144 schema.register_table("pg_tables".to_owned(), self.pg_tables.try_into()?)?;
145 schema.register_table("pg_namespace".to_owned(), self.pg_namespace.try_into()?)?;
146 schema.register_table("pg_class".to_owned(), self.pg_class.try_into()?)?;
147 schema.register_table("pg_database".to_owned(), self.pg_database.try_into()?)?;
148 schema.register_table("pg_proc".to_owned(), self.pg_proc.try_into()?)?;
149 schema.register_table("pg_description".to_owned(), self.pg_description.try_into()?)?;
150
151 Ok(schema)
152 }
153}
154
155#[derive(Debug)]
157pub struct Catalog {
158 wrapped: Arc<dyn CatalogProvider>,
159}
160
161impl Catalog {
162 pub fn new(wrapped: Arc<dyn CatalogProvider>) -> Self {
164 Self { wrapped }
165 }
166
167 fn build_metadata_schema(&self) -> Result<MemorySchemaProvider, DataFusionError> {
168 let mut builder = MetadataBuilder::new();
169 builder.pg_database.add_row("datafusion")?;
170
171 for schema_name in self.wrapped.schema_names() {
172 let schema = match self.wrapped.schema(&schema_name) {
173 Some(s) => s,
174 None => continue,
175 };
176
177 builder.add_schema(&schema_name, schema.as_ref())?;
178 }
179
180 builder.into_schema()
181 }
182}
183
184impl CatalogProvider for Catalog {
185 fn as_any(&self) -> &dyn std::any::Any {
186 self
187 }
188
189 fn schema_names(&self) -> Vec<String> {
190 let mut ret = self.wrapped.schema_names();
191 ret.push("pg_catalog".to_owned());
192 ret
193 }
194
195 fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
196 if name.eq_ignore_ascii_case("pg_catalog") {
197 return Some(Arc::new(
198 self.build_metadata_schema().expect("failed to build metadata schema"),
199 ));
200 }
201
202 self.wrapped.schema(name)
203 }
204}