// indexlake_datafusion/table.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use arrow::datatypes::SchemaRef;
5use datafusion_catalog::{Session, TableProvider};
6use datafusion_common::DataFusionError;
7use datafusion_common::stats::Precision;
8use datafusion_common::{DFSchema, Statistics};
9use datafusion_expr::Expr;
10use datafusion_expr::TableProviderFilterPushDown;
11use datafusion_expr::TableType;
12use datafusion_expr::dml::InsertOp;
13use datafusion_physical_plan::ExecutionPlan;
14use indexlake::Client;
15use indexlake::index::FilterSupport;
16use indexlake::table::{Table, TableScanPartition};
17use indexlake::utils::schema_without_row_id;
18use log::warn;
19use tokio::sync::Mutex;
20
21use crate::{
22    IndexLakeInsertExec, IndexLakeScanExec, datafusion_expr_to_indexlake_expr,
23    indexlake_scalar_to_datafusion_scalar,
24};
25
/// A DataFusion `TableProvider` backed by an IndexLake [`Table`].
#[derive(Debug)]
pub struct IndexLakeTable {
    // Client handed to LazyTable so execution plans can (re)load the table.
    client: Arc<Client>,
    // The loaded IndexLake table handle; source of schema, stats and defaults.
    table: Arc<Table>,
    // Record batch size forwarded to IndexLakeScanExec (default 2048).
    batch_size: usize,
    // Number of partitions a scan is split into (default 16).
    num_scan_partitions: usize,
    // Per-column default expressions, built in `try_new` from field records.
    column_defaults: HashMap<String, Expr>,
    // When true, `schema()` strips the internal row-id column and scan
    // projections are shifted accordingly.
    hide_row_id: bool,
    // Threshold forwarded to IndexLakeInsertExec (default 1000); exact
    // semantics are defined by the insert exec.
    bypass_insert_threshold: usize,
}
36
37impl IndexLakeTable {
38    pub fn try_new(client: Arc<Client>, table: Arc<Table>) -> Result<Self, DataFusionError> {
39        let mut column_defaults = HashMap::new();
40        for field_record in table.field_records.iter() {
41            if let Some(default_value) = &field_record.default_value {
42                let scalar_value = indexlake_scalar_to_datafusion_scalar(default_value)?;
43                column_defaults.insert(
44                    field_record.field_name.clone(),
45                    Expr::Literal(scalar_value, None),
46                );
47            }
48        }
49        Ok(Self {
50            client,
51            table,
52            batch_size: 2048,
53            num_scan_partitions: 16,
54            column_defaults,
55            hide_row_id: false,
56            bypass_insert_threshold: 1000,
57        })
58    }
59
60    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
61        self.batch_size = batch_size;
62        self
63    }
64
65    pub fn with_num_scan_partitions(mut self, num_scan_partitions: usize) -> Self {
66        self.num_scan_partitions = num_scan_partitions;
67        self
68    }
69
70    pub fn with_hide_row_id(mut self, hide_row_id: bool) -> Self {
71        self.hide_row_id = hide_row_id;
72        self
73    }
74
75    pub fn with_bypass_insert_threshold(mut self, bypass_insert_threshold: usize) -> Self {
76        self.bypass_insert_threshold = bypass_insert_threshold;
77        self
78    }
79}
80
#[async_trait::async_trait]
impl TableProvider for IndexLakeTable {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Schema reported to DataFusion. When `hide_row_id` is set, the
    /// internal row-id column is stripped from the table's output schema.
    fn schema(&self) -> SchemaRef {
        if self.hide_row_id {
            Arc::new(schema_without_row_id(&self.table.output_schema))
        } else {
            self.table.output_schema.clone()
        }
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    /// Default expression for `column`, built once in `try_new` from the
    /// table's field records.
    fn get_column_default(&self, column: &str) -> Option<&Expr> {
        self.column_defaults.get(column)
    }

    /// Builds an [`IndexLakeScanExec`] for the given projection, filters and
    /// limit.
    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
        // Pre-fetch the data-file records only for small tables; above 1000
        // files, pass `None` — presumably the scan exec then resolves the
        // files itself (confirm in IndexLakeScanExec).
        let data_file_count = self
            .table
            .data_file_count()
            .await
            .map_err(|e| DataFusionError::Internal(e.to_string()))?;
        let data_files = if data_file_count > 1000 {
            None
        } else {
            let records = self
                .table
                .data_file_records()
                .await
                .map_err(|e| DataFusionError::Internal(e.to_string()))?;
            Some(Arc::new(records))
        };

        // DataFusion's projection indices are relative to `schema()`. When
        // the row-id column is hidden they are shifted by one to address the
        // full underlying schema (assumes row-id is the first column —
        // consistent with `schema_without_row_id`; TODO confirm).
        let il_projection = if let Some(df_projection) = projection
            && self.hide_row_id
        {
            Some(df_projection.iter().map(|i| i + 1).collect::<Vec<_>>())
        } else {
            projection.cloned()
        };

        // Hand the exec a LazyTable pre-populated with the already-loaded
        // table so it never has to hit the catalog again.
        let lazy_table = LazyTable::new(
            self.client.clone(),
            self.table.namespace_name.clone(),
            self.table.table_name.clone(),
        )
        .with_table(self.table.clone());

        // NOTE(review): the exec receives the full output schema even when
        // row-id is hidden — confirm IndexLakeScanExec derives its output
        // schema from `il_projection`.
        let exec = IndexLakeScanExec::try_new(
            lazy_table,
            self.table.output_schema.clone(),
            self.num_scan_partitions,
            data_files,
            il_projection,
            filters.to_vec(),
            self.batch_size,
            limit,
        )?;
        Ok(Arc::new(exec))
    }

    /// Classifies each filter: untranslatable expressions are `Unsupported`;
    /// otherwise the IndexLake table decides Exact / Inexact / Unsupported.
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>, DataFusionError> {
        let df_schema = DFSchema::try_from(self.table.output_schema.clone())?;
        let mut supports = Vec::with_capacity(filters.len());
        for filter in filters {
            // Translation failure is not an error: the filter simply stays in
            // DataFusion and is not pushed down.
            let Ok(il_expr) = datafusion_expr_to_indexlake_expr(filter, &df_schema) else {
                supports.push(TableProviderFilterPushDown::Unsupported);
                continue;
            };
            let support = self
                .table
                .supports_filter(il_expr.clone())
                .map_err(|e| DataFusionError::Internal(e.to_string()))?;
            match support {
                FilterSupport::Exact => supports.push(TableProviderFilterPushDown::Exact),
                FilterSupport::Inexact => supports.push(TableProviderFilterPushDown::Inexact),
                FilterSupport::Unsupported => {
                    supports.push(TableProviderFilterPushDown::Unsupported)
                }
            }
        }
        Ok(supports)
    }

    /// Exact row count for the optimizer; byte size and column stats are
    /// reported as unknown. Returns `None` (with a warning) if counting fails.
    ///
    /// NOTE(review): `block_in_place` panics on a current-thread tokio
    /// runtime and `Handle::current()` panics outside any runtime — confirm
    /// this is only ever called from a multi-threaded runtime worker.
    fn statistics(&self) -> Option<Statistics> {
        let row_count_result = tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(async {
                self.table
                    .count(TableScanPartition::single_partition())
                    .await
            })
        });
        match row_count_result {
            Ok(row_count) => Some(Statistics {
                num_rows: Precision::Exact(row_count),
                total_byte_size: Precision::Absent,
                column_statistics: Statistics::unknown_column(&self.table.output_schema),
            }),
            Err(e) => {
                warn!(
                    "[indexlake] Error getting indexlake table {}.{} row count: {:?}",
                    self.table.namespace_name, self.table.table_name, e
                );
                None
            }
        }
    }

    /// Builds an [`IndexLakeInsertExec`] that writes `input` into this table.
    async fn insert_into(
        &self,
        _state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        insert_op: InsertOp,
    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
        // Same pre-populated LazyTable trick as in `scan`.
        let lazy_table = LazyTable::new(
            self.client.clone(),
            self.table.namespace_name.clone(),
            self.table.table_name.clone(),
        )
        .with_table(self.table.clone());

        let insert_exec = IndexLakeInsertExec::try_new(
            lazy_table,
            input,
            insert_op,
            self.bypass_insert_threshold,
        )?;

        Ok(Arc::new(insert_exec))
    }
}
227
/// A lazy-loaded table holder containing client and table metadata.
/// The actual Table is loaded on first access if not provided upfront.
#[derive(Debug, Clone)]
pub struct LazyTable {
    // Client used to load the table on first access.
    pub client: Arc<Client>,
    pub namespace_name: String,
    pub table_name: String,
    // Cached table handle; `None` until loaded. The Arc<Mutex<..>> is shared
    // by clones of this LazyTable, so a load by one clone is seen by all.
    table: Arc<Mutex<Option<Arc<Table>>>>,
}
237
238impl LazyTable {
239    /// Create a new LazyTable without a pre-loaded table.
240    pub fn new(client: Arc<Client>, namespace_name: String, table_name: String) -> Self {
241        Self {
242            client,
243            namespace_name,
244            table_name,
245            table: Arc::new(Mutex::new(None)),
246        }
247    }
248
249    /// Create a new LazyTable with a pre-loaded table.
250    pub fn with_table(mut self, table: Arc<Table>) -> Self {
251        self.table = Arc::new(Mutex::new(Some(table)));
252        self
253    }
254
255    /// Get the table, loading it lazily if not already loaded.
256    pub async fn get_or_load(&self) -> Result<Arc<Table>, indexlake::ILError> {
257        let mut guard = self.table.lock().await;
258        if let Some(table) = guard.as_ref() {
259            return Ok(table.clone());
260        }
261
262        let table = self
263            .client
264            .load_table(&self.namespace_name, &self.table_name)
265            .await?;
266        let table = Arc::new(table);
267        *guard = Some(table.clone());
268        Ok(table)
269    }
270}