use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::execution::context::SessionContext;
use lance_graph_catalog::catalog_provider::{
CatalogError, CatalogResult, DataSourceFormat, TableInfo,
};
use lance_graph_catalog::table_reader::TableReader;
/// Stateless [`TableReader`] for tables stored in the Parquet format.
///
/// The type carries no data, so it is cheap to copy and can be created with
/// [`Default::default`] or by naming the unit struct directly.
#[derive(Debug, Clone, Copy, Default)]
pub struct ParquetTableReader;
/// Registers Parquet-backed catalog tables with a DataFusion session.
#[async_trait]
impl TableReader for ParquetTableReader {
    /// Short identifier of this reader: `"parquet"`.
    fn name(&self) -> &str {
        "parquet"
    }

    /// The only format this reader handles is [`DataSourceFormat::Parquet`].
    fn supported_formats(&self) -> &[DataSourceFormat] {
        const FORMATS: &[DataSourceFormat] = &[DataSourceFormat::Parquet];
        FORMATS
    }

    /// Registers the Parquet data at `table_info.storage_location` in `ctx`
    /// under `table_name`, using default `ParquetReadOptions`.
    ///
    /// The `_schema` and `_storage_options` arguments are accepted to satisfy
    /// the trait signature but are not used by this implementation.
    ///
    /// # Errors
    ///
    /// Returns [`CatalogError::Other`] when the table has no storage location
    /// or when `SessionContext::register_parquet` fails.
    async fn register_table(
        &self,
        ctx: &SessionContext,
        table_name: &str,
        table_info: &TableInfo,
        _schema: arrow_schema::SchemaRef,
        _storage_options: &HashMap<String, String>,
    ) -> CatalogResult<()> {
        use datafusion::datasource::file_format::options::ParquetReadOptions;

        // Guard clause: nothing can be registered without a location.
        let Some(path) = table_info.storage_location.as_deref() else {
            return Err(CatalogError::Other(format!(
                "Table '{}' has no storage_location",
                table_name
            )));
        };

        let outcome = ctx
            .register_parquet(table_name, path, ParquetReadOptions::default())
            .await;

        match outcome {
            Ok(()) => Ok(()),
            Err(err) => Err(CatalogError::Other(format!(
                "Failed to register Parquet table '{}' at '{}': {}",
                table_name, path, err
            ))),
        }
    }
}
/// Stateless [`TableReader`] for tables stored in the Delta format.
///
/// Only compiled when the `delta` crate feature is enabled. The type carries
/// no data, so it is cheap to copy and can be created with
/// [`Default::default`] or by naming the unit struct directly.
#[cfg(feature = "delta")]
#[derive(Debug, Clone, Copy, Default)]
pub struct DeltaTableReader;
/// Registers Delta-backed catalog tables with a DataFusion session.
///
/// Only compiled when the `delta` crate feature is enabled.
#[cfg(feature = "delta")]
#[async_trait]
impl TableReader for DeltaTableReader {
    /// Short identifier of this reader: `"delta"`.
    fn name(&self) -> &str {
        "delta"
    }

    /// The only format this reader handles is [`DataSourceFormat::Delta`].
    fn supported_formats(&self) -> &[DataSourceFormat] {
        const FORMATS: &[DataSourceFormat] = &[DataSourceFormat::Delta];
        FORMATS
    }

    /// Opens the Delta table at `table_info.storage_location` and registers it
    /// in `ctx` under `table_name`.
    ///
    /// The storage location is parsed as a URL. When `storage_options` is
    /// empty the table is opened with `deltalake::open_table`; otherwise a
    /// copy of the options is passed to
    /// `deltalake::open_table_with_storage_options`. The `_schema` argument is
    /// not used by this implementation.
    ///
    /// # Errors
    ///
    /// Returns [`CatalogError::Other`] when the table has no storage location,
    /// when the location is not a valid URL, when the Delta table cannot be
    /// opened, or when registering it with the session fails.
    async fn register_table(
        &self,
        ctx: &SessionContext,
        table_name: &str,
        table_info: &TableInfo,
        _schema: arrow_schema::SchemaRef,
        storage_options: &HashMap<String, String>,
    ) -> CatalogResult<()> {
        // Guard clause: nothing can be opened without a location.
        let Some(location) = table_info.storage_location.as_deref() else {
            return Err(CatalogError::Other(format!(
                "Table '{}' has no storage_location",
                table_name
            )));
        };

        let table_url = match url::Url::parse(location) {
            Ok(parsed) => parsed,
            Err(err) => {
                return Err(CatalogError::Other(format!(
                    "Invalid storage location URL '{}': {}",
                    location, err
                )));
            }
        };

        // Only clone the options map when there is something to forward.
        let open_result = if storage_options.is_empty() {
            deltalake::open_table(table_url).await
        } else {
            let options = storage_options.clone();
            deltalake::open_table_with_storage_options(table_url, options).await
        };
        let delta_table = open_result.map_err(|err| {
            CatalogError::Other(format!(
                "Failed to open Delta table '{}' at '{}': {}",
                table_name, location, err
            ))
        })?;

        // The value returned on success is intentionally discarded.
        ctx.register_table(table_name, Arc::new(delta_table))
            .map(|_previous| ())
            .map_err(|err| {
                CatalogError::Other(format!(
                    "Failed to register Delta table '{}': {}",
                    table_name, err
                ))
            })
    }
}
/// Builds the list of table readers available in this build.
///
/// The list always starts with [`ParquetTableReader`]; when the `delta` crate
/// feature is enabled, `DeltaTableReader` follows it.
pub fn default_table_readers() -> Vec<Arc<dyn TableReader>> {
    #[cfg(feature = "delta")]
    let readers: Vec<Arc<dyn TableReader>> =
        vec![Arc::new(ParquetTableReader), Arc::new(DeltaTableReader)];

    #[cfg(not(feature = "delta"))]
    let readers: Vec<Arc<dyn TableReader>> = vec![Arc::new(ParquetTableReader)];

    readers
}