1use anyhow::{Result, anyhow};
11use arrow_array::builder::{
12 BinaryBuilder, BooleanBufferBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
13 FixedSizeBinaryBuilder, FixedSizeListBuilder, Float32Builder, Float64Builder, Int32Builder,
14 Int64Builder, IntervalMonthDayNanoBuilder, LargeBinaryBuilder, ListBuilder, StringBuilder,
15 StructBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder,
16 UInt64Builder,
17};
18use arrow_array::{
19 Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, FixedSizeBinaryArray,
20 FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array,
21 IntervalMonthDayNanoArray, LargeBinaryArray, ListArray, StringArray, StructArray,
22 Time64NanosecondArray, TimestampNanosecondArray, UInt64Array,
23};
24use arrow_schema::{DataType as ArrowDataType, Field};
25use std::collections::HashMap;
26use std::sync::Arc;
27use uni_common::DataType;
28use uni_common::Value;
29use uni_common::core::id::{Eid, Vid};
30use uni_common::core::schema;
31use uni_crdt::Crdt;
32
33fn build_timestamp_column_from_id_map<K, I>(
38 ids: I,
39 timestamps: Option<&HashMap<K, i64>>,
40) -> ArrayRef
41where
42 K: Eq + std::hash::Hash,
43 I: IntoIterator<Item = K>,
44{
45 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
46 for id in ids {
47 match timestamps.and_then(|m| m.get(&id)) {
48 Some(&ts) => builder.append_value(ts),
49 None => builder.append_null(),
50 }
51 }
52 Arc::new(builder.finish())
53}
54
55pub fn build_timestamp_column_from_vid_map<I>(
56 ids: I,
57 timestamps: Option<&HashMap<Vid, i64>>,
58) -> ArrayRef
59where
60 I: IntoIterator<Item = Vid>,
61{
62 build_timestamp_column_from_id_map(ids, timestamps)
63}
64
65pub fn build_timestamp_column_from_eid_map<I>(
66 ids: I,
67 timestamps: Option<&HashMap<Eid, i64>>,
68) -> ArrayRef
69where
70 I: IntoIterator<Item = Eid>,
71{
72 build_timestamp_column_from_id_map(ids, timestamps)
73}
74
75pub fn build_timestamp_column<I>(timestamps: I) -> ArrayRef
79where
80 I: IntoIterator<Item = Option<i64>>,
81{
82 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
83 for ts in timestamps {
84 builder.append_option(ts);
85 }
86 Arc::new(builder.finish())
87}
88
89pub fn labels_from_list_array(list_arr: &ListArray, row: usize) -> Vec<String> {
95 if list_arr.is_null(row) {
96 return Vec::new();
97 }
98 let values = list_arr.value(row);
99 let Some(str_arr) = values.as_any().downcast_ref::<StringArray>() else {
100 return Vec::new();
101 };
102 (0..str_arr.len())
103 .filter(|&j| !str_arr.is_null(j))
104 .map(|j| str_arr.value(j).to_string())
105 .collect()
106}
107
108fn parse_datetime_to_nanos(s: &str) -> Option<i64> {
113 chrono::DateTime::parse_from_rfc3339(s)
114 .map(|dt| {
115 dt.with_timezone(&chrono::Utc)
116 .timestamp_nanos_opt()
117 .unwrap_or(0)
118 })
119 .or_else(|_| {
120 chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
121 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
122 })
123 .or_else(|_| {
124 chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ")
125 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
126 })
127 .or_else(|_| {
128 chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M%:z").map(|dt| {
129 dt.with_timezone(&chrono::Utc)
130 .timestamp_nanos_opt()
131 .unwrap_or(0)
132 })
133 })
134 .ok()
135 .or_else(|| {
136 s.strip_suffix('Z')
137 .and_then(|base| chrono::NaiveDateTime::parse_from_str(base, "%Y-%m-%dT%H:%M").ok())
138 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
139 })
140}
141
142pub(crate) fn try_reconstruct_map(arr: &ArrayRef) -> Option<HashMap<String, Value>> {
148 let structs = arr.as_any().downcast_ref::<StructArray>()?;
149 let fields = structs.fields();
150 if fields.len() != 2 || fields[0].name() != "key" || fields[1].name() != "value" {
151 return None;
152 }
153 let value_hint = raw_bytes_hint(fields[1].metadata());
156 let key_col = structs.column(0);
157 let val_col = structs.column(1);
158 let mut map = HashMap::new();
159 for i in 0..structs.len() {
160 if let Value::String(k) = arrow_to_value(key_col.as_ref(), i, None) {
161 map.insert(k, arrow_to_value(val_col.as_ref(), i, value_hint));
162 }
163 }
164 Some(map)
165}
166
167fn array_to_value_list(arr: &ArrayRef, elem_type: Option<&DataType>) -> Vec<Value> {
172 (0..arr.len())
173 .map(|i| arrow_to_value(arr.as_ref(), i, elem_type))
174 .collect()
175}
176
177fn raw_bytes_hint(metadata: &HashMap<String, String>) -> Option<&'static DataType> {
181 if metadata.get("uni_raw_bytes").is_some_and(|v| v == "true") {
182 Some(&DataType::Bytes)
183 } else {
184 None
185 }
186}
187
188fn list_child_bytes_hint(dt: &ArrowDataType) -> Option<&'static DataType> {
190 match dt {
191 ArrowDataType::List(f)
192 | ArrowDataType::LargeList(f)
193 | ArrowDataType::FixedSizeList(f, _) => raw_bytes_hint(f.metadata()),
194 _ => None,
195 }
196}
197
198pub fn arrow_to_value(col: &dyn Array, row: usize, data_type: Option<&DataType>) -> Value {
205 if col.is_null(row) {
206 return Value::Null;
207 }
208
209 if let Some(dt) = data_type {
211 match dt {
212 DataType::DateTime => {
213 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
215 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
216 struct_arr.column_by_name("nanos_since_epoch"),
217 struct_arr.column_by_name("offset_seconds"),
218 struct_arr.column_by_name("timezone_name"),
219 )
220 && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
221 nanos_col
222 .as_any()
223 .downcast_ref::<TimestampNanosecondArray>(),
224 offset_col.as_any().downcast_ref::<Int32Array>(),
225 tz_col.as_any().downcast_ref::<StringArray>(),
226 )
227 {
228 if nanos_arr.is_null(row) {
229 return Value::Null;
230 }
231 let nanos = nanos_arr.value(row);
232 if offset_arr.is_null(row) {
233 return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
235 nanos_since_epoch: nanos,
236 });
237 }
238 let offset = offset_arr.value(row);
239 let tz_name = (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
240 return Value::Temporal(uni_common::TemporalValue::DateTime {
241 nanos_since_epoch: nanos,
242 offset_seconds: offset,
243 timezone_name: tz_name,
244 });
245 }
246 if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
248 let nanos = ts.value(row);
249 let tz_name = ts.timezone().map(|s| s.to_string());
250 return Value::Temporal(uni_common::TemporalValue::DateTime {
251 nanos_since_epoch: nanos,
252 offset_seconds: 0,
253 timezone_name: tz_name,
254 });
255 }
256 }
257 DataType::Time => {
258 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
260 && let (Some(nanos_col), Some(offset_col)) = (
261 struct_arr.column_by_name("nanos_since_midnight"),
262 struct_arr.column_by_name("offset_seconds"),
263 )
264 && let (Some(nanos_arr), Some(offset_arr)) = (
265 nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
266 offset_col.as_any().downcast_ref::<Int32Array>(),
267 )
268 {
269 if nanos_arr.is_null(row) || offset_arr.is_null(row) {
271 return Value::Null;
272 }
273 let nanos = nanos_arr.value(row);
274 let offset = offset_arr.value(row);
275 return Value::Temporal(uni_common::TemporalValue::Time {
276 nanos_since_midnight: nanos,
277 offset_seconds: offset,
278 });
279 }
280 if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
282 let nanos = t.value(row);
283 return Value::Temporal(uni_common::TemporalValue::Time {
284 nanos_since_midnight: nanos,
285 offset_seconds: 0,
286 });
287 }
288 }
289 DataType::Bytes => {
290 let Some(arr) = col.as_any().downcast_ref::<LargeBinaryArray>() else {
291 log::warn!("Bytes column is not LargeBinaryArray");
292 return Value::Null;
293 };
294 if arr.is_null(row) {
295 return Value::Null;
296 }
297 return Value::Bytes(arr.value(row).to_vec());
298 }
299 DataType::Btic => {
300 let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>() else {
301 log::warn!("BTIC column is not FixedSizeBinaryArray");
302 return Value::Null;
303 };
304 let bytes = fsb.value(row);
305 return match uni_btic::encode::decode_slice(bytes) {
306 Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
307 lo: btic.lo(),
308 hi: btic.hi(),
309 meta: btic.meta(),
310 }),
311 Err(e) => {
312 log::warn!("BTIC decode error: {}", e);
313 Value::Null
314 }
315 };
316 }
317 _ => {}
318 }
319 }
320
321 if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
323 return Value::String(s.value(row).to_string());
324 }
325
326 if let Some(u) = col.as_any().downcast_ref::<UInt64Array>() {
328 return Value::Int(u.value(row) as i64);
329 }
330 if let Some(i) = col.as_any().downcast_ref::<Int64Array>() {
331 return Value::Int(i.value(row));
332 }
333 if let Some(i) = col.as_any().downcast_ref::<Int32Array>() {
334 return Value::Int(i.value(row) as i64);
335 }
336
337 if let Some(f) = col.as_any().downcast_ref::<Float64Array>() {
339 return Value::Float(f.value(row));
340 }
341 if let Some(f) = col.as_any().downcast_ref::<Float32Array>() {
342 return Value::Float(f.value(row) as f64);
343 }
344
345 if let Some(b) = col.as_any().downcast_ref::<BooleanArray>() {
347 return Value::Bool(b.value(row));
348 }
349
350 if let Some(list) = col.as_any().downcast_ref::<FixedSizeListArray>() {
352 let elem_hint = list_child_bytes_hint(list.data_type());
353 return Value::List(array_to_value_list(&list.value(row), elem_hint));
354 }
355
356 if let Some(list) = col.as_any().downcast_ref::<ListArray>() {
358 let arr = list.value(row);
359
360 if let Some(obj) = try_reconstruct_map(&arr) {
362 return Value::Map(obj);
363 }
364
365 let elem_hint = list_child_bytes_hint(list.data_type());
366 return Value::List(array_to_value_list(&arr, elem_hint));
367 }
368
369 if let Some(list) = col.as_any().downcast_ref::<arrow_array::LargeListArray>() {
371 let elem_hint = list_child_bytes_hint(list.data_type());
372 return Value::List(array_to_value_list(&list.value(row), elem_hint));
373 }
374
375 if let Some(s) = col.as_any().downcast_ref::<StructArray>() {
377 let field_names: Vec<&str> = s.fields().iter().map(|f| f.name().as_str()).collect();
378
379 if field_names.contains(&"nanos_since_epoch")
381 && field_names.contains(&"offset_seconds")
382 && field_names.contains(&"timezone_name")
383 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
384 s.column_by_name("nanos_since_epoch"),
385 s.column_by_name("offset_seconds"),
386 s.column_by_name("timezone_name"),
387 )
388 {
389 let nanos_opt = nanos_col
391 .as_any()
392 .downcast_ref::<TimestampNanosecondArray>()
393 .map(|a| {
394 if a.is_null(row) {
395 None
396 } else {
397 Some(a.value(row))
398 }
399 })
400 .or_else(|| {
401 nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
402 if a.is_null(row) {
403 None
404 } else {
405 Some(a.value(row))
406 }
407 })
408 });
409 let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
410 if a.is_null(row) {
411 None
412 } else {
413 Some(a.value(row))
414 }
415 });
416
417 if let Some(Some(nanos)) = nanos_opt {
418 match offset_opt {
419 Some(Some(offset)) => {
420 let tz_name = tz_col.as_any().downcast_ref::<StringArray>().and_then(|a| {
421 if a.is_null(row) {
422 None
423 } else {
424 Some(a.value(row).to_string())
425 }
426 });
427 return Value::Temporal(uni_common::TemporalValue::DateTime {
428 nanos_since_epoch: nanos,
429 offset_seconds: offset,
430 timezone_name: tz_name,
431 });
432 }
433 _ => {
434 return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
436 nanos_since_epoch: nanos,
437 });
438 }
439 }
440 }
441 }
442
443 if field_names.contains(&"nanos_since_midnight")
445 && field_names.contains(&"offset_seconds")
446 && let (Some(nanos_col), Some(offset_col)) = (
447 s.column_by_name("nanos_since_midnight"),
448 s.column_by_name("offset_seconds"),
449 )
450 {
451 let nanos_opt = nanos_col
453 .as_any()
454 .downcast_ref::<Time64NanosecondArray>()
455 .map(|a| {
456 if a.is_null(row) {
457 None
458 } else {
459 Some(a.value(row))
460 }
461 })
462 .or_else(|| {
463 nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
464 if a.is_null(row) {
465 None
466 } else {
467 Some(a.value(row))
468 }
469 })
470 });
471 let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
472 if a.is_null(row) {
473 None
474 } else {
475 Some(a.value(row))
476 }
477 });
478
479 if let (Some(Some(nanos)), Some(Some(offset))) = (nanos_opt, offset_opt) {
480 return Value::Temporal(uni_common::TemporalValue::Time {
481 nanos_since_midnight: nanos,
482 offset_seconds: offset,
483 });
484 }
485 }
486
487 let mut map = HashMap::new();
489 for (field, child) in s.fields().iter().zip(s.columns()) {
490 map.insert(
491 field.name().clone(),
492 arrow_to_value(child.as_ref(), row, None),
493 );
494 }
495 return Value::Map(map);
496 }
497
498 if let Some(d) = col.as_any().downcast_ref::<Date32Array>() {
500 let days = d.value(row);
501 return Value::Temporal(uni_common::TemporalValue::Date {
502 days_since_epoch: days,
503 });
504 }
505
506 if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
508 let nanos = ts.value(row);
509 return match ts.timezone() {
510 Some(tz) => Value::Temporal(uni_common::TemporalValue::DateTime {
511 nanos_since_epoch: nanos,
512 offset_seconds: 0,
513 timezone_name: Some(tz.to_string()),
514 }),
515 None => Value::Temporal(uni_common::TemporalValue::LocalDateTime {
516 nanos_since_epoch: nanos,
517 }),
518 };
519 }
520
521 if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
523 let nanos = t.value(row);
524 return Value::Temporal(uni_common::TemporalValue::LocalTime {
525 nanos_since_midnight: nanos,
526 });
527 }
528
529 if let Some(t) = col
531 .as_any()
532 .downcast_ref::<arrow_array::Time64MicrosecondArray>()
533 {
534 let micros = t.value(row);
535 return Value::Temporal(uni_common::TemporalValue::LocalTime {
536 nanos_since_midnight: micros * 1000,
537 });
538 }
539
540 if let Some(d) = col
542 .as_any()
543 .downcast_ref::<arrow_array::DurationMicrosecondArray>()
544 {
545 let micros = d.value(row);
546 let total_nanos = micros * 1000;
547 let seconds = total_nanos / 1_000_000_000;
548 let remaining_nanos = total_nanos % 1_000_000_000;
549 return Value::Temporal(uni_common::TemporalValue::Duration {
550 months: 0,
551 days: 0,
552 nanos: seconds * 1_000_000_000 + remaining_nanos,
553 });
554 }
555
556 if let Some(interval) = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>() {
558 let val = interval.value(row);
559 return Value::Temporal(uni_common::TemporalValue::Duration {
560 months: val.months as i64,
561 days: val.days as i64,
562 nanos: val.nanoseconds,
563 });
564 }
565
566 if let Some(b) = col.as_any().downcast_ref::<LargeBinaryArray>() {
568 let bytes = b.value(row);
569 if bytes.is_empty() {
570 return Value::Null;
571 }
572 return uni_common::cypher_value_codec::decode(bytes).unwrap_or_else(|e| {
573 eprintln!("CypherValue decode error: {}", e);
574 Value::Null
575 });
576 }
577
578 if let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>()
580 && fsb.value_length() == 24
581 {
582 let bytes = fsb.value(row);
583 return match uni_btic::encode::decode_slice(bytes) {
584 Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
585 lo: btic.lo(),
586 hi: btic.hi(),
587 meta: btic.meta(),
588 }),
589 Err(e) => {
590 log::warn!("BTIC decode error: {}", e);
591 Value::Null
592 }
593 };
594 }
595
596 if let Some(b) = col.as_any().downcast_ref::<BinaryArray>() {
598 let bytes = b.value(row);
599 return Crdt::from_msgpack(bytes)
600 .ok()
601 .and_then(|crdt| serde_json::to_value(&crdt).ok())
602 .map(Value::from)
603 .unwrap_or(Value::Null);
604 }
605
606 Value::Null
608}
609
610fn values_to_uint64_array(values: &[Value]) -> ArrayRef {
611 let mut builder = UInt64Builder::with_capacity(values.len());
612 for v in values {
613 if let Some(n) = v.as_u64() {
614 builder.append_value(n);
615 } else {
616 builder.append_null();
617 }
618 }
619 Arc::new(builder.finish())
620}
621
622fn values_to_int64_array(values: &[Value]) -> ArrayRef {
623 let mut builder = Int64Builder::with_capacity(values.len());
624 for v in values {
625 if let Some(n) = v.as_i64() {
626 builder.append_value(n);
627 } else {
628 builder.append_null();
629 }
630 }
631 Arc::new(builder.finish())
632}
633
634fn values_to_int32_array(values: &[Value]) -> ArrayRef {
635 let mut builder = Int32Builder::with_capacity(values.len());
636 for v in values {
637 if let Some(n) = v.as_i64() {
638 builder.append_value(n as i32);
639 } else {
640 builder.append_null();
641 }
642 }
643 Arc::new(builder.finish())
644}
645
646fn values_to_string_array(values: &[Value]) -> ArrayRef {
647 let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 10);
648 for v in values {
649 if let Some(s) = v.as_str() {
650 builder.append_value(s);
651 } else if v.is_null() {
652 builder.append_null();
653 } else {
654 builder.append_value(v.to_string());
655 }
656 }
657 Arc::new(builder.finish())
658}
659
660fn values_to_bool_array(values: &[Value]) -> ArrayRef {
661 let mut builder = BooleanBuilder::with_capacity(values.len());
662 for v in values {
663 if let Some(b) = v.as_bool() {
664 builder.append_value(b);
665 } else {
666 builder.append_null();
667 }
668 }
669 Arc::new(builder.finish())
670}
671
672fn values_to_float32_array(values: &[Value]) -> ArrayRef {
673 let mut builder = Float32Builder::with_capacity(values.len());
674 for v in values {
675 if let Some(n) = v.as_f64() {
676 builder.append_value(n as f32);
677 } else {
678 builder.append_null();
679 }
680 }
681 Arc::new(builder.finish())
682}
683
684fn values_to_float64_array(values: &[Value]) -> ArrayRef {
685 let mut builder = Float64Builder::with_capacity(values.len());
686 for v in values {
687 if let Some(n) = v.as_f64() {
688 builder.append_value(n);
689 } else {
690 builder.append_null();
691 }
692 }
693 Arc::new(builder.finish())
694}
695
696fn values_to_fixed_size_binary_array(values: &[Value], size: i32) -> Result<ArrayRef> {
697 let mut builder = FixedSizeBinaryBuilder::with_capacity(values.len(), size);
698 for v in values {
699 match v {
700 Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta }) if size == 24 => {
701 let btic = uni_btic::Btic::new(*lo, *hi, *meta)
702 .map_err(|e| anyhow!("invalid BTIC value: {}", e))?;
703 builder.append_value(uni_btic::encode::encode(&btic))?;
704 }
705 Value::String(s) if size == 24 => match uni_btic::parse::parse_btic_literal(s) {
706 Ok(b) => builder.append_value(uni_btic::encode::encode(&b))?,
707 Err(_) => builder.append_null(),
708 },
709 Value::List(bytes) => {
710 let b: Vec<u8> = bytes
711 .iter()
712 .map(|bv| bv.as_u64().unwrap_or(0) as u8)
713 .collect();
714 if b.len() as i32 == size {
715 builder.append_value(&b)?;
716 } else {
717 builder.append_null();
718 }
719 }
720 _ => builder.append_null(),
721 }
722 }
723 Ok(Arc::new(builder.finish()))
724}
725
726pub fn extract_vector_f32_values(
741 val: Option<&Value>,
742 is_deleted: bool,
743 dimensions: usize,
744) -> (Vec<f32>, bool) {
745 let zeros = || vec![0.0_f32; dimensions];
746
747 if is_deleted {
749 return (zeros(), true);
750 }
751
752 match val {
753 Some(Value::Vector(v)) if v.len() == dimensions => (v.clone(), true),
755 Some(Value::Vector(_)) => (zeros(), false), Some(Value::List(arr)) if arr.len() == dimensions => {
758 let values: Vec<f32> = arr
759 .iter()
760 .map(|v| v.as_f64().unwrap_or(0.0) as f32)
761 .collect();
762 (values, true)
763 }
764 Some(Value::List(_)) => (zeros(), false), _ => (zeros(), false), }
767}
768
769fn values_to_fixed_size_list_f32_array(values: &[Value], size: i32) -> ArrayRef {
770 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), size);
771 for v in values {
772 let (vals, valid) = extract_vector_f32_values(Some(v), false, size as usize);
773 for val in vals {
774 builder.values().append_value(val);
775 }
776 builder.append(valid);
777 }
778 Arc::new(builder.finish())
779}
780
781fn values_to_timestamp_array(values: &[Value], tz: Option<&Arc<str>>) -> ArrayRef {
782 let mut builder = TimestampNanosecondBuilder::with_capacity(values.len());
783 for v in values {
784 if v.is_null() {
785 builder.append_null();
786 } else if let Value::Temporal(tv) = v {
787 match tv {
788 uni_common::TemporalValue::DateTime {
789 nanos_since_epoch, ..
790 }
791 | uni_common::TemporalValue::LocalDateTime {
792 nanos_since_epoch, ..
793 } => builder.append_value(*nanos_since_epoch),
794 _ => builder.append_null(),
795 }
796 } else if let Some(n) = v.as_i64() {
797 builder.append_value(n);
798 } else if let Some(s) = v.as_str() {
799 match parse_datetime_to_nanos(s) {
800 Some(nanos) => builder.append_value(nanos),
801 None => builder.append_null(),
802 }
803 } else {
804 builder.append_null();
805 }
806 }
807
808 let arr = builder.finish();
809 if let Some(tz) = tz {
810 Arc::new(arr.with_timezone(tz.as_ref()))
811 } else {
812 Arc::new(arr)
813 }
814}
815
816fn values_to_datetime_struct_array(values: &[Value]) -> ArrayRef {
821 let mut nanos_builder = TimestampNanosecondBuilder::with_capacity(values.len());
822 let mut offset_builder = Int32Builder::with_capacity(values.len());
823 let mut tz_builder = StringBuilder::with_capacity(values.len(), values.len() * 20);
824 let mut null_buffer = BooleanBufferBuilder::new(values.len());
825
826 for v in values {
827 match v {
828 Value::Temporal(uni_common::TemporalValue::DateTime {
829 nanos_since_epoch,
830 offset_seconds,
831 timezone_name,
832 }) => {
833 nanos_builder.append_value(*nanos_since_epoch);
834 offset_builder.append_value(*offset_seconds);
835 tz_builder.append_option(timezone_name.as_deref());
836 null_buffer.append(true);
837 }
838 Value::Temporal(uni_common::TemporalValue::LocalDateTime { nanos_since_epoch }) => {
839 nanos_builder.append_value(*nanos_since_epoch);
840 offset_builder.append_null();
841 tz_builder.append_null();
842 null_buffer.append(true);
843 }
844 _ => {
845 nanos_builder.append_null();
846 offset_builder.append_null();
847 tz_builder.append_null();
848 null_buffer.append(false);
849 }
850 }
851 }
852
853 let struct_arr = StructArray::new(
854 schema::datetime_struct_fields(),
855 vec![
856 Arc::new(nanos_builder.finish()) as ArrayRef,
857 Arc::new(offset_builder.finish()) as ArrayRef,
858 Arc::new(tz_builder.finish()) as ArrayRef,
859 ],
860 Some(null_buffer.finish().into()),
861 );
862 Arc::new(struct_arr)
863}
864
865fn values_to_time_struct_array(values: &[Value]) -> ArrayRef {
870 let mut nanos_builder = Time64NanosecondBuilder::with_capacity(values.len());
871 let mut offset_builder = Int32Builder::with_capacity(values.len());
872 let mut null_buffer = BooleanBufferBuilder::new(values.len());
873
874 for v in values {
875 match v {
876 Value::Temporal(uni_common::TemporalValue::Time {
877 nanos_since_midnight,
878 offset_seconds,
879 }) => {
880 nanos_builder.append_value(*nanos_since_midnight);
881 offset_builder.append_value(*offset_seconds);
882 null_buffer.append(true);
883 }
884 Value::Temporal(uni_common::TemporalValue::LocalTime {
885 nanos_since_midnight,
886 }) => {
887 nanos_builder.append_value(*nanos_since_midnight);
888 offset_builder.append_null();
889 null_buffer.append(true);
890 }
891 _ => {
892 nanos_builder.append_null();
893 offset_builder.append_null();
894 null_buffer.append(false);
895 }
896 }
897 }
898
899 let struct_arr = StructArray::new(
900 schema::time_struct_fields(),
901 vec![
902 Arc::new(nanos_builder.finish()) as ArrayRef,
903 Arc::new(offset_builder.finish()) as ArrayRef,
904 ],
905 Some(null_buffer.finish().into()),
906 );
907 Arc::new(struct_arr)
908}
909
910fn values_to_large_binary_array(values: &[Value]) -> ArrayRef {
911 let mut builder =
912 arrow_array::builder::LargeBinaryBuilder::with_capacity(values.len(), values.len() * 64);
913 for v in values {
914 if v.is_null() {
915 builder.append_null();
916 } else {
917 let cv_bytes = uni_common::cypher_value_codec::encode(v);
919 builder.append_value(&cv_bytes);
920 }
921 }
922 Arc::new(builder.finish())
923}
924
925pub fn values_to_array(values: &[Value], dt: &ArrowDataType) -> Result<ArrayRef> {
927 match dt {
928 ArrowDataType::UInt64 => Ok(values_to_uint64_array(values)),
929 ArrowDataType::Int64 => Ok(values_to_int64_array(values)),
930 ArrowDataType::Int32 => Ok(values_to_int32_array(values)),
931 ArrowDataType::Utf8 => Ok(values_to_string_array(values)),
932 ArrowDataType::Boolean => Ok(values_to_bool_array(values)),
933 ArrowDataType::Float32 => Ok(values_to_float32_array(values)),
934 ArrowDataType::Float64 => Ok(values_to_float64_array(values)),
935 ArrowDataType::FixedSizeBinary(size) => values_to_fixed_size_binary_array(values, *size),
936 ArrowDataType::FixedSizeList(inner, size) => {
937 if inner.data_type() == &ArrowDataType::Float32 {
938 Ok(values_to_fixed_size_list_f32_array(values, *size))
939 } else {
940 Err(anyhow!("Unsupported FixedSizeList inner type"))
941 }
942 }
943 ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
944 Ok(values_to_timestamp_array(values, tz.as_ref()))
945 }
946 ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
947 Ok(values_to_timestamp_array(values, tz.as_ref()))
948 }
949 ArrowDataType::Date32 => {
950 let mut builder = Date32Builder::with_capacity(values.len());
951 for v in values {
952 if v.is_null() {
953 builder.append_null();
954 } else if let Value::Temporal(uni_common::TemporalValue::Date {
955 days_since_epoch,
956 }) = v
957 {
958 builder.append_value(*days_since_epoch);
959 } else if let Some(n) = v.as_i64() {
960 builder.append_value(n as i32);
961 } else {
962 builder.append_null();
963 }
964 }
965 Ok(Arc::new(builder.finish()))
966 }
967 ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
968 let mut builder = Time64NanosecondBuilder::with_capacity(values.len());
969 for v in values {
970 if v.is_null() {
971 builder.append_null();
972 } else if let Value::Temporal(tv) = v {
973 match tv {
974 uni_common::TemporalValue::LocalTime {
975 nanos_since_midnight,
976 }
977 | uni_common::TemporalValue::Time {
978 nanos_since_midnight,
979 ..
980 } => builder.append_value(*nanos_since_midnight),
981 _ => builder.append_null(),
982 }
983 } else if let Some(n) = v.as_i64() {
984 builder.append_value(n);
985 } else {
986 builder.append_null();
987 }
988 }
989 Ok(Arc::new(builder.finish()))
990 }
991 ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
992 let mut builder = Time64MicrosecondBuilder::with_capacity(values.len());
993 for v in values {
994 if v.is_null() {
995 builder.append_null();
996 } else if let Value::Temporal(tv) = v {
997 match tv {
998 uni_common::TemporalValue::LocalTime {
999 nanos_since_midnight,
1000 }
1001 | uni_common::TemporalValue::Time {
1002 nanos_since_midnight,
1003 ..
1004 } => builder.append_value(*nanos_since_midnight / 1_000), _ => builder.append_null(),
1006 }
1007 } else if let Some(n) = v.as_i64() {
1008 builder.append_value(n);
1009 } else {
1010 builder.append_null();
1011 }
1012 }
1013 Ok(Arc::new(builder.finish()))
1014 }
1015 ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano) => {
1016 let mut builder = IntervalMonthDayNanoBuilder::with_capacity(values.len());
1017 for v in values {
1018 if v.is_null() {
1019 builder.append_null();
1020 } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1021 months,
1022 days,
1023 nanos,
1024 }) = v
1025 {
1026 builder.append_value(arrow::datatypes::IntervalMonthDayNano {
1027 months: *months as i32,
1028 days: *days as i32,
1029 nanoseconds: *nanos,
1030 });
1031 } else {
1032 builder.append_null();
1033 }
1034 }
1035 Ok(Arc::new(builder.finish()))
1036 }
1037 ArrowDataType::Duration(arrow_schema::TimeUnit::Microsecond) => {
1038 let mut builder = DurationMicrosecondBuilder::with_capacity(values.len());
1039 for v in values {
1040 if v.is_null() {
1041 builder.append_null();
1042 } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1043 months,
1044 days,
1045 nanos,
1046 }) = v
1047 {
1048 let total_micros =
1049 months * 30 * 86_400_000_000i64 + days * 86_400_000_000i64 + nanos / 1_000;
1050 builder.append_value(total_micros);
1051 } else if let Some(n) = v.as_i64() {
1052 builder.append_value(n);
1053 } else {
1054 builder.append_null();
1055 }
1056 }
1057 Ok(Arc::new(builder.finish()))
1058 }
1059 ArrowDataType::LargeBinary => Ok(values_to_large_binary_array(values)),
1060 ArrowDataType::List(field) => {
1061 if field.data_type() == &ArrowDataType::Utf8 {
1062 let mut builder = ListBuilder::new(StringBuilder::new());
1063 for v in values {
1064 if let Value::List(arr) = v {
1065 for item in arr {
1066 if let Some(s) = item.as_str() {
1067 builder.values().append_value(s);
1068 } else {
1069 builder.values().append_null();
1070 }
1071 }
1072 builder.append(true);
1073 } else {
1074 builder.append_null();
1075 }
1076 }
1077 Ok(Arc::new(builder.finish()))
1078 } else {
1079 Err(anyhow!(
1080 "Unsupported List inner type: {:?}",
1081 field.data_type()
1082 ))
1083 }
1084 }
1085 ArrowDataType::Struct(_) if schema::is_datetime_struct(dt) => {
1086 Ok(values_to_datetime_struct_array(values))
1087 }
1088 ArrowDataType::Struct(_) if schema::is_time_struct(dt) => {
1089 Ok(values_to_time_struct_array(values))
1090 }
1091 _ => Err(anyhow!("Unsupported type for conversion: {:?}", dt)),
1092 }
1093}
1094
/// Builds Arrow columns for one property, choosing the column layout from the
/// property's declared [`DataType`].
pub struct PropertyExtractor<'a> {
    /// Declared type of the property; `build_column` dispatches on it.
    data_type: &'a DataType,
}
1099
1100impl<'a> PropertyExtractor<'a> {
1101 pub fn new(_name: &'a str, data_type: &'a DataType) -> Self {
1102 Self { data_type }
1103 }
1104
    /// Builds the Arrow column for this property by dispatching on its data type.
    ///
    /// `len`, `deleted` and `get_props` are forwarded unchanged to the type-specific
    /// builder. `get_props(i)` supplies the property value for row `i`, or `None` when
    /// the row has no value; `deleted[i]` flags row `i` as deleted.
    ///
    /// # Errors
    ///
    /// Returns an error when the data type has no Arrow conversion here, or when the
    /// selected builder fails.
    pub fn build_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
    where
        F: Fn(usize) -> Option<&'a Value>,
    {
        match self.data_type {
            DataType::String => self.build_string_column(len, deleted, get_props),
            DataType::Int32 => self.build_int32_column(len, deleted, get_props),
            DataType::Int64 => self.build_int64_column(len, deleted, get_props),
            DataType::Float32 => self.build_float32_column(len, deleted, get_props),
            DataType::Float64 => self.build_float64_column(len, deleted, get_props),
            DataType::Bool => self.build_bool_column(len, deleted, get_props),
            DataType::Vector { dimensions } => {
                self.build_vector_column(len, deleted, get_props, *dimensions)
            }
            // CypherValue properties go through the JSON column builder.
            DataType::CypherValue => self.build_json_column(len, deleted, get_props),
            DataType::Bytes => self.build_bytes_column(len, deleted, get_props),
            DataType::List(inner) => self.build_list_column(len, deleted, get_props, inner),
            DataType::Map(key, value) => self.build_map_column(len, deleted, get_props, key, value),
            DataType::Crdt(_) => self.build_crdt_column(len, deleted, get_props),
            // DateTime and Time use struct columns; Timestamp and Date use dedicated builders.
            DataType::DateTime => self.build_datetime_struct_column(len, deleted, get_props),
            DataType::Timestamp => self.build_timestamp_column(len, deleted, get_props),
            DataType::Date => self.build_date32_column(len, deleted, get_props),
            DataType::Time => self.build_time_struct_column(len, deleted, get_props),
            DataType::Duration => self.build_duration_column(len, deleted, get_props),
            DataType::Btic => self.build_btic_column(len, deleted, get_props),
            _ => Err(anyhow!(
                "Unsupported data type for arrow conversion: {:?}",
                self.data_type
            )),
        }
    }
1138
1139 fn build_string_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1140 where
1141 F: Fn(usize) -> Option<&'a Value>,
1142 {
1143 let mut builder = arrow_array::builder::StringBuilder::with_capacity(len, len * 32);
1144 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1145 let prop = get_props(i);
1146 if let Some(s) = prop.and_then(|v| v.as_str()) {
1147 builder.append_value(s);
1148 } else if let Some(Value::Temporal(tv)) = prop {
1149 builder.append_value(tv.to_string());
1150 } else if is_deleted {
1151 builder.append_value("");
1152 } else {
1153 builder.append_null();
1154 }
1155 }
1156 Ok(Arc::new(builder.finish()))
1157 }
1158
1159 fn build_int32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1160 where
1161 F: Fn(usize) -> Option<&'a Value>,
1162 {
1163 let mut values = Vec::with_capacity(len);
1164 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1165 let val = get_props(i)
1168 .and_then(|v| v.as_i64())
1169 .and_then(|v| i32::try_from(v).ok());
1170 if val.is_none() && is_deleted {
1171 values.push(Some(0));
1172 } else {
1173 values.push(val);
1174 }
1175 }
1176 Ok(Arc::new(Int32Array::from(values)))
1177 }
1178
1179 fn build_int64_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1180 where
1181 F: Fn(usize) -> Option<&'a Value>,
1182 {
1183 let mut values = Vec::with_capacity(len);
1184 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1185 let val = get_props(i).and_then(|v| v.as_i64());
1186 if val.is_none() && is_deleted {
1187 values.push(Some(0));
1188 } else {
1189 values.push(val);
1190 }
1191 }
1192 Ok(Arc::new(Int64Array::from(values)))
1193 }
1194
1195 fn build_timestamp_column<F>(
1196 &self,
1197 len: usize,
1198 deleted: &[bool],
1199 get_props: F,
1200 ) -> Result<ArrayRef>
1201 where
1202 F: Fn(usize) -> Option<&'a Value>,
1203 {
1204 let mut values = Vec::with_capacity(len);
1205 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1206 let val = get_props(i);
1207 let ts = if is_deleted || val.is_none() {
1208 Some(0i64)
1209 } else if let Some(Value::Temporal(tv)) = val {
1210 match tv {
1211 uni_common::TemporalValue::DateTime {
1212 nanos_since_epoch, ..
1213 }
1214 | uni_common::TemporalValue::LocalDateTime {
1215 nanos_since_epoch, ..
1216 } => Some(*nanos_since_epoch),
1217 _ => None,
1218 }
1219 } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1220 Some(v)
1221 } else if let Some(s) = val.and_then(|v| v.as_str()) {
1222 parse_datetime_to_nanos(s)
1223 } else {
1224 None
1225 };
1226
1227 if is_deleted {
1228 values.push(Some(0));
1229 } else {
1230 values.push(ts);
1231 }
1232 }
1233 let arr = TimestampNanosecondArray::from(values).with_timezone("UTC");
1234 Ok(Arc::new(arr))
1235 }
1236
1237 fn build_datetime_struct_column<F>(
1238 &self,
1239 len: usize,
1240 deleted: &[bool],
1241 get_props: F,
1242 ) -> Result<ArrayRef>
1243 where
1244 F: Fn(usize) -> Option<&'a Value>,
1245 {
1246 let values = self.collect_values_or_null(len, deleted, &get_props);
1247 Ok(values_to_datetime_struct_array(&values))
1248 }
1249
1250 fn build_time_struct_column<F>(
1251 &self,
1252 len: usize,
1253 deleted: &[bool],
1254 get_props: F,
1255 ) -> Result<ArrayRef>
1256 where
1257 F: Fn(usize) -> Option<&'a Value>,
1258 {
1259 let values = self.collect_values_or_null(len, deleted, &get_props);
1260 Ok(values_to_time_struct_array(&values))
1261 }
1262
1263 fn collect_values_or_null<F>(&self, len: usize, deleted: &[bool], get_props: &F) -> Vec<Value>
1265 where
1266 F: Fn(usize) -> Option<&'a Value>,
1267 {
1268 deleted
1269 .iter()
1270 .enumerate()
1271 .take(len)
1272 .map(|(i, &is_deleted)| {
1273 if is_deleted {
1274 Value::Null
1275 } else {
1276 get_props(i).cloned().unwrap_or(Value::Null)
1277 }
1278 })
1279 .collect()
1280 }
1281
1282 fn build_date32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1283 where
1284 F: Fn(usize) -> Option<&'a Value>,
1285 {
1286 let mut builder = Date32Builder::with_capacity(len);
1287 let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1288
1289 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1290 let val = get_props(i);
1291 let days = if is_deleted || val.is_none() {
1292 Some(0)
1293 } else if let Some(Value::Temporal(uni_common::TemporalValue::Date {
1294 days_since_epoch,
1295 })) = val
1296 {
1297 Some(*days_since_epoch)
1298 } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1299 i32::try_from(v).ok()
1302 } else if let Some(s) = val.and_then(|v| v.as_str()) {
1303 match chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1304 Ok(date) => Some(date.signed_duration_since(epoch).num_days() as i32),
1305 Err(_) => None,
1306 }
1307 } else {
1308 None
1309 };
1310
1311 if is_deleted {
1312 builder.append_value(0);
1313 } else if let Some(v) = days {
1314 builder.append_value(v);
1315 } else {
1316 builder.append_null();
1317 }
1318 }
1319 Ok(Arc::new(builder.finish()))
1320 }
1321
1322 fn build_duration_column<F>(
1323 &self,
1324 len: usize,
1325 deleted: &[bool],
1326 get_props: F,
1327 ) -> Result<ArrayRef>
1328 where
1329 F: Fn(usize) -> Option<&'a Value>,
1330 {
1331 let mut builder = LargeBinaryBuilder::with_capacity(len, len * 32);
1333 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1334 let raw_val = get_props(i);
1335 if let Some(val @ Value::Temporal(uni_common::TemporalValue::Duration { .. })) = raw_val
1336 {
1337 let encoded = uni_common::cypher_value_codec::encode(val);
1338 builder.append_value(&encoded);
1339 } else if is_deleted {
1340 let zero = Value::Temporal(uni_common::TemporalValue::Duration {
1341 months: 0,
1342 days: 0,
1343 nanos: 0,
1344 });
1345 let encoded = uni_common::cypher_value_codec::encode(&zero);
1346 builder.append_value(&encoded);
1347 } else {
1348 builder.append_null();
1349 }
1350 }
1351 Ok(Arc::new(builder.finish()))
1352 }
1353
1354 fn build_btic_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1355 where
1356 F: Fn(usize) -> Option<&'a Value>,
1357 {
1358 const ENCODED_LEN: i32 = 24;
1359 let mut builder = FixedSizeBinaryBuilder::with_capacity(len, ENCODED_LEN);
1360 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1361 let raw_val = get_props(i);
1362 let btic = match raw_val {
1363 Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => Some(
1364 uni_btic::Btic::new(*lo, *hi, *meta)
1365 .map_err(|e| anyhow!("invalid BTIC value: {}", e))?,
1366 ),
1367 Some(Value::String(s)) => Some(
1368 uni_btic::parse::parse_btic_literal(s)
1369 .map_err(|e| anyhow!("BTIC parse error for '{}': {}", s, e))?,
1370 ),
1371 _ => None,
1372 };
1373
1374 if let Some(b) = btic {
1375 builder.append_value(uni_btic::encode::encode(&b))?;
1376 } else if is_deleted {
1377 builder.append_value([0u8; ENCODED_LEN as usize])?;
1378 } else {
1379 builder.append_null();
1380 }
1381 }
1382 Ok(Arc::new(builder.finish()))
1383 }
1384
1385 fn build_float32_column<F>(
1386 &self,
1387 len: usize,
1388 deleted: &[bool],
1389 get_props: F,
1390 ) -> Result<ArrayRef>
1391 where
1392 F: Fn(usize) -> Option<&'a Value>,
1393 {
1394 let mut values = Vec::with_capacity(len);
1395 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1396 let val = get_props(i).and_then(|v| v.as_f64()).map(|v| v as f32);
1397 if val.is_none() && is_deleted {
1398 values.push(Some(0.0));
1399 } else {
1400 values.push(val);
1401 }
1402 }
1403 Ok(Arc::new(Float32Array::from(values)))
1404 }
1405
1406 fn build_float64_column<F>(
1407 &self,
1408 len: usize,
1409 deleted: &[bool],
1410 get_props: F,
1411 ) -> Result<ArrayRef>
1412 where
1413 F: Fn(usize) -> Option<&'a Value>,
1414 {
1415 let mut values = Vec::with_capacity(len);
1416 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1417 let val = get_props(i).and_then(|v| v.as_f64());
1418 if val.is_none() && is_deleted {
1419 values.push(Some(0.0));
1420 } else {
1421 values.push(val);
1422 }
1423 }
1424 Ok(Arc::new(Float64Array::from(values)))
1425 }
1426
1427 fn build_bool_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1428 where
1429 F: Fn(usize) -> Option<&'a Value>,
1430 {
1431 let mut values = Vec::with_capacity(len);
1432 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1433 let val = get_props(i).and_then(|v| v.as_bool());
1434 if val.is_none() && is_deleted {
1435 values.push(Some(false));
1436 } else {
1437 values.push(val);
1438 }
1439 }
1440 Ok(Arc::new(BooleanArray::from(values)))
1441 }
1442
1443 fn build_vector_column<F>(
1444 &self,
1445 len: usize,
1446 deleted: &[bool],
1447 get_props: F,
1448 dimensions: usize,
1449 ) -> Result<ArrayRef>
1450 where
1451 F: Fn(usize) -> Option<&'a Value>,
1452 {
1453 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dimensions as i32);
1454
1455 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1456 let val = get_props(i);
1457 let (values, valid) = extract_vector_f32_values(val, is_deleted, dimensions);
1458 for v in values {
1459 builder.values().append_value(v);
1460 }
1461 builder.append(valid);
1462 }
1463 Ok(Arc::new(builder.finish()))
1464 }
1465
1466 fn build_json_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1467 where
1468 F: Fn(usize) -> Option<&'a Value>,
1469 {
1470 let null_val = Value::Null;
1471 let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(len, len * 64);
1472 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1473 let val = get_props(i);
1474 let uni_val = if val.is_none() && is_deleted {
1475 &null_val
1476 } else {
1477 val.unwrap_or(&null_val)
1478 };
1479 let cv_bytes = uni_common::cypher_value_codec::encode(uni_val);
1481 builder.append_value(&cv_bytes);
1482 }
1483 Ok(Arc::new(builder.finish()))
1484 }
1485
1486 fn build_bytes_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1487 where
1488 F: Fn(usize) -> Option<&'a Value>,
1489 {
1490 let mut builder = LargeBinaryBuilder::with_capacity(len, len * 64);
1491 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1492 let val = get_props(i);
1493 if let Some(Value::Bytes(b)) = val {
1494 builder.append_value(b);
1495 } else if is_deleted {
1496 builder.append_value(&[][..]);
1497 } else {
1498 builder.append_null();
1499 }
1500 }
1501 Ok(Arc::new(builder.finish()))
1502 }
1503
1504 fn build_list_column<F>(
1505 &self,
1506 len: usize,
1507 deleted: &[bool],
1508 get_props: F,
1509 inner: &DataType,
1510 ) -> Result<ArrayRef>
1511 where
1512 F: Fn(usize) -> Option<&'a Value>,
1513 {
1514 match inner {
1515 DataType::String => {
1516 self.build_typed_list(len, deleted, &get_props, StringBuilder::new(), |v, b| {
1517 if let Some(s) = v.as_str() {
1518 b.append_value(s);
1519 } else {
1520 b.append_null();
1521 }
1522 })
1523 }
1524 DataType::Int64 => {
1525 self.build_typed_list(len, deleted, &get_props, Int64Builder::new(), |v, b| {
1526 if let Some(n) = v.as_i64() {
1527 b.append_value(n);
1528 } else {
1529 b.append_null();
1530 }
1531 })
1532 }
1533 DataType::Float64 => {
1534 self.build_typed_list(len, deleted, &get_props, Float64Builder::new(), |v, b| {
1535 if let Some(f) = v.as_f64() {
1536 b.append_value(f);
1537 } else {
1538 b.append_null();
1539 }
1540 })
1541 }
1542 DataType::Bytes => {
1543 let item_field = Arc::new(
1547 Field::new("item", ArrowDataType::LargeBinary, true)
1548 .with_metadata(schema::raw_bytes_field_metadata()),
1549 );
1550 let mut builder =
1551 ListBuilder::new(LargeBinaryBuilder::new()).with_field(item_field);
1552 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1553 let val_array = get_props(i).and_then(|v| v.as_array());
1554 if val_array.is_none() && is_deleted {
1555 builder.append_null();
1556 } else if let Some(arr) = val_array {
1557 for v in arr {
1558 if let Value::Bytes(b) = v {
1559 builder.values().append_value(b);
1560 } else {
1561 builder.values().append_null();
1562 }
1563 }
1564 builder.append(true);
1565 } else {
1566 builder.append_null();
1567 }
1568 }
1569 Ok(Arc::new(builder.finish()))
1570 }
1571 DataType::Vector { dimensions } => {
1572 let dim = *dimensions as i32;
1583 let mut builder =
1584 ListBuilder::new(FixedSizeListBuilder::new(Float32Builder::new(), dim));
1585 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1586 let val_array = get_props(i).and_then(|v| v.as_array());
1587 if val_array.is_none() && is_deleted {
1588 builder.append_null();
1589 } else if let Some(arr) = val_array {
1590 for tok in arr {
1594 let (vals, valid) =
1595 extract_vector_f32_values(Some(tok), false, *dimensions);
1596 for v in vals {
1597 builder.values().values().append_value(v);
1598 }
1599 builder.values().append(valid);
1600 }
1601 builder.append(true);
1602 } else {
1603 builder.append_null();
1604 }
1605 }
1606 Ok(Arc::new(builder.finish()))
1607 }
1608 _ => Err(anyhow!("Unsupported inner type for List: {:?}", inner)),
1609 }
1610 }
1611
1612 fn build_typed_list<F, B, A>(
1614 &self,
1615 len: usize,
1616 deleted: &[bool],
1617 get_props: &F,
1618 inner_builder: B,
1619 mut append_value: A,
1620 ) -> Result<ArrayRef>
1621 where
1622 F: Fn(usize) -> Option<&'a Value>,
1623 B: arrow_array::builder::ArrayBuilder,
1624 A: FnMut(&Value, &mut B),
1625 {
1626 let mut builder = ListBuilder::new(inner_builder);
1627 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1628 let val_array = get_props(i).and_then(|v| v.as_array());
1629 if val_array.is_none() && is_deleted {
1630 builder.append_null();
1631 } else if let Some(arr) = val_array {
1632 for v in arr {
1633 append_value(v, builder.values());
1634 }
1635 builder.append(true);
1636 } else {
1637 builder.append_null();
1638 }
1639 }
1640 Ok(Arc::new(builder.finish()))
1641 }
1642
1643 fn build_map_column<F>(
1644 &self,
1645 len: usize,
1646 deleted: &[bool],
1647 get_props: F,
1648 key: &DataType,
1649 value: &DataType,
1650 ) -> Result<ArrayRef>
1651 where
1652 F: Fn(usize) -> Option<&'a Value>,
1653 {
1654 if !matches!(key, DataType::String) {
1655 return Err(anyhow!("Map keys must be String (JSON limitation)"));
1656 }
1657
1658 match value {
1659 DataType::String => self.build_typed_map(
1660 len,
1661 deleted,
1662 &get_props,
1663 StringBuilder::new(),
1664 arrow_schema::DataType::Utf8,
1665 None,
1666 |v, b: &mut StringBuilder| {
1667 if let Some(s) = v.as_str() {
1668 b.append_value(s);
1669 } else {
1670 b.append_null();
1671 }
1672 },
1673 ),
1674 DataType::Int64 => self.build_typed_map(
1675 len,
1676 deleted,
1677 &get_props,
1678 Int64Builder::new(),
1679 arrow_schema::DataType::Int64,
1680 None,
1681 |v, b: &mut Int64Builder| {
1682 if let Some(n) = v.as_i64() {
1683 b.append_value(n);
1684 } else {
1685 b.append_null();
1686 }
1687 },
1688 ),
1689 DataType::Int32 => self.build_typed_map(
1690 len,
1691 deleted,
1692 &get_props,
1693 Int32Builder::new(),
1694 arrow_schema::DataType::Int32,
1695 None,
1696 |v, b: &mut Int32Builder| match v.as_i64().and_then(|n| i32::try_from(n).ok()) {
1697 Some(n) => b.append_value(n),
1698 None => b.append_null(),
1699 },
1700 ),
1701 DataType::Float64 => self.build_typed_map(
1702 len,
1703 deleted,
1704 &get_props,
1705 Float64Builder::new(),
1706 arrow_schema::DataType::Float64,
1707 None,
1708 |v, b: &mut Float64Builder| match v.as_f64() {
1709 Some(f) => b.append_value(f),
1710 None => b.append_null(),
1711 },
1712 ),
1713 DataType::Float32 => self.build_typed_map(
1714 len,
1715 deleted,
1716 &get_props,
1717 Float32Builder::new(),
1718 arrow_schema::DataType::Float32,
1719 None,
1720 |v, b: &mut Float32Builder| match v.as_f64() {
1721 Some(f) => b.append_value(f as f32),
1722 None => b.append_null(),
1723 },
1724 ),
1725 DataType::Bool => self.build_typed_map(
1726 len,
1727 deleted,
1728 &get_props,
1729 BooleanBuilder::new(),
1730 arrow_schema::DataType::Boolean,
1731 None,
1732 |v, b: &mut BooleanBuilder| match v.as_bool() {
1733 Some(x) => b.append_value(x),
1734 None => b.append_null(),
1735 },
1736 ),
1737 DataType::Bytes => self.build_typed_map(
1738 len,
1739 deleted,
1740 &get_props,
1741 LargeBinaryBuilder::new(),
1742 arrow_schema::DataType::LargeBinary,
1743 Some(schema::raw_bytes_field_metadata()),
1746 |v, b: &mut LargeBinaryBuilder| {
1747 if let Value::Bytes(bytes) = v {
1748 b.append_value(bytes);
1749 } else {
1750 b.append_null();
1751 }
1752 },
1753 ),
1754 _ => self.build_typed_map(
1760 len,
1761 deleted,
1762 &get_props,
1763 LargeBinaryBuilder::new(),
1764 arrow_schema::DataType::LargeBinary,
1765 None,
1766 |v, b: &mut LargeBinaryBuilder| {
1767 if v.is_null() {
1768 b.append_null();
1769 } else {
1770 b.append_value(uni_common::cypher_value_codec::encode(v));
1771 }
1772 },
1773 ),
1774 }
1775 }
1776
1777 #[expect(
1779 clippy::too_many_arguments,
1780 reason = "builder plumbing: value type + optional child metadata are distinct knobs"
1781 )]
1782 fn build_typed_map<F, B, A>(
1783 &self,
1784 len: usize,
1785 deleted: &[bool],
1786 get_props: &F,
1787 value_builder: B,
1788 value_arrow_type: arrow_schema::DataType,
1789 value_metadata: Option<HashMap<String, String>>,
1790 mut append_value: A,
1791 ) -> Result<ArrayRef>
1792 where
1793 F: Fn(usize) -> Option<&'a Value>,
1794 B: arrow_array::builder::ArrayBuilder,
1795 A: FnMut(&Value, &mut B),
1796 {
1797 let key_builder = Box::new(StringBuilder::new());
1798 let value_builder = Box::new(value_builder);
1799 let value_field = match value_metadata {
1800 Some(meta) => Field::new("value", value_arrow_type, true).with_metadata(meta),
1801 None => Field::new("value", value_arrow_type, true),
1802 };
1803 let struct_builder = StructBuilder::new(
1804 vec![
1805 Field::new("key", arrow_schema::DataType::Utf8, false),
1806 value_field,
1807 ],
1808 vec![key_builder, value_builder],
1809 );
1810 let mut builder = ListBuilder::new(struct_builder);
1811
1812 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1813 self.append_map_entry(&mut builder, get_props(i), is_deleted, &mut append_value);
1814 }
1815 Ok(Arc::new(builder.finish()))
1816 }
1817
1818 fn append_map_entry<B, A>(
1820 &self,
1821 builder: &mut ListBuilder<StructBuilder>,
1822 val: Option<&'a Value>,
1823 is_deleted: bool,
1824 append_value: &mut A,
1825 ) where
1826 B: arrow_array::builder::ArrayBuilder,
1827 A: FnMut(&Value, &mut B),
1828 {
1829 let val_obj = val.and_then(|v| v.as_object());
1830 if val_obj.is_none() && is_deleted {
1831 builder.append(false);
1832 } else if let Some(obj) = val_obj {
1833 let struct_b = builder.values();
1834 for (k, v) in obj {
1835 struct_b
1836 .field_builder::<StringBuilder>(0)
1837 .unwrap()
1838 .append_value(k);
1839 let value_b = struct_b.field_builder::<B>(1).unwrap();
1841 append_value(v, value_b);
1842 struct_b.append(true);
1843 }
1844 builder.append(true);
1845 } else {
1846 builder.append(false);
1847 }
1848 }
1849
1850 fn build_crdt_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1851 where
1852 F: Fn(usize) -> Option<&'a Value>,
1853 {
1854 let mut builder = BinaryBuilder::new();
1855 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1856 if is_deleted {
1857 builder.append_null();
1858 continue;
1859 }
1860 if let Some(val) = get_props(i) {
1861 let crdt_result = if let Some(s) = val.as_str() {
1864 serde_json::from_str::<Crdt>(s)
1865 } else {
1866 let json_val: serde_json::Value = val.clone().into();
1868 serde_json::from_value::<Crdt>(json_val)
1869 };
1870
1871 if let Ok(crdt) = crdt_result {
1872 if let Ok(bytes) = crdt.to_msgpack() {
1873 builder.append_value(&bytes);
1874 } else {
1875 builder.append_null();
1876 }
1877 } else {
1878 builder.append_null();
1879 }
1880 } else {
1881 builder.append_null();
1882 }
1883 }
1884 Ok(Arc::new(builder.finish()))
1885 }
1886}
1887
1888pub fn build_edge_column<'a>(
1890 name: &'a str,
1891 data_type: &'a DataType,
1892 len: usize,
1893 get_props: impl Fn(usize) -> Option<&'a Value>,
1894) -> Result<ArrayRef> {
1895 let deleted = vec![false; len];
1897 let extractor = PropertyExtractor::new(name, data_type);
1898 extractor.build_column(len, &deleted, get_props)
1899}
1900
1901#[cfg(test)]
1902mod tests {
1903 use super::*;
1904 use arrow_array::{
1905 Array, DurationMicrosecondArray,
1906 builder::{BinaryBuilder, Time64MicrosecondBuilder, TimestampNanosecondBuilder},
1907 };
1908 use std::collections::HashMap;
1909 use uni_common::TemporalValue;
1910 use uni_crdt::{Crdt, GCounter};
1911
#[test]
fn test_arrow_to_value_string() {
    // Valid strings decode to Value::String; the null slot decodes to Value::Null.
    let strings = StringArray::from(vec![Some("hello"), None, Some("world")]);
    let expected = [
        Value::String("hello".to_string()),
        Value::Null,
        Value::String("world".to_string()),
    ];
    for (idx, want) in expected.iter().enumerate() {
        assert_eq!(&arrow_to_value(&strings, idx, None), want);
    }
}
1925
#[test]
fn test_arrow_to_value_int64() {
    // Integers decode to Value::Int; the null slot decodes to Value::Null.
    let ints = Int64Array::from(vec![Some(42), None, Some(-10)]);
    let decoded: Vec<Value> = (0..3)
        .map(|idx| arrow_to_value(&ints, idx, None))
        .collect();
    assert_eq!(decoded, vec![Value::Int(42), Value::Null, Value::Int(-10)]);
}
1933
#[test]
#[allow(clippy::approx_constant)]
fn test_arrow_to_value_float64() {
    // Floats decode to Value::Float; the null slot decodes to Value::Null.
    let floats = Float64Array::from(vec![Some(3.14), None]);
    let expected = [Value::Float(3.14), Value::Null];
    for (idx, want) in expected.iter().enumerate() {
        assert_eq!(&arrow_to_value(&floats, idx, None), want);
    }
}
1941
#[test]
fn test_arrow_to_value_bool() {
    // Booleans decode to Value::Bool; the null slot decodes to Value::Null.
    let flags = BooleanArray::from(vec![Some(true), Some(false), None]);
    let decoded: Vec<Value> = (0..3)
        .map(|idx| arrow_to_value(&flags, idx, None))
        .collect();
    assert_eq!(
        decoded,
        vec![Value::Bool(true), Value::Bool(false), Value::Null]
    );
}
1949
#[test]
fn test_values_to_array_int64() {
    // Ints become Int64 cells and Value::Null becomes a null cell.
    let input = vec![Value::Int(1), Value::Int(2), Value::Null, Value::Int(4)];
    let array = values_to_array(&input, &ArrowDataType::Int64).unwrap();
    assert_eq!(array.len(), 4);

    let ints = array.as_any().downcast_ref::<Int64Array>().unwrap();
    let cells: Vec<Option<i64>> = ints.iter().collect();
    assert_eq!(cells, vec![Some(1), Some(2), None, Some(4)]);
}
1962
#[test]
fn test_values_to_array_string() {
    // Strings become Utf8 cells and Value::Null becomes a null cell.
    let input = vec![
        Value::String("a".to_string()),
        Value::String("b".to_string()),
        Value::Null,
    ];
    let array = values_to_array(&input, &ArrowDataType::Utf8).unwrap();
    assert_eq!(array.len(), 3);

    let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
    let cells: Vec<Option<&str>> = strings.iter().collect();
    assert_eq!(cells, vec![Some("a"), Some("b"), None]);
}
1978
#[test]
fn test_property_extractor_string() {
    // Two live rows with names, then a deleted row without one.
    let props: Vec<HashMap<String, Value>> = vec![
        HashMap::from([("name".to_string(), Value::String("Alice".to_string()))]),
        HashMap::from([("name".to_string(), Value::String("Bob".to_string()))]),
        HashMap::new(),
    ];
    let deleted = vec![false, false, true];

    let column = PropertyExtractor::new("name", &DataType::String)
        .build_column(3, &deleted, |i| props[i].get("name"))
        .unwrap();

    let strings = column.as_any().downcast_ref::<StringArray>().unwrap();
    // The deleted row is expected to read back as an empty string.
    for (idx, want) in ["Alice", "Bob", ""].iter().enumerate() {
        assert_eq!(strings.value(idx), *want);
    }
}
2002
#[test]
fn test_property_extractor_int64() {
    // Two live rows with ages, then a deleted row without one.
    let props: Vec<HashMap<String, Value>> = vec![
        HashMap::from([("age".to_string(), Value::Int(25))]),
        HashMap::from([("age".to_string(), Value::Int(30))]),
        HashMap::new(),
    ];
    let deleted = vec![false, false, true];

    let column = PropertyExtractor::new("age", &DataType::Int64)
        .build_column(3, &deleted, |i| props[i].get("age"))
        .unwrap();

    let ints = column.as_any().downcast_ref::<Int64Array>().unwrap();
    // The deleted row is expected to read back as 0.
    for (idx, want) in [25i64, 30, 0].iter().enumerate() {
        assert_eq!(ints.value(idx), *want);
    }
}
2022
#[test]
fn test_property_extractor_bytes_roundtrip() {
    // A non-empty blob, an empty blob, and a live row with no blob at all.
    let blob = vec![0u8, 1, 2, 255];
    let props: Vec<HashMap<String, Value>> = vec![
        HashMap::from([("blob".to_string(), Value::Bytes(blob.clone()))]),
        HashMap::from([("blob".to_string(), Value::Bytes(Vec::new()))]),
        HashMap::new(),
    ];
    let deleted = vec![false; 3];

    let column = PropertyExtractor::new("blob", &DataType::Bytes)
        .build_column(3, &deleted, |i| props[i].get("blob"))
        .unwrap();

    // Empty bytes must stay distinct from the missing (null) value.
    let expected = [Value::Bytes(blob), Value::Bytes(Vec::new()), Value::Null];
    for (idx, want) in expected.iter().enumerate() {
        assert_eq!(
            &arrow_to_value(column.as_ref(), idx, Some(&DataType::Bytes)),
            want
        );
    }
}
2057
#[test]
fn test_bytes_vs_cypher_value_disambiguation() {
    // With a Bytes type hint, the stored payload must come back as raw bytes.
    let raw = vec![0xDEu8, 0xAD, 0xBE, 0xEF];
    let props: Vec<HashMap<String, Value>> = vec![HashMap::from([(
        "blob".to_string(),
        Value::Bytes(raw.clone()),
    )])];

    let column = PropertyExtractor::new("blob", &DataType::Bytes)
        .build_column(1, &[false], |i| props[i].get("blob"))
        .unwrap();

    let decoded = arrow_to_value(column.as_ref(), 0, Some(&DataType::Bytes));
    assert_eq!(decoded, Value::Bytes(raw));
}
2079
#[test]
fn test_data_type_bytes_to_arrow() {
    // Bytes properties map to Arrow's LargeBinary type.
    let arrow_type = DataType::Bytes.to_arrow();
    assert_eq!(arrow_type, ArrowDataType::LargeBinary);
}
2084
#[test]
fn test_arrow_to_value_time64() {
    // Microseconds since midnight: 10:30:45, midnight, 23:59:59.123456, then a null.
    let mut builder = Time64MicrosecondBuilder::new();
    for micros in [37_845_000_000, 0, 86_399_123_456] {
        builder.append_value(micros);
    }
    builder.append_null();
    let times = builder.finish();

    let rendered = ["10:30:45", "00:00", "23:59:59.123456"];
    for (idx, want) in rendered.iter().enumerate() {
        assert_eq!(arrow_to_value(&times, idx, None).to_string(), *want);
    }
    assert_eq!(arrow_to_value(&times, 3, None), Value::Null);
}
2104
#[test]
fn test_arrow_to_value_duration() {
    // One second, one hour and one day in microseconds, then a null.
    let durations = DurationMicrosecondArray::from(vec![
        Some(1_000_000),
        Some(3_600_000_000),
        Some(86_400_000_000),
        None,
    ]);

    let rendered = ["PT1S", "PT1H", "PT24H"];
    for (idx, want) in rendered.iter().enumerate() {
        assert_eq!(arrow_to_value(&durations, idx, None).to_string(), *want);
    }
    assert_eq!(arrow_to_value(&durations, 3, None), Value::Null);
}
2121
#[test]
fn test_arrow_to_value_binary_crdt() {
    // Row 0: a MessagePack-encoded GCounter. Row 1: null.
    let mut counter = GCounter::new();
    counter.increment("actor1", 5);
    let packed = Crdt::GCounter(counter).to_msgpack().unwrap();

    let mut builder = BinaryBuilder::new();
    builder.append_value(&packed);
    builder.append_null();
    let column = builder.finish();

    // The CRDT decodes to an object whose "t" entry is "gc".
    let decoded = arrow_to_value(&column, 0, None);
    assert!(decoded.as_object().is_some());
    let fields = decoded.as_object().unwrap();
    assert_eq!(fields.get("t"), Some(&Value::String("gc".to_string())));

    assert_eq!(arrow_to_value(&column, 1, None), Value::Null);
}
2149
#[test]
fn test_datetime_struct_encode_decode_roundtrip() {
    // Three DateTimes: positive offset with a zone name, negative offset
    // without one, and the epoch in UTC.
    let originals = vec![
        Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch: 441763200000000000,
            offset_seconds: 3600,
            timezone_name: Some("Europe/Paris".to_string()),
        }),
        Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch: 1704067200000000000,
            offset_seconds: -18000,
            timezone_name: None,
        }),
        Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch: 0,
            offset_seconds: 0,
            timezone_name: Some("UTC".to_string()),
        }),
    ];

    let encoded = values_to_datetime_struct_array(&originals);
    let as_struct = encoded.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(as_struct.len(), 3);

    let decode = |idx: usize| arrow_to_value(encoded.as_ref(), idx, Some(&DataType::DateTime));
    for (idx, original) in originals.iter().enumerate() {
        assert_eq!(&decode(idx), original);
    }

    // Check the first value field by field as well.
    let Value::Temporal(TemporalValue::DateTime {
        nanos_since_epoch,
        offset_seconds,
        timezone_name,
    }) = decode(0)
    else {
        panic!("Expected DateTime value");
    };
    assert_eq!(nanos_since_epoch, 441763200000000000);
    assert_eq!(offset_seconds, 3600);
    assert_eq!(timezone_name, Some("Europe/Paris".to_string()));
}
2200
#[test]
fn test_datetime_struct_null_handling() {
    // A null sits between two real DateTimes.
    let inputs = vec![
        Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch: 441763200000000000,
            offset_seconds: 3600,
            timezone_name: Some("Europe/Paris".to_string()),
        }),
        Value::Null,
        Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch: 0,
            offset_seconds: 0,
            timezone_name: None,
        }),
    ];

    let encoded = values_to_datetime_struct_array(&inputs);
    let as_struct = encoded.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(as_struct.len(), 3);

    let decode = |idx: usize| arrow_to_value(encoded.as_ref(), idx, Some(&DataType::DateTime));

    assert_eq!(decode(0), inputs[0]);

    // The null must be null at the struct level and decode to Value::Null.
    assert!(as_struct.is_null(1));
    assert_eq!(decode(1), Value::Null);

    assert_eq!(decode(2), inputs[2]);
}
2235
#[test]
fn test_datetime_struct_boundary_values() {
    // The same instant with offsets of 0, +12h and -12h.
    let inputs: Vec<Value> = [0, 43200, -43200]
        .into_iter()
        .map(|offset_seconds| {
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 441763200000000000,
                offset_seconds,
                timezone_name: None,
            })
        })
        .collect();

    let encoded = values_to_datetime_struct_array(&inputs);
    let as_struct = encoded.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(as_struct.len(), 3);

    for (idx, want) in inputs.iter().enumerate() {
        let got = arrow_to_value(encoded.as_ref(), idx, Some(&DataType::DateTime));
        assert_eq!(&got, want);
    }
}
2267
#[test]
fn test_datetime_old_schema_migration() {
    // A plain UTC timestamp column read with a DateTime type hint.
    let mut legacy = TimestampNanosecondBuilder::new().with_timezone("UTC");
    legacy.append_value(441763200000000000);
    legacy.append_value(1704067200000000000);
    legacy.append_null();
    let column = legacy.finish();

    let decode = |idx: usize| arrow_to_value(&column, idx, Some(&DataType::DateTime));
    let first = decode(0);
    let _second = decode(1);
    let third = decode(2);

    // The first row must come back as a DateTime at offset 0 named "UTC".
    match first {
        Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch,
            offset_seconds,
            timezone_name,
        }) => {
            assert_eq!(nanos_since_epoch, 441763200000000000);
            assert_eq!(offset_seconds, 0);
            assert_eq!(timezone_name, Some("UTC".to_string()));
        }
        _ => panic!("Expected DateTime value"),
    }

    assert_eq!(third, Value::Null);
}
2300
#[test]
fn test_time_struct_encode_decode_roundtrip() {
    // 10:30:45 at +1h, midnight at offset 0, and the last nanosecond of the day at -5h.
    let originals = vec![
        Value::Temporal(TemporalValue::Time {
            nanos_since_midnight: 37845000000000,
            offset_seconds: 3600,
        }),
        Value::Temporal(TemporalValue::Time {
            nanos_since_midnight: 0,
            offset_seconds: 0,
        }),
        Value::Temporal(TemporalValue::Time {
            nanos_since_midnight: 86399999999999,
            offset_seconds: -18000,
        }),
    ];

    let encoded = values_to_time_struct_array(&originals);
    let as_struct = encoded.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(as_struct.len(), 3);

    let decode = |idx: usize| arrow_to_value(encoded.as_ref(), idx, Some(&DataType::Time));
    for (idx, original) in originals.iter().enumerate() {
        assert_eq!(&decode(idx), original);
    }

    // Check the first value field by field as well.
    let Value::Temporal(TemporalValue::Time {
        nanos_since_midnight,
        offset_seconds,
    }) = decode(0)
    else {
        panic!("Expected Time value");
    };
    assert_eq!(nanos_since_midnight, 37845000000000);
    assert_eq!(offset_seconds, 3600);
}
2346
/// Encodes `Time` values with a `Value::Null` in the middle and expects the
/// null slot to be null in the struct array and to decode to `Value::Null`,
/// while the surrounding rows round-trip unchanged.
#[test]
fn test_time_struct_null_handling() {
    const NULL_ROW: usize = 1;

    let values = vec![
        Value::Temporal(TemporalValue::Time {
            nanos_since_midnight: 37845000000000,
            offset_seconds: 3600,
        }),
        Value::Null,
        Value::Temporal(TemporalValue::Time {
            nanos_since_midnight: 0,
            offset_seconds: 0,
        }),
    ];

    let encoded = values_to_time_struct_array(&values);
    let as_struct = encoded.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(as_struct.len(), 3);

    for (row, expected) in values.iter().enumerate() {
        if row == NULL_ROW {
            // The null input must be marked null at the struct level.
            assert!(as_struct.is_null(row));
        }
        let decoded = arrow_to_value(encoded.as_ref(), row, Some(&DataType::Time));
        assert_eq!(&decoded, expected);
    }
}
2379
/// A `Value::Vector` whose length matches the requested dimension is expected
/// to be returned unchanged and flagged valid.
#[test]
fn test_extract_vector_f32_values_valid_vector() {
    let expected = vec![1.0, 2.0, 3.0];
    let input = Value::Vector(expected.clone());

    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);

    assert!(is_valid);
    assert_eq!(floats, expected);
}
2390
/// A two-element `Value::Vector` requested with dimension 3 is expected to
/// yield three zeros and be flagged invalid.
#[test]
fn test_extract_vector_f32_values_vector_wrong_dims() {
    let too_short = Value::Vector(vec![1.0, 2.0]);

    let (floats, is_valid) = extract_vector_f32_values(Some(&too_short), false, 3);

    assert!(!is_valid);
    assert_eq!(floats, vec![0.0; 3]);
}
2399
/// A `Value::List` of three floats requested with dimension 3 is expected to
/// be extracted element by element and flagged valid.
#[test]
fn test_extract_vector_f32_values_valid_list() {
    let input = Value::List(vec![
        Value::Float(1.0),
        Value::Float(2.0),
        Value::Float(3.0),
    ]);

    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);

    assert!(is_valid);
    assert_eq!(floats, vec![1.0, 2.0, 3.0]);
}
2408
/// A two-element `Value::List` requested with dimension 3 is expected to
/// yield three zeros and be flagged invalid.
#[test]
fn test_extract_vector_f32_values_list_wrong_dims() {
    let too_short = Value::List(vec![Value::Float(1.0), Value::Float(2.0)]);

    let (floats, is_valid) = extract_vector_f32_values(Some(&too_short), false, 3);

    assert!(!is_valid);
    assert_eq!(floats, vec![0.0; 3]);
}
2417
/// A `Value::List` of integers is expected to be accepted, with each element
/// converted to its floating-point equivalent.
#[test]
fn test_extract_vector_f32_values_list_int_coercion() {
    let ints = Value::List(vec![Value::Int(1), Value::Int(2), Value::Int(3)]);

    let (floats, is_valid) = extract_vector_f32_values(Some(&ints), false, 3);

    assert!(is_valid);
    assert_eq!(floats, vec![1.0, 2.0, 3.0]);
}
2426
/// A missing value on a non-deleted row is expected to yield three zeros and
/// be flagged invalid.
#[test]
fn test_extract_vector_f32_values_none() {
    let outcome = extract_vector_f32_values(None, false, 3);

    let (floats, is_valid) = outcome;
    assert!(!is_valid);
    assert_eq!(floats, vec![0.0; 3]);
}
2433
/// An explicit `Value::Null` on a non-deleted row is expected to yield three
/// zeros and be flagged invalid.
#[test]
fn test_extract_vector_f32_values_null() {
    let null_value = Value::Null;

    let (floats, is_valid) = extract_vector_f32_values(Some(&null_value), false, 3);

    assert!(!is_valid);
    assert_eq!(floats, vec![0.0; 3]);
}
2441
/// A value that is neither a vector nor a list (here a string) is expected to
/// yield three zeros and be flagged invalid.
#[test]
fn test_extract_vector_f32_values_unsupported_type() {
    let not_a_vector = Value::String("not a vector".to_string());

    let (floats, is_valid) = extract_vector_f32_values(Some(&not_a_vector), false, 3);

    assert!(!is_valid);
    assert_eq!(floats, vec![0.0; 3]);
}
2449
/// A missing value on a row flagged as deleted is expected to yield three
/// zeros and still be flagged valid.
#[test]
fn test_extract_vector_f32_values_deleted_with_none() {
    let outcome = extract_vector_f32_values(None, true, 3);

    let (floats, is_valid) = outcome;
    assert!(is_valid);
    assert_eq!(floats, vec![0.0; 3]);
}
2456
/// An explicit `Value::Null` on a row flagged as deleted is expected to yield
/// three zeros and still be flagged valid.
#[test]
fn test_extract_vector_f32_values_deleted_with_null() {
    let null_value = Value::Null;

    let (floats, is_valid) = extract_vector_f32_values(Some(&null_value), true, 3);

    assert!(is_valid);
    assert_eq!(floats, vec![0.0; 3]);
}
2464
/// Converts vectors interleaved with a null and a string into a
/// `FixedSizeList<Float32, 2>` and expects only the vector rows to be valid.
#[test]
fn test_values_to_fixed_size_list_vector_with_nulls() {
    let inputs = vec![
        Value::Vector(vec![1.0, 2.0]),
        Value::Null,
        Value::Vector(vec![3.0, 4.0]),
        Value::String("invalid".to_string()),
    ];
    let item_field = Arc::new(Field::new("item", ArrowDataType::Float32, false));
    let vector_type = ArrowDataType::FixedSizeList(item_field, 2);

    let built = values_to_array(&inputs, &vector_type).unwrap();
    let list = built
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(list.len(), 4);
    // Row-by-row validity: vector, null, vector, unsupported type.
    for (row, &expect_valid) in [true, false, true, false].iter().enumerate() {
        assert_eq!(list.is_valid(row), expect_valid);
    }
}
2495
/// Converts `Value::List` rows (one of floats, one of ints) into a
/// `FixedSizeList<Float32, 2>` and expects both rows valid with the child
/// buffer holding 1.0 through 4.0 in order.
#[test]
fn test_values_to_fixed_size_list_from_list() {
    let inputs = vec![
        Value::List(vec![Value::Float(1.0), Value::Float(2.0)]),
        Value::List(vec![Value::Int(3), Value::Int(4)]),
    ];
    let item_field = Arc::new(Field::new("item", ArrowDataType::Float32, false));
    let vector_type = ArrowDataType::FixedSizeList(item_field, 2);

    let built = values_to_array(&inputs, &vector_type).unwrap();
    let list = built
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(list.len(), 2);
    for row in 0..2 {
        assert!(list.is_valid(row));
    }

    let floats = list
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    for (slot, &expected) in [1.0, 2.0, 3.0, 4.0].iter().enumerate() {
        assert_eq!(floats.value(slot), expected);
    }
}
2531
/// Feeds a three-element vector and a one-element list into a
/// `FixedSizeList<Float32, 2>` conversion and expects both rows to be invalid
/// with the four child slots set to zero.
#[test]
fn test_values_to_fixed_size_list_wrong_dimensions() {
    let inputs = vec![
        Value::Vector(vec![1.0, 2.0, 3.0]),
        Value::List(vec![Value::Float(4.0)]),
    ];
    let item_field = Arc::new(Field::new("item", ArrowDataType::Float32, false));
    let vector_type = ArrowDataType::FixedSizeList(item_field, 2);

    let built = values_to_array(&inputs, &vector_type).unwrap();
    let list = built
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(list.len(), 2);
    for row in 0..2 {
        assert!(!list.is_valid(row));
    }

    let floats = list
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    for slot in 0..4 {
        assert_eq!(floats.value(slot), 0.0);
    }
}
2567
/// Converts three `Value::Null` rows into a `FixedSizeList<Float32, 3>` and
/// expects every row to be invalid while the child array still has 9 slots.
#[test]
fn test_values_to_fixed_size_list_all_nulls() {
    let inputs: Vec<Value> = std::iter::repeat_with(|| Value::Null).take(3).collect();
    let item_field = Arc::new(Field::new("item", ArrowDataType::Float32, false));
    let vector_type = ArrowDataType::FixedSizeList(item_field, 3);

    let built = values_to_array(&inputs, &vector_type).unwrap();
    let list = built
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(list.len(), 3);
    for row in 0..3 {
        assert!(!list.is_valid(row));
    }

    let floats = list
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    assert_eq!(floats.len(), 9);
}
2598
/// Mixes a vector, a float list, a null and a string in one
/// `FixedSizeList<Float32, 2>` conversion; expects the first two rows valid,
/// the last two invalid, and child slots 0..4 to hold 1.0 through 4.0.
#[test]
fn test_values_to_fixed_size_list_mixed_types() {
    let inputs = vec![
        Value::Vector(vec![1.0, 2.0]),
        Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
        Value::Null,
        Value::String("invalid".to_string()),
    ];
    let item_field = Arc::new(Field::new("item", ArrowDataType::Float32, false));
    let vector_type = ArrowDataType::FixedSizeList(item_field, 2);

    let built = values_to_array(&inputs, &vector_type).unwrap();
    let list = built
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(list.len(), 4);
    // Row-by-row validity: vector, list, null, unsupported type.
    for (row, &expect_valid) in [true, true, false, false].iter().enumerate() {
        assert_eq!(list.is_valid(row), expect_valid);
    }

    let floats = list
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    for (slot, &expected) in [1.0, 2.0, 3.0, 4.0].iter().enumerate() {
        assert_eq!(floats.value(slot), expected);
    }
}
2638
/// Builds a 3-dimensional vector column from a present vector, a missing
/// value, an explicit null and a row flagged as deleted.
///
/// Expects rows 0 and 3 to be valid, rows 1 and 2 to be null, the first row's
/// floats at child slots 0..3, and zeros at child slots 9..12 for the deleted
/// row.
#[test]
fn test_build_vector_column_with_nulls_and_deleted() {
    let vector_type = DataType::Vector { dimensions: 3 };
    let extractor = PropertyExtractor::new("test_vec", &vector_type);

    let deleted = [false, false, false, true];
    let props = [
        Some(Value::Vector(vec![1.0, 2.0, 3.0])),
        None,
        Some(Value::Null),
        Some(Value::Vector(vec![4.0, 5.0, 6.0])),
    ];

    let column = extractor
        .build_vector_column(4, &deleted, |row| props[row].as_ref(), 3)
        .unwrap();
    let list = column
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(list.len(), 4);
    for (row, &expect_valid) in [true, false, false, true].iter().enumerate() {
        assert_eq!(list.is_valid(row), expect_valid);
    }

    let floats = list
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    // (child slot, expected value) pairs for the first and the deleted row.
    let expectations = [
        (0, 1.0),
        (1, 2.0),
        (2, 3.0),
        (9, 0.0),
        (10, 0.0),
        (11, 0.0),
    ];
    for &(slot, expected) in expectations.iter() {
        assert_eq!(floats.value(slot), expected);
    }
}
2685
/// Builds a 2-dimensional vector column from a float list, an int list and a
/// native vector; expects all three rows valid and the child buffer to hold
/// 1.0 through 6.0 in order.
#[test]
fn test_build_vector_column_with_list_input() {
    let vector_type = DataType::Vector { dimensions: 2 };
    let extractor = PropertyExtractor::new("test_vec", &vector_type);

    let deleted = [false, false, false];
    let props = [
        Some(Value::List(vec![Value::Float(1.0), Value::Float(2.0)])),
        Some(Value::List(vec![Value::Int(3), Value::Int(4)])),
        Some(Value::Vector(vec![5.0, 6.0])),
    ];

    let column = extractor
        .build_vector_column(3, &deleted, |row| props[row].as_ref(), 2)
        .unwrap();
    let list = column
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(list.len(), 3);
    for row in 0..3 {
        assert!(list.is_valid(row));
    }

    let floats = list
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    for (slot, &expected) in [1.0, 2.0, 3.0, 4.0, 5.0, 6.0].iter().enumerate() {
        assert_eq!(floats.value(slot), expected);
    }
}
2725
/// Builds a `List<Vector(3)>` column from rows holding two tokens, three
/// tokens, an empty list and a missing value on a deleted row.
///
/// Expects the first three rows valid (with lengths 2, 3 and 0), the deleted
/// row null, and the decoded tokens to come back as lists of floats.
#[test]
fn test_build_multivector_list_column_roundtrip() {
    let data_type = DataType::List(Box::new(DataType::Vector { dimensions: 3 }));
    let extractor = PropertyExtractor::new("tokens", &data_type);

    let deleted = [false, false, false, true];
    let props = [
        Some(Value::List(vec![
            Value::Vector(vec![1.0, 2.0, 3.0]),
            Value::Vector(vec![4.0, 5.0, 6.0]),
        ])),
        Some(Value::List(vec![
            Value::Vector(vec![7.0, 8.0, 9.0]),
            Value::Vector(vec![10.0, 11.0, 12.0]),
            Value::Vector(vec![13.0, 14.0, 15.0]),
        ])),
        Some(Value::List(vec![])),
        None,
    ];

    let column = extractor
        .build_column(4, &deleted, |row| props[row].as_ref())
        .unwrap();

    let outer = column.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(outer.len(), 4);
    for (row, &expect_valid) in [true, true, true, false].iter().enumerate() {
        assert_eq!(outer.is_valid(row), expect_valid);
    }
    for (row, &token_count) in [2, 3, 0].iter().enumerate() {
        assert_eq!(outer.value(row).len(), token_count);
    }

    // Shape in which a decoded three-float token is expected to appear.
    let token = |a, b, c| Value::List(vec![Value::Float(a), Value::Float(b), Value::Float(c)]);

    let first_row = arrow_to_value(column.as_ref(), 0, Some(&data_type));
    assert_eq!(
        first_row,
        Value::List(vec![token(1.0, 2.0, 3.0), token(4.0, 5.0, 6.0)])
    );

    let decoded_tokens = match arrow_to_value(column.as_ref(), 1, Some(&data_type)) {
        Value::List(items) => items,
        _ => panic!("row1 should decode to a list of tokens"),
    };
    assert_eq!(decoded_tokens.len(), 3);
    assert_eq!(decoded_tokens[2], token(13.0, 14.0, 15.0));
}
2805
/// Builds a `List<Vector(2)>` column from one row whose tokens are a matching
/// vector, an over-long vector, a string and a two-float list.
///
/// Expects four inner entries where only the first and last are valid.
#[test]
fn test_build_multivector_invalid_inner_tokens() {
    let data_type = DataType::List(Box::new(DataType::Vector { dimensions: 2 }));
    let extractor = PropertyExtractor::new("tokens", &data_type);

    let deleted = [false];
    let tokens = vec![
        Value::Vector(vec![1.0, 2.0]),
        Value::Vector(vec![9.0, 9.0, 9.0]),
        Value::String("nope".to_string()),
        Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
    ];
    let props = [Some(Value::List(tokens))];

    let column = extractor
        .build_column(1, &deleted, |row| props[row].as_ref())
        .unwrap();
    let outer = column.as_any().downcast_ref::<ListArray>().unwrap();
    let first_row = outer.value(0);
    let inner = first_row
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(inner.len(), 4);
    for (slot, &expect_valid) in [true, false, false, true].iter().enumerate() {
        assert_eq!(inner.is_valid(slot), expect_valid);
    }
}
2837
/// Expects `values_to_array` to return an error when asked to build a
/// `List<FixedSizeList<Float32, 2>>` array from a list-of-vectors value.
#[test]
fn test_values_to_array_multivector_schemaless_deferred() {
    let float_field = Arc::new(Field::new("item", ArrowDataType::Float32, true));
    let token_type = ArrowDataType::FixedSizeList(float_field, 2);
    let list_type = ArrowDataType::List(Arc::new(Field::new("item", token_type, true)));

    let inputs = vec![Value::List(vec![Value::Vector(vec![1.0, 2.0])])];

    let outcome = values_to_array(&inputs, &list_type);
    assert!(outcome.is_err());
}
2855
/// Feeds an integer one past `i32::MAX` plus an in-range 42 into the int32
/// and date32 column builders; expects the oversized value to become NULL and
/// 42 to be stored as-is in both columns.
#[test]
fn test_int32_and_date32_columns_null_out_of_range() {
    let declared_type = DataType::Int64;
    let extractor = PropertyExtractor::new("x", &declared_type);

    let vals = [Value::Int(i64::from(i32::MAX) + 1), Value::Int(42)];
    let deleted = [false, false];

    // Int32 column.
    let int_column = extractor
        .build_int32_column(2, &deleted, |row| Some(&vals[row]))
        .unwrap();
    let ints = int_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert!(
        ints.is_null(0),
        "out-of-range i64 must be NULL in an int32 column, not wrapped"
    );
    assert_eq!(ints.value(1), 42);

    // Date32 column.
    let date_column = extractor
        .build_date32_column(2, &deleted, |row| Some(&vals[row]))
        .unwrap();
    let dates = date_column.as_any().downcast_ref::<Date32Array>().unwrap();
    assert!(
        dates.is_null(0),
        "out-of-range day count must be NULL in a date32 column, not wrapped"
    );
    assert_eq!(dates.value(1), 42);
}
2888}