1use std::any::Any;
2use std::collections::HashMap;
3use std::fmt;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use commonware_codec::Encode;
8use datafusion::arrow::array::{
9 ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array, Decimal256Array,
10 FixedSizeBinaryArray, Float64Array, Int64Array, LargeStringArray, ListArray, StringArray,
11 StringViewArray, TimestampMicrosecondArray, UInt64Array,
12};
13use datafusion::arrow::datatypes::{i256, SchemaRef};
14use datafusion::arrow::record_batch::RecordBatch;
15use datafusion::common::{DataFusionError, Result as DataFusionResult};
16use datafusion::datasource::sink::DataSink;
17use datafusion::execution::context::TaskContext;
18use datafusion::physical_plan::{DisplayAs, DisplayFormatType, SendableRecordBatchStream};
19use exoware_sdk_rs::keys::Key;
20#[cfg(test)]
21use exoware_sdk_rs::kv_codec::decode_stored_row;
22use exoware_sdk_rs::kv_codec::{StoredRow, StoredValue};
23use exoware_sdk_rs::StoreClient;
24use futures::TryStreamExt;
25
26use crate::builder::archived_non_pk_value_is_valid;
27use crate::codec::*;
28use crate::types::*;
29
30#[derive(Debug)]
31pub struct TableWriter {
32 model: Arc<TableModel>,
33 index_specs: Arc<Vec<ResolvedIndexSpec>>,
34}
35
36impl TableWriter {
37 pub fn encode_row(&self, values: Vec<CellValue>) -> Result<Vec<(Key, Vec<u8>)>, String> {
38 let row = KvRow { values };
39 if row.values.len() != self.model.columns.len() {
40 return Err(format!(
41 "expected {} values, got {}",
42 self.model.columns.len(),
43 row.values.len()
44 ));
45 }
46 let base_key = encode_primary_key_from_row(self.model.table_prefix, &row, &self.model)?;
47 let base_value = encode_base_row_value(&row, &self.model).map_err(|e| format!("{e}"))?;
48 let mut out = vec![(base_key, base_value)];
49 for spec in self.index_specs.iter() {
50 let idx_key =
51 encode_secondary_index_key(self.model.table_prefix, spec, &self.model, &row)?;
52 let idx_value = encode_secondary_index_value(&row, &self.model, spec)
53 .map_err(|e| format!("{e}"))?;
54 out.push((idx_key, idx_value));
55 }
56 Ok(out)
57 }
58}
59
60#[derive(Debug)]
61pub struct BatchWriter {
62 client: StoreClient,
63 tables: HashMap<String, TableWriter>,
64 pub(crate) pending_keys: Vec<Key>,
65 pub(crate) pending_values: Vec<Vec<u8>>,
66}
67
68impl BatchWriter {
69 pub(crate) fn new(client: StoreClient, table_configs: &[(String, KvTableConfig)]) -> Self {
70 let mut tables = HashMap::new();
71 for (name, config) in table_configs {
72 let model = Arc::new(
73 TableModel::from_config(config).expect("config already validated by KvSchema"),
74 );
75 let index_specs = Arc::new(
76 model
77 .resolve_index_specs(&config.index_specs)
78 .expect("specs already validated by KvSchema"),
79 );
80 tables.insert(name.clone(), TableWriter { model, index_specs });
81 }
82 Self {
83 client,
84 tables,
85 pending_keys: Vec::new(),
86 pending_values: Vec::new(),
87 }
88 }
89
90 pub fn insert(
91 &mut self,
92 table_name: &str,
93 values: Vec<CellValue>,
94 ) -> Result<&mut Self, String> {
95 let writer = self
96 .tables
97 .get(table_name)
98 .ok_or_else(|| format!("unknown table '{table_name}'"))?;
99 let entries = writer.encode_row(values)?;
100 for (key, value) in entries {
101 self.pending_keys.push(key);
102 self.pending_values.push(value);
103 }
104 Ok(self)
105 }
106
107 pub fn pending_count(&self) -> usize {
108 self.pending_keys.len()
109 }
110
111 pub async fn flush(&mut self) -> DataFusionResult<u64> {
113 if self.pending_keys.is_empty() {
114 return Ok(0);
115 }
116 flush_ingest_batch(
117 &self.client,
118 &mut self.pending_keys,
119 &mut self.pending_values,
120 )
121 .await
122 }
123}
124
125#[derive(Debug)]
126pub(crate) struct KvIngestSink {
127 pub(crate) client: StoreClient,
128 pub(crate) schema: SchemaRef,
129 pub(crate) model: Arc<TableModel>,
130 pub(crate) index_specs: Arc<Vec<ResolvedIndexSpec>>,
131}
132
133impl KvIngestSink {
134 pub(crate) fn new(
135 client: StoreClient,
136 schema: SchemaRef,
137 model: Arc<TableModel>,
138 index_specs: Arc<Vec<ResolvedIndexSpec>>,
139 ) -> Self {
140 Self {
141 client,
142 schema,
143 model,
144 index_specs,
145 }
146 }
147}
148
149impl DisplayAs for KvIngestSink {
150 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
151 write!(f, "KvIngestSink")
152 }
153}
154
155#[async_trait]
156impl DataSink for KvIngestSink {
157 fn as_any(&self) -> &dyn Any {
158 self
159 }
160
161 fn schema(&self) -> &SchemaRef {
162 &self.schema
163 }
164
165 async fn write_all(
166 &self,
167 data: SendableRecordBatchStream,
168 _context: &Arc<TaskContext>,
169 ) -> DataFusionResult<u64> {
170 let mut data = data;
171 let mut pending_keys: Vec<Key> = Vec::new();
172 let mut pending_values: Vec<Vec<u8>> = Vec::new();
173 let mut logical_rows_written = 0u64;
174
175 while let Some(batch) = data.try_next().await? {
176 let encoded_entries = encode_insert_entries(&batch, &self.model, &self.index_specs)?;
177 logical_rows_written += batch.num_rows() as u64;
178 for (key, value) in encoded_entries {
179 pending_keys.push(key);
180 pending_values.push(value);
181 }
182 }
183
184 if !pending_keys.is_empty() {
185 flush_ingest_batch(&self.client, &mut pending_keys, &mut pending_values).await?;
186 }
187 Ok(logical_rows_written)
188 }
189}
190
191pub(crate) fn encode_insert_entries(
192 batch: &RecordBatch,
193 model: &TableModel,
194 index_specs: &[ResolvedIndexSpec],
195) -> DataFusionResult<Vec<(Key, Vec<u8>)>> {
196 let mut out = Vec::with_capacity(batch.num_rows() * (1 + index_specs.len()));
197 for row_idx in 0..batch.num_rows() {
198 let row = extract_row_from_batch(batch, row_idx, model)?;
199 let base_key = encode_primary_key_from_row(model.table_prefix, &row, model)
200 .map_err(DataFusionError::Execution)?;
201 let base_value = encode_base_row_value(&row, model)?;
202 out.push((base_key, base_value));
203
204 for spec in index_specs {
205 let secondary_key = encode_secondary_index_key(model.table_prefix, spec, model, &row)
206 .map_err(DataFusionError::Execution)?;
207 let secondary_value = encode_secondary_index_value(&row, model, spec)?;
208 out.push((secondary_key, secondary_value));
209 }
210 }
211 Ok(out)
212}
213
214pub(crate) fn extract_row_from_batch(
215 batch: &RecordBatch,
216 row_idx: usize,
217 model: &TableModel,
218) -> DataFusionResult<KvRow> {
219 let mut values = Vec::with_capacity(model.columns.len());
220 for col in &model.columns {
221 let array = required_column(batch, &col.name)?;
222 if col.nullable && array.is_null(row_idx) {
223 values.push(CellValue::Null);
224 continue;
225 }
226 let value = match col.kind {
227 ColumnKind::Int64 => CellValue::Int64(i64_value_at(array, row_idx, &col.name)?),
228 ColumnKind::UInt64 => CellValue::UInt64(uint64_value_at(array, row_idx, &col.name)?),
229 ColumnKind::Float64 => CellValue::Float64(f64_value_at(array, row_idx, &col.name)?),
230 ColumnKind::Boolean => CellValue::Boolean(bool_value_at(array, row_idx, &col.name)?),
231 ColumnKind::Date32 => CellValue::Date32(date32_value_at(array, row_idx, &col.name)?),
232 ColumnKind::Date64 => CellValue::Date64(date64_value_at(array, row_idx, &col.name)?),
233 ColumnKind::Timestamp => {
234 CellValue::Timestamp(timestamp_micros_value_at(array, row_idx, &col.name)?)
235 }
236 ColumnKind::Decimal128 => {
237 CellValue::Decimal128(decimal128_value_at(array, row_idx, &col.name)?)
238 }
239 ColumnKind::Decimal256 => {
240 CellValue::Decimal256(decimal256_value_at(array, row_idx, &col.name)?)
241 }
242 ColumnKind::Utf8 => CellValue::Utf8(string_value_at(array, row_idx, &col.name)?),
243 ColumnKind::FixedSizeBinary(_) => {
244 CellValue::FixedBinary(fixed_binary_value_at(array, row_idx, &col.name)?)
245 }
246 ColumnKind::List(elem) => list_value_at(array, row_idx, &col.name, elem)?,
247 };
248 values.push(value);
249 }
250 Ok(KvRow { values })
251}
252
253pub(crate) fn encode_base_row_value(row: &KvRow, model: &TableModel) -> DataFusionResult<Vec<u8>> {
254 let mut values = Vec::with_capacity(model.columns.len());
255 for (idx, col) in model.columns.iter().enumerate() {
256 if model.is_pk_column(idx) {
257 values.push(None);
258 continue;
259 }
260 values.push(encode_non_pk_cell_value(row.value_at(idx), col)?);
261 }
262 let stored_row = StoredRow { values };
263 Ok(stored_row.encode().to_vec())
264}
265
266pub(crate) fn encode_secondary_index_value(
267 row: &KvRow,
268 model: &TableModel,
269 spec: &ResolvedIndexSpec,
270) -> DataFusionResult<Vec<u8>> {
271 let mut values = Vec::with_capacity(model.columns.len());
272 for (idx, col) in model.columns.iter().enumerate() {
273 if model.is_pk_column(idx) || !spec.value_column_mask[idx] {
274 values.push(None);
275 continue;
276 }
277 values.push(encode_non_pk_cell_value(row.value_at(idx), col)?);
278 }
279 let stored_row = StoredRow { values };
280 Ok(stored_row.encode().to_vec())
281}
282
283pub(crate) fn encode_secondary_index_value_from_archived(
284 archived: &StoredRow,
285 model: &TableModel,
286 spec: &ResolvedIndexSpec,
287) -> DataFusionResult<Vec<u8>> {
288 if archived.values.len() != model.columns.len() {
289 return Err(DataFusionError::Execution(
290 "archived row column count mismatch".to_string(),
291 ));
292 }
293 let mut values = Vec::with_capacity(model.columns.len());
294 for (idx, col) in model.columns.iter().enumerate() {
295 if model.is_pk_column(idx) || !spec.value_column_mask[idx] {
296 values.push(None);
297 continue;
298 }
299 let stored_opt = archived.values.get(idx).and_then(|value| value.as_ref());
300 if !archived_non_pk_value_is_valid(col, stored_opt) {
301 return Err(DataFusionError::Execution(format!(
302 "invalid archived value for secondary index column '{}'",
303 col.name
304 )));
305 }
306 values.push(owned_stored_value_from_archived(stored_opt)?);
307 }
308 let stored_row = StoredRow { values };
309 Ok(stored_row.encode().to_vec())
310}
311
312pub(crate) fn encode_non_pk_cell_value(
313 value: &CellValue,
314 col: &ResolvedColumn,
315) -> DataFusionResult<Option<StoredValue>> {
316 match (col.kind, value) {
317 (_, CellValue::Null) => {
318 if !col.nullable {
319 return Err(DataFusionError::Execution(format!(
320 "column '{}' is not nullable but received NULL",
321 col.name
322 )));
323 }
324 Ok(None)
325 }
326 (ColumnKind::Int64, CellValue::Int64(v)) => Ok(Some(StoredValue::Int64(*v))),
327 (ColumnKind::UInt64, CellValue::UInt64(v)) => Ok(Some(StoredValue::UInt64(*v))),
328 (ColumnKind::Float64, CellValue::Float64(v)) => Ok(Some(StoredValue::Float64(*v))),
329 (ColumnKind::Boolean, CellValue::Boolean(v)) => Ok(Some(StoredValue::Boolean(*v))),
330 (ColumnKind::Date32, CellValue::Date32(v)) => Ok(Some(StoredValue::Int64(*v as i64))),
331 (ColumnKind::Date64, CellValue::Date64(v)) => Ok(Some(StoredValue::Int64(*v))),
332 (ColumnKind::Timestamp, CellValue::Timestamp(v)) => Ok(Some(StoredValue::Int64(*v))),
333 (ColumnKind::Decimal128, CellValue::Decimal128(v)) => {
334 Ok(Some(StoredValue::Bytes(v.to_le_bytes().to_vec())))
335 }
336 (ColumnKind::Decimal256, CellValue::Decimal256(v)) => {
337 Ok(Some(StoredValue::Bytes(v.to_le_bytes().to_vec())))
338 }
339 (ColumnKind::Utf8, CellValue::Utf8(v)) => Ok(Some(StoredValue::Utf8(v.clone()))),
340 (ColumnKind::FixedSizeBinary(n), CellValue::FixedBinary(v)) => {
341 if v.len() != n {
342 return Err(DataFusionError::Execution(format!(
343 "column '{}' expects FixedSizeBinary({n}) value with exactly {n} bytes, got {}",
344 col.name,
345 v.len()
346 )));
347 }
348 Ok(Some(StoredValue::Bytes(v.clone())))
349 }
350 (ColumnKind::List(elem), CellValue::List(items)) => {
351 let mut stored_items = Vec::with_capacity(items.len());
352 for item in items {
353 let stored_item = match (elem, item) {
354 (ListElementKind::Int64, CellValue::Int64(v)) => StoredValue::Int64(*v),
355 (ListElementKind::Float64, CellValue::Float64(v)) => StoredValue::Float64(*v),
356 (ListElementKind::Boolean, CellValue::Boolean(v)) => StoredValue::Boolean(*v),
357 (ListElementKind::Utf8, CellValue::Utf8(v)) => StoredValue::Utf8(v.clone()),
358 _ => {
359 return Err(DataFusionError::Execution(format!(
360 "column '{}' list element type mismatch (expected {:?}, got {:?})",
361 col.name, elem, item
362 )))
363 }
364 };
365 stored_items.push(stored_item);
366 }
367 Ok(Some(StoredValue::List(stored_items)))
368 }
369 _ => Err(DataFusionError::Execution(format!(
370 "column '{}' type mismatch (expected {:?}, got {:?})",
371 col.name, col.kind, value
372 ))),
373 }
374}
375
376pub(crate) fn owned_stored_value_from_archived(
377 stored_opt: Option<&StoredValue>,
378) -> DataFusionResult<Option<StoredValue>> {
379 let Some(stored) = stored_opt else {
380 return Ok(None);
381 };
382 Ok(Some(match stored {
383 StoredValue::Int64(v) => StoredValue::Int64(*v),
384 StoredValue::UInt64(v) => StoredValue::UInt64(*v),
385 StoredValue::Float64(v) => StoredValue::Float64(*v),
386 StoredValue::Boolean(v) => StoredValue::Boolean(*v),
387 StoredValue::Utf8(v) => StoredValue::Utf8(v.as_str().to_string()),
388 StoredValue::Bytes(v) => StoredValue::Bytes(v.as_slice().to_vec()),
389 StoredValue::List(items) => {
390 let mut out = Vec::with_capacity(items.len());
391 for item in items.iter() {
392 let owned = owned_stored_value_from_archived(Some(item))?.ok_or_else(|| {
393 DataFusionError::Execution(
394 "archived list item unexpectedly decoded as NULL".to_string(),
395 )
396 })?;
397 out.push(owned);
398 }
399 StoredValue::List(out)
400 }
401 }))
402}
403
/// Rebuilds a [`KvRow`] from primary-key cells and an encoded base-row value.
///
/// `pk_values` must be given in `model.primary_key_indices` order; each one is
/// placed at its column position. All other columns are decoded from `value`.
///
/// Returns `None` when:
/// - the number of key cells does not match the model,
/// - `value` cannot be decoded or has the wrong number of slots,
/// - a non-nullable column has no stored value,
/// - a stored value does not match its column kind, including Decimal bytes of
///   the wrong width and a Date32 value that does not fit in `i32`.
#[cfg(test)]
pub(crate) fn decode_base_row(
    pk_values: Vec<CellValue>,
    value: &[u8],
    model: &TableModel,
) -> Option<KvRow> {
    if pk_values.len() != model.primary_key_indices.len() {
        return None;
    }
    let archived = decode_stored_row(value).ok()?;
    if archived.values.len() != model.columns.len() {
        return None;
    }

    // Start from all-NULL so nullable columns without a stored value need no
    // further work.
    let mut values = vec![CellValue::Null; model.columns.len()];
    for (pk_pos, pk_value) in pk_values.into_iter().enumerate() {
        let col_idx = *model.primary_key_indices.get(pk_pos)?;
        values[col_idx] = pk_value;
    }

    for (idx, col) in model.columns.iter().enumerate() {
        if model.is_pk_column(idx) {
            continue;
        }
        let Some(stored) = archived.values[idx].as_ref() else {
            if col.nullable {
                continue;
            }
            return None;
        };
        values[idx] = match (col.kind, stored) {
            (ColumnKind::Int64, StoredValue::Int64(v)) => CellValue::Int64(*v),
            (ColumnKind::UInt64, StoredValue::UInt64(v)) => CellValue::UInt64(*v),
            (ColumnKind::Float64, StoredValue::Float64(v)) => CellValue::Float64(*v),
            (ColumnKind::Float64, StoredValue::Int64(v)) => CellValue::Float64(*v as f64),
            (ColumnKind::Boolean, StoredValue::Boolean(v)) => CellValue::Boolean(*v),
            // Date32 is stored widened to i64; reject values that no longer fit
            // instead of letting an `as` cast wrap them silently.
            (ColumnKind::Date32, StoredValue::Int64(v)) => {
                CellValue::Date32(i32::try_from(*v).ok()?)
            }
            (ColumnKind::Date64, StoredValue::Int64(v)) => CellValue::Date64(*v),
            (ColumnKind::Timestamp, StoredValue::Int64(v)) => CellValue::Timestamp(*v),
            (ColumnKind::Decimal128, StoredValue::Bytes(bytes)) => {
                let arr: [u8; 16] = bytes.as_slice().try_into().ok()?;
                CellValue::Decimal128(i128::from_le_bytes(arr))
            }
            (ColumnKind::Decimal256, StoredValue::Bytes(bytes)) => {
                let arr: [u8; 32] = bytes.as_slice().try_into().ok()?;
                CellValue::Decimal256(i256::from_le_bytes(arr))
            }
            (ColumnKind::Utf8, StoredValue::Utf8(v)) => CellValue::Utf8(v.as_str().to_string()),
            (ColumnKind::FixedSizeBinary(_), StoredValue::Bytes(v)) => {
                CellValue::FixedBinary(v.as_slice().to_vec())
            }
            (ColumnKind::List(elem), StoredValue::List(items)) => {
                let cells = items
                    .iter()
                    .map(|item| decode_list_element_archived(elem, item))
                    .collect::<Option<Vec<_>>>()?;
                CellValue::List(cells)
            }
            _ => return None,
        };
    }
    Some(KvRow { values })
}
466
467pub(crate) fn decode_list_element_archived(
468 elem: ListElementKind,
469 stored: &StoredValue,
470) -> Option<CellValue> {
471 Some(match (elem, stored) {
472 (ListElementKind::Int64, StoredValue::Int64(v)) => CellValue::Int64(*v),
473 (ListElementKind::Float64, StoredValue::Float64(v)) => CellValue::Float64(*v),
474 (ListElementKind::Float64, StoredValue::Int64(v)) => CellValue::Float64(*v as f64),
475 (ListElementKind::Boolean, StoredValue::Boolean(v)) => CellValue::Boolean(*v),
476 (ListElementKind::Utf8, StoredValue::Utf8(v)) => CellValue::Utf8(v.as_str().to_string()),
477 _ => return None,
478 })
479}
480
481pub(crate) fn required_column<'a>(
482 batch: &'a RecordBatch,
483 name: &str,
484) -> DataFusionResult<&'a ArrayRef> {
485 batch.column_by_name(name).ok_or_else(|| {
486 DataFusionError::Execution(format!("insert batch is missing required column '{name}'"))
487 })
488}
489
490pub(crate) fn i64_value_at(
491 array: &ArrayRef,
492 row_idx: usize,
493 column_name: &str,
494) -> DataFusionResult<i64> {
495 if array.is_null(row_idx) {
496 return Err(DataFusionError::Execution(format!(
497 "column '{column_name}' cannot be NULL for kv table insert"
498 )));
499 }
500 let values = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
501 DataFusionError::Execution(format!(
502 "column '{column_name}' expected Int64, got {:?}",
503 array.data_type()
504 ))
505 })?;
506 Ok(values.value(row_idx))
507}
508
509pub(crate) fn string_value_at(
510 array: &ArrayRef,
511 row_idx: usize,
512 column_name: &str,
513) -> DataFusionResult<String> {
514 if array.is_null(row_idx) {
515 return Err(DataFusionError::Execution(format!(
516 "column '{column_name}' cannot be NULL for kv table insert"
517 )));
518 }
519 if let Some(values) = array.as_any().downcast_ref::<StringArray>() {
520 return Ok(values.value(row_idx).to_string());
521 }
522 if let Some(values) = array.as_any().downcast_ref::<LargeStringArray>() {
523 return Ok(values.value(row_idx).to_string());
524 }
525 if let Some(values) = array.as_any().downcast_ref::<StringViewArray>() {
526 return Ok(values.value(row_idx).to_string());
527 }
528 Err(DataFusionError::Execution(format!(
529 "column '{column_name}' expected string, got {:?}",
530 array.data_type()
531 )))
532}
533
534pub(crate) fn f64_value_at(
535 array: &ArrayRef,
536 row_idx: usize,
537 column_name: &str,
538) -> DataFusionResult<f64> {
539 if array.is_null(row_idx) {
540 return Err(DataFusionError::Execution(format!(
541 "column '{column_name}' cannot be NULL for kv table insert"
542 )));
543 }
544 let values = array
545 .as_any()
546 .downcast_ref::<Float64Array>()
547 .ok_or_else(|| {
548 DataFusionError::Execution(format!(
549 "column '{column_name}' expected Float64, got {:?}",
550 array.data_type()
551 ))
552 })?;
553 Ok(values.value(row_idx))
554}
555
556pub(crate) fn bool_value_at(
557 array: &ArrayRef,
558 row_idx: usize,
559 column_name: &str,
560) -> DataFusionResult<bool> {
561 if array.is_null(row_idx) {
562 return Err(DataFusionError::Execution(format!(
563 "column '{column_name}' cannot be NULL for kv table insert"
564 )));
565 }
566 let values = array
567 .as_any()
568 .downcast_ref::<BooleanArray>()
569 .ok_or_else(|| {
570 DataFusionError::Execution(format!(
571 "column '{column_name}' expected Boolean, got {:?}",
572 array.data_type()
573 ))
574 })?;
575 Ok(values.value(row_idx))
576}
577
578pub(crate) fn date32_value_at(
579 array: &ArrayRef,
580 row_idx: usize,
581 column_name: &str,
582) -> DataFusionResult<i32> {
583 if array.is_null(row_idx) {
584 return Err(DataFusionError::Execution(format!(
585 "column '{column_name}' cannot be NULL for kv table insert"
586 )));
587 }
588 let values = array
589 .as_any()
590 .downcast_ref::<Date32Array>()
591 .ok_or_else(|| {
592 DataFusionError::Execution(format!(
593 "column '{column_name}' expected Date32, got {:?}",
594 array.data_type()
595 ))
596 })?;
597 Ok(values.value(row_idx))
598}
599
600pub(crate) fn date64_value_at(
601 array: &ArrayRef,
602 row_idx: usize,
603 column_name: &str,
604) -> DataFusionResult<i64> {
605 if array.is_null(row_idx) {
606 return Err(DataFusionError::Execution(format!(
607 "column '{column_name}' cannot be NULL for kv table insert"
608 )));
609 }
610 let values = array
611 .as_any()
612 .downcast_ref::<Date64Array>()
613 .ok_or_else(|| {
614 DataFusionError::Execution(format!(
615 "column '{column_name}' expected Date64, got {:?}",
616 array.data_type()
617 ))
618 })?;
619 Ok(values.value(row_idx))
620}
621
622pub(crate) fn timestamp_micros_value_at(
623 array: &ArrayRef,
624 row_idx: usize,
625 column_name: &str,
626) -> DataFusionResult<i64> {
627 if array.is_null(row_idx) {
628 return Err(DataFusionError::Execution(format!(
629 "column '{column_name}' cannot be NULL for kv table insert"
630 )));
631 }
632 let values = array
633 .as_any()
634 .downcast_ref::<TimestampMicrosecondArray>()
635 .ok_or_else(|| {
636 DataFusionError::Execution(format!(
637 "column '{column_name}' expected TimestampMicrosecond, got {:?}",
638 array.data_type()
639 ))
640 })?;
641 Ok(values.value(row_idx))
642}
643
644pub(crate) fn decimal128_value_at(
645 array: &ArrayRef,
646 row_idx: usize,
647 column_name: &str,
648) -> DataFusionResult<i128> {
649 if array.is_null(row_idx) {
650 return Err(DataFusionError::Execution(format!(
651 "column '{column_name}' cannot be NULL for kv table insert"
652 )));
653 }
654 let values = array
655 .as_any()
656 .downcast_ref::<Decimal128Array>()
657 .ok_or_else(|| {
658 DataFusionError::Execution(format!(
659 "column '{column_name}' expected Decimal128, got {:?}",
660 array.data_type()
661 ))
662 })?;
663 Ok(values.value(row_idx))
664}
665
666pub(crate) fn uint64_value_at(
667 array: &ArrayRef,
668 row_idx: usize,
669 column_name: &str,
670) -> DataFusionResult<u64> {
671 if array.is_null(row_idx) {
672 return Err(DataFusionError::Execution(format!(
673 "column '{column_name}' cannot be NULL for kv table insert"
674 )));
675 }
676 let values = array
677 .as_any()
678 .downcast_ref::<UInt64Array>()
679 .ok_or_else(|| {
680 DataFusionError::Execution(format!(
681 "column '{column_name}' expected UInt64, got {:?}",
682 array.data_type()
683 ))
684 })?;
685 Ok(values.value(row_idx))
686}
687
688pub(crate) fn decimal256_value_at(
689 array: &ArrayRef,
690 row_idx: usize,
691 column_name: &str,
692) -> DataFusionResult<i256> {
693 if array.is_null(row_idx) {
694 return Err(DataFusionError::Execution(format!(
695 "column '{column_name}' cannot be NULL for kv table insert"
696 )));
697 }
698 let values = array
699 .as_any()
700 .downcast_ref::<Decimal256Array>()
701 .ok_or_else(|| {
702 DataFusionError::Execution(format!(
703 "column '{column_name}' expected Decimal256, got {:?}",
704 array.data_type()
705 ))
706 })?;
707 Ok(values.value(row_idx))
708}
709
710pub(crate) fn fixed_binary_value_at(
711 array: &ArrayRef,
712 row_idx: usize,
713 column_name: &str,
714) -> DataFusionResult<Vec<u8>> {
715 if array.is_null(row_idx) {
716 return Err(DataFusionError::Execution(format!(
717 "column '{column_name}' cannot be NULL for kv table insert"
718 )));
719 }
720 let values = array
721 .as_any()
722 .downcast_ref::<FixedSizeBinaryArray>()
723 .ok_or_else(|| {
724 DataFusionError::Execution(format!(
725 "column '{column_name}' expected FixedSizeBinary, got {:?}",
726 array.data_type()
727 ))
728 })?;
729 Ok(values.value(row_idx).to_vec())
730}
731
732pub(crate) fn list_value_at(
733 array: &ArrayRef,
734 row_idx: usize,
735 column_name: &str,
736 elem: ListElementKind,
737) -> DataFusionResult<CellValue> {
738 if array.is_null(row_idx) {
739 return Err(DataFusionError::Execution(format!(
740 "column '{column_name}' cannot be NULL for kv table insert"
741 )));
742 }
743 let list_array = array.as_any().downcast_ref::<ListArray>().ok_or_else(|| {
744 DataFusionError::Execution(format!(
745 "column '{column_name}' expected List, got {:?}",
746 array.data_type()
747 ))
748 })?;
749 let child = list_array.value(row_idx);
750 let mut items = Vec::with_capacity(child.len());
751 for i in 0..child.len() {
752 let item = match elem {
753 ListElementKind::Int64 => {
754 let arr = child.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
755 DataFusionError::Execution(format!(
756 "column '{column_name}' list element expected Int64"
757 ))
758 })?;
759 CellValue::Int64(arr.value(i))
760 }
761 ListElementKind::Float64 => {
762 let arr = child
763 .as_any()
764 .downcast_ref::<Float64Array>()
765 .ok_or_else(|| {
766 DataFusionError::Execution(format!(
767 "column '{column_name}' list element expected Float64"
768 ))
769 })?;
770 CellValue::Float64(arr.value(i))
771 }
772 ListElementKind::Boolean => {
773 let arr = child
774 .as_any()
775 .downcast_ref::<BooleanArray>()
776 .ok_or_else(|| {
777 DataFusionError::Execution(format!(
778 "column '{column_name}' list element expected Boolean"
779 ))
780 })?;
781 CellValue::Boolean(arr.value(i))
782 }
783 ListElementKind::Utf8 => {
784 let arr = child
785 .as_any()
786 .downcast_ref::<StringArray>()
787 .ok_or_else(|| {
788 DataFusionError::Execution(format!(
789 "column '{column_name}' list element expected Utf8"
790 ))
791 })?;
792 CellValue::Utf8(arr.value(i).to_string())
793 }
794 };
795 items.push(item);
796 }
797 Ok(CellValue::List(items))
798}
799
800pub(crate) async fn flush_ingest_batch(
801 client: &StoreClient,
802 keys: &mut Vec<Key>,
803 values: &mut Vec<Vec<u8>>,
804) -> DataFusionResult<u64> {
805 if keys.is_empty() {
806 return Ok(0);
807 }
808 let refs: Vec<(&Key, &[u8])> = keys
809 .iter()
810 .zip(values.iter())
811 .map(|(key, value)| (key, value.as_slice()))
812 .collect();
813 let token = client
814 .put(&refs)
815 .await
816 .map_err(|e| DataFusionError::External(Box::new(e)))?;
817 keys.clear();
818 values.clear();
819 Ok(token)
820}