1use anyhow::{Result, anyhow};
10use arrow_array::{
11 Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
12 Int32Array, Int64Array, LargeBinaryArray, ListArray, StringArray, StructArray,
13 Time64NanosecondArray, TimestampNanosecondArray,
14};
15use serde_json::Value;
16use uni_common::{DataType, TemporalValue};
17use uni_crdt::Crdt;
18
/// Selects how [`value_from_column`] reacts to a CRDT payload that cannot be
/// deserialized from msgpack.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CrdtDecodeMode {
    /// Surface the deserialization failure as an error (the default).
    #[default]
    Strict,
    /// Log a warning and substitute a freshly constructed `GCounter` instead
    /// of failing the whole decode.
    Lenient,
}
28
/// Deepest `List`/`Map` nesting level the column decoder accepts; the
/// top-level value is level 0 and anything nested deeper is rejected with an
/// error rather than recursing without bound.
pub const MAX_DECODE_DEPTH: usize = 32;
32
33pub fn value_from_column(
44 col: &dyn Array,
45 data_type: &DataType,
46 row: usize,
47 crdt_mode: CrdtDecodeMode,
48) -> Result<Value> {
49 value_from_column_inner(col, data_type, row, crdt_mode, 0)
50}
51
52fn value_from_column_inner(
54 col: &dyn Array,
55 data_type: &DataType,
56 row: usize,
57 crdt_mode: CrdtDecodeMode,
58 depth: usize,
59) -> Result<Value> {
60 if depth > MAX_DECODE_DEPTH {
61 return Err(anyhow!("decode depth exceeded (max {})", MAX_DECODE_DEPTH));
62 }
63 match data_type {
64 DataType::String => {
65 let s = col
66 .as_any()
67 .downcast_ref::<StringArray>()
68 .ok_or_else(|| anyhow!("Invalid string col"))?
69 .value(row);
70 Ok(Value::String(s.to_string()))
71 }
72 DataType::Int32 => {
73 let v = col
74 .as_any()
75 .downcast_ref::<Int32Array>()
76 .ok_or_else(|| anyhow!("Invalid int32 col"))?
77 .value(row);
78 Ok(serde_json::json!(v))
79 }
80 DataType::Int64 => {
81 let v = col
82 .as_any()
83 .downcast_ref::<Int64Array>()
84 .ok_or_else(|| anyhow!("Invalid int64 col"))?
85 .value(row);
86 Ok(serde_json::json!(v))
87 }
88 DataType::Float32 => {
89 let v = col
90 .as_any()
91 .downcast_ref::<Float32Array>()
92 .ok_or_else(|| anyhow!("Invalid float32 col"))?
93 .value(row);
94 Ok(serde_json::json!(v))
95 }
96 DataType::Float64 => {
97 let v = col
98 .as_any()
99 .downcast_ref::<Float64Array>()
100 .ok_or_else(|| anyhow!("Invalid float64 col"))?
101 .value(row);
102 Ok(serde_json::json!(v))
103 }
104 DataType::Bool => {
105 let v = col
106 .as_any()
107 .downcast_ref::<BooleanArray>()
108 .ok_or_else(|| anyhow!("Invalid bool col"))?
109 .value(row);
110 Ok(serde_json::json!(v))
111 }
112 DataType::Vector { .. } => {
113 let list_arr = col
114 .as_any()
115 .downcast_ref::<FixedSizeListArray>()
116 .ok_or_else(|| anyhow!("Invalid fixed list col for vector"))?;
117 let values = list_arr.value(row);
118 let float_values = values
119 .as_any()
120 .downcast_ref::<Float32Array>()
121 .ok_or_else(|| anyhow!("Invalid float32 inner col for vector"))?;
122
123 let vec: Vec<f32> = (0..float_values.len())
124 .map(|i| float_values.value(i))
125 .collect();
126 Ok(serde_json::json!(vec))
127 }
128 DataType::CypherValue => {
129 let bytes = col
130 .as_any()
131 .downcast_ref::<LargeBinaryArray>()
132 .ok_or_else(|| anyhow!("Invalid large binary col for CypherValue"))?
133 .value(row);
134 if bytes.is_empty() {
135 return Ok(Value::Null);
136 }
137 let uni_val = uni_common::cypher_value_codec::decode(bytes)
138 .map_err(|e| anyhow!("CypherValue decode error: {}", e))?;
139 Ok(uni_val.into())
141 }
142 DataType::Bytes => {
143 let arr = col
144 .as_any()
145 .downcast_ref::<LargeBinaryArray>()
146 .ok_or_else(|| anyhow!("Invalid large binary col for Bytes"))?;
147 if arr.is_null(row) {
148 return Ok(Value::Null);
149 }
150 let bytes = arr.value(row);
152 Ok(Value::Array(
153 bytes.iter().map(|b| serde_json::json!(*b)).collect(),
154 ))
155 }
156 DataType::Crdt(_) => {
157 let bytes = col
158 .as_any()
159 .downcast_ref::<BinaryArray>()
160 .ok_or_else(|| anyhow!("Invalid binary col for CRDT"))?
161 .value(row);
162
163 match crdt_mode {
164 CrdtDecodeMode::Strict => {
165 let crdt = Crdt::from_msgpack(bytes)
166 .map_err(|e| anyhow!("CRDT decode error: {}", e))?;
167 Ok(serde_json::to_value(crdt)?)
168 }
169 CrdtDecodeMode::Lenient => {
170 let crdt = Crdt::from_msgpack(bytes).unwrap_or_else(|e| {
171 log::warn!("Failed to deserialize CRDT: {}", e);
172 Crdt::GCounter(uni_crdt::GCounter::new())
173 });
174 Ok(serde_json::to_value(crdt).unwrap_or(Value::Null))
175 }
176 }
177 }
178 DataType::List(inner) => {
179 let list_arr = col
180 .as_any()
181 .downcast_ref::<ListArray>()
182 .ok_or_else(|| anyhow!("Invalid list col"))?;
183 if list_arr.is_null(row) {
184 return Ok(Value::Null);
185 }
186 let values = list_arr.value(row);
187 let mut vec = Vec::with_capacity(values.len());
188 for i in 0..values.len() {
189 vec.push(value_from_column_inner(
190 values.as_ref(),
191 inner,
192 i,
193 crdt_mode,
194 depth + 1,
195 )?);
196 }
197 Ok(Value::Array(vec))
198 }
199 DataType::Map(key_type, value_type) => {
200 let list_arr = col
201 .as_any()
202 .downcast_ref::<ListArray>()
203 .ok_or_else(|| anyhow!("Invalid map (list) col"))?;
204 if list_arr.is_null(row) {
205 return Ok(Value::Null);
206 }
207 let struct_arr = list_arr.value(row);
208 let struct_arr_ref = struct_arr
209 .as_any()
210 .downcast_ref::<StructArray>()
211 .ok_or_else(|| anyhow!("Invalid struct array inner for map"))?;
212
213 let keys = struct_arr_ref.column(0);
214 let values = struct_arr_ref.column(1);
215
216 let mut map = serde_json::Map::with_capacity(struct_arr_ref.len());
217
218 for i in 0..struct_arr_ref.len() {
219 let k_val =
220 value_from_column_inner(keys.as_ref(), key_type, i, crdt_mode, depth + 1)?;
221 let v_val =
222 value_from_column_inner(values.as_ref(), value_type, i, crdt_mode, depth + 1)?;
223
224 if let Some(k_str) = k_val.as_str() {
226 map.insert(k_str.to_string(), v_val);
227 } else if let Some(k_int) = k_val.as_i64() {
228 map.insert(k_int.to_string(), v_val);
229 } else {
230 map.insert(k_val.to_string(), v_val);
231 }
232 }
233 Ok(Value::Object(map))
234 }
235 DataType::Date => {
236 let arr = col
237 .as_any()
238 .downcast_ref::<Date32Array>()
239 .ok_or_else(|| anyhow!("Invalid date32 col"))?;
240 if arr.is_null(row) {
241 return Ok(Value::Null);
242 }
243 let days = arr.value(row);
244 let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
245 if let Some(date) = epoch.checked_add_signed(chrono::Duration::days(days as i64)) {
246 Ok(Value::String(date.format("%Y-%m-%d").to_string()))
247 } else {
248 Ok(Value::Null)
249 }
250 }
251 DataType::Time => {
252 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
254 && let (Some(nanos_col), Some(offset_col)) = (
255 struct_arr.column_by_name("nanos_since_midnight"),
256 struct_arr.column_by_name("offset_seconds"),
257 )
258 && let (Some(nanos_arr), Some(offset_arr)) = (
259 nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
260 offset_col.as_any().downcast_ref::<Int32Array>(),
261 )
262 {
263 if nanos_arr.is_null(row) {
264 return Ok(Value::Null);
265 }
266 let tv = if offset_arr.is_null(row) {
267 TemporalValue::LocalTime {
268 nanos_since_midnight: nanos_arr.value(row),
269 }
270 } else {
271 TemporalValue::Time {
272 nanos_since_midnight: nanos_arr.value(row),
273 offset_seconds: offset_arr.value(row),
274 }
275 };
276 return Ok(Value::String(tv.to_string()));
277 }
278
279 let arr = col
281 .as_any()
282 .downcast_ref::<Time64NanosecondArray>()
283 .ok_or_else(|| anyhow!("Invalid time64 col"))?;
284 if arr.is_null(row) {
285 return Ok(Value::Null);
286 }
287 let tv = TemporalValue::Time {
288 nanos_since_midnight: arr.value(row),
289 offset_seconds: 0,
290 };
291 Ok(Value::String(tv.to_string()))
292 }
293 DataType::Duration => {
294 let arr = col
296 .as_any()
297 .downcast_ref::<LargeBinaryArray>()
298 .ok_or_else(|| anyhow!("Invalid duration col (expected LargeBinary)"))?;
299 if arr.is_null(row) {
300 return Ok(Value::Null);
301 }
302 let bytes = arr.value(row);
303 let uni_val = uni_common::cypher_value_codec::decode(bytes)
304 .map_err(|e| anyhow!("Failed to decode duration: {}", e))?;
305 if let uni_common::Value::Temporal(uni_common::TemporalValue::Duration {
307 months,
308 days,
309 nanos,
310 }) = &uni_val
311 {
312 let tv = TemporalValue::Duration {
313 months: *months,
314 days: *days,
315 nanos: *nanos,
316 };
317 Ok(Value::String(tv.to_string()))
318 } else {
319 Ok(serde_json::json!(uni_val.to_string()))
320 }
321 }
322 DataType::DateTime | DataType::Timestamp => {
323 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
325 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
326 struct_arr.column_by_name("nanos_since_epoch"),
327 struct_arr.column_by_name("offset_seconds"),
328 struct_arr.column_by_name("timezone_name"),
329 )
330 && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
331 nanos_col
332 .as_any()
333 .downcast_ref::<TimestampNanosecondArray>(),
334 offset_col.as_any().downcast_ref::<Int32Array>(),
335 tz_col.as_any().downcast_ref::<StringArray>(),
336 )
337 {
338 if nanos_arr.is_null(row) {
339 return Ok(Value::Null);
340 }
341 let tv = if offset_arr.is_null(row) {
342 TemporalValue::LocalDateTime {
343 nanos_since_epoch: nanos_arr.value(row),
344 }
345 } else {
346 let timezone_name =
347 (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
348 TemporalValue::DateTime {
349 nanos_since_epoch: nanos_arr.value(row),
350 offset_seconds: offset_arr.value(row),
351 timezone_name,
352 }
353 };
354 return Ok(Value::String(tv.to_string()));
355 }
356
357 let arr = col
359 .as_any()
360 .downcast_ref::<TimestampNanosecondArray>()
361 .ok_or_else(|| anyhow!("Invalid timestamp col"))?;
362 if arr.is_null(row) {
363 return Ok(Value::Null);
364 }
365 let tv = TemporalValue::DateTime {
366 nanos_since_epoch: arr.value(row),
367 offset_seconds: 0,
368 timezone_name: arr.timezone().map(|s| s.to_string()),
369 };
370 Ok(Value::String(tv.to_string()))
371 }
372 _ => Ok(Value::Null),
373 }
374}
375
376pub fn decode_column_value(
382 col: &dyn Array,
383 data_type: &DataType,
384 row: usize,
385 crdt_mode: CrdtDecodeMode,
386) -> anyhow::Result<uni_common::Value> {
387 match data_type {
388 DataType::DateTime
389 | DataType::Timestamp
390 | DataType::Date
391 | DataType::Time
392 | DataType::Btic
393 | DataType::Bytes => Ok(super::arrow_convert::arrow_to_value(
394 col,
395 row,
396 Some(data_type),
397 )),
398 _ => value_from_column(col, data_type, row, crdt_mode).map(uni_common::Value::from),
399 }
400}
401
#[cfg(test)]
mod tests {
    //! Unit tests for the Arrow column → JSON decoder.

    use super::*;
    use arrow_array::builder::{Int64Builder, StringBuilder};

    #[test]
    fn test_decode_string() {
        let mut builder = StringBuilder::new();
        builder.append_value("hello");
        builder.append_value("world");
        let array = builder.finish();

        let val = value_from_column(&array, &DataType::String, 0, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, Value::String("hello".to_string()));

        let val = value_from_column(&array, &DataType::String, 1, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, Value::String("world".to_string()));
    }

    #[test]
    fn test_decode_int64() {
        let mut builder = Int64Builder::new();
        builder.append_value(42);
        builder.append_value(-100);
        let array = builder.finish();

        let val = value_from_column(&array, &DataType::Int64, 0, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, serde_json::json!(42));

        let val = value_from_column(&array, &DataType::Int64, 1, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, serde_json::json!(-100));
    }

    /// CypherValue cells are codec-encoded `LargeBinary` payloads.
    #[test]
    fn test_decode_cypher_value() {
        use arrow_array::builder::LargeBinaryBuilder;

        let mut builder = LargeBinaryBuilder::new();

        let obj_cv = {
            let val: uni_common::Value = serde_json::json!({"key": "value"}).into();
            uni_common::cypher_value_codec::encode(&val)
        };
        builder.append_value(&obj_cv);

        let null_cv = uni_common::cypher_value_codec::encode(&uni_common::Value::Null);
        builder.append_value(&null_cv);

        let text_cv = uni_common::cypher_value_codec::encode(&uni_common::Value::String(
            "plain text".to_string(),
        ));
        builder.append_value(&text_cv);

        let array = builder.finish();

        let val =
            value_from_column(&array, &DataType::CypherValue, 0, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, serde_json::json!({"key": "value"}));

        let val =
            value_from_column(&array, &DataType::CypherValue, 1, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, Value::Null);

        let val =
            value_from_column(&array, &DataType::CypherValue, 2, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, Value::String("plain text".to_string()));
    }

    #[test]
    fn test_decode_bool() {
        use arrow_array::builder::BooleanBuilder;
        let mut builder = BooleanBuilder::new();
        builder.append_value(true);
        builder.append_value(false);
        let array = builder.finish();

        let val = value_from_column(&array, &DataType::Bool, 0, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, serde_json::json!(true));

        let val = value_from_column(&array, &DataType::Bool, 1, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, serde_json::json!(false));
    }

    #[test]
    fn test_decode_float64() {
        use arrow_array::builder::Float64Builder;
        let mut builder = Float64Builder::new();
        builder.append_value(3.25);
        builder.append_value(-0.5);
        let array = builder.finish();

        let val = value_from_column(&array, &DataType::Float64, 0, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, serde_json::json!(3.25));

        let val = value_from_column(&array, &DataType::Float64, 1, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, serde_json::json!(-0.5));
    }

    #[test]
    fn test_decode_int32() {
        use arrow_array::builder::Int32Builder;
        let mut builder = Int32Builder::new();
        builder.append_value(42);
        builder.append_value(-1);
        let array = builder.finish();

        let val = value_from_column(&array, &DataType::Int32, 0, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, serde_json::json!(42));

        let val = value_from_column(&array, &DataType::Int32, 1, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, serde_json::json!(-1));
    }

    #[test]
    fn test_decode_float32() {
        use arrow_array::builder::Float32Builder;
        let mut builder = Float32Builder::new();
        builder.append_value(1.5);
        let array = builder.finish();

        let val = value_from_column(&array, &DataType::Float32, 0, CrdtDecodeMode::Strict).unwrap();
        let f = val.as_f64().unwrap();
        // f32 → JSON goes through f64; compare with a tolerance.
        assert!((f - 1.5).abs() < 0.001);
    }

    #[test]
    fn test_decode_vector() {
        use arrow_array::builder::{FixedSizeListBuilder, Float32Builder};
        let values_builder = Float32Builder::new();
        let mut builder = FixedSizeListBuilder::new(values_builder, 3);
        builder.values().append_value(1.0);
        builder.values().append_value(2.0);
        builder.values().append_value(3.0);
        builder.append(true);
        let array = builder.finish();

        let val = value_from_column(
            &array,
            &DataType::Vector { dimensions: 3 },
            0,
            CrdtDecodeMode::Strict,
        )
        .unwrap();
        assert_eq!(val, serde_json::json!([1.0, 2.0, 3.0]));
    }

    #[test]
    fn test_decode_bytes() {
        use arrow_array::builder::LargeBinaryBuilder;
        let mut builder = LargeBinaryBuilder::new();
        builder.append_value(&[1u8, 2, 255]);
        builder.append_null();
        let array = builder.finish();

        let val = value_from_column(&array, &DataType::Bytes, 0, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, serde_json::json!([1, 2, 255]));

        let val = value_from_column(&array, &DataType::Bytes, 1, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, Value::Null);
    }

    #[test]
    fn test_decode_date() {
        use arrow_array::builder::Date32Builder;
        let mut builder = Date32Builder::new();
        // 18628 days after the Unix epoch.
        builder.append_value(18628);
        let array = builder.finish();

        let val = value_from_column(&array, &DataType::Date, 0, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, Value::String("2021-01-01".to_string()));
    }

    #[test]
    fn test_decode_date_null() {
        use arrow_array::builder::Date32Builder;
        let mut builder = Date32Builder::new();
        builder.append_null();
        let array = builder.finish();

        let val = value_from_column(&array, &DataType::Date, 0, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, Value::Null);
    }

    #[test]
    fn test_decode_time_plain_time64() {
        use arrow_array::builder::Time64NanosecondBuilder;
        let mut builder = Time64NanosecondBuilder::new();
        builder.append_value(3_600_000_000_000);
        builder.append_null();
        let array = builder.finish();

        let expected = TemporalValue::Time {
            nanos_since_midnight: 3_600_000_000_000,
            offset_seconds: 0,
        }
        .to_string();
        let val = value_from_column(&array, &DataType::Time, 0, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, Value::String(expected));

        let val = value_from_column(&array, &DataType::Time, 1, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, Value::Null);
    }

    #[test]
    fn test_decode_datetime_plain_timestamp() {
        use arrow_array::builder::TimestampNanosecondBuilder;
        let mut builder = TimestampNanosecondBuilder::new();
        builder.append_value(0);
        let array = builder.finish();

        let expected = TemporalValue::DateTime {
            nanos_since_epoch: 0,
            offset_seconds: 0,
            timezone_name: None,
        }
        .to_string();
        let val =
            value_from_column(&array, &DataType::DateTime, 0, CrdtDecodeMode::Strict).unwrap();
        assert_eq!(val, Value::String(expected));
    }

    #[test]
    fn test_decode_list_of_strings() {
        use arrow_array::builder::{ListBuilder, StringBuilder};
        let values_builder = StringBuilder::new();
        let mut builder = ListBuilder::new(values_builder);
        builder.values().append_value("a");
        builder.values().append_value("b");
        builder.values().append_value("c");
        builder.append(true);
        let array = builder.finish();

        let val = value_from_column(
            &array,
            &DataType::List(Box::new(DataType::String)),
            0,
            CrdtDecodeMode::Strict,
        )
        .unwrap();
        assert_eq!(val, serde_json::json!(["a", "b", "c"]));
    }

    #[test]
    fn test_decode_list_of_ints() {
        use arrow_array::builder::{Int64Builder, ListBuilder};
        let values_builder = Int64Builder::new();
        let mut builder = ListBuilder::new(values_builder);
        builder.values().append_value(1);
        builder.values().append_value(2);
        builder.values().append_value(3);
        builder.append(true);
        let array = builder.finish();

        let val = value_from_column(
            &array,
            &DataType::List(Box::new(DataType::Int64)),
            0,
            CrdtDecodeMode::Strict,
        )
        .unwrap();
        assert_eq!(val, serde_json::json!([1, 2, 3]));
    }

    #[test]
    fn test_decode_list_null() {
        use arrow_array::builder::{Int64Builder, ListBuilder};
        let values_builder = Int64Builder::new();
        let mut builder = ListBuilder::new(values_builder);
        builder.append_null();
        let array = builder.finish();

        let val = value_from_column(
            &array,
            &DataType::List(Box::new(DataType::Int64)),
            0,
            CrdtDecodeMode::Strict,
        )
        .unwrap();
        assert_eq!(val, Value::Null);
    }

    #[test]
    fn test_decode_depth_limit() {
        let mut builder = StringBuilder::new();
        builder.append_value("deep");
        let array = builder.finish();

        // The deepest permitted level still decodes...
        let ok = value_from_column_inner(
            &array,
            &DataType::String,
            0,
            CrdtDecodeMode::Strict,
            MAX_DECODE_DEPTH,
        )
        .unwrap();
        assert_eq!(ok, Value::String("deep".to_string()));

        // ...but one level beyond is rejected.
        let too_deep = value_from_column_inner(
            &array,
            &DataType::String,
            0,
            CrdtDecodeMode::Strict,
            MAX_DECODE_DEPTH + 1,
        );
        assert!(too_deep.is_err());
    }

    #[test]
    fn test_decode_unknown_type_returns_null() {
        // Any column works here: types without a decoder arm map to null.
        let mut builder = StringBuilder::new();
        builder.append_value("test");
        let array = builder.finish();

        let val = value_from_column(
            &array,
            &DataType::Point(uni_common::core::schema::PointType::Geographic),
            0,
            CrdtDecodeMode::Strict,
        );
        assert_eq!(val.unwrap(), Value::Null);
    }
}