// exoware_sql/schema.rs

1use std::any::Any;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use datafusion::arrow::datatypes::{DataType, SchemaRef};
6use datafusion::catalog::Session;
7use datafusion::common::{DataFusionError, Result as DataFusionResult, SchemaExt};
8use datafusion::datasource::sink::DataSinkExec;
9use datafusion::datasource::TableProvider;
10use datafusion::logical_expr::dml::InsertOp;
11use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
12use datafusion::physical_plan::ExecutionPlan;
13use datafusion::prelude::SessionContext;
14use exoware_sdk_rs::kv_codec::decode_stored_row;
15use exoware_sdk_rs::StoreClient;
16
17use crate::aggregate::KvAggregatePushdownRule;
18use crate::codec::*;
19use crate::predicate::*;
20use crate::scan::*;
21use crate::types::*;
22use crate::writer::*;
23
24pub(crate) fn register_kv_table(
25    ctx: &SessionContext,
26    table_name: &str,
27    client: StoreClient,
28    config: KvTableConfig,
29) -> DataFusionResult<()> {
30    let table = Arc::new(
31        KvTable::new(client, config)
32            .map_err(|e| DataFusionError::Execution(format!("invalid table config: {e}")))?,
33    );
34    let _ = ctx.register_table(table_name, table)?;
35    Ok(())
36}
37
/// Fluent builder that accumulates KV-backed table definitions and later
/// registers them with a DataFusion `SessionContext` (see `register_all`).
pub struct KvSchema {
    /// Store client cloned into every registered table and batch writer.
    client: StoreClient,
    /// `(table name, table config)` pairs in declaration order.
    tables: Vec<(String, KvTableConfig)>,
    /// Key-space prefix handed to the next `table()` call; incremented per table.
    next_prefix: u8,
}
43
impl KvSchema {
    /// Create an empty schema builder backed by `client`.
    pub fn new(client: StoreClient) -> Self {
        Self {
            client,
            tables: Vec::new(),
            next_prefix: 0,
        }
    }

    /// Add a table definition and assign it the next key-space prefix.
    ///
    /// Fails when the builder already holds `MAX_TABLES` tables or when
    /// `KvTableConfig::new` rejects the column / primary-key / index layout.
    pub fn table(
        mut self,
        name: impl Into<String>,
        columns: Vec<TableColumnConfig>,
        primary_key_columns: Vec<String>,
        index_specs: Vec<IndexSpec>,
    ) -> Result<Self, String> {
        if self.tables.len() >= MAX_TABLES {
            return Err(format!(
                "too many tables for codec layout (max {MAX_TABLES})"
            ));
        }
        let prefix = self.next_prefix;
        let config = KvTableConfig::new(prefix, columns, primary_key_columns, index_specs)?;
        self.tables.push((name.into(), config));
        // wrapping_add avoids an overflow panic; the MAX_TABLES guard above is
        // what actually prevents prefix reuse (assumes MAX_TABLES <= 256 —
        // TODO confirm against the codec module).
        self.next_prefix = self.next_prefix.wrapping_add(1);
        Ok(self)
    }

    /// Convenience constructor for a fixed-layout "orders" table
    /// (`region`, `customer_id`, `order_id`, `amount_cents`, `status`)
    /// keyed by `order_id`.
    pub fn orders_table(
        self,
        table_name: impl Into<String>,
        index_specs: Vec<IndexSpec>,
    ) -> Result<Self, String> {
        self.table(
            table_name,
            vec![
                TableColumnConfig::new("region", DataType::Utf8, false),
                TableColumnConfig::new("customer_id", DataType::Int64, false),
                TableColumnConfig::new("order_id", DataType::Int64, false),
                TableColumnConfig::new("amount_cents", DataType::Int64, false),
                TableColumnConfig::new("status", DataType::Utf8, false),
            ],
            vec!["order_id".to_string()],
            index_specs,
        )
    }

    /// Create a table with a versioned composite primary key.
    ///
    /// The entity column and version column (UInt64) together form the
    /// composite primary key. The entity can be any supported primary-key
    /// type, including variable-length logical keys encoded through the
    /// crate's ordered variable-length `Utf8` mapping.
    ///
    /// Versions sort
    /// numerically via big-endian encoding, so a reverse range scan
    /// from `(entity, V)` downward with LIMIT 1 yields the latest
    /// version <= V. See `examples/versioned_kv.rs` for the basic
    /// query pattern plus an immutable-friendly companion watermark
    /// table pattern for out-of-order batch uploads.
    pub fn table_versioned(
        self,
        name: impl Into<String>,
        columns: Vec<TableColumnConfig>,
        entity_column: impl Into<String>,
        version_column: impl Into<String>,
        index_specs: Vec<IndexSpec>,
    ) -> Result<Self, String> {
        let entity = entity_column.into();
        let version = version_column.into();
        // Delegates to `table` with a two-column composite primary key.
        self.table(name, columns, vec![entity, version], index_specs)
    }

    /// Number of tables configured so far.
    pub fn table_count(&self) -> usize {
        self.tables.len()
    }

    /// Register every configured table plus the aggregate-pushdown optimizer
    /// rule on `ctx`, consuming the builder.
    pub fn register_all(self, ctx: &SessionContext) -> DataFusionResult<()> {
        // Remove any rule left by a previous registration so repeated calls on
        // the same context do not stack duplicate optimizer rules.
        let _ = ctx.remove_optimizer_rule("kv_aggregate_pushdown");
        ctx.add_optimizer_rule(Arc::new(KvAggregatePushdownRule::new()));
        for (name, config) in &self.tables {
            register_kv_table(ctx, name, self.client.clone(), config.clone())?;
        }
        Ok(())
    }

    /// Build a `BatchWriter` that can write rows to any configured table.
    pub fn batch_writer(&self) -> BatchWriter {
        BatchWriter::new(self.client.clone(), &self.tables)
    }

    /// Backfill secondary index entries after adding new index specs.
    ///
    /// `previous_index_specs` must represent the index list used when existing
    /// rows were written. The current schema's index list must be an append-only
    /// extension of that list (same order/layout for existing indexes, with new
    /// indexes only added at the tail).
    ///
    /// Operational ordering requirement: start writing new rows with the new
    /// index specs before backfilling historical rows, or rows written during
    /// the backfill window may be missing from the new index.
    pub async fn backfill_added_indexes(
        &self,
        table_name: &str,
        previous_index_specs: &[IndexSpec],
    ) -> DataFusionResult<IndexBackfillReport> {
        self.backfill_added_indexes_with_options(
            table_name,
            previous_index_specs,
            IndexBackfillOptions::default(),
        )
        .await
    }

    /// Backfill secondary index entries after adding new index specs, with
    /// configurable row page size for the full-scan read.
    pub async fn backfill_added_indexes_with_options(
        &self,
        table_name: &str,
        previous_index_specs: &[IndexSpec],
        options: IndexBackfillOptions,
    ) -> DataFusionResult<IndexBackfillReport> {
        // Same as the progress-reporting variant with no progress channel.
        self.backfill_added_indexes_with_options_and_progress(
            table_name,
            previous_index_specs,
            options,
            None,
        )
        .await
    }

    /// Backfill secondary index entries after adding new index specs, with
    /// configurable row page size for the full-scan read and an optional
    /// progress event channel.
    ///
    /// Progress events are emitted only after buffered ingest writes for the
    /// reported cursor are flushed, so `Progress.next_cursor` can be persisted
    /// and used to resume later.
    pub async fn backfill_added_indexes_with_options_and_progress(
        &self,
        table_name: &str,
        previous_index_specs: &[IndexSpec],
        options: IndexBackfillOptions,
        progress_tx: Option<&tokio::sync::mpsc::UnboundedSender<IndexBackfillEvent>>,
    ) -> DataFusionResult<IndexBackfillReport> {
        // A zero page size would make the scan loop below do no work.
        if options.row_batch_size == 0 {
            return Err(DataFusionError::Execution(
                "index backfill row_batch_size must be > 0".to_string(),
            ));
        }

        // Locate the named table's config among the configured tables.
        let config = self
            .tables
            .iter()
            .find(|(name, _)| name == table_name)
            .map(|(_, config)| config.clone())
            .ok_or_else(|| {
                DataFusionError::Execution(format!(
                    "unknown table '{table_name}' for index backfill"
                ))
            })?;

        // Resolve both the current and the previous index lists against the
        // table model so they can be compared position by position.
        let model = TableModel::from_config(&config)
            .map_err(|e| DataFusionError::Execution(format!("invalid table config: {e}")))?;
        let current_specs = model
            .resolve_index_specs(&config.index_specs)
            .map_err(|e| DataFusionError::Execution(format!("invalid index specs: {e}")))?;
        let previous_specs = model
            .resolve_index_specs(previous_index_specs)
            .map_err(|e| {
                DataFusionError::Execution(format!("invalid previous index specs: {e}"))
            })?;

        // Enforce append-only index evolution: every previously existing index
        // must still be present, unchanged, at the same position.
        if previous_specs.len() > current_specs.len() {
            return Err(DataFusionError::Execution(format!(
                "table '{table_name}' previous index count ({}) exceeds current index count ({})",
                previous_specs.len(),
                current_specs.len()
            )));
        }
        for (idx, previous) in previous_specs.iter().enumerate() {
            let current = &current_specs[idx];
            if !resolved_index_layout_matches(previous, current) {
                return Err(DataFusionError::Execution(format!(
                    "table '{table_name}' index evolution must be append-only; index at position {} changed",
                    idx + 1
                )));
            }
        }

        // The scan cursor starts at the caller-provided resume point, or at
        // the lowest possible primary key for this table's prefix.
        let full_range = primary_key_prefix_range(model.table_prefix);
        let mut cursor = options
            .start_from_primary_key
            .unwrap_or_else(|| full_range.start.clone());
        if !model.primary_key_codec.matches(&cursor) {
            return Err(DataFusionError::Execution(
                "index backfill start_from_primary_key must use this table's primary-key prefix"
                    .to_string(),
            ));
        }
        if cursor < full_range.start || cursor > full_range.end {
            return Err(DataFusionError::Execution(
                "index backfill start_from_primary_key is outside table key range".to_string(),
            ));
        }

        // Only the indexes appended past the previous list need backfilling.
        let new_specs = current_specs[previous_specs.len()..].to_vec();
        if new_specs.is_empty() {
            // Nothing to do: still emit the Started/Completed event pair so
            // observers see a consistent lifecycle.
            let report = IndexBackfillReport::default();
            send_backfill_event(
                progress_tx,
                IndexBackfillEvent::Started {
                    table_name: table_name.to_string(),
                    indexes_backfilled: 0,
                    row_batch_size: options.row_batch_size,
                    start_cursor: cursor.clone(),
                },
            );
            send_backfill_event(progress_tx, IndexBackfillEvent::Completed { report });
            return Ok(report);
        }

        let mut report = IndexBackfillReport {
            scanned_rows: 0,
            indexes_backfilled: new_specs.len(),
            index_entries_written: 0,
        };
        // Index entries are buffered here and written in batches.
        let mut pending_keys = Vec::new();
        let mut pending_values = Vec::new();
        let session = self.client.create_session();
        // All-true mask: decode every primary-key column, since any of them
        // may feed the new index keys.
        let decode_pk_mask = vec![true; model.primary_key_kinds.len()];
        send_backfill_event(
            progress_tx,
            IndexBackfillEvent::Started {
                table_name: table_name.to_string(),
                indexes_backfilled: new_specs.len(),
                row_batch_size: options.row_batch_size,
                start_cursor: cursor.clone(),
            },
        );

        // Page through the base rows; each outer iteration opens a range
        // stream starting at the current cursor.
        loop {
            // NOTE(review): row_batch_size is passed for both stream
            // parameters — presumably chunk size and page limit; confirm
            // against StoreClient's range_stream signature.
            let mut stream = session
                .range_stream(
                    &cursor,
                    &full_range.end,
                    options.row_batch_size,
                    options.row_batch_size,
                )
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))?;
            // Tracks the last base-row key seen this page, for pagination and
            // progress reporting. None means the page was empty.
            let mut last_key = None;
            while let Some(chunk) = stream
                .next_chunk()
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))?
            {
                for (base_key, base_value) in &chunk {
                    last_key = Some(base_key.clone());
                    let Some(pk_values) = decode_primary_key_selected(
                        model.table_prefix,
                        base_key,
                        &model,
                        &decode_pk_mask,
                    ) else {
                        return Err(DataFusionError::Execution(format!(
                            "invalid primary key while backfilling index (key={})",
                            hex::encode(base_key)
                        )));
                    };
                    let archived = decode_stored_row(base_value).map_err(|e| {
                        DataFusionError::Execution(format!(
                            "invalid base row payload while backfilling index (key={}): {e}",
                            hex::encode(base_key)
                        ))
                    })?;
                    // A decoded row must carry exactly one value per column.
                    if archived.values.len() != model.columns.len() {
                        return Err(DataFusionError::Execution(format!(
                            "invalid base row payload while backfilling index (key={})",
                            hex::encode(base_key)
                        )));
                    }
                    report.scanned_rows += 1;

                    // Build one index entry per newly added index for this row.
                    for spec in &new_specs {
                        let index_key = encode_secondary_index_key_from_parts(
                            model.table_prefix,
                            spec,
                            &model,
                            &pk_values,
                            &archived,
                        )?;
                        let index_value =
                            encode_secondary_index_value_from_archived(&archived, &model, spec)?;
                        pending_keys.push(index_key);
                        pending_values.push(index_value);
                        report.index_entries_written += 1;
                    }

                    // Flush mid-page once the buffer reaches the threshold to
                    // bound memory use.
                    if pending_keys.len() >= INDEX_BACKFILL_FLUSH_ENTRIES {
                        flush_ingest_batch(&self.client, &mut pending_keys, &mut pending_values)
                            .await?;
                    }
                }
            }
            // An empty page means the scan reached the end of the table.
            let Some(last_key) = last_key else {
                break;
            };

            // None here signals the range is exhausted; otherwise resume just
            // past the last scanned key.
            let next_cursor = if last_key >= full_range.end {
                None
            } else {
                next_key(&last_key)
            };
            // Flush BEFORE emitting Progress so next_cursor is safe to persist
            // as a resume point (per the doc contract above).
            if !pending_keys.is_empty() {
                flush_ingest_batch(&self.client, &mut pending_keys, &mut pending_values).await?;
            }
            send_backfill_event(
                progress_tx,
                IndexBackfillEvent::Progress {
                    scanned_rows: report.scanned_rows,
                    index_entries_written: report.index_entries_written,
                    last_scanned_primary_key: last_key,
                    next_cursor: next_cursor.clone(),
                },
            );

            if let Some(next) = next_cursor {
                cursor = next;
            } else {
                break;
            }
        }

        // Defensive final flush; on current control flow the buffer should
        // already be empty whenever the loop exits.
        if !pending_keys.is_empty() {
            flush_ingest_batch(&self.client, &mut pending_keys, &mut pending_values).await?;
        }
        send_backfill_event(progress_tx, IndexBackfillEvent::Completed { report });
        Ok(report)
    }
}
384
385pub(crate) fn send_backfill_event(
386    progress_tx: Option<&tokio::sync::mpsc::UnboundedSender<IndexBackfillEvent>>,
387    event: IndexBackfillEvent,
388) {
389    if let Some(tx) = progress_tx {
390        let _ = tx.send(event);
391    }
392}
393
394pub(crate) fn resolved_index_layout_matches(
395    previous: &ResolvedIndexSpec,
396    current: &ResolvedIndexSpec,
397) -> bool {
398    previous.id == current.id
399        && previous.name == current.name
400        && previous.layout == current.layout
401        && previous.key_columns == current.key_columns
402        && previous.value_column_mask == current.value_column_mask
403        && previous.key_columns_width == current.key_columns_width
404}
405
406#[async_trait]
407impl TableProvider for KvTable {
408    fn as_any(&self) -> &dyn Any {
409        self
410    }
411
412    fn schema(&self) -> SchemaRef {
413        self.model.schema.clone()
414    }
415
416    fn table_type(&self) -> TableType {
417        TableType::Base
418    }
419
420    fn supports_filters_pushdown(
421        &self,
422        filters: &[&Expr],
423    ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
424        Ok(filters
425            .iter()
426            .map(|expr| {
427                if QueryPredicate::supports_filter(expr, &self.model) {
428                    TableProviderFilterPushDown::Exact
429                } else {
430                    TableProviderFilterPushDown::Unsupported
431                }
432            })
433            .collect())
434    }
435
436    async fn scan(
437        &self,
438        _state: &dyn Session,
439        projection: Option<&Vec<usize>>,
440        filters: &[Expr],
441        limit: Option<usize>,
442    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
443        let predicate = QueryPredicate::from_filters(filters, &self.model);
444        let projected_schema = match projection {
445            Some(proj) => Arc::new(self.model.schema.project(proj)?),
446            None => self.model.schema.clone(),
447        };
448        Ok(Arc::new(KvScanExec::new(
449            self.client.clone(),
450            self.model.clone(),
451            self.index_specs.clone(),
452            predicate,
453            limit,
454            projected_schema,
455            projection.cloned(),
456        )))
457    }
458
459    async fn insert_into(
460        &self,
461        _state: &dyn Session,
462        input: Arc<dyn ExecutionPlan>,
463        insert_op: InsertOp,
464    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
465        self.schema()
466            .logically_equivalent_names_and_types(&input.schema())?;
467        if insert_op != InsertOp::Append {
468            return Err(DataFusionError::NotImplemented(format!(
469                "{insert_op} not implemented for kv table"
470            )));
471        }
472
473        let sink = KvIngestSink::new(
474            self.client.clone(),
475            self.model.schema.clone(),
476            self.model.clone(),
477            self.index_specs.clone(),
478        );
479        Ok(Arc::new(DataSinkExec::new(input, Arc::new(sink), None)))
480    }
481}