Skip to main content

exoware_sql/
writer.rs

1use std::any::Any;
2use std::collections::HashMap;
3use std::fmt;
4use std::sync::Arc;
5use std::sync::Mutex;
6
7use async_trait::async_trait;
8use commonware_codec::Encode;
9use datafusion::arrow::array::{
10    ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array, Decimal256Array,
11    FixedSizeBinaryArray, Float64Array, Int64Array, LargeStringArray, ListArray, StringArray,
12    StringViewArray, TimestampMicrosecondArray, UInt64Array,
13};
14use datafusion::arrow::datatypes::{i256, SchemaRef};
15use datafusion::arrow::record_batch::RecordBatch;
16use datafusion::common::{DataFusionError, Result as DataFusionResult};
17use datafusion::datasource::sink::DataSink;
18use datafusion::execution::context::TaskContext;
19use datafusion::physical_plan::{DisplayAs, DisplayFormatType, SendableRecordBatchStream};
20use exoware_sdk::keys::Key;
21#[cfg(test)]
22use exoware_sdk::kv_codec::decode_stored_row;
23use exoware_sdk::kv_codec::{StoredRow, StoredValue};
24use exoware_sdk::{StoreBatchUpload, StoreClient, StoreWriteBatch};
25use futures::{future::BoxFuture, TryStreamExt};
26
27use crate::builder::archived_non_pk_value_is_valid;
28use crate::codec::*;
29use crate::types::*;
30
31#[derive(Debug)]
32pub struct TableWriter {
33    model: Arc<TableModel>,
34    index_specs: Arc<Vec<ResolvedIndexSpec>>,
35}
36
37impl TableWriter {
38    pub fn encode_row(&self, values: Vec<CellValue>) -> Result<Vec<(Key, Vec<u8>)>, String> {
39        let row = KvRow { values };
40        if row.values.len() != self.model.columns.len() {
41            return Err(format!(
42                "expected {} values, got {}",
43                self.model.columns.len(),
44                row.values.len()
45            ));
46        }
47        let base_key = encode_primary_key_from_row(self.model.table_prefix, &row, &self.model)?;
48        let base_value = encode_base_row_value(&row, &self.model).map_err(|e| format!("{e}"))?;
49        let mut out = vec![(base_key, base_value)];
50        for spec in self.index_specs.iter() {
51            let idx_key =
52                encode_secondary_index_key(self.model.table_prefix, spec, &self.model, &row)?;
53            let idx_value = encode_secondary_index_value(&row, &self.model, spec)
54                .map_err(|e| format!("{e}"))?;
55            out.push((idx_key, idx_value));
56        }
57        Ok(out)
58    }
59}
60
61#[derive(Debug)]
62pub struct BatchWriter {
63    client: StoreClient,
64    tables: HashMap<String, TableWriter>,
65    next_request_id: u64,
66    failed_prepared: Mutex<Vec<PreparedBatch>>,
67    pub(crate) pending_keys: Vec<Key>,
68    pub(crate) pending_values: Vec<Vec<u8>>,
69}
70
71#[derive(Debug)]
72#[must_use]
73pub struct PreparedBatch {
74    request_id: u64,
75    keys: Vec<Key>,
76    values: Vec<Vec<u8>>,
77}
78
79impl PreparedBatch {
80    pub fn request_id(&self) -> u64 {
81        self.request_id
82    }
83
84    pub fn entry_count(&self) -> usize {
85        self.keys.len()
86    }
87
88    pub fn is_empty(&self) -> bool {
89        self.keys.is_empty()
90    }
91}
92
/// Metadata describing a batch that has been persisted by the store.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BatchReceipt {
    /// Request id the writer assigned to the persisted batch.
    pub writer_request_id: u64,
    /// Number of key/value entries the batch contained.
    pub entry_count: usize,
    /// Sequence number reported for the persisted batch.
    pub store_sequence_number: u64,
}
99
100impl BatchWriter {
101    pub(crate) fn new(client: StoreClient, table_configs: &[(String, KvTableConfig)]) -> Self {
102        let mut tables = HashMap::new();
103        for (name, config) in table_configs {
104            let model = Arc::new(
105                TableModel::from_config(config).expect("config already validated by KvSchema"),
106            );
107            let index_specs = Arc::new(
108                model
109                    .resolve_index_specs(&config.index_specs)
110                    .expect("specs already validated by KvSchema"),
111            );
112            tables.insert(name.clone(), TableWriter { model, index_specs });
113        }
114        Self {
115            client,
116            tables,
117            next_request_id: 0,
118            failed_prepared: Mutex::new(Vec::new()),
119            pending_keys: Vec::new(),
120            pending_values: Vec::new(),
121        }
122    }
123
124    pub fn insert(
125        &mut self,
126        table_name: &str,
127        values: Vec<CellValue>,
128    ) -> Result<&mut Self, String> {
129        let writer = self
130            .tables
131            .get(table_name)
132            .ok_or_else(|| format!("unknown table '{table_name}'"))?;
133        let entries = writer.encode_row(values)?;
134        for (key, value) in entries {
135            self.pending_keys.push(key);
136            self.pending_values.push(value);
137        }
138        Ok(self)
139    }
140
141    pub fn pending_count(&self) -> usize {
142        self.pending_keys.len()
143            + self
144                .failed_prepared
145                .lock()
146                .expect("failed prepared mutex poisoned")
147                .iter()
148                .map(PreparedBatch::entry_count)
149                .sum::<usize>()
150    }
151
152    /// Flush pending rows to ingest and return the post-ingest consistency token.
153    pub async fn flush(&mut self) -> DataFusionResult<u64> {
154        Ok(self
155            .flush_with_receipt()
156            .await?
157            .map(|receipt| receipt.store_sequence_number)
158            .unwrap_or(0))
159    }
160
161    /// Flush pending rows to ingest and return metadata for the persisted batch.
162    pub async fn flush_with_receipt(&mut self) -> DataFusionResult<Option<BatchReceipt>> {
163        let Some(prepared) = self.prepare_flush()? else {
164            return Ok(None);
165        };
166        Ok(Some(self.commit_upload(&self.client, prepared).await?))
167    }
168
169    pub fn prepare_flush(&mut self) -> DataFusionResult<Option<PreparedBatch>> {
170        if let Some(prepared) = self.take_failed_prepared() {
171            return Ok(Some(prepared));
172        }
173        if self.pending_keys.is_empty() {
174            return Ok(None);
175        }
176        let request_id = self.next_request_id;
177        self.next_request_id += 1;
178        Ok(Some(PreparedBatch {
179            request_id,
180            keys: std::mem::take(&mut self.pending_keys),
181            values: std::mem::take(&mut self.pending_values),
182        }))
183    }
184
185    pub fn stage_flush(
186        &self,
187        prepared: &PreparedBatch,
188        batch: &mut StoreWriteBatch,
189    ) -> DataFusionResult<()> {
190        for (key, value) in prepared.keys.iter().zip(prepared.values.iter()) {
191            batch
192                .push(&self.client, key, value)
193                .map_err(|e| DataFusionError::External(Box::new(e)))?;
194        }
195        Ok(())
196    }
197
198    pub fn mark_flush_persisted(
199        &self,
200        prepared: PreparedBatch,
201        sequence_number: u64,
202    ) -> BatchReceipt {
203        BatchReceipt {
204            writer_request_id: prepared.request_id,
205            entry_count: prepared.entry_count(),
206            store_sequence_number: sequence_number,
207        }
208    }
209
210    pub fn mark_flush_failed(&self, prepared: PreparedBatch) {
211        self.failed_prepared
212            .lock()
213            .expect("failed prepared mutex poisoned")
214            .push(prepared);
215    }
216
217    fn take_failed_prepared(&self) -> Option<PreparedBatch> {
218        let mut failed = self
219            .failed_prepared
220            .lock()
221            .expect("failed prepared mutex poisoned");
222        let (idx, _) = failed
223            .iter()
224            .enumerate()
225            .min_by_key(|(_, prepared)| prepared.request_id)?;
226        Some(failed.remove(idx))
227    }
228}
229
230impl StoreBatchUpload for BatchWriter {
231    type Prepared = PreparedBatch;
232    type Receipt = BatchReceipt;
233    type Error = DataFusionError;
234
235    fn stage_upload(
236        &self,
237        prepared: &Self::Prepared,
238        batch: &mut StoreWriteBatch,
239    ) -> Result<(), Self::Error> {
240        self.stage_flush(prepared, batch)
241    }
242
243    fn commit_error(&self, error: exoware_sdk::ClientError) -> Self::Error {
244        DataFusionError::External(Box::new(error))
245    }
246
247    fn mark_upload_persisted<'a>(
248        &'a self,
249        prepared: Self::Prepared,
250        sequence_number: u64,
251    ) -> BoxFuture<'a, Self::Receipt>
252    where
253        Self: Sync + 'a,
254        Self::Prepared: 'a,
255    {
256        Box::pin(async move { self.mark_flush_persisted(prepared, sequence_number) })
257    }
258
259    fn mark_upload_failed<'a>(
260        &'a self,
261        prepared: Self::Prepared,
262        _error: String,
263    ) -> BoxFuture<'a, ()>
264    where
265        Self: Sync + 'a,
266        Self::Prepared: 'a,
267    {
268        Box::pin(async move {
269            self.mark_flush_failed(prepared);
270        })
271    }
272}
273
274#[derive(Debug)]
275pub(crate) struct KvIngestSink {
276    pub(crate) client: StoreClient,
277    pub(crate) schema: SchemaRef,
278    pub(crate) model: Arc<TableModel>,
279    pub(crate) index_specs: Arc<Vec<ResolvedIndexSpec>>,
280}
281
282impl KvIngestSink {
283    pub(crate) fn new(
284        client: StoreClient,
285        schema: SchemaRef,
286        model: Arc<TableModel>,
287        index_specs: Arc<Vec<ResolvedIndexSpec>>,
288    ) -> Self {
289        Self {
290            client,
291            schema,
292            model,
293            index_specs,
294        }
295    }
296}
297
298impl DisplayAs for KvIngestSink {
299    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
300        write!(f, "KvIngestSink")
301    }
302}
303
304#[async_trait]
305impl DataSink for KvIngestSink {
306    fn as_any(&self) -> &dyn Any {
307        self
308    }
309
310    fn schema(&self) -> &SchemaRef {
311        &self.schema
312    }
313
314    async fn write_all(
315        &self,
316        data: SendableRecordBatchStream,
317        _context: &Arc<TaskContext>,
318    ) -> DataFusionResult<u64> {
319        let mut data = data;
320        let mut pending_keys: Vec<Key> = Vec::new();
321        let mut pending_values: Vec<Vec<u8>> = Vec::new();
322        let mut logical_rows_written = 0u64;
323
324        while let Some(batch) = data.try_next().await? {
325            let encoded_entries = encode_insert_entries(&batch, &self.model, &self.index_specs)?;
326            logical_rows_written += batch.num_rows() as u64;
327            for (key, value) in encoded_entries {
328                pending_keys.push(key);
329                pending_values.push(value);
330            }
331        }
332
333        if !pending_keys.is_empty() {
334            flush_ingest_batch(&self.client, &mut pending_keys, &mut pending_values).await?;
335        }
336        Ok(logical_rows_written)
337    }
338}
339
340pub(crate) fn encode_insert_entries(
341    batch: &RecordBatch,
342    model: &TableModel,
343    index_specs: &[ResolvedIndexSpec],
344) -> DataFusionResult<Vec<(Key, Vec<u8>)>> {
345    let mut out = Vec::with_capacity(batch.num_rows() * (1 + index_specs.len()));
346    for row_idx in 0..batch.num_rows() {
347        let row = extract_row_from_batch(batch, row_idx, model)?;
348        let base_key = encode_primary_key_from_row(model.table_prefix, &row, model)
349            .map_err(DataFusionError::Execution)?;
350        let base_value = encode_base_row_value(&row, model)?;
351        out.push((base_key, base_value));
352
353        for spec in index_specs {
354            let secondary_key = encode_secondary_index_key(model.table_prefix, spec, model, &row)
355                .map_err(DataFusionError::Execution)?;
356            let secondary_value = encode_secondary_index_value(&row, model, spec)?;
357            out.push((secondary_key, secondary_value));
358        }
359    }
360    Ok(out)
361}
362
363pub(crate) fn extract_row_from_batch(
364    batch: &RecordBatch,
365    row_idx: usize,
366    model: &TableModel,
367) -> DataFusionResult<KvRow> {
368    let mut values = Vec::with_capacity(model.columns.len());
369    for col in &model.columns {
370        let array = required_column(batch, &col.name)?;
371        if col.nullable && array.is_null(row_idx) {
372            values.push(CellValue::Null);
373            continue;
374        }
375        let value = match col.kind {
376            ColumnKind::Int64 => CellValue::Int64(i64_value_at(array, row_idx, &col.name)?),
377            ColumnKind::UInt64 => CellValue::UInt64(uint64_value_at(array, row_idx, &col.name)?),
378            ColumnKind::Float64 => CellValue::Float64(f64_value_at(array, row_idx, &col.name)?),
379            ColumnKind::Boolean => CellValue::Boolean(bool_value_at(array, row_idx, &col.name)?),
380            ColumnKind::Date32 => CellValue::Date32(date32_value_at(array, row_idx, &col.name)?),
381            ColumnKind::Date64 => CellValue::Date64(date64_value_at(array, row_idx, &col.name)?),
382            ColumnKind::Timestamp => {
383                CellValue::Timestamp(timestamp_micros_value_at(array, row_idx, &col.name)?)
384            }
385            ColumnKind::Decimal128 => {
386                CellValue::Decimal128(decimal128_value_at(array, row_idx, &col.name)?)
387            }
388            ColumnKind::Decimal256 => {
389                CellValue::Decimal256(decimal256_value_at(array, row_idx, &col.name)?)
390            }
391            ColumnKind::Utf8 => CellValue::Utf8(string_value_at(array, row_idx, &col.name)?),
392            ColumnKind::FixedSizeBinary(_) => {
393                CellValue::FixedBinary(fixed_binary_value_at(array, row_idx, &col.name)?)
394            }
395            ColumnKind::List(elem) => list_value_at(array, row_idx, &col.name, elem)?,
396        };
397        values.push(value);
398    }
399    Ok(KvRow { values })
400}
401
402pub(crate) fn encode_base_row_value(row: &KvRow, model: &TableModel) -> DataFusionResult<Vec<u8>> {
403    let mut values = Vec::with_capacity(model.columns.len());
404    for (idx, col) in model.columns.iter().enumerate() {
405        if model.is_pk_column(idx) {
406            values.push(None);
407            continue;
408        }
409        values.push(encode_non_pk_cell_value(row.value_at(idx), col)?);
410    }
411    let stored_row = StoredRow { values };
412    Ok(stored_row.encode().to_vec())
413}
414
415pub(crate) fn encode_secondary_index_value(
416    row: &KvRow,
417    model: &TableModel,
418    spec: &ResolvedIndexSpec,
419) -> DataFusionResult<Vec<u8>> {
420    let mut values = Vec::with_capacity(model.columns.len());
421    for (idx, col) in model.columns.iter().enumerate() {
422        if model.is_pk_column(idx) || !spec.value_column_mask[idx] {
423            values.push(None);
424            continue;
425        }
426        values.push(encode_non_pk_cell_value(row.value_at(idx), col)?);
427    }
428    let stored_row = StoredRow { values };
429    Ok(stored_row.encode().to_vec())
430}
431
432pub(crate) fn encode_secondary_index_value_from_archived(
433    archived: &StoredRow,
434    model: &TableModel,
435    spec: &ResolvedIndexSpec,
436) -> DataFusionResult<Vec<u8>> {
437    if archived.values.len() != model.columns.len() {
438        return Err(DataFusionError::Execution(
439            "archived row column count mismatch".to_string(),
440        ));
441    }
442    let mut values = Vec::with_capacity(model.columns.len());
443    for (idx, col) in model.columns.iter().enumerate() {
444        if model.is_pk_column(idx) || !spec.value_column_mask[idx] {
445            values.push(None);
446            continue;
447        }
448        let stored_opt = archived.values.get(idx).and_then(|value| value.as_ref());
449        if !archived_non_pk_value_is_valid(col, stored_opt) {
450            return Err(DataFusionError::Execution(format!(
451                "invalid archived value for secondary index column '{}'",
452                col.name
453            )));
454        }
455        values.push(owned_stored_value_from_archived(stored_opt)?);
456    }
457    let stored_row = StoredRow { values };
458    Ok(stored_row.encode().to_vec())
459}
460
461pub(crate) fn encode_non_pk_cell_value(
462    value: &CellValue,
463    col: &ResolvedColumn,
464) -> DataFusionResult<Option<StoredValue>> {
465    match (col.kind, value) {
466        (_, CellValue::Null) => {
467            if !col.nullable {
468                return Err(DataFusionError::Execution(format!(
469                    "column '{}' is not nullable but received NULL",
470                    col.name
471                )));
472            }
473            Ok(None)
474        }
475        (ColumnKind::Int64, CellValue::Int64(v)) => Ok(Some(StoredValue::Int64(*v))),
476        (ColumnKind::UInt64, CellValue::UInt64(v)) => Ok(Some(StoredValue::UInt64(*v))),
477        (ColumnKind::Float64, CellValue::Float64(v)) => Ok(Some(StoredValue::Float64(*v))),
478        (ColumnKind::Boolean, CellValue::Boolean(v)) => Ok(Some(StoredValue::Boolean(*v))),
479        (ColumnKind::Date32, CellValue::Date32(v)) => Ok(Some(StoredValue::Int64(*v as i64))),
480        (ColumnKind::Date64, CellValue::Date64(v)) => Ok(Some(StoredValue::Int64(*v))),
481        (ColumnKind::Timestamp, CellValue::Timestamp(v)) => Ok(Some(StoredValue::Int64(*v))),
482        (ColumnKind::Decimal128, CellValue::Decimal128(v)) => {
483            Ok(Some(StoredValue::Bytes(v.to_le_bytes().to_vec())))
484        }
485        (ColumnKind::Decimal256, CellValue::Decimal256(v)) => {
486            Ok(Some(StoredValue::Bytes(v.to_le_bytes().to_vec())))
487        }
488        (ColumnKind::Utf8, CellValue::Utf8(v)) => Ok(Some(StoredValue::Utf8(v.clone()))),
489        (ColumnKind::FixedSizeBinary(n), CellValue::FixedBinary(v)) => {
490            if v.len() != n {
491                return Err(DataFusionError::Execution(format!(
492                    "column '{}' expects FixedSizeBinary({n}) value with exactly {n} bytes, got {}",
493                    col.name,
494                    v.len()
495                )));
496            }
497            Ok(Some(StoredValue::Bytes(v.clone())))
498        }
499        (ColumnKind::List(elem), CellValue::List(items)) => {
500            let mut stored_items = Vec::with_capacity(items.len());
501            for item in items {
502                let stored_item = match (elem, item) {
503                    (ListElementKind::Int64, CellValue::Int64(v)) => StoredValue::Int64(*v),
504                    (ListElementKind::Float64, CellValue::Float64(v)) => StoredValue::Float64(*v),
505                    (ListElementKind::Boolean, CellValue::Boolean(v)) => StoredValue::Boolean(*v),
506                    (ListElementKind::Utf8, CellValue::Utf8(v)) => StoredValue::Utf8(v.clone()),
507                    _ => {
508                        return Err(DataFusionError::Execution(format!(
509                            "column '{}' list element type mismatch (expected {:?}, got {:?})",
510                            col.name, elem, item
511                        )))
512                    }
513                };
514                stored_items.push(stored_item);
515            }
516            Ok(Some(StoredValue::List(stored_items)))
517        }
518        _ => Err(DataFusionError::Execution(format!(
519            "column '{}' type mismatch (expected {:?}, got {:?})",
520            col.name, col.kind, value
521        ))),
522    }
523}
524
525pub(crate) fn owned_stored_value_from_archived(
526    stored_opt: Option<&StoredValue>,
527) -> DataFusionResult<Option<StoredValue>> {
528    let Some(stored) = stored_opt else {
529        return Ok(None);
530    };
531    Ok(Some(match stored {
532        StoredValue::Int64(v) => StoredValue::Int64(*v),
533        StoredValue::UInt64(v) => StoredValue::UInt64(*v),
534        StoredValue::Float64(v) => StoredValue::Float64(*v),
535        StoredValue::Boolean(v) => StoredValue::Boolean(*v),
536        StoredValue::Utf8(v) => StoredValue::Utf8(v.as_str().to_string()),
537        StoredValue::Bytes(v) => StoredValue::Bytes(v.as_slice().to_vec()),
538        StoredValue::List(items) => {
539            let mut out = Vec::with_capacity(items.len());
540            for item in items.iter() {
541                let owned = owned_stored_value_from_archived(Some(item))?.ok_or_else(|| {
542                    DataFusionError::Execution(
543                        "archived list item unexpectedly decoded as NULL".to_string(),
544                    )
545                })?;
546                out.push(owned);
547            }
548            StoredValue::List(out)
549        }
550    }))
551}
552
/// Test-only helper that rebuilds a full [`KvRow`] from primary-key cells and an encoded
/// base-row value.
///
/// `pk_values` must hold one cell per primary-key column, in the order of
/// `model.primary_key_indices`. Non-key columns are decoded from `value`; a missing
/// stored value is accepted only for nullable columns and decodes to `CellValue::Null`.
///
/// Returns `None` whenever the input is inconsistent with `model`: wrong number of key
/// cells, undecodable bytes, wrong column count, a stored variant that does not match
/// the column kind, a decimal with the wrong byte width, or a Date32 value that does not
/// fit in `i32`.
#[cfg(test)]
pub(crate) fn decode_base_row(
    pk_values: Vec<CellValue>,
    value: &[u8],
    model: &TableModel,
) -> Option<KvRow> {
    if pk_values.len() != model.primary_key_indices.len() {
        return None;
    }
    let archived = decode_stored_row(value).ok()?;
    if archived.values.len() != model.columns.len() {
        return None;
    }
    let mut values = vec![CellValue::Null; model.columns.len()];
    for (pk_pos, pk_value) in pk_values.into_iter().enumerate() {
        let col_idx = *model.primary_key_indices.get(pk_pos)?;
        values[col_idx] = pk_value;
    }

    for (idx, col) in model.columns.iter().enumerate() {
        if model.is_pk_column(idx) {
            continue;
        }
        // Indexing is in bounds: the column-count check above guarantees it.
        let Some(stored) = archived.values[idx].as_ref() else {
            if col.nullable {
                continue;
            }
            return None;
        };
        values[idx] = match (col.kind, stored) {
            (ColumnKind::Int64, StoredValue::Int64(v)) => CellValue::Int64(*v),
            (ColumnKind::UInt64, StoredValue::UInt64(v)) => CellValue::UInt64(*v),
            (ColumnKind::Float64, StoredValue::Float64(v)) => CellValue::Float64(*v),
            (ColumnKind::Float64, StoredValue::Int64(v)) => CellValue::Float64(*v as f64),
            (ColumnKind::Boolean, StoredValue::Boolean(v)) => CellValue::Boolean(*v),
            (ColumnKind::Date32, StoredValue::Int64(v)) => {
                // Date32 is stored widened to i64; a value outside the i32 range is
                // corrupt and must not be wrapped into an unrelated date.
                CellValue::Date32(i32::try_from(*v).ok()?)
            }
            (ColumnKind::Date64, StoredValue::Int64(v)) => CellValue::Date64(*v),
            (ColumnKind::Timestamp, StoredValue::Int64(v)) => CellValue::Timestamp(*v),
            (ColumnKind::Decimal128, StoredValue::Bytes(bytes)) => {
                let arr: [u8; 16] = bytes.as_slice().try_into().ok()?;
                CellValue::Decimal128(i128::from_le_bytes(arr))
            }
            (ColumnKind::Decimal256, StoredValue::Bytes(bytes)) => {
                let arr: [u8; 32] = bytes.as_slice().try_into().ok()?;
                CellValue::Decimal256(i256::from_le_bytes(arr))
            }
            (ColumnKind::Utf8, StoredValue::Utf8(v)) => CellValue::Utf8(v.as_str().to_string()),
            (ColumnKind::FixedSizeBinary(_), StoredValue::Bytes(v)) => {
                CellValue::FixedBinary(v.as_slice().to_vec())
            }
            (ColumnKind::List(elem), StoredValue::List(items)) => {
                let mut cells = Vec::with_capacity(items.len());
                for item in items.iter() {
                    cells.push(decode_list_element_archived(elem, item)?);
                }
                CellValue::List(cells)
            }
            _ => return None,
        };
    }
    Some(KvRow { values })
}
615
616pub(crate) fn decode_list_element_archived(
617    elem: ListElementKind,
618    stored: &StoredValue,
619) -> Option<CellValue> {
620    Some(match (elem, stored) {
621        (ListElementKind::Int64, StoredValue::Int64(v)) => CellValue::Int64(*v),
622        (ListElementKind::Float64, StoredValue::Float64(v)) => CellValue::Float64(*v),
623        (ListElementKind::Float64, StoredValue::Int64(v)) => CellValue::Float64(*v as f64),
624        (ListElementKind::Boolean, StoredValue::Boolean(v)) => CellValue::Boolean(*v),
625        (ListElementKind::Utf8, StoredValue::Utf8(v)) => CellValue::Utf8(v.as_str().to_string()),
626        _ => return None,
627    })
628}
629
630pub(crate) fn required_column<'a>(
631    batch: &'a RecordBatch,
632    name: &str,
633) -> DataFusionResult<&'a ArrayRef> {
634    batch.column_by_name(name).ok_or_else(|| {
635        DataFusionError::Execution(format!("insert batch is missing required column '{name}'"))
636    })
637}
638
639pub(crate) fn i64_value_at(
640    array: &ArrayRef,
641    row_idx: usize,
642    column_name: &str,
643) -> DataFusionResult<i64> {
644    if array.is_null(row_idx) {
645        return Err(DataFusionError::Execution(format!(
646            "column '{column_name}' cannot be NULL for kv table insert"
647        )));
648    }
649    let values = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
650        DataFusionError::Execution(format!(
651            "column '{column_name}' expected Int64, got {:?}",
652            array.data_type()
653        ))
654    })?;
655    Ok(values.value(row_idx))
656}
657
658pub(crate) fn string_value_at(
659    array: &ArrayRef,
660    row_idx: usize,
661    column_name: &str,
662) -> DataFusionResult<String> {
663    if array.is_null(row_idx) {
664        return Err(DataFusionError::Execution(format!(
665            "column '{column_name}' cannot be NULL for kv table insert"
666        )));
667    }
668    if let Some(values) = array.as_any().downcast_ref::<StringArray>() {
669        return Ok(values.value(row_idx).to_string());
670    }
671    if let Some(values) = array.as_any().downcast_ref::<LargeStringArray>() {
672        return Ok(values.value(row_idx).to_string());
673    }
674    if let Some(values) = array.as_any().downcast_ref::<StringViewArray>() {
675        return Ok(values.value(row_idx).to_string());
676    }
677    Err(DataFusionError::Execution(format!(
678        "column '{column_name}' expected string, got {:?}",
679        array.data_type()
680    )))
681}
682
683pub(crate) fn f64_value_at(
684    array: &ArrayRef,
685    row_idx: usize,
686    column_name: &str,
687) -> DataFusionResult<f64> {
688    if array.is_null(row_idx) {
689        return Err(DataFusionError::Execution(format!(
690            "column '{column_name}' cannot be NULL for kv table insert"
691        )));
692    }
693    let values = array
694        .as_any()
695        .downcast_ref::<Float64Array>()
696        .ok_or_else(|| {
697            DataFusionError::Execution(format!(
698                "column '{column_name}' expected Float64, got {:?}",
699                array.data_type()
700            ))
701        })?;
702    Ok(values.value(row_idx))
703}
704
705pub(crate) fn bool_value_at(
706    array: &ArrayRef,
707    row_idx: usize,
708    column_name: &str,
709) -> DataFusionResult<bool> {
710    if array.is_null(row_idx) {
711        return Err(DataFusionError::Execution(format!(
712            "column '{column_name}' cannot be NULL for kv table insert"
713        )));
714    }
715    let values = array
716        .as_any()
717        .downcast_ref::<BooleanArray>()
718        .ok_or_else(|| {
719            DataFusionError::Execution(format!(
720                "column '{column_name}' expected Boolean, got {:?}",
721                array.data_type()
722            ))
723        })?;
724    Ok(values.value(row_idx))
725}
726
727pub(crate) fn date32_value_at(
728    array: &ArrayRef,
729    row_idx: usize,
730    column_name: &str,
731) -> DataFusionResult<i32> {
732    if array.is_null(row_idx) {
733        return Err(DataFusionError::Execution(format!(
734            "column '{column_name}' cannot be NULL for kv table insert"
735        )));
736    }
737    let values = array
738        .as_any()
739        .downcast_ref::<Date32Array>()
740        .ok_or_else(|| {
741            DataFusionError::Execution(format!(
742                "column '{column_name}' expected Date32, got {:?}",
743                array.data_type()
744            ))
745        })?;
746    Ok(values.value(row_idx))
747}
748
749pub(crate) fn date64_value_at(
750    array: &ArrayRef,
751    row_idx: usize,
752    column_name: &str,
753) -> DataFusionResult<i64> {
754    if array.is_null(row_idx) {
755        return Err(DataFusionError::Execution(format!(
756            "column '{column_name}' cannot be NULL for kv table insert"
757        )));
758    }
759    let values = array
760        .as_any()
761        .downcast_ref::<Date64Array>()
762        .ok_or_else(|| {
763            DataFusionError::Execution(format!(
764                "column '{column_name}' expected Date64, got {:?}",
765                array.data_type()
766            ))
767        })?;
768    Ok(values.value(row_idx))
769}
770
771pub(crate) fn timestamp_micros_value_at(
772    array: &ArrayRef,
773    row_idx: usize,
774    column_name: &str,
775) -> DataFusionResult<i64> {
776    if array.is_null(row_idx) {
777        return Err(DataFusionError::Execution(format!(
778            "column '{column_name}' cannot be NULL for kv table insert"
779        )));
780    }
781    let values = array
782        .as_any()
783        .downcast_ref::<TimestampMicrosecondArray>()
784        .ok_or_else(|| {
785            DataFusionError::Execution(format!(
786                "column '{column_name}' expected TimestampMicrosecond, got {:?}",
787                array.data_type()
788            ))
789        })?;
790    Ok(values.value(row_idx))
791}
792
793pub(crate) fn decimal128_value_at(
794    array: &ArrayRef,
795    row_idx: usize,
796    column_name: &str,
797) -> DataFusionResult<i128> {
798    if array.is_null(row_idx) {
799        return Err(DataFusionError::Execution(format!(
800            "column '{column_name}' cannot be NULL for kv table insert"
801        )));
802    }
803    let values = array
804        .as_any()
805        .downcast_ref::<Decimal128Array>()
806        .ok_or_else(|| {
807            DataFusionError::Execution(format!(
808                "column '{column_name}' expected Decimal128, got {:?}",
809                array.data_type()
810            ))
811        })?;
812    Ok(values.value(row_idx))
813}
814
815pub(crate) fn uint64_value_at(
816    array: &ArrayRef,
817    row_idx: usize,
818    column_name: &str,
819) -> DataFusionResult<u64> {
820    if array.is_null(row_idx) {
821        return Err(DataFusionError::Execution(format!(
822            "column '{column_name}' cannot be NULL for kv table insert"
823        )));
824    }
825    let values = array
826        .as_any()
827        .downcast_ref::<UInt64Array>()
828        .ok_or_else(|| {
829            DataFusionError::Execution(format!(
830                "column '{column_name}' expected UInt64, got {:?}",
831                array.data_type()
832            ))
833        })?;
834    Ok(values.value(row_idx))
835}
836
837pub(crate) fn decimal256_value_at(
838    array: &ArrayRef,
839    row_idx: usize,
840    column_name: &str,
841) -> DataFusionResult<i256> {
842    if array.is_null(row_idx) {
843        return Err(DataFusionError::Execution(format!(
844            "column '{column_name}' cannot be NULL for kv table insert"
845        )));
846    }
847    let values = array
848        .as_any()
849        .downcast_ref::<Decimal256Array>()
850        .ok_or_else(|| {
851            DataFusionError::Execution(format!(
852                "column '{column_name}' expected Decimal256, got {:?}",
853                array.data_type()
854            ))
855        })?;
856    Ok(values.value(row_idx))
857}
858
859pub(crate) fn fixed_binary_value_at(
860    array: &ArrayRef,
861    row_idx: usize,
862    column_name: &str,
863) -> DataFusionResult<Vec<u8>> {
864    if array.is_null(row_idx) {
865        return Err(DataFusionError::Execution(format!(
866            "column '{column_name}' cannot be NULL for kv table insert"
867        )));
868    }
869    let values = array
870        .as_any()
871        .downcast_ref::<FixedSizeBinaryArray>()
872        .ok_or_else(|| {
873            DataFusionError::Execution(format!(
874                "column '{column_name}' expected FixedSizeBinary, got {:?}",
875                array.data_type()
876            ))
877        })?;
878    Ok(values.value(row_idx).to_vec())
879}
880
881pub(crate) fn list_value_at(
882    array: &ArrayRef,
883    row_idx: usize,
884    column_name: &str,
885    elem: ListElementKind,
886) -> DataFusionResult<CellValue> {
887    if array.is_null(row_idx) {
888        return Err(DataFusionError::Execution(format!(
889            "column '{column_name}' cannot be NULL for kv table insert"
890        )));
891    }
892    let list_array = array.as_any().downcast_ref::<ListArray>().ok_or_else(|| {
893        DataFusionError::Execution(format!(
894            "column '{column_name}' expected List, got {:?}",
895            array.data_type()
896        ))
897    })?;
898    let child = list_array.value(row_idx);
899    let mut items = Vec::with_capacity(child.len());
900    for i in 0..child.len() {
901        let item = match elem {
902            ListElementKind::Int64 => {
903                let arr = child.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
904                    DataFusionError::Execution(format!(
905                        "column '{column_name}' list element expected Int64"
906                    ))
907                })?;
908                CellValue::Int64(arr.value(i))
909            }
910            ListElementKind::Float64 => {
911                let arr = child
912                    .as_any()
913                    .downcast_ref::<Float64Array>()
914                    .ok_or_else(|| {
915                        DataFusionError::Execution(format!(
916                            "column '{column_name}' list element expected Float64"
917                        ))
918                    })?;
919                CellValue::Float64(arr.value(i))
920            }
921            ListElementKind::Boolean => {
922                let arr = child
923                    .as_any()
924                    .downcast_ref::<BooleanArray>()
925                    .ok_or_else(|| {
926                        DataFusionError::Execution(format!(
927                            "column '{column_name}' list element expected Boolean"
928                        ))
929                    })?;
930                CellValue::Boolean(arr.value(i))
931            }
932            ListElementKind::Utf8 => {
933                let arr = child
934                    .as_any()
935                    .downcast_ref::<StringArray>()
936                    .ok_or_else(|| {
937                        DataFusionError::Execution(format!(
938                            "column '{column_name}' list element expected Utf8"
939                        ))
940                    })?;
941                CellValue::Utf8(arr.value(i).to_string())
942            }
943        };
944        items.push(item);
945    }
946    Ok(CellValue::List(items))
947}
948
949pub(crate) async fn flush_ingest_batch(
950    client: &StoreClient,
951    keys: &mut Vec<Key>,
952    values: &mut Vec<Vec<u8>>,
953) -> DataFusionResult<u64> {
954    if keys.is_empty() {
955        return Ok(0);
956    }
957    let mut batch = StoreWriteBatch::new();
958    for (key, value) in keys.iter().zip(values.iter()) {
959        batch
960            .push(client, key, value)
961            .map_err(|e| DataFusionError::External(Box::new(e)))?;
962    }
963    let token = batch
964        .commit(client)
965        .await
966        .map_err(|e| DataFusionError::External(Box::new(e)))?;
967    keys.clear();
968    values.clear();
969    Ok(token)
970}