lance_graph/
table_readers.rs1use std::collections::HashMap;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use datafusion::execution::context::SessionContext;
14
15use lance_graph_catalog::catalog_provider::{
16 CatalogError, CatalogResult, DataSourceFormat, TableInfo,
17};
18use lance_graph_catalog::table_reader::TableReader;
19
/// Table reader that registers Parquet data with a DataFusion session.
///
/// The type carries no state; all behaviour lives in its [`TableReader`]
/// implementation, so it is cheap to copy and can be built with
/// [`Default::default`].
#[derive(Debug, Clone, Copy, Default)]
pub struct ParquetTableReader;
22
23#[async_trait]
24impl TableReader for ParquetTableReader {
25 fn name(&self) -> &str {
26 "parquet"
27 }
28
29 fn supported_formats(&self) -> &[DataSourceFormat] {
30 &[DataSourceFormat::Parquet]
31 }
32
33 async fn register_table(
34 &self,
35 ctx: &SessionContext,
36 table_name: &str,
37 table_info: &TableInfo,
38 _schema: arrow_schema::SchemaRef,
39 _storage_options: &HashMap<String, String>,
40 ) -> CatalogResult<()> {
41 let location = table_info.storage_location.as_deref().ok_or_else(|| {
42 CatalogError::Other(format!("Table '{}' has no storage_location", table_name))
43 })?;
44
45 ctx.register_parquet(
46 table_name,
47 location,
48 datafusion::datasource::file_format::options::ParquetReadOptions::default(),
49 )
50 .await
51 .map_err(|e| {
52 CatalogError::Other(format!(
53 "Failed to register Parquet table '{}' at '{}': {}",
54 table_name, location, e
55 ))
56 })
57 }
58}
59
/// Table reader that opens Delta tables and registers them with a DataFusion
/// session.
///
/// Only compiled when the `delta` feature is enabled. The type carries no
/// state; all behaviour lives in its [`TableReader`] implementation.
#[cfg(feature = "delta")]
#[derive(Debug, Clone, Copy, Default)]
pub struct DeltaTableReader;
69
#[cfg(feature = "delta")]
#[async_trait]
impl TableReader for DeltaTableReader {
    /// Identifier of this reader: `"delta"`.
    fn name(&self) -> &str {
        "delta"
    }

    /// Formats handled by this reader: only [`DataSourceFormat::Delta`].
    fn supported_formats(&self) -> &[DataSourceFormat] {
        const FORMATS: &[DataSourceFormat] = &[DataSourceFormat::Delta];
        FORMATS
    }

    /// Opens the Delta table at the table's `storage_location` and registers it
    /// in `ctx` under `table_name`.
    ///
    /// The location must parse with `url::Url::parse`. When `storage_options`
    /// is non-empty, a copy of it is passed to
    /// `deltalake::open_table_with_storage_options`; otherwise
    /// `deltalake::open_table` is used. The `_schema` argument is not used.
    ///
    /// # Errors
    ///
    /// Returns [`CatalogError::Other`] when `table_info` has no
    /// `storage_location`, when the location is not a valid URL, when the Delta
    /// table cannot be opened, or when registration with `ctx` fails.
    async fn register_table(
        &self,
        ctx: &SessionContext,
        table_name: &str,
        table_info: &TableInfo,
        _schema: arrow_schema::SchemaRef,
        storage_options: &HashMap<String, String>,
    ) -> CatalogResult<()> {
        let Some(location) = table_info.storage_location.as_deref() else {
            return Err(CatalogError::Other(format!(
                "Table '{}' has no storage_location",
                table_name
            )));
        };

        let table_url = match url::Url::parse(location) {
            Ok(parsed) => parsed,
            Err(source) => {
                return Err(CatalogError::Other(format!(
                    "Invalid storage location URL '{}': {}",
                    location, source
                )));
            }
        };

        // Only take the options-aware path when the caller supplied options.
        let opened = if storage_options.is_empty() {
            deltalake::open_table(table_url).await
        } else {
            deltalake::open_table_with_storage_options(table_url, storage_options.clone()).await
        };
        let delta_table = opened.map_err(|source| {
            CatalogError::Other(format!(
                "Failed to open Delta table '{}' at '{}': {}",
                table_name, location, source
            ))
        })?;

        // The success value of `register_table` is not needed here, so it is
        // discarded.
        ctx.register_table(table_name, Arc::new(delta_table))
            .map(|_previous| ())
            .map_err(|source| {
                CatalogError::Other(format!(
                    "Failed to register Delta table '{}': {}",
                    table_name, source
                ))
            })
    }
}
123
124pub fn default_table_readers() -> Vec<Arc<dyn TableReader>> {
128 let mut readers: Vec<Arc<dyn TableReader>> = vec![Arc::new(ParquetTableReader)];
129 #[cfg(feature = "delta")]
130 readers.push(Arc::new(DeltaTableReader));
131 readers
132}