1use anyhow::{Result, anyhow};
10use arrow_array::{
11 Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
12 Int32Array, Int64Array, LargeBinaryArray, ListArray, StringArray, StructArray,
13 Time64NanosecondArray, TimestampNanosecondArray,
14};
15use serde_json::Value;
16use uni_common::{DataType, TemporalValue};
17use uni_crdt::Crdt;
18
/// Selects how a CRDT payload that fails to decode is handled while reading a column.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum CrdtDecodeMode {
    /// A CRDT payload that cannot be decoded is reported as an error. This is the default.
    #[default]
    Strict,
    /// A CRDT payload that cannot be decoded is logged as a warning and replaced by an
    /// empty `GCounter`.
    Lenient,
}
28
/// Maximum nesting depth accepted while decoding nested values.
///
/// `value_from_column_inner` returns an error once its `depth` argument exceeds this limit;
/// the depth grows by one for every level of `List` / `Map` nesting it recurses into.
pub const MAX_DECODE_DEPTH: usize = 32;
32
33pub fn value_from_column(
44 col: &dyn Array,
45 data_type: &DataType,
46 row: usize,
47 crdt_mode: CrdtDecodeMode,
48) -> Result<Value> {
49 value_from_column_inner(col, data_type, row, crdt_mode, 0)
50}
51
52fn value_from_column_inner(
54 col: &dyn Array,
55 data_type: &DataType,
56 row: usize,
57 crdt_mode: CrdtDecodeMode,
58 depth: usize,
59) -> Result<Value> {
60 if depth > MAX_DECODE_DEPTH {
61 return Err(anyhow!("decode depth exceeded (max {})", MAX_DECODE_DEPTH));
62 }
63 match data_type {
64 DataType::String => {
65 let s = col
66 .as_any()
67 .downcast_ref::<StringArray>()
68 .ok_or_else(|| anyhow!("Invalid string col"))?
69 .value(row);
70 Ok(Value::String(s.to_string()))
71 }
72 DataType::Int32 => {
73 let v = col
74 .as_any()
75 .downcast_ref::<Int32Array>()
76 .ok_or_else(|| anyhow!("Invalid int32 col"))?
77 .value(row);
78 Ok(serde_json::json!(v))
79 }
80 DataType::Int64 => {
81 let v = col
82 .as_any()
83 .downcast_ref::<Int64Array>()
84 .ok_or_else(|| anyhow!("Invalid int64 col"))?
85 .value(row);
86 Ok(serde_json::json!(v))
87 }
88 DataType::Float32 => {
89 let v = col
90 .as_any()
91 .downcast_ref::<Float32Array>()
92 .ok_or_else(|| anyhow!("Invalid float32 col"))?
93 .value(row);
94 Ok(serde_json::json!(v))
95 }
96 DataType::Float64 => {
97 let v = col
98 .as_any()
99 .downcast_ref::<Float64Array>()
100 .ok_or_else(|| anyhow!("Invalid float64 col"))?
101 .value(row);
102 Ok(serde_json::json!(v))
103 }
104 DataType::Bool => {
105 let v = col
106 .as_any()
107 .downcast_ref::<BooleanArray>()
108 .ok_or_else(|| anyhow!("Invalid bool col"))?
109 .value(row);
110 Ok(serde_json::json!(v))
111 }
112 DataType::Vector { .. } => {
113 let list_arr = col
114 .as_any()
115 .downcast_ref::<FixedSizeListArray>()
116 .ok_or_else(|| anyhow!("Invalid fixed list col for vector"))?;
117 let values = list_arr.value(row);
118 let float_values = values
119 .as_any()
120 .downcast_ref::<Float32Array>()
121 .ok_or_else(|| anyhow!("Invalid float32 inner col for vector"))?;
122
123 let vec: Vec<f32> = (0..float_values.len())
124 .map(|i| float_values.value(i))
125 .collect();
126 Ok(serde_json::json!(vec))
127 }
128 DataType::CypherValue => {
129 let bytes = col
130 .as_any()
131 .downcast_ref::<LargeBinaryArray>()
132 .ok_or_else(|| anyhow!("Invalid large binary col for CypherValue"))?
133 .value(row);
134 if bytes.is_empty() {
135 return Ok(Value::Null);
136 }
137 let uni_val = uni_common::cypher_value_codec::decode(bytes)
138 .map_err(|e| anyhow!("CypherValue decode error: {}", e))?;
139 Ok(uni_val.into())
141 }
142 DataType::Crdt(_) => {
143 let bytes = col
144 .as_any()
145 .downcast_ref::<BinaryArray>()
146 .ok_or_else(|| anyhow!("Invalid binary col for CRDT"))?
147 .value(row);
148
149 match crdt_mode {
150 CrdtDecodeMode::Strict => {
151 let crdt = Crdt::from_msgpack(bytes)
152 .map_err(|e| anyhow!("CRDT decode error: {}", e))?;
153 Ok(serde_json::to_value(crdt)?)
154 }
155 CrdtDecodeMode::Lenient => {
156 let crdt = Crdt::from_msgpack(bytes).unwrap_or_else(|e| {
157 log::warn!("Failed to deserialize CRDT: {}", e);
158 Crdt::GCounter(uni_crdt::GCounter::new())
159 });
160 Ok(serde_json::to_value(crdt).unwrap_or(Value::Null))
161 }
162 }
163 }
164 DataType::List(inner) => {
165 let list_arr = col
166 .as_any()
167 .downcast_ref::<ListArray>()
168 .ok_or_else(|| anyhow!("Invalid list col"))?;
169 if list_arr.is_null(row) {
170 return Ok(Value::Null);
171 }
172 let values = list_arr.value(row);
173 let mut vec = Vec::with_capacity(values.len());
174 for i in 0..values.len() {
175 vec.push(value_from_column_inner(
176 values.as_ref(),
177 inner,
178 i,
179 crdt_mode,
180 depth + 1,
181 )?);
182 }
183 Ok(Value::Array(vec))
184 }
185 DataType::Map(key_type, value_type) => {
186 let list_arr = col
187 .as_any()
188 .downcast_ref::<ListArray>()
189 .ok_or_else(|| anyhow!("Invalid map (list) col"))?;
190 if list_arr.is_null(row) {
191 return Ok(Value::Null);
192 }
193 let struct_arr = list_arr.value(row);
194 let struct_arr_ref = struct_arr
195 .as_any()
196 .downcast_ref::<StructArray>()
197 .ok_or_else(|| anyhow!("Invalid struct array inner for map"))?;
198
199 let keys = struct_arr_ref.column(0);
200 let values = struct_arr_ref.column(1);
201
202 let mut map = serde_json::Map::with_capacity(struct_arr_ref.len());
203
204 for i in 0..struct_arr_ref.len() {
205 let k_val =
206 value_from_column_inner(keys.as_ref(), key_type, i, crdt_mode, depth + 1)?;
207 let v_val =
208 value_from_column_inner(values.as_ref(), value_type, i, crdt_mode, depth + 1)?;
209
210 if let Some(k_str) = k_val.as_str() {
212 map.insert(k_str.to_string(), v_val);
213 } else if let Some(k_int) = k_val.as_i64() {
214 map.insert(k_int.to_string(), v_val);
215 } else {
216 map.insert(k_val.to_string(), v_val);
217 }
218 }
219 Ok(Value::Object(map))
220 }
221 DataType::Date => {
222 let arr = col
223 .as_any()
224 .downcast_ref::<Date32Array>()
225 .ok_or_else(|| anyhow!("Invalid date32 col"))?;
226 if arr.is_null(row) {
227 return Ok(Value::Null);
228 }
229 let days = arr.value(row);
230 let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
231 if let Some(date) = epoch.checked_add_signed(chrono::Duration::days(days as i64)) {
232 Ok(Value::String(date.format("%Y-%m-%d").to_string()))
233 } else {
234 Ok(Value::Null)
235 }
236 }
237 DataType::Time => {
238 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
240 && let (Some(nanos_col), Some(offset_col)) = (
241 struct_arr.column_by_name("nanos_since_midnight"),
242 struct_arr.column_by_name("offset_seconds"),
243 )
244 && let (Some(nanos_arr), Some(offset_arr)) = (
245 nanos_col.as_any().downcast_ref::<Time64NanosecondArray>(),
246 offset_col.as_any().downcast_ref::<Int32Array>(),
247 )
248 {
249 if nanos_arr.is_null(row) {
250 return Ok(Value::Null);
251 }
252 let tv = if offset_arr.is_null(row) {
253 TemporalValue::LocalTime {
254 nanos_since_midnight: nanos_arr.value(row),
255 }
256 } else {
257 TemporalValue::Time {
258 nanos_since_midnight: nanos_arr.value(row),
259 offset_seconds: offset_arr.value(row),
260 }
261 };
262 return Ok(Value::String(tv.to_string()));
263 }
264
265 let arr = col
267 .as_any()
268 .downcast_ref::<Time64NanosecondArray>()
269 .ok_or_else(|| anyhow!("Invalid time64 col"))?;
270 if arr.is_null(row) {
271 return Ok(Value::Null);
272 }
273 let tv = TemporalValue::Time {
274 nanos_since_midnight: arr.value(row),
275 offset_seconds: 0,
276 };
277 Ok(Value::String(tv.to_string()))
278 }
279 DataType::Duration => {
280 let arr = col
282 .as_any()
283 .downcast_ref::<LargeBinaryArray>()
284 .ok_or_else(|| anyhow!("Invalid duration col (expected LargeBinary)"))?;
285 if arr.is_null(row) {
286 return Ok(Value::Null);
287 }
288 let bytes = arr.value(row);
289 let uni_val = uni_common::cypher_value_codec::decode(bytes)
290 .map_err(|e| anyhow!("Failed to decode duration: {}", e))?;
291 if let uni_common::Value::Temporal(uni_common::TemporalValue::Duration {
293 months,
294 days,
295 nanos,
296 }) = &uni_val
297 {
298 let tv = TemporalValue::Duration {
299 months: *months,
300 days: *days,
301 nanos: *nanos,
302 };
303 Ok(Value::String(tv.to_string()))
304 } else {
305 Ok(serde_json::json!(uni_val.to_string()))
306 }
307 }
308 DataType::DateTime | DataType::Timestamp => {
309 if let Some(struct_arr) = col.as_any().downcast_ref::<StructArray>()
311 && let (Some(nanos_col), Some(offset_col), Some(tz_col)) = (
312 struct_arr.column_by_name("nanos_since_epoch"),
313 struct_arr.column_by_name("offset_seconds"),
314 struct_arr.column_by_name("timezone_name"),
315 )
316 && let (Some(nanos_arr), Some(offset_arr), Some(tz_arr)) = (
317 nanos_col
318 .as_any()
319 .downcast_ref::<TimestampNanosecondArray>(),
320 offset_col.as_any().downcast_ref::<Int32Array>(),
321 tz_col.as_any().downcast_ref::<StringArray>(),
322 )
323 {
324 if nanos_arr.is_null(row) {
325 return Ok(Value::Null);
326 }
327 let tv = if offset_arr.is_null(row) {
328 TemporalValue::LocalDateTime {
329 nanos_since_epoch: nanos_arr.value(row),
330 }
331 } else {
332 let timezone_name =
333 (!tz_arr.is_null(row)).then(|| tz_arr.value(row).to_string());
334 TemporalValue::DateTime {
335 nanos_since_epoch: nanos_arr.value(row),
336 offset_seconds: offset_arr.value(row),
337 timezone_name,
338 }
339 };
340 return Ok(Value::String(tv.to_string()));
341 }
342
343 let arr = col
345 .as_any()
346 .downcast_ref::<TimestampNanosecondArray>()
347 .ok_or_else(|| anyhow!("Invalid timestamp col"))?;
348 if arr.is_null(row) {
349 return Ok(Value::Null);
350 }
351 let tv = TemporalValue::DateTime {
352 nanos_since_epoch: arr.value(row),
353 offset_seconds: 0,
354 timezone_name: arr.timezone().map(|s| s.to_string()),
355 };
356 Ok(Value::String(tv.to_string()))
357 }
358 _ => Ok(Value::Null),
359 }
360}
361
362pub fn decode_column_value(
368 col: &dyn Array,
369 data_type: &DataType,
370 row: usize,
371 crdt_mode: CrdtDecodeMode,
372) -> anyhow::Result<uni_common::Value> {
373 match data_type {
374 DataType::DateTime
375 | DataType::Timestamp
376 | DataType::Date
377 | DataType::Time
378 | DataType::Btic => Ok(super::arrow_convert::arrow_to_value(
379 col,
380 row,
381 Some(data_type),
382 )),
383 _ => value_from_column(col, data_type, row, crdt_mode).map(uni_common::Value::from),
384 }
385}
386
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::builder::{
        BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder, Float64Builder,
        Int32Builder, Int64Builder, LargeBinaryBuilder, ListBuilder, StringBuilder,
    };
    use serde_json::json;

    /// Decodes `row` of `col` in strict CRDT mode, panicking on any decode error.
    fn decode_strict(col: &dyn Array, data_type: &DataType, row: usize) -> Value {
        value_from_column(col, data_type, row, CrdtDecodeMode::Strict).unwrap()
    }

    #[test]
    fn test_decode_string() {
        let expected = ["hello", "world"];
        let mut builder = StringBuilder::new();
        for s in expected {
            builder.append_value(s);
        }
        let array = builder.finish();

        for (row, s) in expected.into_iter().enumerate() {
            assert_eq!(
                decode_strict(&array, &DataType::String, row),
                Value::String(s.to_string())
            );
        }
    }

    #[test]
    fn test_decode_int64() {
        let expected = [42_i64, -100];
        let mut builder = Int64Builder::new();
        for n in expected {
            builder.append_value(n);
        }
        let array = builder.finish();

        for (row, n) in expected.into_iter().enumerate() {
            assert_eq!(decode_strict(&array, &DataType::Int64, row), json!(n));
        }
    }

    #[test]
    fn test_decode_json() {
        // One encoded payload per row: an object, a null, and a plain string.
        let payloads: [uni_common::Value; 3] = [
            json!({"key": "value"}).into(),
            uni_common::Value::Null,
            uni_common::Value::String("plain text".to_string()),
        ];
        let mut builder = LargeBinaryBuilder::new();
        for payload in &payloads {
            let encoded = uni_common::cypher_value_codec::encode(payload);
            builder.append_value(&encoded);
        }
        let array = builder.finish();

        let expected = [
            json!({"key": "value"}),
            Value::Null,
            Value::String("plain text".to_string()),
        ];
        for (row, want) in expected.into_iter().enumerate() {
            assert_eq!(decode_strict(&array, &DataType::CypherValue, row), want);
        }
    }

    #[test]
    fn test_decode_bool() {
        let expected = [true, false];
        let mut builder = BooleanBuilder::new();
        for b in expected {
            builder.append_value(b);
        }
        let array = builder.finish();

        for (row, b) in expected.into_iter().enumerate() {
            assert_eq!(decode_strict(&array, &DataType::Bool, row), json!(b));
        }
    }

    #[test]
    fn test_decode_float64() {
        let expected = [3.25_f64, -0.5];
        let mut builder = Float64Builder::new();
        for x in expected {
            builder.append_value(x);
        }
        let array = builder.finish();

        for (row, x) in expected.into_iter().enumerate() {
            assert_eq!(decode_strict(&array, &DataType::Float64, row), json!(x));
        }
    }

    #[test]
    fn test_decode_int32() {
        let expected = [42_i32, -1];
        let mut builder = Int32Builder::new();
        for n in expected {
            builder.append_value(n);
        }
        let array = builder.finish();

        for (row, n) in expected.into_iter().enumerate() {
            assert_eq!(decode_strict(&array, &DataType::Int32, row), json!(n));
        }
    }

    #[test]
    fn test_decode_float32() {
        let mut builder = Float32Builder::new();
        builder.append_value(1.5);
        let array = builder.finish();

        let decoded = decode_strict(&array, &DataType::Float32, 0);
        let as_f64 = decoded.as_f64().unwrap();
        // Compared with a tolerance because the value passes through f32.
        assert!((as_f64 - 1.5).abs() < 0.001);
    }

    #[test]
    fn test_decode_vector() {
        let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), 3);
        for x in [1.0, 2.0, 3.0] {
            builder.values().append_value(x);
        }
        builder.append(true);
        let array = builder.finish();

        let decoded = decode_strict(&array, &DataType::Vector { dimensions: 3 }, 0);
        assert_eq!(decoded, json!([1.0, 2.0, 3.0]));
    }

    #[test]
    fn test_decode_date() {
        let mut builder = Date32Builder::new();
        // 18628 days after 1970-01-01 is 2021-01-01.
        builder.append_value(18628);
        let array = builder.finish();

        assert_eq!(
            decode_strict(&array, &DataType::Date, 0),
            Value::String("2021-01-01".to_string())
        );
    }

    #[test]
    fn test_decode_date_null() {
        let mut builder = Date32Builder::new();
        builder.append_null();
        let array = builder.finish();

        assert_eq!(decode_strict(&array, &DataType::Date, 0), Value::Null);
    }

    #[test]
    fn test_decode_list_of_strings() {
        let mut builder = ListBuilder::new(StringBuilder::new());
        for s in ["a", "b", "c"] {
            builder.values().append_value(s);
        }
        builder.append(true);
        let array = builder.finish();

        let list_type = DataType::List(Box::new(DataType::String));
        assert_eq!(decode_strict(&array, &list_type, 0), json!(["a", "b", "c"]));
    }

    #[test]
    fn test_decode_list_of_ints() {
        let mut builder = ListBuilder::new(Int64Builder::new());
        for n in [1, 2, 3] {
            builder.values().append_value(n);
        }
        builder.append(true);
        let array = builder.finish();

        let list_type = DataType::List(Box::new(DataType::Int64));
        assert_eq!(decode_strict(&array, &list_type, 0), json!([1, 2, 3]));
    }

    #[test]
    fn test_decode_list_null() {
        let mut builder = ListBuilder::new(Int64Builder::new());
        builder.append_null();
        let array = builder.finish();

        let list_type = DataType::List(Box::new(DataType::Int64));
        assert_eq!(decode_strict(&array, &list_type, 0), Value::Null);
    }

    #[test]
    fn test_decode_unknown_type_returns_null() {
        // Any array works here: a data type without a dedicated decoder never reads it.
        let mut builder = StringBuilder::new();
        builder.append_value("test");
        let array = builder.finish();

        let point_type = DataType::Point(uni_common::core::schema::PointType::Geographic);
        assert_eq!(decode_strict(&array, &point_type, 0), Value::Null);
    }
}