// uni_store/storage/value_codec.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Arrow column value decoding utilities.
//!
//! Provides a unified `value_from_column` function for decoding Arrow column
//! values to `serde_json::Value`, used by both PropertyManager and DeltaDataset.

use anyhow::{Result, anyhow};
use arrow_array::{
    Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
    Int32Array, Int64Array, LargeBinaryArray, ListArray, StringArray, StructArray,
    Time64NanosecondArray, TimestampNanosecondArray,
};
use serde_json::Value;
use uni_common::{DataType, TemporalValue};
use uni_crdt::Crdt;
/// Selects the error-handling policy applied when a stored CRDT payload
/// fails to deserialize.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CrdtDecodeMode {
    /// Fail fast: propagate any CRDT decode error to the caller (strict validation).
    #[default]
    Strict,
    /// Best effort: log a warning and substitute a default GCounter on failure.
    Lenient,
}
/// Maximum recursion depth for nested List/Map decoding to prevent stack overflow.
/// Issue #62: Added to prevent stack overflow from deeply nested structures.
/// Depth 0 is the outermost value; decoding fails once `depth > MAX_DECODE_DEPTH`.
pub const MAX_DECODE_DEPTH: usize = 32;
33/// Decode an Arrow column value to a serde_json::Value.
34///
35/// # Arguments
36/// * `col` - The Arrow array to read from
37/// * `data_type` - The uni_common::DataType describing the column's logical type
38/// * `row` - The row index to read
39/// * `crdt_mode` - How to handle CRDT decode errors
40///
41/// # Returns
42/// The decoded JSON value, or an error if decoding fails.
43pub fn value_from_column(
44    col: &dyn Array,
45    data_type: &DataType,
46    row: usize,
47    crdt_mode: CrdtDecodeMode,
48) -> Result<Value> {
49    value_from_column_inner(col, data_type, row, crdt_mode, 0)
50}
52/// Internal implementation of value_from_column with depth tracking.
53fn value_from_column_inner(
54    col: &dyn Array,
55    data_type: &DataType,
56    row: usize,
57    crdt_mode: CrdtDecodeMode,
58    depth: usize,
59) -> Result<Value> {
60    if depth > MAX_DECODE_DEPTH {
61        return Err(anyhow!("decode depth exceeded (max {})", MAX_DECODE_DEPTH));
62    }
63    match data_type {
64        DataType::String => {
65            let s = col
66                .as_any()
67                .downcast_ref::<StringArray>()
68                .ok_or_else(|| anyhow!("Invalid string col"))?
69                .value(row);
70            Ok(Value::String(s.to_string()))
71        }
72        DataType::Int32 => {
73            let v = col
74                .as_any()
75                .downcast_ref::<Int32Array>()
76                .ok_or_else(|| anyhow!("Invalid int32 col"))?
77                .value(row);
78            Ok(serde_json::json!(v))
79        }
80        DataType::Int64 => {
81            let v = col
82                .as_any()
83                .downcast_ref::<Int64Array>()
84                .ok_or_else(|| anyhow!("Invalid int64 col"))?
85                .value(row);
86            Ok(serde_json::json!(v))
87        }
88        DataType::Float32 => {
89            let v = col
90                .as_any()
91                .downcast_ref::<Float32Array>()
92                .ok_or_else(|| anyhow!("Invalid float32 col"))?
93                .value(row);
94            Ok(serde_json::json!(v))
95        }
96        DataType::Float64 => {
97            let v = col
98                .as_any()
99                .downcast_ref::<Float64Array>()
100                .ok_or_else(|| anyhow!("Invalid float64 col"))?
101                .value(row);
102            Ok(serde_json::json!(v))
103        }
104        DataType::Bool => {
105            let v = col
106                .as_any()
107                .downcast_ref::<BooleanArray>()
108                .ok_or_else(|| anyhow!("Invalid bool col"))?
109                .value(row);
110            Ok(serde_json::json!(v))
111        }
112        DataType::Vector { .. } => {
113            let list_arr = col
114                .as_any()
115                .downcast_ref::<FixedSizeListArray>()
116                .ok_or_else(|| anyhow!("Invalid fixed list col for vector"))?;
117            let values = list_arr.value(row);
118            let float_values = values
119                .as_any()
120                .downcast_ref::<Float32Array>()
121                .ok_or_else(|| anyhow!("Invalid float32 inner col for vector"))?;
122
123            let vec: Vec<f32> = (0..float_values.len())
124                .map(|i| float_values.value(i))
125                .collect();
126            Ok(serde_json::json!(vec))
127        }
128        DataType::CypherValue => {
129            let bytes = col
130                .as_any()
131                .downcast_ref::<LargeBinaryArray>()
132                .ok_or_else(|| anyhow!("Invalid large binary col for CypherValue"))?
133                .value(row);
134            if bytes.is_empty() {
135                return Ok(Value::Null);
136            }
137            let uni_val = uni_common::cypher_value_codec::decode(bytes)
138                .map_err(|e| anyhow!("CypherValue decode error: {}", e))?;
139            // Convert uni_common::Value to serde_json::Value
140            Ok(uni_val.into())
141        }
142        DataType::Crdt(_) => {
143            let bytes = col
144                .as_any()
145                .downcast_ref::<BinaryArray>()
146                .ok_or_else(|| anyhow!("Invalid binary col for CRDT"))?
147                .value(row);
148
149            match crdt_mode {
150                CrdtDecodeMode::Strict => {
151                    let crdt = Crdt::from_msgpack(bytes)
152                        .map_err(|e| anyhow!("CRDT decode error: {}", e))?;
153                    Ok(serde_json::to_value(crdt)?)
154                }
155                CrdtDecodeMode::Lenient => {
156                    let crdt = Crdt::from_msgpack(bytes).unwrap_or_else(|e| {
157                        log::warn!("Failed to deserialize CRDT: {}", e);
158                        Crdt::GCounter(uni_crdt::GCounter::new())
159                    });
160                    Ok(serde_json::to_value(crdt).unwrap_or(Value::Null))
161                }
162            }
163        }
164        DataType::List(inner) => {
165            let list_arr = col
166                .as_any()
167                .downcast_ref::<ListArray>()
168                .ok_or_else(|| anyhow!("Invalid list col"))?;
169            if list_arr.is_null(row) {
170                return Ok(Value::Null);
171            }
172            let values = list_arr.value(row);
173            let mut vec = Vec::with_capacity(values.len());
174            for i in 0..values.len() {
175                vec.push(value_from_column_inner(
176                    values.as_ref(),
177                    inner,
178                    i,
179                    crdt_mode,
180                    depth + 1,
181                )?);
182            }
183            Ok(Value::Array(vec))
184        }
185        DataType::Map(key_type, value_type) => {
186            let list_arr = col
187                .as_any()
188                .downcast_ref::<ListArray>()
189                .ok_or_else(|| anyhow!("Invalid map (list) col"))?;
190            if list_arr.is_null(row) {
191                return Ok(Value::Null);
192            }
193            let struct_arr = list_arr.value(row);
194            let struct_arr_ref = struct_arr
195                .as_any()
196                .downcast_ref::<StructArray>()
197                .ok_or_else(|| anyhow!("Invalid struct array inner for map"))?;
198
199            let keys = struct_arr_ref.column(0);
200            let values = struct_arr_ref.column(1);
201
202            let mut map = serde_json::Map::with_capacity(struct_arr_ref.len());
203
204            for i in 0..struct_arr_ref.len() {
205                let k_val =
206                    value_from_column_inner(keys.as_ref(), key_type, i, crdt_mode, depth + 1)?;
207                let v_val =
208                    value_from_column_inner(values.as_ref(), value_type, i, crdt_mode, depth + 1)?;
209
210                // Convert key to string for JSON object
211                if let Some(k_str) = k_val.as_str() {
212                    map.insert(k_str.to_string(), v_val);
213                } else if let Some(k_int) = k_val.as_i64() {
214                    map.insert(k_int.to_string(), v_val);
215                } else {
216                    map.insert(k_val.to_string(), v_val);
217                }
218            }
219            Ok(Value::Object(map))
220        }
221        DataType::Date => {
222            let arr = col
223                .as_any()
224                .downcast_ref::<Date32Array>()
225                .ok_or_else(|| anyhow!("Invalid date32 col"))?;
226            if arr.is_null(row) {
227                return Ok(Value::Null);
228            }
229            let days = arr.value(row);
230            let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
231            if let Some(date) = epoch.checked_add_signed(chrono::Duration::days(days as i64)) {
232                Ok(Value::String(date.format("%Y-%m-%d").to_string()))
233            } else {
234                Ok(Value::Null)
235            }
236        }
237        DataType::Time => {
238            // Preferred schema: struct{nanos_since_midnight, offset_seconds}
239            if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
240                && let (Some(nanos_col), Some(offset_col)) = (
241                    struct_arr.column_by_name("nanos_since_midnight"),
242                    struct_arr.column_by_name("offset_seconds"),
243                )
244                && let (Some(nanos_arr), Some(offset_arr)) = (
245                    nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
246                    offset_col.as_any().downcast_ref::<Int32Array>(),
247                )
248            {
249                if nanos_arr.is_null(row) {
250                    return Ok(Value::Null);
251                }
252                let tv = if offset_arr.is_null(row) {
253                    TemporalValue::LocalTime {
254                        nanos_since_midnight: nanos_arr.value(row),
255                    }
256                } else {
257                    TemporalValue::Time {
258                        nanos_since_midnight: nanos_arr.value(row),
259                        offset_seconds: offset_arr.value(row),
260                    }
261                };
262                return Ok(Value::String(tv.to_string()));
263            }
264
265            // Legacy schema: plain time64 nanos, assume UTC offset=0
266            let arr = col
267                .as_any()
268                .downcast_ref::<Time64NanosecondArray>()
269                .ok_or_else(|| anyhow!("Invalid time64 col"))?;
270            if arr.is_null(row) {
271                return Ok(Value::Null);
272            }
273            let tv = TemporalValue::Time {
274                nanos_since_midnight: arr.value(row),
275                offset_seconds: 0,
276            };
277            Ok(Value::String(tv.to_string()))
278        }
279        DataType::Duration => {
280            // Duration is stored as LargeBinary via CypherValue codec
281            let arr = col
282                .as_any()
283                .downcast_ref::<LargeBinaryArray>()
284                .ok_or_else(|| anyhow!("Invalid duration col (expected LargeBinary)"))?;
285            if arr.is_null(row) {
286                return Ok(Value::Null);
287            }
288            let bytes = arr.value(row);
289            let uni_val = uni_common::cypher_value_codec::decode(bytes)
290                .map_err(|e| anyhow!("Failed to decode duration: {}", e))?;
291            // Return canonical ISO-8601 text for compatibility.
292            if let uni_common::Value::Temporal(uni_common::TemporalValue::Duration {
293                months,
294                days,
295                nanos,
296            }) = &uni_val
297            {
298                let tv = TemporalValue::Duration {
299                    months: *months,
300                    days: *days,
301                    nanos: *nanos,
302                };
303                Ok(Value::String(tv.to_string()))
304            } else {
305                Ok(serde_json::json!(uni_val.to_string()))
306            }
307        }
308        DataType::DateTime | DataType::Timestamp => {
309            // Preferred schema: struct{nanos_since_epoch, offset_seconds, timezone_name}
310            if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
311                && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
312                    struct_arr.column_by_name("nanos_since_epoch"),
313                    struct_arr.column_by_name("offset_seconds"),
314                    struct_arr.column_by_name("timezone_name"),
315                )
316                && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
317                    nanos_col
318                        .as_any()
319                        .downcast_ref::<TimestampNanosecondArray>(),
320                    offset_col.as_any().downcast_ref::<Int32Array>(),
321                    tz_col.as_any().downcast_ref::<StringArray>(),
322                )
323            {
324                if nanos_arr.is_null(row) {
325                    return Ok(Value::Null);
326                }
327                let tv = if offset_arr.is_null(row) {
328                    TemporalValue::LocalDateTime {
329                        nanos_since_epoch: nanos_arr.value(row),
330                    }
331                } else {
332                    let timezone_name =
333                        (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
334                    TemporalValue::DateTime {
335                        nanos_since_epoch: nanos_arr.value(row),
336                        offset_seconds: offset_arr.value(row),
337                        timezone_name,
338                    }
339                };
340                return Ok(Value::String(tv.to_string()));
341            }
342
343            // Legacy schema: plain timestamp nanos, assume UTC offset=0
344            let arr = col
345                .as_any()
346                .downcast_ref::<TimestampNanosecondArray>()
347                .ok_or_else(|| anyhow!("Invalid timestamp col"))?;
348            if arr.is_null(row) {
349                return Ok(Value::Null);
350            }
351            let tv = TemporalValue::DateTime {
352                nanos_since_epoch: arr.value(row),
353                offset_seconds: 0,
354                timezone_name: arr.timezone().map(|s| s.to_string()),
355            };
356            Ok(Value::String(tv.to_string()))
357        }
358        _ => Ok(Value::Null),
359    }
360}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::builder::{Int64Builder, StringBuilder};

    #[test]
    fn test_decode_string() {
        // Build a two-row string column and decode each row.
        let mut sb = StringBuilder::new();
        sb.append_value("hello");
        sb.append_value("world");
        let arr = sb.finish();

        for (row, want) in [(0usize, "hello"), (1, "world")] {
            let got =
                value_from_column(&arr, &DataType::String, row, CrdtDecodeMode::Strict).unwrap();
            assert_eq!(got, Value::String(want.to_string()));
        }
    }

    #[test]
    fn test_decode_int64() {
        // Build a two-row int64 column and decode each row.
        let mut ib = Int64Builder::new();
        ib.append_value(42);
        ib.append_value(-100);
        let arr = ib.finish();

        for (row, want) in [(0usize, 42i64), (1, -100)] {
            let got =
                value_from_column(&arr, &DataType::Int64, row, CrdtDecodeMode::Strict).unwrap();
            assert_eq!(got, serde_json::json!(want));
        }
    }

    #[test]
    fn test_decode_json() {
        use arrow_array::builder::LargeBinaryBuilder;

        // Encode JSON values as JSONB binary (matching the LargeBinary storage format)
        let mut bb = LargeBinaryBuilder::new();

        let obj_val: uni_common::Value = serde_json::json!({"key": "value"}).into();
        bb.append_value(uni_common::cypher_value_codec::encode(&obj_val));

        bb.append_value(uni_common::cypher_value_codec::encode(
            &uni_common::Value::Null,
        ));

        bb.append_value(uni_common::cypher_value_codec::encode(
            &uni_common::Value::String("plain text".to_string()),
        ));

        let arr = bb.finish();

        // Row 0: JSON object round-trips intact.
        let got =
            value_from_column(&arr, &DataType::CypherValue, 0, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(got, serde_json::json!({"key": "value"}));

        // Row 1: an encoded Null decodes back to JSON null.
        let got =
            value_from_column(&arr, &DataType::CypherValue, 1, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(got, Value::Null);

        // Row 2: plain strings decode back to JSON strings.
        let got =
            value_from_column(&arr, &DataType::CypherValue, 2, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(got, Value::String("plain text".to_string()));
    }
}