Skip to main content

exoware_sql/
writer.rs

1use std::any::Any;
2use std::collections::HashMap;
3use std::fmt;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use commonware_codec::Encode;
8use datafusion::arrow::array::{
9    ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array, Decimal256Array,
10    FixedSizeBinaryArray, Float64Array, Int64Array, LargeStringArray, ListArray, StringArray,
11    StringViewArray, TimestampMicrosecondArray, UInt64Array,
12};
13use datafusion::arrow::datatypes::{i256, SchemaRef};
14use datafusion::arrow::record_batch::RecordBatch;
15use datafusion::common::{DataFusionError, Result as DataFusionResult};
16use datafusion::datasource::sink::DataSink;
17use datafusion::execution::context::TaskContext;
18use datafusion::physical_plan::{DisplayAs, DisplayFormatType, SendableRecordBatchStream};
19use exoware_sdk_rs::keys::Key;
20#[cfg(test)]
21use exoware_sdk_rs::kv_codec::decode_stored_row;
22use exoware_sdk_rs::kv_codec::{StoredRow, StoredValue};
23use exoware_sdk_rs::StoreClient;
24use futures::TryStreamExt;
25
26use crate::builder::archived_non_pk_value_is_valid;
27use crate::codec::*;
28use crate::types::*;
29
30#[derive(Debug)]
31pub struct TableWriter {
32    model: Arc<TableModel>,
33    index_specs: Arc<Vec<ResolvedIndexSpec>>,
34}
35
36impl TableWriter {
37    pub fn encode_row(&self, values: Vec<CellValue>) -> Result<Vec<(Key, Vec<u8>)>, String> {
38        let row = KvRow { values };
39        if row.values.len() != self.model.columns.len() {
40            return Err(format!(
41                "expected {} values, got {}",
42                self.model.columns.len(),
43                row.values.len()
44            ));
45        }
46        let base_key = encode_primary_key_from_row(self.model.table_prefix, &row, &self.model)?;
47        let base_value = encode_base_row_value(&row, &self.model).map_err(|e| format!("{e}"))?;
48        let mut out = vec![(base_key, base_value)];
49        for spec in self.index_specs.iter() {
50            let idx_key =
51                encode_secondary_index_key(self.model.table_prefix, spec, &self.model, &row)?;
52            let idx_value = encode_secondary_index_value(&row, &self.model, spec)
53                .map_err(|e| format!("{e}"))?;
54            out.push((idx_key, idx_value));
55        }
56        Ok(out)
57    }
58}
59
60#[derive(Debug)]
61pub struct BatchWriter {
62    client: StoreClient,
63    tables: HashMap<String, TableWriter>,
64    pub(crate) pending_keys: Vec<Key>,
65    pub(crate) pending_values: Vec<Vec<u8>>,
66}
67
68impl BatchWriter {
69    pub(crate) fn new(client: StoreClient, table_configs: &[(String, KvTableConfig)]) -> Self {
70        let mut tables = HashMap::new();
71        for (name, config) in table_configs {
72            let model = Arc::new(
73                TableModel::from_config(config).expect("config already validated by KvSchema"),
74            );
75            let index_specs = Arc::new(
76                model
77                    .resolve_index_specs(&config.index_specs)
78                    .expect("specs already validated by KvSchema"),
79            );
80            tables.insert(name.clone(), TableWriter { model, index_specs });
81        }
82        Self {
83            client,
84            tables,
85            pending_keys: Vec::new(),
86            pending_values: Vec::new(),
87        }
88    }
89
90    pub fn insert(
91        &mut self,
92        table_name: &str,
93        values: Vec<CellValue>,
94    ) -> Result<&mut Self, String> {
95        let writer = self
96            .tables
97            .get(table_name)
98            .ok_or_else(|| format!("unknown table '{table_name}'"))?;
99        let entries = writer.encode_row(values)?;
100        for (key, value) in entries {
101            self.pending_keys.push(key);
102            self.pending_values.push(value);
103        }
104        Ok(self)
105    }
106
107    pub fn pending_count(&self) -> usize {
108        self.pending_keys.len()
109    }
110
111    /// Flush pending rows to ingest and return the post-ingest consistency token.
112    pub async fn flush(&mut self) -> DataFusionResult<u64> {
113        if self.pending_keys.is_empty() {
114            return Ok(0);
115        }
116        flush_ingest_batch(
117            &self.client,
118            &mut self.pending_keys,
119            &mut self.pending_values,
120        )
121        .await
122    }
123}
124
125#[derive(Debug)]
126pub(crate) struct KvIngestSink {
127    pub(crate) client: StoreClient,
128    pub(crate) schema: SchemaRef,
129    pub(crate) model: Arc<TableModel>,
130    pub(crate) index_specs: Arc<Vec<ResolvedIndexSpec>>,
131}
132
133impl KvIngestSink {
134    pub(crate) fn new(
135        client: StoreClient,
136        schema: SchemaRef,
137        model: Arc<TableModel>,
138        index_specs: Arc<Vec<ResolvedIndexSpec>>,
139    ) -> Self {
140        Self {
141            client,
142            schema,
143            model,
144            index_specs,
145        }
146    }
147}
148
149impl DisplayAs for KvIngestSink {
150    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
151        write!(f, "KvIngestSink")
152    }
153}
154
155#[async_trait]
156impl DataSink for KvIngestSink {
157    fn as_any(&self) -> &dyn Any {
158        self
159    }
160
161    fn schema(&self) -> &SchemaRef {
162        &self.schema
163    }
164
165    async fn write_all(
166        &self,
167        data: SendableRecordBatchStream,
168        _context: &Arc<TaskContext>,
169    ) -> DataFusionResult<u64> {
170        let mut data = data;
171        let mut pending_keys: Vec<Key> = Vec::new();
172        let mut pending_values: Vec<Vec<u8>> = Vec::new();
173        let mut logical_rows_written = 0u64;
174
175        while let Some(batch) = data.try_next().await? {
176            let encoded_entries = encode_insert_entries(&batch, &self.model, &self.index_specs)?;
177            logical_rows_written += batch.num_rows() as u64;
178            for (key, value) in encoded_entries {
179                pending_keys.push(key);
180                pending_values.push(value);
181            }
182        }
183
184        if !pending_keys.is_empty() {
185            flush_ingest_batch(&self.client, &mut pending_keys, &mut pending_values).await?;
186        }
187        Ok(logical_rows_written)
188    }
189}
190
191pub(crate) fn encode_insert_entries(
192    batch: &RecordBatch,
193    model: &TableModel,
194    index_specs: &[ResolvedIndexSpec],
195) -> DataFusionResult<Vec<(Key, Vec<u8>)>> {
196    let mut out = Vec::with_capacity(batch.num_rows() * (1 + index_specs.len()));
197    for row_idx in 0..batch.num_rows() {
198        let row = extract_row_from_batch(batch, row_idx, model)?;
199        let base_key = encode_primary_key_from_row(model.table_prefix, &row, model)
200            .map_err(DataFusionError::Execution)?;
201        let base_value = encode_base_row_value(&row, model)?;
202        out.push((base_key, base_value));
203
204        for spec in index_specs {
205            let secondary_key = encode_secondary_index_key(model.table_prefix, spec, model, &row)
206                .map_err(DataFusionError::Execution)?;
207            let secondary_value = encode_secondary_index_value(&row, model, spec)?;
208            out.push((secondary_key, secondary_value));
209        }
210    }
211    Ok(out)
212}
213
214pub(crate) fn extract_row_from_batch(
215    batch: &RecordBatch,
216    row_idx: usize,
217    model: &TableModel,
218) -> DataFusionResult<KvRow> {
219    let mut values = Vec::with_capacity(model.columns.len());
220    for col in &model.columns {
221        let array = required_column(batch, &col.name)?;
222        if col.nullable && array.is_null(row_idx) {
223            values.push(CellValue::Null);
224            continue;
225        }
226        let value = match col.kind {
227            ColumnKind::Int64 => CellValue::Int64(i64_value_at(array, row_idx, &col.name)?),
228            ColumnKind::UInt64 => CellValue::UInt64(uint64_value_at(array, row_idx, &col.name)?),
229            ColumnKind::Float64 => CellValue::Float64(f64_value_at(array, row_idx, &col.name)?),
230            ColumnKind::Boolean => CellValue::Boolean(bool_value_at(array, row_idx, &col.name)?),
231            ColumnKind::Date32 => CellValue::Date32(date32_value_at(array, row_idx, &col.name)?),
232            ColumnKind::Date64 => CellValue::Date64(date64_value_at(array, row_idx, &col.name)?),
233            ColumnKind::Timestamp => {
234                CellValue::Timestamp(timestamp_micros_value_at(array, row_idx, &col.name)?)
235            }
236            ColumnKind::Decimal128 => {
237                CellValue::Decimal128(decimal128_value_at(array, row_idx, &col.name)?)
238            }
239            ColumnKind::Decimal256 => {
240                CellValue::Decimal256(decimal256_value_at(array, row_idx, &col.name)?)
241            }
242            ColumnKind::Utf8 => CellValue::Utf8(string_value_at(array, row_idx, &col.name)?),
243            ColumnKind::FixedSizeBinary(_) => {
244                CellValue::FixedBinary(fixed_binary_value_at(array, row_idx, &col.name)?)
245            }
246            ColumnKind::List(elem) => list_value_at(array, row_idx, &col.name, elem)?,
247        };
248        values.push(value);
249    }
250    Ok(KvRow { values })
251}
252
253pub(crate) fn encode_base_row_value(row: &KvRow, model: &TableModel) -> DataFusionResult<Vec<u8>> {
254    let mut values = Vec::with_capacity(model.columns.len());
255    for (idx, col) in model.columns.iter().enumerate() {
256        if model.is_pk_column(idx) {
257            values.push(None);
258            continue;
259        }
260        values.push(encode_non_pk_cell_value(row.value_at(idx), col)?);
261    }
262    let stored_row = StoredRow { values };
263    Ok(stored_row.encode().to_vec())
264}
265
266pub(crate) fn encode_secondary_index_value(
267    row: &KvRow,
268    model: &TableModel,
269    spec: &ResolvedIndexSpec,
270) -> DataFusionResult<Vec<u8>> {
271    let mut values = Vec::with_capacity(model.columns.len());
272    for (idx, col) in model.columns.iter().enumerate() {
273        if model.is_pk_column(idx) || !spec.value_column_mask[idx] {
274            values.push(None);
275            continue;
276        }
277        values.push(encode_non_pk_cell_value(row.value_at(idx), col)?);
278    }
279    let stored_row = StoredRow { values };
280    Ok(stored_row.encode().to_vec())
281}
282
283pub(crate) fn encode_secondary_index_value_from_archived(
284    archived: &StoredRow,
285    model: &TableModel,
286    spec: &ResolvedIndexSpec,
287) -> DataFusionResult<Vec<u8>> {
288    if archived.values.len() != model.columns.len() {
289        return Err(DataFusionError::Execution(
290            "archived row column count mismatch".to_string(),
291        ));
292    }
293    let mut values = Vec::with_capacity(model.columns.len());
294    for (idx, col) in model.columns.iter().enumerate() {
295        if model.is_pk_column(idx) || !spec.value_column_mask[idx] {
296            values.push(None);
297            continue;
298        }
299        let stored_opt = archived.values.get(idx).and_then(|value| value.as_ref());
300        if !archived_non_pk_value_is_valid(col, stored_opt) {
301            return Err(DataFusionError::Execution(format!(
302                "invalid archived value for secondary index column '{}'",
303                col.name
304            )));
305        }
306        values.push(owned_stored_value_from_archived(stored_opt)?);
307    }
308    let stored_row = StoredRow { values };
309    Ok(stored_row.encode().to_vec())
310}
311
312pub(crate) fn encode_non_pk_cell_value(
313    value: &CellValue,
314    col: &ResolvedColumn,
315) -> DataFusionResult<Option<StoredValue>> {
316    match (col.kind, value) {
317        (_, CellValue::Null) => {
318            if !col.nullable {
319                return Err(DataFusionError::Execution(format!(
320                    "column '{}' is not nullable but received NULL",
321                    col.name
322                )));
323            }
324            Ok(None)
325        }
326        (ColumnKind::Int64, CellValue::Int64(v)) => Ok(Some(StoredValue::Int64(*v))),
327        (ColumnKind::UInt64, CellValue::UInt64(v)) => Ok(Some(StoredValue::UInt64(*v))),
328        (ColumnKind::Float64, CellValue::Float64(v)) => Ok(Some(StoredValue::Float64(*v))),
329        (ColumnKind::Boolean, CellValue::Boolean(v)) => Ok(Some(StoredValue::Boolean(*v))),
330        (ColumnKind::Date32, CellValue::Date32(v)) => Ok(Some(StoredValue::Int64(*v as i64))),
331        (ColumnKind::Date64, CellValue::Date64(v)) => Ok(Some(StoredValue::Int64(*v))),
332        (ColumnKind::Timestamp, CellValue::Timestamp(v)) => Ok(Some(StoredValue::Int64(*v))),
333        (ColumnKind::Decimal128, CellValue::Decimal128(v)) => {
334            Ok(Some(StoredValue::Bytes(v.to_le_bytes().to_vec())))
335        }
336        (ColumnKind::Decimal256, CellValue::Decimal256(v)) => {
337            Ok(Some(StoredValue::Bytes(v.to_le_bytes().to_vec())))
338        }
339        (ColumnKind::Utf8, CellValue::Utf8(v)) => Ok(Some(StoredValue::Utf8(v.clone()))),
340        (ColumnKind::FixedSizeBinary(n), CellValue::FixedBinary(v)) => {
341            if v.len() != n {
342                return Err(DataFusionError::Execution(format!(
343                    "column '{}' expects FixedSizeBinary({n}) value with exactly {n} bytes, got {}",
344                    col.name,
345                    v.len()
346                )));
347            }
348            Ok(Some(StoredValue::Bytes(v.clone())))
349        }
350        (ColumnKind::List(elem), CellValue::List(items)) => {
351            let mut stored_items = Vec::with_capacity(items.len());
352            for item in items {
353                let stored_item = match (elem, item) {
354                    (ListElementKind::Int64, CellValue::Int64(v)) => StoredValue::Int64(*v),
355                    (ListElementKind::Float64, CellValue::Float64(v)) => StoredValue::Float64(*v),
356                    (ListElementKind::Boolean, CellValue::Boolean(v)) => StoredValue::Boolean(*v),
357                    (ListElementKind::Utf8, CellValue::Utf8(v)) => StoredValue::Utf8(v.clone()),
358                    _ => {
359                        return Err(DataFusionError::Execution(format!(
360                            "column '{}' list element type mismatch (expected {:?}, got {:?})",
361                            col.name, elem, item
362                        )))
363                    }
364                };
365                stored_items.push(stored_item);
366            }
367            Ok(Some(StoredValue::List(stored_items)))
368        }
369        _ => Err(DataFusionError::Execution(format!(
370            "column '{}' type mismatch (expected {:?}, got {:?})",
371            col.name, col.kind, value
372        ))),
373    }
374}
375
376pub(crate) fn owned_stored_value_from_archived(
377    stored_opt: Option<&StoredValue>,
378) -> DataFusionResult<Option<StoredValue>> {
379    let Some(stored) = stored_opt else {
380        return Ok(None);
381    };
382    Ok(Some(match stored {
383        StoredValue::Int64(v) => StoredValue::Int64(*v),
384        StoredValue::UInt64(v) => StoredValue::UInt64(*v),
385        StoredValue::Float64(v) => StoredValue::Float64(*v),
386        StoredValue::Boolean(v) => StoredValue::Boolean(*v),
387        StoredValue::Utf8(v) => StoredValue::Utf8(v.as_str().to_string()),
388        StoredValue::Bytes(v) => StoredValue::Bytes(v.as_slice().to_vec()),
389        StoredValue::List(items) => {
390            let mut out = Vec::with_capacity(items.len());
391            for item in items.iter() {
392                let owned = owned_stored_value_from_archived(Some(item))?.ok_or_else(|| {
393                    DataFusionError::Execution(
394                        "archived list item unexpectedly decoded as NULL".to_string(),
395                    )
396                })?;
397                out.push(owned);
398            }
399            StoredValue::List(out)
400        }
401    }))
402}
403
/// Test-only inverse of the base-row encoding: rebuilds a [`KvRow`] from the
/// primary-key cells (supplied by the caller) and the stored row bytes.
///
/// `pk_values` must be given in `model.primary_key_indices` order. Non-key slots
/// that are empty decode to `CellValue::Null` when the column is nullable.
///
/// Returns `None` when the input does not describe a valid row for `model`:
/// wrong number of key cells, undecodable bytes, slot-count mismatch, an empty
/// slot in a non-nullable column, a stored variant that does not match the column
/// kind, a Date32 outside the `i32` range, a decimal with the wrong byte width,
/// or a fixed-size binary value whose length differs from the column width.
#[cfg(test)]
pub(crate) fn decode_base_row(
    pk_values: Vec<CellValue>,
    value: &[u8],
    model: &TableModel,
) -> Option<KvRow> {
    if pk_values.len() != model.primary_key_indices.len() {
        return None;
    }
    let archived = decode_stored_row(value).ok()?;
    if archived.values.len() != model.columns.len() {
        return None;
    }
    let mut values = vec![CellValue::Null; model.columns.len()];
    for (pk_pos, pk_value) in pk_values.into_iter().enumerate() {
        let col_idx = *model.primary_key_indices.get(pk_pos)?;
        values[col_idx] = pk_value;
    }

    for (idx, col) in model.columns.iter().enumerate() {
        if model.is_pk_column(idx) {
            continue;
        }
        let Some(stored) = archived.values[idx].as_ref() else {
            if col.nullable {
                // The slot was pre-filled with `CellValue::Null` above.
                continue;
            }
            return None;
        };
        values[idx] = match (col.kind, stored) {
            (ColumnKind::Int64, StoredValue::Int64(v)) => CellValue::Int64(*v),
            (ColumnKind::UInt64, StoredValue::UInt64(v)) => CellValue::UInt64(*v),
            (ColumnKind::Float64, StoredValue::Float64(v)) => CellValue::Float64(*v),
            (ColumnKind::Float64, StoredValue::Int64(v)) => CellValue::Float64(*v as f64),
            (ColumnKind::Boolean, StoredValue::Boolean(v)) => CellValue::Boolean(*v),
            (ColumnKind::Date32, StoredValue::Int64(v)) => {
                // Date32 is widened to i64 when stored; a value that no longer fits
                // in i32 is corrupt and must not be silently wrapped.
                CellValue::Date32(i32::try_from(*v).ok()?)
            }
            (ColumnKind::Date64, StoredValue::Int64(v)) => CellValue::Date64(*v),
            (ColumnKind::Timestamp, StoredValue::Int64(v)) => CellValue::Timestamp(*v),
            (ColumnKind::Decimal128, StoredValue::Bytes(bytes)) => {
                let arr: [u8; 16] = bytes.as_slice().try_into().ok()?;
                CellValue::Decimal128(i128::from_le_bytes(arr))
            }
            (ColumnKind::Decimal256, StoredValue::Bytes(bytes)) => {
                let arr: [u8; 32] = bytes.as_slice().try_into().ok()?;
                CellValue::Decimal256(i256::from_le_bytes(arr))
            }
            (ColumnKind::Utf8, StoredValue::Utf8(v)) => CellValue::Utf8(v.as_str().to_string()),
            (ColumnKind::FixedSizeBinary(width), StoredValue::Bytes(v)) => {
                // Mirror the width check the encoder applies on the way in.
                if v.len() != width {
                    return None;
                }
                CellValue::FixedBinary(v.as_slice().to_vec())
            }
            (ColumnKind::List(elem), StoredValue::List(items)) => {
                let mut cells = Vec::with_capacity(items.len());
                for item in items.iter() {
                    cells.push(decode_list_element_archived(elem, item)?);
                }
                CellValue::List(cells)
            }
            _ => return None,
        };
    }
    Some(KvRow { values })
}
466
467pub(crate) fn decode_list_element_archived(
468    elem: ListElementKind,
469    stored: &StoredValue,
470) -> Option<CellValue> {
471    Some(match (elem, stored) {
472        (ListElementKind::Int64, StoredValue::Int64(v)) => CellValue::Int64(*v),
473        (ListElementKind::Float64, StoredValue::Float64(v)) => CellValue::Float64(*v),
474        (ListElementKind::Float64, StoredValue::Int64(v)) => CellValue::Float64(*v as f64),
475        (ListElementKind::Boolean, StoredValue::Boolean(v)) => CellValue::Boolean(*v),
476        (ListElementKind::Utf8, StoredValue::Utf8(v)) => CellValue::Utf8(v.as_str().to_string()),
477        _ => return None,
478    })
479}
480
481pub(crate) fn required_column<'a>(
482    batch: &'a RecordBatch,
483    name: &str,
484) -> DataFusionResult<&'a ArrayRef> {
485    batch.column_by_name(name).ok_or_else(|| {
486        DataFusionError::Execution(format!("insert batch is missing required column '{name}'"))
487    })
488}
489
490pub(crate) fn i64_value_at(
491    array: &ArrayRef,
492    row_idx: usize,
493    column_name: &str,
494) -> DataFusionResult<i64> {
495    if array.is_null(row_idx) {
496        return Err(DataFusionError::Execution(format!(
497            "column '{column_name}' cannot be NULL for kv table insert"
498        )));
499    }
500    let values = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
501        DataFusionError::Execution(format!(
502            "column '{column_name}' expected Int64, got {:?}",
503            array.data_type()
504        ))
505    })?;
506    Ok(values.value(row_idx))
507}
508
509pub(crate) fn string_value_at(
510    array: &ArrayRef,
511    row_idx: usize,
512    column_name: &str,
513) -> DataFusionResult<String> {
514    if array.is_null(row_idx) {
515        return Err(DataFusionError::Execution(format!(
516            "column '{column_name}' cannot be NULL for kv table insert"
517        )));
518    }
519    if let Some(values) = array.as_any().downcast_ref::<StringArray>() {
520        return Ok(values.value(row_idx).to_string());
521    }
522    if let Some(values) = array.as_any().downcast_ref::<LargeStringArray>() {
523        return Ok(values.value(row_idx).to_string());
524    }
525    if let Some(values) = array.as_any().downcast_ref::<StringViewArray>() {
526        return Ok(values.value(row_idx).to_string());
527    }
528    Err(DataFusionError::Execution(format!(
529        "column '{column_name}' expected string, got {:?}",
530        array.data_type()
531    )))
532}
533
534pub(crate) fn f64_value_at(
535    array: &ArrayRef,
536    row_idx: usize,
537    column_name: &str,
538) -> DataFusionResult<f64> {
539    if array.is_null(row_idx) {
540        return Err(DataFusionError::Execution(format!(
541            "column '{column_name}' cannot be NULL for kv table insert"
542        )));
543    }
544    let values = array
545        .as_any()
546        .downcast_ref::<Float64Array>()
547        .ok_or_else(|| {
548            DataFusionError::Execution(format!(
549                "column '{column_name}' expected Float64, got {:?}",
550                array.data_type()
551            ))
552        })?;
553    Ok(values.value(row_idx))
554}
555
556pub(crate) fn bool_value_at(
557    array: &ArrayRef,
558    row_idx: usize,
559    column_name: &str,
560) -> DataFusionResult<bool> {
561    if array.is_null(row_idx) {
562        return Err(DataFusionError::Execution(format!(
563            "column '{column_name}' cannot be NULL for kv table insert"
564        )));
565    }
566    let values = array
567        .as_any()
568        .downcast_ref::<BooleanArray>()
569        .ok_or_else(|| {
570            DataFusionError::Execution(format!(
571                "column '{column_name}' expected Boolean, got {:?}",
572                array.data_type()
573            ))
574        })?;
575    Ok(values.value(row_idx))
576}
577
578pub(crate) fn date32_value_at(
579    array: &ArrayRef,
580    row_idx: usize,
581    column_name: &str,
582) -> DataFusionResult<i32> {
583    if array.is_null(row_idx) {
584        return Err(DataFusionError::Execution(format!(
585            "column '{column_name}' cannot be NULL for kv table insert"
586        )));
587    }
588    let values = array
589        .as_any()
590        .downcast_ref::<Date32Array>()
591        .ok_or_else(|| {
592            DataFusionError::Execution(format!(
593                "column '{column_name}' expected Date32, got {:?}",
594                array.data_type()
595            ))
596        })?;
597    Ok(values.value(row_idx))
598}
599
600pub(crate) fn date64_value_at(
601    array: &ArrayRef,
602    row_idx: usize,
603    column_name: &str,
604) -> DataFusionResult<i64> {
605    if array.is_null(row_idx) {
606        return Err(DataFusionError::Execution(format!(
607            "column '{column_name}' cannot be NULL for kv table insert"
608        )));
609    }
610    let values = array
611        .as_any()
612        .downcast_ref::<Date64Array>()
613        .ok_or_else(|| {
614            DataFusionError::Execution(format!(
615                "column '{column_name}' expected Date64, got {:?}",
616                array.data_type()
617            ))
618        })?;
619    Ok(values.value(row_idx))
620}
621
622pub(crate) fn timestamp_micros_value_at(
623    array: &ArrayRef,
624    row_idx: usize,
625    column_name: &str,
626) -> DataFusionResult<i64> {
627    if array.is_null(row_idx) {
628        return Err(DataFusionError::Execution(format!(
629            "column '{column_name}' cannot be NULL for kv table insert"
630        )));
631    }
632    let values = array
633        .as_any()
634        .downcast_ref::<TimestampMicrosecondArray>()
635        .ok_or_else(|| {
636            DataFusionError::Execution(format!(
637                "column '{column_name}' expected TimestampMicrosecond, got {:?}",
638                array.data_type()
639            ))
640        })?;
641    Ok(values.value(row_idx))
642}
643
644pub(crate) fn decimal128_value_at(
645    array: &ArrayRef,
646    row_idx: usize,
647    column_name: &str,
648) -> DataFusionResult<i128> {
649    if array.is_null(row_idx) {
650        return Err(DataFusionError::Execution(format!(
651            "column '{column_name}' cannot be NULL for kv table insert"
652        )));
653    }
654    let values = array
655        .as_any()
656        .downcast_ref::<Decimal128Array>()
657        .ok_or_else(|| {
658            DataFusionError::Execution(format!(
659                "column '{column_name}' expected Decimal128, got {:?}",
660                array.data_type()
661            ))
662        })?;
663    Ok(values.value(row_idx))
664}
665
666pub(crate) fn uint64_value_at(
667    array: &ArrayRef,
668    row_idx: usize,
669    column_name: &str,
670) -> DataFusionResult<u64> {
671    if array.is_null(row_idx) {
672        return Err(DataFusionError::Execution(format!(
673            "column '{column_name}' cannot be NULL for kv table insert"
674        )));
675    }
676    let values = array
677        .as_any()
678        .downcast_ref::<UInt64Array>()
679        .ok_or_else(|| {
680            DataFusionError::Execution(format!(
681                "column '{column_name}' expected UInt64, got {:?}",
682                array.data_type()
683            ))
684        })?;
685    Ok(values.value(row_idx))
686}
687
688pub(crate) fn decimal256_value_at(
689    array: &ArrayRef,
690    row_idx: usize,
691    column_name: &str,
692) -> DataFusionResult<i256> {
693    if array.is_null(row_idx) {
694        return Err(DataFusionError::Execution(format!(
695            "column '{column_name}' cannot be NULL for kv table insert"
696        )));
697    }
698    let values = array
699        .as_any()
700        .downcast_ref::<Decimal256Array>()
701        .ok_or_else(|| {
702            DataFusionError::Execution(format!(
703                "column '{column_name}' expected Decimal256, got {:?}",
704                array.data_type()
705            ))
706        })?;
707    Ok(values.value(row_idx))
708}
709
710pub(crate) fn fixed_binary_value_at(
711    array: &ArrayRef,
712    row_idx: usize,
713    column_name: &str,
714) -> DataFusionResult<Vec<u8>> {
715    if array.is_null(row_idx) {
716        return Err(DataFusionError::Execution(format!(
717            "column '{column_name}' cannot be NULL for kv table insert"
718        )));
719    }
720    let values = array
721        .as_any()
722        .downcast_ref::<FixedSizeBinaryArray>()
723        .ok_or_else(|| {
724            DataFusionError::Execution(format!(
725                "column '{column_name}' expected FixedSizeBinary, got {:?}",
726                array.data_type()
727            ))
728        })?;
729    Ok(values.value(row_idx).to_vec())
730}
731
732pub(crate) fn list_value_at(
733    array: &ArrayRef,
734    row_idx: usize,
735    column_name: &str,
736    elem: ListElementKind,
737) -> DataFusionResult<CellValue> {
738    if array.is_null(row_idx) {
739        return Err(DataFusionError::Execution(format!(
740            "column '{column_name}' cannot be NULL for kv table insert"
741        )));
742    }
743    let list_array = array.as_any().downcast_ref::<ListArray>().ok_or_else(|| {
744        DataFusionError::Execution(format!(
745            "column '{column_name}' expected List, got {:?}",
746            array.data_type()
747        ))
748    })?;
749    let child = list_array.value(row_idx);
750    let mut items = Vec::with_capacity(child.len());
751    for i in 0..child.len() {
752        let item = match elem {
753            ListElementKind::Int64 => {
754                let arr = child.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
755                    DataFusionError::Execution(format!(
756                        "column '{column_name}' list element expected Int64"
757                    ))
758                })?;
759                CellValue::Int64(arr.value(i))
760            }
761            ListElementKind::Float64 => {
762                let arr = child
763                    .as_any()
764                    .downcast_ref::<Float64Array>()
765                    .ok_or_else(|| {
766                        DataFusionError::Execution(format!(
767                            "column '{column_name}' list element expected Float64"
768                        ))
769                    })?;
770                CellValue::Float64(arr.value(i))
771            }
772            ListElementKind::Boolean => {
773                let arr = child
774                    .as_any()
775                    .downcast_ref::<BooleanArray>()
776                    .ok_or_else(|| {
777                        DataFusionError::Execution(format!(
778                            "column '{column_name}' list element expected Boolean"
779                        ))
780                    })?;
781                CellValue::Boolean(arr.value(i))
782            }
783            ListElementKind::Utf8 => {
784                let arr = child
785                    .as_any()
786                    .downcast_ref::<StringArray>()
787                    .ok_or_else(|| {
788                        DataFusionError::Execution(format!(
789                            "column '{column_name}' list element expected Utf8"
790                        ))
791                    })?;
792                CellValue::Utf8(arr.value(i).to_string())
793            }
794        };
795        items.push(item);
796    }
797    Ok(CellValue::List(items))
798}
799
800pub(crate) async fn flush_ingest_batch(
801    client: &StoreClient,
802    keys: &mut Vec<Key>,
803    values: &mut Vec<Vec<u8>>,
804) -> DataFusionResult<u64> {
805    if keys.is_empty() {
806        return Ok(0);
807    }
808    let refs: Vec<(&Key, &[u8])> = keys
809        .iter()
810        .zip(values.iter())
811        .map(|(key, value)| (key, value.as_slice()))
812        .collect();
813    let token = client
814        .put(&refs)
815        .await
816        .map_err(|e| DataFusionError::External(Box::new(e)))?;
817    keys.clear();
818    values.clear();
819    Ok(token)
820}