Skip to main content

uni_store/storage/
arrow_convert.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Arrow type conversion utilities for reducing cognitive complexity.
5//!
6//! This module provides shared helper functions and macros for converting
7//! between Arrow arrays and JSON Values, reducing code duplication across
8//! vertex.rs, delta.rs, and executor.rs.
9
10use anyhow::{Result, anyhow};
11use arrow_array::builder::{
12    BinaryBuilder, BooleanBufferBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
13    FixedSizeBinaryBuilder, FixedSizeListBuilder, Float32Builder, Float64Builder, Int32Builder,
14    Int64Builder, IntervalMonthDayNanoBuilder, LargeBinaryBuilder, ListBuilder, StringBuilder,
15    StructBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder,
16    UInt64Builder,
17};
18use arrow_array::{
19    Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, FixedSizeBinaryArray,
20    FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array,
21    IntervalMonthDayNanoArray, LargeBinaryArray, ListArray, StringArray, StructArray,
22    Time64NanosecondArray, TimestampNanosecondArray, UInt64Array,
23};
24use arrow_schema::{DataType as ArrowDataType, Field};
25use std::collections::HashMap;
26use std::sync::Arc;
27use uni_common::DataType;
28use uni_common::Value;
29use uni_common::core::id::{Eid, Vid};
30use uni_common::core::schema;
31use uni_crdt::Crdt;
32
33/// Build a timestamp column from a map of ID -> timestamp (nanoseconds).
34///
35/// Shared utility for building `_created_at` and `_updated_at` columns
36/// in vertex and edge tables. Works with any hashable ID type (Vid, Eid, etc.).
37fn build_timestamp_column_from_id_map<K, I>(
38    ids: I,
39    timestamps: Option<&HashMap<K, i64>>,
40) -> ArrayRef
41where
42    K: Eq + std::hash::Hash,
43    I: IntoIterator<Item = K>,
44{
45    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
46    for id in ids {
47        match timestamps.and_then(|m| m.get(&id)) {
48            Some(&ts) => builder.append_value(ts),
49            None => builder.append_null(),
50        }
51    }
52    Arc::new(builder.finish())
53}
54
55pub fn build_timestamp_column_from_vid_map<I>(
56    ids: I,
57    timestamps: Option<&HashMap<Vid, i64>>,
58) -> ArrayRef
59where
60    I: IntoIterator<Item = Vid>,
61{
62    build_timestamp_column_from_id_map(ids, timestamps)
63}
64
65pub fn build_timestamp_column_from_eid_map<I>(
66    ids: I,
67    timestamps: Option<&HashMap<Eid, i64>>,
68) -> ArrayRef
69where
70    I: IntoIterator<Item = Eid>,
71{
72    build_timestamp_column_from_id_map(ids, timestamps)
73}
74
75/// Build a timestamp column from an iterator of optional timestamps.
76///
77/// This is useful for building timestamp columns directly from entry structs.
78pub fn build_timestamp_column<I>(timestamps: I) -> ArrayRef
79where
80    I: IntoIterator<Item = Option<i64>>,
81{
82    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
83    for ts in timestamps {
84        builder.append_option(ts);
85    }
86    Arc::new(builder.finish())
87}
88
89/// Extract a `Vec<String>` from a single row of a `List<Utf8>` column.
90///
91/// Returns an empty vec when the row is null, the inner array is not a
92/// `StringArray`, or the list is empty.  Null entries inside the list are
93/// silently skipped.
94pub fn labels_from_list_array(list_arr: &ListArray, row: usize) -> Vec<String> {
95    if list_arr.is_null(row) {
96        return Vec::new();
97    }
98    let values = list_arr.value(row);
99    let Some(str_arr) = values.as_any().downcast_ref::<StringArray>() else {
100        return Vec::new();
101    };
102    (0..str_arr.len())
103        .filter(|&j| !str_arr.is_null(j))
104        .map(|j| str_arr.value(j).to_string())
105        .collect()
106}
107
108/// Parse a datetime string into nanoseconds since Unix epoch.
109///
110/// Tries RFC3339, "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M%:z",
111/// and "%Y-%m-%dT%H:%MZ" formats.
112fn parse_datetime_to_nanos(s: &str) -> Option<i64> {
113    chrono::DateTime::parse_from_rfc3339(s)
114        .map(|dt| {
115            dt.with_timezone(&chrono::Utc)
116                .timestamp_nanos_opt()
117                .unwrap_or(0)
118        })
119        .or_else(|_| {
120            chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
121                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
122        })
123        .or_else(|_| {
124            chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ")
125                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
126        })
127        .or_else(|_| {
128            chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M%:z").map(|dt| {
129                dt.with_timezone(&chrono::Utc)
130                    .timestamp_nanos_opt()
131                    .unwrap_or(0)
132            })
133        })
134        .ok()
135        .or_else(|| {
136            s.strip_suffix('Z')
137                .and_then(|base| chrono::NaiveDateTime::parse_from_str(base, "%Y-%m-%dT%H:%M").ok())
138                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
139        })
140}
141
142/// Detect the Arrow Map-as-List(Struct(key, value)) pattern and reconstruct a map.
143///
144/// Arrow represents Map columns as `List(Struct { key, value })`. This helper
145/// checks whether the given array matches that layout and, if so, converts the
146/// key/value pairs back into a `HashMap<String, Value>`.
147fn try_reconstruct_map(arr: &ArrayRef) -> Option<HashMap<String, Value>> {
148    let structs = arr.as_any().downcast_ref::<StructArray>()?;
149    let fields = structs.fields();
150    if fields.len() != 2 || fields[0].name() != "key" || fields[1].name() != "value" {
151        return None;
152    }
153    // A typed `Map(_, Bytes)` value child carries the `uni_raw_bytes` marker; decode
154    // each value verbatim. CV-encoded map values carry no marker → codec path.
155    let value_hint = raw_bytes_hint(fields[1].metadata());
156    let key_col = structs.column(0);
157    let val_col = structs.column(1);
158    let mut map = HashMap::new();
159    for i in 0..structs.len() {
160        if let Value::String(k) = arrow_to_value(key_col.as_ref(), i, None) {
161            map.insert(k, arrow_to_value(val_col.as_ref(), i, value_hint));
162        }
163    }
164    Some(map)
165}
166
167/// Convert all elements of an Arrow array into a `Vec<Value>`.
168///
169/// `elem_type` is the schema hint for each element — `Some(DataType::Bytes)` when the
170/// list child field is marked `uni_raw_bytes`, so raw `Bytes` elements decode verbatim.
171fn array_to_value_list(arr: &ArrayRef, elem_type: Option<&DataType>) -> Vec<Value> {
172    (0..arr.len())
173        .map(|i| arrow_to_value(arr.as_ref(), i, elem_type))
174        .collect()
175}
176
177/// Returns `Some(&DataType::Bytes)` when Arrow field metadata marks the field as a
178/// raw `Bytes` value (`uni_raw_bytes=true`), else `None`. Used to discriminate raw
179/// `Bytes` container children from CV-encoded `LargeBinary` without array sniffing.
180fn raw_bytes_hint(metadata: &HashMap<String, String>) -> Option<&'static DataType> {
181    if metadata.get("uni_raw_bytes").is_some_and(|v| v == "true") {
182        Some(&DataType::Bytes)
183    } else {
184        None
185    }
186}
187
188/// Extracts the raw-`Bytes` element hint from a list-like Arrow type's child field.
189fn list_child_bytes_hint(dt: &ArrowDataType) -> Option<&'static DataType> {
190    match dt {
191        ArrowDataType::List(f)
192        | ArrowDataType::LargeList(f)
193        | ArrowDataType::FixedSizeList(f, _) => raw_bytes_hint(f.metadata()),
194        _ => None,
195    }
196}
197
198/// Convert an Arrow array value at a given row index to a Uni Value.
199///
200/// Handles all common Arrow types and recursively processes nested structures
201/// like Lists and Structs. The optional `data_type` parameter provides schema
202/// context for decoding DateTime and Time struct arrays; when provided, it
203/// takes precedence over runtime type detection.
204pub fn arrow_to_value(col: &dyn Array, row: usize, data_type: Option<&DataType>) -> Value {
205    if col.is_null(row) {
206        return Value::Null;
207    }
208
209    // Schema-driven decode for DateTime and Time structs
210    if let Some(dt) = data_type {
211        match dt {
212            DataType::DateTime => {
213                // Expect StructArray with three fields
214                if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
215                    && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
216                        struct_arr.column_by_name("nanos_since_epoch"),
217                        struct_arr.column_by_name("offset_seconds"),
218                        struct_arr.column_by_name("timezone_name"),
219                    )
220                    && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
221                        nanos_col
222                            .as_any()
223                            .downcast_ref::<TimestampNanosecondArray>(),
224                        offset_col.as_any().downcast_ref::<Int32Array>(),
225                        tz_col.as_any().downcast_ref::<StringArray>(),
226                    )
227                {
228                    if nanos_arr.is_null(row) {
229                        return Value::Null;
230                    }
231                    let nanos = nanos_arr.value(row);
232                    if offset_arr.is_null(row) {
233                        // No offset → LocalDateTime
234                        return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
235                            nanos_since_epoch: nanos,
236                        });
237                    }
238                    let offset = offset_arr.value(row);
239                    let tz_name = (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
240                    return Value::Temporal(uni_common::TemporalValue::DateTime {
241                        nanos_since_epoch: nanos,
242                        offset_seconds: offset,
243                        timezone_name: tz_name,
244                    });
245                }
246                // Fall back to old schema migration: TimestampNanosecond → DateTime with offset=0
247                if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
248                    let nanos = ts.value(row);
249                    let tz_name = ts.timezone().map(|s| s.to_string());
250                    return Value::Temporal(uni_common::TemporalValue::DateTime {
251                        nanos_since_epoch: nanos,
252                        offset_seconds: 0,
253                        timezone_name: tz_name,
254                    });
255                }
256            }
257            DataType::Time => {
258                // Expect StructArray with two fields
259                if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
260                    && let (Some(nanos_col), Some(offset_col)) = (
261                        struct_arr.column_by_name("nanos_since_midnight"),
262                        struct_arr.column_by_name("offset_seconds"),
263                    )
264                    && let (Some(nanos_arr), Some(offset_arr)) = (
265                        nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
266                        offset_col.as_any().downcast_ref::<Int32Array>(),
267                    )
268                {
269                    // Check field-level nulls before calling .value()
270                    if nanos_arr.is_null(row) || offset_arr.is_null(row) {
271                        return Value::Null;
272                    }
273                    let nanos = nanos_arr.value(row);
274                    let offset = offset_arr.value(row);
275                    return Value::Temporal(uni_common::TemporalValue::Time {
276                        nanos_since_midnight: nanos,
277                        offset_seconds: offset,
278                    });
279                }
280                // Fall back to old schema: Time64Nanosecond → Time with offset=0
281                if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
282                    let nanos = t.value(row);
283                    return Value::Temporal(uni_common::TemporalValue::Time {
284                        nanos_since_midnight: nanos,
285                        offset_seconds: 0,
286                    });
287                }
288            }
289            DataType::Bytes => {
290                let Some(arr) = col.as_any().downcast_ref::<LargeBinaryArray>() else {
291                    log::warn!("Bytes column is not LargeBinaryArray");
292                    return Value::Null;
293                };
294                if arr.is_null(row) {
295                    return Value::Null;
296                }
297                return Value::Bytes(arr.value(row).to_vec());
298            }
299            DataType::Btic => {
300                let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>() else {
301                    log::warn!("BTIC column is not FixedSizeBinaryArray");
302                    return Value::Null;
303                };
304                let bytes = fsb.value(row);
305                return match uni_btic::encode::decode_slice(bytes) {
306                    Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
307                        lo: btic.lo(),
308                        hi: btic.hi(),
309                        meta: btic.meta(),
310                    }),
311                    Err(e) => {
312                        log::warn!("BTIC decode error: {}", e);
313                        Value::Null
314                    }
315                };
316            }
317            _ => {}
318        }
319    }
320
321    // String types
322    if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
323        return Value::String(s.value(row).to_string());
324    }
325
326    // Integer types
327    if let Some(u) = col.as_any().downcast_ref::<UInt64Array>() {
328        return Value::Int(u.value(row) as i64);
329    }
330    if let Some(i) = col.as_any().downcast_ref::<Int64Array>() {
331        return Value::Int(i.value(row));
332    }
333    if let Some(i) = col.as_any().downcast_ref::<Int32Array>() {
334        return Value::Int(i.value(row) as i64);
335    }
336
337    // Float types
338    if let Some(f) = col.as_any().downcast_ref::<Float64Array>() {
339        return Value::Float(f.value(row));
340    }
341    if let Some(f) = col.as_any().downcast_ref::<Float32Array>() {
342        return Value::Float(f.value(row) as f64);
343    }
344
345    // Boolean type
346    if let Some(b) = col.as_any().downcast_ref::<BooleanArray>() {
347        return Value::Bool(b.value(row));
348    }
349
350    // Fixed-size list (vectors)
351    if let Some(list) = col.as_any().downcast_ref::<FixedSizeListArray>() {
352        let elem_hint = list_child_bytes_hint(list.data_type());
353        return Value::List(array_to_value_list(&list.value(row), elem_hint));
354    }
355
356    // Variable-size list
357    if let Some(list) = col.as_any().downcast_ref::<ListArray>() {
358        let arr = list.value(row);
359
360        // Map types are stored as List(Struct(key, value)); reconstruct as map
361        if let Some(obj) = try_reconstruct_map(&arr) {
362            return Value::Map(obj);
363        }
364
365        let elem_hint = list_child_bytes_hint(list.data_type());
366        return Value::List(array_to_value_list(&arr, elem_hint));
367    }
368
369    // Large list (variable-size list with i64 offsets)
370    if let Some(list) = col.as_any().downcast_ref::<arrow_array::LargeListArray>() {
371        let elem_hint = list_child_bytes_hint(list.data_type());
372        return Value::List(array_to_value_list(&list.value(row), elem_hint));
373    }
374
375    // Struct type — detect temporal structs by field names before generic handler
376    if let Some(s) = col.as_any().downcast_ref::<StructArray>() {
377        let field_names: Vec<&str> = s.fields().iter().map(|f| f.name().as_str()).collect();
378
379        // DateTime struct: {nanos_since_epoch, offset_seconds, timezone_name}
380        if field_names.contains(&"nanos_since_epoch")
381            && field_names.contains(&"offset_seconds")
382            && field_names.contains(&"timezone_name")
383            && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
384                s.column_by_name("nanos_since_epoch"),
385                s.column_by_name("offset_seconds"),
386                s.column_by_name("timezone_name"),
387            )
388        {
389            // Try TimestampNanosecond first (standard schema), then Int64 fallback
390            let nanos_opt = nanos_col
391                .as_any()
392                .downcast_ref::<TimestampNanosecondArray>()
393                .map(|a| {
394                    if a.is_null(row) {
395                        None
396                    } else {
397                        Some(a.value(row))
398                    }
399                })
400                .or_else(|| {
401                    nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
402                        if a.is_null(row) {
403                            None
404                        } else {
405                            Some(a.value(row))
406                        }
407                    })
408                });
409            let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
410                if a.is_null(row) {
411                    None
412                } else {
413                    Some(a.value(row))
414                }
415            });
416
417            if let Some(Some(nanos)) = nanos_opt {
418                match offset_opt {
419                    Some(Some(offset)) => {
420                        let tz_name = tz_col.as_any().downcast_ref::<StringArray>().and_then(|a| {
421                            if a.is_null(row) {
422                                None
423                            } else {
424                                Some(a.value(row).to_string())
425                            }
426                        });
427                        return Value::Temporal(uni_common::TemporalValue::DateTime {
428                            nanos_since_epoch: nanos,
429                            offset_seconds: offset,
430                            timezone_name: tz_name,
431                        });
432                    }
433                    _ => {
434                        // No offset → LocalDateTime
435                        return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
436                            nanos_since_epoch: nanos,
437                        });
438                    }
439                }
440            }
441        }
442
443        // Time struct: {nanos_since_midnight, offset_seconds}
444        if field_names.contains(&"nanos_since_midnight")
445            && field_names.contains(&"offset_seconds")
446            && let (Some(nanos_col), Some(offset_col)) = (
447                s.column_by_name("nanos_since_midnight"),
448                s.column_by_name("offset_seconds"),
449            )
450        {
451            // Try Time64Nanosecond first (standard schema), then Int64 fallback
452            let nanos_opt = nanos_col
453                .as_any()
454                .downcast_ref::<Time64NanosecondArray>()
455                .map(|a| {
456                    if a.is_null(row) {
457                        None
458                    } else {
459                        Some(a.value(row))
460                    }
461                })
462                .or_else(|| {
463                    nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
464                        if a.is_null(row) {
465                            None
466                        } else {
467                            Some(a.value(row))
468                        }
469                    })
470                });
471            let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
472                if a.is_null(row) {
473                    None
474                } else {
475                    Some(a.value(row))
476                }
477            });
478
479            if let (Some(Some(nanos)), Some(Some(offset))) = (nanos_opt, offset_opt) {
480                return Value::Temporal(uni_common::TemporalValue::Time {
481                    nanos_since_midnight: nanos,
482                    offset_seconds: offset,
483                });
484            }
485        }
486
487        // Generic struct → Map
488        let mut map = HashMap::new();
489        for (field, child) in s.fields().iter().zip(s.columns()) {
490            map.insert(
491                field.name().clone(),
492                arrow_to_value(child.as_ref(), row, None),
493            );
494        }
495        return Value::Map(map);
496    }
497
498    // Date32 type (days since epoch) - return as Value::Temporal
499    if let Some(d) = col.as_any().downcast_ref::<Date32Array>() {
500        let days = d.value(row);
501        return Value::Temporal(uni_common::TemporalValue::Date {
502            days_since_epoch: days,
503        });
504    }
505
506    // Timestamp (nanoseconds since epoch) - timezone presence determines DateTime vs LocalDateTime
507    if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
508        let nanos = ts.value(row);
509        return match ts.timezone() {
510            Some(tz) => Value::Temporal(uni_common::TemporalValue::DateTime {
511                nanos_since_epoch: nanos,
512                offset_seconds: 0,
513                timezone_name: Some(tz.to_string()),
514            }),
515            None => Value::Temporal(uni_common::TemporalValue::LocalDateTime {
516                nanos_since_epoch: nanos,
517            }),
518        };
519    }
520
521    // Time64 (nanoseconds since midnight) - return as Value::Temporal
522    if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
523        let nanos = t.value(row);
524        return Value::Temporal(uni_common::TemporalValue::LocalTime {
525            nanos_since_midnight: nanos,
526        });
527    }
528
529    // Time64 (microseconds since midnight) - convert to nanoseconds
530    if let Some(t) = col
531        .as_any()
532        .downcast_ref::<arrow_array::Time64MicrosecondArray>()
533    {
534        let micros = t.value(row);
535        return Value::Temporal(uni_common::TemporalValue::LocalTime {
536            nanos_since_midnight: micros * 1000,
537        });
538    }
539
540    // DurationMicrosecond - convert to Duration with nanoseconds
541    if let Some(d) = col
542        .as_any()
543        .downcast_ref::<arrow_array::DurationMicrosecondArray>()
544    {
545        let micros = d.value(row);
546        let total_nanos = micros * 1000;
547        let seconds = total_nanos / 1_000_000_000;
548        let remaining_nanos = total_nanos % 1_000_000_000;
549        return Value::Temporal(uni_common::TemporalValue::Duration {
550            months: 0,
551            days: 0,
552            nanos: seconds * 1_000_000_000 + remaining_nanos,
553        });
554    }
555
556    // IntervalMonthDayNano - return as Value::Temporal(Duration)
557    if let Some(interval) = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>() {
558        let val = interval.value(row);
559        return Value::Temporal(uni_common::TemporalValue::Duration {
560            months: val.months as i64,
561            days: val.days as i64,
562            nanos: val.nanoseconds,
563        });
564    }
565
566    // LargeBinary (CypherValue MessagePack-tagged encoding)
567    if let Some(b) = col.as_any().downcast_ref::<LargeBinaryArray>() {
568        let bytes = b.value(row);
569        if bytes.is_empty() {
570            return Value::Null;
571        }
572        return uni_common::cypher_value_codec::decode(bytes).unwrap_or_else(|e| {
573            eprintln!("CypherValue decode error: {}", e);
574            Value::Null
575        });
576    }
577
578    // FixedSizeBinary(24) — BTIC temporal interval
579    if let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>()
580        && fsb.value_length() == 24
581    {
582        let bytes = fsb.value(row);
583        return match uni_btic::encode::decode_slice(bytes) {
584            Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
585                lo: btic.lo(),
586                hi: btic.hi(),
587                meta: btic.meta(),
588            }),
589            Err(e) => {
590                log::warn!("BTIC decode error: {}", e);
591                Value::Null
592            }
593        };
594    }
595
596    // Binary (CRDT MessagePack) - decode to Value via serde_json boundary
597    if let Some(b) = col.as_any().downcast_ref::<BinaryArray>() {
598        let bytes = b.value(row);
599        return Crdt::from_msgpack(bytes)
600            .ok()
601            .and_then(|crdt| serde_json::to_value(&crdt).ok())
602            .map(Value::from)
603            .unwrap_or(Value::Null);
604    }
605
606    // Fallback
607    Value::Null
608}
609
610fn values_to_uint64_array(values: &[Value]) -> ArrayRef {
611    let mut builder = UInt64Builder::with_capacity(values.len());
612    for v in values {
613        if let Some(n) = v.as_u64() {
614            builder.append_value(n);
615        } else {
616            builder.append_null();
617        }
618    }
619    Arc::new(builder.finish())
620}
621
622fn values_to_int64_array(values: &[Value]) -> ArrayRef {
623    let mut builder = Int64Builder::with_capacity(values.len());
624    for v in values {
625        if let Some(n) = v.as_i64() {
626            builder.append_value(n);
627        } else {
628            builder.append_null();
629        }
630    }
631    Arc::new(builder.finish())
632}
633
634fn values_to_int32_array(values: &[Value]) -> ArrayRef {
635    let mut builder = Int32Builder::with_capacity(values.len());
636    for v in values {
637        if let Some(n) = v.as_i64() {
638            builder.append_value(n as i32);
639        } else {
640            builder.append_null();
641        }
642    }
643    Arc::new(builder.finish())
644}
645
646fn values_to_string_array(values: &[Value]) -> ArrayRef {
647    let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 10);
648    for v in values {
649        if let Some(s) = v.as_str() {
650            builder.append_value(s);
651        } else if v.is_null() {
652            builder.append_null();
653        } else {
654            builder.append_value(v.to_string());
655        }
656    }
657    Arc::new(builder.finish())
658}
659
660fn values_to_bool_array(values: &[Value]) -> ArrayRef {
661    let mut builder = BooleanBuilder::with_capacity(values.len());
662    for v in values {
663        if let Some(b) = v.as_bool() {
664            builder.append_value(b);
665        } else {
666            builder.append_null();
667        }
668    }
669    Arc::new(builder.finish())
670}
671
672fn values_to_float32_array(values: &[Value]) -> ArrayRef {
673    let mut builder = Float32Builder::with_capacity(values.len());
674    for v in values {
675        if let Some(n) = v.as_f64() {
676            builder.append_value(n as f32);
677        } else {
678            builder.append_null();
679        }
680    }
681    Arc::new(builder.finish())
682}
683
684fn values_to_float64_array(values: &[Value]) -> ArrayRef {
685    let mut builder = Float64Builder::with_capacity(values.len());
686    for v in values {
687        if let Some(n) = v.as_f64() {
688            builder.append_value(n);
689        } else {
690            builder.append_null();
691        }
692    }
693    Arc::new(builder.finish())
694}
695
696fn values_to_fixed_size_binary_array(values: &[Value], size: i32) -> Result<ArrayRef> {
697    let mut builder = FixedSizeBinaryBuilder::with_capacity(values.len(), size);
698    for v in values {
699        match v {
700            Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta }) if size == 24 => {
701                let btic = uni_btic::Btic::new(*lo, *hi, *meta)
702                    .map_err(|e| anyhow!("invalid BTIC value: {}", e))?;
703                builder.append_value(uni_btic::encode::encode(&btic))?;
704            }
705            Value::String(s) if size == 24 => match uni_btic::parse::parse_btic_literal(s) {
706                Ok(b) => builder.append_value(uni_btic::encode::encode(&b))?,
707                Err(_) => builder.append_null(),
708            },
709            Value::List(bytes) => {
710                let b: Vec<u8> = bytes
711                    .iter()
712                    .map(|bv| bv.as_u64().unwrap_or(0) as u8)
713                    .collect();
714                if b.len() as i32 == size {
715                    builder.append_value(&b)?;
716                } else {
717                    builder.append_null();
718                }
719            }
720            _ => builder.append_null(),
721        }
722    }
723    Ok(Arc::new(builder.finish()))
724}
725
726/// Extract f32 vector values from a Value, ensuring correct Arrow FixedSizeList invariants.
727///
728/// Always returns exactly `dimensions` f32 values (zeros for null/invalid), plus a validity flag.
729/// This guarantees `child_array.len() == parent_array.len() × dimensions`.
730///
731/// # Arguments
732/// - `val`: Optional property value to extract from
733/// - `is_deleted`: Whether the containing entity is deleted (affects validity)
734/// - `dimensions`: Expected vector dimensions
735///
736/// # Returns
737/// - Tuple of (vector values, validity flag)
738///   - Vector always has exactly `dimensions` elements
739///   - Validity is `true` for valid vectors or deleted entries, `false` for null/invalid
740pub fn extract_vector_f32_values(
741    val: Option<&Value>,
742    is_deleted: bool,
743    dimensions: usize,
744) -> (Vec<f32>, bool) {
745    let zeros = || vec![0.0_f32; dimensions];
746
747    // Deleted entries always return zeros with valid=true
748    if is_deleted {
749        return (zeros(), true);
750    }
751
752    match val {
753        // Native f32 vector (Value::Vector)
754        Some(Value::Vector(v)) if v.len() == dimensions => (v.clone(), true),
755        Some(Value::Vector(_)) => (zeros(), false), // Wrong dimensions
756        // List of values (Value::List) - convert to f32
757        Some(Value::List(arr)) if arr.len() == dimensions => {
758            let values: Vec<f32> = arr
759                .iter()
760                .map(|v| v.as_f64().unwrap_or(0.0) as f32)
761                .collect();
762            (values, true)
763        }
764        Some(Value::List(_)) => (zeros(), false), // Wrong dimensions
765        _ => (zeros(), false),                    // Missing or unsupported value
766    }
767}
768
769fn values_to_fixed_size_list_f32_array(values: &[Value], size: i32) -> ArrayRef {
770    let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), size);
771    for v in values {
772        let (vals, valid) = extract_vector_f32_values(Some(v), false, size as usize);
773        for val in vals {
774            builder.values().append_value(val);
775        }
776        builder.append(valid);
777    }
778    Arc::new(builder.finish())
779}
780
781fn values_to_timestamp_array(values: &[Value], tz: Option<&Arc<str>>) -> ArrayRef {
782    let mut builder = TimestampNanosecondBuilder::with_capacity(values.len());
783    for v in values {
784        if v.is_null() {
785            builder.append_null();
786        } else if let Value::Temporal(tv) = v {
787            match tv {
788                uni_common::TemporalValue::DateTime {
789                    nanos_since_epoch, ..
790                }
791                | uni_common::TemporalValue::LocalDateTime {
792                    nanos_since_epoch, ..
793                } => builder.append_value(*nanos_since_epoch),
794                _ => builder.append_null(),
795            }
796        } else if let Some(n) = v.as_i64() {
797            builder.append_value(n);
798        } else if let Some(s) = v.as_str() {
799            match parse_datetime_to_nanos(s) {
800                Some(nanos) => builder.append_value(nanos),
801                None => builder.append_null(),
802            }
803        } else {
804            builder.append_null();
805        }
806    }
807
808    let arr = builder.finish();
809    if let Some(tz) = tz {
810        Arc::new(arr.with_timezone(tz.as_ref()))
811    } else {
812        Arc::new(arr)
813    }
814}
815
816/// Build a DateTime struct array from values.
817///
818/// Encodes DateTime as a 3-field struct: (nanos_since_epoch, offset_seconds, timezone_name).
819/// This preserves timezone offset information that was previously lost with TimestampNanosecond encoding.
820fn values_to_datetime_struct_array(values: &[Value]) -> ArrayRef {
821    let mut nanos_builder = TimestampNanosecondBuilder::with_capacity(values.len());
822    let mut offset_builder = Int32Builder::with_capacity(values.len());
823    let mut tz_builder = StringBuilder::with_capacity(values.len(), values.len() * 20);
824    let mut null_buffer = BooleanBufferBuilder::new(values.len());
825
826    for v in values {
827        match v {
828            Value::Temporal(uni_common::TemporalValue::DateTime {
829                nanos_since_epoch,
830                offset_seconds,
831                timezone_name,
832            }) => {
833                nanos_builder.append_value(*nanos_since_epoch);
834                offset_builder.append_value(*offset_seconds);
835                tz_builder.append_option(timezone_name.as_deref());
836                null_buffer.append(true);
837            }
838            Value::Temporal(uni_common::TemporalValue::LocalDateTime { nanos_since_epoch }) => {
839                nanos_builder.append_value(*nanos_since_epoch);
840                offset_builder.append_null();
841                tz_builder.append_null();
842                null_buffer.append(true);
843            }
844            _ => {
845                nanos_builder.append_null();
846                offset_builder.append_null();
847                tz_builder.append_null();
848                null_buffer.append(false);
849            }
850        }
851    }
852
853    let struct_arr = StructArray::new(
854        schema::datetime_struct_fields(),
855        vec![
856            Arc::new(nanos_builder.finish()) as ArrayRef,
857            Arc::new(offset_builder.finish()) as ArrayRef,
858            Arc::new(tz_builder.finish()) as ArrayRef,
859        ],
860        Some(null_buffer.finish().into()),
861    );
862    Arc::new(struct_arr)
863}
864
865/// Build a Time struct array from values.
866///
867/// Encodes Time as a 2-field struct: (nanos_since_midnight, offset_seconds).
868/// This preserves timezone offset information that was previously lost with Time64Nanosecond encoding.
869fn values_to_time_struct_array(values: &[Value]) -> ArrayRef {
870    let mut nanos_builder = Time64NanosecondBuilder::with_capacity(values.len());
871    let mut offset_builder = Int32Builder::with_capacity(values.len());
872    let mut null_buffer = BooleanBufferBuilder::new(values.len());
873
874    for v in values {
875        match v {
876            Value::Temporal(uni_common::TemporalValue::Time {
877                nanos_since_midnight,
878                offset_seconds,
879            }) => {
880                nanos_builder.append_value(*nanos_since_midnight);
881                offset_builder.append_value(*offset_seconds);
882                null_buffer.append(true);
883            }
884            Value::Temporal(uni_common::TemporalValue::LocalTime {
885                nanos_since_midnight,
886            }) => {
887                nanos_builder.append_value(*nanos_since_midnight);
888                offset_builder.append_null();
889                null_buffer.append(true);
890            }
891            _ => {
892                nanos_builder.append_null();
893                offset_builder.append_null();
894                null_buffer.append(false);
895            }
896        }
897    }
898
899    let struct_arr = StructArray::new(
900        schema::time_struct_fields(),
901        vec![
902            Arc::new(nanos_builder.finish()) as ArrayRef,
903            Arc::new(offset_builder.finish()) as ArrayRef,
904        ],
905        Some(null_buffer.finish().into()),
906    );
907    Arc::new(struct_arr)
908}
909
910fn values_to_large_binary_array(values: &[Value]) -> ArrayRef {
911    let mut builder =
912        arrow_array::builder::LargeBinaryBuilder::with_capacity(values.len(), values.len() * 64);
913    for v in values {
914        if v.is_null() {
915            builder.append_null();
916        } else {
917            // Encode as CypherValue (MessagePack-tagged)
918            let cv_bytes = uni_common::cypher_value_codec::encode(v);
919            builder.append_value(&cv_bytes);
920        }
921    }
922    Arc::new(builder.finish())
923}
924
925/// Convert a slice of JSON Values to an Arrow array based on the target Arrow DataType.
926pub fn values_to_array(values: &[Value], dt: &ArrowDataType) -> Result<ArrayRef> {
927    match dt {
928        ArrowDataType::UInt64 => Ok(values_to_uint64_array(values)),
929        ArrowDataType::Int64 => Ok(values_to_int64_array(values)),
930        ArrowDataType::Int32 => Ok(values_to_int32_array(values)),
931        ArrowDataType::Utf8 => Ok(values_to_string_array(values)),
932        ArrowDataType::Boolean => Ok(values_to_bool_array(values)),
933        ArrowDataType::Float32 => Ok(values_to_float32_array(values)),
934        ArrowDataType::Float64 => Ok(values_to_float64_array(values)),
935        ArrowDataType::FixedSizeBinary(size) => values_to_fixed_size_binary_array(values, *size),
936        ArrowDataType::FixedSizeList(inner, size) => {
937            if inner.data_type() == &ArrowDataType::Float32 {
938                Ok(values_to_fixed_size_list_f32_array(values, *size))
939            } else {
940                Err(anyhow!("Unsupported FixedSizeList inner type"))
941            }
942        }
943        ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
944            Ok(values_to_timestamp_array(values, tz.as_ref()))
945        }
946        ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
947            Ok(values_to_timestamp_array(values, tz.as_ref()))
948        }
949        ArrowDataType::Date32 => {
950            let mut builder = Date32Builder::with_capacity(values.len());
951            for v in values {
952                if v.is_null() {
953                    builder.append_null();
954                } else if let Value::Temporal(uni_common::TemporalValue::Date {
955                    days_since_epoch,
956                }) = v
957                {
958                    builder.append_value(*days_since_epoch);
959                } else if let Some(n) = v.as_i64() {
960                    builder.append_value(n as i32);
961                } else {
962                    builder.append_null();
963                }
964            }
965            Ok(Arc::new(builder.finish()))
966        }
967        ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
968            let mut builder = Time64NanosecondBuilder::with_capacity(values.len());
969            for v in values {
970                if v.is_null() {
971                    builder.append_null();
972                } else if let Value::Temporal(tv) = v {
973                    match tv {
974                        uni_common::TemporalValue::LocalTime {
975                            nanos_since_midnight,
976                        }
977                        | uni_common::TemporalValue::Time {
978                            nanos_since_midnight,
979                            ..
980                        } => builder.append_value(*nanos_since_midnight),
981                        _ => builder.append_null(),
982                    }
983                } else if let Some(n) = v.as_i64() {
984                    builder.append_value(n);
985                } else {
986                    builder.append_null();
987                }
988            }
989            Ok(Arc::new(builder.finish()))
990        }
991        ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
992            let mut builder = Time64MicrosecondBuilder::with_capacity(values.len());
993            for v in values {
994                if v.is_null() {
995                    builder.append_null();
996                } else if let Value::Temporal(tv) = v {
997                    match tv {
998                        uni_common::TemporalValue::LocalTime {
999                            nanos_since_midnight,
1000                        }
1001                        | uni_common::TemporalValue::Time {
1002                            nanos_since_midnight,
1003                            ..
1004                        } => builder.append_value(*nanos_since_midnight / 1_000), // nanos→micros for legacy
1005                        _ => builder.append_null(),
1006                    }
1007                } else if let Some(n) = v.as_i64() {
1008                    builder.append_value(n);
1009                } else {
1010                    builder.append_null();
1011                }
1012            }
1013            Ok(Arc::new(builder.finish()))
1014        }
1015        ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano) => {
1016            let mut builder = IntervalMonthDayNanoBuilder::with_capacity(values.len());
1017            for v in values {
1018                if v.is_null() {
1019                    builder.append_null();
1020                } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1021                    months,
1022                    days,
1023                    nanos,
1024                }) = v
1025                {
1026                    builder.append_value(arrow::datatypes::IntervalMonthDayNano {
1027                        months: *months as i32,
1028                        days: *days as i32,
1029                        nanoseconds: *nanos,
1030                    });
1031                } else {
1032                    builder.append_null();
1033                }
1034            }
1035            Ok(Arc::new(builder.finish()))
1036        }
1037        ArrowDataType::Duration(arrow_schema::TimeUnit::Microsecond) => {
1038            let mut builder = DurationMicrosecondBuilder::with_capacity(values.len());
1039            for v in values {
1040                if v.is_null() {
1041                    builder.append_null();
1042                } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1043                    months,
1044                    days,
1045                    nanos,
1046                }) = v
1047                {
1048                    let total_micros =
1049                        months * 30 * 86_400_000_000i64 + days * 86_400_000_000i64 + nanos / 1_000;
1050                    builder.append_value(total_micros);
1051                } else if let Some(n) = v.as_i64() {
1052                    builder.append_value(n);
1053                } else {
1054                    builder.append_null();
1055                }
1056            }
1057            Ok(Arc::new(builder.finish()))
1058        }
1059        ArrowDataType::LargeBinary => Ok(values_to_large_binary_array(values)),
1060        ArrowDataType::List(field) => {
1061            if field.data_type() == &ArrowDataType::Utf8 {
1062                let mut builder = ListBuilder::new(StringBuilder::new());
1063                for v in values {
1064                    if let Value::List(arr) = v {
1065                        for item in arr {
1066                            if let Some(s) = item.as_str() {
1067                                builder.values().append_value(s);
1068                            } else {
1069                                builder.values().append_null();
1070                            }
1071                        }
1072                        builder.append(true);
1073                    } else {
1074                        builder.append_null();
1075                    }
1076                }
1077                Ok(Arc::new(builder.finish()))
1078            } else {
1079                Err(anyhow!(
1080                    "Unsupported List inner type: {:?}",
1081                    field.data_type()
1082                ))
1083            }
1084        }
1085        ArrowDataType::Struct(_) if schema::is_datetime_struct(dt) => {
1086            Ok(values_to_datetime_struct_array(values))
1087        }
1088        ArrowDataType::Struct(_) if schema::is_time_struct(dt) => {
1089            Ok(values_to_time_struct_array(values))
1090        }
1091        _ => Err(anyhow!("Unsupported type for conversion: {:?}", dt)),
1092    }
1093}
1094
1095/// Property value extractor for building Arrow columns from entity properties.
1096pub struct PropertyExtractor<'a> {
1097    data_type: &'a DataType,
1098}
1099
1100impl<'a> PropertyExtractor<'a> {
1101    pub fn new(_name: &'a str, data_type: &'a DataType) -> Self {
1102        Self { data_type }
1103    }
1104
1105    /// Build an Arrow column from a slice of property maps.
1106    /// The `deleted` slice indicates which entries are deleted (use default values).
1107    pub fn build_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1108    where
1109        F: Fn(usize) -> Option<&'a Value>,
1110    {
1111        match self.data_type {
1112            DataType::String => self.build_string_column(len, deleted, get_props),
1113            DataType::Int32 => self.build_int32_column(len, deleted, get_props),
1114            DataType::Int64 => self.build_int64_column(len, deleted, get_props),
1115            DataType::Float32 => self.build_float32_column(len, deleted, get_props),
1116            DataType::Float64 => self.build_float64_column(len, deleted, get_props),
1117            DataType::Bool => self.build_bool_column(len, deleted, get_props),
1118            DataType::Vector { dimensions } => {
1119                self.build_vector_column(len, deleted, get_props, *dimensions)
1120            }
1121            DataType::CypherValue => self.build_json_column(len, deleted, get_props),
1122            DataType::Bytes => self.build_bytes_column(len, deleted, get_props),
1123            DataType::List(inner) => self.build_list_column(len, deleted, get_props, inner),
1124            DataType::Map(key, value) => self.build_map_column(len, deleted, get_props, key, value),
1125            DataType::Crdt(_) => self.build_crdt_column(len, deleted, get_props),
1126            DataType::DateTime => self.build_datetime_struct_column(len, deleted, get_props),
1127            DataType::Timestamp => self.build_timestamp_column(len, deleted, get_props),
1128            DataType::Date => self.build_date32_column(len, deleted, get_props),
1129            DataType::Time => self.build_time_struct_column(len, deleted, get_props),
1130            DataType::Duration => self.build_duration_column(len, deleted, get_props),
1131            DataType::Btic => self.build_btic_column(len, deleted, get_props),
1132            _ => Err(anyhow!(
1133                "Unsupported data type for arrow conversion: {:?}",
1134                self.data_type
1135            )),
1136        }
1137    }
1138
1139    fn build_string_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1140    where
1141        F: Fn(usize) -> Option<&'a Value>,
1142    {
1143        let mut builder = arrow_array::builder::StringBuilder::with_capacity(len, len * 32);
1144        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1145            let prop = get_props(i);
1146            if let Some(s) = prop.and_then(|v| v.as_str()) {
1147                builder.append_value(s);
1148            } else if let Some(Value::Temporal(tv)) = prop {
1149                builder.append_value(tv.to_string());
1150            } else if is_deleted {
1151                builder.append_value("");
1152            } else {
1153                builder.append_null();
1154            }
1155        }
1156        Ok(Arc::new(builder.finish()))
1157    }
1158
1159    fn build_int32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1160    where
1161        F: Fn(usize) -> Option<&'a Value>,
1162    {
1163        let mut values = Vec::with_capacity(len);
1164        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1165            // i64 -> i32 via try_from: an out-of-range value becomes NULL rather
1166            // than silently wrapping to a different number. (review H13)
1167            let val = get_props(i)
1168                .and_then(|v| v.as_i64())
1169                .and_then(|v| i32::try_from(v).ok());
1170            if val.is_none() && is_deleted {
1171                values.push(Some(0));
1172            } else {
1173                values.push(val);
1174            }
1175        }
1176        Ok(Arc::new(Int32Array::from(values)))
1177    }
1178
1179    fn build_int64_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1180    where
1181        F: Fn(usize) -> Option<&'a Value>,
1182    {
1183        let mut values = Vec::with_capacity(len);
1184        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1185            let val = get_props(i).and_then(|v| v.as_i64());
1186            if val.is_none() && is_deleted {
1187                values.push(Some(0));
1188            } else {
1189                values.push(val);
1190            }
1191        }
1192        Ok(Arc::new(Int64Array::from(values)))
1193    }
1194
1195    fn build_timestamp_column<F>(
1196        &self,
1197        len: usize,
1198        deleted: &[bool],
1199        get_props: F,
1200    ) -> Result<ArrayRef>
1201    where
1202        F: Fn(usize) -> Option<&'a Value>,
1203    {
1204        let mut values = Vec::with_capacity(len);
1205        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1206            let val = get_props(i);
1207            let ts = if is_deleted || val.is_none() {
1208                Some(0i64)
1209            } else if let Some(Value::Temporal(tv)) = val {
1210                match tv {
1211                    uni_common::TemporalValue::DateTime {
1212                        nanos_since_epoch, ..
1213                    }
1214                    | uni_common::TemporalValue::LocalDateTime {
1215                        nanos_since_epoch, ..
1216                    } => Some(*nanos_since_epoch),
1217                    _ => None,
1218                }
1219            } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1220                Some(v)
1221            } else if let Some(s) = val.and_then(|v| v.as_str()) {
1222                parse_datetime_to_nanos(s)
1223            } else {
1224                None
1225            };
1226
1227            if is_deleted {
1228                values.push(Some(0));
1229            } else {
1230                values.push(ts);
1231            }
1232        }
1233        let arr = TimestampNanosecondArray::from(values).with_timezone("UTC");
1234        Ok(Arc::new(arr))
1235    }
1236
1237    fn build_datetime_struct_column<F>(
1238        &self,
1239        len: usize,
1240        deleted: &[bool],
1241        get_props: F,
1242    ) -> Result<ArrayRef>
1243    where
1244        F: Fn(usize) -> Option<&'a Value>,
1245    {
1246        let values = self.collect_values_or_null(len, deleted, &get_props);
1247        Ok(values_to_datetime_struct_array(&values))
1248    }
1249
1250    fn build_time_struct_column<F>(
1251        &self,
1252        len: usize,
1253        deleted: &[bool],
1254        get_props: F,
1255    ) -> Result<ArrayRef>
1256    where
1257        F: Fn(usize) -> Option<&'a Value>,
1258    {
1259        let values = self.collect_values_or_null(len, deleted, &get_props);
1260        Ok(values_to_time_struct_array(&values))
1261    }
1262
1263    /// Collect property values into a Vec, substituting `Value::Null` for deleted or missing entries.
1264    fn collect_values_or_null<F>(&self, len: usize, deleted: &[bool], get_props: &F) -> Vec<Value>
1265    where
1266        F: Fn(usize) -> Option<&'a Value>,
1267    {
1268        deleted
1269            .iter()
1270            .enumerate()
1271            .take(len)
1272            .map(|(i, &is_deleted)| {
1273                if is_deleted {
1274                    Value::Null
1275                } else {
1276                    get_props(i).cloned().unwrap_or(Value::Null)
1277                }
1278            })
1279            .collect()
1280    }
1281
1282    fn build_date32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1283    where
1284        F: Fn(usize) -> Option<&'a Value>,
1285    {
1286        let mut builder = Date32Builder::with_capacity(len);
1287        let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1288
1289        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1290            let val = get_props(i);
1291            let days = if is_deleted || val.is_none() {
1292                Some(0)
1293            } else if let Some(Value::Temporal(uni_common::TemporalValue::Date {
1294                days_since_epoch,
1295            })) = val
1296            {
1297                Some(*days_since_epoch)
1298            } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1299                // i64 day count -> i32: an out-of-range value becomes NULL
1300                // rather than silently wrapping to a different date. (review H13)
1301                i32::try_from(v).ok()
1302            } else if let Some(s) = val.and_then(|v| v.as_str()) {
1303                match chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1304                    Ok(date) => Some(date.signed_duration_since(epoch).num_days() as i32),
1305                    Err(_) => None,
1306                }
1307            } else {
1308                None
1309            };
1310
1311            if is_deleted {
1312                builder.append_value(0);
1313            } else if let Some(v) = days {
1314                builder.append_value(v);
1315            } else {
1316                builder.append_null();
1317            }
1318        }
1319        Ok(Arc::new(builder.finish()))
1320    }
1321
1322    fn build_duration_column<F>(
1323        &self,
1324        len: usize,
1325        deleted: &[bool],
1326        get_props: F,
1327    ) -> Result<ArrayRef>
1328    where
1329        F: Fn(usize) -> Option<&'a Value>,
1330    {
1331        // Duration stored as LargeBinary via CypherValue codec (Lance doesn't support Interval(MonthDayNano))
1332        let mut builder = LargeBinaryBuilder::with_capacity(len, len * 32);
1333        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1334            let raw_val = get_props(i);
1335            if let Some(val @ Value::Temporal(uni_common::TemporalValue::Duration { .. })) = raw_val
1336            {
1337                let encoded = uni_common::cypher_value_codec::encode(val);
1338                builder.append_value(&encoded);
1339            } else if is_deleted {
1340                let zero = Value::Temporal(uni_common::TemporalValue::Duration {
1341                    months: 0,
1342                    days: 0,
1343                    nanos: 0,
1344                });
1345                let encoded = uni_common::cypher_value_codec::encode(&zero);
1346                builder.append_value(&encoded);
1347            } else {
1348                builder.append_null();
1349            }
1350        }
1351        Ok(Arc::new(builder.finish()))
1352    }
1353
1354    fn build_btic_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1355    where
1356        F: Fn(usize) -> Option<&'a Value>,
1357    {
1358        const ENCODED_LEN: i32 = 24;
1359        let mut builder = FixedSizeBinaryBuilder::with_capacity(len, ENCODED_LEN);
1360        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1361            let raw_val = get_props(i);
1362            let btic = match raw_val {
1363                Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => Some(
1364                    uni_btic::Btic::new(*lo, *hi, *meta)
1365                        .map_err(|e| anyhow!("invalid BTIC value: {}", e))?,
1366                ),
1367                Some(Value::String(s)) => Some(
1368                    uni_btic::parse::parse_btic_literal(s)
1369                        .map_err(|e| anyhow!("BTIC parse error for '{}': {}", s, e))?,
1370                ),
1371                _ => None,
1372            };
1373
1374            if let Some(b) = btic {
1375                builder.append_value(uni_btic::encode::encode(&b))?;
1376            } else if is_deleted {
1377                builder.append_value([0u8; ENCODED_LEN as usize])?;
1378            } else {
1379                builder.append_null();
1380            }
1381        }
1382        Ok(Arc::new(builder.finish()))
1383    }
1384
1385    fn build_float32_column<F>(
1386        &self,
1387        len: usize,
1388        deleted: &[bool],
1389        get_props: F,
1390    ) -> Result<ArrayRef>
1391    where
1392        F: Fn(usize) -> Option<&'a Value>,
1393    {
1394        let mut values = Vec::with_capacity(len);
1395        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1396            let val = get_props(i).and_then(|v| v.as_f64()).map(|v| v as f32);
1397            if val.is_none() && is_deleted {
1398                values.push(Some(0.0));
1399            } else {
1400                values.push(val);
1401            }
1402        }
1403        Ok(Arc::new(Float32Array::from(values)))
1404    }
1405
1406    fn build_float64_column<F>(
1407        &self,
1408        len: usize,
1409        deleted: &[bool],
1410        get_props: F,
1411    ) -> Result<ArrayRef>
1412    where
1413        F: Fn(usize) -> Option<&'a Value>,
1414    {
1415        let mut values = Vec::with_capacity(len);
1416        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1417            let val = get_props(i).and_then(|v| v.as_f64());
1418            if val.is_none() && is_deleted {
1419                values.push(Some(0.0));
1420            } else {
1421                values.push(val);
1422            }
1423        }
1424        Ok(Arc::new(Float64Array::from(values)))
1425    }
1426
1427    fn build_bool_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1428    where
1429        F: Fn(usize) -> Option<&'a Value>,
1430    {
1431        let mut values = Vec::with_capacity(len);
1432        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1433            let val = get_props(i).and_then(|v| v.as_bool());
1434            if val.is_none() && is_deleted {
1435                values.push(Some(false));
1436            } else {
1437                values.push(val);
1438            }
1439        }
1440        Ok(Arc::new(BooleanArray::from(values)))
1441    }
1442
1443    fn build_vector_column<F>(
1444        &self,
1445        len: usize,
1446        deleted: &[bool],
1447        get_props: F,
1448        dimensions: usize,
1449    ) -> Result<ArrayRef>
1450    where
1451        F: Fn(usize) -> Option<&'a Value>,
1452    {
1453        let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dimensions as i32);
1454
1455        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1456            let val = get_props(i);
1457            let (values, valid) = extract_vector_f32_values(val, is_deleted, dimensions);
1458            for v in values {
1459                builder.values().append_value(v);
1460            }
1461            builder.append(valid);
1462        }
1463        Ok(Arc::new(builder.finish()))
1464    }
1465
1466    fn build_json_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1467    where
1468        F: Fn(usize) -> Option<&'a Value>,
1469    {
1470        let null_val = Value::Null;
1471        let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(len, len * 64);
1472        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1473            let val = get_props(i);
1474            let uni_val = if val.is_none() && is_deleted {
1475                &null_val
1476            } else {
1477                val.unwrap_or(&null_val)
1478            };
1479            // Encode to CypherValue (MessagePack-tagged)
1480            let cv_bytes = uni_common::cypher_value_codec::encode(uni_val);
1481            builder.append_value(&cv_bytes);
1482        }
1483        Ok(Arc::new(builder.finish()))
1484    }
1485
1486    fn build_bytes_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1487    where
1488        F: Fn(usize) -> Option<&'a Value>,
1489    {
1490        let mut builder = LargeBinaryBuilder::with_capacity(len, len * 64);
1491        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1492            let val = get_props(i);
1493            if let Some(Value::Bytes(b)) = val {
1494                builder.append_value(b);
1495            } else if is_deleted {
1496                builder.append_value(&[][..]);
1497            } else {
1498                builder.append_null();
1499            }
1500        }
1501        Ok(Arc::new(builder.finish()))
1502    }
1503
1504    fn build_list_column<F>(
1505        &self,
1506        len: usize,
1507        deleted: &[bool],
1508        get_props: F,
1509        inner: &DataType,
1510    ) -> Result<ArrayRef>
1511    where
1512        F: Fn(usize) -> Option<&'a Value>,
1513    {
1514        match inner {
1515            DataType::String => {
1516                self.build_typed_list(len, deleted, &get_props, StringBuilder::new(), |v, b| {
1517                    if let Some(s) = v.as_str() {
1518                        b.append_value(s);
1519                    } else {
1520                        b.append_null();
1521                    }
1522                })
1523            }
1524            DataType::Int64 => {
1525                self.build_typed_list(len, deleted, &get_props, Int64Builder::new(), |v, b| {
1526                    if let Some(n) = v.as_i64() {
1527                        b.append_value(n);
1528                    } else {
1529                        b.append_null();
1530                    }
1531                })
1532            }
1533            DataType::Float64 => {
1534                self.build_typed_list(len, deleted, &get_props, Float64Builder::new(), |v, b| {
1535                    if let Some(f) = v.as_f64() {
1536                        b.append_value(f);
1537                    } else {
1538                        b.append_null();
1539                    }
1540                })
1541            }
1542            DataType::Bytes => {
1543                // Raw `Bytes` elements: store each buffer verbatim in a `LargeBinary`
1544                // child and mark the child field `uni_raw_bytes` so the read path
1545                // decodes it verbatim instead of through the tagged codec.
1546                let item_field = Arc::new(
1547                    Field::new("item", ArrowDataType::LargeBinary, true)
1548                        .with_metadata(schema::raw_bytes_field_metadata()),
1549                );
1550                let mut builder =
1551                    ListBuilder::new(LargeBinaryBuilder::new()).with_field(item_field);
1552                for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1553                    let val_array = get_props(i).and_then(|v| v.as_array());
1554                    if val_array.is_none() && is_deleted {
1555                        builder.append_null();
1556                    } else if let Some(arr) = val_array {
1557                        for v in arr {
1558                            if let Value::Bytes(b) = v {
1559                                builder.values().append_value(b);
1560                            } else {
1561                                builder.values().append_null();
1562                            }
1563                        }
1564                        builder.append(true);
1565                    } else {
1566                        builder.append_null();
1567                    }
1568                }
1569                Ok(Arc::new(builder.finish()))
1570            }
1571            _ => Err(anyhow!("Unsupported inner type for List: {:?}", inner)),
1572        }
1573    }
1574
1575    /// Generic helper to build a list column with any inner builder type.
1576    fn build_typed_list<F, B, A>(
1577        &self,
1578        len: usize,
1579        deleted: &[bool],
1580        get_props: &F,
1581        inner_builder: B,
1582        mut append_value: A,
1583    ) -> Result<ArrayRef>
1584    where
1585        F: Fn(usize) -> Option<&'a Value>,
1586        B: arrow_array::builder::ArrayBuilder,
1587        A: FnMut(&Value, &mut B),
1588    {
1589        let mut builder = ListBuilder::new(inner_builder);
1590        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1591            let val_array = get_props(i).and_then(|v| v.as_array());
1592            if val_array.is_none() && is_deleted {
1593                builder.append_null();
1594            } else if let Some(arr) = val_array {
1595                for v in arr {
1596                    append_value(v, builder.values());
1597                }
1598                builder.append(true);
1599            } else {
1600                builder.append_null();
1601            }
1602        }
1603        Ok(Arc::new(builder.finish()))
1604    }
1605
1606    fn build_map_column<F>(
1607        &self,
1608        len: usize,
1609        deleted: &[bool],
1610        get_props: F,
1611        key: &DataType,
1612        value: &DataType,
1613    ) -> Result<ArrayRef>
1614    where
1615        F: Fn(usize) -> Option<&'a Value>,
1616    {
1617        if !matches!(key, DataType::String) {
1618            return Err(anyhow!("Map keys must be String (JSON limitation)"));
1619        }
1620
1621        match value {
1622            DataType::String => self.build_typed_map(
1623                len,
1624                deleted,
1625                &get_props,
1626                StringBuilder::new(),
1627                arrow_schema::DataType::Utf8,
1628                None,
1629                |v, b: &mut StringBuilder| {
1630                    if let Some(s) = v.as_str() {
1631                        b.append_value(s);
1632                    } else {
1633                        b.append_null();
1634                    }
1635                },
1636            ),
1637            DataType::Int64 => self.build_typed_map(
1638                len,
1639                deleted,
1640                &get_props,
1641                Int64Builder::new(),
1642                arrow_schema::DataType::Int64,
1643                None,
1644                |v, b: &mut Int64Builder| {
1645                    if let Some(n) = v.as_i64() {
1646                        b.append_value(n);
1647                    } else {
1648                        b.append_null();
1649                    }
1650                },
1651            ),
1652            DataType::Bytes => self.build_typed_map(
1653                len,
1654                deleted,
1655                &get_props,
1656                LargeBinaryBuilder::new(),
1657                arrow_schema::DataType::LargeBinary,
1658                // Mark the value child `uni_raw_bytes` so the read path decodes each
1659                // raw `Bytes` value verbatim rather than through the tagged codec.
1660                Some(schema::raw_bytes_field_metadata()),
1661                |v, b: &mut LargeBinaryBuilder| {
1662                    if let Value::Bytes(bytes) = v {
1663                        b.append_value(bytes);
1664                    } else {
1665                        b.append_null();
1666                    }
1667                },
1668            ),
1669            _ => Err(anyhow!("Unsupported value type for Map: {:?}", value)),
1670        }
1671    }
1672
1673    /// Generic helper to build a map column with any value builder type.
1674    #[expect(
1675        clippy::too_many_arguments,
1676        reason = "builder plumbing: value type + optional child metadata are distinct knobs"
1677    )]
1678    fn build_typed_map<F, B, A>(
1679        &self,
1680        len: usize,
1681        deleted: &[bool],
1682        get_props: &F,
1683        value_builder: B,
1684        value_arrow_type: arrow_schema::DataType,
1685        value_metadata: Option<HashMap<String, String>>,
1686        mut append_value: A,
1687    ) -> Result<ArrayRef>
1688    where
1689        F: Fn(usize) -> Option<&'a Value>,
1690        B: arrow_array::builder::ArrayBuilder,
1691        A: FnMut(&Value, &mut B),
1692    {
1693        let key_builder = Box::new(StringBuilder::new());
1694        let value_builder = Box::new(value_builder);
1695        let value_field = match value_metadata {
1696            Some(meta) => Field::new("value", value_arrow_type, true).with_metadata(meta),
1697            None => Field::new("value", value_arrow_type, true),
1698        };
1699        let struct_builder = StructBuilder::new(
1700            vec![
1701                Field::new("key", arrow_schema::DataType::Utf8, false),
1702                value_field,
1703            ],
1704            vec![key_builder, value_builder],
1705        );
1706        let mut builder = ListBuilder::new(struct_builder);
1707
1708        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1709            self.append_map_entry(&mut builder, get_props(i), is_deleted, &mut append_value);
1710        }
1711        Ok(Arc::new(builder.finish()))
1712    }
1713
1714    /// Append a single map entry to the list builder.
1715    fn append_map_entry<B, A>(
1716        &self,
1717        builder: &mut ListBuilder<StructBuilder>,
1718        val: Option<&'a Value>,
1719        is_deleted: bool,
1720        append_value: &mut A,
1721    ) where
1722        B: arrow_array::builder::ArrayBuilder,
1723        A: FnMut(&Value, &mut B),
1724    {
1725        let val_obj = val.and_then(|v| v.as_object());
1726        if val_obj.is_none() && is_deleted {
1727            builder.append(false);
1728        } else if let Some(obj) = val_obj {
1729            let struct_b = builder.values();
1730            for (k, v) in obj {
1731                struct_b
1732                    .field_builder::<StringBuilder>(0)
1733                    .unwrap()
1734                    .append_value(k);
1735                // Safety: We know the value builder type matches B
1736                let value_b = struct_b.field_builder::<B>(1).unwrap();
1737                append_value(v, value_b);
1738                struct_b.append(true);
1739            }
1740            builder.append(true);
1741        } else {
1742            builder.append(false);
1743        }
1744    }
1745
1746    fn build_crdt_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1747    where
1748        F: Fn(usize) -> Option<&'a Value>,
1749    {
1750        let mut builder = BinaryBuilder::new();
1751        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1752            if is_deleted {
1753                builder.append_null();
1754                continue;
1755            }
1756            if let Some(val) = get_props(i) {
1757                // Try to parse CRDT from the value
1758                // If it's a string, first parse it as JSON, then as CRDT
1759                let crdt_result = if let Some(s) = val.as_str() {
1760                    serde_json::from_str::<Crdt>(s)
1761                } else {
1762                    // Convert uni_common::Value to serde_json::Value at the CRDT boundary
1763                    let json_val: serde_json::Value = val.clone().into();
1764                    serde_json::from_value::<Crdt>(json_val)
1765                };
1766
1767                if let Ok(crdt) = crdt_result {
1768                    if let Ok(bytes) = crdt.to_msgpack() {
1769                        builder.append_value(&bytes);
1770                    } else {
1771                        builder.append_null();
1772                    }
1773                } else {
1774                    builder.append_null();
1775                }
1776            } else {
1777                builder.append_null();
1778            }
1779        }
1780        Ok(Arc::new(builder.finish()))
1781    }
1782}
1783
1784/// Build a column for edge entries (no deleted flag handling needed).
1785pub fn build_edge_column<'a>(
1786    name: &'a str,
1787    data_type: &'a DataType,
1788    len: usize,
1789    get_props: impl Fn(usize) -> Option<&'a Value>,
1790) -> Result<ArrayRef> {
1791    // For edges, use empty deleted array
1792    let deleted = vec![false; len];
1793    let extractor = PropertyExtractor::new(name, data_type);
1794    extractor.build_column(len, &deleted, get_props)
1795}
1796
1797#[cfg(test)]
1798mod tests {
1799    use super::*;
1800    use arrow_array::{
1801        Array, DurationMicrosecondArray,
1802        builder::{BinaryBuilder, Time64MicrosecondBuilder, TimestampNanosecondBuilder},
1803    };
1804    use std::collections::HashMap;
1805    use uni_common::TemporalValue;
1806    use uni_crdt::{Crdt, GCounter};
1807
1808    #[test]
1809    fn test_arrow_to_value_string() {
1810        let arr = StringArray::from(vec![Some("hello"), None, Some("world")]);
1811        assert_eq!(
1812            arrow_to_value(&arr, 0, None),
1813            Value::String("hello".to_string())
1814        );
1815        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1816        assert_eq!(
1817            arrow_to_value(&arr, 2, None),
1818            Value::String("world".to_string())
1819        );
1820    }
1821
1822    #[test]
1823    fn test_arrow_to_value_int64() {
1824        let arr = Int64Array::from(vec![Some(42), None, Some(-10)]);
1825        assert_eq!(arrow_to_value(&arr, 0, None), Value::Int(42));
1826        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1827        assert_eq!(arrow_to_value(&arr, 2, None), Value::Int(-10));
1828    }
1829
1830    #[test]
1831    #[allow(clippy::approx_constant)]
1832    fn test_arrow_to_value_float64() {
1833        let arr = Float64Array::from(vec![Some(3.14), None]);
1834        assert_eq!(arrow_to_value(&arr, 0, None), Value::Float(3.14));
1835        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1836    }
1837
1838    #[test]
1839    fn test_arrow_to_value_bool() {
1840        let arr = BooleanArray::from(vec![Some(true), Some(false), None]);
1841        assert_eq!(arrow_to_value(&arr, 0, None), Value::Bool(true));
1842        assert_eq!(arrow_to_value(&arr, 1, None), Value::Bool(false));
1843        assert_eq!(arrow_to_value(&arr, 2, None), Value::Null);
1844    }
1845
1846    #[test]
1847    fn test_values_to_array_int64() {
1848        let values = vec![Value::Int(1), Value::Int(2), Value::Null, Value::Int(4)];
1849        let arr = values_to_array(&values, &ArrowDataType::Int64).unwrap();
1850        assert_eq!(arr.len(), 4);
1851
1852        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1853        assert_eq!(int_arr.value(0), 1);
1854        assert_eq!(int_arr.value(1), 2);
1855        assert!(int_arr.is_null(2));
1856        assert_eq!(int_arr.value(3), 4);
1857    }
1858
1859    #[test]
1860    fn test_values_to_array_string() {
1861        let values = vec![
1862            Value::String("a".to_string()),
1863            Value::String("b".to_string()),
1864            Value::Null,
1865        ];
1866        let arr = values_to_array(&values, &ArrowDataType::Utf8).unwrap();
1867        assert_eq!(arr.len(), 3);
1868
1869        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1870        assert_eq!(str_arr.value(0), "a");
1871        assert_eq!(str_arr.value(1), "b");
1872        assert!(str_arr.is_null(2));
1873    }
1874
1875    #[test]
1876    fn test_property_extractor_string() {
1877        let props: Vec<HashMap<String, Value>> = vec![
1878            [("name".to_string(), Value::String("Alice".to_string()))]
1879                .into_iter()
1880                .collect(),
1881            [("name".to_string(), Value::String("Bob".to_string()))]
1882                .into_iter()
1883                .collect(),
1884            HashMap::new(),
1885        ];
1886        let deleted = vec![false, false, true];
1887
1888        let extractor = PropertyExtractor::new("name", &DataType::String);
1889        let arr = extractor
1890            .build_column(3, &deleted, |i| props[i].get("name"))
1891            .unwrap();
1892
1893        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1894        assert_eq!(str_arr.value(0), "Alice");
1895        assert_eq!(str_arr.value(1), "Bob");
1896        assert_eq!(str_arr.value(2), ""); // Deleted entries get default
1897    }
1898
1899    #[test]
1900    fn test_property_extractor_int64() {
1901        let props: Vec<HashMap<String, Value>> = vec![
1902            [("age".to_string(), Value::Int(25))].into_iter().collect(),
1903            [("age".to_string(), Value::Int(30))].into_iter().collect(),
1904            HashMap::new(),
1905        ];
1906        let deleted = vec![false, false, true];
1907
1908        let extractor = PropertyExtractor::new("age", &DataType::Int64);
1909        let arr = extractor
1910            .build_column(3, &deleted, |i| props[i].get("age"))
1911            .unwrap();
1912
1913        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1914        assert_eq!(int_arr.value(0), 25);
1915        assert_eq!(int_arr.value(1), 30);
1916        assert_eq!(int_arr.value(2), 0); // Deleted entries get default
1917    }
1918
1919    #[test]
1920    fn test_property_extractor_bytes_roundtrip() {
1921        let blob = vec![0u8, 1, 2, 255];
1922        let props: Vec<HashMap<String, Value>> = vec![
1923            [("blob".to_string(), Value::Bytes(blob.clone()))]
1924                .into_iter()
1925                .collect(),
1926            [("blob".to_string(), Value::Bytes(Vec::new()))]
1927                .into_iter()
1928                .collect(),
1929            HashMap::new(),
1930        ];
1931        let deleted = vec![false, false, false];
1932
1933        let extractor = PropertyExtractor::new("blob", &DataType::Bytes);
1934        let arr = extractor
1935            .build_column(3, &deleted, |i| props[i].get("blob"))
1936            .unwrap();
1937
1938        // Decode each row through arrow_to_value with Bytes hint.
1939        assert_eq!(
1940            arrow_to_value(arr.as_ref(), 0, Some(&DataType::Bytes)),
1941            Value::Bytes(blob)
1942        );
1943        assert_eq!(
1944            arrow_to_value(arr.as_ref(), 1, Some(&DataType::Bytes)),
1945            Value::Bytes(Vec::new())
1946        );
1947        // Missing property → null in the Arrow array.
1948        assert_eq!(
1949            arrow_to_value(arr.as_ref(), 2, Some(&DataType::Bytes)),
1950            Value::Null
1951        );
1952    }
1953
1954    #[test]
1955    fn test_bytes_vs_cypher_value_disambiguation() {
1956        // A LargeBinary column tagged as DataType::Bytes must NOT be decoded
1957        // through the CypherValue MessagePack codec, even though both share
1958        // the same Arrow physical type.
1959        let raw = vec![0xDEu8, 0xAD, 0xBE, 0xEF];
1960        let props: Vec<HashMap<String, Value>> = vec![
1961            [("blob".to_string(), Value::Bytes(raw.clone()))]
1962                .into_iter()
1963                .collect(),
1964        ];
1965        let extractor = PropertyExtractor::new("blob", &DataType::Bytes);
1966        let arr = extractor
1967            .build_column(1, &[false], |i| props[i].get("blob"))
1968            .unwrap();
1969        // With schema hint: raw bytes returned.
1970        assert_eq!(
1971            arrow_to_value(arr.as_ref(), 0, Some(&DataType::Bytes)),
1972            Value::Bytes(raw)
1973        );
1974    }
1975
1976    #[test]
1977    fn test_data_type_bytes_to_arrow() {
1978        assert_eq!(DataType::Bytes.to_arrow(), ArrowDataType::LargeBinary);
1979    }
1980
1981    #[test]
1982    fn test_arrow_to_value_time64() {
1983        // Test Time64MicrosecondArray legacy fallback (micros→nanos conversion)
1984        let mut builder = Time64MicrosecondBuilder::new();
1985        // 10:30:45 = 10*3600 + 30*60 + 45 = 37845 seconds = 37845000000 microseconds
1986        builder.append_value(37_845_000_000);
1987        // 00:00:00 = 0 microseconds
1988        builder.append_value(0);
1989        // 23:59:59.123456 = 86399.123456 seconds
1990        builder.append_value(86_399_123_456);
1991        builder.append_null();
1992
1993        let arr = builder.finish();
1994        // Arrow→Value returns Value::Temporal(LocalTime) with nanos (micros * 1000)
1995        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "10:30:45");
1996        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "00:00");
1997        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "23:59:59.123456");
1998        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
1999    }
2000
2001    #[test]
2002    fn test_arrow_to_value_duration() {
2003        // Test DurationMicrosecondArray conversion
2004        // Arrow→Value now returns Value::Temporal(Duration)
2005        let arr = DurationMicrosecondArray::from(vec![
2006            Some(1_000_000),      // 1 second in microseconds
2007            Some(3_600_000_000),  // 1 hour
2008            Some(86_400_000_000), // 1 day
2009            None,
2010        ]);
2011
2012        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "PT1S");
2013        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "PT1H");
2014        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "PT24H");
2015        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
2016    }
2017
2018    #[test]
2019    fn test_arrow_to_value_binary_crdt() {
2020        // Test BinaryArray (CRDT) conversion - round-trip test
2021        let mut builder = BinaryBuilder::new();
2022
2023        // Create a GCounter CRDT and serialize it
2024        let mut counter = GCounter::new();
2025        counter.increment("actor1", 5);
2026        let crdt = Crdt::GCounter(counter);
2027        let bytes = crdt.to_msgpack().unwrap();
2028        builder.append_value(&bytes);
2029
2030        // Add a null value
2031        builder.append_null();
2032
2033        let arr = builder.finish();
2034
2035        // The first value should deserialize back to a map
2036        let result = arrow_to_value(&arr, 0, None);
2037        assert!(result.as_object().is_some());
2038        let obj = result.as_object().unwrap();
2039        // GCounter serializes with tag "t": "gc"
2040        assert_eq!(obj.get("t"), Some(&Value::String("gc".to_string())));
2041
2042        // Null value should return null
2043        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
2044    }
2045
2046    #[test]
2047    fn test_datetime_struct_encode_decode_roundtrip() {
2048        // Test DateTime struct encoding with offset and timezone preservation
2049        let values = vec![
2050            Value::Temporal(TemporalValue::DateTime {
2051                nanos_since_epoch: 441763200000000000, // 1984-01-01T00:00:00Z
2052                offset_seconds: 3600,                  // +01:00
2053                timezone_name: Some("Europe/Paris".to_string()),
2054            }),
2055            Value::Temporal(TemporalValue::DateTime {
2056                nanos_since_epoch: 1704067200000000000, // 2024-01-01T00:00:00Z
2057                offset_seconds: -18000,                 // -05:00
2058                timezone_name: None,
2059            }),
2060            Value::Temporal(TemporalValue::DateTime {
2061                nanos_since_epoch: 0, // Unix epoch
2062                offset_seconds: 0,
2063                timezone_name: Some("UTC".to_string()),
2064            }),
2065        ];
2066
2067        // Encode to Arrow struct
2068        let arr_ref = values_to_datetime_struct_array(&values);
2069        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2070        assert_eq!(arr.len(), 3);
2071
2072        // Decode back to Value
2073        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
2074        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
2075        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
2076
2077        // Verify round-trip preserves all fields
2078        assert_eq!(decoded_0, values[0]);
2079        assert_eq!(decoded_1, values[1]);
2080        assert_eq!(decoded_2, values[2]);
2081
2082        // Verify struct field extraction
2083        if let Value::Temporal(TemporalValue::DateTime {
2084            nanos_since_epoch,
2085            offset_seconds,
2086            timezone_name,
2087        }) = decoded_0
2088        {
2089            assert_eq!(nanos_since_epoch, 441763200000000000);
2090            assert_eq!(offset_seconds, 3600);
2091            assert_eq!(timezone_name, Some("Europe/Paris".to_string()));
2092        } else {
2093            panic!("Expected DateTime value");
2094        }
2095    }
2096
2097    #[test]
2098    fn test_datetime_struct_null_handling() {
2099        // Test DateTime struct with null values
2100        let values = vec![
2101            Value::Temporal(TemporalValue::DateTime {
2102                nanos_since_epoch: 441763200000000000,
2103                offset_seconds: 3600,
2104                timezone_name: Some("Europe/Paris".to_string()),
2105            }),
2106            Value::Null,
2107            Value::Temporal(TemporalValue::DateTime {
2108                nanos_since_epoch: 0,
2109                offset_seconds: 0,
2110                timezone_name: None,
2111            }),
2112        ];
2113
2114        let arr_ref = values_to_datetime_struct_array(&values);
2115        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2116        assert_eq!(arr.len(), 3);
2117
2118        // Check first value is valid
2119        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
2120        assert_eq!(decoded_0, values[0]);
2121
2122        // Check second value is null
2123        assert!(arr.is_null(1));
2124        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
2125        assert_eq!(decoded_1, Value::Null);
2126
2127        // Check third value is valid
2128        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
2129        assert_eq!(decoded_2, values[2]);
2130    }
2131
2132    #[test]
2133    fn test_datetime_struct_boundary_values() {
2134        // Test boundary values: offset=0, large positive/negative offsets
2135        let values = vec![
2136            Value::Temporal(TemporalValue::DateTime {
2137                nanos_since_epoch: 441763200000000000,
2138                offset_seconds: 0, // UTC
2139                timezone_name: None,
2140            }),
2141            Value::Temporal(TemporalValue::DateTime {
2142                nanos_since_epoch: 441763200000000000,
2143                offset_seconds: 43200, // +12:00 (max typical offset)
2144                timezone_name: None,
2145            }),
2146            Value::Temporal(TemporalValue::DateTime {
2147                nanos_since_epoch: 441763200000000000,
2148                offset_seconds: -43200, // -12:00 (min typical offset)
2149                timezone_name: None,
2150            }),
2151        ];
2152
2153        let arr_ref = values_to_datetime_struct_array(&values);
2154        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2155        assert_eq!(arr.len(), 3);
2156
2157        // Verify round-trip for all boundary values
2158        for (i, expected) in values.iter().enumerate() {
2159            let decoded = arrow_to_value(arr_ref.as_ref(), i, Some(&DataType::DateTime));
2160            assert_eq!(&decoded, expected);
2161        }
2162    }
2163
2164    #[test]
2165    fn test_datetime_old_schema_migration() {
2166        // Test backward compatibility: TimestampNanosecondArray → DateTime with offset=0
2167        let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
2168        builder.append_value(441763200000000000); // 1984-01-01T00:00:00Z
2169        builder.append_value(1704067200000000000); // 2024-01-01T00:00:00Z
2170        builder.append_null();
2171
2172        let arr = builder.finish();
2173
2174        // Decode with DataType::DateTime hint should migrate old schema
2175        let decoded_0 = arrow_to_value(&arr, 0, Some(&DataType::DateTime));
2176        let _decoded_1 = arrow_to_value(&arr, 1, Some(&DataType::DateTime));
2177        let decoded_2 = arrow_to_value(&arr, 2, Some(&DataType::DateTime));
2178
2179        // Old schema should default to offset=0, preserve timezone
2180        if let Value::Temporal(TemporalValue::DateTime {
2181            nanos_since_epoch,
2182            offset_seconds,
2183            timezone_name,
2184        }) = decoded_0
2185        {
2186            assert_eq!(nanos_since_epoch, 441763200000000000);
2187            assert_eq!(offset_seconds, 0);
2188            assert_eq!(timezone_name, Some("UTC".to_string()));
2189        } else {
2190            panic!("Expected DateTime value");
2191        }
2192
2193        // Verify null handling
2194        assert_eq!(decoded_2, Value::Null);
2195    }
2196
2197    #[test]
2198    fn test_time_struct_encode_decode_roundtrip() {
2199        // Test Time struct encoding with offset preservation
2200        let values = vec![
2201            Value::Temporal(TemporalValue::Time {
2202                nanos_since_midnight: 37845000000000, // 10:30:45
2203                offset_seconds: 3600,                 // +01:00
2204            }),
2205            Value::Temporal(TemporalValue::Time {
2206                nanos_since_midnight: 0, // 00:00:00
2207                offset_seconds: 0,
2208            }),
2209            Value::Temporal(TemporalValue::Time {
2210                nanos_since_midnight: 86399999999999, // 23:59:59.999999999
2211                offset_seconds: -18000,               // -05:00
2212            }),
2213        ];
2214
2215        // Encode to Arrow struct
2216        let arr_ref = values_to_time_struct_array(&values);
2217        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2218        assert_eq!(arr.len(), 3);
2219
2220        // Decode back to Value
2221        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2222        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2223        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2224
2225        // Verify round-trip preserves all fields
2226        assert_eq!(decoded_0, values[0]);
2227        assert_eq!(decoded_1, values[1]);
2228        assert_eq!(decoded_2, values[2]);
2229
2230        // Verify struct field extraction
2231        if let Value::Temporal(TemporalValue::Time {
2232            nanos_since_midnight,
2233            offset_seconds,
2234        }) = decoded_0
2235        {
2236            assert_eq!(nanos_since_midnight, 37845000000000);
2237            assert_eq!(offset_seconds, 3600);
2238        } else {
2239            panic!("Expected Time value");
2240        }
2241    }
2242
2243    #[test]
2244    fn test_time_struct_null_handling() {
2245        // Test Time struct with null values
2246        let values = vec![
2247            Value::Temporal(TemporalValue::Time {
2248                nanos_since_midnight: 37845000000000,
2249                offset_seconds: 3600,
2250            }),
2251            Value::Null,
2252            Value::Temporal(TemporalValue::Time {
2253                nanos_since_midnight: 0,
2254                offset_seconds: 0,
2255            }),
2256        ];
2257
2258        let arr_ref = values_to_time_struct_array(&values);
2259        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2260        assert_eq!(arr.len(), 3);
2261
2262        // Check first value is valid
2263        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2264        assert_eq!(decoded_0, values[0]);
2265
2266        // Check second value is null
2267        assert!(arr.is_null(1));
2268        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2269        assert_eq!(decoded_1, Value::Null);
2270
2271        // Check third value is valid
2272        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2273        assert_eq!(decoded_2, values[2]);
2274    }
2275
2276    // Tests for extract_vector_f32_values
2277
2278    #[test]
2279    fn test_extract_vector_f32_values_valid_vector() {
2280        let v = vec![1.0, 2.0, 3.0];
2281        let val = Value::Vector(v.clone());
2282        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2283        assert_eq!(result, v);
2284        assert!(valid);
2285    }
2286
2287    #[test]
2288    fn test_extract_vector_f32_values_vector_wrong_dims() {
2289        let v = vec![1.0, 2.0];
2290        let val = Value::Vector(v);
2291        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2292        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2293        assert!(!valid);
2294    }
2295
2296    #[test]
2297    fn test_extract_vector_f32_values_valid_list() {
2298        let v = vec![Value::Float(1.0), Value::Float(2.0), Value::Float(3.0)];
2299        let val = Value::List(v);
2300        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2301        assert_eq!(result, vec![1.0, 2.0, 3.0]);
2302        assert!(valid);
2303    }
2304
2305    #[test]
2306    fn test_extract_vector_f32_values_list_wrong_dims() {
2307        let v = vec![Value::Float(1.0), Value::Float(2.0)];
2308        let val = Value::List(v);
2309        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2310        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2311        assert!(!valid);
2312    }
2313
2314    #[test]
2315    fn test_extract_vector_f32_values_list_int_coercion() {
2316        let v = vec![Value::Int(1), Value::Int(2), Value::Int(3)];
2317        let val = Value::List(v);
2318        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2319        assert_eq!(result, vec![1.0, 2.0, 3.0]);
2320        assert!(valid);
2321    }
2322
2323    #[test]
2324    fn test_extract_vector_f32_values_none() {
2325        let (result, valid) = extract_vector_f32_values(None, false, 3);
2326        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2327        assert!(!valid);
2328    }
2329
2330    #[test]
2331    fn test_extract_vector_f32_values_null() {
2332        let val = Value::Null;
2333        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2334        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2335        assert!(!valid);
2336    }
2337
2338    #[test]
2339    fn test_extract_vector_f32_values_unsupported_type() {
2340        let val = Value::String("not a vector".to_string());
2341        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2342        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2343        assert!(!valid);
2344    }
2345
2346    #[test]
2347    fn test_extract_vector_f32_values_deleted_with_none() {
2348        let (result, valid) = extract_vector_f32_values(None, true, 3);
2349        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2350        assert!(valid); // Deleted entries are marked as valid with zeros
2351    }
2352
2353    #[test]
2354    fn test_extract_vector_f32_values_deleted_with_null() {
2355        let val = Value::Null;
2356        let (result, valid) = extract_vector_f32_values(Some(&val), true, 3);
2357        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2358        assert!(valid); // Deleted entries are marked as valid with zeros
2359    }
2360
2361    // Tests for values_to_array with FixedSizeList
2362
2363    #[test]
2364    fn test_values_to_fixed_size_list_vector_with_nulls() {
2365        let values = vec![
2366            Value::Vector(vec![1.0, 2.0]),
2367            Value::Null,
2368            Value::Vector(vec![3.0, 4.0]),
2369            Value::String("invalid".to_string()),
2370        ];
2371        let arr_ref = values_to_array(
2372            &values,
2373            &ArrowDataType::FixedSizeList(
2374                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2375                2,
2376            ),
2377        )
2378        .unwrap();
2379
2380        let arr = arr_ref
2381            .as_any()
2382            .downcast_ref::<FixedSizeListArray>()
2383            .unwrap();
2384
2385        assert_eq!(arr.len(), 4);
2386        assert!(arr.is_valid(0));
2387        assert!(!arr.is_valid(1)); // Null value
2388        assert!(arr.is_valid(2));
2389        assert!(!arr.is_valid(3)); // Invalid type
2390    }
2391
2392    #[test]
2393    fn test_values_to_fixed_size_list_from_list() {
2394        let values = vec![
2395            Value::List(vec![Value::Float(1.0), Value::Float(2.0)]),
2396            Value::List(vec![Value::Int(3), Value::Int(4)]),
2397        ];
2398        let arr_ref = values_to_array(
2399            &values,
2400            &ArrowDataType::FixedSizeList(
2401                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2402                2,
2403            ),
2404        )
2405        .unwrap();
2406
2407        let arr = arr_ref
2408            .as_any()
2409            .downcast_ref::<FixedSizeListArray>()
2410            .unwrap();
2411
2412        assert_eq!(arr.len(), 2);
2413        assert!(arr.is_valid(0));
2414        assert!(arr.is_valid(1));
2415
2416        // Check values
2417        let child = arr
2418            .values()
2419            .as_any()
2420            .downcast_ref::<Float32Array>()
2421            .unwrap();
2422        assert_eq!(child.value(0), 1.0);
2423        assert_eq!(child.value(1), 2.0);
2424        assert_eq!(child.value(2), 3.0);
2425        assert_eq!(child.value(3), 4.0);
2426    }
2427
2428    #[test]
2429    fn test_values_to_fixed_size_list_wrong_dimensions() {
2430        let values = vec![
2431            Value::Vector(vec![1.0, 2.0, 3.0]),   // 3 dims, expecting 2
2432            Value::List(vec![Value::Float(4.0)]), // 1 dim, expecting 2
2433        ];
2434        let arr_ref = values_to_array(
2435            &values,
2436            &ArrowDataType::FixedSizeList(
2437                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2438                2,
2439            ),
2440        )
2441        .unwrap();
2442
2443        let arr = arr_ref
2444            .as_any()
2445            .downcast_ref::<FixedSizeListArray>()
2446            .unwrap();
2447
2448        assert_eq!(arr.len(), 2);
2449        assert!(!arr.is_valid(0)); // Wrong dimensions
2450        assert!(!arr.is_valid(1)); // Wrong dimensions
2451
2452        // Check that child array has zeros for invalid entries
2453        let child = arr
2454            .values()
2455            .as_any()
2456            .downcast_ref::<Float32Array>()
2457            .unwrap();
2458        assert_eq!(child.value(0), 0.0);
2459        assert_eq!(child.value(1), 0.0);
2460        assert_eq!(child.value(2), 0.0);
2461        assert_eq!(child.value(3), 0.0);
2462    }
2463
2464    #[test]
2465    fn test_values_to_fixed_size_list_all_nulls() {
2466        let values = vec![Value::Null, Value::Null, Value::Null];
2467        let arr_ref = values_to_array(
2468            &values,
2469            &ArrowDataType::FixedSizeList(
2470                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2471                3,
2472            ),
2473        )
2474        .unwrap();
2475
2476        let arr = arr_ref
2477            .as_any()
2478            .downcast_ref::<FixedSizeListArray>()
2479            .unwrap();
2480
2481        assert_eq!(arr.len(), 3);
2482        assert!(!arr.is_valid(0));
2483        assert!(!arr.is_valid(1));
2484        assert!(!arr.is_valid(2));
2485
2486        // Verify child array length is correct (3 rows × 3 dims = 9)
2487        let child = arr
2488            .values()
2489            .as_any()
2490            .downcast_ref::<Float32Array>()
2491            .unwrap();
2492        assert_eq!(child.len(), 9);
2493    }
2494
2495    #[test]
2496    fn test_values_to_fixed_size_list_mixed_types() {
2497        let values = vec![
2498            Value::Vector(vec![1.0, 2.0]),
2499            Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
2500            Value::Null,
2501            Value::String("invalid".to_string()),
2502        ];
2503        let arr_ref = values_to_array(
2504            &values,
2505            &ArrowDataType::FixedSizeList(
2506                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2507                2,
2508            ),
2509        )
2510        .unwrap();
2511
2512        let arr = arr_ref
2513            .as_any()
2514            .downcast_ref::<FixedSizeListArray>()
2515            .unwrap();
2516
2517        assert_eq!(arr.len(), 4);
2518        assert!(arr.is_valid(0)); // Value::Vector
2519        assert!(arr.is_valid(1)); // Value::List
2520        assert!(!arr.is_valid(2)); // Value::Null
2521        assert!(!arr.is_valid(3)); // Value::String
2522
2523        // Check values for valid entries
2524        let child = arr
2525            .values()
2526            .as_any()
2527            .downcast_ref::<Float32Array>()
2528            .unwrap();
2529        assert_eq!(child.value(0), 1.0);
2530        assert_eq!(child.value(1), 2.0);
2531        assert_eq!(child.value(2), 3.0);
2532        assert_eq!(child.value(3), 4.0);
2533    }
2534
2535    // Tests for PropertyExtractor::build_vector_column
2536
2537    #[test]
2538    fn test_build_vector_column_with_nulls_and_deleted() {
2539        let data_type = DataType::Vector { dimensions: 3 };
2540        let extractor = PropertyExtractor::new("test_vec", &data_type);
2541
2542        let props = [
2543            Some(Value::Vector(vec![1.0, 2.0, 3.0])),
2544            None,              // Missing property
2545            Some(Value::Null), // Null value
2546            Some(Value::Vector(vec![4.0, 5.0, 6.0])),
2547        ];
2548        let deleted = [false, false, false, true]; // Last one is deleted
2549
2550        let arr_ref = extractor
2551            .build_vector_column(4, &deleted, |i| props[i].as_ref(), 3)
2552            .unwrap();
2553
2554        let arr = arr_ref
2555            .as_any()
2556            .downcast_ref::<FixedSizeListArray>()
2557            .unwrap();
2558
2559        assert_eq!(arr.len(), 4);
2560        assert!(arr.is_valid(0)); // Valid vector
2561        assert!(!arr.is_valid(1)); // Missing property
2562        assert!(!arr.is_valid(2)); // Null value
2563        assert!(arr.is_valid(3)); // Deleted entry (valid with zeros)
2564
2565        // Check values
2566        let child = arr
2567            .values()
2568            .as_any()
2569            .downcast_ref::<Float32Array>()
2570            .unwrap();
2571        assert_eq!(child.value(0), 1.0);
2572        assert_eq!(child.value(1), 2.0);
2573        assert_eq!(child.value(2), 3.0);
2574        // Indices 3-5: zeros for missing
2575        // Indices 6-8: zeros for null
2576        // Indices 9-11: zeros for deleted (but marked as valid)
2577        assert_eq!(child.value(9), 0.0);
2578        assert_eq!(child.value(10), 0.0);
2579        assert_eq!(child.value(11), 0.0);
2580    }
2581
2582    #[test]
2583    fn test_build_vector_column_with_list_input() {
2584        let data_type = DataType::Vector { dimensions: 2 };
2585        let extractor = PropertyExtractor::new("test_vec", &data_type);
2586
2587        let props = [
2588            Some(Value::List(vec![Value::Float(1.0), Value::Float(2.0)])),
2589            Some(Value::List(vec![Value::Int(3), Value::Int(4)])),
2590            Some(Value::Vector(vec![5.0, 6.0])),
2591        ];
2592        let deleted = [false, false, false];
2593
2594        let arr_ref = extractor
2595            .build_vector_column(3, &deleted, |i| props[i].as_ref(), 2)
2596            .unwrap();
2597
2598        let arr = arr_ref
2599            .as_any()
2600            .downcast_ref::<FixedSizeListArray>()
2601            .unwrap();
2602
2603        assert_eq!(arr.len(), 3);
2604        assert!(arr.is_valid(0));
2605        assert!(arr.is_valid(1));
2606        assert!(arr.is_valid(2));
2607
2608        // Check values
2609        let child = arr
2610            .values()
2611            .as_any()
2612            .downcast_ref::<Float32Array>()
2613            .unwrap();
2614        assert_eq!(child.value(0), 1.0);
2615        assert_eq!(child.value(1), 2.0);
2616        assert_eq!(child.value(2), 3.0);
2617        assert_eq!(child.value(3), 4.0);
2618        assert_eq!(child.value(4), 5.0);
2619        assert_eq!(child.value(5), 6.0);
2620    }
2621
2622    /// H13: out-of-range i64 values must become NULL in i32/date32 columns
2623    /// instead of silently wrapping to a different number/date.
2624    #[test]
2625    fn test_int32_and_date32_columns_null_out_of_range() {
2626        let dt = DataType::Int64;
2627        let extractor = PropertyExtractor::new("x", &dt);
2628
2629        let over = Value::Int(i64::from(i32::MAX) + 1);
2630        let ok = Value::Int(42);
2631        let vals = [over, ok];
2632        let deleted = [false, false];
2633
2634        let arr = extractor
2635            .build_int32_column(2, &deleted, |i| Some(&vals[i]))
2636            .unwrap();
2637        let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
2638        assert!(
2639            arr.is_null(0),
2640            "out-of-range i64 must be NULL in an int32 column, not wrapped"
2641        );
2642        assert_eq!(arr.value(1), 42);
2643
2644        let arr = extractor
2645            .build_date32_column(2, &deleted, |i| Some(&vals[i]))
2646            .unwrap();
2647        let arr = arr.as_any().downcast_ref::<Date32Array>().unwrap();
2648        assert!(
2649            arr.is_null(0),
2650            "out-of-range day count must be NULL in a date32 column, not wrapped"
2651        );
2652        assert_eq!(arr.value(1), 42);
2653    }
2654}