1use std::sync::Arc;
16
17use super::{Interval, NumberValue, Value};
18use crate::error::{ConvertError, Error};
19use crate::value::geo::convert_geometry;
20use arrow_array::{
21 Array as ArrowArray, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Decimal256Array,
22 Decimal64Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
23 LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, MapArray, StringArray,
24 StringViewArray, StructArray, TimestampMicrosecondArray, UInt16Array, UInt32Array, UInt64Array,
25 UInt8Array,
26};
27use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, TimeUnit};
28use databend_client::schema::{
29 DecimalSize, ARROW_EXT_TYPE_BITMAP, ARROW_EXT_TYPE_EMPTY_ARRAY, ARROW_EXT_TYPE_EMPTY_MAP,
30 ARROW_EXT_TYPE_GEOGRAPHY, ARROW_EXT_TYPE_GEOMETRY, ARROW_EXT_TYPE_INTERVAL,
31 ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE, ARROW_EXT_TYPE_VARIANT, ARROW_EXT_TYPE_VECTOR,
32 EXTENSION_KEY,
33};
34use databend_client::ResultFormatSettings;
35use ethnum::i256;
36use jiff::{tz, Timestamp};
37use jsonb::RawJsonb;
38
/// Interval value packed into a single 128-bit integer.
///
/// The accessor methods on this type read the months from bits 96..128, the
/// days from bits 64..96 and the microseconds from bits 0..64.
#[allow(non_camel_case_types)]
#[repr(C)]
struct months_days_micros(pub i128);

/// Keeps the low 64 bits of a packed value (the microseconds component).
const MICROS_MASK: i128 = u64::MAX as i128;
/// Keeps the low 32 bits of an already shifted packed value (days or months).
const DAYS_MONTHS_MASK: i128 = u32::MAX as i128;
48
/// Largest timestamp, in microseconds since the Unix epoch, that [`clamp_ts`] lets through.
const TIMESTAMP_MAX: i64 = 253_402_207_200_000_000;
/// Smallest timestamp, in microseconds since the Unix epoch, that [`clamp_ts`] lets through.
const TIMESTAMP_MIN: i64 = -377_705_023_201_000_000;

/// Saturates `ts` into the inclusive range `TIMESTAMP_MIN..=TIMESTAMP_MAX`.
///
/// Values already inside the range are returned unchanged.
fn clamp_ts(ts: i64) -> i64 {
    ts.max(TIMESTAMP_MIN).min(TIMESTAMP_MAX)
}
59
60impl months_days_micros {
61 #[inline]
62 pub fn months(&self) -> i32 {
63 ((self.0 >> 96) & DAYS_MONTHS_MASK) as i32
64 }
65
66 #[inline]
67 pub fn days(&self) -> i32 {
68 ((self.0 >> 64) & DAYS_MONTHS_MASK) as i32
69 }
70
71 #[inline]
72 pub fn microseconds(&self) -> i64 {
73 (self.0 & MICROS_MASK) as i64
74 }
75}
76
77impl
78 TryFrom<(
79 &ArrowField,
80 &Arc<dyn ArrowArray>,
81 usize,
82 &ResultFormatSettings,
83 )> for Value
84{
85 type Error = Error;
86 fn try_from(
87 (field, array, seq, settings): (
88 &ArrowField,
89 &Arc<dyn ArrowArray>,
90 usize,
91 &ResultFormatSettings,
92 ),
93 ) -> std::result::Result<Self, Self::Error> {
94 if let Some(extend_type) = field.metadata().get(EXTENSION_KEY) {
95 return match extend_type.as_str() {
96 ARROW_EXT_TYPE_EMPTY_ARRAY => Ok(Value::EmptyArray),
97 ARROW_EXT_TYPE_EMPTY_MAP => Ok(Value::EmptyMap),
98 ARROW_EXT_TYPE_VARIANT => {
99 if field.is_nullable() && array.is_null(seq) {
100 return Ok(Value::Null);
101 }
102 match array.as_any().downcast_ref::<LargeBinaryArray>() {
103 Some(array) => {
104 Ok(Value::Variant(RawJsonb::new(array.value(seq)).to_string()))
105 }
106 None => Err(ConvertError::new("variant", format!("{array:?}")).into()),
107 }
108 }
109 ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE => {
110 if field.is_nullable() && array.is_null(seq) {
111 return Ok(Value::Null);
112 }
113 match array.as_any().downcast_ref::<Decimal128Array>() {
114 Some(array) => {
115 let v = array.value(seq);
116 let unix_ts = clamp_ts(v as u64 as i64);
117 let offset = (v >> 64) as i32;
118 let offset = tz::Offset::from_seconds(offset).map_err(|e| {
119 Error::Parsing(format!("invalid offset: {offset}, {e}"))
120 })?;
121 let time_zone = tz::TimeZone::fixed(offset);
122 let timestamp = Timestamp::from_microsecond(unix_ts).map_err(|e| {
123 Error::Parsing(format!("Invalid timestamp_micros {unix_ts}: {e}"))
124 })?;
125 Ok(Value::TimestampTz(timestamp.to_zoned(time_zone)))
126 }
127 None => Err(ConvertError::new("Interval", format!("{array:?}")).into()),
128 }
129 }
130 ARROW_EXT_TYPE_INTERVAL => {
131 if field.is_nullable() && array.is_null(seq) {
132 return Ok(Value::Null);
133 }
134 match array.as_any().downcast_ref::<Decimal128Array>() {
135 Some(array) => {
136 let res = months_days_micros(array.value(seq));
137 Ok(Value::Interval(
138 Interval {
139 months: res.months(),
140 days: res.days(),
141 micros: res.microseconds(),
142 }
143 .to_string(),
144 ))
145 }
146 None => Err(ConvertError::new("Interval", format!("{array:?}")).into()),
147 }
148 }
149 ARROW_EXT_TYPE_BITMAP => {
150 if field.is_nullable() && array.is_null(seq) {
151 return Ok(Value::Null);
152 }
153 match array.as_any().downcast_ref::<LargeBinaryArray>() {
154 Some(array) => {
155 let rb = roaring::RoaringTreemap::deserialize_from(array.value(seq))
156 .expect("failed to deserialize bitmap");
157 let raw = rb.into_iter().collect::<Vec<_>>();
158 let s = itertools::join(raw.iter(), ",");
159 Ok(Value::Bitmap(s))
160 }
161 None => Err(ConvertError::new("bitmap", format!("{array:?}")).into()),
162 }
163 }
164 ARROW_EXT_TYPE_GEOMETRY => {
165 if field.is_nullable() && array.is_null(seq) {
166 return Ok(Value::Null);
167 }
168 match array.as_any().downcast_ref::<LargeBinaryArray>() {
169 Some(array) => {
170 let value = convert_geometry(
171 array.value(seq),
172 settings.geometry_output_format,
173 )?;
174 Ok(Value::Geometry(value))
175 }
176 None => Err(ConvertError::new("geometry", format!("{array:?}")).into()),
177 }
178 }
179 ARROW_EXT_TYPE_GEOGRAPHY => {
180 if field.is_nullable() && array.is_null(seq) {
181 return Ok(Value::Null);
182 }
183 match array.as_any().downcast_ref::<LargeBinaryArray>() {
184 Some(array) => {
185 let value = convert_geometry(
186 array.value(seq),
187 settings.geometry_output_format,
188 )?;
189 Ok(Value::Geography(value))
190 }
191 None => Err(ConvertError::new("geography", format!("{array:?}")).into()),
192 }
193 }
194 ARROW_EXT_TYPE_VECTOR => {
195 if field.is_nullable() && array.is_null(seq) {
196 return Ok(Value::Null);
197 }
198 match field.data_type() {
199 ArrowDataType::FixedSizeList(_, dimension) => {
200 match array
201 .as_any()
202 .downcast_ref::<arrow_array::FixedSizeListArray>()
203 {
204 Some(inner_array) => {
205 match inner_array
206 .value(seq)
207 .as_any()
208 .downcast_ref::<Float32Array>()
209 {
210 Some(inner_array) => {
211 let dimension = *dimension as usize;
212 let mut values = Vec::with_capacity(dimension);
213 for i in 0..dimension {
214 let value = inner_array.value(i);
215 values.push(value);
216 }
217 Ok(Value::Vector(values))
218 }
219 None => Err(ConvertError::new(
220 "vector float32",
221 format!("{inner_array:?}"),
222 )
223 .into()),
224 }
225 }
226 None => {
227 Err(ConvertError::new("vector", format!("{array:?}")).into())
228 }
229 }
230 }
231 arrow_type => Err(ConvertError::new(
232 "vector",
233 format!("Unsupported Arrow type: {arrow_type:?}"),
234 )
235 .into()),
236 }
237 }
238 _ => Err(ConvertError::new(
239 "extension",
240 format!("Unsupported extension datatype for arrow field: {field:?}"),
241 )
242 .into()),
243 };
244 }
245
246 if field.is_nullable() && array.is_null(seq) {
247 return Ok(Value::Null);
248 }
249 match field.data_type() {
250 ArrowDataType::Null => Ok(Value::Null),
251 ArrowDataType::Boolean => match array.as_any().downcast_ref::<BooleanArray>() {
252 Some(array) => Ok(Value::Boolean(array.value(seq))),
253 None => Err(ConvertError::new("bool", format!("{array:?}")).into()),
254 },
255 ArrowDataType::Int8 => match array.as_any().downcast_ref::<Int8Array>() {
256 Some(array) => Ok(Value::Number(NumberValue::Int8(array.value(seq)))),
257 None => Err(ConvertError::new("int8", format!("{array:?}")).into()),
258 },
259 ArrowDataType::Int16 => match array.as_any().downcast_ref::<Int16Array>() {
260 Some(array) => Ok(Value::Number(NumberValue::Int16(array.value(seq)))),
261 None => Err(ConvertError::new("int16", format!("{array:?}")).into()),
262 },
263 ArrowDataType::Int32 => match array.as_any().downcast_ref::<Int32Array>() {
264 Some(array) => Ok(Value::Number(NumberValue::Int32(array.value(seq)))),
265 None => Err(ConvertError::new("int64", format!("{array:?}")).into()),
266 },
267 ArrowDataType::Int64 => match array.as_any().downcast_ref::<Int64Array>() {
268 Some(array) => Ok(Value::Number(NumberValue::Int64(array.value(seq)))),
269 None => Err(ConvertError::new("int64", format!("{array:?}")).into()),
270 },
271 ArrowDataType::UInt8 => match array.as_any().downcast_ref::<UInt8Array>() {
272 Some(array) => Ok(Value::Number(NumberValue::UInt8(array.value(seq)))),
273 None => Err(ConvertError::new("uint8", format!("{array:?}")).into()),
274 },
275 ArrowDataType::UInt16 => match array.as_any().downcast_ref::<UInt16Array>() {
276 Some(array) => Ok(Value::Number(NumberValue::UInt16(array.value(seq)))),
277 None => Err(ConvertError::new("uint16", format!("{array:?}")).into()),
278 },
279 ArrowDataType::UInt32 => match array.as_any().downcast_ref::<UInt32Array>() {
280 Some(array) => Ok(Value::Number(NumberValue::UInt32(array.value(seq)))),
281 None => Err(ConvertError::new("uint32", format!("{array:?}")).into()),
282 },
283 ArrowDataType::UInt64 => match array.as_any().downcast_ref::<UInt64Array>() {
284 Some(array) => Ok(Value::Number(NumberValue::UInt64(array.value(seq)))),
285 None => Err(ConvertError::new("uint64", format!("{array:?}")).into()),
286 },
287 ArrowDataType::Float32 => match array.as_any().downcast_ref::<Float32Array>() {
288 Some(array) => Ok(Value::Number(NumberValue::Float32(array.value(seq)))),
289 None => Err(ConvertError::new("float32", format!("{array:?}")).into()),
290 },
291 ArrowDataType::Float64 => match array.as_any().downcast_ref::<Float64Array>() {
292 Some(array) => Ok(Value::Number(NumberValue::Float64(array.value(seq)))),
293 None => Err(ConvertError::new("float64", format!("{array:?}")).into()),
294 },
295 ArrowDataType::Decimal64(p, s) => {
296 match array.as_any().downcast_ref::<Decimal64Array>() {
297 Some(array) => Ok(Value::Number(NumberValue::Decimal64(
298 array.value(seq),
299 DecimalSize {
300 precision: *p,
301 scale: *s as u8,
302 },
303 ))),
304 None => Err(ConvertError::new("Decimal64", format!("{array:?}")).into()),
305 }
306 }
307 ArrowDataType::Decimal128(p, s) => {
308 match array.as_any().downcast_ref::<Decimal128Array>() {
309 Some(array) => Ok(Value::Number(NumberValue::Decimal128(
310 array.value(seq),
311 DecimalSize {
312 precision: *p,
313 scale: *s as u8,
314 },
315 ))),
316 None => Err(ConvertError::new("Decimal128", format!("{array:?}")).into()),
317 }
318 }
319 ArrowDataType::Decimal256(p, s) => {
320 match array.as_any().downcast_ref::<Decimal256Array>() {
321 Some(array) => {
322 let v = array.value(seq);
323 let v = i256::from_le_bytes(v.to_le_bytes());
324 Ok(Value::Number(NumberValue::Decimal256(
325 v,
326 DecimalSize {
327 precision: *p,
328 scale: *s as u8,
329 },
330 )))
331 }
332 None => Err(ConvertError::new("Decimal256", format!("{array:?}")).into()),
333 }
334 }
335
336 ArrowDataType::Binary => match array.as_any().downcast_ref::<BinaryArray>() {
337 Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
338 None => Err(ConvertError::new("binary", format!("{array:?}")).into()),
339 },
340 ArrowDataType::LargeBinary | ArrowDataType::FixedSizeBinary(_) => {
341 match array.as_any().downcast_ref::<LargeBinaryArray>() {
342 Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
343 None => Err(ConvertError::new("large binary", format!("{array:?}")).into()),
344 }
345 }
346 ArrowDataType::Utf8 => match array.as_any().downcast_ref::<StringArray>() {
347 Some(array) => Ok(Value::String(array.value(seq).to_string())),
348 None => Err(ConvertError::new("string", format!("{array:?}")).into()),
349 },
350 ArrowDataType::LargeUtf8 => match array.as_any().downcast_ref::<LargeStringArray>() {
351 Some(array) => Ok(Value::String(array.value(seq).to_string())),
352 None => Err(ConvertError::new("large string", format!("{array:?}")).into()),
353 },
354 ArrowDataType::Utf8View => match array.as_any().downcast_ref::<StringViewArray>() {
355 Some(array) => Ok(Value::String(array.value(seq).to_string())),
356 None => Err(ConvertError::new("string view", format!("{array:?}")).into()),
357 },
358 ArrowDataType::Timestamp(unit, tz) => {
360 match array.as_any().downcast_ref::<TimestampMicrosecondArray>() {
361 Some(array) => {
362 if unit != &TimeUnit::Microsecond {
363 return Err(ConvertError::new("timestamp", format!("{array:?}"))
364 .with_message(format!(
365 "unsupported timestamp unit: {unit:?}, only support microsecond"
366 ))
367 .into());
368 }
369 let ts = clamp_ts(array.value(seq));
370 match tz {
371 None => {
372 let timestamp = Timestamp::from_microsecond(ts).map_err(|e| {
373 Error::Parsing(format!("Invalid timestamp_micros {ts}: {e}"))
374 })?;
375 let dt = timestamp.to_zoned(settings.timezone.clone());
376 Ok(Value::Timestamp(dt))
377 }
378 Some(tz) => Err(ConvertError::new("timestamp", format!("{array:?}"))
379 .with_message(format!("non-UTC timezone not supported: {tz:?}"))
380 .into()),
381 }
382 }
383 None => Err(ConvertError::new("timestamp", format!("{array:?}")).into()),
384 }
385 }
386 ArrowDataType::Date32 => match array.as_any().downcast_ref::<Date32Array>() {
387 Some(array) => Ok(Value::Date(array.value(seq))),
388 None => Err(ConvertError::new("date", format!("{array:?}")).into()),
389 },
390 ArrowDataType::List(f) => match array.as_any().downcast_ref::<ListArray>() {
391 Some(array) => {
392 let inner_array = unsafe { array.value_unchecked(seq) };
393 let mut values = Vec::with_capacity(inner_array.len());
394 for i in 0..inner_array.len() {
395 let value = Value::try_from((f.as_ref(), &inner_array, i, settings))?;
396 values.push(value);
397 }
398 Ok(Value::Array(values))
399 }
400 None => Err(ConvertError::new("list", format!("{array:?}")).into()),
401 },
402 ArrowDataType::LargeList(f) => match array.as_any().downcast_ref::<LargeListArray>() {
403 Some(array) => {
404 let inner_array = unsafe { array.value_unchecked(seq) };
405 let mut values = Vec::with_capacity(inner_array.len());
406 for i in 0..inner_array.len() {
407 let value = Value::try_from((f.as_ref(), &inner_array, i, settings))?;
408 values.push(value);
409 }
410 Ok(Value::Array(values))
411 }
412 None => Err(ConvertError::new("large list", format!("{array:?}")).into()),
413 },
414 ArrowDataType::Map(f, _) => match array.as_any().downcast_ref::<MapArray>() {
415 Some(array) => {
416 if let ArrowDataType::Struct(fs) = f.data_type() {
417 let inner_array = unsafe { array.value_unchecked(seq) };
418 let mut values = Vec::with_capacity(inner_array.len());
419 for i in 0..inner_array.len() {
420 let key = Value::try_from((
421 fs[0].as_ref(),
422 inner_array.column(0),
423 i,
424 settings,
425 ))?;
426 let val = Value::try_from((
427 fs[1].as_ref(),
428 inner_array.column(1),
429 i,
430 settings,
431 ))?;
432 values.push((key, val));
433 }
434 Ok(Value::Map(values))
435 } else {
436 Err(
437 ConvertError::new("invalid map inner type", format!("{array:?}"))
438 .into(),
439 )
440 }
441 }
442 None => Err(ConvertError::new("map", format!("{array:?}")).into()),
443 },
444 ArrowDataType::Struct(fs) => match array.as_any().downcast_ref::<StructArray>() {
445 Some(array) => {
446 let mut values = Vec::with_capacity(array.len());
447 for (f, inner_array) in fs.iter().zip(array.columns().iter()) {
448 let value = Value::try_from((f.as_ref(), inner_array, seq, settings))?;
449 values.push(value);
450 }
451 Ok(Value::Tuple(values))
452 }
453 None => Err(ConvertError::new("struct", format!("{array:?}")).into()),
454 },
455 _ => Err(ConvertError::new("unsupported data type", format!("{array:?}")).into()),
456 }
457 }
458}