Skip to main content

uni_store/storage/
arrow_convert.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Arrow type conversion utilities for reducing cognitive complexity.
5//!
6//! This module provides shared helper functions and macros for converting
7//! between Arrow arrays and JSON Values, reducing code duplication across
8//! vertex.rs, delta.rs, and executor.rs.
9
10use anyhow::{Result, anyhow};
11use arrow_array::builder::{
12    BinaryBuilder, BooleanBufferBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
13    FixedSizeBinaryBuilder, FixedSizeListBuilder, Float32Builder, Float64Builder, Int32Builder,
14    Int64Builder, IntervalMonthDayNanoBuilder, LargeBinaryBuilder, ListBuilder, StringBuilder,
15    StructBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder,
16    UInt64Builder,
17};
18use arrow_array::{
19    Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array,
20    Float64Array, Int32Array, Int64Array, IntervalMonthDayNanoArray, LargeBinaryArray, ListArray,
21    StringArray, StructArray, Time64NanosecondArray, TimestampNanosecondArray, UInt64Array,
22};
23use arrow_schema::{DataType as ArrowDataType, Field};
24use std::collections::HashMap;
25use std::sync::Arc;
26use uni_common::DataType;
27use uni_common::Value;
28use uni_common::core::id::{Eid, Vid};
29use uni_common::core::schema;
30use uni_crdt::Crdt;
31
32/// Build a timestamp column from a map of ID -> timestamp (nanoseconds).
33///
34/// Shared utility for building `_created_at` and `_updated_at` columns
35/// in vertex and edge tables. Works with any hashable ID type (Vid, Eid, etc.).
36fn build_timestamp_column_from_id_map<K, I>(
37    ids: I,
38    timestamps: Option<&HashMap<K, i64>>,
39) -> ArrayRef
40where
41    K: Eq + std::hash::Hash,
42    I: IntoIterator<Item = K>,
43{
44    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
45    for id in ids {
46        match timestamps.and_then(|m| m.get(&id)) {
47            Some(&ts) => builder.append_value(ts),
48            None => builder.append_null(),
49        }
50    }
51    Arc::new(builder.finish())
52}
53
54pub fn build_timestamp_column_from_vid_map<I>(
55    ids: I,
56    timestamps: Option<&HashMap<Vid, i64>>,
57) -> ArrayRef
58where
59    I: IntoIterator<Item = Vid>,
60{
61    build_timestamp_column_from_id_map(ids, timestamps)
62}
63
64pub fn build_timestamp_column_from_eid_map<I>(
65    ids: I,
66    timestamps: Option<&HashMap<Eid, i64>>,
67) -> ArrayRef
68where
69    I: IntoIterator<Item = Eid>,
70{
71    build_timestamp_column_from_id_map(ids, timestamps)
72}
73
74/// Build a timestamp column from an iterator of optional timestamps.
75///
76/// This is useful for building timestamp columns directly from entry structs.
77pub fn build_timestamp_column<I>(timestamps: I) -> ArrayRef
78where
79    I: IntoIterator<Item = Option<i64>>,
80{
81    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
82    for ts in timestamps {
83        builder.append_option(ts);
84    }
85    Arc::new(builder.finish())
86}
87
88/// Extract a `Vec<String>` from a single row of a `List<Utf8>` column.
89///
90/// Returns an empty vec when the row is null, the inner array is not a
91/// `StringArray`, or the list is empty.  Null entries inside the list are
92/// silently skipped.
93pub fn labels_from_list_array(list_arr: &ListArray, row: usize) -> Vec<String> {
94    if list_arr.is_null(row) {
95        return Vec::new();
96    }
97    let values = list_arr.value(row);
98    let Some(str_arr) = values.as_any().downcast_ref::<StringArray>() else {
99        return Vec::new();
100    };
101    (0..str_arr.len())
102        .filter(|&j| !str_arr.is_null(j))
103        .map(|j| str_arr.value(j).to_string())
104        .collect()
105}
106
107/// Parse a datetime string into nanoseconds since Unix epoch.
108///
109/// Tries RFC3339, "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M%:z",
110/// and "%Y-%m-%dT%H:%MZ" formats.
111fn parse_datetime_to_nanos(s: &str) -> Option<i64> {
112    chrono::DateTime::parse_from_rfc3339(s)
113        .map(|dt| {
114            dt.with_timezone(&chrono::Utc)
115                .timestamp_nanos_opt()
116                .unwrap_or(0)
117        })
118        .or_else(|_| {
119            chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
120                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
121        })
122        .or_else(|_| {
123            chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ")
124                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
125        })
126        .or_else(|_| {
127            chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M%:z").map(|dt| {
128                dt.with_timezone(&chrono::Utc)
129                    .timestamp_nanos_opt()
130                    .unwrap_or(0)
131            })
132        })
133        .ok()
134        .or_else(|| {
135            s.strip_suffix('Z')
136                .and_then(|base| chrono::NaiveDateTime::parse_from_str(base, "%Y-%m-%dT%H:%M").ok())
137                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
138        })
139}
140
141/// Detect the Arrow Map-as-List(Struct(key, value)) pattern and reconstruct a map.
142///
143/// Arrow represents Map columns as `List(Struct { key, value })`. This helper
144/// checks whether the given array matches that layout and, if so, converts the
145/// key/value pairs back into a `HashMap<String, Value>`.
146fn try_reconstruct_map(arr: &ArrayRef) -> Option<HashMap<String, Value>> {
147    let structs = arr.as_any().downcast_ref::<StructArray>()?;
148    let fields = structs.fields();
149    if fields.len() != 2 || fields[0].name() != "key" || fields[1].name() != "value" {
150        return None;
151    }
152    let key_col = structs.column(0);
153    let val_col = structs.column(1);
154    let mut map = HashMap::new();
155    for i in 0..structs.len() {
156        if let Value::String(k) = arrow_to_value(key_col.as_ref(), i, None) {
157            map.insert(k, arrow_to_value(val_col.as_ref(), i, None));
158        }
159    }
160    Some(map)
161}
162
163/// Convert all elements of an Arrow array into a `Vec<Value>`.
164fn array_to_value_list(arr: &ArrayRef) -> Vec<Value> {
165    (0..arr.len())
166        .map(|i| arrow_to_value(arr.as_ref(), i, None))
167        .collect()
168}
169
170/// Convert an Arrow array value at a given row index to a Uni Value.
171///
172/// Handles all common Arrow types and recursively processes nested structures
173/// like Lists and Structs. The optional `data_type` parameter provides schema
174/// context for decoding DateTime and Time struct arrays; when provided, it
175/// takes precedence over runtime type detection.
176pub fn arrow_to_value(col: &dyn Array, row: usize, data_type: Option<&DataType>) -> Value {
177    if col.is_null(row) {
178        return Value::Null;
179    }
180
181    // Schema-driven decode for DateTime and Time structs
182    if let Some(dt) = data_type {
183        match dt {
184            DataType::DateTime => {
185                // Expect StructArray with three fields
186                if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
187                    && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
188                        struct_arr.column_by_name("nanos_since_epoch"),
189                        struct_arr.column_by_name("offset_seconds"),
190                        struct_arr.column_by_name("timezone_name"),
191                    )
192                    && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
193                        nanos_col
194                            .as_any()
195                            .downcast_ref::<TimestampNanosecondArray>(),
196                        offset_col.as_any().downcast_ref::<Int32Array>(),
197                        tz_col.as_any().downcast_ref::<StringArray>(),
198                    )
199                {
200                    if nanos_arr.is_null(row) {
201                        return Value::Null;
202                    }
203                    let nanos = nanos_arr.value(row);
204                    if offset_arr.is_null(row) {
205                        // No offset → LocalDateTime
206                        return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
207                            nanos_since_epoch: nanos,
208                        });
209                    }
210                    let offset = offset_arr.value(row);
211                    let tz_name = (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
212                    return Value::Temporal(uni_common::TemporalValue::DateTime {
213                        nanos_since_epoch: nanos,
214                        offset_seconds: offset,
215                        timezone_name: tz_name,
216                    });
217                }
218                // Fall back to old schema migration: TimestampNanosecond → DateTime with offset=0
219                if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
220                    let nanos = ts.value(row);
221                    let tz_name = ts.timezone().map(|s| s.to_string());
222                    return Value::Temporal(uni_common::TemporalValue::DateTime {
223                        nanos_since_epoch: nanos,
224                        offset_seconds: 0,
225                        timezone_name: tz_name,
226                    });
227                }
228            }
229            DataType::Time => {
230                // Expect StructArray with two fields
231                if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
232                    && let (Some(nanos_col), Some(offset_col)) = (
233                        struct_arr.column_by_name("nanos_since_midnight"),
234                        struct_arr.column_by_name("offset_seconds"),
235                    )
236                    && let (Some(nanos_arr), Some(offset_arr)) = (
237                        nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
238                        offset_col.as_any().downcast_ref::<Int32Array>(),
239                    )
240                {
241                    // Check field-level nulls before calling .value()
242                    if nanos_arr.is_null(row) || offset_arr.is_null(row) {
243                        return Value::Null;
244                    }
245                    let nanos = nanos_arr.value(row);
246                    let offset = offset_arr.value(row);
247                    return Value::Temporal(uni_common::TemporalValue::Time {
248                        nanos_since_midnight: nanos,
249                        offset_seconds: offset,
250                    });
251                }
252                // Fall back to old schema: Time64Nanosecond → Time with offset=0
253                if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
254                    let nanos = t.value(row);
255                    return Value::Temporal(uni_common::TemporalValue::Time {
256                        nanos_since_midnight: nanos,
257                        offset_seconds: 0,
258                    });
259                }
260            }
261            _ => {}
262        }
263    }
264
265    // String types
266    if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
267        return Value::String(s.value(row).to_string());
268    }
269
270    // Integer types
271    if let Some(u) = col.as_any().downcast_ref::<UInt64Array>() {
272        return Value::Int(u.value(row) as i64);
273    }
274    if let Some(i) = col.as_any().downcast_ref::<Int64Array>() {
275        return Value::Int(i.value(row));
276    }
277    if let Some(i) = col.as_any().downcast_ref::<Int32Array>() {
278        return Value::Int(i.value(row) as i64);
279    }
280
281    // Float types
282    if let Some(f) = col.as_any().downcast_ref::<Float64Array>() {
283        return Value::Float(f.value(row));
284    }
285    if let Some(f) = col.as_any().downcast_ref::<Float32Array>() {
286        return Value::Float(f.value(row) as f64);
287    }
288
289    // Boolean type
290    if let Some(b) = col.as_any().downcast_ref::<BooleanArray>() {
291        return Value::Bool(b.value(row));
292    }
293
294    // Fixed-size list (vectors)
295    if let Some(list) = col.as_any().downcast_ref::<FixedSizeListArray>() {
296        return Value::List(array_to_value_list(&list.value(row)));
297    }
298
299    // Variable-size list
300    if let Some(list) = col.as_any().downcast_ref::<ListArray>() {
301        let arr = list.value(row);
302
303        // Map types are stored as List(Struct(key, value)); reconstruct as map
304        if let Some(obj) = try_reconstruct_map(&arr) {
305            return Value::Map(obj);
306        }
307
308        return Value::List(array_to_value_list(&arr));
309    }
310
311    // Large list (variable-size list with i64 offsets)
312    if let Some(list) = col.as_any().downcast_ref::<arrow_array::LargeListArray>() {
313        return Value::List(array_to_value_list(&list.value(row)));
314    }
315
316    // Struct type — detect temporal structs by field names before generic handler
317    if let Some(s) = col.as_any().downcast_ref::<StructArray>() {
318        let field_names: Vec<&str> = s.fields().iter().map(|f| f.name().as_str()).collect();
319
320        // DateTime struct: {nanos_since_epoch, offset_seconds, timezone_name}
321        if field_names.contains(&"nanos_since_epoch")
322            && field_names.contains(&"offset_seconds")
323            && field_names.contains(&"timezone_name")
324            && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
325                s.column_by_name("nanos_since_epoch"),
326                s.column_by_name("offset_seconds"),
327                s.column_by_name("timezone_name"),
328            )
329        {
330            // Try TimestampNanosecond first (standard schema), then Int64 fallback
331            let nanos_opt = nanos_col
332                .as_any()
333                .downcast_ref::<TimestampNanosecondArray>()
334                .map(|a| {
335                    if a.is_null(row) {
336                        None
337                    } else {
338                        Some(a.value(row))
339                    }
340                })
341                .or_else(|| {
342                    nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
343                        if a.is_null(row) {
344                            None
345                        } else {
346                            Some(a.value(row))
347                        }
348                    })
349                });
350            let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
351                if a.is_null(row) {
352                    None
353                } else {
354                    Some(a.value(row))
355                }
356            });
357
358            if let Some(Some(nanos)) = nanos_opt {
359                match offset_opt {
360                    Some(Some(offset)) => {
361                        let tz_name = tz_col.as_any().downcast_ref::<StringArray>().and_then(|a| {
362                            if a.is_null(row) {
363                                None
364                            } else {
365                                Some(a.value(row).to_string())
366                            }
367                        });
368                        return Value::Temporal(uni_common::TemporalValue::DateTime {
369                            nanos_since_epoch: nanos,
370                            offset_seconds: offset,
371                            timezone_name: tz_name,
372                        });
373                    }
374                    _ => {
375                        // No offset → LocalDateTime
376                        return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
377                            nanos_since_epoch: nanos,
378                        });
379                    }
380                }
381            }
382        }
383
384        // Time struct: {nanos_since_midnight, offset_seconds}
385        if field_names.contains(&"nanos_since_midnight")
386            && field_names.contains(&"offset_seconds")
387            && let (Some(nanos_col), Some(offset_col)) = (
388                s.column_by_name("nanos_since_midnight"),
389                s.column_by_name("offset_seconds"),
390            )
391        {
392            // Try Time64Nanosecond first (standard schema), then Int64 fallback
393            let nanos_opt = nanos_col
394                .as_any()
395                .downcast_ref::<Time64NanosecondArray>()
396                .map(|a| {
397                    if a.is_null(row) {
398                        None
399                    } else {
400                        Some(a.value(row))
401                    }
402                })
403                .or_else(|| {
404                    nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
405                        if a.is_null(row) {
406                            None
407                        } else {
408                            Some(a.value(row))
409                        }
410                    })
411                });
412            let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
413                if a.is_null(row) {
414                    None
415                } else {
416                    Some(a.value(row))
417                }
418            });
419
420            if let (Some(Some(nanos)), Some(Some(offset))) = (nanos_opt, offset_opt) {
421                return Value::Temporal(uni_common::TemporalValue::Time {
422                    nanos_since_midnight: nanos,
423                    offset_seconds: offset,
424                });
425            }
426        }
427
428        // Generic struct → Map
429        let mut map = HashMap::new();
430        for (field, child) in s.fields().iter().zip(s.columns()) {
431            map.insert(
432                field.name().clone(),
433                arrow_to_value(child.as_ref(), row, None),
434            );
435        }
436        return Value::Map(map);
437    }
438
439    // Date32 type (days since epoch) - return as Value::Temporal
440    if let Some(d) = col.as_any().downcast_ref::<Date32Array>() {
441        let days = d.value(row);
442        return Value::Temporal(uni_common::TemporalValue::Date {
443            days_since_epoch: days,
444        });
445    }
446
447    // Timestamp (nanoseconds since epoch) - timezone presence determines DateTime vs LocalDateTime
448    if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
449        let nanos = ts.value(row);
450        return match ts.timezone() {
451            Some(tz) => Value::Temporal(uni_common::TemporalValue::DateTime {
452                nanos_since_epoch: nanos,
453                offset_seconds: 0,
454                timezone_name: Some(tz.to_string()),
455            }),
456            None => Value::Temporal(uni_common::TemporalValue::LocalDateTime {
457                nanos_since_epoch: nanos,
458            }),
459        };
460    }
461
462    // Time64 (nanoseconds since midnight) - return as Value::Temporal
463    if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
464        let nanos = t.value(row);
465        return Value::Temporal(uni_common::TemporalValue::LocalTime {
466            nanos_since_midnight: nanos,
467        });
468    }
469
470    // Time64 (microseconds since midnight) - convert to nanoseconds
471    if let Some(t) = col
472        .as_any()
473        .downcast_ref::<arrow_array::Time64MicrosecondArray>()
474    {
475        let micros = t.value(row);
476        return Value::Temporal(uni_common::TemporalValue::LocalTime {
477            nanos_since_midnight: micros * 1000,
478        });
479    }
480
481    // DurationMicrosecond - convert to Duration with nanoseconds
482    if let Some(d) = col
483        .as_any()
484        .downcast_ref::<arrow_array::DurationMicrosecondArray>()
485    {
486        let micros = d.value(row);
487        let total_nanos = micros * 1000;
488        let seconds = total_nanos / 1_000_000_000;
489        let remaining_nanos = total_nanos % 1_000_000_000;
490        return Value::Temporal(uni_common::TemporalValue::Duration {
491            months: 0,
492            days: 0,
493            nanos: seconds * 1_000_000_000 + remaining_nanos,
494        });
495    }
496
497    // IntervalMonthDayNano - return as Value::Temporal(Duration)
498    if let Some(interval) = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>() {
499        let val = interval.value(row);
500        return Value::Temporal(uni_common::TemporalValue::Duration {
501            months: val.months as i64,
502            days: val.days as i64,
503            nanos: val.nanoseconds,
504        });
505    }
506
507    // LargeBinary (CypherValue MessagePack-tagged encoding)
508    if let Some(b) = col.as_any().downcast_ref::<LargeBinaryArray>() {
509        let bytes = b.value(row);
510        if bytes.is_empty() {
511            return Value::Null;
512        }
513        return uni_common::cypher_value_codec::decode(bytes).unwrap_or_else(|e| {
514            eprintln!("CypherValue decode error: {}", e);
515            Value::Null
516        });
517    }
518
519    // Binary (CRDT MessagePack) - decode to Value via serde_json boundary
520    if let Some(b) = col.as_any().downcast_ref::<BinaryArray>() {
521        let bytes = b.value(row);
522        return Crdt::from_msgpack(bytes)
523            .ok()
524            .and_then(|crdt| serde_json::to_value(&crdt).ok())
525            .map(Value::from)
526            .unwrap_or(Value::Null);
527    }
528
529    // Fallback
530    Value::Null
531}
532
533fn values_to_uint64_array(values: &[Value]) -> ArrayRef {
534    let mut builder = UInt64Builder::with_capacity(values.len());
535    for v in values {
536        if let Some(n) = v.as_u64() {
537            builder.append_value(n);
538        } else {
539            builder.append_null();
540        }
541    }
542    Arc::new(builder.finish())
543}
544
545fn values_to_int64_array(values: &[Value]) -> ArrayRef {
546    let mut builder = Int64Builder::with_capacity(values.len());
547    for v in values {
548        if let Some(n) = v.as_i64() {
549            builder.append_value(n);
550        } else {
551            builder.append_null();
552        }
553    }
554    Arc::new(builder.finish())
555}
556
557fn values_to_int32_array(values: &[Value]) -> ArrayRef {
558    let mut builder = Int32Builder::with_capacity(values.len());
559    for v in values {
560        if let Some(n) = v.as_i64() {
561            builder.append_value(n as i32);
562        } else {
563            builder.append_null();
564        }
565    }
566    Arc::new(builder.finish())
567}
568
569fn values_to_string_array(values: &[Value]) -> ArrayRef {
570    let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 10);
571    for v in values {
572        if let Some(s) = v.as_str() {
573            builder.append_value(s);
574        } else if v.is_null() {
575            builder.append_null();
576        } else {
577            builder.append_value(v.to_string());
578        }
579    }
580    Arc::new(builder.finish())
581}
582
583fn values_to_bool_array(values: &[Value]) -> ArrayRef {
584    let mut builder = BooleanBuilder::with_capacity(values.len());
585    for v in values {
586        if let Some(b) = v.as_bool() {
587            builder.append_value(b);
588        } else {
589            builder.append_null();
590        }
591    }
592    Arc::new(builder.finish())
593}
594
595fn values_to_float32_array(values: &[Value]) -> ArrayRef {
596    let mut builder = Float32Builder::with_capacity(values.len());
597    for v in values {
598        if let Some(n) = v.as_f64() {
599            builder.append_value(n as f32);
600        } else {
601            builder.append_null();
602        }
603    }
604    Arc::new(builder.finish())
605}
606
607fn values_to_float64_array(values: &[Value]) -> ArrayRef {
608    let mut builder = Float64Builder::with_capacity(values.len());
609    for v in values {
610        if let Some(n) = v.as_f64() {
611            builder.append_value(n);
612        } else {
613            builder.append_null();
614        }
615    }
616    Arc::new(builder.finish())
617}
618
619fn values_to_fixed_size_binary_array(values: &[Value], size: i32) -> Result<ArrayRef> {
620    let mut builder = FixedSizeBinaryBuilder::with_capacity(values.len(), size);
621    for v in values {
622        if let Value::List(bytes) = v {
623            let b: Vec<u8> = bytes
624                .iter()
625                .map(|bv| bv.as_u64().unwrap_or(0) as u8)
626                .collect();
627            if b.len() as i32 == size {
628                builder.append_value(&b)?;
629            } else {
630                builder.append_null();
631            }
632        } else {
633            builder.append_null();
634        }
635    }
636    Ok(Arc::new(builder.finish()))
637}
638
639/// Extract f32 vector values from a Value, ensuring correct Arrow FixedSizeList invariants.
640///
641/// Always returns exactly `dimensions` f32 values (zeros for null/invalid), plus a validity flag.
642/// This guarantees `child_array.len() == parent_array.len() × dimensions`.
643///
644/// # Arguments
645/// - `val`: Optional property value to extract from
646/// - `is_deleted`: Whether the containing entity is deleted (affects validity)
647/// - `dimensions`: Expected vector dimensions
648///
649/// # Returns
650/// - Tuple of (vector values, validity flag)
651///   - Vector always has exactly `dimensions` elements
652///   - Validity is `true` for valid vectors or deleted entries, `false` for null/invalid
653pub fn extract_vector_f32_values(
654    val: Option<&Value>,
655    is_deleted: bool,
656    dimensions: usize,
657) -> (Vec<f32>, bool) {
658    let zeros = || vec![0.0_f32; dimensions];
659
660    // Deleted entries always return zeros with valid=true
661    if is_deleted {
662        return (zeros(), true);
663    }
664
665    match val {
666        // Native f32 vector (Value::Vector)
667        Some(Value::Vector(v)) if v.len() == dimensions => (v.clone(), true),
668        Some(Value::Vector(_)) => (zeros(), false), // Wrong dimensions
669        // List of values (Value::List) - convert to f32
670        Some(Value::List(arr)) if arr.len() == dimensions => {
671            let values: Vec<f32> = arr
672                .iter()
673                .map(|v| v.as_f64().unwrap_or(0.0) as f32)
674                .collect();
675            (values, true)
676        }
677        Some(Value::List(_)) => (zeros(), false), // Wrong dimensions
678        _ => (zeros(), false),                    // Missing or unsupported value
679    }
680}
681
682fn values_to_fixed_size_list_f32_array(values: &[Value], size: i32) -> ArrayRef {
683    let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), size);
684    for v in values {
685        let (vals, valid) = extract_vector_f32_values(Some(v), false, size as usize);
686        for val in vals {
687            builder.values().append_value(val);
688        }
689        builder.append(valid);
690    }
691    Arc::new(builder.finish())
692}
693
694fn values_to_timestamp_array(values: &[Value], tz: Option<&Arc<str>>) -> ArrayRef {
695    let mut builder = TimestampNanosecondBuilder::with_capacity(values.len());
696    for v in values {
697        if v.is_null() {
698            builder.append_null();
699        } else if let Value::Temporal(tv) = v {
700            match tv {
701                uni_common::TemporalValue::DateTime {
702                    nanos_since_epoch, ..
703                }
704                | uni_common::TemporalValue::LocalDateTime {
705                    nanos_since_epoch, ..
706                } => builder.append_value(*nanos_since_epoch),
707                _ => builder.append_null(),
708            }
709        } else if let Some(n) = v.as_i64() {
710            builder.append_value(n);
711        } else if let Some(s) = v.as_str() {
712            match parse_datetime_to_nanos(s) {
713                Some(nanos) => builder.append_value(nanos),
714                None => builder.append_null(),
715            }
716        } else {
717            builder.append_null();
718        }
719    }
720
721    let arr = builder.finish();
722    if let Some(tz) = tz {
723        Arc::new(arr.with_timezone(tz.as_ref()))
724    } else {
725        Arc::new(arr)
726    }
727}
728
729/// Build a DateTime struct array from values.
730///
731/// Encodes DateTime as a 3-field struct: (nanos_since_epoch, offset_seconds, timezone_name).
732/// This preserves timezone offset information that was previously lost with TimestampNanosecond encoding.
733fn values_to_datetime_struct_array(values: &[Value]) -> ArrayRef {
734    let mut nanos_builder = TimestampNanosecondBuilder::with_capacity(values.len());
735    let mut offset_builder = Int32Builder::with_capacity(values.len());
736    let mut tz_builder = StringBuilder::with_capacity(values.len(), values.len() * 20);
737    let mut null_buffer = BooleanBufferBuilder::new(values.len());
738
739    for v in values {
740        match v {
741            Value::Temporal(uni_common::TemporalValue::DateTime {
742                nanos_since_epoch,
743                offset_seconds,
744                timezone_name,
745            }) => {
746                nanos_builder.append_value(*nanos_since_epoch);
747                offset_builder.append_value(*offset_seconds);
748                tz_builder.append_option(timezone_name.as_deref());
749                null_buffer.append(true);
750            }
751            Value::Temporal(uni_common::TemporalValue::LocalDateTime { nanos_since_epoch }) => {
752                nanos_builder.append_value(*nanos_since_epoch);
753                offset_builder.append_null();
754                tz_builder.append_null();
755                null_buffer.append(true);
756            }
757            _ => {
758                nanos_builder.append_null();
759                offset_builder.append_null();
760                tz_builder.append_null();
761                null_buffer.append(false);
762            }
763        }
764    }
765
766    let struct_arr = StructArray::new(
767        schema::datetime_struct_fields(),
768        vec![
769            Arc::new(nanos_builder.finish()) as ArrayRef,
770            Arc::new(offset_builder.finish()) as ArrayRef,
771            Arc::new(tz_builder.finish()) as ArrayRef,
772        ],
773        Some(null_buffer.finish().into()),
774    );
775    Arc::new(struct_arr)
776}
777
778/// Build a Time struct array from values.
779///
780/// Encodes Time as a 2-field struct: (nanos_since_midnight, offset_seconds).
781/// This preserves timezone offset information that was previously lost with Time64Nanosecond encoding.
782fn values_to_time_struct_array(values: &[Value]) -> ArrayRef {
783    let mut nanos_builder = Time64NanosecondBuilder::with_capacity(values.len());
784    let mut offset_builder = Int32Builder::with_capacity(values.len());
785    let mut null_buffer = BooleanBufferBuilder::new(values.len());
786
787    for v in values {
788        match v {
789            Value::Temporal(uni_common::TemporalValue::Time {
790                nanos_since_midnight,
791                offset_seconds,
792            }) => {
793                nanos_builder.append_value(*nanos_since_midnight);
794                offset_builder.append_value(*offset_seconds);
795                null_buffer.append(true);
796            }
797            Value::Temporal(uni_common::TemporalValue::LocalTime {
798                nanos_since_midnight,
799            }) => {
800                nanos_builder.append_value(*nanos_since_midnight);
801                offset_builder.append_null();
802                null_buffer.append(true);
803            }
804            _ => {
805                nanos_builder.append_null();
806                offset_builder.append_null();
807                null_buffer.append(false);
808            }
809        }
810    }
811
812    let struct_arr = StructArray::new(
813        schema::time_struct_fields(),
814        vec![
815            Arc::new(nanos_builder.finish()) as ArrayRef,
816            Arc::new(offset_builder.finish()) as ArrayRef,
817        ],
818        Some(null_buffer.finish().into()),
819    );
820    Arc::new(struct_arr)
821}
822
823fn values_to_large_binary_array(values: &[Value]) -> ArrayRef {
824    let mut builder =
825        arrow_array::builder::LargeBinaryBuilder::with_capacity(values.len(), values.len() * 64);
826    for v in values {
827        if v.is_null() {
828            builder.append_null();
829        } else {
830            // Encode as CypherValue (MessagePack-tagged)
831            let cv_bytes = uni_common::cypher_value_codec::encode(v);
832            builder.append_value(&cv_bytes);
833        }
834    }
835    Arc::new(builder.finish())
836}
837
838/// Convert a slice of JSON Values to an Arrow array based on the target Arrow DataType.
839pub fn values_to_array(values: &[Value], dt: &ArrowDataType) -> Result<ArrayRef> {
840    match dt {
841        ArrowDataType::UInt64 => Ok(values_to_uint64_array(values)),
842        ArrowDataType::Int64 => Ok(values_to_int64_array(values)),
843        ArrowDataType::Int32 => Ok(values_to_int32_array(values)),
844        ArrowDataType::Utf8 => Ok(values_to_string_array(values)),
845        ArrowDataType::Boolean => Ok(values_to_bool_array(values)),
846        ArrowDataType::Float32 => Ok(values_to_float32_array(values)),
847        ArrowDataType::Float64 => Ok(values_to_float64_array(values)),
848        ArrowDataType::FixedSizeBinary(size) => values_to_fixed_size_binary_array(values, *size),
849        ArrowDataType::FixedSizeList(inner, size) => {
850            if inner.data_type() == &ArrowDataType::Float32 {
851                Ok(values_to_fixed_size_list_f32_array(values, *size))
852            } else {
853                Err(anyhow!("Unsupported FixedSizeList inner type"))
854            }
855        }
856        ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
857            Ok(values_to_timestamp_array(values, tz.as_ref()))
858        }
859        ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
860            Ok(values_to_timestamp_array(values, tz.as_ref()))
861        }
862        ArrowDataType::Date32 => {
863            let mut builder = Date32Builder::with_capacity(values.len());
864            for v in values {
865                if v.is_null() {
866                    builder.append_null();
867                } else if let Value::Temporal(uni_common::TemporalValue::Date {
868                    days_since_epoch,
869                }) = v
870                {
871                    builder.append_value(*days_since_epoch);
872                } else if let Some(n) = v.as_i64() {
873                    builder.append_value(n as i32);
874                } else {
875                    builder.append_null();
876                }
877            }
878            Ok(Arc::new(builder.finish()))
879        }
880        ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
881            let mut builder = Time64NanosecondBuilder::with_capacity(values.len());
882            for v in values {
883                if v.is_null() {
884                    builder.append_null();
885                } else if let Value::Temporal(tv) = v {
886                    match tv {
887                        uni_common::TemporalValue::LocalTime {
888                            nanos_since_midnight,
889                        }
890                        | uni_common::TemporalValue::Time {
891                            nanos_since_midnight,
892                            ..
893                        } => builder.append_value(*nanos_since_midnight),
894                        _ => builder.append_null(),
895                    }
896                } else if let Some(n) = v.as_i64() {
897                    builder.append_value(n);
898                } else {
899                    builder.append_null();
900                }
901            }
902            Ok(Arc::new(builder.finish()))
903        }
904        ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
905            let mut builder = Time64MicrosecondBuilder::with_capacity(values.len());
906            for v in values {
907                if v.is_null() {
908                    builder.append_null();
909                } else if let Value::Temporal(tv) = v {
910                    match tv {
911                        uni_common::TemporalValue::LocalTime {
912                            nanos_since_midnight,
913                        }
914                        | uni_common::TemporalValue::Time {
915                            nanos_since_midnight,
916                            ..
917                        } => builder.append_value(*nanos_since_midnight / 1_000), // nanos→micros for legacy
918                        _ => builder.append_null(),
919                    }
920                } else if let Some(n) = v.as_i64() {
921                    builder.append_value(n);
922                } else {
923                    builder.append_null();
924                }
925            }
926            Ok(Arc::new(builder.finish()))
927        }
928        ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano) => {
929            let mut builder = IntervalMonthDayNanoBuilder::with_capacity(values.len());
930            for v in values {
931                if v.is_null() {
932                    builder.append_null();
933                } else if let Value::Temporal(uni_common::TemporalValue::Duration {
934                    months,
935                    days,
936                    nanos,
937                }) = v
938                {
939                    builder.append_value(arrow::datatypes::IntervalMonthDayNano {
940                        months: *months as i32,
941                        days: *days as i32,
942                        nanoseconds: *nanos,
943                    });
944                } else {
945                    builder.append_null();
946                }
947            }
948            Ok(Arc::new(builder.finish()))
949        }
950        ArrowDataType::Duration(arrow_schema::TimeUnit::Microsecond) => {
951            let mut builder = DurationMicrosecondBuilder::with_capacity(values.len());
952            for v in values {
953                if v.is_null() {
954                    builder.append_null();
955                } else if let Value::Temporal(uni_common::TemporalValue::Duration {
956                    months,
957                    days,
958                    nanos,
959                }) = v
960                {
961                    let total_micros =
962                        months * 30 * 86_400_000_000i64 + days * 86_400_000_000i64 + nanos / 1_000;
963                    builder.append_value(total_micros);
964                } else if let Some(n) = v.as_i64() {
965                    builder.append_value(n);
966                } else {
967                    builder.append_null();
968                }
969            }
970            Ok(Arc::new(builder.finish()))
971        }
972        ArrowDataType::LargeBinary => Ok(values_to_large_binary_array(values)),
973        ArrowDataType::List(field) => {
974            if field.data_type() == &ArrowDataType::Utf8 {
975                let mut builder = ListBuilder::new(StringBuilder::new());
976                for v in values {
977                    if let Value::List(arr) = v {
978                        for item in arr {
979                            if let Some(s) = item.as_str() {
980                                builder.values().append_value(s);
981                            } else {
982                                builder.values().append_null();
983                            }
984                        }
985                        builder.append(true);
986                    } else {
987                        builder.append_null();
988                    }
989                }
990                Ok(Arc::new(builder.finish()))
991            } else {
992                Err(anyhow!(
993                    "Unsupported List inner type: {:?}",
994                    field.data_type()
995                ))
996            }
997        }
998        ArrowDataType::Struct(_) if schema::is_datetime_struct(dt) => {
999            Ok(values_to_datetime_struct_array(values))
1000        }
1001        ArrowDataType::Struct(_) if schema::is_time_struct(dt) => {
1002            Ok(values_to_time_struct_array(values))
1003        }
1004        _ => Err(anyhow!("Unsupported type for conversion: {:?}", dt)),
1005    }
1006}
1007
1008/// Property value extractor for building Arrow columns from entity properties.
1009pub struct PropertyExtractor<'a> {
1010    data_type: &'a DataType,
1011}
1012
1013impl<'a> PropertyExtractor<'a> {
1014    pub fn new(_name: &'a str, data_type: &'a DataType) -> Self {
1015        Self { data_type }
1016    }
1017
1018    /// Build an Arrow column from a slice of property maps.
1019    /// The `deleted` slice indicates which entries are deleted (use default values).
1020    pub fn build_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1021    where
1022        F: Fn(usize) -> Option<&'a Value>,
1023    {
1024        match self.data_type {
1025            DataType::String => self.build_string_column(len, deleted, get_props),
1026            DataType::Int32 => self.build_int32_column(len, deleted, get_props),
1027            DataType::Int64 => self.build_int64_column(len, deleted, get_props),
1028            DataType::Float32 => self.build_float32_column(len, deleted, get_props),
1029            DataType::Float64 => self.build_float64_column(len, deleted, get_props),
1030            DataType::Bool => self.build_bool_column(len, deleted, get_props),
1031            DataType::Vector { dimensions } => {
1032                self.build_vector_column(len, deleted, get_props, *dimensions)
1033            }
1034            DataType::CypherValue => self.build_json_column(len, deleted, get_props),
1035            DataType::List(inner) => self.build_list_column(len, deleted, get_props, inner),
1036            DataType::Map(key, value) => self.build_map_column(len, deleted, get_props, key, value),
1037            DataType::Crdt(_) => self.build_crdt_column(len, deleted, get_props),
1038            DataType::DateTime => self.build_datetime_struct_column(len, deleted, get_props),
1039            DataType::Timestamp => self.build_timestamp_column(len, deleted, get_props),
1040            DataType::Date => self.build_date32_column(len, deleted, get_props),
1041            DataType::Time => self.build_time_struct_column(len, deleted, get_props),
1042            DataType::Duration => self.build_duration_column(len, deleted, get_props),
1043            _ => Err(anyhow!(
1044                "Unsupported data type for arrow conversion: {:?}",
1045                self.data_type
1046            )),
1047        }
1048    }
1049
1050    fn build_string_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1051    where
1052        F: Fn(usize) -> Option<&'a Value>,
1053    {
1054        let mut builder = arrow_array::builder::StringBuilder::with_capacity(len, len * 32);
1055        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1056            let prop = get_props(i);
1057            if let Some(s) = prop.and_then(|v| v.as_str()) {
1058                builder.append_value(s);
1059            } else if let Some(Value::Temporal(tv)) = prop {
1060                builder.append_value(tv.to_string());
1061            } else if is_deleted {
1062                builder.append_value("");
1063            } else {
1064                builder.append_null();
1065            }
1066        }
1067        Ok(Arc::new(builder.finish()))
1068    }
1069
1070    fn build_int32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1071    where
1072        F: Fn(usize) -> Option<&'a Value>,
1073    {
1074        let mut values = Vec::with_capacity(len);
1075        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1076            let val = get_props(i).and_then(|v| v.as_i64()).map(|v| v as i32);
1077            if val.is_none() && is_deleted {
1078                values.push(Some(0));
1079            } else {
1080                values.push(val);
1081            }
1082        }
1083        Ok(Arc::new(Int32Array::from(values)))
1084    }
1085
1086    fn build_int64_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1087    where
1088        F: Fn(usize) -> Option<&'a Value>,
1089    {
1090        let mut values = Vec::with_capacity(len);
1091        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1092            let val = get_props(i).and_then(|v| v.as_i64());
1093            if val.is_none() && is_deleted {
1094                values.push(Some(0));
1095            } else {
1096                values.push(val);
1097            }
1098        }
1099        Ok(Arc::new(Int64Array::from(values)))
1100    }
1101
1102    fn build_timestamp_column<F>(
1103        &self,
1104        len: usize,
1105        deleted: &[bool],
1106        get_props: F,
1107    ) -> Result<ArrayRef>
1108    where
1109        F: Fn(usize) -> Option<&'a Value>,
1110    {
1111        let mut values = Vec::with_capacity(len);
1112        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1113            let val = get_props(i);
1114            let ts = if is_deleted || val.is_none() {
1115                Some(0i64)
1116            } else if let Some(Value::Temporal(tv)) = val {
1117                match tv {
1118                    uni_common::TemporalValue::DateTime {
1119                        nanos_since_epoch, ..
1120                    }
1121                    | uni_common::TemporalValue::LocalDateTime {
1122                        nanos_since_epoch, ..
1123                    } => Some(*nanos_since_epoch),
1124                    _ => None,
1125                }
1126            } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1127                Some(v)
1128            } else if let Some(s) = val.and_then(|v| v.as_str()) {
1129                parse_datetime_to_nanos(s)
1130            } else {
1131                None
1132            };
1133
1134            if is_deleted {
1135                values.push(Some(0));
1136            } else {
1137                values.push(ts);
1138            }
1139        }
1140        let arr = TimestampNanosecondArray::from(values).with_timezone("UTC");
1141        Ok(Arc::new(arr))
1142    }
1143
1144    fn build_datetime_struct_column<F>(
1145        &self,
1146        len: usize,
1147        deleted: &[bool],
1148        get_props: F,
1149    ) -> Result<ArrayRef>
1150    where
1151        F: Fn(usize) -> Option<&'a Value>,
1152    {
1153        let values = self.collect_values_or_null(len, deleted, &get_props);
1154        Ok(values_to_datetime_struct_array(&values))
1155    }
1156
1157    fn build_time_struct_column<F>(
1158        &self,
1159        len: usize,
1160        deleted: &[bool],
1161        get_props: F,
1162    ) -> Result<ArrayRef>
1163    where
1164        F: Fn(usize) -> Option<&'a Value>,
1165    {
1166        let values = self.collect_values_or_null(len, deleted, &get_props);
1167        Ok(values_to_time_struct_array(&values))
1168    }
1169
1170    /// Collect property values into a Vec, substituting `Value::Null` for deleted or missing entries.
1171    fn collect_values_or_null<F>(&self, len: usize, deleted: &[bool], get_props: &F) -> Vec<Value>
1172    where
1173        F: Fn(usize) -> Option<&'a Value>,
1174    {
1175        deleted
1176            .iter()
1177            .enumerate()
1178            .take(len)
1179            .map(|(i, &is_deleted)| {
1180                if is_deleted {
1181                    Value::Null
1182                } else {
1183                    get_props(i).cloned().unwrap_or(Value::Null)
1184                }
1185            })
1186            .collect()
1187    }
1188
1189    fn build_date32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1190    where
1191        F: Fn(usize) -> Option<&'a Value>,
1192    {
1193        let mut builder = Date32Builder::with_capacity(len);
1194        let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1195
1196        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1197            let val = get_props(i);
1198            let days = if is_deleted || val.is_none() {
1199                Some(0)
1200            } else if let Some(Value::Temporal(uni_common::TemporalValue::Date {
1201                days_since_epoch,
1202            })) = val
1203            {
1204                Some(*days_since_epoch)
1205            } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1206                Some(v as i32)
1207            } else if let Some(s) = val.and_then(|v| v.as_str()) {
1208                match chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1209                    Ok(date) => Some(date.signed_duration_since(epoch).num_days() as i32),
1210                    Err(_) => None,
1211                }
1212            } else {
1213                None
1214            };
1215
1216            if is_deleted {
1217                builder.append_value(0);
1218            } else if let Some(v) = days {
1219                builder.append_value(v);
1220            } else {
1221                builder.append_null();
1222            }
1223        }
1224        Ok(Arc::new(builder.finish()))
1225    }
1226
1227    fn build_duration_column<F>(
1228        &self,
1229        len: usize,
1230        deleted: &[bool],
1231        get_props: F,
1232    ) -> Result<ArrayRef>
1233    where
1234        F: Fn(usize) -> Option<&'a Value>,
1235    {
1236        // Duration stored as LargeBinary via CypherValue codec (Lance doesn't support Interval(MonthDayNano))
1237        let mut builder = LargeBinaryBuilder::with_capacity(len, len * 32);
1238        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1239            let raw_val = get_props(i);
1240            if let Some(val @ Value::Temporal(uni_common::TemporalValue::Duration { .. })) = raw_val
1241            {
1242                let encoded = uni_common::cypher_value_codec::encode(val);
1243                builder.append_value(&encoded);
1244            } else if is_deleted {
1245                let zero = Value::Temporal(uni_common::TemporalValue::Duration {
1246                    months: 0,
1247                    days: 0,
1248                    nanos: 0,
1249                });
1250                let encoded = uni_common::cypher_value_codec::encode(&zero);
1251                builder.append_value(&encoded);
1252            } else {
1253                builder.append_null();
1254            }
1255        }
1256        Ok(Arc::new(builder.finish()))
1257    }
1258
1259    fn build_float32_column<F>(
1260        &self,
1261        len: usize,
1262        deleted: &[bool],
1263        get_props: F,
1264    ) -> Result<ArrayRef>
1265    where
1266        F: Fn(usize) -> Option<&'a Value>,
1267    {
1268        let mut values = Vec::with_capacity(len);
1269        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1270            let val = get_props(i).and_then(|v| v.as_f64()).map(|v| v as f32);
1271            if val.is_none() && is_deleted {
1272                values.push(Some(0.0));
1273            } else {
1274                values.push(val);
1275            }
1276        }
1277        Ok(Arc::new(Float32Array::from(values)))
1278    }
1279
1280    fn build_float64_column<F>(
1281        &self,
1282        len: usize,
1283        deleted: &[bool],
1284        get_props: F,
1285    ) -> Result<ArrayRef>
1286    where
1287        F: Fn(usize) -> Option<&'a Value>,
1288    {
1289        let mut values = Vec::with_capacity(len);
1290        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1291            let val = get_props(i).and_then(|v| v.as_f64());
1292            if val.is_none() && is_deleted {
1293                values.push(Some(0.0));
1294            } else {
1295                values.push(val);
1296            }
1297        }
1298        Ok(Arc::new(Float64Array::from(values)))
1299    }
1300
1301    fn build_bool_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1302    where
1303        F: Fn(usize) -> Option<&'a Value>,
1304    {
1305        let mut values = Vec::with_capacity(len);
1306        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1307            let val = get_props(i).and_then(|v| v.as_bool());
1308            if val.is_none() && is_deleted {
1309                values.push(Some(false));
1310            } else {
1311                values.push(val);
1312            }
1313        }
1314        Ok(Arc::new(BooleanArray::from(values)))
1315    }
1316
1317    fn build_vector_column<F>(
1318        &self,
1319        len: usize,
1320        deleted: &[bool],
1321        get_props: F,
1322        dimensions: usize,
1323    ) -> Result<ArrayRef>
1324    where
1325        F: Fn(usize) -> Option<&'a Value>,
1326    {
1327        let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dimensions as i32);
1328
1329        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1330            let val = get_props(i);
1331            let (values, valid) = extract_vector_f32_values(val, is_deleted, dimensions);
1332            for v in values {
1333                builder.values().append_value(v);
1334            }
1335            builder.append(valid);
1336        }
1337        Ok(Arc::new(builder.finish()))
1338    }
1339
1340    fn build_json_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1341    where
1342        F: Fn(usize) -> Option<&'a Value>,
1343    {
1344        let null_val = Value::Null;
1345        let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(len, len * 64);
1346        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1347            let val = get_props(i);
1348            let uni_val = if val.is_none() && is_deleted {
1349                &null_val
1350            } else {
1351                val.unwrap_or(&null_val)
1352            };
1353            // Encode to CypherValue (MessagePack-tagged)
1354            let cv_bytes = uni_common::cypher_value_codec::encode(uni_val);
1355            builder.append_value(&cv_bytes);
1356        }
1357        Ok(Arc::new(builder.finish()))
1358    }
1359
1360    fn build_list_column<F>(
1361        &self,
1362        len: usize,
1363        deleted: &[bool],
1364        get_props: F,
1365        inner: &DataType,
1366    ) -> Result<ArrayRef>
1367    where
1368        F: Fn(usize) -> Option<&'a Value>,
1369    {
1370        match inner {
1371            DataType::String => {
1372                self.build_typed_list(len, deleted, &get_props, StringBuilder::new(), |v, b| {
1373                    if let Some(s) = v.as_str() {
1374                        b.append_value(s);
1375                    } else {
1376                        b.append_null();
1377                    }
1378                })
1379            }
1380            DataType::Int64 => {
1381                self.build_typed_list(len, deleted, &get_props, Int64Builder::new(), |v, b| {
1382                    if let Some(n) = v.as_i64() {
1383                        b.append_value(n);
1384                    } else {
1385                        b.append_null();
1386                    }
1387                })
1388            }
1389            DataType::Float64 => {
1390                self.build_typed_list(len, deleted, &get_props, Float64Builder::new(), |v, b| {
1391                    if let Some(f) = v.as_f64() {
1392                        b.append_value(f);
1393                    } else {
1394                        b.append_null();
1395                    }
1396                })
1397            }
1398            _ => Err(anyhow!("Unsupported inner type for List: {:?}", inner)),
1399        }
1400    }
1401
1402    /// Generic helper to build a list column with any inner builder type.
1403    fn build_typed_list<F, B, A>(
1404        &self,
1405        len: usize,
1406        deleted: &[bool],
1407        get_props: &F,
1408        inner_builder: B,
1409        mut append_value: A,
1410    ) -> Result<ArrayRef>
1411    where
1412        F: Fn(usize) -> Option<&'a Value>,
1413        B: arrow_array::builder::ArrayBuilder,
1414        A: FnMut(&Value, &mut B),
1415    {
1416        let mut builder = ListBuilder::new(inner_builder);
1417        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1418            let val_array = get_props(i).and_then(|v| v.as_array());
1419            if val_array.is_none() && is_deleted {
1420                builder.append_null();
1421            } else if let Some(arr) = val_array {
1422                for v in arr {
1423                    append_value(v, builder.values());
1424                }
1425                builder.append(true);
1426            } else {
1427                builder.append_null();
1428            }
1429        }
1430        Ok(Arc::new(builder.finish()))
1431    }
1432
1433    fn build_map_column<F>(
1434        &self,
1435        len: usize,
1436        deleted: &[bool],
1437        get_props: F,
1438        key: &DataType,
1439        value: &DataType,
1440    ) -> Result<ArrayRef>
1441    where
1442        F: Fn(usize) -> Option<&'a Value>,
1443    {
1444        if !matches!(key, DataType::String) {
1445            return Err(anyhow!("Map keys must be String (JSON limitation)"));
1446        }
1447
1448        match value {
1449            DataType::String => self.build_typed_map(
1450                len,
1451                deleted,
1452                &get_props,
1453                StringBuilder::new(),
1454                arrow_schema::DataType::Utf8,
1455                |v, b: &mut StringBuilder| {
1456                    if let Some(s) = v.as_str() {
1457                        b.append_value(s);
1458                    } else {
1459                        b.append_null();
1460                    }
1461                },
1462            ),
1463            DataType::Int64 => self.build_typed_map(
1464                len,
1465                deleted,
1466                &get_props,
1467                Int64Builder::new(),
1468                arrow_schema::DataType::Int64,
1469                |v, b: &mut Int64Builder| {
1470                    if let Some(n) = v.as_i64() {
1471                        b.append_value(n);
1472                    } else {
1473                        b.append_null();
1474                    }
1475                },
1476            ),
1477            _ => Err(anyhow!("Unsupported value type for Map: {:?}", value)),
1478        }
1479    }
1480
1481    /// Generic helper to build a map column with any value builder type.
1482    fn build_typed_map<F, B, A>(
1483        &self,
1484        len: usize,
1485        deleted: &[bool],
1486        get_props: &F,
1487        value_builder: B,
1488        value_arrow_type: arrow_schema::DataType,
1489        mut append_value: A,
1490    ) -> Result<ArrayRef>
1491    where
1492        F: Fn(usize) -> Option<&'a Value>,
1493        B: arrow_array::builder::ArrayBuilder,
1494        A: FnMut(&Value, &mut B),
1495    {
1496        let key_builder = Box::new(StringBuilder::new());
1497        let value_builder = Box::new(value_builder);
1498        let struct_builder = StructBuilder::new(
1499            vec![
1500                Field::new("key", arrow_schema::DataType::Utf8, false),
1501                Field::new("value", value_arrow_type, true),
1502            ],
1503            vec![key_builder, value_builder],
1504        );
1505        let mut builder = ListBuilder::new(struct_builder);
1506
1507        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1508            self.append_map_entry(&mut builder, get_props(i), is_deleted, &mut append_value);
1509        }
1510        Ok(Arc::new(builder.finish()))
1511    }
1512
1513    /// Append a single map entry to the list builder.
1514    fn append_map_entry<B, A>(
1515        &self,
1516        builder: &mut ListBuilder<StructBuilder>,
1517        val: Option<&'a Value>,
1518        is_deleted: bool,
1519        append_value: &mut A,
1520    ) where
1521        B: arrow_array::builder::ArrayBuilder,
1522        A: FnMut(&Value, &mut B),
1523    {
1524        let val_obj = val.and_then(|v| v.as_object());
1525        if val_obj.is_none() && is_deleted {
1526            builder.append(false);
1527        } else if let Some(obj) = val_obj {
1528            let struct_b = builder.values();
1529            for (k, v) in obj {
1530                struct_b
1531                    .field_builder::<StringBuilder>(0)
1532                    .unwrap()
1533                    .append_value(k);
1534                // Safety: We know the value builder type matches B
1535                let value_b = struct_b.field_builder::<B>(1).unwrap();
1536                append_value(v, value_b);
1537                struct_b.append(true);
1538            }
1539            builder.append(true);
1540        } else {
1541            builder.append(false);
1542        }
1543    }
1544
1545    fn build_crdt_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1546    where
1547        F: Fn(usize) -> Option<&'a Value>,
1548    {
1549        let mut builder = BinaryBuilder::new();
1550        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1551            if is_deleted {
1552                builder.append_null();
1553                continue;
1554            }
1555            if let Some(val) = get_props(i) {
1556                // Try to parse CRDT from the value
1557                // If it's a string, first parse it as JSON, then as CRDT
1558                let crdt_result = if let Some(s) = val.as_str() {
1559                    serde_json::from_str::<Crdt>(s)
1560                } else {
1561                    // Convert uni_common::Value to serde_json::Value at the CRDT boundary
1562                    let json_val: serde_json::Value = val.clone().into();
1563                    serde_json::from_value::<Crdt>(json_val)
1564                };
1565
1566                if let Ok(crdt) = crdt_result {
1567                    if let Ok(bytes) = crdt.to_msgpack() {
1568                        builder.append_value(&bytes);
1569                    } else {
1570                        builder.append_null();
1571                    }
1572                } else {
1573                    builder.append_null();
1574                }
1575            } else {
1576                builder.append_null();
1577            }
1578        }
1579        Ok(Arc::new(builder.finish()))
1580    }
1581}
1582
1583/// Build a column for edge entries (no deleted flag handling needed).
1584pub fn build_edge_column<'a>(
1585    name: &'a str,
1586    data_type: &'a DataType,
1587    len: usize,
1588    get_props: impl Fn(usize) -> Option<&'a Value>,
1589) -> Result<ArrayRef> {
1590    // For edges, use empty deleted array
1591    let deleted = vec![false; len];
1592    let extractor = PropertyExtractor::new(name, data_type);
1593    extractor.build_column(len, &deleted, get_props)
1594}
1595
1596#[cfg(test)]
1597mod tests {
1598    use super::*;
1599    use arrow_array::{
1600        Array, DurationMicrosecondArray,
1601        builder::{BinaryBuilder, Time64MicrosecondBuilder, TimestampNanosecondBuilder},
1602    };
1603    use std::collections::HashMap;
1604    use uni_common::TemporalValue;
1605    use uni_crdt::{Crdt, GCounter};
1606
1607    #[test]
1608    fn test_arrow_to_value_string() {
1609        let arr = StringArray::from(vec![Some("hello"), None, Some("world")]);
1610        assert_eq!(
1611            arrow_to_value(&arr, 0, None),
1612            Value::String("hello".to_string())
1613        );
1614        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1615        assert_eq!(
1616            arrow_to_value(&arr, 2, None),
1617            Value::String("world".to_string())
1618        );
1619    }
1620
1621    #[test]
1622    fn test_arrow_to_value_int64() {
1623        let arr = Int64Array::from(vec![Some(42), None, Some(-10)]);
1624        assert_eq!(arrow_to_value(&arr, 0, None), Value::Int(42));
1625        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1626        assert_eq!(arrow_to_value(&arr, 2, None), Value::Int(-10));
1627    }
1628
1629    #[test]
1630    #[allow(clippy::approx_constant)]
1631    fn test_arrow_to_value_float64() {
1632        let arr = Float64Array::from(vec![Some(3.14), None]);
1633        assert_eq!(arrow_to_value(&arr, 0, None), Value::Float(3.14));
1634        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1635    }
1636
1637    #[test]
1638    fn test_arrow_to_value_bool() {
1639        let arr = BooleanArray::from(vec![Some(true), Some(false), None]);
1640        assert_eq!(arrow_to_value(&arr, 0, None), Value::Bool(true));
1641        assert_eq!(arrow_to_value(&arr, 1, None), Value::Bool(false));
1642        assert_eq!(arrow_to_value(&arr, 2, None), Value::Null);
1643    }
1644
1645    #[test]
1646    fn test_values_to_array_int64() {
1647        let values = vec![Value::Int(1), Value::Int(2), Value::Null, Value::Int(4)];
1648        let arr = values_to_array(&values, &ArrowDataType::Int64).unwrap();
1649        assert_eq!(arr.len(), 4);
1650
1651        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1652        assert_eq!(int_arr.value(0), 1);
1653        assert_eq!(int_arr.value(1), 2);
1654        assert!(int_arr.is_null(2));
1655        assert_eq!(int_arr.value(3), 4);
1656    }
1657
1658    #[test]
1659    fn test_values_to_array_string() {
1660        let values = vec![
1661            Value::String("a".to_string()),
1662            Value::String("b".to_string()),
1663            Value::Null,
1664        ];
1665        let arr = values_to_array(&values, &ArrowDataType::Utf8).unwrap();
1666        assert_eq!(arr.len(), 3);
1667
1668        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1669        assert_eq!(str_arr.value(0), "a");
1670        assert_eq!(str_arr.value(1), "b");
1671        assert!(str_arr.is_null(2));
1672    }
1673
1674    #[test]
1675    fn test_property_extractor_string() {
1676        let props: Vec<HashMap<String, Value>> = vec![
1677            [("name".to_string(), Value::String("Alice".to_string()))]
1678                .into_iter()
1679                .collect(),
1680            [("name".to_string(), Value::String("Bob".to_string()))]
1681                .into_iter()
1682                .collect(),
1683            HashMap::new(),
1684        ];
1685        let deleted = vec![false, false, true];
1686
1687        let extractor = PropertyExtractor::new("name", &DataType::String);
1688        let arr = extractor
1689            .build_column(3, &deleted, |i| props[i].get("name"))
1690            .unwrap();
1691
1692        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1693        assert_eq!(str_arr.value(0), "Alice");
1694        assert_eq!(str_arr.value(1), "Bob");
1695        assert_eq!(str_arr.value(2), ""); // Deleted entries get default
1696    }
1697
1698    #[test]
1699    fn test_property_extractor_int64() {
1700        let props: Vec<HashMap<String, Value>> = vec![
1701            [("age".to_string(), Value::Int(25))].into_iter().collect(),
1702            [("age".to_string(), Value::Int(30))].into_iter().collect(),
1703            HashMap::new(),
1704        ];
1705        let deleted = vec![false, false, true];
1706
1707        let extractor = PropertyExtractor::new("age", &DataType::Int64);
1708        let arr = extractor
1709            .build_column(3, &deleted, |i| props[i].get("age"))
1710            .unwrap();
1711
1712        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1713        assert_eq!(int_arr.value(0), 25);
1714        assert_eq!(int_arr.value(1), 30);
1715        assert_eq!(int_arr.value(2), 0); // Deleted entries get default
1716    }
1717
1718    #[test]
1719    fn test_arrow_to_value_time64() {
1720        // Test Time64MicrosecondArray legacy fallback (micros→nanos conversion)
1721        let mut builder = Time64MicrosecondBuilder::new();
1722        // 10:30:45 = 10*3600 + 30*60 + 45 = 37845 seconds = 37845000000 microseconds
1723        builder.append_value(37_845_000_000);
1724        // 00:00:00 = 0 microseconds
1725        builder.append_value(0);
1726        // 23:59:59.123456 = 86399.123456 seconds
1727        builder.append_value(86_399_123_456);
1728        builder.append_null();
1729
1730        let arr = builder.finish();
1731        // Arrow→Value returns Value::Temporal(LocalTime) with nanos (micros * 1000)
1732        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "10:30:45");
1733        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "00:00");
1734        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "23:59:59.123456");
1735        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
1736    }
1737
1738    #[test]
1739    fn test_arrow_to_value_duration() {
1740        // Test DurationMicrosecondArray conversion
1741        // Arrow→Value now returns Value::Temporal(Duration)
1742        let arr = DurationMicrosecondArray::from(vec![
1743            Some(1_000_000),      // 1 second in microseconds
1744            Some(3_600_000_000),  // 1 hour
1745            Some(86_400_000_000), // 1 day
1746            None,
1747        ]);
1748
1749        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "PT1S");
1750        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "PT1H");
1751        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "PT24H");
1752        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
1753    }
1754
1755    #[test]
1756    fn test_arrow_to_value_binary_crdt() {
1757        // Test BinaryArray (CRDT) conversion - round-trip test
1758        let mut builder = BinaryBuilder::new();
1759
1760        // Create a GCounter CRDT and serialize it
1761        let mut counter = GCounter::new();
1762        counter.increment("actor1", 5);
1763        let crdt = Crdt::GCounter(counter);
1764        let bytes = crdt.to_msgpack().unwrap();
1765        builder.append_value(&bytes);
1766
1767        // Add a null value
1768        builder.append_null();
1769
1770        let arr = builder.finish();
1771
1772        // The first value should deserialize back to a map
1773        let result = arrow_to_value(&arr, 0, None);
1774        assert!(result.as_object().is_some());
1775        let obj = result.as_object().unwrap();
1776        // GCounter serializes with tag "t": "gc"
1777        assert_eq!(obj.get("t"), Some(&Value::String("gc".to_string())));
1778
1779        // Null value should return null
1780        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1781    }
1782
1783    #[test]
1784    fn test_datetime_struct_encode_decode_roundtrip() {
1785        // Test DateTime struct encoding with offset and timezone preservation
1786        let values = vec![
1787            Value::Temporal(TemporalValue::DateTime {
1788                nanos_since_epoch: 441763200000000000, // 1984-01-01T00:00:00Z
1789                offset_seconds: 3600,                  // +01:00
1790                timezone_name: Some("Europe/Paris".to_string()),
1791            }),
1792            Value::Temporal(TemporalValue::DateTime {
1793                nanos_since_epoch: 1704067200000000000, // 2024-01-01T00:00:00Z
1794                offset_seconds: -18000,                 // -05:00
1795                timezone_name: None,
1796            }),
1797            Value::Temporal(TemporalValue::DateTime {
1798                nanos_since_epoch: 0, // Unix epoch
1799                offset_seconds: 0,
1800                timezone_name: Some("UTC".to_string()),
1801            }),
1802        ];
1803
1804        // Encode to Arrow struct
1805        let arr_ref = values_to_datetime_struct_array(&values);
1806        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
1807        assert_eq!(arr.len(), 3);
1808
1809        // Decode back to Value
1810        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
1811        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
1812        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
1813
1814        // Verify round-trip preserves all fields
1815        assert_eq!(decoded_0, values[0]);
1816        assert_eq!(decoded_1, values[1]);
1817        assert_eq!(decoded_2, values[2]);
1818
1819        // Verify struct field extraction
1820        if let Value::Temporal(TemporalValue::DateTime {
1821            nanos_since_epoch,
1822            offset_seconds,
1823            timezone_name,
1824        }) = decoded_0
1825        {
1826            assert_eq!(nanos_since_epoch, 441763200000000000);
1827            assert_eq!(offset_seconds, 3600);
1828            assert_eq!(timezone_name, Some("Europe/Paris".to_string()));
1829        } else {
1830            panic!("Expected DateTime value");
1831        }
1832    }
1833
1834    #[test]
1835    fn test_datetime_struct_null_handling() {
1836        // Test DateTime struct with null values
1837        let values = vec![
1838            Value::Temporal(TemporalValue::DateTime {
1839                nanos_since_epoch: 441763200000000000,
1840                offset_seconds: 3600,
1841                timezone_name: Some("Europe/Paris".to_string()),
1842            }),
1843            Value::Null,
1844            Value::Temporal(TemporalValue::DateTime {
1845                nanos_since_epoch: 0,
1846                offset_seconds: 0,
1847                timezone_name: None,
1848            }),
1849        ];
1850
1851        let arr_ref = values_to_datetime_struct_array(&values);
1852        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
1853        assert_eq!(arr.len(), 3);
1854
1855        // Check first value is valid
1856        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
1857        assert_eq!(decoded_0, values[0]);
1858
1859        // Check second value is null
1860        assert!(arr.is_null(1));
1861        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
1862        assert_eq!(decoded_1, Value::Null);
1863
1864        // Check third value is valid
1865        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
1866        assert_eq!(decoded_2, values[2]);
1867    }
1868
1869    #[test]
1870    fn test_datetime_struct_boundary_values() {
1871        // Test boundary values: offset=0, large positive/negative offsets
1872        let values = vec![
1873            Value::Temporal(TemporalValue::DateTime {
1874                nanos_since_epoch: 441763200000000000,
1875                offset_seconds: 0, // UTC
1876                timezone_name: None,
1877            }),
1878            Value::Temporal(TemporalValue::DateTime {
1879                nanos_since_epoch: 441763200000000000,
1880                offset_seconds: 43200, // +12:00 (max typical offset)
1881                timezone_name: None,
1882            }),
1883            Value::Temporal(TemporalValue::DateTime {
1884                nanos_since_epoch: 441763200000000000,
1885                offset_seconds: -43200, // -12:00 (min typical offset)
1886                timezone_name: None,
1887            }),
1888        ];
1889
1890        let arr_ref = values_to_datetime_struct_array(&values);
1891        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
1892        assert_eq!(arr.len(), 3);
1893
1894        // Verify round-trip for all boundary values
1895        for (i, expected) in values.iter().enumerate() {
1896            let decoded = arrow_to_value(arr_ref.as_ref(), i, Some(&DataType::DateTime));
1897            assert_eq!(&decoded, expected);
1898        }
1899    }
1900
1901    #[test]
1902    fn test_datetime_old_schema_migration() {
1903        // Test backward compatibility: TimestampNanosecondArray → DateTime with offset=0
1904        let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
1905        builder.append_value(441763200000000000); // 1984-01-01T00:00:00Z
1906        builder.append_value(1704067200000000000); // 2024-01-01T00:00:00Z
1907        builder.append_null();
1908
1909        let arr = builder.finish();
1910
1911        // Decode with DataType::DateTime hint should migrate old schema
1912        let decoded_0 = arrow_to_value(&arr, 0, Some(&DataType::DateTime));
1913        let _decoded_1 = arrow_to_value(&arr, 1, Some(&DataType::DateTime));
1914        let decoded_2 = arrow_to_value(&arr, 2, Some(&DataType::DateTime));
1915
1916        // Old schema should default to offset=0, preserve timezone
1917        if let Value::Temporal(TemporalValue::DateTime {
1918            nanos_since_epoch,
1919            offset_seconds,
1920            timezone_name,
1921        }) = decoded_0
1922        {
1923            assert_eq!(nanos_since_epoch, 441763200000000000);
1924            assert_eq!(offset_seconds, 0);
1925            assert_eq!(timezone_name, Some("UTC".to_string()));
1926        } else {
1927            panic!("Expected DateTime value");
1928        }
1929
1930        // Verify null handling
1931        assert_eq!(decoded_2, Value::Null);
1932    }
1933
1934    #[test]
1935    fn test_time_struct_encode_decode_roundtrip() {
1936        // Test Time struct encoding with offset preservation
1937        let values = vec![
1938            Value::Temporal(TemporalValue::Time {
1939                nanos_since_midnight: 37845000000000, // 10:30:45
1940                offset_seconds: 3600,                 // +01:00
1941            }),
1942            Value::Temporal(TemporalValue::Time {
1943                nanos_since_midnight: 0, // 00:00:00
1944                offset_seconds: 0,
1945            }),
1946            Value::Temporal(TemporalValue::Time {
1947                nanos_since_midnight: 86399999999999, // 23:59:59.999999999
1948                offset_seconds: -18000,               // -05:00
1949            }),
1950        ];
1951
1952        // Encode to Arrow struct
1953        let arr_ref = values_to_time_struct_array(&values);
1954        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
1955        assert_eq!(arr.len(), 3);
1956
1957        // Decode back to Value
1958        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
1959        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
1960        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
1961
1962        // Verify round-trip preserves all fields
1963        assert_eq!(decoded_0, values[0]);
1964        assert_eq!(decoded_1, values[1]);
1965        assert_eq!(decoded_2, values[2]);
1966
1967        // Verify struct field extraction
1968        if let Value::Temporal(TemporalValue::Time {
1969            nanos_since_midnight,
1970            offset_seconds,
1971        }) = decoded_0
1972        {
1973            assert_eq!(nanos_since_midnight, 37845000000000);
1974            assert_eq!(offset_seconds, 3600);
1975        } else {
1976            panic!("Expected Time value");
1977        }
1978    }
1979
1980    #[test]
1981    fn test_time_struct_null_handling() {
1982        // Test Time struct with null values
1983        let values = vec![
1984            Value::Temporal(TemporalValue::Time {
1985                nanos_since_midnight: 37845000000000,
1986                offset_seconds: 3600,
1987            }),
1988            Value::Null,
1989            Value::Temporal(TemporalValue::Time {
1990                nanos_since_midnight: 0,
1991                offset_seconds: 0,
1992            }),
1993        ];
1994
1995        let arr_ref = values_to_time_struct_array(&values);
1996        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
1997        assert_eq!(arr.len(), 3);
1998
1999        // Check first value is valid
2000        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2001        assert_eq!(decoded_0, values[0]);
2002
2003        // Check second value is null
2004        assert!(arr.is_null(1));
2005        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2006        assert_eq!(decoded_1, Value::Null);
2007
2008        // Check third value is valid
2009        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2010        assert_eq!(decoded_2, values[2]);
2011    }
2012
2013    // Tests for extract_vector_f32_values
2014
2015    #[test]
2016    fn test_extract_vector_f32_values_valid_vector() {
2017        let v = vec![1.0, 2.0, 3.0];
2018        let val = Value::Vector(v.clone());
2019        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2020        assert_eq!(result, v);
2021        assert!(valid);
2022    }
2023
2024    #[test]
2025    fn test_extract_vector_f32_values_vector_wrong_dims() {
2026        let v = vec![1.0, 2.0];
2027        let val = Value::Vector(v);
2028        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2029        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2030        assert!(!valid);
2031    }
2032
2033    #[test]
2034    fn test_extract_vector_f32_values_valid_list() {
2035        let v = vec![Value::Float(1.0), Value::Float(2.0), Value::Float(3.0)];
2036        let val = Value::List(v);
2037        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2038        assert_eq!(result, vec![1.0, 2.0, 3.0]);
2039        assert!(valid);
2040    }
2041
2042    #[test]
2043    fn test_extract_vector_f32_values_list_wrong_dims() {
2044        let v = vec![Value::Float(1.0), Value::Float(2.0)];
2045        let val = Value::List(v);
2046        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2047        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2048        assert!(!valid);
2049    }
2050
2051    #[test]
2052    fn test_extract_vector_f32_values_list_int_coercion() {
2053        let v = vec![Value::Int(1), Value::Int(2), Value::Int(3)];
2054        let val = Value::List(v);
2055        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2056        assert_eq!(result, vec![1.0, 2.0, 3.0]);
2057        assert!(valid);
2058    }
2059
2060    #[test]
2061    fn test_extract_vector_f32_values_none() {
2062        let (result, valid) = extract_vector_f32_values(None, false, 3);
2063        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2064        assert!(!valid);
2065    }
2066
2067    #[test]
2068    fn test_extract_vector_f32_values_null() {
2069        let val = Value::Null;
2070        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2071        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2072        assert!(!valid);
2073    }
2074
2075    #[test]
2076    fn test_extract_vector_f32_values_unsupported_type() {
2077        let val = Value::String("not a vector".to_string());
2078        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2079        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2080        assert!(!valid);
2081    }
2082
2083    #[test]
2084    fn test_extract_vector_f32_values_deleted_with_none() {
2085        let (result, valid) = extract_vector_f32_values(None, true, 3);
2086        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2087        assert!(valid); // Deleted entries are marked as valid with zeros
2088    }
2089
2090    #[test]
2091    fn test_extract_vector_f32_values_deleted_with_null() {
2092        let val = Value::Null;
2093        let (result, valid) = extract_vector_f32_values(Some(&val), true, 3);
2094        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2095        assert!(valid); // Deleted entries are marked as valid with zeros
2096    }
2097
2098    // Tests for values_to_array with FixedSizeList
2099
2100    #[test]
2101    fn test_values_to_fixed_size_list_vector_with_nulls() {
2102        let values = vec![
2103            Value::Vector(vec![1.0, 2.0]),
2104            Value::Null,
2105            Value::Vector(vec![3.0, 4.0]),
2106            Value::String("invalid".to_string()),
2107        ];
2108        let arr_ref = values_to_array(
2109            &values,
2110            &ArrowDataType::FixedSizeList(
2111                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2112                2,
2113            ),
2114        )
2115        .unwrap();
2116
2117        let arr = arr_ref
2118            .as_any()
2119            .downcast_ref::<FixedSizeListArray>()
2120            .unwrap();
2121
2122        assert_eq!(arr.len(), 4);
2123        assert!(arr.is_valid(0));
2124        assert!(!arr.is_valid(1)); // Null value
2125        assert!(arr.is_valid(2));
2126        assert!(!arr.is_valid(3)); // Invalid type
2127    }
2128
2129    #[test]
2130    fn test_values_to_fixed_size_list_from_list() {
2131        let values = vec![
2132            Value::List(vec![Value::Float(1.0), Value::Float(2.0)]),
2133            Value::List(vec![Value::Int(3), Value::Int(4)]),
2134        ];
2135        let arr_ref = values_to_array(
2136            &values,
2137            &ArrowDataType::FixedSizeList(
2138                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2139                2,
2140            ),
2141        )
2142        .unwrap();
2143
2144        let arr = arr_ref
2145            .as_any()
2146            .downcast_ref::<FixedSizeListArray>()
2147            .unwrap();
2148
2149        assert_eq!(arr.len(), 2);
2150        assert!(arr.is_valid(0));
2151        assert!(arr.is_valid(1));
2152
2153        // Check values
2154        let child = arr
2155            .values()
2156            .as_any()
2157            .downcast_ref::<Float32Array>()
2158            .unwrap();
2159        assert_eq!(child.value(0), 1.0);
2160        assert_eq!(child.value(1), 2.0);
2161        assert_eq!(child.value(2), 3.0);
2162        assert_eq!(child.value(3), 4.0);
2163    }
2164
2165    #[test]
2166    fn test_values_to_fixed_size_list_wrong_dimensions() {
2167        let values = vec![
2168            Value::Vector(vec![1.0, 2.0, 3.0]),   // 3 dims, expecting 2
2169            Value::List(vec![Value::Float(4.0)]), // 1 dim, expecting 2
2170        ];
2171        let arr_ref = values_to_array(
2172            &values,
2173            &ArrowDataType::FixedSizeList(
2174                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2175                2,
2176            ),
2177        )
2178        .unwrap();
2179
2180        let arr = arr_ref
2181            .as_any()
2182            .downcast_ref::<FixedSizeListArray>()
2183            .unwrap();
2184
2185        assert_eq!(arr.len(), 2);
2186        assert!(!arr.is_valid(0)); // Wrong dimensions
2187        assert!(!arr.is_valid(1)); // Wrong dimensions
2188
2189        // Check that child array has zeros for invalid entries
2190        let child = arr
2191            .values()
2192            .as_any()
2193            .downcast_ref::<Float32Array>()
2194            .unwrap();
2195        assert_eq!(child.value(0), 0.0);
2196        assert_eq!(child.value(1), 0.0);
2197        assert_eq!(child.value(2), 0.0);
2198        assert_eq!(child.value(3), 0.0);
2199    }
2200
2201    #[test]
2202    fn test_values_to_fixed_size_list_all_nulls() {
2203        let values = vec![Value::Null, Value::Null, Value::Null];
2204        let arr_ref = values_to_array(
2205            &values,
2206            &ArrowDataType::FixedSizeList(
2207                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2208                3,
2209            ),
2210        )
2211        .unwrap();
2212
2213        let arr = arr_ref
2214            .as_any()
2215            .downcast_ref::<FixedSizeListArray>()
2216            .unwrap();
2217
2218        assert_eq!(arr.len(), 3);
2219        assert!(!arr.is_valid(0));
2220        assert!(!arr.is_valid(1));
2221        assert!(!arr.is_valid(2));
2222
2223        // Verify child array length is correct (3 rows × 3 dims = 9)
2224        let child = arr
2225            .values()
2226            .as_any()
2227            .downcast_ref::<Float32Array>()
2228            .unwrap();
2229        assert_eq!(child.len(), 9);
2230    }
2231
2232    #[test]
2233    fn test_values_to_fixed_size_list_mixed_types() {
2234        let values = vec![
2235            Value::Vector(vec![1.0, 2.0]),
2236            Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
2237            Value::Null,
2238            Value::String("invalid".to_string()),
2239        ];
2240        let arr_ref = values_to_array(
2241            &values,
2242            &ArrowDataType::FixedSizeList(
2243                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2244                2,
2245            ),
2246        )
2247        .unwrap();
2248
2249        let arr = arr_ref
2250            .as_any()
2251            .downcast_ref::<FixedSizeListArray>()
2252            .unwrap();
2253
2254        assert_eq!(arr.len(), 4);
2255        assert!(arr.is_valid(0)); // Value::Vector
2256        assert!(arr.is_valid(1)); // Value::List
2257        assert!(!arr.is_valid(2)); // Value::Null
2258        assert!(!arr.is_valid(3)); // Value::String
2259
2260        // Check values for valid entries
2261        let child = arr
2262            .values()
2263            .as_any()
2264            .downcast_ref::<Float32Array>()
2265            .unwrap();
2266        assert_eq!(child.value(0), 1.0);
2267        assert_eq!(child.value(1), 2.0);
2268        assert_eq!(child.value(2), 3.0);
2269        assert_eq!(child.value(3), 4.0);
2270    }
2271
2272    // Tests for PropertyExtractor::build_vector_column
2273
2274    #[test]
2275    fn test_build_vector_column_with_nulls_and_deleted() {
2276        let data_type = DataType::Vector { dimensions: 3 };
2277        let extractor = PropertyExtractor::new("test_vec", &data_type);
2278
2279        let props = [
2280            Some(Value::Vector(vec![1.0, 2.0, 3.0])),
2281            None,              // Missing property
2282            Some(Value::Null), // Null value
2283            Some(Value::Vector(vec![4.0, 5.0, 6.0])),
2284        ];
2285        let deleted = [false, false, false, true]; // Last one is deleted
2286
2287        let arr_ref = extractor
2288            .build_vector_column(4, &deleted, |i| props[i].as_ref(), 3)
2289            .unwrap();
2290
2291        let arr = arr_ref
2292            .as_any()
2293            .downcast_ref::<FixedSizeListArray>()
2294            .unwrap();
2295
2296        assert_eq!(arr.len(), 4);
2297        assert!(arr.is_valid(0)); // Valid vector
2298        assert!(!arr.is_valid(1)); // Missing property
2299        assert!(!arr.is_valid(2)); // Null value
2300        assert!(arr.is_valid(3)); // Deleted entry (valid with zeros)
2301
2302        // Check values
2303        let child = arr
2304            .values()
2305            .as_any()
2306            .downcast_ref::<Float32Array>()
2307            .unwrap();
2308        assert_eq!(child.value(0), 1.0);
2309        assert_eq!(child.value(1), 2.0);
2310        assert_eq!(child.value(2), 3.0);
2311        // Indices 3-5: zeros for missing
2312        // Indices 6-8: zeros for null
2313        // Indices 9-11: zeros for deleted (but marked as valid)
2314        assert_eq!(child.value(9), 0.0);
2315        assert_eq!(child.value(10), 0.0);
2316        assert_eq!(child.value(11), 0.0);
2317    }
2318
2319    #[test]
2320    fn test_build_vector_column_with_list_input() {
2321        let data_type = DataType::Vector { dimensions: 2 };
2322        let extractor = PropertyExtractor::new("test_vec", &data_type);
2323
2324        let props = [
2325            Some(Value::List(vec![Value::Float(1.0), Value::Float(2.0)])),
2326            Some(Value::List(vec![Value::Int(3), Value::Int(4)])),
2327            Some(Value::Vector(vec![5.0, 6.0])),
2328        ];
2329        let deleted = [false, false, false];
2330
2331        let arr_ref = extractor
2332            .build_vector_column(3, &deleted, |i| props[i].as_ref(), 2)
2333            .unwrap();
2334
2335        let arr = arr_ref
2336            .as_any()
2337            .downcast_ref::<FixedSizeListArray>()
2338            .unwrap();
2339
2340        assert_eq!(arr.len(), 3);
2341        assert!(arr.is_valid(0));
2342        assert!(arr.is_valid(1));
2343        assert!(arr.is_valid(2));
2344
2345        // Check values
2346        let child = arr
2347            .values()
2348            .as_any()
2349            .downcast_ref::<Float32Array>()
2350            .unwrap();
2351        assert_eq!(child.value(0), 1.0);
2352        assert_eq!(child.value(1), 2.0);
2353        assert_eq!(child.value(2), 3.0);
2354        assert_eq!(child.value(3), 4.0);
2355        assert_eq!(child.value(4), 5.0);
2356        assert_eq!(child.value(5), 6.0);
2357    }
2358}