1use anyhow::{Result, anyhow};
10use arrow_array::{
11 Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
12 Int32Array, Int64Array, LargeBinaryArray, ListArray, StringArray, StructArray,
13 Time64NanosecondArray, TimestampNanosecondArray, UInt32Array,
14};
15use serde_json::Value;
16use uni_common::{DataType, TemporalValue};
17use uni_crdt::Crdt;
18
/// Selects how CRDT columns react when a stored payload cannot be deserialized.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum CrdtDecodeMode {
    /// Surface the deserialization failure to the caller as an error (default).
    #[default]
    Strict,
    /// Log a warning and fall back to a freshly constructed `GCounter`
    /// instead of failing.
    Lenient,
}

/// Upper bound on nesting depth for the recursive column decoder; each level
/// of `DataType::List` nesting increases the depth by one.
pub const MAX_DECODE_DEPTH: usize = 32;
32
33pub fn value_from_column(
44 col: &dyn Array,
45 data_type: &DataType,
46 row: usize,
47 crdt_mode: CrdtDecodeMode,
48) -> Result<Value> {
49 value_from_column_inner(col, data_type, row, crdt_mode, 0)
50}
51
52fn value_from_column_inner(
54 col: &dyn Array,
55 data_type: &DataType,
56 row: usize,
57 crdt_mode: CrdtDecodeMode,
58 depth: usize,
59) -> Result<Value> {
60 if depth > MAX_DECODE_DEPTH {
61 return Err(anyhow!("decode depth exceeded (max {})", MAX_DECODE_DEPTH));
62 }
63 match data_type {
64 DataType::String => {
65 let s = col
66 .as_any()
67 .downcast_ref::<StringArray>()
68 .ok_or_else(|| anyhow!("Invalid string col"))?
69 .value(row);
70 Ok(Value::String(s.to_string()))
71 }
72 DataType::Int32 => {
73 let v = col
74 .as_any()
75 .downcast_ref::<Int32Array>()
76 .ok_or_else(|| anyhow!("Invalid int32 col"))?
77 .value(row);
78 Ok(serde_json::json!(v))
79 }
80 DataType::Int64 => {
81 let v = col
82 .as_any()
83 .downcast_ref::<Int64Array>()
84 .ok_or_else(|| anyhow!("Invalid int64 col"))?
85 .value(row);
86 Ok(serde_json::json!(v))
87 }
88 DataType::Float32 => {
89 let v = col
90 .as_any()
91 .downcast_ref::<Float32Array>()
92 .ok_or_else(|| anyhow!("Invalid float32 col"))?
93 .value(row);
94 Ok(serde_json::json!(v))
95 }
96 DataType::Float64 => {
97 let v = col
98 .as_any()
99 .downcast_ref::<Float64Array>()
100 .ok_or_else(|| anyhow!("Invalid float64 col"))?
101 .value(row);
102 Ok(serde_json::json!(v))
103 }
104 DataType::Bool => {
105 let v = col
106 .as_any()
107 .downcast_ref::<BooleanArray>()
108 .ok_or_else(|| anyhow!("Invalid bool col"))?
109 .value(row);
110 Ok(serde_json::json!(v))
111 }
112 DataType::Vector { .. } => {
113 let list_arr = col
114 .as_any()
115 .downcast_ref::<FixedSizeListArray>()
116 .ok_or_else(|| anyhow!("Invalid fixed list col for vector"))?;
117 let values = list_arr.value(row);
118 let float_values = values
119 .as_any()
120 .downcast_ref::<Float32Array>()
121 .ok_or_else(|| anyhow!("Invalid float32 inner col for vector"))?;
122
123 let vec: Vec<f32> = (0..float_values.len())
124 .map(|i| float_values.value(i))
125 .collect();
126 Ok(serde_json::json!(vec))
127 }
128 DataType::SparseVector { .. } => {
129 let struct_arr = col
135 .as_any()
136 .downcast_ref::<StructArray>()
137 .ok_or_else(|| anyhow!("Invalid struct col for sparse vector"))?;
138 if struct_arr.is_null(row) {
139 return Ok(Value::Null);
140 }
141 let indices_list = struct_arr
142 .column_by_name("indices")
143 .and_then(|c| c.as_any().downcast_ref::<ListArray>())
144 .ok_or_else(|| anyhow!("sparse vector missing list column 'indices'"))?;
145 let values_list = struct_arr
146 .column_by_name("values")
147 .and_then(|c| c.as_any().downcast_ref::<ListArray>())
148 .ok_or_else(|| anyhow!("sparse vector missing list column 'values'"))?;
149 let idx_vals = indices_list.value(row);
150 let idx_arr = idx_vals
151 .as_any()
152 .downcast_ref::<UInt32Array>()
153 .ok_or_else(|| anyhow!("sparse 'indices' inner not UInt32"))?;
154 let w_vals = values_list.value(row);
155 let w_arr = w_vals
156 .as_any()
157 .downcast_ref::<Float32Array>()
158 .ok_or_else(|| anyhow!("sparse 'values' inner not Float32"))?;
159 let indices: Vec<Value> = (0..idx_arr.len())
160 .map(|i| serde_json::json!(idx_arr.value(i)))
161 .collect();
162 let values: Vec<Value> = (0..w_arr.len())
163 .map(|i| serde_json::json!(w_arr.value(i)))
164 .collect();
165 let mut map = serde_json::Map::new();
166 map.insert("indices".to_string(), Value::Array(indices));
167 map.insert("values".to_string(), Value::Array(values));
168 Ok(Value::Object(map))
169 }
170 DataType::CypherValue => {
171 let bytes = col
172 .as_any()
173 .downcast_ref::<LargeBinaryArray>()
174 .ok_or_else(|| anyhow!("Invalid large binary col for CypherValue"))?
175 .value(row);
176 if bytes.is_empty() {
177 return Ok(Value::Null);
178 }
179 let uni_val = uni_common::cypher_value_codec::decode(bytes)
180 .map_err(|e| anyhow!("CypherValue decode error: {}", e))?;
181 Ok(uni_val.into())
183 }
184 DataType::Bytes => {
185 let arr = col
186 .as_any()
187 .downcast_ref::<LargeBinaryArray>()
188 .ok_or_else(|| anyhow!("Invalid large binary col for Bytes"))?;
189 if arr.is_null(row) {
190 return Ok(Value::Null);
191 }
192 let bytes = arr.value(row);
194 Ok(Value::Array(
195 bytes.iter().map(|b| serde_json::json!(*b)).collect(),
196 ))
197 }
198 DataType::Crdt(_) => {
199 let bytes = col
200 .as_any()
201 .downcast_ref::<BinaryArray>()
202 .ok_or_else(|| anyhow!("Invalid binary col for CRDT"))?
203 .value(row);
204
205 match crdt_mode {
206 CrdtDecodeMode::Strict => {
207 let crdt = Crdt::from_msgpack(bytes)
208 .map_err(|e| anyhow!("CRDT decode error: {}", e))?;
209 Ok(serde_json::to_value(crdt)?)
210 }
211 CrdtDecodeMode::Lenient => {
212 let crdt = Crdt::from_msgpack(bytes).unwrap_or_else(|e| {
213 log::warn!("Failed to deserialize CRDT: {}", e);
214 Crdt::GCounter(uni_crdt::GCounter::new())
215 });
216 Ok(serde_json::to_value(crdt).unwrap_or(Value::Null))
217 }
218 }
219 }
220 DataType::List(inner) => {
221 let list_arr = col
222 .as_any()
223 .downcast_ref::<ListArray>()
224 .ok_or_else(|| anyhow!("Invalid list col"))?;
225 if list_arr.is_null(row) {
226 return Ok(Value::Null);
227 }
228 let values = list_arr.value(row);
229 let mut vec = Vec::with_capacity(values.len());
230 for i in 0..values.len() {
231 vec.push(value_from_column_inner(
232 values.as_ref(),
233 inner,
234 i,
235 crdt_mode,
236 depth + 1,
237 )?);
238 }
239 Ok(Value::Array(vec))
240 }
241 DataType::Map(_, _) => {
242 let list_arr = col
243 .as_any()
244 .downcast_ref::<ListArray>()
245 .ok_or_else(|| anyhow!("Invalid map (list) col"))?;
246 if list_arr.is_null(row) {
247 return Ok(Value::Null);
248 }
249 let struct_arr = list_arr.value(row);
256 let uni_map = super::arrow_convert::try_reconstruct_map(&struct_arr)
257 .ok_or_else(|| anyhow!("Invalid struct array inner for map"))?;
258 let mut map = serde_json::Map::with_capacity(uni_map.len());
259 for (k, v) in uni_map {
260 map.insert(
261 k,
262 serde_json::to_value(&v).unwrap_or(serde_json::Value::Null),
263 );
264 }
265 Ok(Value::Object(map))
266 }
267 DataType::Date => {
268 let arr = col
269 .as_any()
270 .downcast_ref::<Date32Array>()
271 .ok_or_else(|| anyhow!("Invalid date32 col"))?;
272 if arr.is_null(row) {
273 return Ok(Value::Null);
274 }
275 let days = arr.value(row);
276 let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
277 if let Some(date) = epoch.checked_add_signed(chrono::Duration::days(days as i64)) {
278 Ok(Value::String(date.format("%Y-%m-%d").to_string()))
279 } else {
280 Ok(Value::Null)
281 }
282 }
283 DataType::Time => {
284 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
286 && let (Some(nanos_col), Some(offset_col)) = (
287 struct_arr.column_by_name("nanos_since_midnight"),
288 struct_arr.column_by_name("offset_seconds"),
289 )
290 && let (Some(nanos_arr), Some(offset_arr)) = (
291 nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
292 offset_col.as_any().downcast_ref::<Int32Array>(),
293 )
294 {
295 if nanos_arr.is_null(row) {
296 return Ok(Value::Null);
297 }
298 let tv = if offset_arr.is_null(row) {
299 TemporalValue::LocalTime {
300 nanos_since_midnight: nanos_arr.value(row),
301 }
302 } else {
303 TemporalValue::Time {
304 nanos_since_midnight: nanos_arr.value(row),
305 offset_seconds: offset_arr.value(row),
306 }
307 };
308 return Ok(Value::String(tv.to_string()));
309 }
310
311 let arr = col
313 .as_any()
314 .downcast_ref::<Time64NanosecondArray>()
315 .ok_or_else(|| anyhow!("Invalid time64 col"))?;
316 if arr.is_null(row) {
317 return Ok(Value::Null);
318 }
319 let tv = TemporalValue::Time {
320 nanos_since_midnight: arr.value(row),
321 offset_seconds: 0,
322 };
323 Ok(Value::String(tv.to_string()))
324 }
325 DataType::Duration => {
326 let arr = col
328 .as_any()
329 .downcast_ref::<LargeBinaryArray>()
330 .ok_or_else(|| anyhow!("Invalid duration col (expected LargeBinary)"))?;
331 if arr.is_null(row) {
332 return Ok(Value::Null);
333 }
334 let bytes = arr.value(row);
335 let uni_val = uni_common::cypher_value_codec::decode(bytes)
336 .map_err(|e| anyhow!("Failed to decode duration: {}", e))?;
337 if let uni_common::Value::Temporal(uni_common::TemporalValue::Duration {
339 months,
340 days,
341 nanos,
342 }) = &uni_val
343 {
344 let tv = TemporalValue::Duration {
345 months: *months,
346 days: *days,
347 nanos: *nanos,
348 };
349 Ok(Value::String(tv.to_string()))
350 } else {
351 Ok(serde_json::json!(uni_val.to_string()))
352 }
353 }
354 DataType::DateTime | DataType::Timestamp => {
355 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
357 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
358 struct_arr.column_by_name("nanos_since_epoch"),
359 struct_arr.column_by_name("offset_seconds"),
360 struct_arr.column_by_name("timezone_name"),
361 )
362 && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
363 nanos_col
364 .as_any()
365 .downcast_ref::<TimestampNanosecondArray>(),
366 offset_col.as_any().downcast_ref::<Int32Array>(),
367 tz_col.as_any().downcast_ref::<StringArray>(),
368 )
369 {
370 if nanos_arr.is_null(row) {
371 return Ok(Value::Null);
372 }
373 let tv = if offset_arr.is_null(row) {
374 TemporalValue::LocalDateTime {
375 nanos_since_epoch: nanos_arr.value(row),
376 }
377 } else {
378 let timezone_name =
379 (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
380 TemporalValue::DateTime {
381 nanos_since_epoch: nanos_arr.value(row),
382 offset_seconds: offset_arr.value(row),
383 timezone_name,
384 }
385 };
386 return Ok(Value::String(tv.to_string()));
387 }
388
389 let arr = col
391 .as_any()
392 .downcast_ref::<TimestampNanosecondArray>()
393 .ok_or_else(|| anyhow!("Invalid timestamp col"))?;
394 if arr.is_null(row) {
395 return Ok(Value::Null);
396 }
397 let tv = TemporalValue::DateTime {
398 nanos_since_epoch: arr.value(row),
399 offset_seconds: 0,
400 timezone_name: arr.timezone().map(|s| s.to_string()),
401 };
402 Ok(Value::String(tv.to_string()))
403 }
404 _ => Ok(Value::Null),
405 }
406}
407
408pub fn decode_column_value(
414 col: &dyn Array,
415 data_type: &DataType,
416 row: usize,
417 crdt_mode: CrdtDecodeMode,
418) -> anyhow::Result<uni_common::Value> {
419 match data_type {
420 DataType::DateTime
421 | DataType::Timestamp
422 | DataType::Date
423 | DataType::Time
424 | DataType::Btic
425 | DataType::Bytes
426 | DataType::SparseVector { .. }
430 | DataType::Map(_, _) => Ok(super::arrow_convert::arrow_to_value(
435 col,
436 row,
437 Some(data_type),
438 )),
439 _ => value_from_column(col, data_type, row, crdt_mode).map(uni_common::Value::from),
440 }
441}
442
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::builder::{
        FixedSizeListBuilder, Float32Builder, Int64Builder, LargeBinaryBuilder, ListBuilder,
        StringBuilder,
    };

    /// Decodes one cell in `Strict` CRDT mode, failing the test on any error.
    fn decode_strict(col: &dyn Array, data_type: &DataType, row: usize) -> Value {
        value_from_column(col, data_type, row, CrdtDecodeMode::Strict)
            .expect("value_from_column returned an error")
    }

    #[test]
    fn test_decode_string() {
        let array = StringArray::from(vec!["hello", "world"]);

        assert_eq!(
            decode_strict(&array, &DataType::String, 0),
            Value::String("hello".to_string())
        );
        assert_eq!(
            decode_strict(&array, &DataType::String, 1),
            Value::String("world".to_string())
        );
    }

    #[test]
    fn test_decode_int64() {
        let array = Int64Array::from(vec![42_i64, -100]);

        assert_eq!(decode_strict(&array, &DataType::Int64, 0), serde_json::json!(42));
        assert_eq!(decode_strict(&array, &DataType::Int64, 1), serde_json::json!(-100));
    }

    #[test]
    fn test_decode_json() {
        // Each payload is CypherValue-encoded into its own row.
        let payloads: [uni_common::Value; 3] = [
            serde_json::json!({"key": "value"}).into(),
            uni_common::Value::Null,
            uni_common::Value::String("plain text".to_string()),
        ];
        let mut builder = LargeBinaryBuilder::new();
        for payload in &payloads {
            let encoded = uni_common::cypher_value_codec::encode(payload);
            builder.append_value(&encoded);
        }
        let array = builder.finish();

        let expected = [
            serde_json::json!({"key": "value"}),
            Value::Null,
            Value::String("plain text".to_string()),
        ];
        for (row, want) in expected.iter().enumerate() {
            assert_eq!(&decode_strict(&array, &DataType::CypherValue, row), want);
        }
    }

    #[test]
    fn test_decode_bool() {
        let array = BooleanArray::from(vec![true, false]);

        assert_eq!(decode_strict(&array, &DataType::Bool, 0), serde_json::json!(true));
        assert_eq!(decode_strict(&array, &DataType::Bool, 1), serde_json::json!(false));
    }

    #[test]
    fn test_decode_float64() {
        let array = Float64Array::from(vec![3.25_f64, -0.5]);

        assert_eq!(decode_strict(&array, &DataType::Float64, 0), serde_json::json!(3.25));
        assert_eq!(decode_strict(&array, &DataType::Float64, 1), serde_json::json!(-0.5));
    }

    #[test]
    fn test_decode_int32() {
        let array = Int32Array::from(vec![42_i32, -1]);

        assert_eq!(decode_strict(&array, &DataType::Int32, 0), serde_json::json!(42));
        assert_eq!(decode_strict(&array, &DataType::Int32, 1), serde_json::json!(-1));
    }

    #[test]
    fn test_decode_float32() {
        let array = Float32Array::from(vec![1.5_f32]);

        let decoded = decode_strict(&array, &DataType::Float32, 0);
        let as_f64 = decoded.as_f64().expect("Float32 should decode to a JSON number");
        assert!((as_f64 - 1.5).abs() < 0.001);
    }

    #[test]
    fn test_decode_vector() {
        let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), 3);
        for component in [1.0_f32, 2.0, 3.0] {
            builder.values().append_value(component);
        }
        builder.append(true);
        let array = builder.finish();

        let decoded = decode_strict(&array, &DataType::Vector { dimensions: 3 }, 0);
        assert_eq!(decoded, serde_json::json!([1.0, 2.0, 3.0]));
    }

    #[test]
    fn test_decode_date() {
        // 18628 days after 1970-01-01.
        let array = Date32Array::from(vec![18628_i32]);

        assert_eq!(
            decode_strict(&array, &DataType::Date, 0),
            Value::String("2021-01-01".to_string())
        );
    }

    #[test]
    fn test_decode_date_null() {
        let array = Date32Array::from(vec![None::<i32>]);

        assert_eq!(decode_strict(&array, &DataType::Date, 0), Value::Null);
    }

    #[test]
    fn test_decode_list_of_strings() {
        let mut builder = ListBuilder::new(StringBuilder::new());
        for item in ["a", "b", "c"] {
            builder.values().append_value(item);
        }
        builder.append(true);
        let array = builder.finish();

        let decoded = decode_strict(&array, &DataType::List(Box::new(DataType::String)), 0);
        assert_eq!(decoded, serde_json::json!(["a", "b", "c"]));
    }

    #[test]
    fn test_decode_list_of_ints() {
        let mut builder = ListBuilder::new(Int64Builder::new());
        for item in [1_i64, 2, 3] {
            builder.values().append_value(item);
        }
        builder.append(true);
        let array = builder.finish();

        let decoded = decode_strict(&array, &DataType::List(Box::new(DataType::Int64)), 0);
        assert_eq!(decoded, serde_json::json!([1, 2, 3]));
    }

    #[test]
    fn test_decode_list_null() {
        let mut builder = ListBuilder::new(Int64Builder::new());
        builder.append_null();
        let array = builder.finish();

        let decoded = decode_strict(&array, &DataType::List(Box::new(DataType::Int64)), 0);
        assert_eq!(decoded, Value::Null);
    }

    #[test]
    fn test_decode_unknown_type_returns_null() {
        let array = StringArray::from(vec!["test"]);

        let point = DataType::Point(uni_common::core::schema::PointType::Geographic);
        assert_eq!(decode_strict(&array, &point, 0), Value::Null);
    }
}