Skip to main content

uni_store/storage/
value_codec.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Arrow column value decoding utilities.
5//!
6//! Provides a unified `value_from_column` function for decoding Arrow column
7//! values to `serde_json::Value`, used by both PropertyManager and DeltaDataset.
8
9use anyhow::{Result, anyhow};
10use arrow_array::{
11    Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
12    Int32Array, Int64Array, LargeBinaryArray, ListArray, StringArray, StructArray,
13    Time64NanosecondArray, TimestampNanosecondArray,
14};
15use serde_json::Value;
16use uni_common::{DataType, TemporalValue};
17use uni_crdt::Crdt;
18
/// Policy applied when a CRDT payload fails to decode.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CrdtDecodeMode {
    /// Strict validation: a CRDT decode failure is returned as an error.
    /// This is the default mode.
    #[default]
    Strict,
    /// Lenient handling: a CRDT decode failure is logged as a warning and a
    /// default GCounter is returned in place of the undecodable value.
    Lenient,
}
28
/// Upper bound on the recursion depth used while decoding nested values.
///
/// Introduced for issue #62 so that deeply nested structures produce a decode
/// error instead of overflowing the stack: decoding fails as soon as the
/// tracked depth becomes greater than this value.
pub const MAX_DECODE_DEPTH: usize = 32;
32
33/// Decode an Arrow column value to a serde_json::Value.
34///
35/// # Arguments
36/// * `col` - The Arrow array to read from
37/// * `data_type` - The uni_common::DataType describing the column's logical type
38/// * `row` - The row index to read
39/// * `crdt_mode` - How to handle CRDT decode errors
40///
41/// # Returns
42/// The decoded JSON value, or an error if decoding fails.
43pub fn value_from_column(
44    col: &dyn Array,
45    data_type: &DataType,
46    row: usize,
47    crdt_mode: CrdtDecodeMode,
48) -> Result<Value> {
49    value_from_column_inner(col, data_type, row, crdt_mode, 0)
50}
51
52/// Internal implementation of value_from_column with depth tracking.
53fn value_from_column_inner(
54    col: &dyn Array,
55    data_type: &DataType,
56    row: usize,
57    crdt_mode: CrdtDecodeMode,
58    depth: usize,
59) -> Result<Value> {
60    if depth > MAX_DECODE_DEPTH {
61        return Err(anyhow!("decode depth exceeded (max {})", MAX_DECODE_DEPTH));
62    }
63    match data_type {
64        DataType::String => {
65            let s = col
66                .as_any()
67                .downcast_ref::<StringArray>()
68                .ok_or_else(|| anyhow!("Invalid string col"))?
69                .value(row);
70            Ok(Value::String(s.to_string()))
71        }
72        DataType::Int32 => {
73            let v = col
74                .as_any()
75                .downcast_ref::<Int32Array>()
76                .ok_or_else(|| anyhow!("Invalid int32 col"))?
77                .value(row);
78            Ok(serde_json::json!(v))
79        }
80        DataType::Int64 => {
81            let v = col
82                .as_any()
83                .downcast_ref::<Int64Array>()
84                .ok_or_else(|| anyhow!("Invalid int64 col"))?
85                .value(row);
86            Ok(serde_json::json!(v))
87        }
88        DataType::Float32 => {
89            let v = col
90                .as_any()
91                .downcast_ref::<Float32Array>()
92                .ok_or_else(|| anyhow!("Invalid float32 col"))?
93                .value(row);
94            Ok(serde_json::json!(v))
95        }
96        DataType::Float64 => {
97            let v = col
98                .as_any()
99                .downcast_ref::<Float64Array>()
100                .ok_or_else(|| anyhow!("Invalid float64 col"))?
101                .value(row);
102            Ok(serde_json::json!(v))
103        }
104        DataType::Bool => {
105            let v = col
106                .as_any()
107                .downcast_ref::<BooleanArray>()
108                .ok_or_else(|| anyhow!("Invalid bool col"))?
109                .value(row);
110            Ok(serde_json::json!(v))
111        }
112        DataType::Vector { .. } => {
113            let list_arr = col
114                .as_any()
115                .downcast_ref::<FixedSizeListArray>()
116                .ok_or_else(|| anyhow!("Invalid fixed list col for vector"))?;
117            let values = list_arr.value(row);
118            let float_values = values
119                .as_any()
120                .downcast_ref::<Float32Array>()
121                .ok_or_else(|| anyhow!("Invalid float32 inner col for vector"))?;
122
123            let vec: Vec<f32> = (0..float_values.len())
124                .map(|i| float_values.value(i))
125                .collect();
126            Ok(serde_json::json!(vec))
127        }
128        DataType::CypherValue => {
129            let bytes = col
130                .as_any()
131                .downcast_ref::<LargeBinaryArray>()
132                .ok_or_else(|| anyhow!("Invalid large binary col for CypherValue"))?
133                .value(row);
134            if bytes.is_empty() {
135                return Ok(Value::Null);
136            }
137            let uni_val = uni_common::cypher_value_codec::decode(bytes)
138                .map_err(|e| anyhow!("CypherValue decode error: {}", e))?;
139            // Convert uni_common::Value to serde_json::Value
140            Ok(uni_val.into())
141        }
142        DataType::Bytes => {
143            let arr = col
144                .as_any()
145                .downcast_ref::<LargeBinaryArray>()
146                .ok_or_else(|| anyhow!("Invalid large binary col for Bytes"))?;
147            if arr.is_null(row) {
148                return Ok(Value::Null);
149            }
150            // serde_json::Value has no native bytes variant; encode as JSON array of u8.
151            let bytes = arr.value(row);
152            Ok(Value::Array(
153                bytes.iter().map(|b| serde_json::json!(*b)).collect(),
154            ))
155        }
156        DataType::Crdt(_) => {
157            let bytes = col
158                .as_any()
159                .downcast_ref::<BinaryArray>()
160                .ok_or_else(|| anyhow!("Invalid binary col for CRDT"))?
161                .value(row);
162
163            match crdt_mode {
164                CrdtDecodeMode::Strict => {
165                    let crdt = Crdt::from_msgpack(bytes)
166                        .map_err(|e| anyhow!("CRDT decode error: {}", e))?;
167                    Ok(serde_json::to_value(crdt)?)
168                }
169                CrdtDecodeMode::Lenient => {
170                    let crdt = Crdt::from_msgpack(bytes).unwrap_or_else(|e| {
171                        log::warn!("Failed to deserialize CRDT: {}", e);
172                        Crdt::GCounter(uni_crdt::GCounter::new())
173                    });
174                    Ok(serde_json::to_value(crdt).unwrap_or(Value::Null))
175                }
176            }
177        }
178        DataType::List(inner) => {
179            let list_arr = col
180                .as_any()
181                .downcast_ref::<ListArray>()
182                .ok_or_else(|| anyhow!("Invalid list col"))?;
183            if list_arr.is_null(row) {
184                return Ok(Value::Null);
185            }
186            let values = list_arr.value(row);
187            let mut vec = Vec::with_capacity(values.len());
188            for i in 0..values.len() {
189                vec.push(value_from_column_inner(
190                    values.as_ref(),
191                    inner,
192                    i,
193                    crdt_mode,
194                    depth + 1,
195                )?);
196            }
197            Ok(Value::Array(vec))
198        }
199        DataType::Map(_, _) => {
200            let list_arr = col
201                .as_any()
202                .downcast_ref::<ListArray>()
203                .ok_or_else(|| anyhow!("Invalid map (list) col"))?;
204            if list_arr.is_null(row) {
205                return Ok(Value::Null);
206            }
207            // Decode through the unified map reconstructor (single source of truth with
208            // `arrow_to_value`): it handles typed scalar value children, raw-`Bytes`
209            // (`uni_raw_bytes`-marked) children, and CV-encoded nested-value fallback
210            // children uniformly by runtime Arrow type — so this is correct regardless of
211            // the declared value type (a nested value is stored as a CV LargeBinary, which
212            // a declared-type recursion would otherwise fail to downcast).
213            let struct_arr = list_arr.value(row);
214            let uni_map = super::arrow_convert::try_reconstruct_map(&struct_arr)
215                .ok_or_else(|| anyhow!("Invalid struct array inner for map"))?;
216            let mut map = serde_json::Map::with_capacity(uni_map.len());
217            for (k, v) in uni_map {
218                map.insert(
219                    k,
220                    serde_json::to_value(&v).unwrap_or(serde_json::Value::Null),
221                );
222            }
223            Ok(Value::Object(map))
224        }
225        DataType::Date => {
226            let arr = col
227                .as_any()
228                .downcast_ref::<Date32Array>()
229                .ok_or_else(|| anyhow!("Invalid date32 col"))?;
230            if arr.is_null(row) {
231                return Ok(Value::Null);
232            }
233            let days = arr.value(row);
234            let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
235            if let Some(date) = epoch.checked_add_signed(chrono::Duration::days(days as i64)) {
236                Ok(Value::String(date.format("%Y-%m-%d").to_string()))
237            } else {
238                Ok(Value::Null)
239            }
240        }
241        DataType::Time => {
242            // Preferred schema: struct{nanos_since_midnight, offset_seconds}
243            if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
244                && let (Some(nanos_col), Some(offset_col)) = (
245                    struct_arr.column_by_name("nanos_since_midnight"),
246                    struct_arr.column_by_name("offset_seconds"),
247                )
248                && let (Some(nanos_arr), Some(offset_arr)) = (
249                    nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
250                    offset_col.as_any().downcast_ref::<Int32Array>(),
251                )
252            {
253                if nanos_arr.is_null(row) {
254                    return Ok(Value::Null);
255                }
256                let tv = if offset_arr.is_null(row) {
257                    TemporalValue::LocalTime {
258                        nanos_since_midnight: nanos_arr.value(row),
259                    }
260                } else {
261                    TemporalValue::Time {
262                        nanos_since_midnight: nanos_arr.value(row),
263                        offset_seconds: offset_arr.value(row),
264                    }
265                };
266                return Ok(Value::String(tv.to_string()));
267            }
268
269            // Legacy schema: plain time64 nanos, assume UTC offset=0
270            let arr = col
271                .as_any()
272                .downcast_ref::<Time64NanosecondArray>()
273                .ok_or_else(|| anyhow!("Invalid time64 col"))?;
274            if arr.is_null(row) {
275                return Ok(Value::Null);
276            }
277            let tv = TemporalValue::Time {
278                nanos_since_midnight: arr.value(row),
279                offset_seconds: 0,
280            };
281            Ok(Value::String(tv.to_string()))
282        }
283        DataType::Duration => {
284            // Duration is stored as LargeBinary via CypherValue codec
285            let arr = col
286                .as_any()
287                .downcast_ref::<LargeBinaryArray>()
288                .ok_or_else(|| anyhow!("Invalid duration col (expected LargeBinary)"))?;
289            if arr.is_null(row) {
290                return Ok(Value::Null);
291            }
292            let bytes = arr.value(row);
293            let uni_val = uni_common::cypher_value_codec::decode(bytes)
294                .map_err(|e| anyhow!("Failed to decode duration: {}", e))?;
295            // Return canonical ISO-8601 text for compatibility.
296            if let uni_common::Value::Temporal(uni_common::TemporalValue::Duration {
297                months,
298                days,
299                nanos,
300            }) = &uni_val
301            {
302                let tv = TemporalValue::Duration {
303                    months: *months,
304                    days: *days,
305                    nanos: *nanos,
306                };
307                Ok(Value::String(tv.to_string()))
308            } else {
309                Ok(serde_json::json!(uni_val.to_string()))
310            }
311        }
312        DataType::DateTime | DataType::Timestamp => {
313            // Preferred schema: struct{nanos_since_epoch, offset_seconds, timezone_name}
314            if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
315                && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
316                    struct_arr.column_by_name("nanos_since_epoch"),
317                    struct_arr.column_by_name("offset_seconds"),
318                    struct_arr.column_by_name("timezone_name"),
319                )
320                && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
321                    nanos_col
322                        .as_any()
323                        .downcast_ref::<TimestampNanosecondArray>(),
324                    offset_col.as_any().downcast_ref::<Int32Array>(),
325                    tz_col.as_any().downcast_ref::<StringArray>(),
326                )
327            {
328                if nanos_arr.is_null(row) {
329                    return Ok(Value::Null);
330                }
331                let tv = if offset_arr.is_null(row) {
332                    TemporalValue::LocalDateTime {
333                        nanos_since_epoch: nanos_arr.value(row),
334                    }
335                } else {
336                    let timezone_name =
337                        (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
338                    TemporalValue::DateTime {
339                        nanos_since_epoch: nanos_arr.value(row),
340                        offset_seconds: offset_arr.value(row),
341                        timezone_name,
342                    }
343                };
344                return Ok(Value::String(tv.to_string()));
345            }
346
347            // Legacy schema: plain timestamp nanos, assume UTC offset=0
348            let arr = col
349                .as_any()
350                .downcast_ref::<TimestampNanosecondArray>()
351                .ok_or_else(|| anyhow!("Invalid timestamp col"))?;
352            if arr.is_null(row) {
353                return Ok(Value::Null);
354            }
355            let tv = TemporalValue::DateTime {
356                nanos_since_epoch: arr.value(row),
357                offset_seconds: 0,
358                timezone_name: arr.timezone().map(|s| s.to_string()),
359            };
360            Ok(Value::String(tv.to_string()))
361        }
362        _ => Ok(Value::Null),
363    }
364}
365
366/// Decode an Arrow column value to a [`uni_common::Value`], preserving
367/// `Value::Temporal` variants for round-trip fidelity.
368///
369/// For DateTime/Timestamp/Date/Time, delegates to [`super::arrow_convert::arrow_to_value`].
370/// For all other types, decodes via [`value_from_column`] and converts.
371pub fn decode_column_value(
372    col: &dyn Array,
373    data_type: &DataType,
374    row: usize,
375    crdt_mode: CrdtDecodeMode,
376) -> anyhow::Result<uni_common::Value> {
377    match data_type {
378        DataType::DateTime
379        | DataType::Timestamp
380        | DataType::Date
381        | DataType::Time
382        | DataType::Btic
383        | DataType::Bytes
384        // Maps decode natively (full fidelity, CV-aware) via the unified
385        // `try_reconstruct_map` path inside `arrow_to_value`, which handles typed scalar
386        // value children, raw-`Bytes` (uni_raw_bytes-marked) children, and CV-encoded
387        // nested-value fallback children uniformly by runtime Arrow type.
388        | DataType::Map(_, _) => Ok(super::arrow_convert::arrow_to_value(
389            col,
390            row,
391            Some(data_type),
392        )),
393        _ => value_from_column(col, data_type, row, crdt_mode).map(uni_common::Value::from),
394    }
395}
396
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::builder::{Int64Builder, StringBuilder};

    /// Decodes `row` of `col` in strict CRDT mode, panicking on a decode error.
    fn decode_strict(col: &dyn Array, data_type: &DataType, row: usize) -> Value {
        value_from_column(col, data_type, row, CrdtDecodeMode::Strict).unwrap()
    }

    #[test]
    fn test_decode_string() {
        let expected_rows = ["hello", "world"];
        let mut strings = StringBuilder::new();
        for text in expected_rows {
            strings.append_value(text);
        }
        let column = strings.finish();

        for (row, expected) in expected_rows.into_iter().enumerate() {
            assert_eq!(
                decode_strict(&column, &DataType::String, row),
                Value::String(expected.to_string())
            );
        }
    }

    #[test]
    fn test_decode_int64() {
        let expected_rows: [i64; 2] = [42, -100];
        let mut ints = Int64Builder::new();
        for n in expected_rows {
            ints.append_value(n);
        }
        let column = ints.finish();

        for (row, expected) in expected_rows.into_iter().enumerate() {
            assert_eq!(
                decode_strict(&column, &DataType::Int64, row),
                serde_json::json!(expected)
            );
        }
    }

    #[test]
    fn test_decode_json() {
        use arrow_array::builder::LargeBinaryBuilder;

        // Encode the values with the CypherValue codec (matching the LargeBinary
        // storage format).
        let object: uni_common::Value = serde_json::json!({"key": "value"}).into();
        let object_cv = uni_common::cypher_value_codec::encode(&object);
        let null_cv = uni_common::cypher_value_codec::encode(&uni_common::Value::Null);
        let text_cv = uni_common::cypher_value_codec::encode(&uni_common::Value::String(
            "plain text".to_string(),
        ));

        let mut payloads = LargeBinaryBuilder::new();
        payloads.append_value(&object_cv);
        payloads.append_value(&null_cv);
        payloads.append_value(&text_cv);
        let column = payloads.finish();

        assert_eq!(
            decode_strict(&column, &DataType::CypherValue, 0),
            serde_json::json!({"key": "value"})
        );
        assert_eq!(
            decode_strict(&column, &DataType::CypherValue, 1),
            Value::Null
        );
        assert_eq!(
            decode_strict(&column, &DataType::CypherValue, 2),
            Value::String("plain text".to_string())
        );
    }

    #[test]
    fn test_decode_bool() {
        use arrow_array::builder::BooleanBuilder;

        let expected_rows = [true, false];
        let mut flags = BooleanBuilder::new();
        for flag in expected_rows {
            flags.append_value(flag);
        }
        let column = flags.finish();

        for (row, expected) in expected_rows.into_iter().enumerate() {
            assert_eq!(
                decode_strict(&column, &DataType::Bool, row),
                serde_json::json!(expected)
            );
        }
    }

    #[test]
    fn test_decode_float64() {
        use arrow_array::builder::Float64Builder;

        let expected_rows: [f64; 2] = [3.25, -0.5];
        let mut floats = Float64Builder::new();
        for x in expected_rows {
            floats.append_value(x);
        }
        let column = floats.finish();

        for (row, expected) in expected_rows.into_iter().enumerate() {
            assert_eq!(
                decode_strict(&column, &DataType::Float64, row),
                serde_json::json!(expected)
            );
        }
    }

    #[test]
    fn test_decode_int32() {
        use arrow_array::builder::Int32Builder;

        let expected_rows: [i32; 2] = [42, -1];
        let mut ints = Int32Builder::new();
        for n in expected_rows {
            ints.append_value(n);
        }
        let column = ints.finish();

        for (row, expected) in expected_rows.into_iter().enumerate() {
            assert_eq!(
                decode_strict(&column, &DataType::Int32, row),
                serde_json::json!(expected)
            );
        }
    }

    #[test]
    fn test_decode_float32() {
        use arrow_array::builder::Float32Builder;

        let mut floats = Float32Builder::new();
        floats.append_value(1.5);
        let column = floats.finish();

        // Float32 has limited precision so compare approximately
        let decoded = decode_strict(&column, &DataType::Float32, 0)
            .as_f64()
            .unwrap();
        assert!((decoded - 1.5).abs() < 0.001);
    }

    #[test]
    fn test_decode_vector() {
        use arrow_array::builder::{FixedSizeListBuilder, Float32Builder};

        let mut vectors = FixedSizeListBuilder::new(Float32Builder::new(), 3);
        for component in [1.0, 2.0, 3.0] {
            vectors.values().append_value(component);
        }
        vectors.append(true);
        let column = vectors.finish();

        let decoded = decode_strict(&column, &DataType::Vector { dimensions: 3 }, 0);
        assert_eq!(decoded, serde_json::json!([1.0, 2.0, 3.0]));
    }

    #[test]
    fn test_decode_date() {
        use arrow_array::builder::Date32Builder;

        // 2021-01-01 = 18628 days since epoch
        let mut dates = Date32Builder::new();
        dates.append_value(18628);
        let column = dates.finish();

        assert_eq!(
            decode_strict(&column, &DataType::Date, 0),
            Value::String("2021-01-01".to_string())
        );
    }

    #[test]
    fn test_decode_date_null() {
        use arrow_array::builder::Date32Builder;

        let mut dates = Date32Builder::new();
        dates.append_null();
        let column = dates.finish();

        assert_eq!(decode_strict(&column, &DataType::Date, 0), Value::Null);
    }

    #[test]
    fn test_decode_list_of_strings() {
        use arrow_array::builder::ListBuilder;

        let mut lists = ListBuilder::new(StringBuilder::new());
        for item in ["a", "b", "c"] {
            lists.values().append_value(item);
        }
        lists.append(true);
        let column = lists.finish();

        let list_type = DataType::List(Box::new(DataType::String));
        assert_eq!(
            decode_strict(&column, &list_type, 0),
            serde_json::json!(["a", "b", "c"])
        );
    }

    #[test]
    fn test_decode_list_of_ints() {
        use arrow_array::builder::ListBuilder;

        let mut lists = ListBuilder::new(Int64Builder::new());
        for item in [1, 2, 3] {
            lists.values().append_value(item);
        }
        lists.append(true);
        let column = lists.finish();

        let list_type = DataType::List(Box::new(DataType::Int64));
        assert_eq!(
            decode_strict(&column, &list_type, 0),
            serde_json::json!([1, 2, 3])
        );
    }

    #[test]
    fn test_decode_list_null() {
        use arrow_array::builder::ListBuilder;

        let mut lists = ListBuilder::new(Int64Builder::new());
        lists.append_null();
        let column = lists.finish();

        let list_type = DataType::List(Box::new(DataType::Int64));
        assert_eq!(decode_strict(&column, &list_type, 0), Value::Null);
    }

    #[test]
    fn test_decode_unknown_type_returns_null() {
        // A String array decoded with a logical type that has no dedicated arm:
        // Point falls through to the `_ => Ok(Value::Null)` arm.
        let mut strings = StringBuilder::new();
        strings.append_value("test");
        let column = strings.finish();

        let point_type = DataType::Point(uni_common::core::schema::PointType::Geographic);
        assert_eq!(decode_strict(&column, &point_type, 0), Value::Null);
    }
}