Skip to main content

uni_store/storage/
arrow_convert.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Arrow type conversion utilities for reducing cognitive complexity.
5//!
6//! This module provides shared helper functions and macros for converting
7//! between Arrow arrays and JSON Values, reducing code duplication across
8//! vertex.rs, delta.rs, and executor.rs.
9
10use anyhow::{Result, anyhow};
11use arrow_array::builder::{
12    BinaryBuilder, BooleanBufferBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
13    FixedSizeBinaryBuilder, FixedSizeListBuilder, Float32Builder, Float64Builder, Int32Builder,
14    Int64Builder, IntervalMonthDayNanoBuilder, LargeBinaryBuilder, ListBuilder, StringBuilder,
15    StructBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder,
16    UInt64Builder,
17};
18use arrow_array::{
19    Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, FixedSizeBinaryArray,
20    FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array,
21    IntervalMonthDayNanoArray, LargeBinaryArray, ListArray, StringArray, StructArray,
22    Time64NanosecondArray, TimestampNanosecondArray, UInt64Array,
23};
24use arrow_schema::{DataType as ArrowDataType, Field};
25use std::collections::HashMap;
26use std::sync::Arc;
27use uni_common::DataType;
28use uni_common::Value;
29use uni_common::core::id::{Eid, Vid};
30use uni_common::core::schema;
31use uni_crdt::Crdt;
32
33/// Build a timestamp column from a map of ID -> timestamp (nanoseconds).
34///
35/// Shared utility for building `_created_at` and `_updated_at` columns
36/// in vertex and edge tables. Works with any hashable ID type (Vid, Eid, etc.).
37fn build_timestamp_column_from_id_map<K, I>(
38    ids: I,
39    timestamps: Option<&HashMap<K, i64>>,
40) -> ArrayRef
41where
42    K: Eq + std::hash::Hash,
43    I: IntoIterator<Item = K>,
44{
45    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
46    for id in ids {
47        match timestamps.and_then(|m| m.get(&id)) {
48            Some(&ts) => builder.append_value(ts),
49            None => builder.append_null(),
50        }
51    }
52    Arc::new(builder.finish())
53}
54
55pub fn build_timestamp_column_from_vid_map<I>(
56    ids: I,
57    timestamps: Option<&HashMap<Vid, i64>>,
58) -> ArrayRef
59where
60    I: IntoIterator<Item = Vid>,
61{
62    build_timestamp_column_from_id_map(ids, timestamps)
63}
64
65pub fn build_timestamp_column_from_eid_map<I>(
66    ids: I,
67    timestamps: Option<&HashMap<Eid, i64>>,
68) -> ArrayRef
69where
70    I: IntoIterator<Item = Eid>,
71{
72    build_timestamp_column_from_id_map(ids, timestamps)
73}
74
75/// Build a timestamp column from an iterator of optional timestamps.
76///
77/// This is useful for building timestamp columns directly from entry structs.
78pub fn build_timestamp_column<I>(timestamps: I) -> ArrayRef
79where
80    I: IntoIterator<Item = Option<i64>>,
81{
82    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
83    for ts in timestamps {
84        builder.append_option(ts);
85    }
86    Arc::new(builder.finish())
87}
88
89/// Extract a `Vec<String>` from a single row of a `List<Utf8>` column.
90///
91/// Returns an empty vec when the row is null, the inner array is not a
92/// `StringArray`, or the list is empty.  Null entries inside the list are
93/// silently skipped.
94pub fn labels_from_list_array(list_arr: &ListArray, row: usize) -> Vec<String> {
95    if list_arr.is_null(row) {
96        return Vec::new();
97    }
98    let values = list_arr.value(row);
99    let Some(str_arr) = values.as_any().downcast_ref::<StringArray>() else {
100        return Vec::new();
101    };
102    (0..str_arr.len())
103        .filter(|&j| !str_arr.is_null(j))
104        .map(|j| str_arr.value(j).to_string())
105        .collect()
106}
107
108/// Parse a datetime string into nanoseconds since Unix epoch.
109///
110/// Tries RFC3339, "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M%:z",
111/// and "%Y-%m-%dT%H:%MZ" formats.
112fn parse_datetime_to_nanos(s: &str) -> Option<i64> {
113    chrono::DateTime::parse_from_rfc3339(s)
114        .map(|dt| {
115            dt.with_timezone(&chrono::Utc)
116                .timestamp_nanos_opt()
117                .unwrap_or(0)
118        })
119        .or_else(|_| {
120            chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
121                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
122        })
123        .or_else(|_| {
124            chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ")
125                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
126        })
127        .or_else(|_| {
128            chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M%:z").map(|dt| {
129                dt.with_timezone(&chrono::Utc)
130                    .timestamp_nanos_opt()
131                    .unwrap_or(0)
132            })
133        })
134        .ok()
135        .or_else(|| {
136            s.strip_suffix('Z')
137                .and_then(|base| chrono::NaiveDateTime::parse_from_str(base, "%Y-%m-%dT%H:%M").ok())
138                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
139        })
140}
141
142/// Detect the Arrow Map-as-List(Struct(key, value)) pattern and reconstruct a map.
143///
144/// Arrow represents Map columns as `List(Struct { key, value })`. This helper
145/// checks whether the given array matches that layout and, if so, converts the
146/// key/value pairs back into a `HashMap<String, Value>`.
147fn try_reconstruct_map(arr: &ArrayRef) -> Option<HashMap<String, Value>> {
148    let structs = arr.as_any().downcast_ref::<StructArray>()?;
149    let fields = structs.fields();
150    if fields.len() != 2 || fields[0].name() != "key" || fields[1].name() != "value" {
151        return None;
152    }
153    let key_col = structs.column(0);
154    let val_col = structs.column(1);
155    let mut map = HashMap::new();
156    for i in 0..structs.len() {
157        if let Value::String(k) = arrow_to_value(key_col.as_ref(), i, None) {
158            map.insert(k, arrow_to_value(val_col.as_ref(), i, None));
159        }
160    }
161    Some(map)
162}
163
164/// Convert all elements of an Arrow array into a `Vec<Value>`.
165fn array_to_value_list(arr: &ArrayRef) -> Vec<Value> {
166    (0..arr.len())
167        .map(|i| arrow_to_value(arr.as_ref(), i, None))
168        .collect()
169}
170
171/// Convert an Arrow array value at a given row index to a Uni Value.
172///
173/// Handles all common Arrow types and recursively processes nested structures
174/// like Lists and Structs. The optional `data_type` parameter provides schema
175/// context for decoding DateTime and Time struct arrays; when provided, it
176/// takes precedence over runtime type detection.
177pub fn arrow_to_value(col: &dyn Array, row: usize, data_type: Option<&DataType>) -> Value {
178    if col.is_null(row) {
179        return Value::Null;
180    }
181
182    // Schema-driven decode for DateTime and Time structs
183    if let Some(dt) = data_type {
184        match dt {
185            DataType::DateTime => {
186                // Expect StructArray with three fields
187                if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
188                    && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
189                        struct_arr.column_by_name("nanos_since_epoch"),
190                        struct_arr.column_by_name("offset_seconds"),
191                        struct_arr.column_by_name("timezone_name"),
192                    )
193                    && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
194                        nanos_col
195                            .as_any()
196                            .downcast_ref::<TimestampNanosecondArray>(),
197                        offset_col.as_any().downcast_ref::<Int32Array>(),
198                        tz_col.as_any().downcast_ref::<StringArray>(),
199                    )
200                {
201                    if nanos_arr.is_null(row) {
202                        return Value::Null;
203                    }
204                    let nanos = nanos_arr.value(row);
205                    if offset_arr.is_null(row) {
206                        // No offset → LocalDateTime
207                        return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
208                            nanos_since_epoch: nanos,
209                        });
210                    }
211                    let offset = offset_arr.value(row);
212                    let tz_name = (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
213                    return Value::Temporal(uni_common::TemporalValue::DateTime {
214                        nanos_since_epoch: nanos,
215                        offset_seconds: offset,
216                        timezone_name: tz_name,
217                    });
218                }
219                // Fall back to old schema migration: TimestampNanosecond → DateTime with offset=0
220                if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
221                    let nanos = ts.value(row);
222                    let tz_name = ts.timezone().map(|s| s.to_string());
223                    return Value::Temporal(uni_common::TemporalValue::DateTime {
224                        nanos_since_epoch: nanos,
225                        offset_seconds: 0,
226                        timezone_name: tz_name,
227                    });
228                }
229            }
230            DataType::Time => {
231                // Expect StructArray with two fields
232                if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
233                    && let (Some(nanos_col), Some(offset_col)) = (
234                        struct_arr.column_by_name("nanos_since_midnight"),
235                        struct_arr.column_by_name("offset_seconds"),
236                    )
237                    && let (Some(nanos_arr), Some(offset_arr)) = (
238                        nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
239                        offset_col.as_any().downcast_ref::<Int32Array>(),
240                    )
241                {
242                    // Check field-level nulls before calling .value()
243                    if nanos_arr.is_null(row) || offset_arr.is_null(row) {
244                        return Value::Null;
245                    }
246                    let nanos = nanos_arr.value(row);
247                    let offset = offset_arr.value(row);
248                    return Value::Temporal(uni_common::TemporalValue::Time {
249                        nanos_since_midnight: nanos,
250                        offset_seconds: offset,
251                    });
252                }
253                // Fall back to old schema: Time64Nanosecond → Time with offset=0
254                if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
255                    let nanos = t.value(row);
256                    return Value::Temporal(uni_common::TemporalValue::Time {
257                        nanos_since_midnight: nanos,
258                        offset_seconds: 0,
259                    });
260                }
261            }
262            DataType::Bytes => {
263                let Some(arr) = col.as_any().downcast_ref::<LargeBinaryArray>() else {
264                    log::warn!("Bytes column is not LargeBinaryArray");
265                    return Value::Null;
266                };
267                if arr.is_null(row) {
268                    return Value::Null;
269                }
270                return Value::Bytes(arr.value(row).to_vec());
271            }
272            DataType::Btic => {
273                let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>() else {
274                    log::warn!("BTIC column is not FixedSizeBinaryArray");
275                    return Value::Null;
276                };
277                let bytes = fsb.value(row);
278                return match uni_btic::encode::decode_slice(bytes) {
279                    Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
280                        lo: btic.lo(),
281                        hi: btic.hi(),
282                        meta: btic.meta(),
283                    }),
284                    Err(e) => {
285                        log::warn!("BTIC decode error: {}", e);
286                        Value::Null
287                    }
288                };
289            }
290            _ => {}
291        }
292    }
293
294    // String types
295    if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
296        return Value::String(s.value(row).to_string());
297    }
298
299    // Integer types
300    if let Some(u) = col.as_any().downcast_ref::<UInt64Array>() {
301        return Value::Int(u.value(row) as i64);
302    }
303    if let Some(i) = col.as_any().downcast_ref::<Int64Array>() {
304        return Value::Int(i.value(row));
305    }
306    if let Some(i) = col.as_any().downcast_ref::<Int32Array>() {
307        return Value::Int(i.value(row) as i64);
308    }
309
310    // Float types
311    if let Some(f) = col.as_any().downcast_ref::<Float64Array>() {
312        return Value::Float(f.value(row));
313    }
314    if let Some(f) = col.as_any().downcast_ref::<Float32Array>() {
315        return Value::Float(f.value(row) as f64);
316    }
317
318    // Boolean type
319    if let Some(b) = col.as_any().downcast_ref::<BooleanArray>() {
320        return Value::Bool(b.value(row));
321    }
322
323    // Fixed-size list (vectors)
324    if let Some(list) = col.as_any().downcast_ref::<FixedSizeListArray>() {
325        return Value::List(array_to_value_list(&list.value(row)));
326    }
327
328    // Variable-size list
329    if let Some(list) = col.as_any().downcast_ref::<ListArray>() {
330        let arr = list.value(row);
331
332        // Map types are stored as List(Struct(key, value)); reconstruct as map
333        if let Some(obj) = try_reconstruct_map(&arr) {
334            return Value::Map(obj);
335        }
336
337        return Value::List(array_to_value_list(&arr));
338    }
339
340    // Large list (variable-size list with i64 offsets)
341    if let Some(list) = col.as_any().downcast_ref::<arrow_array::LargeListArray>() {
342        return Value::List(array_to_value_list(&list.value(row)));
343    }
344
345    // Struct type — detect temporal structs by field names before generic handler
346    if let Some(s) = col.as_any().downcast_ref::<StructArray>() {
347        let field_names: Vec<&str> = s.fields().iter().map(|f| f.name().as_str()).collect();
348
349        // DateTime struct: {nanos_since_epoch, offset_seconds, timezone_name}
350        if field_names.contains(&"nanos_since_epoch")
351            && field_names.contains(&"offset_seconds")
352            && field_names.contains(&"timezone_name")
353            && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
354                s.column_by_name("nanos_since_epoch"),
355                s.column_by_name("offset_seconds"),
356                s.column_by_name("timezone_name"),
357            )
358        {
359            // Try TimestampNanosecond first (standard schema), then Int64 fallback
360            let nanos_opt = nanos_col
361                .as_any()
362                .downcast_ref::<TimestampNanosecondArray>()
363                .map(|a| {
364                    if a.is_null(row) {
365                        None
366                    } else {
367                        Some(a.value(row))
368                    }
369                })
370                .or_else(|| {
371                    nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
372                        if a.is_null(row) {
373                            None
374                        } else {
375                            Some(a.value(row))
376                        }
377                    })
378                });
379            let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
380                if a.is_null(row) {
381                    None
382                } else {
383                    Some(a.value(row))
384                }
385            });
386
387            if let Some(Some(nanos)) = nanos_opt {
388                match offset_opt {
389                    Some(Some(offset)) => {
390                        let tz_name = tz_col.as_any().downcast_ref::<StringArray>().and_then(|a| {
391                            if a.is_null(row) {
392                                None
393                            } else {
394                                Some(a.value(row).to_string())
395                            }
396                        });
397                        return Value::Temporal(uni_common::TemporalValue::DateTime {
398                            nanos_since_epoch: nanos,
399                            offset_seconds: offset,
400                            timezone_name: tz_name,
401                        });
402                    }
403                    _ => {
404                        // No offset → LocalDateTime
405                        return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
406                            nanos_since_epoch: nanos,
407                        });
408                    }
409                }
410            }
411        }
412
413        // Time struct: {nanos_since_midnight, offset_seconds}
414        if field_names.contains(&"nanos_since_midnight")
415            && field_names.contains(&"offset_seconds")
416            && let (Some(nanos_col), Some(offset_col)) = (
417                s.column_by_name("nanos_since_midnight"),
418                s.column_by_name("offset_seconds"),
419            )
420        {
421            // Try Time64Nanosecond first (standard schema), then Int64 fallback
422            let nanos_opt = nanos_col
423                .as_any()
424                .downcast_ref::<Time64NanosecondArray>()
425                .map(|a| {
426                    if a.is_null(row) {
427                        None
428                    } else {
429                        Some(a.value(row))
430                    }
431                })
432                .or_else(|| {
433                    nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
434                        if a.is_null(row) {
435                            None
436                        } else {
437                            Some(a.value(row))
438                        }
439                    })
440                });
441            let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
442                if a.is_null(row) {
443                    None
444                } else {
445                    Some(a.value(row))
446                }
447            });
448
449            if let (Some(Some(nanos)), Some(Some(offset))) = (nanos_opt, offset_opt) {
450                return Value::Temporal(uni_common::TemporalValue::Time {
451                    nanos_since_midnight: nanos,
452                    offset_seconds: offset,
453                });
454            }
455        }
456
457        // Generic struct → Map
458        let mut map = HashMap::new();
459        for (field, child) in s.fields().iter().zip(s.columns()) {
460            map.insert(
461                field.name().clone(),
462                arrow_to_value(child.as_ref(), row, None),
463            );
464        }
465        return Value::Map(map);
466    }
467
468    // Date32 type (days since epoch) - return as Value::Temporal
469    if let Some(d) = col.as_any().downcast_ref::<Date32Array>() {
470        let days = d.value(row);
471        return Value::Temporal(uni_common::TemporalValue::Date {
472            days_since_epoch: days,
473        });
474    }
475
476    // Timestamp (nanoseconds since epoch) - timezone presence determines DateTime vs LocalDateTime
477    if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
478        let nanos = ts.value(row);
479        return match ts.timezone() {
480            Some(tz) => Value::Temporal(uni_common::TemporalValue::DateTime {
481                nanos_since_epoch: nanos,
482                offset_seconds: 0,
483                timezone_name: Some(tz.to_string()),
484            }),
485            None => Value::Temporal(uni_common::TemporalValue::LocalDateTime {
486                nanos_since_epoch: nanos,
487            }),
488        };
489    }
490
491    // Time64 (nanoseconds since midnight) - return as Value::Temporal
492    if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
493        let nanos = t.value(row);
494        return Value::Temporal(uni_common::TemporalValue::LocalTime {
495            nanos_since_midnight: nanos,
496        });
497    }
498
499    // Time64 (microseconds since midnight) - convert to nanoseconds
500    if let Some(t) = col
501        .as_any()
502        .downcast_ref::<arrow_array::Time64MicrosecondArray>()
503    {
504        let micros = t.value(row);
505        return Value::Temporal(uni_common::TemporalValue::LocalTime {
506            nanos_since_midnight: micros * 1000,
507        });
508    }
509
510    // DurationMicrosecond - convert to Duration with nanoseconds
511    if let Some(d) = col
512        .as_any()
513        .downcast_ref::<arrow_array::DurationMicrosecondArray>()
514    {
515        let micros = d.value(row);
516        let total_nanos = micros * 1000;
517        let seconds = total_nanos / 1_000_000_000;
518        let remaining_nanos = total_nanos % 1_000_000_000;
519        return Value::Temporal(uni_common::TemporalValue::Duration {
520            months: 0,
521            days: 0,
522            nanos: seconds * 1_000_000_000 + remaining_nanos,
523        });
524    }
525
526    // IntervalMonthDayNano - return as Value::Temporal(Duration)
527    if let Some(interval) = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>() {
528        let val = interval.value(row);
529        return Value::Temporal(uni_common::TemporalValue::Duration {
530            months: val.months as i64,
531            days: val.days as i64,
532            nanos: val.nanoseconds,
533        });
534    }
535
536    // LargeBinary (CypherValue MessagePack-tagged encoding)
537    if let Some(b) = col.as_any().downcast_ref::<LargeBinaryArray>() {
538        let bytes = b.value(row);
539        if bytes.is_empty() {
540            return Value::Null;
541        }
542        return uni_common::cypher_value_codec::decode(bytes).unwrap_or_else(|e| {
543            eprintln!("CypherValue decode error: {}", e);
544            Value::Null
545        });
546    }
547
548    // FixedSizeBinary(24) — BTIC temporal interval
549    if let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>()
550        && fsb.value_length() == 24
551    {
552        let bytes = fsb.value(row);
553        return match uni_btic::encode::decode_slice(bytes) {
554            Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
555                lo: btic.lo(),
556                hi: btic.hi(),
557                meta: btic.meta(),
558            }),
559            Err(e) => {
560                log::warn!("BTIC decode error: {}", e);
561                Value::Null
562            }
563        };
564    }
565
566    // Binary (CRDT MessagePack) - decode to Value via serde_json boundary
567    if let Some(b) = col.as_any().downcast_ref::<BinaryArray>() {
568        let bytes = b.value(row);
569        return Crdt::from_msgpack(bytes)
570            .ok()
571            .and_then(|crdt| serde_json::to_value(&crdt).ok())
572            .map(Value::from)
573            .unwrap_or(Value::Null);
574    }
575
576    // Fallback
577    Value::Null
578}
579
580fn values_to_uint64_array(values: &[Value]) -> ArrayRef {
581    let mut builder = UInt64Builder::with_capacity(values.len());
582    for v in values {
583        if let Some(n) = v.as_u64() {
584            builder.append_value(n);
585        } else {
586            builder.append_null();
587        }
588    }
589    Arc::new(builder.finish())
590}
591
592fn values_to_int64_array(values: &[Value]) -> ArrayRef {
593    let mut builder = Int64Builder::with_capacity(values.len());
594    for v in values {
595        if let Some(n) = v.as_i64() {
596            builder.append_value(n);
597        } else {
598            builder.append_null();
599        }
600    }
601    Arc::new(builder.finish())
602}
603
604fn values_to_int32_array(values: &[Value]) -> ArrayRef {
605    let mut builder = Int32Builder::with_capacity(values.len());
606    for v in values {
607        if let Some(n) = v.as_i64() {
608            builder.append_value(n as i32);
609        } else {
610            builder.append_null();
611        }
612    }
613    Arc::new(builder.finish())
614}
615
616fn values_to_string_array(values: &[Value]) -> ArrayRef {
617    let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 10);
618    for v in values {
619        if let Some(s) = v.as_str() {
620            builder.append_value(s);
621        } else if v.is_null() {
622            builder.append_null();
623        } else {
624            builder.append_value(v.to_string());
625        }
626    }
627    Arc::new(builder.finish())
628}
629
630fn values_to_bool_array(values: &[Value]) -> ArrayRef {
631    let mut builder = BooleanBuilder::with_capacity(values.len());
632    for v in values {
633        if let Some(b) = v.as_bool() {
634            builder.append_value(b);
635        } else {
636            builder.append_null();
637        }
638    }
639    Arc::new(builder.finish())
640}
641
642fn values_to_float32_array(values: &[Value]) -> ArrayRef {
643    let mut builder = Float32Builder::with_capacity(values.len());
644    for v in values {
645        if let Some(n) = v.as_f64() {
646            builder.append_value(n as f32);
647        } else {
648            builder.append_null();
649        }
650    }
651    Arc::new(builder.finish())
652}
653
654fn values_to_float64_array(values: &[Value]) -> ArrayRef {
655    let mut builder = Float64Builder::with_capacity(values.len());
656    for v in values {
657        if let Some(n) = v.as_f64() {
658            builder.append_value(n);
659        } else {
660            builder.append_null();
661        }
662    }
663    Arc::new(builder.finish())
664}
665
666fn values_to_fixed_size_binary_array(values: &[Value], size: i32) -> Result<ArrayRef> {
667    let mut builder = FixedSizeBinaryBuilder::with_capacity(values.len(), size);
668    for v in values {
669        match v {
670            Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta }) if size == 24 => {
671                let btic = uni_btic::Btic::new(*lo, *hi, *meta)
672                    .map_err(|e| anyhow!("invalid BTIC value: {}", e))?;
673                builder.append_value(uni_btic::encode::encode(&btic))?;
674            }
675            Value::String(s) if size == 24 => match uni_btic::parse::parse_btic_literal(s) {
676                Ok(b) => builder.append_value(uni_btic::encode::encode(&b))?,
677                Err(_) => builder.append_null(),
678            },
679            Value::List(bytes) => {
680                let b: Vec<u8> = bytes
681                    .iter()
682                    .map(|bv| bv.as_u64().unwrap_or(0) as u8)
683                    .collect();
684                if b.len() as i32 == size {
685                    builder.append_value(&b)?;
686                } else {
687                    builder.append_null();
688                }
689            }
690            _ => builder.append_null(),
691        }
692    }
693    Ok(Arc::new(builder.finish()))
694}
695
696/// Extract f32 vector values from a Value, ensuring correct Arrow FixedSizeList invariants.
697///
698/// Always returns exactly `dimensions` f32 values (zeros for null/invalid), plus a validity flag.
699/// This guarantees `child_array.len() == parent_array.len() × dimensions`.
700///
701/// # Arguments
702/// - `val`: Optional property value to extract from
703/// - `is_deleted`: Whether the containing entity is deleted (affects validity)
704/// - `dimensions`: Expected vector dimensions
705///
706/// # Returns
707/// - Tuple of (vector values, validity flag)
708///   - Vector always has exactly `dimensions` elements
709///   - Validity is `true` for valid vectors or deleted entries, `false` for null/invalid
710pub fn extract_vector_f32_values(
711    val: Option<&Value>,
712    is_deleted: bool,
713    dimensions: usize,
714) -> (Vec<f32>, bool) {
715    let zeros = || vec![0.0_f32; dimensions];
716
717    // Deleted entries always return zeros with valid=true
718    if is_deleted {
719        return (zeros(), true);
720    }
721
722    match val {
723        // Native f32 vector (Value::Vector)
724        Some(Value::Vector(v)) if v.len() == dimensions => (v.clone(), true),
725        Some(Value::Vector(_)) => (zeros(), false), // Wrong dimensions
726        // List of values (Value::List) - convert to f32
727        Some(Value::List(arr)) if arr.len() == dimensions => {
728            let values: Vec<f32> = arr
729                .iter()
730                .map(|v| v.as_f64().unwrap_or(0.0) as f32)
731                .collect();
732            (values, true)
733        }
734        Some(Value::List(_)) => (zeros(), false), // Wrong dimensions
735        _ => (zeros(), false),                    // Missing or unsupported value
736    }
737}
738
739fn values_to_fixed_size_list_f32_array(values: &[Value], size: i32) -> ArrayRef {
740    let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), size);
741    for v in values {
742        let (vals, valid) = extract_vector_f32_values(Some(v), false, size as usize);
743        for val in vals {
744            builder.values().append_value(val);
745        }
746        builder.append(valid);
747    }
748    Arc::new(builder.finish())
749}
750
751fn values_to_timestamp_array(values: &[Value], tz: Option<&Arc<str>>) -> ArrayRef {
752    let mut builder = TimestampNanosecondBuilder::with_capacity(values.len());
753    for v in values {
754        if v.is_null() {
755            builder.append_null();
756        } else if let Value::Temporal(tv) = v {
757            match tv {
758                uni_common::TemporalValue::DateTime {
759                    nanos_since_epoch, ..
760                }
761                | uni_common::TemporalValue::LocalDateTime {
762                    nanos_since_epoch, ..
763                } => builder.append_value(*nanos_since_epoch),
764                _ => builder.append_null(),
765            }
766        } else if let Some(n) = v.as_i64() {
767            builder.append_value(n);
768        } else if let Some(s) = v.as_str() {
769            match parse_datetime_to_nanos(s) {
770                Some(nanos) => builder.append_value(nanos),
771                None => builder.append_null(),
772            }
773        } else {
774            builder.append_null();
775        }
776    }
777
778    let arr = builder.finish();
779    if let Some(tz) = tz {
780        Arc::new(arr.with_timezone(tz.as_ref()))
781    } else {
782        Arc::new(arr)
783    }
784}
785
786/// Build a DateTime struct array from values.
787///
788/// Encodes DateTime as a 3-field struct: (nanos_since_epoch, offset_seconds, timezone_name).
789/// This preserves timezone offset information that was previously lost with TimestampNanosecond encoding.
790fn values_to_datetime_struct_array(values: &[Value]) -> ArrayRef {
791    let mut nanos_builder = TimestampNanosecondBuilder::with_capacity(values.len());
792    let mut offset_builder = Int32Builder::with_capacity(values.len());
793    let mut tz_builder = StringBuilder::with_capacity(values.len(), values.len() * 20);
794    let mut null_buffer = BooleanBufferBuilder::new(values.len());
795
796    for v in values {
797        match v {
798            Value::Temporal(uni_common::TemporalValue::DateTime {
799                nanos_since_epoch,
800                offset_seconds,
801                timezone_name,
802            }) => {
803                nanos_builder.append_value(*nanos_since_epoch);
804                offset_builder.append_value(*offset_seconds);
805                tz_builder.append_option(timezone_name.as_deref());
806                null_buffer.append(true);
807            }
808            Value::Temporal(uni_common::TemporalValue::LocalDateTime { nanos_since_epoch }) => {
809                nanos_builder.append_value(*nanos_since_epoch);
810                offset_builder.append_null();
811                tz_builder.append_null();
812                null_buffer.append(true);
813            }
814            _ => {
815                nanos_builder.append_null();
816                offset_builder.append_null();
817                tz_builder.append_null();
818                null_buffer.append(false);
819            }
820        }
821    }
822
823    let struct_arr = StructArray::new(
824        schema::datetime_struct_fields(),
825        vec![
826            Arc::new(nanos_builder.finish()) as ArrayRef,
827            Arc::new(offset_builder.finish()) as ArrayRef,
828            Arc::new(tz_builder.finish()) as ArrayRef,
829        ],
830        Some(null_buffer.finish().into()),
831    );
832    Arc::new(struct_arr)
833}
834
835/// Build a Time struct array from values.
836///
837/// Encodes Time as a 2-field struct: (nanos_since_midnight, offset_seconds).
838/// This preserves timezone offset information that was previously lost with Time64Nanosecond encoding.
839fn values_to_time_struct_array(values: &[Value]) -> ArrayRef {
840    let mut nanos_builder = Time64NanosecondBuilder::with_capacity(values.len());
841    let mut offset_builder = Int32Builder::with_capacity(values.len());
842    let mut null_buffer = BooleanBufferBuilder::new(values.len());
843
844    for v in values {
845        match v {
846            Value::Temporal(uni_common::TemporalValue::Time {
847                nanos_since_midnight,
848                offset_seconds,
849            }) => {
850                nanos_builder.append_value(*nanos_since_midnight);
851                offset_builder.append_value(*offset_seconds);
852                null_buffer.append(true);
853            }
854            Value::Temporal(uni_common::TemporalValue::LocalTime {
855                nanos_since_midnight,
856            }) => {
857                nanos_builder.append_value(*nanos_since_midnight);
858                offset_builder.append_null();
859                null_buffer.append(true);
860            }
861            _ => {
862                nanos_builder.append_null();
863                offset_builder.append_null();
864                null_buffer.append(false);
865            }
866        }
867    }
868
869    let struct_arr = StructArray::new(
870        schema::time_struct_fields(),
871        vec![
872            Arc::new(nanos_builder.finish()) as ArrayRef,
873            Arc::new(offset_builder.finish()) as ArrayRef,
874        ],
875        Some(null_buffer.finish().into()),
876    );
877    Arc::new(struct_arr)
878}
879
880fn values_to_large_binary_array(values: &[Value]) -> ArrayRef {
881    let mut builder =
882        arrow_array::builder::LargeBinaryBuilder::with_capacity(values.len(), values.len() * 64);
883    for v in values {
884        if v.is_null() {
885            builder.append_null();
886        } else {
887            // Encode as CypherValue (MessagePack-tagged)
888            let cv_bytes = uni_common::cypher_value_codec::encode(v);
889            builder.append_value(&cv_bytes);
890        }
891    }
892    Arc::new(builder.finish())
893}
894
895/// Convert a slice of JSON Values to an Arrow array based on the target Arrow DataType.
896pub fn values_to_array(values: &[Value], dt: &ArrowDataType) -> Result<ArrayRef> {
897    match dt {
898        ArrowDataType::UInt64 => Ok(values_to_uint64_array(values)),
899        ArrowDataType::Int64 => Ok(values_to_int64_array(values)),
900        ArrowDataType::Int32 => Ok(values_to_int32_array(values)),
901        ArrowDataType::Utf8 => Ok(values_to_string_array(values)),
902        ArrowDataType::Boolean => Ok(values_to_bool_array(values)),
903        ArrowDataType::Float32 => Ok(values_to_float32_array(values)),
904        ArrowDataType::Float64 => Ok(values_to_float64_array(values)),
905        ArrowDataType::FixedSizeBinary(size) => values_to_fixed_size_binary_array(values, *size),
906        ArrowDataType::FixedSizeList(inner, size) => {
907            if inner.data_type() == &ArrowDataType::Float32 {
908                Ok(values_to_fixed_size_list_f32_array(values, *size))
909            } else {
910                Err(anyhow!("Unsupported FixedSizeList inner type"))
911            }
912        }
913        ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
914            Ok(values_to_timestamp_array(values, tz.as_ref()))
915        }
916        ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
917            Ok(values_to_timestamp_array(values, tz.as_ref()))
918        }
919        ArrowDataType::Date32 => {
920            let mut builder = Date32Builder::with_capacity(values.len());
921            for v in values {
922                if v.is_null() {
923                    builder.append_null();
924                } else if let Value::Temporal(uni_common::TemporalValue::Date {
925                    days_since_epoch,
926                }) = v
927                {
928                    builder.append_value(*days_since_epoch);
929                } else if let Some(n) = v.as_i64() {
930                    builder.append_value(n as i32);
931                } else {
932                    builder.append_null();
933                }
934            }
935            Ok(Arc::new(builder.finish()))
936        }
937        ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
938            let mut builder = Time64NanosecondBuilder::with_capacity(values.len());
939            for v in values {
940                if v.is_null() {
941                    builder.append_null();
942                } else if let Value::Temporal(tv) = v {
943                    match tv {
944                        uni_common::TemporalValue::LocalTime {
945                            nanos_since_midnight,
946                        }
947                        | uni_common::TemporalValue::Time {
948                            nanos_since_midnight,
949                            ..
950                        } => builder.append_value(*nanos_since_midnight),
951                        _ => builder.append_null(),
952                    }
953                } else if let Some(n) = v.as_i64() {
954                    builder.append_value(n);
955                } else {
956                    builder.append_null();
957                }
958            }
959            Ok(Arc::new(builder.finish()))
960        }
961        ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
962            let mut builder = Time64MicrosecondBuilder::with_capacity(values.len());
963            for v in values {
964                if v.is_null() {
965                    builder.append_null();
966                } else if let Value::Temporal(tv) = v {
967                    match tv {
968                        uni_common::TemporalValue::LocalTime {
969                            nanos_since_midnight,
970                        }
971                        | uni_common::TemporalValue::Time {
972                            nanos_since_midnight,
973                            ..
974                        } => builder.append_value(*nanos_since_midnight / 1_000), // nanos→micros for legacy
975                        _ => builder.append_null(),
976                    }
977                } else if let Some(n) = v.as_i64() {
978                    builder.append_value(n);
979                } else {
980                    builder.append_null();
981                }
982            }
983            Ok(Arc::new(builder.finish()))
984        }
985        ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano) => {
986            let mut builder = IntervalMonthDayNanoBuilder::with_capacity(values.len());
987            for v in values {
988                if v.is_null() {
989                    builder.append_null();
990                } else if let Value::Temporal(uni_common::TemporalValue::Duration {
991                    months,
992                    days,
993                    nanos,
994                }) = v
995                {
996                    builder.append_value(arrow::datatypes::IntervalMonthDayNano {
997                        months: *months as i32,
998                        days: *days as i32,
999                        nanoseconds: *nanos,
1000                    });
1001                } else {
1002                    builder.append_null();
1003                }
1004            }
1005            Ok(Arc::new(builder.finish()))
1006        }
1007        ArrowDataType::Duration(arrow_schema::TimeUnit::Microsecond) => {
1008            let mut builder = DurationMicrosecondBuilder::with_capacity(values.len());
1009            for v in values {
1010                if v.is_null() {
1011                    builder.append_null();
1012                } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1013                    months,
1014                    days,
1015                    nanos,
1016                }) = v
1017                {
1018                    let total_micros =
1019                        months * 30 * 86_400_000_000i64 + days * 86_400_000_000i64 + nanos / 1_000;
1020                    builder.append_value(total_micros);
1021                } else if let Some(n) = v.as_i64() {
1022                    builder.append_value(n);
1023                } else {
1024                    builder.append_null();
1025                }
1026            }
1027            Ok(Arc::new(builder.finish()))
1028        }
1029        ArrowDataType::LargeBinary => Ok(values_to_large_binary_array(values)),
1030        ArrowDataType::List(field) => {
1031            if field.data_type() == &ArrowDataType::Utf8 {
1032                let mut builder = ListBuilder::new(StringBuilder::new());
1033                for v in values {
1034                    if let Value::List(arr) = v {
1035                        for item in arr {
1036                            if let Some(s) = item.as_str() {
1037                                builder.values().append_value(s);
1038                            } else {
1039                                builder.values().append_null();
1040                            }
1041                        }
1042                        builder.append(true);
1043                    } else {
1044                        builder.append_null();
1045                    }
1046                }
1047                Ok(Arc::new(builder.finish()))
1048            } else {
1049                Err(anyhow!(
1050                    "Unsupported List inner type: {:?}",
1051                    field.data_type()
1052                ))
1053            }
1054        }
1055        ArrowDataType::Struct(_) if schema::is_datetime_struct(dt) => {
1056            Ok(values_to_datetime_struct_array(values))
1057        }
1058        ArrowDataType::Struct(_) if schema::is_time_struct(dt) => {
1059            Ok(values_to_time_struct_array(values))
1060        }
1061        _ => Err(anyhow!("Unsupported type for conversion: {:?}", dt)),
1062    }
1063}
1064
/// Property value extractor for building Arrow columns from entity properties.
///
/// Holds only the declared logical type of the property; `build_column`
/// dispatches on that type to choose the Arrow encoding.
pub struct PropertyExtractor<'a> {
    /// Declared type of the property this extractor builds columns for.
    data_type: &'a DataType,
}
1069
1070impl<'a> PropertyExtractor<'a> {
1071    pub fn new(_name: &'a str, data_type: &'a DataType) -> Self {
1072        Self { data_type }
1073    }
1074
1075    /// Build an Arrow column from a slice of property maps.
1076    /// The `deleted` slice indicates which entries are deleted (use default values).
1077    pub fn build_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1078    where
1079        F: Fn(usize) -> Option<&'a Value>,
1080    {
1081        match self.data_type {
1082            DataType::String => self.build_string_column(len, deleted, get_props),
1083            DataType::Int32 => self.build_int32_column(len, deleted, get_props),
1084            DataType::Int64 => self.build_int64_column(len, deleted, get_props),
1085            DataType::Float32 => self.build_float32_column(len, deleted, get_props),
1086            DataType::Float64 => self.build_float64_column(len, deleted, get_props),
1087            DataType::Bool => self.build_bool_column(len, deleted, get_props),
1088            DataType::Vector { dimensions } => {
1089                self.build_vector_column(len, deleted, get_props, *dimensions)
1090            }
1091            DataType::CypherValue => self.build_json_column(len, deleted, get_props),
1092            DataType::Bytes => self.build_bytes_column(len, deleted, get_props),
1093            DataType::List(inner) => self.build_list_column(len, deleted, get_props, inner),
1094            DataType::Map(key, value) => self.build_map_column(len, deleted, get_props, key, value),
1095            DataType::Crdt(_) => self.build_crdt_column(len, deleted, get_props),
1096            DataType::DateTime => self.build_datetime_struct_column(len, deleted, get_props),
1097            DataType::Timestamp => self.build_timestamp_column(len, deleted, get_props),
1098            DataType::Date => self.build_date32_column(len, deleted, get_props),
1099            DataType::Time => self.build_time_struct_column(len, deleted, get_props),
1100            DataType::Duration => self.build_duration_column(len, deleted, get_props),
1101            DataType::Btic => self.build_btic_column(len, deleted, get_props),
1102            _ => Err(anyhow!(
1103                "Unsupported data type for arrow conversion: {:?}",
1104                self.data_type
1105            )),
1106        }
1107    }
1108
1109    fn build_string_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1110    where
1111        F: Fn(usize) -> Option<&'a Value>,
1112    {
1113        let mut builder = arrow_array::builder::StringBuilder::with_capacity(len, len * 32);
1114        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1115            let prop = get_props(i);
1116            if let Some(s) = prop.and_then(|v| v.as_str()) {
1117                builder.append_value(s);
1118            } else if let Some(Value::Temporal(tv)) = prop {
1119                builder.append_value(tv.to_string());
1120            } else if is_deleted {
1121                builder.append_value("");
1122            } else {
1123                builder.append_null();
1124            }
1125        }
1126        Ok(Arc::new(builder.finish()))
1127    }
1128
1129    fn build_int32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1130    where
1131        F: Fn(usize) -> Option<&'a Value>,
1132    {
1133        let mut values = Vec::with_capacity(len);
1134        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1135            let val = get_props(i).and_then(|v| v.as_i64()).map(|v| v as i32);
1136            if val.is_none() && is_deleted {
1137                values.push(Some(0));
1138            } else {
1139                values.push(val);
1140            }
1141        }
1142        Ok(Arc::new(Int32Array::from(values)))
1143    }
1144
1145    fn build_int64_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1146    where
1147        F: Fn(usize) -> Option<&'a Value>,
1148    {
1149        let mut values = Vec::with_capacity(len);
1150        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1151            let val = get_props(i).and_then(|v| v.as_i64());
1152            if val.is_none() && is_deleted {
1153                values.push(Some(0));
1154            } else {
1155                values.push(val);
1156            }
1157        }
1158        Ok(Arc::new(Int64Array::from(values)))
1159    }
1160
1161    fn build_timestamp_column<F>(
1162        &self,
1163        len: usize,
1164        deleted: &[bool],
1165        get_props: F,
1166    ) -> Result<ArrayRef>
1167    where
1168        F: Fn(usize) -> Option<&'a Value>,
1169    {
1170        let mut values = Vec::with_capacity(len);
1171        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1172            let val = get_props(i);
1173            let ts = if is_deleted || val.is_none() {
1174                Some(0i64)
1175            } else if let Some(Value::Temporal(tv)) = val {
1176                match tv {
1177                    uni_common::TemporalValue::DateTime {
1178                        nanos_since_epoch, ..
1179                    }
1180                    | uni_common::TemporalValue::LocalDateTime {
1181                        nanos_since_epoch, ..
1182                    } => Some(*nanos_since_epoch),
1183                    _ => None,
1184                }
1185            } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1186                Some(v)
1187            } else if let Some(s) = val.and_then(|v| v.as_str()) {
1188                parse_datetime_to_nanos(s)
1189            } else {
1190                None
1191            };
1192
1193            if is_deleted {
1194                values.push(Some(0));
1195            } else {
1196                values.push(ts);
1197            }
1198        }
1199        let arr = TimestampNanosecondArray::from(values).with_timezone("UTC");
1200        Ok(Arc::new(arr))
1201    }
1202
1203    fn build_datetime_struct_column<F>(
1204        &self,
1205        len: usize,
1206        deleted: &[bool],
1207        get_props: F,
1208    ) -> Result<ArrayRef>
1209    where
1210        F: Fn(usize) -> Option<&'a Value>,
1211    {
1212        let values = self.collect_values_or_null(len, deleted, &get_props);
1213        Ok(values_to_datetime_struct_array(&values))
1214    }
1215
1216    fn build_time_struct_column<F>(
1217        &self,
1218        len: usize,
1219        deleted: &[bool],
1220        get_props: F,
1221    ) -> Result<ArrayRef>
1222    where
1223        F: Fn(usize) -> Option<&'a Value>,
1224    {
1225        let values = self.collect_values_or_null(len, deleted, &get_props);
1226        Ok(values_to_time_struct_array(&values))
1227    }
1228
1229    /// Collect property values into a Vec, substituting `Value::Null` for deleted or missing entries.
1230    fn collect_values_or_null<F>(&self, len: usize, deleted: &[bool], get_props: &F) -> Vec<Value>
1231    where
1232        F: Fn(usize) -> Option<&'a Value>,
1233    {
1234        deleted
1235            .iter()
1236            .enumerate()
1237            .take(len)
1238            .map(|(i, &is_deleted)| {
1239                if is_deleted {
1240                    Value::Null
1241                } else {
1242                    get_props(i).cloned().unwrap_or(Value::Null)
1243                }
1244            })
1245            .collect()
1246    }
1247
1248    fn build_date32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1249    where
1250        F: Fn(usize) -> Option<&'a Value>,
1251    {
1252        let mut builder = Date32Builder::with_capacity(len);
1253        let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1254
1255        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1256            let val = get_props(i);
1257            let days = if is_deleted || val.is_none() {
1258                Some(0)
1259            } else if let Some(Value::Temporal(uni_common::TemporalValue::Date {
1260                days_since_epoch,
1261            })) = val
1262            {
1263                Some(*days_since_epoch)
1264            } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1265                Some(v as i32)
1266            } else if let Some(s) = val.and_then(|v| v.as_str()) {
1267                match chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1268                    Ok(date) => Some(date.signed_duration_since(epoch).num_days() as i32),
1269                    Err(_) => None,
1270                }
1271            } else {
1272                None
1273            };
1274
1275            if is_deleted {
1276                builder.append_value(0);
1277            } else if let Some(v) = days {
1278                builder.append_value(v);
1279            } else {
1280                builder.append_null();
1281            }
1282        }
1283        Ok(Arc::new(builder.finish()))
1284    }
1285
1286    fn build_duration_column<F>(
1287        &self,
1288        len: usize,
1289        deleted: &[bool],
1290        get_props: F,
1291    ) -> Result<ArrayRef>
1292    where
1293        F: Fn(usize) -> Option<&'a Value>,
1294    {
1295        // Duration stored as LargeBinary via CypherValue codec (Lance doesn't support Interval(MonthDayNano))
1296        let mut builder = LargeBinaryBuilder::with_capacity(len, len * 32);
1297        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1298            let raw_val = get_props(i);
1299            if let Some(val @ Value::Temporal(uni_common::TemporalValue::Duration { .. })) = raw_val
1300            {
1301                let encoded = uni_common::cypher_value_codec::encode(val);
1302                builder.append_value(&encoded);
1303            } else if is_deleted {
1304                let zero = Value::Temporal(uni_common::TemporalValue::Duration {
1305                    months: 0,
1306                    days: 0,
1307                    nanos: 0,
1308                });
1309                let encoded = uni_common::cypher_value_codec::encode(&zero);
1310                builder.append_value(&encoded);
1311            } else {
1312                builder.append_null();
1313            }
1314        }
1315        Ok(Arc::new(builder.finish()))
1316    }
1317
1318    fn build_btic_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1319    where
1320        F: Fn(usize) -> Option<&'a Value>,
1321    {
1322        const ENCODED_LEN: i32 = 24;
1323        let mut builder = FixedSizeBinaryBuilder::with_capacity(len, ENCODED_LEN);
1324        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1325            let raw_val = get_props(i);
1326            let btic = match raw_val {
1327                Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => Some(
1328                    uni_btic::Btic::new(*lo, *hi, *meta)
1329                        .map_err(|e| anyhow!("invalid BTIC value: {}", e))?,
1330                ),
1331                Some(Value::String(s)) => Some(
1332                    uni_btic::parse::parse_btic_literal(s)
1333                        .map_err(|e| anyhow!("BTIC parse error for '{}': {}", s, e))?,
1334                ),
1335                _ => None,
1336            };
1337
1338            if let Some(b) = btic {
1339                builder.append_value(uni_btic::encode::encode(&b))?;
1340            } else if is_deleted {
1341                builder.append_value([0u8; ENCODED_LEN as usize])?;
1342            } else {
1343                builder.append_null();
1344            }
1345        }
1346        Ok(Arc::new(builder.finish()))
1347    }
1348
1349    fn build_float32_column<F>(
1350        &self,
1351        len: usize,
1352        deleted: &[bool],
1353        get_props: F,
1354    ) -> Result<ArrayRef>
1355    where
1356        F: Fn(usize) -> Option<&'a Value>,
1357    {
1358        let mut values = Vec::with_capacity(len);
1359        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1360            let val = get_props(i).and_then(|v| v.as_f64()).map(|v| v as f32);
1361            if val.is_none() && is_deleted {
1362                values.push(Some(0.0));
1363            } else {
1364                values.push(val);
1365            }
1366        }
1367        Ok(Arc::new(Float32Array::from(values)))
1368    }
1369
1370    fn build_float64_column<F>(
1371        &self,
1372        len: usize,
1373        deleted: &[bool],
1374        get_props: F,
1375    ) -> Result<ArrayRef>
1376    where
1377        F: Fn(usize) -> Option<&'a Value>,
1378    {
1379        let mut values = Vec::with_capacity(len);
1380        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1381            let val = get_props(i).and_then(|v| v.as_f64());
1382            if val.is_none() && is_deleted {
1383                values.push(Some(0.0));
1384            } else {
1385                values.push(val);
1386            }
1387        }
1388        Ok(Arc::new(Float64Array::from(values)))
1389    }
1390
1391    fn build_bool_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1392    where
1393        F: Fn(usize) -> Option<&'a Value>,
1394    {
1395        let mut values = Vec::with_capacity(len);
1396        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1397            let val = get_props(i).and_then(|v| v.as_bool());
1398            if val.is_none() && is_deleted {
1399                values.push(Some(false));
1400            } else {
1401                values.push(val);
1402            }
1403        }
1404        Ok(Arc::new(BooleanArray::from(values)))
1405    }
1406
1407    fn build_vector_column<F>(
1408        &self,
1409        len: usize,
1410        deleted: &[bool],
1411        get_props: F,
1412        dimensions: usize,
1413    ) -> Result<ArrayRef>
1414    where
1415        F: Fn(usize) -> Option<&'a Value>,
1416    {
1417        let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dimensions as i32);
1418
1419        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1420            let val = get_props(i);
1421            let (values, valid) = extract_vector_f32_values(val, is_deleted, dimensions);
1422            for v in values {
1423                builder.values().append_value(v);
1424            }
1425            builder.append(valid);
1426        }
1427        Ok(Arc::new(builder.finish()))
1428    }
1429
1430    fn build_json_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1431    where
1432        F: Fn(usize) -> Option<&'a Value>,
1433    {
1434        let null_val = Value::Null;
1435        let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(len, len * 64);
1436        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1437            let val = get_props(i);
1438            let uni_val = if val.is_none() && is_deleted {
1439                &null_val
1440            } else {
1441                val.unwrap_or(&null_val)
1442            };
1443            // Encode to CypherValue (MessagePack-tagged)
1444            let cv_bytes = uni_common::cypher_value_codec::encode(uni_val);
1445            builder.append_value(&cv_bytes);
1446        }
1447        Ok(Arc::new(builder.finish()))
1448    }
1449
1450    fn build_bytes_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1451    where
1452        F: Fn(usize) -> Option<&'a Value>,
1453    {
1454        let mut builder = LargeBinaryBuilder::with_capacity(len, len * 64);
1455        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1456            let val = get_props(i);
1457            if let Some(Value::Bytes(b)) = val {
1458                builder.append_value(b);
1459            } else if is_deleted {
1460                builder.append_value(&[][..]);
1461            } else {
1462                builder.append_null();
1463            }
1464        }
1465        Ok(Arc::new(builder.finish()))
1466    }
1467
1468    fn build_list_column<F>(
1469        &self,
1470        len: usize,
1471        deleted: &[bool],
1472        get_props: F,
1473        inner: &DataType,
1474    ) -> Result<ArrayRef>
1475    where
1476        F: Fn(usize) -> Option<&'a Value>,
1477    {
1478        match inner {
1479            DataType::String => {
1480                self.build_typed_list(len, deleted, &get_props, StringBuilder::new(), |v, b| {
1481                    if let Some(s) = v.as_str() {
1482                        b.append_value(s);
1483                    } else {
1484                        b.append_null();
1485                    }
1486                })
1487            }
1488            DataType::Int64 => {
1489                self.build_typed_list(len, deleted, &get_props, Int64Builder::new(), |v, b| {
1490                    if let Some(n) = v.as_i64() {
1491                        b.append_value(n);
1492                    } else {
1493                        b.append_null();
1494                    }
1495                })
1496            }
1497            DataType::Float64 => {
1498                self.build_typed_list(len, deleted, &get_props, Float64Builder::new(), |v, b| {
1499                    if let Some(f) = v.as_f64() {
1500                        b.append_value(f);
1501                    } else {
1502                        b.append_null();
1503                    }
1504                })
1505            }
1506            _ => Err(anyhow!("Unsupported inner type for List: {:?}", inner)),
1507        }
1508    }
1509
1510    /// Generic helper to build a list column with any inner builder type.
1511    fn build_typed_list<F, B, A>(
1512        &self,
1513        len: usize,
1514        deleted: &[bool],
1515        get_props: &F,
1516        inner_builder: B,
1517        mut append_value: A,
1518    ) -> Result<ArrayRef>
1519    where
1520        F: Fn(usize) -> Option<&'a Value>,
1521        B: arrow_array::builder::ArrayBuilder,
1522        A: FnMut(&Value, &mut B),
1523    {
1524        let mut builder = ListBuilder::new(inner_builder);
1525        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1526            let val_array = get_props(i).and_then(|v| v.as_array());
1527            if val_array.is_none() && is_deleted {
1528                builder.append_null();
1529            } else if let Some(arr) = val_array {
1530                for v in arr {
1531                    append_value(v, builder.values());
1532                }
1533                builder.append(true);
1534            } else {
1535                builder.append_null();
1536            }
1537        }
1538        Ok(Arc::new(builder.finish()))
1539    }
1540
1541    fn build_map_column<F>(
1542        &self,
1543        len: usize,
1544        deleted: &[bool],
1545        get_props: F,
1546        key: &DataType,
1547        value: &DataType,
1548    ) -> Result<ArrayRef>
1549    where
1550        F: Fn(usize) -> Option<&'a Value>,
1551    {
1552        if !matches!(key, DataType::String) {
1553            return Err(anyhow!("Map keys must be String (JSON limitation)"));
1554        }
1555
1556        match value {
1557            DataType::String => self.build_typed_map(
1558                len,
1559                deleted,
1560                &get_props,
1561                StringBuilder::new(),
1562                arrow_schema::DataType::Utf8,
1563                |v, b: &mut StringBuilder| {
1564                    if let Some(s) = v.as_str() {
1565                        b.append_value(s);
1566                    } else {
1567                        b.append_null();
1568                    }
1569                },
1570            ),
1571            DataType::Int64 => self.build_typed_map(
1572                len,
1573                deleted,
1574                &get_props,
1575                Int64Builder::new(),
1576                arrow_schema::DataType::Int64,
1577                |v, b: &mut Int64Builder| {
1578                    if let Some(n) = v.as_i64() {
1579                        b.append_value(n);
1580                    } else {
1581                        b.append_null();
1582                    }
1583                },
1584            ),
1585            _ => Err(anyhow!("Unsupported value type for Map: {:?}", value)),
1586        }
1587    }
1588
1589    /// Generic helper to build a map column with any value builder type.
1590    fn build_typed_map<F, B, A>(
1591        &self,
1592        len: usize,
1593        deleted: &[bool],
1594        get_props: &F,
1595        value_builder: B,
1596        value_arrow_type: arrow_schema::DataType,
1597        mut append_value: A,
1598    ) -> Result<ArrayRef>
1599    where
1600        F: Fn(usize) -> Option<&'a Value>,
1601        B: arrow_array::builder::ArrayBuilder,
1602        A: FnMut(&Value, &mut B),
1603    {
1604        let key_builder = Box::new(StringBuilder::new());
1605        let value_builder = Box::new(value_builder);
1606        let struct_builder = StructBuilder::new(
1607            vec![
1608                Field::new("key", arrow_schema::DataType::Utf8, false),
1609                Field::new("value", value_arrow_type, true),
1610            ],
1611            vec![key_builder, value_builder],
1612        );
1613        let mut builder = ListBuilder::new(struct_builder);
1614
1615        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1616            self.append_map_entry(&mut builder, get_props(i), is_deleted, &mut append_value);
1617        }
1618        Ok(Arc::new(builder.finish()))
1619    }
1620
1621    /// Append a single map entry to the list builder.
1622    fn append_map_entry<B, A>(
1623        &self,
1624        builder: &mut ListBuilder<StructBuilder>,
1625        val: Option<&'a Value>,
1626        is_deleted: bool,
1627        append_value: &mut A,
1628    ) where
1629        B: arrow_array::builder::ArrayBuilder,
1630        A: FnMut(&Value, &mut B),
1631    {
1632        let val_obj = val.and_then(|v| v.as_object());
1633        if val_obj.is_none() && is_deleted {
1634            builder.append(false);
1635        } else if let Some(obj) = val_obj {
1636            let struct_b = builder.values();
1637            for (k, v) in obj {
1638                struct_b
1639                    .field_builder::<StringBuilder>(0)
1640                    .unwrap()
1641                    .append_value(k);
1642                // Safety: We know the value builder type matches B
1643                let value_b = struct_b.field_builder::<B>(1).unwrap();
1644                append_value(v, value_b);
1645                struct_b.append(true);
1646            }
1647            builder.append(true);
1648        } else {
1649            builder.append(false);
1650        }
1651    }
1652
1653    fn build_crdt_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1654    where
1655        F: Fn(usize) -> Option<&'a Value>,
1656    {
1657        let mut builder = BinaryBuilder::new();
1658        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1659            if is_deleted {
1660                builder.append_null();
1661                continue;
1662            }
1663            if let Some(val) = get_props(i) {
1664                // Try to parse CRDT from the value
1665                // If it's a string, first parse it as JSON, then as CRDT
1666                let crdt_result = if let Some(s) = val.as_str() {
1667                    serde_json::from_str::<Crdt>(s)
1668                } else {
1669                    // Convert uni_common::Value to serde_json::Value at the CRDT boundary
1670                    let json_val: serde_json::Value = val.clone().into();
1671                    serde_json::from_value::<Crdt>(json_val)
1672                };
1673
1674                if let Ok(crdt) = crdt_result {
1675                    if let Ok(bytes) = crdt.to_msgpack() {
1676                        builder.append_value(&bytes);
1677                    } else {
1678                        builder.append_null();
1679                    }
1680                } else {
1681                    builder.append_null();
1682                }
1683            } else {
1684                builder.append_null();
1685            }
1686        }
1687        Ok(Arc::new(builder.finish()))
1688    }
1689}
1690
1691/// Build a column for edge entries (no deleted flag handling needed).
1692pub fn build_edge_column<'a>(
1693    name: &'a str,
1694    data_type: &'a DataType,
1695    len: usize,
1696    get_props: impl Fn(usize) -> Option<&'a Value>,
1697) -> Result<ArrayRef> {
1698    // For edges, use empty deleted array
1699    let deleted = vec![false; len];
1700    let extractor = PropertyExtractor::new(name, data_type);
1701    extractor.build_column(len, &deleted, get_props)
1702}
1703
1704#[cfg(test)]
1705mod tests {
1706    use super::*;
1707    use arrow_array::{
1708        Array, DurationMicrosecondArray,
1709        builder::{BinaryBuilder, Time64MicrosecondBuilder, TimestampNanosecondBuilder},
1710    };
1711    use std::collections::HashMap;
1712    use uni_common::TemporalValue;
1713    use uni_crdt::{Crdt, GCounter};
1714
1715    #[test]
1716    fn test_arrow_to_value_string() {
1717        let arr = StringArray::from(vec![Some("hello"), None, Some("world")]);
1718        assert_eq!(
1719            arrow_to_value(&arr, 0, None),
1720            Value::String("hello".to_string())
1721        );
1722        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1723        assert_eq!(
1724            arrow_to_value(&arr, 2, None),
1725            Value::String("world".to_string())
1726        );
1727    }
1728
1729    #[test]
1730    fn test_arrow_to_value_int64() {
1731        let arr = Int64Array::from(vec![Some(42), None, Some(-10)]);
1732        assert_eq!(arrow_to_value(&arr, 0, None), Value::Int(42));
1733        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1734        assert_eq!(arrow_to_value(&arr, 2, None), Value::Int(-10));
1735    }
1736
1737    #[test]
1738    #[allow(clippy::approx_constant)]
1739    fn test_arrow_to_value_float64() {
1740        let arr = Float64Array::from(vec![Some(3.14), None]);
1741        assert_eq!(arrow_to_value(&arr, 0, None), Value::Float(3.14));
1742        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1743    }
1744
1745    #[test]
1746    fn test_arrow_to_value_bool() {
1747        let arr = BooleanArray::from(vec![Some(true), Some(false), None]);
1748        assert_eq!(arrow_to_value(&arr, 0, None), Value::Bool(true));
1749        assert_eq!(arrow_to_value(&arr, 1, None), Value::Bool(false));
1750        assert_eq!(arrow_to_value(&arr, 2, None), Value::Null);
1751    }
1752
1753    #[test]
1754    fn test_values_to_array_int64() {
1755        let values = vec![Value::Int(1), Value::Int(2), Value::Null, Value::Int(4)];
1756        let arr = values_to_array(&values, &ArrowDataType::Int64).unwrap();
1757        assert_eq!(arr.len(), 4);
1758
1759        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1760        assert_eq!(int_arr.value(0), 1);
1761        assert_eq!(int_arr.value(1), 2);
1762        assert!(int_arr.is_null(2));
1763        assert_eq!(int_arr.value(3), 4);
1764    }
1765
1766    #[test]
1767    fn test_values_to_array_string() {
1768        let values = vec![
1769            Value::String("a".to_string()),
1770            Value::String("b".to_string()),
1771            Value::Null,
1772        ];
1773        let arr = values_to_array(&values, &ArrowDataType::Utf8).unwrap();
1774        assert_eq!(arr.len(), 3);
1775
1776        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1777        assert_eq!(str_arr.value(0), "a");
1778        assert_eq!(str_arr.value(1), "b");
1779        assert!(str_arr.is_null(2));
1780    }
1781
1782    #[test]
1783    fn test_property_extractor_string() {
1784        let props: Vec<HashMap<String, Value>> = vec![
1785            [("name".to_string(), Value::String("Alice".to_string()))]
1786                .into_iter()
1787                .collect(),
1788            [("name".to_string(), Value::String("Bob".to_string()))]
1789                .into_iter()
1790                .collect(),
1791            HashMap::new(),
1792        ];
1793        let deleted = vec![false, false, true];
1794
1795        let extractor = PropertyExtractor::new("name", &DataType::String);
1796        let arr = extractor
1797            .build_column(3, &deleted, |i| props[i].get("name"))
1798            .unwrap();
1799
1800        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1801        assert_eq!(str_arr.value(0), "Alice");
1802        assert_eq!(str_arr.value(1), "Bob");
1803        assert_eq!(str_arr.value(2), ""); // Deleted entries get default
1804    }
1805
1806    #[test]
1807    fn test_property_extractor_int64() {
1808        let props: Vec<HashMap<String, Value>> = vec![
1809            [("age".to_string(), Value::Int(25))].into_iter().collect(),
1810            [("age".to_string(), Value::Int(30))].into_iter().collect(),
1811            HashMap::new(),
1812        ];
1813        let deleted = vec![false, false, true];
1814
1815        let extractor = PropertyExtractor::new("age", &DataType::Int64);
1816        let arr = extractor
1817            .build_column(3, &deleted, |i| props[i].get("age"))
1818            .unwrap();
1819
1820        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1821        assert_eq!(int_arr.value(0), 25);
1822        assert_eq!(int_arr.value(1), 30);
1823        assert_eq!(int_arr.value(2), 0); // Deleted entries get default
1824    }
1825
1826    #[test]
1827    fn test_property_extractor_bytes_roundtrip() {
1828        let blob = vec![0u8, 1, 2, 255];
1829        let props: Vec<HashMap<String, Value>> = vec![
1830            [("blob".to_string(), Value::Bytes(blob.clone()))]
1831                .into_iter()
1832                .collect(),
1833            [("blob".to_string(), Value::Bytes(Vec::new()))]
1834                .into_iter()
1835                .collect(),
1836            HashMap::new(),
1837        ];
1838        let deleted = vec![false, false, false];
1839
1840        let extractor = PropertyExtractor::new("blob", &DataType::Bytes);
1841        let arr = extractor
1842            .build_column(3, &deleted, |i| props[i].get("blob"))
1843            .unwrap();
1844
1845        // Decode each row through arrow_to_value with Bytes hint.
1846        assert_eq!(
1847            arrow_to_value(arr.as_ref(), 0, Some(&DataType::Bytes)),
1848            Value::Bytes(blob)
1849        );
1850        assert_eq!(
1851            arrow_to_value(arr.as_ref(), 1, Some(&DataType::Bytes)),
1852            Value::Bytes(Vec::new())
1853        );
1854        // Missing property → null in the Arrow array.
1855        assert_eq!(
1856            arrow_to_value(arr.as_ref(), 2, Some(&DataType::Bytes)),
1857            Value::Null
1858        );
1859    }
1860
1861    #[test]
1862    fn test_bytes_vs_cypher_value_disambiguation() {
1863        // A LargeBinary column tagged as DataType::Bytes must NOT be decoded
1864        // through the CypherValue MessagePack codec, even though both share
1865        // the same Arrow physical type.
1866        let raw = vec![0xDEu8, 0xAD, 0xBE, 0xEF];
1867        let props: Vec<HashMap<String, Value>> = vec![
1868            [("blob".to_string(), Value::Bytes(raw.clone()))]
1869                .into_iter()
1870                .collect(),
1871        ];
1872        let extractor = PropertyExtractor::new("blob", &DataType::Bytes);
1873        let arr = extractor
1874            .build_column(1, &[false], |i| props[i].get("blob"))
1875            .unwrap();
1876        // With schema hint: raw bytes returned.
1877        assert_eq!(
1878            arrow_to_value(arr.as_ref(), 0, Some(&DataType::Bytes)),
1879            Value::Bytes(raw)
1880        );
1881    }
1882
1883    #[test]
1884    fn test_data_type_bytes_to_arrow() {
1885        assert_eq!(DataType::Bytes.to_arrow(), ArrowDataType::LargeBinary);
1886    }
1887
1888    #[test]
1889    fn test_arrow_to_value_time64() {
1890        // Test Time64MicrosecondArray legacy fallback (micros→nanos conversion)
1891        let mut builder = Time64MicrosecondBuilder::new();
1892        // 10:30:45 = 10*3600 + 30*60 + 45 = 37845 seconds = 37845000000 microseconds
1893        builder.append_value(37_845_000_000);
1894        // 00:00:00 = 0 microseconds
1895        builder.append_value(0);
1896        // 23:59:59.123456 = 86399.123456 seconds
1897        builder.append_value(86_399_123_456);
1898        builder.append_null();
1899
1900        let arr = builder.finish();
1901        // Arrow→Value returns Value::Temporal(LocalTime) with nanos (micros * 1000)
1902        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "10:30:45");
1903        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "00:00");
1904        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "23:59:59.123456");
1905        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
1906    }
1907
1908    #[test]
1909    fn test_arrow_to_value_duration() {
1910        // Test DurationMicrosecondArray conversion
1911        // Arrow→Value now returns Value::Temporal(Duration)
1912        let arr = DurationMicrosecondArray::from(vec![
1913            Some(1_000_000),      // 1 second in microseconds
1914            Some(3_600_000_000),  // 1 hour
1915            Some(86_400_000_000), // 1 day
1916            None,
1917        ]);
1918
1919        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "PT1S");
1920        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "PT1H");
1921        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "PT24H");
1922        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
1923    }
1924
1925    #[test]
1926    fn test_arrow_to_value_binary_crdt() {
1927        // Test BinaryArray (CRDT) conversion - round-trip test
1928        let mut builder = BinaryBuilder::new();
1929
1930        // Create a GCounter CRDT and serialize it
1931        let mut counter = GCounter::new();
1932        counter.increment("actor1", 5);
1933        let crdt = Crdt::GCounter(counter);
1934        let bytes = crdt.to_msgpack().unwrap();
1935        builder.append_value(&bytes);
1936
1937        // Add a null value
1938        builder.append_null();
1939
1940        let arr = builder.finish();
1941
1942        // The first value should deserialize back to a map
1943        let result = arrow_to_value(&arr, 0, None);
1944        assert!(result.as_object().is_some());
1945        let obj = result.as_object().unwrap();
1946        // GCounter serializes with tag "t": "gc"
1947        assert_eq!(obj.get("t"), Some(&Value::String("gc".to_string())));
1948
1949        // Null value should return null
1950        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1951    }
1952
1953    #[test]
1954    fn test_datetime_struct_encode_decode_roundtrip() {
1955        // Test DateTime struct encoding with offset and timezone preservation
1956        let values = vec![
1957            Value::Temporal(TemporalValue::DateTime {
1958                nanos_since_epoch: 441763200000000000, // 1984-01-01T00:00:00Z
1959                offset_seconds: 3600,                  // +01:00
1960                timezone_name: Some("Europe/Paris".to_string()),
1961            }),
1962            Value::Temporal(TemporalValue::DateTime {
1963                nanos_since_epoch: 1704067200000000000, // 2024-01-01T00:00:00Z
1964                offset_seconds: -18000,                 // -05:00
1965                timezone_name: None,
1966            }),
1967            Value::Temporal(TemporalValue::DateTime {
1968                nanos_since_epoch: 0, // Unix epoch
1969                offset_seconds: 0,
1970                timezone_name: Some("UTC".to_string()),
1971            }),
1972        ];
1973
1974        // Encode to Arrow struct
1975        let arr_ref = values_to_datetime_struct_array(&values);
1976        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
1977        assert_eq!(arr.len(), 3);
1978
1979        // Decode back to Value
1980        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
1981        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
1982        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
1983
1984        // Verify round-trip preserves all fields
1985        assert_eq!(decoded_0, values[0]);
1986        assert_eq!(decoded_1, values[1]);
1987        assert_eq!(decoded_2, values[2]);
1988
1989        // Verify struct field extraction
1990        if let Value::Temporal(TemporalValue::DateTime {
1991            nanos_since_epoch,
1992            offset_seconds,
1993            timezone_name,
1994        }) = decoded_0
1995        {
1996            assert_eq!(nanos_since_epoch, 441763200000000000);
1997            assert_eq!(offset_seconds, 3600);
1998            assert_eq!(timezone_name, Some("Europe/Paris".to_string()));
1999        } else {
2000            panic!("Expected DateTime value");
2001        }
2002    }
2003
2004    #[test]
2005    fn test_datetime_struct_null_handling() {
2006        // Test DateTime struct with null values
2007        let values = vec![
2008            Value::Temporal(TemporalValue::DateTime {
2009                nanos_since_epoch: 441763200000000000,
2010                offset_seconds: 3600,
2011                timezone_name: Some("Europe/Paris".to_string()),
2012            }),
2013            Value::Null,
2014            Value::Temporal(TemporalValue::DateTime {
2015                nanos_since_epoch: 0,
2016                offset_seconds: 0,
2017                timezone_name: None,
2018            }),
2019        ];
2020
2021        let arr_ref = values_to_datetime_struct_array(&values);
2022        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2023        assert_eq!(arr.len(), 3);
2024
2025        // Check first value is valid
2026        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
2027        assert_eq!(decoded_0, values[0]);
2028
2029        // Check second value is null
2030        assert!(arr.is_null(1));
2031        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
2032        assert_eq!(decoded_1, Value::Null);
2033
2034        // Check third value is valid
2035        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
2036        assert_eq!(decoded_2, values[2]);
2037    }
2038
2039    #[test]
2040    fn test_datetime_struct_boundary_values() {
2041        // Test boundary values: offset=0, large positive/negative offsets
2042        let values = vec![
2043            Value::Temporal(TemporalValue::DateTime {
2044                nanos_since_epoch: 441763200000000000,
2045                offset_seconds: 0, // UTC
2046                timezone_name: None,
2047            }),
2048            Value::Temporal(TemporalValue::DateTime {
2049                nanos_since_epoch: 441763200000000000,
2050                offset_seconds: 43200, // +12:00 (max typical offset)
2051                timezone_name: None,
2052            }),
2053            Value::Temporal(TemporalValue::DateTime {
2054                nanos_since_epoch: 441763200000000000,
2055                offset_seconds: -43200, // -12:00 (min typical offset)
2056                timezone_name: None,
2057            }),
2058        ];
2059
2060        let arr_ref = values_to_datetime_struct_array(&values);
2061        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2062        assert_eq!(arr.len(), 3);
2063
2064        // Verify round-trip for all boundary values
2065        for (i, expected) in values.iter().enumerate() {
2066            let decoded = arrow_to_value(arr_ref.as_ref(), i, Some(&DataType::DateTime));
2067            assert_eq!(&decoded, expected);
2068        }
2069    }
2070
2071    #[test]
2072    fn test_datetime_old_schema_migration() {
2073        // Test backward compatibility: TimestampNanosecondArray → DateTime with offset=0
2074        let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
2075        builder.append_value(441763200000000000); // 1984-01-01T00:00:00Z
2076        builder.append_value(1704067200000000000); // 2024-01-01T00:00:00Z
2077        builder.append_null();
2078
2079        let arr = builder.finish();
2080
2081        // Decode with DataType::DateTime hint should migrate old schema
2082        let decoded_0 = arrow_to_value(&arr, 0, Some(&DataType::DateTime));
2083        let _decoded_1 = arrow_to_value(&arr, 1, Some(&DataType::DateTime));
2084        let decoded_2 = arrow_to_value(&arr, 2, Some(&DataType::DateTime));
2085
2086        // Old schema should default to offset=0, preserve timezone
2087        if let Value::Temporal(TemporalValue::DateTime {
2088            nanos_since_epoch,
2089            offset_seconds,
2090            timezone_name,
2091        }) = decoded_0
2092        {
2093            assert_eq!(nanos_since_epoch, 441763200000000000);
2094            assert_eq!(offset_seconds, 0);
2095            assert_eq!(timezone_name, Some("UTC".to_string()));
2096        } else {
2097            panic!("Expected DateTime value");
2098        }
2099
2100        // Verify null handling
2101        assert_eq!(decoded_2, Value::Null);
2102    }
2103
2104    #[test]
2105    fn test_time_struct_encode_decode_roundtrip() {
2106        // Test Time struct encoding with offset preservation
2107        let values = vec![
2108            Value::Temporal(TemporalValue::Time {
2109                nanos_since_midnight: 37845000000000, // 10:30:45
2110                offset_seconds: 3600,                 // +01:00
2111            }),
2112            Value::Temporal(TemporalValue::Time {
2113                nanos_since_midnight: 0, // 00:00:00
2114                offset_seconds: 0,
2115            }),
2116            Value::Temporal(TemporalValue::Time {
2117                nanos_since_midnight: 86399999999999, // 23:59:59.999999999
2118                offset_seconds: -18000,               // -05:00
2119            }),
2120        ];
2121
2122        // Encode to Arrow struct
2123        let arr_ref = values_to_time_struct_array(&values);
2124        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2125        assert_eq!(arr.len(), 3);
2126
2127        // Decode back to Value
2128        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2129        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2130        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2131
2132        // Verify round-trip preserves all fields
2133        assert_eq!(decoded_0, values[0]);
2134        assert_eq!(decoded_1, values[1]);
2135        assert_eq!(decoded_2, values[2]);
2136
2137        // Verify struct field extraction
2138        if let Value::Temporal(TemporalValue::Time {
2139            nanos_since_midnight,
2140            offset_seconds,
2141        }) = decoded_0
2142        {
2143            assert_eq!(nanos_since_midnight, 37845000000000);
2144            assert_eq!(offset_seconds, 3600);
2145        } else {
2146            panic!("Expected Time value");
2147        }
2148    }
2149
2150    #[test]
2151    fn test_time_struct_null_handling() {
2152        // Test Time struct with null values
2153        let values = vec![
2154            Value::Temporal(TemporalValue::Time {
2155                nanos_since_midnight: 37845000000000,
2156                offset_seconds: 3600,
2157            }),
2158            Value::Null,
2159            Value::Temporal(TemporalValue::Time {
2160                nanos_since_midnight: 0,
2161                offset_seconds: 0,
2162            }),
2163        ];
2164
2165        let arr_ref = values_to_time_struct_array(&values);
2166        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2167        assert_eq!(arr.len(), 3);
2168
2169        // Check first value is valid
2170        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2171        assert_eq!(decoded_0, values[0]);
2172
2173        // Check second value is null
2174        assert!(arr.is_null(1));
2175        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2176        assert_eq!(decoded_1, Value::Null);
2177
2178        // Check third value is valid
2179        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2180        assert_eq!(decoded_2, values[2]);
2181    }
2182
2183    // Tests for extract_vector_f32_values
2184
2185    #[test]
2186    fn test_extract_vector_f32_values_valid_vector() {
2187        let v = vec![1.0, 2.0, 3.0];
2188        let val = Value::Vector(v.clone());
2189        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2190        assert_eq!(result, v);
2191        assert!(valid);
2192    }
2193
2194    #[test]
2195    fn test_extract_vector_f32_values_vector_wrong_dims() {
2196        let v = vec![1.0, 2.0];
2197        let val = Value::Vector(v);
2198        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2199        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2200        assert!(!valid);
2201    }
2202
2203    #[test]
2204    fn test_extract_vector_f32_values_valid_list() {
2205        let v = vec![Value::Float(1.0), Value::Float(2.0), Value::Float(3.0)];
2206        let val = Value::List(v);
2207        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2208        assert_eq!(result, vec![1.0, 2.0, 3.0]);
2209        assert!(valid);
2210    }
2211
2212    #[test]
2213    fn test_extract_vector_f32_values_list_wrong_dims() {
2214        let v = vec![Value::Float(1.0), Value::Float(2.0)];
2215        let val = Value::List(v);
2216        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2217        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2218        assert!(!valid);
2219    }
2220
2221    #[test]
2222    fn test_extract_vector_f32_values_list_int_coercion() {
2223        let v = vec![Value::Int(1), Value::Int(2), Value::Int(3)];
2224        let val = Value::List(v);
2225        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2226        assert_eq!(result, vec![1.0, 2.0, 3.0]);
2227        assert!(valid);
2228    }
2229
2230    #[test]
2231    fn test_extract_vector_f32_values_none() {
2232        let (result, valid) = extract_vector_f32_values(None, false, 3);
2233        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2234        assert!(!valid);
2235    }
2236
2237    #[test]
2238    fn test_extract_vector_f32_values_null() {
2239        let val = Value::Null;
2240        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2241        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2242        assert!(!valid);
2243    }
2244
2245    #[test]
2246    fn test_extract_vector_f32_values_unsupported_type() {
2247        let val = Value::String("not a vector".to_string());
2248        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2249        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2250        assert!(!valid);
2251    }
2252
2253    #[test]
2254    fn test_extract_vector_f32_values_deleted_with_none() {
2255        let (result, valid) = extract_vector_f32_values(None, true, 3);
2256        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2257        assert!(valid); // Deleted entries are marked as valid with zeros
2258    }
2259
2260    #[test]
2261    fn test_extract_vector_f32_values_deleted_with_null() {
2262        let val = Value::Null;
2263        let (result, valid) = extract_vector_f32_values(Some(&val), true, 3);
2264        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2265        assert!(valid); // Deleted entries are marked as valid with zeros
2266    }
2267
2268    // Tests for values_to_array with FixedSizeList
2269
2270    #[test]
2271    fn test_values_to_fixed_size_list_vector_with_nulls() {
2272        let values = vec![
2273            Value::Vector(vec![1.0, 2.0]),
2274            Value::Null,
2275            Value::Vector(vec![3.0, 4.0]),
2276            Value::String("invalid".to_string()),
2277        ];
2278        let arr_ref = values_to_array(
2279            &values,
2280            &ArrowDataType::FixedSizeList(
2281                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2282                2,
2283            ),
2284        )
2285        .unwrap();
2286
2287        let arr = arr_ref
2288            .as_any()
2289            .downcast_ref::<FixedSizeListArray>()
2290            .unwrap();
2291
2292        assert_eq!(arr.len(), 4);
2293        assert!(arr.is_valid(0));
2294        assert!(!arr.is_valid(1)); // Null value
2295        assert!(arr.is_valid(2));
2296        assert!(!arr.is_valid(3)); // Invalid type
2297    }
2298
2299    #[test]
2300    fn test_values_to_fixed_size_list_from_list() {
2301        let values = vec![
2302            Value::List(vec![Value::Float(1.0), Value::Float(2.0)]),
2303            Value::List(vec![Value::Int(3), Value::Int(4)]),
2304        ];
2305        let arr_ref = values_to_array(
2306            &values,
2307            &ArrowDataType::FixedSizeList(
2308                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2309                2,
2310            ),
2311        )
2312        .unwrap();
2313
2314        let arr = arr_ref
2315            .as_any()
2316            .downcast_ref::<FixedSizeListArray>()
2317            .unwrap();
2318
2319        assert_eq!(arr.len(), 2);
2320        assert!(arr.is_valid(0));
2321        assert!(arr.is_valid(1));
2322
2323        // Check values
2324        let child = arr
2325            .values()
2326            .as_any()
2327            .downcast_ref::<Float32Array>()
2328            .unwrap();
2329        assert_eq!(child.value(0), 1.0);
2330        assert_eq!(child.value(1), 2.0);
2331        assert_eq!(child.value(2), 3.0);
2332        assert_eq!(child.value(3), 4.0);
2333    }
2334
2335    #[test]
2336    fn test_values_to_fixed_size_list_wrong_dimensions() {
2337        let values = vec![
2338            Value::Vector(vec![1.0, 2.0, 3.0]),   // 3 dims, expecting 2
2339            Value::List(vec![Value::Float(4.0)]), // 1 dim, expecting 2
2340        ];
2341        let arr_ref = values_to_array(
2342            &values,
2343            &ArrowDataType::FixedSizeList(
2344                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2345                2,
2346            ),
2347        )
2348        .unwrap();
2349
2350        let arr = arr_ref
2351            .as_any()
2352            .downcast_ref::<FixedSizeListArray>()
2353            .unwrap();
2354
2355        assert_eq!(arr.len(), 2);
2356        assert!(!arr.is_valid(0)); // Wrong dimensions
2357        assert!(!arr.is_valid(1)); // Wrong dimensions
2358
2359        // Check that child array has zeros for invalid entries
2360        let child = arr
2361            .values()
2362            .as_any()
2363            .downcast_ref::<Float32Array>()
2364            .unwrap();
2365        assert_eq!(child.value(0), 0.0);
2366        assert_eq!(child.value(1), 0.0);
2367        assert_eq!(child.value(2), 0.0);
2368        assert_eq!(child.value(3), 0.0);
2369    }
2370
2371    #[test]
2372    fn test_values_to_fixed_size_list_all_nulls() {
2373        let values = vec![Value::Null, Value::Null, Value::Null];
2374        let arr_ref = values_to_array(
2375            &values,
2376            &ArrowDataType::FixedSizeList(
2377                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2378                3,
2379            ),
2380        )
2381        .unwrap();
2382
2383        let arr = arr_ref
2384            .as_any()
2385            .downcast_ref::<FixedSizeListArray>()
2386            .unwrap();
2387
2388        assert_eq!(arr.len(), 3);
2389        assert!(!arr.is_valid(0));
2390        assert!(!arr.is_valid(1));
2391        assert!(!arr.is_valid(2));
2392
2393        // Verify child array length is correct (3 rows × 3 dims = 9)
2394        let child = arr
2395            .values()
2396            .as_any()
2397            .downcast_ref::<Float32Array>()
2398            .unwrap();
2399        assert_eq!(child.len(), 9);
2400    }
2401
2402    #[test]
2403    fn test_values_to_fixed_size_list_mixed_types() {
2404        let values = vec![
2405            Value::Vector(vec![1.0, 2.0]),
2406            Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
2407            Value::Null,
2408            Value::String("invalid".to_string()),
2409        ];
2410        let arr_ref = values_to_array(
2411            &values,
2412            &ArrowDataType::FixedSizeList(
2413                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2414                2,
2415            ),
2416        )
2417        .unwrap();
2418
2419        let arr = arr_ref
2420            .as_any()
2421            .downcast_ref::<FixedSizeListArray>()
2422            .unwrap();
2423
2424        assert_eq!(arr.len(), 4);
2425        assert!(arr.is_valid(0)); // Value::Vector
2426        assert!(arr.is_valid(1)); // Value::List
2427        assert!(!arr.is_valid(2)); // Value::Null
2428        assert!(!arr.is_valid(3)); // Value::String
2429
2430        // Check values for valid entries
2431        let child = arr
2432            .values()
2433            .as_any()
2434            .downcast_ref::<Float32Array>()
2435            .unwrap();
2436        assert_eq!(child.value(0), 1.0);
2437        assert_eq!(child.value(1), 2.0);
2438        assert_eq!(child.value(2), 3.0);
2439        assert_eq!(child.value(3), 4.0);
2440    }
2441
2442    // Tests for PropertyExtractor::build_vector_column
2443
2444    #[test]
2445    fn test_build_vector_column_with_nulls_and_deleted() {
2446        let data_type = DataType::Vector { dimensions: 3 };
2447        let extractor = PropertyExtractor::new("test_vec", &data_type);
2448
2449        let props = [
2450            Some(Value::Vector(vec![1.0, 2.0, 3.0])),
2451            None,              // Missing property
2452            Some(Value::Null), // Null value
2453            Some(Value::Vector(vec![4.0, 5.0, 6.0])),
2454        ];
2455        let deleted = [false, false, false, true]; // Last one is deleted
2456
2457        let arr_ref = extractor
2458            .build_vector_column(4, &deleted, |i| props[i].as_ref(), 3)
2459            .unwrap();
2460
2461        let arr = arr_ref
2462            .as_any()
2463            .downcast_ref::<FixedSizeListArray>()
2464            .unwrap();
2465
2466        assert_eq!(arr.len(), 4);
2467        assert!(arr.is_valid(0)); // Valid vector
2468        assert!(!arr.is_valid(1)); // Missing property
2469        assert!(!arr.is_valid(2)); // Null value
2470        assert!(arr.is_valid(3)); // Deleted entry (valid with zeros)
2471
2472        // Check values
2473        let child = arr
2474            .values()
2475            .as_any()
2476            .downcast_ref::<Float32Array>()
2477            .unwrap();
2478        assert_eq!(child.value(0), 1.0);
2479        assert_eq!(child.value(1), 2.0);
2480        assert_eq!(child.value(2), 3.0);
2481        // Indices 3-5: zeros for missing
2482        // Indices 6-8: zeros for null
2483        // Indices 9-11: zeros for deleted (but marked as valid)
2484        assert_eq!(child.value(9), 0.0);
2485        assert_eq!(child.value(10), 0.0);
2486        assert_eq!(child.value(11), 0.0);
2487    }
2488
2489    #[test]
2490    fn test_build_vector_column_with_list_input() {
2491        let data_type = DataType::Vector { dimensions: 2 };
2492        let extractor = PropertyExtractor::new("test_vec", &data_type);
2493
2494        let props = [
2495            Some(Value::List(vec![Value::Float(1.0), Value::Float(2.0)])),
2496            Some(Value::List(vec![Value::Int(3), Value::Int(4)])),
2497            Some(Value::Vector(vec![5.0, 6.0])),
2498        ];
2499        let deleted = [false, false, false];
2500
2501        let arr_ref = extractor
2502            .build_vector_column(3, &deleted, |i| props[i].as_ref(), 2)
2503            .unwrap();
2504
2505        let arr = arr_ref
2506            .as_any()
2507            .downcast_ref::<FixedSizeListArray>()
2508            .unwrap();
2509
2510        assert_eq!(arr.len(), 3);
2511        assert!(arr.is_valid(0));
2512        assert!(arr.is_valid(1));
2513        assert!(arr.is_valid(2));
2514
2515        // Check values
2516        let child = arr
2517            .values()
2518            .as_any()
2519            .downcast_ref::<Float32Array>()
2520            .unwrap();
2521        assert_eq!(child.value(0), 1.0);
2522        assert_eq!(child.value(1), 2.0);
2523        assert_eq!(child.value(2), 3.0);
2524        assert_eq!(child.value(3), 4.0);
2525        assert_eq!(child.value(4), 5.0);
2526        assert_eq!(child.value(5), 6.0);
2527    }
2528}