exarrow_rs/export/arrow.rs

//! Arrow RecordBatch export functionality.
//!
//! This module provides utilities for exporting data from Exasol to Arrow RecordBatches
//! and Arrow IPC format. The export process:
//!
//! 1. Executes an EXPORT SQL statement to receive CSV via the HTTP transport
//! 2. Parses the CSV and converts it to Arrow RecordBatches
//! 3. Returns the batches as a stream or writes them in Arrow IPC format
//!
//! # Example
//!
//! ```ignore
//! use exarrow_rs::export::arrow::{ArrowExportOptions, CsvToArrowReader};
//! use arrow::datatypes::{Schema, Field, DataType};
//! use std::sync::Arc;
//!
//! // Create a schema
//! let schema = Arc::new(Schema::new(vec![
//!     Field::new("id", DataType::Int64, false),
//!     Field::new("name", DataType::Utf8, true),
//! ]));
//!
//! // Create options
//! let options = ArrowExportOptions::default()
//!     .with_batch_size(2048)
//!     .with_schema(schema);
//! ```

use std::sync::Arc;

use arrow::array::builder::{
    BooleanBuilder, Date32Builder, Decimal128Builder, Float64Builder, Int64Builder, StringBuilder,
    TimestampMicrosecondBuilder,
};
use arrow::array::{ArrayRef, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use thiserror::Error;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader};

use crate::types::{
    conversion::{
        exasol_type_to_arrow as exasol_type_to_arrow_impl, parse_date_to_days,
        parse_decimal_to_i128, parse_timestamp_to_micros,
    },
    ExasolType,
};

/// Errors that can occur during Arrow export operations.
#[derive(Error, Debug)]
pub enum ExportError {
    /// CSV parsing error
    #[error("CSV parsing error at row {row}: {message}")]
    CsvParseError { row: usize, message: String },

    /// Type conversion error
    #[error("Type conversion error at row {row}, column {column}: {message}")]
    TypeConversionError {
        row: usize,
        column: usize,
        message: String,
    },

    /// Schema error
    #[error("Schema error: {0}")]
    SchemaError(String),

    /// I/O error
    #[error("I/O error: {0}")]
    IoError(String),

    /// Arrow error
    #[error("Arrow error: {0}")]
    ArrowError(String),

    /// Transport error
    #[error("Transport error: {0}")]
    TransportError(String),
}

impl From<std::io::Error> for ExportError {
    fn from(err: std::io::Error) -> Self {
        ExportError::IoError(err.to_string())
    }
}

impl From<arrow::error::ArrowError> for ExportError {
    fn from(err: arrow::error::ArrowError) -> Self {
        ExportError::ArrowError(err.to_string())
    }
}

/// Options for Arrow export operations.
#[derive(Debug, Clone)]
pub struct ArrowExportOptions {
    /// Number of rows per RecordBatch (default: 1024)
    pub batch_size: usize,
    /// Custom NULL value representation in CSV (default: empty string)
    pub null_value: Option<String>,
    /// Explicit Arrow schema. Required by the transport-integrated export
    /// functions; schema inference is not implemented.
    pub schema: Option<Arc<Schema>>,
    /// Column separator in CSV (default: ',')
    pub column_separator: char,
    /// Column delimiter/quote character (default: '"')
    pub column_delimiter: char,
    /// Exasol host for the HTTP transport connection.
    /// This is typically the same host as the WebSocket connection.
    pub host: String,
    /// Exasol port for the HTTP transport connection.
    /// This is typically the same port as the WebSocket connection.
    pub port: u16,
    /// Whether to use TLS for the HTTP transport connection.
    /// Default is `false` because the main WebSocket connection typically
    /// already handles TLS encryption.
    pub use_encryption: bool,
}

impl Default for ArrowExportOptions {
    fn default() -> Self {
        Self {
            batch_size: 1024,
            null_value: None,
            schema: None,
            column_separator: ',',
            column_delimiter: '"',
            host: String::new(),
            port: 0,
            use_encryption: false,
        }
    }
}

impl ArrowExportOptions {
    /// Creates new ArrowExportOptions with default values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the number of rows per RecordBatch.
    #[must_use]
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Sets the string that represents NULL in the CSV data.
    #[must_use]
    pub fn with_null_value(mut self, null_value: impl Into<String>) -> Self {
        self.null_value = Some(null_value.into());
        self
    }

    /// Sets the Arrow schema for the exported RecordBatches.
    #[must_use]
    pub fn with_schema(mut self, schema: Arc<Schema>) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Sets the CSV column separator.
    #[must_use]
    pub fn with_column_separator(mut self, sep: char) -> Self {
        self.column_separator = sep;
        self
    }

    /// Sets the CSV column delimiter (quote character).
    #[must_use]
    pub fn with_column_delimiter(mut self, delim: char) -> Self {
        self.column_delimiter = delim;
        self
    }

    /// Sets the Exasol host for the HTTP transport connection.
    ///
    /// This is typically the same host as the WebSocket connection.
    #[must_use]
    pub fn exasol_host(mut self, host: impl Into<String>) -> Self {
        self.host = host.into();
        self
    }

    /// Sets the Exasol port for the HTTP transport connection.
    ///
    /// This is typically the same port as the WebSocket connection.
    #[must_use]
    pub fn exasol_port(mut self, port: u16) -> Self {
        self.port = port;
        self
    }

    /// Sets whether to use TLS for the HTTP transport connection.
    ///
    /// Default is `false` because the main WebSocket connection typically
    /// already handles TLS encryption for the control channel.
    #[must_use]
    pub fn use_encryption(mut self, use_encryption: bool) -> Self {
        self.use_encryption = use_encryption;
        self
    }
}

/// CSV-to-Arrow streaming reader.
///
/// This struct reads CSV data from an async reader and converts it to Arrow RecordBatches.
/// It supports configurable batch sizes and handles NULL values and type parsing.
pub struct CsvToArrowReader<R> {
    reader: BufReader<R>,
    schema: Arc<Schema>,
    batch_size: usize,
    null_value: Option<String>,
    column_separator: char,
    column_delimiter: char,
    current_row: usize,
    finished: bool,
}

impl<R: AsyncBufRead + Unpin> CsvToArrowReader<R> {
    /// Creates a new CsvToArrowReader.
    ///
    /// # Arguments
    ///
    /// * `reader` - The async reader to read CSV data from
    /// * `schema` - The Arrow schema for the output RecordBatches
    /// * `options` - Export options including batch size and NULL value
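    ///
    /// # Example
    ///
    /// A minimal sketch: any `AsyncRead` source works, here an in-memory
    /// byte slice standing in for the HTTP transport stream (`schema` is
    /// assumed to exist).
    ///
    /// ```ignore
    /// let csv = "1,alice\n2,bob\n";
    /// let options = ArrowExportOptions::default();
    /// let reader = CsvToArrowReader::new(csv.as_bytes(), schema, &options);
    /// ```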
    pub fn new(reader: R, schema: Arc<Schema>, options: &ArrowExportOptions) -> Self
    where
        R: tokio::io::AsyncRead,
    {
        Self {
            reader: BufReader::new(reader),
            schema,
            batch_size: options.batch_size,
            null_value: options.null_value.clone(),
            column_separator: options.column_separator,
            column_delimiter: options.column_delimiter,
            current_row: 0,
            finished: false,
        }
    }

    /// Creates a new CsvToArrowReader from an already buffered reader.
    ///
    /// # Arguments
    ///
    /// * `reader` - The buffered async reader to read CSV data from
    /// * `schema` - The Arrow schema for the output RecordBatches
    /// * `options` - Export options including batch size and NULL value
    pub fn from_buffered(
        reader: BufReader<R>,
        schema: Arc<Schema>,
        options: &ArrowExportOptions,
    ) -> Self
    where
        R: tokio::io::AsyncRead,
    {
        Self {
            reader,
            schema,
            batch_size: options.batch_size,
            null_value: options.null_value.clone(),
            column_separator: options.column_separator,
            column_delimiter: options.column_delimiter,
            current_row: 0,
            finished: false,
        }
    }

    /// Returns the Arrow schema.
    #[must_use]
    pub fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }

    /// Reads the next batch of rows and returns a RecordBatch.
    ///
    /// # Returns
    ///
    /// * `Ok(Some(batch))` - A RecordBatch with up to `batch_size` rows
    /// * `Ok(None)` - No more data to read
    /// * `Err(e)` - An error occurred during parsing or conversion
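    ///
    /// # Example
    ///
    /// A minimal sketch of the read loop (`reader` is assumed to be a
    /// `CsvToArrowReader` constructed as above):
    ///
    /// ```ignore
    /// while let Some(batch) = reader.next_batch().await? {
    ///     println!("got {} rows", batch.num_rows());
    /// }
    /// ```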
    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>, ExportError> {
        if self.finished {
            return Ok(None);
        }

        let mut rows: Vec<Vec<String>> = Vec::with_capacity(self.batch_size);

        // Read up to batch_size rows
        for _ in 0..self.batch_size {
            let mut line = String::new();
            let bytes_read = self.reader.read_line(&mut line).await?;

            if bytes_read == 0 {
                self.finished = true;
                break;
            }

            // Remove trailing newline
            let line = line.trim_end_matches(&['\r', '\n'][..]);
            if line.is_empty() {
                continue;
            }

            let fields = parse_csv_row(
                line,
                self.column_separator,
                self.column_delimiter,
                self.current_row,
            )?;
            rows.push(fields);
            self.current_row += 1;
        }

        if rows.is_empty() {
            return Ok(None);
        }

        // Build RecordBatch from parsed rows
        let batch = self.build_record_batch(&rows)?;
        Ok(Some(batch))
    }

    /// Builds a RecordBatch from parsed CSV rows.
    fn build_record_batch(&self, rows: &[Vec<String>]) -> Result<RecordBatch, ExportError> {
        let num_columns = self.schema.fields().len();
        let num_rows = rows.len();

        // Build arrays for each column
        let arrays: Result<Vec<ArrayRef>, ExportError> = (0..num_columns)
            .map(|col_idx| {
                let field = self.schema.field(col_idx);
                let values: Vec<&str> = rows
                    .iter()
                    .map(|row| row.get(col_idx).map(|s| s.as_str()).unwrap_or(""))
                    .collect();

                build_array_from_strings(
                    &values,
                    field.data_type(),
                    field.is_nullable(),
                    &self.null_value,
                    self.current_row - num_rows,
                    col_idx,
                )
            })
            .collect();

        let arrays = arrays?;

        RecordBatch::try_new(Arc::clone(&self.schema), arrays)
            .map_err(|e| ExportError::ArrowError(e.to_string()))
    }
}

/// Parses a CSV row into fields, handling quoted values.
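///
/// With the default separator `,` and delimiter `"`, the line
/// `1,"hello, world"` parses to the fields `1` and `hello, world`;
/// a doubled delimiter inside quotes (`""`) is an escaped quote.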
fn parse_csv_row(
    line: &str,
    separator: char,
    delimiter: char,
    row: usize,
) -> Result<Vec<String>, ExportError> {
    let mut fields = Vec::new();
    let mut current_field = String::new();
    let mut in_quotes = false;
    let mut chars = line.chars().peekable();

    while let Some(c) = chars.next() {
        if in_quotes {
            if c == delimiter {
                // Check for escaped quote (double delimiter)
                if chars.peek() == Some(&delimiter) {
                    current_field.push(delimiter);
                    chars.next();
                } else {
                    in_quotes = false;
                }
            } else {
                current_field.push(c);
            }
        } else if c == delimiter {
            in_quotes = true;
        } else if c == separator {
            fields.push(current_field);
            current_field = String::new();
        } else {
            current_field.push(c);
        }
    }

    // Don't forget the last field
    fields.push(current_field);

    // Validate that we're not still in quotes
    if in_quotes {
        return Err(ExportError::CsvParseError {
            row,
            message: "Unclosed quote in CSV row".to_string(),
        });
    }

    Ok(fields)
}

/// Builds an Arrow array from string values.
fn build_array_from_strings(
    values: &[&str],
    data_type: &DataType,
    nullable: bool,
    null_value: &Option<String>,
    start_row: usize,
    column: usize,
) -> Result<ArrayRef, ExportError> {
    match data_type {
        DataType::Boolean => build_boolean_array(values, nullable, null_value, start_row, column),
        DataType::Int64 => build_int64_array(values, nullable, null_value, start_row, column),
        DataType::Float64 => build_float64_array(values, nullable, null_value, start_row, column),
        DataType::Utf8 => build_string_array(values, nullable, null_value),
        DataType::Date32 => build_date32_array(values, nullable, null_value, start_row, column),
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            build_timestamp_array(values, nullable, null_value, start_row, column)
        }
        DataType::Decimal128(precision, scale) => build_decimal128_array(
            values, *precision, *scale, nullable, null_value, start_row, column,
        ),
        _ => Err(ExportError::SchemaError(format!(
            "Unsupported data type: {:?}",
            data_type
        ))),
    }
}

/// Checks if a value is NULL.
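///
/// An empty string is always treated as NULL; a configured `null_value`
/// (e.g. `"NULL"`) is matched exactly and case-sensitively.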
fn is_null_value(value: &str, null_value: &Option<String>) -> bool {
    if value.is_empty() {
        return true;
    }
    if let Some(nv) = null_value {
        return value == nv;
    }
    false
}

/// Builds a Boolean array from string values.
fn build_boolean_array(
    values: &[&str],
    nullable: bool,
    null_value: &Option<String>,
    start_row: usize,
    column: usize,
) -> Result<ArrayRef, ExportError> {
    let mut builder = BooleanBuilder::with_capacity(values.len());

    for (i, value) in values.iter().enumerate() {
        if is_null_value(value, null_value) {
            if nullable {
                builder.append_null();
            } else {
                return Err(ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: "NULL value in non-nullable column".to_string(),
                });
            }
        } else {
            let lower = value.to_lowercase();
            let b = match lower.as_str() {
                "true" | "1" | "t" | "yes" | "y" => true,
                "false" | "0" | "f" | "no" | "n" => false,
                _ => {
                    return Err(ExportError::TypeConversionError {
                        row: start_row + i,
                        column,
                        message: format!("Invalid boolean value: {}", value),
                    });
                }
            };
            builder.append_value(b);
        }
    }

    Ok(Arc::new(builder.finish()))
}

/// Builds an Int64 array from string values.
fn build_int64_array(
    values: &[&str],
    nullable: bool,
    null_value: &Option<String>,
    start_row: usize,
    column: usize,
) -> Result<ArrayRef, ExportError> {
    let mut builder = Int64Builder::with_capacity(values.len());

    for (i, value) in values.iter().enumerate() {
        if is_null_value(value, null_value) {
            if nullable {
                builder.append_null();
            } else {
                return Err(ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: "NULL value in non-nullable column".to_string(),
                });
            }
        } else {
            let n = value
                .parse::<i64>()
                .map_err(|e| ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: format!("Invalid integer value '{}': {}", value, e),
                })?;
            builder.append_value(n);
        }
    }

    Ok(Arc::new(builder.finish()))
}

/// Builds a Float64 array from string values.
fn build_float64_array(
    values: &[&str],
    nullable: bool,
    null_value: &Option<String>,
    start_row: usize,
    column: usize,
) -> Result<ArrayRef, ExportError> {
    let mut builder = Float64Builder::with_capacity(values.len());

    for (i, value) in values.iter().enumerate() {
        if is_null_value(value, null_value) {
            if nullable {
                builder.append_null();
            } else {
                return Err(ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: "NULL value in non-nullable column".to_string(),
                });
            }
        } else {
            // Handle special float values
            let f = match *value {
                "Infinity" | "inf" => f64::INFINITY,
                "-Infinity" | "-inf" => f64::NEG_INFINITY,
                "NaN" | "nan" => f64::NAN,
                _ => value
                    .parse::<f64>()
                    .map_err(|e| ExportError::TypeConversionError {
                        row: start_row + i,
                        column,
                        message: format!("Invalid float value '{}': {}", value, e),
                    })?,
            };
            builder.append_value(f);
        }
    }

    Ok(Arc::new(builder.finish()))
}

/// Builds a String array from string values.
fn build_string_array(
    values: &[&str],
    nullable: bool,
    null_value: &Option<String>,
) -> Result<ArrayRef, ExportError> {
    let mut builder =
        StringBuilder::with_capacity(values.len(), values.iter().map(|s| s.len()).sum());

    for value in values.iter() {
        if is_null_value(value, null_value) {
            if nullable {
                builder.append_null();
            } else {
                // An empty CSV value is indistinguishable from an empty string,
                // so non-nullable string columns receive "" instead of an error.
                builder.append_value("");
            }
        } else {
            builder.append_value(value);
        }
    }

    Ok(Arc::new(builder.finish()))
}

/// Builds a Date32 array from string values (YYYY-MM-DD format).
fn build_date32_array(
    values: &[&str],
    nullable: bool,
    null_value: &Option<String>,
    start_row: usize,
    column: usize,
) -> Result<ArrayRef, ExportError> {
    let mut builder = Date32Builder::with_capacity(values.len());

    for (i, value) in values.iter().enumerate() {
        if is_null_value(value, null_value) {
            if nullable {
                builder.append_null();
            } else {
                return Err(ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: "NULL value in non-nullable column".to_string(),
                });
            }
        } else {
            let days = parse_date_to_days(value).map_err(|e| ExportError::TypeConversionError {
                row: start_row + i,
                column,
                message: e,
            })?;
            builder.append_value(days);
        }
    }

    Ok(Arc::new(builder.finish()))
}

/// Builds a Timestamp array from string values (YYYY-MM-DD HH:MM:SS format).
fn build_timestamp_array(
    values: &[&str],
    nullable: bool,
    null_value: &Option<String>,
    start_row: usize,
    column: usize,
) -> Result<ArrayRef, ExportError> {
    let mut builder = TimestampMicrosecondBuilder::with_capacity(values.len());

    for (i, value) in values.iter().enumerate() {
        if is_null_value(value, null_value) {
            if nullable {
                builder.append_null();
            } else {
                return Err(ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: "NULL value in non-nullable column".to_string(),
                });
            }
        } else {
            let micros =
                parse_timestamp_to_micros(value).map_err(|e| ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: e,
                })?;
            builder.append_value(micros);
        }
    }

    Ok(Arc::new(builder.finish()))
}

/// Builds a Decimal128 array from string values.
fn build_decimal128_array(
    values: &[&str],
    precision: u8,
    scale: i8,
    nullable: bool,
    null_value: &Option<String>,
    start_row: usize,
    column: usize,
) -> Result<ArrayRef, ExportError> {
    let mut builder = Decimal128Builder::with_capacity(values.len())
        .with_precision_and_scale(precision, scale)
        .map_err(|e| ExportError::ArrowError(e.to_string()))?;

    for (i, value) in values.iter().enumerate() {
        if is_null_value(value, null_value) {
            if nullable {
                builder.append_null();
            } else {
                return Err(ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: "NULL value in non-nullable column".to_string(),
                });
            }
        } else {
            let decimal = parse_decimal_to_i128(value, scale).map_err(|e| {
                ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: e,
                }
            })?;
            builder.append_value(decimal);
        }
    }

    Ok(Arc::new(builder.finish()))
}

/// Maps Exasol types to Arrow DataTypes.
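///
/// # Example
///
/// ```ignore
/// assert_eq!(exasol_type_to_arrow(&ExasolType::Double)?, DataType::Float64);
/// assert_eq!(exasol_type_to_arrow(&ExasolType::Date)?, DataType::Date32);
/// ```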
pub fn exasol_type_to_arrow(exasol_type: &ExasolType) -> Result<DataType, ExportError> {
    exasol_type_to_arrow_impl(exasol_type).map_err(ExportError::SchemaError)
}

/// Builds an Arrow schema from Exasol column metadata.
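///
/// # Example
///
/// ```ignore
/// // Columns are (name, type, nullable) triples.
/// let columns = vec![
///     ("id".to_string(), ExasolType::Decimal { precision: 18, scale: 0 }, false),
///     ("name".to_string(), ExasolType::Varchar { size: 100 }, true),
/// ];
/// let schema = build_schema_from_exasol_types(&columns)?;
/// assert_eq!(schema.fields().len(), 2);
/// ```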
pub fn build_schema_from_exasol_types(
    columns: &[(String, ExasolType, bool)],
) -> Result<Schema, ExportError> {
    let fields: Result<Vec<Field>, ExportError> = columns
        .iter()
        .map(|(name, exasol_type, nullable)| {
            let data_type = exasol_type_to_arrow(exasol_type)?;
            Ok(Field::new(name, data_type, *nullable))
        })
        .collect();

    Ok(Schema::new(fields?))
}

/// Writes Arrow RecordBatches in the Arrow IPC stream format.
///
/// # Arguments
///
/// * `writer` - The async writer to write to
/// * `schema` - The Arrow schema
/// * `batches` - Iterator of RecordBatches to write
///
/// # Returns
///
/// The total number of rows written.
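///
/// # Example
///
/// A minimal sketch: serialize one batch into an in-memory buffer
/// (`schema` and `batch` are assumed to exist).
///
/// ```ignore
/// let mut buf: Vec<u8> = Vec::new();
/// let rows = write_arrow_ipc(&mut buf, schema, vec![Ok(batch)]).await?;
/// println!("wrote {rows} rows into {} bytes", buf.len());
/// ```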
pub async fn write_arrow_ipc<W, I>(
    writer: &mut W,
    schema: Arc<Schema>,
    batches: I,
) -> Result<u64, ExportError>
where
    W: AsyncWrite + Unpin + Send,
    I: IntoIterator<Item = Result<RecordBatch, ExportError>>,
{
    use arrow::ipc::writer::StreamWriter;
    use std::io::Cursor;

    let mut total_rows = 0u64;

    // Arrow's IPC writers are synchronous, so serialize all batches into an
    // in-memory buffer first, then copy it to the async writer.
    let mut buffer = Cursor::new(Vec::new());
    {
        let mut ipc_writer = StreamWriter::try_new(&mut buffer, &schema)
            .map_err(|e| ExportError::ArrowError(e.to_string()))?;

        for batch_result in batches {
            let batch = batch_result?;
            total_rows += batch.num_rows() as u64;
            ipc_writer
                .write(&batch)
                .map_err(|e| ExportError::ArrowError(e.to_string()))?;
        }

        ipc_writer
            .finish()
            .map_err(|e| ExportError::ArrowError(e.to_string()))?;
    }

    // Copy the buffer to the async writer
    let data = buffer.into_inner();
    writer
        .write_all(&data)
        .await
        .map_err(|e| ExportError::IoError(e.to_string()))?;
    writer
        .flush()
        .await
        .map_err(|e| ExportError::IoError(e.to_string()))?;

    Ok(total_rows)
}

/// Writes Arrow RecordBatches in the Arrow IPC file format (with footer).
///
/// # Arguments
///
/// * `writer` - The async writer to write to
/// * `schema` - The Arrow schema
/// * `batches` - Iterator of RecordBatches to write
///
/// # Returns
///
/// The total number of rows written.
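///
/// # Example
///
/// A minimal sketch: the file format starts with the `ARROW1` magic bytes
/// (`schema` and `batch` are assumed to exist).
///
/// ```ignore
/// let mut buf: Vec<u8> = Vec::new();
/// write_arrow_ipc_file(&mut buf, schema, vec![Ok(batch)]).await?;
/// assert_eq!(&buf[0..6], b"ARROW1");
/// ```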
pub async fn write_arrow_ipc_file<W, I>(
    writer: &mut W,
    schema: Arc<Schema>,
    batches: I,
) -> Result<u64, ExportError>
where
    W: AsyncWrite + Unpin + Send,
    I: IntoIterator<Item = Result<RecordBatch, ExportError>>,
{
    use arrow::ipc::writer::FileWriter;
    use std::io::Cursor;

    let mut total_rows = 0u64;

    // Arrow's IPC writers are synchronous, so serialize into a buffer first
    let mut buffer = Cursor::new(Vec::new());
    {
        let mut ipc_writer = FileWriter::try_new(&mut buffer, &schema)
            .map_err(|e| ExportError::ArrowError(e.to_string()))?;

        for batch_result in batches {
            let batch = batch_result?;
            total_rows += batch.num_rows() as u64;
            ipc_writer
                .write(&batch)
                .map_err(|e| ExportError::ArrowError(e.to_string()))?;
        }

        ipc_writer
            .finish()
            .map_err(|e| ExportError::ArrowError(e.to_string()))?;
    }

    // Copy the buffer to the async writer
    let data = buffer.into_inner();
    writer
        .write_all(&data)
        .await
        .map_err(|e| ExportError::IoError(e.to_string()))?;
    writer
        .flush()
        .await
        .map_err(|e| ExportError::IoError(e.to_string()))?;

    Ok(total_rows)
}

// =============================================================================
// Transport-integrated export functions
// =============================================================================

use crate::query::export::ExportSource;
use crate::transport::TransportProtocol;

/// Exports data from an Exasol table or query to Arrow RecordBatches.
///
/// The data is fetched as CSV over the HTTP transport and then converted to
/// RecordBatches. `options.schema` must be set; schema inference is not
/// implemented, so the export fails without it.
///
/// # Arguments
///
/// * `transport` - Transport for executing SQL
/// * `source` - The data source (table or query)
/// * `options` - Export options
///
/// # Returns
///
/// A vector of RecordBatches on success.
///
/// # Errors
///
/// Returns `ExportError` if the export fails.
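///
/// # Example
///
/// A minimal sketch; `transport`, `source`, and `schema` are assumed to
/// come from the surrounding application.
///
/// ```ignore
/// let options = ArrowExportOptions::default()
///     .with_schema(schema)
///     .exasol_host("exasol.example.com")
///     .exasol_port(8563);
/// let batches = export_to_record_batches(&mut transport, source, options).await?;
/// ```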
pub async fn export_to_record_batches<T: TransportProtocol + ?Sized>(
    transport: &mut T,
    source: ExportSource,
    options: ArrowExportOptions,
) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> {
    use crate::export::csv::{export_to_list, CsvExportOptions};

    // First, get the data as CSV via the existing export function
    let csv_options = CsvExportOptions::default()
        .column_separator(options.column_separator)
        .column_delimiter(options.column_delimiter)
        .with_column_names(false)
        .exasol_host(&options.host)
        .exasol_port(options.port)
        .use_tls(options.use_encryption);

    // Get the CSV data as a list of rows
    let rows = export_to_list(transport, source, csv_options).await?;

    // A schema is required: without one we can't build RecordBatches
    let schema = match options.schema {
        Some(s) => s,
        None => {
            return Err(crate::export::csv::ExportError::CsvParseError {
                row: 0,
                message: "Schema required for Arrow export".to_string(),
            });
        }
    };

    // Convert the rows to RecordBatches, tracking the absolute row offset so
    // that conversion errors report the correct row number
    let mut batches = Vec::new();
    let mut row_offset = 0usize;
    for chunk in rows.chunks(options.batch_size) {
        let arrays: Result<Vec<ArrayRef>, ExportError> = (0..schema.fields().len())
            .map(|col_idx| {
                let field = schema.field(col_idx);
                let values: Vec<&str> = chunk
                    .iter()
                    .map(|row| row.get(col_idx).map(|s| s.as_str()).unwrap_or(""))
                    .collect();

                build_array_from_strings(
                    &values,
                    field.data_type(),
                    field.is_nullable(),
                    &options.null_value,
                    row_offset,
                    col_idx,
                )
            })
            .collect();

        let arrays = arrays.map_err(|e| crate::export::csv::ExportError::CsvParseError {
            row: row_offset,
            message: e.to_string(),
        })?;

        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays).map_err(|e| {
            crate::export::csv::ExportError::CsvParseError {
                row: row_offset,
                message: e.to_string(),
            }
        })?;

        row_offset += chunk.len();
        batches.push(batch);
    }

    Ok(batches)
}

/// Exports data from an Exasol table or query to an Arrow IPC file.
///
/// # Arguments
///
/// * `transport` - Transport for executing SQL
/// * `source` - The data source (table or query)
/// * `file_path` - Path to the output Arrow IPC file
/// * `options` - Export options
///
/// # Returns
///
/// The number of rows exported on success.
///
/// # Errors
///
/// Returns `ExportError` if the export fails.
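///
/// # Example
///
/// A minimal sketch; `transport`, `source`, and `options` (with a schema
/// set) are assumed to exist.
///
/// ```ignore
/// let path = std::path::Path::new("out.arrow");
/// let rows = export_to_arrow_ipc(&mut transport, source, path, options).await?;
/// println!("exported {rows} rows");
/// ```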
pub async fn export_to_arrow_ipc<T: TransportProtocol + ?Sized>(
    transport: &mut T,
    source: ExportSource,
    file_path: &std::path::Path,
    options: ArrowExportOptions,
) -> Result<u64, crate::export::csv::ExportError> {
    // Get the batches
    let batches = export_to_record_batches(transport, source, options.clone()).await?;

    // Get the schema
    let schema = options
        .schema
        .ok_or_else(|| crate::export::csv::ExportError::CsvParseError {
            row: 0,
            message: "Schema required for Arrow IPC export".to_string(),
        })?;

    // Write to the file
    let file = tokio::fs::File::create(file_path).await?;
    let mut file = tokio::io::BufWriter::new(file);

    let batch_results: Vec<Result<RecordBatch, ExportError>> =
        batches.into_iter().map(Ok).collect();

    let rows = write_arrow_ipc_file(&mut file, schema, batch_results)
        .await
        .map_err(|e| crate::export::csv::ExportError::CsvParseError {
            row: 0,
            message: e.to_string(),
        })?;

    Ok(rows)
}

977#[cfg(test)]
978mod tests {
979    use super::*;
980    use tokio::io::BufReader;
981
982    // ==========================================================================
983    // Tests for ArrowExportOptions
984    // ==========================================================================
985
986    #[test]
987    fn test_arrow_export_options_default() {
988        let options = ArrowExportOptions::default();
989        assert_eq!(options.batch_size, 1024);
990        assert!(options.null_value.is_none());
991        assert!(options.schema.is_none());
992        assert_eq!(options.column_separator, ',');
993        assert_eq!(options.column_delimiter, '"');
994        assert_eq!(options.host, "");
995        assert_eq!(options.port, 0);
996        assert!(!options.use_encryption);
997    }
998
999    #[test]
1000    fn test_arrow_export_options_builder() {
1001        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
1002
1003        let options = ArrowExportOptions::new()
1004            .with_batch_size(2048)
1005            .with_null_value("NULL")
1006            .with_schema(Arc::clone(&schema))
1007            .with_column_separator(';')
1008            .with_column_delimiter('\'')
1009            .exasol_host("exasol.example.com")
1010            .exasol_port(8563)
1011            .use_encryption(true);
1012
1013        assert_eq!(options.batch_size, 2048);
1014        assert_eq!(options.null_value, Some("NULL".to_string()));
1015        assert!(options.schema.is_some());
1016        assert_eq!(options.column_separator, ';');
1017        assert_eq!(options.column_delimiter, '\'');
1018        assert_eq!(options.host, "exasol.example.com");
1019        assert_eq!(options.port, 8563);
1020        assert!(options.use_encryption);
1021    }
1022
1023    // ==========================================================================
1024    // Tests for CSV parsing
1025    // ==========================================================================
1026
1027    #[test]
1028    fn test_parse_csv_row_simple() {
1029        let line = "1,hello,world";
1030        let fields = parse_csv_row(line, ',', '"', 0).unwrap();
1031        assert_eq!(fields, vec!["1", "hello", "world"]);
1032    }
1033
1034    #[test]
1035    fn test_parse_csv_row_quoted() {
1036        let line = r#"1,"hello, world","test""#;
1037        let fields = parse_csv_row(line, ',', '"', 0).unwrap();
1038        assert_eq!(fields, vec!["1", "hello, world", "test"]);
1039    }
1040
1041    #[test]
1042    fn test_parse_csv_row_escaped_quote() {
1043        let line = r#"1,"hello ""world""","test""#;
1044        let fields = parse_csv_row(line, ',', '"', 0).unwrap();
1045        assert_eq!(fields, vec!["1", r#"hello "world""#, "test"]);
1046    }
1047
1048    #[test]
1049    fn test_parse_csv_row_empty_fields() {
1050        let line = "1,,3";
1051        let fields = parse_csv_row(line, ',', '"', 0).unwrap();
1052        assert_eq!(fields, vec!["1", "", "3"]);
1053    }
1054
1055    #[test]
1056    fn test_parse_csv_row_custom_separator() {
1057        let line = "1;hello;world";
1058        let fields = parse_csv_row(line, ';', '"', 0).unwrap();
1059        assert_eq!(fields, vec!["1", "hello", "world"]);
1060    }
1061
1062    #[test]
1063    fn test_parse_csv_row_unclosed_quote_error() {
1064        let line = r#"1,"hello"#;
1065        let result = parse_csv_row(line, ',', '"', 0);
1066        assert!(result.is_err());
1067        match result.unwrap_err() {
1068            ExportError::CsvParseError { row, message } => {
1069                assert_eq!(row, 0);
1070                assert!(message.contains("Unclosed quote"));
1071            }
1072            _ => panic!("Expected CsvParseError"),
1073        }
1074    }
1075
1076    // ==========================================================================
1077    // Tests for NULL detection
1078    // ==========================================================================
1079
1080    #[test]
1081    fn test_is_null_value_empty() {
1082        assert!(is_null_value("", &None));
1083        assert!(is_null_value("", &Some("NULL".to_string())));
1084    }
1085
1086    #[test]
1087    fn test_is_null_value_custom() {
1088        assert!(is_null_value("NULL", &Some("NULL".to_string())));
1089        assert!(!is_null_value("null", &Some("NULL".to_string())));
1090        assert!(!is_null_value("value", &Some("NULL".to_string())));
1091    }
1092
1093    #[test]
1094    fn test_is_null_value_no_custom() {
1095        assert!(!is_null_value("NULL", &None));
1096        assert!(!is_null_value("value", &None));
1097    }
1098
1099    // ==========================================================================
1100    // Tests for date parsing
1101    // ==========================================================================
1102
1103    #[test]
1104    fn test_parse_date_to_days_epoch() {
1105        let days = parse_date_to_days("1970-01-01").unwrap();
1106        assert_eq!(days, 0);
1107    }
1108
1109    #[test]
1110    fn test_parse_date_to_days_after_epoch() {
1111        let days = parse_date_to_days("1970-01-02").unwrap();
1112        assert_eq!(days, 1);
1113    }
1114
1115    #[test]
1116    fn test_parse_date_to_days_before_epoch() {
1117        let days = parse_date_to_days("1969-12-31").unwrap();
1118        assert_eq!(days, -1);
1119    }
1120
1121    #[test]
1122    fn test_parse_date_to_days_leap_year() {
1123        // 2000 is a leap year
1124        let mar1_2000 = parse_date_to_days("2000-03-01").unwrap();
1125        let feb28_2000 = parse_date_to_days("2000-02-28").unwrap();
1126        assert_eq!(mar1_2000 - feb28_2000, 2); // Feb 29 exists
1127    }
1128
1129    #[test]
1130    fn test_parse_date_to_days_invalid_format() {
1131        assert!(parse_date_to_days("2024/01/15").is_err());
1132        assert!(parse_date_to_days("2024-01").is_err());
1133        assert!(parse_date_to_days("invalid").is_err());
1134    }
1135
1136    #[test]
1137    fn test_parse_date_to_days_invalid_values() {
1138        assert!(parse_date_to_days("2024-13-01").is_err()); // Invalid month
1139        assert!(parse_date_to_days("2024-01-32").is_err()); // Invalid day
1140        assert!(parse_date_to_days("2024-00-15").is_err()); // Month 0
1141    }
1142
1143    // ==========================================================================
1144    // Tests for timestamp parsing
1145    // ==========================================================================
1146
1147    #[test]
1148    fn test_parse_timestamp_to_micros_epoch() {
1149        let micros = parse_timestamp_to_micros("1970-01-01 00:00:00").unwrap();
1150        assert_eq!(micros, 0);
1151    }
1152
1153    #[test]
1154    fn test_parse_timestamp_to_micros_with_time() {
1155        let micros = parse_timestamp_to_micros("1970-01-01 00:00:01").unwrap();
1156        assert_eq!(micros, 1_000_000);
1157    }
1158
1159    #[test]
1160    fn test_parse_timestamp_to_micros_with_fractional() {
1161        let micros = parse_timestamp_to_micros("1970-01-01 00:00:00.123456").unwrap();
1162        assert_eq!(micros, 123_456);
1163    }
1164
1165    #[test]
1166    fn test_parse_timestamp_to_micros_date_only() {
1167        let micros = parse_timestamp_to_micros("1970-01-02").unwrap();
1168        assert_eq!(micros, 86400 * 1_000_000);
1169    }
1170
1171    // ==========================================================================
1172    // Tests for decimal parsing
1173    // ==========================================================================
1174
1175    #[test]
1176    fn test_parse_decimal_to_i128_integer() {
1177        let result = parse_decimal_to_i128("123", 2).unwrap();
1178        assert_eq!(result, 12300); // 123 * 10^2
1179    }
1180
1181    #[test]
1182    fn test_parse_decimal_to_i128_with_fraction() {
1183        let result = parse_decimal_to_i128("123.45", 2).unwrap();
1184        assert_eq!(result, 12345);
1185    }
1186
1187    #[test]
1188    fn test_parse_decimal_to_i128_negative() {
1189        let result = parse_decimal_to_i128("-123.45", 2).unwrap();
1190        assert_eq!(result, -12345);
1191    }
1192
1193    #[test]
1194    fn test_parse_decimal_to_i128_short_fraction() {
1195        let result = parse_decimal_to_i128("123.4", 2).unwrap();
1196        assert_eq!(result, 12340);
1197    }
1198
1199    #[test]
1200    fn test_parse_decimal_to_i128_invalid() {
1201        assert!(parse_decimal_to_i128("abc", 2).is_err());
1202        assert!(parse_decimal_to_i128("1.2.3", 2).is_err());
1203    }
1204
1205    // ==========================================================================
1206    // Tests for array builders
1207    // ==========================================================================
1208
1209    #[test]
1210    fn test_build_boolean_array() {
1211        let values = vec!["true", "false", "", "1", "0"];
1212        let null_value = None;
1213        let array = build_boolean_array(&values, true, &null_value, 0, 0).unwrap();
1214
1215        assert_eq!(array.len(), 5);
1216        assert_eq!(array.null_count(), 1);
1217    }
1218
1219    #[test]
1220    fn test_build_boolean_array_invalid() {
1221        let values = vec!["invalid"];
1222        let null_value = None;
1223        let result = build_boolean_array(&values, true, &null_value, 0, 0);
1224        assert!(result.is_err());
1225    }
1226
1227    #[test]
1228    fn test_build_int64_array() {
1229        let values = vec!["1", "2", "", "3"];
1230        let null_value = None;
1231        let array = build_int64_array(&values, true, &null_value, 0, 0).unwrap();
1232
1233        assert_eq!(array.len(), 4);
1234        assert_eq!(array.null_count(), 1);
1235    }
1236
1237    #[test]
1238    fn test_build_float64_array() {
1239        let values = vec!["1.5", "2.5", "Infinity", "-Infinity", "NaN", ""];
1240        let null_value = None;
1241        let array = build_float64_array(&values, true, &null_value, 0, 0).unwrap();
1242
1243        assert_eq!(array.len(), 6);
1244        assert_eq!(array.null_count(), 1);
1245    }
1246
1247    #[test]
1248    fn test_build_string_array() {
1249        let values = vec!["hello", "world", "", "test"];
1250        let null_value = None;
1251        let array = build_string_array(&values, true, &null_value).unwrap();
1252
1253        assert_eq!(array.len(), 4);
1254        assert_eq!(array.null_count(), 1);
1255    }
1256
1257    #[test]
1258    fn test_build_date32_array() {
1259        let values = vec!["2024-01-15", "2024-06-20", ""];
1260        let null_value = None;
1261        let array = build_date32_array(&values, true, &null_value, 0, 0).unwrap();
1262
1263        assert_eq!(array.len(), 3);
1264        assert_eq!(array.null_count(), 1);
1265    }
1266
1267    #[test]
1268    fn test_build_timestamp_array() {
1269        let values = vec!["2024-01-15 10:30:00", "2024-06-20 14:45:30.123456", ""];
1270        let null_value = None;
1271        let array = build_timestamp_array(&values, true, &null_value, 0, 0).unwrap();
1272
1273        assert_eq!(array.len(), 3);
1274        assert_eq!(array.null_count(), 1);
1275    }
1276
1277    #[test]
1278    fn test_build_decimal128_array() {
1279        let values = vec!["123.45", "678.90", ""];
1280        let null_value = None;
1281        let array = build_decimal128_array(&values, 10, 2, true, &null_value, 0, 0).unwrap();
1282
1283        assert_eq!(array.len(), 3);
1284        assert_eq!(array.null_count(), 1);
1285    }
1286
1287    // ==========================================================================
1288    // Tests for schema building
1289    // ==========================================================================
1290
1291    #[test]
1292    fn test_exasol_type_to_arrow() {
1293        assert_eq!(
1294            exasol_type_to_arrow(&ExasolType::Boolean).unwrap(),
1295            DataType::Boolean
1296        );
1297        assert_eq!(
1298            exasol_type_to_arrow(&ExasolType::Varchar { size: 100 }).unwrap(),
1299            DataType::Utf8
1300        );
1301        assert_eq!(
1302            exasol_type_to_arrow(&ExasolType::Double).unwrap(),
1303            DataType::Float64
1304        );
1305        assert_eq!(
1306            exasol_type_to_arrow(&ExasolType::Date).unwrap(),
1307            DataType::Date32
1308        );
1309    }
1310
1311    #[test]
1312    fn test_build_schema_from_exasol_types() {
1313        let columns = vec![
1314            (
1315                "id".to_string(),
1316                ExasolType::Decimal {
1317                    precision: 18,
1318                    scale: 0,
1319                },
1320                false,
1321            ),
1322            ("name".to_string(), ExasolType::Varchar { size: 100 }, true),
1323            ("active".to_string(), ExasolType::Boolean, true),
1324        ];
1325
1326        let schema = build_schema_from_exasol_types(&columns).unwrap();
1327        assert_eq!(schema.fields().len(), 3);
1328        assert_eq!(schema.field(0).name(), "id");
1329        assert_eq!(schema.field(1).name(), "name");
1330        assert_eq!(schema.field(2).name(), "active");
1331    }
1332
1333    // ==========================================================================
1334    // Tests for CsvToArrowReader
1335    // ==========================================================================
1336
1337    #[tokio::test]
1338    async fn test_csv_to_arrow_reader_simple() {
1339        let csv_data = "1,hello,true\n2,world,false\n";
1340        let reader = BufReader::new(csv_data.as_bytes());
1341
1342        let schema = Arc::new(Schema::new(vec![
1343            Field::new("id", DataType::Int64, false),
1344            Field::new("name", DataType::Utf8, false),
1345            Field::new("active", DataType::Boolean, false),
1346        ]));
1347
1348        let options = ArrowExportOptions::default().with_batch_size(10);
1349        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);
1350
1351        let batch = arrow_reader.next_batch().await.unwrap().unwrap();
1352        assert_eq!(batch.num_rows(), 2);
1353        assert_eq!(batch.num_columns(), 3);
1354
1355        // Should return None on next call
1356        assert!(arrow_reader.next_batch().await.unwrap().is_none());
1357    }
1358
1359    #[tokio::test]
1360    async fn test_csv_to_arrow_reader_with_nulls() {
1361        let csv_data = "1,hello\n2,\n";
1362        let reader = BufReader::new(csv_data.as_bytes());
1363
1364        let schema = Arc::new(Schema::new(vec![
1365            Field::new("id", DataType::Int64, false),
1366            Field::new("name", DataType::Utf8, true),
1367        ]));
1368
1369        let options = ArrowExportOptions::default();
1370        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);
1371
1372        let batch = arrow_reader.next_batch().await.unwrap().unwrap();
1373        assert_eq!(batch.num_rows(), 2);
1374        assert_eq!(batch.column(1).null_count(), 1);
1375    }
1376
1377    #[tokio::test]
1378    async fn test_csv_to_arrow_reader_batching() {
1379        let csv_data = "1\n2\n3\n4\n5\n";
1380        let reader = BufReader::new(csv_data.as_bytes());
1381
1382        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
1383
1384        let options = ArrowExportOptions::default().with_batch_size(2);
1385        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);
1386
1387        // First batch: 2 rows
1388        let batch1 = arrow_reader.next_batch().await.unwrap().unwrap();
1389        assert_eq!(batch1.num_rows(), 2);
1390
1391        // Second batch: 2 rows
1392        let batch2 = arrow_reader.next_batch().await.unwrap().unwrap();
1393        assert_eq!(batch2.num_rows(), 2);
1394
1395        // Third batch: 1 row
1396        let batch3 = arrow_reader.next_batch().await.unwrap().unwrap();
1397        assert_eq!(batch3.num_rows(), 1);
1398
1399        // No more data
1400        assert!(arrow_reader.next_batch().await.unwrap().is_none());
1401    }
1402
1403    #[tokio::test]
1404    async fn test_csv_to_arrow_reader_empty() {
1405        let csv_data = "";
1406        let reader = BufReader::new(csv_data.as_bytes());
1407
1408        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
1409
1410        let options = ArrowExportOptions::default();
1411        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);
1412
1413        assert!(arrow_reader.next_batch().await.unwrap().is_none());
1414    }
1415
1416    // ==========================================================================
1417    // Tests for Arrow IPC writing
1418    // ==========================================================================
1419
1420    #[tokio::test]
1421    async fn test_write_arrow_ipc() {
1422        let schema = Arc::new(Schema::new(vec![
1423            Field::new("id", DataType::Int64, false),
1424            Field::new("name", DataType::Utf8, true),
1425        ]));
1426
1427        let batch = RecordBatch::try_new(
1428            Arc::clone(&schema),
1429            vec![
1430                Arc::new(arrow::array::Int64Array::from(vec![1, 2, 3])),
1431                Arc::new(arrow::array::StringArray::from(vec![
1432                    Some("a"),
1433                    Some("b"),
1434                    None,
1435                ])),
1436            ],
1437        )
1438        .unwrap();
1439
1440        let mut buffer = Vec::new();
1441        let rows = write_arrow_ipc(&mut buffer, schema, vec![Ok(batch)])
1442            .await
1443            .unwrap();
1444
1445        assert_eq!(rows, 3);
1446        assert!(!buffer.is_empty());
1447    }
1448
1449    #[tokio::test]
1450    async fn test_write_arrow_ipc_file() {
1451        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
1452
1453        let batch = RecordBatch::try_new(
1454            Arc::clone(&schema),
1455            vec![Arc::new(arrow::array::Int64Array::from(vec![1, 2, 3]))],
1456        )
1457        .unwrap();
1458
1459        let mut buffer = Vec::new();
1460        let rows = write_arrow_ipc_file(&mut buffer, schema, vec![Ok(batch)])
1461            .await
1462            .unwrap();
1463
1464        assert_eq!(rows, 3);
1465        assert!(!buffer.is_empty());
1466        // IPC file format has ARROW1 magic at start
1467        assert_eq!(&buffer[0..6], b"ARROW1");
1468    }
1469
1470    // ==========================================================================
1471    // Tests for type conversion edge cases
1472    // ==========================================================================
1473
1474    #[test]
1475    fn test_boolean_conversion_variants() {
1476        let values = vec!["true", "false", "TRUE", "FALSE", "True", "False"];
1477        let null_value = None;
1478        let array = build_boolean_array(&values, false, &null_value, 0, 0).unwrap();
1479        assert_eq!(array.len(), 6);
1480
1481        let values = vec!["1", "0", "t", "f", "yes", "no", "y", "n"];
1482        let array = build_boolean_array(&values, false, &null_value, 0, 0).unwrap();
1483        assert_eq!(array.len(), 8);
1484    }
1485
1486    #[test]
1487    fn test_non_nullable_null_error() {
1488        let values = vec![""];
1489        let null_value = None;
1490        let result = build_int64_array(&values, false, &null_value, 0, 0);
1491        assert!(result.is_err());
1492        match result.unwrap_err() {
1493            ExportError::TypeConversionError { message, .. } => {
1494                assert!(message.contains("NULL value in non-nullable column"));
1495            }
1496            _ => panic!("Expected TypeConversionError"),
1497        }
1498    }
1499
1500    #[test]
1501    fn test_custom_null_value() {
1502        let values = vec!["1", "NULL", "3"];
1503        let null_value = Some("NULL".to_string());
1504        let array = build_int64_array(&values, true, &null_value, 0, 0).unwrap();
1505
1506        assert_eq!(array.len(), 3);
1507        assert_eq!(array.null_count(), 1);
1508    }
1509
1510    // ==========================================================================
1511    // Tests for schema derivation from Exasol metadata
1512    // ==========================================================================
1513
1514    #[test]
1515    fn test_exasol_type_to_arrow_decimal() {
1516        let result = exasol_type_to_arrow(&ExasolType::Decimal {
1517            precision: 18,
1518            scale: 4,
1519        })
1520        .unwrap();
1521        assert_eq!(result, DataType::Decimal128(18, 4));
1522    }
1523
1524    #[test]
1525    fn test_exasol_type_to_arrow_char() {
1526        let result = exasol_type_to_arrow(&ExasolType::Char { size: 50 }).unwrap();
1527        assert_eq!(result, DataType::Utf8);
1528    }
1529
1530    #[test]
1531    fn test_exasol_type_to_arrow_timestamp() {
1532        let result = exasol_type_to_arrow(&ExasolType::Timestamp {
1533            with_local_time_zone: false,
1534        })
1535        .unwrap();
1536        assert_eq!(result, DataType::Timestamp(TimeUnit::Microsecond, None));
1537    }
1538
1539    #[test]
1540    fn test_exasol_type_to_arrow_timestamp_with_tz() {
1541        let result = exasol_type_to_arrow(&ExasolType::Timestamp {
1542            with_local_time_zone: true,
1543        })
1544        .unwrap();
1545        assert_eq!(
1546            result,
1547            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
1548        );
1549    }
1550
1551    #[test]
1552    fn test_exasol_type_to_arrow_interval_year_to_month() {
1553        let result = exasol_type_to_arrow(&ExasolType::IntervalYearToMonth).unwrap();
1554        assert_eq!(result, DataType::Int64);
1555    }
1556
1557    #[test]
1558    fn test_exasol_type_to_arrow_interval_day_to_second() {
1559        let result =
1560            exasol_type_to_arrow(&ExasolType::IntervalDayToSecond { precision: 3 }).unwrap();
1561        assert_eq!(result, DataType::Int64);
1562    }
1563
1564    #[test]
1565    fn test_exasol_type_to_arrow_geometry() {
1566        let result = exasol_type_to_arrow(&ExasolType::Geometry { srid: Some(4326) }).unwrap();
1567        assert_eq!(result, DataType::Binary);
1568    }
1569
1570    #[test]
1571    fn test_exasol_type_to_arrow_hashtype() {
1572        let result = exasol_type_to_arrow(&ExasolType::Hashtype { byte_size: 32 }).unwrap();
1573        assert_eq!(result, DataType::Binary);
1574    }
1575
    #[test]
    fn test_build_schema_from_exasol_types_all_types() {
        // Test comprehensive schema building with all supported Exasol types
        let columns = vec![
            ("bool_col".to_string(), ExasolType::Boolean, false),
            ("char_col".to_string(), ExasolType::Char { size: 10 }, true),
            (
                "varchar_col".to_string(),
                ExasolType::Varchar { size: 100 },
                true,
            ),
            (
                "decimal_col".to_string(),
                ExasolType::Decimal {
                    precision: 18,
                    scale: 4,
                },
                true,
            ),
            ("double_col".to_string(), ExasolType::Double, true),
            ("date_col".to_string(), ExasolType::Date, true),
            (
                "timestamp_col".to_string(),
                ExasolType::Timestamp {
                    with_local_time_zone: false,
                },
                true,
            ),
            (
                "timestamp_tz_col".to_string(),
                ExasolType::Timestamp {
                    with_local_time_zone: true,
                },
                true,
            ),
            (
                "interval_ym_col".to_string(),
                ExasolType::IntervalYearToMonth,
                true,
            ),
            (
                "interval_ds_col".to_string(),
                ExasolType::IntervalDayToSecond { precision: 3 },
                true,
            ),
            (
                "geometry_col".to_string(),
                ExasolType::Geometry { srid: Some(4326) },
                true,
            ),
            (
                "hashtype_col".to_string(),
                ExasolType::Hashtype { byte_size: 32 },
                true,
            ),
        ];

        let schema = build_schema_from_exasol_types(&columns).unwrap();

        assert_eq!(schema.fields().len(), 12);

        // Verify each field
        assert_eq!(schema.field(0).name(), "bool_col");
        assert_eq!(schema.field(0).data_type(), &DataType::Boolean);
        assert!(!schema.field(0).is_nullable());

        assert_eq!(schema.field(1).name(), "char_col");
        assert_eq!(schema.field(1).data_type(), &DataType::Utf8);
        assert!(schema.field(1).is_nullable());

        assert_eq!(schema.field(2).name(), "varchar_col");
        assert_eq!(schema.field(2).data_type(), &DataType::Utf8);

        assert_eq!(schema.field(3).name(), "decimal_col");
        assert_eq!(schema.field(3).data_type(), &DataType::Decimal128(18, 4));

        assert_eq!(schema.field(4).name(), "double_col");
        assert_eq!(schema.field(4).data_type(), &DataType::Float64);

        assert_eq!(schema.field(5).name(), "date_col");
        assert_eq!(schema.field(5).data_type(), &DataType::Date32);

        assert_eq!(schema.field(6).name(), "timestamp_col");
        assert_eq!(
            schema.field(6).data_type(),
            &DataType::Timestamp(TimeUnit::Microsecond, None)
        );

        assert_eq!(schema.field(7).name(), "timestamp_tz_col");
        assert_eq!(
            schema.field(7).data_type(),
            &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
        );

        assert_eq!(schema.field(8).name(), "interval_ym_col");
        assert_eq!(schema.field(8).data_type(), &DataType::Int64);

        assert_eq!(schema.field(9).name(), "interval_ds_col");
        assert_eq!(schema.field(9).data_type(), &DataType::Int64);

        assert_eq!(schema.field(10).name(), "geometry_col");
        assert_eq!(schema.field(10).data_type(), &DataType::Binary);

        assert_eq!(schema.field(11).name(), "hashtype_col");
        assert_eq!(schema.field(11).data_type(), &DataType::Binary);
    }

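    // Added sketch: feed a schema built from Exasol column metadata into the
    // CSV reader end to end. This assumes build_schema_from_exasol_types
    // returns a plain Schema that can be wrapped in an Arc, and it reuses the
    // Utf8 parsing behavior exercised by the reader tests below.
    #[tokio::test]
    async fn test_schema_from_exasol_types_drives_csv_reader() {
        let columns = vec![("name".to_string(), ExasolType::Varchar { size: 10 }, false)];
        let schema = Arc::new(build_schema_from_exasol_types(&columns).unwrap());

        let csv_data = "a\nb\n";
        let reader = BufReader::new(csv_data.as_bytes());

        let options = ArrowExportOptions::default();
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        let batch = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert!(arrow_reader.next_batch().await.unwrap().is_none());
    }
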
    // ==========================================================================
    // Tests for configurable batch size
    // ==========================================================================

    #[tokio::test]
    async fn test_csv_to_arrow_reader_batch_size_1() {
        // Test with batch size 1 - should produce one batch per row
        let csv_data = "1,a\n2,b\n3,c\n";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let options = ArrowExportOptions::default().with_batch_size(1);
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        // Should get 3 batches, each with 1 row
        let batch1 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch1.num_rows(), 1);

        let batch2 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch2.num_rows(), 1);

        let batch3 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch3.num_rows(), 1);

        assert!(arrow_reader.next_batch().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_csv_to_arrow_reader_batch_size_larger_than_data() {
        // Test with batch size larger than available data
        let csv_data = "1,a\n2,b\n";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let options = ArrowExportOptions::default().with_batch_size(1000);
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        // Should get 1 batch with all 2 rows
        let batch = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 2);

        assert!(arrow_reader.next_batch().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_csv_to_arrow_reader_batch_size_exact_multiple() {
        // Test with batch size that divides data exactly
        let csv_data = "1\n2\n3\n4\n5\n6\n";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let options = ArrowExportOptions::default().with_batch_size(3);
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        // Should get 2 batches, each with 3 rows
        let batch1 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch1.num_rows(), 3);

        let batch2 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch2.num_rows(), 3);

        assert!(arrow_reader.next_batch().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_csv_to_arrow_reader_batch_size_with_partial_last_batch() {
        // Test batch size that doesn't divide data evenly
        let csv_data = "1\n2\n3\n4\n5\n6\n7\n";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let options = ArrowExportOptions::default().with_batch_size(3);
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        // Should get 3 batches: 3 + 3 + 1
        let batch1 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch1.num_rows(), 3);

        let batch2 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch2.num_rows(), 3);

        let batch3 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch3.num_rows(), 1);

        assert!(arrow_reader.next_batch().await.unwrap().is_none());
    }

    #[test]
    fn test_arrow_export_options_batch_size_default() {
        let options = ArrowExportOptions::default();
        assert_eq!(options.batch_size, 1024);
    }

    #[test]
    fn test_arrow_export_options_batch_size_custom() {
        let options = ArrowExportOptions::default().with_batch_size(500);
        assert_eq!(options.batch_size, 500);
    }

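    // Added sketch: zero rows of input. This assumes a stream with no rows
    // simply yields no batches (next_batch returns Ok(None) immediately)
    // rather than an error; the existing tests do not pin this case down.
    #[tokio::test]
    async fn test_csv_to_arrow_reader_empty_input() {
        let csv_data = "";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let options = ArrowExportOptions::default().with_batch_size(3);
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        assert!(arrow_reader.next_batch().await.unwrap().is_none());
    }
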
    #[tokio::test]
    async fn test_csv_to_arrow_reader_total_row_count_across_batches() {
        // Verify that total rows across all batches equals input rows
        let csv_data = "1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let options = ArrowExportOptions::default().with_batch_size(3);
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        let mut total_rows = 0;
        while let Some(batch) = arrow_reader.next_batch().await.unwrap() {
            total_rows += batch.num_rows();
        }

        assert_eq!(total_rows, 10);
    }

    #[tokio::test]
    async fn test_csv_to_arrow_reader_preserves_data_across_batches() {
        // Verify data integrity across batches
        let csv_data = "1\n2\n3\n4\n5\n";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let options = ArrowExportOptions::default().with_batch_size(2);
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        // Collect all values
        let mut all_values: Vec<i64> = Vec::new();
        while let Some(batch) = arrow_reader.next_batch().await.unwrap() {
            let array = batch
                .column(0)
                .as_any()
                .downcast_ref::<arrow::array::Int64Array>()
                .unwrap();
            for i in 0..array.len() {
                all_values.push(array.value(i));
            }
        }

        assert_eq!(all_values, vec![1, 2, 3, 4, 5]);
    }
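
    // Added sketch: string data integrity across batch boundaries, mirroring
    // the Int64 version above. Assumes Utf8 columns carry CSV fields through
    // verbatim, as in the earlier id/name reader tests.
    #[tokio::test]
    async fn test_csv_to_arrow_reader_preserves_strings_across_batches() {
        let csv_data = "a\nb\nc\n";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, false)]));

        let options = ArrowExportOptions::default().with_batch_size(2);
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        // Collect all values across batches
        let mut all_values: Vec<String> = Vec::new();
        while let Some(batch) = arrow_reader.next_batch().await.unwrap() {
            let array = batch
                .column(0)
                .as_any()
                .downcast_ref::<arrow::array::StringArray>()
                .unwrap();
            for i in 0..array.len() {
                all_values.push(array.value(i).to_string());
            }
        }

        assert_eq!(all_values, vec!["a", "b", "c"]);
    }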
}