1use anyhow::{Result, anyhow};
11use arrow_array::builder::{
12 BinaryBuilder, BooleanBufferBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
13 FixedSizeBinaryBuilder, FixedSizeListBuilder, Float32Builder, Float64Builder, Int32Builder,
14 Int64Builder, IntervalMonthDayNanoBuilder, LargeBinaryBuilder, ListBuilder, StringBuilder,
15 StructBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder,
16 UInt64Builder,
17};
18use arrow_array::{
19 Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array,
20 Float64Array, Int32Array, Int64Array, IntervalMonthDayNanoArray, LargeBinaryArray, ListArray,
21 StringArray, StructArray, Time64NanosecondArray, TimestampNanosecondArray, UInt64Array,
22};
23use arrow_schema::{DataType as ArrowDataType, Field};
24use std::collections::HashMap;
25use std::sync::Arc;
26use uni_common::DataType;
27use uni_common::Value;
28use uni_common::core::id::{Eid, Vid};
29use uni_common::core::schema;
30use uni_crdt::Crdt;
31
32fn build_timestamp_column_from_id_map<K, I>(
37 ids: I,
38 timestamps: Option<&HashMap<K, i64>>,
39) -> ArrayRef
40where
41 K: Eq + std::hash::Hash,
42 I: IntoIterator<Item = K>,
43{
44 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
45 for id in ids {
46 match timestamps.and_then(|m| m.get(&id)) {
47 Some(&ts) => builder.append_value(ts),
48 None => builder.append_null(),
49 }
50 }
51 Arc::new(builder.finish())
52}
53
54pub fn build_timestamp_column_from_vid_map<I>(
55 ids: I,
56 timestamps: Option<&HashMap<Vid, i64>>,
57) -> ArrayRef
58where
59 I: IntoIterator<Item = Vid>,
60{
61 build_timestamp_column_from_id_map(ids, timestamps)
62}
63
64pub fn build_timestamp_column_from_eid_map<I>(
65 ids: I,
66 timestamps: Option<&HashMap<Eid, i64>>,
67) -> ArrayRef
68where
69 I: IntoIterator<Item = Eid>,
70{
71 build_timestamp_column_from_id_map(ids, timestamps)
72}
73
74pub fn build_timestamp_column<I>(timestamps: I) -> ArrayRef
78where
79 I: IntoIterator<Item = Option<i64>>,
80{
81 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
82 for ts in timestamps {
83 builder.append_option(ts);
84 }
85 Arc::new(builder.finish())
86}
87
88pub fn labels_from_list_array(list_arr: &ListArray, row: usize) -> Vec<String> {
94 if list_arr.is_null(row) {
95 return Vec::new();
96 }
97 let values = list_arr.value(row);
98 let Some(str_arr) = values.as_any().downcast_ref::<StringArray>() else {
99 return Vec::new();
100 };
101 (0..str_arr.len())
102 .filter(|&j| !str_arr.is_null(j))
103 .map(|j| str_arr.value(j).to_string())
104 .collect()
105}
106
107fn parse_datetime_to_nanos(s: &str) -> Option<i64> {
112 chrono::DateTime::parse_from_rfc3339(s)
113 .map(|dt| {
114 dt.with_timezone(&chrono::Utc)
115 .timestamp_nanos_opt()
116 .unwrap_or(0)
117 })
118 .or_else(|_| {
119 chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
120 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
121 })
122 .or_else(|_| {
123 chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ")
124 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
125 })
126 .or_else(|_| {
127 chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M%:z").map(|dt| {
128 dt.with_timezone(&chrono::Utc)
129 .timestamp_nanos_opt()
130 .unwrap_or(0)
131 })
132 })
133 .ok()
134 .or_else(|| {
135 s.strip_suffix('Z')
136 .and_then(|base| chrono::NaiveDateTime::parse_from_str(base, "%Y-%m-%dT%H:%M").ok())
137 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
138 })
139}
140
141fn try_reconstruct_map(arr: &ArrayRef) -> Option<HashMap<String, Value>> {
147 let structs = arr.as_any().downcast_ref::<StructArray>()?;
148 let fields = structs.fields();
149 if fields.len() != 2 || fields[0].name() != "key" || fields[1].name() != "value" {
150 return None;
151 }
152 let key_col = structs.column(0);
153 let val_col = structs.column(1);
154 let mut map = HashMap::new();
155 for i in 0..structs.len() {
156 if let Value::String(k) = arrow_to_value(key_col.as_ref(), i, None) {
157 map.insert(k, arrow_to_value(val_col.as_ref(), i, None));
158 }
159 }
160 Some(map)
161}
162
163fn array_to_value_list(arr: &ArrayRef) -> Vec<Value> {
165 (0..arr.len())
166 .map(|i| arrow_to_value(arr.as_ref(), i, None))
167 .collect()
168}
169
170pub fn arrow_to_value(col: &dyn Array, row: usize, data_type: Option<&DataType>) -> Value {
177 if col.is_null(row) {
178 return Value::Null;
179 }
180
181 if let Some(dt) = data_type {
183 match dt {
184 DataType::DateTime => {
185 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
187 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
188 struct_arr.column_by_name("nanos_since_epoch"),
189 struct_arr.column_by_name("offset_seconds"),
190 struct_arr.column_by_name("timezone_name"),
191 )
192 && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
193 nanos_col
194 .as_any()
195 .downcast_ref::<TimestampNanosecondArray>(),
196 offset_col.as_any().downcast_ref::<Int32Array>(),
197 tz_col.as_any().downcast_ref::<StringArray>(),
198 )
199 {
200 if nanos_arr.is_null(row) {
201 return Value::Null;
202 }
203 let nanos = nanos_arr.value(row);
204 if offset_arr.is_null(row) {
205 return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
207 nanos_since_epoch: nanos,
208 });
209 }
210 let offset = offset_arr.value(row);
211 let tz_name = (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
212 return Value::Temporal(uni_common::TemporalValue::DateTime {
213 nanos_since_epoch: nanos,
214 offset_seconds: offset,
215 timezone_name: tz_name,
216 });
217 }
218 if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
220 let nanos = ts.value(row);
221 let tz_name = ts.timezone().map(|s| s.to_string());
222 return Value::Temporal(uni_common::TemporalValue::DateTime {
223 nanos_since_epoch: nanos,
224 offset_seconds: 0,
225 timezone_name: tz_name,
226 });
227 }
228 }
229 DataType::Time => {
230 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
232 && let (Some(nanos_col), Some(offset_col)) = (
233 struct_arr.column_by_name("nanos_since_midnight"),
234 struct_arr.column_by_name("offset_seconds"),
235 )
236 && let (Some(nanos_arr), Some(offset_arr)) = (
237 nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
238 offset_col.as_any().downcast_ref::<Int32Array>(),
239 )
240 {
241 if nanos_arr.is_null(row) || offset_arr.is_null(row) {
243 return Value::Null;
244 }
245 let nanos = nanos_arr.value(row);
246 let offset = offset_arr.value(row);
247 return Value::Temporal(uni_common::TemporalValue::Time {
248 nanos_since_midnight: nanos,
249 offset_seconds: offset,
250 });
251 }
252 if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
254 let nanos = t.value(row);
255 return Value::Temporal(uni_common::TemporalValue::Time {
256 nanos_since_midnight: nanos,
257 offset_seconds: 0,
258 });
259 }
260 }
261 _ => {}
262 }
263 }
264
265 if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
267 return Value::String(s.value(row).to_string());
268 }
269
270 if let Some(u) = col.as_any().downcast_ref::<UInt64Array>() {
272 return Value::Int(u.value(row) as i64);
273 }
274 if let Some(i) = col.as_any().downcast_ref::<Int64Array>() {
275 return Value::Int(i.value(row));
276 }
277 if let Some(i) = col.as_any().downcast_ref::<Int32Array>() {
278 return Value::Int(i.value(row) as i64);
279 }
280
281 if let Some(f) = col.as_any().downcast_ref::<Float64Array>() {
283 return Value::Float(f.value(row));
284 }
285 if let Some(f) = col.as_any().downcast_ref::<Float32Array>() {
286 return Value::Float(f.value(row) as f64);
287 }
288
289 if let Some(b) = col.as_any().downcast_ref::<BooleanArray>() {
291 return Value::Bool(b.value(row));
292 }
293
294 if let Some(list) = col.as_any().downcast_ref::<FixedSizeListArray>() {
296 return Value::List(array_to_value_list(&list.value(row)));
297 }
298
299 if let Some(list) = col.as_any().downcast_ref::<ListArray>() {
301 let arr = list.value(row);
302
303 if let Some(obj) = try_reconstruct_map(&arr) {
305 return Value::Map(obj);
306 }
307
308 return Value::List(array_to_value_list(&arr));
309 }
310
311 if let Some(list) = col.as_any().downcast_ref::<arrow_array::LargeListArray>() {
313 return Value::List(array_to_value_list(&list.value(row)));
314 }
315
316 if let Some(s) = col.as_any().downcast_ref::<StructArray>() {
318 let field_names: Vec<&str> = s.fields().iter().map(|f| f.name().as_str()).collect();
319
320 if field_names.contains(&"nanos_since_epoch")
322 && field_names.contains(&"offset_seconds")
323 && field_names.contains(&"timezone_name")
324 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
325 s.column_by_name("nanos_since_epoch"),
326 s.column_by_name("offset_seconds"),
327 s.column_by_name("timezone_name"),
328 )
329 {
330 let nanos_opt = nanos_col
332 .as_any()
333 .downcast_ref::<TimestampNanosecondArray>()
334 .map(|a| {
335 if a.is_null(row) {
336 None
337 } else {
338 Some(a.value(row))
339 }
340 })
341 .or_else(|| {
342 nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
343 if a.is_null(row) {
344 None
345 } else {
346 Some(a.value(row))
347 }
348 })
349 });
350 let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
351 if a.is_null(row) {
352 None
353 } else {
354 Some(a.value(row))
355 }
356 });
357
358 if let Some(Some(nanos)) = nanos_opt {
359 match offset_opt {
360 Some(Some(offset)) => {
361 let tz_name = tz_col.as_any().downcast_ref::<StringArray>().and_then(|a| {
362 if a.is_null(row) {
363 None
364 } else {
365 Some(a.value(row).to_string())
366 }
367 });
368 return Value::Temporal(uni_common::TemporalValue::DateTime {
369 nanos_since_epoch: nanos,
370 offset_seconds: offset,
371 timezone_name: tz_name,
372 });
373 }
374 _ => {
375 return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
377 nanos_since_epoch: nanos,
378 });
379 }
380 }
381 }
382 }
383
384 if field_names.contains(&"nanos_since_midnight")
386 && field_names.contains(&"offset_seconds")
387 && let (Some(nanos_col), Some(offset_col)) = (
388 s.column_by_name("nanos_since_midnight"),
389 s.column_by_name("offset_seconds"),
390 )
391 {
392 let nanos_opt = nanos_col
394 .as_any()
395 .downcast_ref::<Time64NanosecondArray>()
396 .map(|a| {
397 if a.is_null(row) {
398 None
399 } else {
400 Some(a.value(row))
401 }
402 })
403 .or_else(|| {
404 nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
405 if a.is_null(row) {
406 None
407 } else {
408 Some(a.value(row))
409 }
410 })
411 });
412 let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
413 if a.is_null(row) {
414 None
415 } else {
416 Some(a.value(row))
417 }
418 });
419
420 if let (Some(Some(nanos)), Some(Some(offset))) = (nanos_opt, offset_opt) {
421 return Value::Temporal(uni_common::TemporalValue::Time {
422 nanos_since_midnight: nanos,
423 offset_seconds: offset,
424 });
425 }
426 }
427
428 let mut map = HashMap::new();
430 for (field, child) in s.fields().iter().zip(s.columns()) {
431 map.insert(
432 field.name().clone(),
433 arrow_to_value(child.as_ref(), row, None),
434 );
435 }
436 return Value::Map(map);
437 }
438
439 if let Some(d) = col.as_any().downcast_ref::<Date32Array>() {
441 let days = d.value(row);
442 return Value::Temporal(uni_common::TemporalValue::Date {
443 days_since_epoch: days,
444 });
445 }
446
447 if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
449 let nanos = ts.value(row);
450 return match ts.timezone() {
451 Some(tz) => Value::Temporal(uni_common::TemporalValue::DateTime {
452 nanos_since_epoch: nanos,
453 offset_seconds: 0,
454 timezone_name: Some(tz.to_string()),
455 }),
456 None => Value::Temporal(uni_common::TemporalValue::LocalDateTime {
457 nanos_since_epoch: nanos,
458 }),
459 };
460 }
461
462 if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
464 let nanos = t.value(row);
465 return Value::Temporal(uni_common::TemporalValue::LocalTime {
466 nanos_since_midnight: nanos,
467 });
468 }
469
470 if let Some(t) = col
472 .as_any()
473 .downcast_ref::<arrow_array::Time64MicrosecondArray>()
474 {
475 let micros = t.value(row);
476 return Value::Temporal(uni_common::TemporalValue::LocalTime {
477 nanos_since_midnight: micros * 1000,
478 });
479 }
480
481 if let Some(d) = col
483 .as_any()
484 .downcast_ref::<arrow_array::DurationMicrosecondArray>()
485 {
486 let micros = d.value(row);
487 let total_nanos = micros * 1000;
488 let seconds = total_nanos / 1_000_000_000;
489 let remaining_nanos = total_nanos % 1_000_000_000;
490 return Value::Temporal(uni_common::TemporalValue::Duration {
491 months: 0,
492 days: 0,
493 nanos: seconds * 1_000_000_000 + remaining_nanos,
494 });
495 }
496
497 if let Some(interval) = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>() {
499 let val = interval.value(row);
500 return Value::Temporal(uni_common::TemporalValue::Duration {
501 months: val.months as i64,
502 days: val.days as i64,
503 nanos: val.nanoseconds,
504 });
505 }
506
507 if let Some(b) = col.as_any().downcast_ref::<LargeBinaryArray>() {
509 let bytes = b.value(row);
510 if bytes.is_empty() {
511 return Value::Null;
512 }
513 return uni_common::cypher_value_codec::decode(bytes).unwrap_or_else(|e| {
514 eprintln!("CypherValue decode error: {}", e);
515 Value::Null
516 });
517 }
518
519 if let Some(b) = col.as_any().downcast_ref::<BinaryArray>() {
521 let bytes = b.value(row);
522 return Crdt::from_msgpack(bytes)
523 .ok()
524 .and_then(|crdt| serde_json::to_value(&crdt).ok())
525 .map(Value::from)
526 .unwrap_or(Value::Null);
527 }
528
529 Value::Null
531}
532
533fn values_to_uint64_array(values: &[Value]) -> ArrayRef {
534 let mut builder = UInt64Builder::with_capacity(values.len());
535 for v in values {
536 if let Some(n) = v.as_u64() {
537 builder.append_value(n);
538 } else {
539 builder.append_null();
540 }
541 }
542 Arc::new(builder.finish())
543}
544
545fn values_to_int64_array(values: &[Value]) -> ArrayRef {
546 let mut builder = Int64Builder::with_capacity(values.len());
547 for v in values {
548 if let Some(n) = v.as_i64() {
549 builder.append_value(n);
550 } else {
551 builder.append_null();
552 }
553 }
554 Arc::new(builder.finish())
555}
556
557fn values_to_int32_array(values: &[Value]) -> ArrayRef {
558 let mut builder = Int32Builder::with_capacity(values.len());
559 for v in values {
560 if let Some(n) = v.as_i64() {
561 builder.append_value(n as i32);
562 } else {
563 builder.append_null();
564 }
565 }
566 Arc::new(builder.finish())
567}
568
569fn values_to_string_array(values: &[Value]) -> ArrayRef {
570 let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 10);
571 for v in values {
572 if let Some(s) = v.as_str() {
573 builder.append_value(s);
574 } else if v.is_null() {
575 builder.append_null();
576 } else {
577 builder.append_value(v.to_string());
578 }
579 }
580 Arc::new(builder.finish())
581}
582
583fn values_to_bool_array(values: &[Value]) -> ArrayRef {
584 let mut builder = BooleanBuilder::with_capacity(values.len());
585 for v in values {
586 if let Some(b) = v.as_bool() {
587 builder.append_value(b);
588 } else {
589 builder.append_null();
590 }
591 }
592 Arc::new(builder.finish())
593}
594
595fn values_to_float32_array(values: &[Value]) -> ArrayRef {
596 let mut builder = Float32Builder::with_capacity(values.len());
597 for v in values {
598 if let Some(n) = v.as_f64() {
599 builder.append_value(n as f32);
600 } else {
601 builder.append_null();
602 }
603 }
604 Arc::new(builder.finish())
605}
606
607fn values_to_float64_array(values: &[Value]) -> ArrayRef {
608 let mut builder = Float64Builder::with_capacity(values.len());
609 for v in values {
610 if let Some(n) = v.as_f64() {
611 builder.append_value(n);
612 } else {
613 builder.append_null();
614 }
615 }
616 Arc::new(builder.finish())
617}
618
619fn values_to_fixed_size_binary_array(values: &[Value], size: i32) -> Result<ArrayRef> {
620 let mut builder = FixedSizeBinaryBuilder::with_capacity(values.len(), size);
621 for v in values {
622 if let Value::List(bytes) = v {
623 let b: Vec<u8> = bytes
624 .iter()
625 .map(|bv| bv.as_u64().unwrap_or(0) as u8)
626 .collect();
627 if b.len() as i32 == size {
628 builder.append_value(&b)?;
629 } else {
630 builder.append_null();
631 }
632 } else {
633 builder.append_null();
634 }
635 }
636 Ok(Arc::new(builder.finish()))
637}
638
639pub fn extract_vector_f32_values(
654 val: Option<&Value>,
655 is_deleted: bool,
656 dimensions: usize,
657) -> (Vec<f32>, bool) {
658 let zeros = || vec![0.0_f32; dimensions];
659
660 if is_deleted {
662 return (zeros(), true);
663 }
664
665 match val {
666 Some(Value::Vector(v)) if v.len() == dimensions => (v.clone(), true),
668 Some(Value::Vector(_)) => (zeros(), false), Some(Value::List(arr)) if arr.len() == dimensions => {
671 let values: Vec<f32> = arr
672 .iter()
673 .map(|v| v.as_f64().unwrap_or(0.0) as f32)
674 .collect();
675 (values, true)
676 }
677 Some(Value::List(_)) => (zeros(), false), _ => (zeros(), false), }
680}
681
682fn values_to_fixed_size_list_f32_array(values: &[Value], size: i32) -> ArrayRef {
683 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), size);
684 for v in values {
685 let (vals, valid) = extract_vector_f32_values(Some(v), false, size as usize);
686 for val in vals {
687 builder.values().append_value(val);
688 }
689 builder.append(valid);
690 }
691 Arc::new(builder.finish())
692}
693
694fn values_to_timestamp_array(values: &[Value], tz: Option<&Arc<str>>) -> ArrayRef {
695 let mut builder = TimestampNanosecondBuilder::with_capacity(values.len());
696 for v in values {
697 if v.is_null() {
698 builder.append_null();
699 } else if let Value::Temporal(tv) = v {
700 match tv {
701 uni_common::TemporalValue::DateTime {
702 nanos_since_epoch, ..
703 }
704 | uni_common::TemporalValue::LocalDateTime {
705 nanos_since_epoch, ..
706 } => builder.append_value(*nanos_since_epoch),
707 _ => builder.append_null(),
708 }
709 } else if let Some(n) = v.as_i64() {
710 builder.append_value(n);
711 } else if let Some(s) = v.as_str() {
712 match parse_datetime_to_nanos(s) {
713 Some(nanos) => builder.append_value(nanos),
714 None => builder.append_null(),
715 }
716 } else {
717 builder.append_null();
718 }
719 }
720
721 let arr = builder.finish();
722 if let Some(tz) = tz {
723 Arc::new(arr.with_timezone(tz.as_ref()))
724 } else {
725 Arc::new(arr)
726 }
727}
728
729fn values_to_datetime_struct_array(values: &[Value]) -> ArrayRef {
734 let mut nanos_builder = TimestampNanosecondBuilder::with_capacity(values.len());
735 let mut offset_builder = Int32Builder::with_capacity(values.len());
736 let mut tz_builder = StringBuilder::with_capacity(values.len(), values.len() * 20);
737 let mut null_buffer = BooleanBufferBuilder::new(values.len());
738
739 for v in values {
740 match v {
741 Value::Temporal(uni_common::TemporalValue::DateTime {
742 nanos_since_epoch,
743 offset_seconds,
744 timezone_name,
745 }) => {
746 nanos_builder.append_value(*nanos_since_epoch);
747 offset_builder.append_value(*offset_seconds);
748 tz_builder.append_option(timezone_name.as_deref());
749 null_buffer.append(true);
750 }
751 Value::Temporal(uni_common::TemporalValue::LocalDateTime { nanos_since_epoch }) => {
752 nanos_builder.append_value(*nanos_since_epoch);
753 offset_builder.append_null();
754 tz_builder.append_null();
755 null_buffer.append(true);
756 }
757 _ => {
758 nanos_builder.append_null();
759 offset_builder.append_null();
760 tz_builder.append_null();
761 null_buffer.append(false);
762 }
763 }
764 }
765
766 let struct_arr = StructArray::new(
767 schema::datetime_struct_fields(),
768 vec![
769 Arc::new(nanos_builder.finish()) as ArrayRef,
770 Arc::new(offset_builder.finish()) as ArrayRef,
771 Arc::new(tz_builder.finish()) as ArrayRef,
772 ],
773 Some(null_buffer.finish().into()),
774 );
775 Arc::new(struct_arr)
776}
777
778fn values_to_time_struct_array(values: &[Value]) -> ArrayRef {
783 let mut nanos_builder = Time64NanosecondBuilder::with_capacity(values.len());
784 let mut offset_builder = Int32Builder::with_capacity(values.len());
785 let mut null_buffer = BooleanBufferBuilder::new(values.len());
786
787 for v in values {
788 match v {
789 Value::Temporal(uni_common::TemporalValue::Time {
790 nanos_since_midnight,
791 offset_seconds,
792 }) => {
793 nanos_builder.append_value(*nanos_since_midnight);
794 offset_builder.append_value(*offset_seconds);
795 null_buffer.append(true);
796 }
797 Value::Temporal(uni_common::TemporalValue::LocalTime {
798 nanos_since_midnight,
799 }) => {
800 nanos_builder.append_value(*nanos_since_midnight);
801 offset_builder.append_null();
802 null_buffer.append(true);
803 }
804 _ => {
805 nanos_builder.append_null();
806 offset_builder.append_null();
807 null_buffer.append(false);
808 }
809 }
810 }
811
812 let struct_arr = StructArray::new(
813 schema::time_struct_fields(),
814 vec![
815 Arc::new(nanos_builder.finish()) as ArrayRef,
816 Arc::new(offset_builder.finish()) as ArrayRef,
817 ],
818 Some(null_buffer.finish().into()),
819 );
820 Arc::new(struct_arr)
821}
822
823fn values_to_large_binary_array(values: &[Value]) -> ArrayRef {
824 let mut builder =
825 arrow_array::builder::LargeBinaryBuilder::with_capacity(values.len(), values.len() * 64);
826 for v in values {
827 if v.is_null() {
828 builder.append_null();
829 } else {
830 let cv_bytes = uni_common::cypher_value_codec::encode(v);
832 builder.append_value(&cv_bytes);
833 }
834 }
835 Arc::new(builder.finish())
836}
837
838pub fn values_to_array(values: &[Value], dt: &ArrowDataType) -> Result<ArrayRef> {
840 match dt {
841 ArrowDataType::UInt64 => Ok(values_to_uint64_array(values)),
842 ArrowDataType::Int64 => Ok(values_to_int64_array(values)),
843 ArrowDataType::Int32 => Ok(values_to_int32_array(values)),
844 ArrowDataType::Utf8 => Ok(values_to_string_array(values)),
845 ArrowDataType::Boolean => Ok(values_to_bool_array(values)),
846 ArrowDataType::Float32 => Ok(values_to_float32_array(values)),
847 ArrowDataType::Float64 => Ok(values_to_float64_array(values)),
848 ArrowDataType::FixedSizeBinary(size) => values_to_fixed_size_binary_array(values, *size),
849 ArrowDataType::FixedSizeList(inner, size) => {
850 if inner.data_type() == &ArrowDataType::Float32 {
851 Ok(values_to_fixed_size_list_f32_array(values, *size))
852 } else {
853 Err(anyhow!("Unsupported FixedSizeList inner type"))
854 }
855 }
856 ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
857 Ok(values_to_timestamp_array(values, tz.as_ref()))
858 }
859 ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
860 Ok(values_to_timestamp_array(values, tz.as_ref()))
861 }
862 ArrowDataType::Date32 => {
863 let mut builder = Date32Builder::with_capacity(values.len());
864 for v in values {
865 if v.is_null() {
866 builder.append_null();
867 } else if let Value::Temporal(uni_common::TemporalValue::Date {
868 days_since_epoch,
869 }) = v
870 {
871 builder.append_value(*days_since_epoch);
872 } else if let Some(n) = v.as_i64() {
873 builder.append_value(n as i32);
874 } else {
875 builder.append_null();
876 }
877 }
878 Ok(Arc::new(builder.finish()))
879 }
880 ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
881 let mut builder = Time64NanosecondBuilder::with_capacity(values.len());
882 for v in values {
883 if v.is_null() {
884 builder.append_null();
885 } else if let Value::Temporal(tv) = v {
886 match tv {
887 uni_common::TemporalValue::LocalTime {
888 nanos_since_midnight,
889 }
890 | uni_common::TemporalValue::Time {
891 nanos_since_midnight,
892 ..
893 } => builder.append_value(*nanos_since_midnight),
894 _ => builder.append_null(),
895 }
896 } else if let Some(n) = v.as_i64() {
897 builder.append_value(n);
898 } else {
899 builder.append_null();
900 }
901 }
902 Ok(Arc::new(builder.finish()))
903 }
904 ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
905 let mut builder = Time64MicrosecondBuilder::with_capacity(values.len());
906 for v in values {
907 if v.is_null() {
908 builder.append_null();
909 } else if let Value::Temporal(tv) = v {
910 match tv {
911 uni_common::TemporalValue::LocalTime {
912 nanos_since_midnight,
913 }
914 | uni_common::TemporalValue::Time {
915 nanos_since_midnight,
916 ..
917 } => builder.append_value(*nanos_since_midnight / 1_000), _ => builder.append_null(),
919 }
920 } else if let Some(n) = v.as_i64() {
921 builder.append_value(n);
922 } else {
923 builder.append_null();
924 }
925 }
926 Ok(Arc::new(builder.finish()))
927 }
928 ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano) => {
929 let mut builder = IntervalMonthDayNanoBuilder::with_capacity(values.len());
930 for v in values {
931 if v.is_null() {
932 builder.append_null();
933 } else if let Value::Temporal(uni_common::TemporalValue::Duration {
934 months,
935 days,
936 nanos,
937 }) = v
938 {
939 builder.append_value(arrow::datatypes::IntervalMonthDayNano {
940 months: *months as i32,
941 days: *days as i32,
942 nanoseconds: *nanos,
943 });
944 } else {
945 builder.append_null();
946 }
947 }
948 Ok(Arc::new(builder.finish()))
949 }
950 ArrowDataType::Duration(arrow_schema::TimeUnit::Microsecond) => {
951 let mut builder = DurationMicrosecondBuilder::with_capacity(values.len());
952 for v in values {
953 if v.is_null() {
954 builder.append_null();
955 } else if let Value::Temporal(uni_common::TemporalValue::Duration {
956 months,
957 days,
958 nanos,
959 }) = v
960 {
961 let total_micros =
962 months * 30 * 86_400_000_000i64 + days * 86_400_000_000i64 + nanos / 1_000;
963 builder.append_value(total_micros);
964 } else if let Some(n) = v.as_i64() {
965 builder.append_value(n);
966 } else {
967 builder.append_null();
968 }
969 }
970 Ok(Arc::new(builder.finish()))
971 }
972 ArrowDataType::LargeBinary => Ok(values_to_large_binary_array(values)),
973 ArrowDataType::List(field) => {
974 if field.data_type() == &ArrowDataType::Utf8 {
975 let mut builder = ListBuilder::new(StringBuilder::new());
976 for v in values {
977 if let Value::List(arr) = v {
978 for item in arr {
979 if let Some(s) = item.as_str() {
980 builder.values().append_value(s);
981 } else {
982 builder.values().append_null();
983 }
984 }
985 builder.append(true);
986 } else {
987 builder.append_null();
988 }
989 }
990 Ok(Arc::new(builder.finish()))
991 } else {
992 Err(anyhow!(
993 "Unsupported List inner type: {:?}",
994 field.data_type()
995 ))
996 }
997 }
998 ArrowDataType::Struct(_) if schema::is_datetime_struct(dt) => {
999 Ok(values_to_datetime_struct_array(values))
1000 }
1001 ArrowDataType::Struct(_) if schema::is_time_struct(dt) => {
1002 Ok(values_to_time_struct_array(values))
1003 }
1004 _ => Err(anyhow!("Unsupported type for conversion: {:?}", dt)),
1005 }
1006}
1007
/// Builds an Arrow column for a single property from per-row [`Value`]s.
///
/// The column layout is chosen from the schema-level `data_type` (see `build_column`).
pub struct PropertyExtractor<'a> {
    // Schema-level type of the property; `build_column` dispatches on it.
    data_type: &'a DataType,
}
1012
1013impl<'a> PropertyExtractor<'a> {
1014 pub fn new(_name: &'a str, data_type: &'a DataType) -> Self {
1015 Self { data_type }
1016 }
1017
1018 pub fn build_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1021 where
1022 F: Fn(usize) -> Option<&'a Value>,
1023 {
1024 match self.data_type {
1025 DataType::String => self.build_string_column(len, deleted, get_props),
1026 DataType::Int32 => self.build_int32_column(len, deleted, get_props),
1027 DataType::Int64 => self.build_int64_column(len, deleted, get_props),
1028 DataType::Float32 => self.build_float32_column(len, deleted, get_props),
1029 DataType::Float64 => self.build_float64_column(len, deleted, get_props),
1030 DataType::Bool => self.build_bool_column(len, deleted, get_props),
1031 DataType::Vector { dimensions } => {
1032 self.build_vector_column(len, deleted, get_props, *dimensions)
1033 }
1034 DataType::CypherValue => self.build_json_column(len, deleted, get_props),
1035 DataType::List(inner) => self.build_list_column(len, deleted, get_props, inner),
1036 DataType::Map(key, value) => self.build_map_column(len, deleted, get_props, key, value),
1037 DataType::Crdt(_) => self.build_crdt_column(len, deleted, get_props),
1038 DataType::DateTime => self.build_datetime_struct_column(len, deleted, get_props),
1039 DataType::Timestamp => self.build_timestamp_column(len, deleted, get_props),
1040 DataType::Date => self.build_date32_column(len, deleted, get_props),
1041 DataType::Time => self.build_time_struct_column(len, deleted, get_props),
1042 DataType::Duration => self.build_duration_column(len, deleted, get_props),
1043 _ => Err(anyhow!(
1044 "Unsupported data type for arrow conversion: {:?}",
1045 self.data_type
1046 )),
1047 }
1048 }
1049
1050 fn build_string_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1051 where
1052 F: Fn(usize) -> Option<&'a Value>,
1053 {
1054 let mut builder = arrow_array::builder::StringBuilder::with_capacity(len, len * 32);
1055 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1056 let prop = get_props(i);
1057 if let Some(s) = prop.and_then(|v| v.as_str()) {
1058 builder.append_value(s);
1059 } else if let Some(Value::Temporal(tv)) = prop {
1060 builder.append_value(tv.to_string());
1061 } else if is_deleted {
1062 builder.append_value("");
1063 } else {
1064 builder.append_null();
1065 }
1066 }
1067 Ok(Arc::new(builder.finish()))
1068 }
1069
1070 fn build_int32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1071 where
1072 F: Fn(usize) -> Option<&'a Value>,
1073 {
1074 let mut values = Vec::with_capacity(len);
1075 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1076 let val = get_props(i).and_then(|v| v.as_i64()).map(|v| v as i32);
1077 if val.is_none() && is_deleted {
1078 values.push(Some(0));
1079 } else {
1080 values.push(val);
1081 }
1082 }
1083 Ok(Arc::new(Int32Array::from(values)))
1084 }
1085
1086 fn build_int64_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1087 where
1088 F: Fn(usize) -> Option<&'a Value>,
1089 {
1090 let mut values = Vec::with_capacity(len);
1091 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1092 let val = get_props(i).and_then(|v| v.as_i64());
1093 if val.is_none() && is_deleted {
1094 values.push(Some(0));
1095 } else {
1096 values.push(val);
1097 }
1098 }
1099 Ok(Arc::new(Int64Array::from(values)))
1100 }
1101
1102 fn build_timestamp_column<F>(
1103 &self,
1104 len: usize,
1105 deleted: &[bool],
1106 get_props: F,
1107 ) -> Result<ArrayRef>
1108 where
1109 F: Fn(usize) -> Option<&'a Value>,
1110 {
1111 let mut values = Vec::with_capacity(len);
1112 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1113 let val = get_props(i);
1114 let ts = if is_deleted || val.is_none() {
1115 Some(0i64)
1116 } else if let Some(Value::Temporal(tv)) = val {
1117 match tv {
1118 uni_common::TemporalValue::DateTime {
1119 nanos_since_epoch, ..
1120 }
1121 | uni_common::TemporalValue::LocalDateTime {
1122 nanos_since_epoch, ..
1123 } => Some(*nanos_since_epoch),
1124 _ => None,
1125 }
1126 } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1127 Some(v)
1128 } else if let Some(s) = val.and_then(|v| v.as_str()) {
1129 parse_datetime_to_nanos(s)
1130 } else {
1131 None
1132 };
1133
1134 if is_deleted {
1135 values.push(Some(0));
1136 } else {
1137 values.push(ts);
1138 }
1139 }
1140 let arr = TimestampNanosecondArray::from(values).with_timezone("UTC");
1141 Ok(Arc::new(arr))
1142 }
1143
1144 fn build_datetime_struct_column<F>(
1145 &self,
1146 len: usize,
1147 deleted: &[bool],
1148 get_props: F,
1149 ) -> Result<ArrayRef>
1150 where
1151 F: Fn(usize) -> Option<&'a Value>,
1152 {
1153 let values = self.collect_values_or_null(len, deleted, &get_props);
1154 Ok(values_to_datetime_struct_array(&values))
1155 }
1156
1157 fn build_time_struct_column<F>(
1158 &self,
1159 len: usize,
1160 deleted: &[bool],
1161 get_props: F,
1162 ) -> Result<ArrayRef>
1163 where
1164 F: Fn(usize) -> Option<&'a Value>,
1165 {
1166 let values = self.collect_values_or_null(len, deleted, &get_props);
1167 Ok(values_to_time_struct_array(&values))
1168 }
1169
1170 fn collect_values_or_null<F>(&self, len: usize, deleted: &[bool], get_props: &F) -> Vec<Value>
1172 where
1173 F: Fn(usize) -> Option<&'a Value>,
1174 {
1175 deleted
1176 .iter()
1177 .enumerate()
1178 .take(len)
1179 .map(|(i, &is_deleted)| {
1180 if is_deleted {
1181 Value::Null
1182 } else {
1183 get_props(i).cloned().unwrap_or(Value::Null)
1184 }
1185 })
1186 .collect()
1187 }
1188
1189 fn build_date32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1190 where
1191 F: Fn(usize) -> Option<&'a Value>,
1192 {
1193 let mut builder = Date32Builder::with_capacity(len);
1194 let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1195
1196 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1197 let val = get_props(i);
1198 let days = if is_deleted || val.is_none() {
1199 Some(0)
1200 } else if let Some(Value::Temporal(uni_common::TemporalValue::Date {
1201 days_since_epoch,
1202 })) = val
1203 {
1204 Some(*days_since_epoch)
1205 } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1206 Some(v as i32)
1207 } else if let Some(s) = val.and_then(|v| v.as_str()) {
1208 match chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1209 Ok(date) => Some(date.signed_duration_since(epoch).num_days() as i32),
1210 Err(_) => None,
1211 }
1212 } else {
1213 None
1214 };
1215
1216 if is_deleted {
1217 builder.append_value(0);
1218 } else if let Some(v) = days {
1219 builder.append_value(v);
1220 } else {
1221 builder.append_null();
1222 }
1223 }
1224 Ok(Arc::new(builder.finish()))
1225 }
1226
1227 fn build_duration_column<F>(
1228 &self,
1229 len: usize,
1230 deleted: &[bool],
1231 get_props: F,
1232 ) -> Result<ArrayRef>
1233 where
1234 F: Fn(usize) -> Option<&'a Value>,
1235 {
1236 let mut builder = LargeBinaryBuilder::with_capacity(len, len * 32);
1238 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1239 let raw_val = get_props(i);
1240 if let Some(val @ Value::Temporal(uni_common::TemporalValue::Duration { .. })) = raw_val
1241 {
1242 let encoded = uni_common::cypher_value_codec::encode(val);
1243 builder.append_value(&encoded);
1244 } else if is_deleted {
1245 let zero = Value::Temporal(uni_common::TemporalValue::Duration {
1246 months: 0,
1247 days: 0,
1248 nanos: 0,
1249 });
1250 let encoded = uni_common::cypher_value_codec::encode(&zero);
1251 builder.append_value(&encoded);
1252 } else {
1253 builder.append_null();
1254 }
1255 }
1256 Ok(Arc::new(builder.finish()))
1257 }
1258
1259 fn build_float32_column<F>(
1260 &self,
1261 len: usize,
1262 deleted: &[bool],
1263 get_props: F,
1264 ) -> Result<ArrayRef>
1265 where
1266 F: Fn(usize) -> Option<&'a Value>,
1267 {
1268 let mut values = Vec::with_capacity(len);
1269 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1270 let val = get_props(i).and_then(|v| v.as_f64()).map(|v| v as f32);
1271 if val.is_none() && is_deleted {
1272 values.push(Some(0.0));
1273 } else {
1274 values.push(val);
1275 }
1276 }
1277 Ok(Arc::new(Float32Array::from(values)))
1278 }
1279
1280 fn build_float64_column<F>(
1281 &self,
1282 len: usize,
1283 deleted: &[bool],
1284 get_props: F,
1285 ) -> Result<ArrayRef>
1286 where
1287 F: Fn(usize) -> Option<&'a Value>,
1288 {
1289 let mut values = Vec::with_capacity(len);
1290 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1291 let val = get_props(i).and_then(|v| v.as_f64());
1292 if val.is_none() && is_deleted {
1293 values.push(Some(0.0));
1294 } else {
1295 values.push(val);
1296 }
1297 }
1298 Ok(Arc::new(Float64Array::from(values)))
1299 }
1300
1301 fn build_bool_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1302 where
1303 F: Fn(usize) -> Option<&'a Value>,
1304 {
1305 let mut values = Vec::with_capacity(len);
1306 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1307 let val = get_props(i).and_then(|v| v.as_bool());
1308 if val.is_none() && is_deleted {
1309 values.push(Some(false));
1310 } else {
1311 values.push(val);
1312 }
1313 }
1314 Ok(Arc::new(BooleanArray::from(values)))
1315 }
1316
1317 fn build_vector_column<F>(
1318 &self,
1319 len: usize,
1320 deleted: &[bool],
1321 get_props: F,
1322 dimensions: usize,
1323 ) -> Result<ArrayRef>
1324 where
1325 F: Fn(usize) -> Option<&'a Value>,
1326 {
1327 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dimensions as i32);
1328
1329 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1330 let val = get_props(i);
1331 let (values, valid) = extract_vector_f32_values(val, is_deleted, dimensions);
1332 for v in values {
1333 builder.values().append_value(v);
1334 }
1335 builder.append(valid);
1336 }
1337 Ok(Arc::new(builder.finish()))
1338 }
1339
1340 fn build_json_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1341 where
1342 F: Fn(usize) -> Option<&'a Value>,
1343 {
1344 let null_val = Value::Null;
1345 let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(len, len * 64);
1346 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1347 let val = get_props(i);
1348 let uni_val = if val.is_none() && is_deleted {
1349 &null_val
1350 } else {
1351 val.unwrap_or(&null_val)
1352 };
1353 let cv_bytes = uni_common::cypher_value_codec::encode(uni_val);
1355 builder.append_value(&cv_bytes);
1356 }
1357 Ok(Arc::new(builder.finish()))
1358 }
1359
1360 fn build_list_column<F>(
1361 &self,
1362 len: usize,
1363 deleted: &[bool],
1364 get_props: F,
1365 inner: &DataType,
1366 ) -> Result<ArrayRef>
1367 where
1368 F: Fn(usize) -> Option<&'a Value>,
1369 {
1370 match inner {
1371 DataType::String => {
1372 self.build_typed_list(len, deleted, &get_props, StringBuilder::new(), |v, b| {
1373 if let Some(s) = v.as_str() {
1374 b.append_value(s);
1375 } else {
1376 b.append_null();
1377 }
1378 })
1379 }
1380 DataType::Int64 => {
1381 self.build_typed_list(len, deleted, &get_props, Int64Builder::new(), |v, b| {
1382 if let Some(n) = v.as_i64() {
1383 b.append_value(n);
1384 } else {
1385 b.append_null();
1386 }
1387 })
1388 }
1389 DataType::Float64 => {
1390 self.build_typed_list(len, deleted, &get_props, Float64Builder::new(), |v, b| {
1391 if let Some(f) = v.as_f64() {
1392 b.append_value(f);
1393 } else {
1394 b.append_null();
1395 }
1396 })
1397 }
1398 _ => Err(anyhow!("Unsupported inner type for List: {:?}", inner)),
1399 }
1400 }
1401
1402 fn build_typed_list<F, B, A>(
1404 &self,
1405 len: usize,
1406 deleted: &[bool],
1407 get_props: &F,
1408 inner_builder: B,
1409 mut append_value: A,
1410 ) -> Result<ArrayRef>
1411 where
1412 F: Fn(usize) -> Option<&'a Value>,
1413 B: arrow_array::builder::ArrayBuilder,
1414 A: FnMut(&Value, &mut B),
1415 {
1416 let mut builder = ListBuilder::new(inner_builder);
1417 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1418 let val_array = get_props(i).and_then(|v| v.as_array());
1419 if val_array.is_none() && is_deleted {
1420 builder.append_null();
1421 } else if let Some(arr) = val_array {
1422 for v in arr {
1423 append_value(v, builder.values());
1424 }
1425 builder.append(true);
1426 } else {
1427 builder.append_null();
1428 }
1429 }
1430 Ok(Arc::new(builder.finish()))
1431 }
1432
1433 fn build_map_column<F>(
1434 &self,
1435 len: usize,
1436 deleted: &[bool],
1437 get_props: F,
1438 key: &DataType,
1439 value: &DataType,
1440 ) -> Result<ArrayRef>
1441 where
1442 F: Fn(usize) -> Option<&'a Value>,
1443 {
1444 if !matches!(key, DataType::String) {
1445 return Err(anyhow!("Map keys must be String (JSON limitation)"));
1446 }
1447
1448 match value {
1449 DataType::String => self.build_typed_map(
1450 len,
1451 deleted,
1452 &get_props,
1453 StringBuilder::new(),
1454 arrow_schema::DataType::Utf8,
1455 |v, b: &mut StringBuilder| {
1456 if let Some(s) = v.as_str() {
1457 b.append_value(s);
1458 } else {
1459 b.append_null();
1460 }
1461 },
1462 ),
1463 DataType::Int64 => self.build_typed_map(
1464 len,
1465 deleted,
1466 &get_props,
1467 Int64Builder::new(),
1468 arrow_schema::DataType::Int64,
1469 |v, b: &mut Int64Builder| {
1470 if let Some(n) = v.as_i64() {
1471 b.append_value(n);
1472 } else {
1473 b.append_null();
1474 }
1475 },
1476 ),
1477 _ => Err(anyhow!("Unsupported value type for Map: {:?}", value)),
1478 }
1479 }
1480
1481 fn build_typed_map<F, B, A>(
1483 &self,
1484 len: usize,
1485 deleted: &[bool],
1486 get_props: &F,
1487 value_builder: B,
1488 value_arrow_type: arrow_schema::DataType,
1489 mut append_value: A,
1490 ) -> Result<ArrayRef>
1491 where
1492 F: Fn(usize) -> Option<&'a Value>,
1493 B: arrow_array::builder::ArrayBuilder,
1494 A: FnMut(&Value, &mut B),
1495 {
1496 let key_builder = Box::new(StringBuilder::new());
1497 let value_builder = Box::new(value_builder);
1498 let struct_builder = StructBuilder::new(
1499 vec![
1500 Field::new("key", arrow_schema::DataType::Utf8, false),
1501 Field::new("value", value_arrow_type, true),
1502 ],
1503 vec![key_builder, value_builder],
1504 );
1505 let mut builder = ListBuilder::new(struct_builder);
1506
1507 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1508 self.append_map_entry(&mut builder, get_props(i), is_deleted, &mut append_value);
1509 }
1510 Ok(Arc::new(builder.finish()))
1511 }
1512
1513 fn append_map_entry<B, A>(
1515 &self,
1516 builder: &mut ListBuilder<StructBuilder>,
1517 val: Option<&'a Value>,
1518 is_deleted: bool,
1519 append_value: &mut A,
1520 ) where
1521 B: arrow_array::builder::ArrayBuilder,
1522 A: FnMut(&Value, &mut B),
1523 {
1524 let val_obj = val.and_then(|v| v.as_object());
1525 if val_obj.is_none() && is_deleted {
1526 builder.append(false);
1527 } else if let Some(obj) = val_obj {
1528 let struct_b = builder.values();
1529 for (k, v) in obj {
1530 struct_b
1531 .field_builder::<StringBuilder>(0)
1532 .unwrap()
1533 .append_value(k);
1534 let value_b = struct_b.field_builder::<B>(1).unwrap();
1536 append_value(v, value_b);
1537 struct_b.append(true);
1538 }
1539 builder.append(true);
1540 } else {
1541 builder.append(false);
1542 }
1543 }
1544
1545 fn build_crdt_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1546 where
1547 F: Fn(usize) -> Option<&'a Value>,
1548 {
1549 let mut builder = BinaryBuilder::new();
1550 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1551 if is_deleted {
1552 builder.append_null();
1553 continue;
1554 }
1555 if let Some(val) = get_props(i) {
1556 let crdt_result = if let Some(s) = val.as_str() {
1559 serde_json::from_str::<Crdt>(s)
1560 } else {
1561 let json_val: serde_json::Value = val.clone().into();
1563 serde_json::from_value::<Crdt>(json_val)
1564 };
1565
1566 if let Ok(crdt) = crdt_result {
1567 if let Ok(bytes) = crdt.to_msgpack() {
1568 builder.append_value(&bytes);
1569 } else {
1570 builder.append_null();
1571 }
1572 } else {
1573 builder.append_null();
1574 }
1575 } else {
1576 builder.append_null();
1577 }
1578 }
1579 Ok(Arc::new(builder.finish()))
1580 }
1581}
1582
1583pub fn build_edge_column<'a>(
1585 name: &'a str,
1586 data_type: &'a DataType,
1587 len: usize,
1588 get_props: impl Fn(usize) -> Option<&'a Value>,
1589) -> Result<ArrayRef> {
1590 let deleted = vec![false; len];
1592 let extractor = PropertyExtractor::new(name, data_type);
1593 extractor.build_column(len, &deleted, get_props)
1594}
1595
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{
        Array, DurationMicrosecondArray,
        builder::{BinaryBuilder, Time64MicrosecondBuilder, TimestampNanosecondBuilder},
    };
    use std::collections::HashMap;
    use uni_common::TemporalValue;
    use uni_crdt::{Crdt, GCounter};

    #[test]
    fn test_arrow_to_value_string() {
        let arr = StringArray::from(vec![Some("hello"), None, Some("world")]);
        assert_eq!(
            arrow_to_value(&arr, 0, None),
            Value::String("hello".to_string())
        );
        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
        assert_eq!(
            arrow_to_value(&arr, 2, None),
            Value::String("world".to_string())
        );
    }

    #[test]
    fn test_arrow_to_value_int64() {
        let arr = Int64Array::from(vec![Some(42), None, Some(-10)]);
        assert_eq!(arrow_to_value(&arr, 0, None), Value::Int(42));
        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
        assert_eq!(arrow_to_value(&arr, 2, None), Value::Int(-10));
    }

    #[test]
    // 3.14 is an arbitrary sample value, not meant as an approximation of PI.
    #[allow(clippy::approx_constant)]
    fn test_arrow_to_value_float64() {
        let arr = Float64Array::from(vec![Some(3.14), None]);
        assert_eq!(arrow_to_value(&arr, 0, None), Value::Float(3.14));
        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
    }

    #[test]
    fn test_arrow_to_value_bool() {
        let arr = BooleanArray::from(vec![Some(true), Some(false), None]);
        assert_eq!(arrow_to_value(&arr, 0, None), Value::Bool(true));
        assert_eq!(arrow_to_value(&arr, 1, None), Value::Bool(false));
        assert_eq!(arrow_to_value(&arr, 2, None), Value::Null);
    }

    #[test]
    fn test_values_to_array_int64() {
        let values = vec![Value::Int(1), Value::Int(2), Value::Null, Value::Int(4)];
        let arr = values_to_array(&values, &ArrowDataType::Int64).unwrap();
        assert_eq!(arr.len(), 4);

        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int_arr.value(0), 1);
        assert_eq!(int_arr.value(1), 2);
        assert!(int_arr.is_null(2));
        assert_eq!(int_arr.value(3), 4);
    }

    #[test]
    fn test_values_to_array_string() {
        let values = vec![
            Value::String("a".to_string()),
            Value::String("b".to_string()),
            Value::Null,
        ];
        let arr = values_to_array(&values, &ArrowDataType::Utf8).unwrap();
        assert_eq!(arr.len(), 3);

        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(str_arr.value(0), "a");
        assert_eq!(str_arr.value(1), "b");
        assert!(str_arr.is_null(2));
    }

    #[test]
    fn test_property_extractor_string() {
        let props: Vec<HashMap<String, Value>> = vec![
            [("name".to_string(), Value::String("Alice".to_string()))]
                .into_iter()
                .collect(),
            [("name".to_string(), Value::String("Bob".to_string()))]
                .into_iter()
                .collect(),
            HashMap::new(),
        ];
        let deleted = vec![false, false, true];

        let extractor = PropertyExtractor::new("name", &DataType::String);
        let arr = extractor
            .build_column(3, &deleted, |i| props[i].get("name"))
            .unwrap();

        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(str_arr.value(0), "Alice");
        assert_eq!(str_arr.value(1), "Bob");
        // Deleted row with no value is written as an empty string.
        assert_eq!(str_arr.value(2), "");
    }

    #[test]
    fn test_property_extractor_int64() {
        let props: Vec<HashMap<String, Value>> = vec![
            [("age".to_string(), Value::Int(25))].into_iter().collect(),
            [("age".to_string(), Value::Int(30))].into_iter().collect(),
            HashMap::new(),
        ];
        let deleted = vec![false, false, true];

        let extractor = PropertyExtractor::new("age", &DataType::Int64);
        let arr = extractor
            .build_column(3, &deleted, |i| props[i].get("age"))
            .unwrap();

        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int_arr.value(0), 25);
        assert_eq!(int_arr.value(1), 30);
        // Deleted row with no value is written as 0.
        assert_eq!(int_arr.value(2), 0);
    }

    #[test]
    fn test_arrow_to_value_time64() {
        let mut builder = Time64MicrosecondBuilder::new();
        // 10:30:45 in microseconds since midnight.
        builder.append_value(37_845_000_000);
        // Midnight.
        builder.append_value(0);
        // 23:59:59.123456.
        builder.append_value(86_399_123_456);
        builder.append_null();

        let arr = builder.finish();
        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "10:30:45");
        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "00:00");
        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "23:59:59.123456");
        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
    }

    #[test]
    fn test_arrow_to_value_duration() {
        let arr = DurationMicrosecondArray::from(vec![
            Some(1_000_000),      // 1 second
            Some(3_600_000_000),  // 1 hour
            Some(86_400_000_000), // 24 hours
            None,
        ]);

        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "PT1S");
        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "PT1H");
        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "PT24H");
        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
    }

    #[test]
    fn test_arrow_to_value_binary_crdt() {
        let mut builder = BinaryBuilder::new();

        let mut counter = GCounter::new();
        counter.increment("actor1", 5);
        let crdt = Crdt::GCounter(counter);
        let bytes = crdt.to_msgpack().unwrap();
        builder.append_value(&bytes);

        builder.append_null();

        let arr = builder.finish();

        let result = arrow_to_value(&arr, 0, None);
        let obj = result
            .as_object()
            .expect("msgpack-encoded CRDT should decode to an object");
        assert_eq!(obj.get("t"), Some(&Value::String("gc".to_string())));

        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
    }

    #[test]
    fn test_datetime_struct_encode_decode_roundtrip() {
        let values = vec![
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 441763200000000000, // 1984-01-01T00:00:00Z
                offset_seconds: 3600,                  // +01:00
                timezone_name: Some("Europe/Paris".to_string()),
            }),
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 1704067200000000000, // 2024-01-01T00:00:00Z
                offset_seconds: -18000,                 // -05:00
                timezone_name: None,
            }),
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 0,
                offset_seconds: 0,
                timezone_name: Some("UTC".to_string()),
            }),
        ];

        let arr_ref = values_to_datetime_struct_array(&values);
        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
        assert_eq!(arr.len(), 3);

        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));

        assert_eq!(decoded_0, values[0]);
        assert_eq!(decoded_1, values[1]);
        assert_eq!(decoded_2, values[2]);

        if let Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch,
            offset_seconds,
            timezone_name,
        }) = decoded_0
        {
            assert_eq!(nanos_since_epoch, 441763200000000000);
            assert_eq!(offset_seconds, 3600);
            assert_eq!(timezone_name, Some("Europe/Paris".to_string()));
        } else {
            panic!("Expected DateTime value");
        }
    }

    #[test]
    fn test_datetime_struct_null_handling() {
        let values = vec![
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 441763200000000000,
                offset_seconds: 3600,
                timezone_name: Some("Europe/Paris".to_string()),
            }),
            Value::Null,
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 0,
                offset_seconds: 0,
                timezone_name: None,
            }),
        ];

        let arr_ref = values_to_datetime_struct_array(&values);
        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
        assert_eq!(arr.len(), 3);

        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
        assert_eq!(decoded_0, values[0]);

        // A `Value::Null` input must become a null struct slot and decode back to Null.
        assert!(arr.is_null(1));
        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
        assert_eq!(decoded_1, Value::Null);

        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
        assert_eq!(decoded_2, values[2]);
    }

    #[test]
    fn test_datetime_struct_boundary_values() {
        let values = vec![
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 441763200000000000,
                offset_seconds: 0, // UTC
                timezone_name: None,
            }),
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 441763200000000000,
                offset_seconds: 43200, // +12:00
                timezone_name: None,
            }),
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 441763200000000000,
                offset_seconds: -43200, // -12:00
                timezone_name: None,
            }),
        ];

        let arr_ref = values_to_datetime_struct_array(&values);
        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
        assert_eq!(arr.len(), 3);

        for (i, expected) in values.iter().enumerate() {
            let decoded = arrow_to_value(arr_ref.as_ref(), i, Some(&DataType::DateTime));
            assert_eq!(&decoded, expected);
        }
    }

    #[test]
    fn test_datetime_old_schema_migration() {
        // Legacy layout: a bare UTC nanosecond timestamp instead of the struct encoding.
        let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
        builder.append_value(441763200000000000); // 1984-01-01T00:00:00Z
        builder.append_value(1704067200000000000); // 2024-01-01T00:00:00Z
        builder.append_null();

        let arr = builder.finish();

        let decoded_0 = arrow_to_value(&arr, 0, Some(&DataType::DateTime));
        let decoded_1 = arrow_to_value(&arr, 1, Some(&DataType::DateTime));
        let decoded_2 = arrow_to_value(&arr, 2, Some(&DataType::DateTime));

        if let Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch,
            offset_seconds,
            timezone_name,
        }) = decoded_0
        {
            assert_eq!(nanos_since_epoch, 441763200000000000);
            assert_eq!(offset_seconds, 0);
            assert_eq!(timezone_name, Some("UTC".to_string()));
        } else {
            panic!("Expected DateTime value");
        }

        // The second row goes through the same legacy path and must decode the same way.
        assert_eq!(
            decoded_1,
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 1704067200000000000,
                offset_seconds: 0,
                timezone_name: Some("UTC".to_string()),
            })
        );

        assert_eq!(decoded_2, Value::Null);
    }

    #[test]
    fn test_time_struct_encode_decode_roundtrip() {
        let values = vec![
            Value::Temporal(TemporalValue::Time {
                nanos_since_midnight: 37845000000000, // 10:30:45
                offset_seconds: 3600,                 // +01:00
            }),
            Value::Temporal(TemporalValue::Time {
                nanos_since_midnight: 0,
                offset_seconds: 0,
            }),
            Value::Temporal(TemporalValue::Time {
                nanos_since_midnight: 86399999999999, // 23:59:59.999999999
                offset_seconds: -18000,               // -05:00
            }),
        ];

        let arr_ref = values_to_time_struct_array(&values);
        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
        assert_eq!(arr.len(), 3);

        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));

        assert_eq!(decoded_0, values[0]);
        assert_eq!(decoded_1, values[1]);
        assert_eq!(decoded_2, values[2]);

        if let Value::Temporal(TemporalValue::Time {
            nanos_since_midnight,
            offset_seconds,
        }) = decoded_0
        {
            assert_eq!(nanos_since_midnight, 37845000000000);
            assert_eq!(offset_seconds, 3600);
        } else {
            panic!("Expected Time value");
        }
    }

    #[test]
    fn test_time_struct_null_handling() {
        let values = vec![
            Value::Temporal(TemporalValue::Time {
                nanos_since_midnight: 37845000000000,
                offset_seconds: 3600,
            }),
            Value::Null,
            Value::Temporal(TemporalValue::Time {
                nanos_since_midnight: 0,
                offset_seconds: 0,
            }),
        ];

        let arr_ref = values_to_time_struct_array(&values);
        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
        assert_eq!(arr.len(), 3);

        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
        assert_eq!(decoded_0, values[0]);

        // A `Value::Null` input must become a null struct slot and decode back to Null.
        assert!(arr.is_null(1));
        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
        assert_eq!(decoded_1, Value::Null);

        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
        assert_eq!(decoded_2, values[2]);
    }

    #[test]
    fn test_extract_vector_f32_values_valid_vector() {
        let v = vec![1.0, 2.0, 3.0];
        let val = Value::Vector(v.clone());
        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
        assert_eq!(result, v);
        assert!(valid);
    }

    #[test]
    fn test_extract_vector_f32_values_vector_wrong_dims() {
        let v = vec![1.0, 2.0];
        let val = Value::Vector(v);
        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
        assert_eq!(result, vec![0.0, 0.0, 0.0]);
        assert!(!valid);
    }

    #[test]
    fn test_extract_vector_f32_values_valid_list() {
        let v = vec![Value::Float(1.0), Value::Float(2.0), Value::Float(3.0)];
        let val = Value::List(v);
        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
        assert_eq!(result, vec![1.0, 2.0, 3.0]);
        assert!(valid);
    }

    #[test]
    fn test_extract_vector_f32_values_list_wrong_dims() {
        let v = vec![Value::Float(1.0), Value::Float(2.0)];
        let val = Value::List(v);
        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
        assert_eq!(result, vec![0.0, 0.0, 0.0]);
        assert!(!valid);
    }

    #[test]
    fn test_extract_vector_f32_values_list_int_coercion() {
        let v = vec![Value::Int(1), Value::Int(2), Value::Int(3)];
        let val = Value::List(v);
        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
        assert_eq!(result, vec![1.0, 2.0, 3.0]);
        assert!(valid);
    }

    #[test]
    fn test_extract_vector_f32_values_none() {
        let (result, valid) = extract_vector_f32_values(None, false, 3);
        assert_eq!(result, vec![0.0, 0.0, 0.0]);
        assert!(!valid);
    }

    #[test]
    fn test_extract_vector_f32_values_null() {
        let val = Value::Null;
        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
        assert_eq!(result, vec![0.0, 0.0, 0.0]);
        assert!(!valid);
    }

    #[test]
    fn test_extract_vector_f32_values_unsupported_type() {
        let val = Value::String("not a vector".to_string());
        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
        assert_eq!(result, vec![0.0, 0.0, 0.0]);
        assert!(!valid);
    }

    #[test]
    fn test_extract_vector_f32_values_deleted_with_none() {
        let (result, valid) = extract_vector_f32_values(None, true, 3);
        assert_eq!(result, vec![0.0, 0.0, 0.0]);
        // Deleted rows yield a zero vector that is still marked valid.
        assert!(valid);
    }

    #[test]
    fn test_extract_vector_f32_values_deleted_with_null() {
        let val = Value::Null;
        let (result, valid) = extract_vector_f32_values(Some(&val), true, 3);
        assert_eq!(result, vec![0.0, 0.0, 0.0]);
        // Deleted rows yield a zero vector that is still marked valid.
        assert!(valid);
    }

    #[test]
    fn test_values_to_fixed_size_list_vector_with_nulls() {
        let values = vec![
            Value::Vector(vec![1.0, 2.0]),
            Value::Null,
            Value::Vector(vec![3.0, 4.0]),
            Value::String("invalid".to_string()),
        ];
        let arr_ref = values_to_array(
            &values,
            &ArrowDataType::FixedSizeList(
                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
                2,
            ),
        )
        .unwrap();

        let arr = arr_ref
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();

        assert_eq!(arr.len(), 4);
        assert!(arr.is_valid(0));
        assert!(!arr.is_valid(1)); // Null input
        assert!(arr.is_valid(2));
        assert!(!arr.is_valid(3)); // non-vector input
    }

    #[test]
    fn test_values_to_fixed_size_list_from_list() {
        let values = vec![
            Value::List(vec![Value::Float(1.0), Value::Float(2.0)]),
            Value::List(vec![Value::Int(3), Value::Int(4)]),
        ];
        let arr_ref = values_to_array(
            &values,
            &ArrowDataType::FixedSizeList(
                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
                2,
            ),
        )
        .unwrap();

        let arr = arr_ref
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();

        assert_eq!(arr.len(), 2);
        assert!(arr.is_valid(0));
        assert!(arr.is_valid(1));

        let child = arr
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .unwrap();
        assert_eq!(child.value(0), 1.0);
        assert_eq!(child.value(1), 2.0);
        assert_eq!(child.value(2), 3.0);
        assert_eq!(child.value(3), 4.0);
    }

    #[test]
    fn test_values_to_fixed_size_list_wrong_dimensions() {
        let values = vec![
            Value::Vector(vec![1.0, 2.0, 3.0]), // too long for dimension 2
            Value::List(vec![Value::Float(4.0)]), // too short for dimension 2
        ];
        let arr_ref = values_to_array(
            &values,
            &ArrowDataType::FixedSizeList(
                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
                2,
            ),
        )
        .unwrap();

        let arr = arr_ref
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();

        assert_eq!(arr.len(), 2);
        assert!(!arr.is_valid(0));
        assert!(!arr.is_valid(1));

        // Invalid rows still occupy zero-filled child slots.
        let child = arr
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .unwrap();
        assert_eq!(child.value(0), 0.0);
        assert_eq!(child.value(1), 0.0);
        assert_eq!(child.value(2), 0.0);
        assert_eq!(child.value(3), 0.0);
    }

    #[test]
    fn test_values_to_fixed_size_list_all_nulls() {
        let values = vec![Value::Null, Value::Null, Value::Null];
        let arr_ref = values_to_array(
            &values,
            &ArrowDataType::FixedSizeList(
                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
                3,
            ),
        )
        .unwrap();

        let arr = arr_ref
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();

        assert_eq!(arr.len(), 3);
        assert!(!arr.is_valid(0));
        assert!(!arr.is_valid(1));
        assert!(!arr.is_valid(2));

        // 3 rows x 3 dimensions of child slots, even though every row is null.
        let child = arr
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .unwrap();
        assert_eq!(child.len(), 9);
    }

    #[test]
    fn test_values_to_fixed_size_list_mixed_types() {
        let values = vec![
            Value::Vector(vec![1.0, 2.0]),
            Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
            Value::Null,
            Value::String("invalid".to_string()),
        ];
        let arr_ref = values_to_array(
            &values,
            &ArrowDataType::FixedSizeList(
                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
                2,
            ),
        )
        .unwrap();

        let arr = arr_ref
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();

        assert_eq!(arr.len(), 4);
        assert!(arr.is_valid(0)); // Vector
        assert!(arr.is_valid(1)); // List
        assert!(!arr.is_valid(2)); // Null
        assert!(!arr.is_valid(3)); // String

        let child = arr
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .unwrap();
        assert_eq!(child.value(0), 1.0);
        assert_eq!(child.value(1), 2.0);
        assert_eq!(child.value(2), 3.0);
        assert_eq!(child.value(3), 4.0);
    }

    #[test]
    fn test_build_vector_column_with_nulls_and_deleted() {
        let data_type = DataType::Vector { dimensions: 3 };
        let extractor = PropertyExtractor::new("test_vec", &data_type);

        let props = [
            Some(Value::Vector(vec![1.0, 2.0, 3.0])),
            None,                                     // property absent
            Some(Value::Null),                        // explicit null
            Some(Value::Vector(vec![4.0, 5.0, 6.0])), // row is marked deleted below
        ];
        let deleted = [false, false, false, true];

        let arr_ref = extractor
            .build_vector_column(4, &deleted, |i| props[i].as_ref(), 3)
            .unwrap();

        let arr = arr_ref
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();

        assert_eq!(arr.len(), 4);
        assert!(arr.is_valid(0));
        assert!(!arr.is_valid(1));
        assert!(!arr.is_valid(2));
        assert!(arr.is_valid(3));

        let child = arr
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .unwrap();
        assert_eq!(child.value(0), 1.0);
        assert_eq!(child.value(1), 2.0);
        assert_eq!(child.value(2), 3.0);
        // The deleted row is zero-filled even though it carried a vector.
        assert_eq!(child.value(9), 0.0);
        assert_eq!(child.value(10), 0.0);
        assert_eq!(child.value(11), 0.0);
    }

    #[test]
    fn test_build_vector_column_with_list_input() {
        let data_type = DataType::Vector { dimensions: 2 };
        let extractor = PropertyExtractor::new("test_vec", &data_type);

        let props = [
            Some(Value::List(vec![Value::Float(1.0), Value::Float(2.0)])),
            Some(Value::List(vec![Value::Int(3), Value::Int(4)])),
            Some(Value::Vector(vec![5.0, 6.0])),
        ];
        let deleted = [false, false, false];

        let arr_ref = extractor
            .build_vector_column(3, &deleted, |i| props[i].as_ref(), 2)
            .unwrap();

        let arr = arr_ref
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();

        assert_eq!(arr.len(), 3);
        assert!(arr.is_valid(0));
        assert!(arr.is_valid(1));
        assert!(arr.is_valid(2));

        let child = arr
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .unwrap();
        assert_eq!(child.value(0), 1.0);
        assert_eq!(child.value(1), 2.0);
        assert_eq!(child.value(2), 3.0);
        assert_eq!(child.value(3), 4.0);
        assert_eq!(child.value(4), 5.0);
        assert_eq!(child.value(5), 6.0);
    }
}