use std::future::Future;
use std::io::Write;
use std::path::Path;

use bzip2::write::BzEncoder;
use flate2::write::GzEncoder;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::sync::mpsc;

use crate::query::import::{Compression, ImportFileEntry, ImportQuery, RowSeparator, TrimMode};
use crate::transport::HttpTransportClient;

use super::parallel::{stream_files_parallel, ParallelTransportPool};
use super::source::IntoFileSources;
use super::ImportError;

/// Size of each chunk written to the HTTP transport connection.
const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;

/// Capacity (in chunks) of the channel behind [`DataPipeSender`].
const CHANNEL_BUFFER_SIZE: usize = 16;

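/// Options controlling how CSV data is formatted and imported into Exasol.
///
/// A minimal usage sketch (host and port values are illustrative):
///
/// ```ignore
/// let opts = CsvImportOptions::new()
///     .encoding("UTF-8")
///     .column_separator(';')
///     .skip_rows(1)
///     .compression(Compression::Gzip)
///     .exasol_host("exasol.example.com")
///     .exasol_port(8563);
/// ```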
#[derive(Debug, Clone)]
pub struct CsvImportOptions {
    /// Character encoding of the CSV data (e.g. "UTF-8").
    pub encoding: String,

    /// Character separating columns within a row.
    pub column_separator: char,

    /// Character used to quote field values.
    pub column_delimiter: char,

    /// Row terminator (LF, CR, or CRLF).
    pub row_separator: RowSeparator,

    /// Number of leading rows to skip (e.g. a header row).
    pub skip_rows: u32,

    /// String that represents NULL in the CSV data.
    pub null_value: Option<String>,

    /// Whitespace trimming applied to field values.
    pub trim_mode: TrimMode,

    /// Compression applied to the data stream.
    pub compression: Compression,

    /// Maximum number of rejected rows before the import fails.
    pub reject_limit: Option<u32>,

    /// Whether to use TLS for the HTTP transport connection.
    pub use_tls: bool,

    /// Target schema; the session's current schema is used when `None`.
    pub schema: Option<String>,

    /// Target columns; all columns are imported when `None`.
    pub columns: Option<Vec<String>>,

    /// Exasol host for the HTTP transport connection.
    pub host: String,

    /// Exasol port for the HTTP transport connection.
    pub port: u16,
}

impl Default for CsvImportOptions {
    fn default() -> Self {
        Self {
            encoding: "UTF-8".to_string(),
            column_separator: ',',
            column_delimiter: '"',
            row_separator: RowSeparator::LF,
            skip_rows: 0,
            null_value: None,
            trim_mode: TrimMode::None,
            compression: Compression::None,
            reject_limit: None,
            use_tls: false,
            schema: None,
            columns: None,
            host: String::new(),
            port: 0,
        }
    }
}

impl CsvImportOptions {
    /// Creates options with default values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the character encoding of the CSV data.
    #[must_use]
    pub fn encoding(mut self, encoding: &str) -> Self {
        self.encoding = encoding.to_string();
        self
    }

    /// Sets the column separator character.
    #[must_use]
    pub fn column_separator(mut self, sep: char) -> Self {
        self.column_separator = sep;
        self
    }

    /// Sets the column delimiter (quote) character.
    #[must_use]
    pub fn column_delimiter(mut self, delim: char) -> Self {
        self.column_delimiter = delim;
        self
    }

    /// Sets the row separator.
    #[must_use]
    pub fn row_separator(mut self, sep: RowSeparator) -> Self {
        self.row_separator = sep;
        self
    }

    /// Sets the number of leading rows to skip.
    #[must_use]
    pub fn skip_rows(mut self, rows: u32) -> Self {
        self.skip_rows = rows;
        self
    }

    /// Sets the string that represents NULL.
    #[must_use]
    pub fn null_value(mut self, val: &str) -> Self {
        self.null_value = Some(val.to_string());
        self
    }

    /// Sets the trim mode applied to field values.
    #[must_use]
    pub fn trim_mode(mut self, mode: TrimMode) -> Self {
        self.trim_mode = mode;
        self
    }

    /// Sets the compression applied to the data stream.
    #[must_use]
    pub fn compression(mut self, compression: Compression) -> Self {
        self.compression = compression;
        self
    }

    /// Sets the maximum number of rejected rows before the import fails.
    #[must_use]
    pub fn reject_limit(mut self, limit: u32) -> Self {
        self.reject_limit = Some(limit);
        self
    }

    /// Enables or disables TLS for the transport connection.
    #[must_use]
    pub fn use_tls(mut self, use_tls: bool) -> Self {
        self.use_tls = use_tls;
        self
    }

    /// Sets the target schema.
    #[must_use]
    pub fn schema(mut self, schema: &str) -> Self {
        self.schema = Some(schema.to_string());
        self
    }

    /// Sets the target columns.
    #[must_use]
    pub fn columns(mut self, columns: Vec<String>) -> Self {
        self.columns = Some(columns);
        self
    }

    /// Sets the Exasol host for the HTTP transport connection.
    #[must_use]
    pub fn exasol_host(mut self, host: impl Into<String>) -> Self {
        self.host = host.into();
        self
    }

    /// Sets the Exasol port for the HTTP transport connection.
    #[must_use]
    pub fn exasol_port(mut self, port: u16) -> Self {
        self.port = port;
        self
    }

    /// Builds the IMPORT query for a single connection at `address`,
    /// optionally pinning the server's TLS public key fingerprint.
    fn build_query(&self, table: &str, address: &str, public_key: Option<&str>) -> ImportQuery {
        let mut query = ImportQuery::new(table).at_address(address);

        if let Some(ref schema) = self.schema {
            query = query.schema(schema);
        }

        if let Some(ref columns) = self.columns {
            let cols: Vec<&str> = columns.iter().map(String::as_str).collect();
            query = query.columns(cols);
        }

        if let Some(pk) = public_key {
            query = query.with_public_key(pk);
        }

        query = query
            .encoding(&self.encoding)
            .column_separator(self.column_separator)
            .column_delimiter(self.column_delimiter)
            .row_separator(self.row_separator)
            .skip(self.skip_rows)
            .trim(self.trim_mode)
            .compressed(self.compression);

        if let Some(ref null_val) = self.null_value {
            query = query.null_value(null_val);
        }

        if let Some(limit) = self.reject_limit {
            query = query.reject_limit(limit);
        }

        query
    }
}

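/// Sender half of the channel handed to the caller's callback in
/// [`import_from_callback`]; raw bytes or formatted rows pushed here are
/// forwarded into the import stream.
///
/// A minimal sketch of use inside a callback (`run_sql` and the table name
/// are illustrative):
///
/// ```ignore
/// import_from_callback(
///     run_sql,
///     "my_table",
///     |pipe: DataPipeSender| async move {
///         pipe.send_row(vec!["1", "Alice"], ',', '"', &RowSeparator::LF).await?;
///         pipe.send(b"2,Bob\n".to_vec()).await?;
///         Ok(())
///     },
///     CsvImportOptions::default(),
/// )
/// .await?;
/// ```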
pub struct DataPipeSender {
    tx: mpsc::Sender<Vec<u8>>,
}

impl DataPipeSender {
    fn new(tx: mpsc::Sender<Vec<u8>>) -> Self {
        Self { tx }
    }

    /// Sends a chunk of raw bytes into the import stream.
    ///
    /// # Errors
    ///
    /// Returns [`ImportError::ChannelError`] if the receiving end has been
    /// dropped.
    pub async fn send(&self, data: Vec<u8>) -> Result<(), ImportError> {
        self.tx
            .send(data)
            .await
            .map_err(|e| ImportError::ChannelError(format!("Failed to send data: {e}")))
    }

    /// Formats a single row as CSV and sends it into the import stream.
    pub async fn send_row<I, T>(
        &self,
        row: I,
        separator: char,
        delimiter: char,
        row_separator: &RowSeparator,
    ) -> Result<(), ImportError>
    where
        I: IntoIterator<Item = T>,
        T: AsRef<str>,
    {
        let formatted = format_csv_row(row, separator, delimiter, row_separator);
        self.send(formatted.into_bytes()).await
    }
}

/// Formats one row as a CSV line, quoting fields that contain the separator,
/// the delimiter, or line breaks, and doubling embedded delimiters.
fn format_csv_row<I, T>(
    row: I,
    separator: char,
    delimiter: char,
    row_separator: &RowSeparator,
) -> String
where
    I: IntoIterator<Item = T>,
    T: AsRef<str>,
{
    let mut line = String::new();
    let mut first = true;

    for field in row {
        if !first {
            line.push(separator);
        }
        first = false;

        let value = field.as_ref();
        let needs_quoting = value.contains(separator)
            || value.contains(delimiter)
            || value.contains('\n')
            || value.contains('\r');

        if needs_quoting {
            line.push(delimiter);
            for ch in value.chars() {
                // Escape embedded delimiters by doubling them.
                if ch == delimiter {
                    line.push(delimiter);
                }
                line.push(ch);
            }
            line.push(delimiter);
        } else {
            line.push_str(value);
        }
    }

    match row_separator {
        RowSeparator::LF => line.push('\n'),
        RowSeparator::CR => line.push('\r'),
        RowSeparator::CRLF => {
            line.push('\r');
            line.push('\n');
        }
    }

    line
}

/// Returns the explicit compression if one is set, otherwise infers the
/// compression from the file extension.
fn detect_compression(file_path: &Path, explicit_compression: Compression) -> Compression {
    if explicit_compression != Compression::None {
        return explicit_compression;
    }

    let path_str = file_path.to_string_lossy().to_lowercase();

    if path_str.ends_with(".gz") || path_str.ends_with(".gzip") {
        Compression::Gzip
    } else if path_str.ends_with(".bz2") || path_str.ends_with(".bzip2") {
        Compression::Bzip2
    } else {
        Compression::None
    }
}

/// Returns `true` if the file extension indicates already-compressed data
/// that should be sent as-is.
fn is_compressed_file(file_path: &Path) -> bool {
    let path_str = file_path.to_string_lossy().to_lowercase();
    path_str.ends_with(".gz")
        || path_str.ends_with(".gzip")
        || path_str.ends_with(".bz2")
        || path_str.ends_with(".bzip2")
}

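/// Imports a CSV file into `table`, detecting gzip/bzip2 compression from the
/// file extension when no explicit compression is configured.
///
/// `execute_sql` runs the generated IMPORT statement and returns the number
/// of affected rows. A minimal sketch (the `run_sql` closure and paths are
/// illustrative):
///
/// ```ignore
/// let rows = import_from_file(
///     |sql| async move { run_sql(sql).await },
///     "my_table",
///     Path::new("data.csv.gz"),
///     CsvImportOptions::default()
///         .exasol_host("exasol.example.com")
///         .exasol_port(8563),
/// )
/// .await?;
/// ```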
pub async fn import_from_file<F, Fut>(
    execute_sql: F,
    table: &str,
    file_path: &Path,
    options: CsvImportOptions,
) -> Result<u64, ImportError>
where
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
{
    let compression = detect_compression(file_path, options.compression);
    let options = CsvImportOptions {
        compression,
        ..options
    };

    let file_is_compressed = is_compressed_file(file_path);

    let data = std::fs::read(file_path)?;

    import_csv_internal(
        execute_sql,
        table,
        options,
        move |mut client, compression| {
            Box::pin(async move {
                // Already-compressed files are sent as-is; otherwise the data
                // is compressed according to the configured mode.
                let compressed_data = if file_is_compressed {
                    data
                } else {
                    compress_data(&data, compression)?
                };

                send_import_response(&mut client, &compressed_data).await?;

                Ok(())
            })
        },
    )
    .await
}

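/// Imports multiple CSV files into `table`, streaming each file over its own
/// HTTP transport connection in parallel. A single path falls back to
/// [`import_from_file`]; an empty list is rejected as invalid configuration.
///
/// A minimal sketch (paths and the `run_sql` closure are illustrative):
///
/// ```ignore
/// let rows = import_from_files(
///     run_sql,
///     "my_table",
///     vec![Path::new("part1.csv"), Path::new("part2.csv")],
///     CsvImportOptions::default(),
/// )
/// .await?;
/// ```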
pub async fn import_from_files<F, Fut, S>(
    execute_sql: F,
    table: &str,
    file_paths: S,
    options: CsvImportOptions,
) -> Result<u64, ImportError>
where
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
    S: IntoFileSources,
{
    let paths = file_paths.into_sources();

    if paths.is_empty() {
        return Err(ImportError::InvalidConfig(
            "No files provided for import".to_string(),
        ));
    }

    if paths.len() == 1 {
        return import_from_file(execute_sql, table, &paths[0], options).await;
    }

    // Read and (where necessary) compress every file up front.
    let compression = options.compression;
    let mut file_data_vec: Vec<Vec<u8>> = Vec::with_capacity(paths.len());

    for path in &paths {
        let detected_compression = detect_compression(path, compression);
        let file_is_compressed = is_compressed_file(path);

        let data = std::fs::read(path)?;

        let compressed_data = if file_is_compressed {
            data
        } else {
            compress_data(&data, detected_compression)?
        };

        file_data_vec.push(compressed_data);
    }

    // One transport connection per file.
    let pool =
        ParallelTransportPool::connect(&options.host, options.port, options.use_tls, paths.len())
            .await?;

    let entries: Vec<ImportFileEntry> = pool
        .file_entries()
        .iter()
        .map(|e| ImportFileEntry::new(e.address.clone(), e.file_name.clone(), e.public_key.clone()))
        .collect();

    let query = build_multi_file_query(table, &options, entries);
    let sql = query.build();

    let connections = pool.into_connections();

    // Stream the file data concurrently with the SQL execution: the server
    // pulls the data over the transport connections while the IMPORT
    // statement runs.
    let stream_handle = tokio::spawn(async move {
        stream_files_parallel(connections, file_data_vec, compression).await
    });

    let sql_result = execute_sql(sql).await;

    let stream_result = stream_handle.await;

    // Report stream failures first; they are usually the root cause of a
    // failed IMPORT.
    match stream_result {
        Ok(Ok(())) => {}
        Ok(Err(e)) => return Err(e),
        Err(e) => {
            return Err(ImportError::StreamError(format!(
                "Stream task panicked: {e}"
            )))
        }
    }

    sql_result.map_err(ImportError::SqlError)
}

/// Builds a multi-file IMPORT query with one `AT ... FILE ...` clause per
/// transport connection.
fn build_multi_file_query(
    table: &str,
    options: &CsvImportOptions,
    entries: Vec<ImportFileEntry>,
) -> ImportQuery {
    let mut query = ImportQuery::new(table).with_files(entries);

    if let Some(ref schema) = options.schema {
        query = query.schema(schema);
    }

    if let Some(ref columns) = options.columns {
        let cols: Vec<&str> = columns.iter().map(String::as_str).collect();
        query = query.columns(cols);
    }

    query = query
        .encoding(&options.encoding)
        .column_separator(options.column_separator)
        .column_delimiter(options.column_delimiter)
        .row_separator(options.row_separator)
        .skip(options.skip_rows)
        .trim(options.trim_mode)
        .compressed(options.compression);

    if let Some(ref null_val) = options.null_value {
        query = query.null_value(null_val);
    }

    if let Some(limit) = options.reject_limit {
        query = query.reject_limit(limit);
    }

    query
}

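/// Imports CSV data from any [`AsyncRead`] source into `table`. The reader is
/// drained fully into memory before the data is compressed and sent.
///
/// A minimal sketch (the file name and `run_sql` closure are illustrative):
///
/// ```ignore
/// let file = tokio::fs::File::open("data.csv").await?;
/// let rows = import_from_stream(run_sql, "my_table", file, options).await?;
/// ```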
pub async fn import_from_stream<R, F, Fut>(
    execute_sql: F,
    table: &str,
    reader: R,
    options: CsvImportOptions,
) -> Result<u64, ImportError>
where
    R: AsyncRead + Unpin + Send + 'static,
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
{
    import_csv_internal(execute_sql, table, options, |mut client, compression| {
        Box::pin(async move { stream_reader_to_connection(reader, &mut client, compression).await })
    })
    .await
}

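/// Imports rows from an iterator, formatting each row as CSV with the
/// configured separator, delimiter, and row separator.
///
/// A minimal sketch (`run_sql` is illustrative):
///
/// ```ignore
/// let rows = vec![vec!["1", "Alice"], vec!["2", "Bob"]];
/// let imported = import_from_iter(run_sql, "users", rows, options).await?;
/// ```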
pub async fn import_from_iter<I, T, S, F, Fut>(
    execute_sql: F,
    table: &str,
    rows: I,
    options: CsvImportOptions,
) -> Result<u64, ImportError>
where
    I: IntoIterator<Item = T> + Send + 'static,
    T: IntoIterator<Item = S> + Send,
    S: AsRef<str>,
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
{
    // Copy the formatting settings out before `options` is moved.
    let separator = options.column_separator;
    let delimiter = options.column_delimiter;
    let row_sep = options.row_separator;

    import_csv_internal(
        execute_sql,
        table,
        options,
        move |mut client, compression| {
            Box::pin(async move {
                // Format all rows into a single buffer before compression.
                let mut data = Vec::new();

                for row in rows {
                    let formatted = format_csv_row(row, separator, delimiter, &row_sep);
                    data.extend_from_slice(formatted.as_bytes());
                }

                let compressed_data = compress_data(&data, compression)?;

                send_import_response(&mut client, &compressed_data).await?;

                Ok(())
            })
        },
    )
    .await
}

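/// Imports CSV data produced by an async callback. The callback receives a
/// [`DataPipeSender`] and pushes raw bytes or formatted rows through it; all
/// chunks are collected, compressed, and sent once the callback completes.
/// See the usage sketch on [`DataPipeSender`].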
pub async fn import_from_callback<F, Fut, C, CFut>(
    execute_sql: F,
    table: &str,
    callback: C,
    options: CsvImportOptions,
) -> Result<u64, ImportError>
where
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
    C: FnOnce(DataPipeSender) -> CFut + Send + 'static,
    CFut: Future<Output = Result<(), ImportError>> + Send,
{
    import_csv_internal(execute_sql, table, options, |mut client, compression| {
        Box::pin(async move {
            let (tx, mut rx) = mpsc::channel::<Vec<u8>>(CHANNEL_BUFFER_SIZE);
            let sender = DataPipeSender::new(tx);

            let callback_handle = tokio::spawn(async move { callback(sender).await });

            // Drain the channel until the callback drops its sender.
            let mut all_data = Vec::new();
            while let Some(chunk) = rx.recv().await {
                all_data.extend_from_slice(&chunk);
            }

            // Surface panics and callback errors before sending anything.
            callback_handle
                .await
                .map_err(|e| ImportError::StreamError(format!("Callback task panicked: {e}")))?
                .map_err(|e| ImportError::StreamError(format!("Callback error: {e}")))?;

            let compressed_data = compress_data(&all_data, compression)?;

            send_import_response(&mut client, &compressed_data).await?;

            Ok(())
        })
    })
    .await
}

/// Shared implementation for the single-connection import paths: connects the
/// HTTP transport, builds an IMPORT statement pointing at the transport's
/// internal address, then runs the SQL and the data stream concurrently.
async fn import_csv_internal<F, Fut, S, SFut>(
    execute_sql: F,
    table: &str,
    options: CsvImportOptions,
    stream_fn: S,
) -> Result<u64, ImportError>
where
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<u64, String>>,
    S: FnOnce(HttpTransportClient, Compression) -> SFut + Send + 'static,
    SFut: Future<Output = Result<(), ImportError>> + Send,
{
    let client = HttpTransportClient::connect(&options.host, options.port, options.use_tls)
        .await
        .map_err(|e| {
            ImportError::HttpTransportError(format!("Failed to connect to Exasol: {e}"))
        })?;

    let internal_addr = client.internal_address().to_string();
    let public_key = client.public_key_fingerprint().map(String::from);

    let query = options.build_query(table, &internal_addr, public_key.as_deref());
    let sql = query.build();

    let compression = options.compression;

    // Stream the data concurrently with the SQL execution: the IMPORT
    // statement completes only once the server has consumed the stream.
    let stream_handle = tokio::spawn(async move {
        stream_fn(client, compression).await?;

        Ok::<(), ImportError>(())
    });

    let sql_result = execute_sql(sql).await;

    let stream_result = stream_handle.await;

    // Report stream failures first; they are usually the root cause of a
    // failed IMPORT.
    match stream_result {
        Ok(Ok(())) => {}
        Ok(Err(e)) => return Err(e),
        Err(e) => {
            return Err(ImportError::StreamError(format!(
                "Stream task panicked: {e}"
            )))
        }
    }

    sql_result.map_err(ImportError::SqlError)
}

/// Answers the server's import request, drains `reader` into memory,
/// compresses the data, and writes it to the connection in chunks.
async fn stream_reader_to_connection<R>(
    mut reader: R,
    client: &mut HttpTransportClient,
    compression: Compression,
) -> Result<(), ImportError>
where
    R: AsyncRead + Unpin,
{
    client.handle_import_request().await.map_err(|e| {
        ImportError::HttpTransportError(format!("Failed to handle import request: {e}"))
    })?;

    let mut data = Vec::new();
    reader.read_to_end(&mut data).await?;

    let compressed_data = compress_data(&data, compression)?;

    write_chunked_data(client, &compressed_data).await
}

/// Writes `data` to the connection in chunks of `DEFAULT_BUFFER_SIZE` bytes,
/// followed by the terminating final chunk.
async fn write_chunked_data(
    client: &mut HttpTransportClient,
    data: &[u8],
) -> Result<(), ImportError> {
    for chunk in data.chunks(DEFAULT_BUFFER_SIZE) {
        client
            .write_chunked_body(chunk)
            .await
            .map_err(ImportError::TransportError)?;
    }

    client
        .write_final_chunk()
        .await
        .map_err(ImportError::TransportError)?;

    Ok(())
}

/// Answers the server's import request and writes `data` to the connection
/// in chunks.
async fn send_import_response(
    client: &mut HttpTransportClient,
    data: &[u8],
) -> Result<(), ImportError> {
    client.handle_import_request().await.map_err(|e| {
        ImportError::HttpTransportError(format!("Failed to handle import request: {e}"))
    })?;

    write_chunked_data(client, data).await
}

/// Compresses `data` with the given algorithm; `Compression::None` returns a
/// plain copy.
fn compress_data(data: &[u8], compression: Compression) -> Result<Vec<u8>, ImportError> {
    match compression {
        Compression::None => Ok(data.to_vec()),
        Compression::Gzip => {
            let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::default());
            encoder.write_all(data).map_err(|e| {
                ImportError::CompressionError(format!("Gzip compression failed: {e}"))
            })?;
            encoder.finish().map_err(|e| {
                ImportError::CompressionError(format!("Gzip finalization failed: {e}"))
            })
        }
        Compression::Bzip2 => {
            let mut encoder = BzEncoder::new(Vec::new(), bzip2::Compression::default());
            encoder.write_all(data).map_err(|e| {
                ImportError::CompressionError(format!("Bzip2 compression failed: {e}"))
            })?;
            encoder.finish().map_err(|e| {
                ImportError::CompressionError(format!("Bzip2 finalization failed: {e}"))
            })
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_csv_import_options_default() {
        let opts = CsvImportOptions::default();

        assert_eq!(opts.encoding, "UTF-8");
        assert_eq!(opts.column_separator, ',');
        assert_eq!(opts.column_delimiter, '"');
        assert_eq!(opts.row_separator, RowSeparator::LF);
        assert_eq!(opts.skip_rows, 0);
        assert!(opts.null_value.is_none());
        assert_eq!(opts.trim_mode, TrimMode::None);
        assert_eq!(opts.compression, Compression::None);
        assert!(opts.reject_limit.is_none());
        assert!(!opts.use_tls);
        assert!(opts.schema.is_none());
        assert!(opts.columns.is_none());
        assert_eq!(opts.host, "");
        assert_eq!(opts.port, 0);
    }

    #[test]
    fn test_csv_import_options_builder() {
        let opts = CsvImportOptions::new()
            .encoding("ISO-8859-1")
            .column_separator(';')
            .column_delimiter('\'')
            .row_separator(RowSeparator::CRLF)
            .skip_rows(1)
            .null_value("NULL")
            .trim_mode(TrimMode::Trim)
            .compression(Compression::Gzip)
            .reject_limit(100)
            .use_tls(false)
            .schema("my_schema")
            .columns(vec!["col1".to_string(), "col2".to_string()])
            .exasol_host("exasol.example.com")
            .exasol_port(8563);

        assert_eq!(opts.encoding, "ISO-8859-1");
        assert_eq!(opts.column_separator, ';');
        assert_eq!(opts.column_delimiter, '\'');
        assert_eq!(opts.row_separator, RowSeparator::CRLF);
        assert_eq!(opts.skip_rows, 1);
        assert_eq!(opts.null_value, Some("NULL".to_string()));
        assert_eq!(opts.trim_mode, TrimMode::Trim);
        assert_eq!(opts.compression, Compression::Gzip);
        assert_eq!(opts.reject_limit, Some(100));
        assert!(!opts.use_tls);
        assert_eq!(opts.schema, Some("my_schema".to_string()));
        assert_eq!(
            opts.columns,
            Some(vec!["col1".to_string(), "col2".to_string()])
        );
        assert_eq!(opts.host, "exasol.example.com");
        assert_eq!(opts.port, 8563);
    }

    #[test]
    fn test_build_query_basic() {
        let opts = CsvImportOptions::default();
        let query = opts.build_query("my_table", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("IMPORT INTO my_table"));
        assert!(sql.contains("FROM CSV AT 'http://127.0.0.1:8080'"));
        assert!(sql.contains("ENCODING = 'UTF-8'"));
        assert!(sql.contains("COLUMN SEPARATOR = ','"));
    }

    #[test]
    fn test_build_query_with_schema_and_columns() {
        let opts = CsvImportOptions::default()
            .schema("test_schema")
            .columns(vec!["id".to_string(), "name".to_string()]);

        let query = opts.build_query("users", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("IMPORT INTO test_schema.users"));
        assert!(sql.contains("(id, name)"));
    }

    #[test]
    fn test_build_query_with_tls() {
        let opts = CsvImportOptions::default();
        let fingerprint = "ABC123DEF456";
        let query = opts.build_query("my_table", "127.0.0.1:8080", Some(fingerprint));
        let sql = query.build();

        assert!(sql.contains("FROM CSV AT 'https://127.0.0.1:8080'"));
        assert!(sql.contains(&format!("PUBLIC KEY '{fingerprint}'")));
    }

    #[test]
    fn test_build_query_with_all_options() {
        let opts = CsvImportOptions::default()
            .encoding("ISO-8859-1")
            .column_separator(';')
            .column_delimiter('\'')
            .row_separator(RowSeparator::CRLF)
            .skip_rows(2)
            .null_value("\\N")
            .trim_mode(TrimMode::LTrim)
            .compression(Compression::Bzip2)
            .reject_limit(50);

        let query = opts.build_query("data", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("ENCODING = 'ISO-8859-1'"));
        assert!(sql.contains("COLUMN SEPARATOR = ';'"));
        assert!(sql.contains("COLUMN DELIMITER = '''"));
        assert!(sql.contains("ROW SEPARATOR = 'CRLF'"));
        assert!(sql.contains("SKIP = 2"));
        assert!(sql.contains("NULL = '\\N'"));
        assert!(sql.contains("TRIM = 'LTRIM'"));
        assert!(sql.contains("FILE '001.csv.bz2'"));
        assert!(sql.contains("REJECT LIMIT 50"));
    }

    #[test]
    fn test_format_csv_row_basic() {
        let row = vec!["a", "b", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "a,b,c\n");
    }

    #[test]
    fn test_format_csv_row_with_different_separator() {
        let row = vec!["a", "b", "c"];
        let formatted = format_csv_row(row, ';', '"', &RowSeparator::LF);
        assert_eq!(formatted, "a;b;c\n");
    }

    #[test]
    fn test_format_csv_row_with_crlf() {
        let row = vec!["a", "b", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::CRLF);
        assert_eq!(formatted, "a,b,c\r\n");
    }

    #[test]
    fn test_format_csv_row_with_cr() {
        let row = vec!["a", "b"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::CR);
        assert_eq!(formatted, "a,b\r");
    }

    #[test]
    fn test_format_csv_row_needs_quoting_separator() {
        let row = vec!["a,b", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "\"a,b\",c\n");
    }

    #[test]
    fn test_format_csv_row_needs_quoting_newline() {
        let row = vec!["a\nb", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "\"a\nb\",c\n");
    }

    #[test]
    fn test_format_csv_row_needs_quoting_delimiter() {
        let row = vec!["a\"b", "c"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "\"a\"\"b\",c\n");
    }

    #[test]
    fn test_format_csv_row_empty_fields() {
        let row = vec!["", "b", ""];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, ",b,\n");
    }

    #[test]
    fn test_format_csv_row_single_field() {
        let row = vec!["only"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
        assert_eq!(formatted, "only\n");
    }

    #[test]
    fn test_compress_data_none() {
        let data = b"test data";
        let result = compress_data(data, Compression::None).unwrap();
        assert_eq!(result, data);
    }

    #[test]
    fn test_compress_data_gzip() {
        let data = b"test data for gzip compression";
        let result = compress_data(data, Compression::Gzip).unwrap();

        // Gzip output starts with the magic bytes 0x1f 0x8b.
        assert!(result.len() >= 2);
        assert_eq!(result[0], 0x1f);
        assert_eq!(result[1], 0x8b);
    }

    #[test]
    fn test_compress_data_bzip2() {
        let data = b"test data for bzip2 compression";
        let result = compress_data(data, Compression::Bzip2).unwrap();

        // Bzip2 output starts with the magic bytes "BZ".
        assert!(result.len() >= 2);
        assert_eq!(result[0], b'B');
        assert_eq!(result[1], b'Z');
    }

    #[test]
    fn test_import_error_display() {
        let err = ImportError::IoError(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "file not found",
        ));
        assert!(err.to_string().contains("IO error"));

        let err = ImportError::SqlError("syntax error".to_string());
        assert!(err.to_string().contains("SQL execution failed"));

        let err = ImportError::HttpTransportError("connection refused".to_string());
        assert!(err.to_string().contains("HTTP transport failed"));

        let err = ImportError::CompressionError("invalid data".to_string());
        assert!(err.to_string().contains("Compression error"));
    }

    #[tokio::test]
    async fn test_data_pipe_sender_send() {
        let (tx, mut rx) = mpsc::channel(10);
        let sender = DataPipeSender::new(tx);

        sender.send(b"test data".to_vec()).await.unwrap();

        let received = rx.recv().await.unwrap();
        assert_eq!(received, b"test data");
    }

    #[tokio::test]
    async fn test_data_pipe_sender_send_row() {
        let (tx, mut rx) = mpsc::channel(10);
        let sender = DataPipeSender::new(tx);

        sender
            .send_row(vec!["a", "b", "c"], ',', '"', &RowSeparator::LF)
            .await
            .unwrap();

        let received = rx.recv().await.unwrap();
        assert_eq!(received, b"a,b,c\n");
    }

    #[tokio::test]
    async fn test_data_pipe_sender_closed_channel() {
        let (tx, rx) = mpsc::channel::<Vec<u8>>(10);
        let sender = DataPipeSender::new(tx);

        // Dropping the receiver closes the channel.
        drop(rx);

        let result = sender.send(b"test".to_vec()).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), ImportError::ChannelError(_)));
    }

    #[test]
    fn test_build_query_compression_gzip() {
        let opts = CsvImportOptions::default().compression(Compression::Gzip);
        let query = opts.build_query("table", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("FILE '001.csv.gz'"));
    }

    #[test]
    fn test_build_query_compression_bzip2() {
        let opts = CsvImportOptions::default().compression(Compression::Bzip2);
        let query = opts.build_query("table", "127.0.0.1:8080", None);
        let sql = query.build();

        assert!(sql.contains("FILE '001.csv.bz2'"));
    }

    #[test]
    fn test_detect_compression_explicit_overrides() {
        let path = Path::new("data.csv");
        assert_eq!(
            detect_compression(path, Compression::Gzip),
            Compression::Gzip
        );
        assert_eq!(
            detect_compression(path, Compression::Bzip2),
            Compression::Bzip2
        );
    }

    #[test]
    fn test_detect_compression_from_extension_gzip() {
        let path = Path::new("data.csv.gz");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Gzip
        );

        let path = Path::new("data.csv.gzip");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Gzip
        );

        // Extension matching is case-insensitive.
        let path = Path::new("DATA.CSV.GZ");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Gzip
        );
    }

    #[test]
    fn test_detect_compression_from_extension_bzip2() {
        let path = Path::new("data.csv.bz2");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Bzip2
        );

        let path = Path::new("data.csv.bzip2");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Bzip2
        );
    }

    #[test]
    fn test_detect_compression_no_compression() {
        let path = Path::new("data.csv");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::None
        );

        let path = Path::new("data.txt");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::None
        );
    }

    #[test]
    fn test_is_compressed_file_gzip() {
        assert!(is_compressed_file(Path::new("data.csv.gz")));
        assert!(is_compressed_file(Path::new("data.csv.gzip")));
        assert!(is_compressed_file(Path::new("DATA.CSV.GZ")));
    }

    #[test]
    fn test_is_compressed_file_bzip2() {
        assert!(is_compressed_file(Path::new("data.csv.bz2")));
        assert!(is_compressed_file(Path::new("data.csv.bzip2")));
    }

    #[test]
    fn test_is_compressed_file_uncompressed() {
        assert!(!is_compressed_file(Path::new("data.csv")));
        assert!(!is_compressed_file(Path::new("data.txt")));
        assert!(!is_compressed_file(Path::new("data")));
    }

    #[test]
    fn test_csv_row_formatting_with_compression() {
        let row = vec!["1", "test data", "value"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);

        let compressed = compress_data(formatted.as_bytes(), Compression::Gzip).unwrap();

        // The output should carry the gzip magic bytes.
        assert!(compressed.len() >= 2);
        assert_eq!(compressed[0], 0x1f);
        assert_eq!(compressed[1], 0x8b);
    }

    #[test]
    fn test_multiple_rows_formatting() {
        let rows = vec![
            vec!["1", "Alice", "alice@example.com"],
            vec!["2", "Bob", "bob@example.com"],
            vec!["3", "Charlie", "charlie@example.com"],
        ];

        let mut data = Vec::new();
        for row in rows {
            let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);
            data.extend_from_slice(formatted.as_bytes());
        }

        let expected =
            "1,Alice,alice@example.com\n2,Bob,bob@example.com\n3,Charlie,charlie@example.com\n";
        assert_eq!(String::from_utf8(data).unwrap(), expected);
    }

    #[test]
    fn test_csv_special_characters_in_data() {
        let row = vec!["1", "Hello, World!", "Contains \"quotes\""];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);

        assert_eq!(
            formatted,
            "1,\"Hello, World!\",\"Contains \"\"quotes\"\"\"\n"
        );
    }

    #[test]
    fn test_csv_row_with_newlines() {
        let row = vec!["1", "Line1\nLine2", "normal"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);

        assert_eq!(formatted, "1,\"Line1\nLine2\",normal\n");
    }

    #[test]
    fn test_csv_row_with_carriage_return() {
        let row = vec!["1", "Line1\rLine2", "normal"];
        let formatted = format_csv_row(row, ',', '"', &RowSeparator::LF);

        assert_eq!(formatted, "1,\"Line1\rLine2\",normal\n");
    }

    /// Documents the import protocol flow implemented by
    /// `import_csv_internal`; this test intentionally carries no assertions.
    #[test]
    fn test_import_protocol_flow_documentation() {
        // 1. Connect the HTTP transport client to the Exasol host.
        // 2. Build the IMPORT statement pointing at the transport's internal
        //    address (including the public key fingerprint under TLS).
        // 3. Spawn the streaming task that answers the server's import
        //    request and writes the (optionally compressed) CSV data.
        // 4. Execute the IMPORT statement, which consumes the stream.
        // 5. Join the streaming task and surface stream errors before
        //    returning the SQL result.
    }

    #[test]
    fn test_detect_compression_path_variations() {
        let path = Path::new("/home/user/data/file.csv.gz");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Gzip
        );

        let path = Path::new("./data/file.csv.bz2");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::Bzip2
        );

        let path = Path::new("file.csv");
        assert_eq!(
            detect_compression(path, Compression::None),
            Compression::None
        );
    }

    #[test]
    fn test_build_multi_file_query_basic() {
        use crate::query::import::ImportFileEntry;

        let options = CsvImportOptions::default();
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
        ];

        let query = build_multi_file_query("my_table", &options, entries);
        let sql = query.build();

        assert!(sql.contains("IMPORT INTO my_table"));
        assert!(sql.contains("FROM CSV"));
        assert!(sql.contains("AT 'http://10.0.0.5:8563' FILE '001.csv'"));
        assert!(sql.contains("AT 'http://10.0.0.6:8564' FILE '002.csv'"));
    }

    #[test]
    fn test_build_multi_file_query_with_options() {
        use crate::query::import::ImportFileEntry;

        let options = CsvImportOptions::default()
            .schema("test_schema")
            .columns(vec!["id".to_string(), "name".to_string()])
            .skip_rows(1)
            .compression(Compression::Gzip);

        let entries = vec![ImportFileEntry::new(
            "10.0.0.5:8563".to_string(),
            "001.csv".to_string(),
            None,
        )];

        let query = build_multi_file_query("data", &options, entries);
        let sql = query.build();

        assert!(sql.contains("IMPORT INTO test_schema.data (id, name)"));
        assert!(sql.contains("SKIP = 1"));
        assert!(sql.contains("FILE '001.csv.gz'"));
    }
}