1use std::any::Any;
2use std::collections::HashMap;
3use std::fmt;
4use std::sync::Arc;
5use std::sync::Mutex;
6
7use async_trait::async_trait;
8use commonware_codec::Encode;
9use datafusion::arrow::array::{
10 ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array, Decimal256Array,
11 FixedSizeBinaryArray, Float64Array, Int64Array, LargeStringArray, ListArray, StringArray,
12 StringViewArray, TimestampMicrosecondArray, UInt64Array,
13};
14use datafusion::arrow::datatypes::{i256, SchemaRef};
15use datafusion::arrow::record_batch::RecordBatch;
16use datafusion::common::{DataFusionError, Result as DataFusionResult};
17use datafusion::datasource::sink::DataSink;
18use datafusion::execution::context::TaskContext;
19use datafusion::physical_plan::{DisplayAs, DisplayFormatType, SendableRecordBatchStream};
20use exoware_sdk::keys::Key;
21#[cfg(test)]
22use exoware_sdk::kv_codec::decode_stored_row;
23use exoware_sdk::kv_codec::{StoredRow, StoredValue};
24use exoware_sdk::{StoreBatchUpload, StoreClient, StoreWriteBatch};
25use futures::{future::BoxFuture, TryStreamExt};
26
27use crate::builder::archived_non_pk_value_is_valid;
28use crate::codec::*;
29use crate::types::*;
30
31#[derive(Debug)]
32pub struct TableWriter {
33 model: Arc<TableModel>,
34 index_specs: Arc<Vec<ResolvedIndexSpec>>,
35}
36
37impl TableWriter {
38 pub fn encode_row(&self, values: Vec<CellValue>) -> Result<Vec<(Key, Vec<u8>)>, String> {
39 let row = KvRow { values };
40 if row.values.len() != self.model.columns.len() {
41 return Err(format!(
42 "expected {} values, got {}",
43 self.model.columns.len(),
44 row.values.len()
45 ));
46 }
47 let base_key = encode_primary_key_from_row(self.model.table_prefix, &row, &self.model)?;
48 let base_value = encode_base_row_value(&row, &self.model).map_err(|e| format!("{e}"))?;
49 let mut out = vec![(base_key, base_value)];
50 for spec in self.index_specs.iter() {
51 let idx_key =
52 encode_secondary_index_key(self.model.table_prefix, spec, &self.model, &row)?;
53 let idx_value = encode_secondary_index_value(&row, &self.model, spec)
54 .map_err(|e| format!("{e}"))?;
55 out.push((idx_key, idx_value));
56 }
57 Ok(out)
58 }
59}
60
61#[derive(Debug)]
62pub struct BatchWriter {
63 client: StoreClient,
64 tables: HashMap<String, TableWriter>,
65 next_request_id: u64,
66 failed_prepared: Mutex<Vec<PreparedBatch>>,
67 pub(crate) pending_keys: Vec<Key>,
68 pub(crate) pending_values: Vec<Vec<u8>>,
69}
70
71#[derive(Debug)]
72#[must_use]
73pub struct PreparedBatch {
74 request_id: u64,
75 keys: Vec<Key>,
76 values: Vec<Vec<u8>>,
77}
78
79impl PreparedBatch {
80 pub fn request_id(&self) -> u64 {
81 self.request_id
82 }
83
84 pub fn entry_count(&self) -> usize {
85 self.keys.len()
86 }
87
88 pub fn is_empty(&self) -> bool {
89 self.keys.is_empty()
90 }
91}
92
/// Outcome of a successfully persisted [`PreparedBatch`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BatchReceipt {
    /// Request id of the batch that was persisted.
    pub writer_request_id: u64,
    /// Number of key/value entries in that batch.
    pub entry_count: usize,
    /// Sequence number reported by the store for the commit.
    pub store_sequence_number: u64,
}
99
100impl BatchWriter {
101 pub(crate) fn new(client: StoreClient, table_configs: &[(String, KvTableConfig)]) -> Self {
102 let mut tables = HashMap::new();
103 for (name, config) in table_configs {
104 let model = Arc::new(
105 TableModel::from_config(config).expect("config already validated by KvSchema"),
106 );
107 let index_specs = Arc::new(
108 model
109 .resolve_index_specs(&config.index_specs)
110 .expect("specs already validated by KvSchema"),
111 );
112 tables.insert(name.clone(), TableWriter { model, index_specs });
113 }
114 Self {
115 client,
116 tables,
117 next_request_id: 0,
118 failed_prepared: Mutex::new(Vec::new()),
119 pending_keys: Vec::new(),
120 pending_values: Vec::new(),
121 }
122 }
123
124 pub fn insert(
125 &mut self,
126 table_name: &str,
127 values: Vec<CellValue>,
128 ) -> Result<&mut Self, String> {
129 let writer = self
130 .tables
131 .get(table_name)
132 .ok_or_else(|| format!("unknown table '{table_name}'"))?;
133 let entries = writer.encode_row(values)?;
134 for (key, value) in entries {
135 self.pending_keys.push(key);
136 self.pending_values.push(value);
137 }
138 Ok(self)
139 }
140
141 pub fn pending_count(&self) -> usize {
142 self.pending_keys.len()
143 + self
144 .failed_prepared
145 .lock()
146 .expect("failed prepared mutex poisoned")
147 .iter()
148 .map(PreparedBatch::entry_count)
149 .sum::<usize>()
150 }
151
152 pub async fn flush(&mut self) -> DataFusionResult<u64> {
154 Ok(self
155 .flush_with_receipt()
156 .await?
157 .map(|receipt| receipt.store_sequence_number)
158 .unwrap_or(0))
159 }
160
161 pub async fn flush_with_receipt(&mut self) -> DataFusionResult<Option<BatchReceipt>> {
163 let Some(prepared) = self.prepare_flush()? else {
164 return Ok(None);
165 };
166 Ok(Some(self.commit_upload(&self.client, prepared).await?))
167 }
168
169 pub fn prepare_flush(&mut self) -> DataFusionResult<Option<PreparedBatch>> {
170 if let Some(prepared) = self.take_failed_prepared() {
171 return Ok(Some(prepared));
172 }
173 if self.pending_keys.is_empty() {
174 return Ok(None);
175 }
176 let request_id = self.next_request_id;
177 self.next_request_id += 1;
178 Ok(Some(PreparedBatch {
179 request_id,
180 keys: std::mem::take(&mut self.pending_keys),
181 values: std::mem::take(&mut self.pending_values),
182 }))
183 }
184
185 pub fn stage_flush(
186 &self,
187 prepared: &PreparedBatch,
188 batch: &mut StoreWriteBatch,
189 ) -> DataFusionResult<()> {
190 for (key, value) in prepared.keys.iter().zip(prepared.values.iter()) {
191 batch
192 .push(&self.client, key, value)
193 .map_err(|e| DataFusionError::External(Box::new(e)))?;
194 }
195 Ok(())
196 }
197
198 pub fn mark_flush_persisted(
199 &self,
200 prepared: PreparedBatch,
201 sequence_number: u64,
202 ) -> BatchReceipt {
203 BatchReceipt {
204 writer_request_id: prepared.request_id,
205 entry_count: prepared.entry_count(),
206 store_sequence_number: sequence_number,
207 }
208 }
209
210 pub fn mark_flush_failed(&self, prepared: PreparedBatch) {
211 self.failed_prepared
212 .lock()
213 .expect("failed prepared mutex poisoned")
214 .push(prepared);
215 }
216
217 fn take_failed_prepared(&self) -> Option<PreparedBatch> {
218 let mut failed = self
219 .failed_prepared
220 .lock()
221 .expect("failed prepared mutex poisoned");
222 let (idx, _) = failed
223 .iter()
224 .enumerate()
225 .min_by_key(|(_, prepared)| prepared.request_id)?;
226 Some(failed.remove(idx))
227 }
228}
229
230impl StoreBatchUpload for BatchWriter {
231 type Prepared = PreparedBatch;
232 type Receipt = BatchReceipt;
233 type Error = DataFusionError;
234
235 fn stage_upload(
236 &self,
237 prepared: &Self::Prepared,
238 batch: &mut StoreWriteBatch,
239 ) -> Result<(), Self::Error> {
240 self.stage_flush(prepared, batch)
241 }
242
243 fn commit_error(&self, error: exoware_sdk::ClientError) -> Self::Error {
244 DataFusionError::External(Box::new(error))
245 }
246
247 fn mark_upload_persisted<'a>(
248 &'a self,
249 prepared: Self::Prepared,
250 sequence_number: u64,
251 ) -> BoxFuture<'a, Self::Receipt>
252 where
253 Self: Sync + 'a,
254 Self::Prepared: 'a,
255 {
256 Box::pin(async move { self.mark_flush_persisted(prepared, sequence_number) })
257 }
258
259 fn mark_upload_failed<'a>(
260 &'a self,
261 prepared: Self::Prepared,
262 _error: String,
263 ) -> BoxFuture<'a, ()>
264 where
265 Self: Sync + 'a,
266 Self::Prepared: 'a,
267 {
268 Box::pin(async move {
269 self.mark_flush_failed(prepared);
270 })
271 }
272}
273
274#[derive(Debug)]
275pub(crate) struct KvIngestSink {
276 pub(crate) client: StoreClient,
277 pub(crate) schema: SchemaRef,
278 pub(crate) model: Arc<TableModel>,
279 pub(crate) index_specs: Arc<Vec<ResolvedIndexSpec>>,
280}
281
282impl KvIngestSink {
283 pub(crate) fn new(
284 client: StoreClient,
285 schema: SchemaRef,
286 model: Arc<TableModel>,
287 index_specs: Arc<Vec<ResolvedIndexSpec>>,
288 ) -> Self {
289 Self {
290 client,
291 schema,
292 model,
293 index_specs,
294 }
295 }
296}
297
298impl DisplayAs for KvIngestSink {
299 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
300 write!(f, "KvIngestSink")
301 }
302}
303
304#[async_trait]
305impl DataSink for KvIngestSink {
306 fn as_any(&self) -> &dyn Any {
307 self
308 }
309
310 fn schema(&self) -> &SchemaRef {
311 &self.schema
312 }
313
314 async fn write_all(
315 &self,
316 data: SendableRecordBatchStream,
317 _context: &Arc<TaskContext>,
318 ) -> DataFusionResult<u64> {
319 let mut data = data;
320 let mut pending_keys: Vec<Key> = Vec::new();
321 let mut pending_values: Vec<Vec<u8>> = Vec::new();
322 let mut logical_rows_written = 0u64;
323
324 while let Some(batch) = data.try_next().await? {
325 let encoded_entries = encode_insert_entries(&batch, &self.model, &self.index_specs)?;
326 logical_rows_written += batch.num_rows() as u64;
327 for (key, value) in encoded_entries {
328 pending_keys.push(key);
329 pending_values.push(value);
330 }
331 }
332
333 if !pending_keys.is_empty() {
334 flush_ingest_batch(&self.client, &mut pending_keys, &mut pending_values).await?;
335 }
336 Ok(logical_rows_written)
337 }
338}
339
340pub(crate) fn encode_insert_entries(
341 batch: &RecordBatch,
342 model: &TableModel,
343 index_specs: &[ResolvedIndexSpec],
344) -> DataFusionResult<Vec<(Key, Vec<u8>)>> {
345 let mut out = Vec::with_capacity(batch.num_rows() * (1 + index_specs.len()));
346 for row_idx in 0..batch.num_rows() {
347 let row = extract_row_from_batch(batch, row_idx, model)?;
348 let base_key = encode_primary_key_from_row(model.table_prefix, &row, model)
349 .map_err(DataFusionError::Execution)?;
350 let base_value = encode_base_row_value(&row, model)?;
351 out.push((base_key, base_value));
352
353 for spec in index_specs {
354 let secondary_key = encode_secondary_index_key(model.table_prefix, spec, model, &row)
355 .map_err(DataFusionError::Execution)?;
356 let secondary_value = encode_secondary_index_value(&row, model, spec)?;
357 out.push((secondary_key, secondary_value));
358 }
359 }
360 Ok(out)
361}
362
363pub(crate) fn extract_row_from_batch(
364 batch: &RecordBatch,
365 row_idx: usize,
366 model: &TableModel,
367) -> DataFusionResult<KvRow> {
368 let mut values = Vec::with_capacity(model.columns.len());
369 for col in &model.columns {
370 let array = required_column(batch, &col.name)?;
371 if col.nullable && array.is_null(row_idx) {
372 values.push(CellValue::Null);
373 continue;
374 }
375 let value = match col.kind {
376 ColumnKind::Int64 => CellValue::Int64(i64_value_at(array, row_idx, &col.name)?),
377 ColumnKind::UInt64 => CellValue::UInt64(uint64_value_at(array, row_idx, &col.name)?),
378 ColumnKind::Float64 => CellValue::Float64(f64_value_at(array, row_idx, &col.name)?),
379 ColumnKind::Boolean => CellValue::Boolean(bool_value_at(array, row_idx, &col.name)?),
380 ColumnKind::Date32 => CellValue::Date32(date32_value_at(array, row_idx, &col.name)?),
381 ColumnKind::Date64 => CellValue::Date64(date64_value_at(array, row_idx, &col.name)?),
382 ColumnKind::Timestamp => {
383 CellValue::Timestamp(timestamp_micros_value_at(array, row_idx, &col.name)?)
384 }
385 ColumnKind::Decimal128 => {
386 CellValue::Decimal128(decimal128_value_at(array, row_idx, &col.name)?)
387 }
388 ColumnKind::Decimal256 => {
389 CellValue::Decimal256(decimal256_value_at(array, row_idx, &col.name)?)
390 }
391 ColumnKind::Utf8 => CellValue::Utf8(string_value_at(array, row_idx, &col.name)?),
392 ColumnKind::FixedSizeBinary(_) => {
393 CellValue::FixedBinary(fixed_binary_value_at(array, row_idx, &col.name)?)
394 }
395 ColumnKind::List(elem) => list_value_at(array, row_idx, &col.name, elem)?,
396 };
397 values.push(value);
398 }
399 Ok(KvRow { values })
400}
401
402pub(crate) fn encode_base_row_value(row: &KvRow, model: &TableModel) -> DataFusionResult<Vec<u8>> {
403 let mut values = Vec::with_capacity(model.columns.len());
404 for (idx, col) in model.columns.iter().enumerate() {
405 if model.is_pk_column(idx) {
406 values.push(None);
407 continue;
408 }
409 values.push(encode_non_pk_cell_value(row.value_at(idx), col)?);
410 }
411 let stored_row = StoredRow { values };
412 Ok(stored_row.encode().to_vec())
413}
414
415pub(crate) fn encode_secondary_index_value(
416 row: &KvRow,
417 model: &TableModel,
418 spec: &ResolvedIndexSpec,
419) -> DataFusionResult<Vec<u8>> {
420 let mut values = Vec::with_capacity(model.columns.len());
421 for (idx, col) in model.columns.iter().enumerate() {
422 if model.is_pk_column(idx) || !spec.value_column_mask[idx] {
423 values.push(None);
424 continue;
425 }
426 values.push(encode_non_pk_cell_value(row.value_at(idx), col)?);
427 }
428 let stored_row = StoredRow { values };
429 Ok(stored_row.encode().to_vec())
430}
431
432pub(crate) fn encode_secondary_index_value_from_archived(
433 archived: &StoredRow,
434 model: &TableModel,
435 spec: &ResolvedIndexSpec,
436) -> DataFusionResult<Vec<u8>> {
437 if archived.values.len() != model.columns.len() {
438 return Err(DataFusionError::Execution(
439 "archived row column count mismatch".to_string(),
440 ));
441 }
442 let mut values = Vec::with_capacity(model.columns.len());
443 for (idx, col) in model.columns.iter().enumerate() {
444 if model.is_pk_column(idx) || !spec.value_column_mask[idx] {
445 values.push(None);
446 continue;
447 }
448 let stored_opt = archived.values.get(idx).and_then(|value| value.as_ref());
449 if !archived_non_pk_value_is_valid(col, stored_opt) {
450 return Err(DataFusionError::Execution(format!(
451 "invalid archived value for secondary index column '{}'",
452 col.name
453 )));
454 }
455 values.push(owned_stored_value_from_archived(stored_opt)?);
456 }
457 let stored_row = StoredRow { values };
458 Ok(stored_row.encode().to_vec())
459}
460
461pub(crate) fn encode_non_pk_cell_value(
462 value: &CellValue,
463 col: &ResolvedColumn,
464) -> DataFusionResult<Option<StoredValue>> {
465 match (col.kind, value) {
466 (_, CellValue::Null) => {
467 if !col.nullable {
468 return Err(DataFusionError::Execution(format!(
469 "column '{}' is not nullable but received NULL",
470 col.name
471 )));
472 }
473 Ok(None)
474 }
475 (ColumnKind::Int64, CellValue::Int64(v)) => Ok(Some(StoredValue::Int64(*v))),
476 (ColumnKind::UInt64, CellValue::UInt64(v)) => Ok(Some(StoredValue::UInt64(*v))),
477 (ColumnKind::Float64, CellValue::Float64(v)) => Ok(Some(StoredValue::Float64(*v))),
478 (ColumnKind::Boolean, CellValue::Boolean(v)) => Ok(Some(StoredValue::Boolean(*v))),
479 (ColumnKind::Date32, CellValue::Date32(v)) => Ok(Some(StoredValue::Int64(*v as i64))),
480 (ColumnKind::Date64, CellValue::Date64(v)) => Ok(Some(StoredValue::Int64(*v))),
481 (ColumnKind::Timestamp, CellValue::Timestamp(v)) => Ok(Some(StoredValue::Int64(*v))),
482 (ColumnKind::Decimal128, CellValue::Decimal128(v)) => {
483 Ok(Some(StoredValue::Bytes(v.to_le_bytes().to_vec())))
484 }
485 (ColumnKind::Decimal256, CellValue::Decimal256(v)) => {
486 Ok(Some(StoredValue::Bytes(v.to_le_bytes().to_vec())))
487 }
488 (ColumnKind::Utf8, CellValue::Utf8(v)) => Ok(Some(StoredValue::Utf8(v.clone()))),
489 (ColumnKind::FixedSizeBinary(n), CellValue::FixedBinary(v)) => {
490 if v.len() != n {
491 return Err(DataFusionError::Execution(format!(
492 "column '{}' expects FixedSizeBinary({n}) value with exactly {n} bytes, got {}",
493 col.name,
494 v.len()
495 )));
496 }
497 Ok(Some(StoredValue::Bytes(v.clone())))
498 }
499 (ColumnKind::List(elem), CellValue::List(items)) => {
500 let mut stored_items = Vec::with_capacity(items.len());
501 for item in items {
502 let stored_item = match (elem, item) {
503 (ListElementKind::Int64, CellValue::Int64(v)) => StoredValue::Int64(*v),
504 (ListElementKind::Float64, CellValue::Float64(v)) => StoredValue::Float64(*v),
505 (ListElementKind::Boolean, CellValue::Boolean(v)) => StoredValue::Boolean(*v),
506 (ListElementKind::Utf8, CellValue::Utf8(v)) => StoredValue::Utf8(v.clone()),
507 _ => {
508 return Err(DataFusionError::Execution(format!(
509 "column '{}' list element type mismatch (expected {:?}, got {:?})",
510 col.name, elem, item
511 )))
512 }
513 };
514 stored_items.push(stored_item);
515 }
516 Ok(Some(StoredValue::List(stored_items)))
517 }
518 _ => Err(DataFusionError::Execution(format!(
519 "column '{}' type mismatch (expected {:?}, got {:?})",
520 col.name, col.kind, value
521 ))),
522 }
523}
524
525pub(crate) fn owned_stored_value_from_archived(
526 stored_opt: Option<&StoredValue>,
527) -> DataFusionResult<Option<StoredValue>> {
528 let Some(stored) = stored_opt else {
529 return Ok(None);
530 };
531 Ok(Some(match stored {
532 StoredValue::Int64(v) => StoredValue::Int64(*v),
533 StoredValue::UInt64(v) => StoredValue::UInt64(*v),
534 StoredValue::Float64(v) => StoredValue::Float64(*v),
535 StoredValue::Boolean(v) => StoredValue::Boolean(*v),
536 StoredValue::Utf8(v) => StoredValue::Utf8(v.as_str().to_string()),
537 StoredValue::Bytes(v) => StoredValue::Bytes(v.as_slice().to_vec()),
538 StoredValue::List(items) => {
539 let mut out = Vec::with_capacity(items.len());
540 for item in items.iter() {
541 let owned = owned_stored_value_from_archived(Some(item))?.ok_or_else(|| {
542 DataFusionError::Execution(
543 "archived list item unexpectedly decoded as NULL".to_string(),
544 )
545 })?;
546 out.push(owned);
547 }
548 StoredValue::List(out)
549 }
550 }))
551}
552
/// Test helper: rebuilds a [`KvRow`] from decoded primary-key values and a stored base value.
///
/// Returns `None` in any of these cases:
/// - the PK arity or stored row width disagrees with `model`;
/// - a non-nullable column is missing;
/// - a stored value does not fit its column kind, including an Int64-encoded Date32
///   outside the `i32` range (previously it was silently truncated with `as`).
#[cfg(test)]
pub(crate) fn decode_base_row(
    pk_values: Vec<CellValue>,
    value: &[u8],
    model: &TableModel,
) -> Option<KvRow> {
    if pk_values.len() != model.primary_key_indices.len() {
        return None;
    }
    let archived = decode_stored_row(value).ok()?;
    if archived.values.len() != model.columns.len() {
        return None;
    }
    let mut values = vec![CellValue::Null; model.columns.len()];
    for (pk_pos, pk_value) in pk_values.into_iter().enumerate() {
        let col_idx = *model.primary_key_indices.get(pk_pos)?;
        // An out-of-range PK index is malformed input: report it as `None`, not a panic.
        *values.get_mut(col_idx)? = pk_value;
    }

    for (idx, col) in model.columns.iter().enumerate() {
        if model.is_pk_column(idx) {
            continue;
        }
        let Some(stored) = archived.values[idx].as_ref() else {
            if col.nullable {
                continue;
            }
            return None;
        };
        values[idx] = match (col.kind, stored) {
            (ColumnKind::Int64, StoredValue::Int64(v)) => CellValue::Int64(*v),
            (ColumnKind::UInt64, StoredValue::UInt64(v)) => CellValue::UInt64(*v),
            (ColumnKind::Float64, StoredValue::Float64(v)) => CellValue::Float64(*v),
            (ColumnKind::Float64, StoredValue::Int64(v)) => CellValue::Float64(*v as f64),
            (ColumnKind::Boolean, StoredValue::Boolean(v)) => CellValue::Boolean(*v),
            // Date32 is stored widened to Int64; reject values that cannot narrow back.
            (ColumnKind::Date32, StoredValue::Int64(v)) => {
                CellValue::Date32(i32::try_from(*v).ok()?)
            }
            (ColumnKind::Date64, StoredValue::Int64(v)) => CellValue::Date64(*v),
            (ColumnKind::Timestamp, StoredValue::Int64(v)) => CellValue::Timestamp(*v),
            (ColumnKind::Decimal128, StoredValue::Bytes(bytes)) => {
                let arr: [u8; 16] = bytes.as_slice().try_into().ok()?;
                CellValue::Decimal128(i128::from_le_bytes(arr))
            }
            (ColumnKind::Decimal256, StoredValue::Bytes(bytes)) => {
                let arr: [u8; 32] = bytes.as_slice().try_into().ok()?;
                CellValue::Decimal256(i256::from_le_bytes(arr))
            }
            (ColumnKind::Utf8, StoredValue::Utf8(v)) => CellValue::Utf8(v.as_str().to_string()),
            (ColumnKind::FixedSizeBinary(_), StoredValue::Bytes(v)) => {
                CellValue::FixedBinary(v.as_slice().to_vec())
            }
            (ColumnKind::List(elem), StoredValue::List(items)) => {
                let mut cells = Vec::with_capacity(items.len());
                for item in items.iter() {
                    cells.push(decode_list_element_archived(elem, item)?);
                }
                CellValue::List(cells)
            }
            _ => return None,
        };
    }
    Some(KvRow { values })
}
615
616pub(crate) fn decode_list_element_archived(
617 elem: ListElementKind,
618 stored: &StoredValue,
619) -> Option<CellValue> {
620 Some(match (elem, stored) {
621 (ListElementKind::Int64, StoredValue::Int64(v)) => CellValue::Int64(*v),
622 (ListElementKind::Float64, StoredValue::Float64(v)) => CellValue::Float64(*v),
623 (ListElementKind::Float64, StoredValue::Int64(v)) => CellValue::Float64(*v as f64),
624 (ListElementKind::Boolean, StoredValue::Boolean(v)) => CellValue::Boolean(*v),
625 (ListElementKind::Utf8, StoredValue::Utf8(v)) => CellValue::Utf8(v.as_str().to_string()),
626 _ => return None,
627 })
628}
629
630pub(crate) fn required_column<'a>(
631 batch: &'a RecordBatch,
632 name: &str,
633) -> DataFusionResult<&'a ArrayRef> {
634 batch.column_by_name(name).ok_or_else(|| {
635 DataFusionError::Execution(format!("insert batch is missing required column '{name}'"))
636 })
637}
638
639pub(crate) fn i64_value_at(
640 array: &ArrayRef,
641 row_idx: usize,
642 column_name: &str,
643) -> DataFusionResult<i64> {
644 if array.is_null(row_idx) {
645 return Err(DataFusionError::Execution(format!(
646 "column '{column_name}' cannot be NULL for kv table insert"
647 )));
648 }
649 let values = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
650 DataFusionError::Execution(format!(
651 "column '{column_name}' expected Int64, got {:?}",
652 array.data_type()
653 ))
654 })?;
655 Ok(values.value(row_idx))
656}
657
658pub(crate) fn string_value_at(
659 array: &ArrayRef,
660 row_idx: usize,
661 column_name: &str,
662) -> DataFusionResult<String> {
663 if array.is_null(row_idx) {
664 return Err(DataFusionError::Execution(format!(
665 "column '{column_name}' cannot be NULL for kv table insert"
666 )));
667 }
668 if let Some(values) = array.as_any().downcast_ref::<StringArray>() {
669 return Ok(values.value(row_idx).to_string());
670 }
671 if let Some(values) = array.as_any().downcast_ref::<LargeStringArray>() {
672 return Ok(values.value(row_idx).to_string());
673 }
674 if let Some(values) = array.as_any().downcast_ref::<StringViewArray>() {
675 return Ok(values.value(row_idx).to_string());
676 }
677 Err(DataFusionError::Execution(format!(
678 "column '{column_name}' expected string, got {:?}",
679 array.data_type()
680 )))
681}
682
683pub(crate) fn f64_value_at(
684 array: &ArrayRef,
685 row_idx: usize,
686 column_name: &str,
687) -> DataFusionResult<f64> {
688 if array.is_null(row_idx) {
689 return Err(DataFusionError::Execution(format!(
690 "column '{column_name}' cannot be NULL for kv table insert"
691 )));
692 }
693 let values = array
694 .as_any()
695 .downcast_ref::<Float64Array>()
696 .ok_or_else(|| {
697 DataFusionError::Execution(format!(
698 "column '{column_name}' expected Float64, got {:?}",
699 array.data_type()
700 ))
701 })?;
702 Ok(values.value(row_idx))
703}
704
705pub(crate) fn bool_value_at(
706 array: &ArrayRef,
707 row_idx: usize,
708 column_name: &str,
709) -> DataFusionResult<bool> {
710 if array.is_null(row_idx) {
711 return Err(DataFusionError::Execution(format!(
712 "column '{column_name}' cannot be NULL for kv table insert"
713 )));
714 }
715 let values = array
716 .as_any()
717 .downcast_ref::<BooleanArray>()
718 .ok_or_else(|| {
719 DataFusionError::Execution(format!(
720 "column '{column_name}' expected Boolean, got {:?}",
721 array.data_type()
722 ))
723 })?;
724 Ok(values.value(row_idx))
725}
726
727pub(crate) fn date32_value_at(
728 array: &ArrayRef,
729 row_idx: usize,
730 column_name: &str,
731) -> DataFusionResult<i32> {
732 if array.is_null(row_idx) {
733 return Err(DataFusionError::Execution(format!(
734 "column '{column_name}' cannot be NULL for kv table insert"
735 )));
736 }
737 let values = array
738 .as_any()
739 .downcast_ref::<Date32Array>()
740 .ok_or_else(|| {
741 DataFusionError::Execution(format!(
742 "column '{column_name}' expected Date32, got {:?}",
743 array.data_type()
744 ))
745 })?;
746 Ok(values.value(row_idx))
747}
748
749pub(crate) fn date64_value_at(
750 array: &ArrayRef,
751 row_idx: usize,
752 column_name: &str,
753) -> DataFusionResult<i64> {
754 if array.is_null(row_idx) {
755 return Err(DataFusionError::Execution(format!(
756 "column '{column_name}' cannot be NULL for kv table insert"
757 )));
758 }
759 let values = array
760 .as_any()
761 .downcast_ref::<Date64Array>()
762 .ok_or_else(|| {
763 DataFusionError::Execution(format!(
764 "column '{column_name}' expected Date64, got {:?}",
765 array.data_type()
766 ))
767 })?;
768 Ok(values.value(row_idx))
769}
770
771pub(crate) fn timestamp_micros_value_at(
772 array: &ArrayRef,
773 row_idx: usize,
774 column_name: &str,
775) -> DataFusionResult<i64> {
776 if array.is_null(row_idx) {
777 return Err(DataFusionError::Execution(format!(
778 "column '{column_name}' cannot be NULL for kv table insert"
779 )));
780 }
781 let values = array
782 .as_any()
783 .downcast_ref::<TimestampMicrosecondArray>()
784 .ok_or_else(|| {
785 DataFusionError::Execution(format!(
786 "column '{column_name}' expected TimestampMicrosecond, got {:?}",
787 array.data_type()
788 ))
789 })?;
790 Ok(values.value(row_idx))
791}
792
793pub(crate) fn decimal128_value_at(
794 array: &ArrayRef,
795 row_idx: usize,
796 column_name: &str,
797) -> DataFusionResult<i128> {
798 if array.is_null(row_idx) {
799 return Err(DataFusionError::Execution(format!(
800 "column '{column_name}' cannot be NULL for kv table insert"
801 )));
802 }
803 let values = array
804 .as_any()
805 .downcast_ref::<Decimal128Array>()
806 .ok_or_else(|| {
807 DataFusionError::Execution(format!(
808 "column '{column_name}' expected Decimal128, got {:?}",
809 array.data_type()
810 ))
811 })?;
812 Ok(values.value(row_idx))
813}
814
815pub(crate) fn uint64_value_at(
816 array: &ArrayRef,
817 row_idx: usize,
818 column_name: &str,
819) -> DataFusionResult<u64> {
820 if array.is_null(row_idx) {
821 return Err(DataFusionError::Execution(format!(
822 "column '{column_name}' cannot be NULL for kv table insert"
823 )));
824 }
825 let values = array
826 .as_any()
827 .downcast_ref::<UInt64Array>()
828 .ok_or_else(|| {
829 DataFusionError::Execution(format!(
830 "column '{column_name}' expected UInt64, got {:?}",
831 array.data_type()
832 ))
833 })?;
834 Ok(values.value(row_idx))
835}
836
837pub(crate) fn decimal256_value_at(
838 array: &ArrayRef,
839 row_idx: usize,
840 column_name: &str,
841) -> DataFusionResult<i256> {
842 if array.is_null(row_idx) {
843 return Err(DataFusionError::Execution(format!(
844 "column '{column_name}' cannot be NULL for kv table insert"
845 )));
846 }
847 let values = array
848 .as_any()
849 .downcast_ref::<Decimal256Array>()
850 .ok_or_else(|| {
851 DataFusionError::Execution(format!(
852 "column '{column_name}' expected Decimal256, got {:?}",
853 array.data_type()
854 ))
855 })?;
856 Ok(values.value(row_idx))
857}
858
859pub(crate) fn fixed_binary_value_at(
860 array: &ArrayRef,
861 row_idx: usize,
862 column_name: &str,
863) -> DataFusionResult<Vec<u8>> {
864 if array.is_null(row_idx) {
865 return Err(DataFusionError::Execution(format!(
866 "column '{column_name}' cannot be NULL for kv table insert"
867 )));
868 }
869 let values = array
870 .as_any()
871 .downcast_ref::<FixedSizeBinaryArray>()
872 .ok_or_else(|| {
873 DataFusionError::Execution(format!(
874 "column '{column_name}' expected FixedSizeBinary, got {:?}",
875 array.data_type()
876 ))
877 })?;
878 Ok(values.value(row_idx).to_vec())
879}
880
881pub(crate) fn list_value_at(
882 array: &ArrayRef,
883 row_idx: usize,
884 column_name: &str,
885 elem: ListElementKind,
886) -> DataFusionResult<CellValue> {
887 if array.is_null(row_idx) {
888 return Err(DataFusionError::Execution(format!(
889 "column '{column_name}' cannot be NULL for kv table insert"
890 )));
891 }
892 let list_array = array.as_any().downcast_ref::<ListArray>().ok_or_else(|| {
893 DataFusionError::Execution(format!(
894 "column '{column_name}' expected List, got {:?}",
895 array.data_type()
896 ))
897 })?;
898 let child = list_array.value(row_idx);
899 let mut items = Vec::with_capacity(child.len());
900 for i in 0..child.len() {
901 let item = match elem {
902 ListElementKind::Int64 => {
903 let arr = child.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
904 DataFusionError::Execution(format!(
905 "column '{column_name}' list element expected Int64"
906 ))
907 })?;
908 CellValue::Int64(arr.value(i))
909 }
910 ListElementKind::Float64 => {
911 let arr = child
912 .as_any()
913 .downcast_ref::<Float64Array>()
914 .ok_or_else(|| {
915 DataFusionError::Execution(format!(
916 "column '{column_name}' list element expected Float64"
917 ))
918 })?;
919 CellValue::Float64(arr.value(i))
920 }
921 ListElementKind::Boolean => {
922 let arr = child
923 .as_any()
924 .downcast_ref::<BooleanArray>()
925 .ok_or_else(|| {
926 DataFusionError::Execution(format!(
927 "column '{column_name}' list element expected Boolean"
928 ))
929 })?;
930 CellValue::Boolean(arr.value(i))
931 }
932 ListElementKind::Utf8 => {
933 let arr = child
934 .as_any()
935 .downcast_ref::<StringArray>()
936 .ok_or_else(|| {
937 DataFusionError::Execution(format!(
938 "column '{column_name}' list element expected Utf8"
939 ))
940 })?;
941 CellValue::Utf8(arr.value(i).to_string())
942 }
943 };
944 items.push(item);
945 }
946 Ok(CellValue::List(items))
947}
948
949pub(crate) async fn flush_ingest_batch(
950 client: &StoreClient,
951 keys: &mut Vec<Key>,
952 values: &mut Vec<Vec<u8>>,
953) -> DataFusionResult<u64> {
954 if keys.is_empty() {
955 return Ok(0);
956 }
957 let mut batch = StoreWriteBatch::new();
958 for (key, value) in keys.iter().zip(values.iter()) {
959 batch
960 .push(client, key, value)
961 .map_err(|e| DataFusionError::External(Box::new(e)))?;
962 }
963 let token = batch
964 .commit(client)
965 .await
966 .map_err(|e| DataFusionError::External(Box::new(e)))?;
967 keys.clear();
968 values.clear();
969 Ok(token)
970}