1use anyhow::{Result, anyhow};
11use arrow_array::builder::{
12 BinaryBuilder, BooleanBufferBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
13 FixedSizeBinaryBuilder, FixedSizeListBuilder, Float32Builder, Float64Builder, Int32Builder,
14 Int64Builder, IntervalMonthDayNanoBuilder, LargeBinaryBuilder, ListBuilder, StringBuilder,
15 StructBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder,
16 UInt64Builder,
17};
18use arrow_array::{
19 Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, FixedSizeBinaryArray,
20 FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array,
21 IntervalMonthDayNanoArray, LargeBinaryArray, ListArray, StringArray, StructArray,
22 Time64NanosecondArray, TimestampNanosecondArray, UInt64Array,
23};
24use arrow_schema::{DataType as ArrowDataType, Field};
25use std::collections::HashMap;
26use std::sync::Arc;
27use uni_common::DataType;
28use uni_common::Value;
29use uni_common::core::id::{Eid, Vid};
30use uni_common::core::schema;
31use uni_crdt::Crdt;
32
33fn build_timestamp_column_from_id_map<K, I>(
38 ids: I,
39 timestamps: Option<&HashMap<K, i64>>,
40) -> ArrayRef
41where
42 K: Eq + std::hash::Hash,
43 I: IntoIterator<Item = K>,
44{
45 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
46 for id in ids {
47 match timestamps.and_then(|m| m.get(&id)) {
48 Some(&ts) => builder.append_value(ts),
49 None => builder.append_null(),
50 }
51 }
52 Arc::new(builder.finish())
53}
54
55pub fn build_timestamp_column_from_vid_map<I>(
56 ids: I,
57 timestamps: Option<&HashMap<Vid, i64>>,
58) -> ArrayRef
59where
60 I: IntoIterator<Item = Vid>,
61{
62 build_timestamp_column_from_id_map(ids, timestamps)
63}
64
65pub fn build_timestamp_column_from_eid_map<I>(
66 ids: I,
67 timestamps: Option<&HashMap<Eid, i64>>,
68) -> ArrayRef
69where
70 I: IntoIterator<Item = Eid>,
71{
72 build_timestamp_column_from_id_map(ids, timestamps)
73}
74
75pub fn build_timestamp_column<I>(timestamps: I) -> ArrayRef
79where
80 I: IntoIterator<Item = Option<i64>>,
81{
82 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
83 for ts in timestamps {
84 builder.append_option(ts);
85 }
86 Arc::new(builder.finish())
87}
88
89pub fn labels_from_list_array(list_arr: &ListArray, row: usize) -> Vec<String> {
95 if list_arr.is_null(row) {
96 return Vec::new();
97 }
98 let values = list_arr.value(row);
99 let Some(str_arr) = values.as_any().downcast_ref::<StringArray>() else {
100 return Vec::new();
101 };
102 (0..str_arr.len())
103 .filter(|&j| !str_arr.is_null(j))
104 .map(|j| str_arr.value(j).to_string())
105 .collect()
106}
107
108fn parse_datetime_to_nanos(s: &str) -> Option<i64> {
113 chrono::DateTime::parse_from_rfc3339(s)
114 .map(|dt| {
115 dt.with_timezone(&chrono::Utc)
116 .timestamp_nanos_opt()
117 .unwrap_or(0)
118 })
119 .or_else(|_| {
120 chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
121 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
122 })
123 .or_else(|_| {
124 chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ")
125 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
126 })
127 .or_else(|_| {
128 chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M%:z").map(|dt| {
129 dt.with_timezone(&chrono::Utc)
130 .timestamp_nanos_opt()
131 .unwrap_or(0)
132 })
133 })
134 .ok()
135 .or_else(|| {
136 s.strip_suffix('Z')
137 .and_then(|base| chrono::NaiveDateTime::parse_from_str(base, "%Y-%m-%dT%H:%M").ok())
138 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
139 })
140}
141
142fn try_reconstruct_map(arr: &ArrayRef) -> Option<HashMap<String, Value>> {
148 let structs = arr.as_any().downcast_ref::<StructArray>()?;
149 let fields = structs.fields();
150 if fields.len() != 2 || fields[0].name() != "key" || fields[1].name() != "value" {
151 return None;
152 }
153 let key_col = structs.column(0);
154 let val_col = structs.column(1);
155 let mut map = HashMap::new();
156 for i in 0..structs.len() {
157 if let Value::String(k) = arrow_to_value(key_col.as_ref(), i, None) {
158 map.insert(k, arrow_to_value(val_col.as_ref(), i, None));
159 }
160 }
161 Some(map)
162}
163
164fn array_to_value_list(arr: &ArrayRef) -> Vec<Value> {
166 (0..arr.len())
167 .map(|i| arrow_to_value(arr.as_ref(), i, None))
168 .collect()
169}
170
171pub fn arrow_to_value(col: &dyn Array, row: usize, data_type: Option<&DataType>) -> Value {
178 if col.is_null(row) {
179 return Value::Null;
180 }
181
182 if let Some(dt) = data_type {
184 match dt {
185 DataType::DateTime => {
186 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
188 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
189 struct_arr.column_by_name("nanos_since_epoch"),
190 struct_arr.column_by_name("offset_seconds"),
191 struct_arr.column_by_name("timezone_name"),
192 )
193 && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
194 nanos_col
195 .as_any()
196 .downcast_ref::<TimestampNanosecondArray>(),
197 offset_col.as_any().downcast_ref::<Int32Array>(),
198 tz_col.as_any().downcast_ref::<StringArray>(),
199 )
200 {
201 if nanos_arr.is_null(row) {
202 return Value::Null;
203 }
204 let nanos = nanos_arr.value(row);
205 if offset_arr.is_null(row) {
206 return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
208 nanos_since_epoch: nanos,
209 });
210 }
211 let offset = offset_arr.value(row);
212 let tz_name = (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
213 return Value::Temporal(uni_common::TemporalValue::DateTime {
214 nanos_since_epoch: nanos,
215 offset_seconds: offset,
216 timezone_name: tz_name,
217 });
218 }
219 if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
221 let nanos = ts.value(row);
222 let tz_name = ts.timezone().map(|s| s.to_string());
223 return Value::Temporal(uni_common::TemporalValue::DateTime {
224 nanos_since_epoch: nanos,
225 offset_seconds: 0,
226 timezone_name: tz_name,
227 });
228 }
229 }
230 DataType::Time => {
231 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
233 && let (Some(nanos_col), Some(offset_col)) = (
234 struct_arr.column_by_name("nanos_since_midnight"),
235 struct_arr.column_by_name("offset_seconds"),
236 )
237 && let (Some(nanos_arr), Some(offset_arr)) = (
238 nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
239 offset_col.as_any().downcast_ref::<Int32Array>(),
240 )
241 {
242 if nanos_arr.is_null(row) || offset_arr.is_null(row) {
244 return Value::Null;
245 }
246 let nanos = nanos_arr.value(row);
247 let offset = offset_arr.value(row);
248 return Value::Temporal(uni_common::TemporalValue::Time {
249 nanos_since_midnight: nanos,
250 offset_seconds: offset,
251 });
252 }
253 if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
255 let nanos = t.value(row);
256 return Value::Temporal(uni_common::TemporalValue::Time {
257 nanos_since_midnight: nanos,
258 offset_seconds: 0,
259 });
260 }
261 }
262 DataType::Btic => {
263 let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>() else {
264 log::warn!("BTIC column is not FixedSizeBinaryArray");
265 return Value::Null;
266 };
267 let bytes = fsb.value(row);
268 return match uni_btic::encode::decode_slice(bytes) {
269 Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
270 lo: btic.lo(),
271 hi: btic.hi(),
272 meta: btic.meta(),
273 }),
274 Err(e) => {
275 log::warn!("BTIC decode error: {}", e);
276 Value::Null
277 }
278 };
279 }
280 _ => {}
281 }
282 }
283
284 if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
286 return Value::String(s.value(row).to_string());
287 }
288
289 if let Some(u) = col.as_any().downcast_ref::<UInt64Array>() {
291 return Value::Int(u.value(row) as i64);
292 }
293 if let Some(i) = col.as_any().downcast_ref::<Int64Array>() {
294 return Value::Int(i.value(row));
295 }
296 if let Some(i) = col.as_any().downcast_ref::<Int32Array>() {
297 return Value::Int(i.value(row) as i64);
298 }
299
300 if let Some(f) = col.as_any().downcast_ref::<Float64Array>() {
302 return Value::Float(f.value(row));
303 }
304 if let Some(f) = col.as_any().downcast_ref::<Float32Array>() {
305 return Value::Float(f.value(row) as f64);
306 }
307
308 if let Some(b) = col.as_any().downcast_ref::<BooleanArray>() {
310 return Value::Bool(b.value(row));
311 }
312
313 if let Some(list) = col.as_any().downcast_ref::<FixedSizeListArray>() {
315 return Value::List(array_to_value_list(&list.value(row)));
316 }
317
318 if let Some(list) = col.as_any().downcast_ref::<ListArray>() {
320 let arr = list.value(row);
321
322 if let Some(obj) = try_reconstruct_map(&arr) {
324 return Value::Map(obj);
325 }
326
327 return Value::List(array_to_value_list(&arr));
328 }
329
330 if let Some(list) = col.as_any().downcast_ref::<arrow_array::LargeListArray>() {
332 return Value::List(array_to_value_list(&list.value(row)));
333 }
334
335 if let Some(s) = col.as_any().downcast_ref::<StructArray>() {
337 let field_names: Vec<&str> = s.fields().iter().map(|f| f.name().as_str()).collect();
338
339 if field_names.contains(&"nanos_since_epoch")
341 && field_names.contains(&"offset_seconds")
342 && field_names.contains(&"timezone_name")
343 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
344 s.column_by_name("nanos_since_epoch"),
345 s.column_by_name("offset_seconds"),
346 s.column_by_name("timezone_name"),
347 )
348 {
349 let nanos_opt = nanos_col
351 .as_any()
352 .downcast_ref::<TimestampNanosecondArray>()
353 .map(|a| {
354 if a.is_null(row) {
355 None
356 } else {
357 Some(a.value(row))
358 }
359 })
360 .or_else(|| {
361 nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
362 if a.is_null(row) {
363 None
364 } else {
365 Some(a.value(row))
366 }
367 })
368 });
369 let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
370 if a.is_null(row) {
371 None
372 } else {
373 Some(a.value(row))
374 }
375 });
376
377 if let Some(Some(nanos)) = nanos_opt {
378 match offset_opt {
379 Some(Some(offset)) => {
380 let tz_name = tz_col.as_any().downcast_ref::<StringArray>().and_then(|a| {
381 if a.is_null(row) {
382 None
383 } else {
384 Some(a.value(row).to_string())
385 }
386 });
387 return Value::Temporal(uni_common::TemporalValue::DateTime {
388 nanos_since_epoch: nanos,
389 offset_seconds: offset,
390 timezone_name: tz_name,
391 });
392 }
393 _ => {
394 return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
396 nanos_since_epoch: nanos,
397 });
398 }
399 }
400 }
401 }
402
403 if field_names.contains(&"nanos_since_midnight")
405 && field_names.contains(&"offset_seconds")
406 && let (Some(nanos_col), Some(offset_col)) = (
407 s.column_by_name("nanos_since_midnight"),
408 s.column_by_name("offset_seconds"),
409 )
410 {
411 let nanos_opt = nanos_col
413 .as_any()
414 .downcast_ref::<Time64NanosecondArray>()
415 .map(|a| {
416 if a.is_null(row) {
417 None
418 } else {
419 Some(a.value(row))
420 }
421 })
422 .or_else(|| {
423 nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
424 if a.is_null(row) {
425 None
426 } else {
427 Some(a.value(row))
428 }
429 })
430 });
431 let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
432 if a.is_null(row) {
433 None
434 } else {
435 Some(a.value(row))
436 }
437 });
438
439 if let (Some(Some(nanos)), Some(Some(offset))) = (nanos_opt, offset_opt) {
440 return Value::Temporal(uni_common::TemporalValue::Time {
441 nanos_since_midnight: nanos,
442 offset_seconds: offset,
443 });
444 }
445 }
446
447 let mut map = HashMap::new();
449 for (field, child) in s.fields().iter().zip(s.columns()) {
450 map.insert(
451 field.name().clone(),
452 arrow_to_value(child.as_ref(), row, None),
453 );
454 }
455 return Value::Map(map);
456 }
457
458 if let Some(d) = col.as_any().downcast_ref::<Date32Array>() {
460 let days = d.value(row);
461 return Value::Temporal(uni_common::TemporalValue::Date {
462 days_since_epoch: days,
463 });
464 }
465
466 if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
468 let nanos = ts.value(row);
469 return match ts.timezone() {
470 Some(tz) => Value::Temporal(uni_common::TemporalValue::DateTime {
471 nanos_since_epoch: nanos,
472 offset_seconds: 0,
473 timezone_name: Some(tz.to_string()),
474 }),
475 None => Value::Temporal(uni_common::TemporalValue::LocalDateTime {
476 nanos_since_epoch: nanos,
477 }),
478 };
479 }
480
481 if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
483 let nanos = t.value(row);
484 return Value::Temporal(uni_common::TemporalValue::LocalTime {
485 nanos_since_midnight: nanos,
486 });
487 }
488
489 if let Some(t) = col
491 .as_any()
492 .downcast_ref::<arrow_array::Time64MicrosecondArray>()
493 {
494 let micros = t.value(row);
495 return Value::Temporal(uni_common::TemporalValue::LocalTime {
496 nanos_since_midnight: micros * 1000,
497 });
498 }
499
500 if let Some(d) = col
502 .as_any()
503 .downcast_ref::<arrow_array::DurationMicrosecondArray>()
504 {
505 let micros = d.value(row);
506 let total_nanos = micros * 1000;
507 let seconds = total_nanos / 1_000_000_000;
508 let remaining_nanos = total_nanos % 1_000_000_000;
509 return Value::Temporal(uni_common::TemporalValue::Duration {
510 months: 0,
511 days: 0,
512 nanos: seconds * 1_000_000_000 + remaining_nanos,
513 });
514 }
515
516 if let Some(interval) = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>() {
518 let val = interval.value(row);
519 return Value::Temporal(uni_common::TemporalValue::Duration {
520 months: val.months as i64,
521 days: val.days as i64,
522 nanos: val.nanoseconds,
523 });
524 }
525
526 if let Some(b) = col.as_any().downcast_ref::<LargeBinaryArray>() {
528 let bytes = b.value(row);
529 if bytes.is_empty() {
530 return Value::Null;
531 }
532 return uni_common::cypher_value_codec::decode(bytes).unwrap_or_else(|e| {
533 eprintln!("CypherValue decode error: {}", e);
534 Value::Null
535 });
536 }
537
538 if let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>()
540 && fsb.value_length() == 24
541 {
542 let bytes = fsb.value(row);
543 return match uni_btic::encode::decode_slice(bytes) {
544 Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
545 lo: btic.lo(),
546 hi: btic.hi(),
547 meta: btic.meta(),
548 }),
549 Err(e) => {
550 log::warn!("BTIC decode error: {}", e);
551 Value::Null
552 }
553 };
554 }
555
556 if let Some(b) = col.as_any().downcast_ref::<BinaryArray>() {
558 let bytes = b.value(row);
559 return Crdt::from_msgpack(bytes)
560 .ok()
561 .and_then(|crdt| serde_json::to_value(&crdt).ok())
562 .map(Value::from)
563 .unwrap_or(Value::Null);
564 }
565
566 Value::Null
568}
569
570fn values_to_uint64_array(values: &[Value]) -> ArrayRef {
571 let mut builder = UInt64Builder::with_capacity(values.len());
572 for v in values {
573 if let Some(n) = v.as_u64() {
574 builder.append_value(n);
575 } else {
576 builder.append_null();
577 }
578 }
579 Arc::new(builder.finish())
580}
581
582fn values_to_int64_array(values: &[Value]) -> ArrayRef {
583 let mut builder = Int64Builder::with_capacity(values.len());
584 for v in values {
585 if let Some(n) = v.as_i64() {
586 builder.append_value(n);
587 } else {
588 builder.append_null();
589 }
590 }
591 Arc::new(builder.finish())
592}
593
594fn values_to_int32_array(values: &[Value]) -> ArrayRef {
595 let mut builder = Int32Builder::with_capacity(values.len());
596 for v in values {
597 if let Some(n) = v.as_i64() {
598 builder.append_value(n as i32);
599 } else {
600 builder.append_null();
601 }
602 }
603 Arc::new(builder.finish())
604}
605
606fn values_to_string_array(values: &[Value]) -> ArrayRef {
607 let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 10);
608 for v in values {
609 if let Some(s) = v.as_str() {
610 builder.append_value(s);
611 } else if v.is_null() {
612 builder.append_null();
613 } else {
614 builder.append_value(v.to_string());
615 }
616 }
617 Arc::new(builder.finish())
618}
619
620fn values_to_bool_array(values: &[Value]) -> ArrayRef {
621 let mut builder = BooleanBuilder::with_capacity(values.len());
622 for v in values {
623 if let Some(b) = v.as_bool() {
624 builder.append_value(b);
625 } else {
626 builder.append_null();
627 }
628 }
629 Arc::new(builder.finish())
630}
631
632fn values_to_float32_array(values: &[Value]) -> ArrayRef {
633 let mut builder = Float32Builder::with_capacity(values.len());
634 for v in values {
635 if let Some(n) = v.as_f64() {
636 builder.append_value(n as f32);
637 } else {
638 builder.append_null();
639 }
640 }
641 Arc::new(builder.finish())
642}
643
644fn values_to_float64_array(values: &[Value]) -> ArrayRef {
645 let mut builder = Float64Builder::with_capacity(values.len());
646 for v in values {
647 if let Some(n) = v.as_f64() {
648 builder.append_value(n);
649 } else {
650 builder.append_null();
651 }
652 }
653 Arc::new(builder.finish())
654}
655
656fn values_to_fixed_size_binary_array(values: &[Value], size: i32) -> Result<ArrayRef> {
657 let mut builder = FixedSizeBinaryBuilder::with_capacity(values.len(), size);
658 for v in values {
659 match v {
660 Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta }) if size == 24 => {
661 let btic = uni_btic::Btic::new(*lo, *hi, *meta)
662 .map_err(|e| anyhow!("invalid BTIC value: {}", e))?;
663 builder.append_value(uni_btic::encode::encode(&btic))?;
664 }
665 Value::String(s) if size == 24 => match uni_btic::parse::parse_btic_literal(s) {
666 Ok(b) => builder.append_value(uni_btic::encode::encode(&b))?,
667 Err(_) => builder.append_null(),
668 },
669 Value::List(bytes) => {
670 let b: Vec<u8> = bytes
671 .iter()
672 .map(|bv| bv.as_u64().unwrap_or(0) as u8)
673 .collect();
674 if b.len() as i32 == size {
675 builder.append_value(&b)?;
676 } else {
677 builder.append_null();
678 }
679 }
680 _ => builder.append_null(),
681 }
682 }
683 Ok(Arc::new(builder.finish()))
684}
685
686pub fn extract_vector_f32_values(
701 val: Option<&Value>,
702 is_deleted: bool,
703 dimensions: usize,
704) -> (Vec<f32>, bool) {
705 let zeros = || vec![0.0_f32; dimensions];
706
707 if is_deleted {
709 return (zeros(), true);
710 }
711
712 match val {
713 Some(Value::Vector(v)) if v.len() == dimensions => (v.clone(), true),
715 Some(Value::Vector(_)) => (zeros(), false), Some(Value::List(arr)) if arr.len() == dimensions => {
718 let values: Vec<f32> = arr
719 .iter()
720 .map(|v| v.as_f64().unwrap_or(0.0) as f32)
721 .collect();
722 (values, true)
723 }
724 Some(Value::List(_)) => (zeros(), false), _ => (zeros(), false), }
727}
728
729fn values_to_fixed_size_list_f32_array(values: &[Value], size: i32) -> ArrayRef {
730 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), size);
731 for v in values {
732 let (vals, valid) = extract_vector_f32_values(Some(v), false, size as usize);
733 for val in vals {
734 builder.values().append_value(val);
735 }
736 builder.append(valid);
737 }
738 Arc::new(builder.finish())
739}
740
741fn values_to_timestamp_array(values: &[Value], tz: Option<&Arc<str>>) -> ArrayRef {
742 let mut builder = TimestampNanosecondBuilder::with_capacity(values.len());
743 for v in values {
744 if v.is_null() {
745 builder.append_null();
746 } else if let Value::Temporal(tv) = v {
747 match tv {
748 uni_common::TemporalValue::DateTime {
749 nanos_since_epoch, ..
750 }
751 | uni_common::TemporalValue::LocalDateTime {
752 nanos_since_epoch, ..
753 } => builder.append_value(*nanos_since_epoch),
754 _ => builder.append_null(),
755 }
756 } else if let Some(n) = v.as_i64() {
757 builder.append_value(n);
758 } else if let Some(s) = v.as_str() {
759 match parse_datetime_to_nanos(s) {
760 Some(nanos) => builder.append_value(nanos),
761 None => builder.append_null(),
762 }
763 } else {
764 builder.append_null();
765 }
766 }
767
768 let arr = builder.finish();
769 if let Some(tz) = tz {
770 Arc::new(arr.with_timezone(tz.as_ref()))
771 } else {
772 Arc::new(arr)
773 }
774}
775
776fn values_to_datetime_struct_array(values: &[Value]) -> ArrayRef {
781 let mut nanos_builder = TimestampNanosecondBuilder::with_capacity(values.len());
782 let mut offset_builder = Int32Builder::with_capacity(values.len());
783 let mut tz_builder = StringBuilder::with_capacity(values.len(), values.len() * 20);
784 let mut null_buffer = BooleanBufferBuilder::new(values.len());
785
786 for v in values {
787 match v {
788 Value::Temporal(uni_common::TemporalValue::DateTime {
789 nanos_since_epoch,
790 offset_seconds,
791 timezone_name,
792 }) => {
793 nanos_builder.append_value(*nanos_since_epoch);
794 offset_builder.append_value(*offset_seconds);
795 tz_builder.append_option(timezone_name.as_deref());
796 null_buffer.append(true);
797 }
798 Value::Temporal(uni_common::TemporalValue::LocalDateTime { nanos_since_epoch }) => {
799 nanos_builder.append_value(*nanos_since_epoch);
800 offset_builder.append_null();
801 tz_builder.append_null();
802 null_buffer.append(true);
803 }
804 _ => {
805 nanos_builder.append_null();
806 offset_builder.append_null();
807 tz_builder.append_null();
808 null_buffer.append(false);
809 }
810 }
811 }
812
813 let struct_arr = StructArray::new(
814 schema::datetime_struct_fields(),
815 vec![
816 Arc::new(nanos_builder.finish()) as ArrayRef,
817 Arc::new(offset_builder.finish()) as ArrayRef,
818 Arc::new(tz_builder.finish()) as ArrayRef,
819 ],
820 Some(null_buffer.finish().into()),
821 );
822 Arc::new(struct_arr)
823}
824
825fn values_to_time_struct_array(values: &[Value]) -> ArrayRef {
830 let mut nanos_builder = Time64NanosecondBuilder::with_capacity(values.len());
831 let mut offset_builder = Int32Builder::with_capacity(values.len());
832 let mut null_buffer = BooleanBufferBuilder::new(values.len());
833
834 for v in values {
835 match v {
836 Value::Temporal(uni_common::TemporalValue::Time {
837 nanos_since_midnight,
838 offset_seconds,
839 }) => {
840 nanos_builder.append_value(*nanos_since_midnight);
841 offset_builder.append_value(*offset_seconds);
842 null_buffer.append(true);
843 }
844 Value::Temporal(uni_common::TemporalValue::LocalTime {
845 nanos_since_midnight,
846 }) => {
847 nanos_builder.append_value(*nanos_since_midnight);
848 offset_builder.append_null();
849 null_buffer.append(true);
850 }
851 _ => {
852 nanos_builder.append_null();
853 offset_builder.append_null();
854 null_buffer.append(false);
855 }
856 }
857 }
858
859 let struct_arr = StructArray::new(
860 schema::time_struct_fields(),
861 vec![
862 Arc::new(nanos_builder.finish()) as ArrayRef,
863 Arc::new(offset_builder.finish()) as ArrayRef,
864 ],
865 Some(null_buffer.finish().into()),
866 );
867 Arc::new(struct_arr)
868}
869
870fn values_to_large_binary_array(values: &[Value]) -> ArrayRef {
871 let mut builder =
872 arrow_array::builder::LargeBinaryBuilder::with_capacity(values.len(), values.len() * 64);
873 for v in values {
874 if v.is_null() {
875 builder.append_null();
876 } else {
877 let cv_bytes = uni_common::cypher_value_codec::encode(v);
879 builder.append_value(&cv_bytes);
880 }
881 }
882 Arc::new(builder.finish())
883}
884
885pub fn values_to_array(values: &[Value], dt: &ArrowDataType) -> Result<ArrayRef> {
887 match dt {
888 ArrowDataType::UInt64 => Ok(values_to_uint64_array(values)),
889 ArrowDataType::Int64 => Ok(values_to_int64_array(values)),
890 ArrowDataType::Int32 => Ok(values_to_int32_array(values)),
891 ArrowDataType::Utf8 => Ok(values_to_string_array(values)),
892 ArrowDataType::Boolean => Ok(values_to_bool_array(values)),
893 ArrowDataType::Float32 => Ok(values_to_float32_array(values)),
894 ArrowDataType::Float64 => Ok(values_to_float64_array(values)),
895 ArrowDataType::FixedSizeBinary(size) => values_to_fixed_size_binary_array(values, *size),
896 ArrowDataType::FixedSizeList(inner, size) => {
897 if inner.data_type() == &ArrowDataType::Float32 {
898 Ok(values_to_fixed_size_list_f32_array(values, *size))
899 } else {
900 Err(anyhow!("Unsupported FixedSizeList inner type"))
901 }
902 }
903 ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
904 Ok(values_to_timestamp_array(values, tz.as_ref()))
905 }
906 ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
907 Ok(values_to_timestamp_array(values, tz.as_ref()))
908 }
909 ArrowDataType::Date32 => {
910 let mut builder = Date32Builder::with_capacity(values.len());
911 for v in values {
912 if v.is_null() {
913 builder.append_null();
914 } else if let Value::Temporal(uni_common::TemporalValue::Date {
915 days_since_epoch,
916 }) = v
917 {
918 builder.append_value(*days_since_epoch);
919 } else if let Some(n) = v.as_i64() {
920 builder.append_value(n as i32);
921 } else {
922 builder.append_null();
923 }
924 }
925 Ok(Arc::new(builder.finish()))
926 }
927 ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
928 let mut builder = Time64NanosecondBuilder::with_capacity(values.len());
929 for v in values {
930 if v.is_null() {
931 builder.append_null();
932 } else if let Value::Temporal(tv) = v {
933 match tv {
934 uni_common::TemporalValue::LocalTime {
935 nanos_since_midnight,
936 }
937 | uni_common::TemporalValue::Time {
938 nanos_since_midnight,
939 ..
940 } => builder.append_value(*nanos_since_midnight),
941 _ => builder.append_null(),
942 }
943 } else if let Some(n) = v.as_i64() {
944 builder.append_value(n);
945 } else {
946 builder.append_null();
947 }
948 }
949 Ok(Arc::new(builder.finish()))
950 }
951 ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
952 let mut builder = Time64MicrosecondBuilder::with_capacity(values.len());
953 for v in values {
954 if v.is_null() {
955 builder.append_null();
956 } else if let Value::Temporal(tv) = v {
957 match tv {
958 uni_common::TemporalValue::LocalTime {
959 nanos_since_midnight,
960 }
961 | uni_common::TemporalValue::Time {
962 nanos_since_midnight,
963 ..
964 } => builder.append_value(*nanos_since_midnight / 1_000), _ => builder.append_null(),
966 }
967 } else if let Some(n) = v.as_i64() {
968 builder.append_value(n);
969 } else {
970 builder.append_null();
971 }
972 }
973 Ok(Arc::new(builder.finish()))
974 }
975 ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano) => {
976 let mut builder = IntervalMonthDayNanoBuilder::with_capacity(values.len());
977 for v in values {
978 if v.is_null() {
979 builder.append_null();
980 } else if let Value::Temporal(uni_common::TemporalValue::Duration {
981 months,
982 days,
983 nanos,
984 }) = v
985 {
986 builder.append_value(arrow::datatypes::IntervalMonthDayNano {
987 months: *months as i32,
988 days: *days as i32,
989 nanoseconds: *nanos,
990 });
991 } else {
992 builder.append_null();
993 }
994 }
995 Ok(Arc::new(builder.finish()))
996 }
997 ArrowDataType::Duration(arrow_schema::TimeUnit::Microsecond) => {
998 let mut builder = DurationMicrosecondBuilder::with_capacity(values.len());
999 for v in values {
1000 if v.is_null() {
1001 builder.append_null();
1002 } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1003 months,
1004 days,
1005 nanos,
1006 }) = v
1007 {
1008 let total_micros =
1009 months * 30 * 86_400_000_000i64 + days * 86_400_000_000i64 + nanos / 1_000;
1010 builder.append_value(total_micros);
1011 } else if let Some(n) = v.as_i64() {
1012 builder.append_value(n);
1013 } else {
1014 builder.append_null();
1015 }
1016 }
1017 Ok(Arc::new(builder.finish()))
1018 }
1019 ArrowDataType::LargeBinary => Ok(values_to_large_binary_array(values)),
1020 ArrowDataType::List(field) => {
1021 if field.data_type() == &ArrowDataType::Utf8 {
1022 let mut builder = ListBuilder::new(StringBuilder::new());
1023 for v in values {
1024 if let Value::List(arr) = v {
1025 for item in arr {
1026 if let Some(s) = item.as_str() {
1027 builder.values().append_value(s);
1028 } else {
1029 builder.values().append_null();
1030 }
1031 }
1032 builder.append(true);
1033 } else {
1034 builder.append_null();
1035 }
1036 }
1037 Ok(Arc::new(builder.finish()))
1038 } else {
1039 Err(anyhow!(
1040 "Unsupported List inner type: {:?}",
1041 field.data_type()
1042 ))
1043 }
1044 }
1045 ArrowDataType::Struct(_) if schema::is_datetime_struct(dt) => {
1046 Ok(values_to_datetime_struct_array(values))
1047 }
1048 ArrowDataType::Struct(_) if schema::is_time_struct(dt) => {
1049 Ok(values_to_time_struct_array(values))
1050 }
1051 _ => Err(anyhow!("Unsupported type for conversion: {:?}", dt)),
1052 }
1053}
1054
/// Converts one property's per-row values into an Arrow column, according to the property's
/// declared schema type.
pub struct PropertyExtractor<'a> {
    /// Declared schema type of the property; `build_column` dispatches on it.
    data_type: &'a DataType,
}
1059
1060impl<'a> PropertyExtractor<'a> {
1061 pub fn new(_name: &'a str, data_type: &'a DataType) -> Self {
1062 Self { data_type }
1063 }
1064
1065 pub fn build_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1068 where
1069 F: Fn(usize) -> Option<&'a Value>,
1070 {
1071 match self.data_type {
1072 DataType::String => self.build_string_column(len, deleted, get_props),
1073 DataType::Int32 => self.build_int32_column(len, deleted, get_props),
1074 DataType::Int64 => self.build_int64_column(len, deleted, get_props),
1075 DataType::Float32 => self.build_float32_column(len, deleted, get_props),
1076 DataType::Float64 => self.build_float64_column(len, deleted, get_props),
1077 DataType::Bool => self.build_bool_column(len, deleted, get_props),
1078 DataType::Vector { dimensions } => {
1079 self.build_vector_column(len, deleted, get_props, *dimensions)
1080 }
1081 DataType::CypherValue => self.build_json_column(len, deleted, get_props),
1082 DataType::List(inner) => self.build_list_column(len, deleted, get_props, inner),
1083 DataType::Map(key, value) => self.build_map_column(len, deleted, get_props, key, value),
1084 DataType::Crdt(_) => self.build_crdt_column(len, deleted, get_props),
1085 DataType::DateTime => self.build_datetime_struct_column(len, deleted, get_props),
1086 DataType::Timestamp => self.build_timestamp_column(len, deleted, get_props),
1087 DataType::Date => self.build_date32_column(len, deleted, get_props),
1088 DataType::Time => self.build_time_struct_column(len, deleted, get_props),
1089 DataType::Duration => self.build_duration_column(len, deleted, get_props),
1090 DataType::Btic => self.build_btic_column(len, deleted, get_props),
1091 _ => Err(anyhow!(
1092 "Unsupported data type for arrow conversion: {:?}",
1093 self.data_type
1094 )),
1095 }
1096 }
1097
1098 fn build_string_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1099 where
1100 F: Fn(usize) -> Option<&'a Value>,
1101 {
1102 let mut builder = arrow_array::builder::StringBuilder::with_capacity(len, len * 32);
1103 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1104 let prop = get_props(i);
1105 if let Some(s) = prop.and_then(|v| v.as_str()) {
1106 builder.append_value(s);
1107 } else if let Some(Value::Temporal(tv)) = prop {
1108 builder.append_value(tv.to_string());
1109 } else if is_deleted {
1110 builder.append_value("");
1111 } else {
1112 builder.append_null();
1113 }
1114 }
1115 Ok(Arc::new(builder.finish()))
1116 }
1117
1118 fn build_int32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1119 where
1120 F: Fn(usize) -> Option<&'a Value>,
1121 {
1122 let mut values = Vec::with_capacity(len);
1123 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1124 let val = get_props(i).and_then(|v| v.as_i64()).map(|v| v as i32);
1125 if val.is_none() && is_deleted {
1126 values.push(Some(0));
1127 } else {
1128 values.push(val);
1129 }
1130 }
1131 Ok(Arc::new(Int32Array::from(values)))
1132 }
1133
1134 fn build_int64_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1135 where
1136 F: Fn(usize) -> Option<&'a Value>,
1137 {
1138 let mut values = Vec::with_capacity(len);
1139 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1140 let val = get_props(i).and_then(|v| v.as_i64());
1141 if val.is_none() && is_deleted {
1142 values.push(Some(0));
1143 } else {
1144 values.push(val);
1145 }
1146 }
1147 Ok(Arc::new(Int64Array::from(values)))
1148 }
1149
1150 fn build_timestamp_column<F>(
1151 &self,
1152 len: usize,
1153 deleted: &[bool],
1154 get_props: F,
1155 ) -> Result<ArrayRef>
1156 where
1157 F: Fn(usize) -> Option<&'a Value>,
1158 {
1159 let mut values = Vec::with_capacity(len);
1160 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1161 let val = get_props(i);
1162 let ts = if is_deleted || val.is_none() {
1163 Some(0i64)
1164 } else if let Some(Value::Temporal(tv)) = val {
1165 match tv {
1166 uni_common::TemporalValue::DateTime {
1167 nanos_since_epoch, ..
1168 }
1169 | uni_common::TemporalValue::LocalDateTime {
1170 nanos_since_epoch, ..
1171 } => Some(*nanos_since_epoch),
1172 _ => None,
1173 }
1174 } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1175 Some(v)
1176 } else if let Some(s) = val.and_then(|v| v.as_str()) {
1177 parse_datetime_to_nanos(s)
1178 } else {
1179 None
1180 };
1181
1182 if is_deleted {
1183 values.push(Some(0));
1184 } else {
1185 values.push(ts);
1186 }
1187 }
1188 let arr = TimestampNanosecondArray::from(values).with_timezone("UTC");
1189 Ok(Arc::new(arr))
1190 }
1191
1192 fn build_datetime_struct_column<F>(
1193 &self,
1194 len: usize,
1195 deleted: &[bool],
1196 get_props: F,
1197 ) -> Result<ArrayRef>
1198 where
1199 F: Fn(usize) -> Option<&'a Value>,
1200 {
1201 let values = self.collect_values_or_null(len, deleted, &get_props);
1202 Ok(values_to_datetime_struct_array(&values))
1203 }
1204
1205 fn build_time_struct_column<F>(
1206 &self,
1207 len: usize,
1208 deleted: &[bool],
1209 get_props: F,
1210 ) -> Result<ArrayRef>
1211 where
1212 F: Fn(usize) -> Option<&'a Value>,
1213 {
1214 let values = self.collect_values_or_null(len, deleted, &get_props);
1215 Ok(values_to_time_struct_array(&values))
1216 }
1217
1218 fn collect_values_or_null<F>(&self, len: usize, deleted: &[bool], get_props: &F) -> Vec<Value>
1220 where
1221 F: Fn(usize) -> Option<&'a Value>,
1222 {
1223 deleted
1224 .iter()
1225 .enumerate()
1226 .take(len)
1227 .map(|(i, &is_deleted)| {
1228 if is_deleted {
1229 Value::Null
1230 } else {
1231 get_props(i).cloned().unwrap_or(Value::Null)
1232 }
1233 })
1234 .collect()
1235 }
1236
1237 fn build_date32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1238 where
1239 F: Fn(usize) -> Option<&'a Value>,
1240 {
1241 let mut builder = Date32Builder::with_capacity(len);
1242 let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1243
1244 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1245 let val = get_props(i);
1246 let days = if is_deleted || val.is_none() {
1247 Some(0)
1248 } else if let Some(Value::Temporal(uni_common::TemporalValue::Date {
1249 days_since_epoch,
1250 })) = val
1251 {
1252 Some(*days_since_epoch)
1253 } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1254 Some(v as i32)
1255 } else if let Some(s) = val.and_then(|v| v.as_str()) {
1256 match chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1257 Ok(date) => Some(date.signed_duration_since(epoch).num_days() as i32),
1258 Err(_) => None,
1259 }
1260 } else {
1261 None
1262 };
1263
1264 if is_deleted {
1265 builder.append_value(0);
1266 } else if let Some(v) = days {
1267 builder.append_value(v);
1268 } else {
1269 builder.append_null();
1270 }
1271 }
1272 Ok(Arc::new(builder.finish()))
1273 }
1274
1275 fn build_duration_column<F>(
1276 &self,
1277 len: usize,
1278 deleted: &[bool],
1279 get_props: F,
1280 ) -> Result<ArrayRef>
1281 where
1282 F: Fn(usize) -> Option<&'a Value>,
1283 {
1284 let mut builder = LargeBinaryBuilder::with_capacity(len, len * 32);
1286 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1287 let raw_val = get_props(i);
1288 if let Some(val @ Value::Temporal(uni_common::TemporalValue::Duration { .. })) = raw_val
1289 {
1290 let encoded = uni_common::cypher_value_codec::encode(val);
1291 builder.append_value(&encoded);
1292 } else if is_deleted {
1293 let zero = Value::Temporal(uni_common::TemporalValue::Duration {
1294 months: 0,
1295 days: 0,
1296 nanos: 0,
1297 });
1298 let encoded = uni_common::cypher_value_codec::encode(&zero);
1299 builder.append_value(&encoded);
1300 } else {
1301 builder.append_null();
1302 }
1303 }
1304 Ok(Arc::new(builder.finish()))
1305 }
1306
1307 fn build_btic_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1308 where
1309 F: Fn(usize) -> Option<&'a Value>,
1310 {
1311 const ENCODED_LEN: i32 = 24;
1312 let mut builder = FixedSizeBinaryBuilder::with_capacity(len, ENCODED_LEN);
1313 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1314 let raw_val = get_props(i);
1315 let btic = match raw_val {
1316 Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => Some(
1317 uni_btic::Btic::new(*lo, *hi, *meta)
1318 .map_err(|e| anyhow!("invalid BTIC value: {}", e))?,
1319 ),
1320 Some(Value::String(s)) => Some(
1321 uni_btic::parse::parse_btic_literal(s)
1322 .map_err(|e| anyhow!("BTIC parse error for '{}': {}", s, e))?,
1323 ),
1324 _ => None,
1325 };
1326
1327 if let Some(b) = btic {
1328 builder.append_value(uni_btic::encode::encode(&b))?;
1329 } else if is_deleted {
1330 builder.append_value([0u8; ENCODED_LEN as usize])?;
1331 } else {
1332 builder.append_null();
1333 }
1334 }
1335 Ok(Arc::new(builder.finish()))
1336 }
1337
1338 fn build_float32_column<F>(
1339 &self,
1340 len: usize,
1341 deleted: &[bool],
1342 get_props: F,
1343 ) -> Result<ArrayRef>
1344 where
1345 F: Fn(usize) -> Option<&'a Value>,
1346 {
1347 let mut values = Vec::with_capacity(len);
1348 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1349 let val = get_props(i).and_then(|v| v.as_f64()).map(|v| v as f32);
1350 if val.is_none() && is_deleted {
1351 values.push(Some(0.0));
1352 } else {
1353 values.push(val);
1354 }
1355 }
1356 Ok(Arc::new(Float32Array::from(values)))
1357 }
1358
1359 fn build_float64_column<F>(
1360 &self,
1361 len: usize,
1362 deleted: &[bool],
1363 get_props: F,
1364 ) -> Result<ArrayRef>
1365 where
1366 F: Fn(usize) -> Option<&'a Value>,
1367 {
1368 let mut values = Vec::with_capacity(len);
1369 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1370 let val = get_props(i).and_then(|v| v.as_f64());
1371 if val.is_none() && is_deleted {
1372 values.push(Some(0.0));
1373 } else {
1374 values.push(val);
1375 }
1376 }
1377 Ok(Arc::new(Float64Array::from(values)))
1378 }
1379
1380 fn build_bool_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1381 where
1382 F: Fn(usize) -> Option<&'a Value>,
1383 {
1384 let mut values = Vec::with_capacity(len);
1385 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1386 let val = get_props(i).and_then(|v| v.as_bool());
1387 if val.is_none() && is_deleted {
1388 values.push(Some(false));
1389 } else {
1390 values.push(val);
1391 }
1392 }
1393 Ok(Arc::new(BooleanArray::from(values)))
1394 }
1395
1396 fn build_vector_column<F>(
1397 &self,
1398 len: usize,
1399 deleted: &[bool],
1400 get_props: F,
1401 dimensions: usize,
1402 ) -> Result<ArrayRef>
1403 where
1404 F: Fn(usize) -> Option<&'a Value>,
1405 {
1406 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dimensions as i32);
1407
1408 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1409 let val = get_props(i);
1410 let (values, valid) = extract_vector_f32_values(val, is_deleted, dimensions);
1411 for v in values {
1412 builder.values().append_value(v);
1413 }
1414 builder.append(valid);
1415 }
1416 Ok(Arc::new(builder.finish()))
1417 }
1418
1419 fn build_json_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1420 where
1421 F: Fn(usize) -> Option<&'a Value>,
1422 {
1423 let null_val = Value::Null;
1424 let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(len, len * 64);
1425 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1426 let val = get_props(i);
1427 let uni_val = if val.is_none() && is_deleted {
1428 &null_val
1429 } else {
1430 val.unwrap_or(&null_val)
1431 };
1432 let cv_bytes = uni_common::cypher_value_codec::encode(uni_val);
1434 builder.append_value(&cv_bytes);
1435 }
1436 Ok(Arc::new(builder.finish()))
1437 }
1438
1439 fn build_list_column<F>(
1440 &self,
1441 len: usize,
1442 deleted: &[bool],
1443 get_props: F,
1444 inner: &DataType,
1445 ) -> Result<ArrayRef>
1446 where
1447 F: Fn(usize) -> Option<&'a Value>,
1448 {
1449 match inner {
1450 DataType::String => {
1451 self.build_typed_list(len, deleted, &get_props, StringBuilder::new(), |v, b| {
1452 if let Some(s) = v.as_str() {
1453 b.append_value(s);
1454 } else {
1455 b.append_null();
1456 }
1457 })
1458 }
1459 DataType::Int64 => {
1460 self.build_typed_list(len, deleted, &get_props, Int64Builder::new(), |v, b| {
1461 if let Some(n) = v.as_i64() {
1462 b.append_value(n);
1463 } else {
1464 b.append_null();
1465 }
1466 })
1467 }
1468 DataType::Float64 => {
1469 self.build_typed_list(len, deleted, &get_props, Float64Builder::new(), |v, b| {
1470 if let Some(f) = v.as_f64() {
1471 b.append_value(f);
1472 } else {
1473 b.append_null();
1474 }
1475 })
1476 }
1477 _ => Err(anyhow!("Unsupported inner type for List: {:?}", inner)),
1478 }
1479 }
1480
1481 fn build_typed_list<F, B, A>(
1483 &self,
1484 len: usize,
1485 deleted: &[bool],
1486 get_props: &F,
1487 inner_builder: B,
1488 mut append_value: A,
1489 ) -> Result<ArrayRef>
1490 where
1491 F: Fn(usize) -> Option<&'a Value>,
1492 B: arrow_array::builder::ArrayBuilder,
1493 A: FnMut(&Value, &mut B),
1494 {
1495 let mut builder = ListBuilder::new(inner_builder);
1496 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1497 let val_array = get_props(i).and_then(|v| v.as_array());
1498 if val_array.is_none() && is_deleted {
1499 builder.append_null();
1500 } else if let Some(arr) = val_array {
1501 for v in arr {
1502 append_value(v, builder.values());
1503 }
1504 builder.append(true);
1505 } else {
1506 builder.append_null();
1507 }
1508 }
1509 Ok(Arc::new(builder.finish()))
1510 }
1511
1512 fn build_map_column<F>(
1513 &self,
1514 len: usize,
1515 deleted: &[bool],
1516 get_props: F,
1517 key: &DataType,
1518 value: &DataType,
1519 ) -> Result<ArrayRef>
1520 where
1521 F: Fn(usize) -> Option<&'a Value>,
1522 {
1523 if !matches!(key, DataType::String) {
1524 return Err(anyhow!("Map keys must be String (JSON limitation)"));
1525 }
1526
1527 match value {
1528 DataType::String => self.build_typed_map(
1529 len,
1530 deleted,
1531 &get_props,
1532 StringBuilder::new(),
1533 arrow_schema::DataType::Utf8,
1534 |v, b: &mut StringBuilder| {
1535 if let Some(s) = v.as_str() {
1536 b.append_value(s);
1537 } else {
1538 b.append_null();
1539 }
1540 },
1541 ),
1542 DataType::Int64 => self.build_typed_map(
1543 len,
1544 deleted,
1545 &get_props,
1546 Int64Builder::new(),
1547 arrow_schema::DataType::Int64,
1548 |v, b: &mut Int64Builder| {
1549 if let Some(n) = v.as_i64() {
1550 b.append_value(n);
1551 } else {
1552 b.append_null();
1553 }
1554 },
1555 ),
1556 _ => Err(anyhow!("Unsupported value type for Map: {:?}", value)),
1557 }
1558 }
1559
1560 fn build_typed_map<F, B, A>(
1562 &self,
1563 len: usize,
1564 deleted: &[bool],
1565 get_props: &F,
1566 value_builder: B,
1567 value_arrow_type: arrow_schema::DataType,
1568 mut append_value: A,
1569 ) -> Result<ArrayRef>
1570 where
1571 F: Fn(usize) -> Option<&'a Value>,
1572 B: arrow_array::builder::ArrayBuilder,
1573 A: FnMut(&Value, &mut B),
1574 {
1575 let key_builder = Box::new(StringBuilder::new());
1576 let value_builder = Box::new(value_builder);
1577 let struct_builder = StructBuilder::new(
1578 vec![
1579 Field::new("key", arrow_schema::DataType::Utf8, false),
1580 Field::new("value", value_arrow_type, true),
1581 ],
1582 vec![key_builder, value_builder],
1583 );
1584 let mut builder = ListBuilder::new(struct_builder);
1585
1586 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1587 self.append_map_entry(&mut builder, get_props(i), is_deleted, &mut append_value);
1588 }
1589 Ok(Arc::new(builder.finish()))
1590 }
1591
1592 fn append_map_entry<B, A>(
1594 &self,
1595 builder: &mut ListBuilder<StructBuilder>,
1596 val: Option<&'a Value>,
1597 is_deleted: bool,
1598 append_value: &mut A,
1599 ) where
1600 B: arrow_array::builder::ArrayBuilder,
1601 A: FnMut(&Value, &mut B),
1602 {
1603 let val_obj = val.and_then(|v| v.as_object());
1604 if val_obj.is_none() && is_deleted {
1605 builder.append(false);
1606 } else if let Some(obj) = val_obj {
1607 let struct_b = builder.values();
1608 for (k, v) in obj {
1609 struct_b
1610 .field_builder::<StringBuilder>(0)
1611 .unwrap()
1612 .append_value(k);
1613 let value_b = struct_b.field_builder::<B>(1).unwrap();
1615 append_value(v, value_b);
1616 struct_b.append(true);
1617 }
1618 builder.append(true);
1619 } else {
1620 builder.append(false);
1621 }
1622 }
1623
1624 fn build_crdt_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1625 where
1626 F: Fn(usize) -> Option<&'a Value>,
1627 {
1628 let mut builder = BinaryBuilder::new();
1629 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1630 if is_deleted {
1631 builder.append_null();
1632 continue;
1633 }
1634 if let Some(val) = get_props(i) {
1635 let crdt_result = if let Some(s) = val.as_str() {
1638 serde_json::from_str::<Crdt>(s)
1639 } else {
1640 let json_val: serde_json::Value = val.clone().into();
1642 serde_json::from_value::<Crdt>(json_val)
1643 };
1644
1645 if let Ok(crdt) = crdt_result {
1646 if let Ok(bytes) = crdt.to_msgpack() {
1647 builder.append_value(&bytes);
1648 } else {
1649 builder.append_null();
1650 }
1651 } else {
1652 builder.append_null();
1653 }
1654 } else {
1655 builder.append_null();
1656 }
1657 }
1658 Ok(Arc::new(builder.finish()))
1659 }
1660}
1661
1662pub fn build_edge_column<'a>(
1664 name: &'a str,
1665 data_type: &'a DataType,
1666 len: usize,
1667 get_props: impl Fn(usize) -> Option<&'a Value>,
1668) -> Result<ArrayRef> {
1669 let deleted = vec![false; len];
1671 let extractor = PropertyExtractor::new(name, data_type);
1672 extractor.build_column(len, &deleted, get_props)
1673}
1674
1675#[cfg(test)]
1676mod tests {
1677 use super::*;
1678 use arrow_array::{
1679 Array, DurationMicrosecondArray,
1680 builder::{BinaryBuilder, Time64MicrosecondBuilder, TimestampNanosecondBuilder},
1681 };
1682 use std::collections::HashMap;
1683 use uni_common::TemporalValue;
1684 use uni_crdt::{Crdt, GCounter};
1685
1686 #[test]
1687 fn test_arrow_to_value_string() {
1688 let arr = StringArray::from(vec![Some("hello"), None, Some("world")]);
1689 assert_eq!(
1690 arrow_to_value(&arr, 0, None),
1691 Value::String("hello".to_string())
1692 );
1693 assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1694 assert_eq!(
1695 arrow_to_value(&arr, 2, None),
1696 Value::String("world".to_string())
1697 );
1698 }
1699
1700 #[test]
1701 fn test_arrow_to_value_int64() {
1702 let arr = Int64Array::from(vec![Some(42), None, Some(-10)]);
1703 assert_eq!(arrow_to_value(&arr, 0, None), Value::Int(42));
1704 assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1705 assert_eq!(arrow_to_value(&arr, 2, None), Value::Int(-10));
1706 }
1707
1708 #[test]
1709 #[allow(clippy::approx_constant)]
1710 fn test_arrow_to_value_float64() {
1711 let arr = Float64Array::from(vec![Some(3.14), None]);
1712 assert_eq!(arrow_to_value(&arr, 0, None), Value::Float(3.14));
1713 assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1714 }
1715
1716 #[test]
1717 fn test_arrow_to_value_bool() {
1718 let arr = BooleanArray::from(vec![Some(true), Some(false), None]);
1719 assert_eq!(arrow_to_value(&arr, 0, None), Value::Bool(true));
1720 assert_eq!(arrow_to_value(&arr, 1, None), Value::Bool(false));
1721 assert_eq!(arrow_to_value(&arr, 2, None), Value::Null);
1722 }
1723
1724 #[test]
1725 fn test_values_to_array_int64() {
1726 let values = vec![Value::Int(1), Value::Int(2), Value::Null, Value::Int(4)];
1727 let arr = values_to_array(&values, &ArrowDataType::Int64).unwrap();
1728 assert_eq!(arr.len(), 4);
1729
1730 let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1731 assert_eq!(int_arr.value(0), 1);
1732 assert_eq!(int_arr.value(1), 2);
1733 assert!(int_arr.is_null(2));
1734 assert_eq!(int_arr.value(3), 4);
1735 }
1736
1737 #[test]
1738 fn test_values_to_array_string() {
1739 let values = vec![
1740 Value::String("a".to_string()),
1741 Value::String("b".to_string()),
1742 Value::Null,
1743 ];
1744 let arr = values_to_array(&values, &ArrowDataType::Utf8).unwrap();
1745 assert_eq!(arr.len(), 3);
1746
1747 let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1748 assert_eq!(str_arr.value(0), "a");
1749 assert_eq!(str_arr.value(1), "b");
1750 assert!(str_arr.is_null(2));
1751 }
1752
1753 #[test]
1754 fn test_property_extractor_string() {
1755 let props: Vec<HashMap<String, Value>> = vec![
1756 [("name".to_string(), Value::String("Alice".to_string()))]
1757 .into_iter()
1758 .collect(),
1759 [("name".to_string(), Value::String("Bob".to_string()))]
1760 .into_iter()
1761 .collect(),
1762 HashMap::new(),
1763 ];
1764 let deleted = vec![false, false, true];
1765
1766 let extractor = PropertyExtractor::new("name", &DataType::String);
1767 let arr = extractor
1768 .build_column(3, &deleted, |i| props[i].get("name"))
1769 .unwrap();
1770
1771 let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1772 assert_eq!(str_arr.value(0), "Alice");
1773 assert_eq!(str_arr.value(1), "Bob");
1774 assert_eq!(str_arr.value(2), ""); }
1776
1777 #[test]
1778 fn test_property_extractor_int64() {
1779 let props: Vec<HashMap<String, Value>> = vec![
1780 [("age".to_string(), Value::Int(25))].into_iter().collect(),
1781 [("age".to_string(), Value::Int(30))].into_iter().collect(),
1782 HashMap::new(),
1783 ];
1784 let deleted = vec![false, false, true];
1785
1786 let extractor = PropertyExtractor::new("age", &DataType::Int64);
1787 let arr = extractor
1788 .build_column(3, &deleted, |i| props[i].get("age"))
1789 .unwrap();
1790
1791 let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1792 assert_eq!(int_arr.value(0), 25);
1793 assert_eq!(int_arr.value(1), 30);
1794 assert_eq!(int_arr.value(2), 0); }
1796
1797 #[test]
1798 fn test_arrow_to_value_time64() {
1799 let mut builder = Time64MicrosecondBuilder::new();
1801 builder.append_value(37_845_000_000);
1803 builder.append_value(0);
1805 builder.append_value(86_399_123_456);
1807 builder.append_null();
1808
1809 let arr = builder.finish();
1810 assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "10:30:45");
1812 assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "00:00");
1813 assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "23:59:59.123456");
1814 assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
1815 }
1816
1817 #[test]
1818 fn test_arrow_to_value_duration() {
1819 let arr = DurationMicrosecondArray::from(vec![
1822 Some(1_000_000), Some(3_600_000_000), Some(86_400_000_000), None,
1826 ]);
1827
1828 assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "PT1S");
1829 assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "PT1H");
1830 assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "PT24H");
1831 assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
1832 }
1833
1834 #[test]
1835 fn test_arrow_to_value_binary_crdt() {
1836 let mut builder = BinaryBuilder::new();
1838
1839 let mut counter = GCounter::new();
1841 counter.increment("actor1", 5);
1842 let crdt = Crdt::GCounter(counter);
1843 let bytes = crdt.to_msgpack().unwrap();
1844 builder.append_value(&bytes);
1845
1846 builder.append_null();
1848
1849 let arr = builder.finish();
1850
1851 let result = arrow_to_value(&arr, 0, None);
1853 assert!(result.as_object().is_some());
1854 let obj = result.as_object().unwrap();
1855 assert_eq!(obj.get("t"), Some(&Value::String("gc".to_string())));
1857
1858 assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1860 }
1861
1862 #[test]
1863 fn test_datetime_struct_encode_decode_roundtrip() {
1864 let values = vec![
1866 Value::Temporal(TemporalValue::DateTime {
1867 nanos_since_epoch: 441763200000000000, offset_seconds: 3600, timezone_name: Some("Europe/Paris".to_string()),
1870 }),
1871 Value::Temporal(TemporalValue::DateTime {
1872 nanos_since_epoch: 1704067200000000000, offset_seconds: -18000, timezone_name: None,
1875 }),
1876 Value::Temporal(TemporalValue::DateTime {
1877 nanos_since_epoch: 0, offset_seconds: 0,
1879 timezone_name: Some("UTC".to_string()),
1880 }),
1881 ];
1882
1883 let arr_ref = values_to_datetime_struct_array(&values);
1885 let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
1886 assert_eq!(arr.len(), 3);
1887
1888 let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
1890 let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
1891 let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
1892
1893 assert_eq!(decoded_0, values[0]);
1895 assert_eq!(decoded_1, values[1]);
1896 assert_eq!(decoded_2, values[2]);
1897
1898 if let Value::Temporal(TemporalValue::DateTime {
1900 nanos_since_epoch,
1901 offset_seconds,
1902 timezone_name,
1903 }) = decoded_0
1904 {
1905 assert_eq!(nanos_since_epoch, 441763200000000000);
1906 assert_eq!(offset_seconds, 3600);
1907 assert_eq!(timezone_name, Some("Europe/Paris".to_string()));
1908 } else {
1909 panic!("Expected DateTime value");
1910 }
1911 }
1912
1913 #[test]
1914 fn test_datetime_struct_null_handling() {
1915 let values = vec![
1917 Value::Temporal(TemporalValue::DateTime {
1918 nanos_since_epoch: 441763200000000000,
1919 offset_seconds: 3600,
1920 timezone_name: Some("Europe/Paris".to_string()),
1921 }),
1922 Value::Null,
1923 Value::Temporal(TemporalValue::DateTime {
1924 nanos_since_epoch: 0,
1925 offset_seconds: 0,
1926 timezone_name: None,
1927 }),
1928 ];
1929
1930 let arr_ref = values_to_datetime_struct_array(&values);
1931 let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
1932 assert_eq!(arr.len(), 3);
1933
1934 let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
1936 assert_eq!(decoded_0, values[0]);
1937
1938 assert!(arr.is_null(1));
1940 let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
1941 assert_eq!(decoded_1, Value::Null);
1942
1943 let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
1945 assert_eq!(decoded_2, values[2]);
1946 }
1947
1948 #[test]
1949 fn test_datetime_struct_boundary_values() {
1950 let values = vec![
1952 Value::Temporal(TemporalValue::DateTime {
1953 nanos_since_epoch: 441763200000000000,
1954 offset_seconds: 0, timezone_name: None,
1956 }),
1957 Value::Temporal(TemporalValue::DateTime {
1958 nanos_since_epoch: 441763200000000000,
1959 offset_seconds: 43200, timezone_name: None,
1961 }),
1962 Value::Temporal(TemporalValue::DateTime {
1963 nanos_since_epoch: 441763200000000000,
1964 offset_seconds: -43200, timezone_name: None,
1966 }),
1967 ];
1968
1969 let arr_ref = values_to_datetime_struct_array(&values);
1970 let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
1971 assert_eq!(arr.len(), 3);
1972
1973 for (i, expected) in values.iter().enumerate() {
1975 let decoded = arrow_to_value(arr_ref.as_ref(), i, Some(&DataType::DateTime));
1976 assert_eq!(&decoded, expected);
1977 }
1978 }
1979
1980 #[test]
1981 fn test_datetime_old_schema_migration() {
1982 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
1984 builder.append_value(441763200000000000); builder.append_value(1704067200000000000); builder.append_null();
1987
1988 let arr = builder.finish();
1989
1990 let decoded_0 = arrow_to_value(&arr, 0, Some(&DataType::DateTime));
1992 let _decoded_1 = arrow_to_value(&arr, 1, Some(&DataType::DateTime));
1993 let decoded_2 = arrow_to_value(&arr, 2, Some(&DataType::DateTime));
1994
1995 if let Value::Temporal(TemporalValue::DateTime {
1997 nanos_since_epoch,
1998 offset_seconds,
1999 timezone_name,
2000 }) = decoded_0
2001 {
2002 assert_eq!(nanos_since_epoch, 441763200000000000);
2003 assert_eq!(offset_seconds, 0);
2004 assert_eq!(timezone_name, Some("UTC".to_string()));
2005 } else {
2006 panic!("Expected DateTime value");
2007 }
2008
2009 assert_eq!(decoded_2, Value::Null);
2011 }
2012
2013 #[test]
2014 fn test_time_struct_encode_decode_roundtrip() {
2015 let values = vec![
2017 Value::Temporal(TemporalValue::Time {
2018 nanos_since_midnight: 37845000000000, offset_seconds: 3600, }),
2021 Value::Temporal(TemporalValue::Time {
2022 nanos_since_midnight: 0, offset_seconds: 0,
2024 }),
2025 Value::Temporal(TemporalValue::Time {
2026 nanos_since_midnight: 86399999999999, offset_seconds: -18000, }),
2029 ];
2030
2031 let arr_ref = values_to_time_struct_array(&values);
2033 let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2034 assert_eq!(arr.len(), 3);
2035
2036 let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2038 let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2039 let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2040
2041 assert_eq!(decoded_0, values[0]);
2043 assert_eq!(decoded_1, values[1]);
2044 assert_eq!(decoded_2, values[2]);
2045
2046 if let Value::Temporal(TemporalValue::Time {
2048 nanos_since_midnight,
2049 offset_seconds,
2050 }) = decoded_0
2051 {
2052 assert_eq!(nanos_since_midnight, 37845000000000);
2053 assert_eq!(offset_seconds, 3600);
2054 } else {
2055 panic!("Expected Time value");
2056 }
2057 }
2058
2059 #[test]
2060 fn test_time_struct_null_handling() {
2061 let values = vec![
2063 Value::Temporal(TemporalValue::Time {
2064 nanos_since_midnight: 37845000000000,
2065 offset_seconds: 3600,
2066 }),
2067 Value::Null,
2068 Value::Temporal(TemporalValue::Time {
2069 nanos_since_midnight: 0,
2070 offset_seconds: 0,
2071 }),
2072 ];
2073
2074 let arr_ref = values_to_time_struct_array(&values);
2075 let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2076 assert_eq!(arr.len(), 3);
2077
2078 let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2080 assert_eq!(decoded_0, values[0]);
2081
2082 assert!(arr.is_null(1));
2084 let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2085 assert_eq!(decoded_1, Value::Null);
2086
2087 let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2089 assert_eq!(decoded_2, values[2]);
2090 }
2091
2092 #[test]
2095 fn test_extract_vector_f32_values_valid_vector() {
2096 let v = vec![1.0, 2.0, 3.0];
2097 let val = Value::Vector(v.clone());
2098 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2099 assert_eq!(result, v);
2100 assert!(valid);
2101 }
2102
2103 #[test]
2104 fn test_extract_vector_f32_values_vector_wrong_dims() {
2105 let v = vec![1.0, 2.0];
2106 let val = Value::Vector(v);
2107 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2108 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2109 assert!(!valid);
2110 }
2111
2112 #[test]
2113 fn test_extract_vector_f32_values_valid_list() {
2114 let v = vec![Value::Float(1.0), Value::Float(2.0), Value::Float(3.0)];
2115 let val = Value::List(v);
2116 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2117 assert_eq!(result, vec![1.0, 2.0, 3.0]);
2118 assert!(valid);
2119 }
2120
2121 #[test]
2122 fn test_extract_vector_f32_values_list_wrong_dims() {
2123 let v = vec![Value::Float(1.0), Value::Float(2.0)];
2124 let val = Value::List(v);
2125 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2126 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2127 assert!(!valid);
2128 }
2129
2130 #[test]
2131 fn test_extract_vector_f32_values_list_int_coercion() {
2132 let v = vec![Value::Int(1), Value::Int(2), Value::Int(3)];
2133 let val = Value::List(v);
2134 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2135 assert_eq!(result, vec![1.0, 2.0, 3.0]);
2136 assert!(valid);
2137 }
2138
2139 #[test]
2140 fn test_extract_vector_f32_values_none() {
2141 let (result, valid) = extract_vector_f32_values(None, false, 3);
2142 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2143 assert!(!valid);
2144 }
2145
2146 #[test]
2147 fn test_extract_vector_f32_values_null() {
2148 let val = Value::Null;
2149 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2150 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2151 assert!(!valid);
2152 }
2153
2154 #[test]
2155 fn test_extract_vector_f32_values_unsupported_type() {
2156 let val = Value::String("not a vector".to_string());
2157 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2158 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2159 assert!(!valid);
2160 }
2161
2162 #[test]
2163 fn test_extract_vector_f32_values_deleted_with_none() {
2164 let (result, valid) = extract_vector_f32_values(None, true, 3);
2165 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2166 assert!(valid); }
2168
2169 #[test]
2170 fn test_extract_vector_f32_values_deleted_with_null() {
2171 let val = Value::Null;
2172 let (result, valid) = extract_vector_f32_values(Some(&val), true, 3);
2173 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2174 assert!(valid); }
2176
2177 #[test]
2180 fn test_values_to_fixed_size_list_vector_with_nulls() {
2181 let values = vec![
2182 Value::Vector(vec![1.0, 2.0]),
2183 Value::Null,
2184 Value::Vector(vec![3.0, 4.0]),
2185 Value::String("invalid".to_string()),
2186 ];
2187 let arr_ref = values_to_array(
2188 &values,
2189 &ArrowDataType::FixedSizeList(
2190 Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2191 2,
2192 ),
2193 )
2194 .unwrap();
2195
2196 let arr = arr_ref
2197 .as_any()
2198 .downcast_ref::<FixedSizeListArray>()
2199 .unwrap();
2200
2201 assert_eq!(arr.len(), 4);
2202 assert!(arr.is_valid(0));
2203 assert!(!arr.is_valid(1)); assert!(arr.is_valid(2));
2205 assert!(!arr.is_valid(3)); }
2207
2208 #[test]
2209 fn test_values_to_fixed_size_list_from_list() {
2210 let values = vec![
2211 Value::List(vec![Value::Float(1.0), Value::Float(2.0)]),
2212 Value::List(vec![Value::Int(3), Value::Int(4)]),
2213 ];
2214 let arr_ref = values_to_array(
2215 &values,
2216 &ArrowDataType::FixedSizeList(
2217 Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2218 2,
2219 ),
2220 )
2221 .unwrap();
2222
2223 let arr = arr_ref
2224 .as_any()
2225 .downcast_ref::<FixedSizeListArray>()
2226 .unwrap();
2227
2228 assert_eq!(arr.len(), 2);
2229 assert!(arr.is_valid(0));
2230 assert!(arr.is_valid(1));
2231
2232 let child = arr
2234 .values()
2235 .as_any()
2236 .downcast_ref::<Float32Array>()
2237 .unwrap();
2238 assert_eq!(child.value(0), 1.0);
2239 assert_eq!(child.value(1), 2.0);
2240 assert_eq!(child.value(2), 3.0);
2241 assert_eq!(child.value(3), 4.0);
2242 }
2243
2244 #[test]
2245 fn test_values_to_fixed_size_list_wrong_dimensions() {
2246 let values = vec![
2247 Value::Vector(vec![1.0, 2.0, 3.0]), Value::List(vec![Value::Float(4.0)]), ];
2250 let arr_ref = values_to_array(
2251 &values,
2252 &ArrowDataType::FixedSizeList(
2253 Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2254 2,
2255 ),
2256 )
2257 .unwrap();
2258
2259 let arr = arr_ref
2260 .as_any()
2261 .downcast_ref::<FixedSizeListArray>()
2262 .unwrap();
2263
2264 assert_eq!(arr.len(), 2);
2265 assert!(!arr.is_valid(0)); assert!(!arr.is_valid(1)); let child = arr
2270 .values()
2271 .as_any()
2272 .downcast_ref::<Float32Array>()
2273 .unwrap();
2274 assert_eq!(child.value(0), 0.0);
2275 assert_eq!(child.value(1), 0.0);
2276 assert_eq!(child.value(2), 0.0);
2277 assert_eq!(child.value(3), 0.0);
2278 }
2279
#[test]
fn test_values_to_fixed_size_list_all_nulls() {
    // Three null inputs converted into a 3-wide Float32 fixed-size list.
    let nulls = vec![Value::Null, Value::Null, Value::Null];
    let list_type = ArrowDataType::FixedSizeList(
        Arc::new(Field::new("item", ArrowDataType::Float32, false)),
        3,
    );

    let result = values_to_array(&nulls, &list_type).unwrap();
    let fsl = result
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(fsl.len(), 3);
    assert!((0..3).all(|row| !fsl.is_valid(row)));

    // Even with every row null, the child array still holds
    // 3 slots per row (3 rows x 3 dims).
    let child_len = fsl
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap()
        .len();
    assert_eq!(child_len, 9);
}
2310
#[test]
fn test_values_to_fixed_size_list_mixed_types() {
    let inputs = vec![
        Value::Vector(vec![1.0, 2.0]),
        Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
        Value::Null,
        Value::String("invalid".to_string()),
    ];
    let dt = ArrowDataType::FixedSizeList(
        Arc::new(Field::new("item", ArrowDataType::Float32, false)),
        2,
    );

    let out = values_to_array(&inputs, &dt).unwrap();
    let fsl = out
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(fsl.len(), 4);
    // Vector and float List inputs convert; Null and a String do not.
    let expected_validity = [true, true, false, false];
    for (row, &valid) in expected_validity.iter().enumerate() {
        assert_eq!(fsl.is_valid(row), valid);
    }

    // The first two rows are laid out back to back in the child buffer.
    let floats = fsl
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    let expected: [f32; 4] = [1.0, 2.0, 3.0, 4.0];
    for (idx, &want) in expected.iter().enumerate() {
        assert_eq!(floats.value(idx), want);
    }
}
2350
#[test]
fn test_build_vector_column_with_nulls_and_deleted() {
    let data_type = DataType::Vector { dimensions: 3 };
    let extractor = PropertyExtractor::new("test_vec", &data_type);

    // Row layout: a real vector, a missing property, an explicit null,
    // and a real vector whose row is flagged as deleted.
    let props = [
        Some(Value::Vector(vec![1.0, 2.0, 3.0])),
        None,
        Some(Value::Null),
        Some(Value::Vector(vec![4.0, 5.0, 6.0])),
    ];
    let deleted = [false, false, false, true];

    let column = extractor
        .build_vector_column(4, &deleted, |row| props[row].as_ref(), 3)
        .unwrap();
    let fsl = column
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(fsl.len(), 4);
    // Missing and null properties yield null rows; the deleted row is
    // still reported as valid.
    let validity: Vec<bool> = (0..4).map(|row| fsl.is_valid(row)).collect();
    assert_eq!(validity, [true, false, false, true]);

    let floats = fsl
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    // Row 0 keeps its data.
    assert_eq!(floats.value(0), 1.0);
    assert_eq!(floats.value(1), 2.0);
    assert_eq!(floats.value(2), 3.0);
    // Row 3 (deleted) is expected to be zero-filled instead of [4.0, 5.0, 6.0].
    for idx in 9..12 {
        assert_eq!(floats.value(idx), 0.0);
    }
}
2397
#[test]
fn test_build_vector_column_with_list_input() {
    let data_type = DataType::Vector { dimensions: 2 };
    let extractor = PropertyExtractor::new("test_vec", &data_type);

    // Float lists, integer lists and native vectors should all be accepted.
    let props = [
        Some(Value::List(vec![Value::Float(1.0), Value::Float(2.0)])),
        Some(Value::List(vec![Value::Int(3), Value::Int(4)])),
        Some(Value::Vector(vec![5.0, 6.0])),
    ];
    let deleted = [false; 3];

    let column = extractor
        .build_vector_column(3, &deleted, |row| props[row].as_ref(), 2)
        .unwrap();
    let fsl = column
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(fsl.len(), 3);
    assert!((0..3).all(|row| fsl.is_valid(row)));

    let floats = fsl
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    // Rows are laid out back to back in the child buffer: [1, 2], [3, 4], [5, 6].
    let expected: [f32; 6] = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0];
    for (idx, &want) in expected.iter().enumerate() {
        assert_eq!(floats.value(idx), want);
    }
}
2437}