1use anyhow::{Result, anyhow};
11use arrow_array::builder::{
12 BinaryBuilder, BooleanBufferBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
13 FixedSizeBinaryBuilder, FixedSizeListBuilder, Float32Builder, Float64Builder, Int32Builder,
14 Int64Builder, IntervalMonthDayNanoBuilder, LargeBinaryBuilder, ListBuilder, StringBuilder,
15 StructBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder,
16 UInt64Builder,
17};
18use arrow_array::{
19 Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, FixedSizeBinaryArray,
20 FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array,
21 IntervalMonthDayNanoArray, LargeBinaryArray, ListArray, StringArray, StructArray,
22 Time64NanosecondArray, TimestampNanosecondArray, UInt64Array,
23};
24use arrow_schema::{DataType as ArrowDataType, Field};
25use std::collections::HashMap;
26use std::sync::Arc;
27use uni_common::DataType;
28use uni_common::Value;
29use uni_common::core::id::{Eid, Vid};
30use uni_common::core::schema;
31use uni_crdt::Crdt;
32
33fn build_timestamp_column_from_id_map<K, I>(
38 ids: I,
39 timestamps: Option<&HashMap<K, i64>>,
40) -> ArrayRef
41where
42 K: Eq + std::hash::Hash,
43 I: IntoIterator<Item = K>,
44{
45 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
46 for id in ids {
47 match timestamps.and_then(|m| m.get(&id)) {
48 Some(&ts) => builder.append_value(ts),
49 None => builder.append_null(),
50 }
51 }
52 Arc::new(builder.finish())
53}
54
55pub fn build_timestamp_column_from_vid_map<I>(
56 ids: I,
57 timestamps: Option<&HashMap<Vid, i64>>,
58) -> ArrayRef
59where
60 I: IntoIterator<Item = Vid>,
61{
62 build_timestamp_column_from_id_map(ids, timestamps)
63}
64
65pub fn build_timestamp_column_from_eid_map<I>(
66 ids: I,
67 timestamps: Option<&HashMap<Eid, i64>>,
68) -> ArrayRef
69where
70 I: IntoIterator<Item = Eid>,
71{
72 build_timestamp_column_from_id_map(ids, timestamps)
73}
74
75pub fn build_timestamp_column<I>(timestamps: I) -> ArrayRef
79where
80 I: IntoIterator<Item = Option<i64>>,
81{
82 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
83 for ts in timestamps {
84 builder.append_option(ts);
85 }
86 Arc::new(builder.finish())
87}
88
89pub fn labels_from_list_array(list_arr: &ListArray, row: usize) -> Vec<String> {
95 if list_arr.is_null(row) {
96 return Vec::new();
97 }
98 let values = list_arr.value(row);
99 let Some(str_arr) = values.as_any().downcast_ref::<StringArray>() else {
100 return Vec::new();
101 };
102 (0..str_arr.len())
103 .filter(|&j| !str_arr.is_null(j))
104 .map(|j| str_arr.value(j).to_string())
105 .collect()
106}
107
108fn parse_datetime_to_nanos(s: &str) -> Option<i64> {
113 chrono::DateTime::parse_from_rfc3339(s)
114 .map(|dt| {
115 dt.with_timezone(&chrono::Utc)
116 .timestamp_nanos_opt()
117 .unwrap_or(0)
118 })
119 .or_else(|_| {
120 chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
121 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
122 })
123 .or_else(|_| {
124 chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ")
125 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
126 })
127 .or_else(|_| {
128 chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M%:z").map(|dt| {
129 dt.with_timezone(&chrono::Utc)
130 .timestamp_nanos_opt()
131 .unwrap_or(0)
132 })
133 })
134 .ok()
135 .or_else(|| {
136 s.strip_suffix('Z')
137 .and_then(|base| chrono::NaiveDateTime::parse_from_str(base, "%Y-%m-%dT%H:%M").ok())
138 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
139 })
140}
141
142fn try_reconstruct_map(arr: &ArrayRef) -> Option<HashMap<String, Value>> {
148 let structs = arr.as_any().downcast_ref::<StructArray>()?;
149 let fields = structs.fields();
150 if fields.len() != 2 || fields[0].name() != "key" || fields[1].name() != "value" {
151 return None;
152 }
153 let value_hint = raw_bytes_hint(fields[1].metadata());
156 let key_col = structs.column(0);
157 let val_col = structs.column(1);
158 let mut map = HashMap::new();
159 for i in 0..structs.len() {
160 if let Value::String(k) = arrow_to_value(key_col.as_ref(), i, None) {
161 map.insert(k, arrow_to_value(val_col.as_ref(), i, value_hint));
162 }
163 }
164 Some(map)
165}
166
167fn array_to_value_list(arr: &ArrayRef, elem_type: Option<&DataType>) -> Vec<Value> {
172 (0..arr.len())
173 .map(|i| arrow_to_value(arr.as_ref(), i, elem_type))
174 .collect()
175}
176
177fn raw_bytes_hint(metadata: &HashMap<String, String>) -> Option<&'static DataType> {
181 if metadata.get("uni_raw_bytes").is_some_and(|v| v == "true") {
182 Some(&DataType::Bytes)
183 } else {
184 None
185 }
186}
187
188fn list_child_bytes_hint(dt: &ArrowDataType) -> Option<&'static DataType> {
190 match dt {
191 ArrowDataType::List(f)
192 | ArrowDataType::LargeList(f)
193 | ArrowDataType::FixedSizeList(f, _) => raw_bytes_hint(f.metadata()),
194 _ => None,
195 }
196}
197
198pub fn arrow_to_value(col: &dyn Array, row: usize, data_type: Option<&DataType>) -> Value {
205 if col.is_null(row) {
206 return Value::Null;
207 }
208
209 if let Some(dt) = data_type {
211 match dt {
212 DataType::DateTime => {
213 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
215 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
216 struct_arr.column_by_name("nanos_since_epoch"),
217 struct_arr.column_by_name("offset_seconds"),
218 struct_arr.column_by_name("timezone_name"),
219 )
220 && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
221 nanos_col
222 .as_any()
223 .downcast_ref::<TimestampNanosecondArray>(),
224 offset_col.as_any().downcast_ref::<Int32Array>(),
225 tz_col.as_any().downcast_ref::<StringArray>(),
226 )
227 {
228 if nanos_arr.is_null(row) {
229 return Value::Null;
230 }
231 let nanos = nanos_arr.value(row);
232 if offset_arr.is_null(row) {
233 return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
235 nanos_since_epoch: nanos,
236 });
237 }
238 let offset = offset_arr.value(row);
239 let tz_name = (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
240 return Value::Temporal(uni_common::TemporalValue::DateTime {
241 nanos_since_epoch: nanos,
242 offset_seconds: offset,
243 timezone_name: tz_name,
244 });
245 }
246 if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
248 let nanos = ts.value(row);
249 let tz_name = ts.timezone().map(|s| s.to_string());
250 return Value::Temporal(uni_common::TemporalValue::DateTime {
251 nanos_since_epoch: nanos,
252 offset_seconds: 0,
253 timezone_name: tz_name,
254 });
255 }
256 }
257 DataType::Time => {
258 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
260 && let (Some(nanos_col), Some(offset_col)) = (
261 struct_arr.column_by_name("nanos_since_midnight"),
262 struct_arr.column_by_name("offset_seconds"),
263 )
264 && let (Some(nanos_arr), Some(offset_arr)) = (
265 nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
266 offset_col.as_any().downcast_ref::<Int32Array>(),
267 )
268 {
269 if nanos_arr.is_null(row) || offset_arr.is_null(row) {
271 return Value::Null;
272 }
273 let nanos = nanos_arr.value(row);
274 let offset = offset_arr.value(row);
275 return Value::Temporal(uni_common::TemporalValue::Time {
276 nanos_since_midnight: nanos,
277 offset_seconds: offset,
278 });
279 }
280 if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
282 let nanos = t.value(row);
283 return Value::Temporal(uni_common::TemporalValue::Time {
284 nanos_since_midnight: nanos,
285 offset_seconds: 0,
286 });
287 }
288 }
289 DataType::Bytes => {
290 let Some(arr) = col.as_any().downcast_ref::<LargeBinaryArray>() else {
291 log::warn!("Bytes column is not LargeBinaryArray");
292 return Value::Null;
293 };
294 if arr.is_null(row) {
295 return Value::Null;
296 }
297 return Value::Bytes(arr.value(row).to_vec());
298 }
299 DataType::Btic => {
300 let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>() else {
301 log::warn!("BTIC column is not FixedSizeBinaryArray");
302 return Value::Null;
303 };
304 let bytes = fsb.value(row);
305 return match uni_btic::encode::decode_slice(bytes) {
306 Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
307 lo: btic.lo(),
308 hi: btic.hi(),
309 meta: btic.meta(),
310 }),
311 Err(e) => {
312 log::warn!("BTIC decode error: {}", e);
313 Value::Null
314 }
315 };
316 }
317 _ => {}
318 }
319 }
320
321 if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
323 return Value::String(s.value(row).to_string());
324 }
325
326 if let Some(u) = col.as_any().downcast_ref::<UInt64Array>() {
328 return Value::Int(u.value(row) as i64);
329 }
330 if let Some(i) = col.as_any().downcast_ref::<Int64Array>() {
331 return Value::Int(i.value(row));
332 }
333 if let Some(i) = col.as_any().downcast_ref::<Int32Array>() {
334 return Value::Int(i.value(row) as i64);
335 }
336
337 if let Some(f) = col.as_any().downcast_ref::<Float64Array>() {
339 return Value::Float(f.value(row));
340 }
341 if let Some(f) = col.as_any().downcast_ref::<Float32Array>() {
342 return Value::Float(f.value(row) as f64);
343 }
344
345 if let Some(b) = col.as_any().downcast_ref::<BooleanArray>() {
347 return Value::Bool(b.value(row));
348 }
349
350 if let Some(list) = col.as_any().downcast_ref::<FixedSizeListArray>() {
352 let elem_hint = list_child_bytes_hint(list.data_type());
353 return Value::List(array_to_value_list(&list.value(row), elem_hint));
354 }
355
356 if let Some(list) = col.as_any().downcast_ref::<ListArray>() {
358 let arr = list.value(row);
359
360 if let Some(obj) = try_reconstruct_map(&arr) {
362 return Value::Map(obj);
363 }
364
365 let elem_hint = list_child_bytes_hint(list.data_type());
366 return Value::List(array_to_value_list(&arr, elem_hint));
367 }
368
369 if let Some(list) = col.as_any().downcast_ref::<arrow_array::LargeListArray>() {
371 let elem_hint = list_child_bytes_hint(list.data_type());
372 return Value::List(array_to_value_list(&list.value(row), elem_hint));
373 }
374
375 if let Some(s) = col.as_any().downcast_ref::<StructArray>() {
377 let field_names: Vec<&str> = s.fields().iter().map(|f| f.name().as_str()).collect();
378
379 if field_names.contains(&"nanos_since_epoch")
381 && field_names.contains(&"offset_seconds")
382 && field_names.contains(&"timezone_name")
383 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
384 s.column_by_name("nanos_since_epoch"),
385 s.column_by_name("offset_seconds"),
386 s.column_by_name("timezone_name"),
387 )
388 {
389 let nanos_opt = nanos_col
391 .as_any()
392 .downcast_ref::<TimestampNanosecondArray>()
393 .map(|a| {
394 if a.is_null(row) {
395 None
396 } else {
397 Some(a.value(row))
398 }
399 })
400 .or_else(|| {
401 nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
402 if a.is_null(row) {
403 None
404 } else {
405 Some(a.value(row))
406 }
407 })
408 });
409 let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
410 if a.is_null(row) {
411 None
412 } else {
413 Some(a.value(row))
414 }
415 });
416
417 if let Some(Some(nanos)) = nanos_opt {
418 match offset_opt {
419 Some(Some(offset)) => {
420 let tz_name = tz_col.as_any().downcast_ref::<StringArray>().and_then(|a| {
421 if a.is_null(row) {
422 None
423 } else {
424 Some(a.value(row).to_string())
425 }
426 });
427 return Value::Temporal(uni_common::TemporalValue::DateTime {
428 nanos_since_epoch: nanos,
429 offset_seconds: offset,
430 timezone_name: tz_name,
431 });
432 }
433 _ => {
434 return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
436 nanos_since_epoch: nanos,
437 });
438 }
439 }
440 }
441 }
442
443 if field_names.contains(&"nanos_since_midnight")
445 && field_names.contains(&"offset_seconds")
446 && let (Some(nanos_col), Some(offset_col)) = (
447 s.column_by_name("nanos_since_midnight"),
448 s.column_by_name("offset_seconds"),
449 )
450 {
451 let nanos_opt = nanos_col
453 .as_any()
454 .downcast_ref::<Time64NanosecondArray>()
455 .map(|a| {
456 if a.is_null(row) {
457 None
458 } else {
459 Some(a.value(row))
460 }
461 })
462 .or_else(|| {
463 nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
464 if a.is_null(row) {
465 None
466 } else {
467 Some(a.value(row))
468 }
469 })
470 });
471 let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
472 if a.is_null(row) {
473 None
474 } else {
475 Some(a.value(row))
476 }
477 });
478
479 if let (Some(Some(nanos)), Some(Some(offset))) = (nanos_opt, offset_opt) {
480 return Value::Temporal(uni_common::TemporalValue::Time {
481 nanos_since_midnight: nanos,
482 offset_seconds: offset,
483 });
484 }
485 }
486
487 let mut map = HashMap::new();
489 for (field, child) in s.fields().iter().zip(s.columns()) {
490 map.insert(
491 field.name().clone(),
492 arrow_to_value(child.as_ref(), row, None),
493 );
494 }
495 return Value::Map(map);
496 }
497
498 if let Some(d) = col.as_any().downcast_ref::<Date32Array>() {
500 let days = d.value(row);
501 return Value::Temporal(uni_common::TemporalValue::Date {
502 days_since_epoch: days,
503 });
504 }
505
506 if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
508 let nanos = ts.value(row);
509 return match ts.timezone() {
510 Some(tz) => Value::Temporal(uni_common::TemporalValue::DateTime {
511 nanos_since_epoch: nanos,
512 offset_seconds: 0,
513 timezone_name: Some(tz.to_string()),
514 }),
515 None => Value::Temporal(uni_common::TemporalValue::LocalDateTime {
516 nanos_since_epoch: nanos,
517 }),
518 };
519 }
520
521 if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
523 let nanos = t.value(row);
524 return Value::Temporal(uni_common::TemporalValue::LocalTime {
525 nanos_since_midnight: nanos,
526 });
527 }
528
529 if let Some(t) = col
531 .as_any()
532 .downcast_ref::<arrow_array::Time64MicrosecondArray>()
533 {
534 let micros = t.value(row);
535 return Value::Temporal(uni_common::TemporalValue::LocalTime {
536 nanos_since_midnight: micros * 1000,
537 });
538 }
539
540 if let Some(d) = col
542 .as_any()
543 .downcast_ref::<arrow_array::DurationMicrosecondArray>()
544 {
545 let micros = d.value(row);
546 let total_nanos = micros * 1000;
547 let seconds = total_nanos / 1_000_000_000;
548 let remaining_nanos = total_nanos % 1_000_000_000;
549 return Value::Temporal(uni_common::TemporalValue::Duration {
550 months: 0,
551 days: 0,
552 nanos: seconds * 1_000_000_000 + remaining_nanos,
553 });
554 }
555
556 if let Some(interval) = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>() {
558 let val = interval.value(row);
559 return Value::Temporal(uni_common::TemporalValue::Duration {
560 months: val.months as i64,
561 days: val.days as i64,
562 nanos: val.nanoseconds,
563 });
564 }
565
566 if let Some(b) = col.as_any().downcast_ref::<LargeBinaryArray>() {
568 let bytes = b.value(row);
569 if bytes.is_empty() {
570 return Value::Null;
571 }
572 return uni_common::cypher_value_codec::decode(bytes).unwrap_or_else(|e| {
573 eprintln!("CypherValue decode error: {}", e);
574 Value::Null
575 });
576 }
577
578 if let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>()
580 && fsb.value_length() == 24
581 {
582 let bytes = fsb.value(row);
583 return match uni_btic::encode::decode_slice(bytes) {
584 Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
585 lo: btic.lo(),
586 hi: btic.hi(),
587 meta: btic.meta(),
588 }),
589 Err(e) => {
590 log::warn!("BTIC decode error: {}", e);
591 Value::Null
592 }
593 };
594 }
595
596 if let Some(b) = col.as_any().downcast_ref::<BinaryArray>() {
598 let bytes = b.value(row);
599 return Crdt::from_msgpack(bytes)
600 .ok()
601 .and_then(|crdt| serde_json::to_value(&crdt).ok())
602 .map(Value::from)
603 .unwrap_or(Value::Null);
604 }
605
606 Value::Null
608}
609
610fn values_to_uint64_array(values: &[Value]) -> ArrayRef {
611 let mut builder = UInt64Builder::with_capacity(values.len());
612 for v in values {
613 if let Some(n) = v.as_u64() {
614 builder.append_value(n);
615 } else {
616 builder.append_null();
617 }
618 }
619 Arc::new(builder.finish())
620}
621
622fn values_to_int64_array(values: &[Value]) -> ArrayRef {
623 let mut builder = Int64Builder::with_capacity(values.len());
624 for v in values {
625 if let Some(n) = v.as_i64() {
626 builder.append_value(n);
627 } else {
628 builder.append_null();
629 }
630 }
631 Arc::new(builder.finish())
632}
633
634fn values_to_int32_array(values: &[Value]) -> ArrayRef {
635 let mut builder = Int32Builder::with_capacity(values.len());
636 for v in values {
637 if let Some(n) = v.as_i64() {
638 builder.append_value(n as i32);
639 } else {
640 builder.append_null();
641 }
642 }
643 Arc::new(builder.finish())
644}
645
646fn values_to_string_array(values: &[Value]) -> ArrayRef {
647 let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 10);
648 for v in values {
649 if let Some(s) = v.as_str() {
650 builder.append_value(s);
651 } else if v.is_null() {
652 builder.append_null();
653 } else {
654 builder.append_value(v.to_string());
655 }
656 }
657 Arc::new(builder.finish())
658}
659
660fn values_to_bool_array(values: &[Value]) -> ArrayRef {
661 let mut builder = BooleanBuilder::with_capacity(values.len());
662 for v in values {
663 if let Some(b) = v.as_bool() {
664 builder.append_value(b);
665 } else {
666 builder.append_null();
667 }
668 }
669 Arc::new(builder.finish())
670}
671
672fn values_to_float32_array(values: &[Value]) -> ArrayRef {
673 let mut builder = Float32Builder::with_capacity(values.len());
674 for v in values {
675 if let Some(n) = v.as_f64() {
676 builder.append_value(n as f32);
677 } else {
678 builder.append_null();
679 }
680 }
681 Arc::new(builder.finish())
682}
683
684fn values_to_float64_array(values: &[Value]) -> ArrayRef {
685 let mut builder = Float64Builder::with_capacity(values.len());
686 for v in values {
687 if let Some(n) = v.as_f64() {
688 builder.append_value(n);
689 } else {
690 builder.append_null();
691 }
692 }
693 Arc::new(builder.finish())
694}
695
696fn values_to_fixed_size_binary_array(values: &[Value], size: i32) -> Result<ArrayRef> {
697 let mut builder = FixedSizeBinaryBuilder::with_capacity(values.len(), size);
698 for v in values {
699 match v {
700 Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta }) if size == 24 => {
701 let btic = uni_btic::Btic::new(*lo, *hi, *meta)
702 .map_err(|e| anyhow!("invalid BTIC value: {}", e))?;
703 builder.append_value(uni_btic::encode::encode(&btic))?;
704 }
705 Value::String(s) if size == 24 => match uni_btic::parse::parse_btic_literal(s) {
706 Ok(b) => builder.append_value(uni_btic::encode::encode(&b))?,
707 Err(_) => builder.append_null(),
708 },
709 Value::List(bytes) => {
710 let b: Vec<u8> = bytes
711 .iter()
712 .map(|bv| bv.as_u64().unwrap_or(0) as u8)
713 .collect();
714 if b.len() as i32 == size {
715 builder.append_value(&b)?;
716 } else {
717 builder.append_null();
718 }
719 }
720 _ => builder.append_null(),
721 }
722 }
723 Ok(Arc::new(builder.finish()))
724}
725
726pub fn extract_vector_f32_values(
741 val: Option<&Value>,
742 is_deleted: bool,
743 dimensions: usize,
744) -> (Vec<f32>, bool) {
745 let zeros = || vec![0.0_f32; dimensions];
746
747 if is_deleted {
749 return (zeros(), true);
750 }
751
752 match val {
753 Some(Value::Vector(v)) if v.len() == dimensions => (v.clone(), true),
755 Some(Value::Vector(_)) => (zeros(), false), Some(Value::List(arr)) if arr.len() == dimensions => {
758 let values: Vec<f32> = arr
759 .iter()
760 .map(|v| v.as_f64().unwrap_or(0.0) as f32)
761 .collect();
762 (values, true)
763 }
764 Some(Value::List(_)) => (zeros(), false), _ => (zeros(), false), }
767}
768
769fn values_to_fixed_size_list_f32_array(values: &[Value], size: i32) -> ArrayRef {
770 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), size);
771 for v in values {
772 let (vals, valid) = extract_vector_f32_values(Some(v), false, size as usize);
773 for val in vals {
774 builder.values().append_value(val);
775 }
776 builder.append(valid);
777 }
778 Arc::new(builder.finish())
779}
780
781fn values_to_timestamp_array(values: &[Value], tz: Option<&Arc<str>>) -> ArrayRef {
782 let mut builder = TimestampNanosecondBuilder::with_capacity(values.len());
783 for v in values {
784 if v.is_null() {
785 builder.append_null();
786 } else if let Value::Temporal(tv) = v {
787 match tv {
788 uni_common::TemporalValue::DateTime {
789 nanos_since_epoch, ..
790 }
791 | uni_common::TemporalValue::LocalDateTime {
792 nanos_since_epoch, ..
793 } => builder.append_value(*nanos_since_epoch),
794 _ => builder.append_null(),
795 }
796 } else if let Some(n) = v.as_i64() {
797 builder.append_value(n);
798 } else if let Some(s) = v.as_str() {
799 match parse_datetime_to_nanos(s) {
800 Some(nanos) => builder.append_value(nanos),
801 None => builder.append_null(),
802 }
803 } else {
804 builder.append_null();
805 }
806 }
807
808 let arr = builder.finish();
809 if let Some(tz) = tz {
810 Arc::new(arr.with_timezone(tz.as_ref()))
811 } else {
812 Arc::new(arr)
813 }
814}
815
816fn values_to_datetime_struct_array(values: &[Value]) -> ArrayRef {
821 let mut nanos_builder = TimestampNanosecondBuilder::with_capacity(values.len());
822 let mut offset_builder = Int32Builder::with_capacity(values.len());
823 let mut tz_builder = StringBuilder::with_capacity(values.len(), values.len() * 20);
824 let mut null_buffer = BooleanBufferBuilder::new(values.len());
825
826 for v in values {
827 match v {
828 Value::Temporal(uni_common::TemporalValue::DateTime {
829 nanos_since_epoch,
830 offset_seconds,
831 timezone_name,
832 }) => {
833 nanos_builder.append_value(*nanos_since_epoch);
834 offset_builder.append_value(*offset_seconds);
835 tz_builder.append_option(timezone_name.as_deref());
836 null_buffer.append(true);
837 }
838 Value::Temporal(uni_common::TemporalValue::LocalDateTime { nanos_since_epoch }) => {
839 nanos_builder.append_value(*nanos_since_epoch);
840 offset_builder.append_null();
841 tz_builder.append_null();
842 null_buffer.append(true);
843 }
844 _ => {
845 nanos_builder.append_null();
846 offset_builder.append_null();
847 tz_builder.append_null();
848 null_buffer.append(false);
849 }
850 }
851 }
852
853 let struct_arr = StructArray::new(
854 schema::datetime_struct_fields(),
855 vec![
856 Arc::new(nanos_builder.finish()) as ArrayRef,
857 Arc::new(offset_builder.finish()) as ArrayRef,
858 Arc::new(tz_builder.finish()) as ArrayRef,
859 ],
860 Some(null_buffer.finish().into()),
861 );
862 Arc::new(struct_arr)
863}
864
865fn values_to_time_struct_array(values: &[Value]) -> ArrayRef {
870 let mut nanos_builder = Time64NanosecondBuilder::with_capacity(values.len());
871 let mut offset_builder = Int32Builder::with_capacity(values.len());
872 let mut null_buffer = BooleanBufferBuilder::new(values.len());
873
874 for v in values {
875 match v {
876 Value::Temporal(uni_common::TemporalValue::Time {
877 nanos_since_midnight,
878 offset_seconds,
879 }) => {
880 nanos_builder.append_value(*nanos_since_midnight);
881 offset_builder.append_value(*offset_seconds);
882 null_buffer.append(true);
883 }
884 Value::Temporal(uni_common::TemporalValue::LocalTime {
885 nanos_since_midnight,
886 }) => {
887 nanos_builder.append_value(*nanos_since_midnight);
888 offset_builder.append_null();
889 null_buffer.append(true);
890 }
891 _ => {
892 nanos_builder.append_null();
893 offset_builder.append_null();
894 null_buffer.append(false);
895 }
896 }
897 }
898
899 let struct_arr = StructArray::new(
900 schema::time_struct_fields(),
901 vec![
902 Arc::new(nanos_builder.finish()) as ArrayRef,
903 Arc::new(offset_builder.finish()) as ArrayRef,
904 ],
905 Some(null_buffer.finish().into()),
906 );
907 Arc::new(struct_arr)
908}
909
910fn values_to_large_binary_array(values: &[Value]) -> ArrayRef {
911 let mut builder =
912 arrow_array::builder::LargeBinaryBuilder::with_capacity(values.len(), values.len() * 64);
913 for v in values {
914 if v.is_null() {
915 builder.append_null();
916 } else {
917 let cv_bytes = uni_common::cypher_value_codec::encode(v);
919 builder.append_value(&cv_bytes);
920 }
921 }
922 Arc::new(builder.finish())
923}
924
925pub fn values_to_array(values: &[Value], dt: &ArrowDataType) -> Result<ArrayRef> {
927 match dt {
928 ArrowDataType::UInt64 => Ok(values_to_uint64_array(values)),
929 ArrowDataType::Int64 => Ok(values_to_int64_array(values)),
930 ArrowDataType::Int32 => Ok(values_to_int32_array(values)),
931 ArrowDataType::Utf8 => Ok(values_to_string_array(values)),
932 ArrowDataType::Boolean => Ok(values_to_bool_array(values)),
933 ArrowDataType::Float32 => Ok(values_to_float32_array(values)),
934 ArrowDataType::Float64 => Ok(values_to_float64_array(values)),
935 ArrowDataType::FixedSizeBinary(size) => values_to_fixed_size_binary_array(values, *size),
936 ArrowDataType::FixedSizeList(inner, size) => {
937 if inner.data_type() == &ArrowDataType::Float32 {
938 Ok(values_to_fixed_size_list_f32_array(values, *size))
939 } else {
940 Err(anyhow!("Unsupported FixedSizeList inner type"))
941 }
942 }
943 ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
944 Ok(values_to_timestamp_array(values, tz.as_ref()))
945 }
946 ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
947 Ok(values_to_timestamp_array(values, tz.as_ref()))
948 }
949 ArrowDataType::Date32 => {
950 let mut builder = Date32Builder::with_capacity(values.len());
951 for v in values {
952 if v.is_null() {
953 builder.append_null();
954 } else if let Value::Temporal(uni_common::TemporalValue::Date {
955 days_since_epoch,
956 }) = v
957 {
958 builder.append_value(*days_since_epoch);
959 } else if let Some(n) = v.as_i64() {
960 builder.append_value(n as i32);
961 } else {
962 builder.append_null();
963 }
964 }
965 Ok(Arc::new(builder.finish()))
966 }
967 ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
968 let mut builder = Time64NanosecondBuilder::with_capacity(values.len());
969 for v in values {
970 if v.is_null() {
971 builder.append_null();
972 } else if let Value::Temporal(tv) = v {
973 match tv {
974 uni_common::TemporalValue::LocalTime {
975 nanos_since_midnight,
976 }
977 | uni_common::TemporalValue::Time {
978 nanos_since_midnight,
979 ..
980 } => builder.append_value(*nanos_since_midnight),
981 _ => builder.append_null(),
982 }
983 } else if let Some(n) = v.as_i64() {
984 builder.append_value(n);
985 } else {
986 builder.append_null();
987 }
988 }
989 Ok(Arc::new(builder.finish()))
990 }
991 ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
992 let mut builder = Time64MicrosecondBuilder::with_capacity(values.len());
993 for v in values {
994 if v.is_null() {
995 builder.append_null();
996 } else if let Value::Temporal(tv) = v {
997 match tv {
998 uni_common::TemporalValue::LocalTime {
999 nanos_since_midnight,
1000 }
1001 | uni_common::TemporalValue::Time {
1002 nanos_since_midnight,
1003 ..
1004 } => builder.append_value(*nanos_since_midnight / 1_000), _ => builder.append_null(),
1006 }
1007 } else if let Some(n) = v.as_i64() {
1008 builder.append_value(n);
1009 } else {
1010 builder.append_null();
1011 }
1012 }
1013 Ok(Arc::new(builder.finish()))
1014 }
1015 ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano) => {
1016 let mut builder = IntervalMonthDayNanoBuilder::with_capacity(values.len());
1017 for v in values {
1018 if v.is_null() {
1019 builder.append_null();
1020 } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1021 months,
1022 days,
1023 nanos,
1024 }) = v
1025 {
1026 builder.append_value(arrow::datatypes::IntervalMonthDayNano {
1027 months: *months as i32,
1028 days: *days as i32,
1029 nanoseconds: *nanos,
1030 });
1031 } else {
1032 builder.append_null();
1033 }
1034 }
1035 Ok(Arc::new(builder.finish()))
1036 }
1037 ArrowDataType::Duration(arrow_schema::TimeUnit::Microsecond) => {
1038 let mut builder = DurationMicrosecondBuilder::with_capacity(values.len());
1039 for v in values {
1040 if v.is_null() {
1041 builder.append_null();
1042 } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1043 months,
1044 days,
1045 nanos,
1046 }) = v
1047 {
1048 let total_micros =
1049 months * 30 * 86_400_000_000i64 + days * 86_400_000_000i64 + nanos / 1_000;
1050 builder.append_value(total_micros);
1051 } else if let Some(n) = v.as_i64() {
1052 builder.append_value(n);
1053 } else {
1054 builder.append_null();
1055 }
1056 }
1057 Ok(Arc::new(builder.finish()))
1058 }
1059 ArrowDataType::LargeBinary => Ok(values_to_large_binary_array(values)),
1060 ArrowDataType::List(field) => {
1061 if field.data_type() == &ArrowDataType::Utf8 {
1062 let mut builder = ListBuilder::new(StringBuilder::new());
1063 for v in values {
1064 if let Value::List(arr) = v {
1065 for item in arr {
1066 if let Some(s) = item.as_str() {
1067 builder.values().append_value(s);
1068 } else {
1069 builder.values().append_null();
1070 }
1071 }
1072 builder.append(true);
1073 } else {
1074 builder.append_null();
1075 }
1076 }
1077 Ok(Arc::new(builder.finish()))
1078 } else {
1079 Err(anyhow!(
1080 "Unsupported List inner type: {:?}",
1081 field.data_type()
1082 ))
1083 }
1084 }
1085 ArrowDataType::Struct(_) if schema::is_datetime_struct(dt) => {
1086 Ok(values_to_datetime_struct_array(values))
1087 }
1088 ArrowDataType::Struct(_) if schema::is_time_struct(dt) => {
1089 Ok(values_to_time_struct_array(values))
1090 }
1091 _ => Err(anyhow!("Unsupported type for conversion: {:?}", dt)),
1092 }
1093}
1094
/// Builds an Arrow column for one property across a batch of rows.
///
/// The declared `data_type` decides which typed column builder
/// `build_column` dispatches to.
pub struct PropertyExtractor<'a> {
    /// Declared type of the property being extracted.
    data_type: &'a DataType,
}
1099
1100impl<'a> PropertyExtractor<'a> {
1101 pub fn new(_name: &'a str, data_type: &'a DataType) -> Self {
1102 Self { data_type }
1103 }
1104
1105 pub fn build_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1108 where
1109 F: Fn(usize) -> Option<&'a Value>,
1110 {
1111 match self.data_type {
1112 DataType::String => self.build_string_column(len, deleted, get_props),
1113 DataType::Int32 => self.build_int32_column(len, deleted, get_props),
1114 DataType::Int64 => self.build_int64_column(len, deleted, get_props),
1115 DataType::Float32 => self.build_float32_column(len, deleted, get_props),
1116 DataType::Float64 => self.build_float64_column(len, deleted, get_props),
1117 DataType::Bool => self.build_bool_column(len, deleted, get_props),
1118 DataType::Vector { dimensions } => {
1119 self.build_vector_column(len, deleted, get_props, *dimensions)
1120 }
1121 DataType::CypherValue => self.build_json_column(len, deleted, get_props),
1122 DataType::Bytes => self.build_bytes_column(len, deleted, get_props),
1123 DataType::List(inner) => self.build_list_column(len, deleted, get_props, inner),
1124 DataType::Map(key, value) => self.build_map_column(len, deleted, get_props, key, value),
1125 DataType::Crdt(_) => self.build_crdt_column(len, deleted, get_props),
1126 DataType::DateTime => self.build_datetime_struct_column(len, deleted, get_props),
1127 DataType::Timestamp => self.build_timestamp_column(len, deleted, get_props),
1128 DataType::Date => self.build_date32_column(len, deleted, get_props),
1129 DataType::Time => self.build_time_struct_column(len, deleted, get_props),
1130 DataType::Duration => self.build_duration_column(len, deleted, get_props),
1131 DataType::Btic => self.build_btic_column(len, deleted, get_props),
1132 _ => Err(anyhow!(
1133 "Unsupported data type for arrow conversion: {:?}",
1134 self.data_type
1135 )),
1136 }
1137 }
1138
1139 fn build_string_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1140 where
1141 F: Fn(usize) -> Option<&'a Value>,
1142 {
1143 let mut builder = arrow_array::builder::StringBuilder::with_capacity(len, len * 32);
1144 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1145 let prop = get_props(i);
1146 if let Some(s) = prop.and_then(|v| v.as_str()) {
1147 builder.append_value(s);
1148 } else if let Some(Value::Temporal(tv)) = prop {
1149 builder.append_value(tv.to_string());
1150 } else if is_deleted {
1151 builder.append_value("");
1152 } else {
1153 builder.append_null();
1154 }
1155 }
1156 Ok(Arc::new(builder.finish()))
1157 }
1158
1159 fn build_int32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1160 where
1161 F: Fn(usize) -> Option<&'a Value>,
1162 {
1163 let mut values = Vec::with_capacity(len);
1164 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1165 let val = get_props(i)
1168 .and_then(|v| v.as_i64())
1169 .and_then(|v| i32::try_from(v).ok());
1170 if val.is_none() && is_deleted {
1171 values.push(Some(0));
1172 } else {
1173 values.push(val);
1174 }
1175 }
1176 Ok(Arc::new(Int32Array::from(values)))
1177 }
1178
1179 fn build_int64_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1180 where
1181 F: Fn(usize) -> Option<&'a Value>,
1182 {
1183 let mut values = Vec::with_capacity(len);
1184 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1185 let val = get_props(i).and_then(|v| v.as_i64());
1186 if val.is_none() && is_deleted {
1187 values.push(Some(0));
1188 } else {
1189 values.push(val);
1190 }
1191 }
1192 Ok(Arc::new(Int64Array::from(values)))
1193 }
1194
1195 fn build_timestamp_column<F>(
1196 &self,
1197 len: usize,
1198 deleted: &[bool],
1199 get_props: F,
1200 ) -> Result<ArrayRef>
1201 where
1202 F: Fn(usize) -> Option<&'a Value>,
1203 {
1204 let mut values = Vec::with_capacity(len);
1205 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1206 let val = get_props(i);
1207 let ts = if is_deleted || val.is_none() {
1208 Some(0i64)
1209 } else if let Some(Value::Temporal(tv)) = val {
1210 match tv {
1211 uni_common::TemporalValue::DateTime {
1212 nanos_since_epoch, ..
1213 }
1214 | uni_common::TemporalValue::LocalDateTime {
1215 nanos_since_epoch, ..
1216 } => Some(*nanos_since_epoch),
1217 _ => None,
1218 }
1219 } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1220 Some(v)
1221 } else if let Some(s) = val.and_then(|v| v.as_str()) {
1222 parse_datetime_to_nanos(s)
1223 } else {
1224 None
1225 };
1226
1227 if is_deleted {
1228 values.push(Some(0));
1229 } else {
1230 values.push(ts);
1231 }
1232 }
1233 let arr = TimestampNanosecondArray::from(values).with_timezone("UTC");
1234 Ok(Arc::new(arr))
1235 }
1236
1237 fn build_datetime_struct_column<F>(
1238 &self,
1239 len: usize,
1240 deleted: &[bool],
1241 get_props: F,
1242 ) -> Result<ArrayRef>
1243 where
1244 F: Fn(usize) -> Option<&'a Value>,
1245 {
1246 let values = self.collect_values_or_null(len, deleted, &get_props);
1247 Ok(values_to_datetime_struct_array(&values))
1248 }
1249
1250 fn build_time_struct_column<F>(
1251 &self,
1252 len: usize,
1253 deleted: &[bool],
1254 get_props: F,
1255 ) -> Result<ArrayRef>
1256 where
1257 F: Fn(usize) -> Option<&'a Value>,
1258 {
1259 let values = self.collect_values_or_null(len, deleted, &get_props);
1260 Ok(values_to_time_struct_array(&values))
1261 }
1262
1263 fn collect_values_or_null<F>(&self, len: usize, deleted: &[bool], get_props: &F) -> Vec<Value>
1265 where
1266 F: Fn(usize) -> Option<&'a Value>,
1267 {
1268 deleted
1269 .iter()
1270 .enumerate()
1271 .take(len)
1272 .map(|(i, &is_deleted)| {
1273 if is_deleted {
1274 Value::Null
1275 } else {
1276 get_props(i).cloned().unwrap_or(Value::Null)
1277 }
1278 })
1279 .collect()
1280 }
1281
1282 fn build_date32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1283 where
1284 F: Fn(usize) -> Option<&'a Value>,
1285 {
1286 let mut builder = Date32Builder::with_capacity(len);
1287 let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1288
1289 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1290 let val = get_props(i);
1291 let days = if is_deleted || val.is_none() {
1292 Some(0)
1293 } else if let Some(Value::Temporal(uni_common::TemporalValue::Date {
1294 days_since_epoch,
1295 })) = val
1296 {
1297 Some(*days_since_epoch)
1298 } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1299 i32::try_from(v).ok()
1302 } else if let Some(s) = val.and_then(|v| v.as_str()) {
1303 match chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1304 Ok(date) => Some(date.signed_duration_since(epoch).num_days() as i32),
1305 Err(_) => None,
1306 }
1307 } else {
1308 None
1309 };
1310
1311 if is_deleted {
1312 builder.append_value(0);
1313 } else if let Some(v) = days {
1314 builder.append_value(v);
1315 } else {
1316 builder.append_null();
1317 }
1318 }
1319 Ok(Arc::new(builder.finish()))
1320 }
1321
1322 fn build_duration_column<F>(
1323 &self,
1324 len: usize,
1325 deleted: &[bool],
1326 get_props: F,
1327 ) -> Result<ArrayRef>
1328 where
1329 F: Fn(usize) -> Option<&'a Value>,
1330 {
1331 let mut builder = LargeBinaryBuilder::with_capacity(len, len * 32);
1333 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1334 let raw_val = get_props(i);
1335 if let Some(val @ Value::Temporal(uni_common::TemporalValue::Duration { .. })) = raw_val
1336 {
1337 let encoded = uni_common::cypher_value_codec::encode(val);
1338 builder.append_value(&encoded);
1339 } else if is_deleted {
1340 let zero = Value::Temporal(uni_common::TemporalValue::Duration {
1341 months: 0,
1342 days: 0,
1343 nanos: 0,
1344 });
1345 let encoded = uni_common::cypher_value_codec::encode(&zero);
1346 builder.append_value(&encoded);
1347 } else {
1348 builder.append_null();
1349 }
1350 }
1351 Ok(Arc::new(builder.finish()))
1352 }
1353
1354 fn build_btic_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1355 where
1356 F: Fn(usize) -> Option<&'a Value>,
1357 {
1358 const ENCODED_LEN: i32 = 24;
1359 let mut builder = FixedSizeBinaryBuilder::with_capacity(len, ENCODED_LEN);
1360 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1361 let raw_val = get_props(i);
1362 let btic = match raw_val {
1363 Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => Some(
1364 uni_btic::Btic::new(*lo, *hi, *meta)
1365 .map_err(|e| anyhow!("invalid BTIC value: {}", e))?,
1366 ),
1367 Some(Value::String(s)) => Some(
1368 uni_btic::parse::parse_btic_literal(s)
1369 .map_err(|e| anyhow!("BTIC parse error for '{}': {}", s, e))?,
1370 ),
1371 _ => None,
1372 };
1373
1374 if let Some(b) = btic {
1375 builder.append_value(uni_btic::encode::encode(&b))?;
1376 } else if is_deleted {
1377 builder.append_value([0u8; ENCODED_LEN as usize])?;
1378 } else {
1379 builder.append_null();
1380 }
1381 }
1382 Ok(Arc::new(builder.finish()))
1383 }
1384
1385 fn build_float32_column<F>(
1386 &self,
1387 len: usize,
1388 deleted: &[bool],
1389 get_props: F,
1390 ) -> Result<ArrayRef>
1391 where
1392 F: Fn(usize) -> Option<&'a Value>,
1393 {
1394 let mut values = Vec::with_capacity(len);
1395 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1396 let val = get_props(i).and_then(|v| v.as_f64()).map(|v| v as f32);
1397 if val.is_none() && is_deleted {
1398 values.push(Some(0.0));
1399 } else {
1400 values.push(val);
1401 }
1402 }
1403 Ok(Arc::new(Float32Array::from(values)))
1404 }
1405
1406 fn build_float64_column<F>(
1407 &self,
1408 len: usize,
1409 deleted: &[bool],
1410 get_props: F,
1411 ) -> Result<ArrayRef>
1412 where
1413 F: Fn(usize) -> Option<&'a Value>,
1414 {
1415 let mut values = Vec::with_capacity(len);
1416 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1417 let val = get_props(i).and_then(|v| v.as_f64());
1418 if val.is_none() && is_deleted {
1419 values.push(Some(0.0));
1420 } else {
1421 values.push(val);
1422 }
1423 }
1424 Ok(Arc::new(Float64Array::from(values)))
1425 }
1426
1427 fn build_bool_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1428 where
1429 F: Fn(usize) -> Option<&'a Value>,
1430 {
1431 let mut values = Vec::with_capacity(len);
1432 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1433 let val = get_props(i).and_then(|v| v.as_bool());
1434 if val.is_none() && is_deleted {
1435 values.push(Some(false));
1436 } else {
1437 values.push(val);
1438 }
1439 }
1440 Ok(Arc::new(BooleanArray::from(values)))
1441 }
1442
1443 fn build_vector_column<F>(
1444 &self,
1445 len: usize,
1446 deleted: &[bool],
1447 get_props: F,
1448 dimensions: usize,
1449 ) -> Result<ArrayRef>
1450 where
1451 F: Fn(usize) -> Option<&'a Value>,
1452 {
1453 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dimensions as i32);
1454
1455 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1456 let val = get_props(i);
1457 let (values, valid) = extract_vector_f32_values(val, is_deleted, dimensions);
1458 for v in values {
1459 builder.values().append_value(v);
1460 }
1461 builder.append(valid);
1462 }
1463 Ok(Arc::new(builder.finish()))
1464 }
1465
1466 fn build_json_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1467 where
1468 F: Fn(usize) -> Option<&'a Value>,
1469 {
1470 let null_val = Value::Null;
1471 let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(len, len * 64);
1472 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1473 let val = get_props(i);
1474 let uni_val = if val.is_none() && is_deleted {
1475 &null_val
1476 } else {
1477 val.unwrap_or(&null_val)
1478 };
1479 let cv_bytes = uni_common::cypher_value_codec::encode(uni_val);
1481 builder.append_value(&cv_bytes);
1482 }
1483 Ok(Arc::new(builder.finish()))
1484 }
1485
1486 fn build_bytes_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1487 where
1488 F: Fn(usize) -> Option<&'a Value>,
1489 {
1490 let mut builder = LargeBinaryBuilder::with_capacity(len, len * 64);
1491 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1492 let val = get_props(i);
1493 if let Some(Value::Bytes(b)) = val {
1494 builder.append_value(b);
1495 } else if is_deleted {
1496 builder.append_value(&[][..]);
1497 } else {
1498 builder.append_null();
1499 }
1500 }
1501 Ok(Arc::new(builder.finish()))
1502 }
1503
1504 fn build_list_column<F>(
1505 &self,
1506 len: usize,
1507 deleted: &[bool],
1508 get_props: F,
1509 inner: &DataType,
1510 ) -> Result<ArrayRef>
1511 where
1512 F: Fn(usize) -> Option<&'a Value>,
1513 {
1514 match inner {
1515 DataType::String => {
1516 self.build_typed_list(len, deleted, &get_props, StringBuilder::new(), |v, b| {
1517 if let Some(s) = v.as_str() {
1518 b.append_value(s);
1519 } else {
1520 b.append_null();
1521 }
1522 })
1523 }
1524 DataType::Int64 => {
1525 self.build_typed_list(len, deleted, &get_props, Int64Builder::new(), |v, b| {
1526 if let Some(n) = v.as_i64() {
1527 b.append_value(n);
1528 } else {
1529 b.append_null();
1530 }
1531 })
1532 }
1533 DataType::Float64 => {
1534 self.build_typed_list(len, deleted, &get_props, Float64Builder::new(), |v, b| {
1535 if let Some(f) = v.as_f64() {
1536 b.append_value(f);
1537 } else {
1538 b.append_null();
1539 }
1540 })
1541 }
1542 DataType::Bytes => {
1543 let item_field = Arc::new(
1547 Field::new("item", ArrowDataType::LargeBinary, true)
1548 .with_metadata(schema::raw_bytes_field_metadata()),
1549 );
1550 let mut builder =
1551 ListBuilder::new(LargeBinaryBuilder::new()).with_field(item_field);
1552 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1553 let val_array = get_props(i).and_then(|v| v.as_array());
1554 if val_array.is_none() && is_deleted {
1555 builder.append_null();
1556 } else if let Some(arr) = val_array {
1557 for v in arr {
1558 if let Value::Bytes(b) = v {
1559 builder.values().append_value(b);
1560 } else {
1561 builder.values().append_null();
1562 }
1563 }
1564 builder.append(true);
1565 } else {
1566 builder.append_null();
1567 }
1568 }
1569 Ok(Arc::new(builder.finish()))
1570 }
1571 _ => Err(anyhow!("Unsupported inner type for List: {:?}", inner)),
1572 }
1573 }
1574
1575 fn build_typed_list<F, B, A>(
1577 &self,
1578 len: usize,
1579 deleted: &[bool],
1580 get_props: &F,
1581 inner_builder: B,
1582 mut append_value: A,
1583 ) -> Result<ArrayRef>
1584 where
1585 F: Fn(usize) -> Option<&'a Value>,
1586 B: arrow_array::builder::ArrayBuilder,
1587 A: FnMut(&Value, &mut B),
1588 {
1589 let mut builder = ListBuilder::new(inner_builder);
1590 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1591 let val_array = get_props(i).and_then(|v| v.as_array());
1592 if val_array.is_none() && is_deleted {
1593 builder.append_null();
1594 } else if let Some(arr) = val_array {
1595 for v in arr {
1596 append_value(v, builder.values());
1597 }
1598 builder.append(true);
1599 } else {
1600 builder.append_null();
1601 }
1602 }
1603 Ok(Arc::new(builder.finish()))
1604 }
1605
1606 fn build_map_column<F>(
1607 &self,
1608 len: usize,
1609 deleted: &[bool],
1610 get_props: F,
1611 key: &DataType,
1612 value: &DataType,
1613 ) -> Result<ArrayRef>
1614 where
1615 F: Fn(usize) -> Option<&'a Value>,
1616 {
1617 if !matches!(key, DataType::String) {
1618 return Err(anyhow!("Map keys must be String (JSON limitation)"));
1619 }
1620
1621 match value {
1622 DataType::String => self.build_typed_map(
1623 len,
1624 deleted,
1625 &get_props,
1626 StringBuilder::new(),
1627 arrow_schema::DataType::Utf8,
1628 None,
1629 |v, b: &mut StringBuilder| {
1630 if let Some(s) = v.as_str() {
1631 b.append_value(s);
1632 } else {
1633 b.append_null();
1634 }
1635 },
1636 ),
1637 DataType::Int64 => self.build_typed_map(
1638 len,
1639 deleted,
1640 &get_props,
1641 Int64Builder::new(),
1642 arrow_schema::DataType::Int64,
1643 None,
1644 |v, b: &mut Int64Builder| {
1645 if let Some(n) = v.as_i64() {
1646 b.append_value(n);
1647 } else {
1648 b.append_null();
1649 }
1650 },
1651 ),
1652 DataType::Bytes => self.build_typed_map(
1653 len,
1654 deleted,
1655 &get_props,
1656 LargeBinaryBuilder::new(),
1657 arrow_schema::DataType::LargeBinary,
1658 Some(schema::raw_bytes_field_metadata()),
1661 |v, b: &mut LargeBinaryBuilder| {
1662 if let Value::Bytes(bytes) = v {
1663 b.append_value(bytes);
1664 } else {
1665 b.append_null();
1666 }
1667 },
1668 ),
1669 _ => Err(anyhow!("Unsupported value type for Map: {:?}", value)),
1670 }
1671 }
1672
1673 #[expect(
1675 clippy::too_many_arguments,
1676 reason = "builder plumbing: value type + optional child metadata are distinct knobs"
1677 )]
1678 fn build_typed_map<F, B, A>(
1679 &self,
1680 len: usize,
1681 deleted: &[bool],
1682 get_props: &F,
1683 value_builder: B,
1684 value_arrow_type: arrow_schema::DataType,
1685 value_metadata: Option<HashMap<String, String>>,
1686 mut append_value: A,
1687 ) -> Result<ArrayRef>
1688 where
1689 F: Fn(usize) -> Option<&'a Value>,
1690 B: arrow_array::builder::ArrayBuilder,
1691 A: FnMut(&Value, &mut B),
1692 {
1693 let key_builder = Box::new(StringBuilder::new());
1694 let value_builder = Box::new(value_builder);
1695 let value_field = match value_metadata {
1696 Some(meta) => Field::new("value", value_arrow_type, true).with_metadata(meta),
1697 None => Field::new("value", value_arrow_type, true),
1698 };
1699 let struct_builder = StructBuilder::new(
1700 vec![
1701 Field::new("key", arrow_schema::DataType::Utf8, false),
1702 value_field,
1703 ],
1704 vec![key_builder, value_builder],
1705 );
1706 let mut builder = ListBuilder::new(struct_builder);
1707
1708 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1709 self.append_map_entry(&mut builder, get_props(i), is_deleted, &mut append_value);
1710 }
1711 Ok(Arc::new(builder.finish()))
1712 }
1713
1714 fn append_map_entry<B, A>(
1716 &self,
1717 builder: &mut ListBuilder<StructBuilder>,
1718 val: Option<&'a Value>,
1719 is_deleted: bool,
1720 append_value: &mut A,
1721 ) where
1722 B: arrow_array::builder::ArrayBuilder,
1723 A: FnMut(&Value, &mut B),
1724 {
1725 let val_obj = val.and_then(|v| v.as_object());
1726 if val_obj.is_none() && is_deleted {
1727 builder.append(false);
1728 } else if let Some(obj) = val_obj {
1729 let struct_b = builder.values();
1730 for (k, v) in obj {
1731 struct_b
1732 .field_builder::<StringBuilder>(0)
1733 .unwrap()
1734 .append_value(k);
1735 let value_b = struct_b.field_builder::<B>(1).unwrap();
1737 append_value(v, value_b);
1738 struct_b.append(true);
1739 }
1740 builder.append(true);
1741 } else {
1742 builder.append(false);
1743 }
1744 }
1745
1746 fn build_crdt_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1747 where
1748 F: Fn(usize) -> Option<&'a Value>,
1749 {
1750 let mut builder = BinaryBuilder::new();
1751 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1752 if is_deleted {
1753 builder.append_null();
1754 continue;
1755 }
1756 if let Some(val) = get_props(i) {
1757 let crdt_result = if let Some(s) = val.as_str() {
1760 serde_json::from_str::<Crdt>(s)
1761 } else {
1762 let json_val: serde_json::Value = val.clone().into();
1764 serde_json::from_value::<Crdt>(json_val)
1765 };
1766
1767 if let Ok(crdt) = crdt_result {
1768 if let Ok(bytes) = crdt.to_msgpack() {
1769 builder.append_value(&bytes);
1770 } else {
1771 builder.append_null();
1772 }
1773 } else {
1774 builder.append_null();
1775 }
1776 } else {
1777 builder.append_null();
1778 }
1779 }
1780 Ok(Arc::new(builder.finish()))
1781 }
1782}
1783
1784pub fn build_edge_column<'a>(
1786 name: &'a str,
1787 data_type: &'a DataType,
1788 len: usize,
1789 get_props: impl Fn(usize) -> Option<&'a Value>,
1790) -> Result<ArrayRef> {
1791 let deleted = vec![false; len];
1793 let extractor = PropertyExtractor::new(name, data_type);
1794 extractor.build_column(len, &deleted, get_props)
1795}
1796
1797#[cfg(test)]
1798mod tests {
1799 use super::*;
1800 use arrow_array::{
1801 Array, DurationMicrosecondArray,
1802 builder::{BinaryBuilder, Time64MicrosecondBuilder, TimestampNanosecondBuilder},
1803 };
1804 use std::collections::HashMap;
1805 use uni_common::TemporalValue;
1806 use uni_crdt::{Crdt, GCounter};
1807
/// String arrays decode to `Value::String`, and null slots to `Value::Null`.
#[test]
fn test_arrow_to_value_string() {
    let arr = StringArray::from(vec![Some("hello"), None, Some("world")]);
    let expected = [
        Value::String("hello".to_string()),
        Value::Null,
        Value::String("world".to_string()),
    ];
    for (row, want) in expected.iter().enumerate() {
        assert_eq!(&arrow_to_value(&arr, row, None), want);
    }
}
1821
/// Int64 arrays decode to `Value::Int`, and null slots to `Value::Null`.
#[test]
fn test_arrow_to_value_int64() {
    let arr = Int64Array::from(vec![Some(42), None, Some(-10)]);
    let expected = [Value::Int(42), Value::Null, Value::Int(-10)];
    for (row, want) in expected.iter().enumerate() {
        assert_eq!(&arrow_to_value(&arr, row, None), want);
    }
}
1829
/// Float64 arrays decode to `Value::Float`, and null slots to `Value::Null`.
#[test]
#[allow(clippy::approx_constant)]
fn test_arrow_to_value_float64() {
    let arr = Float64Array::from(vec![Some(3.14), None]);
    let expected = [Value::Float(3.14), Value::Null];
    for (row, want) in expected.iter().enumerate() {
        assert_eq!(&arrow_to_value(&arr, row, None), want);
    }
}
1837
/// Boolean arrays decode to `Value::Bool`, and null slots to `Value::Null`.
#[test]
fn test_arrow_to_value_bool() {
    let arr = BooleanArray::from(vec![Some(true), Some(false), None]);
    let expected = [Value::Bool(true), Value::Bool(false), Value::Null];
    for (row, want) in expected.iter().enumerate() {
        assert_eq!(&arrow_to_value(&arr, row, None), want);
    }
}
1845
/// `values_to_array` turns ints into an `Int64Array`, keeping nulls as nulls.
#[test]
fn test_values_to_array_int64() {
    let input = vec![Value::Int(1), Value::Int(2), Value::Null, Value::Int(4)];
    let column = values_to_array(&input, &ArrowDataType::Int64).unwrap();
    assert_eq!(column.len(), 4);

    let ints = column.as_any().downcast_ref::<Int64Array>().unwrap();
    for (row, want) in [(0, 1), (1, 2), (3, 4)] {
        assert_eq!(ints.value(row), want);
    }
    assert!(ints.is_null(2));
}
1858
/// `values_to_array` turns strings into a `StringArray`, keeping nulls as nulls.
#[test]
fn test_values_to_array_string() {
    let input = vec![
        Value::String("a".to_string()),
        Value::String("b".to_string()),
        Value::Null,
    ];
    let column = values_to_array(&input, &ArrowDataType::Utf8).unwrap();
    assert_eq!(column.len(), 3);

    let strings = column.as_any().downcast_ref::<StringArray>().unwrap();
    for (row, want) in [(0, "a"), (1, "b")] {
        assert_eq!(strings.value(row), want);
    }
    assert!(strings.is_null(2));
}
1874
/// String extraction: live rows keep their value, and a deleted row with no
/// property reads back as the empty string.
#[test]
fn test_property_extractor_string() {
    let props: Vec<HashMap<String, Value>> = vec![
        HashMap::from([("name".to_string(), Value::String("Alice".to_string()))]),
        HashMap::from([("name".to_string(), Value::String("Bob".to_string()))]),
        HashMap::new(),
    ];
    let deleted = vec![false, false, true];

    let column = PropertyExtractor::new("name", &DataType::String)
        .build_column(3, &deleted, |i| props[i].get("name"))
        .unwrap();

    let strings = column.as_any().downcast_ref::<StringArray>().unwrap();
    for (row, want) in [(0, "Alice"), (1, "Bob"), (2, "")] {
        assert_eq!(strings.value(row), want);
    }
}
1898
/// Int64 extraction: live rows keep their value, and a deleted row with no
/// property reads back as `0`.
#[test]
fn test_property_extractor_int64() {
    let props: Vec<HashMap<String, Value>> = vec![
        HashMap::from([("age".to_string(), Value::Int(25))]),
        HashMap::from([("age".to_string(), Value::Int(30))]),
        HashMap::new(),
    ];
    let deleted = vec![false, false, true];

    let column = PropertyExtractor::new("age", &DataType::Int64)
        .build_column(3, &deleted, |i| props[i].get("age"))
        .unwrap();

    let ints = column.as_any().downcast_ref::<Int64Array>().unwrap();
    for (row, want) in [(0, 25), (1, 30), (2, 0)] {
        assert_eq!(ints.value(row), want);
    }
}
1918
/// Bytes round-trip: non-empty and empty payloads survive extraction and
/// decoding, and a live row with no property decodes to `Value::Null`.
#[test]
fn test_property_extractor_bytes_roundtrip() {
    let blob = vec![0u8, 1, 2, 255];
    let props: Vec<HashMap<String, Value>> = vec![
        HashMap::from([("blob".to_string(), Value::Bytes(blob.clone()))]),
        HashMap::from([("blob".to_string(), Value::Bytes(Vec::new()))]),
        HashMap::new(),
    ];
    let deleted = vec![false, false, false];

    let column = PropertyExtractor::new("blob", &DataType::Bytes)
        .build_column(3, &deleted, |i| props[i].get("blob"))
        .unwrap();

    let expected = [Value::Bytes(blob), Value::Bytes(Vec::new()), Value::Null];
    for (row, want) in expected.iter().enumerate() {
        let got = arrow_to_value(column.as_ref(), row, Some(&DataType::Bytes));
        assert_eq!(&got, want);
    }
}
1953
/// A raw byte payload stored under `DataType::Bytes` decodes back to the
/// same `Value::Bytes` when the `Bytes` type hint is supplied.
#[test]
fn test_bytes_vs_cypher_value_disambiguation() {
    let raw = vec![0xDEu8, 0xAD, 0xBE, 0xEF];
    let props: Vec<HashMap<String, Value>> =
        vec![HashMap::from([("blob".to_string(), Value::Bytes(raw.clone()))])];

    let column = PropertyExtractor::new("blob", &DataType::Bytes)
        .build_column(1, &[false], |i| props[i].get("blob"))
        .unwrap();

    let decoded = arrow_to_value(column.as_ref(), 0, Some(&DataType::Bytes));
    assert_eq!(decoded, Value::Bytes(raw));
}
1975
/// `DataType::Bytes` maps to Arrow's `LargeBinary`.
#[test]
fn test_data_type_bytes_to_arrow() {
    let arrow_type = DataType::Bytes.to_arrow();
    assert_eq!(arrow_type, ArrowDataType::LargeBinary);
}
1980
/// Time64 (microsecond) values decode to values whose string form is the
/// expected clock time; null slots decode to `Value::Null`.
#[test]
fn test_arrow_to_value_time64() {
    let mut builder = Time64MicrosecondBuilder::new();
    for micros in [37_845_000_000, 0, 86_399_123_456] {
        builder.append_value(micros);
    }
    builder.append_null();
    let arr = builder.finish();

    let rendered = ["10:30:45", "00:00", "23:59:59.123456"];
    for (row, want) in rendered.iter().enumerate() {
        assert_eq!(arrow_to_value(&arr, row, None).to_string(), *want);
    }
    assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
}
2000
/// Microsecond durations decode to values whose string form is the expected
/// ISO-8601-style text; null slots decode to `Value::Null`.
#[test]
fn test_arrow_to_value_duration() {
    let arr = DurationMicrosecondArray::from(vec![
        Some(1_000_000),
        Some(3_600_000_000),
        Some(86_400_000_000),
        None,
    ]);

    let rendered = ["PT1S", "PT1H", "PT24H"];
    for (row, want) in rendered.iter().enumerate() {
        assert_eq!(arrow_to_value(&arr, row, None).to_string(), *want);
    }
    assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
}
2017
/// A MessagePack-encoded G-Counter in a Binary column decodes to an object
/// whose `"t"` entry is `"gc"`; a null slot decodes to `Value::Null`.
#[test]
fn test_arrow_to_value_binary_crdt() {
    let mut counter = GCounter::new();
    counter.increment("actor1", 5);
    let payload = Crdt::GCounter(counter).to_msgpack().unwrap();

    let mut builder = BinaryBuilder::new();
    builder.append_value(&payload);
    builder.append_null();
    let arr = builder.finish();

    let decoded = arrow_to_value(&arr, 0, None);
    assert!(decoded.as_object().is_some());
    let fields = decoded.as_object().unwrap();
    assert_eq!(fields.get("t"), Some(&Value::String("gc".to_string())));

    assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
}
2045
/// DateTime values survive a struct-array encode/decode round trip,
/// including offset and timezone name.
#[test]
fn test_datetime_struct_encode_decode_roundtrip() {
    let make = |nanos: i64, offset: i32, tz: Option<&str>| {
        Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch: nanos,
            offset_seconds: offset,
            timezone_name: tz.map(|name| name.to_string()),
        })
    };
    let values = vec![
        make(441763200000000000, 3600, Some("Europe/Paris")),
        make(1704067200000000000, -18000, None),
        make(0, 0, Some("UTC")),
    ];

    let arr_ref = values_to_datetime_struct_array(&values);
    let structs = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(structs.len(), 3);

    let decoded: Vec<Value> = (0..3)
        .map(|row| arrow_to_value(arr_ref.as_ref(), row, Some(&DataType::DateTime)))
        .collect();
    for (got, want) in decoded.iter().zip(&values) {
        assert_eq!(got, want);
    }

    // Spot-check the individual fields of the first decoded value.
    match &decoded[0] {
        Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch,
            offset_seconds,
            timezone_name,
        }) => {
            assert_eq!(*nanos_since_epoch, 441763200000000000);
            assert_eq!(*offset_seconds, 3600);
            assert_eq!(*timezone_name, Some("Europe/Paris".to_string()));
        }
        _ => panic!("Expected DateTime value"),
    }
}
2096
/// A `Value::Null` between DateTime values becomes a null struct slot and
/// decodes back to `Value::Null`; its neighbours round-trip unchanged.
#[test]
fn test_datetime_struct_null_handling() {
    let first = Value::Temporal(TemporalValue::DateTime {
        nanos_since_epoch: 441763200000000000,
        offset_seconds: 3600,
        timezone_name: Some("Europe/Paris".to_string()),
    });
    let last = Value::Temporal(TemporalValue::DateTime {
        nanos_since_epoch: 0,
        offset_seconds: 0,
        timezone_name: None,
    });
    let values = vec![first, Value::Null, last];

    let arr_ref = values_to_datetime_struct_array(&values);
    let structs = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(structs.len(), 3);

    let decode = |row: usize| arrow_to_value(arr_ref.as_ref(), row, Some(&DataType::DateTime));

    assert_eq!(decode(0), values[0]);

    assert!(structs.is_null(1));
    assert_eq!(decode(1), Value::Null);

    assert_eq!(decode(2), values[2]);
}
2131
/// DateTime values with offsets of 0, +43200 and -43200 seconds round-trip
/// through the struct encoding.
#[test]
fn test_datetime_struct_boundary_values() {
    let values: Vec<Value> = [0, 43200, -43200]
        .into_iter()
        .map(|offset_seconds| {
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 441763200000000000,
                offset_seconds,
                timezone_name: None,
            })
        })
        .collect();

    let arr_ref = values_to_datetime_struct_array(&values);
    let structs = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(structs.len(), 3);

    for (row, want) in values.iter().enumerate() {
        let got = arrow_to_value(arr_ref.as_ref(), row, Some(&DataType::DateTime));
        assert_eq!(&got, want);
    }
}
2163
/// A plain UTC timestamp column decodes as `DateTime` when the `DateTime`
/// type hint is supplied: offset 0, timezone name `"UTC"`, nulls preserved.
///
/// The second row was previously decoded but never checked; it is now
/// asserted with the same expectations as the first.
#[test]
fn test_datetime_old_schema_migration() {
    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
    builder.append_value(441763200000000000);
    builder.append_value(1704067200000000000);
    builder.append_null();

    let arr = builder.finish();

    let decoded_0 = arrow_to_value(&arr, 0, Some(&DataType::DateTime));
    let decoded_1 = arrow_to_value(&arr, 1, Some(&DataType::DateTime));
    let decoded_2 = arrow_to_value(&arr, 2, Some(&DataType::DateTime));

    if let Value::Temporal(TemporalValue::DateTime {
        nanos_since_epoch,
        offset_seconds,
        timezone_name,
    }) = decoded_0
    {
        assert_eq!(nanos_since_epoch, 441763200000000000);
        assert_eq!(offset_seconds, 0);
        assert_eq!(timezone_name, Some("UTC".to_string()));
    } else {
        panic!("Expected DateTime value");
    }

    // Same column and same code path as row 0, so only the instant differs.
    assert_eq!(
        decoded_1,
        Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch: 1704067200000000000,
            offset_seconds: 0,
            timezone_name: Some("UTC".to_string()),
        })
    );

    assert_eq!(decoded_2, Value::Null);
}
2196
/// Time values survive a struct-array encode/decode round trip, including
/// their offset.
#[test]
fn test_time_struct_encode_decode_roundtrip() {
    let values: Vec<Value> = [
        (37845000000000, 3600),
        (0, 0),
        (86399999999999, -18000),
    ]
    .into_iter()
    .map(|(nanos_since_midnight, offset_seconds)| {
        Value::Temporal(TemporalValue::Time {
            nanos_since_midnight,
            offset_seconds,
        })
    })
    .collect();

    let arr_ref = values_to_time_struct_array(&values);
    let structs = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(structs.len(), 3);

    let decoded: Vec<Value> = (0..3)
        .map(|row| arrow_to_value(arr_ref.as_ref(), row, Some(&DataType::Time)))
        .collect();
    for (got, want) in decoded.iter().zip(&values) {
        assert_eq!(got, want);
    }

    // Spot-check the individual fields of the first decoded value.
    match &decoded[0] {
        Value::Temporal(TemporalValue::Time {
            nanos_since_midnight,
            offset_seconds,
        }) => {
            assert_eq!(*nanos_since_midnight, 37845000000000);
            assert_eq!(*offset_seconds, 3600);
        }
        _ => panic!("Expected Time value"),
    }
}
2242
/// A `Value::Null` between Time values becomes a null struct slot and
/// decodes back to `Value::Null`; its neighbours round-trip unchanged.
#[test]
fn test_time_struct_null_handling() {
    let first = Value::Temporal(TemporalValue::Time {
        nanos_since_midnight: 37845000000000,
        offset_seconds: 3600,
    });
    let last = Value::Temporal(TemporalValue::Time {
        nanos_since_midnight: 0,
        offset_seconds: 0,
    });
    let values = vec![first, Value::Null, last];

    let arr_ref = values_to_time_struct_array(&values);
    let structs = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(structs.len(), 3);

    let decode = |row: usize| arrow_to_value(arr_ref.as_ref(), row, Some(&DataType::Time));

    assert_eq!(decode(0), values[0]);

    assert!(structs.is_null(1));
    assert_eq!(decode(1), Value::Null);

    assert_eq!(decode(2), values[2]);
}
2275
/// A `Value::Vector` of the requested length is returned unchanged and valid.
#[test]
fn test_extract_vector_f32_values_valid_vector() {
    let floats = vec![1.0, 2.0, 3.0];
    let input = Value::Vector(floats.clone());
    let (extracted, is_valid) = extract_vector_f32_values(Some(&input), false, 3);
    assert_eq!(extracted, floats);
    assert!(is_valid);
}
2286
/// A `Value::Vector` of the wrong length yields zeros and is marked invalid.
#[test]
fn test_extract_vector_f32_values_vector_wrong_dims() {
    let input = Value::Vector(vec![1.0, 2.0]);
    let (extracted, is_valid) = extract_vector_f32_values(Some(&input), false, 3);
    assert_eq!(extracted, vec![0.0, 0.0, 0.0]);
    assert!(!is_valid);
}
2295
/// A `Value::List` of floats with the requested length is converted and valid.
#[test]
fn test_extract_vector_f32_values_valid_list() {
    let input = Value::List(vec![
        Value::Float(1.0),
        Value::Float(2.0),
        Value::Float(3.0),
    ]);
    let (extracted, is_valid) = extract_vector_f32_values(Some(&input), false, 3);
    assert_eq!(extracted, vec![1.0, 2.0, 3.0]);
    assert!(is_valid);
}
2304
/// A two-item `Value::List` requested as three dimensions yields a zero-filled
/// result of length three that is reported as invalid.
#[test]
fn test_extract_vector_f32_values_list_wrong_dims() {
    let input = Value::List(vec![Value::Float(1.0), Value::Float(2.0)]);

    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);

    assert_eq!(floats, vec![0.0; 3]);
    assert!(!is_valid);
}
2313
/// A `Value::List` of `Value::Int` items is converted to the matching floats
/// and reported as valid.
#[test]
fn test_extract_vector_f32_values_list_int_coercion() {
    let input = Value::List(vec![Value::Int(1), Value::Int(2), Value::Int(3)]);

    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);

    assert_eq!(floats, vec![1.0, 2.0, 3.0]);
    assert!(is_valid);
}
2322
/// With no value at all (`None`) and the flag argument `false`, the result is
/// a zero-filled vector of the requested length, reported as invalid.
#[test]
fn test_extract_vector_f32_values_none() {
    let (floats, is_valid) = extract_vector_f32_values(None, false, 3);

    assert_eq!(floats, vec![0.0; 3]);
    assert!(!is_valid);
}
2329
/// An explicit `Value::Null` with the flag argument `false` yields a
/// zero-filled vector of the requested length, reported as invalid.
#[test]
fn test_extract_vector_f32_values_null() {
    let input = Value::Null;

    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);

    assert_eq!(floats, vec![0.0; 3]);
    assert!(!is_valid);
}
2337
/// A value that is neither a vector nor a list (here a string) yields a
/// zero-filled vector of the requested length, reported as invalid.
#[test]
fn test_extract_vector_f32_values_unsupported_type() {
    let input = Value::String("not a vector".to_string());

    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);

    assert_eq!(floats, vec![0.0; 3]);
    assert!(!is_valid);
}
2345
/// With no value (`None`) and the flag argument `true` (the "deleted" case in
/// this test's name), the zero-filled vector is still reported as valid.
#[test]
fn test_extract_vector_f32_values_deleted_with_none() {
    let (floats, is_valid) = extract_vector_f32_values(None, true, 3);

    assert_eq!(floats, vec![0.0; 3]);
    assert!(is_valid);
}
2352
/// With `Value::Null` and the flag argument `true` (the "deleted" case in this
/// test's name), the zero-filled vector is still reported as valid.
#[test]
fn test_extract_vector_f32_values_deleted_with_null() {
    let input = Value::Null;

    let (floats, is_valid) = extract_vector_f32_values(Some(&input), true, 3);

    assert_eq!(floats, vec![0.0; 3]);
    assert!(is_valid);
}
2360
/// Converting vectors interleaved with a null and a non-vector value into a
/// `FixedSizeList<Float32, 2>` keeps all four rows, with only the two real
/// vectors marked valid.
#[test]
fn test_values_to_fixed_size_list_vector_with_nulls() {
    let list_type = ArrowDataType::FixedSizeList(
        Arc::new(Field::new("item", ArrowDataType::Float32, false)),
        2,
    );
    let inputs = vec![
        Value::Vector(vec![1.0, 2.0]),
        Value::Null,
        Value::Vector(vec![3.0, 4.0]),
        Value::String("invalid".to_string()),
    ];

    let converted = values_to_array(&inputs, &list_type).unwrap();
    let lists = converted
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(lists.len(), 4);

    // Rows 1 (null) and 3 (string) must be null entries; rows 0 and 2 are valid.
    let validity: Vec<bool> = (0..4).map(|row| lists.is_valid(row)).collect();
    assert_eq!(validity, vec![true, false, true, false]);
}
2391
/// `Value::List` inputs (floats and ints) convert into a
/// `FixedSizeList<Float32, 2>` whose flattened child holds 1.0, 2.0, 3.0, 4.0.
#[test]
fn test_values_to_fixed_size_list_from_list() {
    let list_type = ArrowDataType::FixedSizeList(
        Arc::new(Field::new("item", ArrowDataType::Float32, false)),
        2,
    );
    let inputs = vec![
        Value::List(vec![Value::Float(1.0), Value::Float(2.0)]),
        Value::List(vec![Value::Int(3), Value::Int(4)]),
    ];

    let converted = values_to_array(&inputs, &list_type).unwrap();
    let lists = converted
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(lists.len(), 2);
    assert!(lists.is_valid(0));
    assert!(lists.is_valid(1));

    // The child array stores the rows back to back.
    let flat = lists
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    for (slot, want) in [1.0_f32, 2.0, 3.0, 4.0].iter().enumerate() {
        assert_eq!(flat.value(slot), *want);
    }
}
2427
/// Inputs whose length differs from the list size (3 and 1 elements against a
/// size of 2) become null rows, and their child slots are all 0.0.
#[test]
fn test_values_to_fixed_size_list_wrong_dimensions() {
    let list_type = ArrowDataType::FixedSizeList(
        Arc::new(Field::new("item", ArrowDataType::Float32, false)),
        2,
    );
    let inputs = vec![
        Value::Vector(vec![1.0, 2.0, 3.0]),
        Value::List(vec![Value::Float(4.0)]),
    ];

    let converted = values_to_array(&inputs, &list_type).unwrap();
    let lists = converted
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(lists.len(), 2);
    assert!(!lists.is_valid(0));
    assert!(!lists.is_valid(1));

    // Two rows of size two: four child slots, every one expected to be zero.
    let flat = lists
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    for slot in 0..4 {
        assert_eq!(flat.value(slot), 0.0);
    }
}
2463
/// Three `Value::Null` inputs produce three null rows in a
/// `FixedSizeList<Float32, 3>`, while the child array still holds 3 * 3 slots.
#[test]
fn test_values_to_fixed_size_list_all_nulls() {
    let list_type = ArrowDataType::FixedSizeList(
        Arc::new(Field::new("item", ArrowDataType::Float32, false)),
        3,
    );
    let inputs = vec![Value::Null, Value::Null, Value::Null];

    let converted = values_to_array(&inputs, &list_type).unwrap();
    let lists = converted
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(lists.len(), 3);
    for row in 0..3 {
        assert!(!lists.is_valid(row));
    }

    let flat = lists
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    assert_eq!(flat.len(), 9);
}
2494
/// A mix of vector, list, null and string inputs: the vector and list rows are
/// valid and carry their values, the null and string rows are null entries.
#[test]
fn test_values_to_fixed_size_list_mixed_types() {
    let list_type = ArrowDataType::FixedSizeList(
        Arc::new(Field::new("item", ArrowDataType::Float32, false)),
        2,
    );
    let inputs = vec![
        Value::Vector(vec![1.0, 2.0]),
        Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
        Value::Null,
        Value::String("invalid".to_string()),
    ];

    let converted = values_to_array(&inputs, &list_type).unwrap();
    let lists = converted
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(lists.len(), 4);
    let validity: Vec<bool> = (0..4).map(|row| lists.is_valid(row)).collect();
    assert_eq!(validity, vec![true, true, false, false]);

    // Only the child slots of the two valid rows are checked here.
    let flat = lists
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    for (slot, want) in [1.0_f32, 2.0, 3.0, 4.0].iter().enumerate() {
        assert_eq!(flat.value(slot), *want);
    }
}
2534
/// `build_vector_column` over four rows: a real vector, a missing property, an
/// explicit null, and a row flagged in the `deleted` slice.
///
/// Expected: rows 0 and 3 are valid, rows 1 and 2 are null; row 0 keeps its
/// values and the child slots of row 3 (indices 9..=11) are 0.0.
#[test]
fn test_build_vector_column_with_nulls_and_deleted() {
    let vector_type = DataType::Vector { dimensions: 3 };
    let extractor = PropertyExtractor::new("test_vec", &vector_type);

    let rows = [
        Some(Value::Vector(vec![1.0, 2.0, 3.0])),
        None,
        Some(Value::Null),
        Some(Value::Vector(vec![4.0, 5.0, 6.0])),
    ];
    let deleted_flags = [false, false, false, true];

    let column = extractor
        .build_vector_column(4, &deleted_flags, |row| rows[row].as_ref(), 3)
        .unwrap();
    let lists = column
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(lists.len(), 4);
    assert!(lists.is_valid(0));
    assert!(!lists.is_valid(1));
    assert!(!lists.is_valid(2));
    assert!(lists.is_valid(3));

    let flat = lists
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();

    // (child index, expected value): first row verbatim, last row zeroed.
    let expectations = [
        (0, 1.0_f32),
        (1, 2.0),
        (2, 3.0),
        (9, 0.0),
        (10, 0.0),
        (11, 0.0),
    ];
    for &(slot, want) in expectations.iter() {
        assert_eq!(flat.value(slot), want);
    }
}
2581
/// `build_vector_column` accepts float lists, int lists and plain vectors
/// alike: all three rows are valid and the child holds 1.0 through 6.0 in order.
#[test]
fn test_build_vector_column_with_list_input() {
    let vector_type = DataType::Vector { dimensions: 2 };
    let extractor = PropertyExtractor::new("test_vec", &vector_type);

    let rows = [
        Some(Value::List(vec![Value::Float(1.0), Value::Float(2.0)])),
        Some(Value::List(vec![Value::Int(3), Value::Int(4)])),
        Some(Value::Vector(vec![5.0, 6.0])),
    ];
    let deleted_flags = [false, false, false];

    let column = extractor
        .build_vector_column(3, &deleted_flags, |row| rows[row].as_ref(), 2)
        .unwrap();
    let lists = column
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(lists.len(), 3);
    for row in 0..3 {
        assert!(lists.is_valid(row));
    }

    let flat = lists
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    for (slot, want) in [1.0_f32, 2.0, 3.0, 4.0, 5.0, 6.0].iter().enumerate() {
        assert_eq!(flat.value(slot), *want);
    }
}
2621
/// An `i64` just above `i32::MAX` must surface as NULL in both the int32 and
/// the date32 column builders, while an in-range value (42) is stored as-is.
#[test]
fn test_int32_and_date32_columns_null_out_of_range() {
    let declared_type = DataType::Int64;
    let extractor = PropertyExtractor::new("x", &declared_type);

    // Row 0 is one past the i32 range; row 1 fits comfortably.
    let inputs = [Value::Int(i64::from(i32::MAX) + 1), Value::Int(42)];
    let deleted_flags = [false, false];

    let int_column = extractor
        .build_int32_column(2, &deleted_flags, |row| Some(&inputs[row]))
        .unwrap();
    let ints = int_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert!(
        ints.is_null(0),
        "out-of-range i64 must be NULL in an int32 column, not wrapped"
    );
    assert_eq!(ints.value(1), 42);

    let date_column = extractor
        .build_date32_column(2, &deleted_flags, |row| Some(&inputs[row]))
        .unwrap();
    let dates = date_column.as_any().downcast_ref::<Date32Array>().unwrap();
    assert!(
        dates.is_null(0),
        "out-of-range day count must be NULL in a date32 column, not wrapped"
    );
    assert_eq!(dates.value(1), 42);
}
2654}