1use anyhow::{Result, anyhow};
11use arrow_array::builder::{
12 BinaryBuilder, BooleanBufferBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
13 FixedSizeBinaryBuilder, FixedSizeListBuilder, Float32Builder, Float64Builder, Int32Builder,
14 Int64Builder, IntervalMonthDayNanoBuilder, LargeBinaryBuilder, ListBuilder, StringBuilder,
15 StructBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder,
16 UInt64Builder,
17};
18use arrow_array::{
19 Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, FixedSizeBinaryArray,
20 FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array,
21 IntervalMonthDayNanoArray, LargeBinaryArray, ListArray, StringArray, StructArray,
22 Time64NanosecondArray, TimestampNanosecondArray, UInt64Array,
23};
24use arrow_schema::{DataType as ArrowDataType, Field};
25use std::collections::HashMap;
26use std::sync::Arc;
27use uni_common::DataType;
28use uni_common::Value;
29use uni_common::core::id::{Eid, Vid};
30use uni_common::core::schema;
31use uni_crdt::Crdt;
32
33fn build_timestamp_column_from_id_map<K, I>(
38 ids: I,
39 timestamps: Option<&HashMap<K, i64>>,
40) -> ArrayRef
41where
42 K: Eq + std::hash::Hash,
43 I: IntoIterator<Item = K>,
44{
45 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
46 for id in ids {
47 match timestamps.and_then(|m| m.get(&id)) {
48 Some(&ts) => builder.append_value(ts),
49 None => builder.append_null(),
50 }
51 }
52 Arc::new(builder.finish())
53}
54
55pub fn build_timestamp_column_from_vid_map<I>(
56 ids: I,
57 timestamps: Option<&HashMap<Vid, i64>>,
58) -> ArrayRef
59where
60 I: IntoIterator<Item = Vid>,
61{
62 build_timestamp_column_from_id_map(ids, timestamps)
63}
64
65pub fn build_timestamp_column_from_eid_map<I>(
66 ids: I,
67 timestamps: Option<&HashMap<Eid, i64>>,
68) -> ArrayRef
69where
70 I: IntoIterator<Item = Eid>,
71{
72 build_timestamp_column_from_id_map(ids, timestamps)
73}
74
75pub fn build_timestamp_column<I>(timestamps: I) -> ArrayRef
79where
80 I: IntoIterator<Item = Option<i64>>,
81{
82 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
83 for ts in timestamps {
84 builder.append_option(ts);
85 }
86 Arc::new(builder.finish())
87}
88
89pub fn labels_from_list_array(list_arr: &ListArray, row: usize) -> Vec<String> {
95 if list_arr.is_null(row) {
96 return Vec::new();
97 }
98 let values = list_arr.value(row);
99 let Some(str_arr) = values.as_any().downcast_ref::<StringArray>() else {
100 return Vec::new();
101 };
102 (0..str_arr.len())
103 .filter(|&j| !str_arr.is_null(j))
104 .map(|j| str_arr.value(j).to_string())
105 .collect()
106}
107
108fn parse_datetime_to_nanos(s: &str) -> Option<i64> {
113 chrono::DateTime::parse_from_rfc3339(s)
114 .map(|dt| {
115 dt.with_timezone(&chrono::Utc)
116 .timestamp_nanos_opt()
117 .unwrap_or(0)
118 })
119 .or_else(|_| {
120 chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
121 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
122 })
123 .or_else(|_| {
124 chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ")
125 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
126 })
127 .or_else(|_| {
128 chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M%:z").map(|dt| {
129 dt.with_timezone(&chrono::Utc)
130 .timestamp_nanos_opt()
131 .unwrap_or(0)
132 })
133 })
134 .ok()
135 .or_else(|| {
136 s.strip_suffix('Z')
137 .and_then(|base| chrono::NaiveDateTime::parse_from_str(base, "%Y-%m-%dT%H:%M").ok())
138 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
139 })
140}
141
142fn try_reconstruct_map(arr: &ArrayRef) -> Option<HashMap<String, Value>> {
148 let structs = arr.as_any().downcast_ref::<StructArray>()?;
149 let fields = structs.fields();
150 if fields.len() != 2 || fields[0].name() != "key" || fields[1].name() != "value" {
151 return None;
152 }
153 let key_col = structs.column(0);
154 let val_col = structs.column(1);
155 let mut map = HashMap::new();
156 for i in 0..structs.len() {
157 if let Value::String(k) = arrow_to_value(key_col.as_ref(), i, None) {
158 map.insert(k, arrow_to_value(val_col.as_ref(), i, None));
159 }
160 }
161 Some(map)
162}
163
164fn array_to_value_list(arr: &ArrayRef) -> Vec<Value> {
166 (0..arr.len())
167 .map(|i| arrow_to_value(arr.as_ref(), i, None))
168 .collect()
169}
170
171pub fn arrow_to_value(col: &dyn Array, row: usize, data_type: Option<&DataType>) -> Value {
178 if col.is_null(row) {
179 return Value::Null;
180 }
181
182 if let Some(dt) = data_type {
184 match dt {
185 DataType::DateTime => {
186 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
188 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
189 struct_arr.column_by_name("nanos_since_epoch"),
190 struct_arr.column_by_name("offset_seconds"),
191 struct_arr.column_by_name("timezone_name"),
192 )
193 && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
194 nanos_col
195 .as_any()
196 .downcast_ref::<TimestampNanosecondArray>(),
197 offset_col.as_any().downcast_ref::<Int32Array>(),
198 tz_col.as_any().downcast_ref::<StringArray>(),
199 )
200 {
201 if nanos_arr.is_null(row) {
202 return Value::Null;
203 }
204 let nanos = nanos_arr.value(row);
205 if offset_arr.is_null(row) {
206 return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
208 nanos_since_epoch: nanos,
209 });
210 }
211 let offset = offset_arr.value(row);
212 let tz_name = (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
213 return Value::Temporal(uni_common::TemporalValue::DateTime {
214 nanos_since_epoch: nanos,
215 offset_seconds: offset,
216 timezone_name: tz_name,
217 });
218 }
219 if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
221 let nanos = ts.value(row);
222 let tz_name = ts.timezone().map(|s| s.to_string());
223 return Value::Temporal(uni_common::TemporalValue::DateTime {
224 nanos_since_epoch: nanos,
225 offset_seconds: 0,
226 timezone_name: tz_name,
227 });
228 }
229 }
230 DataType::Time => {
231 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
233 && let (Some(nanos_col), Some(offset_col)) = (
234 struct_arr.column_by_name("nanos_since_midnight"),
235 struct_arr.column_by_name("offset_seconds"),
236 )
237 && let (Some(nanos_arr), Some(offset_arr)) = (
238 nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
239 offset_col.as_any().downcast_ref::<Int32Array>(),
240 )
241 {
242 if nanos_arr.is_null(row) || offset_arr.is_null(row) {
244 return Value::Null;
245 }
246 let nanos = nanos_arr.value(row);
247 let offset = offset_arr.value(row);
248 return Value::Temporal(uni_common::TemporalValue::Time {
249 nanos_since_midnight: nanos,
250 offset_seconds: offset,
251 });
252 }
253 if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
255 let nanos = t.value(row);
256 return Value::Temporal(uni_common::TemporalValue::Time {
257 nanos_since_midnight: nanos,
258 offset_seconds: 0,
259 });
260 }
261 }
262 DataType::Bytes => {
263 let Some(arr) = col.as_any().downcast_ref::<LargeBinaryArray>() else {
264 log::warn!("Bytes column is not LargeBinaryArray");
265 return Value::Null;
266 };
267 if arr.is_null(row) {
268 return Value::Null;
269 }
270 return Value::Bytes(arr.value(row).to_vec());
271 }
272 DataType::Btic => {
273 let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>() else {
274 log::warn!("BTIC column is not FixedSizeBinaryArray");
275 return Value::Null;
276 };
277 let bytes = fsb.value(row);
278 return match uni_btic::encode::decode_slice(bytes) {
279 Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
280 lo: btic.lo(),
281 hi: btic.hi(),
282 meta: btic.meta(),
283 }),
284 Err(e) => {
285 log::warn!("BTIC decode error: {}", e);
286 Value::Null
287 }
288 };
289 }
290 _ => {}
291 }
292 }
293
294 if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
296 return Value::String(s.value(row).to_string());
297 }
298
299 if let Some(u) = col.as_any().downcast_ref::<UInt64Array>() {
301 return Value::Int(u.value(row) as i64);
302 }
303 if let Some(i) = col.as_any().downcast_ref::<Int64Array>() {
304 return Value::Int(i.value(row));
305 }
306 if let Some(i) = col.as_any().downcast_ref::<Int32Array>() {
307 return Value::Int(i.value(row) as i64);
308 }
309
310 if let Some(f) = col.as_any().downcast_ref::<Float64Array>() {
312 return Value::Float(f.value(row));
313 }
314 if let Some(f) = col.as_any().downcast_ref::<Float32Array>() {
315 return Value::Float(f.value(row) as f64);
316 }
317
318 if let Some(b) = col.as_any().downcast_ref::<BooleanArray>() {
320 return Value::Bool(b.value(row));
321 }
322
323 if let Some(list) = col.as_any().downcast_ref::<FixedSizeListArray>() {
325 return Value::List(array_to_value_list(&list.value(row)));
326 }
327
328 if let Some(list) = col.as_any().downcast_ref::<ListArray>() {
330 let arr = list.value(row);
331
332 if let Some(obj) = try_reconstruct_map(&arr) {
334 return Value::Map(obj);
335 }
336
337 return Value::List(array_to_value_list(&arr));
338 }
339
340 if let Some(list) = col.as_any().downcast_ref::<arrow_array::LargeListArray>() {
342 return Value::List(array_to_value_list(&list.value(row)));
343 }
344
345 if let Some(s) = col.as_any().downcast_ref::<StructArray>() {
347 let field_names: Vec<&str> = s.fields().iter().map(|f| f.name().as_str()).collect();
348
349 if field_names.contains(&"nanos_since_epoch")
351 && field_names.contains(&"offset_seconds")
352 && field_names.contains(&"timezone_name")
353 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
354 s.column_by_name("nanos_since_epoch"),
355 s.column_by_name("offset_seconds"),
356 s.column_by_name("timezone_name"),
357 )
358 {
359 let nanos_opt = nanos_col
361 .as_any()
362 .downcast_ref::<TimestampNanosecondArray>()
363 .map(|a| {
364 if a.is_null(row) {
365 None
366 } else {
367 Some(a.value(row))
368 }
369 })
370 .or_else(|| {
371 nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
372 if a.is_null(row) {
373 None
374 } else {
375 Some(a.value(row))
376 }
377 })
378 });
379 let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
380 if a.is_null(row) {
381 None
382 } else {
383 Some(a.value(row))
384 }
385 });
386
387 if let Some(Some(nanos)) = nanos_opt {
388 match offset_opt {
389 Some(Some(offset)) => {
390 let tz_name = tz_col.as_any().downcast_ref::<StringArray>().and_then(|a| {
391 if a.is_null(row) {
392 None
393 } else {
394 Some(a.value(row).to_string())
395 }
396 });
397 return Value::Temporal(uni_common::TemporalValue::DateTime {
398 nanos_since_epoch: nanos,
399 offset_seconds: offset,
400 timezone_name: tz_name,
401 });
402 }
403 _ => {
404 return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
406 nanos_since_epoch: nanos,
407 });
408 }
409 }
410 }
411 }
412
413 if field_names.contains(&"nanos_since_midnight")
415 && field_names.contains(&"offset_seconds")
416 && let (Some(nanos_col), Some(offset_col)) = (
417 s.column_by_name("nanos_since_midnight"),
418 s.column_by_name("offset_seconds"),
419 )
420 {
421 let nanos_opt = nanos_col
423 .as_any()
424 .downcast_ref::<Time64NanosecondArray>()
425 .map(|a| {
426 if a.is_null(row) {
427 None
428 } else {
429 Some(a.value(row))
430 }
431 })
432 .or_else(|| {
433 nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
434 if a.is_null(row) {
435 None
436 } else {
437 Some(a.value(row))
438 }
439 })
440 });
441 let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
442 if a.is_null(row) {
443 None
444 } else {
445 Some(a.value(row))
446 }
447 });
448
449 if let (Some(Some(nanos)), Some(Some(offset))) = (nanos_opt, offset_opt) {
450 return Value::Temporal(uni_common::TemporalValue::Time {
451 nanos_since_midnight: nanos,
452 offset_seconds: offset,
453 });
454 }
455 }
456
457 let mut map = HashMap::new();
459 for (field, child) in s.fields().iter().zip(s.columns()) {
460 map.insert(
461 field.name().clone(),
462 arrow_to_value(child.as_ref(), row, None),
463 );
464 }
465 return Value::Map(map);
466 }
467
468 if let Some(d) = col.as_any().downcast_ref::<Date32Array>() {
470 let days = d.value(row);
471 return Value::Temporal(uni_common::TemporalValue::Date {
472 days_since_epoch: days,
473 });
474 }
475
476 if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
478 let nanos = ts.value(row);
479 return match ts.timezone() {
480 Some(tz) => Value::Temporal(uni_common::TemporalValue::DateTime {
481 nanos_since_epoch: nanos,
482 offset_seconds: 0,
483 timezone_name: Some(tz.to_string()),
484 }),
485 None => Value::Temporal(uni_common::TemporalValue::LocalDateTime {
486 nanos_since_epoch: nanos,
487 }),
488 };
489 }
490
491 if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
493 let nanos = t.value(row);
494 return Value::Temporal(uni_common::TemporalValue::LocalTime {
495 nanos_since_midnight: nanos,
496 });
497 }
498
499 if let Some(t) = col
501 .as_any()
502 .downcast_ref::<arrow_array::Time64MicrosecondArray>()
503 {
504 let micros = t.value(row);
505 return Value::Temporal(uni_common::TemporalValue::LocalTime {
506 nanos_since_midnight: micros * 1000,
507 });
508 }
509
510 if let Some(d) = col
512 .as_any()
513 .downcast_ref::<arrow_array::DurationMicrosecondArray>()
514 {
515 let micros = d.value(row);
516 let total_nanos = micros * 1000;
517 let seconds = total_nanos / 1_000_000_000;
518 let remaining_nanos = total_nanos % 1_000_000_000;
519 return Value::Temporal(uni_common::TemporalValue::Duration {
520 months: 0,
521 days: 0,
522 nanos: seconds * 1_000_000_000 + remaining_nanos,
523 });
524 }
525
526 if let Some(interval) = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>() {
528 let val = interval.value(row);
529 return Value::Temporal(uni_common::TemporalValue::Duration {
530 months: val.months as i64,
531 days: val.days as i64,
532 nanos: val.nanoseconds,
533 });
534 }
535
536 if let Some(b) = col.as_any().downcast_ref::<LargeBinaryArray>() {
538 let bytes = b.value(row);
539 if bytes.is_empty() {
540 return Value::Null;
541 }
542 return uni_common::cypher_value_codec::decode(bytes).unwrap_or_else(|e| {
543 eprintln!("CypherValue decode error: {}", e);
544 Value::Null
545 });
546 }
547
548 if let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>()
550 && fsb.value_length() == 24
551 {
552 let bytes = fsb.value(row);
553 return match uni_btic::encode::decode_slice(bytes) {
554 Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
555 lo: btic.lo(),
556 hi: btic.hi(),
557 meta: btic.meta(),
558 }),
559 Err(e) => {
560 log::warn!("BTIC decode error: {}", e);
561 Value::Null
562 }
563 };
564 }
565
566 if let Some(b) = col.as_any().downcast_ref::<BinaryArray>() {
568 let bytes = b.value(row);
569 return Crdt::from_msgpack(bytes)
570 .ok()
571 .and_then(|crdt| serde_json::to_value(&crdt).ok())
572 .map(Value::from)
573 .unwrap_or(Value::Null);
574 }
575
576 Value::Null
578}
579
580fn values_to_uint64_array(values: &[Value]) -> ArrayRef {
581 let mut builder = UInt64Builder::with_capacity(values.len());
582 for v in values {
583 if let Some(n) = v.as_u64() {
584 builder.append_value(n);
585 } else {
586 builder.append_null();
587 }
588 }
589 Arc::new(builder.finish())
590}
591
592fn values_to_int64_array(values: &[Value]) -> ArrayRef {
593 let mut builder = Int64Builder::with_capacity(values.len());
594 for v in values {
595 if let Some(n) = v.as_i64() {
596 builder.append_value(n);
597 } else {
598 builder.append_null();
599 }
600 }
601 Arc::new(builder.finish())
602}
603
604fn values_to_int32_array(values: &[Value]) -> ArrayRef {
605 let mut builder = Int32Builder::with_capacity(values.len());
606 for v in values {
607 if let Some(n) = v.as_i64() {
608 builder.append_value(n as i32);
609 } else {
610 builder.append_null();
611 }
612 }
613 Arc::new(builder.finish())
614}
615
616fn values_to_string_array(values: &[Value]) -> ArrayRef {
617 let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 10);
618 for v in values {
619 if let Some(s) = v.as_str() {
620 builder.append_value(s);
621 } else if v.is_null() {
622 builder.append_null();
623 } else {
624 builder.append_value(v.to_string());
625 }
626 }
627 Arc::new(builder.finish())
628}
629
630fn values_to_bool_array(values: &[Value]) -> ArrayRef {
631 let mut builder = BooleanBuilder::with_capacity(values.len());
632 for v in values {
633 if let Some(b) = v.as_bool() {
634 builder.append_value(b);
635 } else {
636 builder.append_null();
637 }
638 }
639 Arc::new(builder.finish())
640}
641
642fn values_to_float32_array(values: &[Value]) -> ArrayRef {
643 let mut builder = Float32Builder::with_capacity(values.len());
644 for v in values {
645 if let Some(n) = v.as_f64() {
646 builder.append_value(n as f32);
647 } else {
648 builder.append_null();
649 }
650 }
651 Arc::new(builder.finish())
652}
653
654fn values_to_float64_array(values: &[Value]) -> ArrayRef {
655 let mut builder = Float64Builder::with_capacity(values.len());
656 for v in values {
657 if let Some(n) = v.as_f64() {
658 builder.append_value(n);
659 } else {
660 builder.append_null();
661 }
662 }
663 Arc::new(builder.finish())
664}
665
666fn values_to_fixed_size_binary_array(values: &[Value], size: i32) -> Result<ArrayRef> {
667 let mut builder = FixedSizeBinaryBuilder::with_capacity(values.len(), size);
668 for v in values {
669 match v {
670 Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta }) if size == 24 => {
671 let btic = uni_btic::Btic::new(*lo, *hi, *meta)
672 .map_err(|e| anyhow!("invalid BTIC value: {}", e))?;
673 builder.append_value(uni_btic::encode::encode(&btic))?;
674 }
675 Value::String(s) if size == 24 => match uni_btic::parse::parse_btic_literal(s) {
676 Ok(b) => builder.append_value(uni_btic::encode::encode(&b))?,
677 Err(_) => builder.append_null(),
678 },
679 Value::List(bytes) => {
680 let b: Vec<u8> = bytes
681 .iter()
682 .map(|bv| bv.as_u64().unwrap_or(0) as u8)
683 .collect();
684 if b.len() as i32 == size {
685 builder.append_value(&b)?;
686 } else {
687 builder.append_null();
688 }
689 }
690 _ => builder.append_null(),
691 }
692 }
693 Ok(Arc::new(builder.finish()))
694}
695
696pub fn extract_vector_f32_values(
711 val: Option<&Value>,
712 is_deleted: bool,
713 dimensions: usize,
714) -> (Vec<f32>, bool) {
715 let zeros = || vec![0.0_f32; dimensions];
716
717 if is_deleted {
719 return (zeros(), true);
720 }
721
722 match val {
723 Some(Value::Vector(v)) if v.len() == dimensions => (v.clone(), true),
725 Some(Value::Vector(_)) => (zeros(), false), Some(Value::List(arr)) if arr.len() == dimensions => {
728 let values: Vec<f32> = arr
729 .iter()
730 .map(|v| v.as_f64().unwrap_or(0.0) as f32)
731 .collect();
732 (values, true)
733 }
734 Some(Value::List(_)) => (zeros(), false), _ => (zeros(), false), }
737}
738
739fn values_to_fixed_size_list_f32_array(values: &[Value], size: i32) -> ArrayRef {
740 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), size);
741 for v in values {
742 let (vals, valid) = extract_vector_f32_values(Some(v), false, size as usize);
743 for val in vals {
744 builder.values().append_value(val);
745 }
746 builder.append(valid);
747 }
748 Arc::new(builder.finish())
749}
750
751fn values_to_timestamp_array(values: &[Value], tz: Option<&Arc<str>>) -> ArrayRef {
752 let mut builder = TimestampNanosecondBuilder::with_capacity(values.len());
753 for v in values {
754 if v.is_null() {
755 builder.append_null();
756 } else if let Value::Temporal(tv) = v {
757 match tv {
758 uni_common::TemporalValue::DateTime {
759 nanos_since_epoch, ..
760 }
761 | uni_common::TemporalValue::LocalDateTime {
762 nanos_since_epoch, ..
763 } => builder.append_value(*nanos_since_epoch),
764 _ => builder.append_null(),
765 }
766 } else if let Some(n) = v.as_i64() {
767 builder.append_value(n);
768 } else if let Some(s) = v.as_str() {
769 match parse_datetime_to_nanos(s) {
770 Some(nanos) => builder.append_value(nanos),
771 None => builder.append_null(),
772 }
773 } else {
774 builder.append_null();
775 }
776 }
777
778 let arr = builder.finish();
779 if let Some(tz) = tz {
780 Arc::new(arr.with_timezone(tz.as_ref()))
781 } else {
782 Arc::new(arr)
783 }
784}
785
786fn values_to_datetime_struct_array(values: &[Value]) -> ArrayRef {
791 let mut nanos_builder = TimestampNanosecondBuilder::with_capacity(values.len());
792 let mut offset_builder = Int32Builder::with_capacity(values.len());
793 let mut tz_builder = StringBuilder::with_capacity(values.len(), values.len() * 20);
794 let mut null_buffer = BooleanBufferBuilder::new(values.len());
795
796 for v in values {
797 match v {
798 Value::Temporal(uni_common::TemporalValue::DateTime {
799 nanos_since_epoch,
800 offset_seconds,
801 timezone_name,
802 }) => {
803 nanos_builder.append_value(*nanos_since_epoch);
804 offset_builder.append_value(*offset_seconds);
805 tz_builder.append_option(timezone_name.as_deref());
806 null_buffer.append(true);
807 }
808 Value::Temporal(uni_common::TemporalValue::LocalDateTime { nanos_since_epoch }) => {
809 nanos_builder.append_value(*nanos_since_epoch);
810 offset_builder.append_null();
811 tz_builder.append_null();
812 null_buffer.append(true);
813 }
814 _ => {
815 nanos_builder.append_null();
816 offset_builder.append_null();
817 tz_builder.append_null();
818 null_buffer.append(false);
819 }
820 }
821 }
822
823 let struct_arr = StructArray::new(
824 schema::datetime_struct_fields(),
825 vec![
826 Arc::new(nanos_builder.finish()) as ArrayRef,
827 Arc::new(offset_builder.finish()) as ArrayRef,
828 Arc::new(tz_builder.finish()) as ArrayRef,
829 ],
830 Some(null_buffer.finish().into()),
831 );
832 Arc::new(struct_arr)
833}
834
835fn values_to_time_struct_array(values: &[Value]) -> ArrayRef {
840 let mut nanos_builder = Time64NanosecondBuilder::with_capacity(values.len());
841 let mut offset_builder = Int32Builder::with_capacity(values.len());
842 let mut null_buffer = BooleanBufferBuilder::new(values.len());
843
844 for v in values {
845 match v {
846 Value::Temporal(uni_common::TemporalValue::Time {
847 nanos_since_midnight,
848 offset_seconds,
849 }) => {
850 nanos_builder.append_value(*nanos_since_midnight);
851 offset_builder.append_value(*offset_seconds);
852 null_buffer.append(true);
853 }
854 Value::Temporal(uni_common::TemporalValue::LocalTime {
855 nanos_since_midnight,
856 }) => {
857 nanos_builder.append_value(*nanos_since_midnight);
858 offset_builder.append_null();
859 null_buffer.append(true);
860 }
861 _ => {
862 nanos_builder.append_null();
863 offset_builder.append_null();
864 null_buffer.append(false);
865 }
866 }
867 }
868
869 let struct_arr = StructArray::new(
870 schema::time_struct_fields(),
871 vec![
872 Arc::new(nanos_builder.finish()) as ArrayRef,
873 Arc::new(offset_builder.finish()) as ArrayRef,
874 ],
875 Some(null_buffer.finish().into()),
876 );
877 Arc::new(struct_arr)
878}
879
880fn values_to_large_binary_array(values: &[Value]) -> ArrayRef {
881 let mut builder =
882 arrow_array::builder::LargeBinaryBuilder::with_capacity(values.len(), values.len() * 64);
883 for v in values {
884 if v.is_null() {
885 builder.append_null();
886 } else {
887 let cv_bytes = uni_common::cypher_value_codec::encode(v);
889 builder.append_value(&cv_bytes);
890 }
891 }
892 Arc::new(builder.finish())
893}
894
895pub fn values_to_array(values: &[Value], dt: &ArrowDataType) -> Result<ArrayRef> {
897 match dt {
898 ArrowDataType::UInt64 => Ok(values_to_uint64_array(values)),
899 ArrowDataType::Int64 => Ok(values_to_int64_array(values)),
900 ArrowDataType::Int32 => Ok(values_to_int32_array(values)),
901 ArrowDataType::Utf8 => Ok(values_to_string_array(values)),
902 ArrowDataType::Boolean => Ok(values_to_bool_array(values)),
903 ArrowDataType::Float32 => Ok(values_to_float32_array(values)),
904 ArrowDataType::Float64 => Ok(values_to_float64_array(values)),
905 ArrowDataType::FixedSizeBinary(size) => values_to_fixed_size_binary_array(values, *size),
906 ArrowDataType::FixedSizeList(inner, size) => {
907 if inner.data_type() == &ArrowDataType::Float32 {
908 Ok(values_to_fixed_size_list_f32_array(values, *size))
909 } else {
910 Err(anyhow!("Unsupported FixedSizeList inner type"))
911 }
912 }
913 ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
914 Ok(values_to_timestamp_array(values, tz.as_ref()))
915 }
916 ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
917 Ok(values_to_timestamp_array(values, tz.as_ref()))
918 }
919 ArrowDataType::Date32 => {
920 let mut builder = Date32Builder::with_capacity(values.len());
921 for v in values {
922 if v.is_null() {
923 builder.append_null();
924 } else if let Value::Temporal(uni_common::TemporalValue::Date {
925 days_since_epoch,
926 }) = v
927 {
928 builder.append_value(*days_since_epoch);
929 } else if let Some(n) = v.as_i64() {
930 builder.append_value(n as i32);
931 } else {
932 builder.append_null();
933 }
934 }
935 Ok(Arc::new(builder.finish()))
936 }
937 ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
938 let mut builder = Time64NanosecondBuilder::with_capacity(values.len());
939 for v in values {
940 if v.is_null() {
941 builder.append_null();
942 } else if let Value::Temporal(tv) = v {
943 match tv {
944 uni_common::TemporalValue::LocalTime {
945 nanos_since_midnight,
946 }
947 | uni_common::TemporalValue::Time {
948 nanos_since_midnight,
949 ..
950 } => builder.append_value(*nanos_since_midnight),
951 _ => builder.append_null(),
952 }
953 } else if let Some(n) = v.as_i64() {
954 builder.append_value(n);
955 } else {
956 builder.append_null();
957 }
958 }
959 Ok(Arc::new(builder.finish()))
960 }
961 ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
962 let mut builder = Time64MicrosecondBuilder::with_capacity(values.len());
963 for v in values {
964 if v.is_null() {
965 builder.append_null();
966 } else if let Value::Temporal(tv) = v {
967 match tv {
968 uni_common::TemporalValue::LocalTime {
969 nanos_since_midnight,
970 }
971 | uni_common::TemporalValue::Time {
972 nanos_since_midnight,
973 ..
974 } => builder.append_value(*nanos_since_midnight / 1_000), _ => builder.append_null(),
976 }
977 } else if let Some(n) = v.as_i64() {
978 builder.append_value(n);
979 } else {
980 builder.append_null();
981 }
982 }
983 Ok(Arc::new(builder.finish()))
984 }
985 ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano) => {
986 let mut builder = IntervalMonthDayNanoBuilder::with_capacity(values.len());
987 for v in values {
988 if v.is_null() {
989 builder.append_null();
990 } else if let Value::Temporal(uni_common::TemporalValue::Duration {
991 months,
992 days,
993 nanos,
994 }) = v
995 {
996 builder.append_value(arrow::datatypes::IntervalMonthDayNano {
997 months: *months as i32,
998 days: *days as i32,
999 nanoseconds: *nanos,
1000 });
1001 } else {
1002 builder.append_null();
1003 }
1004 }
1005 Ok(Arc::new(builder.finish()))
1006 }
1007 ArrowDataType::Duration(arrow_schema::TimeUnit::Microsecond) => {
1008 let mut builder = DurationMicrosecondBuilder::with_capacity(values.len());
1009 for v in values {
1010 if v.is_null() {
1011 builder.append_null();
1012 } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1013 months,
1014 days,
1015 nanos,
1016 }) = v
1017 {
1018 let total_micros =
1019 months * 30 * 86_400_000_000i64 + days * 86_400_000_000i64 + nanos / 1_000;
1020 builder.append_value(total_micros);
1021 } else if let Some(n) = v.as_i64() {
1022 builder.append_value(n);
1023 } else {
1024 builder.append_null();
1025 }
1026 }
1027 Ok(Arc::new(builder.finish()))
1028 }
1029 ArrowDataType::LargeBinary => Ok(values_to_large_binary_array(values)),
1030 ArrowDataType::List(field) => {
1031 if field.data_type() == &ArrowDataType::Utf8 {
1032 let mut builder = ListBuilder::new(StringBuilder::new());
1033 for v in values {
1034 if let Value::List(arr) = v {
1035 for item in arr {
1036 if let Some(s) = item.as_str() {
1037 builder.values().append_value(s);
1038 } else {
1039 builder.values().append_null();
1040 }
1041 }
1042 builder.append(true);
1043 } else {
1044 builder.append_null();
1045 }
1046 }
1047 Ok(Arc::new(builder.finish()))
1048 } else {
1049 Err(anyhow!(
1050 "Unsupported List inner type: {:?}",
1051 field.data_type()
1052 ))
1053 }
1054 }
1055 ArrowDataType::Struct(_) if schema::is_datetime_struct(dt) => {
1056 Ok(values_to_datetime_struct_array(values))
1057 }
1058 ArrowDataType::Struct(_) if schema::is_time_struct(dt) => {
1059 Ok(values_to_time_struct_array(values))
1060 }
1061 _ => Err(anyhow!("Unsupported type for conversion: {:?}", dt)),
1062 }
1063}
1064
1065pub struct PropertyExtractor<'a> {
1067 data_type: &'a DataType,
1068}
1069
1070impl<'a> PropertyExtractor<'a> {
1071 pub fn new(_name: &'a str, data_type: &'a DataType) -> Self {
1072 Self { data_type }
1073 }
1074
1075 pub fn build_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1078 where
1079 F: Fn(usize) -> Option<&'a Value>,
1080 {
1081 match self.data_type {
1082 DataType::String => self.build_string_column(len, deleted, get_props),
1083 DataType::Int32 => self.build_int32_column(len, deleted, get_props),
1084 DataType::Int64 => self.build_int64_column(len, deleted, get_props),
1085 DataType::Float32 => self.build_float32_column(len, deleted, get_props),
1086 DataType::Float64 => self.build_float64_column(len, deleted, get_props),
1087 DataType::Bool => self.build_bool_column(len, deleted, get_props),
1088 DataType::Vector { dimensions } => {
1089 self.build_vector_column(len, deleted, get_props, *dimensions)
1090 }
1091 DataType::CypherValue => self.build_json_column(len, deleted, get_props),
1092 DataType::Bytes => self.build_bytes_column(len, deleted, get_props),
1093 DataType::List(inner) => self.build_list_column(len, deleted, get_props, inner),
1094 DataType::Map(key, value) => self.build_map_column(len, deleted, get_props, key, value),
1095 DataType::Crdt(_) => self.build_crdt_column(len, deleted, get_props),
1096 DataType::DateTime => self.build_datetime_struct_column(len, deleted, get_props),
1097 DataType::Timestamp => self.build_timestamp_column(len, deleted, get_props),
1098 DataType::Date => self.build_date32_column(len, deleted, get_props),
1099 DataType::Time => self.build_time_struct_column(len, deleted, get_props),
1100 DataType::Duration => self.build_duration_column(len, deleted, get_props),
1101 DataType::Btic => self.build_btic_column(len, deleted, get_props),
1102 _ => Err(anyhow!(
1103 "Unsupported data type for arrow conversion: {:?}",
1104 self.data_type
1105 )),
1106 }
1107 }
1108
1109 fn build_string_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1110 where
1111 F: Fn(usize) -> Option<&'a Value>,
1112 {
1113 let mut builder = arrow_array::builder::StringBuilder::with_capacity(len, len * 32);
1114 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1115 let prop = get_props(i);
1116 if let Some(s) = prop.and_then(|v| v.as_str()) {
1117 builder.append_value(s);
1118 } else if let Some(Value::Temporal(tv)) = prop {
1119 builder.append_value(tv.to_string());
1120 } else if is_deleted {
1121 builder.append_value("");
1122 } else {
1123 builder.append_null();
1124 }
1125 }
1126 Ok(Arc::new(builder.finish()))
1127 }
1128
1129 fn build_int32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1130 where
1131 F: Fn(usize) -> Option<&'a Value>,
1132 {
1133 let mut values = Vec::with_capacity(len);
1134 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1135 let val = get_props(i).and_then(|v| v.as_i64()).map(|v| v as i32);
1136 if val.is_none() && is_deleted {
1137 values.push(Some(0));
1138 } else {
1139 values.push(val);
1140 }
1141 }
1142 Ok(Arc::new(Int32Array::from(values)))
1143 }
1144
1145 fn build_int64_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1146 where
1147 F: Fn(usize) -> Option<&'a Value>,
1148 {
1149 let mut values = Vec::with_capacity(len);
1150 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1151 let val = get_props(i).and_then(|v| v.as_i64());
1152 if val.is_none() && is_deleted {
1153 values.push(Some(0));
1154 } else {
1155 values.push(val);
1156 }
1157 }
1158 Ok(Arc::new(Int64Array::from(values)))
1159 }
1160
1161 fn build_timestamp_column<F>(
1162 &self,
1163 len: usize,
1164 deleted: &[bool],
1165 get_props: F,
1166 ) -> Result<ArrayRef>
1167 where
1168 F: Fn(usize) -> Option<&'a Value>,
1169 {
1170 let mut values = Vec::with_capacity(len);
1171 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1172 let val = get_props(i);
1173 let ts = if is_deleted || val.is_none() {
1174 Some(0i64)
1175 } else if let Some(Value::Temporal(tv)) = val {
1176 match tv {
1177 uni_common::TemporalValue::DateTime {
1178 nanos_since_epoch, ..
1179 }
1180 | uni_common::TemporalValue::LocalDateTime {
1181 nanos_since_epoch, ..
1182 } => Some(*nanos_since_epoch),
1183 _ => None,
1184 }
1185 } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1186 Some(v)
1187 } else if let Some(s) = val.and_then(|v| v.as_str()) {
1188 parse_datetime_to_nanos(s)
1189 } else {
1190 None
1191 };
1192
1193 if is_deleted {
1194 values.push(Some(0));
1195 } else {
1196 values.push(ts);
1197 }
1198 }
1199 let arr = TimestampNanosecondArray::from(values).with_timezone("UTC");
1200 Ok(Arc::new(arr))
1201 }
1202
1203 fn build_datetime_struct_column<F>(
1204 &self,
1205 len: usize,
1206 deleted: &[bool],
1207 get_props: F,
1208 ) -> Result<ArrayRef>
1209 where
1210 F: Fn(usize) -> Option<&'a Value>,
1211 {
1212 let values = self.collect_values_or_null(len, deleted, &get_props);
1213 Ok(values_to_datetime_struct_array(&values))
1214 }
1215
1216 fn build_time_struct_column<F>(
1217 &self,
1218 len: usize,
1219 deleted: &[bool],
1220 get_props: F,
1221 ) -> Result<ArrayRef>
1222 where
1223 F: Fn(usize) -> Option<&'a Value>,
1224 {
1225 let values = self.collect_values_or_null(len, deleted, &get_props);
1226 Ok(values_to_time_struct_array(&values))
1227 }
1228
1229 fn collect_values_or_null<F>(&self, len: usize, deleted: &[bool], get_props: &F) -> Vec<Value>
1231 where
1232 F: Fn(usize) -> Option<&'a Value>,
1233 {
1234 deleted
1235 .iter()
1236 .enumerate()
1237 .take(len)
1238 .map(|(i, &is_deleted)| {
1239 if is_deleted {
1240 Value::Null
1241 } else {
1242 get_props(i).cloned().unwrap_or(Value::Null)
1243 }
1244 })
1245 .collect()
1246 }
1247
1248 fn build_date32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1249 where
1250 F: Fn(usize) -> Option<&'a Value>,
1251 {
1252 let mut builder = Date32Builder::with_capacity(len);
1253 let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1254
1255 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1256 let val = get_props(i);
1257 let days = if is_deleted || val.is_none() {
1258 Some(0)
1259 } else if let Some(Value::Temporal(uni_common::TemporalValue::Date {
1260 days_since_epoch,
1261 })) = val
1262 {
1263 Some(*days_since_epoch)
1264 } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1265 Some(v as i32)
1266 } else if let Some(s) = val.and_then(|v| v.as_str()) {
1267 match chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1268 Ok(date) => Some(date.signed_duration_since(epoch).num_days() as i32),
1269 Err(_) => None,
1270 }
1271 } else {
1272 None
1273 };
1274
1275 if is_deleted {
1276 builder.append_value(0);
1277 } else if let Some(v) = days {
1278 builder.append_value(v);
1279 } else {
1280 builder.append_null();
1281 }
1282 }
1283 Ok(Arc::new(builder.finish()))
1284 }
1285
1286 fn build_duration_column<F>(
1287 &self,
1288 len: usize,
1289 deleted: &[bool],
1290 get_props: F,
1291 ) -> Result<ArrayRef>
1292 where
1293 F: Fn(usize) -> Option<&'a Value>,
1294 {
1295 let mut builder = LargeBinaryBuilder::with_capacity(len, len * 32);
1297 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1298 let raw_val = get_props(i);
1299 if let Some(val @ Value::Temporal(uni_common::TemporalValue::Duration { .. })) = raw_val
1300 {
1301 let encoded = uni_common::cypher_value_codec::encode(val);
1302 builder.append_value(&encoded);
1303 } else if is_deleted {
1304 let zero = Value::Temporal(uni_common::TemporalValue::Duration {
1305 months: 0,
1306 days: 0,
1307 nanos: 0,
1308 });
1309 let encoded = uni_common::cypher_value_codec::encode(&zero);
1310 builder.append_value(&encoded);
1311 } else {
1312 builder.append_null();
1313 }
1314 }
1315 Ok(Arc::new(builder.finish()))
1316 }
1317
1318 fn build_btic_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1319 where
1320 F: Fn(usize) -> Option<&'a Value>,
1321 {
1322 const ENCODED_LEN: i32 = 24;
1323 let mut builder = FixedSizeBinaryBuilder::with_capacity(len, ENCODED_LEN);
1324 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1325 let raw_val = get_props(i);
1326 let btic = match raw_val {
1327 Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => Some(
1328 uni_btic::Btic::new(*lo, *hi, *meta)
1329 .map_err(|e| anyhow!("invalid BTIC value: {}", e))?,
1330 ),
1331 Some(Value::String(s)) => Some(
1332 uni_btic::parse::parse_btic_literal(s)
1333 .map_err(|e| anyhow!("BTIC parse error for '{}': {}", s, e))?,
1334 ),
1335 _ => None,
1336 };
1337
1338 if let Some(b) = btic {
1339 builder.append_value(uni_btic::encode::encode(&b))?;
1340 } else if is_deleted {
1341 builder.append_value([0u8; ENCODED_LEN as usize])?;
1342 } else {
1343 builder.append_null();
1344 }
1345 }
1346 Ok(Arc::new(builder.finish()))
1347 }
1348
1349 fn build_float32_column<F>(
1350 &self,
1351 len: usize,
1352 deleted: &[bool],
1353 get_props: F,
1354 ) -> Result<ArrayRef>
1355 where
1356 F: Fn(usize) -> Option<&'a Value>,
1357 {
1358 let mut values = Vec::with_capacity(len);
1359 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1360 let val = get_props(i).and_then(|v| v.as_f64()).map(|v| v as f32);
1361 if val.is_none() && is_deleted {
1362 values.push(Some(0.0));
1363 } else {
1364 values.push(val);
1365 }
1366 }
1367 Ok(Arc::new(Float32Array::from(values)))
1368 }
1369
1370 fn build_float64_column<F>(
1371 &self,
1372 len: usize,
1373 deleted: &[bool],
1374 get_props: F,
1375 ) -> Result<ArrayRef>
1376 where
1377 F: Fn(usize) -> Option<&'a Value>,
1378 {
1379 let mut values = Vec::with_capacity(len);
1380 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1381 let val = get_props(i).and_then(|v| v.as_f64());
1382 if val.is_none() && is_deleted {
1383 values.push(Some(0.0));
1384 } else {
1385 values.push(val);
1386 }
1387 }
1388 Ok(Arc::new(Float64Array::from(values)))
1389 }
1390
1391 fn build_bool_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1392 where
1393 F: Fn(usize) -> Option<&'a Value>,
1394 {
1395 let mut values = Vec::with_capacity(len);
1396 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1397 let val = get_props(i).and_then(|v| v.as_bool());
1398 if val.is_none() && is_deleted {
1399 values.push(Some(false));
1400 } else {
1401 values.push(val);
1402 }
1403 }
1404 Ok(Arc::new(BooleanArray::from(values)))
1405 }
1406
1407 fn build_vector_column<F>(
1408 &self,
1409 len: usize,
1410 deleted: &[bool],
1411 get_props: F,
1412 dimensions: usize,
1413 ) -> Result<ArrayRef>
1414 where
1415 F: Fn(usize) -> Option<&'a Value>,
1416 {
1417 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dimensions as i32);
1418
1419 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1420 let val = get_props(i);
1421 let (values, valid) = extract_vector_f32_values(val, is_deleted, dimensions);
1422 for v in values {
1423 builder.values().append_value(v);
1424 }
1425 builder.append(valid);
1426 }
1427 Ok(Arc::new(builder.finish()))
1428 }
1429
1430 fn build_json_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1431 where
1432 F: Fn(usize) -> Option<&'a Value>,
1433 {
1434 let null_val = Value::Null;
1435 let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(len, len * 64);
1436 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1437 let val = get_props(i);
1438 let uni_val = if val.is_none() && is_deleted {
1439 &null_val
1440 } else {
1441 val.unwrap_or(&null_val)
1442 };
1443 let cv_bytes = uni_common::cypher_value_codec::encode(uni_val);
1445 builder.append_value(&cv_bytes);
1446 }
1447 Ok(Arc::new(builder.finish()))
1448 }
1449
1450 fn build_bytes_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1451 where
1452 F: Fn(usize) -> Option<&'a Value>,
1453 {
1454 let mut builder = LargeBinaryBuilder::with_capacity(len, len * 64);
1455 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1456 let val = get_props(i);
1457 if let Some(Value::Bytes(b)) = val {
1458 builder.append_value(b);
1459 } else if is_deleted {
1460 builder.append_value(&[][..]);
1461 } else {
1462 builder.append_null();
1463 }
1464 }
1465 Ok(Arc::new(builder.finish()))
1466 }
1467
1468 fn build_list_column<F>(
1469 &self,
1470 len: usize,
1471 deleted: &[bool],
1472 get_props: F,
1473 inner: &DataType,
1474 ) -> Result<ArrayRef>
1475 where
1476 F: Fn(usize) -> Option<&'a Value>,
1477 {
1478 match inner {
1479 DataType::String => {
1480 self.build_typed_list(len, deleted, &get_props, StringBuilder::new(), |v, b| {
1481 if let Some(s) = v.as_str() {
1482 b.append_value(s);
1483 } else {
1484 b.append_null();
1485 }
1486 })
1487 }
1488 DataType::Int64 => {
1489 self.build_typed_list(len, deleted, &get_props, Int64Builder::new(), |v, b| {
1490 if let Some(n) = v.as_i64() {
1491 b.append_value(n);
1492 } else {
1493 b.append_null();
1494 }
1495 })
1496 }
1497 DataType::Float64 => {
1498 self.build_typed_list(len, deleted, &get_props, Float64Builder::new(), |v, b| {
1499 if let Some(f) = v.as_f64() {
1500 b.append_value(f);
1501 } else {
1502 b.append_null();
1503 }
1504 })
1505 }
1506 _ => Err(anyhow!("Unsupported inner type for List: {:?}", inner)),
1507 }
1508 }
1509
1510 fn build_typed_list<F, B, A>(
1512 &self,
1513 len: usize,
1514 deleted: &[bool],
1515 get_props: &F,
1516 inner_builder: B,
1517 mut append_value: A,
1518 ) -> Result<ArrayRef>
1519 where
1520 F: Fn(usize) -> Option<&'a Value>,
1521 B: arrow_array::builder::ArrayBuilder,
1522 A: FnMut(&Value, &mut B),
1523 {
1524 let mut builder = ListBuilder::new(inner_builder);
1525 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1526 let val_array = get_props(i).and_then(|v| v.as_array());
1527 if val_array.is_none() && is_deleted {
1528 builder.append_null();
1529 } else if let Some(arr) = val_array {
1530 for v in arr {
1531 append_value(v, builder.values());
1532 }
1533 builder.append(true);
1534 } else {
1535 builder.append_null();
1536 }
1537 }
1538 Ok(Arc::new(builder.finish()))
1539 }
1540
1541 fn build_map_column<F>(
1542 &self,
1543 len: usize,
1544 deleted: &[bool],
1545 get_props: F,
1546 key: &DataType,
1547 value: &DataType,
1548 ) -> Result<ArrayRef>
1549 where
1550 F: Fn(usize) -> Option<&'a Value>,
1551 {
1552 if !matches!(key, DataType::String) {
1553 return Err(anyhow!("Map keys must be String (JSON limitation)"));
1554 }
1555
1556 match value {
1557 DataType::String => self.build_typed_map(
1558 len,
1559 deleted,
1560 &get_props,
1561 StringBuilder::new(),
1562 arrow_schema::DataType::Utf8,
1563 |v, b: &mut StringBuilder| {
1564 if let Some(s) = v.as_str() {
1565 b.append_value(s);
1566 } else {
1567 b.append_null();
1568 }
1569 },
1570 ),
1571 DataType::Int64 => self.build_typed_map(
1572 len,
1573 deleted,
1574 &get_props,
1575 Int64Builder::new(),
1576 arrow_schema::DataType::Int64,
1577 |v, b: &mut Int64Builder| {
1578 if let Some(n) = v.as_i64() {
1579 b.append_value(n);
1580 } else {
1581 b.append_null();
1582 }
1583 },
1584 ),
1585 _ => Err(anyhow!("Unsupported value type for Map: {:?}", value)),
1586 }
1587 }
1588
1589 fn build_typed_map<F, B, A>(
1591 &self,
1592 len: usize,
1593 deleted: &[bool],
1594 get_props: &F,
1595 value_builder: B,
1596 value_arrow_type: arrow_schema::DataType,
1597 mut append_value: A,
1598 ) -> Result<ArrayRef>
1599 where
1600 F: Fn(usize) -> Option<&'a Value>,
1601 B: arrow_array::builder::ArrayBuilder,
1602 A: FnMut(&Value, &mut B),
1603 {
1604 let key_builder = Box::new(StringBuilder::new());
1605 let value_builder = Box::new(value_builder);
1606 let struct_builder = StructBuilder::new(
1607 vec![
1608 Field::new("key", arrow_schema::DataType::Utf8, false),
1609 Field::new("value", value_arrow_type, true),
1610 ],
1611 vec![key_builder, value_builder],
1612 );
1613 let mut builder = ListBuilder::new(struct_builder);
1614
1615 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1616 self.append_map_entry(&mut builder, get_props(i), is_deleted, &mut append_value);
1617 }
1618 Ok(Arc::new(builder.finish()))
1619 }
1620
1621 fn append_map_entry<B, A>(
1623 &self,
1624 builder: &mut ListBuilder<StructBuilder>,
1625 val: Option<&'a Value>,
1626 is_deleted: bool,
1627 append_value: &mut A,
1628 ) where
1629 B: arrow_array::builder::ArrayBuilder,
1630 A: FnMut(&Value, &mut B),
1631 {
1632 let val_obj = val.and_then(|v| v.as_object());
1633 if val_obj.is_none() && is_deleted {
1634 builder.append(false);
1635 } else if let Some(obj) = val_obj {
1636 let struct_b = builder.values();
1637 for (k, v) in obj {
1638 struct_b
1639 .field_builder::<StringBuilder>(0)
1640 .unwrap()
1641 .append_value(k);
1642 let value_b = struct_b.field_builder::<B>(1).unwrap();
1644 append_value(v, value_b);
1645 struct_b.append(true);
1646 }
1647 builder.append(true);
1648 } else {
1649 builder.append(false);
1650 }
1651 }
1652
1653 fn build_crdt_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1654 where
1655 F: Fn(usize) -> Option<&'a Value>,
1656 {
1657 let mut builder = BinaryBuilder::new();
1658 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1659 if is_deleted {
1660 builder.append_null();
1661 continue;
1662 }
1663 if let Some(val) = get_props(i) {
1664 let crdt_result = if let Some(s) = val.as_str() {
1667 serde_json::from_str::<Crdt>(s)
1668 } else {
1669 let json_val: serde_json::Value = val.clone().into();
1671 serde_json::from_value::<Crdt>(json_val)
1672 };
1673
1674 if let Ok(crdt) = crdt_result {
1675 if let Ok(bytes) = crdt.to_msgpack() {
1676 builder.append_value(&bytes);
1677 } else {
1678 builder.append_null();
1679 }
1680 } else {
1681 builder.append_null();
1682 }
1683 } else {
1684 builder.append_null();
1685 }
1686 }
1687 Ok(Arc::new(builder.finish()))
1688 }
1689}
1690
1691pub fn build_edge_column<'a>(
1693 name: &'a str,
1694 data_type: &'a DataType,
1695 len: usize,
1696 get_props: impl Fn(usize) -> Option<&'a Value>,
1697) -> Result<ArrayRef> {
1698 let deleted = vec![false; len];
1700 let extractor = PropertyExtractor::new(name, data_type);
1701 extractor.build_column(len, &deleted, get_props)
1702}
1703
1704#[cfg(test)]
1705mod tests {
1706 use super::*;
1707 use arrow_array::{
1708 Array, DurationMicrosecondArray,
1709 builder::{BinaryBuilder, Time64MicrosecondBuilder, TimestampNanosecondBuilder},
1710 };
1711 use std::collections::HashMap;
1712 use uni_common::TemporalValue;
1713 use uni_crdt::{Crdt, GCounter};
1714
/// String slots decode to `Value::String`; a null slot decodes to `Value::Null`.
#[test]
fn test_arrow_to_value_string() {
    let arr = StringArray::from(vec![Some("hello"), None, Some("world")]);
    let expected = [
        Value::String("hello".to_string()),
        Value::Null,
        Value::String("world".to_string()),
    ];
    for (idx, want) in expected.iter().enumerate() {
        assert_eq!(&arrow_to_value(&arr, idx, None), want);
    }
}
1728
/// Int64 slots decode to `Value::Int`; a null slot decodes to `Value::Null`.
#[test]
fn test_arrow_to_value_int64() {
    let arr = Int64Array::from(vec![Some(42), None, Some(-10)]);
    let decoded: Vec<Value> = (0..3).map(|idx| arrow_to_value(&arr, idx, None)).collect();
    assert_eq!(
        decoded,
        vec![Value::Int(42), Value::Null, Value::Int(-10)]
    );
}
1736
/// Float64 slots decode to `Value::Float`; a null slot decodes to `Value::Null`.
#[test]
#[allow(clippy::approx_constant)]
fn test_arrow_to_value_float64() {
    let arr = Float64Array::from(vec![Some(3.14), None]);
    let present = arrow_to_value(&arr, 0, None);
    let missing = arrow_to_value(&arr, 1, None);
    assert_eq!(present, Value::Float(3.14));
    assert_eq!(missing, Value::Null);
}
1744
/// Boolean slots decode to `Value::Bool`; a null slot decodes to `Value::Null`.
#[test]
fn test_arrow_to_value_bool() {
    let arr = BooleanArray::from(vec![Some(true), Some(false), None]);
    let expected = [Value::Bool(true), Value::Bool(false), Value::Null];
    for (idx, want) in expected.iter().enumerate() {
        assert_eq!(&arrow_to_value(&arr, idx, None), want);
    }
}
1752
/// `values_to_array` with an Int64 target keeps integers and maps `Value::Null` to null.
#[test]
fn test_values_to_array_int64() {
    let input = vec![Value::Int(1), Value::Int(2), Value::Null, Value::Int(4)];
    let arr = values_to_array(&input, &ArrowDataType::Int64).unwrap();
    assert_eq!(arr.len(), 4);

    let ints = arr.as_any().downcast_ref::<Int64Array>().unwrap();
    let observed: Vec<Option<i64>> = ints.iter().collect();
    assert_eq!(observed, vec![Some(1), Some(2), None, Some(4)]);
}
1765
/// `values_to_array` with a Utf8 target keeps strings and maps `Value::Null` to null.
#[test]
fn test_values_to_array_string() {
    let input = vec![
        Value::String("a".to_string()),
        Value::String("b".to_string()),
        Value::Null,
    ];
    let arr = values_to_array(&input, &ArrowDataType::Utf8).unwrap();
    assert_eq!(arr.len(), 3);

    let strings = arr.as_any().downcast_ref::<StringArray>().unwrap();
    let observed: Vec<Option<&str>> = strings.iter().collect();
    assert_eq!(observed, vec![Some("a"), Some("b"), None]);
}
1781
/// A String extractor copies present values and reads back "" for a deleted row
/// that has no value.
#[test]
fn test_property_extractor_string() {
    let rows: Vec<HashMap<String, Value>> = vec![
        HashMap::from([("name".to_string(), Value::String("Alice".to_string()))]),
        HashMap::from([("name".to_string(), Value::String("Bob".to_string()))]),
        HashMap::new(),
    ];
    let deleted = [false, false, true];

    let arr = PropertyExtractor::new("name", &DataType::String)
        .build_column(3, &deleted, |i| rows[i].get("name"))
        .unwrap();

    let strings = arr.as_any().downcast_ref::<StringArray>().unwrap();
    let observed: Vec<&str> = (0..3).map(|i| strings.value(i)).collect();
    assert_eq!(observed, ["Alice", "Bob", ""]);
}
1805
/// An Int64 extractor copies present values and reads back 0 for a deleted row
/// that has no value.
#[test]
fn test_property_extractor_int64() {
    let rows: Vec<HashMap<String, Value>> = vec![
        HashMap::from([("age".to_string(), Value::Int(25))]),
        HashMap::from([("age".to_string(), Value::Int(30))]),
        HashMap::new(),
    ];
    let deleted = [false, false, true];

    let arr = PropertyExtractor::new("age", &DataType::Int64)
        .build_column(3, &deleted, |i| rows[i].get("age"))
        .unwrap();

    let ints = arr.as_any().downcast_ref::<Int64Array>().unwrap();
    let observed: Vec<i64> = (0..3).map(|i| ints.value(i)).collect();
    assert_eq!(observed, [25, 30, 0]);
}
1825
/// Bytes written by the extractor read back unchanged: a payload, an empty payload,
/// and a missing value (which reads back as `Value::Null`).
#[test]
fn test_property_extractor_bytes_roundtrip() {
    let blob = vec![0u8, 1, 2, 255];
    let rows: Vec<HashMap<String, Value>> = vec![
        HashMap::from([("blob".to_string(), Value::Bytes(blob.clone()))]),
        HashMap::from([("blob".to_string(), Value::Bytes(Vec::new()))]),
        HashMap::new(),
    ];
    let deleted = [false, false, false];

    let arr = PropertyExtractor::new("blob", &DataType::Bytes)
        .build_column(3, &deleted, |i| rows[i].get("blob"))
        .unwrap();

    let expected = [Value::Bytes(blob), Value::Bytes(Vec::new()), Value::Null];
    for (idx, want) in expected.iter().enumerate() {
        let decoded = arrow_to_value(arr.as_ref(), idx, Some(&DataType::Bytes));
        assert_eq!(&decoded, want);
    }
}
1860
/// With a `DataType::Bytes` hint, a stored raw payload decodes back to `Value::Bytes`.
#[test]
fn test_bytes_vs_cypher_value_disambiguation() {
    let raw = vec![0xDEu8, 0xAD, 0xBE, 0xEF];
    let rows: Vec<HashMap<String, Value>> = vec![HashMap::from([(
        "blob".to_string(),
        Value::Bytes(raw.clone()),
    )])];

    let arr = PropertyExtractor::new("blob", &DataType::Bytes)
        .build_column(1, &[false], |i| rows[i].get("blob"))
        .unwrap();

    let decoded = arrow_to_value(arr.as_ref(), 0, Some(&DataType::Bytes));
    assert_eq!(decoded, Value::Bytes(raw));
}
1882
/// `DataType::Bytes` maps to Arrow's `LargeBinary`.
#[test]
fn test_data_type_bytes_to_arrow() {
    let arrow_type = DataType::Bytes.to_arrow();
    assert_eq!(arrow_type, ArrowDataType::LargeBinary);
}
1887
/// Time64 (microsecond) slots decode to values whose string form is a clock time;
/// a null slot decodes to `Value::Null`.
#[test]
fn test_arrow_to_value_time64() {
    let mut builder = Time64MicrosecondBuilder::new();
    for micros in vec![37_845_000_000, 0, 86_399_123_456] {
        builder.append_value(micros);
    }
    builder.append_null();
    let arr = builder.finish();

    let rendered = ["10:30:45", "00:00", "23:59:59.123456"];
    for (idx, want) in rendered.iter().enumerate() {
        assert_eq!(arrow_to_value(&arr, idx, None).to_string(), *want);
    }
    assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
}
1907
/// Duration (microsecond) slots decode to values whose string form is an ISO-8601
/// style duration; a null slot decodes to `Value::Null`.
#[test]
fn test_arrow_to_value_duration() {
    let arr = DurationMicrosecondArray::from(vec![
        Some(1_000_000),
        Some(3_600_000_000),
        Some(86_400_000_000),
        None,
    ]);

    let rendered = ["PT1S", "PT1H", "PT24H"];
    for (idx, want) in rendered.iter().enumerate() {
        assert_eq!(arrow_to_value(&arr, idx, None).to_string(), *want);
    }
    assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
}
1924
/// A MessagePack-encoded `GCounter` stored in a Binary array decodes to an object
/// whose "t" entry is "gc"; a null slot decodes to `Value::Null`.
#[test]
fn test_arrow_to_value_binary_crdt() {
    let mut counter = GCounter::new();
    counter.increment("actor1", 5);
    let encoded = Crdt::GCounter(counter).to_msgpack().unwrap();

    let mut builder = BinaryBuilder::new();
    builder.append_value(&encoded);
    builder.append_null();
    let arr = builder.finish();

    let decoded = arrow_to_value(&arr, 0, None);
    assert!(decoded.as_object().is_some());
    let fields = decoded.as_object().unwrap();
    assert_eq!(fields.get("t"), Some(&Value::String("gc".to_string())));

    assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
}
1952
/// DateTime values survive `values_to_datetime_struct_array` followed by
/// `arrow_to_value`, including offset and timezone name.
#[test]
fn test_datetime_struct_encode_decode_roundtrip() {
    let named_zone = Value::Temporal(TemporalValue::DateTime {
        nanos_since_epoch: 441763200000000000,
        offset_seconds: 3600,
        timezone_name: Some("Europe/Paris".to_string()),
    });
    let offset_only = Value::Temporal(TemporalValue::DateTime {
        nanos_since_epoch: 1704067200000000000,
        offset_seconds: -18000,
        timezone_name: None,
    });
    let epoch_utc = Value::Temporal(TemporalValue::DateTime {
        nanos_since_epoch: 0,
        offset_seconds: 0,
        timezone_name: Some("UTC".to_string()),
    });
    let values = vec![named_zone, offset_only, epoch_utc];

    let arr_ref = values_to_datetime_struct_array(&values);
    let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(arr.len(), 3);

    let decoded: Vec<Value> = (0..3)
        .map(|idx| arrow_to_value(arr_ref.as_ref(), idx, Some(&DataType::DateTime)))
        .collect();
    assert_eq!(decoded, values);

    // Spot-check the individual fields of the first decoded value.
    match &decoded[0] {
        Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch,
            offset_seconds,
            timezone_name,
        }) => {
            assert_eq!(*nanos_since_epoch, 441763200000000000);
            assert_eq!(*offset_seconds, 3600);
            assert_eq!(*timezone_name, Some("Europe/Paris".to_string()));
        }
        _ => panic!("Expected DateTime value"),
    }
}
2003
/// A `Value::Null` between DateTime values becomes a null struct slot and decodes
/// back to `Value::Null`; its neighbours round-trip unchanged.
#[test]
fn test_datetime_struct_null_handling() {
    let first = Value::Temporal(TemporalValue::DateTime {
        nanos_since_epoch: 441763200000000000,
        offset_seconds: 3600,
        timezone_name: Some("Europe/Paris".to_string()),
    });
    let last = Value::Temporal(TemporalValue::DateTime {
        nanos_since_epoch: 0,
        offset_seconds: 0,
        timezone_name: None,
    });
    let values = vec![first, Value::Null, last];

    let arr_ref = values_to_datetime_struct_array(&values);
    let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(arr.len(), 3);
    assert!(arr.is_null(1));

    let decode = |idx: usize| arrow_to_value(arr_ref.as_ref(), idx, Some(&DataType::DateTime));
    assert_eq!(decode(0), values[0]);
    assert_eq!(decode(1), Value::Null);
    assert_eq!(decode(2), values[2]);
}
2038
/// DateTime values with offsets 0, +43200 and -43200 seconds round-trip unchanged.
#[test]
fn test_datetime_struct_boundary_values() {
    let values: Vec<Value> = vec![0, 43200, -43200]
        .into_iter()
        .map(|offset_seconds| {
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 441763200000000000,
                offset_seconds,
                timezone_name: None,
            })
        })
        .collect();

    let arr_ref = values_to_datetime_struct_array(&values);
    let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(arr.len(), 3);

    let decoded: Vec<Value> = (0..values.len())
        .map(|idx| arrow_to_value(arr_ref.as_ref(), idx, Some(&DataType::DateTime)))
        .collect();
    assert_eq!(decoded, values);
}
2070
/// A "UTC" nanosecond timestamp array read with a `DataType::DateTime` hint decodes
/// to a DateTime with offset 0 and timezone name "UTC"; a null slot stays null.
#[test]
fn test_datetime_old_schema_migration() {
    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
    for nanos in vec![441763200000000000, 1704067200000000000] {
        builder.append_value(nanos);
    }
    builder.append_null();
    let arr = builder.finish();

    let decode = |idx: usize| arrow_to_value(&arr, idx, Some(&DataType::DateTime));
    let first = decode(0);
    // The second slot is decoded for its side effect only (it must not panic).
    let _second = decode(1);
    let third = decode(2);

    match first {
        Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch,
            offset_seconds,
            timezone_name,
        }) => {
            assert_eq!(nanos_since_epoch, 441763200000000000);
            assert_eq!(offset_seconds, 0);
            assert_eq!(timezone_name, Some("UTC".to_string()));
        }
        _ => panic!("Expected DateTime value"),
    }

    assert_eq!(third, Value::Null);
}
2103
/// Time values survive `values_to_time_struct_array` followed by `arrow_to_value`,
/// including the offset.
#[test]
fn test_time_struct_encode_decode_roundtrip() {
    let morning = Value::Temporal(TemporalValue::Time {
        nanos_since_midnight: 37845000000000,
        offset_seconds: 3600,
    });
    let midnight = Value::Temporal(TemporalValue::Time {
        nanos_since_midnight: 0,
        offset_seconds: 0,
    });
    let end_of_day = Value::Temporal(TemporalValue::Time {
        nanos_since_midnight: 86399999999999,
        offset_seconds: -18000,
    });
    let values = vec![morning, midnight, end_of_day];

    let arr_ref = values_to_time_struct_array(&values);
    let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(arr.len(), 3);

    let decoded: Vec<Value> = (0..3)
        .map(|idx| arrow_to_value(arr_ref.as_ref(), idx, Some(&DataType::Time)))
        .collect();
    assert_eq!(decoded, values);

    // Spot-check the individual fields of the first decoded value.
    match &decoded[0] {
        Value::Temporal(TemporalValue::Time {
            nanos_since_midnight,
            offset_seconds,
        }) => {
            assert_eq!(*nanos_since_midnight, 37845000000000);
            assert_eq!(*offset_seconds, 3600);
        }
        _ => panic!("Expected Time value"),
    }
}
2149
/// A `Value::Null` between Time values becomes a null struct slot and decodes back
/// to `Value::Null`; its neighbours round-trip unchanged.
#[test]
fn test_time_struct_null_handling() {
    let first = Value::Temporal(TemporalValue::Time {
        nanos_since_midnight: 37845000000000,
        offset_seconds: 3600,
    });
    let last = Value::Temporal(TemporalValue::Time {
        nanos_since_midnight: 0,
        offset_seconds: 0,
    });
    let values = vec![first, Value::Null, last];

    let arr_ref = values_to_time_struct_array(&values);
    let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(arr.len(), 3);
    assert!(arr.is_null(1));

    let decode = |idx: usize| arrow_to_value(arr_ref.as_ref(), idx, Some(&DataType::Time));
    assert_eq!(decode(0), values[0]);
    assert_eq!(decode(1), Value::Null);
    assert_eq!(decode(2), values[2]);
}
2182
/// A `Value::Vector` with the requested dimension is returned unchanged and valid.
#[test]
fn test_extract_vector_f32_values_valid_vector() {
    let components = vec![1.0, 2.0, 3.0];
    let input = Value::Vector(components.clone());
    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);
    assert!(is_valid);
    assert_eq!(floats, components);
}
2193
/// A `Value::Vector` of the wrong length yields zero padding and an invalid flag.
#[test]
fn test_extract_vector_f32_values_vector_wrong_dims() {
    let input = Value::Vector(vec![1.0, 2.0]);
    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);
    assert!(!is_valid);
    assert_eq!(floats, vec![0.0, 0.0, 0.0]);
}
2202
/// A `Value::List` of floats with the requested dimension is converted and valid.
#[test]
fn test_extract_vector_f32_values_valid_list() {
    let input = Value::List(vec![
        Value::Float(1.0),
        Value::Float(2.0),
        Value::Float(3.0),
    ]);
    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);
    assert!(is_valid);
    assert_eq!(floats, vec![1.0, 2.0, 3.0]);
}
2211
/// A `Value::List` of the wrong length yields zero padding and an invalid flag.
#[test]
fn test_extract_vector_f32_values_list_wrong_dims() {
    let input = Value::List(vec![Value::Float(1.0), Value::Float(2.0)]);
    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);
    assert!(!is_valid);
    assert_eq!(floats, vec![0.0, 0.0, 0.0]);
}
2220
/// Integer list elements are coerced to floats.
#[test]
fn test_extract_vector_f32_values_list_int_coercion() {
    let input = Value::List(vec![Value::Int(1), Value::Int(2), Value::Int(3)]);
    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);
    assert!(is_valid);
    assert_eq!(floats, vec![1.0, 2.0, 3.0]);
}
2229
/// An absent value on a live row yields zero padding and an invalid flag.
#[test]
fn test_extract_vector_f32_values_none() {
    let (floats, is_valid) = extract_vector_f32_values(None, false, 3);
    assert!(!is_valid);
    assert_eq!(floats, vec![0.0, 0.0, 0.0]);
}
2236
/// `Value::Null` on a live row yields zero padding and an invalid flag.
#[test]
fn test_extract_vector_f32_values_null() {
    let input = Value::Null;
    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);
    assert!(!is_valid);
    assert_eq!(floats, vec![0.0, 0.0, 0.0]);
}
2244
/// A value that is neither a vector nor a list yields zero padding and an invalid flag.
#[test]
fn test_extract_vector_f32_values_unsupported_type() {
    let input = Value::String("not a vector".to_string());
    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);
    assert!(!is_valid);
    assert_eq!(floats, vec![0.0, 0.0, 0.0]);
}
2252
/// An absent value on a deleted row yields zero padding that is flagged valid.
#[test]
fn test_extract_vector_f32_values_deleted_with_none() {
    let (floats, is_valid) = extract_vector_f32_values(None, true, 3);
    assert!(is_valid);
    assert_eq!(floats, vec![0.0, 0.0, 0.0]);
}
2259
/// `Value::Null` on a deleted row yields zero padding that is flagged valid.
#[test]
fn test_extract_vector_f32_values_deleted_with_null() {
    let input = Value::Null;
    let (floats, is_valid) = extract_vector_f32_values(Some(&input), true, 3);
    assert!(is_valid);
    assert_eq!(floats, vec![0.0, 0.0, 0.0]);
}
2267
/// Converting to a fixed-size list of 2 floats keeps vectors valid and marks
/// `Value::Null` and non-vector values as invalid slots.
#[test]
fn test_values_to_fixed_size_list_vector_with_nulls() {
    let input = vec![
        Value::Vector(vec![1.0, 2.0]),
        Value::Null,
        Value::Vector(vec![3.0, 4.0]),
        Value::String("invalid".to_string()),
    ];
    let target = ArrowDataType::FixedSizeList(
        Arc::new(Field::new("item", ArrowDataType::Float32, false)),
        2,
    );
    let arr_ref = values_to_array(&input, &target).unwrap();
    let arr = arr_ref
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(arr.len(), 4);
    let validity: Vec<bool> = (0..4).map(|idx| arr.is_valid(idx)).collect();
    assert_eq!(validity, [true, false, true, false]);
}
2298
2299 #[test]
2300 fn test_values_to_fixed_size_list_from_list() {
2301 let values = vec![
2302 Value::List(vec![Value::Float(1.0), Value::Float(2.0)]),
2303 Value::List(vec![Value::Int(3), Value::Int(4)]),
2304 ];
2305 let arr_ref = values_to_array(
2306 &values,
2307 &ArrowDataType::FixedSizeList(
2308 Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2309 2,
2310 ),
2311 )
2312 .unwrap();
2313
2314 let arr = arr_ref
2315 .as_any()
2316 .downcast_ref::<FixedSizeListArray>()
2317 .unwrap();
2318
2319 assert_eq!(arr.len(), 2);
2320 assert!(arr.is_valid(0));
2321 assert!(arr.is_valid(1));
2322
2323 let child = arr
2325 .values()
2326 .as_any()
2327 .downcast_ref::<Float32Array>()
2328 .unwrap();
2329 assert_eq!(child.value(0), 1.0);
2330 assert_eq!(child.value(1), 2.0);
2331 assert_eq!(child.value(2), 3.0);
2332 assert_eq!(child.value(3), 4.0);
2333 }
2334
2335 #[test]
2336 fn test_values_to_fixed_size_list_wrong_dimensions() {
2337 let values = vec![
2338 Value::Vector(vec![1.0, 2.0, 3.0]), Value::List(vec![Value::Float(4.0)]), ];
2341 let arr_ref = values_to_array(
2342 &values,
2343 &ArrowDataType::FixedSizeList(
2344 Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2345 2,
2346 ),
2347 )
2348 .unwrap();
2349
2350 let arr = arr_ref
2351 .as_any()
2352 .downcast_ref::<FixedSizeListArray>()
2353 .unwrap();
2354
2355 assert_eq!(arr.len(), 2);
2356 assert!(!arr.is_valid(0)); assert!(!arr.is_valid(1)); let child = arr
2361 .values()
2362 .as_any()
2363 .downcast_ref::<Float32Array>()
2364 .unwrap();
2365 assert_eq!(child.value(0), 0.0);
2366 assert_eq!(child.value(1), 0.0);
2367 assert_eq!(child.value(2), 0.0);
2368 assert_eq!(child.value(3), 0.0);
2369 }
2370
2371 #[test]
2372 fn test_values_to_fixed_size_list_all_nulls() {
2373 let values = vec![Value::Null, Value::Null, Value::Null];
2374 let arr_ref = values_to_array(
2375 &values,
2376 &ArrowDataType::FixedSizeList(
2377 Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2378 3,
2379 ),
2380 )
2381 .unwrap();
2382
2383 let arr = arr_ref
2384 .as_any()
2385 .downcast_ref::<FixedSizeListArray>()
2386 .unwrap();
2387
2388 assert_eq!(arr.len(), 3);
2389 assert!(!arr.is_valid(0));
2390 assert!(!arr.is_valid(1));
2391 assert!(!arr.is_valid(2));
2392
2393 let child = arr
2395 .values()
2396 .as_any()
2397 .downcast_ref::<Float32Array>()
2398 .unwrap();
2399 assert_eq!(child.len(), 9);
2400 }
2401
2402 #[test]
2403 fn test_values_to_fixed_size_list_mixed_types() {
2404 let values = vec![
2405 Value::Vector(vec![1.0, 2.0]),
2406 Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
2407 Value::Null,
2408 Value::String("invalid".to_string()),
2409 ];
2410 let arr_ref = values_to_array(
2411 &values,
2412 &ArrowDataType::FixedSizeList(
2413 Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2414 2,
2415 ),
2416 )
2417 .unwrap();
2418
2419 let arr = arr_ref
2420 .as_any()
2421 .downcast_ref::<FixedSizeListArray>()
2422 .unwrap();
2423
2424 assert_eq!(arr.len(), 4);
2425 assert!(arr.is_valid(0)); assert!(arr.is_valid(1)); assert!(!arr.is_valid(2)); assert!(!arr.is_valid(3)); let child = arr
2432 .values()
2433 .as_any()
2434 .downcast_ref::<Float32Array>()
2435 .unwrap();
2436 assert_eq!(child.value(0), 1.0);
2437 assert_eq!(child.value(1), 2.0);
2438 assert_eq!(child.value(2), 3.0);
2439 assert_eq!(child.value(3), 4.0);
2440 }
2441
2442 #[test]
2445 fn test_build_vector_column_with_nulls_and_deleted() {
2446 let data_type = DataType::Vector { dimensions: 3 };
2447 let extractor = PropertyExtractor::new("test_vec", &data_type);
2448
2449 let props = [
2450 Some(Value::Vector(vec![1.0, 2.0, 3.0])),
2451 None, Some(Value::Null), Some(Value::Vector(vec![4.0, 5.0, 6.0])),
2454 ];
2455 let deleted = [false, false, false, true]; let arr_ref = extractor
2458 .build_vector_column(4, &deleted, |i| props[i].as_ref(), 3)
2459 .unwrap();
2460
2461 let arr = arr_ref
2462 .as_any()
2463 .downcast_ref::<FixedSizeListArray>()
2464 .unwrap();
2465
2466 assert_eq!(arr.len(), 4);
2467 assert!(arr.is_valid(0)); assert!(!arr.is_valid(1)); assert!(!arr.is_valid(2)); assert!(arr.is_valid(3)); let child = arr
2474 .values()
2475 .as_any()
2476 .downcast_ref::<Float32Array>()
2477 .unwrap();
2478 assert_eq!(child.value(0), 1.0);
2479 assert_eq!(child.value(1), 2.0);
2480 assert_eq!(child.value(2), 3.0);
2481 assert_eq!(child.value(9), 0.0);
2485 assert_eq!(child.value(10), 0.0);
2486 assert_eq!(child.value(11), 0.0);
2487 }
2488
2489 #[test]
2490 fn test_build_vector_column_with_list_input() {
2491 let data_type = DataType::Vector { dimensions: 2 };
2492 let extractor = PropertyExtractor::new("test_vec", &data_type);
2493
2494 let props = [
2495 Some(Value::List(vec![Value::Float(1.0), Value::Float(2.0)])),
2496 Some(Value::List(vec![Value::Int(3), Value::Int(4)])),
2497 Some(Value::Vector(vec![5.0, 6.0])),
2498 ];
2499 let deleted = [false, false, false];
2500
2501 let arr_ref = extractor
2502 .build_vector_column(3, &deleted, |i| props[i].as_ref(), 2)
2503 .unwrap();
2504
2505 let arr = arr_ref
2506 .as_any()
2507 .downcast_ref::<FixedSizeListArray>()
2508 .unwrap();
2509
2510 assert_eq!(arr.len(), 3);
2511 assert!(arr.is_valid(0));
2512 assert!(arr.is_valid(1));
2513 assert!(arr.is_valid(2));
2514
2515 let child = arr
2517 .values()
2518 .as_any()
2519 .downcast_ref::<Float32Array>()
2520 .unwrap();
2521 assert_eq!(child.value(0), 1.0);
2522 assert_eq!(child.value(1), 2.0);
2523 assert_eq!(child.value(2), 3.0);
2524 assert_eq!(child.value(3), 4.0);
2525 assert_eq!(child.value(4), 5.0);
2526 assert_eq!(child.value(5), 6.0);
2527 }
2528}