//! Import query builder for generating IMPORT SQL statements.
//!
//! This module provides a builder pattern for constructing Exasol IMPORT statements
//! that import data from HTTP endpoints into database tables.
//!
//! # Example
//!
//! ```rust
//! use exarrow_rs::query::import::ImportQuery;
//!
//! let sql = ImportQuery::new("users")
//!     .at_address("192.168.1.1:8080")
//!     .build();
//! assert!(sql.contains("IMPORT INTO users"));
//! ```

/// Row separator options for CSV import.
///
/// Selects the line-ending convention the server expects in the CSV stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RowSeparator {
    /// Line feed (Unix-style)
    #[default]
    LF,
    /// Carriage return (old Mac-style)
    CR,
    /// Carriage return + line feed (Windows-style)
    CRLF,
}

impl RowSeparator {
    /// SQL keyword emitted in the `ROW SEPARATOR = '...'` clause.
    pub fn to_sql(&self) -> &'static str {
        match *self {
            Self::LF => "LF",
            Self::CR => "CR",
            Self::CRLF => "CRLF",
        }
    }
}
31
/// Compression options for import files.
///
/// Controls which file extension is used in the generated `FILE` clause.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Compression {
    /// No compression
    #[default]
    None,
    /// Gzip compression (.gz extension)
    Gzip,
    /// Bzip2 compression (.bz2 extension)
    Bzip2,
}

impl Compression {
    /// File extension corresponding to this compression type.
    pub fn extension(&self) -> &'static str {
        match *self {
            Self::None => ".csv",
            Self::Gzip => ".csv.gz",
            Self::Bzip2 => ".csv.bz2",
        }
    }
}
54
/// Trim mode options for CSV import.
///
/// Decides whether leading and/or trailing whitespace is stripped
/// from imported values.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TrimMode {
    /// No trimming
    #[default]
    None,
    /// Trim leading whitespace
    LTrim,
    /// Trim trailing whitespace
    RTrim,
    /// Trim both leading and trailing whitespace
    Trim,
}

impl TrimMode {
    /// SQL keyword for the `TRIM = '...'` clause, or `None` when the
    /// clause should be omitted entirely.
    pub fn to_sql(&self) -> Option<&'static str> {
        match *self {
            Self::None => None,
            Self::LTrim => Some("LTRIM"),
            Self::RTrim => Some("RTRIM"),
            Self::Trim => Some("TRIM"),
        }
    }
}
80
/// Entry for a single file in a multi-file IMPORT statement.
///
/// Used with `ImportQuery::with_files()` to build IMPORT statements
/// that reference multiple FILE clauses for parallel import.
#[derive(Debug, Clone)]
pub struct ImportFileEntry {
    /// Internal address from EXA handshake (format: "host:port")
    pub address: String,
    /// File name for this entry (e.g., "001.csv", "002.csv")
    pub file_name: String,
    /// Optional public key fingerprint for TLS
    pub public_key: Option<String>,
}

impl ImportFileEntry {
    /// Create a new import file entry from its three parts.
    ///
    /// # Arguments
    ///
    /// * `address` - Internal address from EXA handshake
    /// * `file_name` - File name for this entry
    /// * `public_key` - Optional public key fingerprint for TLS;
    ///   when present, the entry is emitted with `https://` and a
    ///   `PUBLIC KEY` clause
    pub fn new(address: String, file_name: String, public_key: Option<String>) -> Self {
        ImportFileEntry {
            address,
            file_name,
            public_key,
        }
    }
}
111
/// Builder for constructing Exasol IMPORT SQL statements.
///
/// The ImportQuery builder allows you to configure all aspects of an IMPORT statement
/// including the target table, columns, CSV format options, and error handling.
///
/// NOTE(review): identifiers and option values are interpolated into the SQL
/// verbatim — nothing is quoted or escaped here. Callers must ensure table,
/// schema, and column names (and values such as `null_value`) contain no
/// single quotes or other SQL metacharacters — TODO confirm upstream inputs
/// are trusted.
///
/// # Multi-file Import
///
/// For parallel imports, use `with_files()` instead of `at_address()` and `file_name()`:
///
/// ```rust
/// use exarrow_rs::query::import::{ImportQuery, ImportFileEntry};
///
/// let entries = vec![
///     ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
///     ImportFileEntry::new("10.0.0.6:8563".to_string(), "002.csv".to_string(), None),
/// ];
///
/// let query = ImportQuery::new("my_table")
///     .with_files(entries)
///     .build();
/// ```
#[derive(Debug, Clone)]
pub struct ImportQuery {
    /// Target table name (required)
    table: String,
    /// Target schema (optional)
    schema: Option<String>,
    /// Columns to import into (optional, imports all if not specified)
    columns: Option<Vec<String>>,
    /// HTTP address for the CSV source (e.g., "192.168.1.1:8080") - single file mode
    address: Option<String>,
    /// SHA-256 fingerprint for TLS public key verification - single file mode
    public_key: Option<String>,
    /// File name (default "001.csv") - single file mode
    file_name: String,
    /// Multiple file entries for parallel import; when `Some`, the single-file
    /// fields `address`, `public_key`, and `file_name` are ignored by `build()`
    file_entries: Option<Vec<ImportFileEntry>>,
    /// Column separator character (default ',')
    column_separator: char,
    /// Column delimiter character for quoting (default '"')
    column_delimiter: char,
    /// Row separator (default LF)
    row_separator: RowSeparator,
    /// Character encoding (default "UTF-8")
    encoding: String,
    /// Number of header rows to skip (default 0; clause omitted when 0)
    skip: u32,
    /// Custom NULL value representation (clause omitted when `None`)
    null_value: Option<String>,
    /// Trim mode for values (clause omitted for `TrimMode::None`)
    trim: TrimMode,
    /// Compression type (affects the FILE extension in the generated SQL)
    compression: Compression,
    /// Maximum number of invalid rows before failure (clause omitted when `None`)
    reject_limit: Option<u32>,
}
168
169impl ImportQuery {
170    /// Create a new ImportQuery builder for the specified table.
171    ///
172    /// # Arguments
173    /// * `table` - The name of the target table to import into
174    ///
175    pub fn new(table: &str) -> Self {
176        Self {
177            table: table.to_string(),
178            schema: None,
179            columns: None,
180            address: None,
181            public_key: None,
182            file_name: "001.csv".to_string(),
183            file_entries: None,
184            column_separator: ',',
185            column_delimiter: '"',
186            row_separator: RowSeparator::default(),
187            encoding: "UTF-8".to_string(),
188            skip: 0,
189            null_value: None,
190            trim: TrimMode::default(),
191            compression: Compression::default(),
192            reject_limit: None,
193        }
194    }
195
196    /// Set the target schema for the import.
197    ///
198    /// # Arguments
199    /// * `schema` - The schema name
200    pub fn schema(mut self, schema: &str) -> Self {
201        self.schema = Some(schema.to_string());
202        self
203    }
204
205    /// Set the columns to import into.
206    ///
207    /// If not specified, all columns in the table will be used.
208    ///
209    /// # Arguments
210    /// * `cols` - List of column names
211    pub fn columns(mut self, cols: Vec<&str>) -> Self {
212        self.columns = Some(cols.into_iter().map(String::from).collect());
213        self
214    }
215
216    /// Set the HTTP address to import from.
217    ///
218    /// # Arguments
219    /// * `addr` - HTTP address in "host:port" format
220    pub fn at_address(mut self, addr: &str) -> Self {
221        self.address = Some(addr.to_string());
222        self
223    }
224
225    /// Set the public key fingerprint for TLS verification.
226    ///
227    /// When set, HTTPS will be used and the PUBLIC KEY clause will be added.
228    ///
229    /// # Arguments
230    /// * `fingerprint` - SHA-256 fingerprint of the server's public key
231    pub fn with_public_key(mut self, fingerprint: &str) -> Self {
232        self.public_key = Some(fingerprint.to_string());
233        self
234    }
235
236    /// Set the file name for the import.
237    ///
238    /// # Arguments
239    /// * `name` - File name (default "001.csv")
240    pub fn file_name(mut self, name: &str) -> Self {
241        self.file_name = name.to_string();
242        self
243    }
244
245    /// Set the column separator character.
246    ///
247    /// # Arguments
248    /// * `sep` - Separator character (default ',')
249    pub fn column_separator(mut self, sep: char) -> Self {
250        self.column_separator = sep;
251        self
252    }
253
254    /// Set the column delimiter character for quoting.
255    ///
256    /// # Arguments
257    /// * `delim` - Delimiter character (default '"')
258    pub fn column_delimiter(mut self, delim: char) -> Self {
259        self.column_delimiter = delim;
260        self
261    }
262
263    /// Set the row separator.
264    ///
265    /// # Arguments
266    /// * `sep` - Row separator (default LF)
267    pub fn row_separator(mut self, sep: RowSeparator) -> Self {
268        self.row_separator = sep;
269        self
270    }
271
272    /// Set the character encoding.
273    ///
274    /// # Arguments
275    /// * `enc` - Encoding name (default "UTF-8")
276    pub fn encoding(mut self, enc: &str) -> Self {
277        self.encoding = enc.to_string();
278        self
279    }
280
281    /// Set the number of header rows to skip.
282    ///
283    /// # Arguments
284    /// * `rows` - Number of rows to skip (default 0)
285    pub fn skip(mut self, rows: u32) -> Self {
286        self.skip = rows;
287        self
288    }
289
290    /// Set a custom NULL value representation.
291    ///
292    /// # Arguments
293    /// * `val` - String representing NULL values in the CSV
294    pub fn null_value(mut self, val: &str) -> Self {
295        self.null_value = Some(val.to_string());
296        self
297    }
298
299    /// Set the trim mode for imported values.
300    ///
301    /// # Arguments
302    /// * `trim` - Trim mode to apply
303    pub fn trim(mut self, trim: TrimMode) -> Self {
304        self.trim = trim;
305        self
306    }
307
308    /// Set the compression type for the import file.
309    ///
310    /// This affects the file extension in the generated SQL.
311    ///
312    /// # Arguments
313    /// * `compression` - Compression type
314    pub fn compressed(mut self, compression: Compression) -> Self {
315        self.compression = compression;
316        self
317    }
318
319    /// Set the reject limit for error handling.
320    ///
321    /// # Arguments
322    /// * `limit` - Maximum number of invalid rows before the import fails
323    pub fn reject_limit(mut self, limit: u32) -> Self {
324        self.reject_limit = Some(limit);
325        self
326    }
327
328    /// Set multiple file entries for parallel import.
329    ///
330    /// This method enables multi-file IMPORT statements where each file
331    /// has its own AT/FILE clause with a unique internal address.
332    ///
333    /// # Arguments
334    /// * `entries` - Vector of file entries with addresses and file names
335    ///
336    /// # Note
337    ///
338    /// When using `with_files()`, the `at_address()` and `file_name()` methods
339    /// are ignored and should not be called.
340    pub fn with_files(mut self, entries: Vec<ImportFileEntry>) -> Self {
341        self.file_entries = Some(entries);
342        self
343    }
344
345    /// Get the configured file name with appropriate extension for compression.
346    fn get_file_name(&self) -> String {
347        // If compression is set and file_name doesn't already have the right extension
348        let base_name = self
349            .file_name
350            .trim_end_matches(".csv")
351            .trim_end_matches(".gz")
352            .trim_end_matches(".bz2")
353            .trim_end_matches(".csv");
354
355        format!("{}{}", base_name, self.compression.extension())
356    }
357
358    /// Build the complete IMPORT SQL statement.
359    ///
360    /// # Returns
361    /// The generated IMPORT SQL statement as a string.
362    ///
363    pub fn build(&self) -> String {
364        let mut sql = String::with_capacity(512);
365
366        // IMPORT INTO clause
367        sql.push_str("IMPORT INTO ");
368        if let Some(ref schema) = self.schema {
369            sql.push_str(schema);
370            sql.push('.');
371        }
372        sql.push_str(&self.table);
373
374        // Column list
375        if let Some(ref cols) = self.columns {
376            sql.push_str(" (");
377            sql.push_str(&cols.join(", "));
378            sql.push(')');
379        }
380
381        // FROM CSV clause - either multi-file or single-file mode
382        if let Some(ref entries) = self.file_entries {
383            // Multi-file mode: FROM CSV AT 'addr1' FILE '001.csv' AT 'addr2' FILE '002.csv' ...
384            sql.push_str("\nFROM CSV");
385
386            for entry in entries {
387                sql.push_str(" AT '");
388
389                // Use https:// if public_key is set, otherwise http://
390                if entry.public_key.is_some() {
391                    sql.push_str("https://");
392                } else {
393                    sql.push_str("http://");
394                }
395                sql.push_str(&entry.address);
396                sql.push('\'');
397
398                // PUBLIC KEY clause for this entry
399                if let Some(ref pk) = entry.public_key {
400                    sql.push_str(" PUBLIC KEY '");
401                    sql.push_str(pk);
402                    sql.push('\'');
403                }
404
405                // FILE clause for this entry
406                sql.push_str(" FILE '");
407                sql.push_str(&self.get_file_name_for(&entry.file_name));
408                sql.push('\'');
409            }
410        } else {
411            // Single-file mode: FROM CSV AT 'addr' FILE '001.csv'
412            sql.push_str("\nFROM CSV AT '");
413
414            // Use https:// if public_key is set, otherwise http://
415            if self.public_key.is_some() {
416                sql.push_str("https://");
417            } else {
418                sql.push_str("http://");
419            }
420
421            if let Some(ref addr) = self.address {
422                sql.push_str(addr);
423            }
424            sql.push('\'');
425
426            // PUBLIC KEY clause
427            if let Some(ref pk) = self.public_key {
428                sql.push_str(" PUBLIC KEY '");
429                sql.push_str(pk);
430                sql.push('\'');
431            }
432
433            // FILE clause
434            sql.push_str("\nFILE '");
435            sql.push_str(&self.get_file_name());
436            sql.push('\'');
437        }
438
439        // Format options
440        sql.push_str("\nENCODING = '");
441        sql.push_str(&self.encoding);
442        sql.push('\'');
443
444        sql.push_str("\nCOLUMN SEPARATOR = '");
445        sql.push(self.column_separator);
446        sql.push('\'');
447
448        sql.push_str("\nCOLUMN DELIMITER = '");
449        sql.push(self.column_delimiter);
450        sql.push('\'');
451
452        sql.push_str("\nROW SEPARATOR = '");
453        sql.push_str(self.row_separator.to_sql());
454        sql.push('\'');
455
456        // Optional SKIP
457        if self.skip > 0 {
458            sql.push_str("\nSKIP = ");
459            sql.push_str(&self.skip.to_string());
460        }
461
462        // Optional NULL value
463        if let Some(ref null_val) = self.null_value {
464            sql.push_str("\nNULL = '");
465            sql.push_str(null_val);
466            sql.push('\'');
467        }
468
469        // Optional TRIM
470        if let Some(trim_sql) = self.trim.to_sql() {
471            sql.push_str("\nTRIM = '");
472            sql.push_str(trim_sql);
473            sql.push('\'');
474        }
475
476        // Optional REJECT LIMIT
477        if let Some(limit) = self.reject_limit {
478            sql.push_str("\nREJECT LIMIT ");
479            sql.push_str(&limit.to_string());
480        }
481
482        sql
483    }
484
485    /// Get file name with compression extension for multi-file entries.
486    fn get_file_name_for(&self, base_name: &str) -> String {
487        let base = base_name
488            .trim_end_matches(".csv")
489            .trim_end_matches(".gz")
490            .trim_end_matches(".bz2")
491            .trim_end_matches(".csv");
492
493        format!("{}{}", base, self.compression.extension())
494    }
495}
496
#[cfg(test)]
mod tests {
    // Unit tests for the IMPORT query builder. They assert on substrings of
    // the generated SQL rather than the full statement, so clause ordering
    // is only pinned by `test_complete_import_statement_format` below.
    use super::*;

    // --- Single-file mode ---

    #[test]
    fn test_basic_import_statement() {
        let sql = ImportQuery::new("users")
            .at_address("192.168.1.1:8080")
            .build();

        // Defaults: http scheme, "001.csv", UTF-8, ',' separator, '"' delimiter, LF rows
        assert!(sql.contains("IMPORT INTO users"));
        assert!(sql.contains("FROM CSV AT 'http://192.168.1.1:8080'"));
        assert!(sql.contains("FILE '001.csv'"));
        assert!(sql.contains("ENCODING = 'UTF-8'"));
        assert!(sql.contains("COLUMN SEPARATOR = ','"));
        assert!(sql.contains("COLUMN DELIMITER = '\"'"));
        assert!(sql.contains("ROW SEPARATOR = 'LF'"));
    }

    #[test]
    fn test_import_with_schema() {
        let sql = ImportQuery::new("users")
            .schema("myschema")
            .at_address("192.168.1.1:8080")
            .build();

        assert!(sql.contains("IMPORT INTO myschema.users"));
    }

    #[test]
    fn test_import_with_columns() {
        let sql = ImportQuery::new("users")
            .columns(vec!["id", "name", "email"])
            .at_address("192.168.1.1:8080")
            .build();

        assert!(sql.contains("IMPORT INTO users (id, name, email)"));
    }

    #[test]
    fn test_import_with_all_format_options() {
        let sql = ImportQuery::new("data")
            .at_address("10.0.0.1:9000")
            .column_separator(';')
            .column_delimiter('\'')
            .row_separator(RowSeparator::CRLF)
            .encoding("ISO-8859-1")
            .skip(2)
            .null_value("NULL")
            .trim(TrimMode::Trim)
            .reject_limit(100)
            .build();

        assert!(sql.contains("COLUMN SEPARATOR = ';'"));
        // NOTE(review): a single-quote delimiter is emitted unescaped as ''',
        // which this test codifies — confirm the server accepts that form.
        assert!(sql.contains("COLUMN DELIMITER = '''"));
        assert!(sql.contains("ROW SEPARATOR = 'CRLF'"));
        assert!(sql.contains("ENCODING = 'ISO-8859-1'"));
        assert!(sql.contains("SKIP = 2"));
        assert!(sql.contains("NULL = 'NULL'"));
        assert!(sql.contains("TRIM = 'TRIM'"));
        assert!(sql.contains("REJECT LIMIT 100"));
    }

    #[test]
    fn test_import_with_encryption() {
        // A public key switches the scheme to https and adds PUBLIC KEY
        let fingerprint = "SHA256:abc123def456";
        let sql = ImportQuery::new("secure_data")
            .at_address("192.168.1.1:8443")
            .with_public_key(fingerprint)
            .build();

        assert!(sql.contains("FROM CSV AT 'https://192.168.1.1:8443'"));
        assert!(sql.contains(&format!("PUBLIC KEY '{}'", fingerprint)));
    }

    // --- Compression / file-name normalization ---

    #[test]
    fn test_import_with_gzip_compression() {
        let sql = ImportQuery::new("compressed_data")
            .at_address("192.168.1.1:8080")
            .compressed(Compression::Gzip)
            .build();

        assert!(sql.contains("FILE '001.csv.gz'"));
    }

    #[test]
    fn test_import_with_bzip2_compression() {
        let sql = ImportQuery::new("compressed_data")
            .at_address("192.168.1.1:8080")
            .compressed(Compression::Bzip2)
            .build();

        assert!(sql.contains("FILE '001.csv.bz2'"));
    }

    #[test]
    fn test_import_custom_file_name() {
        // A bare name (no extension) gets ".csv" appended
        let sql = ImportQuery::new("data")
            .at_address("192.168.1.1:8080")
            .file_name("custom_file")
            .build();

        assert!(sql.contains("FILE 'custom_file.csv'"));
    }

    #[test]
    fn test_import_custom_file_name_with_compression() {
        let sql = ImportQuery::new("data")
            .at_address("192.168.1.1:8080")
            .file_name("custom_file")
            .compressed(Compression::Gzip)
            .build();

        assert!(sql.contains("FILE 'custom_file.csv.gz'"));
    }

    // --- Enum helpers ---

    #[test]
    fn test_row_separator_to_sql() {
        assert_eq!(RowSeparator::LF.to_sql(), "LF");
        assert_eq!(RowSeparator::CR.to_sql(), "CR");
        assert_eq!(RowSeparator::CRLF.to_sql(), "CRLF");
    }

    #[test]
    fn test_compression_extension() {
        assert_eq!(Compression::None.extension(), ".csv");
        assert_eq!(Compression::Gzip.extension(), ".csv.gz");
        assert_eq!(Compression::Bzip2.extension(), ".csv.bz2");
    }

    #[test]
    fn test_trim_mode_to_sql() {
        assert_eq!(TrimMode::None.to_sql(), None);
        assert_eq!(TrimMode::LTrim.to_sql(), Some("LTRIM"));
        assert_eq!(TrimMode::RTrim.to_sql(), Some("RTRIM"));
        assert_eq!(TrimMode::Trim.to_sql(), Some("TRIM"));
    }

    #[test]
    fn test_defaults() {
        assert_eq!(RowSeparator::default(), RowSeparator::LF);
        assert_eq!(Compression::default(), Compression::None);
        assert_eq!(TrimMode::default(), TrimMode::None);
    }

    // --- Multi-file (parallel) mode ---

    #[test]
    fn test_import_file_entry() {
        let entry = ImportFileEntry::new(
            "10.0.0.5:8563".to_string(),
            "001.csv".to_string(),
            Some("sha256//abc123".to_string()),
        );

        assert_eq!(entry.address, "10.0.0.5:8563");
        assert_eq!(entry.file_name, "001.csv");
        assert_eq!(entry.public_key, Some("sha256//abc123".to_string()));
    }

    #[test]
    fn test_multi_file_import_basic() {
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
        ];

        let sql = ImportQuery::new("my_table").with_files(entries).build();

        // Each entry contributes its own AT ... FILE ... pair
        assert!(sql.contains("IMPORT INTO my_table"));
        assert!(sql.contains("FROM CSV"));
        assert!(sql.contains("AT 'http://10.0.0.5:8563' FILE '001.csv'"));
        assert!(sql.contains("AT 'http://10.0.0.6:8564' FILE '002.csv'"));
    }

    #[test]
    fn test_multi_file_import_with_tls() {
        let entries = vec![
            ImportFileEntry::new(
                "10.0.0.5:8563".to_string(),
                "001.csv".to_string(),
                Some("sha256//fingerprint1".to_string()),
            ),
            ImportFileEntry::new(
                "10.0.0.6:8564".to_string(),
                "002.csv".to_string(),
                Some("sha256//fingerprint2".to_string()),
            ),
        ];

        let sql = ImportQuery::new("secure_table").with_files(entries).build();

        // Per-entry public keys: https scheme plus PUBLIC KEY per AT clause
        assert!(sql.contains("AT 'https://10.0.0.5:8563' PUBLIC KEY 'sha256//fingerprint1'"));
        assert!(sql.contains("AT 'https://10.0.0.6:8564' PUBLIC KEY 'sha256//fingerprint2'"));
    }

    #[test]
    fn test_multi_file_import_with_compression() {
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
        ];

        let sql = ImportQuery::new("compressed_table")
            .with_files(entries)
            .compressed(Compression::Gzip)
            .build();

        // Compression rewrites every entry's extension
        assert!(sql.contains("FILE '001.csv.gz'"));
        assert!(sql.contains("FILE '002.csv.gz'"));
    }

    #[test]
    fn test_multi_file_import_three_files() {
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.7:8565".to_string(), "003.csv".to_string(), None),
        ];

        let sql = ImportQuery::new("data").with_files(entries).build();

        assert!(sql.contains("AT 'http://10.0.0.5:8563' FILE '001.csv'"));
        assert!(sql.contains("AT 'http://10.0.0.6:8564' FILE '002.csv'"));
        assert!(sql.contains("AT 'http://10.0.0.7:8565' FILE '003.csv'"));
    }

    #[test]
    fn test_multi_file_import_with_schema_and_columns() {
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
        ];

        let sql = ImportQuery::new("my_table")
            .schema("my_schema")
            .columns(vec!["id", "name", "value"])
            .with_files(entries)
            .build();

        assert!(sql.contains("IMPORT INTO my_schema.my_table (id, name, value)"));
    }

    #[test]
    fn test_multi_file_import_with_all_options() {
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
        ];

        let sql = ImportQuery::new("data")
            .with_files(entries)
            .encoding("ISO-8859-1")
            .column_separator(';')
            .skip(1)
            .null_value("NULL")
            .reject_limit(100)
            .build();

        // Format options apply identically in multi-file mode
        assert!(sql.contains("ENCODING = 'ISO-8859-1'"));
        assert!(sql.contains("COLUMN SEPARATOR = ';'"));
        assert!(sql.contains("SKIP = 1"));
        assert!(sql.contains("NULL = 'NULL'"));
        assert!(sql.contains("REJECT LIMIT 100"));
    }

    // --- Optional-clause omission and full-statement shape ---

    #[test]
    fn test_import_no_skip_when_zero() {
        let sql = ImportQuery::new("data")
            .at_address("192.168.1.1:8080")
            .build();

        // SKIP should not appear when it's 0
        assert!(!sql.contains("SKIP"));
    }

    #[test]
    fn test_import_skip_header_row() {
        let sql = ImportQuery::new("data")
            .at_address("192.168.1.1:8080")
            .skip(1)
            .build();

        assert!(sql.contains("SKIP = 1"));
    }

    #[test]
    fn test_complete_import_statement_format() {
        let sql = ImportQuery::new("employees")
            .schema("hr")
            .columns(vec!["id", "first_name", "last_name", "department"])
            .at_address("10.20.30.40:8080")
            .with_public_key("SHA256:fingerprint123")
            .skip(1)
            .reject_limit(10)
            .build();

        // Verify the complete structure
        let expected_parts = [
            "IMPORT INTO hr.employees (id, first_name, last_name, department)",
            "FROM CSV AT 'https://10.20.30.40:8080' PUBLIC KEY 'SHA256:fingerprint123'",
            "FILE '001.csv'",
            "ENCODING = 'UTF-8'",
            "COLUMN SEPARATOR = ','",
            "COLUMN DELIMITER = '\"'",
            "ROW SEPARATOR = 'LF'",
            "SKIP = 1",
            "REJECT LIMIT 10",
        ];

        for part in expected_parts {
            assert!(sql.contains(part), "SQL should contain: {}", part);
        }
    }
}