Skip to main content

uni_store/storage/
arrow_convert.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Arrow type conversion utilities for reducing cognitive complexity.
5//!
6//! This module provides shared helper functions and macros for converting
7//! between Arrow arrays and JSON Values, reducing code duplication across
8//! vertex.rs, delta.rs, and executor.rs.
9
10use anyhow::{Result, anyhow};
11use arrow_array::builder::{
12    BinaryBuilder, BooleanBufferBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
13    FixedSizeBinaryBuilder, FixedSizeListBuilder, Float32Builder, Float64Builder, Int32Builder,
14    Int64Builder, IntervalMonthDayNanoBuilder, LargeBinaryBuilder, ListBuilder, StringBuilder,
15    StructBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder,
16    UInt64Builder,
17};
18use arrow_array::{
19    Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, FixedSizeBinaryArray,
20    FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array,
21    IntervalMonthDayNanoArray, LargeBinaryArray, ListArray, StringArray, StructArray,
22    Time64NanosecondArray, TimestampNanosecondArray, UInt64Array,
23};
24use arrow_schema::{DataType as ArrowDataType, Field};
25use std::collections::HashMap;
26use std::sync::Arc;
27use uni_common::DataType;
28use uni_common::Value;
29use uni_common::core::id::{Eid, Vid};
30use uni_common::core::schema;
31use uni_crdt::Crdt;
32
33/// Build a timestamp column from a map of ID -> timestamp (nanoseconds).
34///
35/// Shared utility for building `_created_at` and `_updated_at` columns
36/// in vertex and edge tables. Works with any hashable ID type (Vid, Eid, etc.).
37fn build_timestamp_column_from_id_map<K, I>(
38    ids: I,
39    timestamps: Option<&HashMap<K, i64>>,
40) -> ArrayRef
41where
42    K: Eq + std::hash::Hash,
43    I: IntoIterator<Item = K>,
44{
45    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
46    for id in ids {
47        match timestamps.and_then(|m| m.get(&id)) {
48            Some(&ts) => builder.append_value(ts),
49            None => builder.append_null(),
50        }
51    }
52    Arc::new(builder.finish())
53}
54
55pub fn build_timestamp_column_from_vid_map<I>(
56    ids: I,
57    timestamps: Option<&HashMap<Vid, i64>>,
58) -> ArrayRef
59where
60    I: IntoIterator<Item = Vid>,
61{
62    build_timestamp_column_from_id_map(ids, timestamps)
63}
64
65pub fn build_timestamp_column_from_eid_map<I>(
66    ids: I,
67    timestamps: Option<&HashMap<Eid, i64>>,
68) -> ArrayRef
69where
70    I: IntoIterator<Item = Eid>,
71{
72    build_timestamp_column_from_id_map(ids, timestamps)
73}
74
75/// Build a timestamp column from an iterator of optional timestamps.
76///
77/// This is useful for building timestamp columns directly from entry structs.
78pub fn build_timestamp_column<I>(timestamps: I) -> ArrayRef
79where
80    I: IntoIterator<Item = Option<i64>>,
81{
82    let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
83    for ts in timestamps {
84        builder.append_option(ts);
85    }
86    Arc::new(builder.finish())
87}
88
89/// Extract a `Vec<String>` from a single row of a `List<Utf8>` column.
90///
91/// Returns an empty vec when the row is null, the inner array is not a
92/// `StringArray`, or the list is empty.  Null entries inside the list are
93/// silently skipped.
94pub fn labels_from_list_array(list_arr: &ListArray, row: usize) -> Vec<String> {
95    if list_arr.is_null(row) {
96        return Vec::new();
97    }
98    let values = list_arr.value(row);
99    let Some(str_arr) = values.as_any().downcast_ref::<StringArray>() else {
100        return Vec::new();
101    };
102    (0..str_arr.len())
103        .filter(|&j| !str_arr.is_null(j))
104        .map(|j| str_arr.value(j).to_string())
105        .collect()
106}
107
108/// Parse a datetime string into nanoseconds since Unix epoch.
109///
110/// Tries RFC3339, "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M%:z",
111/// and "%Y-%m-%dT%H:%MZ" formats.
112fn parse_datetime_to_nanos(s: &str) -> Option<i64> {
113    chrono::DateTime::parse_from_rfc3339(s)
114        .map(|dt| {
115            dt.with_timezone(&chrono::Utc)
116                .timestamp_nanos_opt()
117                .unwrap_or(0)
118        })
119        .or_else(|_| {
120            chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
121                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
122        })
123        .or_else(|_| {
124            chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ")
125                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
126        })
127        .or_else(|_| {
128            chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M%:z").map(|dt| {
129                dt.with_timezone(&chrono::Utc)
130                    .timestamp_nanos_opt()
131                    .unwrap_or(0)
132            })
133        })
134        .ok()
135        .or_else(|| {
136            s.strip_suffix('Z')
137                .and_then(|base| chrono::NaiveDateTime::parse_from_str(base, "%Y-%m-%dT%H:%M").ok())
138                .map(|ndt| ndt.and_utc().timestamp_nanos_opt().unwrap_or(0))
139        })
140}
141
142/// Detect the Arrow Map-as-List(Struct(key, value)) pattern and reconstruct a map.
143///
144/// Arrow represents Map columns as `List(Struct { key, value })`. This helper
145/// checks whether the given array matches that layout and, if so, converts the
146/// key/value pairs back into a `HashMap<String, Value>`.
147fn try_reconstruct_map(arr: &ArrayRef) -> Option<HashMap<String, Value>> {
148    let structs = arr.as_any().downcast_ref::<StructArray>()?;
149    let fields = structs.fields();
150    if fields.len() != 2 || fields[0].name() != "key" || fields[1].name() != "value" {
151        return None;
152    }
153    let key_col = structs.column(0);
154    let val_col = structs.column(1);
155    let mut map = HashMap::new();
156    for i in 0..structs.len() {
157        if let Value::String(k) = arrow_to_value(key_col.as_ref(), i, None) {
158            map.insert(k, arrow_to_value(val_col.as_ref(), i, None));
159        }
160    }
161    Some(map)
162}
163
164/// Convert all elements of an Arrow array into a `Vec<Value>`.
165fn array_to_value_list(arr: &ArrayRef) -> Vec<Value> {
166    (0..arr.len())
167        .map(|i| arrow_to_value(arr.as_ref(), i, None))
168        .collect()
169}
170
171/// Convert an Arrow array value at a given row index to a Uni Value.
172///
173/// Handles all common Arrow types and recursively processes nested structures
174/// like Lists and Structs. The optional `data_type` parameter provides schema
175/// context for decoding DateTime and Time struct arrays; when provided, it
176/// takes precedence over runtime type detection.
177pub fn arrow_to_value(col: &dyn Array, row: usize, data_type: Option<&DataType>) -> Value {
178    if col.is_null(row) {
179        return Value::Null;
180    }
181
182    // Schema-driven decode for DateTime and Time structs
183    if let Some(dt) = data_type {
184        match dt {
185            DataType::DateTime => {
186                // Expect StructArray with three fields
187                if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
188                    && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
189                        struct_arr.column_by_name("nanos_since_epoch"),
190                        struct_arr.column_by_name("offset_seconds"),
191                        struct_arr.column_by_name("timezone_name"),
192                    )
193                    && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
194                        nanos_col
195                            .as_any()
196                            .downcast_ref::<TimestampNanosecondArray>(),
197                        offset_col.as_any().downcast_ref::<Int32Array>(),
198                        tz_col.as_any().downcast_ref::<StringArray>(),
199                    )
200                {
201                    if nanos_arr.is_null(row) {
202                        return Value::Null;
203                    }
204                    let nanos = nanos_arr.value(row);
205                    if offset_arr.is_null(row) {
206                        // No offset → LocalDateTime
207                        return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
208                            nanos_since_epoch: nanos,
209                        });
210                    }
211                    let offset = offset_arr.value(row);
212                    let tz_name = (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
213                    return Value::Temporal(uni_common::TemporalValue::DateTime {
214                        nanos_since_epoch: nanos,
215                        offset_seconds: offset,
216                        timezone_name: tz_name,
217                    });
218                }
219                // Fall back to old schema migration: TimestampNanosecond → DateTime with offset=0
220                if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
221                    let nanos = ts.value(row);
222                    let tz_name = ts.timezone().map(|s| s.to_string());
223                    return Value::Temporal(uni_common::TemporalValue::DateTime {
224                        nanos_since_epoch: nanos,
225                        offset_seconds: 0,
226                        timezone_name: tz_name,
227                    });
228                }
229            }
230            DataType::Time => {
231                // Expect StructArray with two fields
232                if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
233                    && let (Some(nanos_col), Some(offset_col)) = (
234                        struct_arr.column_by_name("nanos_since_midnight"),
235                        struct_arr.column_by_name("offset_seconds"),
236                    )
237                    && let (Some(nanos_arr), Some(offset_arr)) = (
238                        nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
239                        offset_col.as_any().downcast_ref::<Int32Array>(),
240                    )
241                {
242                    // Check field-level nulls before calling .value()
243                    if nanos_arr.is_null(row) || offset_arr.is_null(row) {
244                        return Value::Null;
245                    }
246                    let nanos = nanos_arr.value(row);
247                    let offset = offset_arr.value(row);
248                    return Value::Temporal(uni_common::TemporalValue::Time {
249                        nanos_since_midnight: nanos,
250                        offset_seconds: offset,
251                    });
252                }
253                // Fall back to old schema: Time64Nanosecond → Time with offset=0
254                if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
255                    let nanos = t.value(row);
256                    return Value::Temporal(uni_common::TemporalValue::Time {
257                        nanos_since_midnight: nanos,
258                        offset_seconds: 0,
259                    });
260                }
261            }
262            DataType::Bytes => {
263                let Some(arr) = col.as_any().downcast_ref::<LargeBinaryArray>() else {
264                    log::warn!("Bytes column is not LargeBinaryArray");
265                    return Value::Null;
266                };
267                if arr.is_null(row) {
268                    return Value::Null;
269                }
270                return Value::Bytes(arr.value(row).to_vec());
271            }
272            DataType::Btic => {
273                let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>() else {
274                    log::warn!("BTIC column is not FixedSizeBinaryArray");
275                    return Value::Null;
276                };
277                let bytes = fsb.value(row);
278                return match uni_btic::encode::decode_slice(bytes) {
279                    Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
280                        lo: btic.lo(),
281                        hi: btic.hi(),
282                        meta: btic.meta(),
283                    }),
284                    Err(e) => {
285                        log::warn!("BTIC decode error: {}", e);
286                        Value::Null
287                    }
288                };
289            }
290            _ => {}
291        }
292    }
293
294    // String types
295    if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
296        return Value::String(s.value(row).to_string());
297    }
298
299    // Integer types
300    if let Some(u) = col.as_any().downcast_ref::<UInt64Array>() {
301        return Value::Int(u.value(row) as i64);
302    }
303    if let Some(i) = col.as_any().downcast_ref::<Int64Array>() {
304        return Value::Int(i.value(row));
305    }
306    if let Some(i) = col.as_any().downcast_ref::<Int32Array>() {
307        return Value::Int(i.value(row) as i64);
308    }
309
310    // Float types
311    if let Some(f) = col.as_any().downcast_ref::<Float64Array>() {
312        return Value::Float(f.value(row));
313    }
314    if let Some(f) = col.as_any().downcast_ref::<Float32Array>() {
315        return Value::Float(f.value(row) as f64);
316    }
317
318    // Boolean type
319    if let Some(b) = col.as_any().downcast_ref::<BooleanArray>() {
320        return Value::Bool(b.value(row));
321    }
322
323    // Fixed-size list (vectors)
324    if let Some(list) = col.as_any().downcast_ref::<FixedSizeListArray>() {
325        return Value::List(array_to_value_list(&list.value(row)));
326    }
327
328    // Variable-size list
329    if let Some(list) = col.as_any().downcast_ref::<ListArray>() {
330        let arr = list.value(row);
331
332        // Map types are stored as List(Struct(key, value)); reconstruct as map
333        if let Some(obj) = try_reconstruct_map(&arr) {
334            return Value::Map(obj);
335        }
336
337        return Value::List(array_to_value_list(&arr));
338    }
339
340    // Large list (variable-size list with i64 offsets)
341    if let Some(list) = col.as_any().downcast_ref::<arrow_array::LargeListArray>() {
342        return Value::List(array_to_value_list(&list.value(row)));
343    }
344
345    // Struct type — detect temporal structs by field names before generic handler
346    if let Some(s) = col.as_any().downcast_ref::<StructArray>() {
347        let field_names: Vec<&str> = s.fields().iter().map(|f| f.name().as_str()).collect();
348
349        // DateTime struct: {nanos_since_epoch, offset_seconds, timezone_name}
350        if field_names.contains(&"nanos_since_epoch")
351            && field_names.contains(&"offset_seconds")
352            && field_names.contains(&"timezone_name")
353            && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
354                s.column_by_name("nanos_since_epoch"),
355                s.column_by_name("offset_seconds"),
356                s.column_by_name("timezone_name"),
357            )
358        {
359            // Try TimestampNanosecond first (standard schema), then Int64 fallback
360            let nanos_opt = nanos_col
361                .as_any()
362                .downcast_ref::<TimestampNanosecondArray>()
363                .map(|a| {
364                    if a.is_null(row) {
365                        None
366                    } else {
367                        Some(a.value(row))
368                    }
369                })
370                .or_else(|| {
371                    nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
372                        if a.is_null(row) {
373                            None
374                        } else {
375                            Some(a.value(row))
376                        }
377                    })
378                });
379            let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
380                if a.is_null(row) {
381                    None
382                } else {
383                    Some(a.value(row))
384                }
385            });
386
387            if let Some(Some(nanos)) = nanos_opt {
388                match offset_opt {
389                    Some(Some(offset)) => {
390                        let tz_name = tz_col.as_any().downcast_ref::<StringArray>().and_then(|a| {
391                            if a.is_null(row) {
392                                None
393                            } else {
394                                Some(a.value(row).to_string())
395                            }
396                        });
397                        return Value::Temporal(uni_common::TemporalValue::DateTime {
398                            nanos_since_epoch: nanos,
399                            offset_seconds: offset,
400                            timezone_name: tz_name,
401                        });
402                    }
403                    _ => {
404                        // No offset → LocalDateTime
405                        return Value::Temporal(uni_common::TemporalValue::LocalDateTime {
406                            nanos_since_epoch: nanos,
407                        });
408                    }
409                }
410            }
411        }
412
413        // Time struct: {nanos_since_midnight, offset_seconds}
414        if field_names.contains(&"nanos_since_midnight")
415            && field_names.contains(&"offset_seconds")
416            && let (Some(nanos_col), Some(offset_col)) = (
417                s.column_by_name("nanos_since_midnight"),
418                s.column_by_name("offset_seconds"),
419            )
420        {
421            // Try Time64Nanosecond first (standard schema), then Int64 fallback
422            let nanos_opt = nanos_col
423                .as_any()
424                .downcast_ref::<Time64NanosecondArray>()
425                .map(|a| {
426                    if a.is_null(row) {
427                        None
428                    } else {
429                        Some(a.value(row))
430                    }
431                })
432                .or_else(|| {
433                    nanos_col.as_any().downcast_ref::<Int64Array>().map(|a| {
434                        if a.is_null(row) {
435                            None
436                        } else {
437                            Some(a.value(row))
438                        }
439                    })
440                });
441            let offset_opt = offset_col.as_any().downcast_ref::<Int32Array>().map(|a| {
442                if a.is_null(row) {
443                    None
444                } else {
445                    Some(a.value(row))
446                }
447            });
448
449            if let (Some(Some(nanos)), Some(Some(offset))) = (nanos_opt, offset_opt) {
450                return Value::Temporal(uni_common::TemporalValue::Time {
451                    nanos_since_midnight: nanos,
452                    offset_seconds: offset,
453                });
454            }
455        }
456
457        // Generic struct → Map
458        let mut map = HashMap::new();
459        for (field, child) in s.fields().iter().zip(s.columns()) {
460            map.insert(
461                field.name().clone(),
462                arrow_to_value(child.as_ref(), row, None),
463            );
464        }
465        return Value::Map(map);
466    }
467
468    // Date32 type (days since epoch) - return as Value::Temporal
469    if let Some(d) = col.as_any().downcast_ref::<Date32Array>() {
470        let days = d.value(row);
471        return Value::Temporal(uni_common::TemporalValue::Date {
472            days_since_epoch: days,
473        });
474    }
475
476    // Timestamp (nanoseconds since epoch) - timezone presence determines DateTime vs LocalDateTime
477    if let Some(ts) = col.as_any().downcast_ref::<TimestampNanosecondArray>() {
478        let nanos = ts.value(row);
479        return match ts.timezone() {
480            Some(tz) => Value::Temporal(uni_common::TemporalValue::DateTime {
481                nanos_since_epoch: nanos,
482                offset_seconds: 0,
483                timezone_name: Some(tz.to_string()),
484            }),
485            None => Value::Temporal(uni_common::TemporalValue::LocalDateTime {
486                nanos_since_epoch: nanos,
487            }),
488        };
489    }
490
491    // Time64 (nanoseconds since midnight) - return as Value::Temporal
492    if let Some(t) = col.as_any().downcast_ref::<Time64NanosecondArray>() {
493        let nanos = t.value(row);
494        return Value::Temporal(uni_common::TemporalValue::LocalTime {
495            nanos_since_midnight: nanos,
496        });
497    }
498
499    // Time64 (microseconds since midnight) - convert to nanoseconds
500    if let Some(t) = col
501        .as_any()
502        .downcast_ref::<arrow_array::Time64MicrosecondArray>()
503    {
504        let micros = t.value(row);
505        return Value::Temporal(uni_common::TemporalValue::LocalTime {
506            nanos_since_midnight: micros * 1000,
507        });
508    }
509
510    // DurationMicrosecond - convert to Duration with nanoseconds
511    if let Some(d) = col
512        .as_any()
513        .downcast_ref::<arrow_array::DurationMicrosecondArray>()
514    {
515        let micros = d.value(row);
516        let total_nanos = micros * 1000;
517        let seconds = total_nanos / 1_000_000_000;
518        let remaining_nanos = total_nanos % 1_000_000_000;
519        return Value::Temporal(uni_common::TemporalValue::Duration {
520            months: 0,
521            days: 0,
522            nanos: seconds * 1_000_000_000 + remaining_nanos,
523        });
524    }
525
526    // IntervalMonthDayNano - return as Value::Temporal(Duration)
527    if let Some(interval) = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>() {
528        let val = interval.value(row);
529        return Value::Temporal(uni_common::TemporalValue::Duration {
530            months: val.months as i64,
531            days: val.days as i64,
532            nanos: val.nanoseconds,
533        });
534    }
535
536    // LargeBinary (CypherValue MessagePack-tagged encoding)
537    if let Some(b) = col.as_any().downcast_ref::<LargeBinaryArray>() {
538        let bytes = b.value(row);
539        if bytes.is_empty() {
540            return Value::Null;
541        }
542        return uni_common::cypher_value_codec::decode(bytes).unwrap_or_else(|e| {
543            eprintln!("CypherValue decode error: {}", e);
544            Value::Null
545        });
546    }
547
548    // FixedSizeBinary(24) — BTIC temporal interval
549    if let Some(fsb) = col.as_any().downcast_ref::<FixedSizeBinaryArray>()
550        && fsb.value_length() == 24
551    {
552        let bytes = fsb.value(row);
553        return match uni_btic::encode::decode_slice(bytes) {
554            Ok(btic) => Value::Temporal(uni_common::TemporalValue::Btic {
555                lo: btic.lo(),
556                hi: btic.hi(),
557                meta: btic.meta(),
558            }),
559            Err(e) => {
560                log::warn!("BTIC decode error: {}", e);
561                Value::Null
562            }
563        };
564    }
565
566    // Binary (CRDT MessagePack) - decode to Value via serde_json boundary
567    if let Some(b) = col.as_any().downcast_ref::<BinaryArray>() {
568        let bytes = b.value(row);
569        return Crdt::from_msgpack(bytes)
570            .ok()
571            .and_then(|crdt| serde_json::to_value(&crdt).ok())
572            .map(Value::from)
573            .unwrap_or(Value::Null);
574    }
575
576    // Fallback
577    Value::Null
578}
579
580fn values_to_uint64_array(values: &[Value]) -> ArrayRef {
581    let mut builder = UInt64Builder::with_capacity(values.len());
582    for v in values {
583        if let Some(n) = v.as_u64() {
584            builder.append_value(n);
585        } else {
586            builder.append_null();
587        }
588    }
589    Arc::new(builder.finish())
590}
591
592fn values_to_int64_array(values: &[Value]) -> ArrayRef {
593    let mut builder = Int64Builder::with_capacity(values.len());
594    for v in values {
595        if let Some(n) = v.as_i64() {
596            builder.append_value(n);
597        } else {
598            builder.append_null();
599        }
600    }
601    Arc::new(builder.finish())
602}
603
604fn values_to_int32_array(values: &[Value]) -> ArrayRef {
605    let mut builder = Int32Builder::with_capacity(values.len());
606    for v in values {
607        if let Some(n) = v.as_i64() {
608            builder.append_value(n as i32);
609        } else {
610            builder.append_null();
611        }
612    }
613    Arc::new(builder.finish())
614}
615
616fn values_to_string_array(values: &[Value]) -> ArrayRef {
617    let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 10);
618    for v in values {
619        if let Some(s) = v.as_str() {
620            builder.append_value(s);
621        } else if v.is_null() {
622            builder.append_null();
623        } else {
624            builder.append_value(v.to_string());
625        }
626    }
627    Arc::new(builder.finish())
628}
629
630fn values_to_bool_array(values: &[Value]) -> ArrayRef {
631    let mut builder = BooleanBuilder::with_capacity(values.len());
632    for v in values {
633        if let Some(b) = v.as_bool() {
634            builder.append_value(b);
635        } else {
636            builder.append_null();
637        }
638    }
639    Arc::new(builder.finish())
640}
641
642fn values_to_float32_array(values: &[Value]) -> ArrayRef {
643    let mut builder = Float32Builder::with_capacity(values.len());
644    for v in values {
645        if let Some(n) = v.as_f64() {
646            builder.append_value(n as f32);
647        } else {
648            builder.append_null();
649        }
650    }
651    Arc::new(builder.finish())
652}
653
654fn values_to_float64_array(values: &[Value]) -> ArrayRef {
655    let mut builder = Float64Builder::with_capacity(values.len());
656    for v in values {
657        if let Some(n) = v.as_f64() {
658            builder.append_value(n);
659        } else {
660            builder.append_null();
661        }
662    }
663    Arc::new(builder.finish())
664}
665
666fn values_to_fixed_size_binary_array(values: &[Value], size: i32) -> Result<ArrayRef> {
667    let mut builder = FixedSizeBinaryBuilder::with_capacity(values.len(), size);
668    for v in values {
669        match v {
670            Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta }) if size == 24 => {
671                let btic = uni_btic::Btic::new(*lo, *hi, *meta)
672                    .map_err(|e| anyhow!("invalid BTIC value: {}", e))?;
673                builder.append_value(uni_btic::encode::encode(&btic))?;
674            }
675            Value::String(s) if size == 24 => match uni_btic::parse::parse_btic_literal(s) {
676                Ok(b) => builder.append_value(uni_btic::encode::encode(&b))?,
677                Err(_) => builder.append_null(),
678            },
679            Value::List(bytes) => {
680                let b: Vec<u8> = bytes
681                    .iter()
682                    .map(|bv| bv.as_u64().unwrap_or(0) as u8)
683                    .collect();
684                if b.len() as i32 == size {
685                    builder.append_value(&b)?;
686                } else {
687                    builder.append_null();
688                }
689            }
690            _ => builder.append_null(),
691        }
692    }
693    Ok(Arc::new(builder.finish()))
694}
695
696/// Extract f32 vector values from a Value, ensuring correct Arrow FixedSizeList invariants.
697///
698/// Always returns exactly `dimensions` f32 values (zeros for null/invalid), plus a validity flag.
699/// This guarantees `child_array.len() == parent_array.len() × dimensions`.
700///
701/// # Arguments
702/// - `val`: Optional property value to extract from
703/// - `is_deleted`: Whether the containing entity is deleted (affects validity)
704/// - `dimensions`: Expected vector dimensions
705///
706/// # Returns
707/// - Tuple of (vector values, validity flag)
708///   - Vector always has exactly `dimensions` elements
709///   - Validity is `true` for valid vectors or deleted entries, `false` for null/invalid
710pub fn extract_vector_f32_values(
711    val: Option<&Value>,
712    is_deleted: bool,
713    dimensions: usize,
714) -> (Vec<f32>, bool) {
715    let zeros = || vec![0.0_f32; dimensions];
716
717    // Deleted entries always return zeros with valid=true
718    if is_deleted {
719        return (zeros(), true);
720    }
721
722    match val {
723        // Native f32 vector (Value::Vector)
724        Some(Value::Vector(v)) if v.len() == dimensions => (v.clone(), true),
725        Some(Value::Vector(_)) => (zeros(), false), // Wrong dimensions
726        // List of values (Value::List) - convert to f32
727        Some(Value::List(arr)) if arr.len() == dimensions => {
728            let values: Vec<f32> = arr
729                .iter()
730                .map(|v| v.as_f64().unwrap_or(0.0) as f32)
731                .collect();
732            (values, true)
733        }
734        Some(Value::List(_)) => (zeros(), false), // Wrong dimensions
735        _ => (zeros(), false),                    // Missing or unsupported value
736    }
737}
738
739fn values_to_fixed_size_list_f32_array(values: &[Value], size: i32) -> ArrayRef {
740    let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), size);
741    for v in values {
742        let (vals, valid) = extract_vector_f32_values(Some(v), false, size as usize);
743        for val in vals {
744            builder.values().append_value(val);
745        }
746        builder.append(valid);
747    }
748    Arc::new(builder.finish())
749}
750
751fn values_to_timestamp_array(values: &[Value], tz: Option<&Arc<str>>) -> ArrayRef {
752    let mut builder = TimestampNanosecondBuilder::with_capacity(values.len());
753    for v in values {
754        if v.is_null() {
755            builder.append_null();
756        } else if let Value::Temporal(tv) = v {
757            match tv {
758                uni_common::TemporalValue::DateTime {
759                    nanos_since_epoch, ..
760                }
761                | uni_common::TemporalValue::LocalDateTime {
762                    nanos_since_epoch, ..
763                } => builder.append_value(*nanos_since_epoch),
764                _ => builder.append_null(),
765            }
766        } else if let Some(n) = v.as_i64() {
767            builder.append_value(n);
768        } else if let Some(s) = v.as_str() {
769            match parse_datetime_to_nanos(s) {
770                Some(nanos) => builder.append_value(nanos),
771                None => builder.append_null(),
772            }
773        } else {
774            builder.append_null();
775        }
776    }
777
778    let arr = builder.finish();
779    if let Some(tz) = tz {
780        Arc::new(arr.with_timezone(tz.as_ref()))
781    } else {
782        Arc::new(arr)
783    }
784}
785
786/// Build a DateTime struct array from values.
787///
788/// Encodes DateTime as a 3-field struct: (nanos_since_epoch, offset_seconds, timezone_name).
789/// This preserves timezone offset information that was previously lost with TimestampNanosecond encoding.
790fn values_to_datetime_struct_array(values: &[Value]) -> ArrayRef {
791    let mut nanos_builder = TimestampNanosecondBuilder::with_capacity(values.len());
792    let mut offset_builder = Int32Builder::with_capacity(values.len());
793    let mut tz_builder = StringBuilder::with_capacity(values.len(), values.len() * 20);
794    let mut null_buffer = BooleanBufferBuilder::new(values.len());
795
796    for v in values {
797        match v {
798            Value::Temporal(uni_common::TemporalValue::DateTime {
799                nanos_since_epoch,
800                offset_seconds,
801                timezone_name,
802            }) => {
803                nanos_builder.append_value(*nanos_since_epoch);
804                offset_builder.append_value(*offset_seconds);
805                tz_builder.append_option(timezone_name.as_deref());
806                null_buffer.append(true);
807            }
808            Value::Temporal(uni_common::TemporalValue::LocalDateTime { nanos_since_epoch }) => {
809                nanos_builder.append_value(*nanos_since_epoch);
810                offset_builder.append_null();
811                tz_builder.append_null();
812                null_buffer.append(true);
813            }
814            _ => {
815                nanos_builder.append_null();
816                offset_builder.append_null();
817                tz_builder.append_null();
818                null_buffer.append(false);
819            }
820        }
821    }
822
823    let struct_arr = StructArray::new(
824        schema::datetime_struct_fields(),
825        vec![
826            Arc::new(nanos_builder.finish()) as ArrayRef,
827            Arc::new(offset_builder.finish()) as ArrayRef,
828            Arc::new(tz_builder.finish()) as ArrayRef,
829        ],
830        Some(null_buffer.finish().into()),
831    );
832    Arc::new(struct_arr)
833}
834
835/// Build a Time struct array from values.
836///
837/// Encodes Time as a 2-field struct: (nanos_since_midnight, offset_seconds).
838/// This preserves timezone offset information that was previously lost with Time64Nanosecond encoding.
839fn values_to_time_struct_array(values: &[Value]) -> ArrayRef {
840    let mut nanos_builder = Time64NanosecondBuilder::with_capacity(values.len());
841    let mut offset_builder = Int32Builder::with_capacity(values.len());
842    let mut null_buffer = BooleanBufferBuilder::new(values.len());
843
844    for v in values {
845        match v {
846            Value::Temporal(uni_common::TemporalValue::Time {
847                nanos_since_midnight,
848                offset_seconds,
849            }) => {
850                nanos_builder.append_value(*nanos_since_midnight);
851                offset_builder.append_value(*offset_seconds);
852                null_buffer.append(true);
853            }
854            Value::Temporal(uni_common::TemporalValue::LocalTime {
855                nanos_since_midnight,
856            }) => {
857                nanos_builder.append_value(*nanos_since_midnight);
858                offset_builder.append_null();
859                null_buffer.append(true);
860            }
861            _ => {
862                nanos_builder.append_null();
863                offset_builder.append_null();
864                null_buffer.append(false);
865            }
866        }
867    }
868
869    let struct_arr = StructArray::new(
870        schema::time_struct_fields(),
871        vec![
872            Arc::new(nanos_builder.finish()) as ArrayRef,
873            Arc::new(offset_builder.finish()) as ArrayRef,
874        ],
875        Some(null_buffer.finish().into()),
876    );
877    Arc::new(struct_arr)
878}
879
880fn values_to_large_binary_array(values: &[Value]) -> ArrayRef {
881    let mut builder =
882        arrow_array::builder::LargeBinaryBuilder::with_capacity(values.len(), values.len() * 64);
883    for v in values {
884        if v.is_null() {
885            builder.append_null();
886        } else {
887            // Encode as CypherValue (MessagePack-tagged)
888            let cv_bytes = uni_common::cypher_value_codec::encode(v);
889            builder.append_value(&cv_bytes);
890        }
891    }
892    Arc::new(builder.finish())
893}
894
895/// Convert a slice of JSON Values to an Arrow array based on the target Arrow DataType.
896pub fn values_to_array(values: &[Value], dt: &ArrowDataType) -> Result<ArrayRef> {
897    match dt {
898        ArrowDataType::UInt64 => Ok(values_to_uint64_array(values)),
899        ArrowDataType::Int64 => Ok(values_to_int64_array(values)),
900        ArrowDataType::Int32 => Ok(values_to_int32_array(values)),
901        ArrowDataType::Utf8 => Ok(values_to_string_array(values)),
902        ArrowDataType::Boolean => Ok(values_to_bool_array(values)),
903        ArrowDataType::Float32 => Ok(values_to_float32_array(values)),
904        ArrowDataType::Float64 => Ok(values_to_float64_array(values)),
905        ArrowDataType::FixedSizeBinary(size) => values_to_fixed_size_binary_array(values, *size),
906        ArrowDataType::FixedSizeList(inner, size) => {
907            if inner.data_type() == &ArrowDataType::Float32 {
908                Ok(values_to_fixed_size_list_f32_array(values, *size))
909            } else {
910                Err(anyhow!("Unsupported FixedSizeList inner type"))
911            }
912        }
913        ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
914            Ok(values_to_timestamp_array(values, tz.as_ref()))
915        }
916        ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
917            Ok(values_to_timestamp_array(values, tz.as_ref()))
918        }
919        ArrowDataType::Date32 => {
920            let mut builder = Date32Builder::with_capacity(values.len());
921            for v in values {
922                if v.is_null() {
923                    builder.append_null();
924                } else if let Value::Temporal(uni_common::TemporalValue::Date {
925                    days_since_epoch,
926                }) = v
927                {
928                    builder.append_value(*days_since_epoch);
929                } else if let Some(n) = v.as_i64() {
930                    builder.append_value(n as i32);
931                } else {
932                    builder.append_null();
933                }
934            }
935            Ok(Arc::new(builder.finish()))
936        }
937        ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
938            let mut builder = Time64NanosecondBuilder::with_capacity(values.len());
939            for v in values {
940                if v.is_null() {
941                    builder.append_null();
942                } else if let Value::Temporal(tv) = v {
943                    match tv {
944                        uni_common::TemporalValue::LocalTime {
945                            nanos_since_midnight,
946                        }
947                        | uni_common::TemporalValue::Time {
948                            nanos_since_midnight,
949                            ..
950                        } => builder.append_value(*nanos_since_midnight),
951                        _ => builder.append_null(),
952                    }
953                } else if let Some(n) = v.as_i64() {
954                    builder.append_value(n);
955                } else {
956                    builder.append_null();
957                }
958            }
959            Ok(Arc::new(builder.finish()))
960        }
961        ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
962            let mut builder = Time64MicrosecondBuilder::with_capacity(values.len());
963            for v in values {
964                if v.is_null() {
965                    builder.append_null();
966                } else if let Value::Temporal(tv) = v {
967                    match tv {
968                        uni_common::TemporalValue::LocalTime {
969                            nanos_since_midnight,
970                        }
971                        | uni_common::TemporalValue::Time {
972                            nanos_since_midnight,
973                            ..
974                        } => builder.append_value(*nanos_since_midnight / 1_000), // nanos→micros for legacy
975                        _ => builder.append_null(),
976                    }
977                } else if let Some(n) = v.as_i64() {
978                    builder.append_value(n);
979                } else {
980                    builder.append_null();
981                }
982            }
983            Ok(Arc::new(builder.finish()))
984        }
985        ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano) => {
986            let mut builder = IntervalMonthDayNanoBuilder::with_capacity(values.len());
987            for v in values {
988                if v.is_null() {
989                    builder.append_null();
990                } else if let Value::Temporal(uni_common::TemporalValue::Duration {
991                    months,
992                    days,
993                    nanos,
994                }) = v
995                {
996                    builder.append_value(arrow::datatypes::IntervalMonthDayNano {
997                        months: *months as i32,
998                        days: *days as i32,
999                        nanoseconds: *nanos,
1000                    });
1001                } else {
1002                    builder.append_null();
1003                }
1004            }
1005            Ok(Arc::new(builder.finish()))
1006        }
1007        ArrowDataType::Duration(arrow_schema::TimeUnit::Microsecond) => {
1008            let mut builder = DurationMicrosecondBuilder::with_capacity(values.len());
1009            for v in values {
1010                if v.is_null() {
1011                    builder.append_null();
1012                } else if let Value::Temporal(uni_common::TemporalValue::Duration {
1013                    months,
1014                    days,
1015                    nanos,
1016                }) = v
1017                {
1018                    let total_micros =
1019                        months * 30 * 86_400_000_000i64 + days * 86_400_000_000i64 + nanos / 1_000;
1020                    builder.append_value(total_micros);
1021                } else if let Some(n) = v.as_i64() {
1022                    builder.append_value(n);
1023                } else {
1024                    builder.append_null();
1025                }
1026            }
1027            Ok(Arc::new(builder.finish()))
1028        }
1029        ArrowDataType::LargeBinary => Ok(values_to_large_binary_array(values)),
1030        ArrowDataType::List(field) => {
1031            if field.data_type() == &ArrowDataType::Utf8 {
1032                let mut builder = ListBuilder::new(StringBuilder::new());
1033                for v in values {
1034                    if let Value::List(arr) = v {
1035                        for item in arr {
1036                            if let Some(s) = item.as_str() {
1037                                builder.values().append_value(s);
1038                            } else {
1039                                builder.values().append_null();
1040                            }
1041                        }
1042                        builder.append(true);
1043                    } else {
1044                        builder.append_null();
1045                    }
1046                }
1047                Ok(Arc::new(builder.finish()))
1048            } else {
1049                Err(anyhow!(
1050                    "Unsupported List inner type: {:?}",
1051                    field.data_type()
1052                ))
1053            }
1054        }
1055        ArrowDataType::Struct(_) if schema::is_datetime_struct(dt) => {
1056            Ok(values_to_datetime_struct_array(values))
1057        }
1058        ArrowDataType::Struct(_) if schema::is_time_struct(dt) => {
1059            Ok(values_to_time_struct_array(values))
1060        }
1061        _ => Err(anyhow!("Unsupported type for conversion: {:?}", dt)),
1062    }
1063}
1064
1065/// Property value extractor for building Arrow columns from entity properties.
1066pub struct PropertyExtractor<'a> {
1067    data_type: &'a DataType,
1068}
1069
1070impl<'a> PropertyExtractor<'a> {
1071    pub fn new(_name: &'a str, data_type: &'a DataType) -> Self {
1072        Self { data_type }
1073    }
1074
1075    /// Build an Arrow column from a slice of property maps.
1076    /// The `deleted` slice indicates which entries are deleted (use default values).
1077    pub fn build_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1078    where
1079        F: Fn(usize) -> Option<&'a Value>,
1080    {
1081        match self.data_type {
1082            DataType::String => self.build_string_column(len, deleted, get_props),
1083            DataType::Int32 => self.build_int32_column(len, deleted, get_props),
1084            DataType::Int64 => self.build_int64_column(len, deleted, get_props),
1085            DataType::Float32 => self.build_float32_column(len, deleted, get_props),
1086            DataType::Float64 => self.build_float64_column(len, deleted, get_props),
1087            DataType::Bool => self.build_bool_column(len, deleted, get_props),
1088            DataType::Vector { dimensions } => {
1089                self.build_vector_column(len, deleted, get_props, *dimensions)
1090            }
1091            DataType::CypherValue => self.build_json_column(len, deleted, get_props),
1092            DataType::Bytes => self.build_bytes_column(len, deleted, get_props),
1093            DataType::List(inner) => self.build_list_column(len, deleted, get_props, inner),
1094            DataType::Map(key, value) => self.build_map_column(len, deleted, get_props, key, value),
1095            DataType::Crdt(_) => self.build_crdt_column(len, deleted, get_props),
1096            DataType::DateTime => self.build_datetime_struct_column(len, deleted, get_props),
1097            DataType::Timestamp => self.build_timestamp_column(len, deleted, get_props),
1098            DataType::Date => self.build_date32_column(len, deleted, get_props),
1099            DataType::Time => self.build_time_struct_column(len, deleted, get_props),
1100            DataType::Duration => self.build_duration_column(len, deleted, get_props),
1101            DataType::Btic => self.build_btic_column(len, deleted, get_props),
1102            _ => Err(anyhow!(
1103                "Unsupported data type for arrow conversion: {:?}",
1104                self.data_type
1105            )),
1106        }
1107    }
1108
1109    fn build_string_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1110    where
1111        F: Fn(usize) -> Option<&'a Value>,
1112    {
1113        let mut builder = arrow_array::builder::StringBuilder::with_capacity(len, len * 32);
1114        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1115            let prop = get_props(i);
1116            if let Some(s) = prop.and_then(|v| v.as_str()) {
1117                builder.append_value(s);
1118            } else if let Some(Value::Temporal(tv)) = prop {
1119                builder.append_value(tv.to_string());
1120            } else if is_deleted {
1121                builder.append_value("");
1122            } else {
1123                builder.append_null();
1124            }
1125        }
1126        Ok(Arc::new(builder.finish()))
1127    }
1128
1129    fn build_int32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1130    where
1131        F: Fn(usize) -> Option<&'a Value>,
1132    {
1133        let mut values = Vec::with_capacity(len);
1134        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1135            // i64 -> i32 via try_from: an out-of-range value becomes NULL rather
1136            // than silently wrapping to a different number. (review H13)
1137            let val = get_props(i)
1138                .and_then(|v| v.as_i64())
1139                .and_then(|v| i32::try_from(v).ok());
1140            if val.is_none() && is_deleted {
1141                values.push(Some(0));
1142            } else {
1143                values.push(val);
1144            }
1145        }
1146        Ok(Arc::new(Int32Array::from(values)))
1147    }
1148
1149    fn build_int64_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1150    where
1151        F: Fn(usize) -> Option<&'a Value>,
1152    {
1153        let mut values = Vec::with_capacity(len);
1154        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1155            let val = get_props(i).and_then(|v| v.as_i64());
1156            if val.is_none() && is_deleted {
1157                values.push(Some(0));
1158            } else {
1159                values.push(val);
1160            }
1161        }
1162        Ok(Arc::new(Int64Array::from(values)))
1163    }
1164
1165    fn build_timestamp_column<F>(
1166        &self,
1167        len: usize,
1168        deleted: &[bool],
1169        get_props: F,
1170    ) -> Result<ArrayRef>
1171    where
1172        F: Fn(usize) -> Option<&'a Value>,
1173    {
1174        let mut values = Vec::with_capacity(len);
1175        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1176            let val = get_props(i);
1177            let ts = if is_deleted || val.is_none() {
1178                Some(0i64)
1179            } else if let Some(Value::Temporal(tv)) = val {
1180                match tv {
1181                    uni_common::TemporalValue::DateTime {
1182                        nanos_since_epoch, ..
1183                    }
1184                    | uni_common::TemporalValue::LocalDateTime {
1185                        nanos_since_epoch, ..
1186                    } => Some(*nanos_since_epoch),
1187                    _ => None,
1188                }
1189            } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1190                Some(v)
1191            } else if let Some(s) = val.and_then(|v| v.as_str()) {
1192                parse_datetime_to_nanos(s)
1193            } else {
1194                None
1195            };
1196
1197            if is_deleted {
1198                values.push(Some(0));
1199            } else {
1200                values.push(ts);
1201            }
1202        }
1203        let arr = TimestampNanosecondArray::from(values).with_timezone("UTC");
1204        Ok(Arc::new(arr))
1205    }
1206
1207    fn build_datetime_struct_column<F>(
1208        &self,
1209        len: usize,
1210        deleted: &[bool],
1211        get_props: F,
1212    ) -> Result<ArrayRef>
1213    where
1214        F: Fn(usize) -> Option<&'a Value>,
1215    {
1216        let values = self.collect_values_or_null(len, deleted, &get_props);
1217        Ok(values_to_datetime_struct_array(&values))
1218    }
1219
1220    fn build_time_struct_column<F>(
1221        &self,
1222        len: usize,
1223        deleted: &[bool],
1224        get_props: F,
1225    ) -> Result<ArrayRef>
1226    where
1227        F: Fn(usize) -> Option<&'a Value>,
1228    {
1229        let values = self.collect_values_or_null(len, deleted, &get_props);
1230        Ok(values_to_time_struct_array(&values))
1231    }
1232
1233    /// Collect property values into a Vec, substituting `Value::Null` for deleted or missing entries.
1234    fn collect_values_or_null<F>(&self, len: usize, deleted: &[bool], get_props: &F) -> Vec<Value>
1235    where
1236        F: Fn(usize) -> Option<&'a Value>,
1237    {
1238        deleted
1239            .iter()
1240            .enumerate()
1241            .take(len)
1242            .map(|(i, &is_deleted)| {
1243                if is_deleted {
1244                    Value::Null
1245                } else {
1246                    get_props(i).cloned().unwrap_or(Value::Null)
1247                }
1248            })
1249            .collect()
1250    }
1251
1252    fn build_date32_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1253    where
1254        F: Fn(usize) -> Option<&'a Value>,
1255    {
1256        let mut builder = Date32Builder::with_capacity(len);
1257        let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1258
1259        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1260            let val = get_props(i);
1261            let days = if is_deleted || val.is_none() {
1262                Some(0)
1263            } else if let Some(Value::Temporal(uni_common::TemporalValue::Date {
1264                days_since_epoch,
1265            })) = val
1266            {
1267                Some(*days_since_epoch)
1268            } else if let Some(v) = val.and_then(|v| v.as_i64()) {
1269                // i64 day count -> i32: an out-of-range value becomes NULL
1270                // rather than silently wrapping to a different date. (review H13)
1271                i32::try_from(v).ok()
1272            } else if let Some(s) = val.and_then(|v| v.as_str()) {
1273                match chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1274                    Ok(date) => Some(date.signed_duration_since(epoch).num_days() as i32),
1275                    Err(_) => None,
1276                }
1277            } else {
1278                None
1279            };
1280
1281            if is_deleted {
1282                builder.append_value(0);
1283            } else if let Some(v) = days {
1284                builder.append_value(v);
1285            } else {
1286                builder.append_null();
1287            }
1288        }
1289        Ok(Arc::new(builder.finish()))
1290    }
1291
1292    fn build_duration_column<F>(
1293        &self,
1294        len: usize,
1295        deleted: &[bool],
1296        get_props: F,
1297    ) -> Result<ArrayRef>
1298    where
1299        F: Fn(usize) -> Option<&'a Value>,
1300    {
1301        // Duration stored as LargeBinary via CypherValue codec (Lance doesn't support Interval(MonthDayNano))
1302        let mut builder = LargeBinaryBuilder::with_capacity(len, len * 32);
1303        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1304            let raw_val = get_props(i);
1305            if let Some(val @ Value::Temporal(uni_common::TemporalValue::Duration { .. })) = raw_val
1306            {
1307                let encoded = uni_common::cypher_value_codec::encode(val);
1308                builder.append_value(&encoded);
1309            } else if is_deleted {
1310                let zero = Value::Temporal(uni_common::TemporalValue::Duration {
1311                    months: 0,
1312                    days: 0,
1313                    nanos: 0,
1314                });
1315                let encoded = uni_common::cypher_value_codec::encode(&zero);
1316                builder.append_value(&encoded);
1317            } else {
1318                builder.append_null();
1319            }
1320        }
1321        Ok(Arc::new(builder.finish()))
1322    }
1323
1324    fn build_btic_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1325    where
1326        F: Fn(usize) -> Option<&'a Value>,
1327    {
1328        const ENCODED_LEN: i32 = 24;
1329        let mut builder = FixedSizeBinaryBuilder::with_capacity(len, ENCODED_LEN);
1330        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1331            let raw_val = get_props(i);
1332            let btic = match raw_val {
1333                Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => Some(
1334                    uni_btic::Btic::new(*lo, *hi, *meta)
1335                        .map_err(|e| anyhow!("invalid BTIC value: {}", e))?,
1336                ),
1337                Some(Value::String(s)) => Some(
1338                    uni_btic::parse::parse_btic_literal(s)
1339                        .map_err(|e| anyhow!("BTIC parse error for '{}': {}", s, e))?,
1340                ),
1341                _ => None,
1342            };
1343
1344            if let Some(b) = btic {
1345                builder.append_value(uni_btic::encode::encode(&b))?;
1346            } else if is_deleted {
1347                builder.append_value([0u8; ENCODED_LEN as usize])?;
1348            } else {
1349                builder.append_null();
1350            }
1351        }
1352        Ok(Arc::new(builder.finish()))
1353    }
1354
1355    fn build_float32_column<F>(
1356        &self,
1357        len: usize,
1358        deleted: &[bool],
1359        get_props: F,
1360    ) -> Result<ArrayRef>
1361    where
1362        F: Fn(usize) -> Option<&'a Value>,
1363    {
1364        let mut values = Vec::with_capacity(len);
1365        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1366            let val = get_props(i).and_then(|v| v.as_f64()).map(|v| v as f32);
1367            if val.is_none() && is_deleted {
1368                values.push(Some(0.0));
1369            } else {
1370                values.push(val);
1371            }
1372        }
1373        Ok(Arc::new(Float32Array::from(values)))
1374    }
1375
1376    fn build_float64_column<F>(
1377        &self,
1378        len: usize,
1379        deleted: &[bool],
1380        get_props: F,
1381    ) -> Result<ArrayRef>
1382    where
1383        F: Fn(usize) -> Option<&'a Value>,
1384    {
1385        let mut values = Vec::with_capacity(len);
1386        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1387            let val = get_props(i).and_then(|v| v.as_f64());
1388            if val.is_none() && is_deleted {
1389                values.push(Some(0.0));
1390            } else {
1391                values.push(val);
1392            }
1393        }
1394        Ok(Arc::new(Float64Array::from(values)))
1395    }
1396
1397    fn build_bool_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1398    where
1399        F: Fn(usize) -> Option<&'a Value>,
1400    {
1401        let mut values = Vec::with_capacity(len);
1402        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1403            let val = get_props(i).and_then(|v| v.as_bool());
1404            if val.is_none() && is_deleted {
1405                values.push(Some(false));
1406            } else {
1407                values.push(val);
1408            }
1409        }
1410        Ok(Arc::new(BooleanArray::from(values)))
1411    }
1412
1413    fn build_vector_column<F>(
1414        &self,
1415        len: usize,
1416        deleted: &[bool],
1417        get_props: F,
1418        dimensions: usize,
1419    ) -> Result<ArrayRef>
1420    where
1421        F: Fn(usize) -> Option<&'a Value>,
1422    {
1423        let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dimensions as i32);
1424
1425        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1426            let val = get_props(i);
1427            let (values, valid) = extract_vector_f32_values(val, is_deleted, dimensions);
1428            for v in values {
1429                builder.values().append_value(v);
1430            }
1431            builder.append(valid);
1432        }
1433        Ok(Arc::new(builder.finish()))
1434    }
1435
1436    fn build_json_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1437    where
1438        F: Fn(usize) -> Option<&'a Value>,
1439    {
1440        let null_val = Value::Null;
1441        let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(len, len * 64);
1442        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1443            let val = get_props(i);
1444            let uni_val = if val.is_none() && is_deleted {
1445                &null_val
1446            } else {
1447                val.unwrap_or(&null_val)
1448            };
1449            // Encode to CypherValue (MessagePack-tagged)
1450            let cv_bytes = uni_common::cypher_value_codec::encode(uni_val);
1451            builder.append_value(&cv_bytes);
1452        }
1453        Ok(Arc::new(builder.finish()))
1454    }
1455
1456    fn build_bytes_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1457    where
1458        F: Fn(usize) -> Option<&'a Value>,
1459    {
1460        let mut builder = LargeBinaryBuilder::with_capacity(len, len * 64);
1461        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1462            let val = get_props(i);
1463            if let Some(Value::Bytes(b)) = val {
1464                builder.append_value(b);
1465            } else if is_deleted {
1466                builder.append_value(&[][..]);
1467            } else {
1468                builder.append_null();
1469            }
1470        }
1471        Ok(Arc::new(builder.finish()))
1472    }
1473
1474    fn build_list_column<F>(
1475        &self,
1476        len: usize,
1477        deleted: &[bool],
1478        get_props: F,
1479        inner: &DataType,
1480    ) -> Result<ArrayRef>
1481    where
1482        F: Fn(usize) -> Option<&'a Value>,
1483    {
1484        match inner {
1485            DataType::String => {
1486                self.build_typed_list(len, deleted, &get_props, StringBuilder::new(), |v, b| {
1487                    if let Some(s) = v.as_str() {
1488                        b.append_value(s);
1489                    } else {
1490                        b.append_null();
1491                    }
1492                })
1493            }
1494            DataType::Int64 => {
1495                self.build_typed_list(len, deleted, &get_props, Int64Builder::new(), |v, b| {
1496                    if let Some(n) = v.as_i64() {
1497                        b.append_value(n);
1498                    } else {
1499                        b.append_null();
1500                    }
1501                })
1502            }
1503            DataType::Float64 => {
1504                self.build_typed_list(len, deleted, &get_props, Float64Builder::new(), |v, b| {
1505                    if let Some(f) = v.as_f64() {
1506                        b.append_value(f);
1507                    } else {
1508                        b.append_null();
1509                    }
1510                })
1511            }
1512            _ => Err(anyhow!("Unsupported inner type for List: {:?}", inner)),
1513        }
1514    }
1515
1516    /// Generic helper to build a list column with any inner builder type.
1517    fn build_typed_list<F, B, A>(
1518        &self,
1519        len: usize,
1520        deleted: &[bool],
1521        get_props: &F,
1522        inner_builder: B,
1523        mut append_value: A,
1524    ) -> Result<ArrayRef>
1525    where
1526        F: Fn(usize) -> Option<&'a Value>,
1527        B: arrow_array::builder::ArrayBuilder,
1528        A: FnMut(&Value, &mut B),
1529    {
1530        let mut builder = ListBuilder::new(inner_builder);
1531        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1532            let val_array = get_props(i).and_then(|v| v.as_array());
1533            if val_array.is_none() && is_deleted {
1534                builder.append_null();
1535            } else if let Some(arr) = val_array {
1536                for v in arr {
1537                    append_value(v, builder.values());
1538                }
1539                builder.append(true);
1540            } else {
1541                builder.append_null();
1542            }
1543        }
1544        Ok(Arc::new(builder.finish()))
1545    }
1546
1547    fn build_map_column<F>(
1548        &self,
1549        len: usize,
1550        deleted: &[bool],
1551        get_props: F,
1552        key: &DataType,
1553        value: &DataType,
1554    ) -> Result<ArrayRef>
1555    where
1556        F: Fn(usize) -> Option<&'a Value>,
1557    {
1558        if !matches!(key, DataType::String) {
1559            return Err(anyhow!("Map keys must be String (JSON limitation)"));
1560        }
1561
1562        match value {
1563            DataType::String => self.build_typed_map(
1564                len,
1565                deleted,
1566                &get_props,
1567                StringBuilder::new(),
1568                arrow_schema::DataType::Utf8,
1569                |v, b: &mut StringBuilder| {
1570                    if let Some(s) = v.as_str() {
1571                        b.append_value(s);
1572                    } else {
1573                        b.append_null();
1574                    }
1575                },
1576            ),
1577            DataType::Int64 => self.build_typed_map(
1578                len,
1579                deleted,
1580                &get_props,
1581                Int64Builder::new(),
1582                arrow_schema::DataType::Int64,
1583                |v, b: &mut Int64Builder| {
1584                    if let Some(n) = v.as_i64() {
1585                        b.append_value(n);
1586                    } else {
1587                        b.append_null();
1588                    }
1589                },
1590            ),
1591            _ => Err(anyhow!("Unsupported value type for Map: {:?}", value)),
1592        }
1593    }
1594
1595    /// Generic helper to build a map column with any value builder type.
1596    fn build_typed_map<F, B, A>(
1597        &self,
1598        len: usize,
1599        deleted: &[bool],
1600        get_props: &F,
1601        value_builder: B,
1602        value_arrow_type: arrow_schema::DataType,
1603        mut append_value: A,
1604    ) -> Result<ArrayRef>
1605    where
1606        F: Fn(usize) -> Option<&'a Value>,
1607        B: arrow_array::builder::ArrayBuilder,
1608        A: FnMut(&Value, &mut B),
1609    {
1610        let key_builder = Box::new(StringBuilder::new());
1611        let value_builder = Box::new(value_builder);
1612        let struct_builder = StructBuilder::new(
1613            vec![
1614                Field::new("key", arrow_schema::DataType::Utf8, false),
1615                Field::new("value", value_arrow_type, true),
1616            ],
1617            vec![key_builder, value_builder],
1618        );
1619        let mut builder = ListBuilder::new(struct_builder);
1620
1621        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1622            self.append_map_entry(&mut builder, get_props(i), is_deleted, &mut append_value);
1623        }
1624        Ok(Arc::new(builder.finish()))
1625    }
1626
1627    /// Append a single map entry to the list builder.
1628    fn append_map_entry<B, A>(
1629        &self,
1630        builder: &mut ListBuilder<StructBuilder>,
1631        val: Option<&'a Value>,
1632        is_deleted: bool,
1633        append_value: &mut A,
1634    ) where
1635        B: arrow_array::builder::ArrayBuilder,
1636        A: FnMut(&Value, &mut B),
1637    {
1638        let val_obj = val.and_then(|v| v.as_object());
1639        if val_obj.is_none() && is_deleted {
1640            builder.append(false);
1641        } else if let Some(obj) = val_obj {
1642            let struct_b = builder.values();
1643            for (k, v) in obj {
1644                struct_b
1645                    .field_builder::<StringBuilder>(0)
1646                    .unwrap()
1647                    .append_value(k);
1648                // Safety: We know the value builder type matches B
1649                let value_b = struct_b.field_builder::<B>(1).unwrap();
1650                append_value(v, value_b);
1651                struct_b.append(true);
1652            }
1653            builder.append(true);
1654        } else {
1655            builder.append(false);
1656        }
1657    }
1658
1659    fn build_crdt_column<F>(&self, len: usize, deleted: &[bool], get_props: F) -> Result<ArrayRef>
1660    where
1661        F: Fn(usize) -> Option<&'a Value>,
1662    {
1663        let mut builder = BinaryBuilder::new();
1664        for (i, &is_deleted) in deleted.iter().enumerate().take(len) {
1665            if is_deleted {
1666                builder.append_null();
1667                continue;
1668            }
1669            if let Some(val) = get_props(i) {
1670                // Try to parse CRDT from the value
1671                // If it's a string, first parse it as JSON, then as CRDT
1672                let crdt_result = if let Some(s) = val.as_str() {
1673                    serde_json::from_str::<Crdt>(s)
1674                } else {
1675                    // Convert uni_common::Value to serde_json::Value at the CRDT boundary
1676                    let json_val: serde_json::Value = val.clone().into();
1677                    serde_json::from_value::<Crdt>(json_val)
1678                };
1679
1680                if let Ok(crdt) = crdt_result {
1681                    if let Ok(bytes) = crdt.to_msgpack() {
1682                        builder.append_value(&bytes);
1683                    } else {
1684                        builder.append_null();
1685                    }
1686                } else {
1687                    builder.append_null();
1688                }
1689            } else {
1690                builder.append_null();
1691            }
1692        }
1693        Ok(Arc::new(builder.finish()))
1694    }
1695}
1696
1697/// Build a column for edge entries (no deleted flag handling needed).
1698pub fn build_edge_column<'a>(
1699    name: &'a str,
1700    data_type: &'a DataType,
1701    len: usize,
1702    get_props: impl Fn(usize) -> Option<&'a Value>,
1703) -> Result<ArrayRef> {
1704    // For edges, use empty deleted array
1705    let deleted = vec![false; len];
1706    let extractor = PropertyExtractor::new(name, data_type);
1707    extractor.build_column(len, &deleted, get_props)
1708}
1709
1710#[cfg(test)]
1711mod tests {
1712    use super::*;
1713    use arrow_array::{
1714        Array, DurationMicrosecondArray,
1715        builder::{BinaryBuilder, Time64MicrosecondBuilder, TimestampNanosecondBuilder},
1716    };
1717    use std::collections::HashMap;
1718    use uni_common::TemporalValue;
1719    use uni_crdt::{Crdt, GCounter};
1720
1721    #[test]
1722    fn test_arrow_to_value_string() {
1723        let arr = StringArray::from(vec![Some("hello"), None, Some("world")]);
1724        assert_eq!(
1725            arrow_to_value(&arr, 0, None),
1726            Value::String("hello".to_string())
1727        );
1728        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1729        assert_eq!(
1730            arrow_to_value(&arr, 2, None),
1731            Value::String("world".to_string())
1732        );
1733    }
1734
1735    #[test]
1736    fn test_arrow_to_value_int64() {
1737        let arr = Int64Array::from(vec![Some(42), None, Some(-10)]);
1738        assert_eq!(arrow_to_value(&arr, 0, None), Value::Int(42));
1739        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1740        assert_eq!(arrow_to_value(&arr, 2, None), Value::Int(-10));
1741    }
1742
1743    #[test]
1744    #[allow(clippy::approx_constant)]
1745    fn test_arrow_to_value_float64() {
1746        let arr = Float64Array::from(vec![Some(3.14), None]);
1747        assert_eq!(arrow_to_value(&arr, 0, None), Value::Float(3.14));
1748        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1749    }
1750
1751    #[test]
1752    fn test_arrow_to_value_bool() {
1753        let arr = BooleanArray::from(vec![Some(true), Some(false), None]);
1754        assert_eq!(arrow_to_value(&arr, 0, None), Value::Bool(true));
1755        assert_eq!(arrow_to_value(&arr, 1, None), Value::Bool(false));
1756        assert_eq!(arrow_to_value(&arr, 2, None), Value::Null);
1757    }
1758
1759    #[test]
1760    fn test_values_to_array_int64() {
1761        let values = vec![Value::Int(1), Value::Int(2), Value::Null, Value::Int(4)];
1762        let arr = values_to_array(&values, &ArrowDataType::Int64).unwrap();
1763        assert_eq!(arr.len(), 4);
1764
1765        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1766        assert_eq!(int_arr.value(0), 1);
1767        assert_eq!(int_arr.value(1), 2);
1768        assert!(int_arr.is_null(2));
1769        assert_eq!(int_arr.value(3), 4);
1770    }
1771
1772    #[test]
1773    fn test_values_to_array_string() {
1774        let values = vec![
1775            Value::String("a".to_string()),
1776            Value::String("b".to_string()),
1777            Value::Null,
1778        ];
1779        let arr = values_to_array(&values, &ArrowDataType::Utf8).unwrap();
1780        assert_eq!(arr.len(), 3);
1781
1782        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1783        assert_eq!(str_arr.value(0), "a");
1784        assert_eq!(str_arr.value(1), "b");
1785        assert!(str_arr.is_null(2));
1786    }
1787
1788    #[test]
1789    fn test_property_extractor_string() {
1790        let props: Vec<HashMap<String, Value>> = vec![
1791            [("name".to_string(), Value::String("Alice".to_string()))]
1792                .into_iter()
1793                .collect(),
1794            [("name".to_string(), Value::String("Bob".to_string()))]
1795                .into_iter()
1796                .collect(),
1797            HashMap::new(),
1798        ];
1799        let deleted = vec![false, false, true];
1800
1801        let extractor = PropertyExtractor::new("name", &DataType::String);
1802        let arr = extractor
1803            .build_column(3, &deleted, |i| props[i].get("name"))
1804            .unwrap();
1805
1806        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1807        assert_eq!(str_arr.value(0), "Alice");
1808        assert_eq!(str_arr.value(1), "Bob");
1809        assert_eq!(str_arr.value(2), ""); // Deleted entries get default
1810    }
1811
1812    #[test]
1813    fn test_property_extractor_int64() {
1814        let props: Vec<HashMap<String, Value>> = vec![
1815            [("age".to_string(), Value::Int(25))].into_iter().collect(),
1816            [("age".to_string(), Value::Int(30))].into_iter().collect(),
1817            HashMap::new(),
1818        ];
1819        let deleted = vec![false, false, true];
1820
1821        let extractor = PropertyExtractor::new("age", &DataType::Int64);
1822        let arr = extractor
1823            .build_column(3, &deleted, |i| props[i].get("age"))
1824            .unwrap();
1825
1826        let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1827        assert_eq!(int_arr.value(0), 25);
1828        assert_eq!(int_arr.value(1), 30);
1829        assert_eq!(int_arr.value(2), 0); // Deleted entries get default
1830    }
1831
1832    #[test]
1833    fn test_property_extractor_bytes_roundtrip() {
1834        let blob = vec![0u8, 1, 2, 255];
1835        let props: Vec<HashMap<String, Value>> = vec![
1836            [("blob".to_string(), Value::Bytes(blob.clone()))]
1837                .into_iter()
1838                .collect(),
1839            [("blob".to_string(), Value::Bytes(Vec::new()))]
1840                .into_iter()
1841                .collect(),
1842            HashMap::new(),
1843        ];
1844        let deleted = vec![false, false, false];
1845
1846        let extractor = PropertyExtractor::new("blob", &DataType::Bytes);
1847        let arr = extractor
1848            .build_column(3, &deleted, |i| props[i].get("blob"))
1849            .unwrap();
1850
1851        // Decode each row through arrow_to_value with Bytes hint.
1852        assert_eq!(
1853            arrow_to_value(arr.as_ref(), 0, Some(&DataType::Bytes)),
1854            Value::Bytes(blob)
1855        );
1856        assert_eq!(
1857            arrow_to_value(arr.as_ref(), 1, Some(&DataType::Bytes)),
1858            Value::Bytes(Vec::new())
1859        );
1860        // Missing property → null in the Arrow array.
1861        assert_eq!(
1862            arrow_to_value(arr.as_ref(), 2, Some(&DataType::Bytes)),
1863            Value::Null
1864        );
1865    }
1866
1867    #[test]
1868    fn test_bytes_vs_cypher_value_disambiguation() {
1869        // A LargeBinary column tagged as DataType::Bytes must NOT be decoded
1870        // through the CypherValue MessagePack codec, even though both share
1871        // the same Arrow physical type.
1872        let raw = vec![0xDEu8, 0xAD, 0xBE, 0xEF];
1873        let props: Vec<HashMap<String, Value>> = vec![
1874            [("blob".to_string(), Value::Bytes(raw.clone()))]
1875                .into_iter()
1876                .collect(),
1877        ];
1878        let extractor = PropertyExtractor::new("blob", &DataType::Bytes);
1879        let arr = extractor
1880            .build_column(1, &[false], |i| props[i].get("blob"))
1881            .unwrap();
1882        // With schema hint: raw bytes returned.
1883        assert_eq!(
1884            arrow_to_value(arr.as_ref(), 0, Some(&DataType::Bytes)),
1885            Value::Bytes(raw)
1886        );
1887    }
1888
1889    #[test]
1890    fn test_data_type_bytes_to_arrow() {
1891        assert_eq!(DataType::Bytes.to_arrow(), ArrowDataType::LargeBinary);
1892    }
1893
1894    #[test]
1895    fn test_arrow_to_value_time64() {
1896        // Test Time64MicrosecondArray legacy fallback (micros→nanos conversion)
1897        let mut builder = Time64MicrosecondBuilder::new();
1898        // 10:30:45 = 10*3600 + 30*60 + 45 = 37845 seconds = 37845000000 microseconds
1899        builder.append_value(37_845_000_000);
1900        // 00:00:00 = 0 microseconds
1901        builder.append_value(0);
1902        // 23:59:59.123456 = 86399.123456 seconds
1903        builder.append_value(86_399_123_456);
1904        builder.append_null();
1905
1906        let arr = builder.finish();
1907        // Arrow→Value returns Value::Temporal(LocalTime) with nanos (micros * 1000)
1908        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "10:30:45");
1909        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "00:00");
1910        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "23:59:59.123456");
1911        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
1912    }
1913
1914    #[test]
1915    fn test_arrow_to_value_duration() {
1916        // Test DurationMicrosecondArray conversion
1917        // Arrow→Value now returns Value::Temporal(Duration)
1918        let arr = DurationMicrosecondArray::from(vec![
1919            Some(1_000_000),      // 1 second in microseconds
1920            Some(3_600_000_000),  // 1 hour
1921            Some(86_400_000_000), // 1 day
1922            None,
1923        ]);
1924
1925        assert_eq!(arrow_to_value(&arr, 0, None).to_string(), "PT1S");
1926        assert_eq!(arrow_to_value(&arr, 1, None).to_string(), "PT1H");
1927        assert_eq!(arrow_to_value(&arr, 2, None).to_string(), "PT24H");
1928        assert_eq!(arrow_to_value(&arr, 3, None), Value::Null);
1929    }
1930
1931    #[test]
1932    fn test_arrow_to_value_binary_crdt() {
1933        // Test BinaryArray (CRDT) conversion - round-trip test
1934        let mut builder = BinaryBuilder::new();
1935
1936        // Create a GCounter CRDT and serialize it
1937        let mut counter = GCounter::new();
1938        counter.increment("actor1", 5);
1939        let crdt = Crdt::GCounter(counter);
1940        let bytes = crdt.to_msgpack().unwrap();
1941        builder.append_value(&bytes);
1942
1943        // Add a null value
1944        builder.append_null();
1945
1946        let arr = builder.finish();
1947
1948        // The first value should deserialize back to a map
1949        let result = arrow_to_value(&arr, 0, None);
1950        assert!(result.as_object().is_some());
1951        let obj = result.as_object().unwrap();
1952        // GCounter serializes with tag "t": "gc"
1953        assert_eq!(obj.get("t"), Some(&Value::String("gc".to_string())));
1954
1955        // Null value should return null
1956        assert_eq!(arrow_to_value(&arr, 1, None), Value::Null);
1957    }
1958
1959    #[test]
1960    fn test_datetime_struct_encode_decode_roundtrip() {
1961        // Test DateTime struct encoding with offset and timezone preservation
1962        let values = vec![
1963            Value::Temporal(TemporalValue::DateTime {
1964                nanos_since_epoch: 441763200000000000, // 1984-01-01T00:00:00Z
1965                offset_seconds: 3600,                  // +01:00
1966                timezone_name: Some("Europe/Paris".to_string()),
1967            }),
1968            Value::Temporal(TemporalValue::DateTime {
1969                nanos_since_epoch: 1704067200000000000, // 2024-01-01T00:00:00Z
1970                offset_seconds: -18000,                 // -05:00
1971                timezone_name: None,
1972            }),
1973            Value::Temporal(TemporalValue::DateTime {
1974                nanos_since_epoch: 0, // Unix epoch
1975                offset_seconds: 0,
1976                timezone_name: Some("UTC".to_string()),
1977            }),
1978        ];
1979
1980        // Encode to Arrow struct
1981        let arr_ref = values_to_datetime_struct_array(&values);
1982        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
1983        assert_eq!(arr.len(), 3);
1984
1985        // Decode back to Value
1986        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
1987        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
1988        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
1989
1990        // Verify round-trip preserves all fields
1991        assert_eq!(decoded_0, values[0]);
1992        assert_eq!(decoded_1, values[1]);
1993        assert_eq!(decoded_2, values[2]);
1994
1995        // Verify struct field extraction
1996        if let Value::Temporal(TemporalValue::DateTime {
1997            nanos_since_epoch,
1998            offset_seconds,
1999            timezone_name,
2000        }) = decoded_0
2001        {
2002            assert_eq!(nanos_since_epoch, 441763200000000000);
2003            assert_eq!(offset_seconds, 3600);
2004            assert_eq!(timezone_name, Some("Europe/Paris".to_string()));
2005        } else {
2006            panic!("Expected DateTime value");
2007        }
2008    }
2009
2010    #[test]
2011    fn test_datetime_struct_null_handling() {
2012        // Test DateTime struct with null values
2013        let values = vec![
2014            Value::Temporal(TemporalValue::DateTime {
2015                nanos_since_epoch: 441763200000000000,
2016                offset_seconds: 3600,
2017                timezone_name: Some("Europe/Paris".to_string()),
2018            }),
2019            Value::Null,
2020            Value::Temporal(TemporalValue::DateTime {
2021                nanos_since_epoch: 0,
2022                offset_seconds: 0,
2023                timezone_name: None,
2024            }),
2025        ];
2026
2027        let arr_ref = values_to_datetime_struct_array(&values);
2028        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2029        assert_eq!(arr.len(), 3);
2030
2031        // Check first value is valid
2032        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::DateTime));
2033        assert_eq!(decoded_0, values[0]);
2034
2035        // Check second value is null
2036        assert!(arr.is_null(1));
2037        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::DateTime));
2038        assert_eq!(decoded_1, Value::Null);
2039
2040        // Check third value is valid
2041        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::DateTime));
2042        assert_eq!(decoded_2, values[2]);
2043    }
2044
2045    #[test]
2046    fn test_datetime_struct_boundary_values() {
2047        // Test boundary values: offset=0, large positive/negative offsets
2048        let values = vec![
2049            Value::Temporal(TemporalValue::DateTime {
2050                nanos_since_epoch: 441763200000000000,
2051                offset_seconds: 0, // UTC
2052                timezone_name: None,
2053            }),
2054            Value::Temporal(TemporalValue::DateTime {
2055                nanos_since_epoch: 441763200000000000,
2056                offset_seconds: 43200, // +12:00 (max typical offset)
2057                timezone_name: None,
2058            }),
2059            Value::Temporal(TemporalValue::DateTime {
2060                nanos_since_epoch: 441763200000000000,
2061                offset_seconds: -43200, // -12:00 (min typical offset)
2062                timezone_name: None,
2063            }),
2064        ];
2065
2066        let arr_ref = values_to_datetime_struct_array(&values);
2067        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2068        assert_eq!(arr.len(), 3);
2069
2070        // Verify round-trip for all boundary values
2071        for (i, expected) in values.iter().enumerate() {
2072            let decoded = arrow_to_value(arr_ref.as_ref(), i, Some(&DataType::DateTime));
2073            assert_eq!(&decoded, expected);
2074        }
2075    }
2076
2077    #[test]
2078    fn test_datetime_old_schema_migration() {
2079        // Test backward compatibility: TimestampNanosecondArray → DateTime with offset=0
2080        let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
2081        builder.append_value(441763200000000000); // 1984-01-01T00:00:00Z
2082        builder.append_value(1704067200000000000); // 2024-01-01T00:00:00Z
2083        builder.append_null();
2084
2085        let arr = builder.finish();
2086
2087        // Decode with DataType::DateTime hint should migrate old schema
2088        let decoded_0 = arrow_to_value(&arr, 0, Some(&DataType::DateTime));
2089        let _decoded_1 = arrow_to_value(&arr, 1, Some(&DataType::DateTime));
2090        let decoded_2 = arrow_to_value(&arr, 2, Some(&DataType::DateTime));
2091
2092        // Old schema should default to offset=0, preserve timezone
2093        if let Value::Temporal(TemporalValue::DateTime {
2094            nanos_since_epoch,
2095            offset_seconds,
2096            timezone_name,
2097        }) = decoded_0
2098        {
2099            assert_eq!(nanos_since_epoch, 441763200000000000);
2100            assert_eq!(offset_seconds, 0);
2101            assert_eq!(timezone_name, Some("UTC".to_string()));
2102        } else {
2103            panic!("Expected DateTime value");
2104        }
2105
2106        // Verify null handling
2107        assert_eq!(decoded_2, Value::Null);
2108    }
2109
2110    #[test]
2111    fn test_time_struct_encode_decode_roundtrip() {
2112        // Test Time struct encoding with offset preservation
2113        let values = vec![
2114            Value::Temporal(TemporalValue::Time {
2115                nanos_since_midnight: 37845000000000, // 10:30:45
2116                offset_seconds: 3600,                 // +01:00
2117            }),
2118            Value::Temporal(TemporalValue::Time {
2119                nanos_since_midnight: 0, // 00:00:00
2120                offset_seconds: 0,
2121            }),
2122            Value::Temporal(TemporalValue::Time {
2123                nanos_since_midnight: 86399999999999, // 23:59:59.999999999
2124                offset_seconds: -18000,               // -05:00
2125            }),
2126        ];
2127
2128        // Encode to Arrow struct
2129        let arr_ref = values_to_time_struct_array(&values);
2130        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2131        assert_eq!(arr.len(), 3);
2132
2133        // Decode back to Value
2134        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2135        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2136        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2137
2138        // Verify round-trip preserves all fields
2139        assert_eq!(decoded_0, values[0]);
2140        assert_eq!(decoded_1, values[1]);
2141        assert_eq!(decoded_2, values[2]);
2142
2143        // Verify struct field extraction
2144        if let Value::Temporal(TemporalValue::Time {
2145            nanos_since_midnight,
2146            offset_seconds,
2147        }) = decoded_0
2148        {
2149            assert_eq!(nanos_since_midnight, 37845000000000);
2150            assert_eq!(offset_seconds, 3600);
2151        } else {
2152            panic!("Expected Time value");
2153        }
2154    }
2155
2156    #[test]
2157    fn test_time_struct_null_handling() {
2158        // Test Time struct with null values
2159        let values = vec![
2160            Value::Temporal(TemporalValue::Time {
2161                nanos_since_midnight: 37845000000000,
2162                offset_seconds: 3600,
2163            }),
2164            Value::Null,
2165            Value::Temporal(TemporalValue::Time {
2166                nanos_since_midnight: 0,
2167                offset_seconds: 0,
2168            }),
2169        ];
2170
2171        let arr_ref = values_to_time_struct_array(&values);
2172        let arr = arr_ref.as_any().downcast_ref::<StructArray>().unwrap();
2173        assert_eq!(arr.len(), 3);
2174
2175        // Check first value is valid
2176        let decoded_0 = arrow_to_value(arr_ref.as_ref(), 0, Some(&DataType::Time));
2177        assert_eq!(decoded_0, values[0]);
2178
2179        // Check second value is null
2180        assert!(arr.is_null(1));
2181        let decoded_1 = arrow_to_value(arr_ref.as_ref(), 1, Some(&DataType::Time));
2182        assert_eq!(decoded_1, Value::Null);
2183
2184        // Check third value is valid
2185        let decoded_2 = arrow_to_value(arr_ref.as_ref(), 2, Some(&DataType::Time));
2186        assert_eq!(decoded_2, values[2]);
2187    }
2188
2189    // Tests for extract_vector_f32_values
2190
2191    #[test]
2192    fn test_extract_vector_f32_values_valid_vector() {
2193        let v = vec![1.0, 2.0, 3.0];
2194        let val = Value::Vector(v.clone());
2195        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2196        assert_eq!(result, v);
2197        assert!(valid);
2198    }
2199
2200    #[test]
2201    fn test_extract_vector_f32_values_vector_wrong_dims() {
2202        let v = vec![1.0, 2.0];
2203        let val = Value::Vector(v);
2204        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2205        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2206        assert!(!valid);
2207    }
2208
2209    #[test]
2210    fn test_extract_vector_f32_values_valid_list() {
2211        let v = vec![Value::Float(1.0), Value::Float(2.0), Value::Float(3.0)];
2212        let val = Value::List(v);
2213        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2214        assert_eq!(result, vec![1.0, 2.0, 3.0]);
2215        assert!(valid);
2216    }
2217
2218    #[test]
2219    fn test_extract_vector_f32_values_list_wrong_dims() {
2220        let v = vec![Value::Float(1.0), Value::Float(2.0)];
2221        let val = Value::List(v);
2222        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2223        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2224        assert!(!valid);
2225    }
2226
2227    #[test]
2228    fn test_extract_vector_f32_values_list_int_coercion() {
2229        let v = vec![Value::Int(1), Value::Int(2), Value::Int(3)];
2230        let val = Value::List(v);
2231        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2232        assert_eq!(result, vec![1.0, 2.0, 3.0]);
2233        assert!(valid);
2234    }
2235
2236    #[test]
2237    fn test_extract_vector_f32_values_none() {
2238        let (result, valid) = extract_vector_f32_values(None, false, 3);
2239        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2240        assert!(!valid);
2241    }
2242
2243    #[test]
2244    fn test_extract_vector_f32_values_null() {
2245        let val = Value::Null;
2246        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2247        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2248        assert!(!valid);
2249    }
2250
2251    #[test]
2252    fn test_extract_vector_f32_values_unsupported_type() {
2253        let val = Value::String("not a vector".to_string());
2254        let (result, valid) = extract_vector_f32_values(Some(&val), false, 3);
2255        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2256        assert!(!valid);
2257    }
2258
2259    #[test]
2260    fn test_extract_vector_f32_values_deleted_with_none() {
2261        let (result, valid) = extract_vector_f32_values(None, true, 3);
2262        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2263        assert!(valid); // Deleted entries are marked as valid with zeros
2264    }
2265
2266    #[test]
2267    fn test_extract_vector_f32_values_deleted_with_null() {
2268        let val = Value::Null;
2269        let (result, valid) = extract_vector_f32_values(Some(&val), true, 3);
2270        assert_eq!(result, vec![0.0, 0.0, 0.0]);
2271        assert!(valid); // Deleted entries are marked as valid with zeros
2272    }
2273
2274    // Tests for values_to_array with FixedSizeList
2275
2276    #[test]
2277    fn test_values_to_fixed_size_list_vector_with_nulls() {
2278        let values = vec![
2279            Value::Vector(vec![1.0, 2.0]),
2280            Value::Null,
2281            Value::Vector(vec![3.0, 4.0]),
2282            Value::String("invalid".to_string()),
2283        ];
2284        let arr_ref = values_to_array(
2285            &values,
2286            &ArrowDataType::FixedSizeList(
2287                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2288                2,
2289            ),
2290        )
2291        .unwrap();
2292
2293        let arr = arr_ref
2294            .as_any()
2295            .downcast_ref::<FixedSizeListArray>()
2296            .unwrap();
2297
2298        assert_eq!(arr.len(), 4);
2299        assert!(arr.is_valid(0));
2300        assert!(!arr.is_valid(1)); // Null value
2301        assert!(arr.is_valid(2));
2302        assert!(!arr.is_valid(3)); // Invalid type
2303    }
2304
2305    #[test]
2306    fn test_values_to_fixed_size_list_from_list() {
2307        let values = vec![
2308            Value::List(vec![Value::Float(1.0), Value::Float(2.0)]),
2309            Value::List(vec![Value::Int(3), Value::Int(4)]),
2310        ];
2311        let arr_ref = values_to_array(
2312            &values,
2313            &ArrowDataType::FixedSizeList(
2314                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2315                2,
2316            ),
2317        )
2318        .unwrap();
2319
2320        let arr = arr_ref
2321            .as_any()
2322            .downcast_ref::<FixedSizeListArray>()
2323            .unwrap();
2324
2325        assert_eq!(arr.len(), 2);
2326        assert!(arr.is_valid(0));
2327        assert!(arr.is_valid(1));
2328
2329        // Check values
2330        let child = arr
2331            .values()
2332            .as_any()
2333            .downcast_ref::<Float32Array>()
2334            .unwrap();
2335        assert_eq!(child.value(0), 1.0);
2336        assert_eq!(child.value(1), 2.0);
2337        assert_eq!(child.value(2), 3.0);
2338        assert_eq!(child.value(3), 4.0);
2339    }
2340
2341    #[test]
2342    fn test_values_to_fixed_size_list_wrong_dimensions() {
2343        let values = vec![
2344            Value::Vector(vec![1.0, 2.0, 3.0]),   // 3 dims, expecting 2
2345            Value::List(vec![Value::Float(4.0)]), // 1 dim, expecting 2
2346        ];
2347        let arr_ref = values_to_array(
2348            &values,
2349            &ArrowDataType::FixedSizeList(
2350                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2351                2,
2352            ),
2353        )
2354        .unwrap();
2355
2356        let arr = arr_ref
2357            .as_any()
2358            .downcast_ref::<FixedSizeListArray>()
2359            .unwrap();
2360
2361        assert_eq!(arr.len(), 2);
2362        assert!(!arr.is_valid(0)); // Wrong dimensions
2363        assert!(!arr.is_valid(1)); // Wrong dimensions
2364
2365        // Check that child array has zeros for invalid entries
2366        let child = arr
2367            .values()
2368            .as_any()
2369            .downcast_ref::<Float32Array>()
2370            .unwrap();
2371        assert_eq!(child.value(0), 0.0);
2372        assert_eq!(child.value(1), 0.0);
2373        assert_eq!(child.value(2), 0.0);
2374        assert_eq!(child.value(3), 0.0);
2375    }
2376
2377    #[test]
2378    fn test_values_to_fixed_size_list_all_nulls() {
2379        let values = vec![Value::Null, Value::Null, Value::Null];
2380        let arr_ref = values_to_array(
2381            &values,
2382            &ArrowDataType::FixedSizeList(
2383                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2384                3,
2385            ),
2386        )
2387        .unwrap();
2388
2389        let arr = arr_ref
2390            .as_any()
2391            .downcast_ref::<FixedSizeListArray>()
2392            .unwrap();
2393
2394        assert_eq!(arr.len(), 3);
2395        assert!(!arr.is_valid(0));
2396        assert!(!arr.is_valid(1));
2397        assert!(!arr.is_valid(2));
2398
2399        // Verify child array length is correct (3 rows × 3 dims = 9)
2400        let child = arr
2401            .values()
2402            .as_any()
2403            .downcast_ref::<Float32Array>()
2404            .unwrap();
2405        assert_eq!(child.len(), 9);
2406    }
2407
2408    #[test]
2409    fn test_values_to_fixed_size_list_mixed_types() {
2410        let values = vec![
2411            Value::Vector(vec![1.0, 2.0]),
2412            Value::List(vec![Value::Float(3.0), Value::Float(4.0)]),
2413            Value::Null,
2414            Value::String("invalid".to_string()),
2415        ];
2416        let arr_ref = values_to_array(
2417            &values,
2418            &ArrowDataType::FixedSizeList(
2419                Arc::new(Field::new("item", ArrowDataType::Float32, false)),
2420                2,
2421            ),
2422        )
2423        .unwrap();
2424
2425        let arr = arr_ref
2426            .as_any()
2427            .downcast_ref::<FixedSizeListArray>()
2428            .unwrap();
2429
2430        assert_eq!(arr.len(), 4);
2431        assert!(arr.is_valid(0)); // Value::Vector
2432        assert!(arr.is_valid(1)); // Value::List
2433        assert!(!arr.is_valid(2)); // Value::Null
2434        assert!(!arr.is_valid(3)); // Value::String
2435
2436        // Check values for valid entries
2437        let child = arr
2438            .values()
2439            .as_any()
2440            .downcast_ref::<Float32Array>()
2441            .unwrap();
2442        assert_eq!(child.value(0), 1.0);
2443        assert_eq!(child.value(1), 2.0);
2444        assert_eq!(child.value(2), 3.0);
2445        assert_eq!(child.value(3), 4.0);
2446    }
2447
2448    // Tests for PropertyExtractor::build_vector_column
2449
2450    #[test]
2451    fn test_build_vector_column_with_nulls_and_deleted() {
2452        let data_type = DataType::Vector { dimensions: 3 };
2453        let extractor = PropertyExtractor::new("test_vec", &data_type);
2454
2455        let props = [
2456            Some(Value::Vector(vec![1.0, 2.0, 3.0])),
2457            None,              // Missing property
2458            Some(Value::Null), // Null value
2459            Some(Value::Vector(vec![4.0, 5.0, 6.0])),
2460        ];
2461        let deleted = [false, false, false, true]; // Last one is deleted
2462
2463        let arr_ref = extractor
2464            .build_vector_column(4, &deleted, |i| props[i].as_ref(), 3)
2465            .unwrap();
2466
2467        let arr = arr_ref
2468            .as_any()
2469            .downcast_ref::<FixedSizeListArray>()
2470            .unwrap();
2471
2472        assert_eq!(arr.len(), 4);
2473        assert!(arr.is_valid(0)); // Valid vector
2474        assert!(!arr.is_valid(1)); // Missing property
2475        assert!(!arr.is_valid(2)); // Null value
2476        assert!(arr.is_valid(3)); // Deleted entry (valid with zeros)
2477
2478        // Check values
2479        let child = arr
2480            .values()
2481            .as_any()
2482            .downcast_ref::<Float32Array>()
2483            .unwrap();
2484        assert_eq!(child.value(0), 1.0);
2485        assert_eq!(child.value(1), 2.0);
2486        assert_eq!(child.value(2), 3.0);
2487        // Indices 3-5: zeros for missing
2488        // Indices 6-8: zeros for null
2489        // Indices 9-11: zeros for deleted (but marked as valid)
2490        assert_eq!(child.value(9), 0.0);
2491        assert_eq!(child.value(10), 0.0);
2492        assert_eq!(child.value(11), 0.0);
2493    }
2494
2495    #[test]
2496    fn test_build_vector_column_with_list_input() {
2497        let data_type = DataType::Vector { dimensions: 2 };
2498        let extractor = PropertyExtractor::new("test_vec", &data_type);
2499
2500        let props = [
2501            Some(Value::List(vec![Value::Float(1.0), Value::Float(2.0)])),
2502            Some(Value::List(vec![Value::Int(3), Value::Int(4)])),
2503            Some(Value::Vector(vec![5.0, 6.0])),
2504        ];
2505        let deleted = [false, false, false];
2506
2507        let arr_ref = extractor
2508            .build_vector_column(3, &deleted, |i| props[i].as_ref(), 2)
2509            .unwrap();
2510
2511        let arr = arr_ref
2512            .as_any()
2513            .downcast_ref::<FixedSizeListArray>()
2514            .unwrap();
2515
2516        assert_eq!(arr.len(), 3);
2517        assert!(arr.is_valid(0));
2518        assert!(arr.is_valid(1));
2519        assert!(arr.is_valid(2));
2520
2521        // Check values
2522        let child = arr
2523            .values()
2524            .as_any()
2525            .downcast_ref::<Float32Array>()
2526            .unwrap();
2527        assert_eq!(child.value(0), 1.0);
2528        assert_eq!(child.value(1), 2.0);
2529        assert_eq!(child.value(2), 3.0);
2530        assert_eq!(child.value(3), 4.0);
2531        assert_eq!(child.value(4), 5.0);
2532        assert_eq!(child.value(5), 6.0);
2533    }
2534
2535    /// H13: out-of-range i64 values must become NULL in i32/date32 columns
2536    /// instead of silently wrapping to a different number/date.
2537    #[test]
2538    fn test_int32_and_date32_columns_null_out_of_range() {
2539        let dt = DataType::Int64;
2540        let extractor = PropertyExtractor::new("x", &dt);
2541
2542        let over = Value::Int(i64::from(i32::MAX) + 1);
2543        let ok = Value::Int(42);
2544        let vals = [over, ok];
2545        let deleted = [false, false];
2546
2547        let arr = extractor
2548            .build_int32_column(2, &deleted, |i| Some(&vals[i]))
2549            .unwrap();
2550        let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
2551        assert!(
2552            arr.is_null(0),
2553            "out-of-range i64 must be NULL in an int32 column, not wrapped"
2554        );
2555        assert_eq!(arr.value(1), 42);
2556
2557        let arr = extractor
2558            .build_date32_column(2, &deleted, |i| Some(&vals[i]))
2559            .unwrap();
2560        let arr = arr.as_any().downcast_ref::<Date32Array>().unwrap();
2561        assert!(
2562            arr.is_null(0),
2563            "out-of-range day count must be NULL in a date32 column, not wrapped"
2564        );
2565        assert_eq!(arr.value(1), 42);
2566    }
2567}