Skip to main content

uni_store/storage/
arrow_convert.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Arrow type conversion utilities for reducing cognitive complexity.
5//!
6//! This module provides shared helper functions and macros for converting
7//! between Arrow arrays and JSON Values, reducing code duplication across
8//! vertex.rs, delta.rs, and executor.rs.
9
10use anyhow::{Result, anyhow};
11use arrow_array::builder::{
12    BinaryBuilder, BooleanBufferBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
13    FixedSizeBinaryBuilder, FixedSizeListBuilder, Float32Builder, Float64Builder, Int32Builder,
14    Int64Builder, IntervalMonthDayNanoBuilder, LargeBinaryBuilder, ListBuilder, StringBuilder,
15    StructBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder,
16    UInt64Builder,
17};
18use arrow_array::{
19    Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, FixedSizeBinaryArray,
20    FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array,
21    IntervalMonthDayNanoArray, LargeBinaryArray, ListArray, StringArray, StructArray,
22    Time64NanosecondArray, TimestampNanosecondArray, UInt64Array,
23};
24use arrow_schema::{DataType as ArrowDataType, Field};
25use std::collections::HashMap;
26use std::sync::Arc;
27use uni_common::DataType;
28use uni_common::Value;
29use uni_common::core::id::{Eid, Vid};
30use uni_common::core::schema;
31use uni_crdt::Crdt;
32
33/// Build a timestamp column from a map of ID -> timestamp (nanoseconds).
34///
35/// Shared utility for building `_created_at` and `_updated_at` columns
36/// in vertex and edge tables. Works with any hashable ID type (Vid, Eid, etc.).
37fn build_timestamp_column_from_id_map<K, I>(
38    ids: I,
39    timestamps: Option<&HashMap<K, i64>>,
40) -> ArrayRef
41where
42    K: Eq + std::hash::Hash,
43    I: IntoIterator<Item = K>,
44{
45    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
46    for id in ids {
47        match timestamps.and_then(|m| m.get(&id)) {
48            Some(&ts) => builder.append_value(ts),
49            None => builder.append_null(),
50        }
51    }
52    Arc::new(builder.finish())
53}
54
55pub fn build_timestamp_column_from_vid_map<I>(
56    ids: I,
57    timestamps: Option<&HashMap<Vid, i64>>,
58) -> ArrayRef
59where
60    I: IntoIterator<Item = Vid>,
61{
62    build_timestamp_column_from_id_map(ids, timestamps)
63}
64
65pub fn build_timestamp_column_from_eid_map<I>(
66    ids: I,
67    timestamps: Option<&HashMap<Eid, i64>>,
68) -> ArrayRef
69where
70    I: IntoIterator<Item = Eid>,
71{
72    build_timestamp_column_from_id_map(ids, timestamps)
73}
74
75/// Build a timestamp column from an iterator of optional timestamps.
76///
77/// This is useful for building timestamp columns directly from entry structs.
78pub fn build_timestamp_column<I>(timestamps: I) -> ArrayRef
79where
80    I: IntoIterator<Item = Option<i64>>,
81{
82    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
83    for ts in timestamps {
84        builder.append_option(ts);
85    }
86    Arc::new(builder.finish())
87}
88
89/// Extract a `Vec<String>` from a single row of a `List<Utf8>` column.
90///
91/// Returns an empty vec when the row is null, the inner array is not a
92/// `StringArray`, or the list is empty.  Null entries inside the list are
93/// silently skipped.
94pub fn labels_from_list_array(list_arr: &ListArray, row: usize) -> Vec<String> {
95    if list_arr.is_null(row) {
96        return Vec::new();
97    }
98    let values = list_arr.value(row);
99    let Some(str_arr) = values.as_any().downcast_ref::<StringArray>() else {
100        return Vec::new();
101    };
102    (0..str_arr.len())
103        .filter(|&j| !str_arr.is_null(j))
104        .map(|j| str_arr.value(j).to_string())
105        .collect()
106}
107
108/// Parse a datetime string into nanoseconds since Unix epoch.
109///
110/// Tries RFC3339, "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M%:z",
111/// and "%Y-%m-%dT%H:%MZ" formats.
112fn parse_datetime_to_nanos(s: &str) -> Option<i64> {
113    chrono::DateTime::parse_from_rfc3339(s)
114        .map(|dt| {
115            dt.with_timezone(&chrono::Utc)
116                .timestamp_nanos_opt()
117                .unwrap_or(0)
118        })
119        .or_else(|_| {
120            chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
121                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
122        })
123        .or_else(|_| {
124            chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ")
125                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
126        })
127        .or_else(|_| {
128            chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M%:z").map(|dt| {
129                dt.with_timezone(&chrono::Utc)
130                    .timestamp_nanos_opt()
131                    .unwrap_or(0)
132            })
133        })
134        .ok()
135        .or_else(|| {
136            s.strip_suffix('Z')
137                .and_then(|base| chrono::NaiveDateTime::parse_from_str(base, "%Y-%m-%dT%H:%M").ok())
138                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
139        })
140}
141
142/// Detect the Arrow Map-as-List(Struct(key, value)) pattern and reconstruct a map.
143///
144/// Arrow represents Map columns as `List(Struct { key, value })`. This helper
145/// checks whether the given array matches that layout and, if so, converts the
146/// key/value pairs back into a `HashMap<String, Value>`.
147fn try_reconstruct_map(arr: &ArrayRef) -> Option<HashMap<String, Value>> {
148    let structs = arr.as_any().downcast_ref::<StructArray>()?;
149    let fields = structs.fields();
150    if fields.len() != 2 || fields[0].name() != "key" || fields[1].name() != "value" {
151        return None;
152    }
153    let key_col = structs.column(0);
154    let val_col = structs.column(1);
155    let mut map = HashMap::new();
156    for i in 0..structs.len() {
157        if let Value::String(k) = arrow_to_value(key_col.as_ref(), i, None) {
158            map.insert(k, arrow_to_value(val_col.as_ref(), i, None));
159        }
160    }
161    Some(map)
162}
163
164/// Convert all elements of an Arrow array into a `Vec<Value>`.
165fn array_to_value_list(arr: &ArrayRef) -> Vec<Value> {
166    (0..arr.len())
167        .map(|i| arrow_to_value(arr.as_ref(), i, None))
168        .collect()
169}
170
171/// Convert an Arrow array value at a given row index to a Uni Value.
172///
173/// Handles all common Arrow types and recursively processes nested structures
174/// like Lists and Structs. The optional `data_type` parameter provides schema
175/// context for decoding DateTime and Time struct arrays; when provided, it
176/// takes precedence over runtime type detection.
177pub fn arrow_to_value(col: &dyn Array, row: usize, data_type: Option<&DataType>) -> Value {
178    if col.is_null(row) {
179        return Value::Null;
180    }
181
182    // Schema-driven decode for DateTime and Time structs
183    if let Some(dt) = data_type {
184        match dt {
185            DataType::DateTime => {
186                // Expect StructArray with three fields
187                if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
188                    && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
189                        struct_arr.column_by_name("nanos_since_epoch"),
190                        struct_arr.column_by_name("offset_seconds"),
191                        struct_arr.column_by_name("timezone_name"),
192                    )
193                    && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
194                        nanos_col
195                            .as_any()
196                            .downcast_ref::<TimestampNanosecondArray>(),
197                        offset_col.as_any().downcast_ref::<Int32Array>(),
198                        tz_col.as_any().downcast_ref::<StringArray>(),
199                    )
200                {
201                    if nanos_arr.is_null(row) {
202                        return Value::Null;
203                    }
204                    let nanos = nanos_arr.value(row);
205                    if offset_arr.is_null(row) {
206                        // No offset → LocalDateTime
207                        return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
208                            nanos_since_epoch: nanos,
209                        });
210                    }
211                    let offset = offset_arr.value(row);
212                    let tz_name = (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
213                    return Value::Temporal(uni_common::TemporalValue::DateTime {
214                        nanos_since_epoch: nanos,
215                        offset_seconds: offset,
216                        timezone_name: tz_name,
217                    });
218                }
219                // Fall back to old schema migration: TimestampNanosecond → DateTime with offset=0
220                if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
221                    let nanos = ts.value(row);
222                    let tz_name = ts.timezone().map(|s| s.to_string());
223                    return Value::Temporal(uni_common::TemporalValue::DateTime {
224                        nanos_since_epoch: nanos,
225                        offset_seconds: 0,
226                        timezone_name: tz_name,
227                    });
228                }
229            }
230            DataType::Time => {
231                // Expect StructArray with two fields
232                if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
233                    && let (Some(nanos_col), Some(offset_col)) = (
234                        struct_arr.column_by_name("nanos_since_midnight"),
235                        struct_arr.column_by_name("offset_seconds"),
236                    )
237                    && let (Some(nanos_arr), Some(offset_arr)) = (
238                        nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
239                        offset_col.as_any().downcast_ref::<Int32Array>(),
240                    )
241                {
242                    // Check field-level nulls before calling .value()
243                    if nanos_arr.is_null(row) || offset_arr.is_null(row) {
244                        return Value::Null;
245                    }
246                    let nanos = nanos_arr.value(row);
247                    let offset = offset_arr.value(row);
248                    return Value::Temporal(uni_common::TemporalValue::Time {
249                        nanos_since_midnight: nanos,
250                        offset_seconds: offset,
251                    });
252                }
253                // Fall back to old schema: Time64Nanosecond → Time with offset=0
254                if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
255                    let nanos = t.value(row);
256                    return Value::Temporal(uni_common::TemporalValue::Time {
257                        nanos_since_midnight: nanos,
258                        offset_seconds: 0,
259                    });
260                }
261            }
262            DataType::Btic => {
263                let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>() else {
264                    log::warn!("BTIC column is not FixedSizeBinaryArray");
265                    return Value::Null;
266                };
267                let bytes = fsb.value(row);
268                return match uni_btic::encode::decode_slice(bytes) {
269                    Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
270                        lo: btic.lo(),
271                        hi: btic.hi(),
272                        meta: btic.meta(),
273                    }),
274                    Err(e) => {
275                        log::warn!("BTIC decode error: {}", e);
276                        Value::Null
277                    }
278                };
279            }
280            _ => {}
281        }
282    }
283
284    // String types
285    if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
286        return Value::String(s.value(row).to_string());
287    }
288
289    // Integer types
290    if let Some(u) = col.as_any().downcast_ref::<UInt64Array>() {
291        return Value::Int(u.value(row) as i64);
292    }
293    if let Some(i) = col.as_any().downcast_ref::<Int64Array>() {
294        return Value::Int(i.value(row));
295    }
296    if let Some(i) = col.as_any().downcast_ref::<Int32Array>() {
297        return Value::Int(i.value(row) as i64);
298    }
299
300    // Float types
301    if let Some(f) = col.as_any().downcast_ref::<Float64Array>() {
302        return Value::Float(f.value(row));
303    }
304    if let Some(f) = col.as_any().downcast_ref::<Float32Array>() {
305        return Value::Float(f.value(row) as f64);
306    }
307
308    // Boolean type
309    if let Some(b) = col.as_any().downcast_ref::<BooleanArray>() {
310        return Value::Bool(b.value(row));
311    }
312
313    // Fixed-size list (vectors)
314    if let Some(list) = col.as_any().downcast_ref::<FixedSizeListArray>() {
315        return Value::List(array_to_value_list(&list.value(row)));
316    }
317
318    // Variable-size list
319    if let Some(list) = col.as_any().downcast_ref::<ListArray>() {
320        let arr = list.value(row);
321
322        // Map types are stored as List(Struct(key, value)); reconstruct as map
323        if let Some(obj) = try_reconstruct_map(&arr) {
324            return Value::Map(obj);
325        }
326
327        return Value::List(array_to_value_list(&arr));
328    }
329
330    // Large list (variable-size list with i64 offsets)
331    if let Some(list) = col.as_any().downcast_ref::<arrow_array::LargeListArray>() {
332        return Value::List(array_to_value_list(&list.value(row)));
333    }
334
335    // Struct type — detect temporal structs by field names before generic handler
336    if let Some(s) = col.as_any().downcast_ref::<StructArray>() {
337        let field_names: Vec<&str> = s.fields().iter().map(|f| f.name().as_str()).collect();
338
339        // DateTime struct: {nanos_since_epoch, offset_seconds, timezone_name}
340        if field_names.contains(&"nanos_since_epoch")
341            && field_names.contains(&"offset_seconds")
342            && field_names.contains(&"timezone_name")
343            && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
344                s.column_by_name("nanos_since_epoch"),
345                s.column_by_name("offset_seconds"),
346                s.column_by_name("timezone_name"),
347            )
348        {
349            // Try TimestampNanosecond first (standard schema), then Int64 fallback
350            let nanos_opt = nanos_col
351                .as_any()
352                .downcast_ref::<TimestampNanosecondArray>()
353                .map(|a| {
354                    if a.is_null(row) {
355                        None
356                    } else {
357                        Some(a.value(row))
358                    }
359                })
360                .or_else(|| {
361                    nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
362                        if a.is_null(row) {
363                            None
364                        } else {
365                            Some(a.value(row))
366                        }
367                    })
368                });
369            let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
370                if a.is_null(row) {
371                    None
372                } else {
373                    Some(a.value(row))
374                }
375            });
376
377            if let Some(Some(nanos)) = nanos_opt {
378                match offset_opt {
379                    Some(Some(offset)) => {
380                        let tz_name = tz_col.as_any().downcast_ref::<StringArray>().and_then(|a| {
381                            if a.is_null(row) {
382                                None
383                            } else {
384                                Some(a.value(row).to_string())
385                            }
386                        });
387                        return Value::Temporal(uni_common::TemporalValue::DateTime {
388                            nanos_since_epoch: nanos,
389                            offset_seconds: offset,
390                            timezone_name: tz_name,
391                        });
392                    }
393                    _ => {
394                        // No offset → LocalDateTime
395                        return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
396                            nanos_since_epoch: nanos,
397                        });
398                    }
399                }
400            }
401        }
402
403        // Time struct: {nanos_since_midnight, offset_seconds}
404        if field_names.contains(&"nanos_since_midnight")
405            && field_names.contains(&"offset_seconds")
406            && let (Some(nanos_col), Some(offset_col)) = (
407                s.column_by_name("nanos_since_midnight"),
408                s.column_by_name("offset_seconds"),
409            )
410        {
411            // Try Time64Nanosecond first (standard schema), then Int64 fallback
412            let nanos_opt = nanos_col
413                .as_any()
414                .downcast_ref::<Time64NanosecondArray>()
415                .map(|a| {
416                    if a.is_null(row) {
417                        None
418                    } else {
419                        Some(a.value(row))
420                    }
421                })
422                .or_else(|| {
423                    nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
424                        if a.is_null(row) {
425                            None
426                        } else {
427                            Some(a.value(row))
428                        }
429                    })
430                });
431            let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
432                if a.is_null(row) {
433                    None
434                } else {
435                    Some(a.value(row))
436                }
437            });
438
439            if let (Some(Some(nanos)), Some(Some(offset))) = (nanos_opt, offset_opt) {
440                return Value::Temporal(uni_common::TemporalValue::Time {
441                    nanos_since_midnight: nanos,
442                    offset_seconds: offset,
443                });
444            }
445        }
446
447        // Generic struct → Map
448        let mut map = HashMap::new();
449        for (field, child) in s.fields().iter().zip(s.columns()) {
450            map.insert(
451                field.name().clone(),
452                arrow_to_value(child.as_ref(), row, None),
453            );
454        }
455        return Value::Map(map);
456    }
457
458    // Date32 type (days since epoch) - return as Value::Temporal
459    if let Some(d) = col.as_any().downcast_ref::<Date32Array>() {
460        let days = d.value(row);
461        return Value::Temporal(uni_common::TemporalValue::Date {
462            days_since_epoch: days,
463        });
464    }
465
466    // Timestamp (nanoseconds since epoch) - timezone presence determines DateTime vs LocalDateTime
467    if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
468        let nanos = ts.value(row);
469        return match ts.timezone() {
470            Some(tz) => Value::Temporal(uni_common::TemporalValue::DateTime {
471                nanos_since_epoch: nanos,
472                offset_seconds: 0,
473                timezone_name: Some(tz.to_string()),
474            }),
475            None => Value::Temporal(uni_common::TemporalValue::LocalDateTime {
476                nanos_since_epoch: nanos,
477            }),
478        };
479    }
480
481    // Time64 (nanoseconds since midnight) - return as Value::Temporal
482    if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
483        let nanos = t.value(row);
484        return Value::Temporal(uni_common::TemporalValue::LocalTime {
485            nanos_since_midnight: nanos,
486        });
487    }
488
489    // Time64 (microseconds since midnight) - convert to nanoseconds
490    if let Some(t) = col
491        .as_any()
492        .downcast_ref::<arrow_array::Time64MicrosecondArray>()
493    {
494        let micros = t.value(row);
495        return Value::Temporal(uni_common::TemporalValue::LocalTime {
496            nanos_since_midnight: micros * 1000,
497        });
498    }
499
500    // DurationMicrosecond - convert to Duration with nanoseconds
501    if let Some(d) = col
502        .as_any()
503        .downcast_ref::<arrow_array::DurationMicrosecondArray>()
504    {
505        let micros = d.value(row);
506        let total_nanos = micros * 1000;
507        let seconds = total_nanos / 1_000_000_000;
508        let remaining_nanos = total_nanos % 1_000_000_000;
509        return Value::Temporal(uni_common::TemporalValue::Duration {
510            months: 0,
511            days: 0,
512            nanos: seconds * 1_000_000_000 + remaining_nanos,
513        });
514    }
515
516    // IntervalMonthDayNano - return as Value::Temporal(Duration)
517    if let Some(interval) = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>() {
518        let val = interval.value(row);
519        return Value::Temporal(uni_common::TemporalValue::Duration {
520            months: val.months as i64,
521            days: val.days as i64,
522            nanos: val.nanoseconds,
523        });
524    }
525
526    // LargeBinary (CypherValue MessagePack-tagged encoding)
527    if let Some(b) = col.as_any().downcast_ref::<LargeBinaryArray>() {
528        let bytes = b.value(row);
529        if bytes.is_empty() {
530            return Value::Null;
531        }
532        return uni_common::cypher_value_codec::decode(bytes).unwrap_or_else(|e| {
533            eprintln!("CypherValue decode error: {}", e);
534            Value::Null
535        });
536    }
537
538    // Binary (CRDT MessagePack) - decode to Value via serde_json boundary
539    if let Some(b) = col.as_any().downcast_ref::<BinaryArray>() {
540        let bytes = b.value(row);
541        return Crdt::from_msgpack(bytes)
542            .ok()
543            .and_then(|crdt| serde_json::to_value(&crdt).ok())
544            .map(Value::from)
545            .unwrap_or(Value::Null);
546    }
547
548    // Fallback
549    Value::Null
550}
551
552fn values_to_uint64_array(values: &[Value]) -> ArrayRef {
553    let mut builder = UInt64Builder::with_capacity(values.len());
554    for v in values {
555        if let Some(n) = v.as_u64() {
556            builder.append_value(n);
557        } else {
558            builder.append_null();
559        }
560    }
561    Arc::new(builder.finish())
562}
563
564fn values_to_int64_array(values: &[Value]) -> ArrayRef {
565    let mut builder = Int64Builder::with_capacity(values.len());
566    for v in values {
567        if let Some(n) = v.as_i64() {
568            builder.append_value(n);
569        } else {
570            builder.append_null();
571        }
572    }
573    Arc::new(builder.finish())
574}
575
576fn values_to_int32_array(values: &[Value]) -> ArrayRef {
577    let mut builder = Int32Builder::with_capacity(values.len());
578    for v in values {
579        if let Some(n) = v.as_i64() {
580            builder.append_value(n as i32);
581        } else {
582            builder.append_null();
583        }
584    }
585    Arc::new(builder.finish())
586}
587
588fn values_to_string_array(values: &[Value]) -> ArrayRef {
589    let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 10);
590    for v in values {
591        if let Some(s) = v.as_str() {
592            builder.append_value(s);
593        } else if v.is_null() {
594            builder.append_null();
595        } else {
596            builder.append_value(v.to_string());
597        }
598    }
599    Arc::new(builder.finish())
600}
601
602fn values_to_bool_array(values: &[Value]) -> ArrayRef {
603    let mut builder = BooleanBuilder::with_capacity(values.len());
604    for v in values {
605        if let Some(b) = v.as_bool() {
606            builder.append_value(b);
607        } else {
608            builder.append_null();
609        }
610    }
611    Arc::new(builder.finish())
612}
613
614fn values_to_float32_array(values: &[Value]) -> ArrayRef {
615    let mut builder = Float32Builder::with_capacity(values.len());
616    for v in values {
617        if let Some(n) = v.as_f64() {
618            builder.append_value(n as f32);
619        } else {
620            builder.append_null();
621        }
622    }
623    Arc::new(builder.finish())
624}
625
626fn values_to_float64_array(values: &[Value]) -> ArrayRef {
627    let mut builder = Float64Builder::with_capacity(values.len());
628    for v in values {
629        if let Some(n) = v.as_f64() {
630            builder.append_value(n);
631        } else {
632            builder.append_null();
633        }
634    }
635    Arc::new(builder.finish())
636}
637
638fn values_to_fixed_size_binary_array(values: &[Value], size: i32) -> Result<ArrayRef> {
639    let mut builder = FixedSizeBinaryBuilder::with_capacity(values.len(), size);
640    for v in values {
641        if let Value::List(bytes) = v {
642            let b: Vec<u8> = bytes
643                .iter()
644                .map(|bv| bv.as_u64().unwrap_or(0) as u8)
645                .collect();
646            if b.len() as i32 == size {
647                builder.append_value(&b)?;
648            } else {
649                builder.append_null();
650            }
651        } else {
652            builder.append_null();
653        }
654    }
655    Ok(Arc::new(builder.finish()))
656}
657
658/// Extract f32 vector values from a Value, ensuring correct Arrow FixedSizeList invariants.
659///
660/// Always returns exactly `dimensions` f32 values (zeros for null/invalid), plus a validity flag.
661/// This guarantees `child_array.len() == parent_array.len() × dimensions`.
662///
663/// # Arguments
664/// - `val`: Optional property value to extract from
665/// - `is_deleted`: Whether the containing entity is deleted (affects validity)
666/// - `dimensions`: Expected vector dimensions
667///
668/// # Returns
669/// - Tuple of (vector values, validity flag)
670///   - Vector always has exactly `dimensions` elements
671///   - Validity is `true` for valid vectors or deleted entries, `false` for null/invalid
672pub fn extract_vector_f32_values(
673    val: Option<&Value>,
674    is_deleted: bool,
675    dimensions: usize,
676) -> (Vec<f32>, bool) {
677    let zeros = || vec![0.0_f32; dimensions];
678
679    // Deleted entries always return zeros with valid=true
680    if is_deleted {
681        return (zeros(), true);
682    }
683
684    match val {
685        // Native f32 vector (Value::Vector)
686        Some(Value::Vector(v)) if v.len() == dimensions => (v.clone(), true),
687        Some(Value::Vector(_)) => (zeros(), false), // Wrong dimensions
688        // List of values (Value::List) - convert to f32
689        Some(Value::List(arr)) if arr.len() == dimensions => {
690            let values: Vec<f32> = arr
691                .iter()
692                .map(|v| v.as_f64().unwrap_or(0.0) as f32)
693                .collect();
694            (values, true)
695        }
696        Some(Value::List(_)) => (zeros(), false), // Wrong dimensions
697        _ => (zeros(), false),                    // Missing or unsupported value
698    }
699}
700
701fn values_to_fixed_size_list_f32_array(values: &[Value], size: i32) -> ArrayRef {
702    let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), size);
703    for v in values {
704        let (vals, valid) = extract_vector_f32_values(Some(v), false, size as usize);
705        for val in vals {
706            builder.values().append_value(val);
707        }
708        builder.append(valid);
709    }
710    Arc::new(builder.finish())
711}
712
713fn values_to_timestamp_array(values: &[Value], tz: Option<&Arc<str>>) -> ArrayRef {
714    let mut builder = TimestampNanosecondBuilder::with_capacity(values.len());
715    for v in values {
716        if v.is_null() {
717            builder.append_null();
718        } else if let Value::Temporal(tv) = v {
719            match tv {
720                uni_common::TemporalValue::DateTime {
721                    nanos_since_epoch, ..
722                }
723                | uni_common::TemporalValue::LocalDateTime {
724                    nanos_since_epoch, ..
725                } => builder.append_value(*nanos_since_epoch),
726                _ => builder.append_null(),
727            }
728        } else if let Some(n) = v.as_i64() {
729            builder.append_value(n);
730        } else if let Some(s) = v.as_str() {
731            match parse_datetime_to_nanos(s) {
732                Some(nanos) => builder.append_value(nanos),
733                None => builder.append_null(),
734            }
735        } else {
736            builder.append_null();
737        }
738    }
739
740    let arr = builder.finish();
741    if let Some(tz) = tz {
742        Arc::new(arr.with_timezone(tz.as_ref()))
743    } else {
744        Arc::new(arr)
745    }
746}
747
748/// Build a DateTime struct array from values.
749///
750/// Encodes DateTime as a 3-field struct: (nanos_since_epoch, offset_seconds, timezone_name).
751/// This preserves timezone offset information that was previously lost with TimestampNanosecond encoding.
752fn values_to_datetime_struct_array(values: &[Value]) -> ArrayRef {
753    let mut nanos_builder = TimestampNanosecondBuilder::with_capacity(values.len());
754    let mut offset_builder = Int32Builder::with_capacity(values.len());
755    let mut tz_builder = StringBuilder::with_capacity(values.len(), values.len() * 20);
756    let mut null_buffer = BooleanBufferBuilder::new(values.len());
757
758    for v in values {
759        match v {
760            Value::Temporal(uni_common::TemporalValue::DateTime {
761                nanos_since_epoch,
762                offset_seconds,
763                timezone_name,
764            }) => {
765                nanos_builder.append_value(*nanos_since_epoch);
766                offset_builder.append_value(*offset_seconds);
767                tz_builder.append_option(timezone_name.as_deref());
768                null_buffer.append(true);
769            }
770            Value::Temporal(uni_common::TemporalValue::LocalDateTime { nanos_since_epoch }) => {
771                nanos_builder.append_value(*nanos_since_epoch);
772                offset_builder.append_null();
773                tz_builder.append_null();
774                null_buffer.append(true);
775            }
776            _ => {
777                nanos_builder.append_null();
778                offset_builder.append_null();
779                tz_builder.append_null();
780                null_buffer.append(false);
781            }
782        }
783    }
784
785    let struct_arr = StructArray::new(
786        schema::datetime_struct_fields(),
787        vec![
788            Arc::new(nanos_builder.finish()) as ArrayRef,
789            Arc::new(offset_builder.finish()) as ArrayRef,
790            Arc::new(tz_builder.finish()) as ArrayRef,
791        ],
792        Some(null_buffer.finish().into()),
793    );
794    Arc::new(struct_arr)
795}
796
797/// Build a Time struct array from values.
798///
799/// Encodes Time as a 2-field struct: (nanos_since_midnight, offset_seconds).
800/// This preserves timezone offset information that was previously lost with Time64Nanosecond encoding.
801fn values_to_time_struct_array(values: &[Value]) -> ArrayRef {
802    let mut nanos_builder = Time64NanosecondBuilder::with_capacity(values.len());
803    let mut offset_builder = Int32Builder::with_capacity(values.len());
804    let mut null_buffer = BooleanBufferBuilder::new(values.len());
805
806    for v in values {
807        match v {
808            Value::Temporal(uni_common::TemporalValue::Time {
809                nanos_since_midnight,
810                offset_seconds,
811            }) => {
812                nanos_builder.append_value(*nanos_since_midnight);
813                offset_builder.append_value(*offset_seconds);
814                null_buffer.append(true);
815            }
816            Value::Temporal(uni_common::TemporalValue::LocalTime {
817                nanos_since_midnight,
818            }) => {
819                nanos_builder.append_value(*nanos_since_midnight);
820                offset_builder.append_null();
821                null_buffer.append(true);
822            }
823            _ => {
824                nanos_builder.append_null();
825                offset_builder.append_null();
826                null_buffer.append(false);
827            }
828        }
829    }
830
831    let struct_arr = StructArray::new(
832        schema::time_struct_fields(),
833        vec![
834            Arc::new(nanos_builder.finish()) as ArrayRef,
835            Arc::new(offset_builder.finish()) as ArrayRef,
836        ],
837        Some(null_buffer.finish().into()),
838    );
839    Arc::new(struct_arr)
840}
841
842fn values_to_large_binary_array(values: &[Value]) -> ArrayRef {
843    let mut builder =
844        arrow_array::builder::LargeBinaryBuilder::with_capacity(values.len(), values.len() * 64);
845    for v in values {
846        if v.is_null() {
847            builder.append_null();
848        } else {
849            // Encode as CypherValue (MessagePack-tagged)
850            let cv_bytes = uni_common::cypher_value_codec::encode(v);
851            builder.append_value(&cv_bytes);
852        }
853    }
854    Arc::new(builder.finish())
855}
856
857/// Convert a slice of JSON Values to an Arrow array based on the target Arrow DataType.
858pub fn values_to_array(values: &[Value], dt: &ArrowDataType) -> Result<ArrayRef> {
859    match dt {
860        ArrowDataType::UInt64 => Ok(values_to_uint64_array(values)),
861        ArrowDataType::Int64 => Ok(values_to_int64_array(values)),
862        ArrowDataType::Int32 => Ok(values_to_int32_array(values)),
863        ArrowDataType::Utf8 => Ok(values_to_string_array(values)),
864        ArrowDataType::Boolean => Ok(values_to_bool_array(values)),
865        ArrowDataType::Float32 => Ok(values_to_float32_array(values)),
866        ArrowDataType::Float64 => Ok(values_to_float64_array(values)),
867        ArrowDataType::FixedSizeBinary(size) => values_to_fixed_size_binary_array(values, *size),
868        ArrowDataType::FixedSizeList(inner, size) => {
869            if inner.data_type() == &ArrowDataType::Float32 {
870                Ok(values_to_fixed_size_list_f32_array(values, *size))
871            } else {
872                Err(anyhow!("Unsupported FixedSizeList inner type"))
873            }
874        }
875        ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
876            Ok(values_to_timestamp_array(values, tz.as_ref()))
877        }
878        ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
879            Ok(values_to_timestamp_array(values, tz.as_ref()))
880        }
881        ArrowDataType::Date32 => {
882            let mut builder = Date32Builder::with_capacity(values.len());
883            for v in values {
884                if v.is_null() {
885                    builder.append_null();
886                } else if let Value::Temporal(uni_common::TemporalValue::Date {
887                    days_since_epoch,
888                }) = v
889                {
890                    builder.append_value(*days_since_epoch);
891                } else if let Some(n) = v.as_i64() {
892                    builder.append_value(n as i32);
893                } else {
894                    builder.append_null();
895                }
896            }
897            Ok(Arc::new(builder.finish()))
898        }
899        ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
900            let mut builder = Time64NanosecondBuilder::with_capacity(values.len());
901            for v in values {
902                if v.is_null() {
903                    builder.append_null();
904                } else if let Value::Temporal(tv) = v {
905                    match tv {
906                        uni_common::TemporalValue::LocalTime {
907                            nanos_since_midnight,
908                        }
909                        | uni_common::TemporalValue::Time {
910                            nanos_since_midnight,
911                            ..
912                        } => builder.append_value(*nanos_since_midnight),
913                        _ => builder.append_null(),
914                    }
915                } else if let Some(n) = v.as_i64() {
916                    builder.append_value(n);
917                } else {
918                    builder.append_null();
919                }
920            }
921            Ok(Arc::new(builder.finish()))
922        }
923        ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
924            let mut builder = Time64MicrosecondBuilder::with_capacity(values.len());
925            for v in values {
926                if v.is_null() {
927                    builder.append_null();
928                } else if let Value::Temporal(tv) = v {
929                    match tv {
930                        uni_common::TemporalValue::LocalTime {
931                            nanos_since_midnight,
932                        }
933                        | uni_common::TemporalValue::Time {
934                            nanos_since_midnight,
935                            ..
936                        } => builder.append_value(*nanos_since_midnight / 1_000), // nanos→micros for legacy
937                        _ => builder.append_null(),
938                    }
939                } else if let Some(n) = v.as_i64() {
940                    builder.append_value(n);
941                } else {
942                    builder.append_null();
943                }
944            }
945            Ok(Arc::new(builder.finish()))
946        }
947        ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano) => {
948            let mut builder = IntervalMonthDayNanoBuilder::with_capacity(values.len());
949            for v in values {
950                if v.is_null() {
951                    builder.append_null();
952                } else if let Value::Temporal(uni_common::TemporalValue::Duration {
953                    months,
954                    days,
955                    nanos,
956                }) = v
957                {
958                    builder.append_value(arrow::datatypes::IntervalMonthDayNano {
959                        months: *months as i32,
960                        days: *days as i32,
961                        nanoseconds: *nanos,
962                    });
963                } else {
964                    builder.append_null();
965                }
966            }
967            Ok(Arc::new(builder.finish()))
968        }
969        ArrowDataType::Duration(arrow_schema::TimeUnit::Microsecond) => {
970            let mut builder = DurationMicrosecondBuilder::with_capacity(values.len());
971            for v in values {
972                if v.is_null() {
973                    builder.append_null();
974                } else if let Value::Temporal(uni_common::TemporalValue::Duration {
975                    months,
976                    days,
977                    nanos,
978                }) = v
979                {
980                    let total_micros =
981                        months * 30 * 86_400_000_000i64 + days * 86_400_000_000i64 + nanos / 1_000;
982                    builder.append_value(total_micros);
983                } else if let Some(n) = v.as_i64() {
984                    builder.append_value(n);
985                } else {
986                    builder.append_null();
987                }
988            }
989            Ok(Arc::new(builder.finish()))
990        }
991        ArrowDataType::LargeBinary => Ok(values_to_large_binary_array(values)),
992        ArrowDataType::List(field) => {
993            if field.data_type() == &ArrowDataType::Utf8 {
994                let mut builder = ListBuilder::new(StringBuilder::new());
995                for v in values {
996                    if let Value::List(arr) = v {
997                        for item in arr {
998                            if let Some(s) = item.as_str() {
999                                builder.values().append_value(s);
1000                            } else {
1001                                builder.values().append_null();
1002                            }
1003                        }
1004                        builder.append(true);
1005                    } else {
1006                        builder.append_null();
1007                    }
1008                }
1009                Ok(Arc::new(builder.finish()))
1010            } else {
1011                Err(anyhow!(
1012                    "Unsupported List inner type: {:?}",
1013                    field.data_type()
1014                ))
1015            }
1016        }
1017        ArrowDataType::Struct(_) if schema::is_datetime_struct(dt) => {
1018            Ok(values_to_datetime_struct_array(values))
1019        }
1020        ArrowDataType::Struct(_) if schema::is_time_struct(dt) => {
1021            Ok(values_to_time_struct_array(values))
1022        }
1023        _ => Err(anyhow!("Unsupported type for conversion: {:?}", dt)),
1024    }
1025}
1026
1027/// Property value extractor for building Arrow columns from entity properties.
1028pub struct PropertyExtractor<'a> {
1029    data_type: &'a DataType,
1030}
1031
1032impl<'a> PropertyExtractor<'a> {
1033    pub fn new(_name: &'a str, data_type: &'a DataType) -> Self {
1034        Self { data_type }
1035    }
1036
1037    /// Build an Arrow column from a slice of property maps.
1038    /// The `deleted` slice indicates which entries are deleted (use default values).
1039    pub fn build_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1040    where
1041        F: Fn(usize) -> Option<&'a Value>,
1042    {
1043        match self.data_type {
1044            DataType::String => self.build_string_column(len, deleted, get_props),
1045            DataType::Int32 => self.build_int32_column(len, deleted, get_props),
1046            DataType::Int64 => self.build_int64_column(len, deleted, get_props),
1047            DataType::Float32 => self.build_float32_column(len, deleted, get_props),
1048            DataType::Float64 => self.build_float64_column(len, deleted, get_props),
1049            DataType::Bool => self.build_bool_column(len, deleted, get_props),
1050            DataType::Vector { dimensions } => {
1051                self.build_vector_column(len, deleted, get_props, *dimensions)
1052            }
1053            DataType::CypherValue => self.build_json_column(len, deleted, get_props),
1054            DataType::List(inner) => self.build_list_column(len, deleted, get_props, inner),
1055            DataType::Map(key, value) => self.build_map_column(len, deleted, get_props, key, value),
1056            DataType::Crdt(_) => self.build_crdt_column(len, deleted, get_props),
1057            DataType::DateTime => self.build_datetime_struct_column(len, deleted, get_props),
1058            DataType::Timestamp => self.build_timestamp_column(len, deleted, get_props),
1059            DataType::Date => self.build_date32_column(len, deleted, get_props),
1060            DataType::Time => self.build_time_struct_column(len, deleted, get_props),
1061            DataType::Duration => self.build_duration_column(len, deleted, get_props),
1062            DataType::Btic => self.build_btic_column(len, deleted, get_props),
1063            _ => Err(anyhow!(
1064                "Unsupported data type for arrow conversion: {:?}",
1065                self.data_type
1066            )),
1067        }
1068    }
1069
1070    fn build_string_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1071    where
1072        F: Fn(usize) -> Option<&'a Value>,
1073    {
1074        let mut builder = arrow_array::builder::StringBuilder::with_capacity(len, len * 32);
1075        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1076            let prop = get_props(i);
1077            if let Some(s) = prop.and_then(|v| v.as_str()) {
1078                builder.append_value(s);
1079            } else if let Some(Value::Temporal(tv)) = prop {
1080                builder.append_value(tv.to_string());
1081            } else if is_deleted {
1082                builder.append_value("");
1083            } else {
1084                builder.append_null();
1085            }
1086        }
1087        Ok(Arc::new(builder.finish()))
1088    }
1089
1090    fn build_int32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1091    where
1092        F: Fn(usize) -> Option<&'a Value>,
1093    {
1094        let mut values = Vec::with_capacity(len);
1095        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1096            let val = get_props(i).and_then(|v| v.as_i64()).map(|v| v as i32);
1097            if val.is_none() && is_deleted {
1098                values.push(Some(0));
1099            } else {
1100                values.push(val);
1101            }
1102        }
1103        Ok(Arc::new(Int32Array::from(values)))
1104    }
1105
1106    fn build_int64_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1107    where
1108        F: Fn(usize) -> Option<&'a Value>,
1109    {
1110        let mut values = Vec::with_capacity(len);
1111        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1112            let val = get_props(i).and_then(|v| v.as_i64());
1113            if val.is_none() && is_deleted {
1114                values.push(Some(0));
1115            } else {
1116                values.push(val);
1117            }
1118        }
1119        Ok(Arc::new(Int64Array::from(values)))
1120    }
1121
1122    fn build_timestamp_column<F>(
1123        &self,
1124        len: usize,
1125        deleted: &[bool],
1126        get_props: F,
1127    ) -> Result<ArrayRef>
1128    where
1129        F: Fn(usize) -> Option<&'a Value>,
1130    {
1131        let mut values = Vec::with_capacity(len);
1132        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1133            let val = get_props(i);
1134            let ts = if is_deleted || val.is_none() {
1135                Some(0i64)
1136            } else if let Some(Value::Temporal(tv)) = val {
1137                match tv {
1138                    uni_common::TemporalValue::DateTime {
1139                        nanos_since_epoch, ..
1140                    }
1141                    | uni_common::TemporalValue::LocalDateTime {
1142                        nanos_since_epoch, ..
1143                    } => Some(*nanos_since_epoch),
1144                    _ => None,
1145                }
1146            } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1147                Some(v)
1148            } else if let Some(s) = val.and_then(|v| v.as_str()) {
1149                parse_datetime_to_nanos(s)
1150            } else {
1151                None
1152            };
1153
1154            if is_deleted {
1155                values.push(Some(0));
1156            } else {
1157                values.push(ts);
1158            }
1159        }
1160        let arr = TimestampNanosecondArray::from(values).with_timezone("UTC");
1161        Ok(Arc::new(arr))
1162    }
1163
1164    fn build_datetime_struct_column<F>(
1165        &self,
1166        len: usize,
1167        deleted: &[bool],
1168        get_props: F,
1169    ) -> Result<ArrayRef>
1170    where
1171        F: Fn(usize) -> Option<&'a Value>,
1172    {
1173        let values = self.collect_values_or_null(len, deleted, &get_props);
1174        Ok(values_to_datetime_struct_array(&values))
1175    }
1176
1177    fn build_time_struct_column<F>(
1178        &self,
1179        len: usize,
1180        deleted: &[bool],
1181        get_props: F,
1182    ) -> Result<ArrayRef>
1183    where
1184        F: Fn(usize) -> Option<&'a Value>,
1185    {
1186        let values = self.collect_values_or_null(len, deleted, &get_props);
1187        Ok(values_to_time_struct_array(&values))
1188    }
1189
1190    /// Collect property values into a Vec, substituting `Value::Null` for deleted or missing entries.
1191    fn collect_values_or_null<F>(&self, len: usize, deleted: &[bool], get_props: &F) -> Vec<Value>
1192    where
1193        F: Fn(usize) -> Option<&'a Value>,
1194    {
1195        deleted
1196            .iter()
1197            .enumerate()
1198            .take(len)
1199            .map(|(i, &is_deleted)| {
1200                if is_deleted {
1201                    Value::Null
1202                } else {
1203                    get_props(i).cloned().unwrap_or(Value::Null)
1204                }
1205            })
1206            .collect()
1207    }
1208
1209    fn build_date32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1210    where
1211        F: Fn(usize) -> Option<&'a Value>,
1212    {
1213        let mut builder = Date32Builder::with_capacity(len);
1214        let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1215
1216        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1217            let val = get_props(i);
1218            let days = if is_deleted || val.is_none() {
1219                Some(0)
1220            } else if let Some(Value::Temporal(uni_common::TemporalValue::Date {
1221                days_since_epoch,
1222            })) = val
1223            {
1224                Some(*days_since_epoch)
1225            } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1226                Some(v as i32)
1227            } else if let Some(s) = val.and_then(|v| v.as_str()) {
1228                match chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1229                    Ok(date) => Some(date.signed_duration_since(epoch).num_days() as i32),
1230                    Err(_) => None,
1231                }
1232            } else {
1233                None
1234            };
1235
1236            if is_deleted {
1237                builder.append_value(0);
1238            } else if let Some(v) = days {
1239                builder.append_value(v);
1240            } else {
1241                builder.append_null();
1242            }
1243        }
1244        Ok(Arc::new(builder.finish()))
1245    }
1246
1247    fn build_duration_column<F>(
1248        &self,
1249        len: usize,
1250        deleted: &[bool],
1251        get_props: F,
1252    ) -> Result<ArrayRef>
1253    where
1254        F: Fn(usize) -> Option<&'a Value>,
1255    {
1256        // Duration stored as LargeBinary via CypherValue codec (Lance doesn't support Interval(MonthDayNano))
1257        let mut builder = LargeBinaryBuilder::with_capacity(len, len * 32);
1258        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1259            let raw_val = get_props(i);
1260            if let Some(val @ Value::Temporal(uni_common::TemporalValue::Duration { .. })) = raw_val
1261            {
1262                let encoded = uni_common::cypher_value_codec::encode(val);
1263                builder.append_value(&encoded);
1264            } else if is_deleted {
1265                let zero = Value::Temporal(uni_common::TemporalValue::Duration {
1266                    months: 0,
1267                    days: 0,
1268                    nanos: 0,
1269                });
1270                let encoded = uni_common::cypher_value_codec::encode(&zero);
1271                builder.append_value(&encoded);
1272            } else {
1273                builder.append_null();
1274            }
1275        }
1276        Ok(Arc::new(builder.finish()))
1277    }
1278
1279    fn build_btic_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1280    where
1281        F: Fn(usize) -> Option<&'a Value>,
1282    {
1283        const ENCODED_LEN: i32 = 24;
1284        let mut builder = FixedSizeBinaryBuilder::with_capacity(len, ENCODED_LEN);
1285        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1286            let raw_val = get_props(i);
1287            let btic = match raw_val {
1288                Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => Some(
1289                    uni_btic::Btic::new(*lo, *hi, *meta)
1290                        .map_err(|e| anyhow!("invalid BTIC value: {}", e))?,
1291                ),
1292                Some(Value::String(s)) => Some(
1293                    uni_btic::parse::parse_btic_literal(s)
1294                        .map_err(|e| anyhow!("BTIC parse error for '{}': {}", s, e))?,
1295                ),
1296                _ => None,
1297            };
1298
1299            if let Some(b) = btic {
1300                builder.append_value(uni_btic::encode::encode(&b))?;
1301            } else if is_deleted {
1302                builder.append_value([0u8; ENCODED_LEN as usize])?;
1303            } else {
1304                builder.append_null();
1305            }
1306        }
1307        Ok(Arc::new(builder.finish()))
1308    }
1309
1310    fn build_float32_column<F>(
1311        &self,
1312        len: usize,
1313        deleted: &[bool],
1314        get_props: F,
1315    ) -> Result<ArrayRef>
1316    where
1317        F: Fn(usize) -> Option<&'a Value>,
1318    {
1319        let mut values = Vec::with_capacity(len);
1320        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1321            let val = get_props(i).and_then(|v| v.as_f64()).map(|v| v as f32);
1322            if val.is_none() && is_deleted {
1323                values.push(Some(0.0));
1324            } else {
1325                values.push(val);
1326            }
1327        }
1328        Ok(Arc::new(Float32Array::from(values)))
1329    }
1330
1331    fn build_float64_column<F>(
1332        &self,
1333        len: usize,
1334        deleted: &[bool],
1335        get_props: F,
1336    ) -> Result<ArrayRef>
1337    where
1338        F: Fn(usize) -> Option<&'a Value>,
1339    {
1340        let mut values = Vec::with_capacity(len);
1341        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1342            let val = get_props(i).and_then(|v| v.as_f64());
1343            if val.is_none() && is_deleted {
1344                values.push(Some(0.0));
1345            } else {
1346                values.push(val);
1347            }
1348        }
1349        Ok(Arc::new(Float64Array::from(values)))
1350    }
1351
1352    fn build_bool_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1353    where
1354        F: Fn(usize) -> Option<&'a Value>,
1355    {
1356        let mut values = Vec::with_capacity(len);
1357        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1358            let val = get_props(i).and_then(|v| v.as_bool());
1359            if val.is_none() && is_deleted {
1360                values.push(Some(false));
1361            } else {
1362                values.push(val);
1363            }
1364        }
1365        Ok(Arc::new(BooleanArray::from(values)))
1366    }
1367
1368    fn build_vector_column<F>(
1369        &self,
1370        len: usize,
1371        deleted: &[bool],
1372        get_props: F,
1373        dimensions: usize,
1374    ) -> Result<ArrayRef>
1375    where
1376        F: Fn(usize) -> Option<&'a Value>,
1377    {
1378        let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dimensions as i32);
1379
1380        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1381            let val = get_props(i);
1382            let (values, valid) = extract_vector_f32_values(val, is_deleted, dimensions);
1383            for v in values {
1384                builder.values().append_value(v);
1385            }
1386            builder.append(valid);
1387        }
1388        Ok(Arc::new(builder.finish()))
1389    }
1390
1391    fn build_json_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1392    where
1393        F: Fn(usize) -> Option<&'a Value>,
1394    {
1395        let null_val = Value::Null;
1396        let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(len, len * 64);
1397        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1398            let val = get_props(i);
1399            let uni_val = if val.is_none() && is_deleted {
1400                &null_val
1401            } else {
1402                val.unwrap_or(&null_val)
1403            };
1404            // Encode to CypherValue (MessagePack-tagged)
1405            let cv_bytes = uni_common::cypher_value_codec::encode(uni_val);
1406            builder.append_value(&cv_bytes);
1407        }
1408        Ok(Arc::new(builder.finish()))
1409    }
1410
1411    fn build_list_column<F>(
1412        &self,
1413        len: usize,
1414        deleted: &[bool],
1415        get_props: F,
1416        inner: &DataType,
1417    ) -> Result<ArrayRef>
1418    where
1419        F: Fn(usize) -> Option<&'a Value>,
1420    {
1421        match inner {
1422            DataType::String => {
1423                self.build_typed_list(len, deleted, &get_props, StringBuilder::new(), |v, b| {
1424                    if let Some(s) = v.as_str() {
1425                        b.append_value(s);
1426                    } else {
1427                        b.append_null();
1428                    }
1429                })
1430            }
1431            DataType::Int64 => {
1432                self.build_typed_list(len, deleted, &get_props, Int64Builder::new(), |v, b| {
1433                    if let Some(n) = v.as_i64() {
1434                        b.append_value(n);
1435                    } else {
1436                        b.append_null();
1437                    }
1438                })
1439            }
1440            DataType::Float64 => {
1441                self.build_typed_list(len, deleted, &get_props, Float64Builder::new(), |v, b| {
1442                    if let Some(f) = v.as_f64() {
1443                        b.append_value(f);
1444                    } else {
1445                        b.append_null();
1446                    }
1447                })
1448            }
1449            _ => Err(anyhow!("Unsupported inner type for List: {:?}", inner)),
1450        }
1451    }
1452
1453    /// Generic helper to build a list column with any inner builder type.
1454    fn build_typed_list<F, B, A>(
1455        &self,
1456        len: usize,
1457        deleted: &[bool],
1458        get_props: &F,
1459        inner_builder: B,
1460        mut append_value: A,
1461    ) -> Result<ArrayRef>
1462    where
1463        F: Fn(usize) -> Option<&'a Value>,
1464        B: arrow_array::builder::ArrayBuilder,
1465        A: FnMut(&Value, &mut B),
1466    {
1467        let mut builder = ListBuilder::new(inner_builder);
1468        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1469            let val_array = get_props(i).and_then(|v| v.as_array());
1470            if val_array.is_none() && is_deleted {
1471                builder.append_null();
1472            } else if let Some(arr) = val_array {
1473                for v in arr {
1474                    append_value(v, builder.values());
1475                }
1476                builder.append(true);
1477            } else {
1478                builder.append_null();
1479            }
1480        }
1481        Ok(Arc::new(builder.finish()))
1482    }
1483
1484    fn build_map_column<F>(
1485        &self,
1486        len: usize,
1487        deleted: &[bool],
1488        get_props: F,
1489        key: &DataType,
1490        value: &DataType,
1491    ) -> Result<ArrayRef>
1492    where
1493        F: Fn(usize) -> Option<&'a Value>,
1494    {
1495        if !matches!(key, DataType::String) {
1496            return Err(anyhow!("Map keys must be String (JSON limitation)"));
1497        }
1498
1499        match value {
1500            DataType::String => self.build_typed_map(
1501                len,
1502                deleted,
1503                &get_props,
1504                StringBuilder::new(),
1505                arrow_schema::DataType::Utf8,
1506                |v, b: &mut StringBuilder| {
1507                    if let Some(s) = v.as_str() {
1508                        b.append_value(s);
1509                    } else {
1510                        b.append_null();
1511                    }
1512                },
1513            ),
1514            DataType::Int64 => self.build_typed_map(
1515                len,
1516                deleted,
1517                &get_props,
1518                Int64Builder::new(),
1519                arrow_schema::DataType::Int64,
1520                |v, b: &mut Int64Builder| {
1521                    if let Some(n) = v.as_i64() {
1522                        b.append_value(n);
1523                    } else {
1524                        b.append_null();
1525                    }
1526                },
1527            ),
1528            _ => Err(anyhow!("Unsupported value type for Map: {:?}", value)),
1529        }
1530    }
1531
1532    /// Generic helper to build a map column with any value builder type.
1533    fn build_typed_map<F, B, A>(
1534        &self,
1535        len: usize,
1536        deleted: &[bool],
1537        get_props: &F,
1538        value_builder: B,
1539        value_arrow_type: arrow_schema::DataType,
1540        mut append_value: A,
1541    ) -> Result<ArrayRef>
1542    where
1543        F: Fn(usize) -> Option<&'a Value>,
1544        B: arrow_array::builder::ArrayBuilder,
1545        A: FnMut(&Value, &mut B),
1546    {
1547        let key_builder = Box::new(StringBuilder::new());
1548        let value_builder = Box::new(value_builder);
1549        let struct_builder = StructBuilder::new(
1550            vec![
1551                Field::new("key", arrow_schema::DataType::Utf8, false),
1552                Field::new("value", value_arrow_type, true),
1553            ],
1554            vec![key_builder, value_builder],
1555        );
1556        let mut builder = ListBuilder::new(struct_builder);
1557
1558        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1559            self.append_map_entry(&mut builder, get_props(i), is_deleted, &mut append_value);
1560        }
1561        Ok(Arc::new(builder.finish()))
1562    }
1563
1564    /// Append a single map entry to the list builder.
1565    fn append_map_entry<B, A>(
1566        &self,
1567        builder: &mut ListBuilder<StructBuilder>,
1568        val: Option<&'a Value>,
1569        is_deleted: bool,
1570        append_value: &mut A,
1571    ) where
1572        B: arrow_array::builder::ArrayBuilder,
1573        A: FnMut(&Value, &mut B),
1574    {
1575        let val_obj = val.and_then(|v| v.as_object());
1576        if val_obj.is_none() && is_deleted {
1577            builder.append(false);
1578        } else if let Some(obj) = val_obj {
1579            let struct_b = builder.values();
1580            for (k, v) in obj {
1581                struct_b
1582                    .field_builder::<StringBuilder>(0)
1583                    .unwrap()
1584                    .append_value(k);
1585                // Safety: We know the value builder type matches B
1586                let value_b = struct_b.field_builder::<B>(1).unwrap();
1587                append_value(v, value_b);
1588                struct_b.append(true);
1589            }
1590            builder.append(true);
1591        } else {
1592            builder.append(false);
1593        }
1594    }
1595
1596    fn build_crdt_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1597    where
1598        F: Fn(usize) -> Option<&'a Value>,
1599    {
1600        let mut builder = BinaryBuilder::new();
1601        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1602            if is_deleted {
1603                builder.append_null();
1604                continue;
1605            }
1606            if let Some(val) = get_props(i) {
1607                // Try to parse CRDT from the value
1608                // If it's a string, first parse it as JSON, then as CRDT
1609                let crdt_result = if let Some(s) = val.as_str() {
1610                    serde_json::from_str::<Crdt>(s)
1611                } else {
1612                    // Convert uni_common::Value to serde_json::Value at the CRDT boundary
1613                    let json_val: serde_json::Value = val.clone().into();
1614                    serde_json::from_value::<Crdt>(json_val)
1615                };
1616
1617                if let Ok(crdt) = crdt_result {
1618                    if let Ok(bytes) = crdt.to_msgpack() {
1619                        builder.append_value(&bytes);
1620                    } else {
1621                        builder.append_null();
1622                    }
1623                } else {
1624                    builder.append_null();
1625                }
1626            } else {
1627                builder.append_null();
1628            }
1629        }
1630        Ok(Arc::new(builder.finish()))
1631    }
1632}
1633
1634/// Build a column for edge entries (no deleted flag handling needed).
1635pub fn build_edge_column<'a>(
1636    name: &'a str,
1637    data_type: &'a DataType,
1638    len: usize,
1639    get_props: impl Fn(usize) -> Option<&'a Value>,
1640) -> Result<ArrayRef> {
1641    // For edges, use empty deleted array
1642    let deleted = vec![false; len];
1643    let extractor = PropertyExtractor::new(name, data_type);
1644    extractor.build_column(len, &deleted, get_props)
1645}
1646
1647#[cfg(test)]
1648mod tests {
1649    use super::*;
1650    use arrow_array::{
1651        Array, DurationMicrosecondArray,
1652        builder::{BinaryBuilder, Time64MicrosecondBuilder, TimestampNanosecondBuilder},
1653    };
1654    use std::collections::HashMap;
1655    use uni_common::TemporalValue;
1656    use uni_crdt::{Crdt, GCounter};
1657
1658    #[test]
1659    fn test_arrow_to_value_string() {
1660        let arr = StringArray::from(vec![Some("hello"), None, Some("world")]);
1661        assert_eq!(
1662            arrow_to_value(&arr, 0, None),
1663            Value::String("hello".to_string())
1664        );
1665        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1666        assert_eq!(
1667            arrow_to_value(&arr, 2, None),
1668            Value::String("world".to_string())
1669        );
1670    }
1671
1672    #[test]
1673    fn test_arrow_to_value_int64() {
1674        let arr = Int64Array::from(vec![Some(42), None, Some(-10)]);
1675        assert_eq!(arrow_to_value(&arr, 0, None), Value::Int(42));
1676        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1677        assert_eq!(arrow_to_value(&arr, 2, None), Value::Int(-10));
1678    }
1679
1680    #[test]
1681    #[allow(clippy::approx_constant)]
1682    fn test_arrow_to_value_float64() {
1683        let arr = Float64Array::from(vec![Some(3.14), None]);
1684        assert_eq!(arrow_to_value(&arr, 0, None), Value::Float(3.14));
1685        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1686    }
1687
1688    #[test]
1689    fn test_arrow_to_value_bool() {
1690        let arr = BooleanArray::from(vec![Some(true), Some(false), None]);
1691        assert_eq!(arrow_to_value(&arr, 0, None), Value::Bool(true));
1692        assert_eq!(arrow_to_value(&arr, 1, None), Value::Bool(false));
1693        assert_eq!(arrow_to_value(&arr, 2, None), Value::Null);
1694    }
1695
1696    #[test]
1697    fn test_values_to_array_int64() {
1698        let values = vec![Value::Int(1), Value::Int(2), Value::Null, Value::Int(4)];
1699        let arr = values_to_array(&values, &ArrowDataType::Int64).unwrap();
1700        assert_eq!(arr.len(), 4);
1701
1702        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1703        assert_eq!(int_arr.value(0), 1);
1704        assert_eq!(int_arr.value(1), 2);
1705        assert!(int_arr.is_null(2));
1706        assert_eq!(int_arr.value(3), 4);
1707    }
1708
1709    #[test]
1710    fn test_values_to_array_string() {
1711        let values = vec![
1712            Value::String("a".to_string()),
1713            Value::String("b".to_string()),
1714            Value::Null,
1715        ];
1716        let arr = values_to_array(&values, &ArrowDataType::Utf8).unwrap();
1717        assert_eq!(arr.len(), 3);
1718
1719        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1720        assert_eq!(str_arr.value(0), "a");
1721        assert_eq!(str_arr.value(1), "b");
1722        assert!(str_arr.is_null(2));
1723    }
1724
1725    #[test]
1726    fn test_property_extractor_string() {
1727        let props: Vec<HashMap<String, Value>> = vec![
1728            [("name".to_string(), Value::String("Alice".to_string()))]
1729                .into_iter()
1730                .collect(),
1731            [("name".to_string(), Value::String("Bob".to_string()))]
1732                .into_iter()
1733                .collect(),
1734            HashMap::new(),
1735        ];
1736        let deleted = vec![false, false, true];
1737
1738        let extractor = PropertyExtractor::new("name", &DataType::String);
1739        let arr = extractor
1740            .build_column(3, &deleted, |i| props[i].get("name"))
1741            .unwrap();
1742
1743        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1744        assert_eq!(str_arr.value(0), "Alice");
1745        assert_eq!(str_arr.value(1), "Bob");
1746        assert_eq!(str_arr.value(2), ""); // Deleted entries get default
1747    }
1748
1749    #[test]
1750    fn test_property_extractor_int64() {
1751        let props: Vec<HashMap<String, Value>> = vec![
1752            [("age".to_string(), Value::Int(25))].into_iter().collect(),
1753            [("age".to_string(), Value::Int(30))].into_iter().collect(),
1754            HashMap::new(),
1755        ];
1756        let deleted = vec![false, false, true];
1757
1758        let extractor = PropertyExtractor::new("age", &DataType::Int64);
1759        let arr = extractor
1760            .build_column(3, &deleted, |i| props[i].get("age"))
1761            .unwrap();
1762
1763        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1764        assert_eq!(int_arr.value(0), 25);
1765        assert_eq!(int_arr.value(1), 30);
1766        assert_eq!(int_arr.value(2), 0); // Deleted entries get default
1767    }
1768
1769    #[test]
1770    fn test_arrow_to_value_time64() {
1771        // Test Time64MicrosecondArray legacy fallback (micros→nanos conversion)
1772        let mut builder = Time64MicrosecondBuilder::new();
1773        // 10:30:45 = 10*3600 + 30*60 + 45 = 37845 seconds = 37845000000 microseconds
1774        builder.append_value(37_845_000_000);
1775        // 00:00:00 = 0 microseconds
1776        builder.append_value(0);
1777        // 23:59:59.123456 = 86399.123456 seconds
1778        builder.append_value(86_399_123_456);
1779        builder.append_null();
1780
1781        let arr = builder.finish();
1782        // Arrow→Value returns Value::Temporal(LocalTime) with nanos (micros * 1000)
1783        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "10:30:45");
1784        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "00:00");
1785        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "23:59:59.123456");
1786        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
1787    }
1788
1789    #[test]
1790    fn test_arrow_to_value_duration() {
1791        // Test DurationMicrosecondArray conversion
1792        // Arrow→Value now returns Value::Temporal(Duration)
1793        let arr = DurationMicrosecondArray::from(vec![
1794            Some(1_000_000),      // 1 second in microseconds
1795            Some(3_600_000_000),  // 1 hour
1796            Some(86_400_000_000), // 1 day
1797            None,
1798        ]);
1799
1800        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "PT1S");
1801        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "PT1H");
1802        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "PT24H");
1803        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
1804    }
1805
1806    #[test]
1807    fn test_arrow_to_value_binary_crdt() {
1808        // Test BinaryArray (CRDT) conversion - round-trip test
1809        let mut builder = BinaryBuilder::new();
1810
1811        // Create a GCounter CRDT and serialize it
1812        let mut counter = GCounter::new();
1813        counter.increment("actor1", 5);
1814        let crdt = Crdt::GCounter(counter);
1815        let bytes = crdt.to_msgpack().unwrap();
1816        builder.append_value(&bytes);
1817
1818        // Add a null value
1819        builder.append_null();
1820
1821        let arr = builder.finish();
1822
1823        // The first value should deserialize back to a map
1824        let result = arrow_to_value(&arr, 0, None);
1825        assert!(result.as_object().is_some());
1826        let obj = result.as_object().unwrap();
1827        // GCounter serializes with tag "t": "gc"
1828        assert_eq!(obj.get("t"), Some(&Value::String("gc".to_string())));
1829
1830        // Null value should return null
1831        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1832    }
1833
1834    #[test]
1835    fn test_datetime_struct_encode_decode_roundtrip() {
1836        // Test DateTime struct encoding with offset and timezone preservation
1837        let values = vec![
1838            Value::Temporal(TemporalValue::DateTime {
1839                nanos_since_epoch: 441763200000000000, // 1984-01-01T00:00:00Z
1840                offset_seconds: 3600,                  // +01:00
1841                timezone_name: Some("Europe/Paris".to_string()),
1842            }),
1843            Value::Temporal(TemporalValue::DateTime {
1844                nanos_since_epoch: 1704067200000000000, // 2024-01-01T00:00:00Z
1845                offset_seconds: -18000,                 // -05:00
1846                timezone_name: None,
1847            }),
1848            Value::Temporal(TemporalValue::DateTime {
1849                nanos_since_epoch: 0, // Unix epoch
1850                offset_seconds: 0,
1851                timezone_name: Some("UTC".to_string()),
1852            }),
1853        ];
1854
1855        // Encode to Arrow struct
1856        let arr_ref = values_to_datetime_struct_array(&values);
1857        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
1858        assert_eq!(arr.len(), 3);
1859
1860        // Decode back to Value
1861        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
1862        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
1863        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
1864
1865        // Verify round-trip preserves all fields
1866        assert_eq!(decoded_0, values[0]);
1867        assert_eq!(decoded_1, values[1]);
1868        assert_eq!(decoded_2, values[2]);
1869
1870        // Verify struct field extraction
1871        if let Value::Temporal(TemporalValue::DateTime {
1872            nanos_since_epoch,
1873            offset_seconds,
1874            timezone_name,
1875        }) = decoded_0
1876        {
1877            assert_eq!(nanos_since_epoch, 441763200000000000);
1878            assert_eq!(offset_seconds, 3600);
1879            assert_eq!(timezone_name, Some("Europe/Paris".to_string()));
1880        } else {
1881            panic!("Expected DateTime value");
1882        }
1883    }
1884
1885    #[test]
1886    fn test_datetime_struct_null_handling() {
1887        // Test DateTime struct with null values
1888        let values = vec![
1889            Value::Temporal(TemporalValue::DateTime {
1890                nanos_since_epoch: 441763200000000000,
1891                offset_seconds: 3600,
1892                timezone_name: Some("Europe/Paris".to_string()),
1893            }),
1894            Value::Null,
1895            Value::Temporal(TemporalValue::DateTime {
1896                nanos_since_epoch: 0,
1897                offset_seconds: 0,
1898                timezone_name: None,
1899            }),
1900        ];
1901
1902        let arr_ref = values_to_datetime_struct_array(&values);
1903        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
1904        assert_eq!(arr.len(), 3);
1905
1906        // Check first value is valid
1907        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
1908        assert_eq!(decoded_0, values[0]);
1909
1910        // Check second value is null
1911        assert!(arr.is_null(1));
1912        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
1913        assert_eq!(decoded_1, Value::Null);
1914
1915        // Check third value is valid
1916        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
1917        assert_eq!(decoded_2, values[2]);
1918    }
1919
1920    #[test]
1921    fn test_datetime_struct_boundary_values() {
1922        // Test boundary values: offset=0, large positive/negative offsets
1923        let values = vec![
1924            Value::Temporal(TemporalValue::DateTime {
1925                nanos_since_epoch: 441763200000000000,
1926                offset_seconds: 0, // UTC
1927                timezone_name: None,
1928            }),
1929            Value::Temporal(TemporalValue::DateTime {
1930                nanos_since_epoch: 441763200000000000,
1931                offset_seconds: 43200, // +12:00 (max typical offset)
1932                timezone_name: None,
1933            }),
1934            Value::Temporal(TemporalValue::DateTime {
1935                nanos_since_epoch: 441763200000000000,
1936                offset_seconds: -43200, // -12:00 (min typical offset)
1937                timezone_name: None,
1938            }),
1939        ];
1940
1941        let arr_ref = values_to_datetime_struct_array(&values);
1942        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
1943        assert_eq!(arr.len(), 3);
1944
1945        // Verify round-trip for all boundary values
1946        for (i, expected) in values.iter().enumerate() {
1947            let decoded = arrow_to_value(arr_ref.as_ref(), i, Some(&DataType::DateTime));
1948            assert_eq!(&decoded, expected);
1949        }
1950    }
1951
1952    #[test]
1953    fn test_datetime_old_schema_migration() {
1954        // Test backward compatibility: TimestampNanosecondArray → DateTime with offset=0
1955        let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
1956        builder.append_value(441763200000000000); // 1984-01-01T00:00:00Z
1957        builder.append_value(1704067200000000000); // 2024-01-01T00:00:00Z
1958        builder.append_null();
1959
1960        let arr = builder.finish();
1961
1962        // Decode with DataType::DateTime hint should migrate old schema
1963        let decoded_0 = arrow_to_value(&arr, 0, Some(&DataType::DateTime));
1964        let _decoded_1 = arrow_to_value(&arr, 1, Some(&DataType::DateTime));
1965        let decoded_2 = arrow_to_value(&arr, 2, Some(&DataType::DateTime));
1966
1967        // Old schema should default to offset=0, preserve timezone
1968        if let Value::Temporal(TemporalValue::DateTime {
1969            nanos_since_epoch,
1970            offset_seconds,
1971            timezone_name,
1972        }) = decoded_0
1973        {
1974            assert_eq!(nanos_since_epoch, 441763200000000000);
1975            assert_eq!(offset_seconds, 0);
1976            assert_eq!(timezone_name, Some("UTC".to_string()));
1977        } else {
1978            panic!("Expected DateTime value");
1979        }
1980
1981        // Verify null handling
1982        assert_eq!(decoded_2, Value::Null);
1983    }
1984
1985    #[test]
1986    fn test_time_struct_encode_decode_roundtrip() {
1987        // Test Time struct encoding with offset preservation
1988        let values = vec![
1989            Value::Temporal(TemporalValue::Time {
1990                nanos_since_midnight: 37845000000000, // 10:30:45
1991                offset_seconds: 3600,                 // +01:00
1992            }),
1993            Value::Temporal(TemporalValue::Time {
1994                nanos_since_midnight: 0, // 00:00:00
1995                offset_seconds: 0,
1996            }),
1997            Value::Temporal(TemporalValue::Time {
1998                nanos_since_midnight: 86399999999999, // 23:59:59.999999999
1999                offset_seconds: -18000,               // -05:00
2000            }),
2001        ];
2002
2003        // Encode to Arrow struct
2004        let arr_ref = values_to_time_struct_array(&values);
2005        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2006        assert_eq!(arr.len(), 3);
2007
2008        // Decode back to Value
2009        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2010        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2011        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2012
2013        // Verify round-trip preserves all fields
2014        assert_eq!(decoded_0, values[0]);
2015        assert_eq!(decoded_1, values[1]);
2016        assert_eq!(decoded_2, values[2]);
2017
2018        // Verify struct field extraction
2019        if let Value::Temporal(TemporalValue::Time {
2020            nanos_since_midnight,
2021            offset_seconds,
2022        }) = decoded_0
2023        {
2024            assert_eq!(nanos_since_midnight, 37845000000000);
2025            assert_eq!(offset_seconds, 3600);
2026        } else {
2027            panic!("Expected Time value");
2028        }
2029    }
2030
2031    #[test]
2032    fn test_time_struct_null_handling() {
2033        // Test Time struct with null values
2034        let values = vec![
2035            Value::Temporal(TemporalValue::Time {
2036                nanos_since_midnight: 37845000000000,
2037                offset_seconds: 3600,
2038            }),
2039            Value::Null,
2040            Value::Temporal(TemporalValue::Time {
2041                nanos_since_midnight: 0,
2042                offset_seconds: 0,
2043            }),
2044        ];
2045
2046        let arr_ref = values_to_time_struct_array(&values);
2047        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2048        assert_eq!(arr.len(), 3);
2049
2050        // Check first value is valid
2051        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2052        assert_eq!(decoded_0, values[0]);
2053
2054        // Check second value is null
2055        assert!(arr.is_null(1));
2056        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2057        assert_eq!(decoded_1, Value::Null);
2058
2059        // Check third value is valid
2060        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2061        assert_eq!(decoded_2, values[2]);
2062    }
2063
2064    // Tests for extract_vector_f32_values
2065
2066    #[test]
2067    fn test_extract_vector_f32_values_valid_vector() {
2068        let v = vec![1.0, 2.0, 3.0];
2069        let val = Value::Vector(v.clone());
2070        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2071        assert_eq!(result, v);
2072        assert!(valid);
2073    }
2074
2075    #[test]
2076    fn test_extract_vector_f32_values_vector_wrong_dims() {
2077        let v = vec![1.0, 2.0];
2078        let val = Value::Vector(v);
2079        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2080        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2081        assert!(!valid);
2082    }
2083
2084    #[test]
2085    fn test_extract_vector_f32_values_valid_list() {
2086        let v = vec![Value::Float(1.0), Value::Float(2.0), Value::Float(3.0)];
2087        let val = Value::List(v);
2088        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2089        assert_eq!(result, vec![1.0, 2.0, 3.0]);
2090        assert!(valid);
2091    }
2092
2093    #[test]
2094    fn test_extract_vector_f32_values_list_wrong_dims() {
2095        let v = vec![Value::Float(1.0), Value::Float(2.0)];
2096        let val = Value::List(v);
2097        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2098        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2099        assert!(!valid);
2100    }
2101
2102    #[test]
2103    fn test_extract_vector_f32_values_list_int_coercion() {
2104        let v = vec![Value::Int(1), Value::Int(2), Value::Int(3)];
2105        let val = Value::List(v);
2106        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2107        assert_eq!(result, vec![1.0, 2.0, 3.0]);
2108        assert!(valid);
2109    }
2110
2111    #[test]
2112    fn test_extract_vector_f32_values_none() {
2113        let (result, valid) = extract_vector_f32_values(None, false, 3);
2114        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2115        assert!(!valid);
2116    }
2117
2118    #[test]
2119    fn test_extract_vector_f32_values_null() {
2120        let val = Value::Null;
2121        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2122        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2123        assert!(!valid);
2124    }
2125
2126    #[test]
2127    fn test_extract_vector_f32_values_unsupported_type() {
2128        let val = Value::String("not a vector".to_string());
2129        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2130        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2131        assert!(!valid);
2132    }
2133
2134    #[test]
2135    fn test_extract_vector_f32_values_deleted_with_none() {
2136        let (result, valid) = extract_vector_f32_values(None, true, 3);
2137        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2138        assert!(valid); // Deleted entries are marked as valid with zeros
2139    }
2140
2141    #[test]
2142    fn test_extract_vector_f32_values_deleted_with_null() {
2143        let val = Value::Null;
2144        let (result, valid) = extract_vector_f32_values(Some(&val), true, 3);
2145        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2146        assert!(valid); // Deleted entries are marked as valid with zeros
2147    }
2148
2149    // Tests for values_to_array with FixedSizeList
2150
2151    #[test]
2152    fn test_values_to_fixed_size_list_vector_with_nulls() {
2153        let values = vec![
2154            Value::Vector(vec![1.0, 2.0]),
2155            Value::Null,
2156            Value::Vector(vec![3.0, 4.0]),
2157            Value::String("invalid".to_string()),
2158        ];
2159        let arr_ref = values_to_array(
2160            &values,
2161            &ArrowDataType::FixedSizeList(
2162                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2163                2,
2164            ),
2165        )
2166        .unwrap();
2167
2168        let arr = arr_ref
2169            .as_any()
2170            .downcast_ref::<FixedSizeListArray>()
2171            .unwrap();
2172
2173        assert_eq!(arr.len(), 4);
2174        assert!(arr.is_valid(0));
2175        assert!(!arr.is_valid(1)); // Null value
2176        assert!(arr.is_valid(2));
2177        assert!(!arr.is_valid(3)); // Invalid type
2178    }
2179
2180    #[test]
2181    fn test_values_to_fixed_size_list_from_list() {
2182        let values = vec![
2183            Value::List(vec![Value::Float(1.0), Value::Float(2.0)]),
2184            Value::List(vec![Value::Int(3), Value::Int(4)]),
2185        ];
2186        let arr_ref = values_to_array(
2187            &values,
2188            &ArrowDataType::FixedSizeList(
2189                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2190                2,
2191            ),
2192        )
2193        .unwrap();
2194
2195        let arr = arr_ref
2196            .as_any()
2197            .downcast_ref::<FixedSizeListArray>()
2198            .unwrap();
2199
2200        assert_eq!(arr.len(), 2);
2201        assert!(arr.is_valid(0));
2202        assert!(arr.is_valid(1));
2203
2204        // Check values
2205        let child = arr
2206            .values()
2207            .as_any()
2208            .downcast_ref::<Float32Array>()
2209            .unwrap();
2210        assert_eq!(child.value(0), 1.0);
2211        assert_eq!(child.value(1), 2.0);
2212        assert_eq!(child.value(2), 3.0);
2213        assert_eq!(child.value(3), 4.0);
2214    }
2215
2216    #[test]
2217    fn test_values_to_fixed_size_list_wrong_dimensions() {
2218        let values = vec![
2219            Value::Vector(vec![1.0, 2.0, 3.0]),   // 3 dims, expecting 2
2220            Value::List(vec![Value::Float(4.0)]), // 1 dim, expecting 2
2221        ];
2222        let arr_ref = values_to_array(
2223            &values,
2224            &ArrowDataType::FixedSizeList(
2225                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2226                2,
2227            ),
2228        )
2229        .unwrap();
2230
2231        let arr = arr_ref
2232            .as_any()
2233            .downcast_ref::<FixedSizeListArray>()
2234            .unwrap();
2235
2236        assert_eq!(arr.len(), 2);
2237        assert!(!arr.is_valid(0)); // Wrong dimensions
2238        assert!(!arr.is_valid(1)); // Wrong dimensions
2239
2240        // Check that child array has zeros for invalid entries
2241        let child = arr
2242            .values()
2243            .as_any()
2244            .downcast_ref::<Float32Array>()
2245            .unwrap();
2246        assert_eq!(child.value(0), 0.0);
2247        assert_eq!(child.value(1), 0.0);
2248        assert_eq!(child.value(2), 0.0);
2249        assert_eq!(child.value(3), 0.0);
2250    }
2251
2252    #[test]
2253    fn test_values_to_fixed_size_list_all_nulls() {
2254        let values = vec![Value::Null, Value::Null, Value::Null];
2255        let arr_ref = values_to_array(
2256            &values,
2257            &ArrowDataType::FixedSizeList(
2258                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2259                3,
2260            ),
2261        )
2262        .unwrap();
2263
2264        let arr = arr_ref
2265            .as_any()
2266            .downcast_ref::<FixedSizeListArray>()
2267            .unwrap();
2268
2269        assert_eq!(arr.len(), 3);
2270        assert!(!arr.is_valid(0));
2271        assert!(!arr.is_valid(1));
2272        assert!(!arr.is_valid(2));
2273
2274        // Verify child array length is correct (3 rows × 3 dims = 9)
2275        let child = arr
2276            .values()
2277            .as_any()
2278            .downcast_ref::<Float32Array>()
2279            .unwrap();
2280        assert_eq!(child.len(), 9);
2281    }
2282
2283    #[test]
2284    fn test_values_to_fixed_size_list_mixed_types() {
2285        let values = vec![
2286            Value::Vector(vec![1.0, 2.0]),
2287            Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
2288            Value::Null,
2289            Value::String("invalid".to_string()),
2290        ];
2291        let arr_ref = values_to_array(
2292            &values,
2293            &ArrowDataType::FixedSizeList(
2294                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2295                2,
2296            ),
2297        )
2298        .unwrap();
2299
2300        let arr = arr_ref
2301            .as_any()
2302            .downcast_ref::<FixedSizeListArray>()
2303            .unwrap();
2304
2305        assert_eq!(arr.len(), 4);
2306        assert!(arr.is_valid(0)); // Value::Vector
2307        assert!(arr.is_valid(1)); // Value::List
2308        assert!(!arr.is_valid(2)); // Value::Null
2309        assert!(!arr.is_valid(3)); // Value::String
2310
2311        // Check values for valid entries
2312        let child = arr
2313            .values()
2314            .as_any()
2315            .downcast_ref::<Float32Array>()
2316            .unwrap();
2317        assert_eq!(child.value(0), 1.0);
2318        assert_eq!(child.value(1), 2.0);
2319        assert_eq!(child.value(2), 3.0);
2320        assert_eq!(child.value(3), 4.0);
2321    }
2322
2323    // Tests for PropertyExtractor::build_vector_column
2324
2325    #[test]
2326    fn test_build_vector_column_with_nulls_and_deleted() {
2327        let data_type = DataType::Vector { dimensions: 3 };
2328        let extractor = PropertyExtractor::new("test_vec", &data_type);
2329
2330        let props = [
2331            Some(Value::Vector(vec![1.0, 2.0, 3.0])),
2332            None,              // Missing property
2333            Some(Value::Null), // Null value
2334            Some(Value::Vector(vec![4.0, 5.0, 6.0])),
2335        ];
2336        let deleted = [false, false, false, true]; // Last one is deleted
2337
2338        let arr_ref = extractor
2339            .build_vector_column(4, &deleted, |i| props[i].as_ref(), 3)
2340            .unwrap();
2341
2342        let arr = arr_ref
2343            .as_any()
2344            .downcast_ref::<FixedSizeListArray>()
2345            .unwrap();
2346
2347        assert_eq!(arr.len(), 4);
2348        assert!(arr.is_valid(0)); // Valid vector
2349        assert!(!arr.is_valid(1)); // Missing property
2350        assert!(!arr.is_valid(2)); // Null value
2351        assert!(arr.is_valid(3)); // Deleted entry (valid with zeros)
2352
2353        // Check values
2354        let child = arr
2355            .values()
2356            .as_any()
2357            .downcast_ref::<Float32Array>()
2358            .unwrap();
2359        assert_eq!(child.value(0), 1.0);
2360        assert_eq!(child.value(1), 2.0);
2361        assert_eq!(child.value(2), 3.0);
2362        // Indices 3-5: zeros for missing
2363        // Indices 6-8: zeros for null
2364        // Indices 9-11: zeros for deleted (but marked as valid)
2365        assert_eq!(child.value(9), 0.0);
2366        assert_eq!(child.value(10), 0.0);
2367        assert_eq!(child.value(11), 0.0);
2368    }
2369
2370    #[test]
2371    fn test_build_vector_column_with_list_input() {
2372        let data_type = DataType::Vector { dimensions: 2 };
2373        let extractor = PropertyExtractor::new("test_vec", &data_type);
2374
2375        let props = [
2376            Some(Value::List(vec![Value::Float(1.0), Value::Float(2.0)])),
2377            Some(Value::List(vec![Value::Int(3), Value::Int(4)])),
2378            Some(Value::Vector(vec![5.0, 6.0])),
2379        ];
2380        let deleted = [false, false, false];
2381
2382        let arr_ref = extractor
2383            .build_vector_column(3, &deleted, |i| props[i].as_ref(), 2)
2384            .unwrap();
2385
2386        let arr = arr_ref
2387            .as_any()
2388            .downcast_ref::<FixedSizeListArray>()
2389            .unwrap();
2390
2391        assert_eq!(arr.len(), 3);
2392        assert!(arr.is_valid(0));
2393        assert!(arr.is_valid(1));
2394        assert!(arr.is_valid(2));
2395
2396        // Check values
2397        let child = arr
2398            .values()
2399            .as_any()
2400            .downcast_ref::<Float32Array>()
2401            .unwrap();
2402        assert_eq!(child.value(0), 1.0);
2403        assert_eq!(child.value(1), 2.0);
2404        assert_eq!(child.value(2), 3.0);
2405        assert_eq!(child.value(3), 4.0);
2406        assert_eq!(child.value(4), 5.0);
2407        assert_eq!(child.value(5), 6.0);
2408    }
2409}