Skip to main content

lance_graph_catalog/
connector.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Connector facade that bundles a [`CatalogProvider`] with [`TableReader`]s.
//!
//! Inspired by Presto's `Connector` interface which exposes `getMetadata()` +
//! `getPageSourceProvider()`, this struct provides a convenient entry point
//! for users who want to browse a catalog and register tables for querying.
9
10use std::collections::HashMap;
11use std::sync::Arc;
12
13use arrow_schema::SchemaRef;
14use datafusion::datasource::MemTable;
15use datafusion::execution::context::SessionContext;
16
17use crate::catalog_provider::{
18    CatalogError, CatalogInfo, CatalogProvider, CatalogResult, DataSourceFormat, SchemaInfo,
19    TableInfo,
20};
21use crate::table_reader::TableReader;
22
/// Bundles a [`CatalogProvider`] with [`TableReader`]s for convenient use.
///
/// The `Connector` is the primary entry point for interacting with an external
/// catalog. It delegates metadata operations to the catalog provider and data
/// reading to the appropriate table reader based on the table's data format.
///
/// # Storage Options
///
/// Cloud storage credentials (S3, Azure, GCS) are passed via `storage_options`
/// and forwarded to each [`TableReader`] during table registration.
///
/// # Extensibility
///
/// - Swap the catalog: pass a different `CatalogProvider` (e.g., AWS Glue).
/// - Add formats: pass additional `TableReader`s (e.g., Iceberg).
///
/// # Example
///
/// ```no_run
/// # use lance_graph_catalog::connector::Connector;
/// # use std::collections::HashMap;
/// # fn example() {
/// // let connector = Connector::new(catalog, readers)
/// //     .with_storage_options(HashMap::from([
/// //         ("aws_access_key_id".into(), "...".into()),
/// //         ("aws_secret_access_key".into(), "...".into()),
/// //     ]));
/// # }
/// ```
pub struct Connector {
    // Metadata source: every catalog/schema/table lookup is delegated to it.
    catalog: Arc<dyn CatalogProvider>,
    // Candidate readers, scanned in order; the first one whose supported
    // formats contain a table's format is used for that table.
    readers: Vec<Arc<dyn TableReader>>,
    // Key/value options handed by reference to the selected reader whenever a
    // table is registered. Empty until `with_storage_options` is called.
    storage_options: HashMap<String, String>,
}
57
58impl Connector {
59    /// Create a new connector with the given catalog provider and table readers.
60    pub fn new(catalog: Arc<dyn CatalogProvider>, readers: Vec<Arc<dyn TableReader>>) -> Self {
61        Self {
62            catalog,
63            readers,
64            storage_options: HashMap::new(),
65        }
66    }
67
68    /// Set storage options for cloud storage access (S3, Azure, GCS).
69    ///
70    /// Common keys:
71    /// - S3: `aws_access_key_id`, `aws_secret_access_key`, `aws_region`
72    /// - Azure: `azure_storage_account_name`, `azure_storage_account_key`
73    /// - GCS: `google_service_account_path`
74    pub fn with_storage_options(mut self, options: HashMap<String, String>) -> Self {
75        self.storage_options = options;
76        self
77    }
78
79    /// Get the current storage options.
80    pub fn storage_options(&self) -> &HashMap<String, String> {
81        &self.storage_options
82    }
83
84    /// Get a reference to the underlying catalog provider.
85    pub fn catalog(&self) -> &dyn CatalogProvider {
86        self.catalog.as_ref()
87    }
88
89    /// Find a table reader that supports the given data format.
90    pub fn reader_for(&self, format: &DataSourceFormat) -> Option<&dyn TableReader> {
91        self.readers
92            .iter()
93            .find(|r| r.supported_formats().contains(format))
94            .map(|r| r.as_ref())
95    }
96
97    /// List all available table readers.
98    pub fn readers(&self) -> &[Arc<dyn TableReader>] {
99        &self.readers
100    }
101
102    // ---- Delegate catalog operations ----
103
104    pub async fn list_catalogs(&self) -> CatalogResult<Vec<CatalogInfo>> {
105        self.catalog.list_catalogs().await
106    }
107
108    pub async fn get_catalog(&self, name: &str) -> CatalogResult<CatalogInfo> {
109        self.catalog.get_catalog(name).await
110    }
111
112    pub async fn list_schemas(&self, catalog_name: &str) -> CatalogResult<Vec<SchemaInfo>> {
113        self.catalog.list_schemas(catalog_name).await
114    }
115
116    pub async fn get_schema(
117        &self,
118        catalog_name: &str,
119        schema_name: &str,
120    ) -> CatalogResult<SchemaInfo> {
121        self.catalog.get_schema(catalog_name, schema_name).await
122    }
123
124    pub async fn list_tables(
125        &self,
126        catalog_name: &str,
127        schema_name: &str,
128    ) -> CatalogResult<Vec<TableInfo>> {
129        self.catalog.list_tables(catalog_name, schema_name).await
130    }
131
132    pub async fn get_table(
133        &self,
134        catalog_name: &str,
135        schema_name: &str,
136        table_name: &str,
137    ) -> CatalogResult<TableInfo> {
138        self.catalog
139            .get_table(catalog_name, schema_name, table_name)
140            .await
141    }
142
143    /// Register all tables from a catalog schema into a DataFusion `SessionContext`.
144    ///
145    /// For each table:
146    /// 1. Retrieves full table metadata (including columns) from the catalog.
147    /// 2. Converts columns to an Arrow schema.
148    /// 3. Finds an appropriate [`TableReader`] for the table's data format.
149    /// 4. Registers the table in the session context with storage options for cloud access.
150    ///
151    /// If no reader matches the table's format, falls back to registering an
152    /// empty `MemTable` with the correct schema (schema-only, for planning).
153    ///
154    /// Individual table failures are logged as warnings but do not abort the
155    /// registration of remaining tables.
156    ///
157    /// Returns a list of `(table_name, schema)` for successfully registered tables.
158    pub async fn register_schema(
159        &self,
160        ctx: &SessionContext,
161        catalog_name: &str,
162        schema_name: &str,
163    ) -> CatalogResult<Vec<(String, SchemaRef)>> {
164        let tables = self.catalog.list_tables(catalog_name, schema_name).await?;
165        let mut registered = Vec::new();
166
167        for table_summary in &tables {
168            match self
169                .register_single_table(ctx, catalog_name, schema_name, &table_summary.name)
170                .await
171            {
172                Ok((name, schema)) => {
173                    registered.push((name, schema));
174                }
175                Err(e) => {
176                    eprintln!(
177                        "Warning: failed to register table {}.{}.{}: {}",
178                        catalog_name, schema_name, table_summary.name, e
179                    );
180                }
181            }
182        }
183
184        Ok(registered)
185    }
186
187    async fn register_single_table(
188        &self,
189        ctx: &SessionContext,
190        catalog_name: &str,
191        schema_name: &str,
192        table_name: &str,
193    ) -> CatalogResult<(String, SchemaRef)> {
194        let table_info = self
195            .catalog
196            .get_table(catalog_name, schema_name, table_name)
197            .await?;
198        let arrow_schema = self.catalog.table_to_arrow_schema(&table_info)?;
199        let normalized_name = table_info.name.to_lowercase();
200
201        // Find a reader for this format
202        let reader = self.reader_for(&table_info.data_source_format);
203
204        match reader {
205            Some(r) => {
206                r.register_table(
207                    ctx,
208                    &normalized_name,
209                    &table_info,
210                    arrow_schema.clone(),
211                    &self.storage_options,
212                )
213                .await?;
214            }
215            None => {
216                // No reader — register schema-only (empty MemTable for planning)
217                let mem_table = MemTable::try_new(arrow_schema.clone(), vec![]).map_err(|e| {
218                    CatalogError::Other(format!(
219                        "Failed to create empty table '{}': {}",
220                        normalized_name, e
221                    ))
222                })?;
223                ctx.register_table(&normalized_name, Arc::new(mem_table))
224                    .map_err(|e| {
225                        CatalogError::Other(format!(
226                            "Failed to register table '{}': {}",
227                            normalized_name, e
228                        ))
229                    })?;
230            }
231        }
232
233        Ok((normalized_name, arrow_schema))
234    }
235}