Skip to main content

uni_store/storage/
arrow_convert.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Arrow type conversion utilities for reducing cognitive complexity.
5//!
6//! This module provides shared helper functions and macros for converting
7//! between Arrow arrays and JSON Values, reducing code duplication across
8//! vertex.rs, delta.rs, and executor.rs.
9
10use anyhow::{Result, anyhow};
11use arrow_array::builder::{
12    BinaryBuilder, BooleanBufferBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
13    FixedSizeBinaryBuilder, FixedSizeListBuilder, Float32Builder, Float64Builder, Int32Builder,
14    Int64Builder, IntervalMonthDayNanoBuilder, LargeBinaryBuilder, ListBuilder, StringBuilder,
15    StructBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder,
16    UInt64Builder,
17};
18use arrow_array::{
19    Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, FixedSizeBinaryArray,
20    FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array,
21    IntervalMonthDayNanoArray, LargeBinaryArray, ListArray, StringArray, StructArray,
22    Time64NanosecondArray, TimestampNanosecondArray, UInt64Array,
23};
24use arrow_schema::{DataType as ArrowDataType, Field};
25use std::collections::HashMap;
26use std::sync::Arc;
27use uni_common::DataType;
28use uni_common::Value;
29use uni_common::core::id::{Eid, Vid};
30use uni_common::core::schema;
31use uni_crdt::Crdt;
32
33/// Build a timestamp column from a map of ID -> timestamp (nanoseconds).
34///
35/// Shared utility for building `_created_at` and `_updated_at` columns
36/// in vertex and edge tables. Works with any hashable ID type (Vid, Eid, etc.).
37fn build_timestamp_column_from_id_map<K, I>(
38    ids: I,
39    timestamps: Option<&HashMap<K, i64>>,
40) -> ArrayRef
41where
42    K: Eq + std::hash::Hash,
43    I: IntoIterator<Item = K>,
44{
45    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
46    for id in ids {
47        match timestamps.and_then(|m| m.get(&id)) {
48            Some(&ts) => builder.append_value(ts),
49            None => builder.append_null(),
50        }
51    }
52    Arc::new(builder.finish())
53}
54
55pub fn build_timestamp_column_from_vid_map<I>(
56    ids: I,
57    timestamps: Option<&HashMap<Vid, i64>>,
58) -> ArrayRef
59where
60    I: IntoIterator<Item = Vid>,
61{
62    build_timestamp_column_from_id_map(ids, timestamps)
63}
64
65pub fn build_timestamp_column_from_eid_map<I>(
66    ids: I,
67    timestamps: Option<&HashMap<Eid, i64>>,
68) -> ArrayRef
69where
70    I: IntoIterator<Item = Eid>,
71{
72    build_timestamp_column_from_id_map(ids, timestamps)
73}
74
75/// Build a timestamp column from an iterator of optional timestamps.
76///
77/// This is useful for building timestamp columns directly from entry structs.
78pub fn build_timestamp_column<I>(timestamps: I) -> ArrayRef
79where
80    I: IntoIterator<Item = Option<i64>>,
81{
82    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
83    for ts in timestamps {
84        builder.append_option(ts);
85    }
86    Arc::new(builder.finish())
87}
88
89/// Extract a `Vec<String>` from a single row of a `List<Utf8>` column.
90///
91/// Returns an empty vec when the row is null, the inner array is not a
92/// `StringArray`, or the list is empty.  Null entries inside the list are
93/// silently skipped.
94pub fn labels_from_list_array(list_arr: &ListArray, row: usize) -> Vec<String> {
95    if list_arr.is_null(row) {
96        return Vec::new();
97    }
98    let values = list_arr.value(row);
99    let Some(str_arr) = values.as_any().downcast_ref::<StringArray>() else {
100        return Vec::new();
101    };
102    (0..str_arr.len())
103        .filter(|&j| !str_arr.is_null(j))
104        .map(|j| str_arr.value(j).to_string())
105        .collect()
106}
107
108/// Parse a datetime string into nanoseconds since Unix epoch.
109///
110/// Tries RFC3339, "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M%:z",
111/// and "%Y-%m-%dT%H:%MZ" formats.
112fn parse_datetime_to_nanos(s: &str) -> Option<i64> {
113    chrono::DateTime::parse_from_rfc3339(s)
114        .map(|dt| {
115            dt.with_timezone(&chrono::Utc)
116                .timestamp_nanos_opt()
117                .unwrap_or(0)
118        })
119        .or_else(|_| {
120            chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
121                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
122        })
123        .or_else(|_| {
124            chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ")
125                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
126        })
127        .or_else(|_| {
128            chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M%:z").map(|dt| {
129                dt.with_timezone(&chrono::Utc)
130                    .timestamp_nanos_opt()
131                    .unwrap_or(0)
132            })
133        })
134        .ok()
135        .or_else(|| {
136            s.strip_suffix('Z')
137                .and_then(|base| chrono::NaiveDateTime::parse_from_str(base, "%Y-%m-%dT%H:%M").ok())
138                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
139        })
140}
141
142/// Detect the Arrow Map-as-List(Struct(key, value)) pattern and reconstruct a map.
143///
144/// Arrow represents Map columns as `List(Struct { key, value })`. This helper
145/// checks whether the given array matches that layout and, if so, converts the
146/// key/value pairs back into a `HashMap<String, Value>`.
147fn try_reconstruct_map(arr: &ArrayRef) -> Option<HashMap<String, Value>> {
148    let structs = arr.as_any().downcast_ref::<StructArray>()?;
149    let fields = structs.fields();
150    if fields.len() != 2 || fields[0].name() != "key" || fields[1].name() != "value" {
151        return None;
152    }
153    let key_col = structs.column(0);
154    let val_col = structs.column(1);
155    let mut map = HashMap::new();
156    for i in 0..structs.len() {
157        if let Value::String(k) = arrow_to_value(key_col.as_ref(), i, None) {
158            map.insert(k, arrow_to_value(val_col.as_ref(), i, None));
159        }
160    }
161    Some(map)
162}
163
164/// Convert all elements of an Arrow array into a `Vec<Value>`.
165fn array_to_value_list(arr: &ArrayRef) -> Vec<Value> {
166    (0..arr.len())
167        .map(|i| arrow_to_value(arr.as_ref(), i, None))
168        .collect()
169}
170
171/// Convert an Arrow array value at a given row index to a Uni Value.
172///
173/// Handles all common Arrow types and recursively processes nested structures
174/// like Lists and Structs. The optional `data_type` parameter provides schema
175/// context for decoding DateTime and Time struct arrays; when provided, it
176/// takes precedence over runtime type detection.
177pub fn arrow_to_value(col: &dyn Array, row: usize, data_type: Option<&DataType>) -> Value {
178    if col.is_null(row) {
179        return Value::Null;
180    }
181
182    // Schema-driven decode for DateTime and Time structs
183    if let Some(dt) = data_type {
184        match dt {
185            DataType::DateTime => {
186                // Expect StructArray with three fields
187                if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
188                    && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
189                        struct_arr.column_by_name("nanos_since_epoch"),
190                        struct_arr.column_by_name("offset_seconds"),
191                        struct_arr.column_by_name("timezone_name"),
192                    )
193                    && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
194                        nanos_col
195                            .as_any()
196                            .downcast_ref::<TimestampNanosecondArray>(),
197                        offset_col.as_any().downcast_ref::<Int32Array>(),
198                        tz_col.as_any().downcast_ref::<StringArray>(),
199                    )
200                {
201                    if nanos_arr.is_null(row) {
202                        return Value::Null;
203                    }
204                    let nanos = nanos_arr.value(row);
205                    if offset_arr.is_null(row) {
206                        // No offset → LocalDateTime
207                        return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
208                            nanos_since_epoch: nanos,
209                        });
210                    }
211                    let offset = offset_arr.value(row);
212                    let tz_name = (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
213                    return Value::Temporal(uni_common::TemporalValue::DateTime {
214                        nanos_since_epoch: nanos,
215                        offset_seconds: offset,
216                        timezone_name: tz_name,
217                    });
218                }
219                // Fall back to old schema migration: TimestampNanosecond → DateTime with offset=0
220                if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
221                    let nanos = ts.value(row);
222                    let tz_name = ts.timezone().map(|s| s.to_string());
223                    return Value::Temporal(uni_common::TemporalValue::DateTime {
224                        nanos_since_epoch: nanos,
225                        offset_seconds: 0,
226                        timezone_name: tz_name,
227                    });
228                }
229            }
230            DataType::Time => {
231                // Expect StructArray with two fields
232                if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
233                    && let (Some(nanos_col), Some(offset_col)) = (
234                        struct_arr.column_by_name("nanos_since_midnight"),
235                        struct_arr.column_by_name("offset_seconds"),
236                    )
237                    && let (Some(nanos_arr), Some(offset_arr)) = (
238                        nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
239                        offset_col.as_any().downcast_ref::<Int32Array>(),
240                    )
241                {
242                    // Check field-level nulls before calling .value()
243                    if nanos_arr.is_null(row) || offset_arr.is_null(row) {
244                        return Value::Null;
245                    }
246                    let nanos = nanos_arr.value(row);
247                    let offset = offset_arr.value(row);
248                    return Value::Temporal(uni_common::TemporalValue::Time {
249                        nanos_since_midnight: nanos,
250                        offset_seconds: offset,
251                    });
252                }
253                // Fall back to old schema: Time64Nanosecond → Time with offset=0
254                if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
255                    let nanos = t.value(row);
256                    return Value::Temporal(uni_common::TemporalValue::Time {
257                        nanos_since_midnight: nanos,
258                        offset_seconds: 0,
259                    });
260                }
261            }
262            DataType::Btic => {
263                let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>() else {
264                    log::warn!("BTIC column is not FixedSizeBinaryArray");
265                    return Value::Null;
266                };
267                let bytes = fsb.value(row);
268                return match uni_btic::encode::decode_slice(bytes) {
269                    Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
270                        lo: btic.lo(),
271                        hi: btic.hi(),
272                        meta: btic.meta(),
273                    }),
274                    Err(e) => {
275                        log::warn!("BTIC decode error: {}", e);
276                        Value::Null
277                    }
278                };
279            }
280            _ => {}
281        }
282    }
283
284    // String types
285    if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
286        return Value::String(s.value(row).to_string());
287    }
288
289    // Integer types
290    if let Some(u) = col.as_any().downcast_ref::<UInt64Array>() {
291        return Value::Int(u.value(row) as i64);
292    }
293    if let Some(i) = col.as_any().downcast_ref::<Int64Array>() {
294        return Value::Int(i.value(row));
295    }
296    if let Some(i) = col.as_any().downcast_ref::<Int32Array>() {
297        return Value::Int(i.value(row) as i64);
298    }
299
300    // Float types
301    if let Some(f) = col.as_any().downcast_ref::<Float64Array>() {
302        return Value::Float(f.value(row));
303    }
304    if let Some(f) = col.as_any().downcast_ref::<Float32Array>() {
305        return Value::Float(f.value(row) as f64);
306    }
307
308    // Boolean type
309    if let Some(b) = col.as_any().downcast_ref::<BooleanArray>() {
310        return Value::Bool(b.value(row));
311    }
312
313    // Fixed-size list (vectors)
314    if let Some(list) = col.as_any().downcast_ref::<FixedSizeListArray>() {
315        return Value::List(array_to_value_list(&list.value(row)));
316    }
317
318    // Variable-size list
319    if let Some(list) = col.as_any().downcast_ref::<ListArray>() {
320        let arr = list.value(row);
321
322        // Map types are stored as List(Struct(key, value)); reconstruct as map
323        if let Some(obj) = try_reconstruct_map(&arr) {
324            return Value::Map(obj);
325        }
326
327        return Value::List(array_to_value_list(&arr));
328    }
329
330    // Large list (variable-size list with i64 offsets)
331    if let Some(list) = col.as_any().downcast_ref::<arrow_array::LargeListArray>() {
332        return Value::List(array_to_value_list(&list.value(row)));
333    }
334
335    // Struct type — detect temporal structs by field names before generic handler
336    if let Some(s) = col.as_any().downcast_ref::<StructArray>() {
337        let field_names: Vec<&str> = s.fields().iter().map(|f| f.name().as_str()).collect();
338
339        // DateTime struct: {nanos_since_epoch, offset_seconds, timezone_name}
340        if field_names.contains(&"nanos_since_epoch")
341            && field_names.contains(&"offset_seconds")
342            && field_names.contains(&"timezone_name")
343            && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
344                s.column_by_name("nanos_since_epoch"),
345                s.column_by_name("offset_seconds"),
346                s.column_by_name("timezone_name"),
347            )
348        {
349            // Try TimestampNanosecond first (standard schema), then Int64 fallback
350            let nanos_opt = nanos_col
351                .as_any()
352                .downcast_ref::<TimestampNanosecondArray>()
353                .map(|a| {
354                    if a.is_null(row) {
355                        None
356                    } else {
357                        Some(a.value(row))
358                    }
359                })
360                .or_else(|| {
361                    nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
362                        if a.is_null(row) {
363                            None
364                        } else {
365                            Some(a.value(row))
366                        }
367                    })
368                });
369            let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
370                if a.is_null(row) {
371                    None
372                } else {
373                    Some(a.value(row))
374                }
375            });
376
377            if let Some(Some(nanos)) = nanos_opt {
378                match offset_opt {
379                    Some(Some(offset)) => {
380                        let tz_name = tz_col.as_any().downcast_ref::<StringArray>().and_then(|a| {
381                            if a.is_null(row) {
382                                None
383                            } else {
384                                Some(a.value(row).to_string())
385                            }
386                        });
387                        return Value::Temporal(uni_common::TemporalValue::DateTime {
388                            nanos_since_epoch: nanos,
389                            offset_seconds: offset,
390                            timezone_name: tz_name,
391                        });
392                    }
393                    _ => {
394                        // No offset → LocalDateTime
395                        return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
396                            nanos_since_epoch: nanos,
397                        });
398                    }
399                }
400            }
401        }
402
403        // Time struct: {nanos_since_midnight, offset_seconds}
404        if field_names.contains(&"nanos_since_midnight")
405            && field_names.contains(&"offset_seconds")
406            && let (Some(nanos_col), Some(offset_col)) = (
407                s.column_by_name("nanos_since_midnight"),
408                s.column_by_name("offset_seconds"),
409            )
410        {
411            // Try Time64Nanosecond first (standard schema), then Int64 fallback
412            let nanos_opt = nanos_col
413                .as_any()
414                .downcast_ref::<Time64NanosecondArray>()
415                .map(|a| {
416                    if a.is_null(row) {
417                        None
418                    } else {
419                        Some(a.value(row))
420                    }
421                })
422                .or_else(|| {
423                    nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
424                        if a.is_null(row) {
425                            None
426                        } else {
427                            Some(a.value(row))
428                        }
429                    })
430                });
431            let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
432                if a.is_null(row) {
433                    None
434                } else {
435                    Some(a.value(row))
436                }
437            });
438
439            if let (Some(Some(nanos)), Some(Some(offset))) = (nanos_opt, offset_opt) {
440                return Value::Temporal(uni_common::TemporalValue::Time {
441                    nanos_since_midnight: nanos,
442                    offset_seconds: offset,
443                });
444            }
445        }
446
447        // Generic struct → Map
448        let mut map = HashMap::new();
449        for (field, child) in s.fields().iter().zip(s.columns()) {
450            map.insert(
451                field.name().clone(),
452                arrow_to_value(child.as_ref(), row, None),
453            );
454        }
455        return Value::Map(map);
456    }
457
458    // Date32 type (days since epoch) - return as Value::Temporal
459    if let Some(d) = col.as_any().downcast_ref::<Date32Array>() {
460        let days = d.value(row);
461        return Value::Temporal(uni_common::TemporalValue::Date {
462            days_since_epoch: days,
463        });
464    }
465
466    // Timestamp (nanoseconds since epoch) - timezone presence determines DateTime vs LocalDateTime
467    if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
468        let nanos = ts.value(row);
469        return match ts.timezone() {
470            Some(tz) => Value::Temporal(uni_common::TemporalValue::DateTime {
471                nanos_since_epoch: nanos,
472                offset_seconds: 0,
473                timezone_name: Some(tz.to_string()),
474            }),
475            None => Value::Temporal(uni_common::TemporalValue::LocalDateTime {
476                nanos_since_epoch: nanos,
477            }),
478        };
479    }
480
481    // Time64 (nanoseconds since midnight) - return as Value::Temporal
482    if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
483        let nanos = t.value(row);
484        return Value::Temporal(uni_common::TemporalValue::LocalTime {
485            nanos_since_midnight: nanos,
486        });
487    }
488
489    // Time64 (microseconds since midnight) - convert to nanoseconds
490    if let Some(t) = col
491        .as_any()
492        .downcast_ref::<arrow_array::Time64MicrosecondArray>()
493    {
494        let micros = t.value(row);
495        return Value::Temporal(uni_common::TemporalValue::LocalTime {
496            nanos_since_midnight: micros * 1000,
497        });
498    }
499
500    // DurationMicrosecond - convert to Duration with nanoseconds
501    if let Some(d) = col
502        .as_any()
503        .downcast_ref::<arrow_array::DurationMicrosecondArray>()
504    {
505        let micros = d.value(row);
506        let total_nanos = micros * 1000;
507        let seconds = total_nanos / 1_000_000_000;
508        let remaining_nanos = total_nanos % 1_000_000_000;
509        return Value::Temporal(uni_common::TemporalValue::Duration {
510            months: 0,
511            days: 0,
512            nanos: seconds * 1_000_000_000 + remaining_nanos,
513        });
514    }
515
516    // IntervalMonthDayNano - return as Value::Temporal(Duration)
517    if let Some(interval) = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>() {
518        let val = interval.value(row);
519        return Value::Temporal(uni_common::TemporalValue::Duration {
520            months: val.months as i64,
521            days: val.days as i64,
522            nanos: val.nanoseconds,
523        });
524    }
525
526    // LargeBinary (CypherValue MessagePack-tagged encoding)
527    if let Some(b) = col.as_any().downcast_ref::<LargeBinaryArray>() {
528        let bytes = b.value(row);
529        if bytes.is_empty() {
530            return Value::Null;
531        }
532        return uni_common::cypher_value_codec::decode(bytes).unwrap_or_else(|e| {
533            eprintln!("CypherValue decode error: {}", e);
534            Value::Null
535        });
536    }
537
538    // FixedSizeBinary(24) — BTIC temporal interval
539    if let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>()
540        && fsb.value_length() == 24
541    {
542        let bytes = fsb.value(row);
543        return match uni_btic::encode::decode_slice(bytes) {
544            Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
545                lo: btic.lo(),
546                hi: btic.hi(),
547                meta: btic.meta(),
548            }),
549            Err(e) => {
550                log::warn!("BTIC decode error: {}", e);
551                Value::Null
552            }
553        };
554    }
555
556    // Binary (CRDT MessagePack) - decode to Value via serde_json boundary
557    if let Some(b) = col.as_any().downcast_ref::<BinaryArray>() {
558        let bytes = b.value(row);
559        return Crdt::from_msgpack(bytes)
560            .ok()
561            .and_then(|crdt| serde_json::to_value(&crdt).ok())
562            .map(Value::from)
563            .unwrap_or(Value::Null);
564    }
565
566    // Fallback
567    Value::Null
568}
569
570fn values_to_uint64_array(values: &[Value]) -> ArrayRef {
571    let mut builder = UInt64Builder::with_capacity(values.len());
572    for v in values {
573        if let Some(n) = v.as_u64() {
574            builder.append_value(n);
575        } else {
576            builder.append_null();
577        }
578    }
579    Arc::new(builder.finish())
580}
581
582fn values_to_int64_array(values: &[Value]) -> ArrayRef {
583    let mut builder = Int64Builder::with_capacity(values.len());
584    for v in values {
585        if let Some(n) = v.as_i64() {
586            builder.append_value(n);
587        } else {
588            builder.append_null();
589        }
590    }
591    Arc::new(builder.finish())
592}
593
594fn values_to_int32_array(values: &[Value]) -> ArrayRef {
595    let mut builder = Int32Builder::with_capacity(values.len());
596    for v in values {
597        if let Some(n) = v.as_i64() {
598            builder.append_value(n as i32);
599        } else {
600            builder.append_null();
601        }
602    }
603    Arc::new(builder.finish())
604}
605
606fn values_to_string_array(values: &[Value]) -> ArrayRef {
607    let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 10);
608    for v in values {
609        if let Some(s) = v.as_str() {
610            builder.append_value(s);
611        } else if v.is_null() {
612            builder.append_null();
613        } else {
614            builder.append_value(v.to_string());
615        }
616    }
617    Arc::new(builder.finish())
618}
619
620fn values_to_bool_array(values: &[Value]) -> ArrayRef {
621    let mut builder = BooleanBuilder::with_capacity(values.len());
622    for v in values {
623        if let Some(b) = v.as_bool() {
624            builder.append_value(b);
625        } else {
626            builder.append_null();
627        }
628    }
629    Arc::new(builder.finish())
630}
631
632fn values_to_float32_array(values: &[Value]) -> ArrayRef {
633    let mut builder = Float32Builder::with_capacity(values.len());
634    for v in values {
635        if let Some(n) = v.as_f64() {
636            builder.append_value(n as f32);
637        } else {
638            builder.append_null();
639        }
640    }
641    Arc::new(builder.finish())
642}
643
644fn values_to_float64_array(values: &[Value]) -> ArrayRef {
645    let mut builder = Float64Builder::with_capacity(values.len());
646    for v in values {
647        if let Some(n) = v.as_f64() {
648            builder.append_value(n);
649        } else {
650            builder.append_null();
651        }
652    }
653    Arc::new(builder.finish())
654}
655
656fn values_to_fixed_size_binary_array(values: &[Value], size: i32) -> Result<ArrayRef> {
657    let mut builder = FixedSizeBinaryBuilder::with_capacity(values.len(), size);
658    for v in values {
659        match v {
660            Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta }) if size == 24 => {
661                let btic = uni_btic::Btic::new(*lo, *hi, *meta)
662                    .map_err(|e| anyhow!("invalid BTIC value: {}", e))?;
663                builder.append_value(uni_btic::encode::encode(&btic))?;
664            }
665            Value::String(s) if size == 24 => match uni_btic::parse::parse_btic_literal(s) {
666                Ok(b) => builder.append_value(uni_btic::encode::encode(&b))?,
667                Err(_) => builder.append_null(),
668            },
669            Value::List(bytes) => {
670                let b: Vec<u8> = bytes
671                    .iter()
672                    .map(|bv| bv.as_u64().unwrap_or(0) as u8)
673                    .collect();
674                if b.len() as i32 == size {
675                    builder.append_value(&b)?;
676                } else {
677                    builder.append_null();
678                }
679            }
680            _ => builder.append_null(),
681        }
682    }
683    Ok(Arc::new(builder.finish()))
684}
685
686/// Extract f32 vector values from a Value, ensuring correct Arrow FixedSizeList invariants.
687///
688/// Always returns exactly `dimensions` f32 values (zeros for null/invalid), plus a validity flag.
689/// This guarantees `child_array.len() == parent_array.len() × dimensions`.
690///
691/// # Arguments
692/// - `val`: Optional property value to extract from
693/// - `is_deleted`: Whether the containing entity is deleted (affects validity)
694/// - `dimensions`: Expected vector dimensions
695///
696/// # Returns
697/// - Tuple of (vector values, validity flag)
698///   - Vector always has exactly `dimensions` elements
699///   - Validity is `true` for valid vectors or deleted entries, `false` for null/invalid
700pub fn extract_vector_f32_values(
701    val: Option<&Value>,
702    is_deleted: bool,
703    dimensions: usize,
704) -> (Vec<f32>, bool) {
705    let zeros = || vec![0.0_f32; dimensions];
706
707    // Deleted entries always return zeros with valid=true
708    if is_deleted {
709        return (zeros(), true);
710    }
711
712    match val {
713        // Native f32 vector (Value::Vector)
714        Some(Value::Vector(v)) if v.len() == dimensions => (v.clone(), true),
715        Some(Value::Vector(_)) => (zeros(), false), // Wrong dimensions
716        // List of values (Value::List) - convert to f32
717        Some(Value::List(arr)) if arr.len() == dimensions => {
718            let values: Vec<f32> = arr
719                .iter()
720                .map(|v| v.as_f64().unwrap_or(0.0) as f32)
721                .collect();
722            (values, true)
723        }
724        Some(Value::List(_)) => (zeros(), false), // Wrong dimensions
725        _ => (zeros(), false),                    // Missing or unsupported value
726    }
727}
728
729fn values_to_fixed_size_list_f32_array(values: &[Value], size: i32) -> ArrayRef {
730    let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), size);
731    for v in values {
732        let (vals, valid) = extract_vector_f32_values(Some(v), false, size as usize);
733        for val in vals {
734            builder.values().append_value(val);
735        }
736        builder.append(valid);
737    }
738    Arc::new(builder.finish())
739}
740
741fn values_to_timestamp_array(values: &[Value], tz: Option<&Arc<str>>) -> ArrayRef {
742    let mut builder = TimestampNanosecondBuilder::with_capacity(values.len());
743    for v in values {
744        if v.is_null() {
745            builder.append_null();
746        } else if let Value::Temporal(tv) = v {
747            match tv {
748                uni_common::TemporalValue::DateTime {
749                    nanos_since_epoch, ..
750                }
751                | uni_common::TemporalValue::LocalDateTime {
752                    nanos_since_epoch, ..
753                } => builder.append_value(*nanos_since_epoch),
754                _ => builder.append_null(),
755            }
756        } else if let Some(n) = v.as_i64() {
757            builder.append_value(n);
758        } else if let Some(s) = v.as_str() {
759            match parse_datetime_to_nanos(s) {
760                Some(nanos) => builder.append_value(nanos),
761                None => builder.append_null(),
762            }
763        } else {
764            builder.append_null();
765        }
766    }
767
768    let arr = builder.finish();
769    if let Some(tz) = tz {
770        Arc::new(arr.with_timezone(tz.as_ref()))
771    } else {
772        Arc::new(arr)
773    }
774}
775
776/// Build a DateTime struct array from values.
777///
778/// Encodes DateTime as a 3-field struct: (nanos_since_epoch, offset_seconds, timezone_name).
779/// This preserves timezone offset information that was previously lost with TimestampNanosecond encoding.
780fn values_to_datetime_struct_array(values: &[Value]) -> ArrayRef {
781    let mut nanos_builder = TimestampNanosecondBuilder::with_capacity(values.len());
782    let mut offset_builder = Int32Builder::with_capacity(values.len());
783    let mut tz_builder = StringBuilder::with_capacity(values.len(), values.len() * 20);
784    let mut null_buffer = BooleanBufferBuilder::new(values.len());
785
786    for v in values {
787        match v {
788            Value::Temporal(uni_common::TemporalValue::DateTime {
789                nanos_since_epoch,
790                offset_seconds,
791                timezone_name,
792            }) => {
793                nanos_builder.append_value(*nanos_since_epoch);
794                offset_builder.append_value(*offset_seconds);
795                tz_builder.append_option(timezone_name.as_deref());
796                null_buffer.append(true);
797            }
798            Value::Temporal(uni_common::TemporalValue::LocalDateTime { nanos_since_epoch }) => {
799                nanos_builder.append_value(*nanos_since_epoch);
800                offset_builder.append_null();
801                tz_builder.append_null();
802                null_buffer.append(true);
803            }
804            _ => {
805                nanos_builder.append_null();
806                offset_builder.append_null();
807                tz_builder.append_null();
808                null_buffer.append(false);
809            }
810        }
811    }
812
813    let struct_arr = StructArray::new(
814        schema::datetime_struct_fields(),
815        vec![
816            Arc::new(nanos_builder.finish()) as ArrayRef,
817            Arc::new(offset_builder.finish()) as ArrayRef,
818            Arc::new(tz_builder.finish()) as ArrayRef,
819        ],
820        Some(null_buffer.finish().into()),
821    );
822    Arc::new(struct_arr)
823}
824
825/// Build a Time struct array from values.
826///
827/// Encodes Time as a 2-field struct: (nanos_since_midnight, offset_seconds).
828/// This preserves timezone offset information that was previously lost with Time64Nanosecond encoding.
829fn values_to_time_struct_array(values: &[Value]) -> ArrayRef {
830    let mut nanos_builder = Time64NanosecondBuilder::with_capacity(values.len());
831    let mut offset_builder = Int32Builder::with_capacity(values.len());
832    let mut null_buffer = BooleanBufferBuilder::new(values.len());
833
834    for v in values {
835        match v {
836            Value::Temporal(uni_common::TemporalValue::Time {
837                nanos_since_midnight,
838                offset_seconds,
839            }) => {
840                nanos_builder.append_value(*nanos_since_midnight);
841                offset_builder.append_value(*offset_seconds);
842                null_buffer.append(true);
843            }
844            Value::Temporal(uni_common::TemporalValue::LocalTime {
845                nanos_since_midnight,
846            }) => {
847                nanos_builder.append_value(*nanos_since_midnight);
848                offset_builder.append_null();
849                null_buffer.append(true);
850            }
851            _ => {
852                nanos_builder.append_null();
853                offset_builder.append_null();
854                null_buffer.append(false);
855            }
856        }
857    }
858
859    let struct_arr = StructArray::new(
860        schema::time_struct_fields(),
861        vec![
862            Arc::new(nanos_builder.finish()) as ArrayRef,
863            Arc::new(offset_builder.finish()) as ArrayRef,
864        ],
865        Some(null_buffer.finish().into()),
866    );
867    Arc::new(struct_arr)
868}
869
870fn values_to_large_binary_array(values: &[Value]) -> ArrayRef {
871    let mut builder =
872        arrow_array::builder::LargeBinaryBuilder::with_capacity(values.len(), values.len() * 64);
873    for v in values {
874        if v.is_null() {
875            builder.append_null();
876        } else {
877            // Encode as CypherValue (MessagePack-tagged)
878            let cv_bytes = uni_common::cypher_value_codec::encode(v);
879            builder.append_value(&cv_bytes);
880        }
881    }
882    Arc::new(builder.finish())
883}
884
885/// Convert a slice of JSON Values to an Arrow array based on the target Arrow DataType.
886pub fn values_to_array(values: &[Value], dt: &ArrowDataType) -> Result<ArrayRef> {
887    match dt {
888        ArrowDataType::UInt64 => Ok(values_to_uint64_array(values)),
889        ArrowDataType::Int64 => Ok(values_to_int64_array(values)),
890        ArrowDataType::Int32 => Ok(values_to_int32_array(values)),
891        ArrowDataType::Utf8 => Ok(values_to_string_array(values)),
892        ArrowDataType::Boolean => Ok(values_to_bool_array(values)),
893        ArrowDataType::Float32 => Ok(values_to_float32_array(values)),
894        ArrowDataType::Float64 => Ok(values_to_float64_array(values)),
895        ArrowDataType::FixedSizeBinary(size) => values_to_fixed_size_binary_array(values, *size),
896        ArrowDataType::FixedSizeList(inner, size) => {
897            if inner.data_type() == &ArrowDataType::Float32 {
898                Ok(values_to_fixed_size_list_f32_array(values, *size))
899            } else {
900                Err(anyhow!("Unsupported FixedSizeList inner type"))
901            }
902        }
903        ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
904            Ok(values_to_timestamp_array(values, tz.as_ref()))
905        }
906        ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
907            Ok(values_to_timestamp_array(values, tz.as_ref()))
908        }
909        ArrowDataType::Date32 => {
910            let mut builder = Date32Builder::with_capacity(values.len());
911            for v in values {
912                if v.is_null() {
913                    builder.append_null();
914                } else if let Value::Temporal(uni_common::TemporalValue::Date {
915                    days_since_epoch,
916                }) = v
917                {
918                    builder.append_value(*days_since_epoch);
919                } else if let Some(n) = v.as_i64() {
920                    builder.append_value(n as i32);
921                } else {
922                    builder.append_null();
923                }
924            }
925            Ok(Arc::new(builder.finish()))
926        }
927        ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
928            let mut builder = Time64NanosecondBuilder::with_capacity(values.len());
929            for v in values {
930                if v.is_null() {
931                    builder.append_null();
932                } else if let Value::Temporal(tv) = v {
933                    match tv {
934                        uni_common::TemporalValue::LocalTime {
935                            nanos_since_midnight,
936                        }
937                        | uni_common::TemporalValue::Time {
938                            nanos_since_midnight,
939                            ..
940                        } => builder.append_value(*nanos_since_midnight),
941                        _ => builder.append_null(),
942                    }
943                } else if let Some(n) = v.as_i64() {
944                    builder.append_value(n);
945                } else {
946                    builder.append_null();
947                }
948            }
949            Ok(Arc::new(builder.finish()))
950        }
951        ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
952            let mut builder = Time64MicrosecondBuilder::with_capacity(values.len());
953            for v in values {
954                if v.is_null() {
955                    builder.append_null();
956                } else if let Value::Temporal(tv) = v {
957                    match tv {
958                        uni_common::TemporalValue::LocalTime {
959                            nanos_since_midnight,
960                        }
961                        | uni_common::TemporalValue::Time {
962                            nanos_since_midnight,
963                            ..
964                        } => builder.append_value(*nanos_since_midnight / 1_000), // nanos→micros for legacy
965                        _ => builder.append_null(),
966                    }
967                } else if let Some(n) = v.as_i64() {
968                    builder.append_value(n);
969                } else {
970                    builder.append_null();
971                }
972            }
973            Ok(Arc::new(builder.finish()))
974        }
975        ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano) => {
976            let mut builder = IntervalMonthDayNanoBuilder::with_capacity(values.len());
977            for v in values {
978                if v.is_null() {
979                    builder.append_null();
980                } else if let Value::Temporal(uni_common::TemporalValue::Duration {
981                    months,
982                    days,
983                    nanos,
984                }) = v
985                {
986                    builder.append_value(arrow::datatypes::IntervalMonthDayNano {
987                        months: *months as i32,
988                        days: *days as i32,
989                        nanoseconds: *nanos,
990                    });
991                } else {
992                    builder.append_null();
993                }
994            }
995            Ok(Arc::new(builder.finish()))
996        }
997        ArrowDataType::Duration(arrow_schema::TimeUnit::Microsecond) => {
998            let mut builder = DurationMicrosecondBuilder::with_capacity(values.len());
999            for v in values {
1000                if v.is_null() {
1001                    builder.append_null();
1002                } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1003                    months,
1004                    days,
1005                    nanos,
1006                }) = v
1007                {
1008                    let total_micros =
1009                        months * 30 * 86_400_000_000i64 + days * 86_400_000_000i64 + nanos / 1_000;
1010                    builder.append_value(total_micros);
1011                } else if let Some(n) = v.as_i64() {
1012                    builder.append_value(n);
1013                } else {
1014                    builder.append_null();
1015                }
1016            }
1017            Ok(Arc::new(builder.finish()))
1018        }
1019        ArrowDataType::LargeBinary => Ok(values_to_large_binary_array(values)),
1020        ArrowDataType::List(field) => {
1021            if field.data_type() == &ArrowDataType::Utf8 {
1022                let mut builder = ListBuilder::new(StringBuilder::new());
1023                for v in values {
1024                    if let Value::List(arr) = v {
1025                        for item in arr {
1026                            if let Some(s) = item.as_str() {
1027                                builder.values().append_value(s);
1028                            } else {
1029                                builder.values().append_null();
1030                            }
1031                        }
1032                        builder.append(true);
1033                    } else {
1034                        builder.append_null();
1035                    }
1036                }
1037                Ok(Arc::new(builder.finish()))
1038            } else {
1039                Err(anyhow!(
1040                    "Unsupported List inner type: {:?}",
1041                    field.data_type()
1042                ))
1043            }
1044        }
1045        ArrowDataType::Struct(_) if schema::is_datetime_struct(dt) => {
1046            Ok(values_to_datetime_struct_array(values))
1047        }
1048        ArrowDataType::Struct(_) if schema::is_time_struct(dt) => {
1049            Ok(values_to_time_struct_array(values))
1050        }
1051        _ => Err(anyhow!("Unsupported type for conversion: {:?}", dt)),
1052    }
1053}
1054
1055/// Property value extractor for building Arrow columns from entity properties.
1056pub struct PropertyExtractor<'a> {
1057    data_type: &'a DataType,
1058}
1059
1060impl<'a> PropertyExtractor<'a> {
1061    pub fn new(_name: &'a str, data_type: &'a DataType) -> Self {
1062        Self { data_type }
1063    }
1064
1065    /// Build an Arrow column from a slice of property maps.
1066    /// The `deleted` slice indicates which entries are deleted (use default values).
1067    pub fn build_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1068    where
1069        F: Fn(usize) -> Option<&'a Value>,
1070    {
1071        match self.data_type {
1072            DataType::String => self.build_string_column(len, deleted, get_props),
1073            DataType::Int32 => self.build_int32_column(len, deleted, get_props),
1074            DataType::Int64 => self.build_int64_column(len, deleted, get_props),
1075            DataType::Float32 => self.build_float32_column(len, deleted, get_props),
1076            DataType::Float64 => self.build_float64_column(len, deleted, get_props),
1077            DataType::Bool => self.build_bool_column(len, deleted, get_props),
1078            DataType::Vector { dimensions } => {
1079                self.build_vector_column(len, deleted, get_props, *dimensions)
1080            }
1081            DataType::CypherValue => self.build_json_column(len, deleted, get_props),
1082            DataType::List(inner) => self.build_list_column(len, deleted, get_props, inner),
1083            DataType::Map(key, value) => self.build_map_column(len, deleted, get_props, key, value),
1084            DataType::Crdt(_) => self.build_crdt_column(len, deleted, get_props),
1085            DataType::DateTime => self.build_datetime_struct_column(len, deleted, get_props),
1086            DataType::Timestamp => self.build_timestamp_column(len, deleted, get_props),
1087            DataType::Date => self.build_date32_column(len, deleted, get_props),
1088            DataType::Time => self.build_time_struct_column(len, deleted, get_props),
1089            DataType::Duration => self.build_duration_column(len, deleted, get_props),
1090            DataType::Btic => self.build_btic_column(len, deleted, get_props),
1091            _ => Err(anyhow!(
1092                "Unsupported data type for arrow conversion: {:?}",
1093                self.data_type
1094            )),
1095        }
1096    }
1097
1098    fn build_string_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1099    where
1100        F: Fn(usize) -> Option<&'a Value>,
1101    {
1102        let mut builder = arrow_array::builder::StringBuilder::with_capacity(len, len * 32);
1103        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1104            let prop = get_props(i);
1105            if let Some(s) = prop.and_then(|v| v.as_str()) {
1106                builder.append_value(s);
1107            } else if let Some(Value::Temporal(tv)) = prop {
1108                builder.append_value(tv.to_string());
1109            } else if is_deleted {
1110                builder.append_value("");
1111            } else {
1112                builder.append_null();
1113            }
1114        }
1115        Ok(Arc::new(builder.finish()))
1116    }
1117
1118    fn build_int32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1119    where
1120        F: Fn(usize) -> Option<&'a Value>,
1121    {
1122        let mut values = Vec::with_capacity(len);
1123        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1124            let val = get_props(i).and_then(|v| v.as_i64()).map(|v| v as i32);
1125            if val.is_none() && is_deleted {
1126                values.push(Some(0));
1127            } else {
1128                values.push(val);
1129            }
1130        }
1131        Ok(Arc::new(Int32Array::from(values)))
1132    }
1133
1134    fn build_int64_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1135    where
1136        F: Fn(usize) -> Option<&'a Value>,
1137    {
1138        let mut values = Vec::with_capacity(len);
1139        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1140            let val = get_props(i).and_then(|v| v.as_i64());
1141            if val.is_none() && is_deleted {
1142                values.push(Some(0));
1143            } else {
1144                values.push(val);
1145            }
1146        }
1147        Ok(Arc::new(Int64Array::from(values)))
1148    }
1149
1150    fn build_timestamp_column<F>(
1151        &self,
1152        len: usize,
1153        deleted: &[bool],
1154        get_props: F,
1155    ) -> Result<ArrayRef>
1156    where
1157        F: Fn(usize) -> Option<&'a Value>,
1158    {
1159        let mut values = Vec::with_capacity(len);
1160        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1161            let val = get_props(i);
1162            let ts = if is_deleted || val.is_none() {
1163                Some(0i64)
1164            } else if let Some(Value::Temporal(tv)) = val {
1165                match tv {
1166                    uni_common::TemporalValue::DateTime {
1167                        nanos_since_epoch, ..
1168                    }
1169                    | uni_common::TemporalValue::LocalDateTime {
1170                        nanos_since_epoch, ..
1171                    } => Some(*nanos_since_epoch),
1172                    _ => None,
1173                }
1174            } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1175                Some(v)
1176            } else if let Some(s) = val.and_then(|v| v.as_str()) {
1177                parse_datetime_to_nanos(s)
1178            } else {
1179                None
1180            };
1181
1182            if is_deleted {
1183                values.push(Some(0));
1184            } else {
1185                values.push(ts);
1186            }
1187        }
1188        let arr = TimestampNanosecondArray::from(values).with_timezone("UTC");
1189        Ok(Arc::new(arr))
1190    }
1191
1192    fn build_datetime_struct_column<F>(
1193        &self,
1194        len: usize,
1195        deleted: &[bool],
1196        get_props: F,
1197    ) -> Result<ArrayRef>
1198    where
1199        F: Fn(usize) -> Option<&'a Value>,
1200    {
1201        let values = self.collect_values_or_null(len, deleted, &get_props);
1202        Ok(values_to_datetime_struct_array(&values))
1203    }
1204
1205    fn build_time_struct_column<F>(
1206        &self,
1207        len: usize,
1208        deleted: &[bool],
1209        get_props: F,
1210    ) -> Result<ArrayRef>
1211    where
1212        F: Fn(usize) -> Option<&'a Value>,
1213    {
1214        let values = self.collect_values_or_null(len, deleted, &get_props);
1215        Ok(values_to_time_struct_array(&values))
1216    }
1217
1218    /// Collect property values into a Vec, substituting `Value::Null` for deleted or missing entries.
1219    fn collect_values_or_null<F>(&self, len: usize, deleted: &[bool], get_props: &F) -> Vec<Value>
1220    where
1221        F: Fn(usize) -> Option<&'a Value>,
1222    {
1223        deleted
1224            .iter()
1225            .enumerate()
1226            .take(len)
1227            .map(|(i, &is_deleted)| {
1228                if is_deleted {
1229                    Value::Null
1230                } else {
1231                    get_props(i).cloned().unwrap_or(Value::Null)
1232                }
1233            })
1234            .collect()
1235    }
1236
1237    fn build_date32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1238    where
1239        F: Fn(usize) -> Option<&'a Value>,
1240    {
1241        let mut builder = Date32Builder::with_capacity(len);
1242        let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1243
1244        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1245            let val = get_props(i);
1246            let days = if is_deleted || val.is_none() {
1247                Some(0)
1248            } else if let Some(Value::Temporal(uni_common::TemporalValue::Date {
1249                days_since_epoch,
1250            })) = val
1251            {
1252                Some(*days_since_epoch)
1253            } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1254                Some(v as i32)
1255            } else if let Some(s) = val.and_then(|v| v.as_str()) {
1256                match chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1257                    Ok(date) => Some(date.signed_duration_since(epoch).num_days() as i32),
1258                    Err(_) => None,
1259                }
1260            } else {
1261                None
1262            };
1263
1264            if is_deleted {
1265                builder.append_value(0);
1266            } else if let Some(v) = days {
1267                builder.append_value(v);
1268            } else {
1269                builder.append_null();
1270            }
1271        }
1272        Ok(Arc::new(builder.finish()))
1273    }
1274
1275    fn build_duration_column<F>(
1276        &self,
1277        len: usize,
1278        deleted: &[bool],
1279        get_props: F,
1280    ) -> Result<ArrayRef>
1281    where
1282        F: Fn(usize) -> Option<&'a Value>,
1283    {
1284        // Duration stored as LargeBinary via CypherValue codec (Lance doesn't support Interval(MonthDayNano))
1285        let mut builder = LargeBinaryBuilder::with_capacity(len, len * 32);
1286        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1287            let raw_val = get_props(i);
1288            if let Some(val @ Value::Temporal(uni_common::TemporalValue::Duration { .. })) = raw_val
1289            {
1290                let encoded = uni_common::cypher_value_codec::encode(val);
1291                builder.append_value(&encoded);
1292            } else if is_deleted {
1293                let zero = Value::Temporal(uni_common::TemporalValue::Duration {
1294                    months: 0,
1295                    days: 0,
1296                    nanos: 0,
1297                });
1298                let encoded = uni_common::cypher_value_codec::encode(&zero);
1299                builder.append_value(&encoded);
1300            } else {
1301                builder.append_null();
1302            }
1303        }
1304        Ok(Arc::new(builder.finish()))
1305    }
1306
1307    fn build_btic_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1308    where
1309        F: Fn(usize) -> Option<&'a Value>,
1310    {
1311        const ENCODED_LEN: i32 = 24;
1312        let mut builder = FixedSizeBinaryBuilder::with_capacity(len, ENCODED_LEN);
1313        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1314            let raw_val = get_props(i);
1315            let btic = match raw_val {
1316                Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => Some(
1317                    uni_btic::Btic::new(*lo, *hi, *meta)
1318                        .map_err(|e| anyhow!("invalid BTIC value: {}", e))?,
1319                ),
1320                Some(Value::String(s)) => Some(
1321                    uni_btic::parse::parse_btic_literal(s)
1322                        .map_err(|e| anyhow!("BTIC parse error for '{}': {}", s, e))?,
1323                ),
1324                _ => None,
1325            };
1326
1327            if let Some(b) = btic {
1328                builder.append_value(uni_btic::encode::encode(&b))?;
1329            } else if is_deleted {
1330                builder.append_value([0u8; ENCODED_LEN as usize])?;
1331            } else {
1332                builder.append_null();
1333            }
1334        }
1335        Ok(Arc::new(builder.finish()))
1336    }
1337
1338    fn build_float32_column<F>(
1339        &self,
1340        len: usize,
1341        deleted: &[bool],
1342        get_props: F,
1343    ) -> Result<ArrayRef>
1344    where
1345        F: Fn(usize) -> Option<&'a Value>,
1346    {
1347        let mut values = Vec::with_capacity(len);
1348        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1349            let val = get_props(i).and_then(|v| v.as_f64()).map(|v| v as f32);
1350            if val.is_none() && is_deleted {
1351                values.push(Some(0.0));
1352            } else {
1353                values.push(val);
1354            }
1355        }
1356        Ok(Arc::new(Float32Array::from(values)))
1357    }
1358
1359    fn build_float64_column<F>(
1360        &self,
1361        len: usize,
1362        deleted: &[bool],
1363        get_props: F,
1364    ) -> Result<ArrayRef>
1365    where
1366        F: Fn(usize) -> Option<&'a Value>,
1367    {
1368        let mut values = Vec::with_capacity(len);
1369        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1370            let val = get_props(i).and_then(|v| v.as_f64());
1371            if val.is_none() && is_deleted {
1372                values.push(Some(0.0));
1373            } else {
1374                values.push(val);
1375            }
1376        }
1377        Ok(Arc::new(Float64Array::from(values)))
1378    }
1379
1380    fn build_bool_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1381    where
1382        F: Fn(usize) -> Option<&'a Value>,
1383    {
1384        let mut values = Vec::with_capacity(len);
1385        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1386            let val = get_props(i).and_then(|v| v.as_bool());
1387            if val.is_none() && is_deleted {
1388                values.push(Some(false));
1389            } else {
1390                values.push(val);
1391            }
1392        }
1393        Ok(Arc::new(BooleanArray::from(values)))
1394    }
1395
1396    fn build_vector_column<F>(
1397        &self,
1398        len: usize,
1399        deleted: &[bool],
1400        get_props: F,
1401        dimensions: usize,
1402    ) -> Result<ArrayRef>
1403    where
1404        F: Fn(usize) -> Option<&'a Value>,
1405    {
1406        let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dimensions as i32);
1407
1408        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1409            let val = get_props(i);
1410            let (values, valid) = extract_vector_f32_values(val, is_deleted, dimensions);
1411            for v in values {
1412                builder.values().append_value(v);
1413            }
1414            builder.append(valid);
1415        }
1416        Ok(Arc::new(builder.finish()))
1417    }
1418
1419    fn build_json_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1420    where
1421        F: Fn(usize) -> Option<&'a Value>,
1422    {
1423        let null_val = Value::Null;
1424        let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(len, len * 64);
1425        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1426            let val = get_props(i);
1427            let uni_val = if val.is_none() && is_deleted {
1428                &null_val
1429            } else {
1430                val.unwrap_or(&null_val)
1431            };
1432            // Encode to CypherValue (MessagePack-tagged)
1433            let cv_bytes = uni_common::cypher_value_codec::encode(uni_val);
1434            builder.append_value(&cv_bytes);
1435        }
1436        Ok(Arc::new(builder.finish()))
1437    }
1438
1439    fn build_list_column<F>(
1440        &self,
1441        len: usize,
1442        deleted: &[bool],
1443        get_props: F,
1444        inner: &DataType,
1445    ) -> Result<ArrayRef>
1446    where
1447        F: Fn(usize) -> Option<&'a Value>,
1448    {
1449        match inner {
1450            DataType::String => {
1451                self.build_typed_list(len, deleted, &get_props, StringBuilder::new(), |v, b| {
1452                    if let Some(s) = v.as_str() {
1453                        b.append_value(s);
1454                    } else {
1455                        b.append_null();
1456                    }
1457                })
1458            }
1459            DataType::Int64 => {
1460                self.build_typed_list(len, deleted, &get_props, Int64Builder::new(), |v, b| {
1461                    if let Some(n) = v.as_i64() {
1462                        b.append_value(n);
1463                    } else {
1464                        b.append_null();
1465                    }
1466                })
1467            }
1468            DataType::Float64 => {
1469                self.build_typed_list(len, deleted, &get_props, Float64Builder::new(), |v, b| {
1470                    if let Some(f) = v.as_f64() {
1471                        b.append_value(f);
1472                    } else {
1473                        b.append_null();
1474                    }
1475                })
1476            }
1477            _ => Err(anyhow!("Unsupported inner type for List: {:?}", inner)),
1478        }
1479    }
1480
1481    /// Generic helper to build a list column with any inner builder type.
1482    fn build_typed_list<F, B, A>(
1483        &self,
1484        len: usize,
1485        deleted: &[bool],
1486        get_props: &F,
1487        inner_builder: B,
1488        mut append_value: A,
1489    ) -> Result<ArrayRef>
1490    where
1491        F: Fn(usize) -> Option<&'a Value>,
1492        B: arrow_array::builder::ArrayBuilder,
1493        A: FnMut(&Value, &mut B),
1494    {
1495        let mut builder = ListBuilder::new(inner_builder);
1496        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1497            let val_array = get_props(i).and_then(|v| v.as_array());
1498            if val_array.is_none() && is_deleted {
1499                builder.append_null();
1500            } else if let Some(arr) = val_array {
1501                for v in arr {
1502                    append_value(v, builder.values());
1503                }
1504                builder.append(true);
1505            } else {
1506                builder.append_null();
1507            }
1508        }
1509        Ok(Arc::new(builder.finish()))
1510    }
1511
1512    fn build_map_column<F>(
1513        &self,
1514        len: usize,
1515        deleted: &[bool],
1516        get_props: F,
1517        key: &DataType,
1518        value: &DataType,
1519    ) -> Result<ArrayRef>
1520    where
1521        F: Fn(usize) -> Option<&'a Value>,
1522    {
1523        if !matches!(key, DataType::String) {
1524            return Err(anyhow!("Map keys must be String (JSON limitation)"));
1525        }
1526
1527        match value {
1528            DataType::String => self.build_typed_map(
1529                len,
1530                deleted,
1531                &get_props,
1532                StringBuilder::new(),
1533                arrow_schema::DataType::Utf8,
1534                |v, b: &mut StringBuilder| {
1535                    if let Some(s) = v.as_str() {
1536                        b.append_value(s);
1537                    } else {
1538                        b.append_null();
1539                    }
1540                },
1541            ),
1542            DataType::Int64 => self.build_typed_map(
1543                len,
1544                deleted,
1545                &get_props,
1546                Int64Builder::new(),
1547                arrow_schema::DataType::Int64,
1548                |v, b: &mut Int64Builder| {
1549                    if let Some(n) = v.as_i64() {
1550                        b.append_value(n);
1551                    } else {
1552                        b.append_null();
1553                    }
1554                },
1555            ),
1556            _ => Err(anyhow!("Unsupported value type for Map: {:?}", value)),
1557        }
1558    }
1559
1560    /// Generic helper to build a map column with any value builder type.
1561    fn build_typed_map<F, B, A>(
1562        &self,
1563        len: usize,
1564        deleted: &[bool],
1565        get_props: &F,
1566        value_builder: B,
1567        value_arrow_type: arrow_schema::DataType,
1568        mut append_value: A,
1569    ) -> Result<ArrayRef>
1570    where
1571        F: Fn(usize) -> Option<&'a Value>,
1572        B: arrow_array::builder::ArrayBuilder,
1573        A: FnMut(&Value, &mut B),
1574    {
1575        let key_builder = Box::new(StringBuilder::new());
1576        let value_builder = Box::new(value_builder);
1577        let struct_builder = StructBuilder::new(
1578            vec![
1579                Field::new("key", arrow_schema::DataType::Utf8, false),
1580                Field::new("value", value_arrow_type, true),
1581            ],
1582            vec![key_builder, value_builder],
1583        );
1584        let mut builder = ListBuilder::new(struct_builder);
1585
1586        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1587            self.append_map_entry(&mut builder, get_props(i), is_deleted, &mut append_value);
1588        }
1589        Ok(Arc::new(builder.finish()))
1590    }
1591
1592    /// Append a single map entry to the list builder.
1593    fn append_map_entry<B, A>(
1594        &self,
1595        builder: &mut ListBuilder<StructBuilder>,
1596        val: Option<&'a Value>,
1597        is_deleted: bool,
1598        append_value: &mut A,
1599    ) where
1600        B: arrow_array::builder::ArrayBuilder,
1601        A: FnMut(&Value, &mut B),
1602    {
1603        let val_obj = val.and_then(|v| v.as_object());
1604        if val_obj.is_none() && is_deleted {
1605            builder.append(false);
1606        } else if let Some(obj) = val_obj {
1607            let struct_b = builder.values();
1608            for (k, v) in obj {
1609                struct_b
1610                    .field_builder::<StringBuilder>(0)
1611                    .unwrap()
1612                    .append_value(k);
1613                // Safety: We know the value builder type matches B
1614                let value_b = struct_b.field_builder::<B>(1).unwrap();
1615                append_value(v, value_b);
1616                struct_b.append(true);
1617            }
1618            builder.append(true);
1619        } else {
1620            builder.append(false);
1621        }
1622    }
1623
1624    fn build_crdt_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1625    where
1626        F: Fn(usize) -> Option<&'a Value>,
1627    {
1628        let mut builder = BinaryBuilder::new();
1629        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1630            if is_deleted {
1631                builder.append_null();
1632                continue;
1633            }
1634            if let Some(val) = get_props(i) {
1635                // Try to parse CRDT from the value
1636                // If it's a string, first parse it as JSON, then as CRDT
1637                let crdt_result = if let Some(s) = val.as_str() {
1638                    serde_json::from_str::<Crdt>(s)
1639                } else {
1640                    // Convert uni_common::Value to serde_json::Value at the CRDT boundary
1641                    let json_val: serde_json::Value = val.clone().into();
1642                    serde_json::from_value::<Crdt>(json_val)
1643                };
1644
1645                if let Ok(crdt) = crdt_result {
1646                    if let Ok(bytes) = crdt.to_msgpack() {
1647                        builder.append_value(&bytes);
1648                    } else {
1649                        builder.append_null();
1650                    }
1651                } else {
1652                    builder.append_null();
1653                }
1654            } else {
1655                builder.append_null();
1656            }
1657        }
1658        Ok(Arc::new(builder.finish()))
1659    }
1660}
1661
1662/// Build a column for edge entries (no deleted flag handling needed).
1663pub fn build_edge_column<'a>(
1664    name: &'a str,
1665    data_type: &'a DataType,
1666    len: usize,
1667    get_props: impl Fn(usize) -> Option<&'a Value>,
1668) -> Result<ArrayRef> {
1669    // For edges, use empty deleted array
1670    let deleted = vec![false; len];
1671    let extractor = PropertyExtractor::new(name, data_type);
1672    extractor.build_column(len, &deleted, get_props)
1673}
1674
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{
        Array, DurationMicrosecondArray,
        builder::{BinaryBuilder, Time64MicrosecondBuilder, TimestampNanosecondBuilder},
    };
    use std::collections::HashMap;
    use uni_common::TemporalValue;
    use uni_crdt::{Crdt, GCounter};

    #[test]
    fn test_arrow_to_value_string() {
        let arr = StringArray::from(vec![Some("hello"), None, Some("world")]);
        assert_eq!(
            arrow_to_value(&arr, 0, None),
            Value::String("hello".to_string())
        );
        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
        assert_eq!(
            arrow_to_value(&arr, 2, None),
            Value::String("world".to_string())
        );
    }

    #[test]
    fn test_arrow_to_value_int64() {
        let arr = Int64Array::from(vec![Some(42), None, Some(-10)]);
        assert_eq!(arrow_to_value(&arr, 0, None), Value::Int(42));
        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
        assert_eq!(arrow_to_value(&arr, 2, None), Value::Int(-10));
    }

    #[test]
    #[allow(clippy::approx_constant)]
    fn test_arrow_to_value_float64() {
        let arr = Float64Array::from(vec![Some(3.14), None]);
        assert_eq!(arrow_to_value(&arr, 0, None), Value::Float(3.14));
        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
    }

    #[test]
    fn test_arrow_to_value_bool() {
        let arr = BooleanArray::from(vec![Some(true), Some(false), None]);
        assert_eq!(arrow_to_value(&arr, 0, None), Value::Bool(true));
        assert_eq!(arrow_to_value(&arr, 1, None), Value::Bool(false));
        assert_eq!(arrow_to_value(&arr, 2, None), Value::Null);
    }

    #[test]
    fn test_values_to_array_int64() {
        let values = vec![Value::Int(1), Value::Int(2), Value::Null, Value::Int(4)];
        let arr = values_to_array(&values, &ArrowDataType::Int64).unwrap();
        assert_eq!(arr.len(), 4);

        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int_arr.value(0), 1);
        assert_eq!(int_arr.value(1), 2);
        assert!(int_arr.is_null(2));
        assert_eq!(int_arr.value(3), 4);
    }

    #[test]
    fn test_values_to_array_string() {
        let values = vec![
            Value::String("a".to_string()),
            Value::String("b".to_string()),
            Value::Null,
        ];
        let arr = values_to_array(&values, &ArrowDataType::Utf8).unwrap();
        assert_eq!(arr.len(), 3);

        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(str_arr.value(0), "a");
        assert_eq!(str_arr.value(1), "b");
        assert!(str_arr.is_null(2));
    }

    #[test]
    fn test_property_extractor_string() {
        let props: Vec<HashMap<String, Value>> = vec![
            [("name".to_string(), Value::String("Alice".to_string()))]
                .into_iter()
                .collect(),
            [("name".to_string(), Value::String("Bob".to_string()))]
                .into_iter()
                .collect(),
            HashMap::new(),
        ];
        let deleted = vec![false, false, true];

        let extractor = PropertyExtractor::new("name", &DataType::String);
        let arr = extractor
            .build_column(3, &deleted, |i| props[i].get("name"))
            .unwrap();

        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(str_arr.value(0), "Alice");
        assert_eq!(str_arr.value(1), "Bob");
        assert_eq!(str_arr.value(2), ""); // Deleted entries get default
    }

    #[test]
    fn test_property_extractor_int64() {
        let props: Vec<HashMap<String, Value>> = vec![
            [("age".to_string(), Value::Int(25))].into_iter().collect(),
            [("age".to_string(), Value::Int(30))].into_iter().collect(),
            HashMap::new(),
        ];
        let deleted = vec![false, false, true];

        let extractor = PropertyExtractor::new("age", &DataType::Int64);
        let arr = extractor
            .build_column(3, &deleted, |i| props[i].get("age"))
            .unwrap();

        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int_arr.value(0), 25);
        assert_eq!(int_arr.value(1), 30);
        assert_eq!(int_arr.value(2), 0); // Deleted entries get default
    }

    #[test]
    fn test_build_edge_column_string() {
        // Edge columns treat every row as live and delegate to PropertyExtractor.
        let props: Vec<HashMap<String, Value>> = vec![
            [("label".to_string(), Value::String("knows".to_string()))]
                .into_iter()
                .collect(),
            [("label".to_string(), Value::String("likes".to_string()))]
                .into_iter()
                .collect(),
        ];

        let arr = build_edge_column("label", &DataType::String, 2, |i| props[i].get("label"))
            .unwrap();
        assert_eq!(arr.len(), 2);

        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(str_arr.value(0), "knows");
        assert_eq!(str_arr.value(1), "likes");
    }

    #[test]
    fn test_arrow_to_value_time64() {
        // Test Time64MicrosecondArray legacy fallback (micros→nanos conversion)
        let mut builder = Time64MicrosecondBuilder::new();
        // 10:30:45 = 10*3600 + 30*60 + 45 = 37845 seconds = 37845000000 microseconds
        builder.append_value(37_845_000_000);
        // 00:00:00 = 0 microseconds
        builder.append_value(0);
        // 23:59:59.123456 = 86399.123456 seconds
        builder.append_value(86_399_123_456);
        builder.append_null();

        let arr = builder.finish();
        // Arrow→Value returns Value::Temporal(LocalTime) with nanos (micros * 1000)
        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "10:30:45");
        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "00:00");
        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "23:59:59.123456");
        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
    }

    #[test]
    fn test_arrow_to_value_duration() {
        // Test DurationMicrosecondArray conversion
        // Arrow→Value now returns Value::Temporal(Duration)
        let arr = DurationMicrosecondArray::from(vec![
            Some(1_000_000),      // 1 second in microseconds
            Some(3_600_000_000),  // 1 hour
            Some(86_400_000_000), // 1 day
            None,
        ]);

        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "PT1S");
        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "PT1H");
        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "PT24H");
        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
    }

    #[test]
    fn test_arrow_to_value_binary_crdt() {
        // Test BinaryArray (CRDT) conversion - round-trip test
        let mut builder = BinaryBuilder::new();

        // Create a GCounter CRDT and serialize it
        let mut counter = GCounter::new();
        counter.increment("actor1", 5);
        let crdt = Crdt::GCounter(counter);
        let bytes = crdt.to_msgpack().unwrap();
        builder.append_value(&bytes);

        // Add a null value
        builder.append_null();

        let arr = builder.finish();

        // The first value should deserialize back to a map
        let result = arrow_to_value(&arr, 0, None);
        assert!(result.as_object().is_some());
        let obj = result.as_object().unwrap();
        // GCounter serializes with tag "t": "gc"
        assert_eq!(obj.get("t"), Some(&Value::String("gc".to_string())));

        // Null value should return null
        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
    }

    #[test]
    fn test_datetime_struct_encode_decode_roundtrip() {
        // Test DateTime struct encoding with offset and timezone preservation
        let values = vec![
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 441763200000000000, // 1984-01-01T00:00:00Z
                offset_seconds: 3600,                  // +01:00
                timezone_name: Some("Europe/Paris".to_string()),
            }),
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 1704067200000000000, // 2024-01-01T00:00:00Z
                offset_seconds: -18000,                 // -05:00
                timezone_name: None,
            }),
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 0, // Unix epoch
                offset_seconds: 0,
                timezone_name: Some("UTC".to_string()),
            }),
        ];

        // Encode to Arrow struct
        let arr_ref = values_to_datetime_struct_array(&values);
        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
        assert_eq!(arr.len(), 3);

        // Decode back to Value
        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));

        // Verify round-trip preserves all fields
        assert_eq!(decoded_0, values[0]);
        assert_eq!(decoded_1, values[1]);
        assert_eq!(decoded_2, values[2]);

        // Verify struct field extraction
        if let Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch,
            offset_seconds,
            timezone_name,
        }) = decoded_0
        {
            assert_eq!(nanos_since_epoch, 441763200000000000);
            assert_eq!(offset_seconds, 3600);
            assert_eq!(timezone_name, Some("Europe/Paris".to_string()));
        } else {
            panic!("Expected DateTime value");
        }
    }

    #[test]
    fn test_datetime_struct_null_handling() {
        // Test DateTime struct with null values
        let values = vec![
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 441763200000000000,
                offset_seconds: 3600,
                timezone_name: Some("Europe/Paris".to_string()),
            }),
            Value::Null,
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 0,
                offset_seconds: 0,
                timezone_name: None,
            }),
        ];

        let arr_ref = values_to_datetime_struct_array(&values);
        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
        assert_eq!(arr.len(), 3);

        // Check first value is valid
        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
        assert_eq!(decoded_0, values[0]);

        // Check second value is null
        assert!(arr.is_null(1));
        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
        assert_eq!(decoded_1, Value::Null);

        // Check third value is valid
        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
        assert_eq!(decoded_2, values[2]);
    }

    #[test]
    fn test_datetime_struct_boundary_values() {
        // Test boundary values: offset=0, large positive/negative offsets
        let values = vec![
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 441763200000000000,
                offset_seconds: 0, // UTC
                timezone_name: None,
            }),
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 441763200000000000,
                offset_seconds: 43200, // +12:00 (max typical offset)
                timezone_name: None,
            }),
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 441763200000000000,
                offset_seconds: -43200, // -12:00 (min typical offset)
                timezone_name: None,
            }),
        ];

        let arr_ref = values_to_datetime_struct_array(&values);
        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
        assert_eq!(arr.len(), 3);

        // Verify round-trip for all boundary values
        for (i, expected) in values.iter().enumerate() {
            let decoded = arrow_to_value(arr_ref.as_ref(), i, Some(&DataType::DateTime));
            assert_eq!(&decoded, expected);
        }
    }

    #[test]
    fn test_datetime_old_schema_migration() {
        // Test backward compatibility: TimestampNanosecondArray → DateTime with offset=0
        let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
        builder.append_value(441763200000000000); // 1984-01-01T00:00:00Z
        builder.append_value(1704067200000000000); // 2024-01-01T00:00:00Z
        builder.append_null();

        let arr = builder.finish();

        // Decode with DataType::DateTime hint should migrate old schema
        let decoded_0 = arrow_to_value(&arr, 0, Some(&DataType::DateTime));
        let decoded_1 = arrow_to_value(&arr, 1, Some(&DataType::DateTime));
        let decoded_2 = arrow_to_value(&arr, 2, Some(&DataType::DateTime));

        // Old schema should default to offset=0, preserve timezone
        if let Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch,
            offset_seconds,
            timezone_name,
        }) = decoded_0
        {
            assert_eq!(nanos_since_epoch, 441763200000000000);
            assert_eq!(offset_seconds, 0);
            assert_eq!(timezone_name, Some("UTC".to_string()));
        } else {
            panic!("Expected DateTime value");
        }

        // Second row must migrate the same way (previously decoded but never checked)
        assert_eq!(
            decoded_1,
            Value::Temporal(TemporalValue::DateTime {
                nanos_since_epoch: 1704067200000000000,
                offset_seconds: 0,
                timezone_name: Some("UTC".to_string()),
            })
        );

        // Verify null handling
        assert_eq!(decoded_2, Value::Null);
    }

    #[test]
    fn test_time_struct_encode_decode_roundtrip() {
        // Test Time struct encoding with offset preservation
        let values = vec![
            Value::Temporal(TemporalValue::Time {
                nanos_since_midnight: 37845000000000, // 10:30:45
                offset_seconds: 3600,                 // +01:00
            }),
            Value::Temporal(TemporalValue::Time {
                nanos_since_midnight: 0, // 00:00:00
                offset_seconds: 0,
            }),
            Value::Temporal(TemporalValue::Time {
                nanos_since_midnight: 86399999999999, // 23:59:59.999999999
                offset_seconds: -18000,               // -05:00
            }),
        ];

        // Encode to Arrow struct
        let arr_ref = values_to_time_struct_array(&values);
        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
        assert_eq!(arr.len(), 3);

        // Decode back to Value
        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));

        // Verify round-trip preserves all fields
        assert_eq!(decoded_0, values[0]);
        assert_eq!(decoded_1, values[1]);
        assert_eq!(decoded_2, values[2]);

        // Verify struct field extraction
        if let Value::Temporal(TemporalValue::Time {
            nanos_since_midnight,
            offset_seconds,
        }) = decoded_0
        {
            assert_eq!(nanos_since_midnight, 37845000000000);
            assert_eq!(offset_seconds, 3600);
        } else {
            panic!("Expected Time value");
        }
    }

    #[test]
    fn test_time_struct_null_handling() {
        // Test Time struct with null values
        let values = vec![
            Value::Temporal(TemporalValue::Time {
                nanos_since_midnight: 37845000000000,
                offset_seconds: 3600,
            }),
            Value::Null,
            Value::Temporal(TemporalValue::Time {
                nanos_since_midnight: 0,
                offset_seconds: 0,
            }),
        ];

        let arr_ref = values_to_time_struct_array(&values);
        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
        assert_eq!(arr.len(), 3);

        // Check first value is valid
        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
        assert_eq!(decoded_0, values[0]);

        // Check second value is null
        assert!(arr.is_null(1));
        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
        assert_eq!(decoded_1, Value::Null);

        // Check third value is valid
        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
        assert_eq!(decoded_2, values[2]);
    }

    // Tests for extract_vector_f32_values

    #[test]
    fn test_extract_vector_f32_values_valid_vector() {
        let v = vec![1.0, 2.0, 3.0];
        let val = Value::Vector(v.clone());
        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
        assert_eq!(result, v);
        assert!(valid);
    }

    #[test]
    fn test_extract_vector_f32_values_vector_wrong_dims() {
        let v = vec![1.0, 2.0];
        let val = Value::Vector(v);
        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
        assert_eq!(result, vec![0.0, 0.0, 0.0]);
        assert!(!valid);
    }

    #[test]
    fn test_extract_vector_f32_values_valid_list() {
        let v = vec![Value::Float(1.0), Value::Float(2.0), Value::Float(3.0)];
        let val = Value::List(v);
        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
        assert_eq!(result, vec![1.0, 2.0, 3.0]);
        assert!(valid);
    }

    #[test]
    fn test_extract_vector_f32_values_list_wrong_dims() {
        let v = vec![Value::Float(1.0), Value::Float(2.0)];
        let val = Value::List(v);
        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
        assert_eq!(result, vec![0.0, 0.0, 0.0]);
        assert!(!valid);
    }

    #[test]
    fn test_extract_vector_f32_values_list_int_coercion() {
        let v = vec![Value::Int(1), Value::Int(2), Value::Int(3)];
        let val = Value::List(v);
        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
        assert_eq!(result, vec![1.0, 2.0, 3.0]);
        assert!(valid);
    }

    #[test]
    fn test_extract_vector_f32_values_none() {
        let (result, valid) = extract_vector_f32_values(None, false, 3);
        assert_eq!(result, vec![0.0, 0.0, 0.0]);
        assert!(!valid);
    }

    #[test]
    fn test_extract_vector_f32_values_null() {
        let val = Value::Null;
        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
        assert_eq!(result, vec![0.0, 0.0, 0.0]);
        assert!(!valid);
    }

    #[test]
    fn test_extract_vector_f32_values_unsupported_type() {
        let val = Value::String("not a vector".to_string());
        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
        assert_eq!(result, vec![0.0, 0.0, 0.0]);
        assert!(!valid);
    }

    #[test]
    fn test_extract_vector_f32_values_deleted_with_none() {
        let (result, valid) = extract_vector_f32_values(None, true, 3);
        assert_eq!(result, vec![0.0, 0.0, 0.0]);
        assert!(valid); // Deleted entries are marked as valid with zeros
    }

    #[test]
    fn test_extract_vector_f32_values_deleted_with_null() {
        let val = Value::Null;
        let (result, valid) = extract_vector_f32_values(Some(&val), true, 3);
        assert_eq!(result, vec![0.0, 0.0, 0.0]);
        assert!(valid); // Deleted entries are marked as valid with zeros
    }

    // Tests for values_to_array with FixedSizeList

    #[test]
    fn test_values_to_fixed_size_list_vector_with_nulls() {
        let values = vec![
            Value::Vector(vec![1.0, 2.0]),
            Value::Null,
            Value::Vector(vec![3.0, 4.0]),
            Value::String("invalid".to_string()),
        ];
        let arr_ref = values_to_array(
            &values,
            &ArrowDataType::FixedSizeList(
                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
                2,
            ),
        )
        .unwrap();

        let arr = arr_ref
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();

        assert_eq!(arr.len(), 4);
        assert!(arr.is_valid(0));
        assert!(!arr.is_valid(1)); // Null value
        assert!(arr.is_valid(2));
        assert!(!arr.is_valid(3)); // Invalid type
    }

    #[test]
    fn test_values_to_fixed_size_list_from_list() {
        let values = vec![
            Value::List(vec![Value::Float(1.0), Value::Float(2.0)]),
            Value::List(vec![Value::Int(3), Value::Int(4)]),
        ];
        let arr_ref = values_to_array(
            &values,
            &ArrowDataType::FixedSizeList(
                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
                2,
            ),
        )
        .unwrap();

        let arr = arr_ref
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();

        assert_eq!(arr.len(), 2);
        assert!(arr.is_valid(0));
        assert!(arr.is_valid(1));

        // Check values
        let child = arr
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .unwrap();
        assert_eq!(child.value(0), 1.0);
        assert_eq!(child.value(1), 2.0);
        assert_eq!(child.value(2), 3.0);
        assert_eq!(child.value(3), 4.0);
    }

    #[test]
    fn test_values_to_fixed_size_list_wrong_dimensions() {
        let values = vec![
            Value::Vector(vec![1.0, 2.0, 3.0]),   // 3 dims, expecting 2
            Value::List(vec![Value::Float(4.0)]), // 1 dim, expecting 2
        ];
        let arr_ref = values_to_array(
            &values,
            &ArrowDataType::FixedSizeList(
                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
                2,
            ),
        )
        .unwrap();

        let arr = arr_ref
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();

        assert_eq!(arr.len(), 2);
        assert!(!arr.is_valid(0)); // Wrong dimensions
        assert!(!arr.is_valid(1)); // Wrong dimensions

        // Check that child array has zeros for invalid entries
        let child = arr
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .unwrap();
        assert_eq!(child.value(0), 0.0);
        assert_eq!(child.value(1), 0.0);
        assert_eq!(child.value(2), 0.0);
        assert_eq!(child.value(3), 0.0);
    }

    #[test]
    fn test_values_to_fixed_size_list_all_nulls() {
        let values = vec![Value::Null, Value::Null, Value::Null];
        let arr_ref = values_to_array(
            &values,
            &ArrowDataType::FixedSizeList(
                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
                3,
            ),
        )
        .unwrap();

        let arr = arr_ref
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();

        assert_eq!(arr.len(), 3);
        assert!(!arr.is_valid(0));
        assert!(!arr.is_valid(1));
        assert!(!arr.is_valid(2));

        // Verify child array length is correct (3 rows × 3 dims = 9)
        let child = arr
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .unwrap();
        assert_eq!(child.len(), 9);
    }

    #[test]
    fn test_values_to_fixed_size_list_mixed_types() {
        let values = vec![
            Value::Vector(vec![1.0, 2.0]),
            Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
            Value::Null,
            Value::String("invalid".to_string()),
        ];
        let arr_ref = values_to_array(
            &values,
            &ArrowDataType::FixedSizeList(
                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
                2,
            ),
        )
        .unwrap();

        let arr = arr_ref
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();

        assert_eq!(arr.len(), 4);
        assert!(arr.is_valid(0)); // Value::Vector
        assert!(arr.is_valid(1)); // Value::List
        assert!(!arr.is_valid(2)); // Value::Null
        assert!(!arr.is_valid(3)); // Value::String

        // Check values for valid entries
        let child = arr
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .unwrap();
        assert_eq!(child.value(0), 1.0);
        assert_eq!(child.value(1), 2.0);
        assert_eq!(child.value(2), 3.0);
        assert_eq!(child.value(3), 4.0);
    }

    // Tests for PropertyExtractor::build_vector_column

    #[test]
    fn test_build_vector_column_with_nulls_and_deleted() {
        let data_type = DataType::Vector { dimensions: 3 };
        let extractor = PropertyExtractor::new("test_vec", &data_type);

        let props = [
            Some(Value::Vector(vec![1.0, 2.0, 3.0])),
            None,              // Missing property
            Some(Value::Null), // Null value
            Some(Value::Vector(vec![4.0, 5.0, 6.0])),
        ];
        let deleted = [false, false, false, true]; // Last one is deleted

        let arr_ref = extractor
            .build_vector_column(4, &deleted, |i| props[i].as_ref(), 3)
            .unwrap();

        let arr = arr_ref
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();

        assert_eq!(arr.len(), 4);
        assert!(arr.is_valid(0)); // Valid vector
        assert!(!arr.is_valid(1)); // Missing property
        assert!(!arr.is_valid(2)); // Null value
        assert!(arr.is_valid(3)); // Deleted entry (valid with zeros)

        // Check values
        let child = arr
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .unwrap();
        assert_eq!(child.value(0), 1.0);
        assert_eq!(child.value(1), 2.0);
        assert_eq!(child.value(2), 3.0);
        // Indices 3-5: zeros for missing
        // Indices 6-8: zeros for null
        // Indices 9-11: zeros for deleted (but marked as valid)
        assert_eq!(child.value(9), 0.0);
        assert_eq!(child.value(10), 0.0);
        assert_eq!(child.value(11), 0.0);
    }

    #[test]
    fn test_build_vector_column_with_list_input() {
        let data_type = DataType::Vector { dimensions: 2 };
        let extractor = PropertyExtractor::new("test_vec", &data_type);

        let props = [
            Some(Value::List(vec![Value::Float(1.0), Value::Float(2.0)])),
            Some(Value::List(vec![Value::Int(3), Value::Int(4)])),
            Some(Value::Vector(vec![5.0, 6.0])),
        ];
        let deleted = [false, false, false];

        let arr_ref = extractor
            .build_vector_column(3, &deleted, |i| props[i].as_ref(), 2)
            .unwrap();

        let arr = arr_ref
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();

        assert_eq!(arr.len(), 3);
        assert!(arr.is_valid(0));
        assert!(arr.is_valid(1));
        assert!(arr.is_valid(2));

        // Check values
        let child = arr
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .unwrap();
        assert_eq!(child.value(0), 1.0);
        assert_eq!(child.value(1), 2.0);
        assert_eq!(child.value(2), 3.0);
        assert_eq!(child.value(3), 4.0);
        assert_eq!(child.value(4), 5.0);
        assert_eq!(child.value(5), 6.0);
    }
}