1use std::any::Any;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use datafusion::arrow::datatypes::{DataType, SchemaRef};
6use datafusion::catalog::Session;
7use datafusion::common::{DataFusionError, Result as DataFusionResult, SchemaExt};
8use datafusion::datasource::sink::DataSinkExec;
9use datafusion::datasource::TableProvider;
10use datafusion::logical_expr::dml::InsertOp;
11use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
12use datafusion::physical_plan::ExecutionPlan;
13use datafusion::prelude::SessionContext;
14use exoware_sdk::kv_codec::decode_stored_row;
15use exoware_sdk::StoreClient;
16
17use crate::aggregate::KvAggregatePushdownRule;
18use crate::codec::*;
19use crate::predicate::*;
20use crate::scan::*;
21use crate::types::*;
22use crate::writer::*;
23
24pub(crate) fn register_kv_table(
25 ctx: &SessionContext,
26 table_name: &str,
27 client: StoreClient,
28 config: KvTableConfig,
29) -> DataFusionResult<()> {
30 let table = Arc::new(
31 KvTable::new(client, config)
32 .map_err(|e| DataFusionError::Execution(format!("invalid table config: {e}")))?,
33 );
34 let _ = ctx.register_table(table_name, table)?;
35 Ok(())
36}
37
/// Builder for a set of KV-backed tables that share one `StoreClient`.
///
/// Tables are added with `table` / `orders_table` / `table_versioned` and then
/// registered on a `SessionContext` via `register_all`.
pub struct KvSchema {
    // Store client cloned into every registered table and writer.
    client: StoreClient,
    // (table name, table config) pairs in the order they were added.
    tables: Vec<(String, KvTableConfig)>,
    // Key prefix assigned to the next table; incremented once per added table.
    next_prefix: u8,
}
43
44impl KvSchema {
45 pub fn new(client: StoreClient) -> Self {
46 Self {
47 client,
48 tables: Vec::new(),
49 next_prefix: 0,
50 }
51 }
52
53 pub fn table(
54 mut self,
55 name: impl Into<String>,
56 columns: Vec<TableColumnConfig>,
57 primary_key_columns: Vec<String>,
58 index_specs: Vec<IndexSpec>,
59 ) -> Result<Self, String> {
60 if self.tables.len() >= MAX_TABLES {
61 return Err(format!(
62 "too many tables for codec layout (max {MAX_TABLES})"
63 ));
64 }
65 let prefix = self.next_prefix;
66 let config = KvTableConfig::new(prefix, columns, primary_key_columns, index_specs)?;
67 self.tables.push((name.into(), config));
68 self.next_prefix = self.next_prefix.wrapping_add(1);
69 Ok(self)
70 }
71
72 pub fn orders_table(
73 self,
74 table_name: impl Into<String>,
75 index_specs: Vec<IndexSpec>,
76 ) -> Result<Self, String> {
77 self.table(
78 table_name,
79 vec![
80 TableColumnConfig::new("region", DataType::Utf8, false),
81 TableColumnConfig::new("customer_id", DataType::Int64, false),
82 TableColumnConfig::new("order_id", DataType::Int64, false),
83 TableColumnConfig::new("amount_cents", DataType::Int64, false),
84 TableColumnConfig::new("status", DataType::Utf8, false),
85 ],
86 vec!["order_id".to_string()],
87 index_specs,
88 )
89 }
90
91 pub fn table_versioned(
105 self,
106 name: impl Into<String>,
107 columns: Vec<TableColumnConfig>,
108 entity_column: impl Into<String>,
109 version_column: impl Into<String>,
110 index_specs: Vec<IndexSpec>,
111 ) -> Result<Self, String> {
112 let entity = entity_column.into();
113 let version = version_column.into();
114 self.table(name, columns, vec![entity, version], index_specs)
115 }
116
    /// Number of tables added to this schema so far.
    pub fn table_count(&self) -> usize {
        self.tables.len()
    }
120
    /// Borrows the underlying store client shared by all tables.
    pub(crate) fn client(&self) -> &StoreClient {
        &self.client
    }
124
    /// Borrows the (name, config) pairs in the order tables were added.
    pub(crate) fn tables(&self) -> &[(String, KvTableConfig)] {
        &self.tables
    }
128
129 pub fn register_all(self, ctx: &SessionContext) -> DataFusionResult<()> {
130 let _ = ctx.remove_optimizer_rule("kv_aggregate_pushdown");
131 ctx.add_optimizer_rule(Arc::new(KvAggregatePushdownRule::new()));
132 for (name, config) in &self.tables {
133 register_kv_table(ctx, name, self.client.clone(), config.clone())?;
134 }
135 Ok(())
136 }
137
    /// Creates a `BatchWriter` over all configured tables, sharing this
    /// schema's store client.
    pub fn batch_writer(&self) -> BatchWriter {
        BatchWriter::new(self.client.clone(), &self.tables)
    }
141
142 pub async fn backfill_added_indexes(
153 &self,
154 table_name: &str,
155 previous_index_specs: &[IndexSpec],
156 ) -> DataFusionResult<IndexBackfillReport> {
157 self.backfill_added_indexes_with_options(
158 table_name,
159 previous_index_specs,
160 IndexBackfillOptions::default(),
161 )
162 .await
163 }
164
165 pub async fn backfill_added_indexes_with_options(
168 &self,
169 table_name: &str,
170 previous_index_specs: &[IndexSpec],
171 options: IndexBackfillOptions,
172 ) -> DataFusionResult<IndexBackfillReport> {
173 self.backfill_added_indexes_with_options_and_progress(
174 table_name,
175 previous_index_specs,
176 options,
177 None,
178 )
179 .await
180 }
181
182 pub async fn backfill_added_indexes_with_options_and_progress(
190 &self,
191 table_name: &str,
192 previous_index_specs: &[IndexSpec],
193 options: IndexBackfillOptions,
194 progress_tx: Option<&tokio::sync::mpsc::UnboundedSender<IndexBackfillEvent>>,
195 ) -> DataFusionResult<IndexBackfillReport> {
196 if options.row_batch_size == 0 {
197 return Err(DataFusionError::Execution(
198 "index backfill row_batch_size must be > 0".to_string(),
199 ));
200 }
201
202 let config = self
203 .tables
204 .iter()
205 .find(|(name, _)| name == table_name)
206 .map(|(_, config)| config.clone())
207 .ok_or_else(|| {
208 DataFusionError::Execution(format!(
209 "unknown table '{table_name}' for index backfill"
210 ))
211 })?;
212
213 let model = TableModel::from_config(&config)
214 .map_err(|e| DataFusionError::Execution(format!("invalid table config: {e}")))?;
215 let current_specs = model
216 .resolve_index_specs(&config.index_specs)
217 .map_err(|e| DataFusionError::Execution(format!("invalid index specs: {e}")))?;
218 let previous_specs = model
219 .resolve_index_specs(previous_index_specs)
220 .map_err(|e| {
221 DataFusionError::Execution(format!("invalid previous index specs: {e}"))
222 })?;
223
224 if previous_specs.len() > current_specs.len() {
225 return Err(DataFusionError::Execution(format!(
226 "table '{table_name}' previous index count ({}) exceeds current index count ({})",
227 previous_specs.len(),
228 current_specs.len()
229 )));
230 }
231 for (idx, previous) in previous_specs.iter().enumerate() {
232 let current = ¤t_specs[idx];
233 if !resolved_index_layout_matches(previous, current) {
234 return Err(DataFusionError::Execution(format!(
235 "table '{table_name}' index evolution must be append-only; index at position {} changed",
236 idx + 1
237 )));
238 }
239 }
240
241 let full_range = primary_key_prefix_range(model.table_prefix);
242 let mut cursor = options
243 .start_from_primary_key
244 .unwrap_or_else(|| full_range.start.clone());
245 if !model.primary_key_codec.matches(&cursor) {
246 return Err(DataFusionError::Execution(
247 "index backfill start_from_primary_key must use this table's primary-key prefix"
248 .to_string(),
249 ));
250 }
251 if cursor < full_range.start || cursor > full_range.end {
252 return Err(DataFusionError::Execution(
253 "index backfill start_from_primary_key is outside table key range".to_string(),
254 ));
255 }
256
257 let new_specs = current_specs[previous_specs.len()..].to_vec();
258 if new_specs.is_empty() {
259 let report = IndexBackfillReport::default();
260 send_backfill_event(
261 progress_tx,
262 IndexBackfillEvent::Started {
263 table_name: table_name.to_string(),
264 indexes_backfilled: 0,
265 row_batch_size: options.row_batch_size,
266 start_cursor: cursor.clone(),
267 },
268 );
269 send_backfill_event(progress_tx, IndexBackfillEvent::Completed { report });
270 return Ok(report);
271 }
272
273 let mut report = IndexBackfillReport {
274 scanned_rows: 0,
275 indexes_backfilled: new_specs.len(),
276 index_entries_written: 0,
277 };
278 let mut pending_keys = Vec::new();
279 let mut pending_values = Vec::new();
280 let session = self.client.create_session();
281 let decode_pk_mask = vec![true; model.primary_key_kinds.len()];
282 send_backfill_event(
283 progress_tx,
284 IndexBackfillEvent::Started {
285 table_name: table_name.to_string(),
286 indexes_backfilled: new_specs.len(),
287 row_batch_size: options.row_batch_size,
288 start_cursor: cursor.clone(),
289 },
290 );
291
292 loop {
293 let mut stream = session
294 .range_stream(
295 &cursor,
296 &full_range.end,
297 options.row_batch_size,
298 options.row_batch_size,
299 )
300 .await
301 .map_err(|e| DataFusionError::External(Box::new(e)))?;
302 let mut last_key = None;
303 while let Some(chunk) = stream
304 .next_chunk()
305 .await
306 .map_err(|e| DataFusionError::External(Box::new(e)))?
307 {
308 for (base_key, base_value) in &chunk {
309 last_key = Some(base_key.clone());
310 let Some(pk_values) = decode_primary_key_selected(
311 model.table_prefix,
312 base_key,
313 &model,
314 &decode_pk_mask,
315 ) else {
316 return Err(DataFusionError::Execution(format!(
317 "invalid primary key while backfilling index (key={})",
318 hex::encode(base_key)
319 )));
320 };
321 let archived = decode_stored_row(base_value).map_err(|e| {
322 DataFusionError::Execution(format!(
323 "invalid base row payload while backfilling index (key={}): {e}",
324 hex::encode(base_key)
325 ))
326 })?;
327 if archived.values.len() != model.columns.len() {
328 return Err(DataFusionError::Execution(format!(
329 "invalid base row payload while backfilling index (key={})",
330 hex::encode(base_key)
331 )));
332 }
333 report.scanned_rows += 1;
334
335 for spec in &new_specs {
336 let index_key = encode_secondary_index_key_from_parts(
337 model.table_prefix,
338 spec,
339 &model,
340 &pk_values,
341 &archived,
342 )?;
343 let index_value =
344 encode_secondary_index_value_from_archived(&archived, &model, spec)?;
345 pending_keys.push(index_key);
346 pending_values.push(index_value);
347 report.index_entries_written += 1;
348 }
349
350 if pending_keys.len() >= INDEX_BACKFILL_FLUSH_ENTRIES {
351 flush_ingest_batch(&self.client, &mut pending_keys, &mut pending_values)
352 .await?;
353 }
354 }
355 }
356 let Some(last_key) = last_key else {
357 break;
358 };
359
360 let next_cursor = if last_key >= full_range.end {
361 None
362 } else {
363 next_key(&last_key)
364 };
365 if !pending_keys.is_empty() {
366 flush_ingest_batch(&self.client, &mut pending_keys, &mut pending_values).await?;
367 }
368 send_backfill_event(
369 progress_tx,
370 IndexBackfillEvent::Progress {
371 scanned_rows: report.scanned_rows,
372 index_entries_written: report.index_entries_written,
373 last_scanned_primary_key: last_key,
374 next_cursor: next_cursor.clone(),
375 },
376 );
377
378 if let Some(next) = next_cursor {
379 cursor = next;
380 } else {
381 break;
382 }
383 }
384
385 if !pending_keys.is_empty() {
386 flush_ingest_batch(&self.client, &mut pending_keys, &mut pending_values).await?;
387 }
388 send_backfill_event(progress_tx, IndexBackfillEvent::Completed { report });
389 Ok(report)
390 }
391}
392
393pub(crate) fn send_backfill_event(
394 progress_tx: Option<&tokio::sync::mpsc::UnboundedSender<IndexBackfillEvent>>,
395 event: IndexBackfillEvent,
396) {
397 if let Some(tx) = progress_tx {
398 let _ = tx.send(event);
399 }
400}
401
402pub(crate) fn resolved_index_layout_matches(
403 previous: &ResolvedIndexSpec,
404 current: &ResolvedIndexSpec,
405) -> bool {
406 previous.id == current.id
407 && previous.name == current.name
408 && previous.layout == current.layout
409 && previous.key_columns == current.key_columns
410 && previous.value_column_mask == current.value_column_mask
411 && previous.key_columns_width == current.key_columns_width
412}
413
414#[async_trait]
415impl TableProvider for KvTable {
416 fn as_any(&self) -> &dyn Any {
417 self
418 }
419
420 fn schema(&self) -> SchemaRef {
421 self.model.schema.clone()
422 }
423
424 fn table_type(&self) -> TableType {
425 TableType::Base
426 }
427
428 fn supports_filters_pushdown(
429 &self,
430 filters: &[&Expr],
431 ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
432 Ok(filters
433 .iter()
434 .map(|expr| {
435 if QueryPredicate::supports_filter(expr, &self.model) {
436 TableProviderFilterPushDown::Exact
437 } else {
438 TableProviderFilterPushDown::Unsupported
439 }
440 })
441 .collect())
442 }
443
444 async fn scan(
445 &self,
446 _state: &dyn Session,
447 projection: Option<&Vec<usize>>,
448 filters: &[Expr],
449 limit: Option<usize>,
450 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
451 let predicate = QueryPredicate::from_filters(filters, &self.model);
452 let projected_schema = match projection {
453 Some(proj) => Arc::new(self.model.schema.project(proj)?),
454 None => self.model.schema.clone(),
455 };
456 Ok(Arc::new(KvScanExec::new(
457 self.client.clone(),
458 self.model.clone(),
459 self.index_specs.clone(),
460 predicate,
461 limit,
462 projected_schema,
463 projection.cloned(),
464 )))
465 }
466
467 async fn insert_into(
468 &self,
469 _state: &dyn Session,
470 input: Arc<dyn ExecutionPlan>,
471 insert_op: InsertOp,
472 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
473 self.schema()
474 .logically_equivalent_names_and_types(&input.schema())?;
475 if insert_op != InsertOp::Append {
476 return Err(DataFusionError::NotImplemented(format!(
477 "{insert_op} not implemented for kv table"
478 )));
479 }
480
481 let sink = KvIngestSink::new(
482 self.client.clone(),
483 self.model.schema.clone(),
484 self.model.clone(),
485 self.index_specs.clone(),
486 );
487 Ok(Arc::new(DataSinkExec::new(input, Arc::new(sink), None)))
488 }
489}