// exarrow_rs/import/csv.rs
1//! CSV import functionality for Exasol.
2//!
3//! This module provides functions for importing CSV data into Exasol tables
4//! using the HTTP transport layer.
5
6use std::future::Future;
7use std::io::Write;
8use std::path::Path;
9
10use bzip2::write::BzEncoder;
11use flate2::write::GzEncoder;
12use tokio::io::{AsyncRead, AsyncReadExt};
13use tokio::sync::mpsc;
14
15use crate::query::import::{Compression, ImportFileEntry, ImportQuery, RowSeparator, TrimMode};
16use crate::transport::HttpTransportClient;
17
18use super::parallel::{stream_files_parallel, ParallelTransportPool};
19use super::source::IntoFileSources;
20use super::ImportError;
21
/// Default buffer size, in bytes, for chunked reads/writes (64 KiB).
const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;

/// Capacity, in chunks, of the mpsc channel backing the callback data pipe.
const CHANNEL_BUFFER_SIZE: usize = 16;
27
/// Options for CSV import.
#[derive(Debug, Clone)]
pub struct CsvImportOptions {
    /// Character encoding (default: UTF-8).
    pub encoding: String,

    /// Column separator character (default: ',').
    pub column_separator: char,

    /// Column delimiter character for quoting (default: '"').
    pub column_delimiter: char,

    /// Row separator (default: LF).
    pub row_separator: RowSeparator,

    /// Number of header rows to skip (default: 0).
    pub skip_rows: u32,

    /// Custom NULL value representation (default: None, empty string is NULL).
    pub null_value: Option<String>,

    /// Trim mode for imported values (default: None).
    pub trim_mode: TrimMode,

    /// Compression type (default: None).
    pub compression: Compression,

    /// Maximum number of invalid rows before failure (default: None = fail on first error).
    pub reject_limit: Option<u32>,

    /// Use TLS encryption for HTTP transport (default: false, per `Default`).
    pub use_tls: bool,

    /// Target schema (optional).
    pub schema: Option<String>,

    /// Target columns (optional, imports all if not specified).
    pub columns: Option<Vec<String>>,

    /// Exasol host for HTTP transport connection.
    /// This is typically the same host as the WebSocket connection.
    /// Defaults to an empty string; must be set before running an import.
    pub host: String,

    /// Exasol port for HTTP transport connection.
    /// This is typically the same port as the WebSocket connection.
    /// Defaults to 0; must be set before running an import.
    pub port: u16,
}
75
impl Default for CsvImportOptions {
    /// Defaults: UTF-8, comma-separated, double-quote delimited, LF rows,
    /// no compression, no TLS.
    ///
    /// `host`/`port` default to ""/0 and must be set (via `exasol_host` /
    /// `exasol_port`) before an import is executed.
    fn default() -> Self {
        Self {
            encoding: "UTF-8".to_string(),
            column_separator: ',',
            column_delimiter: '"',
            row_separator: RowSeparator::LF,
            skip_rows: 0,
            null_value: None,
            trim_mode: TrimMode::None,
            compression: Compression::None,
            reject_limit: None,
            use_tls: false,
            schema: None,
            columns: None,
            host: String::new(),
            port: 0,
        }
    }
}
96
impl CsvImportOptions {
    /// Create new import options with default values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the character encoding (e.g. "UTF-8", "ISO-8859-1").
    #[must_use]
    pub fn encoding(mut self, encoding: &str) -> Self {
        self.encoding = encoding.to_string();
        self
    }

    /// Set the column separator character.
    #[must_use]
    pub fn column_separator(mut self, sep: char) -> Self {
        self.column_separator = sep;
        self
    }

    /// Set the column delimiter (quote) character.
    #[must_use]
    pub fn column_delimiter(mut self, delim: char) -> Self {
        self.column_delimiter = delim;
        self
    }

    /// Set the row separator.
    #[must_use]
    pub fn row_separator(mut self, sep: RowSeparator) -> Self {
        self.row_separator = sep;
        self
    }

    /// Set the number of header rows to skip.
    #[must_use]
    pub fn skip_rows(mut self, rows: u32) -> Self {
        self.skip_rows = rows;
        self
    }

    /// Set the textual representation of NULL values.
    #[must_use]
    pub fn null_value(mut self, val: &str) -> Self {
        self.null_value = Some(val.to_string());
        self
    }

    /// Set the trim mode applied to imported values.
    #[must_use]
    pub fn trim_mode(mut self, mode: TrimMode) -> Self {
        self.trim_mode = mode;
        self
    }

    /// Set the compression type for the transferred data.
    #[must_use]
    pub fn compression(mut self, compression: Compression) -> Self {
        self.compression = compression;
        self
    }

    /// Set the maximum number of invalid rows tolerated before failure.
    #[must_use]
    pub fn reject_limit(mut self, limit: u32) -> Self {
        self.reject_limit = Some(limit);
        self
    }

    /// Enable or disable TLS for the HTTP transport connection.
    #[must_use]
    pub fn use_tls(mut self, use_tls: bool) -> Self {
        self.use_tls = use_tls;
        self
    }

    /// Set the target schema.
    #[must_use]
    pub fn schema(mut self, schema: &str) -> Self {
        self.schema = Some(schema.to_string());
        self
    }

    /// Restrict the import to the given target columns.
    #[must_use]
    pub fn columns(mut self, columns: Vec<String>) -> Self {
        self.columns = Some(columns);
        self
    }

    /// Set the Exasol host for HTTP transport connection.
    ///
    /// This should be the same host as used for the WebSocket connection.
    #[must_use]
    pub fn exasol_host(mut self, host: impl Into<String>) -> Self {
        self.host = host.into();
        self
    }

    /// Set the Exasol port for HTTP transport connection.
    ///
    /// This should be the same port as used for the WebSocket connection.
    #[must_use]
    pub fn exasol_port(mut self, port: u16) -> Self {
        self.port = port;
        self
    }

    /// Build an ImportQuery from these options.
    ///
    /// `address` is the tunneled internal address obtained from the HTTP
    /// transport handshake; `public_key` is the optional TLS fingerprint
    /// embedded into the statement when present.
    fn build_query(&self, table: &str, address: &str, public_key: Option<&str>) -> ImportQuery {
        let mut query = ImportQuery::new(table).at_address(address);

        if let Some(ref schema) = self.schema {
            query = query.schema(schema);
        }

        if let Some(ref columns) = self.columns {
            let cols: Vec<&str> = columns.iter().map(String::as_str).collect();
            query = query.columns(cols);
        }

        if let Some(pk) = public_key {
            query = query.with_public_key(pk);
        }

        // CSV formatting clauses are always emitted from the options.
        query = query
            .encoding(&self.encoding)
            .column_separator(self.column_separator)
            .column_delimiter(self.column_delimiter)
            .row_separator(self.row_separator)
            .skip(self.skip_rows)
            .trim(self.trim_mode)
            .compressed(self.compression);

        if let Some(ref null_val) = self.null_value {
            query = query.null_value(null_val);
        }

        if let Some(limit) = self.reject_limit {
            query = query.reject_limit(limit);
        }

        query
    }
}
234
/// Sender for streaming data to the HTTP transport.
///
/// This is used with the callback-based import (`import_from_callback`) to
/// send data chunks; the receiving side accumulates them before transmission.
pub struct DataPipeSender {
    // Sending half of the bounded byte-chunk channel.
    tx: mpsc::Sender<Vec<u8>>,
}
241
242impl DataPipeSender {
243    /// Create a new data pipe sender.
244    fn new(tx: mpsc::Sender<Vec<u8>>) -> Self {
245        Self { tx }
246    }
247
248    /// Send a chunk of data.
249    ///
250    /// # Arguments
251    ///
252    /// * `data` - The data to send.
253    ///
254    /// # Errors
255    ///
256    /// Returns `ImportError::ChannelError` if the channel is closed.
257    pub async fn send(&self, data: Vec<u8>) -> Result<(), ImportError> {
258        self.tx
259            .send(data)
260            .await
261            .map_err(|e| ImportError::ChannelError(format!("Failed to send data: {e}")))
262    }
263
264    /// Send a CSV row as formatted data.
265    ///
266    /// # Arguments
267    ///
268    /// * `row` - Iterator of field values.
269    /// * `separator` - Column separator character.
270    /// * `delimiter` - Column delimiter character.
271    /// * `row_separator` - Row separator to append.
272    ///
273    /// # Errors
274    ///
275    /// Returns `ImportError::ChannelError` if the channel is closed.
276    pub async fn send_row<I, T>(
277        &self,
278        row: I,
279        separator: char,
280        delimiter: char,
281        row_separator: &RowSeparator,
282    ) -> Result<(), ImportError>
283    where
284        I: IntoIterator<Item = T>,
285        T: AsRef<str>,
286    {
287        let formatted = format_csv_row(row, separator, delimiter, row_separator);
288        self.send(formatted.into_bytes()).await
289    }
290}
291
292/// Format a row of values as a CSV line.
293fn format_csv_row<I, T>(
294    row: I,
295    separator: char,
296    delimiter: char,
297    row_separator: &RowSeparator,
298) -> String
299where
300    I: IntoIterator<Item = T>,
301    T: AsRef<str>,
302{
303    let mut line = String::new();
304    let mut first = true;
305
306    for field in row {
307        if !first {
308            line.push(separator);
309        }
310        first = false;
311
312        let value = field.as_ref();
313        // Check if the value needs quoting
314        let needs_quoting = value.contains(separator)
315            || value.contains(delimiter)
316            || value.contains('\n')
317            || value.contains('\r');
318
319        if needs_quoting {
320            line.push(delimiter);
321            // Escape delimiter characters by doubling them
322            for ch in value.chars() {
323                if ch == delimiter {
324                    line.push(delimiter);
325                }
326                line.push(ch);
327            }
328            line.push(delimiter);
329        } else {
330            line.push_str(value);
331        }
332    }
333
334    // Add row separator
335    match row_separator {
336        RowSeparator::LF => line.push('\n'),
337        RowSeparator::CR => line.push('\r'),
338        RowSeparator::CRLF => {
339            line.push('\r');
340            line.push('\n');
341        }
342    }
343
344    line
345}
346
347/// Detects compression type from file extension if not explicitly set.
348///
349/// # Arguments
350///
351/// * `file_path` - Path to the file
352/// * `explicit_compression` - Explicitly set compression type
353///
354/// # Returns
355///
356/// The detected or explicit compression type.
357fn detect_compression(file_path: &Path, explicit_compression: Compression) -> Compression {
358    if explicit_compression != Compression::None {
359        return explicit_compression;
360    }
361
362    let path_str = file_path.to_string_lossy().to_lowercase();
363
364    if path_str.ends_with(".gz") || path_str.ends_with(".gzip") {
365        Compression::Gzip
366    } else if path_str.ends_with(".bz2") || path_str.ends_with(".bzip2") {
367        Compression::Bzip2
368    } else {
369        Compression::None
370    }
371}
372
/// Checks if a file is already compressed based on extension.
///
/// # Arguments
///
/// * `file_path` - Path to the file
///
/// # Returns
///
/// `true` if the file appears to be compressed (gzip or bzip2 extension,
/// case-insensitive).
fn is_compressed_file(file_path: &Path) -> bool {
    let lower = file_path.to_string_lossy().to_lowercase();
    [".gz", ".gzip", ".bz2", ".bzip2"]
        .iter()
        .any(|ext| lower.ends_with(ext))
}
389
/// Import CSV data from a file path.
///
/// This function reads CSV data from the specified file and imports it into
/// the target table using the HTTP transport layer. Compression is inferred
/// from the file extension unless set explicitly; already-compressed files
/// are sent as-is.
///
/// # Arguments
///
/// * `execute_sql` - Function to execute SQL statements. Takes SQL string and returns row count.
/// * `table` - Name of the target table.
/// * `file_path` - Path to the CSV file.
/// * `options` - Import options.
///
/// # Returns
///
/// The number of rows imported on success.
///
/// # Errors
///
/// Returns `ImportError` if the import fails.
pub async fn import_from_file<F, Fut>(
    execute_sql: F,
    table: &str,
    file_path: &Path,
    options: CsvImportOptions,
) -> Result<u64, ImportError>
where
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
{
    // Detect compression from file extension if not explicitly set
    let compression = detect_compression(file_path, options.compression);
    let options = CsvImportOptions {
        compression,
        ..options
    };

    // Check if file is already compressed before moving into async closure
    let file_is_compressed = is_compressed_file(file_path);

    // Read the file synchronously.
    // NOTE(review): std::fs::read blocks the executor thread while reading;
    // for very large files consider spawn_blocking — confirm acceptable for
    // expected file sizes.
    let data = std::fs::read(file_path)?;

    import_csv_internal(
        execute_sql,
        table,
        options,
        move |mut client, compression| {
            Box::pin(async move {
                // Apply compression if needed (unless file is already compressed)
                let compressed_data = if file_is_compressed {
                    // File is already compressed, send as-is
                    data
                } else {
                    compress_data(&data, compression)?
                };

                // Wait for HTTP GET from Exasol and send response with chunked encoding
                send_import_response(&mut client, &compressed_data).await?;

                Ok(())
            })
        },
    )
    .await
}
458
/// Import multiple CSV files in parallel.
///
/// This function reads CSV data from multiple files and imports them into
/// the target table using parallel HTTP transport connections. Each file
/// gets its own connection with a unique internal address from the EXA
/// tunneling handshake.
///
/// For a single file, this function delegates to `import_from_file` for
/// optimal single-file performance.
///
/// # Arguments
///
/// * `execute_sql` - Function to execute SQL statements. Takes SQL string and returns row count.
/// * `table` - Name of the target table.
/// * `file_paths` - File paths (accepts single path, Vec, array, or slice).
/// * `options` - Import options.
///
/// # Returns
///
/// The number of rows imported on success.
///
/// # Errors
///
/// Returns `ImportError` if the import fails. Uses fail-fast semantics.
///
/// # Example
///
/// ```no_run
/// use exarrow_rs::import::csv::{import_from_files, CsvImportOptions};
/// use std::path::PathBuf;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let files = vec![
///     PathBuf::from("/data/part1.csv"),
///     PathBuf::from("/data/part2.csv"),
///     PathBuf::from("/data/part3.csv"),
/// ];
///
/// let options = CsvImportOptions::default()
///     .exasol_host("localhost")
///     .exasol_port(8563);
///
/// // let rows = import_from_files(execute_sql, "my_table", files, options).await?;
/// # Ok(())
/// # }
/// ```
pub async fn import_from_files<F, Fut, S>(
    execute_sql: F,
    table: &str,
    file_paths: S,
    options: CsvImportOptions,
) -> Result<u64, ImportError>
where
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
    S: IntoFileSources,
{
    let paths = file_paths.into_sources();

    // Delegate to single-file implementation for one file
    if paths.len() == 1 {
        return import_from_file(execute_sql, table, &paths[0], options).await;
    }

    // Empty input is rejected. Checked after the single-file fast path; safe
    // because the two conditions are mutually exclusive.
    if paths.is_empty() {
        return Err(ImportError::InvalidConfig(
            "No files provided for import".to_string(),
        ));
    }

    // Read all files and detect compression.
    // NOTE(review): std::fs::read blocks the executor thread per file;
    // consider spawn_blocking for large inputs — confirm acceptable here.
    let compression = options.compression;
    let mut file_data_vec: Vec<Vec<u8>> = Vec::with_capacity(paths.len());

    for path in &paths {
        let detected_compression = detect_compression(path, compression);
        let file_is_compressed = is_compressed_file(path);

        // Read file
        let data = std::fs::read(path)?;

        // Apply compression if needed.
        // NOTE(review): each payload is compressed per its own detected
        // extension, but the single `options.compression` value is what is
        // passed to the IMPORT SQL and `stream_files_parallel` below —
        // confirm behavior for mixed compressed/uncompressed file sets.
        let compressed_data = if file_is_compressed {
            data
        } else {
            compress_data(&data, detected_compression)?
        };

        file_data_vec.push(compressed_data);
    }

    // Establish parallel connections (one per file).
    let pool =
        ParallelTransportPool::connect(&options.host, options.port, options.use_tls, paths.len())
            .await?;

    // Build multi-file IMPORT SQL from the tunneled per-connection addresses.
    let entries: Vec<ImportFileEntry> = pool
        .file_entries()
        .iter()
        .map(|e| ImportFileEntry::new(e.address.clone(), e.file_name.clone(), e.public_key.clone()))
        .collect();

    let query = build_multi_file_query(table, &options, entries);
    let sql = query.build();

    // Get connections for streaming
    let connections = pool.into_connections();

    // Spawn parallel streaming task
    let stream_handle = tokio::spawn(async move {
        stream_files_parallel(connections, file_data_vec, compression).await
    });

    // Execute the IMPORT SQL in parallel
    let sql_result = execute_sql(sql).await;

    // Wait for streaming to complete
    let stream_result = stream_handle.await;

    // Handle results - check stream task first so transport failures surface
    // ahead of the (likely correlated) SQL error.
    match stream_result {
        Ok(Ok(())) => {}
        Ok(Err(e)) => return Err(e),
        Err(e) => {
            return Err(ImportError::StreamError(format!(
                "Stream task panicked: {e}"
            )))
        }
    }

    // Return the row count from SQL execution
    sql_result.map_err(ImportError::SqlError)
}
593
594/// Build an ImportQuery for multi-file import.
595fn build_multi_file_query(
596    table: &str,
597    options: &CsvImportOptions,
598    entries: Vec<ImportFileEntry>,
599) -> ImportQuery {
600    let mut query = ImportQuery::new(table).with_files(entries);
601
602    if let Some(ref schema) = options.schema {
603        query = query.schema(schema);
604    }
605
606    if let Some(ref columns) = options.columns {
607        let cols: Vec<&str> = columns.iter().map(String::as_str).collect();
608        query = query.columns(cols);
609    }
610
611    query = query
612        .encoding(&options.encoding)
613        .column_separator(options.column_separator)
614        .column_delimiter(options.column_delimiter)
615        .row_separator(options.row_separator)
616        .skip(options.skip_rows)
617        .trim(options.trim_mode)
618        .compressed(options.compression);
619
620    if let Some(ref null_val) = options.null_value {
621        query = query.null_value(null_val);
622    }
623
624    if let Some(limit) = options.reject_limit {
625        query = query.reject_limit(limit);
626    }
627
628    query
629}
630
/// Import CSV data from an async reader (stream).
///
/// This function reads CSV data from an async reader and imports it into
/// the target table using the HTTP transport layer.
///
/// Note: the reader is drained fully into memory before the data is sent
/// (see `stream_reader_to_connection`), so peak memory use is proportional
/// to the input size.
///
/// # Arguments
///
/// * `execute_sql` - Function to execute SQL statements.
/// * `table` - Name of the target table.
/// * `reader` - Async reader providing CSV data.
/// * `options` - Import options.
///
/// # Returns
///
/// The number of rows imported on success.
///
/// # Errors
///
/// Returns `ImportError` if the import fails.
pub async fn import_from_stream<R, F, Fut>(
    execute_sql: F,
    table: &str,
    reader: R,
    options: CsvImportOptions,
) -> Result<u64, ImportError>
where
    R: AsyncRead + Unpin + Send + 'static,
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
{
    // Delegate to the shared driver; the stream closure owns the reader and
    // performs the request/compress/send sequence once Exasol connects.
    import_csv_internal(execute_sql, table, options, |mut client, compression| {
        Box::pin(async move { stream_reader_to_connection(reader, &mut client, compression).await })
    })
    .await
}
666
/// Import CSV data from an iterator of rows.
///
/// This function converts iterator rows to CSV format and imports them into
/// the target table using the HTTP transport layer.
///
/// Note: all rows are formatted into a single in-memory buffer before the
/// transfer starts, so memory use is proportional to the total data size.
///
/// # Arguments
///
/// * `execute_sql` - Function to execute SQL statements.
/// * `table` - Name of the target table.
/// * `rows` - Iterator of rows, where each row is an iterator of field values.
/// * `options` - Import options.
///
/// # Returns
///
/// The number of rows imported on success.
///
/// # Errors
///
/// Returns `ImportError` if the import fails.
pub async fn import_from_iter<I, T, S, F, Fut>(
    execute_sql: F,
    table: &str,
    rows: I,
    options: CsvImportOptions,
) -> Result<u64, ImportError>
where
    I: IntoIterator<Item = T> + Send + 'static,
    T: IntoIterator<Item = S> + Send,
    S: AsRef<str>,
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
{
    // Copy the formatting parameters out of options before it is moved into
    // import_csv_internal; the closure needs them to render rows.
    let separator = options.column_separator;
    let delimiter = options.column_delimiter;
    let row_sep = options.row_separator;

    import_csv_internal(
        execute_sql,
        table,
        options,
        move |mut client, compression| {
            Box::pin(async move {
                // Format all rows as CSV
                let mut data = Vec::new();

                for row in rows {
                    let formatted = format_csv_row(row, separator, delimiter, &row_sep);
                    data.extend_from_slice(formatted.as_bytes());
                }

                // Apply compression if needed
                let compressed_data = compress_data(&data, compression)?;

                // Wait for HTTP GET from Exasol and send response with chunked encoding
                send_import_response(&mut client, &compressed_data).await?;

                Ok(())
            })
        },
    )
    .await
}
732
/// Import CSV data using a callback function.
///
/// This function allows custom data generation through a callback that receives
/// a `DataPipeSender` for streaming data to the import.
///
/// Note: chunks sent through the pipe are accumulated in memory and only
/// transmitted to Exasol after the callback completes — the pipe bounds
/// producer burstiness (`CHANNEL_BUFFER_SIZE`), it does not stream to the
/// network incrementally.
///
/// # Arguments
///
/// * `execute_sql` - Function to execute SQL statements.
/// * `table` - Name of the target table.
/// * `callback` - Callback function that generates data.
/// * `options` - Import options.
///
/// # Returns
///
/// The number of rows imported on success.
///
/// # Errors
///
/// Returns `ImportError` if the import fails.
pub async fn import_from_callback<F, Fut, C, CFut>(
    execute_sql: F,
    table: &str,
    callback: C,
    options: CsvImportOptions,
) -> Result<u64, ImportError>
where
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
    C: FnOnce(DataPipeSender) -> CFut + Send + 'static,
    CFut: Future<Output = Result<(), ImportError>> + Send,
{
    import_csv_internal(execute_sql, table, options, |mut client, compression| {
        Box::pin(async move {
            // Create a channel for data streaming
            let (tx, mut rx) = mpsc::channel::<Vec<u8>>(CHANNEL_BUFFER_SIZE);
            let sender = DataPipeSender::new(tx);

            // Spawn the callback task; it owns the only sender, so the recv
            // loop below terminates when the callback drops it.
            let callback_handle = tokio::spawn(async move { callback(sender).await });

            // Collect data from the channel
            let mut all_data = Vec::new();
            while let Some(chunk) = rx.recv().await {
                all_data.extend_from_slice(&chunk);
            }

            // Wait for callback to complete; surface panics and callback
            // errors before transmitting anything.
            callback_handle
                .await
                .map_err(|e| ImportError::StreamError(format!("Callback task panicked: {e}")))?
                .map_err(|e| ImportError::StreamError(format!("Callback error: {e}")))?;

            // Apply compression if needed
            let compressed_data = compress_data(&all_data, compression)?;

            // Wait for HTTP GET from Exasol and send response with chunked encoding
            send_import_response(&mut client, &compressed_data).await?;

            Ok(())
        })
    })
    .await
}
799
/// Shared driver for all single-connection CSV imports.
///
/// Establishes the HTTP transport tunnel, builds the IMPORT SQL against the
/// tunneled internal address, then drives the SQL execution and the
/// caller-supplied streaming future concurrently on the current task.
///
/// # Errors
///
/// Fails fast with the stream error if streaming fails first; SQL failure is
/// surfaced as `ImportError::SqlError`.
async fn import_csv_internal<F, Fut, S, SFut>(
    execute_sql: F,
    table: &str,
    options: CsvImportOptions,
    stream_fn: S,
) -> Result<u64, ImportError>
where
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
    S: FnOnce(HttpTransportClient, Compression) -> SFut + Send + 'static,
    SFut: Future<Output = Result<(), ImportError>> + Send,
{
    // Connect to Exasol via HTTP transport client (performs handshake automatically)
    let client = HttpTransportClient::connect(&options.host, options.port, options.use_tls)
        .await
        .map_err(|e| {
            ImportError::HttpTransportError(format!("Failed to connect to Exasol: {e}"))
        })?;

    // Get internal address from the handshake response
    let internal_addr = client.internal_address().to_string();
    let public_key = client.public_key_fingerprint().map(String::from);

    // Build the IMPORT SQL statement using internal address
    let query = options.build_query(table, &internal_addr, public_key.as_deref());
    let sql = query.build();

    let compression = options.compression;

    // Create the stream future — runs cooperatively on the same task via select!
    // (no tokio::spawn needed, avoiding worker thread dependencies that cause
    // deadlocks under block_on or constrained environments)
    let stream_future = stream_fn(client, compression);
    tokio::pin!(stream_future);

    // Pin the SQL future so it can be polled in select! and awaited later
    let sql_future = execute_sql(sql);
    tokio::pin!(sql_future);

    // Run SQL execution and data streaming concurrently on the same task.
    // The stream normally completes before SQL for small batches — that's expected.
    // If the stream errors, abort immediately without waiting for SQL.
    let (sql_result, stream_done) = tokio::select! {
        result = &mut sql_future => {
            (result, false)
        },
        stream_result = &mut stream_future => {
            match stream_result {
                Err(e) => return Err(e),
                Ok(()) => {
                    (sql_future.await, true)
                },
            }
        }
    };

    // SQL completed — if it failed, drop stream_future (cancels it)
    let row_count = sql_result.map_err(ImportError::SqlError)?;

    // SQL succeeded — wait for stream to finish (unless it already completed)
    if !stream_done {
        stream_future.await?;
    }

    Ok(row_count)
}
866
/// Stream data from an async reader to the HTTP transport client.
///
/// This function:
/// 1. Waits for HTTP GET request from Exasol (must happen before any bytes
///    are written)
/// 2. Reads all data from the reader (fully buffered in memory)
/// 3. Applies compression if configured
/// 4. Sends HTTP response with chunked encoding
///
/// # Errors
///
/// Returns `ImportError` on transport, I/O, or compression failure.
async fn stream_reader_to_connection<R>(
    mut reader: R,
    client: &mut HttpTransportClient,
    compression: Compression,
) -> Result<(), ImportError>
where
    R: AsyncRead + Unpin,
{
    // Wait for HTTP GET request from Exasol before sending any data
    client.handle_import_request().await.map_err(|e| {
        ImportError::HttpTransportError(format!("Failed to handle import request: {e}"))
    })?;

    // Read all data from the reader
    let mut data = Vec::new();
    reader.read_to_end(&mut data).await?;

    // Apply compression if needed
    let compressed_data = compress_data(&data, compression)?;

    // Write data using chunked encoding
    write_chunked_data(client, &compressed_data).await
}
897
898/// Write data using HTTP chunked transfer encoding.
899///
900/// The HTTP response headers (with chunked encoding) should already be sent
901/// by `handle_import_request()`. This function writes the data chunks and
902/// the final empty chunk.
903async fn write_chunked_data(
904    client: &mut HttpTransportClient,
905    data: &[u8],
906) -> Result<(), ImportError> {
907    // Write data in chunks using HTTP chunked transfer encoding
908    for chunk in data.chunks(DEFAULT_BUFFER_SIZE) {
909        client
910            .write_chunked_body(chunk)
911            .await
912            .map_err(ImportError::TransportError)?;
913    }
914
915    // Send final empty chunk to signal end of transfer
916    client
917        .write_final_chunk()
918        .await
919        .map_err(ImportError::TransportError)?;
920
921    Ok(())
922}
923
924/// Send HTTP response for import with the given data.
925///
926/// This function:
927/// 1. Waits for HTTP GET request from Exasol
928/// 2. Sends HTTP response with chunked encoding
929/// 3. Writes the data in chunks
930/// 4. Sends the final empty chunk
931async fn send_import_response(
932    client: &mut HttpTransportClient,
933    data: &[u8],
934) -> Result<(), ImportError> {
935    // Wait for HTTP GET request from Exasol
936    client.handle_import_request().await.map_err(|e| {
937        ImportError::HttpTransportError(format!("Failed to handle import request: {e}"))
938    })?;
939
940    // Write data using chunked encoding
941    write_chunked_data(client, data).await
942}
943
944/// Compress data using the specified compression type.
945fn compress_data(data: &[u8], compression: Compression) -> Result<Vec<u8>, ImportError> {
946    match compression {
947        Compression::None => Ok(data.to_vec()),
948        Compression::Gzip => {
949            let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::default());
950            encoder.write_all(data).map_err(|e| {
951                ImportError::CompressionError(format!("Gzip compression failed: {e}"))
952            })?;
953            encoder.finish().map_err(|e| {
954                ImportError::CompressionError(format!("Gzip finalization failed: {e}"))
955            })
956        }
957        Compression::Bzip2 => {
958            let mut encoder = BzEncoder::new(Vec::new(), bzip2::Compression::default());
959            encoder.write_all(data).map_err(|e| {
960                ImportError::CompressionError(format!("Bzip2 compression failed: {e}"))
961            })?;
962            encoder.finish().map_err(|e| {
963                ImportError::CompressionError(format!("Bzip2 finalization failed: {e}"))
964            })
965        }
966    }
967}
968
/// Unit tests for the CSV import module.
///
/// Covers: `CsvImportOptions` defaults and builder methods, IMPORT SQL
/// generation (single- and multi-file, TLS, compression suffixes), CSV row
/// formatting/escaping, the compression helpers, compression detection from
/// file extensions, error display, and the in-process data pipe. Actual HTTP
/// transport against Exasol is exercised in integration tests, not here.
#[cfg(test)]
mod tests {
    use super::*;

    // Test CsvImportOptions defaults
    #[test]
    fn test_csv_import_options_default() {
        let opts = CsvImportOptions::default();

        assert_eq!(opts.encoding, "UTF-8");
        assert_eq!(opts.column_separator, ',');
        assert_eq!(opts.column_delimiter, '"');
        assert_eq!(opts.row_separator, RowSeparator::LF);
        assert_eq!(opts.skip_rows, 0);
        assert!(opts.null_value.is_none());
        assert_eq!(opts.trim_mode, TrimMode::None);
        assert_eq!(opts.compression, Compression::None);
        assert!(opts.reject_limit.is_none());
        // NOTE(review): the `use_tls` field doc says "default: true", but this
        // asserts the default is false — confirm which one is intended.
        assert!(!opts.use_tls);
        assert!(opts.schema.is_none());
        assert!(opts.columns.is_none());
        // Host and port start empty/zero until set via the builder.
        assert_eq!(opts.host, "");
        assert_eq!(opts.port, 0);
    }

    // Test CsvImportOptions builder methods
    #[test]
    fn test_csv_import_options_builder() {
        // Every builder method should overwrite the corresponding field.
        let opts = CsvImportOptions::new()
            .encoding("ISO-8859-1")
            .column_separator(';')
            .column_delimiter('\'')
            .row_separator(RowSeparator::CRLF)
            .skip_rows(1)
            .null_value("NULL")
            .trim_mode(TrimMode::Trim)
            .compression(Compression::Gzip)
            .reject_limit(100)
            .use_tls(false)
            .schema("my_schema")
            .columns(vec!["col1".to_string(), "col2".to_string()])
            .exasol_host("exasol.example.com")
            .exasol_port(8563);

        assert_eq!(opts.encoding, "ISO-8859-1");
        assert_eq!(opts.column_separator, ';');
        assert_eq!(opts.column_delimiter, '\'');
        assert_eq!(opts.row_separator, RowSeparator::CRLF);
        assert_eq!(opts.skip_rows, 1);
        assert_eq!(opts.null_value, Some("NULL".to_string()));
        assert_eq!(opts.trim_mode, TrimMode::Trim);
        assert_eq!(opts.compression, Compression::Gzip);
        assert_eq!(opts.reject_limit, Some(100));
        assert!(!opts.use_tls);
        assert_eq!(opts.schema, Some("my_schema".to_string()));
        assert_eq!(
            opts.columns,
            Some(vec!["col1".to_string(), "col2".to_string()])
        );
        assert_eq!(opts.host, "exasol.example.com");
        assert_eq!(opts.port, 8563);
    }

    // Test ImportQuery building from options
    #[test]
    fn test_build_query_basic() {
        // Default options with no fingerprint produce a plain-http AT clause.
        let opts = CsvImportOptions::default();
        let query = opts.build_query("my_table", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("IMPORT INTO my_table"));
        assert!(sql.contains("FROM CSV AT 'http://127.0.0.1:8080'"));
        assert!(sql.contains("ENCODING = 'UTF-8'"));
        assert!(sql.contains("COLUMN SEPARATOR = ','"));
    }

    #[test]
    fn test_build_query_with_schema_and_columns() {
        // Schema prefixes the table name; columns appear as a parenthesized list.
        let opts = CsvImportOptions::default()
            .schema("test_schema")
            .columns(vec!["id".to_string(), "name".to_string()]);

        let query = opts.build_query("users", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("IMPORT INTO test_schema.users"));
        assert!(sql.contains("(id, name)"));
    }

    #[test]
    fn test_build_query_with_tls() {
        // Supplying a certificate fingerprint switches the AT URL to https
        // and emits a PUBLIC KEY clause carrying the fingerprint.
        let opts = CsvImportOptions::default();
        let fingerprint = "ABC123DEF456";
        let query = opts.build_query("my_table", "127.0.0.1:8080", Some(fingerprint));
        let sql = query.build();

        assert!(sql.contains("FROM CSV AT 'https://127.0.0.1:8080'"));
        assert!(sql.contains(&format!("PUBLIC KEY '{}'", fingerprint)));
    }

    #[test]
    fn test_build_query_with_all_options() {
        // Exercise every optional clause at once, including the compressed
        // FILE name suffix (.bz2) and the REJECT LIMIT clause.
        let opts = CsvImportOptions::default()
            .encoding("ISO-8859-1")
            .column_separator(';')
            .column_delimiter('\'')
            .row_separator(RowSeparator::CRLF)
            .skip_rows(2)
            .null_value("\\N")
            .trim_mode(TrimMode::LTrim)
            .compression(Compression::Bzip2)
            .reject_limit(50);

        let query = opts.build_query("data", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("ENCODING = 'ISO-8859-1'"));
        assert!(sql.contains("COLUMN SEPARATOR = ';'"));
        assert!(sql.contains("COLUMN DELIMITER = '''"));
        assert!(sql.contains("ROW SEPARATOR = 'CRLF'"));
        assert!(sql.contains("SKIP = 2"));
        assert!(sql.contains("NULL = '\\N'"));
        assert!(sql.contains("TRIM = 'LTRIM'"));
        assert!(sql.contains("FILE '001.csv.bz2'"));
        assert!(sql.contains("REJECT LIMIT 50"));
    }

    // Test CSV row formatting
    #[test]
    fn test_format_csv_row_basic() {
        let row = vec!["a", "b", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "a,b,c\n");
    }

    #[test]
    fn test_format_csv_row_with_different_separator() {
        let row = vec!["a", "b", "c"];
        let formatted = format_csv_row(row, ';', '"', &RowSeparator::LF);
        assert_eq!(formatted, "a;b;c\n");
    }

    #[test]
    fn test_format_csv_row_with_crlf() {
        let row = vec!["a", "b", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::CRLF);
        assert_eq!(formatted, "a,b,c\r\n");
    }

    #[test]
    fn test_format_csv_row_with_cr() {
        let row = vec!["a", "b"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::CR);
        assert_eq!(formatted, "a,b\r");
    }

    #[test]
    fn test_format_csv_row_needs_quoting_separator() {
        // A field containing the separator must be wrapped in delimiters.
        let row = vec!["a,b", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "\"a,b\",c\n");
    }

    #[test]
    fn test_format_csv_row_needs_quoting_newline() {
        // A field containing a newline must be wrapped in delimiters.
        let row = vec!["a\nb", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "\"a\nb\",c\n");
    }

    #[test]
    fn test_format_csv_row_needs_quoting_delimiter() {
        let row = vec!["a\"b", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        // Delimiter should be escaped by doubling
        assert_eq!(formatted, "\"a\"\"b\",c\n");
    }

    #[test]
    fn test_format_csv_row_empty_fields() {
        // Empty fields are emitted as-is (empty string represents NULL by default).
        let row = vec!["", "b", ""];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, ",b,\n");
    }

    #[test]
    fn test_format_csv_row_single_field() {
        let row = vec!["only"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "only\n");
    }

    // Test compression
    #[test]
    fn test_compress_data_none() {
        // Compression::None is a pass-through copy.
        let data = b"test data";
        let result = compress_data(data, Compression::None).unwrap();
        assert_eq!(result, data);
    }

    #[test]
    fn test_compress_data_gzip() {
        let data = b"test data for gzip compression";
        let result = compress_data(data, Compression::Gzip).unwrap();

        // Verify it's valid gzip (starts with gzip magic bytes)
        assert!(result.len() >= 2);
        assert_eq!(result[0], 0x1f);
        assert_eq!(result[1], 0x8b);
    }

    #[test]
    fn test_compress_data_bzip2() {
        let data = b"test data for bzip2 compression";
        let result = compress_data(data, Compression::Bzip2).unwrap();

        // Verify it's valid bzip2 (starts with "BZ" magic)
        assert!(result.len() >= 2);
        assert_eq!(result[0], b'B');
        assert_eq!(result[1], b'Z');
    }

    // Test error types
    #[test]
    fn test_import_error_display() {
        // Each variant's Display output should contain its descriptive prefix.
        let err = ImportError::IoError(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "file not found",
        ));
        assert!(err.to_string().contains("IO error"));

        let err = ImportError::SqlError("syntax error".to_string());
        assert!(err.to_string().contains("SQL execution failed"));

        let err = ImportError::HttpTransportError("connection refused".to_string());
        assert!(err.to_string().contains("HTTP transport failed"));

        let err = ImportError::CompressionError("invalid data".to_string());
        assert!(err.to_string().contains("Compression error"));
    }

    // Test DataPipeSender
    #[tokio::test]
    async fn test_data_pipe_sender_send() {
        // Raw bytes sent through the pipe arrive unchanged on the receiver.
        let (tx, mut rx) = mpsc::channel(10);
        let sender = DataPipeSender::new(tx);

        sender.send(b"test data".to_vec()).await.unwrap();

        let received = rx.recv().await.unwrap();
        assert_eq!(received, b"test data");
    }

    #[tokio::test]
    async fn test_data_pipe_sender_send_row() {
        // send_row formats the fields as a CSV line before sending.
        let (tx, mut rx) = mpsc::channel(10);
        let sender = DataPipeSender::new(tx);

        sender
            .send_row(vec!["a", "b", "c"], ',', '"', &RowSeparator::LF)
            .await
            .unwrap();

        let received = rx.recv().await.unwrap();
        assert_eq!(received, b"a,b,c\n");
    }

    #[tokio::test]
    async fn test_data_pipe_sender_closed_channel() {
        // Sending into a channel whose receiver is gone must surface a
        // ChannelError rather than panicking or blocking.
        let (tx, rx) = mpsc::channel::<Vec<u8>>(10);
        let sender = DataPipeSender::new(tx);

        // Drop receiver to close channel
        drop(rx);

        let result = sender.send(b"test".to_vec()).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), ImportError::ChannelError(_)));
    }

    // Test building SQL with compression
    #[test]
    fn test_build_query_compression_gzip() {
        // Gzip compression appends .gz to the advertised FILE name.
        let opts = CsvImportOptions::default().compression(Compression::Gzip);
        let query = opts.build_query("table", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("FILE '001.csv.gz'"));
    }

    #[test]
    fn test_build_query_compression_bzip2() {
        // Bzip2 compression appends .bz2 to the advertised FILE name.
        let opts = CsvImportOptions::default().compression(Compression::Bzip2);
        let query = opts.build_query("table", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("FILE '001.csv.bz2'"));
    }

    // Test detect_compression function
    #[test]
    fn test_detect_compression_explicit_overrides() {
        // Explicit compression should override file extension detection
        let path = Path::new("data.csv");
        assert_eq!(
            detect_compression(path, Compression::Gzip),
            Compression::Gzip
        );
        assert_eq!(
            detect_compression(path, Compression::Bzip2),
            Compression::Bzip2
        );
    }

    #[test]
    fn test_detect_compression_from_extension_gzip() {
        // Both .gz and .gzip extensions are recognized as gzip.
        let path = Path::new("data.csv.gz");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Gzip
        );

        let path = Path::new("data.csv.gzip");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Gzip
        );

        // Case insensitive
        let path = Path::new("DATA.CSV.GZ");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Gzip
        );
    }

    #[test]
    fn test_detect_compression_from_extension_bzip2() {
        // Both .bz2 and .bzip2 extensions are recognized as bzip2.
        let path = Path::new("data.csv.bz2");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Bzip2
        );

        let path = Path::new("data.csv.bzip2");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Bzip2
        );
    }

    #[test]
    fn test_detect_compression_no_compression() {
        // Unrecognized extensions fall back to no compression.
        let path = Path::new("data.csv");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::None
        );

        let path = Path::new("data.txt");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::None
        );
    }

    // Test is_compressed_file function
    #[test]
    fn test_is_compressed_file_gzip() {
        assert!(is_compressed_file(Path::new("data.csv.gz")));
        assert!(is_compressed_file(Path::new("data.csv.gzip")));
        assert!(is_compressed_file(Path::new("DATA.CSV.GZ"))); // case insensitive
    }

    #[test]
    fn test_is_compressed_file_bzip2() {
        assert!(is_compressed_file(Path::new("data.csv.bz2")));
        assert!(is_compressed_file(Path::new("data.csv.bzip2")));
    }

    #[test]
    fn test_is_compressed_file_uncompressed() {
        // Files without a known compression extension, including extensionless
        // paths, are treated as uncompressed.
        assert!(!is_compressed_file(Path::new("data.csv")));
        assert!(!is_compressed_file(Path::new("data.txt")));
        assert!(!is_compressed_file(Path::new("data")));
    }

    // Test write_chunked_data with mock (indirectly via compress + format)
    #[test]
    fn test_csv_row_formatting_with_compression() {
        // Test that compression works with formatted CSV data
        let row = vec!["1", "test data", "value"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);

        let compressed = compress_data(formatted.as_bytes(), Compression::Gzip).unwrap();

        // Verify gzip magic bytes
        assert!(compressed.len() >= 2);
        assert_eq!(compressed[0], 0x1f);
        assert_eq!(compressed[1], 0x8b);
    }

    #[test]
    fn test_multiple_rows_formatting() {
        // Test formatting multiple rows as CSV
        let rows = vec![
            vec!["1", "Alice", "alice@example.com"],
            vec!["2", "Bob", "bob@example.com"],
            vec!["3", "Charlie", "charlie@example.com"],
        ];

        let mut data = Vec::new();
        for row in rows {
            let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
            data.extend_from_slice(formatted.as_bytes());
        }

        let expected =
            "1,Alice,alice@example.com\n2,Bob,bob@example.com\n3,Charlie,charlie@example.com\n";
        assert_eq!(String::from_utf8(data).unwrap(), expected);
    }

    #[test]
    fn test_csv_special_characters_in_data() {
        // Test that special characters are properly escaped
        let row = vec!["1", "Hello, World!", "Contains \"quotes\""];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);

        // Field with comma should be quoted, field with quotes should be quoted and escaped
        assert_eq!(
            formatted,
            "1,\"Hello, World!\",\"Contains \"\"quotes\"\"\"\n"
        );
    }

    #[test]
    fn test_csv_row_with_newlines() {
        // Test that newlines in data are properly quoted
        let row = vec!["1", "Line1\nLine2", "normal"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);

        assert_eq!(formatted, "1,\"Line1\nLine2\",normal\n");
    }

    #[test]
    fn test_csv_row_with_carriage_return() {
        // Test that carriage returns in data are properly quoted
        let row = vec!["1", "Line1\rLine2", "normal"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);

        assert_eq!(formatted, "1,\"Line1\rLine2\",normal\n");
    }

    // Test import flow documentation (protocol flow)
    #[test]
    fn test_import_protocol_flow_documentation() {
        // This test documents the expected protocol flow for IMPORT operations
        // The actual flow is tested in integration tests, but this ensures
        // the documentation in import_csv_internal matches expectations

        // Protocol flow for IMPORT:
        // 1. Client connects to Exasol and gets internal address via handshake
        // 2. Client starts executing IMPORT SQL via WebSocket (async)
        // 3. Exasol sends HTTP GET request through the tunnel connection
        // 4. Client receives GET, sends HTTP response headers (chunked encoding)
        // 5. Client streams CSV data as chunked body
        // 6. Client sends final chunk (0\r\n\r\n)
        // 7. IMPORT SQL completes

        // This test is informational - actual integration testing is done
        // against a real Exasol instance
    }

    // Test compression detection with various path formats
    #[test]
    fn test_detect_compression_path_variations() {
        // Test with absolute paths
        let path = Path::new("/home/user/data/file.csv.gz");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Gzip
        );

        // Test with relative paths
        let path = Path::new("./data/file.csv.bz2");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Bzip2
        );

        // Test with just filename
        let path = Path::new("file.csv");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::None
        );
    }

    // Tests for build_multi_file_query
    #[test]
    fn test_build_multi_file_query_basic() {
        use crate::query::import::ImportFileEntry;

        // Each entry contributes its own AT '…' FILE '…' pair to the SQL.
        let options = CsvImportOptions::default();
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
        ];

        let query = build_multi_file_query("my_table", &options, entries);
        let sql = query.build();

        assert!(sql.contains("IMPORT INTO my_table"));
        assert!(sql.contains("FROM CSV"));
        assert!(sql.contains("AT 'http://10.0.0.5:8563' FILE '001.csv'"));
        assert!(sql.contains("AT 'http://10.0.0.6:8564' FILE '002.csv'"));
    }

    #[test]
    fn test_build_multi_file_query_with_options() {
        use crate::query::import::ImportFileEntry;

        // Schema/columns/skip/compression options apply to the multi-file
        // form exactly as in the single-file query builder.
        let options = CsvImportOptions::default()
            .schema("test_schema")
            .columns(vec!["id".to_string(), "name".to_string()])
            .skip_rows(1)
            .compression(Compression::Gzip);

        let entries = vec![ImportFileEntry::new(
            "10.0.0.5:8563".to_string(),
            "001.csv".to_string(),
            None,
        )];

        let query = build_multi_file_query("data", &options, entries);
        let sql = query.build();

        assert!(sql.contains("IMPORT INTO test_schema.data (id, name)"));
        assert!(sql.contains("SKIP = 1"));
        assert!(sql.contains("FILE '001.csv.gz'"));
    }
}