// exoware_sql/schema.rs

use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::{DataType, SchemaRef};
use datafusion::catalog::Session;
use datafusion::common::{DataFusionError, Result as DataFusionResult, SchemaExt};
use datafusion::datasource::sink::DataSinkExec;
use datafusion::datasource::TableProvider;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use exoware_sdk::kv_codec::decode_stored_row;
use exoware_sdk::StoreClient;

use crate::aggregate::KvAggregatePushdownRule;
use crate::codec::*;
use crate::predicate::*;
use crate::scan::*;
use crate::types::*;
use crate::writer::*;

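/// Build a `KvTable` from `config` and register it on `ctx` under
/// `table_name`. Configuration errors surface as
/// `DataFusionError::Execution`.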
pub(crate) fn register_kv_table(
    ctx: &SessionContext,
    table_name: &str,
    client: StoreClient,
    config: KvTableConfig,
) -> DataFusionResult<()> {
    let table = Arc::new(
        KvTable::new(client, config)
            .map_err(|e| DataFusionError::Execution(format!("invalid table config: {e}")))?,
    );
    let _ = ctx.register_table(table_name, table)?;
    Ok(())
}

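/// Builder for a set of KV-backed tables that share one `StoreClient`.
///
/// Each table added is assigned the next one-byte key prefix, which the
/// codec uses to keep tables disjoint within the shared key space.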
pub struct KvSchema {
    client: StoreClient,
    tables: Vec<(String, KvTableConfig)>,
    next_prefix: u8,
}

impl KvSchema {
    pub fn new(client: StoreClient) -> Self {
        Self {
            client,
            tables: Vec::new(),
            next_prefix: 0,
        }
    }

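    /// Add a table definition. Fails if the codec's table budget
    /// (`MAX_TABLES`) is exhausted or if the column, primary-key, or index
    /// configuration is rejected by `KvTableConfig::new`.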
    pub fn table(
        mut self,
        name: impl Into<String>,
        columns: Vec<TableColumnConfig>,
        primary_key_columns: Vec<String>,
        index_specs: Vec<IndexSpec>,
    ) -> Result<Self, String> {
        if self.tables.len() >= MAX_TABLES {
            return Err(format!(
                "too many tables for codec layout (max {MAX_TABLES})"
            ));
        }
        let prefix = self.next_prefix;
        let config = KvTableConfig::new(prefix, columns, primary_key_columns, index_specs)?;
        self.tables.push((name.into(), config));
        self.next_prefix = self.next_prefix.wrapping_add(1);
        Ok(self)
    }

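    /// Add a preset `orders` table: `region`, `customer_id`, `order_id`,
    /// `amount_cents`, and `status` columns, keyed by `order_id`.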
    pub fn orders_table(
        self,
        table_name: impl Into<String>,
        index_specs: Vec<IndexSpec>,
    ) -> Result<Self, String> {
        self.table(
            table_name,
            vec![
                TableColumnConfig::new("region", DataType::Utf8, false),
                TableColumnConfig::new("customer_id", DataType::Int64, false),
                TableColumnConfig::new("order_id", DataType::Int64, false),
                TableColumnConfig::new("amount_cents", DataType::Int64, false),
                TableColumnConfig::new("status", DataType::Utf8, false),
            ],
            vec!["order_id".to_string()],
            index_specs,
        )
    }

    /// Create a table with a versioned composite primary key.
    ///
    /// The entity column and version column (UInt64) together form the
    /// composite primary key. The entity can be any supported primary-key
    /// type, including variable-length logical keys encoded through the
    /// crate's ordered variable-length `Utf8` mapping.
    ///
    /// Versions sort numerically via big-endian encoding, so a reverse
    /// range scan from `(entity, V)` downward with `LIMIT 1` yields the
    /// latest version `<= V`. See `examples/versioned_kv.rs` for the basic
    /// query pattern plus an immutable-friendly companion watermark-table
    /// pattern for out-of-order batch uploads.
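    ///
    /// A minimal registration sketch (the table and column names here are
    /// illustrative, not part of the API):
    ///
    /// ```ignore
    /// let schema = KvSchema::new(client).table_versioned(
    ///     "docs",
    ///     vec![
    ///         TableColumnConfig::new("entity", DataType::Utf8, false),
    ///         TableColumnConfig::new("version", DataType::UInt64, false),
    ///         TableColumnConfig::new("body", DataType::Utf8, false),
    ///     ],
    ///     "entity",
    ///     "version",
    ///     vec![],
    /// )?;
    /// // Latest version <= 42 for one entity:
    /// //   SELECT * FROM docs WHERE entity = 'a' AND version <= 42
    /// //   ORDER BY version DESC LIMIT 1
    /// ```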
    pub fn table_versioned(
        self,
        name: impl Into<String>,
        columns: Vec<TableColumnConfig>,
        entity_column: impl Into<String>,
        version_column: impl Into<String>,
        index_specs: Vec<IndexSpec>,
    ) -> Result<Self, String> {
        let entity = entity_column.into();
        let version = version_column.into();
        self.table(name, columns, vec![entity, version], index_specs)
    }

    pub fn table_count(&self) -> usize {
        self.tables.len()
    }

    pub(crate) fn client(&self) -> &StoreClient {
        &self.client
    }

    pub(crate) fn tables(&self) -> &[(String, KvTableConfig)] {
        &self.tables
    }

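    /// Install the KV aggregate pushdown optimizer rule (removing any
    /// previously registered copy first) and register every configured
    /// table on `ctx`.
    ///
    /// A minimal usage sketch (`client` construction elided):
    ///
    /// ```ignore
    /// let ctx = SessionContext::new();
    /// KvSchema::new(client)
    ///     .orders_table("orders", vec![])?
    ///     .register_all(&ctx)?;
    /// ```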
    pub fn register_all(self, ctx: &SessionContext) -> DataFusionResult<()> {
        let _ = ctx.remove_optimizer_rule("kv_aggregate_pushdown");
        ctx.add_optimizer_rule(Arc::new(KvAggregatePushdownRule::new()));
        for (name, config) in &self.tables {
            register_kv_table(ctx, name, self.client.clone(), config.clone())?;
        }
        Ok(())
    }

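    /// Create a `BatchWriter` bound to this schema's client and table
    /// configs.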
    pub fn batch_writer(&self) -> BatchWriter {
        BatchWriter::new(self.client.clone(), &self.tables)
    }

    /// Backfill secondary index entries after adding new index specs.
    ///
    /// `previous_index_specs` must represent the index list used when existing
    /// rows were written. The current schema's index list must be an append-only
    /// extension of that list (same order/layout for existing indexes, with new
    /// indexes only added at the tail).
    ///
    /// Operational ordering requirement: start writing new rows with the new
    /// index specs before backfilling historical rows, or rows written during
    /// the backfill window may be missing from the new index.
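    ///
    /// A usage sketch (`old_specs` stands in for the index list that was in
    /// effect when the existing rows were written):
    ///
    /// ```ignore
    /// let report = schema.backfill_added_indexes("orders", &old_specs).await?;
    /// println!(
    ///     "scanned {} rows, wrote {} index entries",
    ///     report.scanned_rows, report.index_entries_written
    /// );
    /// ```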
    pub async fn backfill_added_indexes(
        &self,
        table_name: &str,
        previous_index_specs: &[IndexSpec],
    ) -> DataFusionResult<IndexBackfillReport> {
        self.backfill_added_indexes_with_options(
            table_name,
            previous_index_specs,
            IndexBackfillOptions::default(),
        )
        .await
    }

    /// Backfill secondary index entries after adding new index specs, with
    /// configurable row page size for the full-scan read.
    pub async fn backfill_added_indexes_with_options(
        &self,
        table_name: &str,
        previous_index_specs: &[IndexSpec],
        options: IndexBackfillOptions,
    ) -> DataFusionResult<IndexBackfillReport> {
        self.backfill_added_indexes_with_options_and_progress(
            table_name,
            previous_index_specs,
            options,
            None,
        )
        .await
    }

    /// Backfill secondary index entries after adding new index specs, with
    /// configurable row page size for the full-scan read and an optional
    /// progress event channel.
    ///
    /// Progress events are emitted only after buffered ingest writes for the
    /// reported cursor are flushed, so `Progress.next_cursor` can be persisted
    /// and used to resume later.
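    ///
    /// A sketch of consuming the progress channel (error handling elided):
    ///
    /// ```ignore
    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    /// let listener = tokio::spawn(async move {
    ///     while let Some(event) = rx.recv().await {
    ///         // Persist `next_cursor` from `IndexBackfillEvent::Progress`
    ///         // to resume later via
    ///         // `IndexBackfillOptions::start_from_primary_key`.
    ///     }
    /// });
    /// let report = schema
    ///     .backfill_added_indexes_with_options_and_progress(
    ///         "orders",
    ///         &old_specs,
    ///         IndexBackfillOptions::default(),
    ///         Some(&tx),
    ///     )
    ///     .await?;
    /// drop(tx);
    /// listener.await?;
    /// ```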
    pub async fn backfill_added_indexes_with_options_and_progress(
        &self,
        table_name: &str,
        previous_index_specs: &[IndexSpec],
        options: IndexBackfillOptions,
        progress_tx: Option<&tokio::sync::mpsc::UnboundedSender<IndexBackfillEvent>>,
    ) -> DataFusionResult<IndexBackfillReport> {
        if options.row_batch_size == 0 {
            return Err(DataFusionError::Execution(
                "index backfill row_batch_size must be > 0".to_string(),
            ));
        }

        let config = self
            .tables
            .iter()
            .find(|(name, _)| name == table_name)
            .map(|(_, config)| config.clone())
            .ok_or_else(|| {
                DataFusionError::Execution(format!(
                    "unknown table '{table_name}' for index backfill"
                ))
            })?;

        let model = TableModel::from_config(&config)
            .map_err(|e| DataFusionError::Execution(format!("invalid table config: {e}")))?;
        let current_specs = model
            .resolve_index_specs(&config.index_specs)
            .map_err(|e| DataFusionError::Execution(format!("invalid index specs: {e}")))?;
        let previous_specs = model
            .resolve_index_specs(previous_index_specs)
            .map_err(|e| {
                DataFusionError::Execution(format!("invalid previous index specs: {e}"))
            })?;

        if previous_specs.len() > current_specs.len() {
            return Err(DataFusionError::Execution(format!(
                "table '{table_name}' previous index count ({}) exceeds current index count ({})",
                previous_specs.len(),
                current_specs.len()
            )));
        }
        for (idx, previous) in previous_specs.iter().enumerate() {
            let current = &current_specs[idx];
            if !resolved_index_layout_matches(previous, current) {
                return Err(DataFusionError::Execution(format!(
                    "table '{table_name}' index evolution must be append-only; index at position {} changed",
                    idx + 1
                )));
            }
        }

        let full_range = primary_key_prefix_range(model.table_prefix);
        let mut cursor = options
            .start_from_primary_key
            .unwrap_or_else(|| full_range.start.clone());
        if !model.primary_key_codec.matches(&cursor) {
            return Err(DataFusionError::Execution(
                "index backfill start_from_primary_key must use this table's primary-key prefix"
                    .to_string(),
            ));
        }
        if cursor < full_range.start || cursor > full_range.end {
            return Err(DataFusionError::Execution(
                "index backfill start_from_primary_key is outside table key range".to_string(),
            ));
        }

        let new_specs = current_specs[previous_specs.len()..].to_vec();
        if new_specs.is_empty() {
            let report = IndexBackfillReport::default();
            send_backfill_event(
                progress_tx,
                IndexBackfillEvent::Started {
                    table_name: table_name.to_string(),
                    indexes_backfilled: 0,
                    row_batch_size: options.row_batch_size,
                    start_cursor: cursor.clone(),
                },
            );
            send_backfill_event(progress_tx, IndexBackfillEvent::Completed { report });
            return Ok(report);
        }

        let mut report = IndexBackfillReport {
            scanned_rows: 0,
            indexes_backfilled: new_specs.len(),
            index_entries_written: 0,
        };
        let mut pending_keys = Vec::new();
        let mut pending_values = Vec::new();
        let session = self.client.create_session();
        let decode_pk_mask = vec![true; model.primary_key_kinds.len()];
        send_backfill_event(
            progress_tx,
            IndexBackfillEvent::Started {
                table_name: table_name.to_string(),
                indexes_backfilled: new_specs.len(),
                row_batch_size: options.row_batch_size,
                start_cursor: cursor.clone(),
            },
        );

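        // Page through the table's primary-key range, re-opening the range
        // stream from the advancing cursor after each page.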
        loop {
            let mut stream = session
                .range_stream(
                    &cursor,
                    &full_range.end,
                    options.row_batch_size,
                    options.row_batch_size,
                )
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))?;
            let mut last_key = None;
            while let Some(chunk) = stream
                .next_chunk()
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))?
            {
                for (base_key, base_value) in &chunk {
                    last_key = Some(base_key.clone());
                    let Some(pk_values) = decode_primary_key_selected(
                        model.table_prefix,
                        base_key,
                        &model,
                        &decode_pk_mask,
                    ) else {
                        return Err(DataFusionError::Execution(format!(
                            "invalid primary key while backfilling index (key={})",
                            hex::encode(base_key)
                        )));
                    };
                    let archived = decode_stored_row(base_value).map_err(|e| {
                        DataFusionError::Execution(format!(
                            "invalid base row payload while backfilling index (key={}): {e}",
                            hex::encode(base_key)
                        ))
                    })?;
                    if archived.values.len() != model.columns.len() {
                        return Err(DataFusionError::Execution(format!(
                            "invalid base row payload while backfilling index (key={})",
                            hex::encode(base_key)
                        )));
                    }
                    report.scanned_rows += 1;

                    for spec in &new_specs {
                        let index_key = encode_secondary_index_key_from_parts(
                            model.table_prefix,
                            spec,
                            &model,
                            &pk_values,
                            &archived,
                        )?;
                        let index_value =
                            encode_secondary_index_value_from_archived(&archived, &model, spec)?;
                        pending_keys.push(index_key);
                        pending_values.push(index_value);
                        report.index_entries_written += 1;
                    }

                    if pending_keys.len() >= INDEX_BACKFILL_FLUSH_ENTRIES {
                        flush_ingest_batch(&self.client, &mut pending_keys, &mut pending_values)
                            .await?;
                    }
                }
            }
            let Some(last_key) = last_key else {
                break;
            };

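            // Advance just past the last key seen; flush pending index
            // writes before emitting Progress so `next_cursor` is safe to
            // persist and resume from.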
            let next_cursor = if last_key >= full_range.end {
                None
            } else {
                next_key(&last_key)
            };
            if !pending_keys.is_empty() {
                flush_ingest_batch(&self.client, &mut pending_keys, &mut pending_values).await?;
            }
            send_backfill_event(
                progress_tx,
                IndexBackfillEvent::Progress {
                    scanned_rows: report.scanned_rows,
                    index_entries_written: report.index_entries_written,
                    last_scanned_primary_key: last_key,
                    next_cursor: next_cursor.clone(),
                },
            );

            if let Some(next) = next_cursor {
                cursor = next;
            } else {
                break;
            }
        }

        if !pending_keys.is_empty() {
            flush_ingest_batch(&self.client, &mut pending_keys, &mut pending_values).await?;
        }
        send_backfill_event(progress_tx, IndexBackfillEvent::Completed { report });
        Ok(report)
    }
}

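/// Forward a backfill event to the progress channel, if one was provided.
/// Send failures (receiver dropped) are deliberately ignored.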
pub(crate) fn send_backfill_event(
    progress_tx: Option<&tokio::sync::mpsc::UnboundedSender<IndexBackfillEvent>>,
    event: IndexBackfillEvent,
) {
    if let Some(tx) = progress_tx {
        let _ = tx.send(event);
    }
}

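/// True when two resolved index specs share an identical identity and
/// layout, so an existing index position can be treated as unchanged during
/// append-only index evolution.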
pub(crate) fn resolved_index_layout_matches(
    previous: &ResolvedIndexSpec,
    current: &ResolvedIndexSpec,
) -> bool {
    previous.id == current.id
        && previous.name == current.name
        && previous.layout == current.layout
        && previous.key_columns == current.key_columns
        && previous.value_column_mask == current.value_column_mask
        && previous.key_columns_width == current.key_columns_width
}

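// DataFusion integration: expose each KV table as a `TableProvider` with
// exact filter pushdown, index-aware scans, and append-only inserts.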
#[async_trait]
impl TableProvider for KvTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.model.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

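    /// Report `Exact` pushdown for filters that `QueryPredicate` can
    /// evaluate against the KV encoding; all other filters stay in
    /// DataFusion's plan as `Unsupported`.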
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
        Ok(filters
            .iter()
            .map(|expr| {
                if QueryPredicate::supports_filter(expr, &self.model) {
                    TableProviderFilterPushDown::Exact
                } else {
                    TableProviderFilterPushDown::Unsupported
                }
            })
            .collect())
    }

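    /// Build a `KvScanExec` physical plan carrying the pushed-down
    /// predicate, optional projection, and limit.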
    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        let predicate = QueryPredicate::from_filters(filters, &self.model);
        let projected_schema = match projection {
            Some(proj) => Arc::new(self.model.schema.project(proj)?),
            None => self.model.schema.clone(),
        };
        Ok(Arc::new(KvScanExec::new(
            self.client.clone(),
            self.model.clone(),
            self.index_specs.clone(),
            predicate,
            limit,
            projected_schema,
            projection.cloned(),
        )))
    }

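    /// Only `InsertOp::Append` is supported; rows are written through a
    /// `KvIngestSink` wrapped in a `DataSinkExec`.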
    async fn insert_into(
        &self,
        _state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        insert_op: InsertOp,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        self.schema()
            .logically_equivalent_names_and_types(&input.schema())?;
        if insert_op != InsertOp::Append {
            return Err(DataFusionError::NotImplemented(format!(
                "{insert_op} not implemented for kv table"
            )));
        }

        let sink = KvIngestSink::new(
            self.client.clone(),
            self.model.schema.clone(),
            self.model.clone(),
            self.index_specs.clone(),
        );
        Ok(Arc::new(DataSinkExec::new(input, Arc::new(sink), None)))
    }
}