exarrow_rs/import/csv.rs

//! CSV import functionality for Exasol.
//!
//! This module provides functions for importing CSV data into Exasol tables
//! using the HTTP transport layer.

use std::future::Future;
use std::io::Write;
use std::path::Path;

use bzip2::write::BzEncoder;
use flate2::write::GzEncoder;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::sync::mpsc;

use crate::query::import::{Compression, ImportFileEntry, ImportQuery, RowSeparator, TrimMode};
use crate::transport::HttpTransportClient;

use super::parallel::{stream_files_parallel, ParallelTransportPool};
use super::source::IntoFileSources;
use super::ImportError;

/// Default buffer size for reading data (64KB).
const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;

/// Channel buffer size for the data pipe.
const CHANNEL_BUFFER_SIZE: usize = 16;

/// Options for CSV import.
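///
/// # Example
///
/// A minimal builder-style sketch:
///
/// ```
/// use exarrow_rs::import::csv::CsvImportOptions;
///
/// let opts = CsvImportOptions::new()
///     .column_separator(';')
///     .skip_rows(1);
/// assert_eq!(opts.column_separator, ';');
/// ```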
#[derive(Debug, Clone)]
pub struct CsvImportOptions {
    /// Character encoding (default: UTF-8).
    pub encoding: String,

    /// Column separator character (default: ',').
    pub column_separator: char,

    /// Column delimiter character for quoting (default: '"').
    pub column_delimiter: char,

    /// Row separator (default: LF).
    pub row_separator: RowSeparator,

    /// Number of header rows to skip (default: 0).
    pub skip_rows: u32,

    /// Custom NULL value representation (default: None, empty string is NULL).
    pub null_value: Option<String>,

    /// Trim mode for imported values (default: None).
    pub trim_mode: TrimMode,

    /// Compression type (default: None).
    pub compression: Compression,

    /// Maximum number of invalid rows before failure (default: None = fail on first error).
    pub reject_limit: Option<u32>,

    /// Use TLS encryption for HTTP transport (default: false).
    pub use_tls: bool,

    /// Target schema (optional).
    pub schema: Option<String>,

    /// Target columns (optional, imports all if not specified).
    pub columns: Option<Vec<String>>,

    /// Exasol host for HTTP transport connection.
    /// This is typically the same host as the WebSocket connection.
    pub host: String,

    /// Exasol port for HTTP transport connection.
    /// This is typically the same port as the WebSocket connection.
    pub port: u16,
}

impl Default for CsvImportOptions {
    fn default() -> Self {
        Self {
            encoding: "UTF-8".to_string(),
            column_separator: ',',
            column_delimiter: '"',
            row_separator: RowSeparator::LF,
            skip_rows: 0,
            null_value: None,
            trim_mode: TrimMode::None,
            compression: Compression::None,
            reject_limit: None,
            use_tls: false,
            schema: None,
            columns: None,
            host: String::new(),
            port: 0,
        }
    }
}

impl CsvImportOptions {
    /// Create new import options with default values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the character encoding.
    #[must_use]
    pub fn encoding(mut self, encoding: &str) -> Self {
        self.encoding = encoding.to_string();
        self
    }

    /// Set the column separator character.
    #[must_use]
    pub fn column_separator(mut self, sep: char) -> Self {
        self.column_separator = sep;
        self
    }

    /// Set the column delimiter character used for quoting.
    #[must_use]
    pub fn column_delimiter(mut self, delim: char) -> Self {
        self.column_delimiter = delim;
        self
    }

    /// Set the row separator.
    #[must_use]
    pub fn row_separator(mut self, sep: RowSeparator) -> Self {
        self.row_separator = sep;
        self
    }

    /// Set the number of header rows to skip.
    #[must_use]
    pub fn skip_rows(mut self, rows: u32) -> Self {
        self.skip_rows = rows;
        self
    }

    /// Set a custom NULL value representation.
    #[must_use]
    pub fn null_value(mut self, val: &str) -> Self {
        self.null_value = Some(val.to_string());
        self
    }

    /// Set the trim mode for imported values.
    #[must_use]
    pub fn trim_mode(mut self, mode: TrimMode) -> Self {
        self.trim_mode = mode;
        self
    }

    /// Set the compression type.
    #[must_use]
    pub fn compression(mut self, compression: Compression) -> Self {
        self.compression = compression;
        self
    }

    /// Set the maximum number of invalid rows before failure.
    #[must_use]
    pub fn reject_limit(mut self, limit: u32) -> Self {
        self.reject_limit = Some(limit);
        self
    }

    /// Enable or disable TLS encryption for the HTTP transport.
    #[must_use]
    pub fn use_tls(mut self, use_tls: bool) -> Self {
        self.use_tls = use_tls;
        self
    }

    /// Set the target schema.
    #[must_use]
    pub fn schema(mut self, schema: &str) -> Self {
        self.schema = Some(schema.to_string());
        self
    }

    /// Set the target columns.
    #[must_use]
    pub fn columns(mut self, columns: Vec<String>) -> Self {
        self.columns = Some(columns);
        self
    }

    /// Set the Exasol host for HTTP transport connection.
    ///
    /// This should be the same host as used for the WebSocket connection.
    ///
    /// # Example
    ///
    #[must_use]
    pub fn exasol_host(mut self, host: impl Into<String>) -> Self {
        self.host = host.into();
        self
    }

    /// Set the Exasol port for HTTP transport connection.
    ///
    /// This should be the same port as used for the WebSocket connection.
    #[must_use]
    pub fn exasol_port(mut self, port: u16) -> Self {
        self.port = port;
        self
    }

    /// Build an ImportQuery from these options.
    fn build_query(&self, table: &str, address: &str, public_key: Option<&str>) -> ImportQuery {
        let mut query = ImportQuery::new(table).at_address(address);

        if let Some(ref schema) = self.schema {
            query = query.schema(schema);
        }

        if let Some(ref columns) = self.columns {
            let cols: Vec<&str> = columns.iter().map(String::as_str).collect();
            query = query.columns(cols);
        }

        if let Some(pk) = public_key {
            query = query.with_public_key(pk);
        }

        query = query
            .encoding(&self.encoding)
            .column_separator(self.column_separator)
            .column_delimiter(self.column_delimiter)
            .row_separator(self.row_separator)
            .skip(self.skip_rows)
            .trim(self.trim_mode)
            .compressed(self.compression);

        if let Some(ref null_val) = self.null_value {
            query = query.null_value(null_val);
        }

        if let Some(limit) = self.reject_limit {
            query = query.reject_limit(limit);
        }

        query
    }
}

/// Sender for streaming data to the HTTP transport.
///
/// This is used with the callback-based import to send data chunks.
pub struct DataPipeSender {
    tx: mpsc::Sender<Vec<u8>>,
}

impl DataPipeSender {
    /// Create a new data pipe sender.
    fn new(tx: mpsc::Sender<Vec<u8>>) -> Self {
        Self { tx }
    }

    /// Send a chunk of data.
    ///
    /// # Arguments
    ///
    /// * `data` - The data to send.
    ///
    /// # Errors
    ///
    /// Returns `ImportError::ChannelError` if the channel is closed.
    pub async fn send(&self, data: Vec<u8>) -> Result<(), ImportError> {
        self.tx
            .send(data)
            .await
            .map_err(|e| ImportError::ChannelError(format!("Failed to send data: {e}")))
    }

    /// Send a CSV row as formatted data.
    ///
    /// # Arguments
    ///
    /// * `row` - Iterator of field values.
    /// * `separator` - Column separator character.
    /// * `delimiter` - Column delimiter character.
    /// * `row_separator` - Row separator to append.
    ///
    /// # Errors
    ///
    /// Returns `ImportError::ChannelError` if the channel is closed.
    pub async fn send_row<I, T>(
        &self,
        row: I,
        separator: char,
        delimiter: char,
        row_separator: &RowSeparator,
    ) -> Result<(), ImportError>
    where
        I: IntoIterator<Item = T>,
        T: AsRef<str>,
    {
        let formatted = format_csv_row(row, separator, delimiter, row_separator);
        self.send(formatted.into_bytes()).await
    }
}

/// Format a row of values as a CSV line.
fn format_csv_row<I, T>(
    row: I,
    separator: char,
    delimiter: char,
    row_separator: &RowSeparator,
) -> String
where
    I: IntoIterator<Item = T>,
    T: AsRef<str>,
{
    let mut line = String::new();
    let mut first = true;

    for field in row {
        if !first {
            line.push(separator);
        }
        first = false;

        let value = field.as_ref();
        // Check if the value needs quoting
        let needs_quoting = value.contains(separator)
            || value.contains(delimiter)
            || value.contains('\n')
            || value.contains('\r');

        if needs_quoting {
            line.push(delimiter);
            // Escape delimiter characters by doubling them
            for ch in value.chars() {
                if ch == delimiter {
                    line.push(delimiter);
                }
                line.push(ch);
            }
            line.push(delimiter);
        } else {
            line.push_str(value);
        }
    }

    // Add row separator
    match row_separator {
        RowSeparator::LF => line.push('\n'),
        RowSeparator::CR => line.push('\r'),
        RowSeparator::CRLF => {
            line.push('\r');
            line.push('\n');
        }
    }

    line
}

/// Detects compression type from file extension if not explicitly set.
///
/// # Arguments
///
/// * `file_path` - Path to the file
/// * `explicit_compression` - Explicitly set compression type
///
/// # Returns
///
/// The detected or explicit compression type.
fn detect_compression(file_path: &Path, explicit_compression: Compression) -> Compression {
    if explicit_compression != Compression::None {
        return explicit_compression;
    }

    let path_str = file_path.to_string_lossy().to_lowercase();

    if path_str.ends_with(".gz") || path_str.ends_with(".gzip") {
        Compression::Gzip
    } else if path_str.ends_with(".bz2") || path_str.ends_with(".bzip2") {
        Compression::Bzip2
    } else {
        Compression::None
    }
}

/// Checks if a file is already compressed based on extension.
///
/// # Arguments
///
/// * `file_path` - Path to the file
///
/// # Returns
///
/// `true` if the file appears to be compressed.
fn is_compressed_file(file_path: &Path) -> bool {
    let path_str = file_path.to_string_lossy().to_lowercase();
    path_str.ends_with(".gz")
        || path_str.ends_with(".gzip")
        || path_str.ends_with(".bz2")
        || path_str.ends_with(".bzip2")
}

/// Import CSV data from a file path.
///
/// This function reads CSV data from the specified file and imports it into
/// the target table using the HTTP transport layer.
///
/// # Arguments
///
/// * `execute_sql` - Function to execute SQL statements. Takes SQL string and returns row count.
/// * `table` - Name of the target table.
/// * `file_path` - Path to the CSV file.
/// * `options` - Import options.
///
/// # Returns
///
/// The number of rows imported on success.
///
/// # Errors
///
/// Returns `ImportError` if the import fails.
///
/// # Example
///
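/// A minimal sketch; `execute_sql` is application-specific, so the call is
/// shown commented out:
///
/// ```no_run
/// use exarrow_rs::import::csv::{import_from_file, CsvImportOptions};
/// use std::path::Path;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let path = Path::new("/data/part1.csv");
///
/// let options = CsvImportOptions::default()
///     .exasol_host("localhost")
///     .exasol_port(8563);
///
/// // let rows = import_from_file(execute_sql, "my_table", path, options).await?;
/// # Ok(())
/// # }
/// ```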
pub async fn import_from_file<F, Fut>(
    execute_sql: F,
    table: &str,
    file_path: &Path,
    options: CsvImportOptions,
) -> Result<u64, ImportError>
where
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
{
    // Detect compression from file extension if not explicitly set
    let compression = detect_compression(file_path, options.compression);
    let options = CsvImportOptions {
        compression,
        ..options
    };

    // Check if file is already compressed before moving into async closure
    let file_is_compressed = is_compressed_file(file_path);

    // Read the file synchronously
    let data = std::fs::read(file_path)?;

    import_csv_internal(
        execute_sql,
        table,
        options,
        move |mut client, compression| {
            Box::pin(async move {
                // Apply compression if needed (unless file is already compressed)
                let compressed_data = if file_is_compressed {
                    // File is already compressed, send as-is
                    data
                } else {
                    compress_data(&data, compression)?
                };

                // Wait for HTTP GET from Exasol and send response with chunked encoding
                send_import_response(&mut client, &compressed_data).await?;

                Ok(())
            })
        },
    )
    .await
}

/// Import multiple CSV files in parallel.
///
/// This function reads CSV data from multiple files and imports them into
/// the target table using parallel HTTP transport connections. Each file
/// gets its own connection with a unique internal address from the EXA
/// tunneling handshake.
///
/// For a single file, this function delegates to `import_from_file` for
/// optimal single-file performance.
///
/// # Arguments
///
/// * `execute_sql` - Function to execute SQL statements. Takes SQL string and returns row count.
/// * `table` - Name of the target table.
/// * `file_paths` - File paths (accepts single path, Vec, array, or slice).
/// * `options` - Import options.
///
/// # Returns
///
/// The number of rows imported on success.
///
/// # Errors
///
/// Returns `ImportError` if the import fails. Uses fail-fast semantics.
///
/// # Example
///
/// ```no_run
/// use exarrow_rs::import::csv::{import_from_files, CsvImportOptions};
/// use std::path::PathBuf;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let files = vec![
///     PathBuf::from("/data/part1.csv"),
///     PathBuf::from("/data/part2.csv"),
///     PathBuf::from("/data/part3.csv"),
/// ];
///
/// let options = CsvImportOptions::default()
///     .exasol_host("localhost")
///     .exasol_port(8563);
///
/// // let rows = import_from_files(execute_sql, "my_table", files, options).await?;
/// # Ok(())
/// # }
/// ```
pub async fn import_from_files<F, Fut, S>(
    execute_sql: F,
    table: &str,
    file_paths: S,
    options: CsvImportOptions,
) -> Result<u64, ImportError>
where
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
    S: IntoFileSources,
{
    let paths = file_paths.into_sources();

    if paths.is_empty() {
        return Err(ImportError::InvalidConfig(
            "No files provided for import".to_string(),
        ));
    }

    // Delegate to single-file implementation for one file
    if paths.len() == 1 {
        return import_from_file(execute_sql, table, &paths[0], options).await;
    }

    // Read all files and detect compression
    let compression = options.compression;
    let mut file_data_vec: Vec<Vec<u8>> = Vec::with_capacity(paths.len());

    for path in &paths {
        let detected_compression = detect_compression(path, compression);
        let file_is_compressed = is_compressed_file(path);

        // Read file
        let data = std::fs::read(path)?;

        // Apply compression if needed
        let compressed_data = if file_is_compressed {
            data
        } else {
            compress_data(&data, detected_compression)?
        };

        file_data_vec.push(compressed_data);
    }

    // Establish parallel connections
    let pool =
        ParallelTransportPool::connect(&options.host, options.port, options.use_tls, paths.len())
            .await?;

    // Build multi-file IMPORT SQL
    let entries: Vec<ImportFileEntry> = pool
        .file_entries()
        .iter()
        .map(|e| ImportFileEntry::new(e.address.clone(), e.file_name.clone(), e.public_key.clone()))
        .collect();

    let query = build_multi_file_query(table, &options, entries);
    let sql = query.build();

    // Get connections for streaming
    let connections = pool.into_connections();

    // Spawn parallel streaming task
    let stream_handle = tokio::spawn(async move {
        stream_files_parallel(connections, file_data_vec, compression).await
    });

    // Execute the IMPORT SQL in parallel
    let sql_result = execute_sql(sql).await;

    // Wait for streaming to complete
    let stream_result = stream_handle.await;

    // Handle results - check stream task first
    match stream_result {
        Ok(Ok(())) => {}
        Ok(Err(e)) => return Err(e),
        Err(e) => {
            return Err(ImportError::StreamError(format!(
                "Stream task panicked: {e}"
            )))
        }
    }

    // Return the row count from SQL execution
    sql_result.map_err(ImportError::SqlError)
}

/// Build an ImportQuery for multi-file import.
fn build_multi_file_query(
    table: &str,
    options: &CsvImportOptions,
    entries: Vec<ImportFileEntry>,
) -> ImportQuery {
    let mut query = ImportQuery::new(table).with_files(entries);

    if let Some(ref schema) = options.schema {
        query = query.schema(schema);
    }

    if let Some(ref columns) = options.columns {
        let cols: Vec<&str> = columns.iter().map(String::as_str).collect();
        query = query.columns(cols);
    }

    query = query
        .encoding(&options.encoding)
        .column_separator(options.column_separator)
        .column_delimiter(options.column_delimiter)
        .row_separator(options.row_separator)
        .skip(options.skip_rows)
        .trim(options.trim_mode)
        .compressed(options.compression);

    if let Some(ref null_val) = options.null_value {
        query = query.null_value(null_val);
    }

    if let Some(limit) = options.reject_limit {
        query = query.reject_limit(limit);
    }

    query
}

/// Import CSV data from an async reader (stream).
///
/// This function reads CSV data from an async reader and imports it into
/// the target table using the HTTP transport layer.
///
/// # Arguments
///
/// * `execute_sql` - Function to execute SQL statements.
/// * `table` - Name of the target table.
/// * `reader` - Async reader providing CSV data.
/// * `options` - Import options.
///
/// # Returns
///
/// The number of rows imported on success.
///
/// # Errors
///
/// Returns `ImportError` if the import fails.
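///
/// # Example
///
/// A minimal sketch using an in-memory reader (tokio implements `AsyncRead`
/// for `&[u8]`); `execute_sql` is application-specific, so the call is
/// shown commented out:
///
/// ```no_run
/// use exarrow_rs::import::csv::{import_from_stream, CsvImportOptions};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let reader: &[u8] = b"1,Alice\n2,Bob\n";
///
/// let options = CsvImportOptions::default()
///     .exasol_host("localhost")
///     .exasol_port(8563);
///
/// // let rows = import_from_stream(execute_sql, "my_table", reader, options).await?;
/// # Ok(())
/// # }
/// ```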
pub async fn import_from_stream<R, F, Fut>(
    execute_sql: F,
    table: &str,
    reader: R,
    options: CsvImportOptions,
) -> Result<u64, ImportError>
where
    R: AsyncRead + Unpin + Send + 'static,
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
{
    import_csv_internal(execute_sql, table, options, |mut client, compression| {
        Box::pin(async move { stream_reader_to_connection(reader, &mut client, compression).await })
    })
    .await
}

/// Import CSV data from an iterator of rows.
///
/// This function converts iterator rows to CSV format and imports them into
/// the target table using the HTTP transport layer.
///
/// # Arguments
///
/// * `execute_sql` - Function to execute SQL statements.
/// * `table` - Name of the target table.
/// * `rows` - Iterator of rows, where each row is an iterator of field values.
/// * `options` - Import options.
///
/// # Returns
///
/// The number of rows imported on success.
///
/// # Errors
///
/// Returns `ImportError` if the import fails.
///
/// # Example
///
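/// A minimal sketch; `execute_sql` is application-specific, so the call is
/// shown commented out:
///
/// ```no_run
/// use exarrow_rs::import::csv::{import_from_iter, CsvImportOptions};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let rows = vec![
///     vec!["1", "Alice"],
///     vec!["2", "Bob"],
/// ];
///
/// let options = CsvImportOptions::default()
///     .exasol_host("localhost")
///     .exasol_port(8563);
///
/// // let count = import_from_iter(execute_sql, "users", rows, options).await?;
/// # Ok(())
/// # }
/// ```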
pub async fn import_from_iter<I, T, S, F, Fut>(
    execute_sql: F,
    table: &str,
    rows: I,
    options: CsvImportOptions,
) -> Result<u64, ImportError>
where
    I: IntoIterator<Item = T> + Send + 'static,
    T: IntoIterator<Item = S> + Send,
    S: AsRef<str>,
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
{
    let separator = options.column_separator;
    let delimiter = options.column_delimiter;
    let row_sep = options.row_separator;

    import_csv_internal(
        execute_sql,
        table,
        options,
        move |mut client, compression| {
            Box::pin(async move {
                // Format all rows as CSV
                let mut data = Vec::new();

                for row in rows {
                    let formatted = format_csv_row(row, separator, delimiter, &row_sep);
                    data.extend_from_slice(formatted.as_bytes());
                }

                // Apply compression if needed
                let compressed_data = compress_data(&data, compression)?;

                // Wait for HTTP GET from Exasol and send response with chunked encoding
                send_import_response(&mut client, &compressed_data).await?;

                Ok(())
            })
        },
    )
    .await
}

/// Import CSV data using a callback function.
///
/// This function allows custom data generation through a callback that receives
/// a `DataPipeSender` for streaming data to the import.
///
/// # Arguments
///
/// * `execute_sql` - Function to execute SQL statements.
/// * `table` - Name of the target table.
/// * `callback` - Callback function that generates data.
/// * `options` - Import options.
///
/// # Returns
///
/// The number of rows imported on success.
///
/// # Errors
///
/// Returns `ImportError` if the import fails.
///
/// # Example
///
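/// A minimal sketch; `execute_sql` is application-specific, so the call is
/// shown commented out:
///
/// ```no_run
/// use exarrow_rs::import::csv::{import_from_callback, CsvImportOptions, DataPipeSender};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let options = CsvImportOptions::default()
///     .exasol_host("localhost")
///     .exasol_port(8563);
///
/// // let rows = import_from_callback(
/// //     execute_sql,
/// //     "my_table",
/// //     |sender: DataPipeSender| async move {
/// //         sender.send(b"1,Alice\n".to_vec()).await
/// //     },
/// //     options,
/// // ).await?;
/// # Ok(())
/// # }
/// ```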
pub async fn import_from_callback<F, Fut, C, CFut>(
    execute_sql: F,
    table: &str,
    callback: C,
    options: CsvImportOptions,
) -> Result<u64, ImportError>
where
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
    C: FnOnce(DataPipeSender) -> CFut + Send + 'static,
    CFut: Future<Output = Result<(), ImportError>> + Send,
{
    import_csv_internal(execute_sql, table, options, |mut client, compression| {
        Box::pin(async move {
            // Create a channel for data streaming
            let (tx, mut rx) = mpsc::channel::<Vec<u8>>(CHANNEL_BUFFER_SIZE);
            let sender = DataPipeSender::new(tx);

            // Spawn the callback task
            let callback_handle = tokio::spawn(async move { callback(sender).await });

            // Collect data from the channel
            let mut all_data = Vec::new();
            while let Some(chunk) = rx.recv().await {
                all_data.extend_from_slice(&chunk);
            }

            // Wait for callback to complete
            callback_handle
                .await
                .map_err(|e| ImportError::StreamError(format!("Callback task panicked: {e}")))?
                .map_err(|e| ImportError::StreamError(format!("Callback error: {e}")))?;

            // Apply compression if needed
            let compressed_data = compress_data(&all_data, compression)?;

            // Wait for HTTP GET from Exasol and send response with chunked encoding
            send_import_response(&mut client, &compressed_data).await?;

            Ok(())
        })
    })
    .await
}

/// Internal function to orchestrate CSV import.
///
/// # Protocol Flow
///
/// 1. Connects to Exasol via HTTP transport client (client mode)
/// 2. Gets the internal address from the handshake response
/// 3. Builds the IMPORT SQL statement using the internal address
/// 4. Starts IMPORT SQL execution in parallel via WebSocket
/// 5. Waits for HTTP GET request from Exasol
/// 6. Sends HTTP response with CSV data using chunked encoding
/// 7. Coordinates completion between HTTP and SQL tasks
async fn import_csv_internal<F, Fut, S, SFut>(
    execute_sql: F,
    table: &str,
    options: CsvImportOptions,
    stream_fn: S,
) -> Result<u64, ImportError>
where
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
    S: FnOnce(HttpTransportClient, Compression) -> SFut + Send + 'static,
    SFut: Future<Output = Result<(), ImportError>> + Send,
{
    // Connect to Exasol via HTTP transport client (performs handshake automatically)
    let client = HttpTransportClient::connect(&options.host, options.port, options.use_tls)
        .await
        .map_err(|e| {
            ImportError::HttpTransportError(format!("Failed to connect to Exasol: {e}"))
        })?;

    // Get internal address from the handshake response
    let internal_addr = client.internal_address().to_string();
    let public_key = client.public_key_fingerprint().map(String::from);

    // Build the IMPORT SQL statement using internal address
    let query = options.build_query(table, &internal_addr, public_key.as_deref());
    let sql = query.build();

    let compression = options.compression;

    // Spawn the data streaming task - this will wait for GET request from Exasol
    let stream_handle = tokio::spawn(async move {
        // Stream data through the established connection
        // The stream_fn is responsible for:
        // 1. Waiting for HTTP GET from Exasol (handle_import_request)
        // 2. Sending chunked response data
        // 3. Sending final chunk
        stream_fn(client, compression).await?;

        Ok::<(), ImportError>(())
    });

    // Execute the IMPORT SQL in parallel
    // This triggers Exasol to send the HTTP GET request through the tunnel
    let sql_result = execute_sql(sql).await;

    // Wait for the stream task to complete
    let stream_result = stream_handle.await;

    // Handle results - check stream task first as it may have protocol errors
    match stream_result {
        Ok(Ok(())) => {}
        Ok(Err(e)) => return Err(e),
        Err(e) => {
            return Err(ImportError::StreamError(format!(
                "Stream task panicked: {e}"
            )))
        }
    }

    // Return the row count from SQL execution
    sql_result.map_err(ImportError::SqlError)
}

/// Stream data from an async reader to the HTTP transport client.
///
/// This function:
/// 1. Waits for HTTP GET request from Exasol
/// 2. Reads all data from the reader
/// 3. Applies compression if configured
/// 4. Sends HTTP response with chunked encoding
async fn stream_reader_to_connection<R>(
    mut reader: R,
    client: &mut HttpTransportClient,
    compression: Compression,
) -> Result<(), ImportError>
where
    R: AsyncRead + Unpin,
{
    // Wait for HTTP GET request from Exasol before sending any data
    client.handle_import_request().await.map_err(|e| {
        ImportError::HttpTransportError(format!("Failed to handle import request: {e}"))
    })?;

    // Read all data from the reader
    let mut data = Vec::new();
    reader.read_to_end(&mut data).await?;

    // Apply compression if needed
    let compressed_data = compress_data(&data, compression)?;

    // Write data using chunked encoding
    write_chunked_data(client, &compressed_data).await
}

/// Write data using HTTP chunked transfer encoding.
///
/// The HTTP response headers (with chunked encoding) should already be sent
/// by `handle_import_request()`. This function writes the data chunks and
/// the final empty chunk.
async fn write_chunked_data(
    client: &mut HttpTransportClient,
    data: &[u8],
) -> Result<(), ImportError> {
    // Write data in chunks using HTTP chunked transfer encoding
    for chunk in data.chunks(DEFAULT_BUFFER_SIZE) {
        client
            .write_chunked_body(chunk)
            .await
            .map_err(ImportError::TransportError)?;
    }

    // Send final empty chunk to signal end of transfer
    client
        .write_final_chunk()
        .await
        .map_err(ImportError::TransportError)?;

    Ok(())
}

/// Send HTTP response for import with the given data.
///
/// This function:
/// 1. Waits for HTTP GET request from Exasol
/// 2. Sends HTTP response with chunked encoding
/// 3. Writes the data in chunks
/// 4. Sends the final empty chunk
async fn send_import_response(
    client: &mut HttpTransportClient,
    data: &[u8],
) -> Result<(), ImportError> {
    // Wait for HTTP GET request from Exasol
    client.handle_import_request().await.map_err(|e| {
        ImportError::HttpTransportError(format!("Failed to handle import request: {e}"))
    })?;

    // Write data using chunked encoding
    write_chunked_data(client, data).await
}

/// Compress data using the specified compression type.
fn compress_data(data: &[u8], compression: Compression) -> Result<Vec<u8>, ImportError> {
    match compression {
        Compression::None => Ok(data.to_vec()),
        Compression::Gzip => {
            let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::default());
            encoder.write_all(data).map_err(|e| {
                ImportError::CompressionError(format!("Gzip compression failed: {e}"))
            })?;
            encoder.finish().map_err(|e| {
                ImportError::CompressionError(format!("Gzip finalization failed: {e}"))
            })
        }
        Compression::Bzip2 => {
            let mut encoder = BzEncoder::new(Vec::new(), bzip2::Compression::default());
            encoder.write_all(data).map_err(|e| {
                ImportError::CompressionError(format!("Bzip2 compression failed: {e}"))
            })?;
            encoder.finish().map_err(|e| {
                ImportError::CompressionError(format!("Bzip2 finalization failed: {e}"))
            })
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // Test CsvImportOptions defaults
    #[test]
    fn test_csv_import_options_default() {
        let opts = CsvImportOptions::default();

        assert_eq!(opts.encoding, "UTF-8");
        assert_eq!(opts.column_separator, ',');
        assert_eq!(opts.column_delimiter, '"');
        assert_eq!(opts.row_separator, RowSeparator::LF);
        assert_eq!(opts.skip_rows, 0);
        assert!(opts.null_value.is_none());
        assert_eq!(opts.trim_mode, TrimMode::None);
        assert_eq!(opts.compression, Compression::None);
        assert!(opts.reject_limit.is_none());
        assert!(!opts.use_tls);
        assert!(opts.schema.is_none());
        assert!(opts.columns.is_none());
        assert_eq!(opts.host, "");
        assert_eq!(opts.port, 0);
    }

    // Test CsvImportOptions builder methods
    #[test]
    fn test_csv_import_options_builder() {
        let opts = CsvImportOptions::new()
            .encoding("ISO-8859-1")
            .column_separator(';')
            .column_delimiter('\'')
            .row_separator(RowSeparator::CRLF)
            .skip_rows(1)
            .null_value("NULL")
            .trim_mode(TrimMode::Trim)
            .compression(Compression::Gzip)
            .reject_limit(100)
            .use_tls(false)
            .schema("my_schema")
            .columns(vec!["col1".to_string(), "col2".to_string()])
            .exasol_host("exasol.example.com")
            .exasol_port(8563);

        assert_eq!(opts.encoding, "ISO-8859-1");
        assert_eq!(opts.column_separator, ';');
        assert_eq!(opts.column_delimiter, '\'');
        assert_eq!(opts.row_separator, RowSeparator::CRLF);
        assert_eq!(opts.skip_rows, 1);
        assert_eq!(opts.null_value, Some("NULL".to_string()));
        assert_eq!(opts.trim_mode, TrimMode::Trim);
        assert_eq!(opts.compression, Compression::Gzip);
        assert_eq!(opts.reject_limit, Some(100));
        assert!(!opts.use_tls);
        assert_eq!(opts.schema, Some("my_schema".to_string()));
        assert_eq!(
            opts.columns,
            Some(vec!["col1".to_string(), "col2".to_string()])
        );
        assert_eq!(opts.host, "exasol.example.com");
        assert_eq!(opts.port, 8563);
    }

    // Test ImportQuery building from options
    #[test]
    fn test_build_query_basic() {
        let opts = CsvImportOptions::default();
        let query = opts.build_query("my_table", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("IMPORT INTO my_table"));
        assert!(sql.contains("FROM CSV AT 'http://127.0.0.1:8080'"));
        assert!(sql.contains("ENCODING = 'UTF-8'"));
        assert!(sql.contains("COLUMN SEPARATOR = ','"));
    }

    #[test]
    fn test_build_query_with_schema_and_columns() {
        let opts = CsvImportOptions::default()
            .schema("test_schema")
            .columns(vec!["id".to_string(), "name".to_string()]);

        let query = opts.build_query("users", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("IMPORT INTO test_schema.users"));
        assert!(sql.contains("(id, name)"));
    }

    #[test]
    fn test_build_query_with_tls() {
        let opts = CsvImportOptions::default();
        let fingerprint = "ABC123DEF456";
        let query = opts.build_query("my_table", "127.0.0.1:8080", Some(fingerprint));
        let sql = query.build();

        assert!(sql.contains("FROM CSV AT 'https://127.0.0.1:8080'"));
        assert!(sql.contains(&format!("PUBLIC KEY '{}'", fingerprint)));
    }

    #[test]
    fn test_build_query_with_all_options() {
        let opts = CsvImportOptions::default()
            .encoding("ISO-8859-1")
            .column_separator(';')
            .column_delimiter('\'')
            .row_separator(RowSeparator::CRLF)
            .skip_rows(2)
            .null_value("\\N")
            .trim_mode(TrimMode::LTrim)
            .compression(Compression::Bzip2)
            .reject_limit(50);

        let query = opts.build_query("data", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("ENCODING = 'ISO-8859-1'"));
        assert!(sql.contains("COLUMN SEPARATOR = ';'"));
        assert!(sql.contains("COLUMN DELIMITER = '''"));
        assert!(sql.contains("ROW SEPARATOR = 'CRLF'"));
        assert!(sql.contains("SKIP = 2"));
        assert!(sql.contains("NULL = '\\N'"));
        assert!(sql.contains("TRIM = 'LTRIM'"));
        assert!(sql.contains("FILE '001.csv.bz2'"));
        assert!(sql.contains("REJECT LIMIT 50"));
    }

    // Test CSV row formatting
    #[test]
    fn test_format_csv_row_basic() {
        let row = vec!["a", "b", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "a,b,c\n");
    }

    #[test]
    fn test_format_csv_row_with_different_separator() {
        let row = vec!["a", "b", "c"];
        let formatted = format_csv_row(row, ';', '"', &RowSeparator::LF);
        assert_eq!(formatted, "a;b;c\n");
    }

    #[test]
    fn test_format_csv_row_with_crlf() {
        let row = vec!["a", "b", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::CRLF);
        assert_eq!(formatted, "a,b,c\r\n");
    }

    #[test]
    fn test_format_csv_row_with_cr() {
        let row = vec!["a", "b"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::CR);
        assert_eq!(formatted, "a,b\r");
    }

    #[test]
    fn test_format_csv_row_needs_quoting_separator() {
        let row = vec!["a,b", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "\"a,b\",c\n");
    }

    #[test]
    fn test_format_csv_row_needs_quoting_newline() {
        let row = vec!["a\nb", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "\"a\nb\",c\n");
    }

    #[test]
    fn test_format_csv_row_needs_quoting_delimiter() {
        let row = vec!["a\"b", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        // Delimiter should be escaped by doubling
        assert_eq!(formatted, "\"a\"\"b\",c\n");
    }

    #[test]
    fn test_format_csv_row_empty_fields() {
        let row = vec!["", "b", ""];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, ",b,\n");
    }

    #[test]
    fn test_format_csv_row_single_field() {
        let row = vec!["only"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "only\n");
    }

    // Test compression
    #[test]
    fn test_compress_data_none() {
        let data = b"test data";
        let result = compress_data(data, Compression::None).unwrap();
        assert_eq!(result, data);
    }

    #[test]
    fn test_compress_data_gzip() {
        let data = b"test data for gzip compression";
        let result = compress_data(data, Compression::Gzip).unwrap();

        // Verify it's valid gzip (starts with gzip magic bytes)
        assert!(result.len() >= 2);
        assert_eq!(result[0], 0x1f);
        assert_eq!(result[1], 0x8b);
    }

    #[test]
    fn test_compress_data_bzip2() {
        let data = b"test data for bzip2 compression";
        let result = compress_data(data, Compression::Bzip2).unwrap();

        // Verify it's valid bzip2 (starts with "BZ" magic)
        assert!(result.len() >= 2);
        assert_eq!(result[0], b'B');
        assert_eq!(result[1], b'Z');
    }

    // Test error types
    #[test]
    fn test_import_error_display() {
        let err = ImportError::IoError(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "file not found",
        ));
        assert!(err.to_string().contains("IO error"));

        let err = ImportError::SqlError("syntax error".to_string());
        assert!(err.to_string().contains("SQL execution failed"));

        let err = ImportError::HttpTransportError("connection refused".to_string());
        assert!(err.to_string().contains("HTTP transport failed"));

        let err = ImportError::CompressionError("invalid data".to_string());
        assert!(err.to_string().contains("Compression error"));
    }

    // Test DataPipeSender
    #[tokio::test]
    async fn test_data_pipe_sender_send() {
        let (tx, mut rx) = mpsc::channel(10);
        let sender = DataPipeSender::new(tx);

        sender.send(b"test data".to_vec()).await.unwrap();

        let received = rx.recv().await.unwrap();
        assert_eq!(received, b"test data");
    }

    #[tokio::test]
    async fn test_data_pipe_sender_send_row() {
        let (tx, mut rx) = mpsc::channel(10);
        let sender = DataPipeSender::new(tx);

        sender
            .send_row(vec!["a", "b", "c"], ',', '"', &RowSeparator::LF)
            .await
            .unwrap();

        let received = rx.recv().await.unwrap();
        assert_eq!(received, b"a,b,c\n");
    }

    #[tokio::test]
    async fn test_data_pipe_sender_closed_channel() {
        let (tx, rx) = mpsc::channel::<Vec<u8>>(10);
        let sender = DataPipeSender::new(tx);

        // Drop receiver to close channel
        drop(rx);

        let result = sender.send(b"test".to_vec()).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), ImportError::ChannelError(_)));
    }

    // Test building SQL with compression
    #[test]
    fn test_build_query_compression_gzip() {
        let opts = CsvImportOptions::default().compression(Compression::Gzip);
        let query = opts.build_query("table", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("FILE '001.csv.gz'"));
    }

    #[test]
    fn test_build_query_compression_bzip2() {
        let opts = CsvImportOptions::default().compression(Compression::Bzip2);
        let query = opts.build_query("table", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("FILE '001.csv.bz2'"));
    }

    // Test detect_compression function
    #[test]
    fn test_detect_compression_explicit_overrides() {
        // Explicit compression should override file extension detection
        let path = Path::new("data.csv");
        assert_eq!(
            detect_compression(path, Compression::Gzip),
            Compression::Gzip
        );
        assert_eq!(
            detect_compression(path, Compression::Bzip2),
            Compression::Bzip2
        );
    }

    #[test]
    fn test_detect_compression_from_extension_gzip() {
        let path = Path::new("data.csv.gz");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Gzip
        );

        let path = Path::new("data.csv.gzip");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Gzip
        );

        // Case insensitive
        let path = Path::new("DATA.CSV.GZ");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Gzip
        );
    }

    #[test]
    fn test_detect_compression_from_extension_bzip2() {
        let path = Path::new("data.csv.bz2");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Bzip2
        );

        let path = Path::new("data.csv.bzip2");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Bzip2
        );
    }

    #[test]
    fn test_detect_compression_no_compression() {
        let path = Path::new("data.csv");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::None
        );

        let path = Path::new("data.txt");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::None
        );
    }

    // Test is_compressed_file function
    #[test]
    fn test_is_compressed_file_gzip() {
        assert!(is_compressed_file(Path::new("data.csv.gz")));
        assert!(is_compressed_file(Path::new("data.csv.gzip")));
        assert!(is_compressed_file(Path::new("DATA.CSV.GZ"))); // case insensitive
    }

    #[test]
    fn test_is_compressed_file_bzip2() {
        assert!(is_compressed_file(Path::new("data.csv.bz2")));
        assert!(is_compressed_file(Path::new("data.csv.bzip2")));
    }

    #[test]
    fn test_is_compressed_file_uncompressed() {
        assert!(!is_compressed_file(Path::new("data.csv")));
        assert!(!is_compressed_file(Path::new("data.txt")));
        assert!(!is_compressed_file(Path::new("data")));
    }

    // Test write_chunked_data with mock (indirectly via compress + format)
    #[test]
    fn test_csv_row_formatting_with_compression() {
        // Test that compression works with formatted CSV data
        let row = vec!["1", "test data", "value"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);

        let compressed = compress_data(formatted.as_bytes(), Compression::Gzip).unwrap();

        // Verify gzip magic bytes
        assert!(compressed.len() >= 2);
        assert_eq!(compressed[0], 0x1f);
        assert_eq!(compressed[1], 0x8b);
    }

    #[test]
    fn test_multiple_rows_formatting() {
        // Test formatting multiple rows as CSV
        let rows = vec![
            vec!["1", "Alice", "alice@example.com"],
            vec!["2", "Bob", "bob@example.com"],
            vec!["3", "Charlie", "charlie@example.com"],
        ];

        let mut data = Vec::new();
        for row in rows {
            let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
            data.extend_from_slice(formatted.as_bytes());
        }

        let expected =
            "1,Alice,alice@example.com\n2,Bob,bob@example.com\n3,Charlie,charlie@example.com\n";
        assert_eq!(String::from_utf8(data).unwrap(), expected);
    }

    #[test]
    fn test_csv_special_characters_in_data() {
        // Test that special characters are properly escaped
        let row = vec!["1", "Hello, World!", "Contains \"quotes\""];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);

        // Field with comma should be quoted, field with quotes should be quoted and escaped
        assert_eq!(
            formatted,
            "1,\"Hello, World!\",\"Contains \"\"quotes\"\"\"\n"
        );
    }

    #[test]
    fn test_csv_row_with_newlines() {
        // Test that newlines in data are properly quoted
        let row = vec!["1", "Line1\nLine2", "normal"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);

        assert_eq!(formatted, "1,\"Line1\nLine2\",normal\n");
    }

    #[test]
    fn test_csv_row_with_carriage_return() {
        // Test that carriage returns in data are properly quoted
        let row = vec!["1", "Line1\rLine2", "normal"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);

        assert_eq!(formatted, "1,\"Line1\rLine2\",normal\n");
    }

    // Test import flow documentation (protocol flow)
    #[test]
    fn test_import_protocol_flow_documentation() {
        // This test documents the expected protocol flow for IMPORT operations
        // The actual flow is tested in integration tests, but this ensures
        // the documentation in import_csv_internal matches expectations

        // Protocol flow for IMPORT:
        // 1. Client connects to Exasol and gets internal address via handshake
        // 2. Client starts executing IMPORT SQL via WebSocket (async)
        // 3. Exasol sends HTTP GET request through the tunnel connection
        // 4. Client receives GET, sends HTTP response headers (chunked encoding)
        // 5. Client streams CSV data as chunked body
        // 6. Client sends final chunk (0\r\n\r\n)
        // 7. IMPORT SQL completes

        // This test is informational - actual integration testing is done
        // against a real Exasol instance
    }

    // Test compression detection with various path formats
    #[test]
    fn test_detect_compression_path_variations() {
        // Test with absolute paths
        let path = Path::new("/home/user/data/file.csv.gz");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Gzip
        );

        // Test with relative paths
        let path = Path::new("./data/file.csv.bz2");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Bzip2
        );

        // Test with just filename
        let path = Path::new("file.csv");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::None
        );
    }

    // Tests for build_multi_file_query
    #[test]
    fn test_build_multi_file_query_basic() {
        use crate::query::import::ImportFileEntry;

        let options = CsvImportOptions::default();
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
        ];

        let query = build_multi_file_query("my_table", &options, entries);
        let sql = query.build();

        assert!(sql.contains("IMPORT INTO my_table"));
        assert!(sql.contains("FROM CSV"));
        assert!(sql.contains("AT 'http://10.0.0.5:8563' FILE '001.csv'"));
        assert!(sql.contains("AT 'http://10.0.0.6:8564' FILE '002.csv'"));
    }

    #[test]
    fn test_build_multi_file_query_with_options() {
        use crate::query::import::ImportFileEntry;

        let options = CsvImportOptions::default()
            .schema("test_schema")
            .columns(vec!["id".to_string(), "name".to_string()])
            .skip_rows(1)
            .compression(Compression::Gzip);

        let entries = vec![ImportFileEntry::new(
            "10.0.0.5:8563".to_string(),
            "001.csv".to_string(),
            None,
        )];

        let query = build_multi_file_query("data", &options, entries);
        let sql = query.build();

        assert!(sql.contains("IMPORT INTO test_schema.data (id, name)"));
        assert!(sql.contains("SKIP = 1"));
        assert!(sql.contains("FILE '001.csv.gz'"));
    }
}
1519}