// exarrow_rs/export/csv.rs
1//! CSV export functionality for Exasol.
2//!
3//! This module provides functions for exporting data from Exasol tables or query results
4//! to CSV format via files, streams, in-memory lists, or custom callbacks.
5//!
6//! # Architecture
7//!
8//! The export process uses **client mode** where:
9//! 1. We connect TO Exasol (outbound connection - works through firewalls)
10//! 2. Perform EXA tunneling handshake to receive an internal address
11//! 3. Execute an EXPORT SQL statement using the internal address
12//! 4. Exasol sends data through the established connection
13//! 5. We read and process the data stream
14//!
15//! This client mode approach works with cloud Exasol instances and through NAT/firewalls.
16
17use std::future::Future;
18use std::io;
19use std::path::Path;
20
21use bzip2::read::BzDecoder;
22use flate2::read::GzDecoder;
23use thiserror::Error;
24use tokio::fs::File;
25use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};
26use tokio::sync::mpsc;
27
28use crate::query::export::{Compression, ExportQuery, ExportSource, RowSeparator};
29use crate::transport::protocol::TransportProtocol;
30use crate::transport::HttpTransportClient;
31
/// Default buffer size for data pipe channels (number of chunks).
///
/// Bounds how many `Vec<u8>` chunks may be in flight between the HTTP
/// reader task and the consumer before the sender awaits (backpressure).
const DEFAULT_PIPE_BUFFER_SIZE: usize = 16;
34
/// Error types for export operations.
#[derive(Error, Debug)]
pub enum ExportError {
    /// I/O error during export.
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),

    /// Transport error during export.
    #[error("Transport error: {0}")]
    TransportError(#[from] crate::error::TransportError),

    /// HTTP transport setup error.
    #[error("HTTP transport error: {message}")]
    HttpTransportError { message: String },

    /// SQL execution error.
    #[error("SQL execution error: {message}")]
    SqlExecutionError { message: String },

    /// CSV parsing error.
    #[error("CSV parsing error at row {row}: {message}")]
    CsvParseError { row: usize, message: String },

    /// CSV parsing error (alternative for parquet module compatibility).
    ///
    /// NOTE(review): duplicates `CsvParseError` with an identical display
    /// format; consider consolidating once the parquet module can migrate.
    #[error("CSV parsing error at row {row}: {message}")]
    CsvParse { row: usize, message: String },

    /// Decompression error.
    #[error("Decompression error: {0}")]
    DecompressionError(String),

    /// Channel communication error.
    #[error("Channel error: {0}")]
    ChannelError(String),

    /// Export timeout.
    #[error("Export timed out after {timeout_ms}ms")]
    Timeout { timeout_ms: u64 },

    /// Export was cancelled.
    #[error("Export was cancelled")]
    Cancelled,

    /// Arrow error.
    #[error("Arrow error: {0}")]
    Arrow(String),

    /// Schema error.
    #[error("Schema error: {0}")]
    Schema(String),

    /// Parquet error.
    #[error("Parquet error: {0}")]
    Parquet(String),
}
90
/// Converts parquet errors into [`ExportError::Parquet`].
///
/// Only the rendered message is kept; the original error value (and its
/// source chain) is not preserved.
impl From<parquet::errors::ParquetError> for ExportError {
    fn from(err: parquet::errors::ParquetError) -> Self {
        ExportError::Parquet(err.to_string())
    }
}
96
/// Options for CSV export configuration.
///
/// Build via [`CsvExportOptions::new`] and the chainable setter methods.
/// `host` and `port` default to empty/0 and must be set (see
/// `exasol_host` / `exasol_port`) before an export can connect.
#[derive(Debug, Clone)]
pub struct CsvExportOptions {
    /// Column separator character (default: ',').
    pub column_separator: char,

    /// Column delimiter character for quoting (default: '"').
    pub column_delimiter: char,

    /// Row separator (default: LF).
    pub row_separator: RowSeparator,

    /// Character encoding (default: "UTF-8").
    pub encoding: String,

    /// Custom NULL value representation (default: None, empty string).
    pub null_value: Option<String>,

    /// Compression type (default: None).
    pub compression: Compression,

    /// Whether to include column headers in the output (default: false).
    pub with_column_names: bool,

    /// Use TLS for the HTTP transport (default: false, matching the
    /// `Default` impl below).
    pub use_tls: bool,

    /// Timeout in milliseconds for the export operation (default: 300000 = 5 minutes).
    pub timeout_ms: u64,

    /// Exasol host for HTTP transport connection.
    /// This is typically the same host as the WebSocket connection.
    pub host: String,

    /// Exasol port for HTTP transport connection.
    /// This is typically the same port as the WebSocket connection.
    pub port: u16,
}
135
136impl Default for CsvExportOptions {
137    fn default() -> Self {
138        Self {
139            column_separator: ',',
140            column_delimiter: '"',
141            row_separator: RowSeparator::LF,
142            encoding: "UTF-8".to_string(),
143            null_value: None,
144            compression: Compression::None,
145            with_column_names: false,
146            use_tls: false,
147            timeout_ms: 300_000, // 5 minutes
148            host: String::new(),
149            port: 0,
150        }
151    }
152}
153
impl CsvExportOptions {
    /// Creates new export options with default values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the column separator character (default: ',').
    #[must_use]
    pub fn column_separator(mut self, sep: char) -> Self {
        self.column_separator = sep;
        self
    }

    /// Sets the column delimiter (quote) character (default: '"').
    #[must_use]
    pub fn column_delimiter(mut self, delim: char) -> Self {
        self.column_delimiter = delim;
        self
    }

    /// Sets the row separator (default: LF).
    #[must_use]
    pub fn row_separator(mut self, sep: RowSeparator) -> Self {
        self.row_separator = sep;
        self
    }

    /// Sets the character encoding name, e.g. "UTF-8" or "ISO-8859-1".
    #[must_use]
    pub fn encoding(mut self, enc: &str) -> Self {
        self.encoding = enc.to_string();
        self
    }

    /// Sets the textual representation used for NULL values.
    #[must_use]
    pub fn null_value(mut self, val: &str) -> Self {
        self.null_value = Some(val.to_string());
        self
    }

    /// Sets the compression applied to the transferred data.
    #[must_use]
    pub fn compression(mut self, compression: Compression) -> Self {
        self.compression = compression;
        self
    }

    /// Sets whether column headers are included in the output.
    #[must_use]
    pub fn with_column_names(mut self, include: bool) -> Self {
        self.with_column_names = include;
        self
    }

    /// Sets whether the HTTP transport connection uses TLS.
    #[must_use]
    pub fn use_tls(mut self, use_tls: bool) -> Self {
        self.use_tls = use_tls;
        self
    }

    /// Sets the export timeout in milliseconds.
    #[must_use]
    pub fn timeout_ms(mut self, timeout: u64) -> Self {
        self.timeout_ms = timeout;
        self
    }

    /// Sets the Exasol host for HTTP transport connection.
    ///
    /// This is typically the same host as the WebSocket connection.
    #[must_use]
    pub fn exasol_host(mut self, host: impl Into<String>) -> Self {
        self.host = host.into();
        self
    }

    /// Sets the Exasol port for HTTP transport connection.
    ///
    /// This is typically the same port as the WebSocket connection.
    #[must_use]
    pub fn exasol_port(mut self, port: u16) -> Self {
        self.port = port;
        self
    }
}
233
/// Receiver end of the data pipe for processing exported data.
///
/// Wraps a bounded [`mpsc::Receiver`] carrying raw byte chunks produced by
/// the HTTP reader task.
pub struct DataPipeReceiver {
    // Channel of raw data chunks; closed (yields `None`) once the sender
    // side is dropped.
    rx: mpsc::Receiver<Vec<u8>>,
}

impl DataPipeReceiver {
    /// Receives the next chunk of data.
    ///
    /// Returns `None` when all data has been received.
    pub async fn recv(&mut self) -> Option<Vec<u8>> {
        self.rx.recv().await
    }
}
247
248/// Exports data from an Exasol table or query to a file.
249///
250/// # Arguments
251///
252/// * `ws_transport` - WebSocket transport for executing SQL
253/// * `source` - The data source (table or query)
254/// * `file_path` - Path to the output file
255/// * `options` - Export options
256///
257/// # Returns
258///
259/// The number of rows exported on success.
260///
261/// # Errors
262///
263/// Returns `ExportError` if the export fails.
264///
265/// # Example
266///
267pub async fn export_to_file<T: TransportProtocol + ?Sized>(
268    ws_transport: &mut T,
269    source: ExportSource,
270    file_path: &Path,
271    options: CsvExportOptions,
272) -> Result<u64, ExportError> {
273    // Create the output file
274    let file = File::create(file_path).await?;
275    let writer = BufWriter::new(file);
276
277    export_to_stream(ws_transport, source, writer, options).await
278}
279
280/// Exports data from an Exasol table or query to an async writer.
281///
282/// # Arguments
283///
284/// * `ws_transport` - WebSocket transport for executing SQL
285/// * `source` - The data source (table or query)
286/// * `writer` - Async writer to write the CSV data to
287/// * `options` - Export options
288///
289/// # Returns
290///
291/// The number of rows exported on success.
292///
293/// # Errors
294///
295/// Returns `ExportError` if the export fails.
296pub async fn export_to_stream<T: TransportProtocol + ?Sized, W: AsyncWrite + Unpin>(
297    ws_transport: &mut T,
298    source: ExportSource,
299    mut writer: W,
300    options: CsvExportOptions,
301) -> Result<u64, ExportError> {
302    let compression = options.compression;
303
304    // Use the callback variant to process data
305    export_to_callback(ws_transport, source, options, |mut receiver| async move {
306        let mut row_count = 0u64;
307        let mut buffer = Vec::new();
308
309        // Collect all data first
310        while let Some(chunk) = receiver.recv().await {
311            buffer.extend_from_slice(&chunk);
312        }
313
314        // Decompress if needed
315        let data = match compression {
316            Compression::Gzip => {
317                let decoder = GzDecoder::new(buffer.as_slice());
318                let mut decompressed = Vec::new();
319                std::io::Read::read_to_end(
320                    &mut std::io::BufReader::new(decoder),
321                    &mut decompressed,
322                )
323                .map_err(|e| ExportError::DecompressionError(e.to_string()))?;
324                decompressed
325            }
326            Compression::Bzip2 => {
327                let decoder = BzDecoder::new(buffer.as_slice());
328                let mut decompressed = Vec::new();
329                std::io::Read::read_to_end(
330                    &mut std::io::BufReader::new(decoder),
331                    &mut decompressed,
332                )
333                .map_err(|e| ExportError::DecompressionError(e.to_string()))?;
334                decompressed
335            }
336            Compression::None => buffer,
337        };
338
339        // Write to output and count rows
340        writer.write_all(&data).await?;
341        writer.flush().await?;
342
343        // Count rows by counting newlines
344        for byte in &data {
345            if *byte == b'\n' {
346                row_count += 1;
347            }
348        }
349
350        Ok(row_count)
351    })
352    .await
353}
354
355/// Exports data from an Exasol table or query to an in-memory list of rows.
356///
357/// Each row is represented as a vector of string values.
358///
359/// # Arguments
360///
361/// * `ws_transport` - WebSocket transport for executing SQL
362/// * `source` - The data source (table or query)
363/// * `options` - Export options
364///
365/// # Returns
366///
367/// A vector of rows, where each row is a vector of column values.
368///
369/// # Errors
370///
371/// Returns `ExportError` if the export fails.
372///
373/// # Example
374///
375pub async fn export_to_list<T: TransportProtocol + ?Sized>(
376    ws_transport: &mut T,
377    source: ExportSource,
378    options: CsvExportOptions,
379) -> Result<Vec<Vec<String>>, ExportError> {
380    let separator = options.column_separator;
381    let delimiter = options.column_delimiter;
382    let compression = options.compression;
383
384    export_to_callback(ws_transport, source, options, |mut receiver| async move {
385        let mut buffer = Vec::new();
386
387        // Collect all data
388        while let Some(chunk) = receiver.recv().await {
389            buffer.extend_from_slice(&chunk);
390        }
391
392        // Decompress if needed
393        let data = match compression {
394            Compression::Gzip => {
395                let decoder = GzDecoder::new(buffer.as_slice());
396                let mut decompressed = Vec::new();
397                std::io::Read::read_to_end(
398                    &mut std::io::BufReader::new(decoder),
399                    &mut decompressed,
400                )
401                .map_err(|e| ExportError::DecompressionError(e.to_string()))?;
402                decompressed
403            }
404            Compression::Bzip2 => {
405                let decoder = BzDecoder::new(buffer.as_slice());
406                let mut decompressed = Vec::new();
407                std::io::Read::read_to_end(
408                    &mut std::io::BufReader::new(decoder),
409                    &mut decompressed,
410                )
411                .map_err(|e| ExportError::DecompressionError(e.to_string()))?;
412                decompressed
413            }
414            Compression::None => buffer,
415        };
416
417        // Parse CSV data
418        let csv_string = String::from_utf8(data).map_err(|e| ExportError::CsvParseError {
419            row: 0,
420            message: format!("Invalid UTF-8: {}", e),
421        })?;
422
423        parse_csv(&csv_string, separator, delimiter)
424    })
425    .await
426}
427
/// Exports data from an Exasol table or query using a custom callback.
///
/// This is the most flexible export method, allowing you to process the data
/// stream however you need.
///
/// # Arguments
///
/// * `ws_transport` - WebSocket transport for executing SQL
/// * `source` - The data source (table or query)
/// * `options` - Export options
/// * `callback` - A callback function that receives a `DataPipeReceiver` and processes the data
///
/// # Returns
///
/// The result of the callback function.
///
/// # Errors
///
/// Returns `ExportError` if the export fails.
pub async fn export_to_callback<T, F, Fut, R>(
    ws_transport: &mut T,
    source: ExportSource,
    options: CsvExportOptions,
    callback: F,
) -> Result<R, ExportError>
where
    T: TransportProtocol + ?Sized,
    F: FnOnce(DataPipeReceiver) -> Fut,
    Fut: Future<Output = Result<R, ExportError>>,
{
    // Connect to Exasol via HTTP transport client (performs handshake automatically)
    let mut client = HttpTransportClient::connect(&options.host, options.port, options.use_tls)
        .await
        .map_err(|e| ExportError::HttpTransportError {
            message: format!("Failed to connect to Exasol: {e}"),
        })?;

    // Get internal address and fingerprint from the handshake response.
    // The internal address is what the EXPORT statement must target so that
    // Exasol routes data back through this already-open connection.
    let internal_addr = client.internal_address().to_string();
    let fingerprint = client.public_key_fingerprint().map(|s| s.to_string());

    // Build the EXPORT query
    let mut query_builder = match source {
        ExportSource::Table {
            ref schema,
            ref name,
            ref columns,
        } => {
            let mut builder = ExportQuery::from_table(name);
            if let Some(s) = schema {
                builder = builder.schema(s);
            }
            if !columns.is_empty() {
                builder = builder.columns(columns.iter().map(|s| s.as_str()).collect());
            }
            builder
        }
        ExportSource::Query { ref sql } => ExportQuery::from_query(sql),
    };

    // Apply options to the query builder, using internal address from handshake
    query_builder = query_builder
        .at_address(&internal_addr)
        .column_separator(options.column_separator)
        .column_delimiter(options.column_delimiter)
        .row_separator(options.row_separator)
        .encoding(&options.encoding)
        .with_column_names(options.with_column_names)
        .compressed(options.compression);

    if let Some(ref null_val) = options.null_value {
        query_builder = query_builder.null_value(null_val);
    }

    // Pin the server's TLS public key in the statement when the handshake
    // provided a fingerprint.
    if let Some(ref fp) = fingerprint {
        query_builder = query_builder.with_public_key(fp);
    }

    let export_sql = query_builder.build();

    // Create channel for data transfer
    let (tx, rx) = mpsc::channel::<Vec<u8>>(DEFAULT_PIPE_BUFFER_SIZE);
    let receiver = DataPipeReceiver { rx };

    // Spawn task to handle the export request from Exasol
    // This task:
    // 1. Waits for HTTP PUT request from Exasol
    // 2. Reads CSV data from PUT request body (chunked or content-length)
    // 3. Sends HTTP 200 OK response after receiving all data
    let http_task =
        tokio::spawn(async move {
            // Use handle_export_request() to properly handle the EXPORT protocol
            let (_request, body) = client.handle_export_request().await.map_err(|e| {
                ExportError::HttpTransportError {
                    message: format!("Failed to handle export request: {e}"),
                }
            })?;

            // Send all data to receiver. An empty body sends nothing; the
            // channel simply closes when `tx` is dropped below.
            if !body.is_empty() && tx.send(body).await.is_err() {
                return Err(ExportError::ChannelError("Receiver dropped".to_string()));
            }

            // Shutdown connection gracefully (best-effort; errors ignored)
            let _ = client.shutdown().await;

            Ok::<(), ExportError>(())
        });

    // Execute the EXPORT SQL in parallel
    // This triggers Exasol to send data through the established connection
    let sql_task = async {
        ws_transport
            .execute_query(&export_sql)
            .await
            .map_err(|e| ExportError::SqlExecutionError {
                message: e.to_string(),
            })
    };

    // Run callback with the receiver
    let callback_task = callback(receiver);

    // Use tokio::select to run all tasks concurrently
    let timeout = tokio::time::Duration::from_millis(options.timeout_ms);

    // NOTE(review): on timeout the spawned `http_task` is not aborted and may
    // keep running detached until its own I/O fails — consider
    // `http_task.abort()` on the timeout path; verify against transport
    // behavior before changing.
    let result = tokio::time::timeout(timeout, async {
        // Execute SQL first (this triggers Exasol to send data through our connection)
        let sql_result = sql_task.await;

        // Then wait for HTTP task and callback
        let (http_result, callback_result) = tokio::join!(http_task, callback_task);

        // Check for errors: SQL failure first, then a join error (task
        // panic) or the task's own ExportError, and finally the callback.
        sql_result?;
        http_result.map_err(|e| ExportError::HttpTransportError {
            message: format!("HTTP task panicked: {}", e),
        })??;

        callback_result
    })
    .await
    .map_err(|_| ExportError::Timeout {
        timeout_ms: options.timeout_ms,
    })?;

    result
}
576
577/// Parses CSV data into a vector of rows.
578fn parse_csv(
579    data: &str,
580    separator: char,
581    delimiter: char,
582) -> Result<Vec<Vec<String>>, ExportError> {
583    let mut rows = Vec::new();
584    let mut current_row = Vec::new();
585    let mut current_field = String::new();
586    let mut in_quotes = false;
587    let mut row_num = 0;
588
589    let chars: Vec<char> = data.chars().collect();
590    let mut i = 0;
591
592    while i < chars.len() {
593        let c = chars[i];
594
595        if in_quotes {
596            if c == delimiter {
597                // Check for escaped delimiter (two consecutive delimiters)
598                if i + 1 < chars.len() && chars[i + 1] == delimiter {
599                    current_field.push(delimiter);
600                    i += 2;
601                    continue;
602                }
603                // End of quoted field
604                in_quotes = false;
605            } else {
606                current_field.push(c);
607            }
608        } else if c == delimiter {
609            // Start of quoted field
610            in_quotes = true;
611        } else if c == separator {
612            // End of field
613            current_row.push(current_field);
614            current_field = String::new();
615        } else if c == '\n' {
616            // End of row
617            current_row.push(current_field);
618            current_field = String::new();
619            rows.push(current_row);
620            current_row = Vec::new();
621            row_num += 1;
622        } else if c == '\r' {
623            // Handle CRLF - skip the CR, the LF will handle the row end
624            if i + 1 < chars.len() && chars[i + 1] == '\n' {
625                // CRLF, skip CR and let LF handle it
626            } else {
627                // Just CR (old Mac-style)
628                current_row.push(current_field);
629                current_field = String::new();
630                rows.push(current_row);
631                current_row = Vec::new();
632                row_num += 1;
633            }
634        } else {
635            current_field.push(c);
636        }
637        i += 1;
638    }
639
640    // Handle the last field/row if not empty
641    if !current_field.is_empty() || !current_row.is_empty() {
642        current_row.push(current_field);
643        rows.push(current_row);
644    }
645
646    // Check for unclosed quotes
647    if in_quotes {
648        return Err(ExportError::CsvParseError {
649            row: row_num,
650            message: "Unclosed quote at end of data".to_string(),
651        });
652    }
653
654    Ok(rows)
655}
656
#[cfg(test)]
mod tests {
    use super::*;

    // Tests for CsvExportOptions

    #[test]
    fn test_csv_export_options_default() {
        let options = CsvExportOptions::default();

        assert_eq!(options.column_separator, ',');
        assert_eq!(options.column_delimiter, '"');
        assert_eq!(options.row_separator, RowSeparator::LF);
        assert_eq!(options.encoding, "UTF-8");
        assert!(options.null_value.is_none());
        assert_eq!(options.compression, Compression::None);
        assert!(!options.with_column_names);
        // TLS defaults to off — keep in sync with the field documentation.
        assert!(!options.use_tls);
        assert_eq!(options.timeout_ms, 300_000);
        assert_eq!(options.host, "");
        assert_eq!(options.port, 0);
    }

    #[test]
    fn test_csv_export_options_builder() {
        // Every builder method should overwrite its corresponding field.
        let options = CsvExportOptions::new()
            .column_separator(';')
            .column_delimiter('\'')
            .row_separator(RowSeparator::CRLF)
            .encoding("ISO-8859-1")
            .null_value("NULL")
            .compression(Compression::Gzip)
            .with_column_names(true)
            .use_tls(false)
            .timeout_ms(60_000)
            .exasol_host("exasol.example.com")
            .exasol_port(8563);

        assert_eq!(options.column_separator, ';');
        assert_eq!(options.column_delimiter, '\'');
        assert_eq!(options.row_separator, RowSeparator::CRLF);
        assert_eq!(options.encoding, "ISO-8859-1");
        assert_eq!(options.null_value, Some("NULL".to_string()));
        assert_eq!(options.compression, Compression::Gzip);
        assert!(options.with_column_names);
        assert!(!options.use_tls);
        assert_eq!(options.timeout_ms, 60_000);
        assert_eq!(options.host, "exasol.example.com");
        assert_eq!(options.port, 8563);
    }

    // Tests for CSV parsing

    #[test]
    fn test_parse_csv_simple() {
        let data = "a,b,c\n1,2,3\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], vec!["a", "b", "c"]);
        assert_eq!(result[1], vec!["1", "2", "3"]);
    }

    #[test]
    fn test_parse_csv_with_quotes() {
        let data = "\"hello\",\"world\"\n\"foo\",\"bar\"\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], vec!["hello", "world"]);
        assert_eq!(result[1], vec!["foo", "bar"]);
    }

    #[test]
    fn test_parse_csv_with_escaped_quotes() {
        // Doubled quotes inside a quoted field decode to a single quote.
        let data = "\"hello \"\"world\"\"\",normal\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0], vec!["hello \"world\"", "normal"]);
    }

    #[test]
    fn test_parse_csv_with_separator_in_quotes() {
        let data = "\"a,b,c\",\"d,e\"\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0], vec!["a,b,c", "d,e"]);
    }

    #[test]
    fn test_parse_csv_with_newline_in_quotes() {
        // Newlines inside quotes are literal field content, not row breaks.
        let data = "\"line1\nline2\",normal\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0], vec!["line1\nline2", "normal"]);
    }

    #[test]
    fn test_parse_csv_with_crlf() {
        let data = "a,b\r\nc,d\r\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], vec!["a", "b"]);
        assert_eq!(result[1], vec!["c", "d"]);
    }

    #[test]
    fn test_parse_csv_with_semicolon_separator() {
        let data = "a;b;c\n1;2;3\n";
        let result = parse_csv(data, ';', '"').unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], vec!["a", "b", "c"]);
        assert_eq!(result[1], vec!["1", "2", "3"]);
    }

    #[test]
    fn test_parse_csv_empty_fields() {
        let data = "a,,c\n,b,\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], vec!["a", "", "c"]);
        assert_eq!(result[1], vec!["", "b", ""]);
    }

    #[test]
    fn test_parse_csv_unclosed_quote_error() {
        let data = "\"unclosed,quote\n";
        let result = parse_csv(data, ',', '"');

        assert!(result.is_err());
        if let Err(ExportError::CsvParseError { row, message }) = result {
            // row_num stays 0: the newline was consumed inside the quote.
            assert_eq!(row, 0);
            assert!(message.contains("Unclosed quote"));
        } else {
            panic!("Expected CsvParseError");
        }
    }

    #[test]
    fn test_parse_csv_empty_data() {
        let data = "";
        let result = parse_csv(data, ',', '"').unwrap();

        assert!(result.is_empty());
    }

    #[test]
    fn test_parse_csv_single_field() {
        let data = "single\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0], vec!["single"]);
    }

    #[test]
    fn test_parse_csv_no_trailing_newline() {
        // A final row without a trailing newline must still be emitted.
        let data = "a,b,c";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0], vec!["a", "b", "c"]);
    }

    // Tests for ExportError

    #[test]
    fn test_export_error_display() {
        let err = ExportError::IoError(io::Error::new(io::ErrorKind::NotFound, "file not found"));
        assert!(err.to_string().contains("I/O error"));

        let err = ExportError::HttpTransportError {
            message: "connection refused".to_string(),
        };
        assert!(err.to_string().contains("HTTP transport error"));
        assert!(err.to_string().contains("connection refused"));

        let err = ExportError::CsvParseError {
            row: 5,
            message: "invalid data".to_string(),
        };
        assert!(err.to_string().contains("row 5"));
        assert!(err.to_string().contains("invalid data"));

        let err = ExportError::Timeout { timeout_ms: 5000 };
        assert!(err.to_string().contains("5000ms"));
    }

    #[test]
    fn test_export_error_from_io_error() {
        let io_err = io::Error::new(io::ErrorKind::PermissionDenied, "access denied");
        let export_err: ExportError = io_err.into();

        assert!(matches!(export_err, ExportError::IoError(_)));
    }

    // Tests for DataPipeReceiver

    #[tokio::test]
    async fn test_data_pipe_receiver_recv() {
        let (tx, rx) = mpsc::channel::<Vec<u8>>(16);
        let mut receiver = DataPipeReceiver { rx };

        tx.send(vec![1, 2, 3]).await.unwrap();
        tx.send(vec![4, 5, 6]).await.unwrap();
        // Dropping the sender closes the channel after the buffered chunks.
        drop(tx);

        let chunk1 = receiver.recv().await;
        assert_eq!(chunk1, Some(vec![1, 2, 3]));

        let chunk2 = receiver.recv().await;
        assert_eq!(chunk2, Some(vec![4, 5, 6]));

        let chunk3 = receiver.recv().await;
        assert!(chunk3.is_none());
    }

    #[tokio::test]
    async fn test_data_pipe_receiver_empty() {
        let (tx, rx) = mpsc::channel::<Vec<u8>>(16);
        let mut receiver = DataPipeReceiver { rx };

        drop(tx);

        let chunk = receiver.recv().await;
        assert!(chunk.is_none());
    }

    // Tests for CSV parsing with column headers

    #[test]
    fn test_parse_csv_with_column_headers() {
        let data = "id,name,email\n1,Alice,alice@example.com\n2,Bob,bob@example.com\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 3);
        // First row is the header
        assert_eq!(result[0], vec!["id", "name", "email"]);
        // Data rows
        assert_eq!(result[1], vec!["1", "Alice", "alice@example.com"]);
        assert_eq!(result[2], vec!["2", "Bob", "bob@example.com"]);
    }

    #[test]
    fn test_parse_csv_with_quoted_headers() {
        let data = "\"Column A\",\"Column B\",\"Column C\"\n1,2,3\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], vec!["Column A", "Column B", "Column C"]);
        assert_eq!(result[1], vec!["1", "2", "3"]);
    }

    // Tests for decompression

    #[test]
    fn test_gzip_decompression() {
        use flate2::write::GzEncoder;
        use std::io::Write;

        // Create gzip compressed data
        let original = b"hello,world\n1,2\n";
        let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::default());
        encoder.write_all(original).unwrap();
        let compressed = encoder.finish().unwrap();

        // Decompress using our logic
        let decoder = GzDecoder::new(compressed.as_slice());
        let mut decompressed = Vec::new();
        std::io::Read::read_to_end(&mut std::io::BufReader::new(decoder), &mut decompressed)
            .unwrap();

        assert_eq!(decompressed, original);

        // Parse the decompressed CSV
        let csv_string = String::from_utf8(decompressed).unwrap();
        let rows = parse_csv(&csv_string, ',', '"').unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0], vec!["hello", "world"]);
        assert_eq!(rows[1], vec!["1", "2"]);
    }

    #[test]
    fn test_bzip2_decompression() {
        use bzip2::write::BzEncoder;
        use std::io::Write;

        // Create bzip2 compressed data
        let original = b"foo,bar\na,b\n";
        let mut encoder = BzEncoder::new(Vec::new(), bzip2::Compression::default());
        encoder.write_all(original).unwrap();
        let compressed = encoder.finish().unwrap();

        // Decompress using our logic
        let decoder = BzDecoder::new(compressed.as_slice());
        let mut decompressed = Vec::new();
        std::io::Read::read_to_end(&mut std::io::BufReader::new(decoder), &mut decompressed)
            .unwrap();

        assert_eq!(decompressed, original);

        // Parse the decompressed CSV
        let csv_string = String::from_utf8(decompressed).unwrap();
        let rows = parse_csv(&csv_string, ',', '"').unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0], vec!["foo", "bar"]);
        assert_eq!(rows[1], vec!["a", "b"]);
    }

    // Tests for export options with column names

    #[test]
    fn test_csv_export_options_with_column_names() {
        let options = CsvExportOptions::new().with_column_names(true);
        assert!(options.with_column_names);

        let options = CsvExportOptions::new().with_column_names(false);
        assert!(!options.with_column_names);
    }

    // Tests for export options with compression

    #[test]
    fn test_csv_export_options_compression_gzip() {
        let options = CsvExportOptions::new().compression(Compression::Gzip);
        assert_eq!(options.compression, Compression::Gzip);
    }

    #[test]
    fn test_csv_export_options_compression_bzip2() {
        let options = CsvExportOptions::new().compression(Compression::Bzip2);
        assert_eq!(options.compression, Compression::Bzip2);
    }

    #[test]
    fn test_csv_export_options_compression_none() {
        let options = CsvExportOptions::new().compression(Compression::None);
        assert_eq!(options.compression, Compression::None);
    }

    // Tests for error types

    #[test]
    fn test_export_error_decompression() {
        let err = ExportError::DecompressionError("invalid gzip header".to_string());
        assert!(err.to_string().contains("Decompression error"));
        assert!(err.to_string().contains("invalid gzip header"));
    }

    #[test]
    fn test_export_error_channel() {
        let err = ExportError::ChannelError("receiver dropped".to_string());
        assert!(err.to_string().contains("Channel error"));
        assert!(err.to_string().contains("receiver dropped"));
    }

    #[test]
    fn test_export_error_cancelled() {
        let err = ExportError::Cancelled;
        assert!(err.to_string().contains("cancelled"));
    }

    // Test DataPipeReceiver with multiple chunks

    #[tokio::test]
    async fn test_data_pipe_receiver_multiple_chunks() {
        let (tx, rx) = mpsc::channel::<Vec<u8>>(DEFAULT_PIPE_BUFFER_SIZE);
        let mut receiver = DataPipeReceiver { rx };

        // Send CSV data in chunks
        let chunk1 = b"id,name\n".to_vec();
        let chunk2 = b"1,Alice\n".to_vec();
        let chunk3 = b"2,Bob\n".to_vec();

        tx.send(chunk1.clone()).await.unwrap();
        tx.send(chunk2.clone()).await.unwrap();
        tx.send(chunk3.clone()).await.unwrap();
        drop(tx);

        // Receive and concatenate
        let mut buffer = Vec::new();
        while let Some(chunk) = receiver.recv().await {
            buffer.extend_from_slice(&chunk);
        }

        let csv_string = String::from_utf8(buffer).unwrap();
        let rows = parse_csv(&csv_string, ',', '"').unwrap();

        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0], vec!["id", "name"]);
        assert_eq!(rows[1], vec!["1", "Alice"]);
        assert_eq!(rows[2], vec!["2", "Bob"]);
    }
}