1use anyhow::{Result, anyhow};
11use arrow_array::builder::{
12 BinaryBuilder, BooleanBufferBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
13 FixedSizeBinaryBuilder, FixedSizeListBuilder, Float32Builder, Float64Builder, Int32Builder,
14 Int64Builder, IntervalMonthDayNanoBuilder, LargeBinaryBuilder, ListBuilder, StringBuilder,
15 StructBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder,
16 UInt64Builder,
17};
18use arrow_array::{
19 Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, FixedSizeBinaryArray,
20 FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array,
21 IntervalMonthDayNanoArray, LargeBinaryArray, ListArray, StringArray, StructArray,
22 Time64NanosecondArray, TimestampNanosecondArray, UInt64Array,
23};
24use arrow_schema::{DataType as ArrowDataType, Field};
25use std::collections::HashMap;
26use std::sync::Arc;
27use uni_common::DataType;
28use uni_common::Value;
29use uni_common::core::id::{Eid, Vid};
30use uni_common::core::schema;
31use uni_crdt::Crdt;
32
33fn build_timestamp_column_from_id_map<K, I>(
38 ids: I,
39 timestamps: Option<&HashMap<K, i64>>,
40) -> ArrayRef
41where
42 K: Eq + std::hash::Hash,
43 I: IntoIterator<Item = K>,
44{
45 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
46 for id in ids {
47 match timestamps.and_then(|m| m.get(&id)) {
48 Some(&ts) => builder.append_value(ts),
49 None => builder.append_null(),
50 }
51 }
52 Arc::new(builder.finish())
53}
54
55pub fn build_timestamp_column_from_vid_map<I>(
56 ids: I,
57 timestamps: Option<&HashMap<Vid, i64>>,
58) -> ArrayRef
59where
60 I: IntoIterator<Item = Vid>,
61{
62 build_timestamp_column_from_id_map(ids, timestamps)
63}
64
65pub fn build_timestamp_column_from_eid_map<I>(
66 ids: I,
67 timestamps: Option<&HashMap<Eid, i64>>,
68) -> ArrayRef
69where
70 I: IntoIterator<Item = Eid>,
71{
72 build_timestamp_column_from_id_map(ids, timestamps)
73}
74
75pub fn build_timestamp_column<I>(timestamps: I) -> ArrayRef
79where
80 I: IntoIterator<Item = Option<i64>>,
81{
82 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
83 for ts in timestamps {
84 builder.append_option(ts);
85 }
86 Arc::new(builder.finish())
87}
88
89pub fn labels_from_list_array(list_arr: &ListArray, row: usize) -> Vec<String> {
95 if list_arr.is_null(row) {
96 return Vec::new();
97 }
98 let values = list_arr.value(row);
99 let Some(str_arr) = values.as_any().downcast_ref::<StringArray>() else {
100 return Vec::new();
101 };
102 (0..str_arr.len())
103 .filter(|&j| !str_arr.is_null(j))
104 .map(|j| str_arr.value(j).to_string())
105 .collect()
106}
107
108fn parse_datetime_to_nanos(s: &str) -> Option<i64> {
113 chrono::DateTime::parse_from_rfc3339(s)
114 .map(|dt| {
115 dt.with_timezone(&chrono::Utc)
116 .timestamp_nanos_opt()
117 .unwrap_or(0)
118 })
119 .or_else(|_| {
120 chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
121 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
122 })
123 .or_else(|_| {
124 chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ")
125 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
126 })
127 .or_else(|_| {
128 chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M%:z").map(|dt| {
129 dt.with_timezone(&chrono::Utc)
130 .timestamp_nanos_opt()
131 .unwrap_or(0)
132 })
133 })
134 .ok()
135 .or_else(|| {
136 s.strip_suffix('Z')
137 .and_then(|base| chrono::NaiveDateTime::parse_from_str(base, "%Y-%m-%dT%H:%M").ok())
138 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
139 })
140}
141
142fn try_reconstruct_map(arr: &ArrayRef) -> Option<HashMap<String, Value>> {
148 let structs = arr.as_any().downcast_ref::<StructArray>()?;
149 let fields = structs.fields();
150 if fields.len() != 2 || fields[0].name() != "key" || fields[1].name() != "value" {
151 return None;
152 }
153 let key_col = structs.column(0);
154 let val_col = structs.column(1);
155 let mut map = HashMap::new();
156 for i in 0..structs.len() {
157 if let Value::String(k) = arrow_to_value(key_col.as_ref(), i, None) {
158 map.insert(k, arrow_to_value(val_col.as_ref(), i, None));
159 }
160 }
161 Some(map)
162}
163
164fn array_to_value_list(arr: &ArrayRef) -> Vec<Value> {
166 (0..arr.len())
167 .map(|i| arrow_to_value(arr.as_ref(), i, None))
168 .collect()
169}
170
171pub fn arrow_to_value(col: &dyn Array, row: usize, data_type: Option<&DataType>) -> Value {
178 if col.is_null(row) {
179 return Value::Null;
180 }
181
182 if let Some(dt) = data_type {
184 match dt {
185 DataType::DateTime => {
186 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
188 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
189 struct_arr.column_by_name("nanos_since_epoch"),
190 struct_arr.column_by_name("offset_seconds"),
191 struct_arr.column_by_name("timezone_name"),
192 )
193 && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
194 nanos_col
195 .as_any()
196 .downcast_ref::<TimestampNanosecondArray>(),
197 offset_col.as_any().downcast_ref::<Int32Array>(),
198 tz_col.as_any().downcast_ref::<StringArray>(),
199 )
200 {
201 if nanos_arr.is_null(row) {
202 return Value::Null;
203 }
204 let nanos = nanos_arr.value(row);
205 if offset_arr.is_null(row) {
206 return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
208 nanos_since_epoch: nanos,
209 });
210 }
211 let offset = offset_arr.value(row);
212 let tz_name = (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
213 return Value::Temporal(uni_common::TemporalValue::DateTime {
214 nanos_since_epoch: nanos,
215 offset_seconds: offset,
216 timezone_name: tz_name,
217 });
218 }
219 if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
221 let nanos = ts.value(row);
222 let tz_name = ts.timezone().map(|s| s.to_string());
223 return Value::Temporal(uni_common::TemporalValue::DateTime {
224 nanos_since_epoch: nanos,
225 offset_seconds: 0,
226 timezone_name: tz_name,
227 });
228 }
229 }
230 DataType::Time => {
231 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
233 && let (Some(nanos_col), Some(offset_col)) = (
234 struct_arr.column_by_name("nanos_since_midnight"),
235 struct_arr.column_by_name("offset_seconds"),
236 )
237 && let (Some(nanos_arr), Some(offset_arr)) = (
238 nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
239 offset_col.as_any().downcast_ref::<Int32Array>(),
240 )
241 {
242 if nanos_arr.is_null(row) || offset_arr.is_null(row) {
244 return Value::Null;
245 }
246 let nanos = nanos_arr.value(row);
247 let offset = offset_arr.value(row);
248 return Value::Temporal(uni_common::TemporalValue::Time {
249 nanos_since_midnight: nanos,
250 offset_seconds: offset,
251 });
252 }
253 if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
255 let nanos = t.value(row);
256 return Value::Temporal(uni_common::TemporalValue::Time {
257 nanos_since_midnight: nanos,
258 offset_seconds: 0,
259 });
260 }
261 }
262 DataType::Bytes => {
263 let Some(arr) = col.as_any().downcast_ref::<LargeBinaryArray>() else {
264 log::warn!("Bytes column is not LargeBinaryArray");
265 return Value::Null;
266 };
267 if arr.is_null(row) {
268 return Value::Null;
269 }
270 return Value::Bytes(arr.value(row).to_vec());
271 }
272 DataType::Btic => {
273 let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>() else {
274 log::warn!("BTIC column is not FixedSizeBinaryArray");
275 return Value::Null;
276 };
277 let bytes = fsb.value(row);
278 return match uni_btic::encode::decode_slice(bytes) {
279 Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
280 lo: btic.lo(),
281 hi: btic.hi(),
282 meta: btic.meta(),
283 }),
284 Err(e) => {
285 log::warn!("BTIC decode error: {}", e);
286 Value::Null
287 }
288 };
289 }
290 _ => {}
291 }
292 }
293
294 if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
296 return Value::String(s.value(row).to_string());
297 }
298
299 if let Some(u) = col.as_any().downcast_ref::<UInt64Array>() {
301 return Value::Int(u.value(row) as i64);
302 }
303 if let Some(i) = col.as_any().downcast_ref::<Int64Array>() {
304 return Value::Int(i.value(row));
305 }
306 if let Some(i) = col.as_any().downcast_ref::<Int32Array>() {
307 return Value::Int(i.value(row) as i64);
308 }
309
310 if let Some(f) = col.as_any().downcast_ref::<Float64Array>() {
312 return Value::Float(f.value(row));
313 }
314 if let Some(f) = col.as_any().downcast_ref::<Float32Array>() {
315 return Value::Float(f.value(row) as f64);
316 }
317
318 if let Some(b) = col.as_any().downcast_ref::<BooleanArray>() {
320 return Value::Bool(b.value(row));
321 }
322
323 if let Some(list) = col.as_any().downcast_ref::<FixedSizeListArray>() {
325 return Value::List(array_to_value_list(&list.value(row)));
326 }
327
328 if let Some(list) = col.as_any().downcast_ref::<ListArray>() {
330 let arr = list.value(row);
331
332 if let Some(obj) = try_reconstruct_map(&arr) {
334 return Value::Map(obj);
335 }
336
337 return Value::List(array_to_value_list(&arr));
338 }
339
340 if let Some(list) = col.as_any().downcast_ref::<arrow_array::LargeListArray>() {
342 return Value::List(array_to_value_list(&list.value(row)));
343 }
344
345 if let Some(s) = col.as_any().downcast_ref::<StructArray>() {
347 let field_names: Vec<&str> = s.fields().iter().map(|f| f.name().as_str()).collect();
348
349 if field_names.contains(&"nanos_since_epoch")
351 && field_names.contains(&"offset_seconds")
352 && field_names.contains(&"timezone_name")
353 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
354 s.column_by_name("nanos_since_epoch"),
355 s.column_by_name("offset_seconds"),
356 s.column_by_name("timezone_name"),
357 )
358 {
359 let nanos_opt = nanos_col
361 .as_any()
362 .downcast_ref::<TimestampNanosecondArray>()
363 .map(|a| {
364 if a.is_null(row) {
365 None
366 } else {
367 Some(a.value(row))
368 }
369 })
370 .or_else(|| {
371 nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
372 if a.is_null(row) {
373 None
374 } else {
375 Some(a.value(row))
376 }
377 })
378 });
379 let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
380 if a.is_null(row) {
381 None
382 } else {
383 Some(a.value(row))
384 }
385 });
386
387 if let Some(Some(nanos)) = nanos_opt {
388 match offset_opt {
389 Some(Some(offset)) => {
390 let tz_name = tz_col.as_any().downcast_ref::<StringArray>().and_then(|a| {
391 if a.is_null(row) {
392 None
393 } else {
394 Some(a.value(row).to_string())
395 }
396 });
397 return Value::Temporal(uni_common::TemporalValue::DateTime {
398 nanos_since_epoch: nanos,
399 offset_seconds: offset,
400 timezone_name: tz_name,
401 });
402 }
403 _ => {
404 return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
406 nanos_since_epoch: nanos,
407 });
408 }
409 }
410 }
411 }
412
413 if field_names.contains(&"nanos_since_midnight")
415 && field_names.contains(&"offset_seconds")
416 && let (Some(nanos_col), Some(offset_col)) = (
417 s.column_by_name("nanos_since_midnight"),
418 s.column_by_name("offset_seconds"),
419 )
420 {
421 let nanos_opt = nanos_col
423 .as_any()
424 .downcast_ref::<Time64NanosecondArray>()
425 .map(|a| {
426 if a.is_null(row) {
427 None
428 } else {
429 Some(a.value(row))
430 }
431 })
432 .or_else(|| {
433 nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
434 if a.is_null(row) {
435 None
436 } else {
437 Some(a.value(row))
438 }
439 })
440 });
441 let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
442 if a.is_null(row) {
443 None
444 } else {
445 Some(a.value(row))
446 }
447 });
448
449 if let (Some(Some(nanos)), Some(Some(offset))) = (nanos_opt, offset_opt) {
450 return Value::Temporal(uni_common::TemporalValue::Time {
451 nanos_since_midnight: nanos,
452 offset_seconds: offset,
453 });
454 }
455 }
456
457 let mut map = HashMap::new();
459 for (field, child) in s.fields().iter().zip(s.columns()) {
460 map.insert(
461 field.name().clone(),
462 arrow_to_value(child.as_ref(), row, None),
463 );
464 }
465 return Value::Map(map);
466 }
467
468 if let Some(d) = col.as_any().downcast_ref::<Date32Array>() {
470 let days = d.value(row);
471 return Value::Temporal(uni_common::TemporalValue::Date {
472 days_since_epoch: days,
473 });
474 }
475
476 if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
478 let nanos = ts.value(row);
479 return match ts.timezone() {
480 Some(tz) => Value::Temporal(uni_common::TemporalValue::DateTime {
481 nanos_since_epoch: nanos,
482 offset_seconds: 0,
483 timezone_name: Some(tz.to_string()),
484 }),
485 None => Value::Temporal(uni_common::TemporalValue::LocalDateTime {
486 nanos_since_epoch: nanos,
487 }),
488 };
489 }
490
491 if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
493 let nanos = t.value(row);
494 return Value::Temporal(uni_common::TemporalValue::LocalTime {
495 nanos_since_midnight: nanos,
496 });
497 }
498
499 if let Some(t) = col
501 .as_any()
502 .downcast_ref::<arrow_array::Time64MicrosecondArray>()
503 {
504 let micros = t.value(row);
505 return Value::Temporal(uni_common::TemporalValue::LocalTime {
506 nanos_since_midnight: micros * 1000,
507 });
508 }
509
510 if let Some(d) = col
512 .as_any()
513 .downcast_ref::<arrow_array::DurationMicrosecondArray>()
514 {
515 let micros = d.value(row);
516 let total_nanos = micros * 1000;
517 let seconds = total_nanos / 1_000_000_000;
518 let remaining_nanos = total_nanos % 1_000_000_000;
519 return Value::Temporal(uni_common::TemporalValue::Duration {
520 months: 0,
521 days: 0,
522 nanos: seconds * 1_000_000_000 + remaining_nanos,
523 });
524 }
525
526 if let Some(interval) = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>() {
528 let val = interval.value(row);
529 return Value::Temporal(uni_common::TemporalValue::Duration {
530 months: val.months as i64,
531 days: val.days as i64,
532 nanos: val.nanoseconds,
533 });
534 }
535
536 if let Some(b) = col.as_any().downcast_ref::<LargeBinaryArray>() {
538 let bytes = b.value(row);
539 if bytes.is_empty() {
540 return Value::Null;
541 }
542 return uni_common::cypher_value_codec::decode(bytes).unwrap_or_else(|e| {
543 eprintln!("CypherValue decode error: {}", e);
544 Value::Null
545 });
546 }
547
548 if let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>()
550 && fsb.value_length() == 24
551 {
552 let bytes = fsb.value(row);
553 return match uni_btic::encode::decode_slice(bytes) {
554 Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
555 lo: btic.lo(),
556 hi: btic.hi(),
557 meta: btic.meta(),
558 }),
559 Err(e) => {
560 log::warn!("BTIC decode error: {}", e);
561 Value::Null
562 }
563 };
564 }
565
566 if let Some(b) = col.as_any().downcast_ref::<BinaryArray>() {
568 let bytes = b.value(row);
569 return Crdt::from_msgpack(bytes)
570 .ok()
571 .and_then(|crdt| serde_json::to_value(&crdt).ok())
572 .map(Value::from)
573 .unwrap_or(Value::Null);
574 }
575
576 Value::Null
578}
579
580fn values_to_uint64_array(values: &[Value]) -> ArrayRef {
581 let mut builder = UInt64Builder::with_capacity(values.len());
582 for v in values {
583 if let Some(n) = v.as_u64() {
584 builder.append_value(n);
585 } else {
586 builder.append_null();
587 }
588 }
589 Arc::new(builder.finish())
590}
591
592fn values_to_int64_array(values: &[Value]) -> ArrayRef {
593 let mut builder = Int64Builder::with_capacity(values.len());
594 for v in values {
595 if let Some(n) = v.as_i64() {
596 builder.append_value(n);
597 } else {
598 builder.append_null();
599 }
600 }
601 Arc::new(builder.finish())
602}
603
604fn values_to_int32_array(values: &[Value]) -> ArrayRef {
605 let mut builder = Int32Builder::with_capacity(values.len());
606 for v in values {
607 if let Some(n) = v.as_i64() {
608 builder.append_value(n as i32);
609 } else {
610 builder.append_null();
611 }
612 }
613 Arc::new(builder.finish())
614}
615
616fn values_to_string_array(values: &[Value]) -> ArrayRef {
617 let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 10);
618 for v in values {
619 if let Some(s) = v.as_str() {
620 builder.append_value(s);
621 } else if v.is_null() {
622 builder.append_null();
623 } else {
624 builder.append_value(v.to_string());
625 }
626 }
627 Arc::new(builder.finish())
628}
629
630fn values_to_bool_array(values: &[Value]) -> ArrayRef {
631 let mut builder = BooleanBuilder::with_capacity(values.len());
632 for v in values {
633 if let Some(b) = v.as_bool() {
634 builder.append_value(b);
635 } else {
636 builder.append_null();
637 }
638 }
639 Arc::new(builder.finish())
640}
641
642fn values_to_float32_array(values: &[Value]) -> ArrayRef {
643 let mut builder = Float32Builder::with_capacity(values.len());
644 for v in values {
645 if let Some(n) = v.as_f64() {
646 builder.append_value(n as f32);
647 } else {
648 builder.append_null();
649 }
650 }
651 Arc::new(builder.finish())
652}
653
654fn values_to_float64_array(values: &[Value]) -> ArrayRef {
655 let mut builder = Float64Builder::with_capacity(values.len());
656 for v in values {
657 if let Some(n) = v.as_f64() {
658 builder.append_value(n);
659 } else {
660 builder.append_null();
661 }
662 }
663 Arc::new(builder.finish())
664}
665
666fn values_to_fixed_size_binary_array(values: &[Value], size: i32) -> Result<ArrayRef> {
667 let mut builder = FixedSizeBinaryBuilder::with_capacity(values.len(), size);
668 for v in values {
669 match v {
670 Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta }) if size == 24 => {
671 let btic = uni_btic::Btic::new(*lo, *hi, *meta)
672 .map_err(|e| anyhow!("invalid BTIC value: {}", e))?;
673 builder.append_value(uni_btic::encode::encode(&btic))?;
674 }
675 Value::String(s) if size == 24 => match uni_btic::parse::parse_btic_literal(s) {
676 Ok(b) => builder.append_value(uni_btic::encode::encode(&b))?,
677 Err(_) => builder.append_null(),
678 },
679 Value::List(bytes) => {
680 let b: Vec<u8> = bytes
681 .iter()
682 .map(|bv| bv.as_u64().unwrap_or(0) as u8)
683 .collect();
684 if b.len() as i32 == size {
685 builder.append_value(&b)?;
686 } else {
687 builder.append_null();
688 }
689 }
690 _ => builder.append_null(),
691 }
692 }
693 Ok(Arc::new(builder.finish()))
694}
695
696pub fn extract_vector_f32_values(
711 val: Option<&Value>,
712 is_deleted: bool,
713 dimensions: usize,
714) -> (Vec<f32>, bool) {
715 let zeros = || vec![0.0_f32; dimensions];
716
717 if is_deleted {
719 return (zeros(), true);
720 }
721
722 match val {
723 Some(Value::Vector(v)) if v.len() == dimensions => (v.clone(), true),
725 Some(Value::Vector(_)) => (zeros(), false), Some(Value::List(arr)) if arr.len() == dimensions => {
728 let values: Vec<f32> = arr
729 .iter()
730 .map(|v| v.as_f64().unwrap_or(0.0) as f32)
731 .collect();
732 (values, true)
733 }
734 Some(Value::List(_)) => (zeros(), false), _ => (zeros(), false), }
737}
738
739fn values_to_fixed_size_list_f32_array(values: &[Value], size: i32) -> ArrayRef {
740 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), size);
741 for v in values {
742 let (vals, valid) = extract_vector_f32_values(Some(v), false, size as usize);
743 for val in vals {
744 builder.values().append_value(val);
745 }
746 builder.append(valid);
747 }
748 Arc::new(builder.finish())
749}
750
751fn values_to_timestamp_array(values: &[Value], tz: Option<&Arc<str>>) -> ArrayRef {
752 let mut builder = TimestampNanosecondBuilder::with_capacity(values.len());
753 for v in values {
754 if v.is_null() {
755 builder.append_null();
756 } else if let Value::Temporal(tv) = v {
757 match tv {
758 uni_common::TemporalValue::DateTime {
759 nanos_since_epoch, ..
760 }
761 | uni_common::TemporalValue::LocalDateTime {
762 nanos_since_epoch, ..
763 } => builder.append_value(*nanos_since_epoch),
764 _ => builder.append_null(),
765 }
766 } else if let Some(n) = v.as_i64() {
767 builder.append_value(n);
768 } else if let Some(s) = v.as_str() {
769 match parse_datetime_to_nanos(s) {
770 Some(nanos) => builder.append_value(nanos),
771 None => builder.append_null(),
772 }
773 } else {
774 builder.append_null();
775 }
776 }
777
778 let arr = builder.finish();
779 if let Some(tz) = tz {
780 Arc::new(arr.with_timezone(tz.as_ref()))
781 } else {
782 Arc::new(arr)
783 }
784}
785
786fn values_to_datetime_struct_array(values: &[Value]) -> ArrayRef {
791 let mut nanos_builder = TimestampNanosecondBuilder::with_capacity(values.len());
792 let mut offset_builder = Int32Builder::with_capacity(values.len());
793 let mut tz_builder = StringBuilder::with_capacity(values.len(), values.len() * 20);
794 let mut null_buffer = BooleanBufferBuilder::new(values.len());
795
796 for v in values {
797 match v {
798 Value::Temporal(uni_common::TemporalValue::DateTime {
799 nanos_since_epoch,
800 offset_seconds,
801 timezone_name,
802 }) => {
803 nanos_builder.append_value(*nanos_since_epoch);
804 offset_builder.append_value(*offset_seconds);
805 tz_builder.append_option(timezone_name.as_deref());
806 null_buffer.append(true);
807 }
808 Value::Temporal(uni_common::TemporalValue::LocalDateTime { nanos_since_epoch }) => {
809 nanos_builder.append_value(*nanos_since_epoch);
810 offset_builder.append_null();
811 tz_builder.append_null();
812 null_buffer.append(true);
813 }
814 _ => {
815 nanos_builder.append_null();
816 offset_builder.append_null();
817 tz_builder.append_null();
818 null_buffer.append(false);
819 }
820 }
821 }
822
823 let struct_arr = StructArray::new(
824 schema::datetime_struct_fields(),
825 vec![
826 Arc::new(nanos_builder.finish()) as ArrayRef,
827 Arc::new(offset_builder.finish()) as ArrayRef,
828 Arc::new(tz_builder.finish()) as ArrayRef,
829 ],
830 Some(null_buffer.finish().into()),
831 );
832 Arc::new(struct_arr)
833}
834
835fn values_to_time_struct_array(values: &[Value]) -> ArrayRef {
840 let mut nanos_builder = Time64NanosecondBuilder::with_capacity(values.len());
841 let mut offset_builder = Int32Builder::with_capacity(values.len());
842 let mut null_buffer = BooleanBufferBuilder::new(values.len());
843
844 for v in values {
845 match v {
846 Value::Temporal(uni_common::TemporalValue::Time {
847 nanos_since_midnight,
848 offset_seconds,
849 }) => {
850 nanos_builder.append_value(*nanos_since_midnight);
851 offset_builder.append_value(*offset_seconds);
852 null_buffer.append(true);
853 }
854 Value::Temporal(uni_common::TemporalValue::LocalTime {
855 nanos_since_midnight,
856 }) => {
857 nanos_builder.append_value(*nanos_since_midnight);
858 offset_builder.append_null();
859 null_buffer.append(true);
860 }
861 _ => {
862 nanos_builder.append_null();
863 offset_builder.append_null();
864 null_buffer.append(false);
865 }
866 }
867 }
868
869 let struct_arr = StructArray::new(
870 schema::time_struct_fields(),
871 vec![
872 Arc::new(nanos_builder.finish()) as ArrayRef,
873 Arc::new(offset_builder.finish()) as ArrayRef,
874 ],
875 Some(null_buffer.finish().into()),
876 );
877 Arc::new(struct_arr)
878}
879
880fn values_to_large_binary_array(values: &[Value]) -> ArrayRef {
881 let mut builder =
882 arrow_array::builder::LargeBinaryBuilder::with_capacity(values.len(), values.len() * 64);
883 for v in values {
884 if v.is_null() {
885 builder.append_null();
886 } else {
887 let cv_bytes = uni_common::cypher_value_codec::encode(v);
889 builder.append_value(&cv_bytes);
890 }
891 }
892 Arc::new(builder.finish())
893}
894
895pub fn values_to_array(values: &[Value], dt: &ArrowDataType) -> Result<ArrayRef> {
897 match dt {
898 ArrowDataType::UInt64 => Ok(values_to_uint64_array(values)),
899 ArrowDataType::Int64 => Ok(values_to_int64_array(values)),
900 ArrowDataType::Int32 => Ok(values_to_int32_array(values)),
901 ArrowDataType::Utf8 => Ok(values_to_string_array(values)),
902 ArrowDataType::Boolean => Ok(values_to_bool_array(values)),
903 ArrowDataType::Float32 => Ok(values_to_float32_array(values)),
904 ArrowDataType::Float64 => Ok(values_to_float64_array(values)),
905 ArrowDataType::FixedSizeBinary(size) => values_to_fixed_size_binary_array(values, *size),
906 ArrowDataType::FixedSizeList(inner, size) => {
907 if inner.data_type() == &ArrowDataType::Float32 {
908 Ok(values_to_fixed_size_list_f32_array(values, *size))
909 } else {
910 Err(anyhow!("Unsupported FixedSizeList inner type"))
911 }
912 }
913 ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
914 Ok(values_to_timestamp_array(values, tz.as_ref()))
915 }
916 ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
917 Ok(values_to_timestamp_array(values, tz.as_ref()))
918 }
919 ArrowDataType::Date32 => {
920 let mut builder = Date32Builder::with_capacity(values.len());
921 for v in values {
922 if v.is_null() {
923 builder.append_null();
924 } else if let Value::Temporal(uni_common::TemporalValue::Date {
925 days_since_epoch,
926 }) = v
927 {
928 builder.append_value(*days_since_epoch);
929 } else if let Some(n) = v.as_i64() {
930 builder.append_value(n as i32);
931 } else {
932 builder.append_null();
933 }
934 }
935 Ok(Arc::new(builder.finish()))
936 }
937 ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
938 let mut builder = Time64NanosecondBuilder::with_capacity(values.len());
939 for v in values {
940 if v.is_null() {
941 builder.append_null();
942 } else if let Value::Temporal(tv) = v {
943 match tv {
944 uni_common::TemporalValue::LocalTime {
945 nanos_since_midnight,
946 }
947 | uni_common::TemporalValue::Time {
948 nanos_since_midnight,
949 ..
950 } => builder.append_value(*nanos_since_midnight),
951 _ => builder.append_null(),
952 }
953 } else if let Some(n) = v.as_i64() {
954 builder.append_value(n);
955 } else {
956 builder.append_null();
957 }
958 }
959 Ok(Arc::new(builder.finish()))
960 }
961 ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
962 let mut builder = Time64MicrosecondBuilder::with_capacity(values.len());
963 for v in values {
964 if v.is_null() {
965 builder.append_null();
966 } else if let Value::Temporal(tv) = v {
967 match tv {
968 uni_common::TemporalValue::LocalTime {
969 nanos_since_midnight,
970 }
971 | uni_common::TemporalValue::Time {
972 nanos_since_midnight,
973 ..
974 } => builder.append_value(*nanos_since_midnight / 1_000), _ => builder.append_null(),
976 }
977 } else if let Some(n) = v.as_i64() {
978 builder.append_value(n);
979 } else {
980 builder.append_null();
981 }
982 }
983 Ok(Arc::new(builder.finish()))
984 }
985 ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano) => {
986 let mut builder = IntervalMonthDayNanoBuilder::with_capacity(values.len());
987 for v in values {
988 if v.is_null() {
989 builder.append_null();
990 } else if let Value::Temporal(uni_common::TemporalValue::Duration {
991 months,
992 days,
993 nanos,
994 }) = v
995 {
996 builder.append_value(arrow::datatypes::IntervalMonthDayNano {
997 months: *months as i32,
998 days: *days as i32,
999 nanoseconds: *nanos,
1000 });
1001 } else {
1002 builder.append_null();
1003 }
1004 }
1005 Ok(Arc::new(builder.finish()))
1006 }
1007 ArrowDataType::Duration(arrow_schema::TimeUnit::Microsecond) => {
1008 let mut builder = DurationMicrosecondBuilder::with_capacity(values.len());
1009 for v in values {
1010 if v.is_null() {
1011 builder.append_null();
1012 } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1013 months,
1014 days,
1015 nanos,
1016 }) = v
1017 {
1018 let total_micros =
1019 months * 30 * 86_400_000_000i64 + days * 86_400_000_000i64 + nanos / 1_000;
1020 builder.append_value(total_micros);
1021 } else if let Some(n) = v.as_i64() {
1022 builder.append_value(n);
1023 } else {
1024 builder.append_null();
1025 }
1026 }
1027 Ok(Arc::new(builder.finish()))
1028 }
1029 ArrowDataType::LargeBinary => Ok(values_to_large_binary_array(values)),
1030 ArrowDataType::List(field) => {
1031 if field.data_type() == &ArrowDataType::Utf8 {
1032 let mut builder = ListBuilder::new(StringBuilder::new());
1033 for v in values {
1034 if let Value::List(arr) = v {
1035 for item in arr {
1036 if let Some(s) = item.as_str() {
1037 builder.values().append_value(s);
1038 } else {
1039 builder.values().append_null();
1040 }
1041 }
1042 builder.append(true);
1043 } else {
1044 builder.append_null();
1045 }
1046 }
1047 Ok(Arc::new(builder.finish()))
1048 } else {
1049 Err(anyhow!(
1050 "Unsupported List inner type: {:?}",
1051 field.data_type()
1052 ))
1053 }
1054 }
1055 ArrowDataType::Struct(_) if schema::is_datetime_struct(dt) => {
1056 Ok(values_to_datetime_struct_array(values))
1057 }
1058 ArrowDataType::Struct(_) if schema::is_time_struct(dt) => {
1059 Ok(values_to_time_struct_array(values))
1060 }
1061 _ => Err(anyhow!("Unsupported type for conversion: {:?}", dt)),
1062 }
1063}
1064
/// Builds an Arrow column for a single property across a batch of rows; the
/// conversion is selected by the property's declared [`DataType`].
pub struct PropertyExtractor<'a> {
    /// Declared schema type of the property; `build_column` dispatches on it.
    data_type: &'a DataType,
}
1069
1070impl<'a> PropertyExtractor<'a> {
1071 pub fn new(_name: &'a str, data_type: &'a DataType) -> Self {
1072 Self { data_type }
1073 }
1074
    /// Builds the Arrow column for this property over `len` rows.
    ///
    /// Dispatches on the declared [`DataType`] to the matching typed builder.
    /// `deleted[i]` tells whether row `i` is deleted, and `get_props(i)` returns
    /// the property value for row `i`, if any.
    ///
    /// # Errors
    /// Returns an error for data types with no Arrow conversion here, and
    /// passes through any error from the selected builder.
    pub fn build_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
    where
        F: Fn(usize) -> Option<&'a Value>,
    {
        match self.data_type {
            DataType::String => self.build_string_column(len, deleted, get_props),
            DataType::Int32 => self.build_int32_column(len, deleted, get_props),
            DataType::Int64 => self.build_int64_column(len, deleted, get_props),
            DataType::Float32 => self.build_float32_column(len, deleted, get_props),
            DataType::Float64 => self.build_float64_column(len, deleted, get_props),
            DataType::Bool => self.build_bool_column(len, deleted, get_props),
            DataType::Vector { dimensions } => {
                self.build_vector_column(len, deleted, get_props, *dimensions)
            }
            DataType::CypherValue => self.build_json_column(len, deleted, get_props),
            DataType::Bytes => self.build_bytes_column(len, deleted, get_props),
            DataType::List(inner) => self.build_list_column(len, deleted, get_props, inner),
            DataType::Map(key, value) => self.build_map_column(len, deleted, get_props, key, value),
            DataType::Crdt(_) => self.build_crdt_column(len, deleted, get_props),
            // DateTime and Time use struct layouts; Timestamp is a plain UTC column.
            DataType::DateTime => self.build_datetime_struct_column(len, deleted, get_props),
            DataType::Timestamp => self.build_timestamp_column(len, deleted, get_props),
            DataType::Date => self.build_date32_column(len, deleted, get_props),
            DataType::Time => self.build_time_struct_column(len, deleted, get_props),
            DataType::Duration => self.build_duration_column(len, deleted, get_props),
            DataType::Btic => self.build_btic_column(len, deleted, get_props),
            // Any remaining variant has no Arrow conversion in this extractor.
            _ => Err(anyhow!(
                "Unsupported data type for arrow conversion: {:?}",
                self.data_type
            )),
        }
    }
1108
1109 fn build_string_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1110 where
1111 F: Fn(usize) -> Option<&'a Value>,
1112 {
1113 let mut builder = arrow_array::builder::StringBuilder::with_capacity(len, len * 32);
1114 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1115 let prop = get_props(i);
1116 if let Some(s) = prop.and_then(|v| v.as_str()) {
1117 builder.append_value(s);
1118 } else if let Some(Value::Temporal(tv)) = prop {
1119 builder.append_value(tv.to_string());
1120 } else if is_deleted {
1121 builder.append_value("");
1122 } else {
1123 builder.append_null();
1124 }
1125 }
1126 Ok(Arc::new(builder.finish()))
1127 }
1128
1129 fn build_int32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1130 where
1131 F: Fn(usize) -> Option<&'a Value>,
1132 {
1133 let mut values = Vec::with_capacity(len);
1134 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1135 let val = get_props(i)
1138 .and_then(|v| v.as_i64())
1139 .and_then(|v| i32::try_from(v).ok());
1140 if val.is_none() && is_deleted {
1141 values.push(Some(0));
1142 } else {
1143 values.push(val);
1144 }
1145 }
1146 Ok(Arc::new(Int32Array::from(values)))
1147 }
1148
1149 fn build_int64_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1150 where
1151 F: Fn(usize) -> Option<&'a Value>,
1152 {
1153 let mut values = Vec::with_capacity(len);
1154 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1155 let val = get_props(i).and_then(|v| v.as_i64());
1156 if val.is_none() && is_deleted {
1157 values.push(Some(0));
1158 } else {
1159 values.push(val);
1160 }
1161 }
1162 Ok(Arc::new(Int64Array::from(values)))
1163 }
1164
1165 fn build_timestamp_column<F>(
1166 &self,
1167 len: usize,
1168 deleted: &[bool],
1169 get_props: F,
1170 ) -> Result<ArrayRef>
1171 where
1172 F: Fn(usize) -> Option<&'a Value>,
1173 {
1174 let mut values = Vec::with_capacity(len);
1175 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1176 let val = get_props(i);
1177 let ts = if is_deleted || val.is_none() {
1178 Some(0i64)
1179 } else if let Some(Value::Temporal(tv)) = val {
1180 match tv {
1181 uni_common::TemporalValue::DateTime {
1182 nanos_since_epoch, ..
1183 }
1184 | uni_common::TemporalValue::LocalDateTime {
1185 nanos_since_epoch, ..
1186 } => Some(*nanos_since_epoch),
1187 _ => None,
1188 }
1189 } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1190 Some(v)
1191 } else if let Some(s) = val.and_then(|v| v.as_str()) {
1192 parse_datetime_to_nanos(s)
1193 } else {
1194 None
1195 };
1196
1197 if is_deleted {
1198 values.push(Some(0));
1199 } else {
1200 values.push(ts);
1201 }
1202 }
1203 let arr = TimestampNanosecondArray::from(values).with_timezone("UTC");
1204 Ok(Arc::new(arr))
1205 }
1206
1207 fn build_datetime_struct_column<F>(
1208 &self,
1209 len: usize,
1210 deleted: &[bool],
1211 get_props: F,
1212 ) -> Result<ArrayRef>
1213 where
1214 F: Fn(usize) -> Option<&'a Value>,
1215 {
1216 let values = self.collect_values_or_null(len, deleted, &get_props);
1217 Ok(values_to_datetime_struct_array(&values))
1218 }
1219
1220 fn build_time_struct_column<F>(
1221 &self,
1222 len: usize,
1223 deleted: &[bool],
1224 get_props: F,
1225 ) -> Result<ArrayRef>
1226 where
1227 F: Fn(usize) -> Option<&'a Value>,
1228 {
1229 let values = self.collect_values_or_null(len, deleted, &get_props);
1230 Ok(values_to_time_struct_array(&values))
1231 }
1232
1233 fn collect_values_or_null<F>(&self, len: usize, deleted: &[bool], get_props: &F) -> Vec<Value>
1235 where
1236 F: Fn(usize) -> Option<&'a Value>,
1237 {
1238 deleted
1239 .iter()
1240 .enumerate()
1241 .take(len)
1242 .map(|(i, &is_deleted)| {
1243 if is_deleted {
1244 Value::Null
1245 } else {
1246 get_props(i).cloned().unwrap_or(Value::Null)
1247 }
1248 })
1249 .collect()
1250 }
1251
1252 fn build_date32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1253 where
1254 F: Fn(usize) -> Option<&'a Value>,
1255 {
1256 let mut builder = Date32Builder::with_capacity(len);
1257 let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1258
1259 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1260 let val = get_props(i);
1261 let days = if is_deleted || val.is_none() {
1262 Some(0)
1263 } else if let Some(Value::Temporal(uni_common::TemporalValue::Date {
1264 days_since_epoch,
1265 })) = val
1266 {
1267 Some(*days_since_epoch)
1268 } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1269 i32::try_from(v).ok()
1272 } else if let Some(s) = val.and_then(|v| v.as_str()) {
1273 match chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1274 Ok(date) => Some(date.signed_duration_since(epoch).num_days() as i32),
1275 Err(_) => None,
1276 }
1277 } else {
1278 None
1279 };
1280
1281 if is_deleted {
1282 builder.append_value(0);
1283 } else if let Some(v) = days {
1284 builder.append_value(v);
1285 } else {
1286 builder.append_null();
1287 }
1288 }
1289 Ok(Arc::new(builder.finish()))
1290 }
1291
1292 fn build_duration_column<F>(
1293 &self,
1294 len: usize,
1295 deleted: &[bool],
1296 get_props: F,
1297 ) -> Result<ArrayRef>
1298 where
1299 F: Fn(usize) -> Option<&'a Value>,
1300 {
1301 let mut builder = LargeBinaryBuilder::with_capacity(len, len * 32);
1303 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1304 let raw_val = get_props(i);
1305 if let Some(val @ Value::Temporal(uni_common::TemporalValue::Duration { .. })) = raw_val
1306 {
1307 let encoded = uni_common::cypher_value_codec::encode(val);
1308 builder.append_value(&encoded);
1309 } else if is_deleted {
1310 let zero = Value::Temporal(uni_common::TemporalValue::Duration {
1311 months: 0,
1312 days: 0,
1313 nanos: 0,
1314 });
1315 let encoded = uni_common::cypher_value_codec::encode(&zero);
1316 builder.append_value(&encoded);
1317 } else {
1318 builder.append_null();
1319 }
1320 }
1321 Ok(Arc::new(builder.finish()))
1322 }
1323
1324 fn build_btic_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1325 where
1326 F: Fn(usize) -> Option<&'a Value>,
1327 {
1328 const ENCODED_LEN: i32 = 24;
1329 let mut builder = FixedSizeBinaryBuilder::with_capacity(len, ENCODED_LEN);
1330 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1331 let raw_val = get_props(i);
1332 let btic = match raw_val {
1333 Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => Some(
1334 uni_btic::Btic::new(*lo, *hi, *meta)
1335 .map_err(|e| anyhow!("invalid BTIC value: {}", e))?,
1336 ),
1337 Some(Value::String(s)) => Some(
1338 uni_btic::parse::parse_btic_literal(s)
1339 .map_err(|e| anyhow!("BTIC parse error for '{}': {}", s, e))?,
1340 ),
1341 _ => None,
1342 };
1343
1344 if let Some(b) = btic {
1345 builder.append_value(uni_btic::encode::encode(&b))?;
1346 } else if is_deleted {
1347 builder.append_value([0u8; ENCODED_LEN as usize])?;
1348 } else {
1349 builder.append_null();
1350 }
1351 }
1352 Ok(Arc::new(builder.finish()))
1353 }
1354
1355 fn build_float32_column<F>(
1356 &self,
1357 len: usize,
1358 deleted: &[bool],
1359 get_props: F,
1360 ) -> Result<ArrayRef>
1361 where
1362 F: Fn(usize) -> Option<&'a Value>,
1363 {
1364 let mut values = Vec::with_capacity(len);
1365 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1366 let val = get_props(i).and_then(|v| v.as_f64()).map(|v| v as f32);
1367 if val.is_none() && is_deleted {
1368 values.push(Some(0.0));
1369 } else {
1370 values.push(val);
1371 }
1372 }
1373 Ok(Arc::new(Float32Array::from(values)))
1374 }
1375
1376 fn build_float64_column<F>(
1377 &self,
1378 len: usize,
1379 deleted: &[bool],
1380 get_props: F,
1381 ) -> Result<ArrayRef>
1382 where
1383 F: Fn(usize) -> Option<&'a Value>,
1384 {
1385 let mut values = Vec::with_capacity(len);
1386 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1387 let val = get_props(i).and_then(|v| v.as_f64());
1388 if val.is_none() && is_deleted {
1389 values.push(Some(0.0));
1390 } else {
1391 values.push(val);
1392 }
1393 }
1394 Ok(Arc::new(Float64Array::from(values)))
1395 }
1396
1397 fn build_bool_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1398 where
1399 F: Fn(usize) -> Option<&'a Value>,
1400 {
1401 let mut values = Vec::with_capacity(len);
1402 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1403 let val = get_props(i).and_then(|v| v.as_bool());
1404 if val.is_none() && is_deleted {
1405 values.push(Some(false));
1406 } else {
1407 values.push(val);
1408 }
1409 }
1410 Ok(Arc::new(BooleanArray::from(values)))
1411 }
1412
1413 fn build_vector_column<F>(
1414 &self,
1415 len: usize,
1416 deleted: &[bool],
1417 get_props: F,
1418 dimensions: usize,
1419 ) -> Result<ArrayRef>
1420 where
1421 F: Fn(usize) -> Option<&'a Value>,
1422 {
1423 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dimensions as i32);
1424
1425 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1426 let val = get_props(i);
1427 let (values, valid) = extract_vector_f32_values(val, is_deleted, dimensions);
1428 for v in values {
1429 builder.values().append_value(v);
1430 }
1431 builder.append(valid);
1432 }
1433 Ok(Arc::new(builder.finish()))
1434 }
1435
1436 fn build_json_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1437 where
1438 F: Fn(usize) -> Option<&'a Value>,
1439 {
1440 let null_val = Value::Null;
1441 let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(len, len * 64);
1442 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1443 let val = get_props(i);
1444 let uni_val = if val.is_none() && is_deleted {
1445 &null_val
1446 } else {
1447 val.unwrap_or(&null_val)
1448 };
1449 let cv_bytes = uni_common::cypher_value_codec::encode(uni_val);
1451 builder.append_value(&cv_bytes);
1452 }
1453 Ok(Arc::new(builder.finish()))
1454 }
1455
1456 fn build_bytes_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1457 where
1458 F: Fn(usize) -> Option<&'a Value>,
1459 {
1460 let mut builder = LargeBinaryBuilder::with_capacity(len, len * 64);
1461 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1462 let val = get_props(i);
1463 if let Some(Value::Bytes(b)) = val {
1464 builder.append_value(b);
1465 } else if is_deleted {
1466 builder.append_value(&[][..]);
1467 } else {
1468 builder.append_null();
1469 }
1470 }
1471 Ok(Arc::new(builder.finish()))
1472 }
1473
1474 fn build_list_column<F>(
1475 &self,
1476 len: usize,
1477 deleted: &[bool],
1478 get_props: F,
1479 inner: &DataType,
1480 ) -> Result<ArrayRef>
1481 where
1482 F: Fn(usize) -> Option<&'a Value>,
1483 {
1484 match inner {
1485 DataType::String => {
1486 self.build_typed_list(len, deleted, &get_props, StringBuilder::new(), |v, b| {
1487 if let Some(s) = v.as_str() {
1488 b.append_value(s);
1489 } else {
1490 b.append_null();
1491 }
1492 })
1493 }
1494 DataType::Int64 => {
1495 self.build_typed_list(len, deleted, &get_props, Int64Builder::new(), |v, b| {
1496 if let Some(n) = v.as_i64() {
1497 b.append_value(n);
1498 } else {
1499 b.append_null();
1500 }
1501 })
1502 }
1503 DataType::Float64 => {
1504 self.build_typed_list(len, deleted, &get_props, Float64Builder::new(), |v, b| {
1505 if let Some(f) = v.as_f64() {
1506 b.append_value(f);
1507 } else {
1508 b.append_null();
1509 }
1510 })
1511 }
1512 _ => Err(anyhow!("Unsupported inner type for List: {:?}", inner)),
1513 }
1514 }
1515
1516 fn build_typed_list<F, B, A>(
1518 &self,
1519 len: usize,
1520 deleted: &[bool],
1521 get_props: &F,
1522 inner_builder: B,
1523 mut append_value: A,
1524 ) -> Result<ArrayRef>
1525 where
1526 F: Fn(usize) -> Option<&'a Value>,
1527 B: arrow_array::builder::ArrayBuilder,
1528 A: FnMut(&Value, &mut B),
1529 {
1530 let mut builder = ListBuilder::new(inner_builder);
1531 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1532 let val_array = get_props(i).and_then(|v| v.as_array());
1533 if val_array.is_none() && is_deleted {
1534 builder.append_null();
1535 } else if let Some(arr) = val_array {
1536 for v in arr {
1537 append_value(v, builder.values());
1538 }
1539 builder.append(true);
1540 } else {
1541 builder.append_null();
1542 }
1543 }
1544 Ok(Arc::new(builder.finish()))
1545 }
1546
1547 fn build_map_column<F>(
1548 &self,
1549 len: usize,
1550 deleted: &[bool],
1551 get_props: F,
1552 key: &DataType,
1553 value: &DataType,
1554 ) -> Result<ArrayRef>
1555 where
1556 F: Fn(usize) -> Option<&'a Value>,
1557 {
1558 if !matches!(key, DataType::String) {
1559 return Err(anyhow!("Map keys must be String (JSON limitation)"));
1560 }
1561
1562 match value {
1563 DataType::String => self.build_typed_map(
1564 len,
1565 deleted,
1566 &get_props,
1567 StringBuilder::new(),
1568 arrow_schema::DataType::Utf8,
1569 |v, b: &mut StringBuilder| {
1570 if let Some(s) = v.as_str() {
1571 b.append_value(s);
1572 } else {
1573 b.append_null();
1574 }
1575 },
1576 ),
1577 DataType::Int64 => self.build_typed_map(
1578 len,
1579 deleted,
1580 &get_props,
1581 Int64Builder::new(),
1582 arrow_schema::DataType::Int64,
1583 |v, b: &mut Int64Builder| {
1584 if let Some(n) = v.as_i64() {
1585 b.append_value(n);
1586 } else {
1587 b.append_null();
1588 }
1589 },
1590 ),
1591 _ => Err(anyhow!("Unsupported value type for Map: {:?}", value)),
1592 }
1593 }
1594
1595 fn build_typed_map<F, B, A>(
1597 &self,
1598 len: usize,
1599 deleted: &[bool],
1600 get_props: &F,
1601 value_builder: B,
1602 value_arrow_type: arrow_schema::DataType,
1603 mut append_value: A,
1604 ) -> Result<ArrayRef>
1605 where
1606 F: Fn(usize) -> Option<&'a Value>,
1607 B: arrow_array::builder::ArrayBuilder,
1608 A: FnMut(&Value, &mut B),
1609 {
1610 let key_builder = Box::new(StringBuilder::new());
1611 let value_builder = Box::new(value_builder);
1612 let struct_builder = StructBuilder::new(
1613 vec![
1614 Field::new("key", arrow_schema::DataType::Utf8, false),
1615 Field::new("value", value_arrow_type, true),
1616 ],
1617 vec![key_builder, value_builder],
1618 );
1619 let mut builder = ListBuilder::new(struct_builder);
1620
1621 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1622 self.append_map_entry(&mut builder, get_props(i), is_deleted, &mut append_value);
1623 }
1624 Ok(Arc::new(builder.finish()))
1625 }
1626
1627 fn append_map_entry<B, A>(
1629 &self,
1630 builder: &mut ListBuilder<StructBuilder>,
1631 val: Option<&'a Value>,
1632 is_deleted: bool,
1633 append_value: &mut A,
1634 ) where
1635 B: arrow_array::builder::ArrayBuilder,
1636 A: FnMut(&Value, &mut B),
1637 {
1638 let val_obj = val.and_then(|v| v.as_object());
1639 if val_obj.is_none() && is_deleted {
1640 builder.append(false);
1641 } else if let Some(obj) = val_obj {
1642 let struct_b = builder.values();
1643 for (k, v) in obj {
1644 struct_b
1645 .field_builder::<StringBuilder>(0)
1646 .unwrap()
1647 .append_value(k);
1648 let value_b = struct_b.field_builder::<B>(1).unwrap();
1650 append_value(v, value_b);
1651 struct_b.append(true);
1652 }
1653 builder.append(true);
1654 } else {
1655 builder.append(false);
1656 }
1657 }
1658
1659 fn build_crdt_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1660 where
1661 F: Fn(usize) -> Option<&'a Value>,
1662 {
1663 let mut builder = BinaryBuilder::new();
1664 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1665 if is_deleted {
1666 builder.append_null();
1667 continue;
1668 }
1669 if let Some(val) = get_props(i) {
1670 let crdt_result = if let Some(s) = val.as_str() {
1673 serde_json::from_str::<Crdt>(s)
1674 } else {
1675 let json_val: serde_json::Value = val.clone().into();
1677 serde_json::from_value::<Crdt>(json_val)
1678 };
1679
1680 if let Ok(crdt) = crdt_result {
1681 if let Ok(bytes) = crdt.to_msgpack() {
1682 builder.append_value(&bytes);
1683 } else {
1684 builder.append_null();
1685 }
1686 } else {
1687 builder.append_null();
1688 }
1689 } else {
1690 builder.append_null();
1691 }
1692 }
1693 Ok(Arc::new(builder.finish()))
1694 }
1695}
1696
1697pub fn build_edge_column<'a>(
1699 name: &'a str,
1700 data_type: &'a DataType,
1701 len: usize,
1702 get_props: impl Fn(usize) -> Option<&'a Value>,
1703) -> Result<ArrayRef> {
1704 let deleted = vec![false; len];
1706 let extractor = PropertyExtractor::new(name, data_type);
1707 extractor.build_column(len, &deleted, get_props)
1708}
1709
1710#[cfg(test)]
1711mod tests {
1712 use super::*;
1713 use arrow_array::{
1714 Array, DurationMicrosecondArray,
1715 builder::{BinaryBuilder, Time64MicrosecondBuilder, TimestampNanosecondBuilder},
1716 };
1717 use std::collections::HashMap;
1718 use uni_common::TemporalValue;
1719 use uni_crdt::{Crdt, GCounter};
1720
1721 #[test]
1722 fn test_arrow_to_value_string() {
1723 let arr = StringArray::from(vec![Some("hello"), None, Some("world")]);
1724 assert_eq!(
1725 arrow_to_value(&arr, 0, None),
1726 Value::String("hello".to_string())
1727 );
1728 assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1729 assert_eq!(
1730 arrow_to_value(&arr, 2, None),
1731 Value::String("world".to_string())
1732 );
1733 }
1734
1735 #[test]
1736 fn test_arrow_to_value_int64() {
1737 let arr = Int64Array::from(vec![Some(42), None, Some(-10)]);
1738 assert_eq!(arrow_to_value(&arr, 0, None), Value::Int(42));
1739 assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1740 assert_eq!(arrow_to_value(&arr, 2, None), Value::Int(-10));
1741 }
1742
1743 #[test]
1744 #[allow(clippy::approx_constant)]
1745 fn test_arrow_to_value_float64() {
1746 let arr = Float64Array::from(vec![Some(3.14), None]);
1747 assert_eq!(arrow_to_value(&arr, 0, None), Value::Float(3.14));
1748 assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1749 }
1750
1751 #[test]
1752 fn test_arrow_to_value_bool() {
1753 let arr = BooleanArray::from(vec![Some(true), Some(false), None]);
1754 assert_eq!(arrow_to_value(&arr, 0, None), Value::Bool(true));
1755 assert_eq!(arrow_to_value(&arr, 1, None), Value::Bool(false));
1756 assert_eq!(arrow_to_value(&arr, 2, None), Value::Null);
1757 }
1758
1759 #[test]
1760 fn test_values_to_array_int64() {
1761 let values = vec![Value::Int(1), Value::Int(2), Value::Null, Value::Int(4)];
1762 let arr = values_to_array(&values, &ArrowDataType::Int64).unwrap();
1763 assert_eq!(arr.len(), 4);
1764
1765 let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1766 assert_eq!(int_arr.value(0), 1);
1767 assert_eq!(int_arr.value(1), 2);
1768 assert!(int_arr.is_null(2));
1769 assert_eq!(int_arr.value(3), 4);
1770 }
1771
1772 #[test]
1773 fn test_values_to_array_string() {
1774 let values = vec![
1775 Value::String("a".to_string()),
1776 Value::String("b".to_string()),
1777 Value::Null,
1778 ];
1779 let arr = values_to_array(&values, &ArrowDataType::Utf8).unwrap();
1780 assert_eq!(arr.len(), 3);
1781
1782 let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1783 assert_eq!(str_arr.value(0), "a");
1784 assert_eq!(str_arr.value(1), "b");
1785 assert!(str_arr.is_null(2));
1786 }
1787
1788 #[test]
1789 fn test_property_extractor_string() {
1790 let props: Vec<HashMap<String, Value>> = vec![
1791 [("name".to_string(), Value::String("Alice".to_string()))]
1792 .into_iter()
1793 .collect(),
1794 [("name".to_string(), Value::String("Bob".to_string()))]
1795 .into_iter()
1796 .collect(),
1797 HashMap::new(),
1798 ];
1799 let deleted = vec![false, false, true];
1800
1801 let extractor = PropertyExtractor::new("name", &DataType::String);
1802 let arr = extractor
1803 .build_column(3, &deleted, |i| props[i].get("name"))
1804 .unwrap();
1805
1806 let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1807 assert_eq!(str_arr.value(0), "Alice");
1808 assert_eq!(str_arr.value(1), "Bob");
1809 assert_eq!(str_arr.value(2), ""); }
1811
1812 #[test]
1813 fn test_property_extractor_int64() {
1814 let props: Vec<HashMap<String, Value>> = vec![
1815 [("age".to_string(), Value::Int(25))].into_iter().collect(),
1816 [("age".to_string(), Value::Int(30))].into_iter().collect(),
1817 HashMap::new(),
1818 ];
1819 let deleted = vec![false, false, true];
1820
1821 let extractor = PropertyExtractor::new("age", &DataType::Int64);
1822 let arr = extractor
1823 .build_column(3, &deleted, |i| props[i].get("age"))
1824 .unwrap();
1825
1826 let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1827 assert_eq!(int_arr.value(0), 25);
1828 assert_eq!(int_arr.value(1), 30);
1829 assert_eq!(int_arr.value(2), 0); }
1831
1832 #[test]
1833 fn test_property_extractor_bytes_roundtrip() {
1834 let blob = vec![0u8, 1, 2, 255];
1835 let props: Vec<HashMap<String, Value>> = vec![
1836 [("blob".to_string(), Value::Bytes(blob.clone()))]
1837 .into_iter()
1838 .collect(),
1839 [("blob".to_string(), Value::Bytes(Vec::new()))]
1840 .into_iter()
1841 .collect(),
1842 HashMap::new(),
1843 ];
1844 let deleted = vec![false, false, false];
1845
1846 let extractor = PropertyExtractor::new("blob", &DataType::Bytes);
1847 let arr = extractor
1848 .build_column(3, &deleted, |i| props[i].get("blob"))
1849 .unwrap();
1850
1851 assert_eq!(
1853 arrow_to_value(arr.as_ref(), 0, Some(&DataType::Bytes)),
1854 Value::Bytes(blob)
1855 );
1856 assert_eq!(
1857 arrow_to_value(arr.as_ref(), 1, Some(&DataType::Bytes)),
1858 Value::Bytes(Vec::new())
1859 );
1860 assert_eq!(
1862 arrow_to_value(arr.as_ref(), 2, Some(&DataType::Bytes)),
1863 Value::Null
1864 );
1865 }
1866
1867 #[test]
1868 fn test_bytes_vs_cypher_value_disambiguation() {
1869 let raw = vec![0xDEu8, 0xAD, 0xBE, 0xEF];
1873 let props: Vec<HashMap<String, Value>> = vec![
1874 [("blob".to_string(), Value::Bytes(raw.clone()))]
1875 .into_iter()
1876 .collect(),
1877 ];
1878 let extractor = PropertyExtractor::new("blob", &DataType::Bytes);
1879 let arr = extractor
1880 .build_column(1, &[false], |i| props[i].get("blob"))
1881 .unwrap();
1882 assert_eq!(
1884 arrow_to_value(arr.as_ref(), 0, Some(&DataType::Bytes)),
1885 Value::Bytes(raw)
1886 );
1887 }
1888
1889 #[test]
1890 fn test_data_type_bytes_to_arrow() {
1891 assert_eq!(DataType::Bytes.to_arrow(), ArrowDataType::LargeBinary);
1892 }
1893
1894 #[test]
1895 fn test_arrow_to_value_time64() {
1896 let mut builder = Time64MicrosecondBuilder::new();
1898 builder.append_value(37_845_000_000);
1900 builder.append_value(0);
1902 builder.append_value(86_399_123_456);
1904 builder.append_null();
1905
1906 let arr = builder.finish();
1907 assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "10:30:45");
1909 assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "00:00");
1910 assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "23:59:59.123456");
1911 assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
1912 }
1913
1914 #[test]
1915 fn test_arrow_to_value_duration() {
1916 let arr = DurationMicrosecondArray::from(vec![
1919 Some(1_000_000), Some(3_600_000_000), Some(86_400_000_000), None,
1923 ]);
1924
1925 assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "PT1S");
1926 assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "PT1H");
1927 assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "PT24H");
1928 assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
1929 }
1930
1931 #[test]
1932 fn test_arrow_to_value_binary_crdt() {
1933 let mut builder = BinaryBuilder::new();
1935
1936 let mut counter = GCounter::new();
1938 counter.increment("actor1", 5);
1939 let crdt = Crdt::GCounter(counter);
1940 let bytes = crdt.to_msgpack().unwrap();
1941 builder.append_value(&bytes);
1942
1943 builder.append_null();
1945
1946 let arr = builder.finish();
1947
1948 let result = arrow_to_value(&arr, 0, None);
1950 assert!(result.as_object().is_some());
1951 let obj = result.as_object().unwrap();
1952 assert_eq!(obj.get("t"), Some(&Value::String("gc".to_string())));
1954
1955 assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1957 }
1958
1959 #[test]
1960 fn test_datetime_struct_encode_decode_roundtrip() {
1961 let values = vec![
1963 Value::Temporal(TemporalValue::DateTime {
1964 nanos_since_epoch: 441763200000000000, offset_seconds: 3600, timezone_name: Some("Europe/Paris".to_string()),
1967 }),
1968 Value::Temporal(TemporalValue::DateTime {
1969 nanos_since_epoch: 1704067200000000000, offset_seconds: -18000, timezone_name: None,
1972 }),
1973 Value::Temporal(TemporalValue::DateTime {
1974 nanos_since_epoch: 0, offset_seconds: 0,
1976 timezone_name: Some("UTC".to_string()),
1977 }),
1978 ];
1979
1980 let arr_ref = values_to_datetime_struct_array(&values);
1982 let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
1983 assert_eq!(arr.len(), 3);
1984
1985 let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
1987 let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
1988 let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
1989
1990 assert_eq!(decoded_0, values[0]);
1992 assert_eq!(decoded_1, values[1]);
1993 assert_eq!(decoded_2, values[2]);
1994
1995 if let Value::Temporal(TemporalValue::DateTime {
1997 nanos_since_epoch,
1998 offset_seconds,
1999 timezone_name,
2000 }) = decoded_0
2001 {
2002 assert_eq!(nanos_since_epoch, 441763200000000000);
2003 assert_eq!(offset_seconds, 3600);
2004 assert_eq!(timezone_name, Some("Europe/Paris".to_string()));
2005 } else {
2006 panic!("Expected DateTime value");
2007 }
2008 }
2009
2010 #[test]
2011 fn test_datetime_struct_null_handling() {
2012 let values = vec![
2014 Value::Temporal(TemporalValue::DateTime {
2015 nanos_since_epoch: 441763200000000000,
2016 offset_seconds: 3600,
2017 timezone_name: Some("Europe/Paris".to_string()),
2018 }),
2019 Value::Null,
2020 Value::Temporal(TemporalValue::DateTime {
2021 nanos_since_epoch: 0,
2022 offset_seconds: 0,
2023 timezone_name: None,
2024 }),
2025 ];
2026
2027 let arr_ref = values_to_datetime_struct_array(&values);
2028 let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2029 assert_eq!(arr.len(), 3);
2030
2031 let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
2033 assert_eq!(decoded_0, values[0]);
2034
2035 assert!(arr.is_null(1));
2037 let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
2038 assert_eq!(decoded_1, Value::Null);
2039
2040 let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
2042 assert_eq!(decoded_2, values[2]);
2043 }
2044
2045 #[test]
2046 fn test_datetime_struct_boundary_values() {
2047 let values = vec![
2049 Value::Temporal(TemporalValue::DateTime {
2050 nanos_since_epoch: 441763200000000000,
2051 offset_seconds: 0, timezone_name: None,
2053 }),
2054 Value::Temporal(TemporalValue::DateTime {
2055 nanos_since_epoch: 441763200000000000,
2056 offset_seconds: 43200, timezone_name: None,
2058 }),
2059 Value::Temporal(TemporalValue::DateTime {
2060 nanos_since_epoch: 441763200000000000,
2061 offset_seconds: -43200, timezone_name: None,
2063 }),
2064 ];
2065
2066 let arr_ref = values_to_datetime_struct_array(&values);
2067 let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2068 assert_eq!(arr.len(), 3);
2069
2070 for (i, expected) in values.iter().enumerate() {
2072 let decoded = arrow_to_value(arr_ref.as_ref(), i, Some(&DataType::DateTime));
2073 assert_eq!(&decoded, expected);
2074 }
2075 }
2076
2077 #[test]
2078 fn test_datetime_old_schema_migration() {
2079 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
2081 builder.append_value(441763200000000000); builder.append_value(1704067200000000000); builder.append_null();
2084
2085 let arr = builder.finish();
2086
2087 let decoded_0 = arrow_to_value(&arr, 0, Some(&DataType::DateTime));
2089 let _decoded_1 = arrow_to_value(&arr, 1, Some(&DataType::DateTime));
2090 let decoded_2 = arrow_to_value(&arr, 2, Some(&DataType::DateTime));
2091
2092 if let Value::Temporal(TemporalValue::DateTime {
2094 nanos_since_epoch,
2095 offset_seconds,
2096 timezone_name,
2097 }) = decoded_0
2098 {
2099 assert_eq!(nanos_since_epoch, 441763200000000000);
2100 assert_eq!(offset_seconds, 0);
2101 assert_eq!(timezone_name, Some("UTC".to_string()));
2102 } else {
2103 panic!("Expected DateTime value");
2104 }
2105
2106 assert_eq!(decoded_2, Value::Null);
2108 }
2109
2110 #[test]
2111 fn test_time_struct_encode_decode_roundtrip() {
2112 let values = vec![
2114 Value::Temporal(TemporalValue::Time {
2115 nanos_since_midnight: 37845000000000, offset_seconds: 3600, }),
2118 Value::Temporal(TemporalValue::Time {
2119 nanos_since_midnight: 0, offset_seconds: 0,
2121 }),
2122 Value::Temporal(TemporalValue::Time {
2123 nanos_since_midnight: 86399999999999, offset_seconds: -18000, }),
2126 ];
2127
2128 let arr_ref = values_to_time_struct_array(&values);
2130 let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2131 assert_eq!(arr.len(), 3);
2132
2133 let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2135 let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2136 let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2137
2138 assert_eq!(decoded_0, values[0]);
2140 assert_eq!(decoded_1, values[1]);
2141 assert_eq!(decoded_2, values[2]);
2142
2143 if let Value::Temporal(TemporalValue::Time {
2145 nanos_since_midnight,
2146 offset_seconds,
2147 }) = decoded_0
2148 {
2149 assert_eq!(nanos_since_midnight, 37845000000000);
2150 assert_eq!(offset_seconds, 3600);
2151 } else {
2152 panic!("Expected Time value");
2153 }
2154 }
2155
2156 #[test]
2157 fn test_time_struct_null_handling() {
2158 let values = vec![
2160 Value::Temporal(TemporalValue::Time {
2161 nanos_since_midnight: 37845000000000,
2162 offset_seconds: 3600,
2163 }),
2164 Value::Null,
2165 Value::Temporal(TemporalValue::Time {
2166 nanos_since_midnight: 0,
2167 offset_seconds: 0,
2168 }),
2169 ];
2170
2171 let arr_ref = values_to_time_struct_array(&values);
2172 let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2173 assert_eq!(arr.len(), 3);
2174
2175 let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2177 assert_eq!(decoded_0, values[0]);
2178
2179 assert!(arr.is_null(1));
2181 let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2182 assert_eq!(decoded_1, Value::Null);
2183
2184 let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2186 assert_eq!(decoded_2, values[2]);
2187 }
2188
2189 #[test]
2192 fn test_extract_vector_f32_values_valid_vector() {
2193 let v = vec![1.0, 2.0, 3.0];
2194 let val = Value::Vector(v.clone());
2195 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2196 assert_eq!(result, v);
2197 assert!(valid);
2198 }
2199
2200 #[test]
2201 fn test_extract_vector_f32_values_vector_wrong_dims() {
2202 let v = vec![1.0, 2.0];
2203 let val = Value::Vector(v);
2204 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2205 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2206 assert!(!valid);
2207 }
2208
2209 #[test]
2210 fn test_extract_vector_f32_values_valid_list() {
2211 let v = vec![Value::Float(1.0), Value::Float(2.0), Value::Float(3.0)];
2212 let val = Value::List(v);
2213 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2214 assert_eq!(result, vec![1.0, 2.0, 3.0]);
2215 assert!(valid);
2216 }
2217
2218 #[test]
2219 fn test_extract_vector_f32_values_list_wrong_dims() {
2220 let v = vec![Value::Float(1.0), Value::Float(2.0)];
2221 let val = Value::List(v);
2222 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2223 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2224 assert!(!valid);
2225 }
2226
2227 #[test]
2228 fn test_extract_vector_f32_values_list_int_coercion() {
2229 let v = vec![Value::Int(1), Value::Int(2), Value::Int(3)];
2230 let val = Value::List(v);
2231 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2232 assert_eq!(result, vec![1.0, 2.0, 3.0]);
2233 assert!(valid);
2234 }
2235
2236 #[test]
2237 fn test_extract_vector_f32_values_none() {
2238 let (result, valid) = extract_vector_f32_values(None, false, 3);
2239 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2240 assert!(!valid);
2241 }
2242
2243 #[test]
2244 fn test_extract_vector_f32_values_null() {
2245 let val = Value::Null;
2246 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2247 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2248 assert!(!valid);
2249 }
2250
2251 #[test]
2252 fn test_extract_vector_f32_values_unsupported_type() {
2253 let val = Value::String("not a vector".to_string());
2254 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2255 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2256 assert!(!valid);
2257 }
2258
2259 #[test]
2260 fn test_extract_vector_f32_values_deleted_with_none() {
2261 let (result, valid) = extract_vector_f32_values(None, true, 3);
2262 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2263 assert!(valid); }
2265
2266 #[test]
2267 fn test_extract_vector_f32_values_deleted_with_null() {
2268 let val = Value::Null;
2269 let (result, valid) = extract_vector_f32_values(Some(&val), true, 3);
2270 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2271 assert!(valid); }
2273
2274 #[test]
2277 fn test_values_to_fixed_size_list_vector_with_nulls() {
2278 let values = vec![
2279 Value::Vector(vec![1.0, 2.0]),
2280 Value::Null,
2281 Value::Vector(vec![3.0, 4.0]),
2282 Value::String("invalid".to_string()),
2283 ];
2284 let arr_ref = values_to_array(
2285 &values,
2286 &ArrowDataType::FixedSizeList(
2287 Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2288 2,
2289 ),
2290 )
2291 .unwrap();
2292
2293 let arr = arr_ref
2294 .as_any()
2295 .downcast_ref::<FixedSizeListArray>()
2296 .unwrap();
2297
2298 assert_eq!(arr.len(), 4);
2299 assert!(arr.is_valid(0));
2300 assert!(!arr.is_valid(1)); assert!(arr.is_valid(2));
2302 assert!(!arr.is_valid(3)); }
2304
2305 #[test]
2306 fn test_values_to_fixed_size_list_from_list() {
2307 let values = vec![
2308 Value::List(vec![Value::Float(1.0), Value::Float(2.0)]),
2309 Value::List(vec![Value::Int(3), Value::Int(4)]),
2310 ];
2311 let arr_ref = values_to_array(
2312 &values,
2313 &ArrowDataType::FixedSizeList(
2314 Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2315 2,
2316 ),
2317 )
2318 .unwrap();
2319
2320 let arr = arr_ref
2321 .as_any()
2322 .downcast_ref::<FixedSizeListArray>()
2323 .unwrap();
2324
2325 assert_eq!(arr.len(), 2);
2326 assert!(arr.is_valid(0));
2327 assert!(arr.is_valid(1));
2328
2329 let child = arr
2331 .values()
2332 .as_any()
2333 .downcast_ref::<Float32Array>()
2334 .unwrap();
2335 assert_eq!(child.value(0), 1.0);
2336 assert_eq!(child.value(1), 2.0);
2337 assert_eq!(child.value(2), 3.0);
2338 assert_eq!(child.value(3), 4.0);
2339 }
2340
2341 #[test]
2342 fn test_values_to_fixed_size_list_wrong_dimensions() {
2343 let values = vec![
2344 Value::Vector(vec![1.0, 2.0, 3.0]), Value::List(vec![Value::Float(4.0)]), ];
2347 let arr_ref = values_to_array(
2348 &values,
2349 &ArrowDataType::FixedSizeList(
2350 Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2351 2,
2352 ),
2353 )
2354 .unwrap();
2355
2356 let arr = arr_ref
2357 .as_any()
2358 .downcast_ref::<FixedSizeListArray>()
2359 .unwrap();
2360
2361 assert_eq!(arr.len(), 2);
2362 assert!(!arr.is_valid(0)); assert!(!arr.is_valid(1)); let child = arr
2367 .values()
2368 .as_any()
2369 .downcast_ref::<Float32Array>()
2370 .unwrap();
2371 assert_eq!(child.value(0), 0.0);
2372 assert_eq!(child.value(1), 0.0);
2373 assert_eq!(child.value(2), 0.0);
2374 assert_eq!(child.value(3), 0.0);
2375 }
2376
2377 #[test]
2378 fn test_values_to_fixed_size_list_all_nulls() {
2379 let values = vec![Value::Null, Value::Null, Value::Null];
2380 let arr_ref = values_to_array(
2381 &values,
2382 &ArrowDataType::FixedSizeList(
2383 Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2384 3,
2385 ),
2386 )
2387 .unwrap();
2388
2389 let arr = arr_ref
2390 .as_any()
2391 .downcast_ref::<FixedSizeListArray>()
2392 .unwrap();
2393
2394 assert_eq!(arr.len(), 3);
2395 assert!(!arr.is_valid(0));
2396 assert!(!arr.is_valid(1));
2397 assert!(!arr.is_valid(2));
2398
2399 let child = arr
2401 .values()
2402 .as_any()
2403 .downcast_ref::<Float32Array>()
2404 .unwrap();
2405 assert_eq!(child.len(), 9);
2406 }
2407
2408 #[test]
2409 fn test_values_to_fixed_size_list_mixed_types() {
2410 let values = vec![
2411 Value::Vector(vec![1.0, 2.0]),
2412 Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
2413 Value::Null,
2414 Value::String("invalid".to_string()),
2415 ];
2416 let arr_ref = values_to_array(
2417 &values,
2418 &ArrowDataType::FixedSizeList(
2419 Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2420 2,
2421 ),
2422 )
2423 .unwrap();
2424
2425 let arr = arr_ref
2426 .as_any()
2427 .downcast_ref::<FixedSizeListArray>()
2428 .unwrap();
2429
2430 assert_eq!(arr.len(), 4);
2431 assert!(arr.is_valid(0)); assert!(arr.is_valid(1)); assert!(!arr.is_valid(2)); assert!(!arr.is_valid(3)); let child = arr
2438 .values()
2439 .as_any()
2440 .downcast_ref::<Float32Array>()
2441 .unwrap();
2442 assert_eq!(child.value(0), 1.0);
2443 assert_eq!(child.value(1), 2.0);
2444 assert_eq!(child.value(2), 3.0);
2445 assert_eq!(child.value(3), 4.0);
2446 }
2447
2448 #[test]
2451 fn test_build_vector_column_with_nulls_and_deleted() {
2452 let data_type = DataType::Vector { dimensions: 3 };
2453 let extractor = PropertyExtractor::new("test_vec", &data_type);
2454
2455 let props = [
2456 Some(Value::Vector(vec![1.0, 2.0, 3.0])),
2457 None, Some(Value::Null), Some(Value::Vector(vec![4.0, 5.0, 6.0])),
2460 ];
2461 let deleted = [false, false, false, true]; let arr_ref = extractor
2464 .build_vector_column(4, &deleted, |i| props[i].as_ref(), 3)
2465 .unwrap();
2466
2467 let arr = arr_ref
2468 .as_any()
2469 .downcast_ref::<FixedSizeListArray>()
2470 .unwrap();
2471
2472 assert_eq!(arr.len(), 4);
2473 assert!(arr.is_valid(0)); assert!(!arr.is_valid(1)); assert!(!arr.is_valid(2)); assert!(arr.is_valid(3)); let child = arr
2480 .values()
2481 .as_any()
2482 .downcast_ref::<Float32Array>()
2483 .unwrap();
2484 assert_eq!(child.value(0), 1.0);
2485 assert_eq!(child.value(1), 2.0);
2486 assert_eq!(child.value(2), 3.0);
2487 assert_eq!(child.value(9), 0.0);
2491 assert_eq!(child.value(10), 0.0);
2492 assert_eq!(child.value(11), 0.0);
2493 }
2494
2495 #[test]
2496 fn test_build_vector_column_with_list_input() {
2497 let data_type = DataType::Vector { dimensions: 2 };
2498 let extractor = PropertyExtractor::new("test_vec", &data_type);
2499
2500 let props = [
2501 Some(Value::List(vec![Value::Float(1.0), Value::Float(2.0)])),
2502 Some(Value::List(vec![Value::Int(3), Value::Int(4)])),
2503 Some(Value::Vector(vec![5.0, 6.0])),
2504 ];
2505 let deleted = [false, false, false];
2506
2507 let arr_ref = extractor
2508 .build_vector_column(3, &deleted, |i| props[i].as_ref(), 2)
2509 .unwrap();
2510
2511 let arr = arr_ref
2512 .as_any()
2513 .downcast_ref::<FixedSizeListArray>()
2514 .unwrap();
2515
2516 assert_eq!(arr.len(), 3);
2517 assert!(arr.is_valid(0));
2518 assert!(arr.is_valid(1));
2519 assert!(arr.is_valid(2));
2520
2521 let child = arr
2523 .values()
2524 .as_any()
2525 .downcast_ref::<Float32Array>()
2526 .unwrap();
2527 assert_eq!(child.value(0), 1.0);
2528 assert_eq!(child.value(1), 2.0);
2529 assert_eq!(child.value(2), 3.0);
2530 assert_eq!(child.value(3), 4.0);
2531 assert_eq!(child.value(4), 5.0);
2532 assert_eq!(child.value(5), 6.0);
2533 }
2534
2535 #[test]
2538 fn test_int32_and_date32_columns_null_out_of_range() {
2539 let dt = DataType::Int64;
2540 let extractor = PropertyExtractor::new("x", &dt);
2541
2542 let over = Value::Int(i64::from(i32::MAX) + 1);
2543 let ok = Value::Int(42);
2544 let vals = [over, ok];
2545 let deleted = [false, false];
2546
2547 let arr = extractor
2548 .build_int32_column(2, &deleted, |i| Some(&vals[i]))
2549 .unwrap();
2550 let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
2551 assert!(
2552 arr.is_null(0),
2553 "out-of-range i64 must be NULL in an int32 column, not wrapped"
2554 );
2555 assert_eq!(arr.value(1), 42);
2556
2557 let arr = extractor
2558 .build_date32_column(2, &deleted, |i| Some(&vals[i]))
2559 .unwrap();
2560 let arr = arr.as_any().downcast_ref::<Date32Array>().unwrap();
2561 assert!(
2562 arr.is_null(0),
2563 "out-of-range day count must be NULL in a date32 column, not wrapped"
2564 );
2565 assert_eq!(arr.value(1), 42);
2566 }
2567}