Skip to main content

uni_store/storage/
arrow_convert.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Arrow type conversion utilities for reducing cognitive complexity.
5//!
6//! This module provides shared helper functions and macros for converting
7//! between Arrow arrays and JSON Values, reducing code duplication across
8//! vertex.rs, delta.rs, and executor.rs.
9
10use anyhow::{Result, anyhow};
11use arrow_array::builder::{
12    BinaryBuilder, BooleanBufferBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
13    FixedSizeBinaryBuilder, FixedSizeListBuilder, Float32Builder, Float64Builder, Int32Builder,
14    Int64Builder, IntervalMonthDayNanoBuilder, LargeBinaryBuilder, ListBuilder, StringBuilder,
15    StructBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder,
16    UInt64Builder,
17};
18use arrow_array::{
19    Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, FixedSizeBinaryArray,
20    FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array,
21    IntervalMonthDayNanoArray, LargeBinaryArray, ListArray, StringArray, StructArray,
22    Time64NanosecondArray, TimestampNanosecondArray, UInt64Array,
23};
24use arrow_schema::{DataType as ArrowDataType, Field};
25use std::collections::HashMap;
26use std::sync::Arc;
27use uni_common::DataType;
28use uni_common::Value;
29use uni_common::core::id::{Eid, Vid};
30use uni_common::core::schema;
31use uni_crdt::Crdt;
32
33/// Build a timestamp column from a map of ID -> timestamp (nanoseconds).
34///
35/// Shared utility for building `_created_at` and `_updated_at` columns
36/// in vertex and edge tables. Works with any hashable ID type (Vid, Eid, etc.).
37fn build_timestamp_column_from_id_map<K, I>(
38    ids: I,
39    timestamps: Option<&HashMap<K, i64>>,
40) -> ArrayRef
41where
42    K: Eq + std::hash::Hash,
43    I: IntoIterator<Item = K>,
44{
45    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
46    for id in ids {
47        match timestamps.and_then(|m| m.get(&id)) {
48            Some(&ts) => builder.append_value(ts),
49            None => builder.append_null(),
50        }
51    }
52    Arc::new(builder.finish())
53}
54
55pub fn build_timestamp_column_from_vid_map<I>(
56    ids: I,
57    timestamps: Option<&HashMap<Vid, i64>>,
58) -> ArrayRef
59where
60    I: IntoIterator<Item = Vid>,
61{
62    build_timestamp_column_from_id_map(ids, timestamps)
63}
64
65pub fn build_timestamp_column_from_eid_map<I>(
66    ids: I,
67    timestamps: Option<&HashMap<Eid, i64>>,
68) -> ArrayRef
69where
70    I: IntoIterator<Item = Eid>,
71{
72    build_timestamp_column_from_id_map(ids, timestamps)
73}
74
75/// Build a timestamp column from an iterator of optional timestamps.
76///
77/// This is useful for building timestamp columns directly from entry structs.
78pub fn build_timestamp_column<I>(timestamps: I) -> ArrayRef
79where
80    I: IntoIterator<Item = Option<i64>>,
81{
82    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
83    for ts in timestamps {
84        builder.append_option(ts);
85    }
86    Arc::new(builder.finish())
87}
88
89/// Extract a `Vec<String>` from a single row of a `List<Utf8>` column.
90///
91/// Returns an empty vec when the row is null, the inner array is not a
92/// `StringArray`, or the list is empty.  Null entries inside the list are
93/// silently skipped.
94pub fn labels_from_list_array(list_arr: &ListArray, row: usize) -> Vec<String> {
95    if list_arr.is_null(row) {
96        return Vec::new();
97    }
98    let values = list_arr.value(row);
99    let Some(str_arr) = values.as_any().downcast_ref::<StringArray>() else {
100        return Vec::new();
101    };
102    (0..str_arr.len())
103        .filter(|&j| !str_arr.is_null(j))
104        .map(|j| str_arr.value(j).to_string())
105        .collect()
106}
107
108/// Parse a datetime string into nanoseconds since Unix epoch.
109///
110/// Tries RFC3339, "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M%:z",
111/// and "%Y-%m-%dT%H:%MZ" formats.
112fn parse_datetime_to_nanos(s: &str) -> Option<i64> {
113    chrono::DateTime::parse_from_rfc3339(s)
114        .map(|dt| {
115            dt.with_timezone(&chrono::Utc)
116                .timestamp_nanos_opt()
117                .unwrap_or(0)
118        })
119        .or_else(|_| {
120            chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
121                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
122        })
123        .or_else(|_| {
124            chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ")
125                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
126        })
127        .or_else(|_| {
128            chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M%:z").map(|dt| {
129                dt.with_timezone(&chrono::Utc)
130                    .timestamp_nanos_opt()
131                    .unwrap_or(0)
132            })
133        })
134        .ok()
135        .or_else(|| {
136            s.strip_suffix('Z')
137                .and_then(|base| chrono::NaiveDateTime::parse_from_str(base, "%Y-%m-%dT%H:%M").ok())
138                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
139        })
140}
141
142/// Detect the Arrow Map-as-List(Struct(key, value)) pattern and reconstruct a map.
143///
144/// Arrow represents Map columns as `List(Struct { key, value })`. This helper
145/// checks whether the given array matches that layout and, if so, converts the
146/// key/value pairs back into a `HashMap<String, Value>`.
147pub(crate) fn try_reconstruct_map(arr: &ArrayRef) -> Option<HashMap<String, Value>> {
148    let structs = arr.as_any().downcast_ref::<StructArray>()?;
149    let fields = structs.fields();
150    if fields.len() != 2 || fields[0].name() != "key" || fields[1].name() != "value" {
151        return None;
152    }
153    // A typed `Map(_, Bytes)` value child carries the `uni_raw_bytes` marker; decode
154    // each value verbatim. CV-encoded map values carry no marker → codec path.
155    let value_hint = raw_bytes_hint(fields[1].metadata());
156    let key_col = structs.column(0);
157    let val_col = structs.column(1);
158    let mut map = HashMap::new();
159    for i in 0..structs.len() {
160        if let Value::String(k) = arrow_to_value(key_col.as_ref(), i, None) {
161            map.insert(k, arrow_to_value(val_col.as_ref(), i, value_hint));
162        }
163    }
164    Some(map)
165}
166
167/// Convert all elements of an Arrow array into a `Vec<Value>`.
168///
169/// `elem_type` is the schema hint for each element — `Some(DataType::Bytes)` when the
170/// list child field is marked `uni_raw_bytes`, so raw `Bytes` elements decode verbatim.
171fn array_to_value_list(arr: &ArrayRef, elem_type: Option<&DataType>) -> Vec<Value> {
172    (0..arr.len())
173        .map(|i| arrow_to_value(arr.as_ref(), i, elem_type))
174        .collect()
175}
176
177/// Returns `Some(&DataType::Bytes)` when Arrow field metadata marks the field as a
178/// raw `Bytes` value (`uni_raw_bytes=true`), else `None`. Used to discriminate raw
179/// `Bytes` container children from CV-encoded `LargeBinary` without array sniffing.
180fn raw_bytes_hint(metadata: &HashMap<String, String>) -> Option<&'static DataType> {
181    if metadata.get("uni_raw_bytes").is_some_and(|v| v == "true") {
182        Some(&DataType::Bytes)
183    } else {
184        None
185    }
186}
187
188/// Extracts the raw-`Bytes` element hint from a list-like Arrow type's child field.
189fn list_child_bytes_hint(dt: &ArrowDataType) -> Option<&'static DataType> {
190    match dt {
191        ArrowDataType::List(f)
192        | ArrowDataType::LargeList(f)
193        | ArrowDataType::FixedSizeList(f, _) => raw_bytes_hint(f.metadata()),
194        _ => None,
195    }
196}
197
198/// Convert an Arrow array value at a given row index to a Uni Value.
199///
200/// Handles all common Arrow types and recursively processes nested structures
201/// like Lists and Structs. The optional `data_type` parameter provides schema
202/// context for decoding DateTime and Time struct arrays; when provided, it
203/// takes precedence over runtime type detection.
204pub fn arrow_to_value(col: &dyn Array, row: usize, data_type: Option<&DataType>) -> Value {
205    if col.is_null(row) {
206        return Value::Null;
207    }
208
209    // Schema-driven decode for DateTime and Time structs
210    if let Some(dt) = data_type {
211        match dt {
212            DataType::DateTime => {
213                // Expect StructArray with three fields
214                if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
215                    && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
216                        struct_arr.column_by_name("nanos_since_epoch"),
217                        struct_arr.column_by_name("offset_seconds"),
218                        struct_arr.column_by_name("timezone_name"),
219                    )
220                    && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
221                        nanos_col
222                            .as_any()
223                            .downcast_ref::<TimestampNanosecondArray>(),
224                        offset_col.as_any().downcast_ref::<Int32Array>(),
225                        tz_col.as_any().downcast_ref::<StringArray>(),
226                    )
227                {
228                    if nanos_arr.is_null(row) {
229                        return Value::Null;
230                    }
231                    let nanos = nanos_arr.value(row);
232                    if offset_arr.is_null(row) {
233                        // No offset → LocalDateTime
234                        return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
235                            nanos_since_epoch: nanos,
236                        });
237                    }
238                    let offset = offset_arr.value(row);
239                    let tz_name = (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
240                    return Value::Temporal(uni_common::TemporalValue::DateTime {
241                        nanos_since_epoch: nanos,
242                        offset_seconds: offset,
243                        timezone_name: tz_name,
244                    });
245                }
246                // Fall back to old schema migration: TimestampNanosecond → DateTime with offset=0
247                if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
248                    let nanos = ts.value(row);
249                    let tz_name = ts.timezone().map(|s| s.to_string());
250                    return Value::Temporal(uni_common::TemporalValue::DateTime {
251                        nanos_since_epoch: nanos,
252                        offset_seconds: 0,
253                        timezone_name: tz_name,
254                    });
255                }
256            }
257            DataType::Time => {
258                // Expect StructArray with two fields
259                if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
260                    && let (Some(nanos_col), Some(offset_col)) = (
261                        struct_arr.column_by_name("nanos_since_midnight"),
262                        struct_arr.column_by_name("offset_seconds"),
263                    )
264                    && let (Some(nanos_arr), Some(offset_arr)) = (
265                        nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
266                        offset_col.as_any().downcast_ref::<Int32Array>(),
267                    )
268                {
269                    // Check field-level nulls before calling .value()
270                    if nanos_arr.is_null(row) || offset_arr.is_null(row) {
271                        return Value::Null;
272                    }
273                    let nanos = nanos_arr.value(row);
274                    let offset = offset_arr.value(row);
275                    return Value::Temporal(uni_common::TemporalValue::Time {
276                        nanos_since_midnight: nanos,
277                        offset_seconds: offset,
278                    });
279                }
280                // Fall back to old schema: Time64Nanosecond → Time with offset=0
281                if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
282                    let nanos = t.value(row);
283                    return Value::Temporal(uni_common::TemporalValue::Time {
284                        nanos_since_midnight: nanos,
285                        offset_seconds: 0,
286                    });
287                }
288            }
289            DataType::Bytes => {
290                let Some(arr) = col.as_any().downcast_ref::<LargeBinaryArray>() else {
291                    log::warn!("Bytes column is not LargeBinaryArray");
292                    return Value::Null;
293                };
294                if arr.is_null(row) {
295                    return Value::Null;
296                }
297                return Value::Bytes(arr.value(row).to_vec());
298            }
299            DataType::Btic => {
300                let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>() else {
301                    log::warn!("BTIC column is not FixedSizeBinaryArray");
302                    return Value::Null;
303                };
304                let bytes = fsb.value(row);
305                return match uni_btic::encode::decode_slice(bytes) {
306                    Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
307                        lo: btic.lo(),
308                        hi: btic.hi(),
309                        meta: btic.meta(),
310                    }),
311                    Err(e) => {
312                        log::warn!("BTIC decode error: {}", e);
313                        Value::Null
314                    }
315                };
316            }
317            _ => {}
318        }
319    }
320
321    // String types
322    if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
323        return Value::String(s.value(row).to_string());
324    }
325
326    // Integer types
327    if let Some(u) = col.as_any().downcast_ref::<UInt64Array>() {
328        return Value::Int(u.value(row) as i64);
329    }
330    if let Some(i) = col.as_any().downcast_ref::<Int64Array>() {
331        return Value::Int(i.value(row));
332    }
333    if let Some(i) = col.as_any().downcast_ref::<Int32Array>() {
334        return Value::Int(i.value(row) as i64);
335    }
336
337    // Float types
338    if let Some(f) = col.as_any().downcast_ref::<Float64Array>() {
339        return Value::Float(f.value(row));
340    }
341    if let Some(f) = col.as_any().downcast_ref::<Float32Array>() {
342        return Value::Float(f.value(row) as f64);
343    }
344
345    // Boolean type
346    if let Some(b) = col.as_any().downcast_ref::<BooleanArray>() {
347        return Value::Bool(b.value(row));
348    }
349
350    // Fixed-size list (vectors)
351    if let Some(list) = col.as_any().downcast_ref::<FixedSizeListArray>() {
352        let elem_hint = list_child_bytes_hint(list.data_type());
353        return Value::List(array_to_value_list(&list.value(row), elem_hint));
354    }
355
356    // Variable-size list
357    if let Some(list) = col.as_any().downcast_ref::<ListArray>() {
358        let arr = list.value(row);
359
360        // Map types are stored as List(Struct(key, value)); reconstruct as map
361        if let Some(obj) = try_reconstruct_map(&arr) {
362            return Value::Map(obj);
363        }
364
365        let elem_hint = list_child_bytes_hint(list.data_type());
366        return Value::List(array_to_value_list(&arr, elem_hint));
367    }
368
369    // Large list (variable-size list with i64 offsets)
370    if let Some(list) = col.as_any().downcast_ref::<arrow_array::LargeListArray>() {
371        let elem_hint = list_child_bytes_hint(list.data_type());
372        return Value::List(array_to_value_list(&list.value(row), elem_hint));
373    }
374
375    // Struct type — detect temporal structs by field names before generic handler
376    if let Some(s) = col.as_any().downcast_ref::<StructArray>() {
377        let field_names: Vec<&str> = s.fields().iter().map(|f| f.name().as_str()).collect();
378
379        // DateTime struct: {nanos_since_epoch, offset_seconds, timezone_name}
380        if field_names.contains(&"nanos_since_epoch")
381            && field_names.contains(&"offset_seconds")
382            && field_names.contains(&"timezone_name")
383            && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
384                s.column_by_name("nanos_since_epoch"),
385                s.column_by_name("offset_seconds"),
386                s.column_by_name("timezone_name"),
387            )
388        {
389            // Try TimestampNanosecond first (standard schema), then Int64 fallback
390            let nanos_opt = nanos_col
391                .as_any()
392                .downcast_ref::<TimestampNanosecondArray>()
393                .map(|a| {
394                    if a.is_null(row) {
395                        None
396                    } else {
397                        Some(a.value(row))
398                    }
399                })
400                .or_else(|| {
401                    nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
402                        if a.is_null(row) {
403                            None
404                        } else {
405                            Some(a.value(row))
406                        }
407                    })
408                });
409            let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
410                if a.is_null(row) {
411                    None
412                } else {
413                    Some(a.value(row))
414                }
415            });
416
417            if let Some(Some(nanos)) = nanos_opt {
418                match offset_opt {
419                    Some(Some(offset)) => {
420                        let tz_name = tz_col.as_any().downcast_ref::<StringArray>().and_then(|a| {
421                            if a.is_null(row) {
422                                None
423                            } else {
424                                Some(a.value(row).to_string())
425                            }
426                        });
427                        return Value::Temporal(uni_common::TemporalValue::DateTime {
428                            nanos_since_epoch: nanos,
429                            offset_seconds: offset,
430                            timezone_name: tz_name,
431                        });
432                    }
433                    _ => {
434                        // No offset → LocalDateTime
435                        return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
436                            nanos_since_epoch: nanos,
437                        });
438                    }
439                }
440            }
441        }
442
443        // Time struct: {nanos_since_midnight, offset_seconds}
444        if field_names.contains(&"nanos_since_midnight")
445            && field_names.contains(&"offset_seconds")
446            && let (Some(nanos_col), Some(offset_col)) = (
447                s.column_by_name("nanos_since_midnight"),
448                s.column_by_name("offset_seconds"),
449            )
450        {
451            // Try Time64Nanosecond first (standard schema), then Int64 fallback
452            let nanos_opt = nanos_col
453                .as_any()
454                .downcast_ref::<Time64NanosecondArray>()
455                .map(|a| {
456                    if a.is_null(row) {
457                        None
458                    } else {
459                        Some(a.value(row))
460                    }
461                })
462                .or_else(|| {
463                    nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
464                        if a.is_null(row) {
465                            None
466                        } else {
467                            Some(a.value(row))
468                        }
469                    })
470                });
471            let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
472                if a.is_null(row) {
473                    None
474                } else {
475                    Some(a.value(row))
476                }
477            });
478
479            if let (Some(Some(nanos)), Some(Some(offset))) = (nanos_opt, offset_opt) {
480                return Value::Temporal(uni_common::TemporalValue::Time {
481                    nanos_since_midnight: nanos,
482                    offset_seconds: offset,
483                });
484            }
485        }
486
487        // Generic struct → Map
488        let mut map = HashMap::new();
489        for (field, child) in s.fields().iter().zip(s.columns()) {
490            map.insert(
491                field.name().clone(),
492                arrow_to_value(child.as_ref(), row, None),
493            );
494        }
495        return Value::Map(map);
496    }
497
498    // Date32 type (days since epoch) - return as Value::Temporal
499    if let Some(d) = col.as_any().downcast_ref::<Date32Array>() {
500        let days = d.value(row);
501        return Value::Temporal(uni_common::TemporalValue::Date {
502            days_since_epoch: days,
503        });
504    }
505
506    // Timestamp (nanoseconds since epoch) - timezone presence determines DateTime vs LocalDateTime
507    if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
508        let nanos = ts.value(row);
509        return match ts.timezone() {
510            Some(tz) => Value::Temporal(uni_common::TemporalValue::DateTime {
511                nanos_since_epoch: nanos,
512                offset_seconds: 0,
513                timezone_name: Some(tz.to_string()),
514            }),
515            None => Value::Temporal(uni_common::TemporalValue::LocalDateTime {
516                nanos_since_epoch: nanos,
517            }),
518        };
519    }
520
521    // Time64 (nanoseconds since midnight) - return as Value::Temporal
522    if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
523        let nanos = t.value(row);
524        return Value::Temporal(uni_common::TemporalValue::LocalTime {
525            nanos_since_midnight: nanos,
526        });
527    }
528
529    // Time64 (microseconds since midnight) - convert to nanoseconds
530    if let Some(t) = col
531        .as_any()
532        .downcast_ref::<arrow_array::Time64MicrosecondArray>()
533    {
534        let micros = t.value(row);
535        return Value::Temporal(uni_common::TemporalValue::LocalTime {
536            nanos_since_midnight: micros * 1000,
537        });
538    }
539
540    // DurationMicrosecond - convert to Duration with nanoseconds
541    if let Some(d) = col
542        .as_any()
543        .downcast_ref::<arrow_array::DurationMicrosecondArray>()
544    {
545        let micros = d.value(row);
546        let total_nanos = micros * 1000;
547        let seconds = total_nanos / 1_000_000_000;
548        let remaining_nanos = total_nanos % 1_000_000_000;
549        return Value::Temporal(uni_common::TemporalValue::Duration {
550            months: 0,
551            days: 0,
552            nanos: seconds * 1_000_000_000 + remaining_nanos,
553        });
554    }
555
556    // IntervalMonthDayNano - return as Value::Temporal(Duration)
557    if let Some(interval) = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>() {
558        let val = interval.value(row);
559        return Value::Temporal(uni_common::TemporalValue::Duration {
560            months: val.months as i64,
561            days: val.days as i64,
562            nanos: val.nanoseconds,
563        });
564    }
565
566    // LargeBinary (CypherValue MessagePack-tagged encoding)
567    if let Some(b) = col.as_any().downcast_ref::<LargeBinaryArray>() {
568        let bytes = b.value(row);
569        if bytes.is_empty() {
570            return Value::Null;
571        }
572        return uni_common::cypher_value_codec::decode(bytes).unwrap_or_else(|e| {
573            eprintln!("CypherValue decode error: {}", e);
574            Value::Null
575        });
576    }
577
578    // FixedSizeBinary(24) — BTIC temporal interval
579    if let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>()
580        && fsb.value_length() == 24
581    {
582        let bytes = fsb.value(row);
583        return match uni_btic::encode::decode_slice(bytes) {
584            Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
585                lo: btic.lo(),
586                hi: btic.hi(),
587                meta: btic.meta(),
588            }),
589            Err(e) => {
590                log::warn!("BTIC decode error: {}", e);
591                Value::Null
592            }
593        };
594    }
595
596    // Binary (CRDT MessagePack) - decode to Value via serde_json boundary
597    if let Some(b) = col.as_any().downcast_ref::<BinaryArray>() {
598        let bytes = b.value(row);
599        return Crdt::from_msgpack(bytes)
600            .ok()
601            .and_then(|crdt| serde_json::to_value(&crdt).ok())
602            .map(Value::from)
603            .unwrap_or(Value::Null);
604    }
605
606    // Fallback
607    Value::Null
608}
609
610fn values_to_uint64_array(values: &[Value]) -> ArrayRef {
611    let mut builder = UInt64Builder::with_capacity(values.len());
612    for v in values {
613        if let Some(n) = v.as_u64() {
614            builder.append_value(n);
615        } else {
616            builder.append_null();
617        }
618    }
619    Arc::new(builder.finish())
620}
621
622fn values_to_int64_array(values: &[Value]) -> ArrayRef {
623    let mut builder = Int64Builder::with_capacity(values.len());
624    for v in values {
625        if let Some(n) = v.as_i64() {
626            builder.append_value(n);
627        } else {
628            builder.append_null();
629        }
630    }
631    Arc::new(builder.finish())
632}
633
634fn values_to_int32_array(values: &[Value]) -> ArrayRef {
635    let mut builder = Int32Builder::with_capacity(values.len());
636    for v in values {
637        if let Some(n) = v.as_i64() {
638            builder.append_value(n as i32);
639        } else {
640            builder.append_null();
641        }
642    }
643    Arc::new(builder.finish())
644}
645
646fn values_to_string_array(values: &[Value]) -> ArrayRef {
647    let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 10);
648    for v in values {
649        if let Some(s) = v.as_str() {
650            builder.append_value(s);
651        } else if v.is_null() {
652            builder.append_null();
653        } else {
654            builder.append_value(v.to_string());
655        }
656    }
657    Arc::new(builder.finish())
658}
659
660fn values_to_bool_array(values: &[Value]) -> ArrayRef {
661    let mut builder = BooleanBuilder::with_capacity(values.len());
662    for v in values {
663        if let Some(b) = v.as_bool() {
664            builder.append_value(b);
665        } else {
666            builder.append_null();
667        }
668    }
669    Arc::new(builder.finish())
670}
671
672fn values_to_float32_array(values: &[Value]) -> ArrayRef {
673    let mut builder = Float32Builder::with_capacity(values.len());
674    for v in values {
675        if let Some(n) = v.as_f64() {
676            builder.append_value(n as f32);
677        } else {
678            builder.append_null();
679        }
680    }
681    Arc::new(builder.finish())
682}
683
684fn values_to_float64_array(values: &[Value]) -> ArrayRef {
685    let mut builder = Float64Builder::with_capacity(values.len());
686    for v in values {
687        if let Some(n) = v.as_f64() {
688            builder.append_value(n);
689        } else {
690            builder.append_null();
691        }
692    }
693    Arc::new(builder.finish())
694}
695
696fn values_to_fixed_size_binary_array(values: &[Value], size: i32) -> Result<ArrayRef> {
697    let mut builder = FixedSizeBinaryBuilder::with_capacity(values.len(), size);
698    for v in values {
699        match v {
700            Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta }) if size == 24 => {
701                let btic = uni_btic::Btic::new(*lo, *hi, *meta)
702                    .map_err(|e| anyhow!("invalid BTIC value: {}", e))?;
703                builder.append_value(uni_btic::encode::encode(&btic))?;
704            }
705            Value::String(s) if size == 24 => match uni_btic::parse::parse_btic_literal(s) {
706                Ok(b) => builder.append_value(uni_btic::encode::encode(&b))?,
707                Err(_) => builder.append_null(),
708            },
709            Value::List(bytes) => {
710                let b: Vec<u8> = bytes
711                    .iter()
712                    .map(|bv| bv.as_u64().unwrap_or(0) as u8)
713                    .collect();
714                if b.len() as i32 == size {
715                    builder.append_value(&b)?;
716                } else {
717                    builder.append_null();
718                }
719            }
720            _ => builder.append_null(),
721        }
722    }
723    Ok(Arc::new(builder.finish()))
724}
725
726/// Extract f32 vector values from a Value, ensuring correct Arrow FixedSizeList invariants.
727///
728/// Always returns exactly `dimensions` f32 values (zeros for null/invalid), plus a validity flag.
729/// This guarantees `child_array.len() == parent_array.len() × dimensions`.
730///
731/// # Arguments
732/// - `val`: Optional property value to extract from
733/// - `is_deleted`: Whether the containing entity is deleted (affects validity)
734/// - `dimensions`: Expected vector dimensions
735///
736/// # Returns
737/// - Tuple of (vector values, validity flag)
738///   - Vector always has exactly `dimensions` elements
739///   - Validity is `true` for valid vectors or deleted entries, `false` for null/invalid
740pub fn extract_vector_f32_values(
741    val: Option<&Value>,
742    is_deleted: bool,
743    dimensions: usize,
744) -> (Vec<f32>, bool) {
745    let zeros = || vec![0.0_f32; dimensions];
746
747    // Deleted entries always return zeros with valid=true
748    if is_deleted {
749        return (zeros(), true);
750    }
751
752    match val {
753        // Native f32 vector (Value::Vector)
754        Some(Value::Vector(v)) if v.len() == dimensions => (v.clone(), true),
755        Some(Value::Vector(_)) => (zeros(), false), // Wrong dimensions
756        // List of values (Value::List) - convert to f32
757        Some(Value::List(arr)) if arr.len() == dimensions => {
758            let values: Vec<f32> = arr
759                .iter()
760                .map(|v| v.as_f64().unwrap_or(0.0) as f32)
761                .collect();
762            (values, true)
763        }
764        Some(Value::List(_)) => (zeros(), false), // Wrong dimensions
765        _ => (zeros(), false),                    // Missing or unsupported value
766    }
767}
768
769fn values_to_fixed_size_list_f32_array(values: &[Value], size: i32) -> ArrayRef {
770    let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), size);
771    for v in values {
772        let (vals, valid) = extract_vector_f32_values(Some(v), false, size as usize);
773        for val in vals {
774            builder.values().append_value(val);
775        }
776        builder.append(valid);
777    }
778    Arc::new(builder.finish())
779}
780
781fn values_to_timestamp_array(values: &[Value], tz: Option<&Arc<str>>) -> ArrayRef {
782    let mut builder = TimestampNanosecondBuilder::with_capacity(values.len());
783    for v in values {
784        if v.is_null() {
785            builder.append_null();
786        } else if let Value::Temporal(tv) = v {
787            match tv {
788                uni_common::TemporalValue::DateTime {
789                    nanos_since_epoch, ..
790                }
791                | uni_common::TemporalValue::LocalDateTime {
792                    nanos_since_epoch, ..
793                } => builder.append_value(*nanos_since_epoch),
794                _ => builder.append_null(),
795            }
796        } else if let Some(n) = v.as_i64() {
797            builder.append_value(n);
798        } else if let Some(s) = v.as_str() {
799            match parse_datetime_to_nanos(s) {
800                Some(nanos) => builder.append_value(nanos),
801                None => builder.append_null(),
802            }
803        } else {
804            builder.append_null();
805        }
806    }
807
808    let arr = builder.finish();
809    if let Some(tz) = tz {
810        Arc::new(arr.with_timezone(tz.as_ref()))
811    } else {
812        Arc::new(arr)
813    }
814}
815
816/// Build a DateTime struct array from values.
817///
818/// Encodes DateTime as a 3-field struct: (nanos_since_epoch, offset_seconds, timezone_name).
819/// This preserves timezone offset information that was previously lost with TimestampNanosecond encoding.
820fn values_to_datetime_struct_array(values: &[Value]) -> ArrayRef {
821    let mut nanos_builder = TimestampNanosecondBuilder::with_capacity(values.len());
822    let mut offset_builder = Int32Builder::with_capacity(values.len());
823    let mut tz_builder = StringBuilder::with_capacity(values.len(), values.len() * 20);
824    let mut null_buffer = BooleanBufferBuilder::new(values.len());
825
826    for v in values {
827        match v {
828            Value::Temporal(uni_common::TemporalValue::DateTime {
829                nanos_since_epoch,
830                offset_seconds,
831                timezone_name,
832            }) => {
833                nanos_builder.append_value(*nanos_since_epoch);
834                offset_builder.append_value(*offset_seconds);
835                tz_builder.append_option(timezone_name.as_deref());
836                null_buffer.append(true);
837            }
838            Value::Temporal(uni_common::TemporalValue::LocalDateTime { nanos_since_epoch }) => {
839                nanos_builder.append_value(*nanos_since_epoch);
840                offset_builder.append_null();
841                tz_builder.append_null();
842                null_buffer.append(true);
843            }
844            _ => {
845                nanos_builder.append_null();
846                offset_builder.append_null();
847                tz_builder.append_null();
848                null_buffer.append(false);
849            }
850        }
851    }
852
853    let struct_arr = StructArray::new(
854        schema::datetime_struct_fields(),
855        vec![
856            Arc::new(nanos_builder.finish()) as ArrayRef,
857            Arc::new(offset_builder.finish()) as ArrayRef,
858            Arc::new(tz_builder.finish()) as ArrayRef,
859        ],
860        Some(null_buffer.finish().into()),
861    );
862    Arc::new(struct_arr)
863}
864
865/// Build a Time struct array from values.
866///
867/// Encodes Time as a 2-field struct: (nanos_since_midnight, offset_seconds).
868/// This preserves timezone offset information that was previously lost with Time64Nanosecond encoding.
869fn values_to_time_struct_array(values: &[Value]) -> ArrayRef {
870    let mut nanos_builder = Time64NanosecondBuilder::with_capacity(values.len());
871    let mut offset_builder = Int32Builder::with_capacity(values.len());
872    let mut null_buffer = BooleanBufferBuilder::new(values.len());
873
874    for v in values {
875        match v {
876            Value::Temporal(uni_common::TemporalValue::Time {
877                nanos_since_midnight,
878                offset_seconds,
879            }) => {
880                nanos_builder.append_value(*nanos_since_midnight);
881                offset_builder.append_value(*offset_seconds);
882                null_buffer.append(true);
883            }
884            Value::Temporal(uni_common::TemporalValue::LocalTime {
885                nanos_since_midnight,
886            }) => {
887                nanos_builder.append_value(*nanos_since_midnight);
888                offset_builder.append_null();
889                null_buffer.append(true);
890            }
891            _ => {
892                nanos_builder.append_null();
893                offset_builder.append_null();
894                null_buffer.append(false);
895            }
896        }
897    }
898
899    let struct_arr = StructArray::new(
900        schema::time_struct_fields(),
901        vec![
902            Arc::new(nanos_builder.finish()) as ArrayRef,
903            Arc::new(offset_builder.finish()) as ArrayRef,
904        ],
905        Some(null_buffer.finish().into()),
906    );
907    Arc::new(struct_arr)
908}
909
910fn values_to_large_binary_array(values: &[Value]) -> ArrayRef {
911    let mut builder =
912        arrow_array::builder::LargeBinaryBuilder::with_capacity(values.len(), values.len() * 64);
913    for v in values {
914        if v.is_null() {
915            builder.append_null();
916        } else {
917            // Encode as CypherValue (MessagePack-tagged)
918            let cv_bytes = uni_common::cypher_value_codec::encode(v);
919            builder.append_value(&cv_bytes);
920        }
921    }
922    Arc::new(builder.finish())
923}
924
925/// Convert a slice of JSON Values to an Arrow array based on the target Arrow DataType.
926pub fn values_to_array(values: &[Value], dt: &ArrowDataType) -> Result<ArrayRef> {
927    match dt {
928        ArrowDataType::UInt64 => Ok(values_to_uint64_array(values)),
929        ArrowDataType::Int64 => Ok(values_to_int64_array(values)),
930        ArrowDataType::Int32 => Ok(values_to_int32_array(values)),
931        ArrowDataType::Utf8 => Ok(values_to_string_array(values)),
932        ArrowDataType::Boolean => Ok(values_to_bool_array(values)),
933        ArrowDataType::Float32 => Ok(values_to_float32_array(values)),
934        ArrowDataType::Float64 => Ok(values_to_float64_array(values)),
935        ArrowDataType::FixedSizeBinary(size) => values_to_fixed_size_binary_array(values, *size),
936        ArrowDataType::FixedSizeList(inner, size) => {
937            if inner.data_type() == &ArrowDataType::Float32 {
938                Ok(values_to_fixed_size_list_f32_array(values, *size))
939            } else {
940                Err(anyhow!("Unsupported FixedSizeList inner type"))
941            }
942        }
943        ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
944            Ok(values_to_timestamp_array(values, tz.as_ref()))
945        }
946        ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
947            Ok(values_to_timestamp_array(values, tz.as_ref()))
948        }
949        ArrowDataType::Date32 => {
950            let mut builder = Date32Builder::with_capacity(values.len());
951            for v in values {
952                if v.is_null() {
953                    builder.append_null();
954                } else if let Value::Temporal(uni_common::TemporalValue::Date {
955                    days_since_epoch,
956                }) = v
957                {
958                    builder.append_value(*days_since_epoch);
959                } else if let Some(n) = v.as_i64() {
960                    builder.append_value(n as i32);
961                } else {
962                    builder.append_null();
963                }
964            }
965            Ok(Arc::new(builder.finish()))
966        }
967        ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
968            let mut builder = Time64NanosecondBuilder::with_capacity(values.len());
969            for v in values {
970                if v.is_null() {
971                    builder.append_null();
972                } else if let Value::Temporal(tv) = v {
973                    match tv {
974                        uni_common::TemporalValue::LocalTime {
975                            nanos_since_midnight,
976                        }
977                        | uni_common::TemporalValue::Time {
978                            nanos_since_midnight,
979                            ..
980                        } => builder.append_value(*nanos_since_midnight),
981                        _ => builder.append_null(),
982                    }
983                } else if let Some(n) = v.as_i64() {
984                    builder.append_value(n);
985                } else {
986                    builder.append_null();
987                }
988            }
989            Ok(Arc::new(builder.finish()))
990        }
991        ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
992            let mut builder = Time64MicrosecondBuilder::with_capacity(values.len());
993            for v in values {
994                if v.is_null() {
995                    builder.append_null();
996                } else if let Value::Temporal(tv) = v {
997                    match tv {
998                        uni_common::TemporalValue::LocalTime {
999                            nanos_since_midnight,
1000                        }
1001                        | uni_common::TemporalValue::Time {
1002                            nanos_since_midnight,
1003                            ..
1004                        } => builder.append_value(*nanos_since_midnight / 1_000), // nanos→micros for legacy
1005                        _ => builder.append_null(),
1006                    }
1007                } else if let Some(n) = v.as_i64() {
1008                    builder.append_value(n);
1009                } else {
1010                    builder.append_null();
1011                }
1012            }
1013            Ok(Arc::new(builder.finish()))
1014        }
1015        ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano) => {
1016            let mut builder = IntervalMonthDayNanoBuilder::with_capacity(values.len());
1017            for v in values {
1018                if v.is_null() {
1019                    builder.append_null();
1020                } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1021                    months,
1022                    days,
1023                    nanos,
1024                }) = v
1025                {
1026                    builder.append_value(arrow::datatypes::IntervalMonthDayNano {
1027                        months: *months as i32,
1028                        days: *days as i32,
1029                        nanoseconds: *nanos,
1030                    });
1031                } else {
1032                    builder.append_null();
1033                }
1034            }
1035            Ok(Arc::new(builder.finish()))
1036        }
1037        ArrowDataType::Duration(arrow_schema::TimeUnit::Microsecond) => {
1038            let mut builder = DurationMicrosecondBuilder::with_capacity(values.len());
1039            for v in values {
1040                if v.is_null() {
1041                    builder.append_null();
1042                } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1043                    months,
1044                    days,
1045                    nanos,
1046                }) = v
1047                {
1048                    let total_micros =
1049                        months * 30 * 86_400_000_000i64 + days * 86_400_000_000i64 + nanos / 1_000;
1050                    builder.append_value(total_micros);
1051                } else if let Some(n) = v.as_i64() {
1052                    builder.append_value(n);
1053                } else {
1054                    builder.append_null();
1055                }
1056            }
1057            Ok(Arc::new(builder.finish()))
1058        }
1059        ArrowDataType::LargeBinary => Ok(values_to_large_binary_array(values)),
1060        ArrowDataType::List(field) => {
1061            if field.data_type() == &ArrowDataType::Utf8 {
1062                let mut builder = ListBuilder::new(StringBuilder::new());
1063                for v in values {
1064                    if let Value::List(arr) = v {
1065                        for item in arr {
1066                            if let Some(s) = item.as_str() {
1067                                builder.values().append_value(s);
1068                            } else {
1069                                builder.values().append_null();
1070                            }
1071                        }
1072                        builder.append(true);
1073                    } else {
1074                        builder.append_null();
1075                    }
1076                }
1077                Ok(Arc::new(builder.finish()))
1078            } else {
1079                Err(anyhow!(
1080                    "Unsupported List inner type: {:?}",
1081                    field.data_type()
1082                ))
1083            }
1084        }
1085        ArrowDataType::Struct(_) if schema::is_datetime_struct(dt) => {
1086            Ok(values_to_datetime_struct_array(values))
1087        }
1088        ArrowDataType::Struct(_) if schema::is_time_struct(dt) => {
1089            Ok(values_to_time_struct_array(values))
1090        }
1091        _ => Err(anyhow!("Unsupported type for conversion: {:?}", dt)),
1092    }
1093}
1094
1095/// Property value extractor for building Arrow columns from entity properties.
1096pub struct PropertyExtractor<'a> {
1097    data_type: &'a DataType,
1098}
1099
1100impl<'a> PropertyExtractor<'a> {
1101    pub fn new(_name: &'a str, data_type: &'a DataType) -> Self {
1102        Self { data_type }
1103    }
1104
1105    /// Build an Arrow column from a slice of property maps.
1106    /// The `deleted` slice indicates which entries are deleted (use default values).
1107    pub fn build_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1108    where
1109        F: Fn(usize) -> Option<&'a Value>,
1110    {
1111        match self.data_type {
1112            DataType::String => self.build_string_column(len, deleted, get_props),
1113            DataType::Int32 => self.build_int32_column(len, deleted, get_props),
1114            DataType::Int64 => self.build_int64_column(len, deleted, get_props),
1115            DataType::Float32 => self.build_float32_column(len, deleted, get_props),
1116            DataType::Float64 => self.build_float64_column(len, deleted, get_props),
1117            DataType::Bool => self.build_bool_column(len, deleted, get_props),
1118            DataType::Vector { dimensions } => {
1119                self.build_vector_column(len, deleted, get_props, *dimensions)
1120            }
1121            DataType::CypherValue => self.build_json_column(len, deleted, get_props),
1122            DataType::Bytes => self.build_bytes_column(len, deleted, get_props),
1123            DataType::List(inner) => self.build_list_column(len, deleted, get_props, inner),
1124            DataType::Map(key, value) => self.build_map_column(len, deleted, get_props, key, value),
1125            DataType::Crdt(_) => self.build_crdt_column(len, deleted, get_props),
1126            DataType::DateTime => self.build_datetime_struct_column(len, deleted, get_props),
1127            DataType::Timestamp => self.build_timestamp_column(len, deleted, get_props),
1128            DataType::Date => self.build_date32_column(len, deleted, get_props),
1129            DataType::Time => self.build_time_struct_column(len, deleted, get_props),
1130            DataType::Duration => self.build_duration_column(len, deleted, get_props),
1131            DataType::Btic => self.build_btic_column(len, deleted, get_props),
1132            _ => Err(anyhow!(
1133                "Unsupported data type for arrow conversion: {:?}",
1134                self.data_type
1135            )),
1136        }
1137    }
1138
1139    fn build_string_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1140    where
1141        F: Fn(usize) -> Option<&'a Value>,
1142    {
1143        let mut builder = arrow_array::builder::StringBuilder::with_capacity(len, len * 32);
1144        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1145            let prop = get_props(i);
1146            if let Some(s) = prop.and_then(|v| v.as_str()) {
1147                builder.append_value(s);
1148            } else if let Some(Value::Temporal(tv)) = prop {
1149                builder.append_value(tv.to_string());
1150            } else if is_deleted {
1151                builder.append_value("");
1152            } else {
1153                builder.append_null();
1154            }
1155        }
1156        Ok(Arc::new(builder.finish()))
1157    }
1158
1159    fn build_int32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1160    where
1161        F: Fn(usize) -> Option<&'a Value>,
1162    {
1163        let mut values = Vec::with_capacity(len);
1164        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1165            // i64 -> i32 via try_from: an out-of-range value becomes NULL rather
1166            // than silently wrapping to a different number. (review H13)
1167            let val = get_props(i)
1168                .and_then(|v| v.as_i64())
1169                .and_then(|v| i32::try_from(v).ok());
1170            if val.is_none() && is_deleted {
1171                values.push(Some(0));
1172            } else {
1173                values.push(val);
1174            }
1175        }
1176        Ok(Arc::new(Int32Array::from(values)))
1177    }
1178
1179    fn build_int64_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1180    where
1181        F: Fn(usize) -> Option<&'a Value>,
1182    {
1183        let mut values = Vec::with_capacity(len);
1184        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1185            let val = get_props(i).and_then(|v| v.as_i64());
1186            if val.is_none() && is_deleted {
1187                values.push(Some(0));
1188            } else {
1189                values.push(val);
1190            }
1191        }
1192        Ok(Arc::new(Int64Array::from(values)))
1193    }
1194
1195    fn build_timestamp_column<F>(
1196        &self,
1197        len: usize,
1198        deleted: &[bool],
1199        get_props: F,
1200    ) -> Result<ArrayRef>
1201    where
1202        F: Fn(usize) -> Option<&'a Value>,
1203    {
1204        let mut values = Vec::with_capacity(len);
1205        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1206            let val = get_props(i);
1207            let ts = if is_deleted || val.is_none() {
1208                Some(0i64)
1209            } else if let Some(Value::Temporal(tv)) = val {
1210                match tv {
1211                    uni_common::TemporalValue::DateTime {
1212                        nanos_since_epoch, ..
1213                    }
1214                    | uni_common::TemporalValue::LocalDateTime {
1215                        nanos_since_epoch, ..
1216                    } => Some(*nanos_since_epoch),
1217                    _ => None,
1218                }
1219            } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1220                Some(v)
1221            } else if let Some(s) = val.and_then(|v| v.as_str()) {
1222                parse_datetime_to_nanos(s)
1223            } else {
1224                None
1225            };
1226
1227            if is_deleted {
1228                values.push(Some(0));
1229            } else {
1230                values.push(ts);
1231            }
1232        }
1233        let arr = TimestampNanosecondArray::from(values).with_timezone("UTC");
1234        Ok(Arc::new(arr))
1235    }
1236
1237    fn build_datetime_struct_column<F>(
1238        &self,
1239        len: usize,
1240        deleted: &[bool],
1241        get_props: F,
1242    ) -> Result<ArrayRef>
1243    where
1244        F: Fn(usize) -> Option<&'a Value>,
1245    {
1246        let values = self.collect_values_or_null(len, deleted, &get_props);
1247        Ok(values_to_datetime_struct_array(&values))
1248    }
1249
1250    fn build_time_struct_column<F>(
1251        &self,
1252        len: usize,
1253        deleted: &[bool],
1254        get_props: F,
1255    ) -> Result<ArrayRef>
1256    where
1257        F: Fn(usize) -> Option<&'a Value>,
1258    {
1259        let values = self.collect_values_or_null(len, deleted, &get_props);
1260        Ok(values_to_time_struct_array(&values))
1261    }
1262
1263    /// Collect property values into a Vec, substituting `Value::Null` for deleted or missing entries.
1264    fn collect_values_or_null<F>(&self, len: usize, deleted: &[bool], get_props: &F) -> Vec<Value>
1265    where
1266        F: Fn(usize) -> Option<&'a Value>,
1267    {
1268        deleted
1269            .iter()
1270            .enumerate()
1271            .take(len)
1272            .map(|(i, &is_deleted)| {
1273                if is_deleted {
1274                    Value::Null
1275                } else {
1276                    get_props(i).cloned().unwrap_or(Value::Null)
1277                }
1278            })
1279            .collect()
1280    }
1281
1282    fn build_date32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1283    where
1284        F: Fn(usize) -> Option<&'a Value>,
1285    {
1286        let mut builder = Date32Builder::with_capacity(len);
1287        let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1288
1289        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1290            let val = get_props(i);
1291            let days = if is_deleted || val.is_none() {
1292                Some(0)
1293            } else if let Some(Value::Temporal(uni_common::TemporalValue::Date {
1294                days_since_epoch,
1295            })) = val
1296            {
1297                Some(*days_since_epoch)
1298            } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1299                // i64 day count -> i32: an out-of-range value becomes NULL
1300                // rather than silently wrapping to a different date. (review H13)
1301                i32::try_from(v).ok()
1302            } else if let Some(s) = val.and_then(|v| v.as_str()) {
1303                match chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1304                    Ok(date) => Some(date.signed_duration_since(epoch).num_days() as i32),
1305                    Err(_) => None,
1306                }
1307            } else {
1308                None
1309            };
1310
1311            if is_deleted {
1312                builder.append_value(0);
1313            } else if let Some(v) = days {
1314                builder.append_value(v);
1315            } else {
1316                builder.append_null();
1317            }
1318        }
1319        Ok(Arc::new(builder.finish()))
1320    }
1321
1322    fn build_duration_column<F>(
1323        &self,
1324        len: usize,
1325        deleted: &[bool],
1326        get_props: F,
1327    ) -> Result<ArrayRef>
1328    where
1329        F: Fn(usize) -> Option<&'a Value>,
1330    {
1331        // Duration stored as LargeBinary via CypherValue codec (Lance doesn't support Interval(MonthDayNano))
1332        let mut builder = LargeBinaryBuilder::with_capacity(len, len * 32);
1333        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1334            let raw_val = get_props(i);
1335            if let Some(val @ Value::Temporal(uni_common::TemporalValue::Duration { .. })) = raw_val
1336            {
1337                let encoded = uni_common::cypher_value_codec::encode(val);
1338                builder.append_value(&encoded);
1339            } else if is_deleted {
1340                let zero = Value::Temporal(uni_common::TemporalValue::Duration {
1341                    months: 0,
1342                    days: 0,
1343                    nanos: 0,
1344                });
1345                let encoded = uni_common::cypher_value_codec::encode(&zero);
1346                builder.append_value(&encoded);
1347            } else {
1348                builder.append_null();
1349            }
1350        }
1351        Ok(Arc::new(builder.finish()))
1352    }
1353
1354    fn build_btic_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1355    where
1356        F: Fn(usize) -> Option<&'a Value>,
1357    {
1358        const ENCODED_LEN: i32 = 24;
1359        let mut builder = FixedSizeBinaryBuilder::with_capacity(len, ENCODED_LEN);
1360        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1361            let raw_val = get_props(i);
1362            let btic = match raw_val {
1363                Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => Some(
1364                    uni_btic::Btic::new(*lo, *hi, *meta)
1365                        .map_err(|e| anyhow!("invalid BTIC value: {}", e))?,
1366                ),
1367                Some(Value::String(s)) => Some(
1368                    uni_btic::parse::parse_btic_literal(s)
1369                        .map_err(|e| anyhow!("BTIC parse error for '{}': {}", s, e))?,
1370                ),
1371                _ => None,
1372            };
1373
1374            if let Some(b) = btic {
1375                builder.append_value(uni_btic::encode::encode(&b))?;
1376            } else if is_deleted {
1377                builder.append_value([0u8; ENCODED_LEN as usize])?;
1378            } else {
1379                builder.append_null();
1380            }
1381        }
1382        Ok(Arc::new(builder.finish()))
1383    }
1384
1385    fn build_float32_column<F>(
1386        &self,
1387        len: usize,
1388        deleted: &[bool],
1389        get_props: F,
1390    ) -> Result<ArrayRef>
1391    where
1392        F: Fn(usize) -> Option<&'a Value>,
1393    {
1394        let mut values = Vec::with_capacity(len);
1395        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1396            let val = get_props(i).and_then(|v| v.as_f64()).map(|v| v as f32);
1397            if val.is_none() && is_deleted {
1398                values.push(Some(0.0));
1399            } else {
1400                values.push(val);
1401            }
1402        }
1403        Ok(Arc::new(Float32Array::from(values)))
1404    }
1405
1406    fn build_float64_column<F>(
1407        &self,
1408        len: usize,
1409        deleted: &[bool],
1410        get_props: F,
1411    ) -> Result<ArrayRef>
1412    where
1413        F: Fn(usize) -> Option<&'a Value>,
1414    {
1415        let mut values = Vec::with_capacity(len);
1416        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1417            let val = get_props(i).and_then(|v| v.as_f64());
1418            if val.is_none() && is_deleted {
1419                values.push(Some(0.0));
1420            } else {
1421                values.push(val);
1422            }
1423        }
1424        Ok(Arc::new(Float64Array::from(values)))
1425    }
1426
1427    fn build_bool_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1428    where
1429        F: Fn(usize) -> Option<&'a Value>,
1430    {
1431        let mut values = Vec::with_capacity(len);
1432        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1433            let val = get_props(i).and_then(|v| v.as_bool());
1434            if val.is_none() && is_deleted {
1435                values.push(Some(false));
1436            } else {
1437                values.push(val);
1438            }
1439        }
1440        Ok(Arc::new(BooleanArray::from(values)))
1441    }
1442
1443    fn build_vector_column<F>(
1444        &self,
1445        len: usize,
1446        deleted: &[bool],
1447        get_props: F,
1448        dimensions: usize,
1449    ) -> Result<ArrayRef>
1450    where
1451        F: Fn(usize) -> Option<&'a Value>,
1452    {
1453        let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dimensions as i32);
1454
1455        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1456            let val = get_props(i);
1457            let (values, valid) = extract_vector_f32_values(val, is_deleted, dimensions);
1458            for v in values {
1459                builder.values().append_value(v);
1460            }
1461            builder.append(valid);
1462        }
1463        Ok(Arc::new(builder.finish()))
1464    }
1465
1466    fn build_json_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1467    where
1468        F: Fn(usize) -> Option<&'a Value>,
1469    {
1470        let null_val = Value::Null;
1471        let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(len, len * 64);
1472        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1473            let val = get_props(i);
1474            let uni_val = if val.is_none() && is_deleted {
1475                &null_val
1476            } else {
1477                val.unwrap_or(&null_val)
1478            };
1479            // Encode to CypherValue (MessagePack-tagged)
1480            let cv_bytes = uni_common::cypher_value_codec::encode(uni_val);
1481            builder.append_value(&cv_bytes);
1482        }
1483        Ok(Arc::new(builder.finish()))
1484    }
1485
1486    fn build_bytes_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1487    where
1488        F: Fn(usize) -> Option<&'a Value>,
1489    {
1490        let mut builder = LargeBinaryBuilder::with_capacity(len, len * 64);
1491        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1492            let val = get_props(i);
1493            if let Some(Value::Bytes(b)) = val {
1494                builder.append_value(b);
1495            } else if is_deleted {
1496                builder.append_value(&[][..]);
1497            } else {
1498                builder.append_null();
1499            }
1500        }
1501        Ok(Arc::new(builder.finish()))
1502    }
1503
1504    fn build_list_column<F>(
1505        &self,
1506        len: usize,
1507        deleted: &[bool],
1508        get_props: F,
1509        inner: &DataType,
1510    ) -> Result<ArrayRef>
1511    where
1512        F: Fn(usize) -> Option<&'a Value>,
1513    {
1514        match inner {
1515            DataType::String => {
1516                self.build_typed_list(len, deleted, &get_props, StringBuilder::new(), |v, b| {
1517                    if let Some(s) = v.as_str() {
1518                        b.append_value(s);
1519                    } else {
1520                        b.append_null();
1521                    }
1522                })
1523            }
1524            DataType::Int64 => {
1525                self.build_typed_list(len, deleted, &get_props, Int64Builder::new(), |v, b| {
1526                    if let Some(n) = v.as_i64() {
1527                        b.append_value(n);
1528                    } else {
1529                        b.append_null();
1530                    }
1531                })
1532            }
1533            DataType::Float64 => {
1534                self.build_typed_list(len, deleted, &get_props, Float64Builder::new(), |v, b| {
1535                    if let Some(f) = v.as_f64() {
1536                        b.append_value(f);
1537                    } else {
1538                        b.append_null();
1539                    }
1540                })
1541            }
1542            DataType::Bytes => {
1543                // Raw `Bytes` elements: store each buffer verbatim in a `LargeBinary`
1544                // child and mark the child field `uni_raw_bytes` so the read path
1545                // decodes it verbatim instead of through the tagged codec.
1546                let item_field = Arc::new(
1547                    Field::new("item", ArrowDataType::LargeBinary, true)
1548                        .with_metadata(schema::raw_bytes_field_metadata()),
1549                );
1550                let mut builder =
1551                    ListBuilder::new(LargeBinaryBuilder::new()).with_field(item_field);
1552                for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1553                    let val_array = get_props(i).and_then(|v| v.as_array());
1554                    if val_array.is_none() && is_deleted {
1555                        builder.append_null();
1556                    } else if let Some(arr) = val_array {
1557                        for v in arr {
1558                            if let Value::Bytes(b) = v {
1559                                builder.values().append_value(b);
1560                            } else {
1561                                builder.values().append_null();
1562                            }
1563                        }
1564                        builder.append(true);
1565                    } else {
1566                        builder.append_null();
1567                    }
1568                }
1569                Ok(Arc::new(builder.finish()))
1570            }
1571            DataType::Vector { dimensions } => {
1572                // Multi-vector / late-interaction (ColBERT) property: a per-row
1573                // variable-count set of fixed-`dimensions` token vectors, stored as
1574                // `List<FixedSizeList<Float32, dimensions>>`. Each inner token is
1575                // validated through the same `extract_vector_f32_values` path used for
1576                // single dense vectors, so dimension/type failure modes match.
1577                //
1578                // NOTE: only the declared-schema write path (this builder) supports
1579                // multi-vector today. The schemaless, Arrow-type-driven `values_to_array`
1580                // path does not yet handle `List<FixedSizeList<Float32>>`; schemaless
1581                // multi-vector writes are a deferred follow-up (issue #96, Phase 1.5).
1582                let dim = *dimensions as i32;
1583                let mut builder =
1584                    ListBuilder::new(FixedSizeListBuilder::new(Float32Builder::new(), dim));
1585                for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1586                    let val_array = get_props(i).and_then(|v| v.as_array());
1587                    if val_array.is_none() && is_deleted {
1588                        builder.append_null();
1589                    } else if let Some(arr) = val_array {
1590                        // Variable token count per row: append one fixed-size inner
1591                        // vector per token. `extract_vector_f32_values` always yields
1592                        // exactly `dimensions` values, satisfying the FixedSizeList stride.
1593                        for tok in arr {
1594                            let (vals, valid) =
1595                                extract_vector_f32_values(Some(tok), false, *dimensions);
1596                            for v in vals {
1597                                builder.values().values().append_value(v);
1598                            }
1599                            builder.values().append(valid);
1600                        }
1601                        builder.append(true);
1602                    } else {
1603                        builder.append_null();
1604                    }
1605                }
1606                Ok(Arc::new(builder.finish()))
1607            }
1608            _ => Err(anyhow!("Unsupported inner type for List: {:?}", inner)),
1609        }
1610    }
1611
1612    /// Generic helper to build a list column with any inner builder type.
1613    fn build_typed_list<F, B, A>(
1614        &self,
1615        len: usize,
1616        deleted: &[bool],
1617        get_props: &F,
1618        inner_builder: B,
1619        mut append_value: A,
1620    ) -> Result<ArrayRef>
1621    where
1622        F: Fn(usize) -> Option<&'a Value>,
1623        B: arrow_array::builder::ArrayBuilder,
1624        A: FnMut(&Value, &mut B),
1625    {
1626        let mut builder = ListBuilder::new(inner_builder);
1627        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1628            let val_array = get_props(i).and_then(|v| v.as_array());
1629            if val_array.is_none() && is_deleted {
1630                builder.append_null();
1631            } else if let Some(arr) = val_array {
1632                for v in arr {
1633                    append_value(v, builder.values());
1634                }
1635                builder.append(true);
1636            } else {
1637                builder.append_null();
1638            }
1639        }
1640        Ok(Arc::new(builder.finish()))
1641    }
1642
1643    fn build_map_column<F>(
1644        &self,
1645        len: usize,
1646        deleted: &[bool],
1647        get_props: F,
1648        key: &DataType,
1649        value: &DataType,
1650    ) -> Result<ArrayRef>
1651    where
1652        F: Fn(usize) -> Option<&'a Value>,
1653    {
1654        if !matches!(key, DataType::String) {
1655            return Err(anyhow!("Map keys must be String (JSON limitation)"));
1656        }
1657
1658        match value {
1659            DataType::String => self.build_typed_map(
1660                len,
1661                deleted,
1662                &get_props,
1663                StringBuilder::new(),
1664                arrow_schema::DataType::Utf8,
1665                None,
1666                |v, b: &mut StringBuilder| {
1667                    if let Some(s) = v.as_str() {
1668                        b.append_value(s);
1669                    } else {
1670                        b.append_null();
1671                    }
1672                },
1673            ),
1674            DataType::Int64 => self.build_typed_map(
1675                len,
1676                deleted,
1677                &get_props,
1678                Int64Builder::new(),
1679                arrow_schema::DataType::Int64,
1680                None,
1681                |v, b: &mut Int64Builder| {
1682                    if let Some(n) = v.as_i64() {
1683                        b.append_value(n);
1684                    } else {
1685                        b.append_null();
1686                    }
1687                },
1688            ),
1689            DataType::Int32 => self.build_typed_map(
1690                len,
1691                deleted,
1692                &get_props,
1693                Int32Builder::new(),
1694                arrow_schema::DataType::Int32,
1695                None,
1696                |v, b: &mut Int32Builder| match v.as_i64().and_then(|n| i32::try_from(n).ok()) {
1697                    Some(n) => b.append_value(n),
1698                    None => b.append_null(),
1699                },
1700            ),
1701            DataType::Float64 => self.build_typed_map(
1702                len,
1703                deleted,
1704                &get_props,
1705                Float64Builder::new(),
1706                arrow_schema::DataType::Float64,
1707                None,
1708                |v, b: &mut Float64Builder| match v.as_f64() {
1709                    Some(f) => b.append_value(f),
1710                    None => b.append_null(),
1711                },
1712            ),
1713            DataType::Float32 => self.build_typed_map(
1714                len,
1715                deleted,
1716                &get_props,
1717                Float32Builder::new(),
1718                arrow_schema::DataType::Float32,
1719                None,
1720                |v, b: &mut Float32Builder| match v.as_f64() {
1721                    Some(f) => b.append_value(f as f32),
1722                    None => b.append_null(),
1723                },
1724            ),
1725            DataType::Bool => self.build_typed_map(
1726                len,
1727                deleted,
1728                &get_props,
1729                BooleanBuilder::new(),
1730                arrow_schema::DataType::Boolean,
1731                None,
1732                |v, b: &mut BooleanBuilder| match v.as_bool() {
1733                    Some(x) => b.append_value(x),
1734                    None => b.append_null(),
1735                },
1736            ),
1737            DataType::Bytes => self.build_typed_map(
1738                len,
1739                deleted,
1740                &get_props,
1741                LargeBinaryBuilder::new(),
1742                arrow_schema::DataType::LargeBinary,
1743                // Mark the value child `uni_raw_bytes` so the read path decodes each
1744                // raw `Bytes` value verbatim rather than through the tagged codec.
1745                Some(schema::raw_bytes_field_metadata()),
1746                |v, b: &mut LargeBinaryBuilder| {
1747                    if let Value::Bytes(bytes) = v {
1748                        b.append_value(bytes);
1749                    } else {
1750                        b.append_null();
1751                    }
1752                },
1753            ),
1754            // Nested / non-scalar value types (Vector, List, Map, temporal, …): no typed
1755            // Arrow builder — CypherValue-encode each value into an UNMARKED LargeBinary
1756            // child (no `uni_raw_bytes`), so the read path (`try_reconstruct_map` →
1757            // `arrow_to_value` LargeBinary arm) decodes it through the tagged codec. This
1758            // mirrors how schemaless/overflow maps are stored and handles arbitrary nesting.
1759            _ => self.build_typed_map(
1760                len,
1761                deleted,
1762                &get_props,
1763                LargeBinaryBuilder::new(),
1764                arrow_schema::DataType::LargeBinary,
1765                None,
1766                |v, b: &mut LargeBinaryBuilder| {
1767                    if v.is_null() {
1768                        b.append_null();
1769                    } else {
1770                        b.append_value(uni_common::cypher_value_codec::encode(v));
1771                    }
1772                },
1773            ),
1774        }
1775    }
1776
1777    /// Generic helper to build a map column with any value builder type.
1778    #[expect(
1779        clippy::too_many_arguments,
1780        reason = "builder plumbing: value type + optional child metadata are distinct knobs"
1781    )]
1782    fn build_typed_map<F, B, A>(
1783        &self,
1784        len: usize,
1785        deleted: &[bool],
1786        get_props: &F,
1787        value_builder: B,
1788        value_arrow_type: arrow_schema::DataType,
1789        value_metadata: Option<HashMap<String, String>>,
1790        mut append_value: A,
1791    ) -> Result<ArrayRef>
1792    where
1793        F: Fn(usize) -> Option<&'a Value>,
1794        B: arrow_array::builder::ArrayBuilder,
1795        A: FnMut(&Value, &mut B),
1796    {
1797        let key_builder = Box::new(StringBuilder::new());
1798        let value_builder = Box::new(value_builder);
1799        let value_field = match value_metadata {
1800            Some(meta) => Field::new("value", value_arrow_type, true).with_metadata(meta),
1801            None => Field::new("value", value_arrow_type, true),
1802        };
1803        let struct_builder = StructBuilder::new(
1804            vec![
1805                Field::new("key", arrow_schema::DataType::Utf8, false),
1806                value_field,
1807            ],
1808            vec![key_builder, value_builder],
1809        );
1810        let mut builder = ListBuilder::new(struct_builder);
1811
1812        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1813            self.append_map_entry(&mut builder, get_props(i), is_deleted, &mut append_value);
1814        }
1815        Ok(Arc::new(builder.finish()))
1816    }
1817
1818    /// Append a single map entry to the list builder.
1819    fn append_map_entry<B, A>(
1820        &self,
1821        builder: &mut ListBuilder<StructBuilder>,
1822        val: Option<&'a Value>,
1823        is_deleted: bool,
1824        append_value: &mut A,
1825    ) where
1826        B: arrow_array::builder::ArrayBuilder,
1827        A: FnMut(&Value, &mut B),
1828    {
1829        let val_obj = val.and_then(|v| v.as_object());
1830        if val_obj.is_none() && is_deleted {
1831            builder.append(false);
1832        } else if let Some(obj) = val_obj {
1833            let struct_b = builder.values();
1834            for (k, v) in obj {
1835                struct_b
1836                    .field_builder::<StringBuilder>(0)
1837                    .unwrap()
1838                    .append_value(k);
1839                // Safety: We know the value builder type matches B
1840                let value_b = struct_b.field_builder::<B>(1).unwrap();
1841                append_value(v, value_b);
1842                struct_b.append(true);
1843            }
1844            builder.append(true);
1845        } else {
1846            builder.append(false);
1847        }
1848    }
1849
1850    fn build_crdt_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1851    where
1852        F: Fn(usize) -> Option<&'a Value>,
1853    {
1854        let mut builder = BinaryBuilder::new();
1855        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1856            if is_deleted {
1857                builder.append_null();
1858                continue;
1859            }
1860            if let Some(val) = get_props(i) {
1861                // Try to parse CRDT from the value
1862                // If it's a string, first parse it as JSON, then as CRDT
1863                let crdt_result = if let Some(s) = val.as_str() {
1864                    serde_json::from_str::<Crdt>(s)
1865                } else {
1866                    // Convert uni_common::Value to serde_json::Value at the CRDT boundary
1867                    let json_val: serde_json::Value = val.clone().into();
1868                    serde_json::from_value::<Crdt>(json_val)
1869                };
1870
1871                if let Ok(crdt) = crdt_result {
1872                    if let Ok(bytes) = crdt.to_msgpack() {
1873                        builder.append_value(&bytes);
1874                    } else {
1875                        builder.append_null();
1876                    }
1877                } else {
1878                    builder.append_null();
1879                }
1880            } else {
1881                builder.append_null();
1882            }
1883        }
1884        Ok(Arc::new(builder.finish()))
1885    }
1886}
1887
1888/// Build a column for edge entries (no deleted flag handling needed).
1889pub fn build_edge_column<'a>(
1890    name: &'a str,
1891    data_type: &'a DataType,
1892    len: usize,
1893    get_props: impl Fn(usize) -> Option<&'a Value>,
1894) -> Result<ArrayRef> {
1895    // For edges, use empty deleted array
1896    let deleted = vec![false; len];
1897    let extractor = PropertyExtractor::new(name, data_type);
1898    extractor.build_column(len, &deleted, get_props)
1899}
1900
1901#[cfg(test)]
1902mod tests {
1903    use super::*;
1904    use arrow_array::{
1905        Array, DurationMicrosecondArray,
1906        builder::{BinaryBuilder, Time64MicrosecondBuilder, TimestampNanosecondBuilder},
1907    };
1908    use std::collections::HashMap;
1909    use uni_common::TemporalValue;
1910    use uni_crdt::{Crdt, GCounter};
1911
1912    #[test]
1913    fn test_arrow_to_value_string() {
1914        let arr = StringArray::from(vec![Some("hello"), None, Some("world")]);
1915        assert_eq!(
1916            arrow_to_value(&arr, 0, None),
1917            Value::String("hello".to_string())
1918        );
1919        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1920        assert_eq!(
1921            arrow_to_value(&arr, 2, None),
1922            Value::String("world".to_string())
1923        );
1924    }
1925
1926    #[test]
1927    fn test_arrow_to_value_int64() {
1928        let arr = Int64Array::from(vec![Some(42), None, Some(-10)]);
1929        assert_eq!(arrow_to_value(&arr, 0, None), Value::Int(42));
1930        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1931        assert_eq!(arrow_to_value(&arr, 2, None), Value::Int(-10));
1932    }
1933
1934    #[test]
1935    #[allow(clippy::approx_constant)]
1936    fn test_arrow_to_value_float64() {
1937        let arr = Float64Array::from(vec![Some(3.14), None]);
1938        assert_eq!(arrow_to_value(&arr, 0, None), Value::Float(3.14));
1939        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1940    }
1941
1942    #[test]
1943    fn test_arrow_to_value_bool() {
1944        let arr = BooleanArray::from(vec![Some(true), Some(false), None]);
1945        assert_eq!(arrow_to_value(&arr, 0, None), Value::Bool(true));
1946        assert_eq!(arrow_to_value(&arr, 1, None), Value::Bool(false));
1947        assert_eq!(arrow_to_value(&arr, 2, None), Value::Null);
1948    }
1949
1950    #[test]
1951    fn test_values_to_array_int64() {
1952        let values = vec![Value::Int(1), Value::Int(2), Value::Null, Value::Int(4)];
1953        let arr = values_to_array(&values, &ArrowDataType::Int64).unwrap();
1954        assert_eq!(arr.len(), 4);
1955
1956        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1957        assert_eq!(int_arr.value(0), 1);
1958        assert_eq!(int_arr.value(1), 2);
1959        assert!(int_arr.is_null(2));
1960        assert_eq!(int_arr.value(3), 4);
1961    }
1962
1963    #[test]
1964    fn test_values_to_array_string() {
1965        let values = vec![
1966            Value::String("a".to_string()),
1967            Value::String("b".to_string()),
1968            Value::Null,
1969        ];
1970        let arr = values_to_array(&values, &ArrowDataType::Utf8).unwrap();
1971        assert_eq!(arr.len(), 3);
1972
1973        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1974        assert_eq!(str_arr.value(0), "a");
1975        assert_eq!(str_arr.value(1), "b");
1976        assert!(str_arr.is_null(2));
1977    }
1978
1979    #[test]
1980    fn test_property_extractor_string() {
1981        let props: Vec<HashMap<String, Value>> = vec![
1982            [("name".to_string(), Value::String("Alice".to_string()))]
1983                .into_iter()
1984                .collect(),
1985            [("name".to_string(), Value::String("Bob".to_string()))]
1986                .into_iter()
1987                .collect(),
1988            HashMap::new(),
1989        ];
1990        let deleted = vec![false, false, true];
1991
1992        let extractor = PropertyExtractor::new("name", &DataType::String);
1993        let arr = extractor
1994            .build_column(3, &deleted, |i| props[i].get("name"))
1995            .unwrap();
1996
1997        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1998        assert_eq!(str_arr.value(0), "Alice");
1999        assert_eq!(str_arr.value(1), "Bob");
2000        assert_eq!(str_arr.value(2), ""); // Deleted entries get default
2001    }
2002
2003    #[test]
2004    fn test_property_extractor_int64() {
2005        let props: Vec<HashMap<String, Value>> = vec![
2006            [("age".to_string(), Value::Int(25))].into_iter().collect(),
2007            [("age".to_string(), Value::Int(30))].into_iter().collect(),
2008            HashMap::new(),
2009        ];
2010        let deleted = vec![false, false, true];
2011
2012        let extractor = PropertyExtractor::new("age", &DataType::Int64);
2013        let arr = extractor
2014            .build_column(3, &deleted, |i| props[i].get("age"))
2015            .unwrap();
2016
2017        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2018        assert_eq!(int_arr.value(0), 25);
2019        assert_eq!(int_arr.value(1), 30);
2020        assert_eq!(int_arr.value(2), 0); // Deleted entries get default
2021    }
2022
2023    #[test]
2024    fn test_property_extractor_bytes_roundtrip() {
2025        let blob = vec![0u8, 1, 2, 255];
2026        let props: Vec<HashMap<String, Value>> = vec![
2027            [("blob".to_string(), Value::Bytes(blob.clone()))]
2028                .into_iter()
2029                .collect(),
2030            [("blob".to_string(), Value::Bytes(Vec::new()))]
2031                .into_iter()
2032                .collect(),
2033            HashMap::new(),
2034        ];
2035        let deleted = vec![false, false, false];
2036
2037        let extractor = PropertyExtractor::new("blob", &DataType::Bytes);
2038        let arr = extractor
2039            .build_column(3, &deleted, |i| props[i].get("blob"))
2040            .unwrap();
2041
2042        // Decode each row through arrow_to_value with Bytes hint.
2043        assert_eq!(
2044            arrow_to_value(arr.as_ref(), 0, Some(&DataType::Bytes)),
2045            Value::Bytes(blob)
2046        );
2047        assert_eq!(
2048            arrow_to_value(arr.as_ref(), 1, Some(&DataType::Bytes)),
2049            Value::Bytes(Vec::new())
2050        );
2051        // Missing property → null in the Arrow array.
2052        assert_eq!(
2053            arrow_to_value(arr.as_ref(), 2, Some(&DataType::Bytes)),
2054            Value::Null
2055        );
2056    }
2057
2058    #[test]
2059    fn test_bytes_vs_cypher_value_disambiguation() {
2060        // A LargeBinary column tagged as DataType::Bytes must NOT be decoded
2061        // through the CypherValue MessagePack codec, even though both share
2062        // the same Arrow physical type.
2063        let raw = vec![0xDEu8, 0xAD, 0xBE, 0xEF];
2064        let props: Vec<HashMap<String, Value>> = vec![
2065            [("blob".to_string(), Value::Bytes(raw.clone()))]
2066                .into_iter()
2067                .collect(),
2068        ];
2069        let extractor = PropertyExtractor::new("blob", &DataType::Bytes);
2070        let arr = extractor
2071            .build_column(1, &[false], |i| props[i].get("blob"))
2072            .unwrap();
2073        // With schema hint: raw bytes returned.
2074        assert_eq!(
2075            arrow_to_value(arr.as_ref(), 0, Some(&DataType::Bytes)),
2076            Value::Bytes(raw)
2077        );
2078    }
2079
2080    #[test]
2081    fn test_data_type_bytes_to_arrow() {
2082        assert_eq!(DataType::Bytes.to_arrow(), ArrowDataType::LargeBinary);
2083    }
2084
2085    #[test]
2086    fn test_arrow_to_value_time64() {
2087        // Test Time64MicrosecondArray legacy fallback (micros→nanos conversion)
2088        let mut builder = Time64MicrosecondBuilder::new();
2089        // 10:30:45 = 10*3600 + 30*60 + 45 = 37845 seconds = 37845000000 microseconds
2090        builder.append_value(37_845_000_000);
2091        // 00:00:00 = 0 microseconds
2092        builder.append_value(0);
2093        // 23:59:59.123456 = 86399.123456 seconds
2094        builder.append_value(86_399_123_456);
2095        builder.append_null();
2096
2097        let arr = builder.finish();
2098        // Arrow→Value returns Value::Temporal(LocalTime) with nanos (micros * 1000)
2099        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "10:30:45");
2100        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "00:00");
2101        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "23:59:59.123456");
2102        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
2103    }
2104
2105    #[test]
2106    fn test_arrow_to_value_duration() {
2107        // Test DurationMicrosecondArray conversion
2108        // Arrow→Value now returns Value::Temporal(Duration)
2109        let arr = DurationMicrosecondArray::from(vec![
2110            Some(1_000_000),      // 1 second in microseconds
2111            Some(3_600_000_000),  // 1 hour
2112            Some(86_400_000_000), // 1 day
2113            None,
2114        ]);
2115
2116        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "PT1S");
2117        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "PT1H");
2118        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "PT24H");
2119        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
2120    }
2121
2122    #[test]
2123    fn test_arrow_to_value_binary_crdt() {
2124        // Test BinaryArray (CRDT) conversion - round-trip test
2125        let mut builder = BinaryBuilder::new();
2126
2127        // Create a GCounter CRDT and serialize it
2128        let mut counter = GCounter::new();
2129        counter.increment("actor1", 5);
2130        let crdt = Crdt::GCounter(counter);
2131        let bytes = crdt.to_msgpack().unwrap();
2132        builder.append_value(&bytes);
2133
2134        // Add a null value
2135        builder.append_null();
2136
2137        let arr = builder.finish();
2138
2139        // The first value should deserialize back to a map
2140        let result = arrow_to_value(&arr, 0, None);
2141        assert!(result.as_object().is_some());
2142        let obj = result.as_object().unwrap();
2143        // GCounter serializes with tag "t": "gc"
2144        assert_eq!(obj.get("t"), Some(&Value::String("gc".to_string())));
2145
2146        // Null value should return null
2147        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
2148    }
2149
2150    #[test]
2151    fn test_datetime_struct_encode_decode_roundtrip() {
2152        // Test DateTime struct encoding with offset and timezone preservation
2153        let values = vec![
2154            Value::Temporal(TemporalValue::DateTime {
2155                nanos_since_epoch: 441763200000000000, // 1984-01-01T00:00:00Z
2156                offset_seconds: 3600,                  // +01:00
2157                timezone_name: Some("Europe/Paris".to_string()),
2158            }),
2159            Value::Temporal(TemporalValue::DateTime {
2160                nanos_since_epoch: 1704067200000000000, // 2024-01-01T00:00:00Z
2161                offset_seconds: -18000,                 // -05:00
2162                timezone_name: None,
2163            }),
2164            Value::Temporal(TemporalValue::DateTime {
2165                nanos_since_epoch: 0, // Unix epoch
2166                offset_seconds: 0,
2167                timezone_name: Some("UTC".to_string()),
2168            }),
2169        ];
2170
2171        // Encode to Arrow struct
2172        let arr_ref = values_to_datetime_struct_array(&values);
2173        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2174        assert_eq!(arr.len(), 3);
2175
2176        // Decode back to Value
2177        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
2178        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
2179        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
2180
2181        // Verify round-trip preserves all fields
2182        assert_eq!(decoded_0, values[0]);
2183        assert_eq!(decoded_1, values[1]);
2184        assert_eq!(decoded_2, values[2]);
2185
2186        // Verify struct field extraction
2187        if let Value::Temporal(TemporalValue::DateTime {
2188            nanos_since_epoch,
2189            offset_seconds,
2190            timezone_name,
2191        }) = decoded_0
2192        {
2193            assert_eq!(nanos_since_epoch, 441763200000000000);
2194            assert_eq!(offset_seconds, 3600);
2195            assert_eq!(timezone_name, Some("Europe/Paris".to_string()));
2196        } else {
2197            panic!("Expected DateTime value");
2198        }
2199    }
2200
2201    #[test]
2202    fn test_datetime_struct_null_handling() {
2203        // Test DateTime struct with null values
2204        let values = vec![
2205            Value::Temporal(TemporalValue::DateTime {
2206                nanos_since_epoch: 441763200000000000,
2207                offset_seconds: 3600,
2208                timezone_name: Some("Europe/Paris".to_string()),
2209            }),
2210            Value::Null,
2211            Value::Temporal(TemporalValue::DateTime {
2212                nanos_since_epoch: 0,
2213                offset_seconds: 0,
2214                timezone_name: None,
2215            }),
2216        ];
2217
2218        let arr_ref = values_to_datetime_struct_array(&values);
2219        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2220        assert_eq!(arr.len(), 3);
2221
2222        // Check first value is valid
2223        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
2224        assert_eq!(decoded_0, values[0]);
2225
2226        // Check second value is null
2227        assert!(arr.is_null(1));
2228        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
2229        assert_eq!(decoded_1, Value::Null);
2230
2231        // Check third value is valid
2232        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
2233        assert_eq!(decoded_2, values[2]);
2234    }
2235
2236    #[test]
2237    fn test_datetime_struct_boundary_values() {
2238        // Test boundary values: offset=0, large positive/negative offsets
2239        let values = vec![
2240            Value::Temporal(TemporalValue::DateTime {
2241                nanos_since_epoch: 441763200000000000,
2242                offset_seconds: 0, // UTC
2243                timezone_name: None,
2244            }),
2245            Value::Temporal(TemporalValue::DateTime {
2246                nanos_since_epoch: 441763200000000000,
2247                offset_seconds: 43200, // +12:00 (max typical offset)
2248                timezone_name: None,
2249            }),
2250            Value::Temporal(TemporalValue::DateTime {
2251                nanos_since_epoch: 441763200000000000,
2252                offset_seconds: -43200, // -12:00 (min typical offset)
2253                timezone_name: None,
2254            }),
2255        ];
2256
2257        let arr_ref = values_to_datetime_struct_array(&values);
2258        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2259        assert_eq!(arr.len(), 3);
2260
2261        // Verify round-trip for all boundary values
2262        for (i, expected) in values.iter().enumerate() {
2263            let decoded = arrow_to_value(arr_ref.as_ref(), i, Some(&DataType::DateTime));
2264            assert_eq!(&decoded, expected);
2265        }
2266    }
2267
2268    #[test]
2269    fn test_datetime_old_schema_migration() {
2270        // Test backward compatibility: TimestampNanosecondArray → DateTime with offset=0
2271        let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
2272        builder.append_value(441763200000000000); // 1984-01-01T00:00:00Z
2273        builder.append_value(1704067200000000000); // 2024-01-01T00:00:00Z
2274        builder.append_null();
2275
2276        let arr = builder.finish();
2277
2278        // Decode with DataType::DateTime hint should migrate old schema
2279        let decoded_0 = arrow_to_value(&arr, 0, Some(&DataType::DateTime));
2280        let _decoded_1 = arrow_to_value(&arr, 1, Some(&DataType::DateTime));
2281        let decoded_2 = arrow_to_value(&arr, 2, Some(&DataType::DateTime));
2282
2283        // Old schema should default to offset=0, preserve timezone
2284        if let Value::Temporal(TemporalValue::DateTime {
2285            nanos_since_epoch,
2286            offset_seconds,
2287            timezone_name,
2288        }) = decoded_0
2289        {
2290            assert_eq!(nanos_since_epoch, 441763200000000000);
2291            assert_eq!(offset_seconds, 0);
2292            assert_eq!(timezone_name, Some("UTC".to_string()));
2293        } else {
2294            panic!("Expected DateTime value");
2295        }
2296
2297        // Verify null handling
2298        assert_eq!(decoded_2, Value::Null);
2299    }
2300
2301    #[test]
2302    fn test_time_struct_encode_decode_roundtrip() {
2303        // Test Time struct encoding with offset preservation
2304        let values = vec![
2305            Value::Temporal(TemporalValue::Time {
2306                nanos_since_midnight: 37845000000000, // 10:30:45
2307                offset_seconds: 3600,                 // +01:00
2308            }),
2309            Value::Temporal(TemporalValue::Time {
2310                nanos_since_midnight: 0, // 00:00:00
2311                offset_seconds: 0,
2312            }),
2313            Value::Temporal(TemporalValue::Time {
2314                nanos_since_midnight: 86399999999999, // 23:59:59.999999999
2315                offset_seconds: -18000,               // -05:00
2316            }),
2317        ];
2318
2319        // Encode to Arrow struct
2320        let arr_ref = values_to_time_struct_array(&values);
2321        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2322        assert_eq!(arr.len(), 3);
2323
2324        // Decode back to Value
2325        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2326        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2327        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2328
2329        // Verify round-trip preserves all fields
2330        assert_eq!(decoded_0, values[0]);
2331        assert_eq!(decoded_1, values[1]);
2332        assert_eq!(decoded_2, values[2]);
2333
2334        // Verify struct field extraction
2335        if let Value::Temporal(TemporalValue::Time {
2336            nanos_since_midnight,
2337            offset_seconds,
2338        }) = decoded_0
2339        {
2340            assert_eq!(nanos_since_midnight, 37845000000000);
2341            assert_eq!(offset_seconds, 3600);
2342        } else {
2343            panic!("Expected Time value");
2344        }
2345    }
2346
2347    #[test]
2348    fn test_time_struct_null_handling() {
2349        // Test Time struct with null values
2350        let values = vec![
2351            Value::Temporal(TemporalValue::Time {
2352                nanos_since_midnight: 37845000000000,
2353                offset_seconds: 3600,
2354            }),
2355            Value::Null,
2356            Value::Temporal(TemporalValue::Time {
2357                nanos_since_midnight: 0,
2358                offset_seconds: 0,
2359            }),
2360        ];
2361
2362        let arr_ref = values_to_time_struct_array(&values);
2363        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2364        assert_eq!(arr.len(), 3);
2365
2366        // Check first value is valid
2367        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2368        assert_eq!(decoded_0, values[0]);
2369
2370        // Check second value is null
2371        assert!(arr.is_null(1));
2372        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2373        assert_eq!(decoded_1, Value::Null);
2374
2375        // Check third value is valid
2376        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2377        assert_eq!(decoded_2, values[2]);
2378    }
2379
2380    // Tests for extract_vector_f32_values
2381
2382    #[test]
2383    fn test_extract_vector_f32_values_valid_vector() {
2384        let v = vec![1.0, 2.0, 3.0];
2385        let val = Value::Vector(v.clone());
2386        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2387        assert_eq!(result, v);
2388        assert!(valid);
2389    }
2390
2391    #[test]
2392    fn test_extract_vector_f32_values_vector_wrong_dims() {
2393        let v = vec![1.0, 2.0];
2394        let val = Value::Vector(v);
2395        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2396        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2397        assert!(!valid);
2398    }
2399
2400    #[test]
2401    fn test_extract_vector_f32_values_valid_list() {
2402        let v = vec![Value::Float(1.0), Value::Float(2.0), Value::Float(3.0)];
2403        let val = Value::List(v);
2404        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2405        assert_eq!(result, vec![1.0, 2.0, 3.0]);
2406        assert!(valid);
2407    }
2408
2409    #[test]
2410    fn test_extract_vector_f32_values_list_wrong_dims() {
2411        let v = vec![Value::Float(1.0), Value::Float(2.0)];
2412        let val = Value::List(v);
2413        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2414        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2415        assert!(!valid);
2416    }
2417
2418    #[test]
2419    fn test_extract_vector_f32_values_list_int_coercion() {
2420        let v = vec![Value::Int(1), Value::Int(2), Value::Int(3)];
2421        let val = Value::List(v);
2422        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2423        assert_eq!(result, vec![1.0, 2.0, 3.0]);
2424        assert!(valid);
2425    }
2426
2427    #[test]
2428    fn test_extract_vector_f32_values_none() {
2429        let (result, valid) = extract_vector_f32_values(None, false, 3);
2430        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2431        assert!(!valid);
2432    }
2433
2434    #[test]
2435    fn test_extract_vector_f32_values_null() {
2436        let val = Value::Null;
2437        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2438        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2439        assert!(!valid);
2440    }
2441
2442    #[test]
2443    fn test_extract_vector_f32_values_unsupported_type() {
2444        let val = Value::String("not a vector".to_string());
2445        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2446        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2447        assert!(!valid);
2448    }
2449
2450    #[test]
2451    fn test_extract_vector_f32_values_deleted_with_none() {
2452        let (result, valid) = extract_vector_f32_values(None, true, 3);
2453        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2454        assert!(valid); // Deleted entries are marked as valid with zeros
2455    }
2456
2457    #[test]
2458    fn test_extract_vector_f32_values_deleted_with_null() {
2459        let val = Value::Null;
2460        let (result, valid) = extract_vector_f32_values(Some(&val), true, 3);
2461        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2462        assert!(valid); // Deleted entries are marked as valid with zeros
2463    }
2464
2465    // Tests for values_to_array with FixedSizeList
2466
2467    #[test]
2468    fn test_values_to_fixed_size_list_vector_with_nulls() {
2469        let values = vec![
2470            Value::Vector(vec![1.0, 2.0]),
2471            Value::Null,
2472            Value::Vector(vec![3.0, 4.0]),
2473            Value::String("invalid".to_string()),
2474        ];
2475        let arr_ref = values_to_array(
2476            &values,
2477            &ArrowDataType::FixedSizeList(
2478                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2479                2,
2480            ),
2481        )
2482        .unwrap();
2483
2484        let arr = arr_ref
2485            .as_any()
2486            .downcast_ref::<FixedSizeListArray>()
2487            .unwrap();
2488
2489        assert_eq!(arr.len(), 4);
2490        assert!(arr.is_valid(0));
2491        assert!(!arr.is_valid(1)); // Null value
2492        assert!(arr.is_valid(2));
2493        assert!(!arr.is_valid(3)); // Invalid type
2494    }
2495
2496    #[test]
2497    fn test_values_to_fixed_size_list_from_list() {
2498        let values = vec![
2499            Value::List(vec![Value::Float(1.0), Value::Float(2.0)]),
2500            Value::List(vec![Value::Int(3), Value::Int(4)]),
2501        ];
2502        let arr_ref = values_to_array(
2503            &values,
2504            &ArrowDataType::FixedSizeList(
2505                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2506                2,
2507            ),
2508        )
2509        .unwrap();
2510
2511        let arr = arr_ref
2512            .as_any()
2513            .downcast_ref::<FixedSizeListArray>()
2514            .unwrap();
2515
2516        assert_eq!(arr.len(), 2);
2517        assert!(arr.is_valid(0));
2518        assert!(arr.is_valid(1));
2519
2520        // Check values
2521        let child = arr
2522            .values()
2523            .as_any()
2524            .downcast_ref::<Float32Array>()
2525            .unwrap();
2526        assert_eq!(child.value(0), 1.0);
2527        assert_eq!(child.value(1), 2.0);
2528        assert_eq!(child.value(2), 3.0);
2529        assert_eq!(child.value(3), 4.0);
2530    }
2531
2532    #[test]
2533    fn test_values_to_fixed_size_list_wrong_dimensions() {
2534        let values = vec![
2535            Value::Vector(vec![1.0, 2.0, 3.0]),   // 3 dims, expecting 2
2536            Value::List(vec![Value::Float(4.0)]), // 1 dim, expecting 2
2537        ];
2538        let arr_ref = values_to_array(
2539            &values,
2540            &ArrowDataType::FixedSizeList(
2541                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2542                2,
2543            ),
2544        )
2545        .unwrap();
2546
2547        let arr = arr_ref
2548            .as_any()
2549            .downcast_ref::<FixedSizeListArray>()
2550            .unwrap();
2551
2552        assert_eq!(arr.len(), 2);
2553        assert!(!arr.is_valid(0)); // Wrong dimensions
2554        assert!(!arr.is_valid(1)); // Wrong dimensions
2555
2556        // Check that child array has zeros for invalid entries
2557        let child = arr
2558            .values()
2559            .as_any()
2560            .downcast_ref::<Float32Array>()
2561            .unwrap();
2562        assert_eq!(child.value(0), 0.0);
2563        assert_eq!(child.value(1), 0.0);
2564        assert_eq!(child.value(2), 0.0);
2565        assert_eq!(child.value(3), 0.0);
2566    }
2567
2568    #[test]
2569    fn test_values_to_fixed_size_list_all_nulls() {
2570        let values = vec![Value::Null, Value::Null, Value::Null];
2571        let arr_ref = values_to_array(
2572            &values,
2573            &ArrowDataType::FixedSizeList(
2574                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2575                3,
2576            ),
2577        )
2578        .unwrap();
2579
2580        let arr = arr_ref
2581            .as_any()
2582            .downcast_ref::<FixedSizeListArray>()
2583            .unwrap();
2584
2585        assert_eq!(arr.len(), 3);
2586        assert!(!arr.is_valid(0));
2587        assert!(!arr.is_valid(1));
2588        assert!(!arr.is_valid(2));
2589
2590        // Verify child array length is correct (3 rows × 3 dims = 9)
2591        let child = arr
2592            .values()
2593            .as_any()
2594            .downcast_ref::<Float32Array>()
2595            .unwrap();
2596        assert_eq!(child.len(), 9);
2597    }
2598
2599    #[test]
2600    fn test_values_to_fixed_size_list_mixed_types() {
2601        let values = vec![
2602            Value::Vector(vec![1.0, 2.0]),
2603            Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
2604            Value::Null,
2605            Value::String("invalid".to_string()),
2606        ];
2607        let arr_ref = values_to_array(
2608            &values,
2609            &ArrowDataType::FixedSizeList(
2610                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2611                2,
2612            ),
2613        )
2614        .unwrap();
2615
2616        let arr = arr_ref
2617            .as_any()
2618            .downcast_ref::<FixedSizeListArray>()
2619            .unwrap();
2620
2621        assert_eq!(arr.len(), 4);
2622        assert!(arr.is_valid(0)); // Value::Vector
2623        assert!(arr.is_valid(1)); // Value::List
2624        assert!(!arr.is_valid(2)); // Value::Null
2625        assert!(!arr.is_valid(3)); // Value::String
2626
2627        // Check values for valid entries
2628        let child = arr
2629            .values()
2630            .as_any()
2631            .downcast_ref::<Float32Array>()
2632            .unwrap();
2633        assert_eq!(child.value(0), 1.0);
2634        assert_eq!(child.value(1), 2.0);
2635        assert_eq!(child.value(2), 3.0);
2636        assert_eq!(child.value(3), 4.0);
2637    }
2638
2639    // Tests for PropertyExtractor::build_vector_column
2640
2641    #[test]
2642    fn test_build_vector_column_with_nulls_and_deleted() {
2643        let data_type = DataType::Vector { dimensions: 3 };
2644        let extractor = PropertyExtractor::new("test_vec", &data_type);
2645
2646        let props = [
2647            Some(Value::Vector(vec![1.0, 2.0, 3.0])),
2648            None,              // Missing property
2649            Some(Value::Null), // Null value
2650            Some(Value::Vector(vec![4.0, 5.0, 6.0])),
2651        ];
2652        let deleted = [false, false, false, true]; // Last one is deleted
2653
2654        let arr_ref = extractor
2655            .build_vector_column(4, &deleted, |i| props[i].as_ref(), 3)
2656            .unwrap();
2657
2658        let arr = arr_ref
2659            .as_any()
2660            .downcast_ref::<FixedSizeListArray>()
2661            .unwrap();
2662
2663        assert_eq!(arr.len(), 4);
2664        assert!(arr.is_valid(0)); // Valid vector
2665        assert!(!arr.is_valid(1)); // Missing property
2666        assert!(!arr.is_valid(2)); // Null value
2667        assert!(arr.is_valid(3)); // Deleted entry (valid with zeros)
2668
2669        // Check values
2670        let child = arr
2671            .values()
2672            .as_any()
2673            .downcast_ref::<Float32Array>()
2674            .unwrap();
2675        assert_eq!(child.value(0), 1.0);
2676        assert_eq!(child.value(1), 2.0);
2677        assert_eq!(child.value(2), 3.0);
2678        // Indices 3-5: zeros for missing
2679        // Indices 6-8: zeros for null
2680        // Indices 9-11: zeros for deleted (but marked as valid)
2681        assert_eq!(child.value(9), 0.0);
2682        assert_eq!(child.value(10), 0.0);
2683        assert_eq!(child.value(11), 0.0);
2684    }
2685
2686    #[test]
2687    fn test_build_vector_column_with_list_input() {
2688        let data_type = DataType::Vector { dimensions: 2 };
2689        let extractor = PropertyExtractor::new("test_vec", &data_type);
2690
2691        let props = [
2692            Some(Value::List(vec![Value::Float(1.0), Value::Float(2.0)])),
2693            Some(Value::List(vec![Value::Int(3), Value::Int(4)])),
2694            Some(Value::Vector(vec![5.0, 6.0])),
2695        ];
2696        let deleted = [false, false, false];
2697
2698        let arr_ref = extractor
2699            .build_vector_column(3, &deleted, |i| props[i].as_ref(), 2)
2700            .unwrap();
2701
2702        let arr = arr_ref
2703            .as_any()
2704            .downcast_ref::<FixedSizeListArray>()
2705            .unwrap();
2706
2707        assert_eq!(arr.len(), 3);
2708        assert!(arr.is_valid(0));
2709        assert!(arr.is_valid(1));
2710        assert!(arr.is_valid(2));
2711
2712        // Check values
2713        let child = arr
2714            .values()
2715            .as_any()
2716            .downcast_ref::<Float32Array>()
2717            .unwrap();
2718        assert_eq!(child.value(0), 1.0);
2719        assert_eq!(child.value(1), 2.0);
2720        assert_eq!(child.value(2), 3.0);
2721        assert_eq!(child.value(3), 4.0);
2722        assert_eq!(child.value(4), 5.0);
2723        assert_eq!(child.value(5), 6.0);
2724    }
2725
2726    // Tests for multi-vector (ColBERT) `List<Vector>` columns (issue #96)
2727
2728    #[test]
2729    fn test_build_multivector_list_column_roundtrip() {
2730        // A multi-vector property is `List<FixedSizeList<Float32, dim>>` with a
2731        // VARIABLE token count per row. It round-trips numerically: a single dense
2732        // vector already reads back as `Value::List`, so a multi-vector reads back
2733        // as a nested `Value::List`.
2734        let data_type = DataType::List(Box::new(DataType::Vector { dimensions: 3 }));
2735        let extractor = PropertyExtractor::new("tokens", &data_type);
2736
2737        let props = [
2738            // row 0: two tokens
2739            Some(Value::List(vec![
2740                Value::Vector(vec![1.0, 2.0, 3.0]),
2741                Value::Vector(vec![4.0, 5.0, 6.0]),
2742            ])),
2743            // row 1: three tokens (different count -> exercises variable length)
2744            Some(Value::List(vec![
2745                Value::Vector(vec![7.0, 8.0, 9.0]),
2746                Value::Vector(vec![10.0, 11.0, 12.0]),
2747                Value::Vector(vec![13.0, 14.0, 15.0]),
2748            ])),
2749            // row 2: empty token set (present but zero tokens)
2750            Some(Value::List(vec![])),
2751            // row 3: deleted, missing property
2752            None,
2753        ];
2754        let deleted = [false, false, false, true];
2755
2756        let arr_ref = extractor
2757            .build_column(4, &deleted, |i| props[i].as_ref())
2758            .unwrap();
2759
2760        // Outer column is a variable-length list of fixed-size vectors.
2761        let outer = arr_ref.as_any().downcast_ref::<ListArray>().unwrap();
2762        assert_eq!(outer.len(), 4);
2763        assert!(outer.is_valid(0));
2764        assert!(outer.is_valid(1));
2765        assert!(outer.is_valid(2)); // empty-but-present row
2766        assert!(!outer.is_valid(3)); // deleted/missing -> null
2767
2768        // Variable token counts survive.
2769        assert_eq!(outer.value(0).len(), 2);
2770        assert_eq!(outer.value(1).len(), 3);
2771        assert_eq!(outer.value(2).len(), 0);
2772
2773        // Read back through the generic decoder; compare numerically.
2774        let row0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&data_type));
2775        assert_eq!(
2776            row0,
2777            Value::List(vec![
2778                Value::List(vec![
2779                    Value::Float(1.0),
2780                    Value::Float(2.0),
2781                    Value::Float(3.0)
2782                ]),
2783                Value::List(vec![
2784                    Value::Float(4.0),
2785                    Value::Float(5.0),
2786                    Value::Float(6.0)
2787                ]),
2788            ])
2789        );
2790
2791        let row1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&data_type));
2792        let Value::List(tokens) = row1 else {
2793            panic!("row1 should decode to a list of tokens");
2794        };
2795        assert_eq!(tokens.len(), 3);
2796        assert_eq!(
2797            tokens[2],
2798            Value::List(vec![
2799                Value::Float(13.0),
2800                Value::Float(14.0),
2801                Value::Float(15.0)
2802            ])
2803        );
2804    }
2805
2806    #[test]
2807    fn test_build_multivector_invalid_inner_tokens() {
2808        // A row whose tokens have wrong dimension or non-numeric content: each
2809        // bad token becomes a null inner entry (zeros, invalid) without crashing,
2810        // and valid tokens in the same row are preserved.
2811        let data_type = DataType::List(Box::new(DataType::Vector { dimensions: 2 }));
2812        let extractor = PropertyExtractor::new("tokens", &data_type);
2813
2814        let props = [Some(Value::List(vec![
2815            Value::Vector(vec![1.0, 2.0]),                           // valid
2816            Value::Vector(vec![9.0, 9.0, 9.0]),                      // wrong dim -> null
2817            Value::String("nope".to_string()),                       // non-vector -> null
2818            Value::List(vec![Value::Float(3.0), Value::Float(4.0)]), // valid (list form)
2819        ]))];
2820        let deleted = [false];
2821
2822        let arr_ref = extractor
2823            .build_column(1, &deleted, |i| props[i].as_ref())
2824            .unwrap();
2825        let outer = arr_ref.as_any().downcast_ref::<ListArray>().unwrap();
2826        let inner_row = outer.value(0);
2827        let inner = inner_row
2828            .as_any()
2829            .downcast_ref::<FixedSizeListArray>()
2830            .unwrap();
2831        assert_eq!(inner.len(), 4);
2832        assert!(inner.is_valid(0)); // [1,2]
2833        assert!(!inner.is_valid(1)); // wrong dim -> null
2834        assert!(!inner.is_valid(2)); // non-vector -> null
2835        assert!(inner.is_valid(3)); // [3,4] from list form
2836    }
2837
2838    #[test]
2839    fn test_values_to_array_multivector_schemaless_deferred() {
2840        // Schemaless (Arrow-type-driven) multi-vector writes are intentionally NOT
2841        // supported yet (issue #96, Phase 1.5). Declared-schema writes go through
2842        // `build_list_column` instead. This test pins the deferral contract so a
2843        // future change that enables it updates this expectation deliberately.
2844        let values = vec![Value::List(vec![Value::Vector(vec![1.0, 2.0])])];
2845        let dt = ArrowDataType::List(Arc::new(Field::new(
2846            "item",
2847            ArrowDataType::FixedSizeList(
2848                Arc::new(Field::new("item", ArrowDataType::Float32, true)),
2849                2,
2850            ),
2851            true,
2852        )));
2853        assert!(values_to_array(&values, &dt).is_err());
2854    }
2855
2856    /// H13: out-of-range i64 values must become NULL in i32/date32 columns
2857    /// instead of silently wrapping to a different number/date.
2858    #[test]
2859    fn test_int32_and_date32_columns_null_out_of_range() {
2860        let dt = DataType::Int64;
2861        let extractor = PropertyExtractor::new("x", &dt);
2862
2863        let over = Value::Int(i64::from(i32::MAX) + 1);
2864        let ok = Value::Int(42);
2865        let vals = [over, ok];
2866        let deleted = [false, false];
2867
2868        let arr = extractor
2869            .build_int32_column(2, &deleted, |i| Some(&vals[i]))
2870            .unwrap();
2871        let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
2872        assert!(
2873            arr.is_null(0),
2874            "out-of-range i64 must be NULL in an int32 column, not wrapped"
2875        );
2876        assert_eq!(arr.value(1), 42);
2877
2878        let arr = extractor
2879            .build_date32_column(2, &deleted, |i| Some(&vals[i]))
2880            .unwrap();
2881        let arr = arr.as_any().downcast_ref::<Date32Array>().unwrap();
2882        assert!(
2883            arr.is_null(0),
2884            "out-of-range day count must be NULL in a date32 column, not wrapped"
2885        );
2886        assert_eq!(arr.value(1), 42);
2887    }
2888}