1use anyhow::{Result, anyhow};
11use arrow_array::builder::{
12 BinaryBuilder, BooleanBufferBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
13 FixedSizeBinaryBuilder, FixedSizeListBuilder, Float32Builder, Float64Builder, Int32Builder,
14 Int64Builder, IntervalMonthDayNanoBuilder, LargeBinaryBuilder, ListBuilder, StringBuilder,
15 StructBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder,
16 UInt32Builder, UInt64Builder,
17};
18use arrow_array::{
19 Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, FixedSizeBinaryArray,
20 FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array,
21 IntervalMonthDayNanoArray, LargeBinaryArray, ListArray, StringArray, StructArray,
22 Time64NanosecondArray, TimestampNanosecondArray, UInt32Array, UInt64Array,
23};
24use arrow_schema::{DataType as ArrowDataType, Field};
25use std::collections::HashMap;
26use std::sync::Arc;
27use uni_common::DataType;
28use uni_common::Value;
29use uni_common::core::id::{Eid, Vid};
30use uni_common::core::schema;
31use uni_crdt::Crdt;
32
33fn build_timestamp_column_from_id_map<K, I>(
38 ids: I,
39 timestamps: Option<&HashMap<K, i64>>,
40) -> ArrayRef
41where
42 K: Eq + std::hash::Hash,
43 I: IntoIterator<Item = K>,
44{
45 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
46 for id in ids {
47 match timestamps.and_then(|m| m.get(&id)) {
48 Some(&ts) => builder.append_value(ts),
49 None => builder.append_null(),
50 }
51 }
52 Arc::new(builder.finish())
53}
54
55pub fn build_timestamp_column_from_vid_map<I>(
56 ids: I,
57 timestamps: Option<&HashMap<Vid, i64>>,
58) -> ArrayRef
59where
60 I: IntoIterator<Item = Vid>,
61{
62 build_timestamp_column_from_id_map(ids, timestamps)
63}
64
65pub fn build_timestamp_column_from_eid_map<I>(
66 ids: I,
67 timestamps: Option<&HashMap<Eid, i64>>,
68) -> ArrayRef
69where
70 I: IntoIterator<Item = Eid>,
71{
72 build_timestamp_column_from_id_map(ids, timestamps)
73}
74
75pub fn build_timestamp_column<I>(timestamps: I) -> ArrayRef
79where
80 I: IntoIterator<Item = Option<i64>>,
81{
82 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
83 for ts in timestamps {
84 builder.append_option(ts);
85 }
86 Arc::new(builder.finish())
87}
88
89pub fn labels_from_list_array(list_arr: &ListArray, row: usize) -> Vec<String> {
95 if list_arr.is_null(row) {
96 return Vec::new();
97 }
98 let values = list_arr.value(row);
99 let Some(str_arr) = values.as_any().downcast_ref::<StringArray>() else {
100 return Vec::new();
101 };
102 (0..str_arr.len())
103 .filter(|&j| !str_arr.is_null(j))
104 .map(|j| str_arr.value(j).to_string())
105 .collect()
106}
107
108fn parse_datetime_to_nanos(s: &str) -> Option<i64> {
113 chrono::DateTime::parse_from_rfc3339(s)
114 .map(|dt| {
115 dt.with_timezone(&chrono::Utc)
116 .timestamp_nanos_opt()
117 .unwrap_or(0)
118 })
119 .or_else(|_| {
120 chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
121 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
122 })
123 .or_else(|_| {
124 chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ")
125 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
126 })
127 .or_else(|_| {
128 chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M%:z").map(|dt| {
129 dt.with_timezone(&chrono::Utc)
130 .timestamp_nanos_opt()
131 .unwrap_or(0)
132 })
133 })
134 .ok()
135 .or_else(|| {
136 s.strip_suffix('Z')
137 .and_then(|base| chrono::NaiveDateTime::parse_from_str(base, "%Y-%m-%dT%H:%M").ok())
138 .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
139 })
140}
141
142pub(crate) fn try_reconstruct_map(arr: &ArrayRef) -> Option<HashMap<String, Value>> {
148 let structs = arr.as_any().downcast_ref::<StructArray>()?;
149 let fields = structs.fields();
150 if fields.len() != 2 || fields[0].name() != "key" || fields[1].name() != "value" {
151 return None;
152 }
153 let value_hint = raw_bytes_hint(fields[1].metadata());
156 let key_col = structs.column(0);
157 let val_col = structs.column(1);
158 let mut map = HashMap::new();
159 for i in 0..structs.len() {
160 if let Value::String(k) = arrow_to_value(key_col.as_ref(), i, None) {
161 map.insert(k, arrow_to_value(val_col.as_ref(), i, value_hint));
162 }
163 }
164 Some(map)
165}
166
167fn array_to_value_list(arr: &ArrayRef, elem_type: Option<&DataType>) -> Vec<Value> {
172 (0..arr.len())
173 .map(|i| arrow_to_value(arr.as_ref(), i, elem_type))
174 .collect()
175}
176
177fn raw_bytes_hint(metadata: &HashMap<String, String>) -> Option<&'static DataType> {
181 if metadata.get("uni_raw_bytes").is_some_and(|v| v == "true") {
182 Some(&DataType::Bytes)
183 } else {
184 None
185 }
186}
187
188fn list_child_bytes_hint(dt: &ArrowDataType) -> Option<&'static DataType> {
190 match dt {
191 ArrowDataType::List(f)
192 | ArrowDataType::LargeList(f)
193 | ArrowDataType::FixedSizeList(f, _) => raw_bytes_hint(f.metadata()),
194 _ => None,
195 }
196}
197
198pub fn arrow_to_value(col: &dyn Array, row: usize, data_type: Option<&DataType>) -> Value {
205 if col.is_null(row) {
206 return Value::Null;
207 }
208
209 if let Some(dt) = data_type {
211 match dt {
212 DataType::DateTime => {
213 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
215 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
216 struct_arr.column_by_name("nanos_since_epoch"),
217 struct_arr.column_by_name("offset_seconds"),
218 struct_arr.column_by_name("timezone_name"),
219 )
220 && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
221 nanos_col
222 .as_any()
223 .downcast_ref::<TimestampNanosecondArray>(),
224 offset_col.as_any().downcast_ref::<Int32Array>(),
225 tz_col.as_any().downcast_ref::<StringArray>(),
226 )
227 {
228 if nanos_arr.is_null(row) {
229 return Value::Null;
230 }
231 let nanos = nanos_arr.value(row);
232 if offset_arr.is_null(row) {
233 return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
235 nanos_since_epoch: nanos,
236 });
237 }
238 let offset = offset_arr.value(row);
239 let tz_name = (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
240 return Value::Temporal(uni_common::TemporalValue::DateTime {
241 nanos_since_epoch: nanos,
242 offset_seconds: offset,
243 timezone_name: tz_name,
244 });
245 }
246 if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
248 let nanos = ts.value(row);
249 let tz_name = ts.timezone().map(|s| s.to_string());
250 return Value::Temporal(uni_common::TemporalValue::DateTime {
251 nanos_since_epoch: nanos,
252 offset_seconds: 0,
253 timezone_name: tz_name,
254 });
255 }
256 }
257 DataType::Time => {
258 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
260 && let (Some(nanos_col), Some(offset_col)) = (
261 struct_arr.column_by_name("nanos_since_midnight"),
262 struct_arr.column_by_name("offset_seconds"),
263 )
264 && let (Some(nanos_arr), Some(offset_arr)) = (
265 nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
266 offset_col.as_any().downcast_ref::<Int32Array>(),
267 )
268 {
269 if nanos_arr.is_null(row) || offset_arr.is_null(row) {
271 return Value::Null;
272 }
273 let nanos = nanos_arr.value(row);
274 let offset = offset_arr.value(row);
275 return Value::Temporal(uni_common::TemporalValue::Time {
276 nanos_since_midnight: nanos,
277 offset_seconds: offset,
278 });
279 }
280 if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
282 let nanos = t.value(row);
283 return Value::Temporal(uni_common::TemporalValue::Time {
284 nanos_since_midnight: nanos,
285 offset_seconds: 0,
286 });
287 }
288 }
289 DataType::Bytes => {
290 let Some(arr) = col.as_any().downcast_ref::<LargeBinaryArray>() else {
291 log::warn!("Bytes column is not LargeBinaryArray");
292 return Value::Null;
293 };
294 if arr.is_null(row) {
295 return Value::Null;
296 }
297 return Value::Bytes(arr.value(row).to_vec());
298 }
299 DataType::Btic => {
300 let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>() else {
301 log::warn!("BTIC column is not FixedSizeBinaryArray");
302 return Value::Null;
303 };
304 let bytes = fsb.value(row);
305 return match uni_btic::encode::decode_slice(bytes) {
306 Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
307 lo: btic.lo(),
308 hi: btic.hi(),
309 meta: btic.meta(),
310 }),
311 Err(e) => {
312 log::warn!("BTIC decode error: {}", e);
313 Value::Null
314 }
315 };
316 }
317 DataType::SparseVector { .. } => {
318 let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>() else {
319 log::warn!("SparseVector column is not StructArray");
320 return Value::Null;
321 };
322 if struct_arr.is_null(row) {
323 return Value::Null;
324 }
325 let (Some(indices_list), Some(values_list)) = (
326 struct_arr
327 .column_by_name("indices")
328 .and_then(|c| c.as_any().downcast_ref::<ListArray>()),
329 struct_arr
330 .column_by_name("values")
331 .and_then(|c| c.as_any().downcast_ref::<ListArray>()),
332 ) else {
333 log::warn!("SparseVector struct missing indices/values list columns");
334 return Value::Null;
335 };
336 let idx_vals = indices_list.value(row);
337 let Some(idx_arr) = idx_vals.as_any().downcast_ref::<UInt32Array>() else {
338 log::warn!("SparseVector 'indices' inner not UInt32");
339 return Value::Null;
340 };
341 let w_vals = values_list.value(row);
342 let Some(w_arr) = w_vals.as_any().downcast_ref::<Float32Array>() else {
343 log::warn!("SparseVector 'values' inner not Float32");
344 return Value::Null;
345 };
346 let indices: Vec<u32> = (0..idx_arr.len()).map(|i| idx_arr.value(i)).collect();
347 let values: Vec<f32> = (0..w_arr.len()).map(|i| w_arr.value(i)).collect();
348 return Value::SparseVector { indices, values };
349 }
350 _ => {}
351 }
352 }
353
354 if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
356 return Value::String(s.value(row).to_string());
357 }
358
359 if let Some(u) = col.as_any().downcast_ref::<UInt64Array>() {
361 return Value::Int(u.value(row) as i64);
362 }
363 if let Some(i) = col.as_any().downcast_ref::<Int64Array>() {
364 return Value::Int(i.value(row));
365 }
366 if let Some(i) = col.as_any().downcast_ref::<Int32Array>() {
367 return Value::Int(i.value(row) as i64);
368 }
369
370 if let Some(f) = col.as_any().downcast_ref::<Float64Array>() {
372 return Value::Float(f.value(row));
373 }
374 if let Some(f) = col.as_any().downcast_ref::<Float32Array>() {
375 return Value::Float(f.value(row) as f64);
376 }
377
378 if let Some(b) = col.as_any().downcast_ref::<BooleanArray>() {
380 return Value::Bool(b.value(row));
381 }
382
383 if let Some(list) = col.as_any().downcast_ref::<FixedSizeListArray>() {
390 let inner = list.value(row);
391 if let Some(floats) = inner.as_any().downcast_ref::<Float32Array>() {
392 return Value::Vector((0..floats.len()).map(|i| floats.value(i)).collect());
393 }
394 let elem_hint = list_child_bytes_hint(list.data_type());
395 return Value::List(array_to_value_list(&inner, elem_hint));
396 }
397
398 if let Some(list) = col.as_any().downcast_ref::<ListArray>() {
400 let arr = list.value(row);
401
402 if let Some(obj) = try_reconstruct_map(&arr) {
404 return Value::Map(obj);
405 }
406
407 let elem_hint = list_child_bytes_hint(list.data_type());
408 return Value::List(array_to_value_list(&arr, elem_hint));
409 }
410
411 if let Some(list) = col.as_any().downcast_ref::<arrow_array::LargeListArray>() {
413 let elem_hint = list_child_bytes_hint(list.data_type());
414 return Value::List(array_to_value_list(&list.value(row), elem_hint));
415 }
416
417 if let Some(s) = col.as_any().downcast_ref::<StructArray>() {
419 if schema::is_sparse_vector_struct(col.data_type()) {
423 if s.is_null(row) {
424 return Value::Null;
425 }
426 if let (Some(idx_list), Some(val_list)) = (
427 s.column_by_name("indices")
428 .and_then(|c| c.as_any().downcast_ref::<ListArray>()),
429 s.column_by_name("values")
430 .and_then(|c| c.as_any().downcast_ref::<ListArray>()),
431 ) {
432 let idx_vals = idx_list.value(row);
433 let val_vals = val_list.value(row);
434 if let (Some(ia), Some(va)) = (
435 idx_vals.as_any().downcast_ref::<UInt32Array>(),
436 val_vals.as_any().downcast_ref::<Float32Array>(),
437 ) {
438 let indices = (0..ia.len()).map(|i| ia.value(i)).collect();
439 let values = (0..va.len()).map(|i| va.value(i)).collect();
440 return Value::SparseVector { indices, values };
441 }
442 }
443 }
444
445 let field_names: Vec<&str> = s.fields().iter().map(|f| f.name().as_str()).collect();
446
447 if field_names.contains(&"nanos_since_epoch")
449 && field_names.contains(&"offset_seconds")
450 && field_names.contains(&"timezone_name")
451 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
452 s.column_by_name("nanos_since_epoch"),
453 s.column_by_name("offset_seconds"),
454 s.column_by_name("timezone_name"),
455 )
456 {
457 let nanos_opt = nanos_col
459 .as_any()
460 .downcast_ref::<TimestampNanosecondArray>()
461 .map(|a| {
462 if a.is_null(row) {
463 None
464 } else {
465 Some(a.value(row))
466 }
467 })
468 .or_else(|| {
469 nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
470 if a.is_null(row) {
471 None
472 } else {
473 Some(a.value(row))
474 }
475 })
476 });
477 let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
478 if a.is_null(row) {
479 None
480 } else {
481 Some(a.value(row))
482 }
483 });
484
485 if let Some(Some(nanos)) = nanos_opt {
486 match offset_opt {
487 Some(Some(offset)) => {
488 let tz_name = tz_col.as_any().downcast_ref::<StringArray>().and_then(|a| {
489 if a.is_null(row) {
490 None
491 } else {
492 Some(a.value(row).to_string())
493 }
494 });
495 return Value::Temporal(uni_common::TemporalValue::DateTime {
496 nanos_since_epoch: nanos,
497 offset_seconds: offset,
498 timezone_name: tz_name,
499 });
500 }
501 _ => {
502 return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
504 nanos_since_epoch: nanos,
505 });
506 }
507 }
508 }
509 }
510
511 if field_names.contains(&"nanos_since_midnight")
513 && field_names.contains(&"offset_seconds")
514 && let (Some(nanos_col), Some(offset_col)) = (
515 s.column_by_name("nanos_since_midnight"),
516 s.column_by_name("offset_seconds"),
517 )
518 {
519 let nanos_opt = nanos_col
521 .as_any()
522 .downcast_ref::<Time64NanosecondArray>()
523 .map(|a| {
524 if a.is_null(row) {
525 None
526 } else {
527 Some(a.value(row))
528 }
529 })
530 .or_else(|| {
531 nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
532 if a.is_null(row) {
533 None
534 } else {
535 Some(a.value(row))
536 }
537 })
538 });
539 let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
540 if a.is_null(row) {
541 None
542 } else {
543 Some(a.value(row))
544 }
545 });
546
547 if let (Some(Some(nanos)), Some(Some(offset))) = (nanos_opt, offset_opt) {
548 return Value::Temporal(uni_common::TemporalValue::Time {
549 nanos_since_midnight: nanos,
550 offset_seconds: offset,
551 });
552 }
553 }
554
555 let mut map = HashMap::new();
557 for (field, child) in s.fields().iter().zip(s.columns()) {
558 map.insert(
559 field.name().clone(),
560 arrow_to_value(child.as_ref(), row, None),
561 );
562 }
563 return Value::Map(map);
564 }
565
566 if let Some(d) = col.as_any().downcast_ref::<Date32Array>() {
568 let days = d.value(row);
569 return Value::Temporal(uni_common::TemporalValue::Date {
570 days_since_epoch: days,
571 });
572 }
573
574 if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
576 let nanos = ts.value(row);
577 return match ts.timezone() {
578 Some(tz) => Value::Temporal(uni_common::TemporalValue::DateTime {
579 nanos_since_epoch: nanos,
580 offset_seconds: 0,
581 timezone_name: Some(tz.to_string()),
582 }),
583 None => Value::Temporal(uni_common::TemporalValue::LocalDateTime {
584 nanos_since_epoch: nanos,
585 }),
586 };
587 }
588
589 if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
591 let nanos = t.value(row);
592 return Value::Temporal(uni_common::TemporalValue::LocalTime {
593 nanos_since_midnight: nanos,
594 });
595 }
596
597 if let Some(t) = col
599 .as_any()
600 .downcast_ref::<arrow_array::Time64MicrosecondArray>()
601 {
602 let micros = t.value(row);
603 return Value::Temporal(uni_common::TemporalValue::LocalTime {
604 nanos_since_midnight: micros * 1000,
605 });
606 }
607
608 if let Some(d) = col
610 .as_any()
611 .downcast_ref::<arrow_array::DurationMicrosecondArray>()
612 {
613 let micros = d.value(row);
614 let total_nanos = micros * 1000;
615 let seconds = total_nanos / 1_000_000_000;
616 let remaining_nanos = total_nanos % 1_000_000_000;
617 return Value::Temporal(uni_common::TemporalValue::Duration {
618 months: 0,
619 days: 0,
620 nanos: seconds * 1_000_000_000 + remaining_nanos,
621 });
622 }
623
624 if let Some(interval) = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>() {
626 let val = interval.value(row);
627 return Value::Temporal(uni_common::TemporalValue::Duration {
628 months: val.months as i64,
629 days: val.days as i64,
630 nanos: val.nanoseconds,
631 });
632 }
633
634 if let Some(b) = col.as_any().downcast_ref::<LargeBinaryArray>() {
636 let bytes = b.value(row);
637 if bytes.is_empty() {
638 return Value::Null;
639 }
640 return uni_common::cypher_value_codec::decode(bytes).unwrap_or_else(|e| {
641 eprintln!("CypherValue decode error: {}", e);
642 Value::Null
643 });
644 }
645
646 if let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>()
648 && fsb.value_length() == 24
649 {
650 let bytes = fsb.value(row);
651 return match uni_btic::encode::decode_slice(bytes) {
652 Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
653 lo: btic.lo(),
654 hi: btic.hi(),
655 meta: btic.meta(),
656 }),
657 Err(e) => {
658 log::warn!("BTIC decode error: {}", e);
659 Value::Null
660 }
661 };
662 }
663
664 if let Some(b) = col.as_any().downcast_ref::<BinaryArray>() {
666 let bytes = b.value(row);
667 return Crdt::from_msgpack(bytes)
668 .ok()
669 .and_then(|crdt| serde_json::to_value(&crdt).ok())
670 .map(Value::from)
671 .unwrap_or(Value::Null);
672 }
673
674 Value::Null
676}
677
678fn values_to_uint64_array(values: &[Value]) -> ArrayRef {
679 let mut builder = UInt64Builder::with_capacity(values.len());
680 for v in values {
681 if let Some(n) = v.as_u64() {
682 builder.append_value(n);
683 } else {
684 builder.append_null();
685 }
686 }
687 Arc::new(builder.finish())
688}
689
690fn values_to_int64_array(values: &[Value]) -> ArrayRef {
691 let mut builder = Int64Builder::with_capacity(values.len());
692 for v in values {
693 if let Some(n) = v.as_i64() {
694 builder.append_value(n);
695 } else {
696 builder.append_null();
697 }
698 }
699 Arc::new(builder.finish())
700}
701
702fn values_to_int32_array(values: &[Value]) -> ArrayRef {
703 let mut builder = Int32Builder::with_capacity(values.len());
704 for v in values {
705 if let Some(n) = v.as_i64() {
706 builder.append_value(n as i32);
707 } else {
708 builder.append_null();
709 }
710 }
711 Arc::new(builder.finish())
712}
713
714fn values_to_string_array(values: &[Value]) -> ArrayRef {
715 let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 10);
716 for v in values {
717 if let Some(s) = v.as_str() {
718 builder.append_value(s);
719 } else if v.is_null() {
720 builder.append_null();
721 } else {
722 builder.append_value(v.to_string());
723 }
724 }
725 Arc::new(builder.finish())
726}
727
728fn values_to_bool_array(values: &[Value]) -> ArrayRef {
729 let mut builder = BooleanBuilder::with_capacity(values.len());
730 for v in values {
731 if let Some(b) = v.as_bool() {
732 builder.append_value(b);
733 } else {
734 builder.append_null();
735 }
736 }
737 Arc::new(builder.finish())
738}
739
740fn values_to_float32_array(values: &[Value]) -> ArrayRef {
741 let mut builder = Float32Builder::with_capacity(values.len());
742 for v in values {
743 if let Some(n) = v.as_f64() {
744 builder.append_value(n as f32);
745 } else {
746 builder.append_null();
747 }
748 }
749 Arc::new(builder.finish())
750}
751
752fn values_to_float64_array(values: &[Value]) -> ArrayRef {
753 let mut builder = Float64Builder::with_capacity(values.len());
754 for v in values {
755 if let Some(n) = v.as_f64() {
756 builder.append_value(n);
757 } else {
758 builder.append_null();
759 }
760 }
761 Arc::new(builder.finish())
762}
763
764fn values_to_fixed_size_binary_array(values: &[Value], size: i32) -> Result<ArrayRef> {
765 let mut builder = FixedSizeBinaryBuilder::with_capacity(values.len(), size);
766 for v in values {
767 match v {
768 Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta }) if size == 24 => {
769 let btic = uni_btic::Btic::new(*lo, *hi, *meta)
770 .map_err(|e| anyhow!("invalid BTIC value: {}", e))?;
771 builder.append_value(uni_btic::encode::encode(&btic))?;
772 }
773 Value::String(s) if size == 24 => match uni_btic::parse::parse_btic_literal(s) {
774 Ok(b) => builder.append_value(uni_btic::encode::encode(&b))?,
775 Err(_) => builder.append_null(),
776 },
777 Value::List(bytes) => {
778 let b: Vec<u8> = bytes
779 .iter()
780 .map(|bv| bv.as_u64().unwrap_or(0) as u8)
781 .collect();
782 if b.len() as i32 == size {
783 builder.append_value(&b)?;
784 } else {
785 builder.append_null();
786 }
787 }
788 _ => builder.append_null(),
789 }
790 }
791 Ok(Arc::new(builder.finish()))
792}
793
794pub fn extract_vector_f32_values(
809 val: Option<&Value>,
810 is_deleted: bool,
811 dimensions: usize,
812) -> (Vec<f32>, bool) {
813 let zeros = || vec![0.0_f32; dimensions];
814
815 if is_deleted {
817 return (zeros(), true);
818 }
819
820 match val {
821 Some(Value::Vector(v)) if v.len() == dimensions => (v.clone(), true),
823 Some(Value::Vector(_)) => (zeros(), false), Some(Value::List(arr)) if arr.len() == dimensions => {
826 let values: Vec<f32> = arr
827 .iter()
828 .map(|v| v.as_f64().unwrap_or(0.0) as f32)
829 .collect();
830 (values, true)
831 }
832 Some(Value::List(_)) => (zeros(), false), _ => (zeros(), false), }
835}
836
837fn values_to_fixed_size_list_f32_array(values: &[Value], size: i32) -> ArrayRef {
838 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), size);
839 for v in values {
840 let (vals, valid) = extract_vector_f32_values(Some(v), false, size as usize);
841 for val in vals {
842 builder.values().append_value(val);
843 }
844 builder.append(valid);
845 }
846 Arc::new(builder.finish())
847}
848
849fn values_to_timestamp_array(values: &[Value], tz: Option<&Arc<str>>) -> ArrayRef {
850 let mut builder = TimestampNanosecondBuilder::with_capacity(values.len());
851 for v in values {
852 if v.is_null() {
853 builder.append_null();
854 } else if let Value::Temporal(tv) = v {
855 match tv {
856 uni_common::TemporalValue::DateTime {
857 nanos_since_epoch, ..
858 }
859 | uni_common::TemporalValue::LocalDateTime {
860 nanos_since_epoch, ..
861 } => builder.append_value(*nanos_since_epoch),
862 _ => builder.append_null(),
863 }
864 } else if let Some(n) = v.as_i64() {
865 builder.append_value(n);
866 } else if let Some(s) = v.as_str() {
867 match parse_datetime_to_nanos(s) {
868 Some(nanos) => builder.append_value(nanos),
869 None => builder.append_null(),
870 }
871 } else {
872 builder.append_null();
873 }
874 }
875
876 let arr = builder.finish();
877 if let Some(tz) = tz {
878 Arc::new(arr.with_timezone(tz.as_ref()))
879 } else {
880 Arc::new(arr)
881 }
882}
883
884fn values_to_datetime_struct_array(values: &[Value]) -> ArrayRef {
889 let mut nanos_builder = TimestampNanosecondBuilder::with_capacity(values.len());
890 let mut offset_builder = Int32Builder::with_capacity(values.len());
891 let mut tz_builder = StringBuilder::with_capacity(values.len(), values.len() * 20);
892 let mut null_buffer = BooleanBufferBuilder::new(values.len());
893
894 for v in values {
895 match v {
896 Value::Temporal(uni_common::TemporalValue::DateTime {
897 nanos_since_epoch,
898 offset_seconds,
899 timezone_name,
900 }) => {
901 nanos_builder.append_value(*nanos_since_epoch);
902 offset_builder.append_value(*offset_seconds);
903 tz_builder.append_option(timezone_name.as_deref());
904 null_buffer.append(true);
905 }
906 Value::Temporal(uni_common::TemporalValue::LocalDateTime { nanos_since_epoch }) => {
907 nanos_builder.append_value(*nanos_since_epoch);
908 offset_builder.append_null();
909 tz_builder.append_null();
910 null_buffer.append(true);
911 }
912 _ => {
913 nanos_builder.append_null();
914 offset_builder.append_null();
915 tz_builder.append_null();
916 null_buffer.append(false);
917 }
918 }
919 }
920
921 let struct_arr = StructArray::new(
922 schema::datetime_struct_fields(),
923 vec![
924 Arc::new(nanos_builder.finish()) as ArrayRef,
925 Arc::new(offset_builder.finish()) as ArrayRef,
926 Arc::new(tz_builder.finish()) as ArrayRef,
927 ],
928 Some(null_buffer.finish().into()),
929 );
930 Arc::new(struct_arr)
931}
932
933fn values_to_time_struct_array(values: &[Value]) -> ArrayRef {
938 let mut nanos_builder = Time64NanosecondBuilder::with_capacity(values.len());
939 let mut offset_builder = Int32Builder::with_capacity(values.len());
940 let mut null_buffer = BooleanBufferBuilder::new(values.len());
941
942 for v in values {
943 match v {
944 Value::Temporal(uni_common::TemporalValue::Time {
945 nanos_since_midnight,
946 offset_seconds,
947 }) => {
948 nanos_builder.append_value(*nanos_since_midnight);
949 offset_builder.append_value(*offset_seconds);
950 null_buffer.append(true);
951 }
952 Value::Temporal(uni_common::TemporalValue::LocalTime {
953 nanos_since_midnight,
954 }) => {
955 nanos_builder.append_value(*nanos_since_midnight);
956 offset_builder.append_null();
957 null_buffer.append(true);
958 }
959 _ => {
960 nanos_builder.append_null();
961 offset_builder.append_null();
962 null_buffer.append(false);
963 }
964 }
965 }
966
967 let struct_arr = StructArray::new(
968 schema::time_struct_fields(),
969 vec![
970 Arc::new(nanos_builder.finish()) as ArrayRef,
971 Arc::new(offset_builder.finish()) as ArrayRef,
972 ],
973 Some(null_buffer.finish().into()),
974 );
975 Arc::new(struct_arr)
976}
977
978fn values_to_large_binary_array(values: &[Value]) -> ArrayRef {
979 let mut builder =
980 arrow_array::builder::LargeBinaryBuilder::with_capacity(values.len(), values.len() * 64);
981 for v in values {
982 if v.is_null() {
983 builder.append_null();
984 } else {
985 let cv_bytes = uni_common::cypher_value_codec::encode(v);
987 builder.append_value(&cv_bytes);
988 }
989 }
990 Arc::new(builder.finish())
991}
992
993pub fn values_to_array(values: &[Value], dt: &ArrowDataType) -> Result<ArrayRef> {
995 match dt {
996 ArrowDataType::UInt64 => Ok(values_to_uint64_array(values)),
997 ArrowDataType::Int64 => Ok(values_to_int64_array(values)),
998 ArrowDataType::Int32 => Ok(values_to_int32_array(values)),
999 ArrowDataType::Utf8 => Ok(values_to_string_array(values)),
1000 ArrowDataType::Boolean => Ok(values_to_bool_array(values)),
1001 ArrowDataType::Float32 => Ok(values_to_float32_array(values)),
1002 ArrowDataType::Float64 => Ok(values_to_float64_array(values)),
1003 ArrowDataType::FixedSizeBinary(size) => values_to_fixed_size_binary_array(values, *size),
1004 ArrowDataType::FixedSizeList(inner, size) => {
1005 if inner.data_type() == &ArrowDataType::Float32 {
1006 Ok(values_to_fixed_size_list_f32_array(values, *size))
1007 } else {
1008 Err(anyhow!("Unsupported FixedSizeList inner type"))
1009 }
1010 }
1011 ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
1012 Ok(values_to_timestamp_array(values, tz.as_ref()))
1013 }
1014 ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
1015 Ok(values_to_timestamp_array(values, tz.as_ref()))
1016 }
1017 ArrowDataType::Date32 => {
1018 let mut builder = Date32Builder::with_capacity(values.len());
1019 for v in values {
1020 if v.is_null() {
1021 builder.append_null();
1022 } else if let Value::Temporal(uni_common::TemporalValue::Date {
1023 days_since_epoch,
1024 }) = v
1025 {
1026 builder.append_value(*days_since_epoch);
1027 } else if let Some(n) = v.as_i64() {
1028 builder.append_value(n as i32);
1029 } else {
1030 builder.append_null();
1031 }
1032 }
1033 Ok(Arc::new(builder.finish()))
1034 }
1035 ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
1036 let mut builder = Time64NanosecondBuilder::with_capacity(values.len());
1037 for v in values {
1038 if v.is_null() {
1039 builder.append_null();
1040 } else if let Value::Temporal(tv) = v {
1041 match tv {
1042 uni_common::TemporalValue::LocalTime {
1043 nanos_since_midnight,
1044 }
1045 | uni_common::TemporalValue::Time {
1046 nanos_since_midnight,
1047 ..
1048 } => builder.append_value(*nanos_since_midnight),
1049 _ => builder.append_null(),
1050 }
1051 } else if let Some(n) = v.as_i64() {
1052 builder.append_value(n);
1053 } else {
1054 builder.append_null();
1055 }
1056 }
1057 Ok(Arc::new(builder.finish()))
1058 }
1059 ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
1060 let mut builder = Time64MicrosecondBuilder::with_capacity(values.len());
1061 for v in values {
1062 if v.is_null() {
1063 builder.append_null();
1064 } else if let Value::Temporal(tv) = v {
1065 match tv {
1066 uni_common::TemporalValue::LocalTime {
1067 nanos_since_midnight,
1068 }
1069 | uni_common::TemporalValue::Time {
1070 nanos_since_midnight,
1071 ..
1072 } => builder.append_value(*nanos_since_midnight / 1_000), _ => builder.append_null(),
1074 }
1075 } else if let Some(n) = v.as_i64() {
1076 builder.append_value(n);
1077 } else {
1078 builder.append_null();
1079 }
1080 }
1081 Ok(Arc::new(builder.finish()))
1082 }
1083 ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano) => {
1084 let mut builder = IntervalMonthDayNanoBuilder::with_capacity(values.len());
1085 for v in values {
1086 if v.is_null() {
1087 builder.append_null();
1088 } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1089 months,
1090 days,
1091 nanos,
1092 }) = v
1093 {
1094 builder.append_value(arrow::datatypes::IntervalMonthDayNano {
1095 months: *months as i32,
1096 days: *days as i32,
1097 nanoseconds: *nanos,
1098 });
1099 } else {
1100 builder.append_null();
1101 }
1102 }
1103 Ok(Arc::new(builder.finish()))
1104 }
1105 ArrowDataType::Duration(arrow_schema::TimeUnit::Microsecond) => {
1106 let mut builder = DurationMicrosecondBuilder::with_capacity(values.len());
1107 for v in values {
1108 if v.is_null() {
1109 builder.append_null();
1110 } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1111 months,
1112 days,
1113 nanos,
1114 }) = v
1115 {
1116 let total_micros =
1117 months * 30 * 86_400_000_000i64 + days * 86_400_000_000i64 + nanos / 1_000;
1118 builder.append_value(total_micros);
1119 } else if let Some(n) = v.as_i64() {
1120 builder.append_value(n);
1121 } else {
1122 builder.append_null();
1123 }
1124 }
1125 Ok(Arc::new(builder.finish()))
1126 }
1127 ArrowDataType::LargeBinary => Ok(values_to_large_binary_array(values)),
1128 ArrowDataType::List(field) => {
1129 if field.data_type() == &ArrowDataType::Utf8 {
1130 let mut builder = ListBuilder::new(StringBuilder::new());
1131 for v in values {
1132 if let Value::List(arr) = v {
1133 for item in arr {
1134 if let Some(s) = item.as_str() {
1135 builder.values().append_value(s);
1136 } else {
1137 builder.values().append_null();
1138 }
1139 }
1140 builder.append(true);
1141 } else {
1142 builder.append_null();
1143 }
1144 }
1145 Ok(Arc::new(builder.finish()))
1146 } else {
1147 Err(anyhow!(
1148 "Unsupported List inner type: {:?}",
1149 field.data_type()
1150 ))
1151 }
1152 }
1153 ArrowDataType::Struct(_) if schema::is_datetime_struct(dt) => {
1154 Ok(values_to_datetime_struct_array(values))
1155 }
1156 ArrowDataType::Struct(_) if schema::is_time_struct(dt) => {
1157 Ok(values_to_time_struct_array(values))
1158 }
1159 _ => Err(anyhow!("Unsupported type for conversion: {:?}", dt)),
1160 }
1161}
1162
/// Extractor that holds a borrowed property [`DataType`].
///
/// NOTE(review): the `impl` block is outside this view, so how the type is
/// used during extraction is not documented here — confirm against the impl.
pub struct PropertyExtractor<'a> {
    /// Borrowed `DataType`; presumably the declared type of the property being
    /// extracted — TODO confirm against the impl.
    data_type: &'a DataType,
}
1167
1168fn sparse_pair_from_value(v: &Value) -> Option<(Vec<u32>, Vec<f32>)> {
1177 match v {
1178 Value::SparseVector { indices, values } => {
1179 if indices.len() != values.len() {
1185 return None;
1186 }
1187 Some((indices.clone(), values.clone()))
1188 }
1189 Value::Map(m) => {
1190 let idx = match m.get("indices") {
1191 Some(Value::List(l)) => l,
1192 _ => return None,
1193 };
1194 let vals = match m.get("values") {
1195 Some(Value::List(l)) => l,
1196 _ => return None,
1197 };
1198 let indices: Vec<u32> = idx
1199 .iter()
1200 .map(|x| x.as_u64().map(|n| n as u32))
1201 .collect::<Option<_>>()?;
1202 let values: Vec<f32> = vals
1203 .iter()
1204 .map(|x| x.as_f64().map(|n| n as f32))
1205 .collect::<Option<_>>()?;
1206 if indices.len() != values.len() {
1207 return None;
1208 }
1209 Some((indices, values))
1210 }
1211 _ => None,
1212 }
1213}
1214
1215pub fn build_sparse_vector_array(values: &[Option<Value>]) -> ArrayRef {
1221 let mut indices_builder = ListBuilder::new(UInt32Builder::new());
1222 let mut values_builder = ListBuilder::new(Float32Builder::new());
1223 let mut null_buffer = BooleanBufferBuilder::new(values.len());
1224 for v in values {
1225 match v.as_ref().and_then(sparse_pair_from_value) {
1226 Some((indices, vals)) => {
1227 for ix in indices {
1228 indices_builder.values().append_value(ix);
1229 }
1230 indices_builder.append(true);
1231 for w in vals {
1232 values_builder.values().append_value(w);
1233 }
1234 values_builder.append(true);
1235 null_buffer.append(true);
1236 }
1237 None => {
1238 indices_builder.append(true);
1239 values_builder.append(true);
1240 null_buffer.append(false);
1241 }
1242 }
1243 }
1244 let struct_arr = StructArray::new(
1245 schema::sparse_vector_struct_fields(),
1246 vec![
1247 Arc::new(indices_builder.finish()) as ArrayRef,
1248 Arc::new(values_builder.finish()) as ArrayRef,
1249 ],
1250 Some(null_buffer.finish().into()),
1251 );
1252 Arc::new(struct_arr)
1253}
1254
1255pub fn build_multivector_array(values: &[Option<Value>], dimensions: usize) -> ArrayRef {
1269 let dim = dimensions as i32;
1270 let mut builder = ListBuilder::new(FixedSizeListBuilder::new(Float32Builder::new(), dim));
1271 for v in values {
1272 match v.as_ref().and_then(|v| v.as_array()) {
1273 Some(arr) => {
1274 for tok in arr {
1278 let (vals, valid) = extract_vector_f32_values(Some(tok), false, dimensions);
1279 for f in vals {
1280 builder.values().values().append_value(f);
1281 }
1282 builder.values().append(valid);
1283 }
1284 builder.append(true);
1285 }
1286 None => builder.append_null(),
1287 }
1288 }
1289 Arc::new(builder.finish())
1290}
1291
1292impl<'a> PropertyExtractor<'a> {
/// Creates an extractor for a property with the given declared `data_type`.
///
/// `_name` is accepted but neither stored nor otherwise used here.
pub fn new(_name: &'a str, data_type: &'a DataType) -> Self {
    Self { data_type }
}
1296
1297 pub fn build_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1300 where
1301 F: Fn(usize) -> Option<&'a Value>,
1302 {
1303 match self.data_type {
1304 DataType::String => self.build_string_column(len, deleted, get_props),
1305 DataType::Int32 => self.build_int32_column(len, deleted, get_props),
1306 DataType::Int64 => self.build_int64_column(len, deleted, get_props),
1307 DataType::Float32 => self.build_float32_column(len, deleted, get_props),
1308 DataType::Float64 => self.build_float64_column(len, deleted, get_props),
1309 DataType::Bool => self.build_bool_column(len, deleted, get_props),
1310 DataType::Vector { dimensions } => {
1311 self.build_vector_column(len, deleted, get_props, *dimensions)
1312 }
1313 DataType::SparseVector { .. } => {
1314 self.build_sparse_vector_column(len, deleted, get_props)
1315 }
1316 DataType::CypherValue => self.build_json_column(len, deleted, get_props),
1317 DataType::Bytes => self.build_bytes_column(len, deleted, get_props),
1318 DataType::List(inner) => self.build_list_column(len, deleted, get_props, inner),
1319 DataType::Map(key, value) => self.build_map_column(len, deleted, get_props, key, value),
1320 DataType::Crdt(_) => self.build_crdt_column(len, deleted, get_props),
1321 DataType::DateTime => self.build_datetime_struct_column(len, deleted, get_props),
1322 DataType::Timestamp => self.build_timestamp_column(len, deleted, get_props),
1323 DataType::Date => self.build_date32_column(len, deleted, get_props),
1324 DataType::Time => self.build_time_struct_column(len, deleted, get_props),
1325 DataType::Duration => self.build_duration_column(len, deleted, get_props),
1326 DataType::Btic => self.build_btic_column(len, deleted, get_props),
1327 _ => Err(anyhow!(
1328 "Unsupported data type for arrow conversion: {:?}",
1329 self.data_type
1330 )),
1331 }
1332 }
1333
1334 fn build_string_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1335 where
1336 F: Fn(usize) -> Option<&'a Value>,
1337 {
1338 let mut builder = arrow_array::builder::StringBuilder::with_capacity(len, len * 32);
1339 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1340 let prop = get_props(i);
1341 if let Some(s) = prop.and_then(|v| v.as_str()) {
1342 builder.append_value(s);
1343 } else if let Some(Value::Temporal(tv)) = prop {
1344 builder.append_value(tv.to_string());
1345 } else if is_deleted {
1346 builder.append_value("");
1347 } else {
1348 builder.append_null();
1349 }
1350 }
1351 Ok(Arc::new(builder.finish()))
1352 }
1353
1354 fn build_int32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1355 where
1356 F: Fn(usize) -> Option<&'a Value>,
1357 {
1358 let mut values = Vec::with_capacity(len);
1359 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1360 let val = get_props(i)
1363 .and_then(|v| v.as_i64())
1364 .and_then(|v| i32::try_from(v).ok());
1365 if val.is_none() && is_deleted {
1366 values.push(Some(0));
1367 } else {
1368 values.push(val);
1369 }
1370 }
1371 Ok(Arc::new(Int32Array::from(values)))
1372 }
1373
1374 fn build_int64_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1375 where
1376 F: Fn(usize) -> Option<&'a Value>,
1377 {
1378 let mut values = Vec::with_capacity(len);
1379 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1380 let val = get_props(i).and_then(|v| v.as_i64());
1381 if val.is_none() && is_deleted {
1382 values.push(Some(0));
1383 } else {
1384 values.push(val);
1385 }
1386 }
1387 Ok(Arc::new(Int64Array::from(values)))
1388 }
1389
1390 fn build_timestamp_column<F>(
1391 &self,
1392 len: usize,
1393 deleted: &[bool],
1394 get_props: F,
1395 ) -> Result<ArrayRef>
1396 where
1397 F: Fn(usize) -> Option<&'a Value>,
1398 {
1399 let mut values = Vec::with_capacity(len);
1400 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1401 let val = get_props(i);
1402 let ts = if is_deleted || val.is_none() {
1403 Some(0i64)
1404 } else if let Some(Value::Temporal(tv)) = val {
1405 match tv {
1406 uni_common::TemporalValue::DateTime {
1407 nanos_since_epoch, ..
1408 }
1409 | uni_common::TemporalValue::LocalDateTime {
1410 nanos_since_epoch, ..
1411 } => Some(*nanos_since_epoch),
1412 _ => None,
1413 }
1414 } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1415 Some(v)
1416 } else if let Some(s) = val.and_then(|v| v.as_str()) {
1417 parse_datetime_to_nanos(s)
1418 } else {
1419 None
1420 };
1421
1422 if is_deleted {
1423 values.push(Some(0));
1424 } else {
1425 values.push(ts);
1426 }
1427 }
1428 let arr = TimestampNanosecondArray::from(values).with_timezone("UTC");
1429 Ok(Arc::new(arr))
1430 }
1431
1432 fn build_datetime_struct_column<F>(
1433 &self,
1434 len: usize,
1435 deleted: &[bool],
1436 get_props: F,
1437 ) -> Result<ArrayRef>
1438 where
1439 F: Fn(usize) -> Option<&'a Value>,
1440 {
1441 let values = self.collect_values_or_null(len, deleted, &get_props);
1442 Ok(values_to_datetime_struct_array(&values))
1443 }
1444
1445 fn build_time_struct_column<F>(
1446 &self,
1447 len: usize,
1448 deleted: &[bool],
1449 get_props: F,
1450 ) -> Result<ArrayRef>
1451 where
1452 F: Fn(usize) -> Option<&'a Value>,
1453 {
1454 let values = self.collect_values_or_null(len, deleted, &get_props);
1455 Ok(values_to_time_struct_array(&values))
1456 }
1457
1458 fn collect_values_or_null<F>(&self, len: usize, deleted: &[bool], get_props: &F) -> Vec<Value>
1460 where
1461 F: Fn(usize) -> Option<&'a Value>,
1462 {
1463 deleted
1464 .iter()
1465 .enumerate()
1466 .take(len)
1467 .map(|(i, &is_deleted)| {
1468 if is_deleted {
1469 Value::Null
1470 } else {
1471 get_props(i).cloned().unwrap_or(Value::Null)
1472 }
1473 })
1474 .collect()
1475 }
1476
1477 fn build_date32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1478 where
1479 F: Fn(usize) -> Option<&'a Value>,
1480 {
1481 let mut builder = Date32Builder::with_capacity(len);
1482 let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1483
1484 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1485 let val = get_props(i);
1486 let days = if is_deleted || val.is_none() {
1487 Some(0)
1488 } else if let Some(Value::Temporal(uni_common::TemporalValue::Date {
1489 days_since_epoch,
1490 })) = val
1491 {
1492 Some(*days_since_epoch)
1493 } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1494 i32::try_from(v).ok()
1497 } else if let Some(s) = val.and_then(|v| v.as_str()) {
1498 match chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1499 Ok(date) => Some(date.signed_duration_since(epoch).num_days() as i32),
1500 Err(_) => None,
1501 }
1502 } else {
1503 None
1504 };
1505
1506 if is_deleted {
1507 builder.append_value(0);
1508 } else if let Some(v) = days {
1509 builder.append_value(v);
1510 } else {
1511 builder.append_null();
1512 }
1513 }
1514 Ok(Arc::new(builder.finish()))
1515 }
1516
1517 fn build_duration_column<F>(
1518 &self,
1519 len: usize,
1520 deleted: &[bool],
1521 get_props: F,
1522 ) -> Result<ArrayRef>
1523 where
1524 F: Fn(usize) -> Option<&'a Value>,
1525 {
1526 let mut builder = LargeBinaryBuilder::with_capacity(len, len * 32);
1528 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1529 let raw_val = get_props(i);
1530 if let Some(val @ Value::Temporal(uni_common::TemporalValue::Duration { .. })) = raw_val
1531 {
1532 let encoded = uni_common::cypher_value_codec::encode(val);
1533 builder.append_value(&encoded);
1534 } else if is_deleted {
1535 let zero = Value::Temporal(uni_common::TemporalValue::Duration {
1536 months: 0,
1537 days: 0,
1538 nanos: 0,
1539 });
1540 let encoded = uni_common::cypher_value_codec::encode(&zero);
1541 builder.append_value(&encoded);
1542 } else {
1543 builder.append_null();
1544 }
1545 }
1546 Ok(Arc::new(builder.finish()))
1547 }
1548
1549 fn build_btic_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1550 where
1551 F: Fn(usize) -> Option<&'a Value>,
1552 {
1553 const ENCODED_LEN: i32 = 24;
1554 let mut builder = FixedSizeBinaryBuilder::with_capacity(len, ENCODED_LEN);
1555 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1556 let raw_val = get_props(i);
1557 let btic = match raw_val {
1558 Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => Some(
1559 uni_btic::Btic::new(*lo, *hi, *meta)
1560 .map_err(|e| anyhow!("invalid BTIC value: {}", e))?,
1561 ),
1562 Some(Value::String(s)) => Some(
1563 uni_btic::parse::parse_btic_literal(s)
1564 .map_err(|e| anyhow!("BTIC parse error for '{}': {}", s, e))?,
1565 ),
1566 _ => None,
1567 };
1568
1569 if let Some(b) = btic {
1570 builder.append_value(uni_btic::encode::encode(&b))?;
1571 } else if is_deleted {
1572 builder.append_value([0u8; ENCODED_LEN as usize])?;
1573 } else {
1574 builder.append_null();
1575 }
1576 }
1577 Ok(Arc::new(builder.finish()))
1578 }
1579
1580 fn build_float32_column<F>(
1581 &self,
1582 len: usize,
1583 deleted: &[bool],
1584 get_props: F,
1585 ) -> Result<ArrayRef>
1586 where
1587 F: Fn(usize) -> Option<&'a Value>,
1588 {
1589 let mut values = Vec::with_capacity(len);
1590 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1591 let val = get_props(i).and_then(|v| v.as_f64()).map(|v| v as f32);
1592 if val.is_none() && is_deleted {
1593 values.push(Some(0.0));
1594 } else {
1595 values.push(val);
1596 }
1597 }
1598 Ok(Arc::new(Float32Array::from(values)))
1599 }
1600
1601 fn build_float64_column<F>(
1602 &self,
1603 len: usize,
1604 deleted: &[bool],
1605 get_props: F,
1606 ) -> Result<ArrayRef>
1607 where
1608 F: Fn(usize) -> Option<&'a Value>,
1609 {
1610 let mut values = Vec::with_capacity(len);
1611 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1612 let val = get_props(i).and_then(|v| v.as_f64());
1613 if val.is_none() && is_deleted {
1614 values.push(Some(0.0));
1615 } else {
1616 values.push(val);
1617 }
1618 }
1619 Ok(Arc::new(Float64Array::from(values)))
1620 }
1621
1622 fn build_bool_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1623 where
1624 F: Fn(usize) -> Option<&'a Value>,
1625 {
1626 let mut values = Vec::with_capacity(len);
1627 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1628 let val = get_props(i).and_then(|v| v.as_bool());
1629 if val.is_none() && is_deleted {
1630 values.push(Some(false));
1631 } else {
1632 values.push(val);
1633 }
1634 }
1635 Ok(Arc::new(BooleanArray::from(values)))
1636 }
1637
1638 fn build_vector_column<F>(
1639 &self,
1640 len: usize,
1641 deleted: &[bool],
1642 get_props: F,
1643 dimensions: usize,
1644 ) -> Result<ArrayRef>
1645 where
1646 F: Fn(usize) -> Option<&'a Value>,
1647 {
1648 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dimensions as i32);
1649
1650 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1651 let val = get_props(i);
1652 let (values, valid) = extract_vector_f32_values(val, is_deleted, dimensions);
1653 for v in values {
1654 builder.values().append_value(v);
1655 }
1656 builder.append(valid);
1657 }
1658 Ok(Arc::new(builder.finish()))
1659 }
1660
1661 fn build_sparse_vector_column<F>(
1667 &self,
1668 len: usize,
1669 deleted: &[bool],
1670 get_props: F,
1671 ) -> Result<ArrayRef>
1672 where
1673 F: Fn(usize) -> Option<&'a Value>,
1674 {
1675 let mut indices_builder = ListBuilder::new(UInt32Builder::new());
1676 let mut values_builder = ListBuilder::new(Float32Builder::new());
1677 let mut null_buffer = BooleanBufferBuilder::new(len);
1678
1679 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1680 let pair = if is_deleted {
1681 None
1682 } else {
1683 get_props(i).and_then(sparse_pair_from_value)
1684 };
1685 match pair {
1686 Some((indices, values)) => {
1687 for ix in indices {
1688 indices_builder.values().append_value(ix);
1689 }
1690 indices_builder.append(true);
1691 for w in values {
1692 values_builder.values().append_value(w);
1693 }
1694 values_builder.append(true);
1695 null_buffer.append(true);
1696 }
1697 None => {
1698 indices_builder.append(true);
1699 values_builder.append(true);
1700 null_buffer.append(false);
1701 }
1702 }
1703 }
1704
1705 let struct_arr = StructArray::new(
1706 schema::sparse_vector_struct_fields(),
1707 vec![
1708 Arc::new(indices_builder.finish()) as ArrayRef,
1709 Arc::new(values_builder.finish()) as ArrayRef,
1710 ],
1711 Some(null_buffer.finish().into()),
1712 );
1713 Ok(Arc::new(struct_arr))
1714 }
1715
1716 fn build_json_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1717 where
1718 F: Fn(usize) -> Option<&'a Value>,
1719 {
1720 let null_val = Value::Null;
1721 let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(len, len * 64);
1722 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1723 let val = get_props(i);
1724 let uni_val = if val.is_none() && is_deleted {
1725 &null_val
1726 } else {
1727 val.unwrap_or(&null_val)
1728 };
1729 let cv_bytes = uni_common::cypher_value_codec::encode(uni_val);
1731 builder.append_value(&cv_bytes);
1732 }
1733 Ok(Arc::new(builder.finish()))
1734 }
1735
1736 fn build_bytes_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1737 where
1738 F: Fn(usize) -> Option<&'a Value>,
1739 {
1740 let mut builder = LargeBinaryBuilder::with_capacity(len, len * 64);
1741 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1742 let val = get_props(i);
1743 if let Some(Value::Bytes(b)) = val {
1744 builder.append_value(b);
1745 } else if is_deleted {
1746 builder.append_value(&[][..]);
1747 } else {
1748 builder.append_null();
1749 }
1750 }
1751 Ok(Arc::new(builder.finish()))
1752 }
1753
1754 fn build_list_column<F>(
1755 &self,
1756 len: usize,
1757 deleted: &[bool],
1758 get_props: F,
1759 inner: &DataType,
1760 ) -> Result<ArrayRef>
1761 where
1762 F: Fn(usize) -> Option<&'a Value>,
1763 {
1764 match inner {
1765 DataType::String => {
1766 self.build_typed_list(len, deleted, &get_props, StringBuilder::new(), |v, b| {
1767 if let Some(s) = v.as_str() {
1768 b.append_value(s);
1769 } else {
1770 b.append_null();
1771 }
1772 })
1773 }
1774 DataType::Int64 => {
1775 self.build_typed_list(len, deleted, &get_props, Int64Builder::new(), |v, b| {
1776 if let Some(n) = v.as_i64() {
1777 b.append_value(n);
1778 } else {
1779 b.append_null();
1780 }
1781 })
1782 }
1783 DataType::Float64 => {
1784 self.build_typed_list(len, deleted, &get_props, Float64Builder::new(), |v, b| {
1785 if let Some(f) = v.as_f64() {
1786 b.append_value(f);
1787 } else {
1788 b.append_null();
1789 }
1790 })
1791 }
1792 DataType::Bytes => {
1793 let item_field = Arc::new(
1797 Field::new("item", ArrowDataType::LargeBinary, true)
1798 .with_metadata(schema::raw_bytes_field_metadata()),
1799 );
1800 let mut builder =
1801 ListBuilder::new(LargeBinaryBuilder::new()).with_field(item_field);
1802 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1803 let val_array = get_props(i).and_then(|v| v.as_array());
1804 if val_array.is_none() && is_deleted {
1805 builder.append_null();
1806 } else if let Some(arr) = val_array {
1807 for v in arr {
1808 if let Value::Bytes(b) = v {
1809 builder.values().append_value(b);
1810 } else {
1811 builder.values().append_null();
1812 }
1813 }
1814 builder.append(true);
1815 } else {
1816 builder.append_null();
1817 }
1818 }
1819 Ok(Arc::new(builder.finish()))
1820 }
1821 DataType::Vector { dimensions } => {
1822 let dim = *dimensions as i32;
1833 let mut builder =
1834 ListBuilder::new(FixedSizeListBuilder::new(Float32Builder::new(), dim));
1835 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1836 let val_array = get_props(i).and_then(|v| v.as_array());
1837 if val_array.is_none() && is_deleted {
1838 builder.append_null();
1839 } else if let Some(arr) = val_array {
1840 for tok in arr {
1844 let (vals, valid) =
1845 extract_vector_f32_values(Some(tok), false, *dimensions);
1846 for v in vals {
1847 builder.values().values().append_value(v);
1848 }
1849 builder.values().append(valid);
1850 }
1851 builder.append(true);
1852 } else {
1853 builder.append_null();
1854 }
1855 }
1856 Ok(Arc::new(builder.finish()))
1857 }
1858 _ => Err(anyhow!("Unsupported inner type for List: {:?}", inner)),
1859 }
1860 }
1861
1862 fn build_typed_list<F, B, A>(
1864 &self,
1865 len: usize,
1866 deleted: &[bool],
1867 get_props: &F,
1868 inner_builder: B,
1869 mut append_value: A,
1870 ) -> Result<ArrayRef>
1871 where
1872 F: Fn(usize) -> Option<&'a Value>,
1873 B: arrow_array::builder::ArrayBuilder,
1874 A: FnMut(&Value, &mut B),
1875 {
1876 let mut builder = ListBuilder::new(inner_builder);
1877 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1878 let val_array = get_props(i).and_then(|v| v.as_array());
1879 if val_array.is_none() && is_deleted {
1880 builder.append_null();
1881 } else if let Some(arr) = val_array {
1882 for v in arr {
1883 append_value(v, builder.values());
1884 }
1885 builder.append(true);
1886 } else {
1887 builder.append_null();
1888 }
1889 }
1890 Ok(Arc::new(builder.finish()))
1891 }
1892
1893 fn build_map_column<F>(
1894 &self,
1895 len: usize,
1896 deleted: &[bool],
1897 get_props: F,
1898 key: &DataType,
1899 value: &DataType,
1900 ) -> Result<ArrayRef>
1901 where
1902 F: Fn(usize) -> Option<&'a Value>,
1903 {
1904 if !matches!(key, DataType::String) {
1905 return Err(anyhow!("Map keys must be String (JSON limitation)"));
1906 }
1907
1908 match value {
1909 DataType::String => self.build_typed_map(
1910 len,
1911 deleted,
1912 &get_props,
1913 StringBuilder::new(),
1914 arrow_schema::DataType::Utf8,
1915 None,
1916 |v, b: &mut StringBuilder| {
1917 if let Some(s) = v.as_str() {
1918 b.append_value(s);
1919 } else {
1920 b.append_null();
1921 }
1922 },
1923 ),
1924 DataType::Int64 => self.build_typed_map(
1925 len,
1926 deleted,
1927 &get_props,
1928 Int64Builder::new(),
1929 arrow_schema::DataType::Int64,
1930 None,
1931 |v, b: &mut Int64Builder| {
1932 if let Some(n) = v.as_i64() {
1933 b.append_value(n);
1934 } else {
1935 b.append_null();
1936 }
1937 },
1938 ),
1939 DataType::Int32 => self.build_typed_map(
1940 len,
1941 deleted,
1942 &get_props,
1943 Int32Builder::new(),
1944 arrow_schema::DataType::Int32,
1945 None,
1946 |v, b: &mut Int32Builder| match v.as_i64().and_then(|n| i32::try_from(n).ok()) {
1947 Some(n) => b.append_value(n),
1948 None => b.append_null(),
1949 },
1950 ),
1951 DataType::Float64 => self.build_typed_map(
1952 len,
1953 deleted,
1954 &get_props,
1955 Float64Builder::new(),
1956 arrow_schema::DataType::Float64,
1957 None,
1958 |v, b: &mut Float64Builder| match v.as_f64() {
1959 Some(f) => b.append_value(f),
1960 None => b.append_null(),
1961 },
1962 ),
1963 DataType::Float32 => self.build_typed_map(
1964 len,
1965 deleted,
1966 &get_props,
1967 Float32Builder::new(),
1968 arrow_schema::DataType::Float32,
1969 None,
1970 |v, b: &mut Float32Builder| match v.as_f64() {
1971 Some(f) => b.append_value(f as f32),
1972 None => b.append_null(),
1973 },
1974 ),
1975 DataType::Bool => self.build_typed_map(
1976 len,
1977 deleted,
1978 &get_props,
1979 BooleanBuilder::new(),
1980 arrow_schema::DataType::Boolean,
1981 None,
1982 |v, b: &mut BooleanBuilder| match v.as_bool() {
1983 Some(x) => b.append_value(x),
1984 None => b.append_null(),
1985 },
1986 ),
1987 DataType::Bytes => self.build_typed_map(
1988 len,
1989 deleted,
1990 &get_props,
1991 LargeBinaryBuilder::new(),
1992 arrow_schema::DataType::LargeBinary,
1993 Some(schema::raw_bytes_field_metadata()),
1996 |v, b: &mut LargeBinaryBuilder| {
1997 if let Value::Bytes(bytes) = v {
1998 b.append_value(bytes);
1999 } else {
2000 b.append_null();
2001 }
2002 },
2003 ),
2004 _ => self.build_typed_map(
2010 len,
2011 deleted,
2012 &get_props,
2013 LargeBinaryBuilder::new(),
2014 arrow_schema::DataType::LargeBinary,
2015 None,
2016 |v, b: &mut LargeBinaryBuilder| {
2017 if v.is_null() {
2018 b.append_null();
2019 } else {
2020 b.append_value(uni_common::cypher_value_codec::encode(v));
2021 }
2022 },
2023 ),
2024 }
2025 }
2026
2027 #[expect(
2029 clippy::too_many_arguments,
2030 reason = "builder plumbing: value type + optional child metadata are distinct knobs"
2031 )]
2032 fn build_typed_map<F, B, A>(
2033 &self,
2034 len: usize,
2035 deleted: &[bool],
2036 get_props: &F,
2037 value_builder: B,
2038 value_arrow_type: arrow_schema::DataType,
2039 value_metadata: Option<HashMap<String, String>>,
2040 mut append_value: A,
2041 ) -> Result<ArrayRef>
2042 where
2043 F: Fn(usize) -> Option<&'a Value>,
2044 B: arrow_array::builder::ArrayBuilder,
2045 A: FnMut(&Value, &mut B),
2046 {
2047 let key_builder = Box::new(StringBuilder::new());
2048 let value_builder = Box::new(value_builder);
2049 let value_field = match value_metadata {
2050 Some(meta) => Field::new("value", value_arrow_type, true).with_metadata(meta),
2051 None => Field::new("value", value_arrow_type, true),
2052 };
2053 let struct_builder = StructBuilder::new(
2054 vec![
2055 Field::new("key", arrow_schema::DataType::Utf8, false),
2056 value_field,
2057 ],
2058 vec![key_builder, value_builder],
2059 );
2060 let mut builder = ListBuilder::new(struct_builder);
2061
2062 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
2063 self.append_map_entry(&mut builder, get_props(i), is_deleted, &mut append_value);
2064 }
2065 Ok(Arc::new(builder.finish()))
2066 }
2067
2068 fn append_map_entry<B, A>(
2070 &self,
2071 builder: &mut ListBuilder<StructBuilder>,
2072 val: Option<&'a Value>,
2073 is_deleted: bool,
2074 append_value: &mut A,
2075 ) where
2076 B: arrow_array::builder::ArrayBuilder,
2077 A: FnMut(&Value, &mut B),
2078 {
2079 let val_obj = val.and_then(|v| v.as_object());
2080 if val_obj.is_none() && is_deleted {
2081 builder.append(false);
2082 } else if let Some(obj) = val_obj {
2083 let struct_b = builder.values();
2084 for (k, v) in obj {
2085 struct_b
2086 .field_builder::<StringBuilder>(0)
2087 .unwrap()
2088 .append_value(k);
2089 let value_b = struct_b.field_builder::<B>(1).unwrap();
2091 append_value(v, value_b);
2092 struct_b.append(true);
2093 }
2094 builder.append(true);
2095 } else {
2096 builder.append(false);
2097 }
2098 }
2099
2100 fn build_crdt_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
2101 where
2102 F: Fn(usize) -> Option<&'a Value>,
2103 {
2104 let mut builder = BinaryBuilder::new();
2105 for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
2106 if is_deleted {
2107 builder.append_null();
2108 continue;
2109 }
2110 if let Some(val) = get_props(i) {
2111 let crdt_result = if let Some(s) = val.as_str() {
2114 serde_json::from_str::<Crdt>(s)
2115 } else {
2116 let json_val: serde_json::Value = val.clone().into();
2118 serde_json::from_value::<Crdt>(json_val)
2119 };
2120
2121 if let Ok(crdt) = crdt_result {
2122 if let Ok(bytes) = crdt.to_msgpack() {
2123 builder.append_value(&bytes);
2124 } else {
2125 builder.append_null();
2126 }
2127 } else {
2128 builder.append_null();
2129 }
2130 } else {
2131 builder.append_null();
2132 }
2133 }
2134 Ok(Arc::new(builder.finish()))
2135 }
2136}
2137
2138pub fn build_edge_column<'a>(
2140 name: &'a str,
2141 data_type: &'a DataType,
2142 len: usize,
2143 get_props: impl Fn(usize) -> Option<&'a Value>,
2144) -> Result<ArrayRef> {
2145 let deleted = vec![false; len];
2147 let extractor = PropertyExtractor::new(name, data_type);
2148 extractor.build_column(len, &deleted, get_props)
2149}
2150
2151#[cfg(test)]
2152mod tests {
2153 use super::*;
2154 use arrow_array::{
2155 Array, DurationMicrosecondArray,
2156 builder::{BinaryBuilder, Time64MicrosecondBuilder, TimestampNanosecondBuilder},
2157 };
2158 use std::collections::HashMap;
2159 use uni_common::TemporalValue;
2160 use uni_crdt::{Crdt, GCounter};
2161
/// Sparse vectors written through `PropertyExtractor` decode back to the same
/// values, and the JSON view of a populated row is an object, not null.
#[test]
fn test_sparse_vector_columnar_roundtrip_and_no_silent_null() {
    use crate::storage::value_codec::{CrdtDecodeMode, decode_column_value, value_from_column};

    let sparse_type = DataType::SparseVector { dimensions: 100 };
    let populated = Value::SparseVector {
        indices: vec![1, 5, 9],
        values: vec![0.5, -1.0, 2.0],
    };
    let empty = Value::SparseVector {
        indices: vec![],
        values: vec![],
    };
    let rows = [Some(populated.clone()), Some(empty.clone()), None];
    let tombstones = [false, false, true];

    let column = PropertyExtractor::new("emb", &sparse_type)
        .build_column(3, &tombstones, |i| rows[i].as_ref())
        .unwrap();

    // Populated row, empty row, then the deleted row which must decode as null.
    let expected = [populated, empty, Value::Null];
    for (row, want) in expected.iter().enumerate() {
        let got =
            decode_column_value(&column, &sparse_type, row, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(&got, want, "row {row}");
    }

    let json0 = value_from_column(&column, &sparse_type, 0, CrdtDecodeMode::Strict).unwrap();
    assert!(
        json0.is_object(),
        "sparse column was silently nulled by value_from_column: {json0:?}"
    );
    assert_eq!(json0["indices"], serde_json::json!([1u32, 5u32, 9u32]));
    assert_eq!(
        json0["values"],
        serde_json::json!([0.5f32, -1.0f32, 2.0f32])
    );
}
2212
/// String cells convert to `Value::String`, null cells to `Value::Null`.
#[test]
fn test_arrow_to_value_string() {
    let column = StringArray::from(vec![Some("hello"), None, Some("world")]);
    let expected = [
        Value::String("hello".to_string()),
        Value::Null,
        Value::String("world".to_string()),
    ];
    for (row, want) in expected.iter().enumerate() {
        assert_eq!(&arrow_to_value(&column, row, None), want, "row {row}");
    }
}
2226
/// Int64 cells convert to `Value::Int`, null cells to `Value::Null`.
#[test]
fn test_arrow_to_value_int64() {
    let column = Int64Array::from(vec![Some(42), None, Some(-10)]);
    let expected = [Value::Int(42), Value::Null, Value::Int(-10)];
    for (row, want) in expected.iter().enumerate() {
        assert_eq!(&arrow_to_value(&column, row, None), want, "row {row}");
    }
}
2234
/// Float64 cells convert to `Value::Float`, null cells to `Value::Null`.
#[test]
#[allow(clippy::approx_constant)]
fn test_arrow_to_value_float64() {
    let column = Float64Array::from(vec![Some(3.14), None]);
    let expected = [Value::Float(3.14), Value::Null];
    for (row, want) in expected.iter().enumerate() {
        assert_eq!(&arrow_to_value(&column, row, None), want, "row {row}");
    }
}
2242
/// Boolean cells convert to `Value::Bool`, null cells to `Value::Null`.
#[test]
fn test_arrow_to_value_bool() {
    let column = BooleanArray::from(vec![Some(true), Some(false), None]);
    let expected = [Value::Bool(true), Value::Bool(false), Value::Null];
    for (row, want) in expected.iter().enumerate() {
        assert_eq!(&arrow_to_value(&column, row, None), want, "row {row}");
    }
}
2250
/// `values_to_array` maps ints to an `Int64Array`, keeping nulls in place.
#[test]
fn test_values_to_array_int64() {
    let input = vec![Value::Int(1), Value::Int(2), Value::Null, Value::Int(4)];
    let built = values_to_array(&input, &ArrowDataType::Int64).unwrap();
    assert_eq!(built.len(), 4);

    let ints = built.as_any().downcast_ref::<Int64Array>().unwrap();
    let observed: Vec<Option<i64>> = ints.iter().collect();
    assert_eq!(observed, vec![Some(1), Some(2), None, Some(4)]);
}
2263
/// `values_to_array` maps strings to a `StringArray`, keeping nulls in place.
#[test]
fn test_values_to_array_string() {
    let input = vec![
        Value::String("a".to_string()),
        Value::String("b".to_string()),
        Value::Null,
    ];
    let built = values_to_array(&input, &ArrowDataType::Utf8).unwrap();
    assert_eq!(built.len(), 3);

    let strings = built.as_any().downcast_ref::<StringArray>().unwrap();
    let observed: Vec<Option<&str>> = strings.iter().collect();
    assert_eq!(observed, vec![Some("a"), Some("b"), None]);
}
2279
/// String extraction copies present values and writes `""` for a deleted row
/// that has no value.
#[test]
fn test_property_extractor_string() {
    let rows: Vec<HashMap<String, Value>> = vec![
        HashMap::from([("name".to_string(), Value::String("Alice".to_string()))]),
        HashMap::from([("name".to_string(), Value::String("Bob".to_string()))]),
        HashMap::new(),
    ];
    let tombstones = vec![false, false, true];

    let column = PropertyExtractor::new("name", &DataType::String)
        .build_column(3, &tombstones, |i| rows[i].get("name"))
        .unwrap();

    let names = column.as_any().downcast_ref::<StringArray>().unwrap();
    let observed: Vec<&str> = (0..3).map(|i| names.value(i)).collect();
    assert_eq!(observed, ["Alice", "Bob", ""]);
}
2303
/// Int64 extraction copies present values and writes `0` for a deleted row
/// that has no value.
#[test]
fn test_property_extractor_int64() {
    let rows: Vec<HashMap<String, Value>> = vec![
        HashMap::from([("age".to_string(), Value::Int(25))]),
        HashMap::from([("age".to_string(), Value::Int(30))]),
        HashMap::new(),
    ];
    let tombstones = vec![false, false, true];

    let column = PropertyExtractor::new("age", &DataType::Int64)
        .build_column(3, &tombstones, |i| rows[i].get("age"))
        .unwrap();

    let ages = column.as_any().downcast_ref::<Int64Array>().unwrap();
    let observed: Vec<i64> = (0..3).map(|i| ages.value(i)).collect();
    assert_eq!(observed, [25, 30, 0]);
}
2323
/// Bytes written through the extractor read back unchanged: a non-empty
/// blob, an empty blob, and a missing value (null) on a live row.
#[test]
fn test_property_extractor_bytes_roundtrip() {
    let payload = vec![0u8, 1, 2, 255];
    let rows: Vec<HashMap<String, Value>> = vec![
        HashMap::from([("blob".to_string(), Value::Bytes(payload.clone()))]),
        HashMap::from([("blob".to_string(), Value::Bytes(Vec::new()))]),
        HashMap::new(),
    ];
    let tombstones = vec![false, false, false];

    let column = PropertyExtractor::new("blob", &DataType::Bytes)
        .build_column(3, &tombstones, |i| rows[i].get("blob"))
        .unwrap();

    let expected = [
        Value::Bytes(payload),
        Value::Bytes(Vec::new()),
        Value::Null,
    ];
    for (row, want) in expected.iter().enumerate() {
        assert_eq!(
            &arrow_to_value(column.as_ref(), row, Some(&DataType::Bytes)),
            want,
            "row {row}"
        );
    }
}
2358
/// Encodes a single raw byte payload into a `DataType::Bytes` column and checks
/// that decoding with the `Bytes` type hint returns `Value::Bytes` holding the
/// same bytes.
#[test]
fn test_bytes_vs_cypher_value_disambiguation() {
    let payload = vec![0xDEu8, 0xAD, 0xBE, 0xEF];
    let rows: Vec<HashMap<String, Value>> = vec![HashMap::from([(
        "blob".to_string(),
        Value::Bytes(payload.clone()),
    )])];

    let extractor = PropertyExtractor::new("blob", &DataType::Bytes);
    let column = extractor
        .build_column(1, &[false], |row| rows[row].get("blob"))
        .unwrap();

    let decoded = arrow_to_value(column.as_ref(), 0, Some(&DataType::Bytes));
    assert_eq!(decoded, Value::Bytes(payload));
}
2380
/// `DataType::Bytes` must map to Arrow's `LargeBinary` type.
#[test]
fn test_data_type_bytes_to_arrow() {
    let arrow_type = DataType::Bytes.to_arrow();
    assert_eq!(arrow_type, ArrowDataType::LargeBinary);
}
2385
/// Decodes a `Time64(Microsecond)` column without a type hint and checks the
/// string rendering of each decoded value; a null slot decodes to `Value::Null`.
#[test]
fn test_arrow_to_value_time64() {
    let mut builder = Time64MicrosecondBuilder::new();
    // Microseconds since midnight: 10:30:45, midnight, 23:59:59.123456, then a null.
    for micros in [
        Some(37_845_000_000),
        Some(0),
        Some(86_399_123_456),
        None,
    ] {
        match micros {
            Some(value) => builder.append_value(value),
            None => builder.append_null(),
        }
    }
    let times = builder.finish();

    for (row, rendered) in ["10:30:45", "00:00", "23:59:59.123456"]
        .into_iter()
        .enumerate()
    {
        assert_eq!(arrow_to_value(&times, row, None).to_string(), rendered);
    }
    assert_eq!(arrow_to_value(&times, 3, None), Value::Null);
}
2405
/// Decodes a `Duration(Microsecond)` column without a type hint and checks the
/// string rendering for one second, one hour and one day; a null slot decodes
/// to `Value::Null`.
#[test]
fn test_arrow_to_value_duration() {
    let second: i64 = 1_000_000;
    let hour = 3_600 * second;
    let day = 86_400 * second;
    let durations = DurationMicrosecondArray::from(vec![Some(second), Some(hour), Some(day), None]);

    for (row, rendered) in ["PT1S", "PT1H", "PT24H"].into_iter().enumerate() {
        assert_eq!(arrow_to_value(&durations, row, None).to_string(), rendered);
    }
    assert_eq!(arrow_to_value(&durations, 3, None), Value::Null);
}
2422
/// Decodes a `Binary` column holding a msgpack-serialised CRDT.
///
/// Row 0 contains a `GCounter` serialised with `to_msgpack`; decoding it without
/// a type hint must produce an object whose `"t"` entry is the string `"gc"`.
/// Row 1 is null and must decode to `Value::Null`.
#[test]
fn test_arrow_to_value_binary_crdt() {
    let mut counter = GCounter::new();
    counter.increment("actor1", 5);
    let crdt = Crdt::GCounter(counter);
    let bytes = crdt.to_msgpack().unwrap();

    let mut builder = BinaryBuilder::new();
    builder.append_value(&bytes);
    builder.append_null();
    let arr = builder.finish();

    let result = arrow_to_value(&arr, 0, None);
    // A single `expect` replaces the former `is_some()` assertion + `unwrap()`
    // pair and reports a meaningful message on failure.
    let obj = result
        .as_object()
        .expect("decoded CRDT payload should be an object");
    assert_eq!(obj.get("t"), Some(&Value::String("gc".to_string())));

    assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
}
2450
/// Encodes three `DateTime` values with `values_to_datetime_struct_array` and
/// checks that `arrow_to_value` with the `DateTime` type hint returns each one
/// unchanged.
///
/// The inputs cover a named timezone with a positive offset, a negative offset
/// without a timezone name, and the epoch with the `UTC` name. The first decoded
/// value is additionally inspected field by field.
#[test]
fn test_datetime_struct_encode_decode_roundtrip() {
    let paris = Value::Temporal(TemporalValue::DateTime {
        nanos_since_epoch: 441763200000000000,
        offset_seconds: 3600,
        timezone_name: Some("Europe/Paris".to_string()),
    });
    let offset_only = Value::Temporal(TemporalValue::DateTime {
        nanos_since_epoch: 1704067200000000000,
        offset_seconds: -18000,
        timezone_name: None,
    });
    let utc_epoch = Value::Temporal(TemporalValue::DateTime {
        nanos_since_epoch: 0,
        offset_seconds: 0,
        timezone_name: Some("UTC".to_string()),
    });
    let values = vec![paris, offset_only, utc_epoch];

    let encoded = values_to_datetime_struct_array(&values);
    let structs = encoded.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(structs.len(), 3);

    let decode = |row| arrow_to_value(encoded.as_ref(), row, Some(&DataType::DateTime));
    let (first, second, third) = (decode(0), decode(1), decode(2));

    assert_eq!(first, values[0]);
    assert_eq!(second, values[1]);
    assert_eq!(third, values[2]);

    // Spot-check the individual fields of the first value.
    match first {
        Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch,
            offset_seconds,
            timezone_name,
        }) => {
            assert_eq!(nanos_since_epoch, 441763200000000000);
            assert_eq!(offset_seconds, 3600);
            assert_eq!(timezone_name, Some("Europe/Paris".to_string()));
        }
        _ => panic!("Expected DateTime value"),
    }
}
2501
/// Encodes `DateTime`, `Null`, `DateTime` into a struct array and checks that
/// the middle slot is null in the Arrow array and decodes to `Value::Null`,
/// while both neighbours round-trip unchanged.
#[test]
fn test_datetime_struct_null_handling() {
    let with_zone = Value::Temporal(TemporalValue::DateTime {
        nanos_since_epoch: 441763200000000000,
        offset_seconds: 3600,
        timezone_name: Some("Europe/Paris".to_string()),
    });
    let bare_epoch = Value::Temporal(TemporalValue::DateTime {
        nanos_since_epoch: 0,
        offset_seconds: 0,
        timezone_name: None,
    });
    let values = vec![with_zone, Value::Null, bare_epoch];

    let encoded = values_to_datetime_struct_array(&values);
    let structs = encoded.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(structs.len(), 3);

    let decode = |row| arrow_to_value(encoded.as_ref(), row, Some(&DataType::DateTime));

    assert_eq!(decode(0), values[0]);

    // The null input must be null at the struct level, not just in its children.
    assert!(structs.is_null(1));
    assert_eq!(decode(1), Value::Null);

    assert_eq!(decode(2), values[2]);
}
2536
/// Round-trips `DateTime` values that share one instant but use offsets of
/// `0`, `+43200` and `-43200` seconds, none of them carrying a timezone name.
#[test]
fn test_datetime_struct_boundary_values() {
    let values: Vec<Value> = [0, 43200, -43200]
        .into_iter()
        .map(|offset_seconds| {
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 441763200000000000,
                offset_seconds,
                timezone_name: None,
            })
        })
        .collect();

    let encoded = values_to_datetime_struct_array(&values);
    let structs = encoded.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(structs.len(), 3);

    for row in 0..values.len() {
        let decoded = arrow_to_value(encoded.as_ref(), row, Some(&DataType::DateTime));
        assert_eq!(decoded, values[row]);
    }
}
2568
/// Decodes a plain `Timestamp(Nanosecond, "UTC")` column with the `DateTime`
/// type hint.
///
/// Every non-null row must decode to a `DateTime` carrying the stored
/// nanoseconds, a zero offset and the `UTC` timezone name; the null row must
/// decode to `Value::Null`. Row 1 used to be decoded but never checked, so it
/// is now verified with the same expectations as row 0.
#[test]
fn test_datetime_old_schema_migration() {
    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
    builder.append_value(441763200000000000);
    builder.append_value(1704067200000000000);
    builder.append_null();
    let arr = builder.finish();

    for (row, expected_nanos) in [441763200000000000, 1704067200000000000]
        .into_iter()
        .enumerate()
    {
        match arrow_to_value(&arr, row, Some(&DataType::DateTime)) {
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch,
                offset_seconds,
                timezone_name,
            }) => {
                assert_eq!(nanos_since_epoch, expected_nanos);
                assert_eq!(offset_seconds, 0);
                assert_eq!(timezone_name, Some("UTC".to_string()));
            }
            _ => panic!("Expected DateTime value"),
        }
    }

    assert_eq!(
        arrow_to_value(&arr, 2, Some(&DataType::DateTime)),
        Value::Null
    );
}
2601
/// Encodes three `Time` values with `values_to_time_struct_array` and checks
/// that `arrow_to_value` with the `Time` type hint returns each one unchanged.
///
/// The first decoded value is additionally inspected field by field.
#[test]
fn test_time_struct_encode_decode_roundtrip() {
    let morning = Value::Temporal(TemporalValue::Time {
        nanos_since_midnight: 37845000000000,
        offset_seconds: 3600,
    });
    let midnight = Value::Temporal(TemporalValue::Time {
        nanos_since_midnight: 0,
        offset_seconds: 0,
    });
    let last_nanosecond = Value::Temporal(TemporalValue::Time {
        nanos_since_midnight: 86399999999999,
        offset_seconds: -18000,
    });
    let values = vec![morning, midnight, last_nanosecond];

    let encoded = values_to_time_struct_array(&values);
    let structs = encoded.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(structs.len(), 3);

    let decode = |row| arrow_to_value(encoded.as_ref(), row, Some(&DataType::Time));
    let (first, second, third) = (decode(0), decode(1), decode(2));

    assert_eq!(first, values[0]);
    assert_eq!(second, values[1]);
    assert_eq!(third, values[2]);

    // Spot-check the individual fields of the first value.
    match first {
        Value::Temporal(TemporalValue::Time {
            nanos_since_midnight,
            offset_seconds,
        }) => {
            assert_eq!(nanos_since_midnight, 37845000000000);
            assert_eq!(offset_seconds, 3600);
        }
        _ => panic!("Expected Time value"),
    }
}
2647
/// Encodes `Time`, `Null`, `Time` into a struct array and checks that the
/// middle slot is null in the Arrow array and decodes to `Value::Null`, while
/// both neighbours round-trip unchanged.
#[test]
fn test_time_struct_null_handling() {
    let morning = Value::Temporal(TemporalValue::Time {
        nanos_since_midnight: 37845000000000,
        offset_seconds: 3600,
    });
    let midnight = Value::Temporal(TemporalValue::Time {
        nanos_since_midnight: 0,
        offset_seconds: 0,
    });
    let values = vec![morning, Value::Null, midnight];

    let encoded = values_to_time_struct_array(&values);
    let structs = encoded.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(structs.len(), 3);

    let decode = |row| arrow_to_value(encoded.as_ref(), row, Some(&DataType::Time));

    assert_eq!(decode(0), values[0]);

    // The null input must be null at the struct level, not just in its children.
    assert!(structs.is_null(1));
    assert_eq!(decode(1), Value::Null);

    assert_eq!(decode(2), values[2]);
}
2680
/// A three-element `Value::Vector` extracted with a requested size of 3 is
/// returned unchanged and flagged valid.
#[test]
fn test_extract_vector_f32_values_valid_vector() {
    let expected = vec![1.0, 2.0, 3.0];
    let input = Value::Vector(expected.clone());

    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);

    assert_eq!(floats, expected);
    assert!(is_valid);
}
2691
/// A two-element `Value::Vector` extracted with a requested size of 3 yields
/// three zeros and is flagged invalid.
#[test]
fn test_extract_vector_f32_values_vector_wrong_dims() {
    let input = Value::Vector(vec![1.0, 2.0]);

    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);

    assert_eq!(floats, vec![0.0; 3]);
    assert!(!is_valid);
}
2700
/// A `Value::List` of three `Value::Float` items extracted with a requested
/// size of 3 yields the same numbers and is flagged valid.
#[test]
fn test_extract_vector_f32_values_valid_list() {
    let input = Value::List(vec![
        Value::Float(1.0),
        Value::Float(2.0),
        Value::Float(3.0),
    ]);

    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);

    assert_eq!(floats, vec![1.0, 2.0, 3.0]);
    assert!(is_valid);
}
2709
/// A two-item `Value::List` extracted with a requested size of 3 yields three
/// zeros and is flagged invalid.
#[test]
fn test_extract_vector_f32_values_list_wrong_dims() {
    let input = Value::List(vec![Value::Float(1.0), Value::Float(2.0)]);

    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);

    assert_eq!(floats, vec![0.0; 3]);
    assert!(!is_valid);
}
2718
/// A `Value::List` of three `Value::Int` items extracted with a requested size
/// of 3 yields the matching floats and is flagged valid.
#[test]
fn test_extract_vector_f32_values_list_int_coercion() {
    let input = Value::List(vec![Value::Int(1), Value::Int(2), Value::Int(3)]);

    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);

    assert_eq!(floats, vec![1.0, 2.0, 3.0]);
    assert!(is_valid);
}
2727
/// With no value (`None`) and the boolean argument `false`, extraction yields
/// three zeros and is flagged invalid.
#[test]
fn test_extract_vector_f32_values_none() {
    let (floats, is_valid) = extract_vector_f32_values(None, false, 3);

    assert_eq!(floats, vec![0.0; 3]);
    assert!(!is_valid);
}
2734
/// With `Value::Null` and the boolean argument `false`, extraction yields three
/// zeros and is flagged invalid.
#[test]
fn test_extract_vector_f32_values_null() {
    let input = Value::Null;

    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);

    assert_eq!(floats, vec![0.0; 3]);
    assert!(!is_valid);
}
2742
/// A `Value::String` is not a vector: extraction yields three zeros and is
/// flagged invalid.
#[test]
fn test_extract_vector_f32_values_unsupported_type() {
    let input = Value::String("not a vector".to_string());

    let (floats, is_valid) = extract_vector_f32_values(Some(&input), false, 3);

    assert_eq!(floats, vec![0.0; 3]);
    assert!(!is_valid);
}
2750
/// With no value (`None`) and the boolean argument `true` (the deleted flag,
/// going by the test name), extraction yields three zeros but is flagged valid.
#[test]
fn test_extract_vector_f32_values_deleted_with_none() {
    let (floats, is_valid) = extract_vector_f32_values(None, true, 3);

    assert_eq!(floats, vec![0.0; 3]);
    assert!(is_valid);
}
2757
/// With `Value::Null` and the boolean argument `true` (the deleted flag, going
/// by the test name), extraction yields three zeros but is flagged valid.
#[test]
fn test_extract_vector_f32_values_deleted_with_null() {
    let input = Value::Null;

    let (floats, is_valid) = extract_vector_f32_values(Some(&input), true, 3);

    assert_eq!(floats, vec![0.0; 3]);
    assert!(is_valid);
}
2765
/// Converts vectors interleaved with `Null` and a non-vector value into a
/// `FixedSizeList<Float32, 2>` array and checks the per-row validity: the two
/// vectors are valid, the `Null` and the string are not.
#[test]
fn test_values_to_fixed_size_list_vector_with_nulls() {
    let values = vec![
        Value::Vector(vec![1.0, 2.0]),
        Value::Null,
        Value::Vector(vec![3.0, 4.0]),
        Value::String("invalid".to_string()),
    ];
    let list_type = ArrowDataType::FixedSizeList(
        Arc::new(Field::new("item", ArrowDataType::Float32, false)),
        2,
    );

    let converted = values_to_array(&values, &list_type).unwrap();
    let lists = converted
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(lists.len(), 4);
    let validity: Vec<bool> = (0..4).map(|row| lists.is_valid(row)).collect();
    assert_eq!(validity, [true, false, true, false]);
}
2796
/// Converts `Value::List` inputs (one of floats, one of ints) into a
/// `FixedSizeList<Float32, 2>` array and checks that both rows are valid and
/// the child buffer holds `1.0, 2.0, 3.0, 4.0`.
#[test]
fn test_values_to_fixed_size_list_from_list() {
    let values = vec![
        Value::List(vec![Value::Float(1.0), Value::Float(2.0)]),
        Value::List(vec![Value::Int(3), Value::Int(4)]),
    ];
    let list_type = ArrowDataType::FixedSizeList(
        Arc::new(Field::new("item", ArrowDataType::Float32, false)),
        2,
    );

    let converted = values_to_array(&values, &list_type).unwrap();
    let lists = converted
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(lists.len(), 2);
    for row in 0..2 {
        assert!(lists.is_valid(row));
    }

    let floats = lists
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    for (slot, want) in [1.0, 2.0, 3.0, 4.0].into_iter().enumerate() {
        assert_eq!(floats.value(slot), want);
    }
}
2832
/// Inputs whose length differs from the list size of 2 (a three-element vector
/// and a one-element list) must produce invalid rows whose child slots are all
/// `0.0`.
#[test]
fn test_values_to_fixed_size_list_wrong_dimensions() {
    let values = vec![
        Value::Vector(vec![1.0, 2.0, 3.0]),
        Value::List(vec![Value::Float(4.0)]),
    ];
    let list_type = ArrowDataType::FixedSizeList(
        Arc::new(Field::new("item", ArrowDataType::Float32, false)),
        2,
    );

    let converted = values_to_array(&values, &list_type).unwrap();
    let lists = converted
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(lists.len(), 2);
    for row in 0..2 {
        assert!(!lists.is_valid(row));
    }

    let floats = lists
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    for slot in 0..4 {
        assert_eq!(floats.value(slot), 0.0);
    }
}
2868
/// Three `Null` inputs converted to `FixedSizeList<Float32, 3>` produce three
/// invalid rows, while the child array still holds 3 x 3 = 9 slots.
#[test]
fn test_values_to_fixed_size_list_all_nulls() {
    let values = vec![Value::Null, Value::Null, Value::Null];
    let list_type = ArrowDataType::FixedSizeList(
        Arc::new(Field::new("item", ArrowDataType::Float32, false)),
        3,
    );

    let converted = values_to_array(&values, &list_type).unwrap();
    let lists = converted
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(lists.len(), 3);
    for row in 0..3 {
        assert!(!lists.is_valid(row));
    }

    let floats = lists
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    assert_eq!(floats.len(), 9);
}
2899
/// Mixes a `Vector`, a `List`, a `Null` and a `String` in one conversion to
/// `FixedSizeList<Float32, 2>`: the first two rows are valid and hold
/// `1.0, 2.0` and `3.0, 4.0`; the last two rows are invalid.
#[test]
fn test_values_to_fixed_size_list_mixed_types() {
    let values = vec![
        Value::Vector(vec![1.0, 2.0]),
        Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
        Value::Null,
        Value::String("invalid".to_string()),
    ];
    let list_type = ArrowDataType::FixedSizeList(
        Arc::new(Field::new("item", ArrowDataType::Float32, false)),
        2,
    );

    let converted = values_to_array(&values, &list_type).unwrap();
    let lists = converted
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(lists.len(), 4);
    let validity: Vec<bool> = (0..4).map(|row| lists.is_valid(row)).collect();
    assert_eq!(validity, [true, true, false, false]);

    let floats = lists
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    for (slot, want) in [1.0, 2.0, 3.0, 4.0].into_iter().enumerate() {
        assert_eq!(floats.value(slot), want);
    }
}
2939
/// Builds a 3-dimensional vector column from four rows: a vector, a missing
/// property, an explicit `Null`, and a vector on a row flagged deleted.
///
/// Rows 0 and 3 must be valid and rows 1 and 2 invalid. The child buffer holds
/// the first row's values in slots 0..3 and zeros in slots 9..12 (the deleted
/// row's slots).
#[test]
fn test_build_vector_column_with_nulls_and_deleted() {
    let vector_type = DataType::Vector { dimensions: 3 };
    let extractor = PropertyExtractor::new("test_vec", &vector_type);

    let cells = [
        Some(Value::Vector(vec![1.0, 2.0, 3.0])),
        None,
        Some(Value::Null),
        Some(Value::Vector(vec![4.0, 5.0, 6.0])),
    ];
    let tombstones = [false, false, false, true];

    let column = extractor
        .build_vector_column(4, &tombstones, |row| cells[row].as_ref(), 3)
        .unwrap();
    let lists = column
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(lists.len(), 4);
    let validity: Vec<bool> = (0..4).map(|row| lists.is_valid(row)).collect();
    assert_eq!(validity, [true, false, false, true]);

    let floats = lists
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    for (slot, want) in [1.0, 2.0, 3.0].into_iter().enumerate() {
        assert_eq!(floats.value(slot), want);
    }
    for slot in 9..12 {
        assert_eq!(floats.value(slot), 0.0);
    }
}
2986
/// Builds a 2-dimensional vector column from a list of floats, a list of ints
/// and a native vector; all rows must be valid and the child buffer must hold
/// `1.0` through `6.0` in order.
#[test]
fn test_build_vector_column_with_list_input() {
    let vector_type = DataType::Vector { dimensions: 2 };
    let extractor = PropertyExtractor::new("test_vec", &vector_type);

    let cells = [
        Some(Value::List(vec![Value::Float(1.0), Value::Float(2.0)])),
        Some(Value::List(vec![Value::Int(3), Value::Int(4)])),
        Some(Value::Vector(vec![5.0, 6.0])),
    ];
    let tombstones = [false, false, false];

    let column = extractor
        .build_vector_column(3, &tombstones, |row| cells[row].as_ref(), 2)
        .unwrap();
    let lists = column
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(lists.len(), 3);
    for row in 0..3 {
        assert!(lists.is_valid(row));
    }

    let floats = lists
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    for (slot, want) in [1.0, 2.0, 3.0, 4.0, 5.0, 6.0].into_iter().enumerate() {
        assert_eq!(floats.value(slot), want);
    }
}
3026
/// Builds a `List<Vector{3}>` column and decodes it again.
///
/// Inputs are a two-token row, a three-token row, an empty list, and a missing
/// value on a row flagged deleted. The first three rows must be valid with 2, 3
/// and 0 inner entries, and the fourth must be invalid. Rows 0 and 1 are then
/// decoded through `arrow_to_value` and compared with the original tokens.
#[test]
fn test_build_multivector_list_column_roundtrip() {
    let data_type = DataType::List(Box::new(DataType::Vector { dimensions: 3 }));
    let extractor = PropertyExtractor::new("tokens", &data_type);

    let two_tokens = Value::List(vec![
        Value::Vector(vec![1.0, 2.0, 3.0]),
        Value::Vector(vec![4.0, 5.0, 6.0]),
    ]);
    let three_tokens = Value::List(vec![
        Value::Vector(vec![7.0, 8.0, 9.0]),
        Value::Vector(vec![10.0, 11.0, 12.0]),
        Value::Vector(vec![13.0, 14.0, 15.0]),
    ]);
    let cells = [
        Some(two_tokens),
        Some(three_tokens),
        Some(Value::List(vec![])),
        None,
    ];
    let tombstones = [false, false, false, true];

    let column = extractor
        .build_column(4, &tombstones, |row| cells[row].as_ref())
        .unwrap();

    let outer = column.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(outer.len(), 4);
    let validity: Vec<bool> = (0..4).map(|row| outer.is_valid(row)).collect();
    assert_eq!(validity, [true, true, true, false]);
    for (row, token_count) in [2, 3, 0].into_iter().enumerate() {
        assert_eq!(outer.value(row).len(), token_count);
    }

    let first = arrow_to_value(column.as_ref(), 0, Some(&data_type));
    assert_eq!(
        first,
        Value::List(vec![
            Value::Vector(vec![1.0, 2.0, 3.0]),
            Value::Vector(vec![4.0, 5.0, 6.0]),
        ])
    );

    match arrow_to_value(column.as_ref(), 1, Some(&data_type)) {
        Value::List(tokens) => {
            assert_eq!(tokens.len(), 3);
            assert_eq!(tokens[2], Value::Vector(vec![13.0, 14.0, 15.0]));
        }
        _ => panic!("row1 should decode to a list of tokens"),
    }
}
3093
/// Builds a `List<Vector{2}>` column from a single row whose tokens are a
/// well-sized vector, an over-long vector, a string, and a two-float list.
///
/// The inner `FixedSizeListArray` must keep all four slots, with the first and
/// last valid and the two malformed tokens invalid.
#[test]
fn test_build_multivector_invalid_inner_tokens() {
    let data_type = DataType::List(Box::new(DataType::Vector { dimensions: 2 }));
    let extractor = PropertyExtractor::new("tokens", &data_type);

    let tokens = vec![
        Value::Vector(vec![1.0, 2.0]),
        Value::Vector(vec![9.0, 9.0, 9.0]),
        Value::String("nope".to_string()),
        Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
    ];
    let cells = [Some(Value::List(tokens))];
    let tombstones = [false];

    let column = extractor
        .build_column(1, &tombstones, |row| cells[row].as_ref())
        .unwrap();
    let outer = column.as_any().downcast_ref::<ListArray>().unwrap();
    let first_row = outer.value(0);
    let inner = first_row
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .unwrap();

    assert_eq!(inner.len(), 4);
    let validity: Vec<bool> = (0..4).map(|slot| inner.is_valid(slot)).collect();
    assert_eq!(validity, [true, false, false, true]);
}
3125
/// `values_to_array` must return an error when asked to build a
/// `List<FixedSizeList<Float32, 2>>` array from a list-of-vectors value.
#[test]
fn test_values_to_array_multivector_schemaless_deferred() {
    let values = vec![Value::List(vec![Value::Vector(vec![1.0, 2.0])])];

    let token_type = ArrowDataType::FixedSizeList(
        Arc::new(Field::new("item", ArrowDataType::Float32, true)),
        2,
    );
    let dt = ArrowDataType::List(Arc::new(Field::new("item", token_type, true)));

    let outcome = values_to_array(&values, &dt);
    assert!(outcome.is_err());
}
3143
/// An `i64` one past `i32::MAX` must become NULL in both the int32 and the
/// date32 column builders, while an in-range value (`42`) is stored as-is.
#[test]
fn test_int32_and_date32_columns_null_out_of_range() {
    let dt = DataType::Int64;
    let extractor = PropertyExtractor::new("x", &dt);

    let too_large = Value::Int(i64::from(i32::MAX) + 1);
    let in_range = Value::Int(42);
    let cells = [too_large, in_range];
    let tombstones = [false, false];

    let int_column = extractor
        .build_int32_column(2, &tombstones, |row| Some(&cells[row]))
        .unwrap();
    let ints = int_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert!(
        ints.is_null(0),
        "out-of-range i64 must be NULL in an int32 column, not wrapped"
    );
    assert_eq!(ints.value(1), 42);

    let date_column = extractor
        .build_date32_column(2, &tombstones, |row| Some(&cells[row]))
        .unwrap();
    let dates = date_column.as_any().downcast_ref::<Date32Array>().unwrap();
    assert!(
        dates.is_null(0),
        "out-of-range day count must be NULL in a date32 column, not wrapped"
    );
    assert_eq!(dates.value(1), 42);
}
3176}