1use std::sync::Arc;
21
22use arrow_array::cast::{AsArray, as_dictionary_array};
23use arrow_array::types::{
24 Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type, DurationMicrosecondType,
25 DurationMillisecondType, DurationNanosecondType, DurationSecondType, Float16Type, Float32Type,
26 Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, IntervalDayTimeType,
27 IntervalMonthDayNanoType, IntervalYearMonthType, Time32MillisecondType, Time32SecondType,
28 Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType,
29 TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type, UInt16Type,
30 UInt32Type, UInt64Type,
31};
32use arrow_array::{Array, Date32Array, Date64Array};
33use arrow_schema::{DataType, Schema, TimeUnit};
34use base64::Engine as _;
35use base64::engine::general_purpose::STANDARD as BASE64;
36use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime};
37use serde_json::{Map as JsonMap, Value};
38
39use crate::Result;
40use crate::cli::BinaryFormat;
41use crate::error::Error;
42
43pub fn validate_csv_schema(schema: &Schema) -> Result<()> {
49 for field in schema.fields() {
50 validate_csv_type(field.name(), field.data_type())?;
51 }
52 Ok(())
53}
54
55fn validate_csv_type(col: &str, ty: &DataType) -> Result<()> {
56 use DataType::*;
57 match ty {
58 Null
59 | Boolean
60 | Int8
61 | Int16
62 | Int32
63 | Int64
64 | UInt8
65 | UInt16
66 | UInt32
67 | UInt64
68 | Float16
69 | Float32
70 | Float64
71 | Utf8
72 | LargeUtf8
73 | Date32
74 | Date64
75 | Time32(_)
76 | Time64(_)
77 | Timestamp(_, _)
78 | Decimal32(_, _)
79 | Decimal64(_, _)
80 | Decimal128(_, _)
81 | Decimal256(_, _)
82 | Binary
83 | LargeBinary
84 | BinaryView
85 | FixedSizeBinary(_) => Ok(()),
86
87 Dictionary(_, value_ty) => validate_csv_type(col, value_ty),
88
89 Utf8View
90 | List(_)
91 | LargeList(_)
92 | FixedSizeList(_, _)
93 | ListView(_)
94 | LargeListView(_)
95 | Struct(_)
96 | Map(_, _)
97 | Union(_, _)
98 | RunEndEncoded(_, _)
99 | Duration(_)
100 | Interval(_) => Err(Error::UnsupportedCsvType {
101 column: col.to_string(),
102 data_type: format!("{ty:?}"),
103 }),
104 }
105}
106
107pub fn csv_cell(
111 array: &dyn Array,
112 row: usize,
113 binary_format: BinaryFormat,
114) -> Result<Option<String>> {
115 if array.is_null(row) {
116 return Ok(None);
117 }
118 Ok(Some(csv_non_null(array, row, binary_format)?))
119}
120
/// Renders the value at `row` as CSV text, dispatching on `array.data_type()`.
///
/// The caller is expected to have handled nulls already: this function never calls
/// `is_null` itself.
///
/// # Errors
/// Returns `Error::UnsupportedCsvType` with an empty `column` for any data type that has no
/// arm below; the caller knows the column name, this function does not.
///
/// # Panics
/// Hits `unreachable!` for a `Time64` array whose unit is second or millisecond.
fn csv_non_null(array: &dyn Array, row: usize, binary_format: BinaryFormat) -> Result<String> {
    use DataType::*;
    Ok(match array.data_type() {
        Null => String::new(),
        Boolean => array.as_boolean().value(row).to_string(),
        Int8 => array.as_primitive::<Int8Type>().value(row).to_string(),
        Int16 => array.as_primitive::<Int16Type>().value(row).to_string(),
        Int32 => array.as_primitive::<Int32Type>().value(row).to_string(),
        Int64 => array.as_primitive::<Int64Type>().value(row).to_string(),
        UInt8 => array.as_primitive::<UInt8Type>().value(row).to_string(),
        UInt16 => array.as_primitive::<UInt16Type>().value(row).to_string(),
        UInt32 => array.as_primitive::<UInt32Type>().value(row).to_string(),
        UInt64 => array.as_primitive::<UInt64Type>().value(row).to_string(),
        // Half floats are widened to f32 and share its formatter.
        Float16 => format_f32_csv(f32::from(array.as_primitive::<Float16Type>().value(row))),
        Float32 => format_f32_csv(array.as_primitive::<Float32Type>().value(row)),
        Float64 => format_f64_csv(array.as_primitive::<Float64Type>().value(row)),
        Utf8 => array.as_string::<i32>().value(row).to_string(),
        LargeUtf8 => array.as_string::<i64>().value(row).to_string(),
        // The downcast is expected to succeed because the arm matched on the data type.
        Date32 => format_date32(
            array
                .as_any()
                .downcast_ref::<Date32Array>()
                .unwrap()
                .value(row),
        ),
        Date64 => format_date64(
            array
                .as_any()
                .downcast_ref::<Date64Array>()
                .unwrap()
                .value(row),
        ),
        Time32(unit) => format_time32(time32_value_at(array, *unit, row), *unit),
        Time64(unit) => {
            let v = match unit {
                TimeUnit::Microsecond => array.as_primitive::<Time64MicrosecondType>().value(row),
                TimeUnit::Nanosecond => array.as_primitive::<Time64NanosecondType>().value(row),
                TimeUnit::Second | TimeUnit::Millisecond => {
                    unreachable!("arrow disallows Time64 with second/millisecond units")
                }
            };
            format_time64(v, *unit)
        }
        Timestamp(unit, tz) => format_timestamp_at(array, *unit, tz.as_deref(), row),
        // Decimals: stringify the unscaled integer, then place the decimal point per `scale`.
        Decimal32(_, scale) => insert_decimal_point(
            &array.as_primitive::<Decimal32Type>().value(row).to_string(),
            *scale,
        ),
        Decimal64(_, scale) => insert_decimal_point(
            &array.as_primitive::<Decimal64Type>().value(row).to_string(),
            *scale,
        ),
        Decimal128(_, scale) => {
            format_decimal128_csv(array.as_primitive::<Decimal128Type>().value(row), *scale)
        }
        Decimal256(_, scale) => format_decimal256_csv(
            array
                .as_primitive::<Decimal256Type>()
                .value(row)
                .to_string(),
            *scale,
        ),
        Binary => encode_binary(array.as_binary::<i32>().value(row), binary_format),
        LargeBinary => encode_binary(array.as_binary::<i64>().value(row), binary_format),
        BinaryView => encode_binary(array.as_binary_view().value(row), binary_format),
        FixedSizeBinary(_) => encode_binary(array.as_fixed_size_binary().value(row), binary_format),
        Dictionary(key_ty, _) => {
            // Resolve the key at `row` to a position in the values array and render that value.
            // NOTE(review): the values array is not checked for a null at `logical_index` here,
            // whereas the JSON path goes through the null-aware `json_value` — confirm whether
            // dictionary values containing nulls can reach this point.
            let values = dict_values(array, key_ty);
            let logical_index = dict_logical_index(array, key_ty, row);
            csv_non_null(values.as_ref(), logical_index, binary_format)?
        }
        other => {
            // Anything without an arm above has no CSV rendering; `column` is left empty
            // because only the array is known here.
            return Err(Error::UnsupportedCsvType {
                column: String::new(),
                data_type: format!("{other:?}"),
            });
        }
    })
}
201
202pub fn table_cell(array: &dyn Array, row: usize, binary_format: BinaryFormat) -> Result<String> {
209 if array.is_null(row) {
210 return Ok(String::new());
211 }
212 match csv_non_null(array, row, binary_format) {
213 Ok(s) => Ok(s),
214 Err(Error::UnsupportedCsvType { .. }) => {
215 let v = json_non_null(array, row, binary_format)?;
216 Ok(v.to_string())
217 }
218 Err(e) => Err(e),
219 }
220}
221
222pub fn json_value(array: &dyn Array, row: usize, binary_format: BinaryFormat) -> Result<Value> {
226 if array.is_null(row) {
227 return Ok(Value::Null);
228 }
229 json_non_null(array, row, binary_format)
230}
231
232fn json_non_null(array: &dyn Array, row: usize, binary_format: BinaryFormat) -> Result<Value> {
233 use DataType::*;
234 match array.data_type() {
235 Null => Ok(Value::Null),
236 Boolean => Ok(Value::Bool(array.as_boolean().value(row))),
237 Int8 => Ok(Value::from(array.as_primitive::<Int8Type>().value(row))),
238 Int16 => Ok(Value::from(array.as_primitive::<Int16Type>().value(row))),
239 Int32 => Ok(Value::from(array.as_primitive::<Int32Type>().value(row))),
240 Int64 => Ok(Value::from(array.as_primitive::<Int64Type>().value(row))),
241 UInt8 => Ok(Value::from(array.as_primitive::<UInt8Type>().value(row))),
242 UInt16 => Ok(Value::from(array.as_primitive::<UInt16Type>().value(row))),
243 UInt32 => Ok(Value::from(array.as_primitive::<UInt32Type>().value(row))),
244 UInt64 => Ok(Value::from(array.as_primitive::<UInt64Type>().value(row))),
245 Float16 => Ok(float_json(f64::from(f32::from(
246 array.as_primitive::<Float16Type>().value(row),
247 )))),
248 Float32 => Ok(float_json(f64::from(
249 array.as_primitive::<Float32Type>().value(row),
250 ))),
251 Float64 => Ok(float_json(array.as_primitive::<Float64Type>().value(row))),
252 Utf8 => Ok(Value::String(
253 array.as_string::<i32>().value(row).to_string(),
254 )),
255 LargeUtf8 => Ok(Value::String(
256 array.as_string::<i64>().value(row).to_string(),
257 )),
258 Utf8View => Ok(Value::String(array.as_string_view().value(row).to_string())),
259 Binary => Ok(encode_binary_json(
260 array.as_binary::<i32>().value(row),
261 binary_format,
262 )),
263 LargeBinary => Ok(encode_binary_json(
264 array.as_binary::<i64>().value(row),
265 binary_format,
266 )),
267 BinaryView => Ok(encode_binary_json(
268 array.as_binary_view().value(row),
269 binary_format,
270 )),
271 FixedSizeBinary(_) => Ok(encode_binary_json(
272 array.as_fixed_size_binary().value(row),
273 binary_format,
274 )),
275 Date32 => Ok(Value::String(format_date32(
276 array
277 .as_any()
278 .downcast_ref::<Date32Array>()
279 .unwrap()
280 .value(row),
281 ))),
282 Date64 => Ok(Value::String(format_date64(
283 array
284 .as_any()
285 .downcast_ref::<Date64Array>()
286 .unwrap()
287 .value(row),
288 ))),
289 Time32(unit) => Ok(Value::String(format_time32(
290 time32_value_at(array, *unit, row),
291 *unit,
292 ))),
293 Time64(unit) => {
294 let v = match unit {
295 TimeUnit::Microsecond => array.as_primitive::<Time64MicrosecondType>().value(row),
296 TimeUnit::Nanosecond => array.as_primitive::<Time64NanosecondType>().value(row),
297 TimeUnit::Second | TimeUnit::Millisecond => {
298 unreachable!("arrow disallows Time64 with second/millisecond units")
299 }
300 };
301 Ok(Value::String(format_time64(v, *unit)))
302 }
303 Timestamp(unit, tz) => Ok(Value::String(format_timestamp_at(
304 array,
305 *unit,
306 tz.as_deref(),
307 row,
308 ))),
309 Duration(unit) => Ok(Value::String(format_duration_at(array, *unit, row))),
310 Interval(unit) => {
311 use arrow_schema::IntervalUnit::*;
312 let s = match unit {
313 YearMonth => {
314 let v = array.as_primitive::<IntervalYearMonthType>().value(row);
315 format_interval_year_month(v)
316 }
317 DayTime => {
318 let v = array.as_primitive::<IntervalDayTimeType>().value(row);
319 format_interval_day_time(v.days, v.milliseconds)
320 }
321 MonthDayNano => {
322 let v = array.as_primitive::<IntervalMonthDayNanoType>().value(row);
323 format_interval_month_day_nano(v.months, v.days, v.nanoseconds)
324 }
325 };
326 Ok(Value::String(s))
327 }
328 Decimal32(_, _) => {
329 let v = array.as_primitive::<Decimal32Type>().value(row);
330 Ok(json_number_from_str(&v.to_string()))
331 }
332 Decimal64(_, _) => {
333 let v = array.as_primitive::<Decimal64Type>().value(row);
334 Ok(json_number_from_str(&v.to_string()))
335 }
336 Decimal128(_, _) => {
337 let v = array.as_primitive::<Decimal128Type>().value(row);
338 Ok(json_number_from_str(&v.to_string()))
339 }
340 Decimal256(_, _) => {
341 let v = array.as_primitive::<Decimal256Type>().value(row);
342 Ok(json_number_from_str(&v.to_string()))
343 }
344 List(_) => json_list_like(array.as_list::<i32>().value(row).as_ref(), binary_format),
345 LargeList(_) => json_list_like(array.as_list::<i64>().value(row).as_ref(), binary_format),
346 FixedSizeList(_, _) => json_list_like(
347 array.as_fixed_size_list().value(row).as_ref(),
348 binary_format,
349 ),
350 Struct(_) => {
351 let s = array.as_struct();
352 let mut obj = JsonMap::new();
353 for (i, field) in s.fields().iter().enumerate() {
354 let child = s.column(i);
355 obj.insert(
356 field.name().clone(),
357 json_value(child.as_ref(), row, binary_format)?,
358 );
359 }
360 Ok(Value::Object(obj))
361 }
362 Map(_, _) => {
363 let m = array.as_map();
364 let start = m.value_offsets()[row] as usize;
365 let end = m.value_offsets()[row + 1] as usize;
366 let keys = m.keys();
367 let values = m.values();
368 let mut obj = JsonMap::new();
369 for i in start..end {
370 let key = json_value(keys.as_ref(), i, binary_format)?;
371 let key_str = match key {
372 Value::String(s) => s,
373 other => other.to_string(),
374 };
375 let val = json_value(values.as_ref(), i, binary_format)?;
376 obj.insert(key_str, val);
377 }
378 Ok(Value::Object(obj))
379 }
380 Dictionary(key_ty, _) => {
381 let values = dict_values(array, key_ty);
382 let logical_index = dict_logical_index(array, key_ty, row);
383 json_value(values.as_ref(), logical_index, binary_format)
384 }
385 other => Err(Error::UnsupportedCsvType {
386 column: String::new(),
387 data_type: format!("unsupported arrow type in JSONL output: {other:?}"),
388 }),
389 }
390}
391
392fn json_list_like(array: &dyn Array, binary_format: BinaryFormat) -> Result<Value> {
393 let mut out = Vec::with_capacity(array.len());
394 for i in 0..array.len() {
395 out.push(json_value(array, i, binary_format)?);
396 }
397 Ok(Value::Array(out))
398}
399
400fn float_json(v: f64) -> Value {
403 if v.is_nan() {
404 Value::String("NaN".to_string())
405 } else if v.is_infinite() {
406 Value::String(if v > 0.0 { "Infinity" } else { "-Infinity" }.to_string())
407 } else {
408 serde_json::Number::from_f64(v)
409 .map(Value::Number)
410 .unwrap_or(Value::Null)
411 }
412}
413
/// Formats an `f32` for CSV: `NaN`, `inf`, `-inf`, or the value's `Display` text.
fn format_f32_csv(v: f32) -> String {
    if v.is_finite() {
        return v.to_string();
    }
    let text = if v.is_nan() {
        "NaN"
    } else if v > 0.0 {
        "inf"
    } else {
        "-inf"
    };
    text.into()
}
423
/// Formats an `f64` for CSV: `NaN`, `inf`, `-inf`, or the value's `Display` text.
fn format_f64_csv(v: f64) -> String {
    if v.is_finite() {
        return v.to_string();
    }
    let text = if v.is_nan() {
        "NaN"
    } else if v > 0.0 {
        "inf"
    } else {
        "-inf"
    };
    text.into()
}
433
434fn hex_escape(bytes: &[u8]) -> String {
435 let mut s = String::with_capacity(bytes.len() * 4);
436 for b in bytes {
437 s.push('\\');
438 s.push('x');
439 s.push(hex_digit(b >> 4));
440 s.push(hex_digit(b & 0x0f));
441 }
442 s
443}
444
/// Text written in place of a binary value's contents when the binary format is
/// `BinaryFormat::None`.
pub const BINARY_PLACEHOLDER: &str = "BINARY_DATA";
447
448fn encode_binary(bytes: &[u8], binary_format: BinaryFormat) -> String {
450 match binary_format {
451 BinaryFormat::None => BINARY_PLACEHOLDER.to_string(),
452 BinaryFormat::Hex => hex_escape(bytes),
453 BinaryFormat::Base64 => BASE64.encode(bytes),
454 }
455}
456
457fn encode_binary_json(bytes: &[u8], binary_format: BinaryFormat) -> Value {
459 match binary_format {
460 BinaryFormat::None => Value::String(BINARY_PLACEHOLDER.to_string()),
461 BinaryFormat::Hex => Value::String(hex_escape(bytes)),
462 BinaryFormat::Base64 => Value::String(BASE64.encode(bytes)),
463 }
464}
465
/// Maps a nibble (`0..=15`) to its lowercase hexadecimal character.
///
/// # Panics
/// Panics via `unreachable!` when `n` is greater than 15.
fn hex_digit(n: u8) -> char {
    char::from_digit(u32::from(n), 16).unwrap_or_else(|| unreachable!())
}
473
474fn json_number_from_str(s: &str) -> Value {
475 match s.parse::<serde_json::Number>() {
477 Ok(n) => Value::Number(n),
478 Err(_) => Value::String(s.to_string()),
479 }
480}
481
482fn format_date32(days: i32) -> String {
485 epoch_date()
486 .checked_add_signed(chrono::Duration::days(i64::from(days)))
487 .map_or_else(|| format!("{days}"), |d| d.format("%Y-%m-%d").to_string())
488}
489
490fn format_date64(ms: i64) -> String {
491 DateTime::<chrono::Utc>::from_timestamp_millis(ms)
492 .map_or_else(|| format!("{ms}"), |d| d.format("%Y-%m-%d").to_string())
493}
494
495fn format_time32(v: i32, unit: TimeUnit) -> String {
496 let (secs, nanos) = match unit {
497 TimeUnit::Second => (i64::from(v), 0),
498 TimeUnit::Millisecond => {
499 let v64 = i64::from(v);
500 (v64 / 1_000, (v64 % 1_000) * 1_000_000)
501 }
502 _ => unreachable!("Time32 only supports Second / Millisecond"),
503 };
504 format_time_parts(secs, nanos.try_into().unwrap_or(0), unit)
505}
506
507fn format_time64(v: i64, unit: TimeUnit) -> String {
508 let (secs, nanos) = match unit {
509 TimeUnit::Microsecond => (v / 1_000_000, (v % 1_000_000) * 1_000),
510 TimeUnit::Nanosecond => (v / 1_000_000_000, v % 1_000_000_000),
511 _ => unreachable!("Time64 only supports Microsecond / Nanosecond"),
512 };
513 format_time_parts(secs, nanos.try_into().unwrap_or(0), unit)
514}
515
516fn format_time_parts(secs: i64, nanos: u32, unit: TimeUnit) -> String {
517 let total = (secs.rem_euclid(86_400)) as u32;
518 let h = total / 3600;
519 let m = (total / 60) % 60;
520 let s = total % 60;
521 let t = NaiveTime::from_hms_nano_opt(h, m, s, nanos).unwrap_or_default();
522 match unit {
523 TimeUnit::Second => t.format("%H:%M:%S").to_string(),
524 TimeUnit::Millisecond => t.format("%H:%M:%S%.3f").to_string(),
525 TimeUnit::Microsecond => t.format("%H:%M:%S%.6f").to_string(),
526 TimeUnit::Nanosecond => t.format("%H:%M:%S%.9f").to_string(),
527 }
528}
529
530fn epoch_date() -> NaiveDate {
531 NaiveDate::from_ymd_opt(1970, 1, 1).expect("epoch")
532}
533
534fn time32_value_at(array: &dyn Array, unit: TimeUnit, row: usize) -> i32 {
535 match unit {
536 TimeUnit::Second => array.as_primitive::<Time32SecondType>().value(row),
537 TimeUnit::Millisecond => array.as_primitive::<Time32MillisecondType>().value(row),
538 _ => unreachable!("Time32 only supports Second / Millisecond"),
539 }
540}
541
542fn timestamp_value_at(array: &dyn Array, unit: TimeUnit, row: usize) -> i64 {
543 match unit {
544 TimeUnit::Second => array.as_primitive::<TimestampSecondType>().value(row),
545 TimeUnit::Millisecond => array.as_primitive::<TimestampMillisecondType>().value(row),
546 TimeUnit::Microsecond => array.as_primitive::<TimestampMicrosecondType>().value(row),
547 TimeUnit::Nanosecond => array.as_primitive::<TimestampNanosecondType>().value(row),
548 }
549}
550
551fn format_timestamp_at(array: &dyn Array, unit: TimeUnit, tz: Option<&str>, row: usize) -> String {
552 let v = timestamp_value_at(array, unit, row);
553 let (secs, nanos) = decompose_timestamp(v, unit);
554 match tz {
555 None => naive_ts_iso(secs, nanos, unit),
556 Some(tz_str) => zoned_ts_iso(secs, nanos, unit, tz_str),
557 }
558}
559
560fn decompose_timestamp(v: i64, unit: TimeUnit) -> (i64, u32) {
561 match unit {
562 TimeUnit::Second => (v, 0),
563 TimeUnit::Millisecond => (
564 v.div_euclid(1_000),
565 (v.rem_euclid(1_000) * 1_000_000) as u32,
566 ),
567 TimeUnit::Microsecond => (
568 v.div_euclid(1_000_000),
569 (v.rem_euclid(1_000_000) * 1_000) as u32,
570 ),
571 TimeUnit::Nanosecond => (
572 v.div_euclid(1_000_000_000),
573 v.rem_euclid(1_000_000_000) as u32,
574 ),
575 }
576}
577
578fn naive_ts_iso(secs: i64, nanos: u32, unit: TimeUnit) -> String {
579 let Some(dt) = DateTime::<chrono::Utc>::from_timestamp(secs, nanos) else {
580 return format!("{secs}.{nanos:09}");
581 };
582 let naive: NaiveDateTime = dt.naive_utc();
583 match unit {
584 TimeUnit::Second => naive.format("%Y-%m-%dT%H:%M:%S").to_string(),
585 TimeUnit::Millisecond => naive.format("%Y-%m-%dT%H:%M:%S%.3f").to_string(),
586 TimeUnit::Microsecond => naive.format("%Y-%m-%dT%H:%M:%S%.6f").to_string(),
587 TimeUnit::Nanosecond => naive.format("%Y-%m-%dT%H:%M:%S%.9f").to_string(),
588 }
589}
590
591fn zoned_ts_iso(secs: i64, nanos: u32, unit: TimeUnit, tz: &str) -> String {
592 let Some(utc) = DateTime::<chrono::Utc>::from_timestamp(secs, nanos) else {
593 return format!("{secs}.{nanos:09}{tz}");
594 };
595 if let Some(offset) = parse_fixed_offset(tz) {
598 let dt = utc.with_timezone(&offset);
599 return match unit {
600 TimeUnit::Second => dt.format("%Y-%m-%dT%H:%M:%S%:z").to_string(),
601 TimeUnit::Millisecond => dt.format("%Y-%m-%dT%H:%M:%S%.3f%:z").to_string(),
602 TimeUnit::Microsecond => dt.format("%Y-%m-%dT%H:%M:%S%.6f%:z").to_string(),
603 TimeUnit::Nanosecond => dt.format("%Y-%m-%dT%H:%M:%S%.9f%:z").to_string(),
604 };
605 }
606 let base = naive_ts_iso(secs, nanos, unit);
609 format!("{base}Z[{tz}]")
610}
611
612fn parse_fixed_offset(s: &str) -> Option<FixedOffset> {
613 if s == "Z" || s == "UTC" || s == "+00:00" || s == "-00:00" {
614 return FixedOffset::east_opt(0);
615 }
616 let bytes = s.as_bytes();
618 if bytes.len() < 3 {
619 return None;
620 }
621 let sign = match bytes[0] {
622 b'+' => 1,
623 b'-' => -1,
624 _ => return None,
625 };
626 let rest = &s[1..];
627 let (hh, mm) = if let Some((h, m)) = rest.split_once(':') {
628 (h, m)
629 } else if rest.len() == 4 {
630 (&rest[..2], &rest[2..])
631 } else {
632 return None;
633 };
634 let h: i32 = hh.parse().ok()?;
635 let m: i32 = mm.parse().ok()?;
636 FixedOffset::east_opt(sign * (h * 3600 + m * 60))
637}
638
639fn duration_value_at(array: &dyn Array, unit: TimeUnit, row: usize) -> i64 {
642 match unit {
643 TimeUnit::Second => array.as_primitive::<DurationSecondType>().value(row),
644 TimeUnit::Millisecond => array.as_primitive::<DurationMillisecondType>().value(row),
645 TimeUnit::Microsecond => array.as_primitive::<DurationMicrosecondType>().value(row),
646 TimeUnit::Nanosecond => array.as_primitive::<DurationNanosecondType>().value(row),
647 }
648}
649
650fn format_duration_at(array: &dyn Array, unit: TimeUnit, row: usize) -> String {
651 let v = duration_value_at(array, unit, row);
652 let (secs, nanos) = match unit {
653 TimeUnit::Second => (v, 0i64),
654 TimeUnit::Millisecond => (v.div_euclid(1_000), v.rem_euclid(1_000) * 1_000_000),
655 TimeUnit::Microsecond => (v.div_euclid(1_000_000), v.rem_euclid(1_000_000) * 1_000),
656 TimeUnit::Nanosecond => (v.div_euclid(1_000_000_000), v.rem_euclid(1_000_000_000)),
657 };
658 let sign = if secs < 0 || nanos < 0 { "-" } else { "" };
659 let a_secs = secs.unsigned_abs();
660 let a_nanos = nanos.unsigned_abs();
661 let hours = a_secs / 3600;
662 let minutes = (a_secs % 3600) / 60;
663 let seconds = a_secs % 60;
664 let mut buf = format!("{sign}PT");
665 if hours > 0 {
666 buf.push_str(&format!("{hours}H"));
667 }
668 if minutes > 0 {
669 buf.push_str(&format!("{minutes}M"));
670 }
671 if a_nanos > 0 {
672 let frac = format!("{a_nanos:09}");
673 let trimmed = frac.trim_end_matches('0');
674 buf.push_str(&format!("{seconds}.{trimmed}S"));
675 } else if seconds > 0 || (hours == 0 && minutes == 0) {
676 buf.push_str(&format!("{seconds}S"));
677 }
678 buf
679}
680
/// Formats a month count as an ISO-8601 style period: `P1Y2M`, `P3Y`, `P5M`, with a leading
/// `-` for negative counts. Zero is rendered as `P0M`.
fn format_interval_year_month(months: i32) -> String {
    let magnitude = months.unsigned_abs();
    let (years, rest) = (magnitude / 12, magnitude % 12);
    let sign = if months < 0 { "-" } else { "" };
    match (years, rest) {
        (0, _) => format!("{sign}P{rest}M"),
        (_, 0) => format!("{sign}P{years}Y"),
        _ => format!("{sign}P{years}Y{rest}M"),
    }
}
694
/// Formats a day/millisecond interval as an ISO-8601 style period, e.g. `P1DT1.500S`.
///
/// When every non-zero component is negative, a single leading `-` is written
/// (`-P1DT1.500S`). When the components have different signs, a leading `-` would
/// misrepresent the value, so each negative component carries its own sign instead
/// (`P1DT-0.500S`). A zero interval is `PT0S`.
fn format_interval_day_time(days: i32, ms: i32) -> String {
    let any_negative = days < 0 || ms < 0;
    let any_positive = days > 0 || ms > 0;
    let mixed = any_negative && any_positive;
    let lead = if any_negative && !mixed { "-" } else { "" };
    // Per-component signs are only needed when the components disagree.
    let component_sign = |negative: bool| if mixed && negative { "-" } else { "" };

    let d = days.unsigned_abs();
    let ms_abs = ms.unsigned_abs();
    let s = ms_abs / 1000;
    let frac = ms_abs % 1000;

    let mut out = format!("{lead}P");
    if d > 0 {
        let sign = component_sign(days < 0);
        out.push_str(&format!("{sign}{d}D"));
    }
    if ms_abs > 0 {
        let sign = component_sign(ms < 0);
        out.push('T');
        if frac == 0 {
            out.push_str(&format!("{sign}{s}S"));
        } else {
            out.push_str(&format!("{sign}{s}.{frac:03}S"));
        }
    }
    if days == 0 && ms == 0 {
        out.push_str("T0S");
    }
    out
}
719
/// Formats a month/day/nanosecond interval as an ISO-8601 style period, e.g. `P1M2DT3.5S`.
///
/// When every non-zero component is negative, a single leading `-` is written. When the
/// components have different signs, a leading `-` would misrepresent the value, so each
/// negative component carries its own sign instead (`P1M-2D`). Fractional seconds have
/// trailing zeros removed. A zero interval is `PT0S`.
fn format_interval_month_day_nano(months: i32, days: i32, nanos: i64) -> String {
    let any_negative = months < 0 || days < 0 || nanos < 0;
    let any_positive = months > 0 || days > 0 || nanos > 0;
    let mixed = any_negative && any_positive;
    let lead = if any_negative && !mixed { "-" } else { "" };
    // Per-component signs are only needed when the components disagree.
    let component_sign = |negative: bool| if mixed && negative { "-" } else { "" };

    let mo = months.unsigned_abs();
    let d = days.unsigned_abs();
    let n = nanos.unsigned_abs();
    let s = n / 1_000_000_000;
    let frac = n % 1_000_000_000;

    let mut out = format!("{lead}P");
    if mo > 0 {
        let sign = component_sign(months < 0);
        out.push_str(&format!("{sign}{mo}M"));
    }
    if d > 0 {
        let sign = component_sign(days < 0);
        out.push_str(&format!("{sign}{d}D"));
    }
    if n > 0 {
        let sign = component_sign(nanos < 0);
        out.push('T');
        if frac == 0 {
            out.push_str(&format!("{sign}{s}S"));
        } else {
            let f = format!("{frac:09}");
            let trimmed = f.trim_end_matches('0');
            out.push_str(&format!("{sign}{s}.{trimmed}S"));
        }
    }
    if months == 0 && days == 0 && nanos == 0 {
        out.push_str("T0S");
    }
    out
}
750
751fn format_decimal128_csv(v: i128, scale: i8) -> String {
754 insert_decimal_point(&v.to_string(), scale)
755}
756
757fn format_decimal256_csv(raw: String, scale: i8) -> String {
758 insert_decimal_point(&raw, scale)
759}
760
/// Places the decimal point in the text of an unscaled decimal integer.
///
/// * `raw` — the unscaled integer as decimal text, optionally prefixed with `-`.
/// * `scale` — number of fractional digits. A negative scale multiplies the value by a
///   power of ten, i.e. appends zeros.
///
/// With a positive scale the result always has exactly `scale` fractional digits and at
/// least one integer digit (`"-5"` at scale 2 becomes `"-0.05"`).
///
/// The scale's magnitude is taken with `unsigned_abs`, so `i8::MIN` cannot overflow, and
/// zero with a negative scale stays `"0"` rather than becoming a run of zeros.
fn insert_decimal_point(raw: &str, scale: i8) -> String {
    if scale == 0 {
        return raw.to_string();
    }
    let magnitude = usize::from(scale.unsigned_abs());
    if scale < 0 {
        if raw == "0" {
            return raw.to_string();
        }
        let mut s = String::with_capacity(raw.len() + magnitude);
        s.push_str(raw);
        s.push_str(&"0".repeat(magnitude));
        return s;
    }
    let (sign, digits) = match raw.strip_prefix('-') {
        Some(rest) => ("-", rest),
        None => ("", raw),
    };
    if digits.len() <= magnitude {
        // Fewer digits than the scale: left-pad the fraction with zeros after "0.".
        let zeros = "0".repeat(magnitude - digits.len());
        return format!("{sign}0.{zeros}{digits}");
    }
    let (int_part, frac_part) = digits.split_at(digits.len() - magnitude);
    format!("{sign}{int_part}.{frac_part}")
}
785
786fn dict_values(array: &dyn Array, key_ty: &DataType) -> Arc<dyn Array> {
789 use DataType::*;
790 match key_ty {
791 Int8 => as_dictionary_array::<Int8Type>(array).values().clone(),
792 Int16 => as_dictionary_array::<Int16Type>(array).values().clone(),
793 Int32 => as_dictionary_array::<Int32Type>(array).values().clone(),
794 Int64 => as_dictionary_array::<Int64Type>(array).values().clone(),
795 UInt8 => as_dictionary_array::<UInt8Type>(array).values().clone(),
796 UInt16 => as_dictionary_array::<UInt16Type>(array).values().clone(),
797 UInt32 => as_dictionary_array::<UInt32Type>(array).values().clone(),
798 UInt64 => as_dictionary_array::<UInt64Type>(array).values().clone(),
799 _ => unreachable!("unsupported dictionary key type"),
800 }
801}
802
803fn dict_logical_index(array: &dyn Array, key_ty: &DataType, row: usize) -> usize {
804 use DataType::*;
805 match key_ty {
806 Int8 => as_dictionary_array::<Int8Type>(array).keys().value(row) as usize,
807 Int16 => as_dictionary_array::<Int16Type>(array).keys().value(row) as usize,
808 Int32 => as_dictionary_array::<Int32Type>(array).keys().value(row) as usize,
809 Int64 => as_dictionary_array::<Int64Type>(array).keys().value(row) as usize,
810 UInt8 => as_dictionary_array::<UInt8Type>(array).keys().value(row) as usize,
811 UInt16 => as_dictionary_array::<UInt16Type>(array).keys().value(row) as usize,
812 UInt32 => as_dictionary_array::<UInt32Type>(array).keys().value(row) as usize,
813 UInt64 => as_dictionary_array::<UInt64Type>(array).keys().value(row) as usize,
814 _ => unreachable!("unsupported dictionary key type"),
815 }
816}