Skip to main content

lance_graph/
table_readers.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Built-in [`TableReader`] implementations for common data formats.
//!
//! - [`ParquetTableReader`] — reads Parquet tables using DataFusion's built-in support.
//! - [`DeltaTableReader`] — reads Delta Lake tables (behind the `delta` feature flag).

9use std::collections::HashMap;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use datafusion::execution::context::SessionContext;
14
15use lance_graph_catalog::catalog_provider::{
16    CatalogError, CatalogResult, DataSourceFormat, TableInfo,
17};
18use lance_graph_catalog::table_reader::TableReader;
19
/// Reads Parquet tables using DataFusion's built-in `register_parquet()`.
///
/// The reader carries no state, so it is free to construct, copy, and share.
#[derive(Debug, Clone, Copy, Default)]
pub struct ParquetTableReader;
22
23#[async_trait]
24impl TableReader for ParquetTableReader {
25    fn name(&self) -> &str {
26        "parquet"
27    }
28
29    fn supported_formats(&self) -> &[DataSourceFormat] {
30        &[DataSourceFormat::Parquet]
31    }
32
33    async fn register_table(
34        &self,
35        ctx: &SessionContext,
36        table_name: &str,
37        table_info: &TableInfo,
38        _schema: arrow_schema::SchemaRef,
39        _storage_options: &HashMap<String, String>,
40    ) -> CatalogResult<()> {
41        let location = table_info.storage_location.as_deref().ok_or_else(|| {
42            CatalogError::Other(format!("Table '{}' has no storage_location", table_name))
43        })?;
44
45        ctx.register_parquet(
46            table_name,
47            location,
48            datafusion::datasource::file_format::options::ParquetReadOptions::default(),
49        )
50        .await
51        .map_err(|e| {
52            CatalogError::Other(format!(
53                "Failed to register Parquet table '{}' at '{}': {}",
54                table_name, location, e
55            ))
56        })
57    }
58}
59
/// Reads Delta Lake tables using the `deltalake` crate.
///
/// Opens the Delta table at the table's storage location and registers the
/// resulting `DeltaTable` with DataFusion as a `TableProvider`, so it can be
/// queried with SQL. No specific table version is requested; the version that
/// `deltalake::open_table` loads (the latest one) is what gets registered.
///
/// When the catalog supplies non-empty `storage_options` (for example
/// object-store credentials), they are forwarded to `deltalake`. Which storage
/// backends (S3, Azure, GCS, …) are actually usable depends on the `deltalake`
/// features enabled in the build.
///
/// Only available with the `delta` feature.
#[cfg(feature = "delta")]
#[derive(Debug, Clone, Copy, Default)]
pub struct DeltaTableReader;
69
#[cfg(feature = "delta")]
#[async_trait]
impl TableReader for DeltaTableReader {
    fn name(&self) -> &str {
        "delta"
    }

    fn supported_formats(&self) -> &[DataSourceFormat] {
        &[DataSourceFormat::Delta]
    }

    /// Opens the Delta table at `table_info.storage_location` and registers it
    /// in `ctx` under `table_name`.
    ///
    /// The storage location may be a URL (`s3://…`, `abfss://…`, `file:///…`)
    /// or a plain filesystem path. Plain paths are converted to `file://` URLs
    /// (relative ones are resolved against the current directory).
    /// `storage_options` are forwarded to `deltalake` only when non-empty.
    /// `_schema` is ignored; the registered provider exposes whatever schema
    /// `deltalake` reports for the table.
    ///
    /// # Errors
    ///
    /// Returns [`CatalogError::Other`] if the table has no storage location,
    /// the location cannot be turned into a URL, the Delta table cannot be
    /// opened, or registration in `ctx` fails.
    async fn register_table(
        &self,
        ctx: &SessionContext,
        table_name: &str,
        table_info: &TableInfo,
        _schema: arrow_schema::SchemaRef,
        storage_options: &HashMap<String, String>,
    ) -> CatalogResult<()> {
        let location = table_info.storage_location.as_deref().ok_or_else(|| {
            CatalogError::Other(format!("Table '{}' has no storage_location", table_name))
        })?;

        let table_url = delta_table_url(location).map_err(|e| {
            CatalogError::Other(format!(
                "Invalid storage location URL '{}': {}",
                location, e
            ))
        })?;

        let delta_table = if storage_options.is_empty() {
            deltalake::open_table(table_url).await
        } else {
            deltalake::open_table_with_storage_options(table_url, storage_options.clone()).await
        }
        .map_err(|e| {
            CatalogError::Other(format!(
                "Failed to open Delta table '{}' at '{}': {}",
                table_name, location, e
            ))
        })?;

        ctx.register_table(table_name, Arc::new(delta_table))
            .map_err(|e| {
                CatalogError::Other(format!(
                    "Failed to register Delta table '{}': {}",
                    table_name, e
                ))
            })?;

        Ok(())
    }
}

/// Turns a Delta storage location into the URL form `deltalake` expects.
///
/// Strings that already parse as URLs are returned unchanged. Plain
/// filesystem paths are converted to `file://` URLs. `Url::parse` either
/// rejects such paths as relative URLs or, for Windows paths like `C:\t`,
/// mistakes the drive letter for a scheme. Any other parse failure, including
/// an empty location, is reported as before.
#[cfg(feature = "delta")]
fn delta_table_url(location: &str) -> Result<url::Url, String> {
    match url::Url::parse(location) {
        // A one-letter "scheme" is a Windows drive letter, not a real URL scheme.
        Ok(url) if url.scheme().len() > 1 => Ok(url),
        Ok(_) => local_path_to_url(location),
        // Plain paths such as `/data/table` or `data/table` carry no scheme.
        Err(url::ParseError::RelativeUrlWithoutBase) if !location.is_empty() => {
            local_path_to_url(location)
        }
        Err(e) => Err(e.to_string()),
    }
}

/// Converts a filesystem path (absolute, or relative to the current directory)
/// into a `file://` directory URL.
#[cfg(feature = "delta")]
fn local_path_to_url(location: &str) -> Result<url::Url, String> {
    let path = std::path::Path::new(location);
    let absolute = if path.is_absolute() {
        path.to_path_buf()
    } else {
        std::env::current_dir()
            .map_err(|e| format!("cannot resolve relative path: {}", e))?
            .join(path)
    };
    url::Url::from_directory_path(&absolute).map_err(|()| {
        format!(
            "cannot convert path '{}' to a file URL",
            absolute.display()
        )
    })
}
123
124/// Returns the default set of table readers.
125///
126/// Includes Parquet support, and Delta Lake support when the `delta` feature is enabled.
127pub fn default_table_readers() -> Vec<Arc<dyn TableReader>> {
128    let mut readers: Vec<Arc<dyn TableReader>> = vec![Arc::new(ParquetTableReader)];
129    #[cfg(feature = "delta")]
130    readers.push(Arc::new(DeltaTableReader));
131    readers
132}