1use std::collections::HashMap;
2use std::sync::Arc;
3
4use arrow::datatypes::SchemaRef;
5use datafusion_catalog::{Session, TableProvider};
6use datafusion_common::DataFusionError;
7use datafusion_common::stats::Precision;
8use datafusion_common::{DFSchema, Statistics};
9use datafusion_expr::Expr;
10use datafusion_expr::TableProviderFilterPushDown;
11use datafusion_expr::TableType;
12use datafusion_expr::dml::InsertOp;
13use datafusion_physical_plan::ExecutionPlan;
14use indexlake::Client;
15use indexlake::index::FilterSupport;
16use indexlake::table::{Table, TableScanPartition};
17use indexlake::utils::schema_without_row_id;
18use log::warn;
19use tokio::sync::Mutex;
20
21use crate::{
22 IndexLakeInsertExec, IndexLakeScanExec, datafusion_expr_to_indexlake_expr,
23 indexlake_scalar_to_datafusion_scalar,
24};
25
/// A DataFusion [`TableProvider`] backed by an IndexLake [`Table`].
#[derive(Debug)]
pub struct IndexLakeTable {
    // Shared IndexLake client; handed to execution plans so they can
    // (re)load the table lazily at execution time.
    client: Arc<Client>,
    // The resolved IndexLake table this provider exposes.
    table: Arc<Table>,
    // Target number of rows per record batch produced by scans (default 2048).
    batch_size: usize,
    // Number of partitions a scan is split into (default 16).
    num_scan_partitions: usize,
    // Per-column default expressions, derived from the table's field records
    // in `try_new`; served via `get_column_default`.
    column_defaults: HashMap<String, Expr>,
    // When true, the internal row-id column is stripped from the schema
    // reported to DataFusion and projections are shifted accordingly.
    hide_row_id: bool,
    // Row-count threshold forwarded to `IndexLakeInsertExec`; presumably
    // controls when inserts bypass some buffered path — confirm in that exec.
    bypass_insert_threshold: usize,
}
36
37impl IndexLakeTable {
38 pub fn try_new(client: Arc<Client>, table: Arc<Table>) -> Result<Self, DataFusionError> {
39 let mut column_defaults = HashMap::new();
40 for field_record in table.field_records.iter() {
41 if let Some(default_value) = &field_record.default_value {
42 let scalar_value = indexlake_scalar_to_datafusion_scalar(default_value)?;
43 column_defaults.insert(
44 field_record.field_name.clone(),
45 Expr::Literal(scalar_value, None),
46 );
47 }
48 }
49 Ok(Self {
50 client,
51 table,
52 batch_size: 2048,
53 num_scan_partitions: 16,
54 column_defaults,
55 hide_row_id: false,
56 bypass_insert_threshold: 1000,
57 })
58 }
59
60 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
61 self.batch_size = batch_size;
62 self
63 }
64
65 pub fn with_num_scan_partitions(mut self, num_scan_partitions: usize) -> Self {
66 self.num_scan_partitions = num_scan_partitions;
67 self
68 }
69
70 pub fn with_hide_row_id(mut self, hide_row_id: bool) -> Self {
71 self.hide_row_id = hide_row_id;
72 self
73 }
74
75 pub fn with_bypass_insert_threshold(mut self, bypass_insert_threshold: usize) -> Self {
76 self.bypass_insert_threshold = bypass_insert_threshold;
77 self
78 }
79}
80
81#[async_trait::async_trait]
82impl TableProvider for IndexLakeTable {
83 fn as_any(&self) -> &dyn std::any::Any {
84 self
85 }
86
87 fn schema(&self) -> SchemaRef {
88 if self.hide_row_id {
89 Arc::new(schema_without_row_id(&self.table.output_schema))
90 } else {
91 self.table.output_schema.clone()
92 }
93 }
94
95 fn table_type(&self) -> TableType {
96 TableType::Base
97 }
98
99 fn get_column_default(&self, column: &str) -> Option<&Expr> {
100 self.column_defaults.get(column)
101 }
102
103 async fn scan(
104 &self,
105 _state: &dyn Session,
106 projection: Option<&Vec<usize>>,
107 filters: &[Expr],
108 limit: Option<usize>,
109 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
110 let data_file_count = self
111 .table
112 .data_file_count()
113 .await
114 .map_err(|e| DataFusionError::Internal(e.to_string()))?;
115 let data_files = if data_file_count > 1000 {
116 None
117 } else {
118 let records = self
119 .table
120 .data_file_records()
121 .await
122 .map_err(|e| DataFusionError::Internal(e.to_string()))?;
123 Some(Arc::new(records))
124 };
125
126 let il_projection = if let Some(df_projection) = projection
127 && self.hide_row_id
128 {
129 Some(df_projection.iter().map(|i| i + 1).collect::<Vec<_>>())
130 } else {
131 projection.cloned()
132 };
133
134 let lazy_table = LazyTable::new(
135 self.client.clone(),
136 self.table.namespace_name.clone(),
137 self.table.table_name.clone(),
138 )
139 .with_table(self.table.clone());
140
141 let exec = IndexLakeScanExec::try_new(
142 lazy_table,
143 self.table.output_schema.clone(),
144 self.num_scan_partitions,
145 data_files,
146 il_projection,
147 filters.to_vec(),
148 self.batch_size,
149 limit,
150 )?;
151 Ok(Arc::new(exec))
152 }
153
154 fn supports_filters_pushdown(
155 &self,
156 filters: &[&Expr],
157 ) -> Result<Vec<TableProviderFilterPushDown>, DataFusionError> {
158 let df_schema = DFSchema::try_from(self.table.output_schema.clone())?;
159 let mut supports = Vec::with_capacity(filters.len());
160 for filter in filters {
161 let Ok(il_expr) = datafusion_expr_to_indexlake_expr(filter, &df_schema) else {
162 supports.push(TableProviderFilterPushDown::Unsupported);
163 continue;
164 };
165 let support = self
166 .table
167 .supports_filter(il_expr.clone())
168 .map_err(|e| DataFusionError::Internal(e.to_string()))?;
169 match support {
170 FilterSupport::Exact => supports.push(TableProviderFilterPushDown::Exact),
171 FilterSupport::Inexact => supports.push(TableProviderFilterPushDown::Inexact),
172 FilterSupport::Unsupported => {
173 supports.push(TableProviderFilterPushDown::Unsupported)
174 }
175 }
176 }
177 Ok(supports)
178 }
179
180 fn statistics(&self) -> Option<Statistics> {
181 let row_count_result = tokio::task::block_in_place(|| {
182 tokio::runtime::Handle::current().block_on(async {
183 self.table
184 .count(TableScanPartition::single_partition())
185 .await
186 })
187 });
188 match row_count_result {
189 Ok(row_count) => Some(Statistics {
190 num_rows: Precision::Exact(row_count),
191 total_byte_size: Precision::Absent,
192 column_statistics: Statistics::unknown_column(&self.table.output_schema),
193 }),
194 Err(e) => {
195 warn!(
196 "[indexlake] Error getting indexlake table {}.{} row count: {:?}",
197 self.table.namespace_name, self.table.table_name, e
198 );
199 None
200 }
201 }
202 }
203
204 async fn insert_into(
205 &self,
206 _state: &dyn Session,
207 input: Arc<dyn ExecutionPlan>,
208 insert_op: InsertOp,
209 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
210 let lazy_table = LazyTable::new(
211 self.client.clone(),
212 self.table.namespace_name.clone(),
213 self.table.table_name.clone(),
214 )
215 .with_table(self.table.clone());
216
217 let insert_exec = IndexLakeInsertExec::try_new(
218 lazy_table,
219 input,
220 insert_op,
221 self.bypass_insert_threshold,
222 )?;
223
224 Ok(Arc::new(insert_exec))
225 }
226}
227
/// A lazily-resolved handle to an IndexLake [`Table`].
///
/// Clones share the same cached table via the inner `Arc<Mutex<..>>`, so a
/// load performed through one clone is visible to the others.
#[derive(Debug, Clone)]
pub struct LazyTable {
    // Client used to load the table on first access.
    pub client: Arc<Client>,
    pub namespace_name: String,
    pub table_name: String,
    // Cached table; `None` until loaded (or pre-seeded via `with_table`).
    // Tokio mutex: the guard is held across the `load_table` await in
    // `get_or_load`, which also deduplicates concurrent loads.
    table: Arc<Mutex<Option<Arc<Table>>>>,
}
237
238impl LazyTable {
239 pub fn new(client: Arc<Client>, namespace_name: String, table_name: String) -> Self {
241 Self {
242 client,
243 namespace_name,
244 table_name,
245 table: Arc::new(Mutex::new(None)),
246 }
247 }
248
249 pub fn with_table(mut self, table: Arc<Table>) -> Self {
251 self.table = Arc::new(Mutex::new(Some(table)));
252 self
253 }
254
255 pub async fn get_or_load(&self) -> Result<Arc<Table>, indexlake::ILError> {
257 let mut guard = self.table.lock().await;
258 if let Some(table) = guard.as_ref() {
259 return Ok(table.clone());
260 }
261
262 let table = self
263 .client
264 .load_table(&self.namespace_name, &self.table_name)
265 .await?;
266 let table = Arc::new(table);
267 *guard = Some(table.clone());
268 Ok(table)
269 }
270}