convergence_arrow/
metadata.rs

//! Generates Postgres-style metadata (a `pg_catalog` schema) describing a DataFusion catalog.
2
3use datafusion::arrow::array::{ArrayRef, Int32Builder, StringBuilder, UInt32Builder};
4use datafusion::arrow::datatypes::{Field, Schema};
5use datafusion::arrow::record_batch::RecordBatch;
6use datafusion::catalog::memory::MemorySchemaProvider;
7use datafusion::catalog::{CatalogProvider, SchemaProvider};
8use datafusion::datasource::{MemTable, TableProvider};
9use datafusion::error::DataFusionError;
10use std::convert::TryInto;
11use std::sync::Arc;
12
/// Declares a column-oriented builder for one synthetic catalog table.
///
/// Each `name: BuilderType, ParamType,` entry becomes one nullable column whose name is the
/// field identifier itself. The generated type gets `new`, `add_row` (one argument per column,
/// in declaration order) and a conversion into a single-batch in-memory `TableProvider`.
macro_rules! table_builder {
	($type:ident $($field_name:ident: $builder_type:ty, $param_type:ty,)*) => {
		/// One Arrow array builder per column of the generated table.
		struct $type {
			$($field_name: $builder_type,)*
		}

		impl $type {
			/// Creates a builder whose columns are all empty.
			fn new() -> Self {
				Self {
					$($field_name: <$builder_type>::new(),)*
				}
			}

			/// Appends one row: each argument is pushed onto the column of the same name.
			#[allow(dead_code)] // some tables aren't currently written to
			fn add_row(
				&mut self,
				$($field_name: $param_type,)*
			) -> Result<(), DataFusionError> {
				$(self.$field_name.append_value($field_name);)*
				Ok(())
			}
		}

		impl TryInto<Arc<dyn TableProvider>> for $type {
			type Error = DataFusionError;

			/// Finishes every column and wraps them in a `MemTable` holding exactly one batch.
			fn try_into(mut self) -> Result<Arc<dyn TableProvider>, Self::Error> {
				let mut fields: Vec<Field> = Vec::new();
				let mut columns: Vec<ArrayRef> = Vec::new();

				// Field name and data type are derived from the finished column, so the schema
				// always agrees with the arrays it describes.
				$(
					let column: ArrayRef = Arc::new(self.$field_name.finish());
					fields.push(Field::new(stringify!($field_name), column.data_type().clone(), true));
					columns.push(column);
				)*

				let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)?;
				Ok(Arc::new(MemTable::try_new(batch.schema(), vec![vec![batch]])?))
			}
		}
	};
}
56
57table_builder! {
58	PgDatabaseBuilder
59	datname: StringBuilder, &str,
60}
61
62table_builder! {
63	PgTablesBuilder
64	schemaname: StringBuilder, &str,
65	tablename: StringBuilder, &str,
66}
67
68table_builder! {
69	PgNamespaceBuilder
70	oid: UInt32Builder, u32,
71	nspname: StringBuilder, &str,
72}
73
74table_builder! {
75	PgClassBuilder
76	oid: UInt32Builder, u32,
77	relname: StringBuilder, &str,
78	relnamespace: UInt32Builder, u32,
79	relkind: StringBuilder, &str,
80}
81
82table_builder! {
83	PgProc
84	oid: UInt32Builder, u32,
85	proname: StringBuilder, &str,
86	pronamespace: UInt32Builder, u32,
87}
88
89table_builder! {
90	PgDescription
91	objoid: UInt32Builder, u32,
92	classoid: UInt32Builder, u32,
93	objsubid: Int32Builder, i32,
94	description: StringBuilder, &str,
95}
96
97struct MetadataBuilder {
98	next_oid: u32,
99	pg_database: PgDatabaseBuilder,
100	pg_namespace: PgNamespaceBuilder,
101	pg_tables: PgTablesBuilder,
102	pg_class: PgClassBuilder,
103	pg_proc: PgProc,
104	pg_description: PgDescription,
105}
106
107impl MetadataBuilder {
108	fn new() -> Self {
109		Self {
110			next_oid: 0,
111			pg_database: PgDatabaseBuilder::new(),
112			pg_namespace: PgNamespaceBuilder::new(),
113			pg_tables: PgTablesBuilder::new(),
114			pg_class: PgClassBuilder::new(),
115			pg_proc: PgProc::new(),
116			pg_description: PgDescription::new(),
117		}
118	}
119
120	fn alloc_oid(&mut self) -> u32 {
121		self.next_oid += 1;
122		self.next_oid
123	}
124
125	fn add_schema(&mut self, schema_name: &str, schema: &dyn SchemaProvider) -> Result<(), DataFusionError> {
126		let schema_oid = self.alloc_oid();
127
128		for table_name in schema.table_names() {
129			let table_oid = self.alloc_oid();
130
131			self.pg_tables.add_row(schema_name, &table_name)?;
132			self.pg_namespace.add_row(schema_oid, schema_name)?;
133			self.pg_class.add_row(table_oid, &table_name, schema_oid, "r")?;
134			let desc_oid = self.alloc_oid();
135			self.pg_description.add_row(desc_oid, table_oid, 0, "")?;
136		}
137
138		Ok(())
139	}
140
141	fn into_schema(self) -> Result<MemorySchemaProvider, DataFusionError> {
142		let schema = MemorySchemaProvider::new();
143
144		schema.register_table("pg_tables".to_owned(), self.pg_tables.try_into()?)?;
145		schema.register_table("pg_namespace".to_owned(), self.pg_namespace.try_into()?)?;
146		schema.register_table("pg_class".to_owned(), self.pg_class.try_into()?)?;
147		schema.register_table("pg_database".to_owned(), self.pg_database.try_into()?)?;
148		schema.register_table("pg_proc".to_owned(), self.pg_proc.try_into()?)?;
149		schema.register_table("pg_description".to_owned(), self.pg_description.try_into()?)?;
150
151		Ok(schema)
152	}
153}
154
155/// Wrapper catalog supporting generation of pg metadata (e.g. pg_catalog schema).
156#[derive(Debug)]
157pub struct Catalog {
158	wrapped: Arc<dyn CatalogProvider>,
159}
160
161impl Catalog {
162	/// Create a new wrapper catalog that provides postgres metadata for the contained objects.
163	pub fn new(wrapped: Arc<dyn CatalogProvider>) -> Self {
164		Self { wrapped }
165	}
166
167	fn build_metadata_schema(&self) -> Result<MemorySchemaProvider, DataFusionError> {
168		let mut builder = MetadataBuilder::new();
169		builder.pg_database.add_row("datafusion")?;
170
171		for schema_name in self.wrapped.schema_names() {
172			let schema = match self.wrapped.schema(&schema_name) {
173				Some(s) => s,
174				None => continue,
175			};
176
177			builder.add_schema(&schema_name, schema.as_ref())?;
178		}
179
180		builder.into_schema()
181	}
182}
183
184impl CatalogProvider for Catalog {
185	fn as_any(&self) -> &dyn std::any::Any {
186		self
187	}
188
189	fn schema_names(&self) -> Vec<String> {
190		let mut ret = self.wrapped.schema_names();
191		ret.push("pg_catalog".to_owned());
192		ret
193	}
194
195	fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
196		if name.eq_ignore_ascii_case("pg_catalog") {
197			return Some(Arc::new(
198				self.build_metadata_schema().expect("failed to build metadata schema"),
199			));
200		}
201
202		self.wrapped.schema(name)
203	}
204}