lance_graph_catalog/connector.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Connector facade that bundles a [`CatalogProvider`] with [`TableReader`]s.
5//!
6//! Inspired by Presto's `Connector` interface which exposes `getMetadata()` +
7//! `getPageSourceProvider()`, this struct provides a convenient entry point
8//! for users who want to browse a catalog and register tables for querying.
9
10use std::collections::HashMap;
11use std::sync::Arc;
12
13use arrow_schema::SchemaRef;
14use datafusion::datasource::MemTable;
15use datafusion::execution::context::SessionContext;
16
17use crate::catalog_provider::{
18 CatalogError, CatalogInfo, CatalogProvider, CatalogResult, DataSourceFormat, SchemaInfo,
19 TableInfo,
20};
21use crate::table_reader::TableReader;
22
23/// Bundles a [`CatalogProvider`] with [`TableReader`]s for convenient use.
24///
25/// The `Connector` is the primary entry point for interacting with an external
26/// catalog. It delegates metadata operations to the catalog provider and data
27/// reading to the appropriate table reader based on the table's data format.
28///
29/// # Storage Options
30///
31/// Cloud storage credentials (S3, Azure, GCS) are passed via `storage_options`
32/// and forwarded to each [`TableReader`] during table registration.
33///
34/// # Extensibility
35///
36/// - Swap the catalog: pass a different `CatalogProvider` (e.g., AWS Glue).
37/// - Add formats: pass additional `TableReader`s (e.g., Iceberg).
38///
39/// # Example
40///
41/// ```no_run
42/// # use lance_graph_catalog::connector::Connector;
43/// # use std::collections::HashMap;
44/// # fn example() {
45/// // let connector = Connector::new(catalog, readers)
46/// // .with_storage_options(HashMap::from([
47/// // ("aws_access_key_id".into(), "...".into()),
48/// // ("aws_secret_access_key".into(), "...".into()),
49/// // ]));
50/// # }
51/// ```
52pub struct Connector {
53 catalog: Arc<dyn CatalogProvider>,
54 readers: Vec<Arc<dyn TableReader>>,
55 storage_options: HashMap<String, String>,
56}
57
58impl Connector {
59 /// Create a new connector with the given catalog provider and table readers.
60 pub fn new(catalog: Arc<dyn CatalogProvider>, readers: Vec<Arc<dyn TableReader>>) -> Self {
61 Self {
62 catalog,
63 readers,
64 storage_options: HashMap::new(),
65 }
66 }
67
68 /// Set storage options for cloud storage access (S3, Azure, GCS).
69 ///
70 /// Common keys:
71 /// - S3: `aws_access_key_id`, `aws_secret_access_key`, `aws_region`
72 /// - Azure: `azure_storage_account_name`, `azure_storage_account_key`
73 /// - GCS: `google_service_account_path`
74 pub fn with_storage_options(mut self, options: HashMap<String, String>) -> Self {
75 self.storage_options = options;
76 self
77 }
78
79 /// Get the current storage options.
80 pub fn storage_options(&self) -> &HashMap<String, String> {
81 &self.storage_options
82 }
83
84 /// Get a reference to the underlying catalog provider.
85 pub fn catalog(&self) -> &dyn CatalogProvider {
86 self.catalog.as_ref()
87 }
88
89 /// Find a table reader that supports the given data format.
90 pub fn reader_for(&self, format: &DataSourceFormat) -> Option<&dyn TableReader> {
91 self.readers
92 .iter()
93 .find(|r| r.supported_formats().contains(format))
94 .map(|r| r.as_ref())
95 }
96
97 /// List all available table readers.
98 pub fn readers(&self) -> &[Arc<dyn TableReader>] {
99 &self.readers
100 }
101
102 // ---- Delegate catalog operations ----
103
104 pub async fn list_catalogs(&self) -> CatalogResult<Vec<CatalogInfo>> {
105 self.catalog.list_catalogs().await
106 }
107
108 pub async fn get_catalog(&self, name: &str) -> CatalogResult<CatalogInfo> {
109 self.catalog.get_catalog(name).await
110 }
111
112 pub async fn list_schemas(&self, catalog_name: &str) -> CatalogResult<Vec<SchemaInfo>> {
113 self.catalog.list_schemas(catalog_name).await
114 }
115
116 pub async fn get_schema(
117 &self,
118 catalog_name: &str,
119 schema_name: &str,
120 ) -> CatalogResult<SchemaInfo> {
121 self.catalog.get_schema(catalog_name, schema_name).await
122 }
123
124 pub async fn list_tables(
125 &self,
126 catalog_name: &str,
127 schema_name: &str,
128 ) -> CatalogResult<Vec<TableInfo>> {
129 self.catalog.list_tables(catalog_name, schema_name).await
130 }
131
132 pub async fn get_table(
133 &self,
134 catalog_name: &str,
135 schema_name: &str,
136 table_name: &str,
137 ) -> CatalogResult<TableInfo> {
138 self.catalog
139 .get_table(catalog_name, schema_name, table_name)
140 .await
141 }
142
143 /// Register all tables from a catalog schema into a DataFusion `SessionContext`.
144 ///
145 /// For each table:
146 /// 1. Retrieves full table metadata (including columns) from the catalog.
147 /// 2. Converts columns to an Arrow schema.
148 /// 3. Finds an appropriate [`TableReader`] for the table's data format.
149 /// 4. Registers the table in the session context with storage options for cloud access.
150 ///
151 /// If no reader matches the table's format, falls back to registering an
152 /// empty `MemTable` with the correct schema (schema-only, for planning).
153 ///
154 /// Individual table failures are logged as warnings but do not abort the
155 /// registration of remaining tables.
156 ///
157 /// Returns a list of `(table_name, schema)` for successfully registered tables.
158 pub async fn register_schema(
159 &self,
160 ctx: &SessionContext,
161 catalog_name: &str,
162 schema_name: &str,
163 ) -> CatalogResult<Vec<(String, SchemaRef)>> {
164 let tables = self.catalog.list_tables(catalog_name, schema_name).await?;
165 let mut registered = Vec::new();
166
167 for table_summary in &tables {
168 match self
169 .register_single_table(ctx, catalog_name, schema_name, &table_summary.name)
170 .await
171 {
172 Ok((name, schema)) => {
173 registered.push((name, schema));
174 }
175 Err(e) => {
176 eprintln!(
177 "Warning: failed to register table {}.{}.{}: {}",
178 catalog_name, schema_name, table_summary.name, e
179 );
180 }
181 }
182 }
183
184 Ok(registered)
185 }
186
187 async fn register_single_table(
188 &self,
189 ctx: &SessionContext,
190 catalog_name: &str,
191 schema_name: &str,
192 table_name: &str,
193 ) -> CatalogResult<(String, SchemaRef)> {
194 let table_info = self
195 .catalog
196 .get_table(catalog_name, schema_name, table_name)
197 .await?;
198 let arrow_schema = self.catalog.table_to_arrow_schema(&table_info)?;
199 let normalized_name = table_info.name.to_lowercase();
200
201 // Find a reader for this format
202 let reader = self.reader_for(&table_info.data_source_format);
203
204 match reader {
205 Some(r) => {
206 r.register_table(
207 ctx,
208 &normalized_name,
209 &table_info,
210 arrow_schema.clone(),
211 &self.storage_options,
212 )
213 .await?;
214 }
215 None => {
216 // No reader — register schema-only (empty MemTable for planning)
217 let mem_table = MemTable::try_new(arrow_schema.clone(), vec![]).map_err(|e| {
218 CatalogError::Other(format!(
219 "Failed to create empty table '{}': {}",
220 normalized_name, e
221 ))
222 })?;
223 ctx.register_table(&normalized_name, Arc::new(mem_table))
224 .map_err(|e| {
225 CatalogError::Other(format!(
226 "Failed to register table '{}': {}",
227 normalized_name, e
228 ))
229 })?;
230 }
231 }
232
233 Ok((normalized_name, arrow_schema))
234 }
235}