1use std::future::Future;
7use std::io::Write;
8use std::path::Path;
9
10use bzip2::write::BzEncoder;
11use flate2::write::GzEncoder;
12use tokio::io::{AsyncRead, AsyncReadExt};
13use tokio::sync::mpsc;
14
15use crate::query::import::{Compression, ImportFileEntry, ImportQuery, RowSeparator, TrimMode};
16use crate::transport::HttpTransportClient;
17
18use super::parallel::{stream_files_parallel, ParallelTransportPool};
19use super::source::IntoFileSources;
20use super::ImportError;
21
/// Size of each HTTP chunk written to the transport (64 KiB).
const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;

/// Capacity of the bounded byte-chunk channel behind [`DataPipeSender`];
/// bounds memory use by applying backpressure to the producing callback.
const CHANNEL_BUFFER_SIZE: usize = 16;
27
/// Configuration for a CSV `IMPORT` into Exasol over the HTTP transport.
///
/// Built via the fluent methods on [`CsvImportOptions`]; defaults come from
/// the `Default` impl (UTF-8, `,` separator, `"` delimiter, LF rows).
#[derive(Debug, Clone)]
pub struct CsvImportOptions {
    /// Character encoding of the CSV payload (default `"UTF-8"`).
    pub encoding: String,

    /// Character separating columns within a row (default `,`).
    pub column_separator: char,

    /// Quote character wrapping fields that need escaping (default `"`).
    pub column_delimiter: char,

    /// Row terminator style (default [`RowSeparator::LF`]).
    pub row_separator: RowSeparator,

    /// Number of leading rows to skip, e.g. a header line (default 0).
    pub skip_rows: u32,

    /// Text representation treated as SQL NULL, if any.
    pub null_value: Option<String>,

    /// Whitespace trimming applied to fields (default [`TrimMode::None`]).
    pub trim_mode: TrimMode,

    /// Compression applied to the payload in transit (default none).
    pub compression: Compression,

    /// Maximum number of rejected rows before the IMPORT fails, if set.
    pub reject_limit: Option<u32>,

    /// Whether the HTTP transport connection uses TLS (default false).
    pub use_tls: bool,

    /// Target schema; when `None`, the table name is used unqualified.
    pub schema: Option<String>,

    /// Explicit target column list; when `None`, all columns are imported.
    pub columns: Option<Vec<String>>,

    /// Exasol host to connect the transport to (default empty — must be set).
    pub host: String,

    /// Exasol port for the transport connection (default 0 — must be set).
    pub port: u16,
}
75
76impl Default for CsvImportOptions {
77 fn default() -> Self {
78 Self {
79 encoding: "UTF-8".to_string(),
80 column_separator: ',',
81 column_delimiter: '"',
82 row_separator: RowSeparator::LF,
83 skip_rows: 0,
84 null_value: None,
85 trim_mode: TrimMode::None,
86 compression: Compression::None,
87 reject_limit: None,
88 use_tls: false,
89 schema: None,
90 columns: None,
91 host: String::new(),
92 port: 0,
93 }
94 }
95}
96
97impl CsvImportOptions {
98 #[must_use]
100 pub fn new() -> Self {
101 Self::default()
102 }
103
104 #[must_use]
105 pub fn encoding(mut self, encoding: &str) -> Self {
106 self.encoding = encoding.to_string();
107 self
108 }
109
110 #[must_use]
111 pub fn column_separator(mut self, sep: char) -> Self {
112 self.column_separator = sep;
113 self
114 }
115
116 #[must_use]
117 pub fn column_delimiter(mut self, delim: char) -> Self {
118 self.column_delimiter = delim;
119 self
120 }
121
122 #[must_use]
123 pub fn row_separator(mut self, sep: RowSeparator) -> Self {
124 self.row_separator = sep;
125 self
126 }
127
128 #[must_use]
129 pub fn skip_rows(mut self, rows: u32) -> Self {
130 self.skip_rows = rows;
131 self
132 }
133
134 #[must_use]
135 pub fn null_value(mut self, val: &str) -> Self {
136 self.null_value = Some(val.to_string());
137 self
138 }
139
140 #[must_use]
141 pub fn trim_mode(mut self, mode: TrimMode) -> Self {
142 self.trim_mode = mode;
143 self
144 }
145
146 #[must_use]
147 pub fn compression(mut self, compression: Compression) -> Self {
148 self.compression = compression;
149 self
150 }
151
152 #[must_use]
153 pub fn reject_limit(mut self, limit: u32) -> Self {
154 self.reject_limit = Some(limit);
155 self
156 }
157
158 #[must_use]
159 pub fn use_tls(mut self, use_tls: bool) -> Self {
160 self.use_tls = use_tls;
161 self
162 }
163
164 #[must_use]
165 pub fn schema(mut self, schema: &str) -> Self {
166 self.schema = Some(schema.to_string());
167 self
168 }
169
170 #[must_use]
171 pub fn columns(mut self, columns: Vec<String>) -> Self {
172 self.columns = Some(columns);
173 self
174 }
175
176 #[must_use]
183 pub fn exasol_host(mut self, host: impl Into<String>) -> Self {
184 self.host = host.into();
185 self
186 }
187
188 #[must_use]
192 pub fn exasol_port(mut self, port: u16) -> Self {
193 self.port = port;
194 self
195 }
196
197 fn build_query(&self, table: &str, address: &str, public_key: Option<&str>) -> ImportQuery {
199 let mut query = ImportQuery::new(table).at_address(address);
200
201 if let Some(ref schema) = self.schema {
202 query = query.schema(schema);
203 }
204
205 if let Some(ref columns) = self.columns {
206 let cols: Vec<&str> = columns.iter().map(String::as_str).collect();
207 query = query.columns(cols);
208 }
209
210 if let Some(pk) = public_key {
211 query = query.with_public_key(pk);
212 }
213
214 query = query
215 .encoding(&self.encoding)
216 .column_separator(self.column_separator)
217 .column_delimiter(self.column_delimiter)
218 .row_separator(self.row_separator)
219 .skip(self.skip_rows)
220 .trim(self.trim_mode)
221 .compressed(self.compression);
222
223 if let Some(ref null_val) = self.null_value {
224 query = query.null_value(null_val);
225 }
226
227 if let Some(limit) = self.reject_limit {
228 query = query.reject_limit(limit);
229 }
230
231 query
232 }
233}
234
/// Writer half of the in-memory pipe used by [`import_from_callback`]:
/// the user callback pushes raw CSV bytes through this sender while the
/// import machinery drains the receiving end of the channel.
pub struct DataPipeSender {
    // Bounded channel (see CHANNEL_BUFFER_SIZE) — gives backpressure when
    // the producer outpaces the consumer.
    tx: mpsc::Sender<Vec<u8>>,
}
241
242impl DataPipeSender {
243 fn new(tx: mpsc::Sender<Vec<u8>>) -> Self {
245 Self { tx }
246 }
247
248 pub async fn send(&self, data: Vec<u8>) -> Result<(), ImportError> {
258 self.tx
259 .send(data)
260 .await
261 .map_err(|e| ImportError::ChannelError(format!("Failed to send data: {e}")))
262 }
263
264 pub async fn send_row<I, T>(
277 &self,
278 row: I,
279 separator: char,
280 delimiter: char,
281 row_separator: &RowSeparator,
282 ) -> Result<(), ImportError>
283 where
284 I: IntoIterator<Item = T>,
285 T: AsRef<str>,
286 {
287 let formatted = format_csv_row(row, separator, delimiter, row_separator);
288 self.send(formatted.into_bytes()).await
289 }
290}
291
292fn format_csv_row<I, T>(
294 row: I,
295 separator: char,
296 delimiter: char,
297 row_separator: &RowSeparator,
298) -> String
299where
300 I: IntoIterator<Item = T>,
301 T: AsRef<str>,
302{
303 let mut line = String::new();
304 let mut first = true;
305
306 for field in row {
307 if !first {
308 line.push(separator);
309 }
310 first = false;
311
312 let value = field.as_ref();
313 let needs_quoting = value.contains(separator)
315 || value.contains(delimiter)
316 || value.contains('\n')
317 || value.contains('\r');
318
319 if needs_quoting {
320 line.push(delimiter);
321 for ch in value.chars() {
323 if ch == delimiter {
324 line.push(delimiter);
325 }
326 line.push(ch);
327 }
328 line.push(delimiter);
329 } else {
330 line.push_str(value);
331 }
332 }
333
334 match row_separator {
336 RowSeparator::LF => line.push('\n'),
337 RowSeparator::CR => line.push('\r'),
338 RowSeparator::CRLF => {
339 line.push('\r');
340 line.push('\n');
341 }
342 }
343
344 line
345}
346
347fn detect_compression(file_path: &Path, explicit_compression: Compression) -> Compression {
358 if explicit_compression != Compression::None {
359 return explicit_compression;
360 }
361
362 let path_str = file_path.to_string_lossy().to_lowercase();
363
364 if path_str.ends_with(".gz") || path_str.ends_with(".gzip") {
365 Compression::Gzip
366 } else if path_str.ends_with(".bz2") || path_str.ends_with(".bzip2") {
367 Compression::Bzip2
368 } else {
369 Compression::None
370 }
371}
372
/// Returns true when the file name carries a known compressed-archive
/// suffix (`.gz`, `.gzip`, `.bz2`, `.bzip2`), case-insensitively.
fn is_compressed_file(file_path: &Path) -> bool {
    const COMPRESSED_SUFFIXES: [&str; 4] = [".gz", ".gzip", ".bz2", ".bzip2"];

    let lower = file_path.to_string_lossy().to_lowercase();
    COMPRESSED_SUFFIXES.iter().any(|s| lower.ends_with(s))
}
389
390pub async fn import_from_file<F, Fut>(
413 execute_sql: F,
414 table: &str,
415 file_path: &Path,
416 options: CsvImportOptions,
417) -> Result<u64, ImportError>
418where
419 F: FnOnce(String) -> Fut,
420 Fut: Future<Output = Result<u64, String>>,
421{
422 let compression = detect_compression(file_path, options.compression);
424 let options = CsvImportOptions {
425 compression,
426 ..options
427 };
428
429 let file_is_compressed = is_compressed_file(file_path);
431
432 let data = std::fs::read(file_path)?;
434
435 import_csv_internal(
436 execute_sql,
437 table,
438 options,
439 move |mut client, compression| {
440 Box::pin(async move {
441 let compressed_data = if file_is_compressed {
443 data
445 } else {
446 compress_data(&data, compression)?
447 };
448
449 send_import_response(&mut client, &compressed_data).await?;
451
452 Ok(())
453 })
454 },
455 )
456 .await
457}
458
459pub async fn import_from_files<F, Fut, S>(
506 execute_sql: F,
507 table: &str,
508 file_paths: S,
509 options: CsvImportOptions,
510) -> Result<u64, ImportError>
511where
512 F: FnOnce(String) -> Fut,
513 Fut: Future<Output = Result<u64, String>>,
514 S: IntoFileSources,
515{
516 let paths = file_paths.into_sources();
517
518 if paths.len() == 1 {
520 return import_from_file(execute_sql, table, &paths[0], options).await;
521 }
522
523 if paths.is_empty() {
524 return Err(ImportError::InvalidConfig(
525 "No files provided for import".to_string(),
526 ));
527 }
528
529 let compression = options.compression;
531 let mut file_data_vec: Vec<Vec<u8>> = Vec::with_capacity(paths.len());
532
533 for path in &paths {
534 let detected_compression = detect_compression(path, compression);
535 let file_is_compressed = is_compressed_file(path);
536
537 let data = std::fs::read(path)?;
539
540 let compressed_data = if file_is_compressed {
542 data
543 } else {
544 compress_data(&data, detected_compression)?
545 };
546
547 file_data_vec.push(compressed_data);
548 }
549
550 let pool =
552 ParallelTransportPool::connect(&options.host, options.port, options.use_tls, paths.len())
553 .await?;
554
555 let entries: Vec<ImportFileEntry> = pool
557 .file_entries()
558 .iter()
559 .map(|e| ImportFileEntry::new(e.address.clone(), e.file_name.clone(), e.public_key.clone()))
560 .collect();
561
562 let query = build_multi_file_query(table, &options, entries);
563 let sql = query.build();
564
565 let connections = pool.into_connections();
567
568 let stream_handle = tokio::spawn(async move {
570 stream_files_parallel(connections, file_data_vec, compression).await
571 });
572
573 let sql_result = execute_sql(sql).await;
575
576 let stream_result = stream_handle.await;
578
579 match stream_result {
581 Ok(Ok(())) => {}
582 Ok(Err(e)) => return Err(e),
583 Err(e) => {
584 return Err(ImportError::StreamError(format!(
585 "Stream task panicked: {e}"
586 )))
587 }
588 }
589
590 sql_result.map_err(ImportError::SqlError)
592}
593
594fn build_multi_file_query(
596 table: &str,
597 options: &CsvImportOptions,
598 entries: Vec<ImportFileEntry>,
599) -> ImportQuery {
600 let mut query = ImportQuery::new(table).with_files(entries);
601
602 if let Some(ref schema) = options.schema {
603 query = query.schema(schema);
604 }
605
606 if let Some(ref columns) = options.columns {
607 let cols: Vec<&str> = columns.iter().map(String::as_str).collect();
608 query = query.columns(cols);
609 }
610
611 query = query
612 .encoding(&options.encoding)
613 .column_separator(options.column_separator)
614 .column_delimiter(options.column_delimiter)
615 .row_separator(options.row_separator)
616 .skip(options.skip_rows)
617 .trim(options.trim_mode)
618 .compressed(options.compression);
619
620 if let Some(ref null_val) = options.null_value {
621 query = query.null_value(null_val);
622 }
623
624 if let Some(limit) = options.reject_limit {
625 query = query.reject_limit(limit);
626 }
627
628 query
629}
630
/// Imports CSV data read from an arbitrary async `reader` into `table`.
///
/// The entire reader is buffered and (if configured) compressed before
/// being sent — see `stream_reader_to_connection`.
///
/// # Errors
/// Returns an [`ImportError`] on read, compression, transport, or SQL
/// failure.
pub async fn import_from_stream<R, F, Fut>(
    execute_sql: F,
    table: &str,
    reader: R,
    options: CsvImportOptions,
) -> Result<u64, ImportError>
where
    R: AsyncRead + Unpin + Send + 'static,
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
{
    // The closure moves `reader` into a boxed future so it satisfies the
    // Send + 'static bound required by import_csv_internal's stream_fn.
    import_csv_internal(execute_sql, table, options, |mut client, compression| {
        Box::pin(async move { stream_reader_to_connection(reader, &mut client, compression).await })
    })
    .await
}
666
667pub async fn import_from_iter<I, T, S, F, Fut>(
690 execute_sql: F,
691 table: &str,
692 rows: I,
693 options: CsvImportOptions,
694) -> Result<u64, ImportError>
695where
696 I: IntoIterator<Item = T> + Send + 'static,
697 T: IntoIterator<Item = S> + Send,
698 S: AsRef<str>,
699 F: FnOnce(String) -> Fut,
700 Fut: Future<Output = Result<u64, String>>,
701{
702 let separator = options.column_separator;
703 let delimiter = options.column_delimiter;
704 let row_sep = options.row_separator;
705
706 import_csv_internal(
707 execute_sql,
708 table,
709 options,
710 move |mut client, compression| {
711 Box::pin(async move {
712 let mut data = Vec::new();
714
715 for row in rows {
716 let formatted = format_csv_row(row, separator, delimiter, &row_sep);
717 data.extend_from_slice(formatted.as_bytes());
718 }
719
720 let compressed_data = compress_data(&data, compression)?;
722
723 send_import_response(&mut client, &compressed_data).await?;
725
726 Ok(())
727 })
728 },
729 )
730 .await
731}
732
/// Imports CSV data produced by a user `callback` into `table`.
///
/// The callback receives a [`DataPipeSender`] and pushes byte chunks into
/// it; the import side drains the channel, buffers everything in memory,
/// then compresses and transfers the payload.
///
/// # Errors
/// Returns an [`ImportError`] when the callback fails or panics, or on
/// compression, transport, or SQL failure.
pub async fn import_from_callback<F, Fut, C, CFut>(
    execute_sql: F,
    table: &str,
    callback: C,
    options: CsvImportOptions,
) -> Result<u64, ImportError>
where
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
    C: FnOnce(DataPipeSender) -> CFut + Send + 'static,
    CFut: Future<Output = Result<(), ImportError>> + Send,
{
    import_csv_internal(execute_sql, table, options, |mut client, compression| {
        Box::pin(async move {
            let (tx, mut rx) = mpsc::channel::<Vec<u8>>(CHANNEL_BUFFER_SIZE);
            let sender = DataPipeSender::new(tx);

            // Run the producer on its own task so we can drain concurrently.
            let callback_handle = tokio::spawn(async move { callback(sender).await });

            // Drain the channel BEFORE joining the callback task: the channel
            // is bounded, so awaiting the task first could deadlock with a
            // producer blocked on a full channel. recv() returns None once the
            // callback (and with it the sender) has finished.
            let mut all_data = Vec::new();
            while let Some(chunk) = rx.recv().await {
                all_data.extend_from_slice(&chunk);
            }

            // Now surface a callback panic or error, if any.
            callback_handle
                .await
                .map_err(|e| ImportError::StreamError(format!("Callback task panicked: {e}")))?
                .map_err(|e| ImportError::StreamError(format!("Callback error: {e}")))?;

            let compressed_data = compress_data(&all_data, compression)?;

            send_import_response(&mut client, &compressed_data).await?;

            Ok(())
        })
    })
    .await
}
799
/// Core driver shared by all single-connection import entry points.
///
/// Connects the HTTP transport, builds the IMPORT statement pointing at the
/// transport's internal address, and then runs the SQL execution and the
/// data-streaming future CONCURRENTLY: Exasol's IMPORT only completes after
/// it has fetched the data, and our stream only completes after Exasol has
/// connected — so driving them sequentially would deadlock.
async fn import_csv_internal<F, Fut, S, SFut>(
    execute_sql: F,
    table: &str,
    options: CsvImportOptions,
    stream_fn: S,
) -> Result<u64, ImportError>
where
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
    S: FnOnce(HttpTransportClient, Compression) -> SFut + Send + 'static,
    SFut: Future<Output = Result<(), ImportError>> + Send,
{
    let client = HttpTransportClient::connect(&options.host, options.port, options.use_tls)
        .await
        .map_err(|e| {
            ImportError::HttpTransportError(format!("Failed to connect to Exasol: {e}"))
        })?;

    // The address Exasol must dial back to, embedded in the IMPORT SQL.
    let internal_addr = client.internal_address().to_string();
    let public_key = client.public_key_fingerprint().map(String::from);

    let query = options.build_query(table, &internal_addr, public_key.as_deref());
    let sql = query.build();

    let compression = options.compression;

    // Both futures are pinned so select! can poll them by &mut reference
    // without consuming them — the loser must remain awaitable afterwards.
    let stream_future = stream_fn(client, compression);
    tokio::pin!(stream_future);

    let sql_future = execute_sql(sql);
    tokio::pin!(sql_future);

    // Poll both until one side finishes first:
    // - SQL first: record its result; the stream may still need awaiting.
    // - Stream first: a stream error aborts immediately; on success the SQL
    //   result is awaited right here (stream_done = true).
    let (sql_result, stream_done) = tokio::select! {
        result = &mut sql_future => {
            (result, false)
        },
        stream_result = &mut stream_future => {
            match stream_result {
                Err(e) => return Err(e),
                Ok(()) => {
                    (sql_future.await, true)
                },
            }
        }
    };

    // A SQL failure takes precedence; the pending stream future is simply
    // dropped with the early return.
    let row_count = sql_result.map_err(ImportError::SqlError)?;

    // SQL finished first and succeeded: drain the stream to completion so
    // any late transport error is still reported.
    if !stream_done {
        stream_future.await?;
    }

    Ok(row_count)
}
866
867async fn stream_reader_to_connection<R>(
875 mut reader: R,
876 client: &mut HttpTransportClient,
877 compression: Compression,
878) -> Result<(), ImportError>
879where
880 R: AsyncRead + Unpin,
881{
882 client.handle_import_request().await.map_err(|e| {
884 ImportError::HttpTransportError(format!("Failed to handle import request: {e}"))
885 })?;
886
887 let mut data = Vec::new();
889 reader.read_to_end(&mut data).await?;
890
891 let compressed_data = compress_data(&data, compression)?;
893
894 write_chunked_data(client, &compressed_data).await
896}
897
898async fn write_chunked_data(
904 client: &mut HttpTransportClient,
905 data: &[u8],
906) -> Result<(), ImportError> {
907 for chunk in data.chunks(DEFAULT_BUFFER_SIZE) {
909 client
910 .write_chunked_body(chunk)
911 .await
912 .map_err(ImportError::TransportError)?;
913 }
914
915 client
917 .write_final_chunk()
918 .await
919 .map_err(ImportError::TransportError)?;
920
921 Ok(())
922}
923
924async fn send_import_response(
932 client: &mut HttpTransportClient,
933 data: &[u8],
934) -> Result<(), ImportError> {
935 client.handle_import_request().await.map_err(|e| {
937 ImportError::HttpTransportError(format!("Failed to handle import request: {e}"))
938 })?;
939
940 write_chunked_data(client, data).await
942}
943
944fn compress_data(data: &[u8], compression: Compression) -> Result<Vec<u8>, ImportError> {
946 match compression {
947 Compression::None => Ok(data.to_vec()),
948 Compression::Gzip => {
949 let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::default());
950 encoder.write_all(data).map_err(|e| {
951 ImportError::CompressionError(format!("Gzip compression failed: {e}"))
952 })?;
953 encoder.finish().map_err(|e| {
954 ImportError::CompressionError(format!("Gzip finalization failed: {e}"))
955 })
956 }
957 Compression::Bzip2 => {
958 let mut encoder = BzEncoder::new(Vec::new(), bzip2::Compression::default());
959 encoder.write_all(data).map_err(|e| {
960 ImportError::CompressionError(format!("Bzip2 compression failed: {e}"))
961 })?;
962 encoder.finish().map_err(|e| {
963 ImportError::CompressionError(format!("Bzip2 finalization failed: {e}"))
964 })
965 }
966 }
967}
968
#[cfg(test)]
mod tests {
    use super::*;

    // ---- CsvImportOptions construction ----

    #[test]
    fn test_csv_import_options_default() {
        let opts = CsvImportOptions::default();

        assert_eq!(opts.encoding, "UTF-8");
        assert_eq!(opts.column_separator, ',');
        assert_eq!(opts.column_delimiter, '"');
        assert_eq!(opts.row_separator, RowSeparator::LF);
        assert_eq!(opts.skip_rows, 0);
        assert!(opts.null_value.is_none());
        assert_eq!(opts.trim_mode, TrimMode::None);
        assert_eq!(opts.compression, Compression::None);
        assert!(opts.reject_limit.is_none());
        assert!(!opts.use_tls);
        assert!(opts.schema.is_none());
        assert!(opts.columns.is_none());
        assert_eq!(opts.host, "");
        assert_eq!(opts.port, 0);
    }

    #[test]
    fn test_csv_import_options_builder() {
        // Every builder method should round-trip into the matching field.
        let opts = CsvImportOptions::new()
            .encoding("ISO-8859-1")
            .column_separator(';')
            .column_delimiter('\'')
            .row_separator(RowSeparator::CRLF)
            .skip_rows(1)
            .null_value("NULL")
            .trim_mode(TrimMode::Trim)
            .compression(Compression::Gzip)
            .reject_limit(100)
            .use_tls(false)
            .schema("my_schema")
            .columns(vec!["col1".to_string(), "col2".to_string()])
            .exasol_host("exasol.example.com")
            .exasol_port(8563);

        assert_eq!(opts.encoding, "ISO-8859-1");
        assert_eq!(opts.column_separator, ';');
        assert_eq!(opts.column_delimiter, '\'');
        assert_eq!(opts.row_separator, RowSeparator::CRLF);
        assert_eq!(opts.skip_rows, 1);
        assert_eq!(opts.null_value, Some("NULL".to_string()));
        assert_eq!(opts.trim_mode, TrimMode::Trim);
        assert_eq!(opts.compression, Compression::Gzip);
        assert_eq!(opts.reject_limit, Some(100));
        assert!(!opts.use_tls);
        assert_eq!(opts.schema, Some("my_schema".to_string()));
        assert_eq!(
            opts.columns,
            Some(vec!["col1".to_string(), "col2".to_string()])
        );
        assert_eq!(opts.host, "exasol.example.com");
        assert_eq!(opts.port, 8563);
    }

    // ---- build_query: generated IMPORT SQL ----

    #[test]
    fn test_build_query_basic() {
        let opts = CsvImportOptions::default();
        let query = opts.build_query("my_table", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("IMPORT INTO my_table"));
        assert!(sql.contains("FROM CSV AT 'http://127.0.0.1:8080'"));
        assert!(sql.contains("ENCODING = 'UTF-8'"));
        assert!(sql.contains("COLUMN SEPARATOR = ','"));
    }

    #[test]
    fn test_build_query_with_schema_and_columns() {
        let opts = CsvImportOptions::default()
            .schema("test_schema")
            .columns(vec!["id".to_string(), "name".to_string()]);

        let query = opts.build_query("users", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("IMPORT INTO test_schema.users"));
        assert!(sql.contains("(id, name)"));
    }

    #[test]
    fn test_build_query_with_tls() {
        // Supplying a public-key fingerprint switches the URL to HTTPS.
        let opts = CsvImportOptions::default();
        let fingerprint = "ABC123DEF456";
        let query = opts.build_query("my_table", "127.0.0.1:8080", Some(fingerprint));
        let sql = query.build();

        assert!(sql.contains("FROM CSV AT 'https://127.0.0.1:8080'"));
        assert!(sql.contains(&format!("PUBLIC KEY '{}'", fingerprint)));
    }

    #[test]
    fn test_build_query_with_all_options() {
        let opts = CsvImportOptions::default()
            .encoding("ISO-8859-1")
            .column_separator(';')
            .column_delimiter('\'')
            .row_separator(RowSeparator::CRLF)
            .skip_rows(2)
            .null_value("\\N")
            .trim_mode(TrimMode::LTrim)
            .compression(Compression::Bzip2)
            .reject_limit(50);

        let query = opts.build_query("data", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("ENCODING = 'ISO-8859-1'"));
        assert!(sql.contains("COLUMN SEPARATOR = ';'"));
        assert!(sql.contains("COLUMN DELIMITER = '''"));
        assert!(sql.contains("ROW SEPARATOR = 'CRLF'"));
        assert!(sql.contains("SKIP = 2"));
        assert!(sql.contains("NULL = '\\N'"));
        assert!(sql.contains("TRIM = 'LTRIM'"));
        assert!(sql.contains("FILE '001.csv.bz2'"));
        assert!(sql.contains("REJECT LIMIT 50"));
    }

    // ---- format_csv_row: separators, terminators, quoting ----

    #[test]
    fn test_format_csv_row_basic() {
        let row = vec!["a", "b", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "a,b,c\n");
    }

    #[test]
    fn test_format_csv_row_with_different_separator() {
        let row = vec!["a", "b", "c"];
        let formatted = format_csv_row(row, ';', '"', &RowSeparator::LF);
        assert_eq!(formatted, "a;b;c\n");
    }

    #[test]
    fn test_format_csv_row_with_crlf() {
        let row = vec!["a", "b", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::CRLF);
        assert_eq!(formatted, "a,b,c\r\n");
    }

    #[test]
    fn test_format_csv_row_with_cr() {
        let row = vec!["a", "b"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::CR);
        assert_eq!(formatted, "a,b\r");
    }

    #[test]
    fn test_format_csv_row_needs_quoting_separator() {
        // An embedded separator forces the field to be quoted.
        let row = vec!["a,b", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "\"a,b\",c\n");
    }

    #[test]
    fn test_format_csv_row_needs_quoting_newline() {
        let row = vec!["a\nb", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "\"a\nb\",c\n");
    }

    #[test]
    fn test_format_csv_row_needs_quoting_delimiter() {
        // Embedded delimiters are escaped by doubling inside quotes.
        let row = vec!["a\"b", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "\"a\"\"b\",c\n");
    }

    #[test]
    fn test_format_csv_row_empty_fields() {
        let row = vec!["", "b", ""];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, ",b,\n");
    }

    #[test]
    fn test_format_csv_row_single_field() {
        let row = vec!["only"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "only\n");
    }

    // ---- compress_data: magic bytes identify the format ----

    #[test]
    fn test_compress_data_none() {
        let data = b"test data";
        let result = compress_data(data, Compression::None).unwrap();
        assert_eq!(result, data);
    }

    #[test]
    fn test_compress_data_gzip() {
        let data = b"test data for gzip compression";
        let result = compress_data(data, Compression::Gzip).unwrap();

        // Gzip magic bytes: 0x1f 0x8b.
        assert!(result.len() >= 2);
        assert_eq!(result[0], 0x1f);
        assert_eq!(result[1], 0x8b);
    }

    #[test]
    fn test_compress_data_bzip2() {
        let data = b"test data for bzip2 compression";
        let result = compress_data(data, Compression::Bzip2).unwrap();

        // Bzip2 magic bytes: "BZ".
        assert!(result.len() >= 2);
        assert_eq!(result[0], b'B');
        assert_eq!(result[1], b'Z');
    }

    // ---- ImportError display messages ----

    #[test]
    fn test_import_error_display() {
        let err = ImportError::IoError(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "file not found",
        ));
        assert!(err.to_string().contains("IO error"));

        let err = ImportError::SqlError("syntax error".to_string());
        assert!(err.to_string().contains("SQL execution failed"));

        let err = ImportError::HttpTransportError("connection refused".to_string());
        assert!(err.to_string().contains("HTTP transport failed"));

        let err = ImportError::CompressionError("invalid data".to_string());
        assert!(err.to_string().contains("Compression error"));
    }

    // ---- DataPipeSender channel behavior ----

    #[tokio::test]
    async fn test_data_pipe_sender_send() {
        let (tx, mut rx) = mpsc::channel(10);
        let sender = DataPipeSender::new(tx);

        sender.send(b"test data".to_vec()).await.unwrap();

        let received = rx.recv().await.unwrap();
        assert_eq!(received, b"test data");
    }

    #[tokio::test]
    async fn test_data_pipe_sender_send_row() {
        let (tx, mut rx) = mpsc::channel(10);
        let sender = DataPipeSender::new(tx);

        sender
            .send_row(vec!["a", "b", "c"], ',', '"', &RowSeparator::LF)
            .await
            .unwrap();

        let received = rx.recv().await.unwrap();
        assert_eq!(received, b"a,b,c\n");
    }

    #[tokio::test]
    async fn test_data_pipe_sender_closed_channel() {
        let (tx, rx) = mpsc::channel::<Vec<u8>>(10);
        let sender = DataPipeSender::new(tx);

        // Dropping the receiver should make the next send fail.
        drop(rx);

        let result = sender.send(b"test".to_vec()).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), ImportError::ChannelError(_)));
    }

    // ---- Compression selection in generated SQL file names ----

    #[test]
    fn test_build_query_compression_gzip() {
        let opts = CsvImportOptions::default().compression(Compression::Gzip);
        let query = opts.build_query("table", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("FILE '001.csv.gz'"));
    }

    #[test]
    fn test_build_query_compression_bzip2() {
        let opts = CsvImportOptions::default().compression(Compression::Bzip2);
        let query = opts.build_query("table", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("FILE '001.csv.bz2'"));
    }

    // ---- detect_compression / is_compressed_file ----

    #[test]
    fn test_detect_compression_explicit_overrides() {
        // An explicit setting beats the (absent) extension hint.
        let path = Path::new("data.csv");
        assert_eq!(
            detect_compression(path, Compression::Gzip),
            Compression::Gzip
        );
        assert_eq!(
            detect_compression(path, Compression::Bzip2),
            Compression::Bzip2
        );
    }

    #[test]
    fn test_detect_compression_from_extension_gzip() {
        let path = Path::new("data.csv.gz");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Gzip
        );

        let path = Path::new("data.csv.gzip");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Gzip
        );

        // Matching is case-insensitive.
        let path = Path::new("DATA.CSV.GZ");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Gzip
        );
    }

    #[test]
    fn test_detect_compression_from_extension_bzip2() {
        let path = Path::new("data.csv.bz2");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Bzip2
        );

        let path = Path::new("data.csv.bzip2");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Bzip2
        );
    }

    #[test]
    fn test_detect_compression_no_compression() {
        let path = Path::new("data.csv");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::None
        );

        let path = Path::new("data.txt");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::None
        );
    }

    #[test]
    fn test_is_compressed_file_gzip() {
        assert!(is_compressed_file(Path::new("data.csv.gz")));
        assert!(is_compressed_file(Path::new("data.csv.gzip")));
        assert!(is_compressed_file(Path::new("DATA.CSV.GZ")));
    }

    #[test]
    fn test_is_compressed_file_bzip2() {
        assert!(is_compressed_file(Path::new("data.csv.bz2")));
        assert!(is_compressed_file(Path::new("data.csv.bzip2")));
    }

    #[test]
    fn test_is_compressed_file_uncompressed() {
        assert!(!is_compressed_file(Path::new("data.csv")));
        assert!(!is_compressed_file(Path::new("data.txt")));
        assert!(!is_compressed_file(Path::new("data")));
    }

    // ---- End-to-end formatting + compression combinations ----

    #[test]
    fn test_csv_row_formatting_with_compression() {
        let row = vec!["1", "test data", "value"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);

        let compressed = compress_data(formatted.as_bytes(), Compression::Gzip).unwrap();

        // Output should carry the gzip magic bytes.
        assert!(compressed.len() >= 2);
        assert_eq!(compressed[0], 0x1f);
        assert_eq!(compressed[1], 0x8b);
    }

    #[test]
    fn test_multiple_rows_formatting() {
        let rows = vec![
            vec!["1", "Alice", "alice@example.com"],
            vec!["2", "Bob", "bob@example.com"],
            vec!["3", "Charlie", "charlie@example.com"],
        ];

        let mut data = Vec::new();
        for row in rows {
            let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
            data.extend_from_slice(formatted.as_bytes());
        }

        let expected =
            "1,Alice,alice@example.com\n2,Bob,bob@example.com\n3,Charlie,charlie@example.com\n";
        assert_eq!(String::from_utf8(data).unwrap(), expected);
    }

    #[test]
    fn test_csv_special_characters_in_data() {
        let row = vec!["1", "Hello, World!", "Contains \"quotes\""];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);

        assert_eq!(
            formatted,
            "1,\"Hello, World!\",\"Contains \"\"quotes\"\"\"\n"
        );
    }

    #[test]
    fn test_csv_row_with_newlines() {
        let row = vec!["1", "Line1\nLine2", "normal"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);

        assert_eq!(formatted, "1,\"Line1\nLine2\",normal\n");
    }

    #[test]
    fn test_csv_row_with_carriage_return() {
        let row = vec!["1", "Line1\rLine2", "normal"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);

        assert_eq!(formatted, "1,\"Line1\rLine2\",normal\n");
    }

    #[test]
    fn test_import_protocol_flow_documentation() {
        // Intentionally empty placeholder — NOTE(review): presumably kept so
        // the protocol-flow name shows up in test listings; consider filling
        // it in or removing it.
    }

    #[test]
    fn test_detect_compression_path_variations() {
        // Detection works on full paths, not just bare file names.
        let path = Path::new("/home/user/data/file.csv.gz");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Gzip
        );

        let path = Path::new("./data/file.csv.bz2");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Bzip2
        );

        let path = Path::new("file.csv");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::None
        );
    }

    // ---- build_multi_file_query: multi-connection IMPORT SQL ----

    #[test]
    fn test_build_multi_file_query_basic() {
        use crate::query::import::ImportFileEntry;

        let options = CsvImportOptions::default();
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
        ];

        let query = build_multi_file_query("my_table", &options, entries);
        let sql = query.build();

        assert!(sql.contains("IMPORT INTO my_table"));
        assert!(sql.contains("FROM CSV"));
        assert!(sql.contains("AT 'http://10.0.0.5:8563' FILE '001.csv'"));
        assert!(sql.contains("AT 'http://10.0.0.6:8564' FILE '002.csv'"));
    }

    #[test]
    fn test_build_multi_file_query_with_options() {
        use crate::query::import::ImportFileEntry;

        let options = CsvImportOptions::default()
            .schema("test_schema")
            .columns(vec!["id".to_string(), "name".to_string()])
            .skip_rows(1)
            .compression(Compression::Gzip);

        let entries = vec![ImportFileEntry::new(
            "10.0.0.5:8563".to_string(),
            "001.csv".to_string(),
            None,
        )];

        let query = build_multi_file_query("data", &options, entries);
        let sql = query.build();

        assert!(sql.contains("IMPORT INTO test_schema.data (id, name)"));
        assert!(sql.contains("SKIP = 1"));
        assert!(sql.contains("FILE '001.csv.gz'"));
    }
}