1use super::ImportError;
18use arrow::array::cast::AsArray;
19use arrow::array::types::{
20 Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
21 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
22 TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
23};
24use arrow::array::{Array, RecordBatch};
25use arrow::datatypes::DataType;
26use std::fmt::Write as FmtWrite;
27use std::io::Write;
28use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
29
/// Formatting settings used when Arrow data is rendered as CSV text.
#[derive(Clone, Debug)]
pub struct CsvWriterOptions {
    /// Character placed between adjacent fields of a row.
    pub column_separator: char,
    /// Quote character wrapped around fields that need escaping; occurrences
    /// inside such a field are doubled.
    pub column_delimiter: char,
    /// Text appended after every row, including the header line.
    pub row_separator: &'static str,
    /// Text emitted in place of a null value.
    pub null_value: String,
    /// When `true`, a line with the column names precedes the data rows.
    pub write_header: bool,
    /// Date format pattern.
    ///
    /// NOTE(review): nothing in the visible part of this file reads this
    /// field; dates are always rendered as `YYYY-MM-DD`. Confirm whether it is
    /// consumed elsewhere.
    pub date_format: String,
    /// Timestamp format pattern.
    ///
    /// NOTE(review): nothing in the visible part of this file reads this
    /// field; timestamps are always rendered as `YYYY-MM-DD HH:MM:SS.ffffff`.
    pub timestamp_format: String,
}
48
49impl Default for CsvWriterOptions {
50 fn default() -> Self {
51 Self {
52 column_separator: ',',
53 column_delimiter: '"',
54 row_separator: "\n",
55 null_value: String::new(),
56 write_header: false,
57 date_format: "%Y-%m-%d".to_string(),
58 timestamp_format: "%Y-%m-%d %H:%M:%S%.6f".to_string(),
59 }
60 }
61}
62
63#[derive(Debug, Clone)]
65pub struct ArrowImportOptions {
66 pub schema: Option<String>,
68 pub columns: Option<Vec<String>>,
70 pub batch_size: usize,
72 pub csv_options: CsvWriterOptions,
74 pub use_encryption: bool,
76 pub host: String,
79 pub port: u16,
82}
83
84impl Default for ArrowImportOptions {
85 fn default() -> Self {
86 Self {
87 schema: None,
88 columns: None,
89 batch_size: 10000,
90 csv_options: CsvWriterOptions::default(),
91 use_encryption: false,
92 host: String::new(),
93 port: 0,
94 }
95 }
96}
97
98impl ArrowImportOptions {
99 #[must_use]
101 pub fn new() -> Self {
102 Self::default()
103 }
104
105 #[must_use]
106 pub fn schema(mut self, schema: impl Into<String>) -> Self {
107 self.schema = Some(schema.into());
108 self
109 }
110
111 #[must_use]
112 pub fn columns(mut self, columns: Vec<String>) -> Self {
113 self.columns = Some(columns);
114 self
115 }
116
117 #[must_use]
118 pub fn batch_size(mut self, size: usize) -> Self {
119 self.batch_size = size;
120 self
121 }
122
123 #[must_use]
124 pub fn null_value(mut self, value: impl Into<String>) -> Self {
125 self.csv_options.null_value = value.into();
126 self
127 }
128
129 #[must_use]
130 pub fn column_separator(mut self, sep: char) -> Self {
131 self.csv_options.column_separator = sep;
132 self
133 }
134
135 #[must_use]
136 pub fn column_delimiter(mut self, delim: char) -> Self {
137 self.csv_options.column_delimiter = delim;
138 self
139 }
140
141 #[must_use]
142 pub fn with_encryption(mut self) -> Self {
143 self.use_encryption = true;
144 self
145 }
146
147 #[must_use]
151 pub fn exasol_host(mut self, host: impl Into<String>) -> Self {
152 self.host = host.into();
153 self
154 }
155
156 #[must_use]
160 pub fn exasol_port(mut self, port: u16) -> Self {
161 self.port = port;
162 self
163 }
164}
165
166pub struct ArrowToCsvWriter<W: AsyncWrite + Unpin> {
178 writer: W,
179 options: CsvWriterOptions,
180 rows_written: usize,
181}
182
183impl<W: AsyncWrite + Unpin> ArrowToCsvWriter<W> {
184 pub fn new(writer: W, options: CsvWriterOptions) -> Self {
186 Self {
187 writer,
188 options,
189 rows_written: 0,
190 }
191 }
192
193 pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<usize, ImportError> {
197 let mut bytes_written = 0;
198 let num_rows = batch.num_rows();
199 let num_cols = batch.num_columns();
200
201 if self.options.write_header && self.rows_written == 0 {
203 let header = self.format_header(batch)?;
204 self.writer
205 .write_all(header.as_bytes())
206 .await
207 .map_err(|e| ImportError::CsvWriteError(e.to_string()))?;
208 bytes_written += header.len();
209 }
210
211 for row_idx in 0..num_rows {
213 let mut row_str = String::with_capacity(256);
214
215 for col_idx in 0..num_cols {
216 if col_idx > 0 {
217 row_str.push(self.options.column_separator);
218 }
219
220 let column = batch.column(col_idx);
221 let value = self.format_value(column, row_idx)?;
222 row_str.push_str(&value);
223 }
224
225 row_str.push_str(self.options.row_separator);
226
227 self.writer
228 .write_all(row_str.as_bytes())
229 .await
230 .map_err(|e| ImportError::CsvWriteError(e.to_string()))?;
231 bytes_written += row_str.len();
232 }
233
234 self.rows_written += num_rows;
235 Ok(bytes_written)
236 }
237
238 pub async fn finish(mut self) -> Result<usize, ImportError> {
240 self.writer
241 .flush()
242 .await
243 .map_err(|e| ImportError::CsvWriteError(e.to_string()))?;
244 Ok(self.rows_written)
245 }
246
247 #[must_use]
249 pub fn rows_written(&self) -> usize {
250 self.rows_written
251 }
252
253 fn format_header(&self, batch: &RecordBatch) -> Result<String, ImportError> {
255 let schema = batch.schema();
256 let mut header = String::new();
257
258 for (idx, field) in schema.fields().iter().enumerate() {
259 if idx > 0 {
260 header.push(self.options.column_separator);
261 }
262 header.push_str(&self.escape_string(field.name()));
263 }
264 header.push_str(self.options.row_separator);
265
266 Ok(header)
267 }
268
269 fn format_value(&self, array: &dyn Array, row_idx: usize) -> Result<String, ImportError> {
271 if array.is_null(row_idx) {
272 return Ok(self.options.null_value.clone());
273 }
274
275 let data_type = array.data_type();
276 match data_type {
277 DataType::Boolean => {
278 let arr = array.as_boolean();
279 Ok(if arr.value(row_idx) { "true" } else { "false" }.to_string())
280 }
281 DataType::Int8 => Ok(array.as_primitive::<Int8Type>().value(row_idx).to_string()),
282 DataType::Int16 => Ok(array.as_primitive::<Int16Type>().value(row_idx).to_string()),
283 DataType::Int32 => Ok(array.as_primitive::<Int32Type>().value(row_idx).to_string()),
284 DataType::Int64 => Ok(array.as_primitive::<Int64Type>().value(row_idx).to_string()),
285 DataType::UInt8 => Ok(array.as_primitive::<UInt8Type>().value(row_idx).to_string()),
286 DataType::UInt16 => Ok(array
287 .as_primitive::<UInt16Type>()
288 .value(row_idx)
289 .to_string()),
290 DataType::UInt32 => Ok(array
291 .as_primitive::<UInt32Type>()
292 .value(row_idx)
293 .to_string()),
294 DataType::UInt64 => Ok(array
295 .as_primitive::<UInt64Type>()
296 .value(row_idx)
297 .to_string()),
298 DataType::Float32 => {
299 let val = array.as_primitive::<Float32Type>().value(row_idx);
300 Ok(format_float(val as f64))
301 }
302 DataType::Float64 => {
303 let val = array.as_primitive::<Float64Type>().value(row_idx);
304 Ok(format_float(val))
305 }
306 DataType::Utf8 => {
307 let arr = array.as_string::<i32>();
308 Ok(self.escape_string(arr.value(row_idx)))
309 }
310 DataType::LargeUtf8 => {
311 let arr = array.as_string::<i64>();
312 Ok(self.escape_string(arr.value(row_idx)))
313 }
314 DataType::Date32 => {
315 let val = array.as_primitive::<Date32Type>().value(row_idx);
316 Ok(format_date32(val))
317 }
318 DataType::Timestamp(unit, _tz) => self.format_timestamp(array, row_idx, unit),
319 DataType::Decimal128(precision, scale) => {
320 self.format_decimal128(array, row_idx, *precision, *scale)
321 }
322 DataType::Binary => {
323 let arr = array.as_binary::<i32>();
324 Ok(hex::encode(arr.value(row_idx)))
325 }
326 DataType::LargeBinary => {
327 let arr = array.as_binary::<i64>();
328 Ok(hex::encode(arr.value(row_idx)))
329 }
330 other => Err(ImportError::ConversionError(format!(
331 "Unsupported Arrow type for CSV conversion: {:?}",
332 other
333 ))),
334 }
335 }
336
337 fn format_timestamp(
339 &self,
340 array: &dyn Array,
341 row_idx: usize,
342 unit: &arrow::datatypes::TimeUnit,
343 ) -> Result<String, ImportError> {
344 use arrow::datatypes::TimeUnit;
345
346 let micros = match unit {
347 TimeUnit::Second => {
348 let val = array.as_primitive::<TimestampSecondType>().value(row_idx);
349 val * 1_000_000
350 }
351 TimeUnit::Millisecond => {
352 let val = array
353 .as_primitive::<TimestampMillisecondType>()
354 .value(row_idx);
355 val * 1_000
356 }
357 TimeUnit::Microsecond => array
358 .as_primitive::<TimestampMicrosecondType>()
359 .value(row_idx),
360 TimeUnit::Nanosecond => {
361 let val = array
362 .as_primitive::<TimestampNanosecondType>()
363 .value(row_idx);
364 val / 1_000
365 }
366 };
367
368 Ok(format_timestamp_micros(micros))
369 }
370
371 fn format_decimal128(
373 &self,
374 array: &dyn Array,
375 row_idx: usize,
376 _precision: u8,
377 scale: i8,
378 ) -> Result<String, ImportError> {
379 let arr = array
380 .as_any()
381 .downcast_ref::<arrow::array::Decimal128Array>()
382 .ok_or_else(|| ImportError::ConversionError("Expected Decimal128Array".to_string()))?;
383
384 let value = arr.value(row_idx);
385 Ok(format_decimal128(value, scale))
386 }
387
388 fn escape_string(&self, s: &str) -> String {
393 let sep = self.options.column_separator;
394 let delim = self.options.column_delimiter;
395
396 let needs_quoting =
398 s.contains(sep) || s.contains(delim) || s.contains('\n') || s.contains('\r');
399
400 if needs_quoting {
401 let mut result = String::with_capacity(s.len() + 4);
403 result.push(delim);
404 for c in s.chars() {
405 if c == delim {
406 result.push(delim);
407 }
408 result.push(c);
409 }
410 result.push(delim);
411 result
412 } else {
413 s.to_string()
414 }
415 }
416}
417
418pub struct SyncArrowToCsvWriter<W: Write> {
420 writer: W,
421 options: CsvWriterOptions,
422 rows_written: usize,
423}
424
425impl<W: Write> SyncArrowToCsvWriter<W> {
426 pub fn new(writer: W, options: CsvWriterOptions) -> Self {
428 Self {
429 writer,
430 options,
431 rows_written: 0,
432 }
433 }
434
435 pub fn write_batch(&mut self, batch: &RecordBatch) -> Result<usize, ImportError> {
437 let mut bytes_written = 0;
438 let num_rows = batch.num_rows();
439 let num_cols = batch.num_columns();
440
441 if self.options.write_header && self.rows_written == 0 {
443 let header = self.format_header(batch)?;
444 self.writer
445 .write_all(header.as_bytes())
446 .map_err(|e| ImportError::CsvWriteError(e.to_string()))?;
447 bytes_written += header.len();
448 }
449
450 for row_idx in 0..num_rows {
452 let mut row_str = String::with_capacity(256);
453
454 for col_idx in 0..num_cols {
455 if col_idx > 0 {
456 row_str.push(self.options.column_separator);
457 }
458
459 let column = batch.column(col_idx);
460 let value = self.format_value(column, row_idx)?;
461 row_str.push_str(&value);
462 }
463
464 row_str.push_str(self.options.row_separator);
465
466 self.writer
467 .write_all(row_str.as_bytes())
468 .map_err(|e| ImportError::CsvWriteError(e.to_string()))?;
469 bytes_written += row_str.len();
470 }
471
472 self.rows_written += num_rows;
473 Ok(bytes_written)
474 }
475
476 pub fn finish(mut self) -> Result<usize, ImportError> {
478 self.writer
479 .flush()
480 .map_err(|e| ImportError::CsvWriteError(e.to_string()))?;
481 Ok(self.rows_written)
482 }
483
484 fn format_header(&self, batch: &RecordBatch) -> Result<String, ImportError> {
486 let schema = batch.schema();
487 let mut header = String::new();
488
489 for (idx, field) in schema.fields().iter().enumerate() {
490 if idx > 0 {
491 header.push(self.options.column_separator);
492 }
493 header.push_str(&self.escape_string(field.name()));
494 }
495 header.push_str(self.options.row_separator);
496
497 Ok(header)
498 }
499
500 fn format_value(&self, array: &dyn Array, row_idx: usize) -> Result<String, ImportError> {
502 if array.is_null(row_idx) {
503 return Ok(self.options.null_value.clone());
504 }
505
506 let data_type = array.data_type();
507 match data_type {
508 DataType::Boolean => {
509 let arr = array.as_boolean();
510 Ok(if arr.value(row_idx) { "true" } else { "false" }.to_string())
511 }
512 DataType::Int8 => Ok(array.as_primitive::<Int8Type>().value(row_idx).to_string()),
513 DataType::Int16 => Ok(array.as_primitive::<Int16Type>().value(row_idx).to_string()),
514 DataType::Int32 => Ok(array.as_primitive::<Int32Type>().value(row_idx).to_string()),
515 DataType::Int64 => Ok(array.as_primitive::<Int64Type>().value(row_idx).to_string()),
516 DataType::UInt8 => Ok(array.as_primitive::<UInt8Type>().value(row_idx).to_string()),
517 DataType::UInt16 => Ok(array
518 .as_primitive::<UInt16Type>()
519 .value(row_idx)
520 .to_string()),
521 DataType::UInt32 => Ok(array
522 .as_primitive::<UInt32Type>()
523 .value(row_idx)
524 .to_string()),
525 DataType::UInt64 => Ok(array
526 .as_primitive::<UInt64Type>()
527 .value(row_idx)
528 .to_string()),
529 DataType::Float32 => {
530 let val = array.as_primitive::<Float32Type>().value(row_idx);
531 Ok(format_float(val as f64))
532 }
533 DataType::Float64 => {
534 let val = array.as_primitive::<Float64Type>().value(row_idx);
535 Ok(format_float(val))
536 }
537 DataType::Utf8 => {
538 let arr = array.as_string::<i32>();
539 Ok(self.escape_string(arr.value(row_idx)))
540 }
541 DataType::LargeUtf8 => {
542 let arr = array.as_string::<i64>();
543 Ok(self.escape_string(arr.value(row_idx)))
544 }
545 DataType::Date32 => {
546 let val = array.as_primitive::<Date32Type>().value(row_idx);
547 Ok(format_date32(val))
548 }
549 DataType::Timestamp(unit, _tz) => self.format_timestamp(array, row_idx, unit),
550 DataType::Decimal128(precision, scale) => {
551 self.format_decimal128(array, row_idx, *precision, *scale)
552 }
553 DataType::Binary => {
554 let arr = array.as_binary::<i32>();
555 Ok(hex::encode(arr.value(row_idx)))
556 }
557 DataType::LargeBinary => {
558 let arr = array.as_binary::<i64>();
559 Ok(hex::encode(arr.value(row_idx)))
560 }
561 other => Err(ImportError::ConversionError(format!(
562 "Unsupported Arrow type for CSV conversion: {:?}",
563 other
564 ))),
565 }
566 }
567
568 fn format_timestamp(
570 &self,
571 array: &dyn Array,
572 row_idx: usize,
573 unit: &arrow::datatypes::TimeUnit,
574 ) -> Result<String, ImportError> {
575 use arrow::datatypes::TimeUnit;
576
577 let micros = match unit {
578 TimeUnit::Second => {
579 let val = array.as_primitive::<TimestampSecondType>().value(row_idx);
580 val * 1_000_000
581 }
582 TimeUnit::Millisecond => {
583 let val = array
584 .as_primitive::<TimestampMillisecondType>()
585 .value(row_idx);
586 val * 1_000
587 }
588 TimeUnit::Microsecond => array
589 .as_primitive::<TimestampMicrosecondType>()
590 .value(row_idx),
591 TimeUnit::Nanosecond => {
592 let val = array
593 .as_primitive::<TimestampNanosecondType>()
594 .value(row_idx);
595 val / 1_000
596 }
597 };
598
599 Ok(format_timestamp_micros(micros))
600 }
601
602 fn format_decimal128(
604 &self,
605 array: &dyn Array,
606 row_idx: usize,
607 _precision: u8,
608 scale: i8,
609 ) -> Result<String, ImportError> {
610 let arr = array
611 .as_any()
612 .downcast_ref::<arrow::array::Decimal128Array>()
613 .ok_or_else(|| ImportError::ConversionError("Expected Decimal128Array".to_string()))?;
614
615 let value = arr.value(row_idx);
616 Ok(format_decimal128(value, scale))
617 }
618
619 fn escape_string(&self, s: &str) -> String {
621 let sep = self.options.column_separator;
622 let delim = self.options.column_delimiter;
623
624 let needs_quoting =
625 s.contains(sep) || s.contains(delim) || s.contains('\n') || s.contains('\r');
626
627 if needs_quoting {
628 let mut result = String::with_capacity(s.len() + 4);
629 result.push(delim);
630 for c in s.chars() {
631 if c == delim {
632 result.push(delim);
633 }
634 result.push(c);
635 }
636 result.push(delim);
637 result
638 } else {
639 s.to_string()
640 }
641 }
642}
643
/// Renders a float as CSV text.
///
/// Finite values use Rust's default `Display` output; the non-finite values
/// are spelled `NaN`, `Infinity` and `-Infinity`.
fn format_float(val: f64) -> String {
    if val.is_finite() {
        return val.to_string();
    }

    let spelled = if val.is_nan() {
        "NaN"
    } else if val.is_sign_negative() {
        "-Infinity"
    } else {
        "Infinity"
    };
    spelled.to_string()
}
659
660fn format_date32(days: i32) -> String {
662 let (year, month, day) = days_to_ymd(days);
664 format!("{:04}-{:02}-{:02}", year, month, day)
665}
666
/// Converts a day count relative to 1970-01-01 into a proleptic Gregorian
/// `(year, month, day)` triple.
///
/// The calendar is processed in 400-year eras of 146097 days, shifted so that
/// each internal year starts on 1 March (which puts the leap day last). All
/// arithmetic is done in `i64` so that every `i32` input is handled without
/// overflow.
///
/// # Returns
/// `(year, month, day)` with `month` in `1..=12` and `day` in `1..=31`.
fn days_to_ymd(days: i32) -> (i32, u32, u32) {
    const DAYS_PER_ERA: i64 = 146_097;

    // Shift the epoch from 1970-01-01 to 0000-03-01.
    let z = i64::from(days) + 719_468;
    let era = z.div_euclid(DAYS_PER_ERA);
    let doe = z.rem_euclid(DAYS_PER_ERA); // day of era, 0..=146096
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // 0..=399
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // 0..=365, counted from 1 March
    let mp = (5 * doy + 2) / 153; // 0 = March ... 11 = February
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    // January and February belong to the following civil year.
    let year = yoe + era * 400 + i64::from(month <= 2);

    // For any i32 day count the year stays within about +/-5.9 million, and
    // month/day are small positive numbers, so these casts cannot truncate.
    (year as i32, month as u32, day as u32)
}
686
687fn format_timestamp_micros(micros: i64) -> String {
689 let total_seconds = micros / 1_000_000;
690 let frac_micros = (micros % 1_000_000).unsigned_abs();
691
692 let days = (total_seconds / 86400) as i32;
693 let day_seconds = (total_seconds % 86400).unsigned_abs() as u32;
694
695 let (year, month, day) = days_to_ymd(days);
696 let hours = day_seconds / 3600;
697 let minutes = (day_seconds % 3600) / 60;
698 let seconds = day_seconds % 60;
699
700 format!(
701 "{:04}-{:02}-{:02} {:02}:{:02}:{:02}.{:06}",
702 year, month, day, hours, minutes, seconds, frac_micros
703 )
704}
705
/// Renders an unscaled 128-bit decimal `value` with the given `scale` as
/// plain decimal text.
///
/// * `scale > 0`: the last `scale` digits become the fraction, left-padded
///   with zeros (`(1, 2)` gives `0.01`).
/// * `scale <= 0`: `-scale` zeros are appended (`(12345, -2)` gives
///   `1234500`); zero stays `0`.
///
/// The result is assembled from the digit string rather than computed with
/// powers of ten, so no combination of `value` and `scale` can overflow.
fn format_decimal128(value: i128, scale: i8) -> String {
    let sign = if value < 0 { "-" } else { "" };
    let digits = value.unsigned_abs().to_string();

    if scale <= 0 {
        if value == 0 {
            // Zero times any power of ten is still just "0".
            return digits;
        }
        let zeros = usize::from(scale.unsigned_abs());
        let mut result = String::with_capacity(sign.len() + digits.len() + zeros);
        result.push_str(sign);
        result.push_str(&digits);
        result.extend(std::iter::repeat('0').take(zeros));
        return result;
    }

    let scale = usize::from(scale.unsigned_abs());
    let (integer_part, fractional_part) = if digits.len() > scale {
        digits.split_at(digits.len() - scale)
    } else {
        // Fewer digits than the scale: everything is fractional.
        ("0", digits.as_str())
    };

    let mut result = String::with_capacity(sign.len() + integer_part.len() + 1 + scale);
    write!(
        result,
        "{}{}.{:0>width$}",
        sign,
        integer_part,
        fractional_part,
        width = scale
    )
    .expect("formatting into a String cannot fail");

    result
}
738
739pub fn record_batch_to_csv(
744 batch: &RecordBatch,
745 options: CsvWriterOptions,
746) -> Result<Vec<u8>, ImportError> {
747 let mut buffer = Vec::new();
748 let mut writer = SyncArrowToCsvWriter::new(&mut buffer, options);
749 writer.write_batch(batch)?;
750 writer.finish()?;
751 Ok(buffer)
752}
753
754pub async fn import_from_record_batch<F, Fut>(
777 execute_sql: F,
778 table: &str,
779 batch: &RecordBatch,
780 options: ArrowImportOptions,
781) -> Result<u64, ImportError>
782where
783 F: FnOnce(String) -> Fut,
784 Fut: std::future::Future<Output = Result<u64, String>>,
785{
786 let csv_bytes = record_batch_to_csv(batch, options.csv_options.clone())?;
788
789 let csv_options = super::csv::CsvImportOptions {
791 encoding: "UTF-8".to_string(),
792 column_separator: options.csv_options.column_separator,
793 column_delimiter: options.csv_options.column_delimiter,
794 row_separator: crate::query::import::RowSeparator::LF,
795 skip_rows: 0,
796 null_value: if options.csv_options.null_value.is_empty() {
797 None
798 } else {
799 Some(options.csv_options.null_value.clone())
800 },
801 trim_mode: crate::query::import::TrimMode::None,
802 compression: crate::query::import::Compression::None,
803 reject_limit: None,
804 use_tls: options.use_encryption,
805 schema: options.schema.clone(),
806 columns: options.columns.clone(),
807 host: options.host.clone(),
808 port: options.port,
809 };
810
811 super::csv::import_from_stream(
813 execute_sql,
814 table,
815 std::io::Cursor::new(csv_bytes),
816 csv_options,
817 )
818 .await
819}
820
821pub async fn import_from_record_batches<I, F, Fut>(
841 execute_sql: F,
842 table: &str,
843 batches: I,
844 options: ArrowImportOptions,
845) -> Result<u64, ImportError>
846where
847 I: IntoIterator<Item = RecordBatch>,
848 F: FnOnce(String) -> Fut,
849 Fut: std::future::Future<Output = Result<u64, String>>,
850{
851 let mut all_csv_bytes = Vec::new();
853 let mut writer = SyncArrowToCsvWriter::new(&mut all_csv_bytes, options.csv_options.clone());
854
855 for batch in batches {
856 writer.write_batch(&batch)?;
857 }
858 writer.finish()?;
859
860 let csv_options = super::csv::CsvImportOptions {
862 encoding: "UTF-8".to_string(),
863 column_separator: options.csv_options.column_separator,
864 column_delimiter: options.csv_options.column_delimiter,
865 row_separator: crate::query::import::RowSeparator::LF,
866 skip_rows: 0,
867 null_value: if options.csv_options.null_value.is_empty() {
868 None
869 } else {
870 Some(options.csv_options.null_value.clone())
871 },
872 trim_mode: crate::query::import::TrimMode::None,
873 compression: crate::query::import::Compression::None,
874 reject_limit: None,
875 use_tls: options.use_encryption,
876 schema: options.schema.clone(),
877 columns: options.columns.clone(),
878 host: options.host.clone(),
879 port: options.port,
880 };
881
882 super::csv::import_from_stream(
884 execute_sql,
885 table,
886 std::io::Cursor::new(all_csv_bytes),
887 csv_options,
888 )
889 .await
890}
891
892pub async fn import_from_arrow_ipc<R, F, Fut>(
911 execute_sql: F,
912 table: &str,
913 mut reader: R,
914 options: ArrowImportOptions,
915) -> Result<u64, ImportError>
916where
917 R: AsyncRead + Unpin + Send,
918 F: FnOnce(String) -> Fut,
919 Fut: std::future::Future<Output = Result<u64, String>>,
920{
921 use tokio::io::AsyncReadExt;
922
923 let mut buffer = Vec::new();
925 reader
926 .read_to_end(&mut buffer)
927 .await
928 .map_err(ImportError::IoError)?;
929
930 let cursor = std::io::Cursor::new(buffer);
932 let ipc_reader = arrow::ipc::reader::FileReader::try_new(cursor, None)
933 .map_err(|e| ImportError::ArrowIpcError(e.to_string()))?;
934
935 let mut all_csv_bytes = Vec::new();
937 let mut writer = SyncArrowToCsvWriter::new(&mut all_csv_bytes, options.csv_options.clone());
938
939 for batch_result in ipc_reader {
940 let batch = batch_result.map_err(|e| ImportError::ArrowIpcError(e.to_string()))?;
941 writer.write_batch(&batch)?;
942 }
943 writer.finish()?;
944
945 let csv_options = super::csv::CsvImportOptions {
947 encoding: "UTF-8".to_string(),
948 column_separator: options.csv_options.column_separator,
949 column_delimiter: options.csv_options.column_delimiter,
950 row_separator: crate::query::import::RowSeparator::LF,
951 skip_rows: 0,
952 null_value: if options.csv_options.null_value.is_empty() {
953 None
954 } else {
955 Some(options.csv_options.null_value.clone())
956 },
957 trim_mode: crate::query::import::TrimMode::None,
958 compression: crate::query::import::Compression::None,
959 reject_limit: None,
960 use_tls: options.use_encryption,
961 schema: options.schema.clone(),
962 columns: options.columns.clone(),
963 host: options.host.clone(),
964 port: options.port,
965 };
966
967 super::csv::import_from_stream(
969 execute_sql,
970 table,
971 std::io::Cursor::new(all_csv_bytes),
972 csv_options,
973 )
974 .await
975}
976
977#[cfg(test)]
978mod tests {
979 use super::*;
980 use arrow::array::{
981 ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array,
982 Float64Array, Int32Array, Int64Array, StringArray, TimestampMicrosecondArray,
983 };
984 use arrow::datatypes::{Field, Schema};
985 use std::sync::Arc;
986
987 fn create_test_batch() -> RecordBatch {
988 let schema = Schema::new(vec![
989 Field::new("id", DataType::Int32, false),
990 Field::new("name", DataType::Utf8, true),
991 Field::new("value", DataType::Float64, true),
992 ]);
993
994 let id_array = Int32Array::from(vec![1, 2, 3]);
995 let name_array = StringArray::from(vec![Some("Alice"), Some("Bob"), None]);
996 let value_array = Float64Array::from(vec![Some(1.5), Some(2.5), Some(3.5)]);
997
998 RecordBatch::try_new(
999 Arc::new(schema),
1000 vec![
1001 Arc::new(id_array) as ArrayRef,
1002 Arc::new(name_array) as ArrayRef,
1003 Arc::new(value_array) as ArrayRef,
1004 ],
1005 )
1006 .unwrap()
1007 }
1008
1009 #[test]
1010 fn test_csv_writer_options_default() {
1011 let options = CsvWriterOptions::default();
1012 assert_eq!(options.column_separator, ',');
1013 assert_eq!(options.column_delimiter, '"');
1014 assert_eq!(options.row_separator, "\n");
1015 assert_eq!(options.null_value, "");
1016 assert!(!options.write_header);
1017 }
1018
1019 #[test]
1020 fn test_arrow_import_options_default() {
1021 let options = ArrowImportOptions::default();
1022 assert!(options.schema.is_none());
1023 assert!(options.columns.is_none());
1024 assert_eq!(options.batch_size, 10000);
1025 assert!(!options.use_encryption);
1026 assert_eq!(options.host, "");
1027 assert_eq!(options.port, 0);
1028 }
1029
1030 #[test]
1031 fn test_arrow_import_options_builder() {
1032 let options = ArrowImportOptions::new()
1033 .schema("myschema")
1034 .columns(vec!["col1".to_string(), "col2".to_string()])
1035 .batch_size(5000)
1036 .null_value("NULL")
1037 .column_separator(';')
1038 .with_encryption()
1039 .exasol_host("exasol.example.com")
1040 .exasol_port(8563);
1041
1042 assert_eq!(options.schema, Some("myschema".to_string()));
1043 assert_eq!(
1044 options.columns,
1045 Some(vec!["col1".to_string(), "col2".to_string()])
1046 );
1047 assert_eq!(options.batch_size, 5000);
1048 assert_eq!(options.csv_options.null_value, "NULL");
1049 assert_eq!(options.csv_options.column_separator, ';');
1050 assert!(options.use_encryption);
1051 assert_eq!(options.host, "exasol.example.com");
1052 assert_eq!(options.port, 8563);
1053 }
1054
1055 #[test]
1056 fn test_record_batch_to_csv_basic() {
1057 let batch = create_test_batch();
1058 let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1059 let csv_str = String::from_utf8(result).unwrap();
1060
1061 assert!(csv_str.contains("1,Alice,1.5"));
1062 assert!(csv_str.contains("2,Bob,2.5"));
1063 assert!(csv_str.contains("3,,3.5"));
1064 }
1065
1066 #[test]
1067 fn test_record_batch_to_csv_with_header() {
1068 let batch = create_test_batch();
1069 let options = CsvWriterOptions {
1070 write_header: true,
1071 ..Default::default()
1072 };
1073 let result = record_batch_to_csv(&batch, options).unwrap();
1074 let csv_str = String::from_utf8(result).unwrap();
1075
1076 assert!(csv_str.starts_with("id,name,value\n"));
1077 }
1078
1079 #[test]
1080 fn test_record_batch_to_csv_with_custom_separator() {
1081 let batch = create_test_batch();
1082 let options = CsvWriterOptions {
1083 column_separator: ';',
1084 ..Default::default()
1085 };
1086 let result = record_batch_to_csv(&batch, options).unwrap();
1087 let csv_str = String::from_utf8(result).unwrap();
1088
1089 assert!(csv_str.contains("1;Alice;1.5"));
1090 }
1091
1092 #[test]
1093 fn test_record_batch_to_csv_with_null_value() {
1094 let batch = create_test_batch();
1095 let options = CsvWriterOptions {
1096 null_value: "NULL".to_string(),
1097 ..Default::default()
1098 };
1099 let result = record_batch_to_csv(&batch, options).unwrap();
1100 let csv_str = String::from_utf8(result).unwrap();
1101
1102 assert!(csv_str.contains("3,NULL,3.5"));
1103 }
1104
1105 #[test]
1106 fn test_escape_string_no_special_chars() {
1107 let writer = SyncArrowToCsvWriter::new(Vec::new(), CsvWriterOptions::default());
1108 let escaped = writer.escape_string("hello");
1109 assert_eq!(escaped, "hello");
1110 }
1111
1112 #[test]
1113 fn test_escape_string_with_separator() {
1114 let writer = SyncArrowToCsvWriter::new(Vec::new(), CsvWriterOptions::default());
1115 let escaped = writer.escape_string("hello,world");
1116 assert_eq!(escaped, "\"hello,world\"");
1117 }
1118
1119 #[test]
1120 fn test_escape_string_with_delimiter() {
1121 let writer = SyncArrowToCsvWriter::new(Vec::new(), CsvWriterOptions::default());
1122 let escaped = writer.escape_string("say \"hello\"");
1123 assert_eq!(escaped, "\"say \"\"hello\"\"\"");
1124 }
1125
1126 #[test]
1127 fn test_escape_string_with_newline() {
1128 let writer = SyncArrowToCsvWriter::new(Vec::new(), CsvWriterOptions::default());
1129 let escaped = writer.escape_string("line1\nline2");
1130 assert_eq!(escaped, "\"line1\nline2\"");
1131 }
1132
1133 #[test]
1134 fn test_format_float_normal() {
1135 assert_eq!(format_float(1.5), "1.5");
1136 assert_eq!(format_float(-2.5), "-2.5");
1137 assert_eq!(format_float(0.0), "0");
1138 }
1139
1140 #[test]
1141 fn test_format_float_special() {
1142 assert_eq!(format_float(f64::NAN), "NaN");
1143 assert_eq!(format_float(f64::INFINITY), "Infinity");
1144 assert_eq!(format_float(f64::NEG_INFINITY), "-Infinity");
1145 }
1146
1147 #[test]
1148 fn test_format_date32() {
1149 assert_eq!(format_date32(0), "1970-01-01");
1150 assert_eq!(format_date32(1), "1970-01-02");
1151 assert_eq!(format_date32(365), "1971-01-01");
1152 assert_eq!(format_date32(-1), "1969-12-31");
1153 }
1154
1155 #[test]
1156 fn test_format_timestamp_micros() {
1157 assert_eq!(format_timestamp_micros(0), "1970-01-01 00:00:00.000000");
1158 assert_eq!(
1159 format_timestamp_micros(1_000_000),
1160 "1970-01-01 00:00:01.000000"
1161 );
1162 assert_eq!(
1163 format_timestamp_micros(86_400_000_000),
1164 "1970-01-02 00:00:00.000000"
1165 );
1166 assert_eq!(
1167 format_timestamp_micros(123456),
1168 "1970-01-01 00:00:00.123456"
1169 );
1170 }
1171
1172 #[test]
1173 fn test_format_decimal128() {
1174 assert_eq!(format_decimal128(12345, 2), "123.45");
1175 assert_eq!(format_decimal128(-12345, 2), "-123.45");
1176 assert_eq!(format_decimal128(100, 2), "1.00");
1177 assert_eq!(format_decimal128(1, 2), "0.01");
1178 assert_eq!(format_decimal128(12345, 0), "12345");
1179 assert_eq!(format_decimal128(12345, -2), "1234500");
1180 }
1181
1182 #[test]
1183 fn test_boolean_array_conversion() {
1184 let schema = Schema::new(vec![Field::new("flag", DataType::Boolean, true)]);
1185 let arr = BooleanArray::from(vec![Some(true), Some(false), None]);
1186 let batch =
1187 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arr) as ArrayRef]).unwrap();
1188
1189 let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1190 let csv_str = String::from_utf8(result).unwrap();
1191
1192 assert!(csv_str.contains("true"));
1193 assert!(csv_str.contains("false"));
1194 }
1195
1196 #[test]
1197 fn test_int32_array_conversion() {
1198 let schema = Schema::new(vec![Field::new("num", DataType::Int32, true)]);
1199 let arr = Int32Array::from(vec![Some(42), Some(-100), None]);
1200 let batch =
1201 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arr) as ArrayRef]).unwrap();
1202
1203 let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1204 let csv_str = String::from_utf8(result).unwrap();
1205
1206 assert!(csv_str.contains("42"));
1207 assert!(csv_str.contains("-100"));
1208 }
1209
1210 #[test]
1211 fn test_int64_array_conversion() {
1212 let schema = Schema::new(vec![Field::new("big_num", DataType::Int64, false)]);
1213 let arr = Int64Array::from(vec![9_223_372_036_854_775_807i64, -1]);
1214 let batch =
1215 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arr) as ArrayRef]).unwrap();
1216
1217 let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1218 let csv_str = String::from_utf8(result).unwrap();
1219
1220 assert!(csv_str.contains("9223372036854775807"));
1221 assert!(csv_str.contains("-1"));
1222 }
1223
1224 #[test]
1225 fn test_float32_array_conversion() {
1226 let schema = Schema::new(vec![Field::new("val", DataType::Float32, true)]);
1227 let arr = Float32Array::from(vec![Some(1.5f32), Some(f32::INFINITY), None]);
1228 let batch =
1229 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arr) as ArrayRef]).unwrap();
1230
1231 let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1232 let csv_str = String::from_utf8(result).unwrap();
1233
1234 assert!(csv_str.contains("1.5"));
1235 assert!(csv_str.contains("Infinity"));
1236 }
1237
1238 #[test]
1239 fn test_date32_array_conversion() {
1240 let schema = Schema::new(vec![Field::new("date", DataType::Date32, false)]);
1241 let arr = Date32Array::from(vec![0, 365, 19724]); let batch =
1243 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arr) as ArrayRef]).unwrap();
1244
1245 let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1246 let csv_str = String::from_utf8(result).unwrap();
1247
1248 assert!(csv_str.contains("1970-01-01"));
1249 assert!(csv_str.contains("1971-01-01"));
1250 }
1251
/// Checks that microsecond timestamps without a time zone are rendered as
/// `YYYY-MM-DD HH:MM:SS.ffffff` text under the default writer options.
#[test]
fn test_timestamp_array_conversion() {
    use arrow::datatypes::TimeUnit;

    let ts_type = DataType::Timestamp(TimeUnit::Microsecond, None);
    let schema = Arc::new(Schema::new(vec![Field::new("ts", ts_type, false)]));
    // The epoch, one second later and one day later, all expressed in microseconds.
    let micros = TimestampMicrosecondArray::from(vec![0, 1_000_000, 86_400_000_000]);
    let column: ArrayRef = Arc::new(micros);
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let csv_bytes = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    assert!(rendered.contains("1970-01-01 00:00:00.000000"));
    assert!(rendered.contains("1970-01-01 00:00:01.000000"));
    assert!(rendered.contains("1970-01-02 00:00:00.000000"));
}
1270
/// Checks that `Decimal128(10, 2)` values are rendered with the decimal point placed
/// according to the scale, for positive, negative and whole-number inputs.
#[test]
fn test_decimal128_array_conversion() {
    // Unscaled integers; with scale 2 they stand for 123.45, -99.99 and 1.00.
    let decimals = Decimal128Array::from(vec![12345i128, -9999, 100])
        .with_precision_and_scale(10, 2)
        .unwrap();
    let column: ArrayRef = Arc::new(decimals);
    let price_field = Field::new("price", DataType::Decimal128(10, 2), false);
    let schema = Arc::new(Schema::new(vec![price_field]));
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let csv_bytes = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    assert!(rendered.contains("123.45"));
    assert!(rendered.contains("-99.99"));
    assert!(rendered.contains("1.00"));
}
1291
/// Checks that `Binary` values are written as lowercase hexadecimal text.
#[test]
fn test_binary_array_conversion() {
    let payloads: Vec<&[u8]> = vec![b"Hello", b"\xDE\xAD\xBE\xEF"];
    let column: ArrayRef = Arc::new(BinaryArray::from(payloads));
    let fields = vec![Field::new("data", DataType::Binary, false)];
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), vec![column]).unwrap();

    let csv_bytes = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    // "Hello" in hex, followed by the raw 0xDEADBEEF bytes.
    assert!(rendered.contains("48656c6c6f"));
    assert!(rendered.contains("deadbeef"));
}
1305
/// Writes two single-column batches through one `SyncArrowToCsvWriter` and checks
/// both the total returned by `finish` and that each value is followed by a newline.
#[test]
fn test_multiple_batches() {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let make_batch = |ids: Vec<i32>| {
        let column: ArrayRef = Arc::new(Int32Array::from(ids));
        RecordBatch::try_new(schema.clone(), vec![column]).unwrap()
    };
    let first = make_batch(vec![1, 2]);
    let second = make_batch(vec![3, 4]);

    let mut sink = Vec::new();
    let mut csv_writer = SyncArrowToCsvWriter::new(&mut sink, CsvWriterOptions::default());
    csv_writer.write_batch(&first).unwrap();
    csv_writer.write_batch(&second).unwrap();
    let rows_written = csv_writer.finish().unwrap();
    assert_eq!(rows_written, 4);

    let rendered = String::from_utf8(sink).unwrap();
    assert!(rendered.contains("1\n"));
    assert!(rendered.contains("2\n"));
    assert!(rendered.contains("3\n"));
    assert!(rendered.contains("4\n"));
}
1335
/// Converts the shared test batch to CSV and checks the rendered rows.
///
/// The assertions expect three-column rows in which the third row's middle column is
/// empty, i.e. a null rendered with the default (empty) null representation.
#[test]
fn test_record_batch_csv_conversion_for_import() {
    let batch = create_test_batch();
    let options = CsvWriterOptions::default();

    // `expect` reports the underlying error on failure, which a bare
    // `assert!(result.is_ok())` would hide.
    let csv_bytes = record_batch_to_csv(&batch, options)
        .expect("converting the test batch to CSV should succeed");
    let csv_str = String::from_utf8(csv_bytes).expect("CSV output should be valid UTF-8");

    assert!(csv_str.contains("1,Alice,1.5"));
    assert!(csv_str.contains("2,Bob,2.5"));
    assert!(csv_str.contains("3,,3.5"));
}
1356
/// Streams two copies of the shared test batch through one writer and checks that the
/// reported row total and the number of output lines are both six.
#[test]
fn test_multiple_record_batches_csv_conversion_for_import() {
    let batches = [create_test_batch(), create_test_batch()];

    let mut csv_buffer = Vec::new();
    let mut csv_writer = SyncArrowToCsvWriter::new(&mut csv_buffer, CsvWriterOptions::default());
    for batch in &batches {
        csv_writer.write_batch(batch).unwrap();
    }
    let rows_written = csv_writer.finish().unwrap();
    assert_eq!(rows_written, 6);

    let rendered = String::from_utf8(csv_buffer).unwrap();
    assert_eq!(rendered.lines().count(), 6);
}
1380
/// Checks the minimum and maximum of every signed and unsigned integer width.
///
/// Each column holds `MIN`, `MAX` and a null; only the `MIN` and `MAX` rows are asserted.
#[test]
fn test_all_integer_types_conversion() {
    use arrow::array::{
        Int16Array, Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
    };

    let fields = vec![
        Field::new("i8", DataType::Int8, true),
        Field::new("i16", DataType::Int16, true),
        Field::new("i32", DataType::Int32, true),
        Field::new("i64", DataType::Int64, true),
        Field::new("u8", DataType::UInt8, true),
        Field::new("u16", DataType::UInt16, true),
        Field::new("u32", DataType::UInt32, true),
        Field::new("u64", DataType::UInt64, true),
    ];
    // One column per field, in the same order as `fields`.
    let columns: Vec<ArrayRef> = vec![
        Arc::new(Int8Array::from(vec![Some(i8::MIN), Some(i8::MAX), None])),
        Arc::new(Int16Array::from(vec![Some(i16::MIN), Some(i16::MAX), None])),
        Arc::new(Int32Array::from(vec![Some(i32::MIN), Some(i32::MAX), None])),
        Arc::new(Int64Array::from(vec![Some(i64::MIN), Some(i64::MAX), None])),
        Arc::new(UInt8Array::from(vec![Some(u8::MIN), Some(u8::MAX), None])),
        Arc::new(UInt16Array::from(vec![Some(u16::MIN), Some(u16::MAX), None])),
        Arc::new(UInt32Array::from(vec![Some(u32::MIN), Some(u32::MAX), None])),
        Arc::new(UInt64Array::from(vec![Some(u64::MIN), Some(u64::MAX), None])),
    ];
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap();

    let csv_bytes = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    let min_row = "-128,-32768,-2147483648,-9223372036854775808,0,0,0,0";
    let max_row =
        "127,32767,2147483647,9223372036854775807,255,65535,4294967295,18446744073709551615";
    assert!(rendered.contains(min_row));
    assert!(rendered.contains(max_row));
}
1442
/// Checks that `Float32` and `Float64` columns render finite values, both infinities
/// and NaN identically when placed side by side.
#[test]
fn test_all_float_types_conversion() {
    let singles = vec![
        Some(1.5f32),
        Some(-2.5f32),
        Some(f32::INFINITY),
        Some(f32::NEG_INFINITY),
        Some(f32::NAN),
        None,
    ];
    let doubles = vec![
        Some(1.5f64),
        Some(-2.5f64),
        Some(f64::INFINITY),
        Some(f64::NEG_INFINITY),
        Some(f64::NAN),
        None,
    ];
    let columns: Vec<ArrayRef> = vec![
        Arc::new(Float32Array::from(singles)),
        Arc::new(Float64Array::from(doubles)),
    ];
    let fields = vec![
        Field::new("f32", DataType::Float32, true),
        Field::new("f64", DataType::Float64, true),
    ];
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap();

    let csv_bytes = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    assert!(rendered.contains("1.5,1.5"));
    assert!(rendered.contains("-2.5,-2.5"));
    assert!(rendered.contains("Infinity,Infinity"));
    assert!(rendered.contains("-Infinity,-Infinity"));
    assert!(rendered.contains("NaN,NaN"));
}
1482
/// Checks that the same instant stored in second, millisecond, microsecond and
/// nanosecond columns renders to the same text in every column.
#[test]
fn test_all_timestamp_units_conversion() {
    use arrow::array::{
        TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
    };
    use arrow::datatypes::TimeUnit;

    // Every field is a non-nullable timestamp without a time zone; only the unit varies.
    let ts_field =
        |name: &str, unit: TimeUnit| Field::new(name, DataType::Timestamp(unit, None), false);
    let schema = Schema::new(vec![
        ts_field("ts_sec", TimeUnit::Second),
        ts_field("ts_ms", TimeUnit::Millisecond),
        ts_field("ts_us", TimeUnit::Microsecond),
        ts_field("ts_ns", TimeUnit::Nanosecond),
    ]);

    // One second after the epoch, expressed in each unit.
    let columns: Vec<ArrayRef> = vec![
        Arc::new(TimestampSecondArray::from(vec![1i64])),
        Arc::new(TimestampMillisecondArray::from(vec![1000i64])),
        Arc::new(TimestampMicrosecondArray::from(vec![1_000_000i64])),
        Arc::new(TimestampNanosecondArray::from(vec![1_000_000_000i64])),
    ];
    let batch = RecordBatch::try_new(Arc::new(schema), columns).unwrap();

    let csv_bytes = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    let expected = "1970-01-01 00:00:01.000000";
    let first_row = rendered.lines().next().unwrap();
    let cells: Vec<&str> = first_row.split(',').collect();
    assert_eq!(cells.len(), 4);
    for cell in cells {
        assert_eq!(cell, expected);
    }
}
1536
/// Checks that `Utf8` and `LargeUtf8` columns are both written as plain text.
///
/// Each column ends with a null, which is not asserted on.
#[test]
fn test_string_types_conversion() {
    use arrow::array::LargeStringArray;

    let utf8_col: ArrayRef = Arc::new(StringArray::from(vec![
        Some("hello"),
        Some("world"),
        None,
    ]));
    let large_utf8_col: ArrayRef = Arc::new(LargeStringArray::from(vec![
        Some("large"),
        Some("string"),
        None,
    ]));
    let fields = vec![
        Field::new("utf8", DataType::Utf8, true),
        Field::new("large_utf8", DataType::LargeUtf8, true),
    ];
    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(fields)),
        vec![utf8_col, large_utf8_col],
    )
    .unwrap();

    let csv_bytes = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    assert!(rendered.contains("hello,large"));
    assert!(rendered.contains("world,string"));
}
1565
/// Checks that `Binary` and `LargeBinary` columns are both written as lowercase hex.
#[test]
fn test_binary_types_conversion() {
    use arrow::array::LargeBinaryArray;

    let small: Vec<&[u8]> = vec![b"abc", b"\x00\xff"];
    let large: Vec<&[u8]> = vec![b"def", b"\x01\x02"];
    let columns: Vec<ArrayRef> = vec![
        Arc::new(BinaryArray::from(small)),
        Arc::new(LargeBinaryArray::from(large)),
    ];
    let fields = vec![
        Field::new("bin", DataType::Binary, false),
        Field::new("large_bin", DataType::LargeBinary, false),
    ];
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap();

    let csv_bytes = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    // First row: "abc" and "def" in hex; second row: the raw byte pairs.
    assert!(rendered.contains("616263,646566"));
    assert!(rendered.contains("00ff,0102"));
}
1597
/// Checks that nulls render as empty fields under the default options, and that a
/// fully populated row is unaffected.
#[test]
fn test_null_values_default_representation() {
    let strings: Vec<Option<&str>> = vec![None, Some("value"), None];
    let columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(strings)),
        Arc::new(Int32Array::from(vec![None, Some(42), None])),
        Arc::new(Float64Array::from(vec![None, Some(3.125), None])),
    ];
    let fields = vec![
        Field::new("str", DataType::Utf8, true),
        Field::new("int", DataType::Int32, true),
        Field::new("float", DataType::Float64, true),
    ];
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap();

    let csv_bytes = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    // An all-null row of three columns leaves only the two separators.
    assert!(rendered.contains(",,"));
    assert!(rendered.contains("value,42,3.125"));
}
1632
/// Checks that a custom `null_value` (`\N`) is written for nulls in both string and
/// integer columns, and that non-null values are unaffected.
#[test]
fn test_null_values_custom_representation() {
    let strings: Vec<Option<&str>> = vec![None, Some("value")];
    let columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(strings)),
        Arc::new(Int32Array::from(vec![None, Some(42)])),
    ];
    let fields = vec![
        Field::new("str", DataType::Utf8, true),
        Field::new("int", DataType::Int32, true),
    ];
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap();

    let null_as_backslash_n = CsvWriterOptions {
        null_value: String::from("\\N"),
        ..CsvWriterOptions::default()
    };
    let csv_bytes = record_batch_to_csv(&batch, null_as_backslash_n).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    assert!(rendered.contains("\\N,\\N"));
    assert!(rendered.contains("value,42"));
}
1660
/// Checks null handling for boolean, date and decimal columns.
///
/// Despite the name, the columns here are flat (`Boolean`, `Date32`, `Decimal128`);
/// the middle row is null in every column.
#[test]
fn test_null_values_in_nested_data() {
    let decimals = Decimal128Array::from(vec![Some(12345i128), None, Some(100)])
        .with_precision_and_scale(10, 2)
        .unwrap();
    let columns: Vec<ArrayRef> = vec![
        Arc::new(BooleanArray::from(vec![Some(true), None, Some(false)])),
        Arc::new(Date32Array::from(vec![Some(0), None, Some(365)])),
        Arc::new(decimals),
    ];
    let fields = vec![
        Field::new("bool", DataType::Boolean, true),
        Field::new("date", DataType::Date32, true),
        Field::new("decimal", DataType::Decimal128(10, 2), true),
    ];
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap();

    let csv_bytes = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    assert!(rendered.contains("true,1970-01-01,123.45"));
    // The all-null middle row leaves only the two separators.
    assert!(rendered.contains(",,"));
    assert!(rendered.contains("false,1971-01-01,1.00"));
}
1691
/// Checks that a string containing the default column separator (`,`) is wrapped in
/// double quotes.
#[test]
fn test_escape_comma_in_string() {
    let column: ArrayRef = Arc::new(StringArray::from(vec!["hello,world"]));
    let fields = vec![Field::new("text", DataType::Utf8, false)];
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), vec![column]).unwrap();

    let csv_bytes = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    let quoted = "\"hello,world\"";
    assert!(rendered.contains(quoted));
}
1710
/// Checks that embedded double quotes are doubled and the whole field is quoted.
#[test]
fn test_escape_quotes_in_string() {
    let column: ArrayRef = Arc::new(StringArray::from(vec!["say \"hello\""]));
    let fields = vec![Field::new("text", DataType::Utf8, false)];
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), vec![column]).unwrap();

    let csv_bytes = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    let quoted = "\"say \"\"hello\"\"\"";
    assert!(rendered.contains(quoted));
}
1726
/// Checks that strings containing `\n` or `\r\n` are quoted with the line breaks
/// left intact inside the quotes.
#[test]
fn test_escape_newlines_in_string() {
    let inputs = vec!["line1\nline2", "line1\r\nline2"];
    let column: ArrayRef = Arc::new(StringArray::from(inputs));
    let fields = vec![Field::new("text", DataType::Utf8, false)];
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), vec![column]).unwrap();

    let csv_bytes = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    assert!(rendered.contains("\"line1\nline2\""));
    assert!(rendered.contains("\"line1\r\nline2\""));
}
1743
/// Checks that a string containing bare carriage returns is quoted with the `\r`
/// characters left intact.
#[test]
fn test_escape_carriage_return_in_string() {
    let column: ArrayRef = Arc::new(StringArray::from(vec!["text\rwith\rCR"]));
    let fields = vec![Field::new("text", DataType::Utf8, false)];
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), vec![column]).unwrap();

    let csv_bytes = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    let quoted = "\"text\rwith\rCR\"";
    assert!(rendered.contains(quoted));
}
1759
/// Checks escaping when one string mixes quotes, a newline and commas: the quotes are
/// doubled, everything else is kept verbatim, and the field is quoted once.
#[test]
fn test_escape_multiple_special_chars() {
    let input = "combo: \"quoted\", newline\n, and comma";
    let column: ArrayRef = Arc::new(StringArray::from(vec![input]));
    let fields = vec![Field::new("text", DataType::Utf8, false)];
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), vec![column]).unwrap();

    let csv_bytes = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    let quoted = "\"combo: \"\"quoted\"\", newline\n, and comma\"";
    assert!(rendered.contains(quoted));
}
1777
/// Checks that, with `;` configured as the column separator, a string containing `;`
/// is wrapped in quotes.
#[test]
fn test_custom_separator_escaping() {
    let column: ArrayRef = Arc::new(StringArray::from(vec!["value;with;semicolons"]));
    let fields = vec![Field::new("text", DataType::Utf8, false)];
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), vec![column]).unwrap();

    let semicolon_separated = CsvWriterOptions {
        column_separator: ';',
        ..CsvWriterOptions::default()
    };
    let csv_bytes = record_batch_to_csv(&batch, semicolon_separated).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    let quoted = "\"value;with;semicolons\"";
    assert!(rendered.contains(quoted));
}
1798
/// Checks that, with `'` configured as the column delimiter, embedded `'` characters
/// are doubled and the field is wrapped in `'`.
#[test]
fn test_custom_delimiter_escaping() {
    let column: ArrayRef = Arc::new(StringArray::from(vec!["value'with'quotes"]));
    let fields = vec![Field::new("text", DataType::Utf8, false)];
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), vec![column]).unwrap();

    let single_quoted = CsvWriterOptions {
        column_delimiter: '\'',
        ..CsvWriterOptions::default()
    };
    let csv_bytes = record_batch_to_csv(&batch, single_quoted).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    let quoted = "'value''with''quotes'";
    assert!(rendered.contains(quoted));
}
1819
/// Checks that a string without separators, delimiters or line breaks is written
/// as-is, with no surrounding quotes.
#[test]
fn test_no_escaping_for_clean_string() {
    let column: ArrayRef = Arc::new(StringArray::from(vec!["clean simple text"]));
    let fields = vec![Field::new("text", DataType::Utf8, false)];
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), vec![column]).unwrap();

    let csv_bytes = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
    let rendered = String::from_utf8(csv_bytes).unwrap();

    let is_present = rendered.contains("clean simple text");
    let is_quoted = rendered.contains("\"clean simple text\"");
    assert!(is_present);
    assert!(!is_quoted);
}
1836
/// Checks `days_to_ymd` at the epoch, on both sides of the leap day in 2000, and on
/// the last day of 1999.
#[test]
fn test_days_to_ymd_edge_cases() {
    // (days since 1970-01-01, expected (year, month, day))
    let cases = [
        (0, (1970, 1, 1)),
        (11016, (2000, 2, 29)),
        (11017, (2000, 3, 1)),
        (10956, (1999, 12, 31)),
    ];

    for (days, expected) in cases {
        assert_eq!(days_to_ymd(days), expected);
    }
}
1855}