1use anyhow::{Result, anyhow};
11use arrow_array::builder::{
12 BinaryBuilder, BooleanBufferBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
13 FixedSizeBinaryBuilder, FixedSizeListBuilder, Float32Builder, Float64Builder, Int32Builder,
14 Int64Builder, IntervalMonthDayNanoBuilder, LargeBinaryBuilder, ListBuilder, StringBuilder,
15 StructBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder,
16 UInt64Builder,
17};
18use arrow_array::{
19 Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, FixedSizeBinaryArray,
20 FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array,
21 IntervalMonthDayNanoArray, LargeBinaryArray, ListArray, StringArray, StructArray,
22 Time64NanosecondArray, TimestampNanosecondArray, UInt64Array,
23};
24use arrow_schema::{DataType as ArrowDataType, Field};
25use std::collections::HashMap;
26use std::sync::Arc;
27use uni_common::DataType;
28use uni_common::Value;
29use uni_common::core::id::{Eid, Vid};
30use uni_common::core::schema;
31use uni_crdt::Crdt;
32
33fn build_timestamp_column_from_id_map<K, I>(
38 ids: I,
39 timestamps: Option<&HashMap<K, i64>>,
40) -> ArrayRef
41where
42 K: Eq + std::hash::Hash,
43 I: IntoIterator<Item = K>,
44{
45 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
46 for id in ids {
47 match timestamps.and_then(|m| m.get(&id)) {
48 Some(&ts) => builder.append_value(ts),
49 None => builder.append_null(),
50 }
51 }
52 Arc::new(builder.finish())
53}
54
55pub fn build_timestamp_column_from_vid_map<I>(
56 ids: I,
57 timestamps: Option<&HashMap<Vid, i64>>,
58) -> ArrayRef
59where
60 I: IntoIterator<Item = Vid>,
61{
62 build_timestamp_column_from_id_map(ids, timestamps)
63}
64
65pub fn build_timestamp_column_from_eid_map<I>(
66 ids: I,
67 timestamps: Option<&HashMap<Eid, i64>>,
68) -> ArrayRef
69where
70 I: IntoIterator<Item = Eid>,
71{
72 build_timestamp_column_from_id_map(ids, timestamps)
73}
74
75pub fn build_timestamp_column<I>(timestamps: I) -> ArrayRef
79where
80 I: IntoIterator<Item = Option<i64>>,
81{
82 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
83 for ts in timestamps {
84 builder.append_option(ts);
85 }
86 Arc::new(builder.finish())
87}
88
89pub fn labels_from_list_array(list_arr: &ListArray, row: usize) -> Vec<String> {
95 if list_arr.is_null(row) {
96 return Vec::new();
97 }
98 let values = list_arr.value(row);
99 let Some(str_arr) = values.as_any().downcast_ref::<StringArray>() else {
100 return Vec::new();
101 };
102 (0..str_arr.len())
103 .filter(|&j| !str_arr.is_null(j))
104 .map(|j| str_arr.value(j).to_string())
105 .collect()
106}
107
108fn parse_datetime_to_nanos(s: &str) -> Option<i64> {
113 chrono::DateTime::parse_from_rfc3339(s)
114 .map(|dt| {
115 dt.with_timezone(&chrono::Utc)
116 .timestamp_nanos_opt()
117 .unwrap_or(0)
118 })
119 .or_else(|_| {
120 chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
121 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
122 })
123 .or_else(|_| {
124 chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ")
125 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
126 })
127 .or_else(|_| {
128 chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M%:z").map(|dt| {
129 dt.with_timezone(&chrono::Utc)
130 .timestamp_nanos_opt()
131 .unwrap_or(0)
132 })
133 })
134 .ok()
135 .or_else(|| {
136 s.strip_suffix('Z')
137 .and_then(|base| chrono::NaiveDateTime::parse_from_str(base, "%Y-%m-%dT%H:%M").ok())
138 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
139 })
140}
141
142fn try_reconstruct_map(arr: &ArrayRef) -> Option<HashMap<String, Value>> {
148 let structs = arr.as_any().downcast_ref::<StructArray>()?;
149 let fields = structs.fields();
150 if fields.len() != 2 || fields[0].name() != "key" || fields[1].name() != "value" {
151 return None;
152 }
153 let key_col = structs.column(0);
154 let val_col = structs.column(1);
155 let mut map = HashMap::new();
156 for i in 0..structs.len() {
157 if let Value::String(k) = arrow_to_value(key_col.as_ref(), i, None) {
158 map.insert(k, arrow_to_value(val_col.as_ref(), i, None));
159 }
160 }
161 Some(map)
162}
163
164fn array_to_value_list(arr: &ArrayRef) -> Vec<Value> {
166 (0..arr.len())
167 .map(|i| arrow_to_value(arr.as_ref(), i, None))
168 .collect()
169}
170
171pub fn arrow_to_value(col: &dyn Array, row: usize, data_type: Option<&DataType>) -> Value {
178 if col.is_null(row) {
179 return Value::Null;
180 }
181
182 if let Some(dt) = data_type {
184 match dt {
185 DataType::DateTime => {
186 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
188 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
189 struct_arr.column_by_name("nanos_since_epoch"),
190 struct_arr.column_by_name("offset_seconds"),
191 struct_arr.column_by_name("timezone_name"),
192 )
193 && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
194 nanos_col
195 .as_any()
196 .downcast_ref::<TimestampNanosecondArray>(),
197 offset_col.as_any().downcast_ref::<Int32Array>(),
198 tz_col.as_any().downcast_ref::<StringArray>(),
199 )
200 {
201 if nanos_arr.is_null(row) {
202 return Value::Null;
203 }
204 let nanos = nanos_arr.value(row);
205 if offset_arr.is_null(row) {
206 return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
208 nanos_since_epoch: nanos,
209 });
210 }
211 let offset = offset_arr.value(row);
212 let tz_name = (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
213 return Value::Temporal(uni_common::TemporalValue::DateTime {
214 nanos_since_epoch: nanos,
215 offset_seconds: offset,
216 timezone_name: tz_name,
217 });
218 }
219 if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
221 let nanos = ts.value(row);
222 let tz_name = ts.timezone().map(|s| s.to_string());
223 return Value::Temporal(uni_common::TemporalValue::DateTime {
224 nanos_since_epoch: nanos,
225 offset_seconds: 0,
226 timezone_name: tz_name,
227 });
228 }
229 }
230 DataType::Time => {
231 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
233 && let (Some(nanos_col), Some(offset_col)) = (
234 struct_arr.column_by_name("nanos_since_midnight"),
235 struct_arr.column_by_name("offset_seconds"),
236 )
237 && let (Some(nanos_arr), Some(offset_arr)) = (
238 nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
239 offset_col.as_any().downcast_ref::<Int32Array>(),
240 )
241 {
242 if nanos_arr.is_null(row) || offset_arr.is_null(row) {
244 return Value::Null;
245 }
246 let nanos = nanos_arr.value(row);
247 let offset = offset_arr.value(row);
248 return Value::Temporal(uni_common::TemporalValue::Time {
249 nanos_since_midnight: nanos,
250 offset_seconds: offset,
251 });
252 }
253 if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
255 let nanos = t.value(row);
256 return Value::Temporal(uni_common::TemporalValue::Time {
257 nanos_since_midnight: nanos,
258 offset_seconds: 0,
259 });
260 }
261 }
262 DataType::Btic => {
263 let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>() else {
264 log::warn!("BTIC column is not FixedSizeBinaryArray");
265 return Value::Null;
266 };
267 let bytes = fsb.value(row);
268 return match uni_btic::encode::decode_slice(bytes) {
269 Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
270 lo: btic.lo(),
271 hi: btic.hi(),
272 meta: btic.meta(),
273 }),
274 Err(e) => {
275 log::warn!("BTIC decode error: {}", e);
276 Value::Null
277 }
278 };
279 }
280 _ => {}
281 }
282 }
283
284 if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
286 return Value::String(s.value(row).to_string());
287 }
288
289 if let Some(u) = col.as_any().downcast_ref::<UInt64Array>() {
291 return Value::Int(u.value(row) as i64);
292 }
293 if let Some(i) = col.as_any().downcast_ref::<Int64Array>() {
294 return Value::Int(i.value(row));
295 }
296 if let Some(i) = col.as_any().downcast_ref::<Int32Array>() {
297 return Value::Int(i.value(row) as i64);
298 }
299
300 if let Some(f) = col.as_any().downcast_ref::<Float64Array>() {
302 return Value::Float(f.value(row));
303 }
304 if let Some(f) = col.as_any().downcast_ref::<Float32Array>() {
305 return Value::Float(f.value(row) as f64);
306 }
307
308 if let Some(b) = col.as_any().downcast_ref::<BooleanArray>() {
310 return Value::Bool(b.value(row));
311 }
312
313 if let Some(list) = col.as_any().downcast_ref::<FixedSizeListArray>() {
315 return Value::List(array_to_value_list(&list.value(row)));
316 }
317
318 if let Some(list) = col.as_any().downcast_ref::<ListArray>() {
320 let arr = list.value(row);
321
322 if let Some(obj) = try_reconstruct_map(&arr) {
324 return Value::Map(obj);
325 }
326
327 return Value::List(array_to_value_list(&arr));
328 }
329
330 if let Some(list) = col.as_any().downcast_ref::<arrow_array::LargeListArray>() {
332 return Value::List(array_to_value_list(&list.value(row)));
333 }
334
335 if let Some(s) = col.as_any().downcast_ref::<StructArray>() {
337 let field_names: Vec<&str> = s.fields().iter().map(|f| f.name().as_str()).collect();
338
339 if field_names.contains(&"nanos_since_epoch")
341 && field_names.contains(&"offset_seconds")
342 && field_names.contains(&"timezone_name")
343 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
344 s.column_by_name("nanos_since_epoch"),
345 s.column_by_name("offset_seconds"),
346 s.column_by_name("timezone_name"),
347 )
348 {
349 let nanos_opt = nanos_col
351 .as_any()
352 .downcast_ref::<TimestampNanosecondArray>()
353 .map(|a| {
354 if a.is_null(row) {
355 None
356 } else {
357 Some(a.value(row))
358 }
359 })
360 .or_else(|| {
361 nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
362 if a.is_null(row) {
363 None
364 } else {
365 Some(a.value(row))
366 }
367 })
368 });
369 let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
370 if a.is_null(row) {
371 None
372 } else {
373 Some(a.value(row))
374 }
375 });
376
377 if let Some(Some(nanos)) = nanos_opt {
378 match offset_opt {
379 Some(Some(offset)) => {
380 let tz_name = tz_col.as_any().downcast_ref::<StringArray>().and_then(|a| {
381 if a.is_null(row) {
382 None
383 } else {
384 Some(a.value(row).to_string())
385 }
386 });
387 return Value::Temporal(uni_common::TemporalValue::DateTime {
388 nanos_since_epoch: nanos,
389 offset_seconds: offset,
390 timezone_name: tz_name,
391 });
392 }
393 _ => {
394 return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
396 nanos_since_epoch: nanos,
397 });
398 }
399 }
400 }
401 }
402
403 if field_names.contains(&"nanos_since_midnight")
405 && field_names.contains(&"offset_seconds")
406 && let (Some(nanos_col), Some(offset_col)) = (
407 s.column_by_name("nanos_since_midnight"),
408 s.column_by_name("offset_seconds"),
409 )
410 {
411 let nanos_opt = nanos_col
413 .as_any()
414 .downcast_ref::<Time64NanosecondArray>()
415 .map(|a| {
416 if a.is_null(row) {
417 None
418 } else {
419 Some(a.value(row))
420 }
421 })
422 .or_else(|| {
423 nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
424 if a.is_null(row) {
425 None
426 } else {
427 Some(a.value(row))
428 }
429 })
430 });
431 let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
432 if a.is_null(row) {
433 None
434 } else {
435 Some(a.value(row))
436 }
437 });
438
439 if let (Some(Some(nanos)), Some(Some(offset))) = (nanos_opt, offset_opt) {
440 return Value::Temporal(uni_common::TemporalValue::Time {
441 nanos_since_midnight: nanos,
442 offset_seconds: offset,
443 });
444 }
445 }
446
447 let mut map = HashMap::new();
449 for (field, child) in s.fields().iter().zip(s.columns()) {
450 map.insert(
451 field.name().clone(),
452 arrow_to_value(child.as_ref(), row, None),
453 );
454 }
455 return Value::Map(map);
456 }
457
458 if let Some(d) = col.as_any().downcast_ref::<Date32Array>() {
460 let days = d.value(row);
461 return Value::Temporal(uni_common::TemporalValue::Date {
462 days_since_epoch: days,
463 });
464 }
465
466 if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
468 let nanos = ts.value(row);
469 return match ts.timezone() {
470 Some(tz) => Value::Temporal(uni_common::TemporalValue::DateTime {
471 nanos_since_epoch: nanos,
472 offset_seconds: 0,
473 timezone_name: Some(tz.to_string()),
474 }),
475 None => Value::Temporal(uni_common::TemporalValue::LocalDateTime {
476 nanos_since_epoch: nanos,
477 }),
478 };
479 }
480
481 if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
483 let nanos = t.value(row);
484 return Value::Temporal(uni_common::TemporalValue::LocalTime {
485 nanos_since_midnight: nanos,
486 });
487 }
488
489 if let Some(t) = col
491 .as_any()
492 .downcast_ref::<arrow_array::Time64MicrosecondArray>()
493 {
494 let micros = t.value(row);
495 return Value::Temporal(uni_common::TemporalValue::LocalTime {
496 nanos_since_midnight: micros * 1000,
497 });
498 }
499
500 if let Some(d) = col
502 .as_any()
503 .downcast_ref::<arrow_array::DurationMicrosecondArray>()
504 {
505 let micros = d.value(row);
506 let total_nanos = micros * 1000;
507 let seconds = total_nanos / 1_000_000_000;
508 let remaining_nanos = total_nanos % 1_000_000_000;
509 return Value::Temporal(uni_common::TemporalValue::Duration {
510 months: 0,
511 days: 0,
512 nanos: seconds * 1_000_000_000 + remaining_nanos,
513 });
514 }
515
516 if let Some(interval) = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>() {
518 let val = interval.value(row);
519 return Value::Temporal(uni_common::TemporalValue::Duration {
520 months: val.months as i64,
521 days: val.days as i64,
522 nanos: val.nanoseconds,
523 });
524 }
525
526 if let Some(b) = col.as_any().downcast_ref::<LargeBinaryArray>() {
528 let bytes = b.value(row);
529 if bytes.is_empty() {
530 return Value::Null;
531 }
532 return uni_common::cypher_value_codec::decode(bytes).unwrap_or_else(|e| {
533 eprintln!("CypherValue decode error: {}", e);
534 Value::Null
535 });
536 }
537
538 if let Some(b) = col.as_any().downcast_ref::<BinaryArray>() {
540 let bytes = b.value(row);
541 return Crdt::from_msgpack(bytes)
542 .ok()
543 .and_then(|crdt| serde_json::to_value(&crdt).ok())
544 .map(Value::from)
545 .unwrap_or(Value::Null);
546 }
547
548 Value::Null
550}
551
552fn values_to_uint64_array(values: &[Value]) -> ArrayRef {
553 let mut builder = UInt64Builder::with_capacity(values.len());
554 for v in values {
555 if let Some(n) = v.as_u64() {
556 builder.append_value(n);
557 } else {
558 builder.append_null();
559 }
560 }
561 Arc::new(builder.finish())
562}
563
564fn values_to_int64_array(values: &[Value]) -> ArrayRef {
565 let mut builder = Int64Builder::with_capacity(values.len());
566 for v in values {
567 if let Some(n) = v.as_i64() {
568 builder.append_value(n);
569 } else {
570 builder.append_null();
571 }
572 }
573 Arc::new(builder.finish())
574}
575
576fn values_to_int32_array(values: &[Value]) -> ArrayRef {
577 let mut builder = Int32Builder::with_capacity(values.len());
578 for v in values {
579 if let Some(n) = v.as_i64() {
580 builder.append_value(n as i32);
581 } else {
582 builder.append_null();
583 }
584 }
585 Arc::new(builder.finish())
586}
587
588fn values_to_string_array(values: &[Value]) -> ArrayRef {
589 let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 10);
590 for v in values {
591 if let Some(s) = v.as_str() {
592 builder.append_value(s);
593 } else if v.is_null() {
594 builder.append_null();
595 } else {
596 builder.append_value(v.to_string());
597 }
598 }
599 Arc::new(builder.finish())
600}
601
602fn values_to_bool_array(values: &[Value]) -> ArrayRef {
603 let mut builder = BooleanBuilder::with_capacity(values.len());
604 for v in values {
605 if let Some(b) = v.as_bool() {
606 builder.append_value(b);
607 } else {
608 builder.append_null();
609 }
610 }
611 Arc::new(builder.finish())
612}
613
614fn values_to_float32_array(values: &[Value]) -> ArrayRef {
615 let mut builder = Float32Builder::with_capacity(values.len());
616 for v in values {
617 if let Some(n) = v.as_f64() {
618 builder.append_value(n as f32);
619 } else {
620 builder.append_null();
621 }
622 }
623 Arc::new(builder.finish())
624}
625
626fn values_to_float64_array(values: &[Value]) -> ArrayRef {
627 let mut builder = Float64Builder::with_capacity(values.len());
628 for v in values {
629 if let Some(n) = v.as_f64() {
630 builder.append_value(n);
631 } else {
632 builder.append_null();
633 }
634 }
635 Arc::new(builder.finish())
636}
637
638fn values_to_fixed_size_binary_array(values: &[Value], size: i32) -> Result<ArrayRef> {
639 let mut builder = FixedSizeBinaryBuilder::with_capacity(values.len(), size);
640 for v in values {
641 if let Value::List(bytes) = v {
642 let b: Vec<u8> = bytes
643 .iter()
644 .map(|bv| bv.as_u64().unwrap_or(0) as u8)
645 .collect();
646 if b.len() as i32 == size {
647 builder.append_value(&b)?;
648 } else {
649 builder.append_null();
650 }
651 } else {
652 builder.append_null();
653 }
654 }
655 Ok(Arc::new(builder.finish()))
656}
657
658pub fn extract_vector_f32_values(
673 val: Option<&Value>,
674 is_deleted: bool,
675 dimensions: usize,
676) -> (Vec<f32>, bool) {
677 let zeros = || vec![0.0_f32; dimensions];
678
679 if is_deleted {
681 return (zeros(), true);
682 }
683
684 match val {
685 Some(Value::Vector(v)) if v.len() == dimensions => (v.clone(), true),
687 Some(Value::Vector(_)) => (zeros(), false), Some(Value::List(arr)) if arr.len() == dimensions => {
690 let values: Vec<f32> = arr
691 .iter()
692 .map(|v| v.as_f64().unwrap_or(0.0) as f32)
693 .collect();
694 (values, true)
695 }
696 Some(Value::List(_)) => (zeros(), false), _ => (zeros(), false), }
699}
700
701fn values_to_fixed_size_list_f32_array(values: &[Value], size: i32) -> ArrayRef {
702 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), size);
703 for v in values {
704 let (vals, valid) = extract_vector_f32_values(Some(v), false, size as usize);
705 for val in vals {
706 builder.values().append_value(val);
707 }
708 builder.append(valid);
709 }
710 Arc::new(builder.finish())
711}
712
713fn values_to_timestamp_array(values: &[Value], tz: Option<&Arc<str>>) -> ArrayRef {
714 let mut builder = TimestampNanosecondBuilder::with_capacity(values.len());
715 for v in values {
716 if v.is_null() {
717 builder.append_null();
718 } else if let Value::Temporal(tv) = v {
719 match tv {
720 uni_common::TemporalValue::DateTime {
721 nanos_since_epoch, ..
722 }
723 | uni_common::TemporalValue::LocalDateTime {
724 nanos_since_epoch, ..
725 } => builder.append_value(*nanos_since_epoch),
726 _ => builder.append_null(),
727 }
728 } else if let Some(n) = v.as_i64() {
729 builder.append_value(n);
730 } else if let Some(s) = v.as_str() {
731 match parse_datetime_to_nanos(s) {
732 Some(nanos) => builder.append_value(nanos),
733 None => builder.append_null(),
734 }
735 } else {
736 builder.append_null();
737 }
738 }
739
740 let arr = builder.finish();
741 if let Some(tz) = tz {
742 Arc::new(arr.with_timezone(tz.as_ref()))
743 } else {
744 Arc::new(arr)
745 }
746}
747
748fn values_to_datetime_struct_array(values: &[Value]) -> ArrayRef {
753 let mut nanos_builder = TimestampNanosecondBuilder::with_capacity(values.len());
754 let mut offset_builder = Int32Builder::with_capacity(values.len());
755 let mut tz_builder = StringBuilder::with_capacity(values.len(), values.len() * 20);
756 let mut null_buffer = BooleanBufferBuilder::new(values.len());
757
758 for v in values {
759 match v {
760 Value::Temporal(uni_common::TemporalValue::DateTime {
761 nanos_since_epoch,
762 offset_seconds,
763 timezone_name,
764 }) => {
765 nanos_builder.append_value(*nanos_since_epoch);
766 offset_builder.append_value(*offset_seconds);
767 tz_builder.append_option(timezone_name.as_deref());
768 null_buffer.append(true);
769 }
770 Value::Temporal(uni_common::TemporalValue::LocalDateTime { nanos_since_epoch }) => {
771 nanos_builder.append_value(*nanos_since_epoch);
772 offset_builder.append_null();
773 tz_builder.append_null();
774 null_buffer.append(true);
775 }
776 _ => {
777 nanos_builder.append_null();
778 offset_builder.append_null();
779 tz_builder.append_null();
780 null_buffer.append(false);
781 }
782 }
783 }
784
785 let struct_arr = StructArray::new(
786 schema::datetime_struct_fields(),
787 vec![
788 Arc::new(nanos_builder.finish()) as ArrayRef,
789 Arc::new(offset_builder.finish()) as ArrayRef,
790 Arc::new(tz_builder.finish()) as ArrayRef,
791 ],
792 Some(null_buffer.finish().into()),
793 );
794 Arc::new(struct_arr)
795}
796
797fn values_to_time_struct_array(values: &[Value]) -> ArrayRef {
802 let mut nanos_builder = Time64NanosecondBuilder::with_capacity(values.len());
803 let mut offset_builder = Int32Builder::with_capacity(values.len());
804 let mut null_buffer = BooleanBufferBuilder::new(values.len());
805
806 for v in values {
807 match v {
808 Value::Temporal(uni_common::TemporalValue::Time {
809 nanos_since_midnight,
810 offset_seconds,
811 }) => {
812 nanos_builder.append_value(*nanos_since_midnight);
813 offset_builder.append_value(*offset_seconds);
814 null_buffer.append(true);
815 }
816 Value::Temporal(uni_common::TemporalValue::LocalTime {
817 nanos_since_midnight,
818 }) => {
819 nanos_builder.append_value(*nanos_since_midnight);
820 offset_builder.append_null();
821 null_buffer.append(true);
822 }
823 _ => {
824 nanos_builder.append_null();
825 offset_builder.append_null();
826 null_buffer.append(false);
827 }
828 }
829 }
830
831 let struct_arr = StructArray::new(
832 schema::time_struct_fields(),
833 vec![
834 Arc::new(nanos_builder.finish()) as ArrayRef,
835 Arc::new(offset_builder.finish()) as ArrayRef,
836 ],
837 Some(null_buffer.finish().into()),
838 );
839 Arc::new(struct_arr)
840}
841
842fn values_to_large_binary_array(values: &[Value]) -> ArrayRef {
843 let mut builder =
844 arrow_array::builder::LargeBinaryBuilder::with_capacity(values.len(), values.len() * 64);
845 for v in values {
846 if v.is_null() {
847 builder.append_null();
848 } else {
849 let cv_bytes = uni_common::cypher_value_codec::encode(v);
851 builder.append_value(&cv_bytes);
852 }
853 }
854 Arc::new(builder.finish())
855}
856
857pub fn values_to_array(values: &[Value], dt: &ArrowDataType) -> Result<ArrayRef> {
859 match dt {
860 ArrowDataType::UInt64 => Ok(values_to_uint64_array(values)),
861 ArrowDataType::Int64 => Ok(values_to_int64_array(values)),
862 ArrowDataType::Int32 => Ok(values_to_int32_array(values)),
863 ArrowDataType::Utf8 => Ok(values_to_string_array(values)),
864 ArrowDataType::Boolean => Ok(values_to_bool_array(values)),
865 ArrowDataType::Float32 => Ok(values_to_float32_array(values)),
866 ArrowDataType::Float64 => Ok(values_to_float64_array(values)),
867 ArrowDataType::FixedSizeBinary(size) => values_to_fixed_size_binary_array(values, *size),
868 ArrowDataType::FixedSizeList(inner, size) => {
869 if inner.data_type() == &ArrowDataType::Float32 {
870 Ok(values_to_fixed_size_list_f32_array(values, *size))
871 } else {
872 Err(anyhow!("Unsupported FixedSizeList inner type"))
873 }
874 }
875 ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
876 Ok(values_to_timestamp_array(values, tz.as_ref()))
877 }
878 ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
879 Ok(values_to_timestamp_array(values, tz.as_ref()))
880 }
881 ArrowDataType::Date32 => {
882 let mut builder = Date32Builder::with_capacity(values.len());
883 for v in values {
884 if v.is_null() {
885 builder.append_null();
886 } else if let Value::Temporal(uni_common::TemporalValue::Date {
887 days_since_epoch,
888 }) = v
889 {
890 builder.append_value(*days_since_epoch);
891 } else if let Some(n) = v.as_i64() {
892 builder.append_value(n as i32);
893 } else {
894 builder.append_null();
895 }
896 }
897 Ok(Arc::new(builder.finish()))
898 }
899 ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
900 let mut builder = Time64NanosecondBuilder::with_capacity(values.len());
901 for v in values {
902 if v.is_null() {
903 builder.append_null();
904 } else if let Value::Temporal(tv) = v {
905 match tv {
906 uni_common::TemporalValue::LocalTime {
907 nanos_since_midnight,
908 }
909 | uni_common::TemporalValue::Time {
910 nanos_since_midnight,
911 ..
912 } => builder.append_value(*nanos_since_midnight),
913 _ => builder.append_null(),
914 }
915 } else if let Some(n) = v.as_i64() {
916 builder.append_value(n);
917 } else {
918 builder.append_null();
919 }
920 }
921 Ok(Arc::new(builder.finish()))
922 }
923 ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
924 let mut builder = Time64MicrosecondBuilder::with_capacity(values.len());
925 for v in values {
926 if v.is_null() {
927 builder.append_null();
928 } else if let Value::Temporal(tv) = v {
929 match tv {
930 uni_common::TemporalValue::LocalTime {
931 nanos_since_midnight,
932 }
933 | uni_common::TemporalValue::Time {
934 nanos_since_midnight,
935 ..
936 } => builder.append_value(*nanos_since_midnight / 1_000), _ => builder.append_null(),
938 }
939 } else if let Some(n) = v.as_i64() {
940 builder.append_value(n);
941 } else {
942 builder.append_null();
943 }
944 }
945 Ok(Arc::new(builder.finish()))
946 }
947 ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano) => {
948 let mut builder = IntervalMonthDayNanoBuilder::with_capacity(values.len());
949 for v in values {
950 if v.is_null() {
951 builder.append_null();
952 } else if let Value::Temporal(uni_common::TemporalValue::Duration {
953 months,
954 days,
955 nanos,
956 }) = v
957 {
958 builder.append_value(arrow::datatypes::IntervalMonthDayNano {
959 months: *months as i32,
960 days: *days as i32,
961 nanoseconds: *nanos,
962 });
963 } else {
964 builder.append_null();
965 }
966 }
967 Ok(Arc::new(builder.finish()))
968 }
969 ArrowDataType::Duration(arrow_schema::TimeUnit::Microsecond) => {
970 let mut builder = DurationMicrosecondBuilder::with_capacity(values.len());
971 for v in values {
972 if v.is_null() {
973 builder.append_null();
974 } else if let Value::Temporal(uni_common::TemporalValue::Duration {
975 months,
976 days,
977 nanos,
978 }) = v
979 {
980 let total_micros =
981 months * 30 * 86_400_000_000i64 + days * 86_400_000_000i64 + nanos / 1_000;
982 builder.append_value(total_micros);
983 } else if let Some(n) = v.as_i64() {
984 builder.append_value(n);
985 } else {
986 builder.append_null();
987 }
988 }
989 Ok(Arc::new(builder.finish()))
990 }
991 ArrowDataType::LargeBinary => Ok(values_to_large_binary_array(values)),
992 ArrowDataType::List(field) => {
993 if field.data_type() == &ArrowDataType::Utf8 {
994 let mut builder = ListBuilder::new(StringBuilder::new());
995 for v in values {
996 if let Value::List(arr) = v {
997 for item in arr {
998 if let Some(s) = item.as_str() {
999 builder.values().append_value(s);
1000 } else {
1001 builder.values().append_null();
1002 }
1003 }
1004 builder.append(true);
1005 } else {
1006 builder.append_null();
1007 }
1008 }
1009 Ok(Arc::new(builder.finish()))
1010 } else {
1011 Err(anyhow!(
1012 "Unsupported List inner type: {:?}",
1013 field.data_type()
1014 ))
1015 }
1016 }
1017 ArrowDataType::Struct(_) if schema::is_datetime_struct(dt) => {
1018 Ok(values_to_datetime_struct_array(values))
1019 }
1020 ArrowDataType::Struct(_) if schema::is_time_struct(dt) => {
1021 Ok(values_to_time_struct_array(values))
1022 }
1023 _ => Err(anyhow!("Unsupported type for conversion: {:?}", dt)),
1024 }
1025}
1026
/// Builds Arrow columns for one property from per-row [`Value`]s.
///
/// The declared schema type picks which column builder `build_column` uses.
pub struct PropertyExtractor<'a> {
    /// Declared schema type of the property being extracted.
    data_type: &'a DataType,
}
1031
1032impl<'a> PropertyExtractor<'a> {
1033 pub fn new(_name: &'a str, data_type: &'a DataType) -> Self {
1034 Self { data_type }
1035 }
1036
/// Builds the Arrow column for this property over `len` rows.
///
/// * `len` - maximum number of rows to emit. Rows are driven by `deleted`,
///   so at most `deleted.len()` rows are produced.
/// * `deleted` - per-row tombstone flags. How deleted rows are rendered
///   depends on the type-specific builder.
/// * `get_props` - returns the property value for a row index, or `None` if
///   the row has no value.
///
/// # Errors
///
/// Returns an error for declared types with no Arrow conversion, and
/// propagates errors from the type-specific builders.
pub fn build_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
where
    F: Fn(usize) -> Option<&'a Value>,
{
    // Dispatch on the declared schema type to the matching column builder.
    match self.data_type {
        DataType::String => self.build_string_column(len, deleted, get_props),
        DataType::Int32 => self.build_int32_column(len, deleted, get_props),
        DataType::Int64 => self.build_int64_column(len, deleted, get_props),
        DataType::Float32 => self.build_float32_column(len, deleted, get_props),
        DataType::Float64 => self.build_float64_column(len, deleted, get_props),
        DataType::Bool => self.build_bool_column(len, deleted, get_props),
        DataType::Vector { dimensions } => {
            self.build_vector_column(len, deleted, get_props, *dimensions)
        }
        DataType::CypherValue => self.build_json_column(len, deleted, get_props),
        DataType::List(inner) => self.build_list_column(len, deleted, get_props, inner),
        DataType::Map(key, value) => self.build_map_column(len, deleted, get_props, key, value),
        DataType::Crdt(_) => self.build_crdt_column(len, deleted, get_props),
        DataType::DateTime => self.build_datetime_struct_column(len, deleted, get_props),
        DataType::Timestamp => self.build_timestamp_column(len, deleted, get_props),
        DataType::Date => self.build_date32_column(len, deleted, get_props),
        DataType::Time => self.build_time_struct_column(len, deleted, get_props),
        DataType::Duration => self.build_duration_column(len, deleted, get_props),
        DataType::Btic => self.build_btic_column(len, deleted, get_props),
        _ => Err(anyhow!(
            "Unsupported data type for arrow conversion: {:?}",
            self.data_type
        )),
    }
}
1069
1070 fn build_string_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1071 where
1072 F: Fn(usize) -> Option<&'a Value>,
1073 {
1074 let mut builder = arrow_array::builder::StringBuilder::with_capacity(len, len * 32);
1075 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1076 let prop = get_props(i);
1077 if let Some(s) = prop.and_then(|v| v.as_str()) {
1078 builder.append_value(s);
1079 } else if let Some(Value::Temporal(tv)) = prop {
1080 builder.append_value(tv.to_string());
1081 } else if is_deleted {
1082 builder.append_value("");
1083 } else {
1084 builder.append_null();
1085 }
1086 }
1087 Ok(Arc::new(builder.finish()))
1088 }
1089
1090 fn build_int32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1091 where
1092 F: Fn(usize) -> Option<&'a Value>,
1093 {
1094 let mut values = Vec::with_capacity(len);
1095 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1096 let val = get_props(i).and_then(|v| v.as_i64()).map(|v| v as i32);
1097 if val.is_none() && is_deleted {
1098 values.push(Some(0));
1099 } else {
1100 values.push(val);
1101 }
1102 }
1103 Ok(Arc::new(Int32Array::from(values)))
1104 }
1105
1106 fn build_int64_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1107 where
1108 F: Fn(usize) -> Option<&'a Value>,
1109 {
1110 let mut values = Vec::with_capacity(len);
1111 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1112 let val = get_props(i).and_then(|v| v.as_i64());
1113 if val.is_none() && is_deleted {
1114 values.push(Some(0));
1115 } else {
1116 values.push(val);
1117 }
1118 }
1119 Ok(Arc::new(Int64Array::from(values)))
1120 }
1121
1122 fn build_timestamp_column<F>(
1123 &self,
1124 len: usize,
1125 deleted: &[bool],
1126 get_props: F,
1127 ) -> Result<ArrayRef>
1128 where
1129 F: Fn(usize) -> Option<&'a Value>,
1130 {
1131 let mut values = Vec::with_capacity(len);
1132 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1133 let val = get_props(i);
1134 let ts = if is_deleted || val.is_none() {
1135 Some(0i64)
1136 } else if let Some(Value::Temporal(tv)) = val {
1137 match tv {
1138 uni_common::TemporalValue::DateTime {
1139 nanos_since_epoch, ..
1140 }
1141 | uni_common::TemporalValue::LocalDateTime {
1142 nanos_since_epoch, ..
1143 } => Some(*nanos_since_epoch),
1144 _ => None,
1145 }
1146 } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1147 Some(v)
1148 } else if let Some(s) = val.and_then(|v| v.as_str()) {
1149 parse_datetime_to_nanos(s)
1150 } else {
1151 None
1152 };
1153
1154 if is_deleted {
1155 values.push(Some(0));
1156 } else {
1157 values.push(ts);
1158 }
1159 }
1160 let arr = TimestampNanosecondArray::from(values).with_timezone("UTC");
1161 Ok(Arc::new(arr))
1162 }
1163
1164 fn build_datetime_struct_column<F>(
1165 &self,
1166 len: usize,
1167 deleted: &[bool],
1168 get_props: F,
1169 ) -> Result<ArrayRef>
1170 where
1171 F: Fn(usize) -> Option<&'a Value>,
1172 {
1173 let values = self.collect_values_or_null(len, deleted, &get_props);
1174 Ok(values_to_datetime_struct_array(&values))
1175 }
1176
1177 fn build_time_struct_column<F>(
1178 &self,
1179 len: usize,
1180 deleted: &[bool],
1181 get_props: F,
1182 ) -> Result<ArrayRef>
1183 where
1184 F: Fn(usize) -> Option<&'a Value>,
1185 {
1186 let values = self.collect_values_or_null(len, deleted, &get_props);
1187 Ok(values_to_time_struct_array(&values))
1188 }
1189
1190 fn collect_values_or_null<F>(&self, len: usize, deleted: &[bool], get_props: &F) -> Vec<Value>
1192 where
1193 F: Fn(usize) -> Option<&'a Value>,
1194 {
1195 deleted
1196 .iter()
1197 .enumerate()
1198 .take(len)
1199 .map(|(i, &is_deleted)| {
1200 if is_deleted {
1201 Value::Null
1202 } else {
1203 get_props(i).cloned().unwrap_or(Value::Null)
1204 }
1205 })
1206 .collect()
1207 }
1208
1209 fn build_date32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1210 where
1211 F: Fn(usize) -> Option<&'a Value>,
1212 {
1213 let mut builder = Date32Builder::with_capacity(len);
1214 let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1215
1216 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1217 let val = get_props(i);
1218 let days = if is_deleted || val.is_none() {
1219 Some(0)
1220 } else if let Some(Value::Temporal(uni_common::TemporalValue::Date {
1221 days_since_epoch,
1222 })) = val
1223 {
1224 Some(*days_since_epoch)
1225 } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1226 Some(v as i32)
1227 } else if let Some(s) = val.and_then(|v| v.as_str()) {
1228 match chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1229 Ok(date) => Some(date.signed_duration_since(epoch).num_days() as i32),
1230 Err(_) => None,
1231 }
1232 } else {
1233 None
1234 };
1235
1236 if is_deleted {
1237 builder.append_value(0);
1238 } else if let Some(v) = days {
1239 builder.append_value(v);
1240 } else {
1241 builder.append_null();
1242 }
1243 }
1244 Ok(Arc::new(builder.finish()))
1245 }
1246
1247 fn build_duration_column<F>(
1248 &self,
1249 len: usize,
1250 deleted: &[bool],
1251 get_props: F,
1252 ) -> Result<ArrayRef>
1253 where
1254 F: Fn(usize) -> Option<&'a Value>,
1255 {
1256 let mut builder = LargeBinaryBuilder::with_capacity(len, len * 32);
1258 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1259 let raw_val = get_props(i);
1260 if let Some(val @ Value::Temporal(uni_common::TemporalValue::Duration { .. })) = raw_val
1261 {
1262 let encoded = uni_common::cypher_value_codec::encode(val);
1263 builder.append_value(&encoded);
1264 } else if is_deleted {
1265 let zero = Value::Temporal(uni_common::TemporalValue::Duration {
1266 months: 0,
1267 days: 0,
1268 nanos: 0,
1269 });
1270 let encoded = uni_common::cypher_value_codec::encode(&zero);
1271 builder.append_value(&encoded);
1272 } else {
1273 builder.append_null();
1274 }
1275 }
1276 Ok(Arc::new(builder.finish()))
1277 }
1278
1279 fn build_btic_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1280 where
1281 F: Fn(usize) -> Option<&'a Value>,
1282 {
1283 const ENCODED_LEN: i32 = 24;
1284 let mut builder = FixedSizeBinaryBuilder::with_capacity(len, ENCODED_LEN);
1285 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1286 let raw_val = get_props(i);
1287 let btic = match raw_val {
1288 Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => Some(
1289 uni_btic::Btic::new(*lo, *hi, *meta)
1290 .map_err(|e| anyhow!("invalid BTIC value: {}", e))?,
1291 ),
1292 Some(Value::String(s)) => Some(
1293 uni_btic::parse::parse_btic_literal(s)
1294 .map_err(|e| anyhow!("BTIC parse error for '{}': {}", s, e))?,
1295 ),
1296 _ => None,
1297 };
1298
1299 if let Some(b) = btic {
1300 builder.append_value(uni_btic::encode::encode(&b))?;
1301 } else if is_deleted {
1302 builder.append_value([0u8; ENCODED_LEN as usize])?;
1303 } else {
1304 builder.append_null();
1305 }
1306 }
1307 Ok(Arc::new(builder.finish()))
1308 }
1309
1310 fn build_float32_column<F>(
1311 &self,
1312 len: usize,
1313 deleted: &[bool],
1314 get_props: F,
1315 ) -> Result<ArrayRef>
1316 where
1317 F: Fn(usize) -> Option<&'a Value>,
1318 {
1319 let mut values = Vec::with_capacity(len);
1320 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1321 let val = get_props(i).and_then(|v| v.as_f64()).map(|v| v as f32);
1322 if val.is_none() && is_deleted {
1323 values.push(Some(0.0));
1324 } else {
1325 values.push(val);
1326 }
1327 }
1328 Ok(Arc::new(Float32Array::from(values)))
1329 }
1330
1331 fn build_float64_column<F>(
1332 &self,
1333 len: usize,
1334 deleted: &[bool],
1335 get_props: F,
1336 ) -> Result<ArrayRef>
1337 where
1338 F: Fn(usize) -> Option<&'a Value>,
1339 {
1340 let mut values = Vec::with_capacity(len);
1341 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1342 let val = get_props(i).and_then(|v| v.as_f64());
1343 if val.is_none() && is_deleted {
1344 values.push(Some(0.0));
1345 } else {
1346 values.push(val);
1347 }
1348 }
1349 Ok(Arc::new(Float64Array::from(values)))
1350 }
1351
1352 fn build_bool_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1353 where
1354 F: Fn(usize) -> Option<&'a Value>,
1355 {
1356 let mut values = Vec::with_capacity(len);
1357 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1358 let val = get_props(i).and_then(|v| v.as_bool());
1359 if val.is_none() && is_deleted {
1360 values.push(Some(false));
1361 } else {
1362 values.push(val);
1363 }
1364 }
1365 Ok(Arc::new(BooleanArray::from(values)))
1366 }
1367
1368 fn build_vector_column<F>(
1369 &self,
1370 len: usize,
1371 deleted: &[bool],
1372 get_props: F,
1373 dimensions: usize,
1374 ) -> Result<ArrayRef>
1375 where
1376 F: Fn(usize) -> Option<&'a Value>,
1377 {
1378 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dimensions as i32);
1379
1380 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1381 let val = get_props(i);
1382 let (values, valid) = extract_vector_f32_values(val, is_deleted, dimensions);
1383 for v in values {
1384 builder.values().append_value(v);
1385 }
1386 builder.append(valid);
1387 }
1388 Ok(Arc::new(builder.finish()))
1389 }
1390
1391 fn build_json_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1392 where
1393 F: Fn(usize) -> Option<&'a Value>,
1394 {
1395 let null_val = Value::Null;
1396 let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(len, len * 64);
1397 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1398 let val = get_props(i);
1399 let uni_val = if val.is_none() && is_deleted {
1400 &null_val
1401 } else {
1402 val.unwrap_or(&null_val)
1403 };
1404 let cv_bytes = uni_common::cypher_value_codec::encode(uni_val);
1406 builder.append_value(&cv_bytes);
1407 }
1408 Ok(Arc::new(builder.finish()))
1409 }
1410
1411 fn build_list_column<F>(
1412 &self,
1413 len: usize,
1414 deleted: &[bool],
1415 get_props: F,
1416 inner: &DataType,
1417 ) -> Result<ArrayRef>
1418 where
1419 F: Fn(usize) -> Option<&'a Value>,
1420 {
1421 match inner {
1422 DataType::String => {
1423 self.build_typed_list(len, deleted, &get_props, StringBuilder::new(), |v, b| {
1424 if let Some(s) = v.as_str() {
1425 b.append_value(s);
1426 } else {
1427 b.append_null();
1428 }
1429 })
1430 }
1431 DataType::Int64 => {
1432 self.build_typed_list(len, deleted, &get_props, Int64Builder::new(), |v, b| {
1433 if let Some(n) = v.as_i64() {
1434 b.append_value(n);
1435 } else {
1436 b.append_null();
1437 }
1438 })
1439 }
1440 DataType::Float64 => {
1441 self.build_typed_list(len, deleted, &get_props, Float64Builder::new(), |v, b| {
1442 if let Some(f) = v.as_f64() {
1443 b.append_value(f);
1444 } else {
1445 b.append_null();
1446 }
1447 })
1448 }
1449 _ => Err(anyhow!("Unsupported inner type for List: {:?}", inner)),
1450 }
1451 }
1452
1453 fn build_typed_list<F, B, A>(
1455 &self,
1456 len: usize,
1457 deleted: &[bool],
1458 get_props: &F,
1459 inner_builder: B,
1460 mut append_value: A,
1461 ) -> Result<ArrayRef>
1462 where
1463 F: Fn(usize) -> Option<&'a Value>,
1464 B: arrow_array::builder::ArrayBuilder,
1465 A: FnMut(&Value, &mut B),
1466 {
1467 let mut builder = ListBuilder::new(inner_builder);
1468 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1469 let val_array = get_props(i).and_then(|v| v.as_array());
1470 if val_array.is_none() && is_deleted {
1471 builder.append_null();
1472 } else if let Some(arr) = val_array {
1473 for v in arr {
1474 append_value(v, builder.values());
1475 }
1476 builder.append(true);
1477 } else {
1478 builder.append_null();
1479 }
1480 }
1481 Ok(Arc::new(builder.finish()))
1482 }
1483
1484 fn build_map_column<F>(
1485 &self,
1486 len: usize,
1487 deleted: &[bool],
1488 get_props: F,
1489 key: &DataType,
1490 value: &DataType,
1491 ) -> Result<ArrayRef>
1492 where
1493 F: Fn(usize) -> Option<&'a Value>,
1494 {
1495 if !matches!(key, DataType::String) {
1496 return Err(anyhow!("Map keys must be String (JSON limitation)"));
1497 }
1498
1499 match value {
1500 DataType::String => self.build_typed_map(
1501 len,
1502 deleted,
1503 &get_props,
1504 StringBuilder::new(),
1505 arrow_schema::DataType::Utf8,
1506 |v, b: &mut StringBuilder| {
1507 if let Some(s) = v.as_str() {
1508 b.append_value(s);
1509 } else {
1510 b.append_null();
1511 }
1512 },
1513 ),
1514 DataType::Int64 => self.build_typed_map(
1515 len,
1516 deleted,
1517 &get_props,
1518 Int64Builder::new(),
1519 arrow_schema::DataType::Int64,
1520 |v, b: &mut Int64Builder| {
1521 if let Some(n) = v.as_i64() {
1522 b.append_value(n);
1523 } else {
1524 b.append_null();
1525 }
1526 },
1527 ),
1528 _ => Err(anyhow!("Unsupported value type for Map: {:?}", value)),
1529 }
1530 }
1531
1532 fn build_typed_map<F, B, A>(
1534 &self,
1535 len: usize,
1536 deleted: &[bool],
1537 get_props: &F,
1538 value_builder: B,
1539 value_arrow_type: arrow_schema::DataType,
1540 mut append_value: A,
1541 ) -> Result<ArrayRef>
1542 where
1543 F: Fn(usize) -> Option<&'a Value>,
1544 B: arrow_array::builder::ArrayBuilder,
1545 A: FnMut(&Value, &mut B),
1546 {
1547 let key_builder = Box::new(StringBuilder::new());
1548 let value_builder = Box::new(value_builder);
1549 let struct_builder = StructBuilder::new(
1550 vec![
1551 Field::new("key", arrow_schema::DataType::Utf8, false),
1552 Field::new("value", value_arrow_type, true),
1553 ],
1554 vec![key_builder, value_builder],
1555 );
1556 let mut builder = ListBuilder::new(struct_builder);
1557
1558 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1559 self.append_map_entry(&mut builder, get_props(i), is_deleted, &mut append_value);
1560 }
1561 Ok(Arc::new(builder.finish()))
1562 }
1563
1564 fn append_map_entry<B, A>(
1566 &self,
1567 builder: &mut ListBuilder<StructBuilder>,
1568 val: Option<&'a Value>,
1569 is_deleted: bool,
1570 append_value: &mut A,
1571 ) where
1572 B: arrow_array::builder::ArrayBuilder,
1573 A: FnMut(&Value, &mut B),
1574 {
1575 let val_obj = val.and_then(|v| v.as_object());
1576 if val_obj.is_none() && is_deleted {
1577 builder.append(false);
1578 } else if let Some(obj) = val_obj {
1579 let struct_b = builder.values();
1580 for (k, v) in obj {
1581 struct_b
1582 .field_builder::<StringBuilder>(0)
1583 .unwrap()
1584 .append_value(k);
1585 let value_b = struct_b.field_builder::<B>(1).unwrap();
1587 append_value(v, value_b);
1588 struct_b.append(true);
1589 }
1590 builder.append(true);
1591 } else {
1592 builder.append(false);
1593 }
1594 }
1595
1596 fn build_crdt_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1597 where
1598 F: Fn(usize) -> Option<&'a Value>,
1599 {
1600 let mut builder = BinaryBuilder::new();
1601 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1602 if is_deleted {
1603 builder.append_null();
1604 continue;
1605 }
1606 if let Some(val) = get_props(i) {
1607 let crdt_result = if let Some(s) = val.as_str() {
1610 serde_json::from_str::<Crdt>(s)
1611 } else {
1612 let json_val: serde_json::Value = val.clone().into();
1614 serde_json::from_value::<Crdt>(json_val)
1615 };
1616
1617 if let Ok(crdt) = crdt_result {
1618 if let Ok(bytes) = crdt.to_msgpack() {
1619 builder.append_value(&bytes);
1620 } else {
1621 builder.append_null();
1622 }
1623 } else {
1624 builder.append_null();
1625 }
1626 } else {
1627 builder.append_null();
1628 }
1629 }
1630 Ok(Arc::new(builder.finish()))
1631 }
1632}
1633
1634pub fn build_edge_column<'a>(
1636 name: &'a str,
1637 data_type: &'a DataType,
1638 len: usize,
1639 get_props: impl Fn(usize) -> Option<&'a Value>,
1640) -> Result<ArrayRef> {
1641 let deleted = vec![false; len];
1643 let extractor = PropertyExtractor::new(name, data_type);
1644 extractor.build_column(len, &deleted, get_props)
1645}
1646
1647#[cfg(test)]
1648mod tests {
1649 use super::*;
1650 use arrow_array::{
1651 Array, DurationMicrosecondArray,
1652 builder::{BinaryBuilder, Time64MicrosecondBuilder, TimestampNanosecondBuilder},
1653 };
1654 use std::collections::HashMap;
1655 use uni_common::TemporalValue;
1656 use uni_crdt::{Crdt, GCounter};
1657
1658 #[test]
1659 fn test_arrow_to_value_string() {
1660 let arr = StringArray::from(vec![Some("hello"), None, Some("world")]);
1661 assert_eq!(
1662 arrow_to_value(&arr, 0, None),
1663 Value::String("hello".to_string())
1664 );
1665 assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1666 assert_eq!(
1667 arrow_to_value(&arr, 2, None),
1668 Value::String("world".to_string())
1669 );
1670 }
1671
1672 #[test]
1673 fn test_arrow_to_value_int64() {
1674 let arr = Int64Array::from(vec![Some(42), None, Some(-10)]);
1675 assert_eq!(arrow_to_value(&arr, 0, None), Value::Int(42));
1676 assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1677 assert_eq!(arrow_to_value(&arr, 2, None), Value::Int(-10));
1678 }
1679
1680 #[test]
1681 #[allow(clippy::approx_constant)]
1682 fn test_arrow_to_value_float64() {
1683 let arr = Float64Array::from(vec![Some(3.14), None]);
1684 assert_eq!(arrow_to_value(&arr, 0, None), Value::Float(3.14));
1685 assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1686 }
1687
1688 #[test]
1689 fn test_arrow_to_value_bool() {
1690 let arr = BooleanArray::from(vec![Some(true), Some(false), None]);
1691 assert_eq!(arrow_to_value(&arr, 0, None), Value::Bool(true));
1692 assert_eq!(arrow_to_value(&arr, 1, None), Value::Bool(false));
1693 assert_eq!(arrow_to_value(&arr, 2, None), Value::Null);
1694 }
1695
1696 #[test]
1697 fn test_values_to_array_int64() {
1698 let values = vec![Value::Int(1), Value::Int(2), Value::Null, Value::Int(4)];
1699 let arr = values_to_array(&values, &ArrowDataType::Int64).unwrap();
1700 assert_eq!(arr.len(), 4);
1701
1702 let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1703 assert_eq!(int_arr.value(0), 1);
1704 assert_eq!(int_arr.value(1), 2);
1705 assert!(int_arr.is_null(2));
1706 assert_eq!(int_arr.value(3), 4);
1707 }
1708
1709 #[test]
1710 fn test_values_to_array_string() {
1711 let values = vec![
1712 Value::String("a".to_string()),
1713 Value::String("b".to_string()),
1714 Value::Null,
1715 ];
1716 let arr = values_to_array(&values, &ArrowDataType::Utf8).unwrap();
1717 assert_eq!(arr.len(), 3);
1718
1719 let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1720 assert_eq!(str_arr.value(0), "a");
1721 assert_eq!(str_arr.value(1), "b");
1722 assert!(str_arr.is_null(2));
1723 }
1724
1725 #[test]
1726 fn test_property_extractor_string() {
1727 let props: Vec<HashMap<String, Value>> = vec![
1728 [("name".to_string(), Value::String("Alice".to_string()))]
1729 .into_iter()
1730 .collect(),
1731 [("name".to_string(), Value::String("Bob".to_string()))]
1732 .into_iter()
1733 .collect(),
1734 HashMap::new(),
1735 ];
1736 let deleted = vec![false, false, true];
1737
1738 let extractor = PropertyExtractor::new("name", &DataType::String);
1739 let arr = extractor
1740 .build_column(3, &deleted, |i| props[i].get("name"))
1741 .unwrap();
1742
1743 let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1744 assert_eq!(str_arr.value(0), "Alice");
1745 assert_eq!(str_arr.value(1), "Bob");
1746 assert_eq!(str_arr.value(2), ""); }
1748
1749 #[test]
1750 fn test_property_extractor_int64() {
1751 let props: Vec<HashMap<String, Value>> = vec![
1752 [("age".to_string(), Value::Int(25))].into_iter().collect(),
1753 [("age".to_string(), Value::Int(30))].into_iter().collect(),
1754 HashMap::new(),
1755 ];
1756 let deleted = vec![false, false, true];
1757
1758 let extractor = PropertyExtractor::new("age", &DataType::Int64);
1759 let arr = extractor
1760 .build_column(3, &deleted, |i| props[i].get("age"))
1761 .unwrap();
1762
1763 let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1764 assert_eq!(int_arr.value(0), 25);
1765 assert_eq!(int_arr.value(1), 30);
1766 assert_eq!(int_arr.value(2), 0); }
1768
1769 #[test]
1770 fn test_arrow_to_value_time64() {
1771 let mut builder = Time64MicrosecondBuilder::new();
1773 builder.append_value(37_845_000_000);
1775 builder.append_value(0);
1777 builder.append_value(86_399_123_456);
1779 builder.append_null();
1780
1781 let arr = builder.finish();
1782 assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "10:30:45");
1784 assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "00:00");
1785 assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "23:59:59.123456");
1786 assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
1787 }
1788
1789 #[test]
1790 fn test_arrow_to_value_duration() {
1791 let arr = DurationMicrosecondArray::from(vec![
1794 Some(1_000_000), Some(3_600_000_000), Some(86_400_000_000), None,
1798 ]);
1799
1800 assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "PT1S");
1801 assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "PT1H");
1802 assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "PT24H");
1803 assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
1804 }
1805
1806 #[test]
1807 fn test_arrow_to_value_binary_crdt() {
1808 let mut builder = BinaryBuilder::new();
1810
1811 let mut counter = GCounter::new();
1813 counter.increment("actor1", 5);
1814 let crdt = Crdt::GCounter(counter);
1815 let bytes = crdt.to_msgpack().unwrap();
1816 builder.append_value(&bytes);
1817
1818 builder.append_null();
1820
1821 let arr = builder.finish();
1822
1823 let result = arrow_to_value(&arr, 0, None);
1825 assert!(result.as_object().is_some());
1826 let obj = result.as_object().unwrap();
1827 assert_eq!(obj.get("t"), Some(&Value::String("gc".to_string())));
1829
1830 assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1832 }
1833
1834 #[test]
1835 fn test_datetime_struct_encode_decode_roundtrip() {
1836 let values = vec![
1838 Value::Temporal(TemporalValue::DateTime {
1839 nanos_since_epoch: 441763200000000000, offset_seconds: 3600, timezone_name: Some("Europe/Paris".to_string()),
1842 }),
1843 Value::Temporal(TemporalValue::DateTime {
1844 nanos_since_epoch: 1704067200000000000, offset_seconds: -18000, timezone_name: None,
1847 }),
1848 Value::Temporal(TemporalValue::DateTime {
1849 nanos_since_epoch: 0, offset_seconds: 0,
1851 timezone_name: Some("UTC".to_string()),
1852 }),
1853 ];
1854
1855 let arr_ref = values_to_datetime_struct_array(&values);
1857 let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
1858 assert_eq!(arr.len(), 3);
1859
1860 let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
1862 let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
1863 let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
1864
1865 assert_eq!(decoded_0, values[0]);
1867 assert_eq!(decoded_1, values[1]);
1868 assert_eq!(decoded_2, values[2]);
1869
1870 if let Value::Temporal(TemporalValue::DateTime {
1872 nanos_since_epoch,
1873 offset_seconds,
1874 timezone_name,
1875 }) = decoded_0
1876 {
1877 assert_eq!(nanos_since_epoch, 441763200000000000);
1878 assert_eq!(offset_seconds, 3600);
1879 assert_eq!(timezone_name, Some("Europe/Paris".to_string()));
1880 } else {
1881 panic!("Expected DateTime value");
1882 }
1883 }
1884
1885 #[test]
1886 fn test_datetime_struct_null_handling() {
1887 let values = vec![
1889 Value::Temporal(TemporalValue::DateTime {
1890 nanos_since_epoch: 441763200000000000,
1891 offset_seconds: 3600,
1892 timezone_name: Some("Europe/Paris".to_string()),
1893 }),
1894 Value::Null,
1895 Value::Temporal(TemporalValue::DateTime {
1896 nanos_since_epoch: 0,
1897 offset_seconds: 0,
1898 timezone_name: None,
1899 }),
1900 ];
1901
1902 let arr_ref = values_to_datetime_struct_array(&values);
1903 let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
1904 assert_eq!(arr.len(), 3);
1905
1906 let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
1908 assert_eq!(decoded_0, values[0]);
1909
1910 assert!(arr.is_null(1));
1912 let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
1913 assert_eq!(decoded_1, Value::Null);
1914
1915 let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
1917 assert_eq!(decoded_2, values[2]);
1918 }
1919
1920 #[test]
1921 fn test_datetime_struct_boundary_values() {
1922 let values = vec![
1924 Value::Temporal(TemporalValue::DateTime {
1925 nanos_since_epoch: 441763200000000000,
1926 offset_seconds: 0, timezone_name: None,
1928 }),
1929 Value::Temporal(TemporalValue::DateTime {
1930 nanos_since_epoch: 441763200000000000,
1931 offset_seconds: 43200, timezone_name: None,
1933 }),
1934 Value::Temporal(TemporalValue::DateTime {
1935 nanos_since_epoch: 441763200000000000,
1936 offset_seconds: -43200, timezone_name: None,
1938 }),
1939 ];
1940
1941 let arr_ref = values_to_datetime_struct_array(&values);
1942 let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
1943 assert_eq!(arr.len(), 3);
1944
1945 for (i, expected) in values.iter().enumerate() {
1947 let decoded = arrow_to_value(arr_ref.as_ref(), i, Some(&DataType::DateTime));
1948 assert_eq!(&decoded, expected);
1949 }
1950 }
1951
1952 #[test]
1953 fn test_datetime_old_schema_migration() {
1954 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
1956 builder.append_value(441763200000000000); builder.append_value(1704067200000000000); builder.append_null();
1959
1960 let arr = builder.finish();
1961
1962 let decoded_0 = arrow_to_value(&arr, 0, Some(&DataType::DateTime));
1964 let _decoded_1 = arrow_to_value(&arr, 1, Some(&DataType::DateTime));
1965 let decoded_2 = arrow_to_value(&arr, 2, Some(&DataType::DateTime));
1966
1967 if let Value::Temporal(TemporalValue::DateTime {
1969 nanos_since_epoch,
1970 offset_seconds,
1971 timezone_name,
1972 }) = decoded_0
1973 {
1974 assert_eq!(nanos_since_epoch, 441763200000000000);
1975 assert_eq!(offset_seconds, 0);
1976 assert_eq!(timezone_name, Some("UTC".to_string()));
1977 } else {
1978 panic!("Expected DateTime value");
1979 }
1980
1981 assert_eq!(decoded_2, Value::Null);
1983 }
1984
1985 #[test]
1986 fn test_time_struct_encode_decode_roundtrip() {
1987 let values = vec![
1989 Value::Temporal(TemporalValue::Time {
1990 nanos_since_midnight: 37845000000000, offset_seconds: 3600, }),
1993 Value::Temporal(TemporalValue::Time {
1994 nanos_since_midnight: 0, offset_seconds: 0,
1996 }),
1997 Value::Temporal(TemporalValue::Time {
1998 nanos_since_midnight: 86399999999999, offset_seconds: -18000, }),
2001 ];
2002
2003 let arr_ref = values_to_time_struct_array(&values);
2005 let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2006 assert_eq!(arr.len(), 3);
2007
2008 let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2010 let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2011 let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2012
2013 assert_eq!(decoded_0, values[0]);
2015 assert_eq!(decoded_1, values[1]);
2016 assert_eq!(decoded_2, values[2]);
2017
2018 if let Value::Temporal(TemporalValue::Time {
2020 nanos_since_midnight,
2021 offset_seconds,
2022 }) = decoded_0
2023 {
2024 assert_eq!(nanos_since_midnight, 37845000000000);
2025 assert_eq!(offset_seconds, 3600);
2026 } else {
2027 panic!("Expected Time value");
2028 }
2029 }
2030
2031 #[test]
2032 fn test_time_struct_null_handling() {
2033 let values = vec![
2035 Value::Temporal(TemporalValue::Time {
2036 nanos_since_midnight: 37845000000000,
2037 offset_seconds: 3600,
2038 }),
2039 Value::Null,
2040 Value::Temporal(TemporalValue::Time {
2041 nanos_since_midnight: 0,
2042 offset_seconds: 0,
2043 }),
2044 ];
2045
2046 let arr_ref = values_to_time_struct_array(&values);
2047 let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2048 assert_eq!(arr.len(), 3);
2049
2050 let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2052 assert_eq!(decoded_0, values[0]);
2053
2054 assert!(arr.is_null(1));
2056 let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2057 assert_eq!(decoded_1, Value::Null);
2058
2059 let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2061 assert_eq!(decoded_2, values[2]);
2062 }
2063
2064 #[test]
2067 fn test_extract_vector_f32_values_valid_vector() {
2068 let v = vec![1.0, 2.0, 3.0];
2069 let val = Value::Vector(v.clone());
2070 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2071 assert_eq!(result, v);
2072 assert!(valid);
2073 }
2074
2075 #[test]
2076 fn test_extract_vector_f32_values_vector_wrong_dims() {
2077 let v = vec![1.0, 2.0];
2078 let val = Value::Vector(v);
2079 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2080 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2081 assert!(!valid);
2082 }
2083
2084 #[test]
2085 fn test_extract_vector_f32_values_valid_list() {
2086 let v = vec![Value::Float(1.0), Value::Float(2.0), Value::Float(3.0)];
2087 let val = Value::List(v);
2088 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2089 assert_eq!(result, vec![1.0, 2.0, 3.0]);
2090 assert!(valid);
2091 }
2092
2093 #[test]
2094 fn test_extract_vector_f32_values_list_wrong_dims() {
2095 let v = vec![Value::Float(1.0), Value::Float(2.0)];
2096 let val = Value::List(v);
2097 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2098 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2099 assert!(!valid);
2100 }
2101
2102 #[test]
2103 fn test_extract_vector_f32_values_list_int_coercion() {
2104 let v = vec![Value::Int(1), Value::Int(2), Value::Int(3)];
2105 let val = Value::List(v);
2106 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2107 assert_eq!(result, vec![1.0, 2.0, 3.0]);
2108 assert!(valid);
2109 }
2110
2111 #[test]
2112 fn test_extract_vector_f32_values_none() {
2113 let (result, valid) = extract_vector_f32_values(None, false, 3);
2114 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2115 assert!(!valid);
2116 }
2117
2118 #[test]
2119 fn test_extract_vector_f32_values_null() {
2120 let val = Value::Null;
2121 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2122 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2123 assert!(!valid);
2124 }
2125
2126 #[test]
2127 fn test_extract_vector_f32_values_unsupported_type() {
2128 let val = Value::String("not a vector".to_string());
2129 let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2130 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2131 assert!(!valid);
2132 }
2133
2134 #[test]
2135 fn test_extract_vector_f32_values_deleted_with_none() {
2136 let (result, valid) = extract_vector_f32_values(None, true, 3);
2137 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2138 assert!(valid); }
2140
2141 #[test]
2142 fn test_extract_vector_f32_values_deleted_with_null() {
2143 let val = Value::Null;
2144 let (result, valid) = extract_vector_f32_values(Some(&val), true, 3);
2145 assert_eq!(result, vec![0.0, 0.0, 0.0]);
2146 assert!(valid); }
2148
2149 #[test]
2152 fn test_values_to_fixed_size_list_vector_with_nulls() {
2153 let values = vec![
2154 Value::Vector(vec![1.0, 2.0]),
2155 Value::Null,
2156 Value::Vector(vec![3.0, 4.0]),
2157 Value::String("invalid".to_string()),
2158 ];
2159 let arr_ref = values_to_array(
2160 &values,
2161 &ArrowDataType::FixedSizeList(
2162 Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2163 2,
2164 ),
2165 )
2166 .unwrap();
2167
2168 let arr = arr_ref
2169 .as_any()
2170 .downcast_ref::<FixedSizeListArray>()
2171 .unwrap();
2172
2173 assert_eq!(arr.len(), 4);
2174 assert!(arr.is_valid(0));
2175 assert!(!arr.is_valid(1)); assert!(arr.is_valid(2));
2177 assert!(!arr.is_valid(3)); }
2179
2180 #[test]
2181 fn test_values_to_fixed_size_list_from_list() {
2182 let values = vec![
2183 Value::List(vec![Value::Float(1.0), Value::Float(2.0)]),
2184 Value::List(vec![Value::Int(3), Value::Int(4)]),
2185 ];
2186 let arr_ref = values_to_array(
2187 &values,
2188 &ArrowDataType::FixedSizeList(
2189 Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2190 2,
2191 ),
2192 )
2193 .unwrap();
2194
2195 let arr = arr_ref
2196 .as_any()
2197 .downcast_ref::<FixedSizeListArray>()
2198 .unwrap();
2199
2200 assert_eq!(arr.len(), 2);
2201 assert!(arr.is_valid(0));
2202 assert!(arr.is_valid(1));
2203
2204 let child = arr
2206 .values()
2207 .as_any()
2208 .downcast_ref::<Float32Array>()
2209 .unwrap();
2210 assert_eq!(child.value(0), 1.0);
2211 assert_eq!(child.value(1), 2.0);
2212 assert_eq!(child.value(2), 3.0);
2213 assert_eq!(child.value(3), 4.0);
2214 }
2215
2216 #[test]
2217 fn test_values_to_fixed_size_list_wrong_dimensions() {
2218 let values = vec![
2219 Value::Vector(vec![1.0, 2.0, 3.0]), Value::List(vec![Value::Float(4.0)]), ];
2222 let arr_ref = values_to_array(
2223 &values,
2224 &ArrowDataType::FixedSizeList(
2225 Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2226 2,
2227 ),
2228 )
2229 .unwrap();
2230
2231 let arr = arr_ref
2232 .as_any()
2233 .downcast_ref::<FixedSizeListArray>()
2234 .unwrap();
2235
2236 assert_eq!(arr.len(), 2);
2237 assert!(!arr.is_valid(0)); assert!(!arr.is_valid(1)); let child = arr
2242 .values()
2243 .as_any()
2244 .downcast_ref::<Float32Array>()
2245 .unwrap();
2246 assert_eq!(child.value(0), 0.0);
2247 assert_eq!(child.value(1), 0.0);
2248 assert_eq!(child.value(2), 0.0);
2249 assert_eq!(child.value(3), 0.0);
2250 }
2251
2252 #[test]
2253 fn test_values_to_fixed_size_list_all_nulls() {
2254 let values = vec![Value::Null, Value::Null, Value::Null];
2255 let arr_ref = values_to_array(
2256 &values,
2257 &ArrowDataType::FixedSizeList(
2258 Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2259 3,
2260 ),
2261 )
2262 .unwrap();
2263
2264 let arr = arr_ref
2265 .as_any()
2266 .downcast_ref::<FixedSizeListArray>()
2267 .unwrap();
2268
2269 assert_eq!(arr.len(), 3);
2270 assert!(!arr.is_valid(0));
2271 assert!(!arr.is_valid(1));
2272 assert!(!arr.is_valid(2));
2273
2274 let child = arr
2276 .values()
2277 .as_any()
2278 .downcast_ref::<Float32Array>()
2279 .unwrap();
2280 assert_eq!(child.len(), 9);
2281 }
2282
#[test]
fn test_values_to_fixed_size_list_mixed_types() {
    // One row per input shape handled by the FixedSizeList conversion:
    // a native vector, a list of floats, an explicit null, and a value
    // that cannot be interpreted as a vector at all.
    let values = vec![
        Value::Vector(vec![1.0, 2.0]),
        Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
        Value::Null,
        Value::String("invalid".to_string()),
    ];
    let arr_ref = values_to_array(
        &values,
        &ArrowDataType::FixedSizeList(
            Arc::new(Field::new("item", ArrowDataType::Float32, false)),
            2,
        ),
    )
    .unwrap();

    let arr = arr_ref
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(arr.len(), 4);
    // Vector and List inputs both produce valid rows.
    assert!(arr.is_valid(0));
    assert!(arr.is_valid(1));
    // An explicit null and a non-vector String both produce null rows.
    assert!(!arr.is_valid(2));
    assert!(!arr.is_valid(3));

    let child = arr
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    // Every row, null or not, reserves exactly `size` (2) child slots.
    assert_eq!(child.len(), 8);
    // The first two rows carry the converted values in order.
    assert_eq!(child.value(0), 1.0);
    assert_eq!(child.value(1), 2.0);
    assert_eq!(child.value(2), 3.0);
    assert_eq!(child.value(3), 4.0);
}
2322
#[test]
fn test_build_vector_column_with_nulls_and_deleted() {
    let data_type = DataType::Vector { dimensions: 3 };
    let extractor = PropertyExtractor::new("test_vec", &data_type);

    // Row 0: a real vector.
    // Row 1: property missing entirely.
    // Row 2: property present but explicitly null.
    // Row 3: a real vector on a row that is marked deleted.
    let props = [
        Some(Value::Vector(vec![1.0, 2.0, 3.0])),
        None,
        Some(Value::Null),
        Some(Value::Vector(vec![4.0, 5.0, 6.0])),
    ];
    let deleted = [false, false, false, true];

    let arr_ref = extractor
        .build_vector_column(4, &deleted, |i| props[i].as_ref(), 3)
        .unwrap();

    let arr = arr_ref
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(arr.len(), 4);
    assert!(arr.is_valid(0));
    // A missing property and an explicit null both become null rows.
    assert!(!arr.is_valid(1));
    assert!(!arr.is_valid(2));
    // The deleted row is expected to stay valid (not null).
    assert!(arr.is_valid(3));

    let child = arr
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    // Four rows of three dimensions each: 12 child slots in total.
    assert_eq!(child.len(), 12);
    assert_eq!(child.value(0), 1.0);
    assert_eq!(child.value(1), 2.0);
    assert_eq!(child.value(2), 3.0);
    // The deleted row's slots are zero-filled rather than carrying its
    // original [4.0, 5.0, 6.0] payload.
    assert_eq!(child.value(9), 0.0);
    assert_eq!(child.value(10), 0.0);
    assert_eq!(child.value(11), 0.0);
}
2369
#[test]
fn test_build_vector_column_with_list_input() {
    let vector_type = DataType::Vector { dimensions: 2 };
    let extractor = PropertyExtractor::new("test_vec", &vector_type);

    // Float lists, integer lists, and native vectors must all be accepted
    // as inputs for a 2-dimensional vector column.
    let rows = [
        Some(Value::List(vec![Value::Float(1.0), Value::Float(2.0)])),
        Some(Value::List(vec![Value::Int(3), Value::Int(4)])),
        Some(Value::Vector(vec![5.0, 6.0])),
    ];
    let deleted = [false; 3];

    let column = extractor
        .build_vector_column(3, &deleted, |i| rows[i].as_ref(), 2)
        .unwrap();
    let list = column
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    // Every input row converts to a valid list entry.
    assert_eq!(list.len(), 3);
    assert!((0..3).all(|row| list.is_valid(row)));

    // Child values appear row-major, with integers widened to floats.
    let floats = list
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    let expected: [f32; 6] = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0];
    for (slot, want) in expected.iter().enumerate() {
        assert_eq!(floats.value(slot), *want);
    }
}
2409}