1use std::any::Any;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use datafusion::arrow::datatypes::{DataType, SchemaRef};
6use datafusion::catalog::Session;
7use datafusion::common::{DataFusionError, Result as DataFusionResult, SchemaExt};
8use datafusion::datasource::sink::DataSinkExec;
9use datafusion::datasource::TableProvider;
10use datafusion::logical_expr::dml::InsertOp;
11use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
12use datafusion::physical_plan::ExecutionPlan;
13use datafusion::prelude::SessionContext;
14use exoware_sdk_rs::kv_codec::decode_stored_row;
15use exoware_sdk_rs::StoreClient;
16
17use crate::aggregate::KvAggregatePushdownRule;
18use crate::codec::*;
19use crate::predicate::*;
20use crate::scan::*;
21use crate::types::*;
22use crate::writer::*;
23
24pub(crate) fn register_kv_table(
25 ctx: &SessionContext,
26 table_name: &str,
27 client: StoreClient,
28 config: KvTableConfig,
29) -> DataFusionResult<()> {
30 let table = Arc::new(
31 KvTable::new(client, config)
32 .map_err(|e| DataFusionError::Execution(format!("invalid table config: {e}")))?,
33 );
34 let _ = ctx.register_table(table_name, table)?;
35 Ok(())
36}
37
/// Builder for a set of KV-backed tables that share one `StoreClient`.
pub struct KvSchema {
    // Shared store handle; cloned into each registered table and writer.
    client: StoreClient,
    // Tables in registration order, paired with their codec configuration.
    tables: Vec<(String, KvTableConfig)>,
    // Next table prefix byte handed out by `table`; incremented with
    // `wrapping_add`, while `MAX_TABLES` guards against reuse.
    next_prefix: u8,
}
43
44impl KvSchema {
45 pub fn new(client: StoreClient) -> Self {
46 Self {
47 client,
48 tables: Vec::new(),
49 next_prefix: 0,
50 }
51 }
52
53 pub fn table(
54 mut self,
55 name: impl Into<String>,
56 columns: Vec<TableColumnConfig>,
57 primary_key_columns: Vec<String>,
58 index_specs: Vec<IndexSpec>,
59 ) -> Result<Self, String> {
60 if self.tables.len() >= MAX_TABLES {
61 return Err(format!(
62 "too many tables for codec layout (max {MAX_TABLES})"
63 ));
64 }
65 let prefix = self.next_prefix;
66 let config = KvTableConfig::new(prefix, columns, primary_key_columns, index_specs)?;
67 self.tables.push((name.into(), config));
68 self.next_prefix = self.next_prefix.wrapping_add(1);
69 Ok(self)
70 }
71
72 pub fn orders_table(
73 self,
74 table_name: impl Into<String>,
75 index_specs: Vec<IndexSpec>,
76 ) -> Result<Self, String> {
77 self.table(
78 table_name,
79 vec![
80 TableColumnConfig::new("region", DataType::Utf8, false),
81 TableColumnConfig::new("customer_id", DataType::Int64, false),
82 TableColumnConfig::new("order_id", DataType::Int64, false),
83 TableColumnConfig::new("amount_cents", DataType::Int64, false),
84 TableColumnConfig::new("status", DataType::Utf8, false),
85 ],
86 vec!["order_id".to_string()],
87 index_specs,
88 )
89 }
90
91 pub fn table_versioned(
105 self,
106 name: impl Into<String>,
107 columns: Vec<TableColumnConfig>,
108 entity_column: impl Into<String>,
109 version_column: impl Into<String>,
110 index_specs: Vec<IndexSpec>,
111 ) -> Result<Self, String> {
112 let entity = entity_column.into();
113 let version = version_column.into();
114 self.table(name, columns, vec![entity, version], index_specs)
115 }
116
117 pub fn table_count(&self) -> usize {
118 self.tables.len()
119 }
120
121 pub fn register_all(self, ctx: &SessionContext) -> DataFusionResult<()> {
122 let _ = ctx.remove_optimizer_rule("kv_aggregate_pushdown");
123 ctx.add_optimizer_rule(Arc::new(KvAggregatePushdownRule::new()));
124 for (name, config) in &self.tables {
125 register_kv_table(ctx, name, self.client.clone(), config.clone())?;
126 }
127 Ok(())
128 }
129
130 pub fn batch_writer(&self) -> BatchWriter {
131 BatchWriter::new(self.client.clone(), &self.tables)
132 }
133
134 pub async fn backfill_added_indexes(
145 &self,
146 table_name: &str,
147 previous_index_specs: &[IndexSpec],
148 ) -> DataFusionResult<IndexBackfillReport> {
149 self.backfill_added_indexes_with_options(
150 table_name,
151 previous_index_specs,
152 IndexBackfillOptions::default(),
153 )
154 .await
155 }
156
157 pub async fn backfill_added_indexes_with_options(
160 &self,
161 table_name: &str,
162 previous_index_specs: &[IndexSpec],
163 options: IndexBackfillOptions,
164 ) -> DataFusionResult<IndexBackfillReport> {
165 self.backfill_added_indexes_with_options_and_progress(
166 table_name,
167 previous_index_specs,
168 options,
169 None,
170 )
171 .await
172 }
173
174 pub async fn backfill_added_indexes_with_options_and_progress(
182 &self,
183 table_name: &str,
184 previous_index_specs: &[IndexSpec],
185 options: IndexBackfillOptions,
186 progress_tx: Option<&tokio::sync::mpsc::UnboundedSender<IndexBackfillEvent>>,
187 ) -> DataFusionResult<IndexBackfillReport> {
188 if options.row_batch_size == 0 {
189 return Err(DataFusionError::Execution(
190 "index backfill row_batch_size must be > 0".to_string(),
191 ));
192 }
193
194 let config = self
195 .tables
196 .iter()
197 .find(|(name, _)| name == table_name)
198 .map(|(_, config)| config.clone())
199 .ok_or_else(|| {
200 DataFusionError::Execution(format!(
201 "unknown table '{table_name}' for index backfill"
202 ))
203 })?;
204
205 let model = TableModel::from_config(&config)
206 .map_err(|e| DataFusionError::Execution(format!("invalid table config: {e}")))?;
207 let current_specs = model
208 .resolve_index_specs(&config.index_specs)
209 .map_err(|e| DataFusionError::Execution(format!("invalid index specs: {e}")))?;
210 let previous_specs = model
211 .resolve_index_specs(previous_index_specs)
212 .map_err(|e| {
213 DataFusionError::Execution(format!("invalid previous index specs: {e}"))
214 })?;
215
216 if previous_specs.len() > current_specs.len() {
217 return Err(DataFusionError::Execution(format!(
218 "table '{table_name}' previous index count ({}) exceeds current index count ({})",
219 previous_specs.len(),
220 current_specs.len()
221 )));
222 }
223 for (idx, previous) in previous_specs.iter().enumerate() {
224 let current = ¤t_specs[idx];
225 if !resolved_index_layout_matches(previous, current) {
226 return Err(DataFusionError::Execution(format!(
227 "table '{table_name}' index evolution must be append-only; index at position {} changed",
228 idx + 1
229 )));
230 }
231 }
232
233 let full_range = primary_key_prefix_range(model.table_prefix);
234 let mut cursor = options
235 .start_from_primary_key
236 .unwrap_or_else(|| full_range.start.clone());
237 if !model.primary_key_codec.matches(&cursor) {
238 return Err(DataFusionError::Execution(
239 "index backfill start_from_primary_key must use this table's primary-key prefix"
240 .to_string(),
241 ));
242 }
243 if cursor < full_range.start || cursor > full_range.end {
244 return Err(DataFusionError::Execution(
245 "index backfill start_from_primary_key is outside table key range".to_string(),
246 ));
247 }
248
249 let new_specs = current_specs[previous_specs.len()..].to_vec();
250 if new_specs.is_empty() {
251 let report = IndexBackfillReport::default();
252 send_backfill_event(
253 progress_tx,
254 IndexBackfillEvent::Started {
255 table_name: table_name.to_string(),
256 indexes_backfilled: 0,
257 row_batch_size: options.row_batch_size,
258 start_cursor: cursor.clone(),
259 },
260 );
261 send_backfill_event(progress_tx, IndexBackfillEvent::Completed { report });
262 return Ok(report);
263 }
264
265 let mut report = IndexBackfillReport {
266 scanned_rows: 0,
267 indexes_backfilled: new_specs.len(),
268 index_entries_written: 0,
269 };
270 let mut pending_keys = Vec::new();
271 let mut pending_values = Vec::new();
272 let session = self.client.create_session();
273 let decode_pk_mask = vec![true; model.primary_key_kinds.len()];
274 send_backfill_event(
275 progress_tx,
276 IndexBackfillEvent::Started {
277 table_name: table_name.to_string(),
278 indexes_backfilled: new_specs.len(),
279 row_batch_size: options.row_batch_size,
280 start_cursor: cursor.clone(),
281 },
282 );
283
284 loop {
285 let mut stream = session
286 .range_stream(
287 &cursor,
288 &full_range.end,
289 options.row_batch_size,
290 options.row_batch_size,
291 )
292 .await
293 .map_err(|e| DataFusionError::External(Box::new(e)))?;
294 let mut last_key = None;
295 while let Some(chunk) = stream
296 .next_chunk()
297 .await
298 .map_err(|e| DataFusionError::External(Box::new(e)))?
299 {
300 for (base_key, base_value) in &chunk {
301 last_key = Some(base_key.clone());
302 let Some(pk_values) = decode_primary_key_selected(
303 model.table_prefix,
304 base_key,
305 &model,
306 &decode_pk_mask,
307 ) else {
308 return Err(DataFusionError::Execution(format!(
309 "invalid primary key while backfilling index (key={})",
310 hex::encode(base_key)
311 )));
312 };
313 let archived = decode_stored_row(base_value).map_err(|e| {
314 DataFusionError::Execution(format!(
315 "invalid base row payload while backfilling index (key={}): {e}",
316 hex::encode(base_key)
317 ))
318 })?;
319 if archived.values.len() != model.columns.len() {
320 return Err(DataFusionError::Execution(format!(
321 "invalid base row payload while backfilling index (key={})",
322 hex::encode(base_key)
323 )));
324 }
325 report.scanned_rows += 1;
326
327 for spec in &new_specs {
328 let index_key = encode_secondary_index_key_from_parts(
329 model.table_prefix,
330 spec,
331 &model,
332 &pk_values,
333 &archived,
334 )?;
335 let index_value =
336 encode_secondary_index_value_from_archived(&archived, &model, spec)?;
337 pending_keys.push(index_key);
338 pending_values.push(index_value);
339 report.index_entries_written += 1;
340 }
341
342 if pending_keys.len() >= INDEX_BACKFILL_FLUSH_ENTRIES {
343 flush_ingest_batch(&self.client, &mut pending_keys, &mut pending_values)
344 .await?;
345 }
346 }
347 }
348 let Some(last_key) = last_key else {
349 break;
350 };
351
352 let next_cursor = if last_key >= full_range.end {
353 None
354 } else {
355 next_key(&last_key)
356 };
357 if !pending_keys.is_empty() {
358 flush_ingest_batch(&self.client, &mut pending_keys, &mut pending_values).await?;
359 }
360 send_backfill_event(
361 progress_tx,
362 IndexBackfillEvent::Progress {
363 scanned_rows: report.scanned_rows,
364 index_entries_written: report.index_entries_written,
365 last_scanned_primary_key: last_key,
366 next_cursor: next_cursor.clone(),
367 },
368 );
369
370 if let Some(next) = next_cursor {
371 cursor = next;
372 } else {
373 break;
374 }
375 }
376
377 if !pending_keys.is_empty() {
378 flush_ingest_batch(&self.client, &mut pending_keys, &mut pending_values).await?;
379 }
380 send_backfill_event(progress_tx, IndexBackfillEvent::Completed { report });
381 Ok(report)
382 }
383}
384
385pub(crate) fn send_backfill_event(
386 progress_tx: Option<&tokio::sync::mpsc::UnboundedSender<IndexBackfillEvent>>,
387 event: IndexBackfillEvent,
388) {
389 if let Some(tx) = progress_tx {
390 let _ = tx.send(event);
391 }
392}
393
394pub(crate) fn resolved_index_layout_matches(
395 previous: &ResolvedIndexSpec,
396 current: &ResolvedIndexSpec,
397) -> bool {
398 previous.id == current.id
399 && previous.name == current.name
400 && previous.layout == current.layout
401 && previous.key_columns == current.key_columns
402 && previous.value_column_mask == current.value_column_mask
403 && previous.key_columns_width == current.key_columns_width
404}
405
406#[async_trait]
407impl TableProvider for KvTable {
408 fn as_any(&self) -> &dyn Any {
409 self
410 }
411
412 fn schema(&self) -> SchemaRef {
413 self.model.schema.clone()
414 }
415
416 fn table_type(&self) -> TableType {
417 TableType::Base
418 }
419
420 fn supports_filters_pushdown(
421 &self,
422 filters: &[&Expr],
423 ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
424 Ok(filters
425 .iter()
426 .map(|expr| {
427 if QueryPredicate::supports_filter(expr, &self.model) {
428 TableProviderFilterPushDown::Exact
429 } else {
430 TableProviderFilterPushDown::Unsupported
431 }
432 })
433 .collect())
434 }
435
436 async fn scan(
437 &self,
438 _state: &dyn Session,
439 projection: Option<&Vec<usize>>,
440 filters: &[Expr],
441 limit: Option<usize>,
442 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
443 let predicate = QueryPredicate::from_filters(filters, &self.model);
444 let projected_schema = match projection {
445 Some(proj) => Arc::new(self.model.schema.project(proj)?),
446 None => self.model.schema.clone(),
447 };
448 Ok(Arc::new(KvScanExec::new(
449 self.client.clone(),
450 self.model.clone(),
451 self.index_specs.clone(),
452 predicate,
453 limit,
454 projected_schema,
455 projection.cloned(),
456 )))
457 }
458
459 async fn insert_into(
460 &self,
461 _state: &dyn Session,
462 input: Arc<dyn ExecutionPlan>,
463 insert_op: InsertOp,
464 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
465 self.schema()
466 .logically_equivalent_names_and_types(&input.schema())?;
467 if insert_op != InsertOp::Append {
468 return Err(DataFusionError::NotImplemented(format!(
469 "{insert_op} not implemented for kv table"
470 )));
471 }
472
473 let sink = KvIngestSink::new(
474 self.client.clone(),
475 self.model.schema.clone(),
476 self.model.clone(),
477 self.index_specs.clone(),
478 );
479 Ok(Arc::new(DataSinkExec::new(input, Arc::new(sink), None)))
480 }
481}