1use std::sync::Arc;
16
17use super::{Interval, NumberValue, Value};
18use crate::error::{ConvertError, Error};
19use crate::value::geo::convert_geometry;
20use arrow_array::{
21 Array as ArrowArray, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Decimal256Array,
22 Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
23 LargeListArray, LargeStringArray, ListArray, MapArray, StringArray, StringViewArray,
24 StructArray, TimestampMicrosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
25};
26use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, TimeUnit};
27use databend_client::schema::{
28 DecimalSize, ARROW_EXT_TYPE_BITMAP, ARROW_EXT_TYPE_EMPTY_ARRAY, ARROW_EXT_TYPE_EMPTY_MAP,
29 ARROW_EXT_TYPE_GEOGRAPHY, ARROW_EXT_TYPE_GEOMETRY, ARROW_EXT_TYPE_INTERVAL,
30 ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE, ARROW_EXT_TYPE_VARIANT, ARROW_EXT_TYPE_VECTOR,
31 EXTENSION_KEY,
32};
33use databend_client::ResultFormatSettings;
34use ethnum::i256;
35use jiff::{tz, Timestamp};
36use jsonb::RawJsonb;
37
/// Raw 128-bit interval payload, decoded lane by lane via the accessors below:
/// bits 96..128 hold months, bits 64..96 hold days and bits 0..64 hold
/// microseconds.
// The lower-case name (and the lint allowance it needs) presumably mirrors the
// upstream `months_days_micros` type this payload comes from — TODO confirm.
#[allow(non_camel_case_types)]
#[repr(C)]
struct months_days_micros(pub i128);

/// Keeps the low 64 bits of the payload (the microseconds lane).
const MICROS_MASK: i128 = u64::MAX as i128;
/// Keeps a single 32-bit lane (months or days) once it has been shifted down.
const DAYS_MONTHS_MASK: i128 = u32::MAX as i128;

/// Largest accepted timestamp, in microseconds since the Unix epoch
/// (9999-12-30T22:00:00Z).
const TIMESTAMP_MAX: i64 = 253_402_207_200_000_000;
/// Smallest accepted timestamp, in microseconds since the Unix epoch
/// (-009999-01-02T01:59:59Z).
// NOTE(review): both bounds appear to match jiff's `Timestamp::MIN`/`MAX`
// (whole seconds), so clamped values stay valid for `Timestamp::from_microsecond`.
const TIMESTAMP_MIN: i64 = -377_705_023_201_000_000;

/// Saturates a microsecond Unix timestamp into `[TIMESTAMP_MIN, TIMESTAMP_MAX]`.
fn clamp_ts(ts: i64) -> i64 {
    if ts < TIMESTAMP_MIN {
        TIMESTAMP_MIN
    } else if ts > TIMESTAMP_MAX {
        TIMESTAMP_MAX
    } else {
        ts
    }
}

impl months_days_micros {
    /// Reads the 32-bit lane that starts at bit `shift`; the final `as i32`
    /// reinterprets the lane's top bit as the sign.
    #[inline]
    fn lane32(&self, shift: u32) -> i32 {
        let lane = (self.0 >> shift) & DAYS_MONTHS_MASK;
        lane as i32
    }

    /// Months component (bits 96..128).
    #[inline]
    pub fn months(&self) -> i32 {
        self.lane32(96)
    }

    /// Days component (bits 64..96).
    #[inline]
    pub fn days(&self) -> i32 {
        self.lane32(64)
    }

    /// Microseconds component (bits 0..64), reinterpreted as signed.
    #[inline]
    pub fn microseconds(&self) -> i64 {
        let low = self.0 & MICROS_MASK;
        low as i64
    }
}
75
76impl
77 TryFrom<(
78 &ArrowField,
79 &Arc<dyn ArrowArray>,
80 usize,
81 &ResultFormatSettings,
82 )> for Value
83{
84 type Error = Error;
85 fn try_from(
86 (field, array, seq, settings): (
87 &ArrowField,
88 &Arc<dyn ArrowArray>,
89 usize,
90 &ResultFormatSettings,
91 ),
92 ) -> std::result::Result<Self, Self::Error> {
93 if let Some(extend_type) = field.metadata().get(EXTENSION_KEY) {
94 return match extend_type.as_str() {
95 ARROW_EXT_TYPE_EMPTY_ARRAY => Ok(Value::EmptyArray),
96 ARROW_EXT_TYPE_EMPTY_MAP => Ok(Value::EmptyMap),
97 ARROW_EXT_TYPE_VARIANT => {
98 if field.is_nullable() && array.is_null(seq) {
99 return Ok(Value::Null);
100 }
101 match array.as_any().downcast_ref::<LargeBinaryArray>() {
102 Some(array) => {
103 Ok(Value::Variant(RawJsonb::new(array.value(seq)).to_string()))
104 }
105 None => Err(ConvertError::new("variant", format!("{array:?}")).into()),
106 }
107 }
108 ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE => {
109 if field.is_nullable() && array.is_null(seq) {
110 return Ok(Value::Null);
111 }
112 match array.as_any().downcast_ref::<Decimal128Array>() {
113 Some(array) => {
114 let v = array.value(seq);
115 let unix_ts = clamp_ts(v as u64 as i64);
116 let offset = (v >> 64) as i32;
117 let offset = tz::Offset::from_seconds(offset).map_err(|e| {
118 Error::Parsing(format!("invalid offset: {offset}, {e}"))
119 })?;
120 let time_zone = tz::TimeZone::fixed(offset);
121 let timestamp = Timestamp::from_microsecond(unix_ts).map_err(|e| {
122 Error::Parsing(format!("Invalid timestamp_micros {unix_ts}: {e}"))
123 })?;
124 Ok(Value::TimestampTz(timestamp.to_zoned(time_zone)))
125 }
126 None => Err(ConvertError::new("Interval", format!("{array:?}")).into()),
127 }
128 }
129 ARROW_EXT_TYPE_INTERVAL => {
130 if field.is_nullable() && array.is_null(seq) {
131 return Ok(Value::Null);
132 }
133 match array.as_any().downcast_ref::<Decimal128Array>() {
134 Some(array) => {
135 let res = months_days_micros(array.value(seq));
136 Ok(Value::Interval(
137 Interval {
138 months: res.months(),
139 days: res.days(),
140 micros: res.microseconds(),
141 }
142 .to_string(),
143 ))
144 }
145 None => Err(ConvertError::new("Interval", format!("{array:?}")).into()),
146 }
147 }
148 ARROW_EXT_TYPE_BITMAP => {
149 if field.is_nullable() && array.is_null(seq) {
150 return Ok(Value::Null);
151 }
152 match array.as_any().downcast_ref::<LargeBinaryArray>() {
153 Some(array) => {
154 let rb = roaring::RoaringTreemap::deserialize_from(array.value(seq))
155 .expect("failed to deserialize bitmap");
156 let raw = rb.into_iter().collect::<Vec<_>>();
157 let s = itertools::join(raw.iter(), ",");
158 Ok(Value::Bitmap(s))
159 }
160 None => Err(ConvertError::new("bitmap", format!("{array:?}")).into()),
161 }
162 }
163 ARROW_EXT_TYPE_GEOMETRY => {
164 if field.is_nullable() && array.is_null(seq) {
165 return Ok(Value::Null);
166 }
167 match array.as_any().downcast_ref::<LargeBinaryArray>() {
168 Some(array) => {
169 let value = convert_geometry(
170 array.value(seq),
171 settings.geometry_output_format,
172 )?;
173 Ok(Value::Geometry(value))
174 }
175 None => Err(ConvertError::new("geometry", format!("{array:?}")).into()),
176 }
177 }
178 ARROW_EXT_TYPE_GEOGRAPHY => {
179 if field.is_nullable() && array.is_null(seq) {
180 return Ok(Value::Null);
181 }
182 match array.as_any().downcast_ref::<LargeBinaryArray>() {
183 Some(array) => {
184 let value = convert_geometry(
185 array.value(seq),
186 settings.geometry_output_format,
187 )?;
188 Ok(Value::Geography(value))
189 }
190 None => Err(ConvertError::new("geography", format!("{array:?}")).into()),
191 }
192 }
193 ARROW_EXT_TYPE_VECTOR => {
194 if field.is_nullable() && array.is_null(seq) {
195 return Ok(Value::Null);
196 }
197 match field.data_type() {
198 ArrowDataType::FixedSizeList(_, dimension) => {
199 match array
200 .as_any()
201 .downcast_ref::<arrow_array::FixedSizeListArray>()
202 {
203 Some(inner_array) => {
204 match inner_array
205 .value(seq)
206 .as_any()
207 .downcast_ref::<Float32Array>()
208 {
209 Some(inner_array) => {
210 let dimension = *dimension as usize;
211 let mut values = Vec::with_capacity(dimension);
212 for i in 0..dimension {
213 let value = inner_array.value(i);
214 values.push(value);
215 }
216 Ok(Value::Vector(values))
217 }
218 None => Err(ConvertError::new(
219 "vector float32",
220 format!("{inner_array:?}"),
221 )
222 .into()),
223 }
224 }
225 None => {
226 Err(ConvertError::new("vector", format!("{array:?}")).into())
227 }
228 }
229 }
230 arrow_type => Err(ConvertError::new(
231 "vector",
232 format!("Unsupported Arrow type: {arrow_type:?}"),
233 )
234 .into()),
235 }
236 }
237 _ => Err(ConvertError::new(
238 "extension",
239 format!("Unsupported extension datatype for arrow field: {field:?}"),
240 )
241 .into()),
242 };
243 }
244
245 if field.is_nullable() && array.is_null(seq) {
246 return Ok(Value::Null);
247 }
248 match field.data_type() {
249 ArrowDataType::Null => Ok(Value::Null),
250 ArrowDataType::Boolean => match array.as_any().downcast_ref::<BooleanArray>() {
251 Some(array) => Ok(Value::Boolean(array.value(seq))),
252 None => Err(ConvertError::new("bool", format!("{array:?}")).into()),
253 },
254 ArrowDataType::Int8 => match array.as_any().downcast_ref::<Int8Array>() {
255 Some(array) => Ok(Value::Number(NumberValue::Int8(array.value(seq)))),
256 None => Err(ConvertError::new("int8", format!("{array:?}")).into()),
257 },
258 ArrowDataType::Int16 => match array.as_any().downcast_ref::<Int16Array>() {
259 Some(array) => Ok(Value::Number(NumberValue::Int16(array.value(seq)))),
260 None => Err(ConvertError::new("int16", format!("{array:?}")).into()),
261 },
262 ArrowDataType::Int32 => match array.as_any().downcast_ref::<Int32Array>() {
263 Some(array) => Ok(Value::Number(NumberValue::Int32(array.value(seq)))),
264 None => Err(ConvertError::new("int64", format!("{array:?}")).into()),
265 },
266 ArrowDataType::Int64 => match array.as_any().downcast_ref::<Int64Array>() {
267 Some(array) => Ok(Value::Number(NumberValue::Int64(array.value(seq)))),
268 None => Err(ConvertError::new("int64", format!("{array:?}")).into()),
269 },
270 ArrowDataType::UInt8 => match array.as_any().downcast_ref::<UInt8Array>() {
271 Some(array) => Ok(Value::Number(NumberValue::UInt8(array.value(seq)))),
272 None => Err(ConvertError::new("uint8", format!("{array:?}")).into()),
273 },
274 ArrowDataType::UInt16 => match array.as_any().downcast_ref::<UInt16Array>() {
275 Some(array) => Ok(Value::Number(NumberValue::UInt16(array.value(seq)))),
276 None => Err(ConvertError::new("uint16", format!("{array:?}")).into()),
277 },
278 ArrowDataType::UInt32 => match array.as_any().downcast_ref::<UInt32Array>() {
279 Some(array) => Ok(Value::Number(NumberValue::UInt32(array.value(seq)))),
280 None => Err(ConvertError::new("uint32", format!("{array:?}")).into()),
281 },
282 ArrowDataType::UInt64 => match array.as_any().downcast_ref::<UInt64Array>() {
283 Some(array) => Ok(Value::Number(NumberValue::UInt64(array.value(seq)))),
284 None => Err(ConvertError::new("uint64", format!("{array:?}")).into()),
285 },
286 ArrowDataType::Float32 => match array.as_any().downcast_ref::<Float32Array>() {
287 Some(array) => Ok(Value::Number(NumberValue::Float32(array.value(seq)))),
288 None => Err(ConvertError::new("float32", format!("{array:?}")).into()),
289 },
290 ArrowDataType::Float64 => match array.as_any().downcast_ref::<Float64Array>() {
291 Some(array) => Ok(Value::Number(NumberValue::Float64(array.value(seq)))),
292 None => Err(ConvertError::new("float64", format!("{array:?}")).into()),
293 },
294
295 ArrowDataType::Decimal128(p, s) => {
296 match array.as_any().downcast_ref::<Decimal128Array>() {
297 Some(array) => Ok(Value::Number(NumberValue::Decimal128(
298 array.value(seq),
299 DecimalSize {
300 precision: *p,
301 scale: *s as u8,
302 },
303 ))),
304 None => Err(ConvertError::new("Decimal128", format!("{array:?}")).into()),
305 }
306 }
307 ArrowDataType::Decimal256(p, s) => {
308 match array.as_any().downcast_ref::<Decimal256Array>() {
309 Some(array) => {
310 let v = array.value(seq);
311 let v = i256::from_le_bytes(v.to_le_bytes());
312 Ok(Value::Number(NumberValue::Decimal256(
313 v,
314 DecimalSize {
315 precision: *p,
316 scale: *s as u8,
317 },
318 )))
319 }
320 None => Err(ConvertError::new("Decimal256", format!("{array:?}")).into()),
321 }
322 }
323
324 ArrowDataType::Binary => match array.as_any().downcast_ref::<BinaryArray>() {
325 Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
326 None => Err(ConvertError::new("binary", format!("{array:?}")).into()),
327 },
328 ArrowDataType::LargeBinary | ArrowDataType::FixedSizeBinary(_) => {
329 match array.as_any().downcast_ref::<LargeBinaryArray>() {
330 Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
331 None => Err(ConvertError::new("large binary", format!("{array:?}")).into()),
332 }
333 }
334 ArrowDataType::Utf8 => match array.as_any().downcast_ref::<StringArray>() {
335 Some(array) => Ok(Value::String(array.value(seq).to_string())),
336 None => Err(ConvertError::new("string", format!("{array:?}")).into()),
337 },
338 ArrowDataType::LargeUtf8 => match array.as_any().downcast_ref::<LargeStringArray>() {
339 Some(array) => Ok(Value::String(array.value(seq).to_string())),
340 None => Err(ConvertError::new("large string", format!("{array:?}")).into()),
341 },
342 ArrowDataType::Utf8View => match array.as_any().downcast_ref::<StringViewArray>() {
343 Some(array) => Ok(Value::String(array.value(seq).to_string())),
344 None => Err(ConvertError::new("string view", format!("{array:?}")).into()),
345 },
346 ArrowDataType::Timestamp(unit, tz) => {
348 match array.as_any().downcast_ref::<TimestampMicrosecondArray>() {
349 Some(array) => {
350 if unit != &TimeUnit::Microsecond {
351 return Err(ConvertError::new("timestamp", format!("{array:?}"))
352 .with_message(format!(
353 "unsupported timestamp unit: {unit:?}, only support microsecond"
354 ))
355 .into());
356 }
357 let ts = clamp_ts(array.value(seq));
358 match tz {
359 None => {
360 let timestamp = Timestamp::from_microsecond(ts).map_err(|e| {
361 Error::Parsing(format!("Invalid timestamp_micros {ts}: {e}"))
362 })?;
363 let dt = timestamp.to_zoned(settings.timezone.clone());
364 Ok(Value::Timestamp(dt))
365 }
366 Some(tz) => Err(ConvertError::new("timestamp", format!("{array:?}"))
367 .with_message(format!("non-UTC timezone not supported: {tz:?}"))
368 .into()),
369 }
370 }
371 None => Err(ConvertError::new("timestamp", format!("{array:?}")).into()),
372 }
373 }
374 ArrowDataType::Date32 => match array.as_any().downcast_ref::<Date32Array>() {
375 Some(array) => Ok(Value::Date(array.value(seq))),
376 None => Err(ConvertError::new("date", format!("{array:?}")).into()),
377 },
378 ArrowDataType::List(f) => match array.as_any().downcast_ref::<ListArray>() {
379 Some(array) => {
380 let inner_array = unsafe { array.value_unchecked(seq) };
381 let mut values = Vec::with_capacity(inner_array.len());
382 for i in 0..inner_array.len() {
383 let value = Value::try_from((f.as_ref(), &inner_array, i, settings))?;
384 values.push(value);
385 }
386 Ok(Value::Array(values))
387 }
388 None => Err(ConvertError::new("list", format!("{array:?}")).into()),
389 },
390 ArrowDataType::LargeList(f) => match array.as_any().downcast_ref::<LargeListArray>() {
391 Some(array) => {
392 let inner_array = unsafe { array.value_unchecked(seq) };
393 let mut values = Vec::with_capacity(inner_array.len());
394 for i in 0..inner_array.len() {
395 let value = Value::try_from((f.as_ref(), &inner_array, i, settings))?;
396 values.push(value);
397 }
398 Ok(Value::Array(values))
399 }
400 None => Err(ConvertError::new("large list", format!("{array:?}")).into()),
401 },
402 ArrowDataType::Map(f, _) => match array.as_any().downcast_ref::<MapArray>() {
403 Some(array) => {
404 if let ArrowDataType::Struct(fs) = f.data_type() {
405 let inner_array = unsafe { array.value_unchecked(seq) };
406 let mut values = Vec::with_capacity(inner_array.len());
407 for i in 0..inner_array.len() {
408 let key = Value::try_from((
409 fs[0].as_ref(),
410 inner_array.column(0),
411 i,
412 settings,
413 ))?;
414 let val = Value::try_from((
415 fs[1].as_ref(),
416 inner_array.column(1),
417 i,
418 settings,
419 ))?;
420 values.push((key, val));
421 }
422 Ok(Value::Map(values))
423 } else {
424 Err(
425 ConvertError::new("invalid map inner type", format!("{array:?}"))
426 .into(),
427 )
428 }
429 }
430 None => Err(ConvertError::new("map", format!("{array:?}")).into()),
431 },
432 ArrowDataType::Struct(fs) => match array.as_any().downcast_ref::<StructArray>() {
433 Some(array) => {
434 let mut values = Vec::with_capacity(array.len());
435 for (f, inner_array) in fs.iter().zip(array.columns().iter()) {
436 let value = Value::try_from((f.as_ref(), inner_array, seq, settings))?;
437 values.push(value);
438 }
439 Ok(Value::Tuple(values))
440 }
441 None => Err(ConvertError::new("struct", format!("{array:?}")).into()),
442 },
443 _ => Err(ConvertError::new("unsupported data type", format!("{array:?}")).into()),
444 }
445 }
446}