1use anyhow::{Result, anyhow};
10use arrow_array::{
11 Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
12 Int32Array, Int64Array, LargeBinaryArray, ListArray, StringArray, StructArray,
13 Time64NanosecondArray, TimestampNanosecondArray,
14};
15use serde_json::Value;
16use uni_common::{DataType, TemporalValue};
17use uni_crdt::Crdt;
18
/// Selects how `value_from_column` reacts when a CRDT cell cannot be decoded.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum CrdtDecodeMode {
    /// Return the decode error to the caller. This is the default mode.
    #[default]
    Strict,
    /// Log a warning and fall back to a freshly constructed `GCounter` instead of failing.
    Lenient,
}

/// Upper bound on the recursion depth `value_from_column` will follow; every nested `List`
/// level adds one to the depth.
pub const MAX_DECODE_DEPTH: usize = 32;
32
33pub fn value_from_column(
44 col: &dyn Array,
45 data_type: &DataType,
46 row: usize,
47 crdt_mode: CrdtDecodeMode,
48) -> Result<Value> {
49 value_from_column_inner(col, data_type, row, crdt_mode, 0)
50}
51
52fn value_from_column_inner(
54 col: &dyn Array,
55 data_type: &DataType,
56 row: usize,
57 crdt_mode: CrdtDecodeMode,
58 depth: usize,
59) -> Result<Value> {
60 if depth > MAX_DECODE_DEPTH {
61 return Err(anyhow!("decode depth exceeded (max {})", MAX_DECODE_DEPTH));
62 }
63 match data_type {
64 DataType::String => {
65 let s = col
66 .as_any()
67 .downcast_ref::<StringArray>()
68 .ok_or_else(|| anyhow!("Invalid string col"))?
69 .value(row);
70 Ok(Value::String(s.to_string()))
71 }
72 DataType::Int32 => {
73 let v = col
74 .as_any()
75 .downcast_ref::<Int32Array>()
76 .ok_or_else(|| anyhow!("Invalid int32 col"))?
77 .value(row);
78 Ok(serde_json::json!(v))
79 }
80 DataType::Int64 => {
81 let v = col
82 .as_any()
83 .downcast_ref::<Int64Array>()
84 .ok_or_else(|| anyhow!("Invalid int64 col"))?
85 .value(row);
86 Ok(serde_json::json!(v))
87 }
88 DataType::Float32 => {
89 let v = col
90 .as_any()
91 .downcast_ref::<Float32Array>()
92 .ok_or_else(|| anyhow!("Invalid float32 col"))?
93 .value(row);
94 Ok(serde_json::json!(v))
95 }
96 DataType::Float64 => {
97 let v = col
98 .as_any()
99 .downcast_ref::<Float64Array>()
100 .ok_or_else(|| anyhow!("Invalid float64 col"))?
101 .value(row);
102 Ok(serde_json::json!(v))
103 }
104 DataType::Bool => {
105 let v = col
106 .as_any()
107 .downcast_ref::<BooleanArray>()
108 .ok_or_else(|| anyhow!("Invalid bool col"))?
109 .value(row);
110 Ok(serde_json::json!(v))
111 }
112 DataType::Vector { .. } => {
113 let list_arr = col
114 .as_any()
115 .downcast_ref::<FixedSizeListArray>()
116 .ok_or_else(|| anyhow!("Invalid fixed list col for vector"))?;
117 let values = list_arr.value(row);
118 let float_values = values
119 .as_any()
120 .downcast_ref::<Float32Array>()
121 .ok_or_else(|| anyhow!("Invalid float32 inner col for vector"))?;
122
123 let vec: Vec<f32> = (0..float_values.len())
124 .map(|i| float_values.value(i))
125 .collect();
126 Ok(serde_json::json!(vec))
127 }
128 DataType::CypherValue => {
129 let bytes = col
130 .as_any()
131 .downcast_ref::<LargeBinaryArray>()
132 .ok_or_else(|| anyhow!("Invalid large binary col for CypherValue"))?
133 .value(row);
134 if bytes.is_empty() {
135 return Ok(Value::Null);
136 }
137 let uni_val = uni_common::cypher_value_codec::decode(bytes)
138 .map_err(|e| anyhow!("CypherValue decode error: {}", e))?;
139 Ok(uni_val.into())
141 }
142 DataType::Bytes => {
143 let arr = col
144 .as_any()
145 .downcast_ref::<LargeBinaryArray>()
146 .ok_or_else(|| anyhow!("Invalid large binary col for Bytes"))?;
147 if arr.is_null(row) {
148 return Ok(Value::Null);
149 }
150 let bytes = arr.value(row);
152 Ok(Value::Array(
153 bytes.iter().map(|b| serde_json::json!(*b)).collect(),
154 ))
155 }
156 DataType::Crdt(_) => {
157 let bytes = col
158 .as_any()
159 .downcast_ref::<BinaryArray>()
160 .ok_or_else(|| anyhow!("Invalid binary col for CRDT"))?
161 .value(row);
162
163 match crdt_mode {
164 CrdtDecodeMode::Strict => {
165 let crdt = Crdt::from_msgpack(bytes)
166 .map_err(|e| anyhow!("CRDT decode error: {}", e))?;
167 Ok(serde_json::to_value(crdt)?)
168 }
169 CrdtDecodeMode::Lenient => {
170 let crdt = Crdt::from_msgpack(bytes).unwrap_or_else(|e| {
171 log::warn!("Failed to deserialize CRDT: {}", e);
172 Crdt::GCounter(uni_crdt::GCounter::new())
173 });
174 Ok(serde_json::to_value(crdt).unwrap_or(Value::Null))
175 }
176 }
177 }
178 DataType::List(inner) => {
179 let list_arr = col
180 .as_any()
181 .downcast_ref::<ListArray>()
182 .ok_or_else(|| anyhow!("Invalid list col"))?;
183 if list_arr.is_null(row) {
184 return Ok(Value::Null);
185 }
186 let values = list_arr.value(row);
187 let mut vec = Vec::with_capacity(values.len());
188 for i in 0..values.len() {
189 vec.push(value_from_column_inner(
190 values.as_ref(),
191 inner,
192 i,
193 crdt_mode,
194 depth + 1,
195 )?);
196 }
197 Ok(Value::Array(vec))
198 }
199 DataType::Map(_, _) => {
200 let list_arr = col
201 .as_any()
202 .downcast_ref::<ListArray>()
203 .ok_or_else(|| anyhow!("Invalid map (list) col"))?;
204 if list_arr.is_null(row) {
205 return Ok(Value::Null);
206 }
207 let struct_arr = list_arr.value(row);
214 let uni_map = super::arrow_convert::try_reconstruct_map(&struct_arr)
215 .ok_or_else(|| anyhow!("Invalid struct array inner for map"))?;
216 let mut map = serde_json::Map::with_capacity(uni_map.len());
217 for (k, v) in uni_map {
218 map.insert(
219 k,
220 serde_json::to_value(&v).unwrap_or(serde_json::Value::Null),
221 );
222 }
223 Ok(Value::Object(map))
224 }
225 DataType::Date => {
226 let arr = col
227 .as_any()
228 .downcast_ref::<Date32Array>()
229 .ok_or_else(|| anyhow!("Invalid date32 col"))?;
230 if arr.is_null(row) {
231 return Ok(Value::Null);
232 }
233 let days = arr.value(row);
234 let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
235 if let Some(date) = epoch.checked_add_signed(chrono::Duration::days(days as i64)) {
236 Ok(Value::String(date.format("%Y-%m-%d").to_string()))
237 } else {
238 Ok(Value::Null)
239 }
240 }
241 DataType::Time => {
242 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
244 && let (Some(nanos_col), Some(offset_col)) = (
245 struct_arr.column_by_name("nanos_since_midnight"),
246 struct_arr.column_by_name("offset_seconds"),
247 )
248 && let (Some(nanos_arr), Some(offset_arr)) = (
249 nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
250 offset_col.as_any().downcast_ref::<Int32Array>(),
251 )
252 {
253 if nanos_arr.is_null(row) {
254 return Ok(Value::Null);
255 }
256 let tv = if offset_arr.is_null(row) {
257 TemporalValue::LocalTime {
258 nanos_since_midnight: nanos_arr.value(row),
259 }
260 } else {
261 TemporalValue::Time {
262 nanos_since_midnight: nanos_arr.value(row),
263 offset_seconds: offset_arr.value(row),
264 }
265 };
266 return Ok(Value::String(tv.to_string()));
267 }
268
269 let arr = col
271 .as_any()
272 .downcast_ref::<Time64NanosecondArray>()
273 .ok_or_else(|| anyhow!("Invalid time64 col"))?;
274 if arr.is_null(row) {
275 return Ok(Value::Null);
276 }
277 let tv = TemporalValue::Time {
278 nanos_since_midnight: arr.value(row),
279 offset_seconds: 0,
280 };
281 Ok(Value::String(tv.to_string()))
282 }
283 DataType::Duration => {
284 let arr = col
286 .as_any()
287 .downcast_ref::<LargeBinaryArray>()
288 .ok_or_else(|| anyhow!("Invalid duration col (expected LargeBinary)"))?;
289 if arr.is_null(row) {
290 return Ok(Value::Null);
291 }
292 let bytes = arr.value(row);
293 let uni_val = uni_common::cypher_value_codec::decode(bytes)
294 .map_err(|e| anyhow!("Failed to decode duration: {}", e))?;
295 if let uni_common::Value::Temporal(uni_common::TemporalValue::Duration {
297 months,
298 days,
299 nanos,
300 }) = &uni_val
301 {
302 let tv = TemporalValue::Duration {
303 months: *months,
304 days: *days,
305 nanos: *nanos,
306 };
307 Ok(Value::String(tv.to_string()))
308 } else {
309 Ok(serde_json::json!(uni_val.to_string()))
310 }
311 }
312 DataType::DateTime | DataType::Timestamp => {
313 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
315 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
316 struct_arr.column_by_name("nanos_since_epoch"),
317 struct_arr.column_by_name("offset_seconds"),
318 struct_arr.column_by_name("timezone_name"),
319 )
320 && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
321 nanos_col
322 .as_any()
323 .downcast_ref::<TimestampNanosecondArray>(),
324 offset_col.as_any().downcast_ref::<Int32Array>(),
325 tz_col.as_any().downcast_ref::<StringArray>(),
326 )
327 {
328 if nanos_arr.is_null(row) {
329 return Ok(Value::Null);
330 }
331 let tv = if offset_arr.is_null(row) {
332 TemporalValue::LocalDateTime {
333 nanos_since_epoch: nanos_arr.value(row),
334 }
335 } else {
336 let timezone_name =
337 (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
338 TemporalValue::DateTime {
339 nanos_since_epoch: nanos_arr.value(row),
340 offset_seconds: offset_arr.value(row),
341 timezone_name,
342 }
343 };
344 return Ok(Value::String(tv.to_string()));
345 }
346
347 let arr = col
349 .as_any()
350 .downcast_ref::<TimestampNanosecondArray>()
351 .ok_or_else(|| anyhow!("Invalid timestamp col"))?;
352 if arr.is_null(row) {
353 return Ok(Value::Null);
354 }
355 let tv = TemporalValue::DateTime {
356 nanos_since_epoch: arr.value(row),
357 offset_seconds: 0,
358 timezone_name: arr.timezone().map(|s| s.to_string()),
359 };
360 Ok(Value::String(tv.to_string()))
361 }
362 _ => Ok(Value::Null),
363 }
364}
365
366pub fn decode_column_value(
372 col: &dyn Array,
373 data_type: &DataType,
374 row: usize,
375 crdt_mode: CrdtDecodeMode,
376) -> anyhow::Result<uni_common::Value> {
377 match data_type {
378 DataType::DateTime
379 | DataType::Timestamp
380 | DataType::Date
381 | DataType::Time
382 | DataType::Btic
383 | DataType::Bytes
384 | DataType::Map(_, _) => Ok(super::arrow_convert::arrow_to_value(
389 col,
390 row,
391 Some(data_type),
392 )),
393 _ => value_from_column(col, data_type, row, crdt_mode).map(uni_common::Value::from),
394 }
395}
396
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::builder::{
        BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder, Float64Builder,
        Int32Builder, Int64Builder, LargeBinaryBuilder, ListBuilder, StringBuilder,
    };

    /// Decodes one cell in strict CRDT mode, panicking if decoding fails.
    fn decode(array: &dyn Array, data_type: &DataType, row: usize) -> Value {
        value_from_column(array, data_type, row, CrdtDecodeMode::Strict)
            .expect("value_from_column returned an error")
    }

    #[test]
    fn test_decode_string() {
        let mut builder = StringBuilder::new();
        for text in ["hello", "world"] {
            builder.append_value(text);
        }
        let array = builder.finish();

        assert_eq!(decode(&array, &DataType::String, 0), Value::String("hello".to_string()));
        assert_eq!(decode(&array, &DataType::String, 1), Value::String("world".to_string()));
    }

    #[test]
    fn test_decode_int64() {
        let mut builder = Int64Builder::new();
        for n in [42, -100] {
            builder.append_value(n);
        }
        let array = builder.finish();

        assert_eq!(decode(&array, &DataType::Int64, 0), serde_json::json!(42));
        assert_eq!(decode(&array, &DataType::Int64, 1), serde_json::json!(-100));
    }

    #[test]
    fn test_decode_json() {
        // CypherValue cells hold codec-encoded bytes: an object, a null and a plain string.
        let object: uni_common::Value = serde_json::json!({"key": "value"}).into();
        let inputs = [
            object,
            uni_common::Value::Null,
            uni_common::Value::String("plain text".to_string()),
        ];
        let mut builder = LargeBinaryBuilder::new();
        for input in &inputs {
            builder.append_value(&uni_common::cypher_value_codec::encode(input));
        }
        let array = builder.finish();

        let expected = [
            serde_json::json!({"key": "value"}),
            Value::Null,
            Value::String("plain text".to_string()),
        ];
        for (row, want) in expected.into_iter().enumerate() {
            assert_eq!(decode(&array, &DataType::CypherValue, row), want);
        }
    }

    #[test]
    fn test_decode_bool() {
        let mut builder = BooleanBuilder::new();
        for flag in [true, false] {
            builder.append_value(flag);
        }
        let array = builder.finish();

        assert_eq!(decode(&array, &DataType::Bool, 0), serde_json::json!(true));
        assert_eq!(decode(&array, &DataType::Bool, 1), serde_json::json!(false));
    }

    #[test]
    fn test_decode_float64() {
        let mut builder = Float64Builder::new();
        for x in [3.25, -0.5] {
            builder.append_value(x);
        }
        let array = builder.finish();

        assert_eq!(decode(&array, &DataType::Float64, 0), serde_json::json!(3.25));
        assert_eq!(decode(&array, &DataType::Float64, 1), serde_json::json!(-0.5));
    }

    #[test]
    fn test_decode_int32() {
        let mut builder = Int32Builder::new();
        for n in [42, -1] {
            builder.append_value(n);
        }
        let array = builder.finish();

        assert_eq!(decode(&array, &DataType::Int32, 0), serde_json::json!(42));
        assert_eq!(decode(&array, &DataType::Int32, 1), serde_json::json!(-1));
    }

    #[test]
    fn test_decode_float32() {
        let mut builder = Float32Builder::new();
        builder.append_value(1.5);
        let array = builder.finish();

        let decoded = decode(&array, &DataType::Float32, 0)
            .as_f64()
            .expect("float32 cell should decode to a JSON number");
        assert!((decoded - 1.5).abs() < 0.001);
    }

    #[test]
    fn test_decode_vector() {
        let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), 3);
        for x in [1.0, 2.0, 3.0] {
            builder.values().append_value(x);
        }
        builder.append(true);
        let array = builder.finish();

        let decoded = decode(&array, &DataType::Vector { dimensions: 3 }, 0);
        assert_eq!(decoded, serde_json::json!([1.0, 2.0, 3.0]));
    }

    #[test]
    fn test_decode_date() {
        let mut builder = Date32Builder::new();
        builder.append_value(18628);
        let array = builder.finish();

        assert_eq!(decode(&array, &DataType::Date, 0), Value::String("2021-01-01".to_string()));
    }

    #[test]
    fn test_decode_date_null() {
        let mut builder = Date32Builder::new();
        builder.append_null();
        let array = builder.finish();

        assert_eq!(decode(&array, &DataType::Date, 0), Value::Null);
    }

    #[test]
    fn test_decode_list_of_strings() {
        let mut builder = ListBuilder::new(StringBuilder::new());
        for item in ["a", "b", "c"] {
            builder.values().append_value(item);
        }
        builder.append(true);
        let array = builder.finish();

        let list_type = DataType::List(Box::new(DataType::String));
        assert_eq!(decode(&array, &list_type, 0), serde_json::json!(["a", "b", "c"]));
    }

    #[test]
    fn test_decode_list_of_ints() {
        let mut builder = ListBuilder::new(Int64Builder::new());
        for n in [1, 2, 3] {
            builder.values().append_value(n);
        }
        builder.append(true);
        let array = builder.finish();

        let list_type = DataType::List(Box::new(DataType::Int64));
        assert_eq!(decode(&array, &list_type, 0), serde_json::json!([1, 2, 3]));
    }

    #[test]
    fn test_decode_list_null() {
        let mut builder = ListBuilder::new(Int64Builder::new());
        builder.append_null();
        let array = builder.finish();

        let list_type = DataType::List(Box::new(DataType::Int64));
        assert_eq!(decode(&array, &list_type, 0), Value::Null);
    }

    #[test]
    fn test_decode_unknown_type_returns_null() {
        let mut builder = StringBuilder::new();
        builder.append_value("test");
        let array = builder.finish();

        let point = DataType::Point(uni_common::core::schema::PointType::Geographic);
        assert_eq!(decode(&array, &point, 0), Value::Null);
    }
}