/// Row (record) separator emitted in the `ROW SEPARATOR` clause.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RowSeparator {
    /// Unix line endings (`\n`). The default.
    #[default]
    LF,
    /// Classic Mac line endings (`\r`).
    CR,
    /// Windows line endings (`\r\n`).
    CRLF,
}

impl RowSeparator {
    /// Returns the literal used in the generated `ROW SEPARATOR = '...'` clause.
    pub fn to_sql(&self) -> &'static str {
        match self {
            RowSeparator::LF => "LF",
            RowSeparator::CR => "CR",
            RowSeparator::CRLF => "CRLF",
        }
    }
}

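/// Compression of the transferred CSV file; it determines the extension
/// appended to the name used in the `FILE` clause.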
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Compression {
    /// Plain, uncompressed CSV. The default.
    #[default]
    None,
    /// gzip-compressed CSV (`.csv.gz`).
    Gzip,
    /// bzip2-compressed CSV (`.csv.bz2`).
    Bzip2,
}

impl Compression {
    /// Returns the file extension matching this compression mode.
    pub fn extension(&self) -> &'static str {
        match self {
            Compression::None => ".csv",
            Compression::Gzip => ".csv.gz",
            Compression::Bzip2 => ".csv.bz2",
        }
    }
}

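/// Whitespace trimming applied to imported values via the `TRIM` clause;
/// `TrimMode::None` omits the clause entirely.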
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TrimMode {
    /// No trimming; no `TRIM` clause is emitted. The default.
    #[default]
    None,
    /// Trim leading whitespace (`LTRIM`).
    LTrim,
    /// Trim trailing whitespace (`RTRIM`).
    RTrim,
    /// Trim both ends (`TRIM`).
    Trim,
}

impl TrimMode {
    /// Returns the SQL keyword for this mode, or `None` when no clause is needed.
    pub fn to_sql(&self) -> Option<&'static str> {
        match self {
            TrimMode::None => None,
            TrimMode::LTrim => Some("LTRIM"),
            TrimMode::RTrim => Some("RTRIM"),
            TrimMode::Trim => Some("TRIM"),
        }
    }
}

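/// One source in a multi-file import: the host `address` to fetch from, the
/// `file_name` to request, and an optional certificate `public_key`
/// fingerprint. When a fingerprint is present, the generated URL uses
/// `https://` instead of `http://`.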
#[derive(Debug, Clone)]
pub struct ImportFileEntry {
    /// Host and port of the file source, e.g. `10.0.0.5:8563`.
    pub address: String,
    /// Name of the file to request from that host.
    pub file_name: String,
    /// Optional certificate fingerprint for the `PUBLIC KEY` clause.
    pub public_key: Option<String>,
}

impl ImportFileEntry {
    /// Creates a new file entry.
    pub fn new(address: String, file_name: String, public_key: Option<String>) -> Self {
        Self {
            address,
            file_name,
            public_key,
        }
    }
}

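/// Builder for an `IMPORT INTO ... FROM CSV` statement (Exasol-style
/// `AT`/`FILE`/`PUBLIC KEY` syntax). Defaults: file `001.csv`, `,` as column
/// separator, `"` as column delimiter, `LF` row separator, UTF-8 encoding,
/// no compression. Use [`ImportQuery::with_files`] for multi-file imports.
///
/// A minimal usage sketch (it mirrors `test_basic_import_statement` at the
/// bottom of this file and assumes the type is in scope):
///
/// ```ignore
/// let sql = ImportQuery::new("users")
///     .at_address("192.168.1.1:8080")
///     .build();
/// assert!(sql.contains("FROM CSV AT 'http://192.168.1.1:8080'"));
/// ```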
#[derive(Debug, Clone)]
pub struct ImportQuery {
    table: String,
    schema: Option<String>,
    columns: Option<Vec<String>>,
    address: Option<String>,
    public_key: Option<String>,
    file_name: String,
    file_entries: Option<Vec<ImportFileEntry>>,
    column_separator: char,
    column_delimiter: char,
    row_separator: RowSeparator,
    encoding: String,
    skip: u32,
    null_value: Option<String>,
    trim: TrimMode,
    compression: Compression,
    reject_limit: Option<u32>,
}

impl ImportQuery {
    /// Starts a new import into `table` with the default format options.
    pub fn new(table: &str) -> Self {
        Self {
            table: table.to_string(),
            schema: None,
            columns: None,
            address: None,
            public_key: None,
            file_name: "001.csv".to_string(),
            file_entries: None,
            column_separator: ',',
            column_delimiter: '"',
            row_separator: RowSeparator::default(),
            encoding: "UTF-8".to_string(),
            skip: 0,
            null_value: None,
            trim: TrimMode::default(),
            compression: Compression::default(),
            reject_limit: None,
        }
    }

    /// Qualifies the target table with a schema name.
    pub fn schema(mut self, schema: &str) -> Self {
        self.schema = Some(schema.to_string());
        self
    }

    /// Specifies the target column list of the import.
    pub fn columns(mut self, cols: Vec<&str>) -> Self {
        self.columns = Some(cols.into_iter().map(String::from).collect());
        self
    }

    /// Sets the host and port used in the `AT` clause (single-file imports).
    pub fn at_address(mut self, addr: &str) -> Self {
        self.address = Some(addr.to_string());
        self
    }

    /// Sets a certificate fingerprint for the `PUBLIC KEY` clause; this also
    /// switches the generated URL from `http://` to `https://`.
    pub fn with_public_key(mut self, fingerprint: &str) -> Self {
        self.public_key = Some(fingerprint.to_string());
        self
    }

    /// Sets the file name used in the `FILE` clause. The extension is
    /// normalized to match the configured compression.
    pub fn file_name(mut self, name: &str) -> Self {
        self.file_name = name.to_string();
        self
    }

    /// Sets the column separator character (default `,`).
    pub fn column_separator(mut self, sep: char) -> Self {
        self.column_separator = sep;
        self
    }

    /// Sets the column delimiter (quote) character (default `"`).
    pub fn column_delimiter(mut self, delim: char) -> Self {
        self.column_delimiter = delim;
        self
    }

    /// Sets the row separator (default `LF`).
    pub fn row_separator(mut self, sep: RowSeparator) -> Self {
        self.row_separator = sep;
        self
    }

    /// Sets the file encoding (default `UTF-8`).
    pub fn encoding(mut self, enc: &str) -> Self {
        self.encoding = enc.to_string();
        self
    }

    /// Skips the first `rows` rows (e.g. a header row). The clause is omitted
    /// when the value is zero.
    pub fn skip(mut self, rows: u32) -> Self {
        self.skip = rows;
        self
    }

    /// Sets the string that represents NULL values in the CSV data.
    pub fn null_value(mut self, val: &str) -> Self {
        self.null_value = Some(val.to_string());
        self
    }

    /// Sets the whitespace trimming mode.
    pub fn trim(mut self, trim: TrimMode) -> Self {
        self.trim = trim;
        self
    }

    /// Declares the compression of the source file(s) and adjusts the file
    /// extension accordingly.
    pub fn compressed(mut self, compression: Compression) -> Self {
        self.compression = compression;
        self
    }

    /// Sets the maximum number of rejected rows before the import fails.
    pub fn reject_limit(mut self, limit: u32) -> Self {
        self.reject_limit = Some(limit);
        self
    }

    /// Imports from multiple files, one `AT ... FILE ...` pair per entry.
    /// When set, the entries take precedence over `at_address`/`file_name`.
    pub fn with_files(mut self, entries: Vec<ImportFileEntry>) -> Self {
        self.file_entries = Some(entries);
        self
    }

    /// File name for single-file imports, with the extension normalized to
    /// the configured compression.
    fn get_file_name(&self) -> String {
        self.get_file_name_for(&self.file_name)
    }

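    /// Assembles the final `IMPORT` statement.
    ///
    /// For the configuration used in `test_complete_import_statement_format`
    /// below, the generated statement looks like this:
    ///
    /// ```text
    /// IMPORT INTO hr.employees (id, first_name, last_name, department)
    /// FROM CSV AT 'https://10.20.30.40:8080' PUBLIC KEY 'SHA256:fingerprint123'
    /// FILE '001.csv'
    /// ENCODING = 'UTF-8'
    /// COLUMN SEPARATOR = ','
    /// COLUMN DELIMITER = '"'
    /// ROW SEPARATOR = 'LF'
    /// SKIP = 1
    /// REJECT LIMIT 10
    /// ```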
    pub fn build(&self) -> String {
        let mut sql = String::with_capacity(512);

        // Target table, optionally schema-qualified.
        sql.push_str("IMPORT INTO ");
        if let Some(ref schema) = self.schema {
            sql.push_str(schema);
            sql.push('.');
        }
        sql.push_str(&self.table);

        // Optional explicit column list.
        if let Some(ref cols) = self.columns {
            sql.push_str(" (");
            sql.push_str(&cols.join(", "));
            sql.push(')');
        }

        if let Some(ref entries) = self.file_entries {
            // Multi-file import: one AT ... [PUBLIC KEY ...] FILE ... per entry.
            sql.push_str("\nFROM CSV");

            for entry in entries {
                sql.push_str(" AT '");

                if entry.public_key.is_some() {
                    sql.push_str("https://");
                } else {
                    sql.push_str("http://");
                }
                sql.push_str(&entry.address);
                sql.push('\'');

                if let Some(ref pk) = entry.public_key {
                    sql.push_str(" PUBLIC KEY '");
                    sql.push_str(pk);
                    sql.push('\'');
                }

                sql.push_str(" FILE '");
                sql.push_str(&self.get_file_name_for(&entry.file_name));
                sql.push('\'');
            }
        } else {
            // Single-file import from `address`/`file_name`.
            sql.push_str("\nFROM CSV AT '");

            if self.public_key.is_some() {
                sql.push_str("https://");
            } else {
                sql.push_str("http://");
            }

            if let Some(ref addr) = self.address {
                sql.push_str(addr);
            }
            sql.push('\'');

            if let Some(ref pk) = self.public_key {
                sql.push_str(" PUBLIC KEY '");
                sql.push_str(pk);
                sql.push('\'');
            }

            sql.push_str("\nFILE '");
            sql.push_str(&self.get_file_name());
            sql.push('\'');
        }

        // Format options that are always emitted.
        sql.push_str("\nENCODING = '");
        sql.push_str(&self.encoding);
        sql.push('\'');

        sql.push_str("\nCOLUMN SEPARATOR = '");
        sql.push(self.column_separator);
        sql.push('\'');

        sql.push_str("\nCOLUMN DELIMITER = '");
        sql.push(self.column_delimiter);
        sql.push('\'');

        sql.push_str("\nROW SEPARATOR = '");
        sql.push_str(self.row_separator.to_sql());
        sql.push('\'');

        // Optional clauses, emitted only when configured.
        if self.skip > 0 {
            sql.push_str("\nSKIP = ");
            sql.push_str(&self.skip.to_string());
        }

        if let Some(ref null_val) = self.null_value {
            sql.push_str("\nNULL = '");
            sql.push_str(null_val);
            sql.push('\'');
        }

        if let Some(trim_sql) = self.trim.to_sql() {
            sql.push_str("\nTRIM = '");
            sql.push_str(trim_sql);
            sql.push('\'');
        }

        if let Some(limit) = self.reject_limit {
            sql.push_str("\nREJECT LIMIT ");
            sql.push_str(&limit.to_string());
        }

        sql
    }

    /// Applies the compression-appropriate extension to `base_name`. Any
    /// existing `.csv`, `.csv.gz`, or `.csv.bz2` suffix is stripped first;
    /// `.csv` is trimmed both before and after the compressed extensions so
    /// that a name like `data.csv.gz` reduces to `data`.
    fn get_file_name_for(&self, base_name: &str) -> String {
        let base = base_name
            .trim_end_matches(".csv")
            .trim_end_matches(".gz")
            .trim_end_matches(".bz2")
            .trim_end_matches(".csv");

        format!("{}{}", base, self.compression.extension())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_import_statement() {
        let sql = ImportQuery::new("users")
            .at_address("192.168.1.1:8080")
            .build();

        assert!(sql.contains("IMPORT INTO users"));
        assert!(sql.contains("FROM CSV AT 'http://192.168.1.1:8080'"));
        assert!(sql.contains("FILE '001.csv'"));
        assert!(sql.contains("ENCODING = 'UTF-8'"));
        assert!(sql.contains("COLUMN SEPARATOR = ','"));
        assert!(sql.contains("COLUMN DELIMITER = '\"'"));
        assert!(sql.contains("ROW SEPARATOR = 'LF'"));
    }

    #[test]
    fn test_import_with_schema() {
        let sql = ImportQuery::new("users")
            .schema("myschema")
            .at_address("192.168.1.1:8080")
            .build();

        assert!(sql.contains("IMPORT INTO myschema.users"));
    }

    #[test]
    fn test_import_with_columns() {
        let sql = ImportQuery::new("users")
            .columns(vec!["id", "name", "email"])
            .at_address("192.168.1.1:8080")
            .build();

        assert!(sql.contains("IMPORT INTO users (id, name, email)"));
    }

    #[test]
    fn test_import_with_all_format_options() {
        let sql = ImportQuery::new("data")
            .at_address("10.0.0.1:9000")
            .column_separator(';')
            .column_delimiter('\'')
            .row_separator(RowSeparator::CRLF)
            .encoding("ISO-8859-1")
            .skip(2)
            .null_value("NULL")
            .trim(TrimMode::Trim)
            .reject_limit(100)
            .build();

        assert!(sql.contains("COLUMN SEPARATOR = ';'"));
        assert!(sql.contains("COLUMN DELIMITER = '''"));
        assert!(sql.contains("ROW SEPARATOR = 'CRLF'"));
        assert!(sql.contains("ENCODING = 'ISO-8859-1'"));
        assert!(sql.contains("SKIP = 2"));
        assert!(sql.contains("NULL = 'NULL'"));
        assert!(sql.contains("TRIM = 'TRIM'"));
        assert!(sql.contains("REJECT LIMIT 100"));
    }

    #[test]
    fn test_import_with_encryption() {
        let fingerprint = "SHA256:abc123def456";
        let sql = ImportQuery::new("secure_data")
            .at_address("192.168.1.1:8443")
            .with_public_key(fingerprint)
            .build();

        assert!(sql.contains("FROM CSV AT 'https://192.168.1.1:8443'"));
        assert!(sql.contains(&format!("PUBLIC KEY '{}'", fingerprint)));
    }

    #[test]
    fn test_import_with_gzip_compression() {
        let sql = ImportQuery::new("compressed_data")
            .at_address("192.168.1.1:8080")
            .compressed(Compression::Gzip)
            .build();

        assert!(sql.contains("FILE '001.csv.gz'"));
    }

    #[test]
    fn test_import_with_bzip2_compression() {
        let sql = ImportQuery::new("compressed_data")
            .at_address("192.168.1.1:8080")
            .compressed(Compression::Bzip2)
            .build();

        assert!(sql.contains("FILE '001.csv.bz2'"));
    }

    #[test]
    fn test_import_custom_file_name() {
        let sql = ImportQuery::new("data")
            .at_address("192.168.1.1:8080")
            .file_name("custom_file")
            .build();

        assert!(sql.contains("FILE 'custom_file.csv'"));
    }

    #[test]
    fn test_import_custom_file_name_with_compression() {
        let sql = ImportQuery::new("data")
            .at_address("192.168.1.1:8080")
            .file_name("custom_file")
            .compressed(Compression::Gzip)
            .build();

        assert!(sql.contains("FILE 'custom_file.csv.gz'"));
    }

    #[test]
    fn test_row_separator_to_sql() {
        assert_eq!(RowSeparator::LF.to_sql(), "LF");
        assert_eq!(RowSeparator::CR.to_sql(), "CR");
        assert_eq!(RowSeparator::CRLF.to_sql(), "CRLF");
    }

    #[test]
    fn test_compression_extension() {
        assert_eq!(Compression::None.extension(), ".csv");
        assert_eq!(Compression::Gzip.extension(), ".csv.gz");
        assert_eq!(Compression::Bzip2.extension(), ".csv.bz2");
    }

    #[test]
    fn test_trim_mode_to_sql() {
        assert_eq!(TrimMode::None.to_sql(), None);
        assert_eq!(TrimMode::LTrim.to_sql(), Some("LTRIM"));
        assert_eq!(TrimMode::RTrim.to_sql(), Some("RTRIM"));
        assert_eq!(TrimMode::Trim.to_sql(), Some("TRIM"));
    }

    #[test]
    fn test_defaults() {
        assert_eq!(RowSeparator::default(), RowSeparator::LF);
        assert_eq!(Compression::default(), Compression::None);
        assert_eq!(TrimMode::default(), TrimMode::None);
    }

    #[test]
    fn test_import_file_entry() {
        let entry = ImportFileEntry::new(
            "10.0.0.5:8563".to_string(),
            "001.csv".to_string(),
            Some("sha256//abc123".to_string()),
        );

        assert_eq!(entry.address, "10.0.0.5:8563");
        assert_eq!(entry.file_name, "001.csv");
        assert_eq!(entry.public_key, Some("sha256//abc123".to_string()));
    }

    #[test]
    fn test_multi_file_import_basic() {
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
        ];

        let sql = ImportQuery::new("my_table").with_files(entries).build();

        assert!(sql.contains("IMPORT INTO my_table"));
        assert!(sql.contains("FROM CSV"));
        assert!(sql.contains("AT 'http://10.0.0.5:8563' FILE '001.csv'"));
        assert!(sql.contains("AT 'http://10.0.0.6:8564' FILE '002.csv'"));
    }

    #[test]
    fn test_multi_file_import_with_tls() {
        let entries = vec![
            ImportFileEntry::new(
                "10.0.0.5:8563".to_string(),
                "001.csv".to_string(),
                Some("sha256//fingerprint1".to_string()),
            ),
            ImportFileEntry::new(
                "10.0.0.6:8564".to_string(),
                "002.csv".to_string(),
                Some("sha256//fingerprint2".to_string()),
            ),
        ];

        let sql = ImportQuery::new("secure_table").with_files(entries).build();

        assert!(sql.contains("AT 'https://10.0.0.5:8563' PUBLIC KEY 'sha256//fingerprint1'"));
        assert!(sql.contains("AT 'https://10.0.0.6:8564' PUBLIC KEY 'sha256//fingerprint2'"));
    }

    #[test]
    fn test_multi_file_import_with_compression() {
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
        ];

        let sql = ImportQuery::new("compressed_table")
            .with_files(entries)
            .compressed(Compression::Gzip)
            .build();

        assert!(sql.contains("FILE '001.csv.gz'"));
        assert!(sql.contains("FILE '002.csv.gz'"));
    }

    #[test]
    fn test_multi_file_import_three_files() {
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.7:8565".to_string(), "003.csv".to_string(), None),
        ];

        let sql = ImportQuery::new("data").with_files(entries).build();

        assert!(sql.contains("AT 'http://10.0.0.5:8563' FILE '001.csv'"));
        assert!(sql.contains("AT 'http://10.0.0.6:8564' FILE '002.csv'"));
        assert!(sql.contains("AT 'http://10.0.0.7:8565' FILE '003.csv'"));
    }

    #[test]
    fn test_multi_file_import_with_schema_and_columns() {
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
        ];

        let sql = ImportQuery::new("my_table")
            .schema("my_schema")
            .columns(vec!["id", "name", "value"])
            .with_files(entries)
            .build();

        assert!(sql.contains("IMPORT INTO my_schema.my_table (id, name, value)"));
    }

    #[test]
    fn test_multi_file_import_with_all_options() {
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
        ];

        let sql = ImportQuery::new("data")
            .with_files(entries)
            .encoding("ISO-8859-1")
            .column_separator(';')
            .skip(1)
            .null_value("NULL")
            .reject_limit(100)
            .build();

        assert!(sql.contains("ENCODING = 'ISO-8859-1'"));
        assert!(sql.contains("COLUMN SEPARATOR = ';'"));
        assert!(sql.contains("SKIP = 1"));
        assert!(sql.contains("NULL = 'NULL'"));
        assert!(sql.contains("REJECT LIMIT 100"));
    }

    #[test]
    fn test_import_no_skip_when_zero() {
        let sql = ImportQuery::new("data")
            .at_address("192.168.1.1:8080")
            .build();

        assert!(!sql.contains("SKIP"));
    }

    #[test]
    fn test_import_skip_header_row() {
        let sql = ImportQuery::new("data")
            .at_address("192.168.1.1:8080")
            .skip(1)
            .build();

        assert!(sql.contains("SKIP = 1"));
    }

    #[test]
    fn test_complete_import_statement_format() {
        let sql = ImportQuery::new("employees")
            .schema("hr")
            .columns(vec!["id", "first_name", "last_name", "department"])
            .at_address("10.20.30.40:8080")
            .with_public_key("SHA256:fingerprint123")
            .skip(1)
            .reject_limit(10)
            .build();

        let expected_parts = [
            "IMPORT INTO hr.employees (id, first_name, last_name, department)",
            "FROM CSV AT 'https://10.20.30.40:8080' PUBLIC KEY 'SHA256:fingerprint123'",
            "FILE '001.csv'",
            "ENCODING = 'UTF-8'",
            "COLUMN SEPARATOR = ','",
            "COLUMN DELIMITER = '\"'",
            "ROW SEPARATOR = 'LF'",
            "SKIP = 1",
            "REJECT LIMIT 10",
        ];

        for part in expected_parts {
            assert!(sql.contains(part), "SQL should contain: {}", part);
        }
    }
}