Skip to main content

uni_store/storage/
arrow_convert.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Arrow type conversion utilities for reducing cognitive complexity.
5//!
6//! This module provides shared helper functions and macros for converting
7//! between Arrow arrays and JSON Values, reducing code duplication across
8//! vertex.rs, delta.rs, and executor.rs.
9
10use anyhow::{Result, anyhow};
11use arrow_array::builder::{
12    BinaryBuilder, BooleanBufferBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
13    FixedSizeBinaryBuilder, FixedSizeListBuilder, Float32Builder, Float64Builder, Int32Builder,
14    Int64Builder, IntervalMonthDayNanoBuilder, LargeBinaryBuilder, ListBuilder, StringBuilder,
15    StructBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder,
16    UInt32Builder, UInt64Builder,
17};
18use arrow_array::{
19    Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, FixedSizeBinaryArray,
20    FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array,
21    IntervalMonthDayNanoArray, LargeBinaryArray, ListArray, StringArray, StructArray,
22    Time64NanosecondArray, TimestampNanosecondArray, UInt32Array, UInt64Array,
23};
24use arrow_schema::{DataType as ArrowDataType, Field};
25use std::collections::HashMap;
26use std::sync::Arc;
27use uni_common::DataType;
28use uni_common::Value;
29use uni_common::core::id::{Eid, Vid};
30use uni_common::core::schema;
31use uni_crdt::Crdt;
32
33/// Build a timestamp column from a map of ID -> timestamp (nanoseconds).
34///
35/// Shared utility for building `_created_at` and `_updated_at` columns
36/// in vertex and edge tables. Works with any hashable ID type (Vid, Eid, etc.).
37fn build_timestamp_column_from_id_map<K, I>(
38    ids: I,
39    timestamps: Option<&HashMap<K, i64>>,
40) -> ArrayRef
41where
42    K: Eq + std::hash::Hash,
43    I: IntoIterator<Item = K>,
44{
45    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
46    for id in ids {
47        match timestamps.and_then(|m| m.get(&id)) {
48            Some(&ts) => builder.append_value(ts),
49            None => builder.append_null(),
50        }
51    }
52    Arc::new(builder.finish())
53}
54
55pub fn build_timestamp_column_from_vid_map<I>(
56    ids: I,
57    timestamps: Option<&HashMap<Vid, i64>>,
58) -> ArrayRef
59where
60    I: IntoIterator<Item = Vid>,
61{
62    build_timestamp_column_from_id_map(ids, timestamps)
63}
64
65pub fn build_timestamp_column_from_eid_map<I>(
66    ids: I,
67    timestamps: Option<&HashMap<Eid, i64>>,
68) -> ArrayRef
69where
70    I: IntoIterator<Item = Eid>,
71{
72    build_timestamp_column_from_id_map(ids, timestamps)
73}
74
75/// Build a timestamp column from an iterator of optional timestamps.
76///
77/// This is useful for building timestamp columns directly from entry structs.
78pub fn build_timestamp_column<I>(timestamps: I) -> ArrayRef
79where
80    I: IntoIterator<Item = Option<i64>>,
81{
82    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
83    for ts in timestamps {
84        builder.append_option(ts);
85    }
86    Arc::new(builder.finish())
87}
88
89/// Extract a `Vec<String>` from a single row of a `List<Utf8>` column.
90///
91/// Returns an empty vec when the row is null, the inner array is not a
92/// `StringArray`, or the list is empty.  Null entries inside the list are
93/// silently skipped.
94pub fn labels_from_list_array(list_arr: &ListArray, row: usize) -> Vec<String> {
95    if list_arr.is_null(row) {
96        return Vec::new();
97    }
98    let values = list_arr.value(row);
99    let Some(str_arr) = values.as_any().downcast_ref::<StringArray>() else {
100        return Vec::new();
101    };
102    (0..str_arr.len())
103        .filter(|&j| !str_arr.is_null(j))
104        .map(|j| str_arr.value(j).to_string())
105        .collect()
106}
107
108/// Parse a datetime string into nanoseconds since Unix epoch.
109///
110/// Tries RFC3339, "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M%:z",
111/// and "%Y-%m-%dT%H:%MZ" formats.
112fn parse_datetime_to_nanos(s: &str) -> Option<i64> {
113    chrono::DateTime::parse_from_rfc3339(s)
114        .map(|dt| {
115            dt.with_timezone(&chrono::Utc)
116                .timestamp_nanos_opt()
117                .unwrap_or(0)
118        })
119        .or_else(|_| {
120            chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
121                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
122        })
123        .or_else(|_| {
124            chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ")
125                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
126        })
127        .or_else(|_| {
128            chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M%:z").map(|dt| {
129                dt.with_timezone(&chrono::Utc)
130                    .timestamp_nanos_opt()
131                    .unwrap_or(0)
132            })
133        })
134        .ok()
135        .or_else(|| {
136            s.strip_suffix('Z')
137                .and_then(|base| chrono::NaiveDateTime::parse_from_str(base, "%Y-%m-%dT%H:%M").ok())
138                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
139        })
140}
141
142/// Detect the Arrow Map-as-List(Struct(key, value)) pattern and reconstruct a map.
143///
144/// Arrow represents Map columns as `List(Struct { key, value })`. This helper
145/// checks whether the given array matches that layout and, if so, converts the
146/// key/value pairs back into a `HashMap<String, Value>`.
147pub(crate) fn try_reconstruct_map(arr: &ArrayRef) -> Option<HashMap<String, Value>> {
148    let structs = arr.as_any().downcast_ref::<StructArray>()?;
149    let fields = structs.fields();
150    if fields.len() != 2 || fields[0].name() != "key" || fields[1].name() != "value" {
151        return None;
152    }
153    // A typed `Map(_, Bytes)` value child carries the `uni_raw_bytes` marker; decode
154    // each value verbatim. CV-encoded map values carry no marker → codec path.
155    let value_hint = raw_bytes_hint(fields[1].metadata());
156    let key_col = structs.column(0);
157    let val_col = structs.column(1);
158    let mut map = HashMap::new();
159    for i in 0..structs.len() {
160        if let Value::String(k) = arrow_to_value(key_col.as_ref(), i, None) {
161            map.insert(k, arrow_to_value(val_col.as_ref(), i, value_hint));
162        }
163    }
164    Some(map)
165}
166
167/// Convert all elements of an Arrow array into a `Vec<Value>`.
168///
169/// `elem_type` is the schema hint for each element — `Some(DataType::Bytes)` when the
170/// list child field is marked `uni_raw_bytes`, so raw `Bytes` elements decode verbatim.
171fn array_to_value_list(arr: &ArrayRef, elem_type: Option<&DataType>) -> Vec<Value> {
172    (0..arr.len())
173        .map(|i| arrow_to_value(arr.as_ref(), i, elem_type))
174        .collect()
175}
176
177/// Returns `Some(&DataType::Bytes)` when Arrow field metadata marks the field as a
178/// raw `Bytes` value (`uni_raw_bytes=true`), else `None`. Used to discriminate raw
179/// `Bytes` container children from CV-encoded `LargeBinary` without array sniffing.
180fn raw_bytes_hint(metadata: &HashMap<String, String>) -> Option<&'static DataType> {
181    if metadata.get("uni_raw_bytes").is_some_and(|v| v == "true") {
182        Some(&DataType::Bytes)
183    } else {
184        None
185    }
186}
187
188/// Extracts the raw-`Bytes` element hint from a list-like Arrow type's child field.
189fn list_child_bytes_hint(dt: &ArrowDataType) -> Option<&'static DataType> {
190    match dt {
191        ArrowDataType::List(f)
192        | ArrowDataType::LargeList(f)
193        | ArrowDataType::FixedSizeList(f, _) => raw_bytes_hint(f.metadata()),
194        _ => None,
195    }
196}
197
198/// Convert an Arrow array value at a given row index to a Uni Value.
199///
200/// Handles all common Arrow types and recursively processes nested structures
201/// like Lists and Structs. The optional `data_type` parameter provides schema
202/// context for decoding DateTime and Time struct arrays; when provided, it
203/// takes precedence over runtime type detection.
204pub fn arrow_to_value(col: &dyn Array, row: usize, data_type: Option<&DataType>) -> Value {
205    if col.is_null(row) {
206        return Value::Null;
207    }
208
209    // Schema-driven decode for DateTime and Time structs
210    if let Some(dt) = data_type {
211        match dt {
212            DataType::DateTime => {
213                // Expect StructArray with three fields
214                if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
215                    && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
216                        struct_arr.column_by_name("nanos_since_epoch"),
217                        struct_arr.column_by_name("offset_seconds"),
218                        struct_arr.column_by_name("timezone_name"),
219                    )
220                    && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
221                        nanos_col
222                            .as_any()
223                            .downcast_ref::<TimestampNanosecondArray>(),
224                        offset_col.as_any().downcast_ref::<Int32Array>(),
225                        tz_col.as_any().downcast_ref::<StringArray>(),
226                    )
227                {
228                    if nanos_arr.is_null(row) {
229                        return Value::Null;
230                    }
231                    let nanos = nanos_arr.value(row);
232                    if offset_arr.is_null(row) {
233                        // No offset → LocalDateTime
234                        return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
235                            nanos_since_epoch: nanos,
236                        });
237                    }
238                    let offset = offset_arr.value(row);
239                    let tz_name = (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
240                    return Value::Temporal(uni_common::TemporalValue::DateTime {
241                        nanos_since_epoch: nanos,
242                        offset_seconds: offset,
243                        timezone_name: tz_name,
244                    });
245                }
246                // Fall back to old schema migration: TimestampNanosecond → DateTime with offset=0
247                if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
248                    let nanos = ts.value(row);
249                    let tz_name = ts.timezone().map(|s| s.to_string());
250                    return Value::Temporal(uni_common::TemporalValue::DateTime {
251                        nanos_since_epoch: nanos,
252                        offset_seconds: 0,
253                        timezone_name: tz_name,
254                    });
255                }
256            }
257            DataType::Time => {
258                // Expect StructArray with two fields
259                if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
260                    && let (Some(nanos_col), Some(offset_col)) = (
261                        struct_arr.column_by_name("nanos_since_midnight"),
262                        struct_arr.column_by_name("offset_seconds"),
263                    )
264                    && let (Some(nanos_arr), Some(offset_arr)) = (
265                        nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
266                        offset_col.as_any().downcast_ref::<Int32Array>(),
267                    )
268                {
269                    // Check field-level nulls before calling .value()
270                    if nanos_arr.is_null(row) || offset_arr.is_null(row) {
271                        return Value::Null;
272                    }
273                    let nanos = nanos_arr.value(row);
274                    let offset = offset_arr.value(row);
275                    return Value::Temporal(uni_common::TemporalValue::Time {
276                        nanos_since_midnight: nanos,
277                        offset_seconds: offset,
278                    });
279                }
280                // Fall back to old schema: Time64Nanosecond → Time with offset=0
281                if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
282                    let nanos = t.value(row);
283                    return Value::Temporal(uni_common::TemporalValue::Time {
284                        nanos_since_midnight: nanos,
285                        offset_seconds: 0,
286                    });
287                }
288            }
289            DataType::Bytes => {
290                let Some(arr) = col.as_any().downcast_ref::<LargeBinaryArray>() else {
291                    log::warn!("Bytes column is not LargeBinaryArray");
292                    return Value::Null;
293                };
294                if arr.is_null(row) {
295                    return Value::Null;
296                }
297                return Value::Bytes(arr.value(row).to_vec());
298            }
299            DataType::Btic => {
300                let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>() else {
301                    log::warn!("BTIC column is not FixedSizeBinaryArray");
302                    return Value::Null;
303                };
304                let bytes = fsb.value(row);
305                return match uni_btic::encode::decode_slice(bytes) {
306                    Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
307                        lo: btic.lo(),
308                        hi: btic.hi(),
309                        meta: btic.meta(),
310                    }),
311                    Err(e) => {
312                        log::warn!("BTIC decode error: {}", e);
313                        Value::Null
314                    }
315                };
316            }
317            DataType::SparseVector { .. } => {
318                let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>() else {
319                    log::warn!("SparseVector column is not StructArray");
320                    return Value::Null;
321                };
322                if struct_arr.is_null(row) {
323                    return Value::Null;
324                }
325                let (Some(indices_list), Some(values_list)) = (
326                    struct_arr
327                        .column_by_name("indices")
328                        .and_then(|c| c.as_any().downcast_ref::<ListArray>()),
329                    struct_arr
330                        .column_by_name("values")
331                        .and_then(|c| c.as_any().downcast_ref::<ListArray>()),
332                ) else {
333                    log::warn!("SparseVector struct missing indices/values list columns");
334                    return Value::Null;
335                };
336                let idx_vals = indices_list.value(row);
337                let Some(idx_arr) = idx_vals.as_any().downcast_ref::<UInt32Array>() else {
338                    log::warn!("SparseVector 'indices' inner not UInt32");
339                    return Value::Null;
340                };
341                let w_vals = values_list.value(row);
342                let Some(w_arr) = w_vals.as_any().downcast_ref::<Float32Array>() else {
343                    log::warn!("SparseVector 'values' inner not Float32");
344                    return Value::Null;
345                };
346                let indices: Vec<u32> = (0..idx_arr.len()).map(|i| idx_arr.value(i)).collect();
347                let values: Vec<f32> = (0..w_arr.len()).map(|i| w_arr.value(i)).collect();
348                return Value::SparseVector { indices, values };
349            }
350            _ => {}
351        }
352    }
353
354    // String types
355    if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
356        return Value::String(s.value(row).to_string());
357    }
358
359    // Integer types
360    if let Some(u) = col.as_any().downcast_ref::<UInt64Array>() {
361        return Value::Int(u.value(row) as i64);
362    }
363    if let Some(i) = col.as_any().downcast_ref::<Int64Array>() {
364        return Value::Int(i.value(row));
365    }
366    if let Some(i) = col.as_any().downcast_ref::<Int32Array>() {
367        return Value::Int(i.value(row) as i64);
368    }
369
370    // Float types
371    if let Some(f) = col.as_any().downcast_ref::<Float64Array>() {
372        return Value::Float(f.value(row));
373    }
374    if let Some(f) = col.as_any().downcast_ref::<Float32Array>() {
375        return Value::Float(f.value(row) as f64);
376    }
377
378    // Boolean type
379    if let Some(b) = col.as_any().downcast_ref::<BooleanArray>() {
380        return Value::Bool(b.value(row));
381    }
382
383    // Fixed-size list: a `Float32` child is a dense vector and round-trips as
384    // `Value::Vector`, preserving type identity (parity with `SparseVector` /
385    // `Btic`); any other element type decodes as a generic list. The recursive
386    // inner-element decode for multivector tokens (via `array_to_value_list`)
387    // reaches this same arm, so a `List<FixedSizeList<Float32>>` multivector
388    // decodes as a `Value::List` of `Value::Vector` tokens.
389    if let Some(list) = col.as_any().downcast_ref::<FixedSizeListArray>() {
390        let inner = list.value(row);
391        if let Some(floats) = inner.as_any().downcast_ref::<Float32Array>() {
392            return Value::Vector((0..floats.len()).map(|i| floats.value(i)).collect());
393        }
394        let elem_hint = list_child_bytes_hint(list.data_type());
395        return Value::List(array_to_value_list(&inner, elem_hint));
396    }
397
398    // Variable-size list
399    if let Some(list) = col.as_any().downcast_ref::<ListArray>() {
400        let arr = list.value(row);
401
402        // Map types are stored as List(Struct(key, value)); reconstruct as map
403        if let Some(obj) = try_reconstruct_map(&arr) {
404            return Value::Map(obj);
405        }
406
407        let elem_hint = list_child_bytes_hint(list.data_type());
408        return Value::List(array_to_value_list(&arr, elem_hint));
409    }
410
411    // Large list (variable-size list with i64 offsets)
412    if let Some(list) = col.as_any().downcast_ref::<arrow_array::LargeListArray>() {
413        let elem_hint = list_child_bytes_hint(list.data_type());
414        return Value::List(array_to_value_list(&list.value(row), elem_hint));
415    }
416
417    // Struct type — detect temporal structs by field names before generic handler
418    if let Some(s) = col.as_any().downcast_ref::<StructArray>() {
419        // Sparse-vector struct: the result-row path calls with `data_type = None`,
420        // so detect it by Arrow shape here (otherwise the generic struct→Map
421        // handler below reads the `List<UInt32>` indices child as null).
422        if schema::is_sparse_vector_struct(col.data_type()) {
423            if s.is_null(row) {
424                return Value::Null;
425            }
426            if let (Some(idx_list), Some(val_list)) = (
427                s.column_by_name("indices")
428                    .and_then(|c| c.as_any().downcast_ref::<ListArray>()),
429                s.column_by_name("values")
430                    .and_then(|c| c.as_any().downcast_ref::<ListArray>()),
431            ) {
432                let idx_vals = idx_list.value(row);
433                let val_vals = val_list.value(row);
434                if let (Some(ia), Some(va)) = (
435                    idx_vals.as_any().downcast_ref::<UInt32Array>(),
436                    val_vals.as_any().downcast_ref::<Float32Array>(),
437                ) {
438                    let indices = (0..ia.len()).map(|i| ia.value(i)).collect();
439                    let values = (0..va.len()).map(|i| va.value(i)).collect();
440                    return Value::SparseVector { indices, values };
441                }
442            }
443        }
444
445        let field_names: Vec<&str> = s.fields().iter().map(|f| f.name().as_str()).collect();
446
447        // DateTime struct: {nanos_since_epoch, offset_seconds, timezone_name}
448        if field_names.contains(&"nanos_since_epoch")
449            && field_names.contains(&"offset_seconds")
450            && field_names.contains(&"timezone_name")
451            && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
452                s.column_by_name("nanos_since_epoch"),
453                s.column_by_name("offset_seconds"),
454                s.column_by_name("timezone_name"),
455            )
456        {
457            // Try TimestampNanosecond first (standard schema), then Int64 fallback
458            let nanos_opt = nanos_col
459                .as_any()
460                .downcast_ref::<TimestampNanosecondArray>()
461                .map(|a| {
462                    if a.is_null(row) {
463                        None
464                    } else {
465                        Some(a.value(row))
466                    }
467                })
468                .or_else(|| {
469                    nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
470                        if a.is_null(row) {
471                            None
472                        } else {
473                            Some(a.value(row))
474                        }
475                    })
476                });
477            let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
478                if a.is_null(row) {
479                    None
480                } else {
481                    Some(a.value(row))
482                }
483            });
484
485            if let Some(Some(nanos)) = nanos_opt {
486                match offset_opt {
487                    Some(Some(offset)) => {
488                        let tz_name = tz_col.as_any().downcast_ref::<StringArray>().and_then(|a| {
489                            if a.is_null(row) {
490                                None
491                            } else {
492                                Some(a.value(row).to_string())
493                            }
494                        });
495                        return Value::Temporal(uni_common::TemporalValue::DateTime {
496                            nanos_since_epoch: nanos,
497                            offset_seconds: offset,
498                            timezone_name: tz_name,
499                        });
500                    }
501                    _ => {
502                        // No offset → LocalDateTime
503                        return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
504                            nanos_since_epoch: nanos,
505                        });
506                    }
507                }
508            }
509        }
510
511        // Time struct: {nanos_since_midnight, offset_seconds}
512        if field_names.contains(&"nanos_since_midnight")
513            && field_names.contains(&"offset_seconds")
514            && let (Some(nanos_col), Some(offset_col)) = (
515                s.column_by_name("nanos_since_midnight"),
516                s.column_by_name("offset_seconds"),
517            )
518        {
519            // Try Time64Nanosecond first (standard schema), then Int64 fallback
520            let nanos_opt = nanos_col
521                .as_any()
522                .downcast_ref::<Time64NanosecondArray>()
523                .map(|a| {
524                    if a.is_null(row) {
525                        None
526                    } else {
527                        Some(a.value(row))
528                    }
529                })
530                .or_else(|| {
531                    nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
532                        if a.is_null(row) {
533                            None
534                        } else {
535                            Some(a.value(row))
536                        }
537                    })
538                });
539            let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
540                if a.is_null(row) {
541                    None
542                } else {
543                    Some(a.value(row))
544                }
545            });
546
547            if let (Some(Some(nanos)), Some(Some(offset))) = (nanos_opt, offset_opt) {
548                return Value::Temporal(uni_common::TemporalValue::Time {
549                    nanos_since_midnight: nanos,
550                    offset_seconds: offset,
551                });
552            }
553        }
554
555        // Generic struct → Map
556        let mut map = HashMap::new();
557        for (field, child) in s.fields().iter().zip(s.columns()) {
558            map.insert(
559                field.name().clone(),
560                arrow_to_value(child.as_ref(), row, None),
561            );
562        }
563        return Value::Map(map);
564    }
565
566    // Date32 type (days since epoch) - return as Value::Temporal
567    if let Some(d) = col.as_any().downcast_ref::<Date32Array>() {
568        let days = d.value(row);
569        return Value::Temporal(uni_common::TemporalValue::Date {
570            days_since_epoch: days,
571        });
572    }
573
574    // Timestamp (nanoseconds since epoch) - timezone presence determines DateTime vs LocalDateTime
575    if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
576        let nanos = ts.value(row);
577        return match ts.timezone() {
578            Some(tz) => Value::Temporal(uni_common::TemporalValue::DateTime {
579                nanos_since_epoch: nanos,
580                offset_seconds: 0,
581                timezone_name: Some(tz.to_string()),
582            }),
583            None => Value::Temporal(uni_common::TemporalValue::LocalDateTime {
584                nanos_since_epoch: nanos,
585            }),
586        };
587    }
588
589    // Time64 (nanoseconds since midnight) - return as Value::Temporal
590    if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
591        let nanos = t.value(row);
592        return Value::Temporal(uni_common::TemporalValue::LocalTime {
593            nanos_since_midnight: nanos,
594        });
595    }
596
597    // Time64 (microseconds since midnight) - convert to nanoseconds
598    if let Some(t) = col
599        .as_any()
600        .downcast_ref::<arrow_array::Time64MicrosecondArray>()
601    {
602        let micros = t.value(row);
603        return Value::Temporal(uni_common::TemporalValue::LocalTime {
604            nanos_since_midnight: micros * 1000,
605        });
606    }
607
608    // DurationMicrosecond - convert to Duration with nanoseconds
609    if let Some(d) = col
610        .as_any()
611        .downcast_ref::<arrow_array::DurationMicrosecondArray>()
612    {
613        let micros = d.value(row);
614        let total_nanos = micros * 1000;
615        let seconds = total_nanos / 1_000_000_000;
616        let remaining_nanos = total_nanos % 1_000_000_000;
617        return Value::Temporal(uni_common::TemporalValue::Duration {
618            months: 0,
619            days: 0,
620            nanos: seconds * 1_000_000_000 + remaining_nanos,
621        });
622    }
623
624    // IntervalMonthDayNano - return as Value::Temporal(Duration)
625    if let Some(interval) = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>() {
626        let val = interval.value(row);
627        return Value::Temporal(uni_common::TemporalValue::Duration {
628            months: val.months as i64,
629            days: val.days as i64,
630            nanos: val.nanoseconds,
631        });
632    }
633
634    // LargeBinary (CypherValue MessagePack-tagged encoding)
635    if let Some(b) = col.as_any().downcast_ref::<LargeBinaryArray>() {
636        let bytes = b.value(row);
637        if bytes.is_empty() {
638            return Value::Null;
639        }
640        return uni_common::cypher_value_codec::decode(bytes).unwrap_or_else(|e| {
641            eprintln!("CypherValue decode error: {}", e);
642            Value::Null
643        });
644    }
645
646    // FixedSizeBinary(24) — BTIC temporal interval
647    if let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>()
648        && fsb.value_length() == 24
649    {
650        let bytes = fsb.value(row);
651        return match uni_btic::encode::decode_slice(bytes) {
652            Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
653                lo: btic.lo(),
654                hi: btic.hi(),
655                meta: btic.meta(),
656            }),
657            Err(e) => {
658                log::warn!("BTIC decode error: {}", e);
659                Value::Null
660            }
661        };
662    }
663
664    // Binary (CRDT MessagePack) - decode to Value via serde_json boundary
665    if let Some(b) = col.as_any().downcast_ref::<BinaryArray>() {
666        let bytes = b.value(row);
667        return Crdt::from_msgpack(bytes)
668            .ok()
669            .and_then(|crdt| serde_json::to_value(&crdt).ok())
670            .map(Value::from)
671            .unwrap_or(Value::Null);
672    }
673
674    // Fallback
675    Value::Null
676}
677
678fn values_to_uint64_array(values: &[Value]) -> ArrayRef {
679    let mut builder = UInt64Builder::with_capacity(values.len());
680    for v in values {
681        if let Some(n) = v.as_u64() {
682            builder.append_value(n);
683        } else {
684            builder.append_null();
685        }
686    }
687    Arc::new(builder.finish())
688}
689
690fn values_to_int64_array(values: &[Value]) -> ArrayRef {
691    let mut builder = Int64Builder::with_capacity(values.len());
692    for v in values {
693        if let Some(n) = v.as_i64() {
694            builder.append_value(n);
695        } else {
696            builder.append_null();
697        }
698    }
699    Arc::new(builder.finish())
700}
701
702fn values_to_int32_array(values: &[Value]) -> ArrayRef {
703    let mut builder = Int32Builder::with_capacity(values.len());
704    for v in values {
705        if let Some(n) = v.as_i64() {
706            builder.append_value(n as i32);
707        } else {
708            builder.append_null();
709        }
710    }
711    Arc::new(builder.finish())
712}
713
714fn values_to_string_array(values: &[Value]) -> ArrayRef {
715    let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 10);
716    for v in values {
717        if let Some(s) = v.as_str() {
718            builder.append_value(s);
719        } else if v.is_null() {
720            builder.append_null();
721        } else {
722            builder.append_value(v.to_string());
723        }
724    }
725    Arc::new(builder.finish())
726}
727
728fn values_to_bool_array(values: &[Value]) -> ArrayRef {
729    let mut builder = BooleanBuilder::with_capacity(values.len());
730    for v in values {
731        if let Some(b) = v.as_bool() {
732            builder.append_value(b);
733        } else {
734            builder.append_null();
735        }
736    }
737    Arc::new(builder.finish())
738}
739
740fn values_to_float32_array(values: &[Value]) -> ArrayRef {
741    let mut builder = Float32Builder::with_capacity(values.len());
742    for v in values {
743        if let Some(n) = v.as_f64() {
744            builder.append_value(n as f32);
745        } else {
746            builder.append_null();
747        }
748    }
749    Arc::new(builder.finish())
750}
751
752fn values_to_float64_array(values: &[Value]) -> ArrayRef {
753    let mut builder = Float64Builder::with_capacity(values.len());
754    for v in values {
755        if let Some(n) = v.as_f64() {
756            builder.append_value(n);
757        } else {
758            builder.append_null();
759        }
760    }
761    Arc::new(builder.finish())
762}
763
764fn values_to_fixed_size_binary_array(values: &[Value], size: i32) -> Result<ArrayRef> {
765    let mut builder = FixedSizeBinaryBuilder::with_capacity(values.len(), size);
766    for v in values {
767        match v {
768            Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta }) if size == 24 => {
769                let btic = uni_btic::Btic::new(*lo, *hi, *meta)
770                    .map_err(|e| anyhow!("invalid BTIC value: {}", e))?;
771                builder.append_value(uni_btic::encode::encode(&btic))?;
772            }
773            Value::String(s) if size == 24 => match uni_btic::parse::parse_btic_literal(s) {
774                Ok(b) => builder.append_value(uni_btic::encode::encode(&b))?,
775                Err(_) => builder.append_null(),
776            },
777            Value::List(bytes) => {
778                let b: Vec<u8> = bytes
779                    .iter()
780                    .map(|bv| bv.as_u64().unwrap_or(0) as u8)
781                    .collect();
782                if b.len() as i32 == size {
783                    builder.append_value(&b)?;
784                } else {
785                    builder.append_null();
786                }
787            }
788            _ => builder.append_null(),
789        }
790    }
791    Ok(Arc::new(builder.finish()))
792}
793
794/// Extract f32 vector values from a Value, ensuring correct Arrow FixedSizeList invariants.
795///
796/// Always returns exactly `dimensions` f32 values (zeros for null/invalid), plus a validity flag.
797/// This guarantees `child_array.len() == parent_array.len() × dimensions`.
798///
799/// # Arguments
800/// - `val`: Optional property value to extract from
801/// - `is_deleted`: Whether the containing entity is deleted (affects validity)
802/// - `dimensions`: Expected vector dimensions
803///
804/// # Returns
805/// - Tuple of (vector values, validity flag)
806///   - Vector always has exactly `dimensions` elements
807///   - Validity is `true` for valid vectors or deleted entries, `false` for null/invalid
808pub fn extract_vector_f32_values(
809    val: Option<&Value>,
810    is_deleted: bool,
811    dimensions: usize,
812) -> (Vec<f32>, bool) {
813    let zeros = || vec![0.0_f32; dimensions];
814
815    // Deleted entries always return zeros with valid=true
816    if is_deleted {
817        return (zeros(), true);
818    }
819
820    match val {
821        // Native f32 vector (Value::Vector)
822        Some(Value::Vector(v)) if v.len() == dimensions => (v.clone(), true),
823        Some(Value::Vector(_)) => (zeros(), false), // Wrong dimensions
824        // List of values (Value::List) - convert to f32
825        Some(Value::List(arr)) if arr.len() == dimensions => {
826            let values: Vec<f32> = arr
827                .iter()
828                .map(|v| v.as_f64().unwrap_or(0.0) as f32)
829                .collect();
830            (values, true)
831        }
832        Some(Value::List(_)) => (zeros(), false), // Wrong dimensions
833        _ => (zeros(), false),                    // Missing or unsupported value
834    }
835}
836
837fn values_to_fixed_size_list_f32_array(values: &[Value], size: i32) -> ArrayRef {
838    let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), size);
839    for v in values {
840        let (vals, valid) = extract_vector_f32_values(Some(v), false, size as usize);
841        for val in vals {
842            builder.values().append_value(val);
843        }
844        builder.append(valid);
845    }
846    Arc::new(builder.finish())
847}
848
849fn values_to_timestamp_array(values: &[Value], tz: Option<&Arc<str>>) -> ArrayRef {
850    let mut builder = TimestampNanosecondBuilder::with_capacity(values.len());
851    for v in values {
852        if v.is_null() {
853            builder.append_null();
854        } else if let Value::Temporal(tv) = v {
855            match tv {
856                uni_common::TemporalValue::DateTime {
857                    nanos_since_epoch, ..
858                }
859                | uni_common::TemporalValue::LocalDateTime {
860                    nanos_since_epoch, ..
861                } => builder.append_value(*nanos_since_epoch),
862                _ => builder.append_null(),
863            }
864        } else if let Some(n) = v.as_i64() {
865            builder.append_value(n);
866        } else if let Some(s) = v.as_str() {
867            match parse_datetime_to_nanos(s) {
868                Some(nanos) => builder.append_value(nanos),
869                None => builder.append_null(),
870            }
871        } else {
872            builder.append_null();
873        }
874    }
875
876    let arr = builder.finish();
877    if let Some(tz) = tz {
878        Arc::new(arr.with_timezone(tz.as_ref()))
879    } else {
880        Arc::new(arr)
881    }
882}
883
884/// Build a DateTime struct array from values.
885///
886/// Encodes DateTime as a 3-field struct: (nanos_since_epoch, offset_seconds, timezone_name).
887/// This preserves timezone offset information that was previously lost with TimestampNanosecond encoding.
888fn values_to_datetime_struct_array(values: &[Value]) -> ArrayRef {
889    let mut nanos_builder = TimestampNanosecondBuilder::with_capacity(values.len());
890    let mut offset_builder = Int32Builder::with_capacity(values.len());
891    let mut tz_builder = StringBuilder::with_capacity(values.len(), values.len() * 20);
892    let mut null_buffer = BooleanBufferBuilder::new(values.len());
893
894    for v in values {
895        match v {
896            Value::Temporal(uni_common::TemporalValue::DateTime {
897                nanos_since_epoch,
898                offset_seconds,
899                timezone_name,
900            }) => {
901                nanos_builder.append_value(*nanos_since_epoch);
902                offset_builder.append_value(*offset_seconds);
903                tz_builder.append_option(timezone_name.as_deref());
904                null_buffer.append(true);
905            }
906            Value::Temporal(uni_common::TemporalValue::LocalDateTime { nanos_since_epoch }) => {
907                nanos_builder.append_value(*nanos_since_epoch);
908                offset_builder.append_null();
909                tz_builder.append_null();
910                null_buffer.append(true);
911            }
912            _ => {
913                nanos_builder.append_null();
914                offset_builder.append_null();
915                tz_builder.append_null();
916                null_buffer.append(false);
917            }
918        }
919    }
920
921    let struct_arr = StructArray::new(
922        schema::datetime_struct_fields(),
923        vec![
924            Arc::new(nanos_builder.finish()) as ArrayRef,
925            Arc::new(offset_builder.finish()) as ArrayRef,
926            Arc::new(tz_builder.finish()) as ArrayRef,
927        ],
928        Some(null_buffer.finish().into()),
929    );
930    Arc::new(struct_arr)
931}
932
933/// Build a Time struct array from values.
934///
935/// Encodes Time as a 2-field struct: (nanos_since_midnight, offset_seconds).
936/// This preserves timezone offset information that was previously lost with Time64Nanosecond encoding.
937fn values_to_time_struct_array(values: &[Value]) -> ArrayRef {
938    let mut nanos_builder = Time64NanosecondBuilder::with_capacity(values.len());
939    let mut offset_builder = Int32Builder::with_capacity(values.len());
940    let mut null_buffer = BooleanBufferBuilder::new(values.len());
941
942    for v in values {
943        match v {
944            Value::Temporal(uni_common::TemporalValue::Time {
945                nanos_since_midnight,
946                offset_seconds,
947            }) => {
948                nanos_builder.append_value(*nanos_since_midnight);
949                offset_builder.append_value(*offset_seconds);
950                null_buffer.append(true);
951            }
952            Value::Temporal(uni_common::TemporalValue::LocalTime {
953                nanos_since_midnight,
954            }) => {
955                nanos_builder.append_value(*nanos_since_midnight);
956                offset_builder.append_null();
957                null_buffer.append(true);
958            }
959            _ => {
960                nanos_builder.append_null();
961                offset_builder.append_null();
962                null_buffer.append(false);
963            }
964        }
965    }
966
967    let struct_arr = StructArray::new(
968        schema::time_struct_fields(),
969        vec![
970            Arc::new(nanos_builder.finish()) as ArrayRef,
971            Arc::new(offset_builder.finish()) as ArrayRef,
972        ],
973        Some(null_buffer.finish().into()),
974    );
975    Arc::new(struct_arr)
976}
977
978fn values_to_large_binary_array(values: &[Value]) -> ArrayRef {
979    let mut builder =
980        arrow_array::builder::LargeBinaryBuilder::with_capacity(values.len(), values.len() * 64);
981    for v in values {
982        if v.is_null() {
983            builder.append_null();
984        } else {
985            // Encode as CypherValue (MessagePack-tagged)
986            let cv_bytes = uni_common::cypher_value_codec::encode(v);
987            builder.append_value(&cv_bytes);
988        }
989    }
990    Arc::new(builder.finish())
991}
992
993/// Convert a slice of JSON Values to an Arrow array based on the target Arrow DataType.
994pub fn values_to_array(values: &[Value], dt: &ArrowDataType) -> Result<ArrayRef> {
995    match dt {
996        ArrowDataType::UInt64 => Ok(values_to_uint64_array(values)),
997        ArrowDataType::Int64 => Ok(values_to_int64_array(values)),
998        ArrowDataType::Int32 => Ok(values_to_int32_array(values)),
999        ArrowDataType::Utf8 => Ok(values_to_string_array(values)),
1000        ArrowDataType::Boolean => Ok(values_to_bool_array(values)),
1001        ArrowDataType::Float32 => Ok(values_to_float32_array(values)),
1002        ArrowDataType::Float64 => Ok(values_to_float64_array(values)),
1003        ArrowDataType::FixedSizeBinary(size) => values_to_fixed_size_binary_array(values, *size),
1004        ArrowDataType::FixedSizeList(inner, size) => {
1005            if inner.data_type() == &ArrowDataType::Float32 {
1006                Ok(values_to_fixed_size_list_f32_array(values, *size))
1007            } else {
1008                Err(anyhow!("Unsupported FixedSizeList inner type"))
1009            }
1010        }
1011        ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
1012            Ok(values_to_timestamp_array(values, tz.as_ref()))
1013        }
1014        ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
1015            Ok(values_to_timestamp_array(values, tz.as_ref()))
1016        }
1017        ArrowDataType::Date32 => {
1018            let mut builder = Date32Builder::with_capacity(values.len());
1019            for v in values {
1020                if v.is_null() {
1021                    builder.append_null();
1022                } else if let Value::Temporal(uni_common::TemporalValue::Date {
1023                    days_since_epoch,
1024                }) = v
1025                {
1026                    builder.append_value(*days_since_epoch);
1027                } else if let Some(n) = v.as_i64() {
1028                    builder.append_value(n as i32);
1029                } else {
1030                    builder.append_null();
1031                }
1032            }
1033            Ok(Arc::new(builder.finish()))
1034        }
1035        ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
1036            let mut builder = Time64NanosecondBuilder::with_capacity(values.len());
1037            for v in values {
1038                if v.is_null() {
1039                    builder.append_null();
1040                } else if let Value::Temporal(tv) = v {
1041                    match tv {
1042                        uni_common::TemporalValue::LocalTime {
1043                            nanos_since_midnight,
1044                        }
1045                        | uni_common::TemporalValue::Time {
1046                            nanos_since_midnight,
1047                            ..
1048                        } => builder.append_value(*nanos_since_midnight),
1049                        _ => builder.append_null(),
1050                    }
1051                } else if let Some(n) = v.as_i64() {
1052                    builder.append_value(n);
1053                } else {
1054                    builder.append_null();
1055                }
1056            }
1057            Ok(Arc::new(builder.finish()))
1058        }
1059        ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
1060            let mut builder = Time64MicrosecondBuilder::with_capacity(values.len());
1061            for v in values {
1062                if v.is_null() {
1063                    builder.append_null();
1064                } else if let Value::Temporal(tv) = v {
1065                    match tv {
1066                        uni_common::TemporalValue::LocalTime {
1067                            nanos_since_midnight,
1068                        }
1069                        | uni_common::TemporalValue::Time {
1070                            nanos_since_midnight,
1071                            ..
1072                        } => builder.append_value(*nanos_since_midnight / 1_000), // nanos→micros for legacy
1073                        _ => builder.append_null(),
1074                    }
1075                } else if let Some(n) = v.as_i64() {
1076                    builder.append_value(n);
1077                } else {
1078                    builder.append_null();
1079                }
1080            }
1081            Ok(Arc::new(builder.finish()))
1082        }
1083        ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano) => {
1084            let mut builder = IntervalMonthDayNanoBuilder::with_capacity(values.len());
1085            for v in values {
1086                if v.is_null() {
1087                    builder.append_null();
1088                } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1089                    months,
1090                    days,
1091                    nanos,
1092                }) = v
1093                {
1094                    builder.append_value(arrow::datatypes::IntervalMonthDayNano {
1095                        months: *months as i32,
1096                        days: *days as i32,
1097                        nanoseconds: *nanos,
1098                    });
1099                } else {
1100                    builder.append_null();
1101                }
1102            }
1103            Ok(Arc::new(builder.finish()))
1104        }
1105        ArrowDataType::Duration(arrow_schema::TimeUnit::Microsecond) => {
1106            let mut builder = DurationMicrosecondBuilder::with_capacity(values.len());
1107            for v in values {
1108                if v.is_null() {
1109                    builder.append_null();
1110                } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1111                    months,
1112                    days,
1113                    nanos,
1114                }) = v
1115                {
1116                    let total_micros =
1117                        months * 30 * 86_400_000_000i64 + days * 86_400_000_000i64 + nanos / 1_000;
1118                    builder.append_value(total_micros);
1119                } else if let Some(n) = v.as_i64() {
1120                    builder.append_value(n);
1121                } else {
1122                    builder.append_null();
1123                }
1124            }
1125            Ok(Arc::new(builder.finish()))
1126        }
1127        ArrowDataType::LargeBinary => Ok(values_to_large_binary_array(values)),
1128        ArrowDataType::List(field) => {
1129            if field.data_type() == &ArrowDataType::Utf8 {
1130                let mut builder = ListBuilder::new(StringBuilder::new());
1131                for v in values {
1132                    if let Value::List(arr) = v {
1133                        for item in arr {
1134                            if let Some(s) = item.as_str() {
1135                                builder.values().append_value(s);
1136                            } else {
1137                                builder.values().append_null();
1138                            }
1139                        }
1140                        builder.append(true);
1141                    } else {
1142                        builder.append_null();
1143                    }
1144                }
1145                Ok(Arc::new(builder.finish()))
1146            } else {
1147                Err(anyhow!(
1148                    "Unsupported List inner type: {:?}",
1149                    field.data_type()
1150                ))
1151            }
1152        }
1153        ArrowDataType::Struct(_) if schema::is_datetime_struct(dt) => {
1154            Ok(values_to_datetime_struct_array(values))
1155        }
1156        ArrowDataType::Struct(_) if schema::is_time_struct(dt) => {
1157            Ok(values_to_time_struct_array(values))
1158        }
1159        _ => Err(anyhow!("Unsupported type for conversion: {:?}", dt)),
1160    }
1161}
1162
/// Property value extractor for building Arrow columns from entity properties.
///
/// Holds only the logical [`DataType`] of the property; `build_column`
/// dispatches on it to select the matching column builder.
pub struct PropertyExtractor<'a> {
    /// Logical type of the property being extracted, borrowed for `'a`.
    data_type: &'a DataType,
}
1167
1168/// Extract sparse `(indices, values)` from a property value.
1169///
1170/// Accepts the native [`Value::SparseVector`] **and** the degraded
1171/// `Value::Map { "indices": [..], "values": [..] }` form that a `SparseVector`
1172/// collapses into when round-tripped through `#[serde(untagged)]` persistence —
1173/// notably the WAL, which serializes mutations via `serde_json`. This mirrors
1174/// how the dense `Vector` column tolerates a `Value::List` from the same hazard.
1175/// Returns `None` for any other value (→ a null struct row).
1176fn sparse_pair_from_value(v: &Value) -> Option<(Vec<u32>, Vec<f32>)> {
1177    match v {
1178        Value::SparseVector { indices, values } => {
1179            // Guard the native arm the same way the `Value::Map` arm below does: a
1180            // length-mismatched value would otherwise emit an Arrow struct whose
1181            // `indices`/`values` child lists desync, which the reader silently
1182            // truncates via `.zip()` — pairing weights with the wrong term ids
1183            // (issue #95). Reject it to a null struct row instead.
1184            if indices.len() != values.len() {
1185                return None;
1186            }
1187            Some((indices.clone(), values.clone()))
1188        }
1189        Value::Map(m) => {
1190            let idx = match m.get("indices") {
1191                Some(Value::List(l)) => l,
1192                _ => return None,
1193            };
1194            let vals = match m.get("values") {
1195                Some(Value::List(l)) => l,
1196                _ => return None,
1197            };
1198            let indices: Vec<u32> = idx
1199                .iter()
1200                .map(|x| x.as_u64().map(|n| n as u32))
1201                .collect::<Option<_>>()?;
1202            let values: Vec<f32> = vals
1203                .iter()
1204                .map(|x| x.as_f64().map(|n| n as f32))
1205                .collect::<Option<_>>()?;
1206            if indices.len() != values.len() {
1207                return None;
1208            }
1209            Some((indices, values))
1210        }
1211        _ => None,
1212    }
1213}
1214
1215/// Build a sparse-vector `Struct { indices: List<UInt32>, values: List<Float32> }`
1216/// Arrow column from a sequence of property values. Used by the query result
1217/// projection path (`RETURN d.sparse_col`). A `None` / non-sparse / map-degraded
1218/// value that can't be extracted becomes a null struct row. Mirrors
1219/// `PropertyExtractor::build_sparse_vector_column` but operates on owned values.
1220pub fn build_sparse_vector_array(values: &[Option<Value>]) -> ArrayRef {
1221    let mut indices_builder = ListBuilder::new(UInt32Builder::new());
1222    let mut values_builder = ListBuilder::new(Float32Builder::new());
1223    let mut null_buffer = BooleanBufferBuilder::new(values.len());
1224    for v in values {
1225        match v.as_ref().and_then(sparse_pair_from_value) {
1226            Some((indices, vals)) => {
1227                for ix in indices {
1228                    indices_builder.values().append_value(ix);
1229                }
1230                indices_builder.append(true);
1231                for w in vals {
1232                    values_builder.values().append_value(w);
1233                }
1234                values_builder.append(true);
1235                null_buffer.append(true);
1236            }
1237            None => {
1238                indices_builder.append(true);
1239                values_builder.append(true);
1240                null_buffer.append(false);
1241            }
1242        }
1243    }
1244    let struct_arr = StructArray::new(
1245        schema::sparse_vector_struct_fields(),
1246        vec![
1247            Arc::new(indices_builder.finish()) as ArrayRef,
1248            Arc::new(values_builder.finish()) as ArrayRef,
1249        ],
1250        Some(null_buffer.finish().into()),
1251    );
1252    Arc::new(struct_arr)
1253}
1254
1255/// Build a multi-vector `List<FixedSizeList<Float32, dimensions>>` Arrow column.
1256///
1257/// Used by the query result projection path (`RETURN d.multivector_col`) on the
1258/// L0 (unflushed) read path. Mirrors the `DataType::Vector { dimensions }` arm of
1259/// `PropertyExtractor::build_list_column` but operates on owned values: a `None`
1260/// or non-list value becomes a null row, and each token is validated through
1261/// `extract_vector_f32_values` (a wrong-dimension or non-numeric token becomes a
1262/// null inner vector), so failure modes match the write path.
1263///
1264/// # Examples
1265/// ```ignore
1266/// let col = build_multivector_array(&[Some(Value::List(vec![Value::Vector(vec![1.0, 2.0])]))], 2);
1267/// ```
1268pub fn build_multivector_array(values: &[Option<Value>], dimensions: usize) -> ArrayRef {
1269    let dim = dimensions as i32;
1270    let mut builder = ListBuilder::new(FixedSizeListBuilder::new(Float32Builder::new(), dim));
1271    for v in values {
1272        match v.as_ref().and_then(|v| v.as_array()) {
1273            Some(arr) => {
1274                // Variable token count per row: one fixed-size inner vector per
1275                // token. `extract_vector_f32_values` always yields exactly
1276                // `dimensions` values, satisfying the FixedSizeList stride.
1277                for tok in arr {
1278                    let (vals, valid) = extract_vector_f32_values(Some(tok), false, dimensions);
1279                    for f in vals {
1280                        builder.values().values().append_value(f);
1281                    }
1282                    builder.values().append(valid);
1283                }
1284                builder.append(true);
1285            }
1286            None => builder.append_null(),
1287        }
1288    }
1289    Arc::new(builder.finish())
1290}
1291
1292impl<'a> PropertyExtractor<'a> {
    /// Create an extractor for a property of the given logical type.
    ///
    /// `_name` is accepted but neither stored nor used here; only `data_type`
    /// is kept.
    pub fn new(_name: &'a str, data_type: &'a DataType) -> Self {
        Self { data_type }
    }
1296
1297    /// Build an Arrow column from a slice of property maps.
1298    /// The `deleted` slice indicates which entries are deleted (use default values).
1299    pub fn build_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1300    where
1301        F: Fn(usize) -> Option<&'a Value>,
1302    {
1303        match self.data_type {
1304            DataType::String => self.build_string_column(len, deleted, get_props),
1305            DataType::Int32 => self.build_int32_column(len, deleted, get_props),
1306            DataType::Int64 => self.build_int64_column(len, deleted, get_props),
1307            DataType::Float32 => self.build_float32_column(len, deleted, get_props),
1308            DataType::Float64 => self.build_float64_column(len, deleted, get_props),
1309            DataType::Bool => self.build_bool_column(len, deleted, get_props),
1310            DataType::Vector { dimensions } => {
1311                self.build_vector_column(len, deleted, get_props, *dimensions)
1312            }
1313            DataType::SparseVector { .. } => {
1314                self.build_sparse_vector_column(len, deleted, get_props)
1315            }
1316            DataType::CypherValue => self.build_json_column(len, deleted, get_props),
1317            DataType::Bytes => self.build_bytes_column(len, deleted, get_props),
1318            DataType::List(inner) => self.build_list_column(len, deleted, get_props, inner),
1319            DataType::Map(key, value) => self.build_map_column(len, deleted, get_props, key, value),
1320            DataType::Crdt(_) => self.build_crdt_column(len, deleted, get_props),
1321            DataType::DateTime => self.build_datetime_struct_column(len, deleted, get_props),
1322            DataType::Timestamp => self.build_timestamp_column(len, deleted, get_props),
1323            DataType::Date => self.build_date32_column(len, deleted, get_props),
1324            DataType::Time => self.build_time_struct_column(len, deleted, get_props),
1325            DataType::Duration => self.build_duration_column(len, deleted, get_props),
1326            DataType::Btic => self.build_btic_column(len, deleted, get_props),
1327            _ => Err(anyhow!(
1328                "Unsupported data type for arrow conversion: {:?}",
1329                self.data_type
1330            )),
1331        }
1332    }
1333
1334    fn build_string_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1335    where
1336        F: Fn(usize) -> Option<&'a Value>,
1337    {
1338        let mut builder = arrow_array::builder::StringBuilder::with_capacity(len, len * 32);
1339        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1340            let prop = get_props(i);
1341            if let Some(s) = prop.and_then(|v| v.as_str()) {
1342                builder.append_value(s);
1343            } else if let Some(Value::Temporal(tv)) = prop {
1344                builder.append_value(tv.to_string());
1345            } else if is_deleted {
1346                builder.append_value("");
1347            } else {
1348                builder.append_null();
1349            }
1350        }
1351        Ok(Arc::new(builder.finish()))
1352    }
1353
1354    fn build_int32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1355    where
1356        F: Fn(usize) -> Option<&'a Value>,
1357    {
1358        let mut values = Vec::with_capacity(len);
1359        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1360            // i64 -> i32 via try_from: an out-of-range value becomes NULL rather
1361            // than silently wrapping to a different number. (review H13)
1362            let val = get_props(i)
1363                .and_then(|v| v.as_i64())
1364                .and_then(|v| i32::try_from(v).ok());
1365            if val.is_none() && is_deleted {
1366                values.push(Some(0));
1367            } else {
1368                values.push(val);
1369            }
1370        }
1371        Ok(Arc::new(Int32Array::from(values)))
1372    }
1373
1374    fn build_int64_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1375    where
1376        F: Fn(usize) -> Option<&'a Value>,
1377    {
1378        let mut values = Vec::with_capacity(len);
1379        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1380            let val = get_props(i).and_then(|v| v.as_i64());
1381            if val.is_none() && is_deleted {
1382                values.push(Some(0));
1383            } else {
1384                values.push(val);
1385            }
1386        }
1387        Ok(Arc::new(Int64Array::from(values)))
1388    }
1389
1390    fn build_timestamp_column<F>(
1391        &self,
1392        len: usize,
1393        deleted: &[bool],
1394        get_props: F,
1395    ) -> Result<ArrayRef>
1396    where
1397        F: Fn(usize) -> Option<&'a Value>,
1398    {
1399        let mut values = Vec::with_capacity(len);
1400        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1401            let val = get_props(i);
1402            let ts = if is_deleted || val.is_none() {
1403                Some(0i64)
1404            } else if let Some(Value::Temporal(tv)) = val {
1405                match tv {
1406                    uni_common::TemporalValue::DateTime {
1407                        nanos_since_epoch, ..
1408                    }
1409                    | uni_common::TemporalValue::LocalDateTime {
1410                        nanos_since_epoch, ..
1411                    } => Some(*nanos_since_epoch),
1412                    _ => None,
1413                }
1414            } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1415                Some(v)
1416            } else if let Some(s) = val.and_then(|v| v.as_str()) {
1417                parse_datetime_to_nanos(s)
1418            } else {
1419                None
1420            };
1421
1422            if is_deleted {
1423                values.push(Some(0));
1424            } else {
1425                values.push(ts);
1426            }
1427        }
1428        let arr = TimestampNanosecondArray::from(values).with_timezone("UTC");
1429        Ok(Arc::new(arr))
1430    }
1431
1432    fn build_datetime_struct_column<F>(
1433        &self,
1434        len: usize,
1435        deleted: &[bool],
1436        get_props: F,
1437    ) -> Result<ArrayRef>
1438    where
1439        F: Fn(usize) -> Option<&'a Value>,
1440    {
1441        let values = self.collect_values_or_null(len, deleted, &get_props);
1442        Ok(values_to_datetime_struct_array(&values))
1443    }
1444
1445    fn build_time_struct_column<F>(
1446        &self,
1447        len: usize,
1448        deleted: &[bool],
1449        get_props: F,
1450    ) -> Result<ArrayRef>
1451    where
1452        F: Fn(usize) -> Option<&'a Value>,
1453    {
1454        let values = self.collect_values_or_null(len, deleted, &get_props);
1455        Ok(values_to_time_struct_array(&values))
1456    }
1457
1458    /// Collect property values into a Vec, substituting `Value::Null` for deleted or missing entries.
1459    fn collect_values_or_null<F>(&self, len: usize, deleted: &[bool], get_props: &F) -> Vec<Value>
1460    where
1461        F: Fn(usize) -> Option<&'a Value>,
1462    {
1463        deleted
1464            .iter()
1465            .enumerate()
1466            .take(len)
1467            .map(|(i, &is_deleted)| {
1468                if is_deleted {
1469                    Value::Null
1470                } else {
1471                    get_props(i).cloned().unwrap_or(Value::Null)
1472                }
1473            })
1474            .collect()
1475    }
1476
1477    fn build_date32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1478    where
1479        F: Fn(usize) -> Option<&'a Value>,
1480    {
1481        let mut builder = Date32Builder::with_capacity(len);
1482        let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1483
1484        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1485            let val = get_props(i);
1486            let days = if is_deleted || val.is_none() {
1487                Some(0)
1488            } else if let Some(Value::Temporal(uni_common::TemporalValue::Date {
1489                days_since_epoch,
1490            })) = val
1491            {
1492                Some(*days_since_epoch)
1493            } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1494                // i64 day count -> i32: an out-of-range value becomes NULL
1495                // rather than silently wrapping to a different date. (review H13)
1496                i32::try_from(v).ok()
1497            } else if let Some(s) = val.and_then(|v| v.as_str()) {
1498                match chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1499                    Ok(date) => Some(date.signed_duration_since(epoch).num_days() as i32),
1500                    Err(_) => None,
1501                }
1502            } else {
1503                None
1504            };
1505
1506            if is_deleted {
1507                builder.append_value(0);
1508            } else if let Some(v) = days {
1509                builder.append_value(v);
1510            } else {
1511                builder.append_null();
1512            }
1513        }
1514        Ok(Arc::new(builder.finish()))
1515    }
1516
1517    fn build_duration_column<F>(
1518        &self,
1519        len: usize,
1520        deleted: &[bool],
1521        get_props: F,
1522    ) -> Result<ArrayRef>
1523    where
1524        F: Fn(usize) -> Option<&'a Value>,
1525    {
1526        // Duration stored as LargeBinary via CypherValue codec (Lance doesn't support Interval(MonthDayNano))
1527        let mut builder = LargeBinaryBuilder::with_capacity(len, len * 32);
1528        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1529            let raw_val = get_props(i);
1530            if let Some(val @ Value::Temporal(uni_common::TemporalValue::Duration { .. })) = raw_val
1531            {
1532                let encoded = uni_common::cypher_value_codec::encode(val);
1533                builder.append_value(&encoded);
1534            } else if is_deleted {
1535                let zero = Value::Temporal(uni_common::TemporalValue::Duration {
1536                    months: 0,
1537                    days: 0,
1538                    nanos: 0,
1539                });
1540                let encoded = uni_common::cypher_value_codec::encode(&zero);
1541                builder.append_value(&encoded);
1542            } else {
1543                builder.append_null();
1544            }
1545        }
1546        Ok(Arc::new(builder.finish()))
1547    }
1548
1549    fn build_btic_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1550    where
1551        F: Fn(usize) -> Option<&'a Value>,
1552    {
1553        const ENCODED_LEN: i32 = 24;
1554        let mut builder = FixedSizeBinaryBuilder::with_capacity(len, ENCODED_LEN);
1555        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1556            let raw_val = get_props(i);
1557            let btic = match raw_val {
1558                Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => Some(
1559                    uni_btic::Btic::new(*lo, *hi, *meta)
1560                        .map_err(|e| anyhow!("invalid BTIC value: {}", e))?,
1561                ),
1562                Some(Value::String(s)) => Some(
1563                    uni_btic::parse::parse_btic_literal(s)
1564                        .map_err(|e| anyhow!("BTIC parse error for '{}': {}", s, e))?,
1565                ),
1566                _ => None,
1567            };
1568
1569            if let Some(b) = btic {
1570                builder.append_value(uni_btic::encode::encode(&b))?;
1571            } else if is_deleted {
1572                builder.append_value([0u8; ENCODED_LEN as usize])?;
1573            } else {
1574                builder.append_null();
1575            }
1576        }
1577        Ok(Arc::new(builder.finish()))
1578    }
1579
1580    fn build_float32_column<F>(
1581        &self,
1582        len: usize,
1583        deleted: &[bool],
1584        get_props: F,
1585    ) -> Result<ArrayRef>
1586    where
1587        F: Fn(usize) -> Option<&'a Value>,
1588    {
1589        let mut values = Vec::with_capacity(len);
1590        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1591            let val = get_props(i).and_then(|v| v.as_f64()).map(|v| v as f32);
1592            if val.is_none() && is_deleted {
1593                values.push(Some(0.0));
1594            } else {
1595                values.push(val);
1596            }
1597        }
1598        Ok(Arc::new(Float32Array::from(values)))
1599    }
1600
1601    fn build_float64_column<F>(
1602        &self,
1603        len: usize,
1604        deleted: &[bool],
1605        get_props: F,
1606    ) -> Result<ArrayRef>
1607    where
1608        F: Fn(usize) -> Option<&'a Value>,
1609    {
1610        let mut values = Vec::with_capacity(len);
1611        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1612            let val = get_props(i).and_then(|v| v.as_f64());
1613            if val.is_none() && is_deleted {
1614                values.push(Some(0.0));
1615            } else {
1616                values.push(val);
1617            }
1618        }
1619        Ok(Arc::new(Float64Array::from(values)))
1620    }
1621
1622    fn build_bool_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1623    where
1624        F: Fn(usize) -> Option<&'a Value>,
1625    {
1626        let mut values = Vec::with_capacity(len);
1627        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1628            let val = get_props(i).and_then(|v| v.as_bool());
1629            if val.is_none() && is_deleted {
1630                values.push(Some(false));
1631            } else {
1632                values.push(val);
1633            }
1634        }
1635        Ok(Arc::new(BooleanArray::from(values)))
1636    }
1637
1638    fn build_vector_column<F>(
1639        &self,
1640        len: usize,
1641        deleted: &[bool],
1642        get_props: F,
1643        dimensions: usize,
1644    ) -> Result<ArrayRef>
1645    where
1646        F: Fn(usize) -> Option<&'a Value>,
1647    {
1648        let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dimensions as i32);
1649
1650        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1651            let val = get_props(i);
1652            let (values, valid) = extract_vector_f32_values(val, is_deleted, dimensions);
1653            for v in values {
1654                builder.values().append_value(v);
1655            }
1656            builder.append(valid);
1657        }
1658        Ok(Arc::new(builder.finish()))
1659    }
1660
1661    /// Build a sparse-vector column as `Struct { indices: List<UInt32>,
1662    /// values: List<Float32> }` (see `schema::sparse_vector_struct_fields`).
1663    /// Deleted, missing, or non-sparse rows become null struct rows carrying
1664    /// empty child lists — mirroring how `build_vector_column` nulls deleted
1665    /// rows. An empty sparse vector is stored as two empty (non-null) lists.
1666    fn build_sparse_vector_column<F>(
1667        &self,
1668        len: usize,
1669        deleted: &[bool],
1670        get_props: F,
1671    ) -> Result<ArrayRef>
1672    where
1673        F: Fn(usize) -> Option<&'a Value>,
1674    {
1675        let mut indices_builder = ListBuilder::new(UInt32Builder::new());
1676        let mut values_builder = ListBuilder::new(Float32Builder::new());
1677        let mut null_buffer = BooleanBufferBuilder::new(len);
1678
1679        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1680            let pair = if is_deleted {
1681                None
1682            } else {
1683                get_props(i).and_then(sparse_pair_from_value)
1684            };
1685            match pair {
1686                Some((indices, values)) => {
1687                    for ix in indices {
1688                        indices_builder.values().append_value(ix);
1689                    }
1690                    indices_builder.append(true);
1691                    for w in values {
1692                        values_builder.values().append_value(w);
1693                    }
1694                    values_builder.append(true);
1695                    null_buffer.append(true);
1696                }
1697                None => {
1698                    indices_builder.append(true);
1699                    values_builder.append(true);
1700                    null_buffer.append(false);
1701                }
1702            }
1703        }
1704
1705        let struct_arr = StructArray::new(
1706            schema::sparse_vector_struct_fields(),
1707            vec![
1708                Arc::new(indices_builder.finish()) as ArrayRef,
1709                Arc::new(values_builder.finish()) as ArrayRef,
1710            ],
1711            Some(null_buffer.finish().into()),
1712        );
1713        Ok(Arc::new(struct_arr))
1714    }
1715
1716    fn build_json_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1717    where
1718        F: Fn(usize) -> Option<&'a Value>,
1719    {
1720        let null_val = Value::Null;
1721        let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(len, len * 64);
1722        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1723            let val = get_props(i);
1724            let uni_val = if val.is_none() && is_deleted {
1725                &null_val
1726            } else {
1727                val.unwrap_or(&null_val)
1728            };
1729            // Encode to CypherValue (MessagePack-tagged)
1730            let cv_bytes = uni_common::cypher_value_codec::encode(uni_val);
1731            builder.append_value(&cv_bytes);
1732        }
1733        Ok(Arc::new(builder.finish()))
1734    }
1735
1736    fn build_bytes_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1737    where
1738        F: Fn(usize) -> Option<&'a Value>,
1739    {
1740        let mut builder = LargeBinaryBuilder::with_capacity(len, len * 64);
1741        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1742            let val = get_props(i);
1743            if let Some(Value::Bytes(b)) = val {
1744                builder.append_value(b);
1745            } else if is_deleted {
1746                builder.append_value(&[][..]);
1747            } else {
1748                builder.append_null();
1749            }
1750        }
1751        Ok(Arc::new(builder.finish()))
1752    }
1753
1754    fn build_list_column<F>(
1755        &self,
1756        len: usize,
1757        deleted: &[bool],
1758        get_props: F,
1759        inner: &DataType,
1760    ) -> Result<ArrayRef>
1761    where
1762        F: Fn(usize) -> Option<&'a Value>,
1763    {
1764        match inner {
1765            DataType::String => {
1766                self.build_typed_list(len, deleted, &get_props, StringBuilder::new(), |v, b| {
1767                    if let Some(s) = v.as_str() {
1768                        b.append_value(s);
1769                    } else {
1770                        b.append_null();
1771                    }
1772                })
1773            }
1774            DataType::Int64 => {
1775                self.build_typed_list(len, deleted, &get_props, Int64Builder::new(), |v, b| {
1776                    if let Some(n) = v.as_i64() {
1777                        b.append_value(n);
1778                    } else {
1779                        b.append_null();
1780                    }
1781                })
1782            }
1783            DataType::Float64 => {
1784                self.build_typed_list(len, deleted, &get_props, Float64Builder::new(), |v, b| {
1785                    if let Some(f) = v.as_f64() {
1786                        b.append_value(f);
1787                    } else {
1788                        b.append_null();
1789                    }
1790                })
1791            }
1792            DataType::Bytes => {
1793                // Raw `Bytes` elements: store each buffer verbatim in a `LargeBinary`
1794                // child and mark the child field `uni_raw_bytes` so the read path
1795                // decodes it verbatim instead of through the tagged codec.
1796                let item_field = Arc::new(
1797                    Field::new("item", ArrowDataType::LargeBinary, true)
1798                        .with_metadata(schema::raw_bytes_field_metadata()),
1799                );
1800                let mut builder =
1801                    ListBuilder::new(LargeBinaryBuilder::new()).with_field(item_field);
1802                for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1803                    let val_array = get_props(i).and_then(|v| v.as_array());
1804                    if val_array.is_none() && is_deleted {
1805                        builder.append_null();
1806                    } else if let Some(arr) = val_array {
1807                        for v in arr {
1808                            if let Value::Bytes(b) = v {
1809                                builder.values().append_value(b);
1810                            } else {
1811                                builder.values().append_null();
1812                            }
1813                        }
1814                        builder.append(true);
1815                    } else {
1816                        builder.append_null();
1817                    }
1818                }
1819                Ok(Arc::new(builder.finish()))
1820            }
1821            DataType::Vector { dimensions } => {
1822                // Multi-vector / late-interaction (ColBERT) property: a per-row
1823                // variable-count set of fixed-`dimensions` token vectors, stored as
1824                // `List<FixedSizeList<Float32, dimensions>>`. Each inner token is
1825                // validated through the same `extract_vector_f32_values` path used for
1826                // single dense vectors, so dimension/type failure modes match.
1827                //
1828                // NOTE: only the declared-schema write path (this builder) supports
1829                // multi-vector today. The schemaless, Arrow-type-driven `values_to_array`
1830                // path does not yet handle `List<FixedSizeList<Float32>>`; schemaless
1831                // multi-vector writes are a deferred follow-up (issue #96, Phase 1.5).
1832                let dim = *dimensions as i32;
1833                let mut builder =
1834                    ListBuilder::new(FixedSizeListBuilder::new(Float32Builder::new(), dim));
1835                for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1836                    let val_array = get_props(i).and_then(|v| v.as_array());
1837                    if val_array.is_none() && is_deleted {
1838                        builder.append_null();
1839                    } else if let Some(arr) = val_array {
1840                        // Variable token count per row: append one fixed-size inner
1841                        // vector per token. `extract_vector_f32_values` always yields
1842                        // exactly `dimensions` values, satisfying the FixedSizeList stride.
1843                        for tok in arr {
1844                            let (vals, valid) =
1845                                extract_vector_f32_values(Some(tok), false, *dimensions);
1846                            for v in vals {
1847                                builder.values().values().append_value(v);
1848                            }
1849                            builder.values().append(valid);
1850                        }
1851                        builder.append(true);
1852                    } else {
1853                        builder.append_null();
1854                    }
1855                }
1856                Ok(Arc::new(builder.finish()))
1857            }
1858            _ => Err(anyhow!("Unsupported inner type for List: {:?}", inner)),
1859        }
1860    }
1861
1862    /// Generic helper to build a list column with any inner builder type.
1863    fn build_typed_list<F, B, A>(
1864        &self,
1865        len: usize,
1866        deleted: &[bool],
1867        get_props: &F,
1868        inner_builder: B,
1869        mut append_value: A,
1870    ) -> Result<ArrayRef>
1871    where
1872        F: Fn(usize) -> Option<&'a Value>,
1873        B: arrow_array::builder::ArrayBuilder,
1874        A: FnMut(&Value, &mut B),
1875    {
1876        let mut builder = ListBuilder::new(inner_builder);
1877        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1878            let val_array = get_props(i).and_then(|v| v.as_array());
1879            if val_array.is_none() && is_deleted {
1880                builder.append_null();
1881            } else if let Some(arr) = val_array {
1882                for v in arr {
1883                    append_value(v, builder.values());
1884                }
1885                builder.append(true);
1886            } else {
1887                builder.append_null();
1888            }
1889        }
1890        Ok(Arc::new(builder.finish()))
1891    }
1892
1893    fn build_map_column<F>(
1894        &self,
1895        len: usize,
1896        deleted: &[bool],
1897        get_props: F,
1898        key: &DataType,
1899        value: &DataType,
1900    ) -> Result<ArrayRef>
1901    where
1902        F: Fn(usize) -> Option<&'a Value>,
1903    {
1904        if !matches!(key, DataType::String) {
1905            return Err(anyhow!("Map keys must be String (JSON limitation)"));
1906        }
1907
1908        match value {
1909            DataType::String => self.build_typed_map(
1910                len,
1911                deleted,
1912                &get_props,
1913                StringBuilder::new(),
1914                arrow_schema::DataType::Utf8,
1915                None,
1916                |v, b: &mut StringBuilder| {
1917                    if let Some(s) = v.as_str() {
1918                        b.append_value(s);
1919                    } else {
1920                        b.append_null();
1921                    }
1922                },
1923            ),
1924            DataType::Int64 => self.build_typed_map(
1925                len,
1926                deleted,
1927                &get_props,
1928                Int64Builder::new(),
1929                arrow_schema::DataType::Int64,
1930                None,
1931                |v, b: &mut Int64Builder| {
1932                    if let Some(n) = v.as_i64() {
1933                        b.append_value(n);
1934                    } else {
1935                        b.append_null();
1936                    }
1937                },
1938            ),
1939            DataType::Int32 => self.build_typed_map(
1940                len,
1941                deleted,
1942                &get_props,
1943                Int32Builder::new(),
1944                arrow_schema::DataType::Int32,
1945                None,
1946                |v, b: &mut Int32Builder| match v.as_i64().and_then(|n| i32::try_from(n).ok()) {
1947                    Some(n) => b.append_value(n),
1948                    None => b.append_null(),
1949                },
1950            ),
1951            DataType::Float64 => self.build_typed_map(
1952                len,
1953                deleted,
1954                &get_props,
1955                Float64Builder::new(),
1956                arrow_schema::DataType::Float64,
1957                None,
1958                |v, b: &mut Float64Builder| match v.as_f64() {
1959                    Some(f) => b.append_value(f),
1960                    None => b.append_null(),
1961                },
1962            ),
1963            DataType::Float32 => self.build_typed_map(
1964                len,
1965                deleted,
1966                &get_props,
1967                Float32Builder::new(),
1968                arrow_schema::DataType::Float32,
1969                None,
1970                |v, b: &mut Float32Builder| match v.as_f64() {
1971                    Some(f) => b.append_value(f as f32),
1972                    None => b.append_null(),
1973                },
1974            ),
1975            DataType::Bool => self.build_typed_map(
1976                len,
1977                deleted,
1978                &get_props,
1979                BooleanBuilder::new(),
1980                arrow_schema::DataType::Boolean,
1981                None,
1982                |v, b: &mut BooleanBuilder| match v.as_bool() {
1983                    Some(x) => b.append_value(x),
1984                    None => b.append_null(),
1985                },
1986            ),
1987            DataType::Bytes => self.build_typed_map(
1988                len,
1989                deleted,
1990                &get_props,
1991                LargeBinaryBuilder::new(),
1992                arrow_schema::DataType::LargeBinary,
1993                // Mark the value child `uni_raw_bytes` so the read path decodes each
1994                // raw `Bytes` value verbatim rather than through the tagged codec.
1995                Some(schema::raw_bytes_field_metadata()),
1996                |v, b: &mut LargeBinaryBuilder| {
1997                    if let Value::Bytes(bytes) = v {
1998                        b.append_value(bytes);
1999                    } else {
2000                        b.append_null();
2001                    }
2002                },
2003            ),
2004            // Nested / non-scalar value types (Vector, List, Map, temporal, …): no typed
2005            // Arrow builder — CypherValue-encode each value into an UNMARKED LargeBinary
2006            // child (no `uni_raw_bytes`), so the read path (`try_reconstruct_map` →
2007            // `arrow_to_value` LargeBinary arm) decodes it through the tagged codec. This
2008            // mirrors how schemaless/overflow maps are stored and handles arbitrary nesting.
2009            _ => self.build_typed_map(
2010                len,
2011                deleted,
2012                &get_props,
2013                LargeBinaryBuilder::new(),
2014                arrow_schema::DataType::LargeBinary,
2015                None,
2016                |v, b: &mut LargeBinaryBuilder| {
2017                    if v.is_null() {
2018                        b.append_null();
2019                    } else {
2020                        b.append_value(uni_common::cypher_value_codec::encode(v));
2021                    }
2022                },
2023            ),
2024        }
2025    }
2026
2027    /// Generic helper to build a map column with any value builder type.
2028    #[expect(
2029        clippy::too_many_arguments,
2030        reason = "builder plumbing: value type + optional child metadata are distinct knobs"
2031    )]
2032    fn build_typed_map<F, B, A>(
2033        &self,
2034        len: usize,
2035        deleted: &[bool],
2036        get_props: &F,
2037        value_builder: B,
2038        value_arrow_type: arrow_schema::DataType,
2039        value_metadata: Option<HashMap<String, String>>,
2040        mut append_value: A,
2041    ) -> Result<ArrayRef>
2042    where
2043        F: Fn(usize) -> Option<&'a Value>,
2044        B: arrow_array::builder::ArrayBuilder,
2045        A: FnMut(&Value, &mut B),
2046    {
2047        let key_builder = Box::new(StringBuilder::new());
2048        let value_builder = Box::new(value_builder);
2049        let value_field = match value_metadata {
2050            Some(meta) => Field::new("value", value_arrow_type, true).with_metadata(meta),
2051            None => Field::new("value", value_arrow_type, true),
2052        };
2053        let struct_builder = StructBuilder::new(
2054            vec![
2055                Field::new("key", arrow_schema::DataType::Utf8, false),
2056                value_field,
2057            ],
2058            vec![key_builder, value_builder],
2059        );
2060        let mut builder = ListBuilder::new(struct_builder);
2061
2062        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
2063            self.append_map_entry(&mut builder, get_props(i), is_deleted, &mut append_value);
2064        }
2065        Ok(Arc::new(builder.finish()))
2066    }
2067
2068    /// Append a single map entry to the list builder.
2069    fn append_map_entry<B, A>(
2070        &self,
2071        builder: &mut ListBuilder<StructBuilder>,
2072        val: Option<&'a Value>,
2073        is_deleted: bool,
2074        append_value: &mut A,
2075    ) where
2076        B: arrow_array::builder::ArrayBuilder,
2077        A: FnMut(&Value, &mut B),
2078    {
2079        let val_obj = val.and_then(|v| v.as_object());
2080        if val_obj.is_none() && is_deleted {
2081            builder.append(false);
2082        } else if let Some(obj) = val_obj {
2083            let struct_b = builder.values();
2084            for (k, v) in obj {
2085                struct_b
2086                    .field_builder::<StringBuilder>(0)
2087                    .unwrap()
2088                    .append_value(k);
2089                // Safety: We know the value builder type matches B
2090                let value_b = struct_b.field_builder::<B>(1).unwrap();
2091                append_value(v, value_b);
2092                struct_b.append(true);
2093            }
2094            builder.append(true);
2095        } else {
2096            builder.append(false);
2097        }
2098    }
2099
2100    fn build_crdt_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
2101    where
2102        F: Fn(usize) -> Option<&'a Value>,
2103    {
2104        let mut builder = BinaryBuilder::new();
2105        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
2106            if is_deleted {
2107                builder.append_null();
2108                continue;
2109            }
2110            if let Some(val) = get_props(i) {
2111                // Try to parse CRDT from the value
2112                // If it's a string, first parse it as JSON, then as CRDT
2113                let crdt_result = if let Some(s) = val.as_str() {
2114                    serde_json::from_str::<Crdt>(s)
2115                } else {
2116                    // Convert uni_common::Value to serde_json::Value at the CRDT boundary
2117                    let json_val: serde_json::Value = val.clone().into();
2118                    serde_json::from_value::<Crdt>(json_val)
2119                };
2120
2121                if let Ok(crdt) = crdt_result {
2122                    if let Ok(bytes) = crdt.to_msgpack() {
2123                        builder.append_value(&bytes);
2124                    } else {
2125                        builder.append_null();
2126                    }
2127                } else {
2128                    builder.append_null();
2129                }
2130            } else {
2131                builder.append_null();
2132            }
2133        }
2134        Ok(Arc::new(builder.finish()))
2135    }
2136}
2137
2138/// Build a column for edge entries (no deleted flag handling needed).
2139pub fn build_edge_column<'a>(
2140    name: &'a str,
2141    data_type: &'a DataType,
2142    len: usize,
2143    get_props: impl Fn(usize) -> Option<&'a Value>,
2144) -> Result<ArrayRef> {
2145    // For edges, use empty deleted array
2146    let deleted = vec![false; len];
2147    let extractor = PropertyExtractor::new(name, data_type);
2148    extractor.build_column(len, &deleted, get_props)
2149}
2150
2151#[cfg(test)]
2152mod tests {
2153    use super::*;
2154    use arrow_array::{
2155        Array, DurationMicrosecondArray,
2156        builder::{BinaryBuilder, Time64MicrosecondBuilder, TimestampNanosecondBuilder},
2157    };
2158    use std::collections::HashMap;
2159    use uni_common::TemporalValue;
2160    use uni_crdt::{Crdt, GCounter};
2161
2162    #[test]
2163    fn test_sparse_vector_columnar_roundtrip_and_no_silent_null() {
2164        use crate::storage::value_codec::{CrdtDecodeMode, decode_column_value, value_from_column};
2165
2166        let dt = DataType::SparseVector { dimensions: 100 };
2167        let v0 = Value::SparseVector {
2168            indices: vec![1, 5, 9],
2169            values: vec![0.5, -1.0, 2.0],
2170        };
2171        // An empty sparse vector must round-trip as empty lists, not null.
2172        let v1 = Value::SparseVector {
2173            indices: vec![],
2174            values: vec![],
2175        };
2176        // row 2 is a tombstone → decodes back to Null.
2177        let props = [Some(v0.clone()), Some(v1.clone()), None];
2178        let deleted = [false, false, true];
2179
2180        let extractor = PropertyExtractor::new("emb", &dt);
2181        let col = extractor
2182            .build_column(3, &deleted, |i| props[i].as_ref())
2183            .unwrap();
2184
2185        // Full-fidelity decode → uni_common::Value::SparseVector.
2186        assert_eq!(
2187            decode_column_value(&col, &dt, 0, CrdtDecodeMode::Strict).unwrap(),
2188            v0
2189        );
2190        assert_eq!(
2191            decode_column_value(&col, &dt, 1, CrdtDecodeMode::Strict).unwrap(),
2192            v1
2193        );
2194        assert_eq!(
2195            decode_column_value(&col, &dt, 2, CrdtDecodeMode::Strict).unwrap(),
2196            Value::Null
2197        );
2198
2199        // Regression for the `_ => Ok(Value::Null)` fallback: the serde_json
2200        // read path must surface the data, not silently null it.
2201        let json0 = value_from_column(&col, &dt, 0, CrdtDecodeMode::Strict).unwrap();
2202        assert!(
2203            json0.is_object(),
2204            "sparse column was silently nulled by value_from_column: {json0:?}"
2205        );
2206        assert_eq!(json0["indices"], serde_json::json!([1u32, 5u32, 9u32]));
2207        assert_eq!(
2208            json0["values"],
2209            serde_json::json!([0.5f32, -1.0f32, 2.0f32])
2210        );
2211    }
2212
2213    #[test]
2214    fn test_arrow_to_value_string() {
2215        let arr = StringArray::from(vec![Some("hello"), None, Some("world")]);
2216        assert_eq!(
2217            arrow_to_value(&arr, 0, None),
2218            Value::String("hello".to_string())
2219        );
2220        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
2221        assert_eq!(
2222            arrow_to_value(&arr, 2, None),
2223            Value::String("world".to_string())
2224        );
2225    }
2226
2227    #[test]
2228    fn test_arrow_to_value_int64() {
2229        let arr = Int64Array::from(vec![Some(42), None, Some(-10)]);
2230        assert_eq!(arrow_to_value(&arr, 0, None), Value::Int(42));
2231        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
2232        assert_eq!(arrow_to_value(&arr, 2, None), Value::Int(-10));
2233    }
2234
2235    #[test]
2236    #[allow(clippy::approx_constant)]
2237    fn test_arrow_to_value_float64() {
2238        let arr = Float64Array::from(vec![Some(3.14), None]);
2239        assert_eq!(arrow_to_value(&arr, 0, None), Value::Float(3.14));
2240        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
2241    }
2242
2243    #[test]
2244    fn test_arrow_to_value_bool() {
2245        let arr = BooleanArray::from(vec![Some(true), Some(false), None]);
2246        assert_eq!(arrow_to_value(&arr, 0, None), Value::Bool(true));
2247        assert_eq!(arrow_to_value(&arr, 1, None), Value::Bool(false));
2248        assert_eq!(arrow_to_value(&arr, 2, None), Value::Null);
2249    }
2250
2251    #[test]
2252    fn test_values_to_array_int64() {
2253        let values = vec![Value::Int(1), Value::Int(2), Value::Null, Value::Int(4)];
2254        let arr = values_to_array(&values, &ArrowDataType::Int64).unwrap();
2255        assert_eq!(arr.len(), 4);
2256
2257        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2258        assert_eq!(int_arr.value(0), 1);
2259        assert_eq!(int_arr.value(1), 2);
2260        assert!(int_arr.is_null(2));
2261        assert_eq!(int_arr.value(3), 4);
2262    }
2263
2264    #[test]
2265    fn test_values_to_array_string() {
2266        let values = vec![
2267            Value::String("a".to_string()),
2268            Value::String("b".to_string()),
2269            Value::Null,
2270        ];
2271        let arr = values_to_array(&values, &ArrowDataType::Utf8).unwrap();
2272        assert_eq!(arr.len(), 3);
2273
2274        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
2275        assert_eq!(str_arr.value(0), "a");
2276        assert_eq!(str_arr.value(1), "b");
2277        assert!(str_arr.is_null(2));
2278    }
2279
2280    #[test]
2281    fn test_property_extractor_string() {
2282        let props: Vec<HashMap<String, Value>> = vec![
2283            [("name".to_string(), Value::String("Alice".to_string()))]
2284                .into_iter()
2285                .collect(),
2286            [("name".to_string(), Value::String("Bob".to_string()))]
2287                .into_iter()
2288                .collect(),
2289            HashMap::new(),
2290        ];
2291        let deleted = vec![false, false, true];
2292
2293        let extractor = PropertyExtractor::new("name", &DataType::String);
2294        let arr = extractor
2295            .build_column(3, &deleted, |i| props[i].get("name"))
2296            .unwrap();
2297
2298        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
2299        assert_eq!(str_arr.value(0), "Alice");
2300        assert_eq!(str_arr.value(1), "Bob");
2301        assert_eq!(str_arr.value(2), ""); // Deleted entries get default
2302    }
2303
2304    #[test]
2305    fn test_property_extractor_int64() {
2306        let props: Vec<HashMap<String, Value>> = vec![
2307            [("age".to_string(), Value::Int(25))].into_iter().collect(),
2308            [("age".to_string(), Value::Int(30))].into_iter().collect(),
2309            HashMap::new(),
2310        ];
2311        let deleted = vec![false, false, true];
2312
2313        let extractor = PropertyExtractor::new("age", &DataType::Int64);
2314        let arr = extractor
2315            .build_column(3, &deleted, |i| props[i].get("age"))
2316            .unwrap();
2317
2318        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2319        assert_eq!(int_arr.value(0), 25);
2320        assert_eq!(int_arr.value(1), 30);
2321        assert_eq!(int_arr.value(2), 0); // Deleted entries get default
2322    }
2323
2324    #[test]
2325    fn test_property_extractor_bytes_roundtrip() {
2326        let blob = vec![0u8, 1, 2, 255];
2327        let props: Vec<HashMap<String, Value>> = vec![
2328            [("blob".to_string(), Value::Bytes(blob.clone()))]
2329                .into_iter()
2330                .collect(),
2331            [("blob".to_string(), Value::Bytes(Vec::new()))]
2332                .into_iter()
2333                .collect(),
2334            HashMap::new(),
2335        ];
2336        let deleted = vec![false, false, false];
2337
2338        let extractor = PropertyExtractor::new("blob", &DataType::Bytes);
2339        let arr = extractor
2340            .build_column(3, &deleted, |i| props[i].get("blob"))
2341            .unwrap();
2342
2343        // Decode each row through arrow_to_value with Bytes hint.
2344        assert_eq!(
2345            arrow_to_value(arr.as_ref(), 0, Some(&DataType::Bytes)),
2346            Value::Bytes(blob)
2347        );
2348        assert_eq!(
2349            arrow_to_value(arr.as_ref(), 1, Some(&DataType::Bytes)),
2350            Value::Bytes(Vec::new())
2351        );
2352        // Missing property → null in the Arrow array.
2353        assert_eq!(
2354            arrow_to_value(arr.as_ref(), 2, Some(&DataType::Bytes)),
2355            Value::Null
2356        );
2357    }
2358
2359    #[test]
2360    fn test_bytes_vs_cypher_value_disambiguation() {
2361        // A LargeBinary column tagged as DataType::Bytes must NOT be decoded
2362        // through the CypherValue MessagePack codec, even though both share
2363        // the same Arrow physical type.
2364        let raw = vec![0xDEu8, 0xAD, 0xBE, 0xEF];
2365        let props: Vec<HashMap<String, Value>> = vec![
2366            [("blob".to_string(), Value::Bytes(raw.clone()))]
2367                .into_iter()
2368                .collect(),
2369        ];
2370        let extractor = PropertyExtractor::new("blob", &DataType::Bytes);
2371        let arr = extractor
2372            .build_column(1, &[false], |i| props[i].get("blob"))
2373            .unwrap();
2374        // With schema hint: raw bytes returned.
2375        assert_eq!(
2376            arrow_to_value(arr.as_ref(), 0, Some(&DataType::Bytes)),
2377            Value::Bytes(raw)
2378        );
2379    }
2380
2381    #[test]
2382    fn test_data_type_bytes_to_arrow() {
2383        assert_eq!(DataType::Bytes.to_arrow(), ArrowDataType::LargeBinary);
2384    }
2385
2386    #[test]
2387    fn test_arrow_to_value_time64() {
2388        // Test Time64MicrosecondArray legacy fallback (micros→nanos conversion)
2389        let mut builder = Time64MicrosecondBuilder::new();
2390        // 10:30:45 = 10*3600 + 30*60 + 45 = 37845 seconds = 37845000000 microseconds
2391        builder.append_value(37_845_000_000);
2392        // 00:00:00 = 0 microseconds
2393        builder.append_value(0);
2394        // 23:59:59.123456 = 86399.123456 seconds
2395        builder.append_value(86_399_123_456);
2396        builder.append_null();
2397
2398        let arr = builder.finish();
2399        // Arrow→Value returns Value::Temporal(LocalTime) with nanos (micros * 1000)
2400        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "10:30:45");
2401        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "00:00");
2402        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "23:59:59.123456");
2403        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
2404    }
2405
2406    #[test]
2407    fn test_arrow_to_value_duration() {
2408        // Test DurationMicrosecondArray conversion
2409        // Arrow→Value now returns Value::Temporal(Duration)
2410        let arr = DurationMicrosecondArray::from(vec![
2411            Some(1_000_000),      // 1 second in microseconds
2412            Some(3_600_000_000),  // 1 hour
2413            Some(86_400_000_000), // 1 day
2414            None,
2415        ]);
2416
2417        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "PT1S");
2418        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "PT1H");
2419        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "PT24H");
2420        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
2421    }
2422
2423    #[test]
2424    fn test_arrow_to_value_binary_crdt() {
2425        // Test BinaryArray (CRDT) conversion - round-trip test
2426        let mut builder = BinaryBuilder::new();
2427
2428        // Create a GCounter CRDT and serialize it
2429        let mut counter = GCounter::new();
2430        counter.increment("actor1", 5);
2431        let crdt = Crdt::GCounter(counter);
2432        let bytes = crdt.to_msgpack().unwrap();
2433        builder.append_value(&bytes);
2434
2435        // Add a null value
2436        builder.append_null();
2437
2438        let arr = builder.finish();
2439
2440        // The first value should deserialize back to a map
2441        let result = arrow_to_value(&arr, 0, None);
2442        assert!(result.as_object().is_some());
2443        let obj = result.as_object().unwrap();
2444        // GCounter serializes with tag "t": "gc"
2445        assert_eq!(obj.get("t"), Some(&Value::String("gc".to_string())));
2446
2447        // Null value should return null
2448        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
2449    }
2450
2451    #[test]
2452    fn test_datetime_struct_encode_decode_roundtrip() {
2453        // Test DateTime struct encoding with offset and timezone preservation
2454        let values = vec![
2455            Value::Temporal(TemporalValue::DateTime {
2456                nanos_since_epoch: 441763200000000000, // 1984-01-01T00:00:00Z
2457                offset_seconds: 3600,                  // +01:00
2458                timezone_name: Some("Europe/Paris".to_string()),
2459            }),
2460            Value::Temporal(TemporalValue::DateTime {
2461                nanos_since_epoch: 1704067200000000000, // 2024-01-01T00:00:00Z
2462                offset_seconds: -18000,                 // -05:00
2463                timezone_name: None,
2464            }),
2465            Value::Temporal(TemporalValue::DateTime {
2466                nanos_since_epoch: 0, // Unix epoch
2467                offset_seconds: 0,
2468                timezone_name: Some("UTC".to_string()),
2469            }),
2470        ];
2471
2472        // Encode to Arrow struct
2473        let arr_ref = values_to_datetime_struct_array(&values);
2474        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2475        assert_eq!(arr.len(), 3);
2476
2477        // Decode back to Value
2478        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
2479        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
2480        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
2481
2482        // Verify round-trip preserves all fields
2483        assert_eq!(decoded_0, values[0]);
2484        assert_eq!(decoded_1, values[1]);
2485        assert_eq!(decoded_2, values[2]);
2486
2487        // Verify struct field extraction
2488        if let Value::Temporal(TemporalValue::DateTime {
2489            nanos_since_epoch,
2490            offset_seconds,
2491            timezone_name,
2492        }) = decoded_0
2493        {
2494            assert_eq!(nanos_since_epoch, 441763200000000000);
2495            assert_eq!(offset_seconds, 3600);
2496            assert_eq!(timezone_name, Some("Europe/Paris".to_string()));
2497        } else {
2498            panic!("Expected DateTime value");
2499        }
2500    }
2501
2502    #[test]
2503    fn test_datetime_struct_null_handling() {
2504        // Test DateTime struct with null values
2505        let values = vec![
2506            Value::Temporal(TemporalValue::DateTime {
2507                nanos_since_epoch: 441763200000000000,
2508                offset_seconds: 3600,
2509                timezone_name: Some("Europe/Paris".to_string()),
2510            }),
2511            Value::Null,
2512            Value::Temporal(TemporalValue::DateTime {
2513                nanos_since_epoch: 0,
2514                offset_seconds: 0,
2515                timezone_name: None,
2516            }),
2517        ];
2518
2519        let arr_ref = values_to_datetime_struct_array(&values);
2520        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2521        assert_eq!(arr.len(), 3);
2522
2523        // Check first value is valid
2524        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
2525        assert_eq!(decoded_0, values[0]);
2526
2527        // Check second value is null
2528        assert!(arr.is_null(1));
2529        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
2530        assert_eq!(decoded_1, Value::Null);
2531
2532        // Check third value is valid
2533        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
2534        assert_eq!(decoded_2, values[2]);
2535    }
2536
2537    #[test]
2538    fn test_datetime_struct_boundary_values() {
2539        // Test boundary values: offset=0, large positive/negative offsets
2540        let values = vec![
2541            Value::Temporal(TemporalValue::DateTime {
2542                nanos_since_epoch: 441763200000000000,
2543                offset_seconds: 0, // UTC
2544                timezone_name: None,
2545            }),
2546            Value::Temporal(TemporalValue::DateTime {
2547                nanos_since_epoch: 441763200000000000,
2548                offset_seconds: 43200, // +12:00 (max typical offset)
2549                timezone_name: None,
2550            }),
2551            Value::Temporal(TemporalValue::DateTime {
2552                nanos_since_epoch: 441763200000000000,
2553                offset_seconds: -43200, // -12:00 (min typical offset)
2554                timezone_name: None,
2555            }),
2556        ];
2557
2558        let arr_ref = values_to_datetime_struct_array(&values);
2559        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2560        assert_eq!(arr.len(), 3);
2561
2562        // Verify round-trip for all boundary values
2563        for (i, expected) in values.iter().enumerate() {
2564            let decoded = arrow_to_value(arr_ref.as_ref(), i, Some(&DataType::DateTime));
2565            assert_eq!(&decoded, expected);
2566        }
2567    }
2568
2569    #[test]
2570    fn test_datetime_old_schema_migration() {
2571        // Test backward compatibility: TimestampNanosecondArray → DateTime with offset=0
2572        let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
2573        builder.append_value(441763200000000000); // 1984-01-01T00:00:00Z
2574        builder.append_value(1704067200000000000); // 2024-01-01T00:00:00Z
2575        builder.append_null();
2576
2577        let arr = builder.finish();
2578
2579        // Decode with DataType::DateTime hint should migrate old schema
2580        let decoded_0 = arrow_to_value(&arr, 0, Some(&DataType::DateTime));
2581        let _decoded_1 = arrow_to_value(&arr, 1, Some(&DataType::DateTime));
2582        let decoded_2 = arrow_to_value(&arr, 2, Some(&DataType::DateTime));
2583
2584        // Old schema should default to offset=0, preserve timezone
2585        if let Value::Temporal(TemporalValue::DateTime {
2586            nanos_since_epoch,
2587            offset_seconds,
2588            timezone_name,
2589        }) = decoded_0
2590        {
2591            assert_eq!(nanos_since_epoch, 441763200000000000);
2592            assert_eq!(offset_seconds, 0);
2593            assert_eq!(timezone_name, Some("UTC".to_string()));
2594        } else {
2595            panic!("Expected DateTime value");
2596        }
2597
2598        // Verify null handling
2599        assert_eq!(decoded_2, Value::Null);
2600    }
2601
2602    #[test]
2603    fn test_time_struct_encode_decode_roundtrip() {
2604        // Test Time struct encoding with offset preservation
2605        let values = vec![
2606            Value::Temporal(TemporalValue::Time {
2607                nanos_since_midnight: 37845000000000, // 10:30:45
2608                offset_seconds: 3600,                 // +01:00
2609            }),
2610            Value::Temporal(TemporalValue::Time {
2611                nanos_since_midnight: 0, // 00:00:00
2612                offset_seconds: 0,
2613            }),
2614            Value::Temporal(TemporalValue::Time {
2615                nanos_since_midnight: 86399999999999, // 23:59:59.999999999
2616                offset_seconds: -18000,               // -05:00
2617            }),
2618        ];
2619
2620        // Encode to Arrow struct
2621        let arr_ref = values_to_time_struct_array(&values);
2622        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2623        assert_eq!(arr.len(), 3);
2624
2625        // Decode back to Value
2626        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2627        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2628        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2629
2630        // Verify round-trip preserves all fields
2631        assert_eq!(decoded_0, values[0]);
2632        assert_eq!(decoded_1, values[1]);
2633        assert_eq!(decoded_2, values[2]);
2634
2635        // Verify struct field extraction
2636        if let Value::Temporal(TemporalValue::Time {
2637            nanos_since_midnight,
2638            offset_seconds,
2639        }) = decoded_0
2640        {
2641            assert_eq!(nanos_since_midnight, 37845000000000);
2642            assert_eq!(offset_seconds, 3600);
2643        } else {
2644            panic!("Expected Time value");
2645        }
2646    }
2647
2648    #[test]
2649    fn test_time_struct_null_handling() {
2650        // Test Time struct with null values
2651        let values = vec![
2652            Value::Temporal(TemporalValue::Time {
2653                nanos_since_midnight: 37845000000000,
2654                offset_seconds: 3600,
2655            }),
2656            Value::Null,
2657            Value::Temporal(TemporalValue::Time {
2658                nanos_since_midnight: 0,
2659                offset_seconds: 0,
2660            }),
2661        ];
2662
2663        let arr_ref = values_to_time_struct_array(&values);
2664        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2665        assert_eq!(arr.len(), 3);
2666
2667        // Check first value is valid
2668        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2669        assert_eq!(decoded_0, values[0]);
2670
2671        // Check second value is null
2672        assert!(arr.is_null(1));
2673        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2674        assert_eq!(decoded_1, Value::Null);
2675
2676        // Check third value is valid
2677        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2678        assert_eq!(decoded_2, values[2]);
2679    }
2680
2681    // Tests for extract_vector_f32_values
2682
2683    #[test]
2684    fn test_extract_vector_f32_values_valid_vector() {
2685        let v = vec![1.0, 2.0, 3.0];
2686        let val = Value::Vector(v.clone());
2687        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2688        assert_eq!(result, v);
2689        assert!(valid);
2690    }
2691
2692    #[test]
2693    fn test_extract_vector_f32_values_vector_wrong_dims() {
2694        let v = vec![1.0, 2.0];
2695        let val = Value::Vector(v);
2696        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2697        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2698        assert!(!valid);
2699    }
2700
2701    #[test]
2702    fn test_extract_vector_f32_values_valid_list() {
2703        let v = vec![Value::Float(1.0), Value::Float(2.0), Value::Float(3.0)];
2704        let val = Value::List(v);
2705        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2706        assert_eq!(result, vec![1.0, 2.0, 3.0]);
2707        assert!(valid);
2708    }
2709
2710    #[test]
2711    fn test_extract_vector_f32_values_list_wrong_dims() {
2712        let v = vec![Value::Float(1.0), Value::Float(2.0)];
2713        let val = Value::List(v);
2714        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2715        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2716        assert!(!valid);
2717    }
2718
2719    #[test]
2720    fn test_extract_vector_f32_values_list_int_coercion() {
2721        let v = vec![Value::Int(1), Value::Int(2), Value::Int(3)];
2722        let val = Value::List(v);
2723        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2724        assert_eq!(result, vec![1.0, 2.0, 3.0]);
2725        assert!(valid);
2726    }
2727
2728    #[test]
2729    fn test_extract_vector_f32_values_none() {
2730        let (result, valid) = extract_vector_f32_values(None, false, 3);
2731        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2732        assert!(!valid);
2733    }
2734
2735    #[test]
2736    fn test_extract_vector_f32_values_null() {
2737        let val = Value::Null;
2738        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2739        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2740        assert!(!valid);
2741    }
2742
2743    #[test]
2744    fn test_extract_vector_f32_values_unsupported_type() {
2745        let val = Value::String("not a vector".to_string());
2746        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2747        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2748        assert!(!valid);
2749    }
2750
2751    #[test]
2752    fn test_extract_vector_f32_values_deleted_with_none() {
2753        let (result, valid) = extract_vector_f32_values(None, true, 3);
2754        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2755        assert!(valid); // Deleted entries are marked as valid with zeros
2756    }
2757
2758    #[test]
2759    fn test_extract_vector_f32_values_deleted_with_null() {
2760        let val = Value::Null;
2761        let (result, valid) = extract_vector_f32_values(Some(&val), true, 3);
2762        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2763        assert!(valid); // Deleted entries are marked as valid with zeros
2764    }
2765
2766    // Tests for values_to_array with FixedSizeList
2767
2768    #[test]
2769    fn test_values_to_fixed_size_list_vector_with_nulls() {
2770        let values = vec![
2771            Value::Vector(vec![1.0, 2.0]),
2772            Value::Null,
2773            Value::Vector(vec![3.0, 4.0]),
2774            Value::String("invalid".to_string()),
2775        ];
2776        let arr_ref = values_to_array(
2777            &values,
2778            &ArrowDataType::FixedSizeList(
2779                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2780                2,
2781            ),
2782        )
2783        .unwrap();
2784
2785        let arr = arr_ref
2786            .as_any()
2787            .downcast_ref::<FixedSizeListArray>()
2788            .unwrap();
2789
2790        assert_eq!(arr.len(), 4);
2791        assert!(arr.is_valid(0));
2792        assert!(!arr.is_valid(1)); // Null value
2793        assert!(arr.is_valid(2));
2794        assert!(!arr.is_valid(3)); // Invalid type
2795    }
2796
2797    #[test]
2798    fn test_values_to_fixed_size_list_from_list() {
2799        let values = vec![
2800            Value::List(vec![Value::Float(1.0), Value::Float(2.0)]),
2801            Value::List(vec![Value::Int(3), Value::Int(4)]),
2802        ];
2803        let arr_ref = values_to_array(
2804            &values,
2805            &ArrowDataType::FixedSizeList(
2806                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2807                2,
2808            ),
2809        )
2810        .unwrap();
2811
2812        let arr = arr_ref
2813            .as_any()
2814            .downcast_ref::<FixedSizeListArray>()
2815            .unwrap();
2816
2817        assert_eq!(arr.len(), 2);
2818        assert!(arr.is_valid(0));
2819        assert!(arr.is_valid(1));
2820
2821        // Check values
2822        let child = arr
2823            .values()
2824            .as_any()
2825            .downcast_ref::<Float32Array>()
2826            .unwrap();
2827        assert_eq!(child.value(0), 1.0);
2828        assert_eq!(child.value(1), 2.0);
2829        assert_eq!(child.value(2), 3.0);
2830        assert_eq!(child.value(3), 4.0);
2831    }
2832
2833    #[test]
2834    fn test_values_to_fixed_size_list_wrong_dimensions() {
2835        let values = vec![
2836            Value::Vector(vec![1.0, 2.0, 3.0]),   // 3 dims, expecting 2
2837            Value::List(vec![Value::Float(4.0)]), // 1 dim, expecting 2
2838        ];
2839        let arr_ref = values_to_array(
2840            &values,
2841            &ArrowDataType::FixedSizeList(
2842                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2843                2,
2844            ),
2845        )
2846        .unwrap();
2847
2848        let arr = arr_ref
2849            .as_any()
2850            .downcast_ref::<FixedSizeListArray>()
2851            .unwrap();
2852
2853        assert_eq!(arr.len(), 2);
2854        assert!(!arr.is_valid(0)); // Wrong dimensions
2855        assert!(!arr.is_valid(1)); // Wrong dimensions
2856
2857        // Check that child array has zeros for invalid entries
2858        let child = arr
2859            .values()
2860            .as_any()
2861            .downcast_ref::<Float32Array>()
2862            .unwrap();
2863        assert_eq!(child.value(0), 0.0);
2864        assert_eq!(child.value(1), 0.0);
2865        assert_eq!(child.value(2), 0.0);
2866        assert_eq!(child.value(3), 0.0);
2867    }
2868
2869    #[test]
2870    fn test_values_to_fixed_size_list_all_nulls() {
2871        let values = vec![Value::Null, Value::Null, Value::Null];
2872        let arr_ref = values_to_array(
2873            &values,
2874            &ArrowDataType::FixedSizeList(
2875                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2876                3,
2877            ),
2878        )
2879        .unwrap();
2880
2881        let arr = arr_ref
2882            .as_any()
2883            .downcast_ref::<FixedSizeListArray>()
2884            .unwrap();
2885
2886        assert_eq!(arr.len(), 3);
2887        assert!(!arr.is_valid(0));
2888        assert!(!arr.is_valid(1));
2889        assert!(!arr.is_valid(2));
2890
2891        // Verify child array length is correct (3 rows × 3 dims = 9)
2892        let child = arr
2893            .values()
2894            .as_any()
2895            .downcast_ref::<Float32Array>()
2896            .unwrap();
2897        assert_eq!(child.len(), 9);
2898    }
2899
2900    #[test]
2901    fn test_values_to_fixed_size_list_mixed_types() {
2902        let values = vec![
2903            Value::Vector(vec![1.0, 2.0]),
2904            Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
2905            Value::Null,
2906            Value::String("invalid".to_string()),
2907        ];
2908        let arr_ref = values_to_array(
2909            &values,
2910            &ArrowDataType::FixedSizeList(
2911                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2912                2,
2913            ),
2914        )
2915        .unwrap();
2916
2917        let arr = arr_ref
2918            .as_any()
2919            .downcast_ref::<FixedSizeListArray>()
2920            .unwrap();
2921
2922        assert_eq!(arr.len(), 4);
2923        assert!(arr.is_valid(0)); // Value::Vector
2924        assert!(arr.is_valid(1)); // Value::List
2925        assert!(!arr.is_valid(2)); // Value::Null
2926        assert!(!arr.is_valid(3)); // Value::String
2927
2928        // Check values for valid entries
2929        let child = arr
2930            .values()
2931            .as_any()
2932            .downcast_ref::<Float32Array>()
2933            .unwrap();
2934        assert_eq!(child.value(0), 1.0);
2935        assert_eq!(child.value(1), 2.0);
2936        assert_eq!(child.value(2), 3.0);
2937        assert_eq!(child.value(3), 4.0);
2938    }
2939
2940    // Tests for PropertyExtractor::build_vector_column
2941
2942    #[test]
2943    fn test_build_vector_column_with_nulls_and_deleted() {
2944        let data_type = DataType::Vector { dimensions: 3 };
2945        let extractor = PropertyExtractor::new("test_vec", &data_type);
2946
2947        let props = [
2948            Some(Value::Vector(vec![1.0, 2.0, 3.0])),
2949            None,              // Missing property
2950            Some(Value::Null), // Null value
2951            Some(Value::Vector(vec![4.0, 5.0, 6.0])),
2952        ];
2953        let deleted = [false, false, false, true]; // Last one is deleted
2954
2955        let arr_ref = extractor
2956            .build_vector_column(4, &deleted, |i| props[i].as_ref(), 3)
2957            .unwrap();
2958
2959        let arr = arr_ref
2960            .as_any()
2961            .downcast_ref::<FixedSizeListArray>()
2962            .unwrap();
2963
2964        assert_eq!(arr.len(), 4);
2965        assert!(arr.is_valid(0)); // Valid vector
2966        assert!(!arr.is_valid(1)); // Missing property
2967        assert!(!arr.is_valid(2)); // Null value
2968        assert!(arr.is_valid(3)); // Deleted entry (valid with zeros)
2969
2970        // Check values
2971        let child = arr
2972            .values()
2973            .as_any()
2974            .downcast_ref::<Float32Array>()
2975            .unwrap();
2976        assert_eq!(child.value(0), 1.0);
2977        assert_eq!(child.value(1), 2.0);
2978        assert_eq!(child.value(2), 3.0);
2979        // Indices 3-5: zeros for missing
2980        // Indices 6-8: zeros for null
2981        // Indices 9-11: zeros for deleted (but marked as valid)
2982        assert_eq!(child.value(9), 0.0);
2983        assert_eq!(child.value(10), 0.0);
2984        assert_eq!(child.value(11), 0.0);
2985    }
2986
2987    #[test]
2988    fn test_build_vector_column_with_list_input() {
2989        let data_type = DataType::Vector { dimensions: 2 };
2990        let extractor = PropertyExtractor::new("test_vec", &data_type);
2991
2992        let props = [
2993            Some(Value::List(vec![Value::Float(1.0), Value::Float(2.0)])),
2994            Some(Value::List(vec![Value::Int(3), Value::Int(4)])),
2995            Some(Value::Vector(vec![5.0, 6.0])),
2996        ];
2997        let deleted = [false, false, false];
2998
2999        let arr_ref = extractor
3000            .build_vector_column(3, &deleted, |i| props[i].as_ref(), 2)
3001            .unwrap();
3002
3003        let arr = arr_ref
3004            .as_any()
3005            .downcast_ref::<FixedSizeListArray>()
3006            .unwrap();
3007
3008        assert_eq!(arr.len(), 3);
3009        assert!(arr.is_valid(0));
3010        assert!(arr.is_valid(1));
3011        assert!(arr.is_valid(2));
3012
3013        // Check values
3014        let child = arr
3015            .values()
3016            .as_any()
3017            .downcast_ref::<Float32Array>()
3018            .unwrap();
3019        assert_eq!(child.value(0), 1.0);
3020        assert_eq!(child.value(1), 2.0);
3021        assert_eq!(child.value(2), 3.0);
3022        assert_eq!(child.value(3), 4.0);
3023        assert_eq!(child.value(4), 5.0);
3024        assert_eq!(child.value(5), 6.0);
3025    }
3026
3027    // Tests for multi-vector (ColBERT) `List<Vector>` columns (issue #96)
3028
3029    #[test]
3030    fn test_build_multivector_list_column_roundtrip() {
3031        // A multi-vector property is `List<FixedSizeList<Float32, dim>>` with a
3032        // VARIABLE token count per row. Each token is a dense vector, so it reads
3033        // back with type fidelity as a `Value::List` of `Value::Vector` tokens
3034        // (parity with `SparseVector`/`Btic` — `FixedSizeList<Float32>` decodes to
3035        // `Value::Vector`, not a generic float list).
3036        let data_type = DataType::List(Box::new(DataType::Vector { dimensions: 3 }));
3037        let extractor = PropertyExtractor::new("tokens", &data_type);
3038
3039        let props = [
3040            // row 0: two tokens
3041            Some(Value::List(vec![
3042                Value::Vector(vec![1.0, 2.0, 3.0]),
3043                Value::Vector(vec![4.0, 5.0, 6.0]),
3044            ])),
3045            // row 1: three tokens (different count -> exercises variable length)
3046            Some(Value::List(vec![
3047                Value::Vector(vec![7.0, 8.0, 9.0]),
3048                Value::Vector(vec![10.0, 11.0, 12.0]),
3049                Value::Vector(vec![13.0, 14.0, 15.0]),
3050            ])),
3051            // row 2: empty token set (present but zero tokens)
3052            Some(Value::List(vec![])),
3053            // row 3: deleted, missing property
3054            None,
3055        ];
3056        let deleted = [false, false, false, true];
3057
3058        let arr_ref = extractor
3059            .build_column(4, &deleted, |i| props[i].as_ref())
3060            .unwrap();
3061
3062        // Outer column is a variable-length list of fixed-size vectors.
3063        let outer = arr_ref.as_any().downcast_ref::<ListArray>().unwrap();
3064        assert_eq!(outer.len(), 4);
3065        assert!(outer.is_valid(0));
3066        assert!(outer.is_valid(1));
3067        assert!(outer.is_valid(2)); // empty-but-present row
3068        assert!(!outer.is_valid(3)); // deleted/missing -> null
3069
3070        // Variable token counts survive.
3071        assert_eq!(outer.value(0).len(), 2);
3072        assert_eq!(outer.value(1).len(), 3);
3073        assert_eq!(outer.value(2).len(), 0);
3074
3075        // Read back through the generic decoder; each token round-trips with type
3076        // fidelity as a `Value::Vector`.
3077        let row0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&data_type));
3078        assert_eq!(
3079            row0,
3080            Value::List(vec![
3081                Value::Vector(vec![1.0, 2.0, 3.0]),
3082                Value::Vector(vec![4.0, 5.0, 6.0]),
3083            ])
3084        );
3085
3086        let row1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&data_type));
3087        let Value::List(tokens) = row1 else {
3088            panic!("row1 should decode to a list of tokens");
3089        };
3090        assert_eq!(tokens.len(), 3);
3091        assert_eq!(tokens[2], Value::Vector(vec![13.0, 14.0, 15.0]));
3092    }
3093
3094    #[test]
3095    fn test_build_multivector_invalid_inner_tokens() {
3096        // A row whose tokens have wrong dimension or non-numeric content: each
3097        // bad token becomes a null inner entry (zeros, invalid) without crashing,
3098        // and valid tokens in the same row are preserved.
3099        let data_type = DataType::List(Box::new(DataType::Vector { dimensions: 2 }));
3100        let extractor = PropertyExtractor::new("tokens", &data_type);
3101
3102        let props = [Some(Value::List(vec![
3103            Value::Vector(vec![1.0, 2.0]),                           // valid
3104            Value::Vector(vec![9.0, 9.0, 9.0]),                      // wrong dim -> null
3105            Value::String("nope".to_string()),                       // non-vector -> null
3106            Value::List(vec![Value::Float(3.0), Value::Float(4.0)]), // valid (list form)
3107        ]))];
3108        let deleted = [false];
3109
3110        let arr_ref = extractor
3111            .build_column(1, &deleted, |i| props[i].as_ref())
3112            .unwrap();
3113        let outer = arr_ref.as_any().downcast_ref::<ListArray>().unwrap();
3114        let inner_row = outer.value(0);
3115        let inner = inner_row
3116            .as_any()
3117            .downcast_ref::<FixedSizeListArray>()
3118            .unwrap();
3119        assert_eq!(inner.len(), 4);
3120        assert!(inner.is_valid(0)); // [1,2]
3121        assert!(!inner.is_valid(1)); // wrong dim -> null
3122        assert!(!inner.is_valid(2)); // non-vector -> null
3123        assert!(inner.is_valid(3)); // [3,4] from list form
3124    }
3125
3126    #[test]
3127    fn test_values_to_array_multivector_schemaless_deferred() {
3128        // Schemaless (Arrow-type-driven) multi-vector writes are intentionally NOT
3129        // supported yet (issue #96, Phase 1.5). Declared-schema writes go through
3130        // `build_list_column` instead. This test pins the deferral contract so a
3131        // future change that enables it updates this expectation deliberately.
3132        let values = vec![Value::List(vec![Value::Vector(vec![1.0, 2.0])])];
3133        let dt = ArrowDataType::List(Arc::new(Field::new(
3134            "item",
3135            ArrowDataType::FixedSizeList(
3136                Arc::new(Field::new("item", ArrowDataType::Float32, true)),
3137                2,
3138            ),
3139            true,
3140        )));
3141        assert!(values_to_array(&values, &dt).is_err());
3142    }
3143
3144    /// H13: out-of-range i64 values must become NULL in i32/date32 columns
3145    /// instead of silently wrapping to a different number/date.
3146    #[test]
3147    fn test_int32_and_date32_columns_null_out_of_range() {
3148        let dt = DataType::Int64;
3149        let extractor = PropertyExtractor::new("x", &dt);
3150
3151        let over = Value::Int(i64::from(i32::MAX) + 1);
3152        let ok = Value::Int(42);
3153        let vals = [over, ok];
3154        let deleted = [false, false];
3155
3156        let arr = extractor
3157            .build_int32_column(2, &deleted, |i| Some(&vals[i]))
3158            .unwrap();
3159        let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
3160        assert!(
3161            arr.is_null(0),
3162            "out-of-range i64 must be NULL in an int32 column, not wrapped"
3163        );
3164        assert_eq!(arr.value(1), 42);
3165
3166        let arr = extractor
3167            .build_date32_column(2, &deleted, |i| Some(&vals[i]))
3168            .unwrap();
3169        let arr = arr.as_any().downcast_ref::<Date32Array>().unwrap();
3170        assert!(
3171            arr.is_null(0),
3172            "out-of-range day count must be NULL in a date32 column, not wrapped"
3173        );
3174        assert_eq!(arr.value(1), 42);
3175    }
3176}