Skip to main content

exarrow_rs/import/
arrow.rs

1//! Arrow RecordBatch import functionality.
2//!
3//! This module provides utilities for importing Arrow RecordBatches and Arrow IPC
4//! files/streams into Exasol tables by converting them to CSV format on-the-fly.
5//!
6//! # Overview
7//!
8//! Arrow data is converted to CSV format using [`ArrowToCsvWriter`] which:
9//! - Handles all Arrow data types (Int, Float, String, Date, Timestamp, Decimal, etc.)
10//! - Properly escapes CSV special characters
11//! - Handles NULL values
12//! - Supports configurable CSV format options
13//!
14//! # Example
15//!
16
17use super::ImportError;
18use arrow::array::cast::AsArray;
19use arrow::array::types::{
20    Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
21    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
22    TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
23};
24use arrow::array::{Array, RecordBatch};
25use arrow::datatypes::DataType;
26use std::fmt::Write as FmtWrite;
27use std::io::Write;
28use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
29
/// Options for CSV output when converting Arrow data.
#[derive(Debug, Clone)]
pub struct CsvWriterOptions {
    /// Column separator character (default: ',')
    pub column_separator: char,
    /// Column delimiter character used to quote fields that need escaping (default: '"')
    pub column_delimiter: char,
    /// Row separator appended after every row (default: '\n')
    pub row_separator: &'static str,
    /// String to represent NULL values (default: empty string)
    pub null_value: String,
    /// Whether to write a column-header row before the first batch (default: false)
    pub write_header: bool,
    /// Date format string (default: "%Y-%m-%d")
    // NOTE(review): the writers in this module format dates/timestamps with
    // fixed ISO-8601 layouts and do not read date_format/timestamp_format —
    // confirm whether these fields are consumed elsewhere.
    pub date_format: String,
    /// Timestamp format string (default: "%Y-%m-%d %H:%M:%S%.6f")
    pub timestamp_format: String,
}
48
49impl Default for CsvWriterOptions {
50    fn default() -> Self {
51        Self {
52            column_separator: ',',
53            column_delimiter: '"',
54            row_separator: "\n",
55            null_value: String::new(),
56            write_header: false,
57            date_format: "%Y-%m-%d".to_string(),
58            timestamp_format: "%Y-%m-%d %H:%M:%S%.6f".to_string(),
59        }
60    }
61}
62
/// Options for Arrow import operations.
#[derive(Debug, Clone)]
pub struct ArrowImportOptions {
    /// Target schema name (optional)
    pub schema: Option<String>,
    /// Columns to import (optional, imports all if None)
    pub columns: Option<Vec<String>>,
    /// Batch size for chunked processing (default: 10000)
    // NOTE(review): not referenced by the import functions visible in this
    // module — verify whether chunked processing honors it elsewhere.
    pub batch_size: usize,
    /// CSV writer options used when serializing RecordBatches
    pub csv_options: CsvWriterOptions,
    /// Whether to use TLS encryption for transport
    pub use_encryption: bool,
    /// Exasol host for HTTP transport connection.
    /// This is typically the same host as the WebSocket connection.
    pub host: String,
    /// Exasol port for HTTP transport connection.
    /// This is typically the same port as the WebSocket connection.
    pub port: u16,
}
83
84impl Default for ArrowImportOptions {
85    fn default() -> Self {
86        Self {
87            schema: None,
88            columns: None,
89            batch_size: 10000,
90            csv_options: CsvWriterOptions::default(),
91            use_encryption: false,
92            host: String::new(),
93            port: 0,
94        }
95    }
96}
97
impl ArrowImportOptions {
    /// Create new options with default values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the target schema name.
    #[must_use]
    pub fn schema(mut self, schema: impl Into<String>) -> Self {
        self.schema = Some(schema.into());
        self
    }

    /// Restrict the import to the given columns (default: all columns).
    #[must_use]
    pub fn columns(mut self, columns: Vec<String>) -> Self {
        self.columns = Some(columns);
        self
    }

    /// Set the batch size for chunked processing.
    #[must_use]
    pub fn batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }

    /// Set the string used to represent NULL values in the generated CSV.
    #[must_use]
    pub fn null_value(mut self, value: impl Into<String>) -> Self {
        self.csv_options.null_value = value.into();
        self
    }

    /// Set the CSV column separator character.
    #[must_use]
    pub fn column_separator(mut self, sep: char) -> Self {
        self.csv_options.column_separator = sep;
        self
    }

    /// Set the CSV quoting/delimiter character.
    #[must_use]
    pub fn column_delimiter(mut self, delim: char) -> Self {
        self.csv_options.column_delimiter = delim;
        self
    }

    /// Enable TLS encryption for the HTTP transport.
    #[must_use]
    pub fn with_encryption(mut self) -> Self {
        self.use_encryption = true;
        self
    }

    /// Set the Exasol host for HTTP transport connection.
    ///
    /// This should be the same host as used for the WebSocket connection.
    #[must_use]
    pub fn exasol_host(mut self, host: impl Into<String>) -> Self {
        self.host = host.into();
        self
    }

    /// Set the Exasol port for HTTP transport connection.
    ///
    /// This should be the same port as used for the WebSocket connection.
    #[must_use]
    pub fn exasol_port(mut self, port: u16) -> Self {
        self.port = port;
        self
    }
}
165
/// Arrow to CSV writer that converts RecordBatches to CSV format.
///
/// This writer handles:
/// - All Arrow numeric types (Int8-64, UInt8-64, Float32/64)
/// - String types (Utf8, LargeUtf8)
/// - Boolean types
/// - Date types (Date32)
/// - Timestamp types (with various precisions)
/// - Decimal128 types
/// - Binary types (hex-encoded)
/// - NULL handling
/// - Proper CSV escaping
pub struct ArrowToCsvWriter<W: AsyncWrite + Unpin> {
    // Underlying async byte sink that receives the CSV output.
    writer: W,
    // CSV formatting configuration.
    options: CsvWriterOptions,
    // Total rows written across all batches; also gates the one-time header.
    rows_written: usize,
}
182
183impl<W: AsyncWrite + Unpin> ArrowToCsvWriter<W> {
184    /// Create a new Arrow to CSV writer.
185    pub fn new(writer: W, options: CsvWriterOptions) -> Self {
186        Self {
187            writer,
188            options,
189            rows_written: 0,
190        }
191    }
192
193    /// Write a RecordBatch to CSV format.
194    ///
195    /// Returns the number of bytes written.
196    pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<usize, ImportError> {
197        let mut bytes_written = 0;
198        let num_rows = batch.num_rows();
199        let num_cols = batch.num_columns();
200
201        // Write header if requested and this is the first batch
202        if self.options.write_header && self.rows_written == 0 {
203            let header = self.format_header(batch)?;
204            self.writer
205                .write_all(header.as_bytes())
206                .await
207                .map_err(|e| ImportError::CsvWriteError(e.to_string()))?;
208            bytes_written += header.len();
209        }
210
211        // Write each row
212        for row_idx in 0..num_rows {
213            let mut row_str = String::with_capacity(256);
214
215            for col_idx in 0..num_cols {
216                if col_idx > 0 {
217                    row_str.push(self.options.column_separator);
218                }
219
220                let column = batch.column(col_idx);
221                let value = self.format_value(column, row_idx)?;
222                row_str.push_str(&value);
223            }
224
225            row_str.push_str(self.options.row_separator);
226
227            self.writer
228                .write_all(row_str.as_bytes())
229                .await
230                .map_err(|e| ImportError::CsvWriteError(e.to_string()))?;
231            bytes_written += row_str.len();
232        }
233
234        self.rows_written += num_rows;
235        Ok(bytes_written)
236    }
237
238    /// Finish writing and flush the output.
239    pub async fn finish(mut self) -> Result<usize, ImportError> {
240        self.writer
241            .flush()
242            .await
243            .map_err(|e| ImportError::CsvWriteError(e.to_string()))?;
244        Ok(self.rows_written)
245    }
246
247    /// Get the number of rows written so far.
248    #[must_use]
249    pub fn rows_written(&self) -> usize {
250        self.rows_written
251    }
252
253    /// Format header row from schema.
254    fn format_header(&self, batch: &RecordBatch) -> Result<String, ImportError> {
255        let schema = batch.schema();
256        let mut header = String::new();
257
258        for (idx, field) in schema.fields().iter().enumerate() {
259            if idx > 0 {
260                header.push(self.options.column_separator);
261            }
262            header.push_str(&self.escape_string(field.name()));
263        }
264        header.push_str(self.options.row_separator);
265
266        Ok(header)
267    }
268
269    /// Format a value from an Arrow array at the given row index.
270    fn format_value(&self, array: &dyn Array, row_idx: usize) -> Result<String, ImportError> {
271        if array.is_null(row_idx) {
272            return Ok(self.options.null_value.clone());
273        }
274
275        let data_type = array.data_type();
276        match data_type {
277            DataType::Boolean => {
278                let arr = array.as_boolean();
279                Ok(if arr.value(row_idx) { "true" } else { "false" }.to_string())
280            }
281            DataType::Int8 => Ok(array.as_primitive::<Int8Type>().value(row_idx).to_string()),
282            DataType::Int16 => Ok(array.as_primitive::<Int16Type>().value(row_idx).to_string()),
283            DataType::Int32 => Ok(array.as_primitive::<Int32Type>().value(row_idx).to_string()),
284            DataType::Int64 => Ok(array.as_primitive::<Int64Type>().value(row_idx).to_string()),
285            DataType::UInt8 => Ok(array.as_primitive::<UInt8Type>().value(row_idx).to_string()),
286            DataType::UInt16 => Ok(array
287                .as_primitive::<UInt16Type>()
288                .value(row_idx)
289                .to_string()),
290            DataType::UInt32 => Ok(array
291                .as_primitive::<UInt32Type>()
292                .value(row_idx)
293                .to_string()),
294            DataType::UInt64 => Ok(array
295                .as_primitive::<UInt64Type>()
296                .value(row_idx)
297                .to_string()),
298            DataType::Float32 => {
299                let val = array.as_primitive::<Float32Type>().value(row_idx);
300                Ok(format_float(val as f64))
301            }
302            DataType::Float64 => {
303                let val = array.as_primitive::<Float64Type>().value(row_idx);
304                Ok(format_float(val))
305            }
306            DataType::Utf8 => {
307                let arr = array.as_string::<i32>();
308                Ok(self.escape_string(arr.value(row_idx)))
309            }
310            DataType::LargeUtf8 => {
311                let arr = array.as_string::<i64>();
312                Ok(self.escape_string(arr.value(row_idx)))
313            }
314            DataType::Date32 => {
315                let val = array.as_primitive::<Date32Type>().value(row_idx);
316                Ok(format_date32(val))
317            }
318            DataType::Timestamp(unit, _tz) => self.format_timestamp(array, row_idx, unit),
319            DataType::Decimal128(precision, scale) => {
320                self.format_decimal128(array, row_idx, *precision, *scale)
321            }
322            DataType::Binary => {
323                let arr = array.as_binary::<i32>();
324                Ok(hex::encode(arr.value(row_idx)))
325            }
326            DataType::LargeBinary => {
327                let arr = array.as_binary::<i64>();
328                Ok(hex::encode(arr.value(row_idx)))
329            }
330            other => Err(ImportError::ConversionError(format!(
331                "Unsupported Arrow type for CSV conversion: {:?}",
332                other
333            ))),
334        }
335    }
336
337    /// Format a timestamp value.
338    fn format_timestamp(
339        &self,
340        array: &dyn Array,
341        row_idx: usize,
342        unit: &arrow::datatypes::TimeUnit,
343    ) -> Result<String, ImportError> {
344        use arrow::datatypes::TimeUnit;
345
346        let micros = match unit {
347            TimeUnit::Second => {
348                let val = array.as_primitive::<TimestampSecondType>().value(row_idx);
349                val * 1_000_000
350            }
351            TimeUnit::Millisecond => {
352                let val = array
353                    .as_primitive::<TimestampMillisecondType>()
354                    .value(row_idx);
355                val * 1_000
356            }
357            TimeUnit::Microsecond => array
358                .as_primitive::<TimestampMicrosecondType>()
359                .value(row_idx),
360            TimeUnit::Nanosecond => {
361                let val = array
362                    .as_primitive::<TimestampNanosecondType>()
363                    .value(row_idx);
364                val / 1_000
365            }
366        };
367
368        Ok(format_timestamp_micros(micros))
369    }
370
371    /// Format a Decimal128 value.
372    fn format_decimal128(
373        &self,
374        array: &dyn Array,
375        row_idx: usize,
376        _precision: u8,
377        scale: i8,
378    ) -> Result<String, ImportError> {
379        let arr = array
380            .as_any()
381            .downcast_ref::<arrow::array::Decimal128Array>()
382            .ok_or_else(|| ImportError::ConversionError("Expected Decimal128Array".to_string()))?;
383
384        let value = arr.value(row_idx);
385        Ok(format_decimal128(value, scale))
386    }
387
388    /// Escape a string value for CSV.
389    ///
390    /// Quotes the string if it contains the separator, delimiter, or newlines.
391    /// Doubles any delimiter characters inside the string.
392    fn escape_string(&self, s: &str) -> String {
393        let sep = self.options.column_separator;
394        let delim = self.options.column_delimiter;
395
396        // Check if quoting is needed
397        let needs_quoting =
398            s.contains(sep) || s.contains(delim) || s.contains('\n') || s.contains('\r');
399
400        if needs_quoting {
401            // Quote the string and double any delimiter characters
402            let mut result = String::with_capacity(s.len() + 4);
403            result.push(delim);
404            for c in s.chars() {
405                if c == delim {
406                    result.push(delim);
407                }
408                result.push(c);
409            }
410            result.push(delim);
411            result
412        } else {
413            s.to_string()
414        }
415    }
416}
417
/// Synchronous Arrow to CSV converter for testing and simple use cases.
pub struct SyncArrowToCsvWriter<W: Write> {
    // Underlying blocking byte sink that receives the CSV output.
    writer: W,
    // CSV formatting configuration.
    options: CsvWriterOptions,
    // Total rows written across all batches; also gates the one-time header.
    rows_written: usize,
}
424
425impl<W: Write> SyncArrowToCsvWriter<W> {
426    /// Create a new synchronous Arrow to CSV writer.
427    pub fn new(writer: W, options: CsvWriterOptions) -> Self {
428        Self {
429            writer,
430            options,
431            rows_written: 0,
432        }
433    }
434
435    /// Write a RecordBatch to CSV format synchronously.
436    pub fn write_batch(&mut self, batch: &RecordBatch) -> Result<usize, ImportError> {
437        let mut bytes_written = 0;
438        let num_rows = batch.num_rows();
439        let num_cols = batch.num_columns();
440
441        // Write header if requested and this is the first batch
442        if self.options.write_header && self.rows_written == 0 {
443            let header = self.format_header(batch)?;
444            self.writer
445                .write_all(header.as_bytes())
446                .map_err(|e| ImportError::CsvWriteError(e.to_string()))?;
447            bytes_written += header.len();
448        }
449
450        // Write each row
451        for row_idx in 0..num_rows {
452            let mut row_str = String::with_capacity(256);
453
454            for col_idx in 0..num_cols {
455                if col_idx > 0 {
456                    row_str.push(self.options.column_separator);
457                }
458
459                let column = batch.column(col_idx);
460                let value = self.format_value(column, row_idx)?;
461                row_str.push_str(&value);
462            }
463
464            row_str.push_str(self.options.row_separator);
465
466            self.writer
467                .write_all(row_str.as_bytes())
468                .map_err(|e| ImportError::CsvWriteError(e.to_string()))?;
469            bytes_written += row_str.len();
470        }
471
472        self.rows_written += num_rows;
473        Ok(bytes_written)
474    }
475
476    /// Finish writing and flush the output.
477    pub fn finish(mut self) -> Result<usize, ImportError> {
478        self.writer
479            .flush()
480            .map_err(|e| ImportError::CsvWriteError(e.to_string()))?;
481        Ok(self.rows_written)
482    }
483
484    /// Format header row from schema.
485    fn format_header(&self, batch: &RecordBatch) -> Result<String, ImportError> {
486        let schema = batch.schema();
487        let mut header = String::new();
488
489        for (idx, field) in schema.fields().iter().enumerate() {
490            if idx > 0 {
491                header.push(self.options.column_separator);
492            }
493            header.push_str(&self.escape_string(field.name()));
494        }
495        header.push_str(self.options.row_separator);
496
497        Ok(header)
498    }
499
500    /// Format a value from an Arrow array at the given row index.
501    fn format_value(&self, array: &dyn Array, row_idx: usize) -> Result<String, ImportError> {
502        if array.is_null(row_idx) {
503            return Ok(self.options.null_value.clone());
504        }
505
506        let data_type = array.data_type();
507        match data_type {
508            DataType::Boolean => {
509                let arr = array.as_boolean();
510                Ok(if arr.value(row_idx) { "true" } else { "false" }.to_string())
511            }
512            DataType::Int8 => Ok(array.as_primitive::<Int8Type>().value(row_idx).to_string()),
513            DataType::Int16 => Ok(array.as_primitive::<Int16Type>().value(row_idx).to_string()),
514            DataType::Int32 => Ok(array.as_primitive::<Int32Type>().value(row_idx).to_string()),
515            DataType::Int64 => Ok(array.as_primitive::<Int64Type>().value(row_idx).to_string()),
516            DataType::UInt8 => Ok(array.as_primitive::<UInt8Type>().value(row_idx).to_string()),
517            DataType::UInt16 => Ok(array
518                .as_primitive::<UInt16Type>()
519                .value(row_idx)
520                .to_string()),
521            DataType::UInt32 => Ok(array
522                .as_primitive::<UInt32Type>()
523                .value(row_idx)
524                .to_string()),
525            DataType::UInt64 => Ok(array
526                .as_primitive::<UInt64Type>()
527                .value(row_idx)
528                .to_string()),
529            DataType::Float32 => {
530                let val = array.as_primitive::<Float32Type>().value(row_idx);
531                Ok(format_float(val as f64))
532            }
533            DataType::Float64 => {
534                let val = array.as_primitive::<Float64Type>().value(row_idx);
535                Ok(format_float(val))
536            }
537            DataType::Utf8 => {
538                let arr = array.as_string::<i32>();
539                Ok(self.escape_string(arr.value(row_idx)))
540            }
541            DataType::LargeUtf8 => {
542                let arr = array.as_string::<i64>();
543                Ok(self.escape_string(arr.value(row_idx)))
544            }
545            DataType::Date32 => {
546                let val = array.as_primitive::<Date32Type>().value(row_idx);
547                Ok(format_date32(val))
548            }
549            DataType::Timestamp(unit, _tz) => self.format_timestamp(array, row_idx, unit),
550            DataType::Decimal128(precision, scale) => {
551                self.format_decimal128(array, row_idx, *precision, *scale)
552            }
553            DataType::Binary => {
554                let arr = array.as_binary::<i32>();
555                Ok(hex::encode(arr.value(row_idx)))
556            }
557            DataType::LargeBinary => {
558                let arr = array.as_binary::<i64>();
559                Ok(hex::encode(arr.value(row_idx)))
560            }
561            other => Err(ImportError::ConversionError(format!(
562                "Unsupported Arrow type for CSV conversion: {:?}",
563                other
564            ))),
565        }
566    }
567
568    /// Format a timestamp value.
569    fn format_timestamp(
570        &self,
571        array: &dyn Array,
572        row_idx: usize,
573        unit: &arrow::datatypes::TimeUnit,
574    ) -> Result<String, ImportError> {
575        use arrow::datatypes::TimeUnit;
576
577        let micros = match unit {
578            TimeUnit::Second => {
579                let val = array.as_primitive::<TimestampSecondType>().value(row_idx);
580                val * 1_000_000
581            }
582            TimeUnit::Millisecond => {
583                let val = array
584                    .as_primitive::<TimestampMillisecondType>()
585                    .value(row_idx);
586                val * 1_000
587            }
588            TimeUnit::Microsecond => array
589                .as_primitive::<TimestampMicrosecondType>()
590                .value(row_idx),
591            TimeUnit::Nanosecond => {
592                let val = array
593                    .as_primitive::<TimestampNanosecondType>()
594                    .value(row_idx);
595                val / 1_000
596            }
597        };
598
599        Ok(format_timestamp_micros(micros))
600    }
601
602    /// Format a Decimal128 value.
603    fn format_decimal128(
604        &self,
605        array: &dyn Array,
606        row_idx: usize,
607        _precision: u8,
608        scale: i8,
609    ) -> Result<String, ImportError> {
610        let arr = array
611            .as_any()
612            .downcast_ref::<arrow::array::Decimal128Array>()
613            .ok_or_else(|| ImportError::ConversionError("Expected Decimal128Array".to_string()))?;
614
615        let value = arr.value(row_idx);
616        Ok(format_decimal128(value, scale))
617    }
618
619    /// Escape a string value for CSV.
620    fn escape_string(&self, s: &str) -> String {
621        let sep = self.options.column_separator;
622        let delim = self.options.column_delimiter;
623
624        let needs_quoting =
625            s.contains(sep) || s.contains(delim) || s.contains('\n') || s.contains('\r');
626
627        if needs_quoting {
628            let mut result = String::with_capacity(s.len() + 4);
629            result.push(delim);
630            for c in s.chars() {
631                if c == delim {
632                    result.push(delim);
633                }
634                result.push(c);
635            }
636            result.push(delim);
637            result
638        } else {
639            s.to_string()
640        }
641    }
642}
643
/// Format an f64 for CSV output.
///
/// NaN and infinities are spelled out ("NaN", "Infinity", "-Infinity");
/// finite values use Rust's shortest round-trip `Display` formatting.
fn format_float(val: f64) -> String {
    if val.is_nan() {
        return "NaN".to_string();
    }
    if val.is_infinite() {
        return if val > 0.0 { "Infinity" } else { "-Infinity" }.to_string();
    }
    val.to_string()
}
659
660/// Format a Date32 value (days since epoch) to YYYY-MM-DD.
661fn format_date32(days: i32) -> String {
662    // Days since Unix epoch (1970-01-01)
663    let (year, month, day) = days_to_ymd(days);
664    format!("{:04}-{:02}-{:02}", year, month, day)
665}
666
/// Convert days since the Unix epoch to a `(year, month, day)` civil date.
///
/// Implements Howard Hinnant's `civil_from_days` algorithm
/// (https://howardhinnant.github.io/date_algorithms.html), valid for dates
/// both before and after the epoch.
fn days_to_ymd(days: i32) -> (i32, u32, u32) {
    // Shift the epoch to 0000-03-01 so leap days fall at year end.
    let shifted = days + 719468;
    // 146097 days per 400-year era; floor-divide for negative inputs.
    let era = (if shifted >= 0 { shifted } else { shifted - 146096 }) / 146097;
    let day_of_era = (shifted - era * 146097) as u32; // [0, 146096]
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 - day_of_era / 146096) / 365;
    let year = year_of_era as i32 + era * 400;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let month_point = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * month_point + 2) / 5 + 1;
    // Months are counted from March in the shifted calendar; map back.
    let month = if month_point < 10 {
        month_point + 3
    } else {
        month_point - 9
    };
    let civil_year = if month <= 2 { year + 1 } else { year };
    (civil_year, month, day)
}
686
687/// Format a timestamp in microseconds since epoch to YYYY-MM-DD HH:MM:SS.ffffff.
688fn format_timestamp_micros(micros: i64) -> String {
689    let total_seconds = micros / 1_000_000;
690    let frac_micros = (micros % 1_000_000).unsigned_abs();
691
692    let days = (total_seconds / 86400) as i32;
693    let day_seconds = (total_seconds % 86400).unsigned_abs() as u32;
694
695    let (year, month, day) = days_to_ymd(days);
696    let hours = day_seconds / 3600;
697    let minutes = (day_seconds % 3600) / 60;
698    let seconds = day_seconds % 60;
699
700    format!(
701        "{:04}-{:02}-{:02} {:02}:{:02}:{:02}.{:06}",
702        year, month, day, hours, minutes, seconds, frac_micros
703    )
704}
705
/// Format a raw Decimal128 integer as a decimal string with `scale`
/// fractional digits.
///
/// A zero or negative scale yields a plain integer (scaled up by
/// `10^-scale`); a positive scale splits the magnitude into whole and
/// zero-padded fractional parts.
fn format_decimal128(value: i128, scale: i8) -> String {
    if scale <= 0 {
        // The stored integer represents value * 10^scale; scale it up.
        let factor = 10_i128.pow((-scale) as u32);
        return (value * factor).to_string();
    }

    let digits = scale as u32;
    let pow10 = 10_i128.pow(digits) as u128;
    let magnitude = value.unsigned_abs();

    let mut out = String::new();
    if value < 0 {
        out.push('-');
    }
    write!(
        out,
        "{}.{:0w$}",
        magnitude / pow10,
        magnitude % pow10,
        w = digits as usize
    )
    .unwrap();

    out
}
738
739/// Convert a RecordBatch to CSV bytes.
740///
741/// This is a convenience function for converting a single RecordBatch to CSV format
742/// in memory.
743pub fn record_batch_to_csv(
744    batch: &RecordBatch,
745    options: CsvWriterOptions,
746) -> Result<Vec<u8>, ImportError> {
747    let mut buffer = Vec::new();
748    let mut writer = SyncArrowToCsvWriter::new(&mut buffer, options);
749    writer.write_batch(batch)?;
750    writer.finish()?;
751    Ok(buffer)
752}
753
754// ============================================================================
755// Import Functions (placeholder implementations for session integration)
756// ============================================================================
757
/// Import a single RecordBatch into an Exasol table.
///
/// This function converts the RecordBatch to CSV format and streams it to Exasol
/// using the HTTP transport protocol.
///
/// # Arguments
///
/// * `execute_sql` - Function to execute SQL statements. Takes SQL string and returns row count.
/// * `table` - The target table name
/// * `batch` - The RecordBatch to import
/// * `options` - Import options
///
/// # Returns
///
/// The number of rows imported.
///
/// # Errors
///
/// Returns an [`ImportError`] if CSV conversion fails or the underlying
/// CSV stream import fails.
pub async fn import_from_record_batch<F, Fut>(
    execute_sql: F,
    table: &str,
    batch: &RecordBatch,
    options: ArrowImportOptions,
) -> Result<u64, ImportError>
where
    F: FnOnce(String) -> Fut,
    Fut: std::future::Future<Output = Result<u64, String>>,
{
    // Convert RecordBatch to CSV bytes (fully buffered in memory).
    let csv_bytes = record_batch_to_csv(batch, options.csv_options.clone())?;

    // Create CSV import options from Arrow options
    let csv_options = super::csv::CsvImportOptions {
        encoding: "UTF-8".to_string(),
        column_separator: options.csv_options.column_separator,
        column_delimiter: options.csv_options.column_delimiter,
        // NOTE(review): hard-coded to LF while csv_options.row_separator is
        // configurable — confirm callers never set a non-"\n" separator.
        row_separator: crate::query::import::RowSeparator::LF,
        skip_rows: 0,
        // An empty null_value means "no explicit NULL representation".
        null_value: if options.csv_options.null_value.is_empty() {
            None
        } else {
            Some(options.csv_options.null_value.clone())
        },
        trim_mode: crate::query::import::TrimMode::None,
        compression: crate::query::import::Compression::None,
        reject_limit: None,
        use_tls: options.use_encryption,
        schema: options.schema.clone(),
        columns: options.columns.clone(),
        host: options.host.clone(),
        port: options.port,
    };

    // Use the CSV import stream function to send the data
    super::csv::import_from_stream(
        execute_sql,
        table,
        std::io::Cursor::new(csv_bytes),
        csv_options,
    )
    .await
}
820
/// Import multiple RecordBatches from an iterator into an Exasol table.
///
/// This function converts each RecordBatch to CSV format and streams it to Exasol
/// using the HTTP transport protocol. All batches are concatenated into a single
/// CSV stream (a header, if enabled, is emitted only once before the first batch).
///
/// # Arguments
///
/// * `execute_sql` - Function to execute SQL statements. Takes SQL string and returns row count.
/// * `table` - The target table name
/// * `batches` - An iterator of RecordBatches to import
/// * `options` - Import options
///
/// # Returns
///
/// The number of rows imported.
///
/// # Errors
///
/// Returns an [`ImportError`] if CSV conversion fails or the underlying
/// CSV stream import fails.
pub async fn import_from_record_batches<I, F, Fut>(
    execute_sql: F,
    table: &str,
    batches: I,
    options: ArrowImportOptions,
) -> Result<u64, ImportError>
where
    I: IntoIterator<Item = RecordBatch>,
    F: FnOnce(String) -> Fut,
    Fut: std::future::Future<Output = Result<u64, String>>,
{
    // Convert all batches to CSV and concatenate (fully buffered in memory).
    let mut all_csv_bytes = Vec::new();
    let mut writer = SyncArrowToCsvWriter::new(&mut all_csv_bytes, options.csv_options.clone());

    for batch in batches {
        writer.write_batch(&batch)?;
    }
    writer.finish()?;

    // Create CSV import options from Arrow options
    let csv_options = super::csv::CsvImportOptions {
        encoding: "UTF-8".to_string(),
        column_separator: options.csv_options.column_separator,
        column_delimiter: options.csv_options.column_delimiter,
        // NOTE(review): hard-coded to LF while csv_options.row_separator is
        // configurable — confirm callers never set a non-"\n" separator.
        row_separator: crate::query::import::RowSeparator::LF,
        skip_rows: 0,
        // An empty null_value means "no explicit NULL representation".
        null_value: if options.csv_options.null_value.is_empty() {
            None
        } else {
            Some(options.csv_options.null_value.clone())
        },
        trim_mode: crate::query::import::TrimMode::None,
        compression: crate::query::import::Compression::None,
        reject_limit: None,
        use_tls: options.use_encryption,
        schema: options.schema.clone(),
        columns: options.columns.clone(),
        host: options.host.clone(),
        port: options.port,
    };

    // Use the CSV import stream function to send the data
    super::csv::import_from_stream(
        execute_sql,
        table,
        std::io::Cursor::new(all_csv_bytes),
        csv_options,
    )
    .await
}
891
892/// Import from an Arrow IPC file/stream into an Exasol table.
893///
894/// This function reads Arrow IPC format data, converts each RecordBatch to CSV,
895/// and streams it to Exasol using the HTTP transport protocol.
896///
897/// # Arguments
898///
899/// * `execute_sql` - Function to execute SQL statements. Takes SQL string and returns row count.
900/// * `table` - The target table name
901/// * `reader` - An async reader containing Arrow IPC data
902/// * `options` - Import options
903///
904/// # Returns
905///
906/// The number of rows imported.
907///
908/// # Example
909///
910pub async fn import_from_arrow_ipc<R, F, Fut>(
911    execute_sql: F,
912    table: &str,
913    mut reader: R,
914    options: ArrowImportOptions,
915) -> Result<u64, ImportError>
916where
917    R: AsyncRead + Unpin + Send,
918    F: FnOnce(String) -> Fut,
919    Fut: std::future::Future<Output = Result<u64, String>>,
920{
921    use tokio::io::AsyncReadExt;
922
923    // Read all IPC data into memory
924    let mut buffer = Vec::new();
925    reader
926        .read_to_end(&mut buffer)
927        .await
928        .map_err(ImportError::IoError)?;
929
930    // Parse as Arrow IPC
931    let cursor = std::io::Cursor::new(buffer);
932    let ipc_reader = arrow::ipc::reader::FileReader::try_new(cursor, None)
933        .map_err(|e| ImportError::ArrowIpcError(e.to_string()))?;
934
935    // Convert all batches to CSV
936    let mut all_csv_bytes = Vec::new();
937    let mut writer = SyncArrowToCsvWriter::new(&mut all_csv_bytes, options.csv_options.clone());
938
939    for batch_result in ipc_reader {
940        let batch = batch_result.map_err(|e| ImportError::ArrowIpcError(e.to_string()))?;
941        writer.write_batch(&batch)?;
942    }
943    writer.finish()?;
944
945    // Create CSV import options from Arrow options
946    let csv_options = super::csv::CsvImportOptions {
947        encoding: "UTF-8".to_string(),
948        column_separator: options.csv_options.column_separator,
949        column_delimiter: options.csv_options.column_delimiter,
950        row_separator: crate::query::import::RowSeparator::LF,
951        skip_rows: 0,
952        null_value: if options.csv_options.null_value.is_empty() {
953            None
954        } else {
955            Some(options.csv_options.null_value.clone())
956        },
957        trim_mode: crate::query::import::TrimMode::None,
958        compression: crate::query::import::Compression::None,
959        reject_limit: None,
960        use_tls: options.use_encryption,
961        schema: options.schema.clone(),
962        columns: options.columns.clone(),
963        host: options.host.clone(),
964        port: options.port,
965    };
966
967    // Use the CSV import stream function to send the data
968    super::csv::import_from_stream(
969        execute_sql,
970        table,
971        std::io::Cursor::new(all_csv_bytes),
972        csv_options,
973    )
974    .await
975}
976
977#[cfg(test)]
978mod tests {
979    use super::*;
980    use arrow::array::{
981        ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array,
982        Float64Array, Int32Array, Int64Array, StringArray, TimestampMicrosecondArray,
983    };
984    use arrow::datatypes::{Field, Schema};
985    use std::sync::Arc;
986
987    fn create_test_batch() -> RecordBatch {
988        let schema = Schema::new(vec![
989            Field::new("id", DataType::Int32, false),
990            Field::new("name", DataType::Utf8, true),
991            Field::new("value", DataType::Float64, true),
992        ]);
993
994        let id_array = Int32Array::from(vec![1, 2, 3]);
995        let name_array = StringArray::from(vec![Some("Alice"), Some("Bob"), None]);
996        let value_array = Float64Array::from(vec![Some(1.5), Some(2.5), Some(3.5)]);
997
998        RecordBatch::try_new(
999            Arc::new(schema),
1000            vec![
1001                Arc::new(id_array) as ArrayRef,
1002                Arc::new(name_array) as ArrayRef,
1003                Arc::new(value_array) as ArrayRef,
1004            ],
1005        )
1006        .unwrap()
1007    }
1008
1009    #[test]
1010    fn test_csv_writer_options_default() {
1011        let options = CsvWriterOptions::default();
1012        assert_eq!(options.column_separator, ',');
1013        assert_eq!(options.column_delimiter, '"');
1014        assert_eq!(options.row_separator, "\n");
1015        assert_eq!(options.null_value, "");
1016        assert!(!options.write_header);
1017    }
1018
1019    #[test]
1020    fn test_arrow_import_options_default() {
1021        let options = ArrowImportOptions::default();
1022        assert!(options.schema.is_none());
1023        assert!(options.columns.is_none());
1024        assert_eq!(options.batch_size, 10000);
1025        assert!(!options.use_encryption);
1026        assert_eq!(options.host, "");
1027        assert_eq!(options.port, 0);
1028    }
1029
1030    #[test]
1031    fn test_arrow_import_options_builder() {
1032        let options = ArrowImportOptions::new()
1033            .schema("myschema")
1034            .columns(vec!["col1".to_string(), "col2".to_string()])
1035            .batch_size(5000)
1036            .null_value("NULL")
1037            .column_separator(';')
1038            .with_encryption()
1039            .exasol_host("exasol.example.com")
1040            .exasol_port(8563);
1041
1042        assert_eq!(options.schema, Some("myschema".to_string()));
1043        assert_eq!(
1044            options.columns,
1045            Some(vec!["col1".to_string(), "col2".to_string()])
1046        );
1047        assert_eq!(options.batch_size, 5000);
1048        assert_eq!(options.csv_options.null_value, "NULL");
1049        assert_eq!(options.csv_options.column_separator, ';');
1050        assert!(options.use_encryption);
1051        assert_eq!(options.host, "exasol.example.com");
1052        assert_eq!(options.port, 8563);
1053    }
1054
1055    #[test]
1056    fn test_record_batch_to_csv_basic() {
1057        let batch = create_test_batch();
1058        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1059        let csv_str = String::from_utf8(result).unwrap();
1060
1061        assert!(csv_str.contains("1,Alice,1.5"));
1062        assert!(csv_str.contains("2,Bob,2.5"));
1063        assert!(csv_str.contains("3,,3.5"));
1064    }
1065
1066    #[test]
1067    fn test_record_batch_to_csv_with_header() {
1068        let batch = create_test_batch();
1069        let options = CsvWriterOptions {
1070            write_header: true,
1071            ..Default::default()
1072        };
1073        let result = record_batch_to_csv(&batch, options).unwrap();
1074        let csv_str = String::from_utf8(result).unwrap();
1075
1076        assert!(csv_str.starts_with("id,name,value\n"));
1077    }
1078
1079    #[test]
1080    fn test_record_batch_to_csv_with_custom_separator() {
1081        let batch = create_test_batch();
1082        let options = CsvWriterOptions {
1083            column_separator: ';',
1084            ..Default::default()
1085        };
1086        let result = record_batch_to_csv(&batch, options).unwrap();
1087        let csv_str = String::from_utf8(result).unwrap();
1088
1089        assert!(csv_str.contains("1;Alice;1.5"));
1090    }
1091
1092    #[test]
1093    fn test_record_batch_to_csv_with_null_value() {
1094        let batch = create_test_batch();
1095        let options = CsvWriterOptions {
1096            null_value: "NULL".to_string(),
1097            ..Default::default()
1098        };
1099        let result = record_batch_to_csv(&batch, options).unwrap();
1100        let csv_str = String::from_utf8(result).unwrap();
1101
1102        assert!(csv_str.contains("3,NULL,3.5"));
1103    }
1104
1105    #[test]
1106    fn test_escape_string_no_special_chars() {
1107        let writer = SyncArrowToCsvWriter::new(Vec::new(), CsvWriterOptions::default());
1108        let escaped = writer.escape_string("hello");
1109        assert_eq!(escaped, "hello");
1110    }
1111
1112    #[test]
1113    fn test_escape_string_with_separator() {
1114        let writer = SyncArrowToCsvWriter::new(Vec::new(), CsvWriterOptions::default());
1115        let escaped = writer.escape_string("hello,world");
1116        assert_eq!(escaped, "\"hello,world\"");
1117    }
1118
1119    #[test]
1120    fn test_escape_string_with_delimiter() {
1121        let writer = SyncArrowToCsvWriter::new(Vec::new(), CsvWriterOptions::default());
1122        let escaped = writer.escape_string("say \"hello\"");
1123        assert_eq!(escaped, "\"say \"\"hello\"\"\"");
1124    }
1125
1126    #[test]
1127    fn test_escape_string_with_newline() {
1128        let writer = SyncArrowToCsvWriter::new(Vec::new(), CsvWriterOptions::default());
1129        let escaped = writer.escape_string("line1\nline2");
1130        assert_eq!(escaped, "\"line1\nline2\"");
1131    }
1132
1133    #[test]
1134    fn test_format_float_normal() {
1135        assert_eq!(format_float(1.5), "1.5");
1136        assert_eq!(format_float(-2.5), "-2.5");
1137        assert_eq!(format_float(0.0), "0");
1138    }
1139
1140    #[test]
1141    fn test_format_float_special() {
1142        assert_eq!(format_float(f64::NAN), "NaN");
1143        assert_eq!(format_float(f64::INFINITY), "Infinity");
1144        assert_eq!(format_float(f64::NEG_INFINITY), "-Infinity");
1145    }
1146
1147    #[test]
1148    fn test_format_date32() {
1149        assert_eq!(format_date32(0), "1970-01-01");
1150        assert_eq!(format_date32(1), "1970-01-02");
1151        assert_eq!(format_date32(365), "1971-01-01");
1152        assert_eq!(format_date32(-1), "1969-12-31");
1153    }
1154
1155    #[test]
1156    fn test_format_timestamp_micros() {
1157        assert_eq!(format_timestamp_micros(0), "1970-01-01 00:00:00.000000");
1158        assert_eq!(
1159            format_timestamp_micros(1_000_000),
1160            "1970-01-01 00:00:01.000000"
1161        );
1162        assert_eq!(
1163            format_timestamp_micros(86_400_000_000),
1164            "1970-01-02 00:00:00.000000"
1165        );
1166        assert_eq!(
1167            format_timestamp_micros(123456),
1168            "1970-01-01 00:00:00.123456"
1169        );
1170    }
1171
1172    #[test]
1173    fn test_format_decimal128() {
1174        assert_eq!(format_decimal128(12345, 2), "123.45");
1175        assert_eq!(format_decimal128(-12345, 2), "-123.45");
1176        assert_eq!(format_decimal128(100, 2), "1.00");
1177        assert_eq!(format_decimal128(1, 2), "0.01");
1178        assert_eq!(format_decimal128(12345, 0), "12345");
1179        assert_eq!(format_decimal128(12345, -2), "1234500");
1180    }
1181
1182    #[test]
1183    fn test_boolean_array_conversion() {
1184        let schema = Schema::new(vec![Field::new("flag", DataType::Boolean, true)]);
1185        let arr = BooleanArray::from(vec![Some(true), Some(false), None]);
1186        let batch =
1187            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arr) as ArrayRef]).unwrap();
1188
1189        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1190        let csv_str = String::from_utf8(result).unwrap();
1191
1192        assert!(csv_str.contains("true"));
1193        assert!(csv_str.contains("false"));
1194    }
1195
1196    #[test]
1197    fn test_int32_array_conversion() {
1198        let schema = Schema::new(vec![Field::new("num", DataType::Int32, true)]);
1199        let arr = Int32Array::from(vec![Some(42), Some(-100), None]);
1200        let batch =
1201            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arr) as ArrayRef]).unwrap();
1202
1203        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1204        let csv_str = String::from_utf8(result).unwrap();
1205
1206        assert!(csv_str.contains("42"));
1207        assert!(csv_str.contains("-100"));
1208    }
1209
1210    #[test]
1211    fn test_int64_array_conversion() {
1212        let schema = Schema::new(vec![Field::new("big_num", DataType::Int64, false)]);
1213        let arr = Int64Array::from(vec![9_223_372_036_854_775_807i64, -1]);
1214        let batch =
1215            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arr) as ArrayRef]).unwrap();
1216
1217        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1218        let csv_str = String::from_utf8(result).unwrap();
1219
1220        assert!(csv_str.contains("9223372036854775807"));
1221        assert!(csv_str.contains("-1"));
1222    }
1223
1224    #[test]
1225    fn test_float32_array_conversion() {
1226        let schema = Schema::new(vec![Field::new("val", DataType::Float32, true)]);
1227        let arr = Float32Array::from(vec![Some(1.5f32), Some(f32::INFINITY), None]);
1228        let batch =
1229            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arr) as ArrayRef]).unwrap();
1230
1231        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1232        let csv_str = String::from_utf8(result).unwrap();
1233
1234        assert!(csv_str.contains("1.5"));
1235        assert!(csv_str.contains("Infinity"));
1236    }
1237
1238    #[test]
1239    fn test_date32_array_conversion() {
1240        let schema = Schema::new(vec![Field::new("date", DataType::Date32, false)]);
1241        let arr = Date32Array::from(vec![0, 365, 19724]); // 1970-01-01, 1971-01-01, 2024-01-01
1242        let batch =
1243            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arr) as ArrayRef]).unwrap();
1244
1245        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1246        let csv_str = String::from_utf8(result).unwrap();
1247
1248        assert!(csv_str.contains("1970-01-01"));
1249        assert!(csv_str.contains("1971-01-01"));
1250    }
1251
1252    #[test]
1253    fn test_timestamp_array_conversion() {
1254        let schema = Schema::new(vec![Field::new(
1255            "ts",
1256            DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
1257            false,
1258        )]);
1259        let arr = TimestampMicrosecondArray::from(vec![0, 1_000_000, 86_400_000_000]);
1260        let batch =
1261            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arr) as ArrayRef]).unwrap();
1262
1263        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1264        let csv_str = String::from_utf8(result).unwrap();
1265
1266        assert!(csv_str.contains("1970-01-01 00:00:00.000000"));
1267        assert!(csv_str.contains("1970-01-01 00:00:01.000000"));
1268        assert!(csv_str.contains("1970-01-02 00:00:00.000000"));
1269    }
1270
1271    #[test]
1272    fn test_decimal128_array_conversion() {
1273        let schema = Schema::new(vec![Field::new(
1274            "price",
1275            DataType::Decimal128(10, 2),
1276            false,
1277        )]);
1278        let arr = Decimal128Array::from(vec![12345i128, -9999, 100])
1279            .with_precision_and_scale(10, 2)
1280            .unwrap();
1281        let batch =
1282            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arr) as ArrayRef]).unwrap();
1283
1284        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1285        let csv_str = String::from_utf8(result).unwrap();
1286
1287        assert!(csv_str.contains("123.45"));
1288        assert!(csv_str.contains("-99.99"));
1289        assert!(csv_str.contains("1.00"));
1290    }
1291
1292    #[test]
1293    fn test_binary_array_conversion() {
1294        let schema = Schema::new(vec![Field::new("data", DataType::Binary, false)]);
1295        let arr = BinaryArray::from(vec![b"Hello".as_slice(), b"\xDE\xAD\xBE\xEF".as_slice()]);
1296        let batch =
1297            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arr) as ArrayRef]).unwrap();
1298
1299        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1300        let csv_str = String::from_utf8(result).unwrap();
1301
1302        assert!(csv_str.contains("48656c6c6f")); // "Hello" in hex
1303        assert!(csv_str.contains("deadbeef"));
1304    }
1305
1306    #[test]
1307    fn test_multiple_batches() {
1308        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
1309
1310        let batch1 = RecordBatch::try_new(
1311            schema.clone(),
1312            vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
1313        )
1314        .unwrap();
1315        let batch2 = RecordBatch::try_new(
1316            schema,
1317            vec![Arc::new(Int32Array::from(vec![3, 4])) as ArrayRef],
1318        )
1319        .unwrap();
1320
1321        let mut buffer = Vec::new();
1322        let mut writer = SyncArrowToCsvWriter::new(&mut buffer, CsvWriterOptions::default());
1323
1324        writer.write_batch(&batch1).unwrap();
1325        writer.write_batch(&batch2).unwrap();
1326        let total = writer.finish().unwrap();
1327
1328        assert_eq!(total, 4);
1329        let csv_str = String::from_utf8(buffer).unwrap();
1330        assert!(csv_str.contains("1\n"));
1331        assert!(csv_str.contains("2\n"));
1332        assert!(csv_str.contains("3\n"));
1333        assert!(csv_str.contains("4\n"));
1334    }
1335
1336    /// Test that record_batch_to_csv correctly validates Arrow type conversions.
1337    /// This tests the conversion logic that import_from_record_batch uses internally.
1338    #[test]
1339    fn test_record_batch_csv_conversion_for_import() {
1340        let batch = create_test_batch();
1341        let options = CsvWriterOptions::default();
1342
1343        // Verify that the batch can be converted to CSV without errors
1344        let result = record_batch_to_csv(&batch, options);
1345        assert!(result.is_ok());
1346
1347        let csv_bytes = result.unwrap();
1348        let csv_str = String::from_utf8(csv_bytes).unwrap();
1349
1350        // Verify all rows are present
1351        assert!(csv_str.contains("1,Alice,1.5"));
1352        assert!(csv_str.contains("2,Bob,2.5"));
1353        // Row 3 has NULL name
1354        assert!(csv_str.contains("3,,3.5"));
1355    }
1356
1357    /// Test that multiple batches can be combined into a single CSV stream.
1358    /// This tests the conversion logic that import_from_record_batches uses internally.
1359    #[test]
1360    fn test_multiple_record_batches_csv_conversion_for_import() {
1361        let batch1 = create_test_batch();
1362        let batch2 = create_test_batch();
1363        let options = CsvWriterOptions::default();
1364
1365        // Convert batches using the same pattern as import_from_record_batches
1366        let mut all_csv_bytes = Vec::new();
1367        let mut writer = SyncArrowToCsvWriter::new(&mut all_csv_bytes, options);
1368
1369        writer.write_batch(&batch1).unwrap();
1370        writer.write_batch(&batch2).unwrap();
1371        let total_rows = writer.finish().unwrap();
1372
1373        assert_eq!(total_rows, 6);
1374
1375        let csv_str = String::from_utf8(all_csv_bytes).unwrap();
1376        // Both batches should be present (6 rows total = 2 x 3 rows)
1377        let lines: Vec<&str> = csv_str.lines().collect();
1378        assert_eq!(lines.len(), 6);
1379    }
1380
1381    // ========================================================================
1382    // Task 7.4: Arrow import unit tests
1383    // ========================================================================
1384
1385    // 7.4.1: Test all Arrow type conversions
1386    // ----------------------------------------
1387
1388    #[test]
1389    fn test_all_integer_types_conversion() {
1390        use arrow::array::{
1391            Int16Array, Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
1392        };
1393
1394        let schema = Schema::new(vec![
1395            Field::new("i8", DataType::Int8, true),
1396            Field::new("i16", DataType::Int16, true),
1397            Field::new("i32", DataType::Int32, true),
1398            Field::new("i64", DataType::Int64, true),
1399            Field::new("u8", DataType::UInt8, true),
1400            Field::new("u16", DataType::UInt16, true),
1401            Field::new("u32", DataType::UInt32, true),
1402            Field::new("u64", DataType::UInt64, true),
1403        ]);
1404
1405        let batch = RecordBatch::try_new(
1406            Arc::new(schema),
1407            vec![
1408                Arc::new(Int8Array::from(vec![Some(i8::MIN), Some(i8::MAX), None])) as ArrayRef,
1409                Arc::new(Int16Array::from(vec![Some(i16::MIN), Some(i16::MAX), None])) as ArrayRef,
1410                Arc::new(Int32Array::from(vec![Some(i32::MIN), Some(i32::MAX), None])) as ArrayRef,
1411                Arc::new(Int64Array::from(vec![Some(i64::MIN), Some(i64::MAX), None])) as ArrayRef,
1412                Arc::new(UInt8Array::from(vec![Some(u8::MIN), Some(u8::MAX), None])) as ArrayRef,
1413                Arc::new(UInt16Array::from(vec![
1414                    Some(u16::MIN),
1415                    Some(u16::MAX),
1416                    None,
1417                ])) as ArrayRef,
1418                Arc::new(UInt32Array::from(vec![
1419                    Some(u32::MIN),
1420                    Some(u32::MAX),
1421                    None,
1422                ])) as ArrayRef,
1423                Arc::new(UInt64Array::from(vec![
1424                    Some(u64::MIN),
1425                    Some(u64::MAX),
1426                    None,
1427                ])) as ArrayRef,
1428            ],
1429        )
1430        .unwrap();
1431
1432        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1433        let csv_str = String::from_utf8(result).unwrap();
1434
1435        // Check min values row
1436        assert!(csv_str.contains("-128,-32768,-2147483648,-9223372036854775808,0,0,0,0"));
1437        // Check max values row
1438        assert!(csv_str.contains(
1439            "127,32767,2147483647,9223372036854775807,255,65535,4294967295,18446744073709551615"
1440        ));
1441    }
1442
1443    #[test]
1444    fn test_all_float_types_conversion() {
1445        let schema = Schema::new(vec![
1446            Field::new("f32", DataType::Float32, true),
1447            Field::new("f64", DataType::Float64, true),
1448        ]);
1449
1450        let batch = RecordBatch::try_new(
1451            Arc::new(schema),
1452            vec![
1453                Arc::new(Float32Array::from(vec![
1454                    Some(1.5f32),
1455                    Some(-2.5f32),
1456                    Some(f32::INFINITY),
1457                    Some(f32::NEG_INFINITY),
1458                    Some(f32::NAN),
1459                    None,
1460                ])) as ArrayRef,
1461                Arc::new(Float64Array::from(vec![
1462                    Some(1.5f64),
1463                    Some(-2.5f64),
1464                    Some(f64::INFINITY),
1465                    Some(f64::NEG_INFINITY),
1466                    Some(f64::NAN),
1467                    None,
1468                ])) as ArrayRef,
1469            ],
1470        )
1471        .unwrap();
1472
1473        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1474        let csv_str = String::from_utf8(result).unwrap();
1475
1476        assert!(csv_str.contains("1.5,1.5"));
1477        assert!(csv_str.contains("-2.5,-2.5"));
1478        assert!(csv_str.contains("Infinity,Infinity"));
1479        assert!(csv_str.contains("-Infinity,-Infinity"));
1480        assert!(csv_str.contains("NaN,NaN"));
1481    }
1482
1483    #[test]
1484    fn test_all_timestamp_units_conversion() {
1485        use arrow::array::{
1486            TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
1487        };
1488
1489        let schema = Schema::new(vec![
1490            Field::new(
1491                "ts_sec",
1492                DataType::Timestamp(arrow::datatypes::TimeUnit::Second, None),
1493                false,
1494            ),
1495            Field::new(
1496                "ts_ms",
1497                DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1498                false,
1499            ),
1500            Field::new(
1501                "ts_us",
1502                DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
1503                false,
1504            ),
1505            Field::new(
1506                "ts_ns",
1507                DataType::Timestamp(arrow::datatypes::TimeUnit::Nanosecond, None),
1508                false,
1509            ),
1510        ]);
1511
1512        // All represent the same timestamp: 1970-01-01 00:00:01.000000
1513        let batch = RecordBatch::try_new(
1514            Arc::new(schema),
1515            vec![
1516                Arc::new(TimestampSecondArray::from(vec![1i64])) as ArrayRef,
1517                Arc::new(TimestampMillisecondArray::from(vec![1000i64])) as ArrayRef,
1518                Arc::new(TimestampMicrosecondArray::from(vec![1_000_000i64])) as ArrayRef,
1519                Arc::new(TimestampNanosecondArray::from(vec![1_000_000_000i64])) as ArrayRef,
1520            ],
1521        )
1522        .unwrap();
1523
1524        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1525        let csv_str = String::from_utf8(result).unwrap();
1526
1527        // All should produce the same timestamp string
1528        let expected = "1970-01-01 00:00:01.000000";
1529        let row = csv_str.lines().next().unwrap();
1530        let parts: Vec<&str> = row.split(',').collect();
1531        assert_eq!(parts.len(), 4);
1532        for part in parts {
1533            assert_eq!(part, expected);
1534        }
1535    }
1536
1537    #[test]
1538    fn test_string_types_conversion() {
1539        use arrow::array::LargeStringArray;
1540
1541        let schema = Schema::new(vec![
1542            Field::new("utf8", DataType::Utf8, true),
1543            Field::new("large_utf8", DataType::LargeUtf8, true),
1544        ]);
1545
1546        let batch = RecordBatch::try_new(
1547            Arc::new(schema),
1548            vec![
1549                Arc::new(StringArray::from(vec![Some("hello"), Some("world"), None])) as ArrayRef,
1550                Arc::new(LargeStringArray::from(vec![
1551                    Some("large"),
1552                    Some("string"),
1553                    None,
1554                ])) as ArrayRef,
1555            ],
1556        )
1557        .unwrap();
1558
1559        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1560        let csv_str = String::from_utf8(result).unwrap();
1561
1562        assert!(csv_str.contains("hello,large"));
1563        assert!(csv_str.contains("world,string"));
1564    }
1565
1566    #[test]
1567    fn test_binary_types_conversion() {
1568        use arrow::array::LargeBinaryArray;
1569
1570        let schema = Schema::new(vec![
1571            Field::new("bin", DataType::Binary, false),
1572            Field::new("large_bin", DataType::LargeBinary, false),
1573        ]);
1574
1575        let batch = RecordBatch::try_new(
1576            Arc::new(schema),
1577            vec![
1578                Arc::new(BinaryArray::from(vec![
1579                    b"abc".as_slice(),
1580                    b"\x00\xff".as_slice(),
1581                ])) as ArrayRef,
1582                Arc::new(LargeBinaryArray::from(vec![
1583                    b"def".as_slice(),
1584                    b"\x01\x02".as_slice(),
1585                ])) as ArrayRef,
1586            ],
1587        )
1588        .unwrap();
1589
1590        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1591        let csv_str = String::from_utf8(result).unwrap();
1592
1593        // Binary is hex-encoded
1594        assert!(csv_str.contains("616263,646566")); // "abc" and "def" in hex
1595        assert!(csv_str.contains("00ff,0102")); // binary bytes in hex
1596    }
1597
1598    // 7.4.2: Test NULL value handling
1599    // ----------------------------------------
1600
1601    #[test]
1602    fn test_null_values_default_representation() {
1603        let schema = Schema::new(vec![
1604            Field::new("str", DataType::Utf8, true),
1605            Field::new("int", DataType::Int32, true),
1606            Field::new("float", DataType::Float64, true),
1607        ]);
1608
1609        let batch = RecordBatch::try_new(
1610            Arc::new(schema),
1611            vec![
1612                Arc::new(StringArray::from(vec![
1613                    None as Option<&str>,
1614                    Some("value"),
1615                    None,
1616                ])) as ArrayRef,
1617                Arc::new(Int32Array::from(vec![None, Some(42), None])) as ArrayRef,
1618                Arc::new(Float64Array::from(vec![None, Some(3.125), None])) as ArrayRef,
1619            ],
1620        )
1621        .unwrap();
1622
1623        // Default: null_value is empty string
1624        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1625        let csv_str = String::from_utf8(result).unwrap();
1626
1627        // Rows with all NULLs should be ",,\n"
1628        assert!(csv_str.contains(",,"));
1629        // Row with values should be "value,42,3.125"
1630        assert!(csv_str.contains("value,42,3.125"));
1631    }
1632
1633    #[test]
1634    fn test_null_values_custom_representation() {
1635        let schema = Schema::new(vec![
1636            Field::new("str", DataType::Utf8, true),
1637            Field::new("int", DataType::Int32, true),
1638        ]);
1639
1640        let batch = RecordBatch::try_new(
1641            Arc::new(schema),
1642            vec![
1643                Arc::new(StringArray::from(vec![None as Option<&str>, Some("value")])) as ArrayRef,
1644                Arc::new(Int32Array::from(vec![None, Some(42)])) as ArrayRef,
1645            ],
1646        )
1647        .unwrap();
1648
1649        let options = CsvWriterOptions {
1650            null_value: "\\N".to_string(),
1651            ..Default::default()
1652        };
1653        let result = record_batch_to_csv(&batch, options).unwrap();
1654        let csv_str = String::from_utf8(result).unwrap();
1655
1656        // NULLs should be represented as "\N"
1657        assert!(csv_str.contains("\\N,\\N"));
1658        assert!(csv_str.contains("value,42"));
1659    }
1660
1661    #[test]
1662    fn test_null_values_in_nested_data() {
1663        // Test NULL handling in various array types
1664        let schema = Schema::new(vec![
1665            Field::new("bool", DataType::Boolean, true),
1666            Field::new("date", DataType::Date32, true),
1667            Field::new("decimal", DataType::Decimal128(10, 2), true),
1668        ]);
1669
1670        let batch = RecordBatch::try_new(
1671            Arc::new(schema),
1672            vec![
1673                Arc::new(BooleanArray::from(vec![Some(true), None, Some(false)])) as ArrayRef,
1674                Arc::new(Date32Array::from(vec![Some(0), None, Some(365)])) as ArrayRef,
1675                Arc::new(
1676                    Decimal128Array::from(vec![Some(12345i128), None, Some(100)])
1677                        .with_precision_and_scale(10, 2)
1678                        .unwrap(),
1679                ) as ArrayRef,
1680            ],
1681        )
1682        .unwrap();
1683
1684        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1685        let csv_str = String::from_utf8(result).unwrap();
1686
1687        assert!(csv_str.contains("true,1970-01-01,123.45"));
1688        assert!(csv_str.contains(",,")); // Row with all NULLs
1689        assert!(csv_str.contains("false,1971-01-01,1.00"));
1690    }
1691
1692    // 7.4.3: Test special character escaping
1693    // ----------------------------------------
1694
1695    #[test]
1696    fn test_escape_comma_in_string() {
1697        let schema = Schema::new(vec![Field::new("text", DataType::Utf8, false)]);
1698        let batch = RecordBatch::try_new(
1699            Arc::new(schema),
1700            vec![Arc::new(StringArray::from(vec!["hello,world"])) as ArrayRef],
1701        )
1702        .unwrap();
1703
1704        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1705        let csv_str = String::from_utf8(result).unwrap();
1706
1707        // Comma-containing strings should be quoted
1708        assert!(csv_str.contains("\"hello,world\""));
1709    }
1710
1711    #[test]
1712    fn test_escape_quotes_in_string() {
1713        let schema = Schema::new(vec![Field::new("text", DataType::Utf8, false)]);
1714        let batch = RecordBatch::try_new(
1715            Arc::new(schema),
1716            vec![Arc::new(StringArray::from(vec!["say \"hello\""])) as ArrayRef],
1717        )
1718        .unwrap();
1719
1720        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1721        let csv_str = String::from_utf8(result).unwrap();
1722
1723        // Quotes should be doubled inside quoted strings
1724        assert!(csv_str.contains("\"say \"\"hello\"\"\""));
1725    }
1726
1727    #[test]
1728    fn test_escape_newlines_in_string() {
1729        let schema = Schema::new(vec![Field::new("text", DataType::Utf8, false)]);
1730        let batch = RecordBatch::try_new(
1731            Arc::new(schema),
1732            vec![Arc::new(StringArray::from(vec!["line1\nline2", "line1\r\nline2"])) as ArrayRef],
1733        )
1734        .unwrap();
1735
1736        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1737        let csv_str = String::from_utf8(result).unwrap();
1738
1739        // Newline-containing strings should be quoted
1740        assert!(csv_str.contains("\"line1\nline2\""));
1741        assert!(csv_str.contains("\"line1\r\nline2\""));
1742    }
1743
1744    #[test]
1745    fn test_escape_carriage_return_in_string() {
1746        let schema = Schema::new(vec![Field::new("text", DataType::Utf8, false)]);
1747        let batch = RecordBatch::try_new(
1748            Arc::new(schema),
1749            vec![Arc::new(StringArray::from(vec!["text\rwith\rCR"])) as ArrayRef],
1750        )
1751        .unwrap();
1752
1753        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1754        let csv_str = String::from_utf8(result).unwrap();
1755
1756        // Carriage return should trigger quoting
1757        assert!(csv_str.contains("\"text\rwith\rCR\""));
1758    }
1759
1760    #[test]
1761    fn test_escape_multiple_special_chars() {
1762        let schema = Schema::new(vec![Field::new("text", DataType::Utf8, false)]);
1763        let batch = RecordBatch::try_new(
1764            Arc::new(schema),
1765            vec![Arc::new(StringArray::from(vec![
1766                "combo: \"quoted\", newline\n, and comma",
1767            ])) as ArrayRef],
1768        )
1769        .unwrap();
1770
1771        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1772        let csv_str = String::from_utf8(result).unwrap();
1773
1774        // Complex string with multiple special chars
1775        assert!(csv_str.contains("\"combo: \"\"quoted\"\", newline\n, and comma\""));
1776    }
1777
1778    #[test]
1779    fn test_custom_separator_escaping() {
1780        let schema = Schema::new(vec![Field::new("text", DataType::Utf8, false)]);
1781        let batch = RecordBatch::try_new(
1782            Arc::new(schema),
1783            vec![Arc::new(StringArray::from(vec!["value;with;semicolons"])) as ArrayRef],
1784        )
1785        .unwrap();
1786
1787        // Use semicolon as separator
1788        let options = CsvWriterOptions {
1789            column_separator: ';',
1790            ..Default::default()
1791        };
1792        let result = record_batch_to_csv(&batch, options).unwrap();
1793        let csv_str = String::from_utf8(result).unwrap();
1794
1795        // String containing semicolons should be quoted when separator is semicolon
1796        assert!(csv_str.contains("\"value;with;semicolons\""));
1797    }
1798
1799    #[test]
1800    fn test_custom_delimiter_escaping() {
1801        let schema = Schema::new(vec![Field::new("text", DataType::Utf8, false)]);
1802        let batch = RecordBatch::try_new(
1803            Arc::new(schema),
1804            vec![Arc::new(StringArray::from(vec!["value'with'quotes"])) as ArrayRef],
1805        )
1806        .unwrap();
1807
1808        // Use single quote as delimiter
1809        let options = CsvWriterOptions {
1810            column_delimiter: '\'',
1811            ..Default::default()
1812        };
1813        let result = record_batch_to_csv(&batch, options).unwrap();
1814        let csv_str = String::from_utf8(result).unwrap();
1815
1816        // String containing single quotes should be escaped when delimiter is single quote
1817        assert!(csv_str.contains("'value''with''quotes'"));
1818    }
1819
1820    #[test]
1821    fn test_no_escaping_for_clean_string() {
1822        let schema = Schema::new(vec![Field::new("text", DataType::Utf8, false)]);
1823        let batch = RecordBatch::try_new(
1824            Arc::new(schema),
1825            vec![Arc::new(StringArray::from(vec!["clean simple text"])) as ArrayRef],
1826        )
1827        .unwrap();
1828
1829        let result = record_batch_to_csv(&batch, CsvWriterOptions::default()).unwrap();
1830        let csv_str = String::from_utf8(result).unwrap();
1831
1832        // Clean strings should not be quoted
1833        assert!(csv_str.contains("clean simple text"));
1834        assert!(!csv_str.contains("\"clean simple text\""));
1835    }
1836
1837    #[test]
1838    fn test_days_to_ymd_edge_cases() {
1839        // Test boundary dates
1840        let (y, m, d) = days_to_ymd(0);
1841        assert_eq!((y, m, d), (1970, 1, 1));
1842
1843        // Leap year date - 2000-02-29 is day 11016 from epoch
1844        let (y, m, d) = days_to_ymd(11016);
1845        assert_eq!((y, m, d), (2000, 2, 29));
1846
1847        // 2000-03-01 is day 11017 from epoch
1848        let (y, m, d) = days_to_ymd(11017);
1849        assert_eq!((y, m, d), (2000, 3, 1));
1850
1851        // End of 1999 - December 31, 1999 is day 10956 from epoch
1852        let (y, m, d) = days_to_ymd(10956);
1853        assert_eq!((y, m, d), (1999, 12, 31));
1854    }
1855}