1use std::future::Future;
18use std::io;
19use std::path::Path;
20
21use bzip2::read::BzDecoder;
22use flate2::read::GzDecoder;
23use thiserror::Error;
24use tokio::fs::File;
25use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};
26use tokio::sync::mpsc;
27
28use crate::query::export::{Compression, ExportQuery, ExportSource, RowSeparator};
29use crate::transport::protocol::TransportProtocol;
30use crate::transport::HttpTransportClient;
31
/// Capacity, in queued chunks (not bytes), of the bounded channel that carries
/// exported data from the HTTP transport task to the consumer callback.
const DEFAULT_PIPE_BUFFER_SIZE: usize = 16;
34
/// Errors that can be produced while exporting data.
///
/// `Display` messages are generated by `thiserror` from the `#[error]`
/// attributes on each variant.
#[derive(Error, Debug)]
pub enum ExportError {
    /// An underlying I/O operation failed (converted automatically from
    /// [`std::io::Error`] via `?`).
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),

    /// An error reported by the crate's transport layer (converted
    /// automatically via `?`).
    #[error("Transport error: {0}")]
    TransportError(#[from] crate::error::TransportError),

    /// The HTTP transport side of the export failed: connecting, handling the
    /// export request, or joining the background HTTP task.
    #[error("HTTP transport error: {message}")]
    HttpTransportError { message: String },

    /// Executing the generated export SQL statement failed.
    #[error("SQL execution error: {message}")]
    SqlExecutionError { message: String },

    /// The exported CSV data could not be decoded or parsed. `row` is the
    /// zero-based count of rows completed before the problem was detected.
    #[error("CSV parsing error at row {row}: {message}")]
    CsvParseError { row: usize, message: String },

    /// Same fields and message format as [`ExportError::CsvParseError`].
    // NOTE(review): nothing in this file constructs this variant; it looks like
    // a duplicate of `CsvParseError` - confirm whether other modules use it.
    #[error("CSV parsing error at row {row}: {message}")]
    CsvParse { row: usize, message: String },

    /// Decompressing gzip or bzip2 export data failed.
    #[error("Decompression error: {0}")]
    DecompressionError(String),

    /// The internal data channel was closed unexpectedly (for example the
    /// receiving side was dropped before the data was delivered).
    #[error("Channel error: {0}")]
    ChannelError(String),

    /// The export did not finish within the configured timeout.
    #[error("Export timed out after {timeout_ms}ms")]
    Timeout { timeout_ms: u64 },

    /// The export was cancelled.
    #[error("Export was cancelled")]
    Cancelled,

    /// An Arrow-related failure, carried as its message text.
    #[error("Arrow error: {0}")]
    Arrow(String),

    /// A schema-related failure, carried as its message text.
    #[error("Schema error: {0}")]
    Schema(String),

    /// A Parquet-related failure, carried as its message text (see the
    /// `From<parquet::errors::ParquetError>` impl).
    #[error("Parquet error: {0}")]
    Parquet(String),
}
90
91impl From<parquet::errors::ParquetError> for ExportError {
92 fn from(err: parquet::errors::ParquetError) -> Self {
93 ExportError::Parquet(err.to_string())
94 }
95}
96
/// Options controlling a CSV export: the CSV dialect, compression, and how to
/// reach the database for the HTTP transport connection.
///
/// Build with [`CsvExportOptions::new`] (or `Default`) and the chained setter
/// methods.
#[derive(Debug, Clone)]
pub struct CsvExportOptions {
    /// Character placed between columns (default `,`).
    pub column_separator: char,

    /// Character used to quote column values (default `"`).
    pub column_delimiter: char,

    /// Row terminator requested from the database (default `RowSeparator::LF`).
    pub row_separator: RowSeparator,

    /// Character encoding name passed to the export statement (default `"UTF-8"`).
    pub encoding: String,

    /// Optional text used to represent NULL values; only forwarded to the
    /// export statement when set.
    pub null_value: Option<String>,

    /// Compression applied to the exported data (default `Compression::None`).
    pub compression: Compression,

    /// Whether a header row with column names is requested (default `false`).
    pub with_column_names: bool,

    /// Whether the HTTP transport connection uses TLS (default `false`).
    pub use_tls: bool,

    /// Overall export timeout in milliseconds (default `300_000`).
    pub timeout_ms: u64,

    /// Host the HTTP transport client connects to (default empty).
    pub host: String,

    /// Port the HTTP transport client connects to (default `0`).
    pub port: u16,
}
135
136impl Default for CsvExportOptions {
137 fn default() -> Self {
138 Self {
139 column_separator: ',',
140 column_delimiter: '"',
141 row_separator: RowSeparator::LF,
142 encoding: "UTF-8".to_string(),
143 null_value: None,
144 compression: Compression::None,
145 with_column_names: false,
146 use_tls: false,
147 timeout_ms: 300_000, host: String::new(),
149 port: 0,
150 }
151 }
152}
153
154impl CsvExportOptions {
155 #[must_use]
157 pub fn new() -> Self {
158 Self::default()
159 }
160
161 #[must_use]
162 pub fn column_separator(mut self, sep: char) -> Self {
163 self.column_separator = sep;
164 self
165 }
166
167 #[must_use]
168 pub fn column_delimiter(mut self, delim: char) -> Self {
169 self.column_delimiter = delim;
170 self
171 }
172
173 #[must_use]
174 pub fn row_separator(mut self, sep: RowSeparator) -> Self {
175 self.row_separator = sep;
176 self
177 }
178
179 #[must_use]
180 pub fn encoding(mut self, enc: &str) -> Self {
181 self.encoding = enc.to_string();
182 self
183 }
184
185 #[must_use]
186 pub fn null_value(mut self, val: &str) -> Self {
187 self.null_value = Some(val.to_string());
188 self
189 }
190
191 #[must_use]
192 pub fn compression(mut self, compression: Compression) -> Self {
193 self.compression = compression;
194 self
195 }
196
197 #[must_use]
198 pub fn with_column_names(mut self, include: bool) -> Self {
199 self.with_column_names = include;
200 self
201 }
202
203 #[must_use]
204 pub fn use_tls(mut self, use_tls: bool) -> Self {
205 self.use_tls = use_tls;
206 self
207 }
208
209 #[must_use]
210 pub fn timeout_ms(mut self, timeout: u64) -> Self {
211 self.timeout_ms = timeout;
212 self
213 }
214
215 #[must_use]
219 pub fn exasol_host(mut self, host: impl Into<String>) -> Self {
220 self.host = host.into();
221 self
222 }
223
224 #[must_use]
228 pub fn exasol_port(mut self, port: u16) -> Self {
229 self.port = port;
230 self
231 }
232}
233
/// Receiving end of the data pipe handed to export callbacks; yields the
/// exported payload as byte chunks.
pub struct DataPipeReceiver {
    /// Bounded channel fed by the HTTP transport task.
    rx: mpsc::Receiver<Vec<u8>>,
}
238
impl DataPipeReceiver {
    /// Waits for the next chunk of exported data.
    ///
    /// Returns `None` once every sender has been dropped and all queued chunks
    /// have been consumed.
    pub async fn recv(&mut self) -> Option<Vec<u8>> {
        self.rx.recv().await
    }
}
247
248pub async fn export_to_file<T: TransportProtocol + ?Sized>(
268 ws_transport: &mut T,
269 source: ExportSource,
270 file_path: &Path,
271 options: CsvExportOptions,
272) -> Result<u64, ExportError> {
273 let file = File::create(file_path).await?;
275 let writer = BufWriter::new(file);
276
277 export_to_stream(ws_transport, source, writer, options).await
278}
279
280pub async fn export_to_stream<T: TransportProtocol + ?Sized, W: AsyncWrite + Unpin>(
297 ws_transport: &mut T,
298 source: ExportSource,
299 mut writer: W,
300 options: CsvExportOptions,
301) -> Result<u64, ExportError> {
302 let compression = options.compression;
303
304 export_to_callback(ws_transport, source, options, |mut receiver| async move {
306 let mut row_count = 0u64;
307 let mut buffer = Vec::new();
308
309 while let Some(chunk) = receiver.recv().await {
311 buffer.extend_from_slice(&chunk);
312 }
313
314 let data = match compression {
316 Compression::Gzip => {
317 let decoder = GzDecoder::new(buffer.as_slice());
318 let mut decompressed = Vec::new();
319 std::io::Read::read_to_end(
320 &mut std::io::BufReader::new(decoder),
321 &mut decompressed,
322 )
323 .map_err(|e| ExportError::DecompressionError(e.to_string()))?;
324 decompressed
325 }
326 Compression::Bzip2 => {
327 let decoder = BzDecoder::new(buffer.as_slice());
328 let mut decompressed = Vec::new();
329 std::io::Read::read_to_end(
330 &mut std::io::BufReader::new(decoder),
331 &mut decompressed,
332 )
333 .map_err(|e| ExportError::DecompressionError(e.to_string()))?;
334 decompressed
335 }
336 Compression::None => buffer,
337 };
338
339 writer.write_all(&data).await?;
341 writer.flush().await?;
342
343 for byte in &data {
345 if *byte == b'\n' {
346 row_count += 1;
347 }
348 }
349
350 Ok(row_count)
351 })
352 .await
353}
354
355pub async fn export_to_list<T: TransportProtocol + ?Sized>(
376 ws_transport: &mut T,
377 source: ExportSource,
378 options: CsvExportOptions,
379) -> Result<Vec<Vec<String>>, ExportError> {
380 let separator = options.column_separator;
381 let delimiter = options.column_delimiter;
382 let compression = options.compression;
383
384 export_to_callback(ws_transport, source, options, |mut receiver| async move {
385 let mut buffer = Vec::new();
386
387 while let Some(chunk) = receiver.recv().await {
389 buffer.extend_from_slice(&chunk);
390 }
391
392 let data = match compression {
394 Compression::Gzip => {
395 let decoder = GzDecoder::new(buffer.as_slice());
396 let mut decompressed = Vec::new();
397 std::io::Read::read_to_end(
398 &mut std::io::BufReader::new(decoder),
399 &mut decompressed,
400 )
401 .map_err(|e| ExportError::DecompressionError(e.to_string()))?;
402 decompressed
403 }
404 Compression::Bzip2 => {
405 let decoder = BzDecoder::new(buffer.as_slice());
406 let mut decompressed = Vec::new();
407 std::io::Read::read_to_end(
408 &mut std::io::BufReader::new(decoder),
409 &mut decompressed,
410 )
411 .map_err(|e| ExportError::DecompressionError(e.to_string()))?;
412 decompressed
413 }
414 Compression::None => buffer,
415 };
416
417 let csv_string = String::from_utf8(data).map_err(|e| ExportError::CsvParseError {
419 row: 0,
420 message: format!("Invalid UTF-8: {}", e),
421 })?;
422
423 parse_csv(&csv_string, separator, delimiter)
424 })
425 .await
426}
427
428pub async fn export_to_callback<T, F, Fut, R>(
448 ws_transport: &mut T,
449 source: ExportSource,
450 options: CsvExportOptions,
451 callback: F,
452) -> Result<R, ExportError>
453where
454 T: TransportProtocol + ?Sized,
455 F: FnOnce(DataPipeReceiver) -> Fut,
456 Fut: Future<Output = Result<R, ExportError>>,
457{
458 let mut client = HttpTransportClient::connect(&options.host, options.port, options.use_tls)
460 .await
461 .map_err(|e| ExportError::HttpTransportError {
462 message: format!("Failed to connect to Exasol: {e}"),
463 })?;
464
465 let internal_addr = client.internal_address().to_string();
467 let fingerprint = client.public_key_fingerprint().map(|s| s.to_string());
468
469 let mut query_builder = match source {
471 ExportSource::Table {
472 ref schema,
473 ref name,
474 ref columns,
475 } => {
476 let mut builder = ExportQuery::from_table(name);
477 if let Some(s) = schema {
478 builder = builder.schema(s);
479 }
480 if !columns.is_empty() {
481 builder = builder.columns(columns.iter().map(|s| s.as_str()).collect());
482 }
483 builder
484 }
485 ExportSource::Query { ref sql } => ExportQuery::from_query(sql),
486 };
487
488 query_builder = query_builder
490 .at_address(&internal_addr)
491 .column_separator(options.column_separator)
492 .column_delimiter(options.column_delimiter)
493 .row_separator(options.row_separator)
494 .encoding(&options.encoding)
495 .with_column_names(options.with_column_names)
496 .compressed(options.compression);
497
498 if let Some(ref null_val) = options.null_value {
499 query_builder = query_builder.null_value(null_val);
500 }
501
502 if let Some(ref fp) = fingerprint {
503 query_builder = query_builder.with_public_key(fp);
504 }
505
506 let export_sql = query_builder.build();
507
508 let (tx, rx) = mpsc::channel::<Vec<u8>>(DEFAULT_PIPE_BUFFER_SIZE);
510 let receiver = DataPipeReceiver { rx };
511
512 let http_task =
518 tokio::spawn(async move {
519 let (_request, body) = client.handle_export_request().await.map_err(|e| {
521 ExportError::HttpTransportError {
522 message: format!("Failed to handle export request: {e}"),
523 }
524 })?;
525
526 if !body.is_empty() && tx.send(body).await.is_err() {
528 return Err(ExportError::ChannelError("Receiver dropped".to_string()));
529 }
530
531 let _ = client.shutdown().await;
533
534 Ok::<(), ExportError>(())
535 });
536
537 let sql_task = async {
540 ws_transport
541 .execute_query(&export_sql)
542 .await
543 .map_err(|e| ExportError::SqlExecutionError {
544 message: e.to_string(),
545 })
546 };
547
548 let callback_task = callback(receiver);
550
551 let timeout = tokio::time::Duration::from_millis(options.timeout_ms);
553
554 let result = tokio::time::timeout(timeout, async {
555 let sql_result = sql_task.await;
557
558 let (http_result, callback_result) = tokio::join!(http_task, callback_task);
560
561 sql_result?;
563 http_result.map_err(|e| ExportError::HttpTransportError {
564 message: format!("HTTP task panicked: {}", e),
565 })??;
566
567 callback_result
568 })
569 .await
570 .map_err(|_| ExportError::Timeout {
571 timeout_ms: options.timeout_ms,
572 })?;
573
574 result
575}
576
577fn parse_csv(
579 data: &str,
580 separator: char,
581 delimiter: char,
582) -> Result<Vec<Vec<String>>, ExportError> {
583 let mut rows = Vec::new();
584 let mut current_row = Vec::new();
585 let mut current_field = String::new();
586 let mut in_quotes = false;
587 let mut row_num = 0;
588
589 let chars: Vec<char> = data.chars().collect();
590 let mut i = 0;
591
592 while i < chars.len() {
593 let c = chars[i];
594
595 if in_quotes {
596 if c == delimiter {
597 if i + 1 < chars.len() && chars[i + 1] == delimiter {
599 current_field.push(delimiter);
600 i += 2;
601 continue;
602 }
603 in_quotes = false;
605 } else {
606 current_field.push(c);
607 }
608 } else if c == delimiter {
609 in_quotes = true;
611 } else if c == separator {
612 current_row.push(current_field);
614 current_field = String::new();
615 } else if c == '\n' {
616 current_row.push(current_field);
618 current_field = String::new();
619 rows.push(current_row);
620 current_row = Vec::new();
621 row_num += 1;
622 } else if c == '\r' {
623 if i + 1 < chars.len() && chars[i + 1] == '\n' {
625 } else {
627 current_row.push(current_field);
629 current_field = String::new();
630 rows.push(current_row);
631 current_row = Vec::new();
632 row_num += 1;
633 }
634 } else {
635 current_field.push(c);
636 }
637 i += 1;
638 }
639
640 if !current_field.is_empty() || !current_row.is_empty() {
642 current_row.push(current_field);
643 rows.push(current_row);
644 }
645
646 if in_quotes {
648 return Err(ExportError::CsvParseError {
649 row: row_num,
650 message: "Unclosed quote at end of data".to_string(),
651 });
652 }
653
654 Ok(rows)
655}
656
/// Unit tests for the option builder, the CSV parser, error formatting, the
/// data pipe receiver, and the decompression round trips. None of them needs
/// a database connection.
#[cfg(test)]
mod tests {
    use super::*;

    // ---- CsvExportOptions ----

    /// `Default` yields the documented defaults for every field.
    #[test]
    fn test_csv_export_options_default() {
        let options = CsvExportOptions::default();

        assert_eq!(options.column_separator, ',');
        assert_eq!(options.column_delimiter, '"');
        assert_eq!(options.row_separator, RowSeparator::LF);
        assert_eq!(options.encoding, "UTF-8");
        assert!(options.null_value.is_none());
        assert_eq!(options.compression, Compression::None);
        assert!(!options.with_column_names);
        assert!(!options.use_tls);
        assert_eq!(options.timeout_ms, 300_000);
        assert_eq!(options.host, "");
        assert_eq!(options.port, 0);
    }

    /// Every builder method stores its argument in the matching field.
    #[test]
    fn test_csv_export_options_builder() {
        let options = CsvExportOptions::new()
            .column_separator(';')
            .column_delimiter('\'')
            .row_separator(RowSeparator::CRLF)
            .encoding("ISO-8859-1")
            .null_value("NULL")
            .compression(Compression::Gzip)
            .with_column_names(true)
            .use_tls(false)
            .timeout_ms(60_000)
            .exasol_host("exasol.example.com")
            .exasol_port(8563);

        assert_eq!(options.column_separator, ';');
        assert_eq!(options.column_delimiter, '\'');
        assert_eq!(options.row_separator, RowSeparator::CRLF);
        assert_eq!(options.encoding, "ISO-8859-1");
        assert_eq!(options.null_value, Some("NULL".to_string()));
        assert_eq!(options.compression, Compression::Gzip);
        assert!(options.with_column_names);
        assert!(!options.use_tls);
        assert_eq!(options.timeout_ms, 60_000);
        assert_eq!(options.host, "exasol.example.com");
        assert_eq!(options.port, 8563);
    }

    // ---- parse_csv ----

    /// Unquoted fields, LF row terminators.
    #[test]
    fn test_parse_csv_simple() {
        let data = "a,b,c\n1,2,3\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], vec!["a", "b", "c"]);
        assert_eq!(result[1], vec!["1", "2", "3"]);
    }

    /// Surrounding quotes are stripped from field values.
    #[test]
    fn test_parse_csv_with_quotes() {
        let data = "\"hello\",\"world\"\n\"foo\",\"bar\"\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], vec!["hello", "world"]);
        assert_eq!(result[1], vec!["foo", "bar"]);
    }

    /// A doubled quote inside a quoted field becomes one literal quote.
    #[test]
    fn test_parse_csv_with_escaped_quotes() {
        let data = "\"hello \"\"world\"\"\",normal\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0], vec!["hello \"world\"", "normal"]);
    }

    /// Separators inside quotes do not split the field.
    #[test]
    fn test_parse_csv_with_separator_in_quotes() {
        let data = "\"a,b,c\",\"d,e\"\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0], vec!["a,b,c", "d,e"]);
    }

    /// Line breaks inside quotes do not end the row.
    #[test]
    fn test_parse_csv_with_newline_in_quotes() {
        let data = "\"line1\nline2\",normal\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0], vec!["line1\nline2", "normal"]);
    }

    /// CRLF counts as a single row terminator.
    #[test]
    fn test_parse_csv_with_crlf() {
        let data = "a,b\r\nc,d\r\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], vec!["a", "b"]);
        assert_eq!(result[1], vec!["c", "d"]);
    }

    /// The column separator is configurable.
    #[test]
    fn test_parse_csv_with_semicolon_separator() {
        let data = "a;b;c\n1;2;3\n";
        let result = parse_csv(data, ';', '"').unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], vec!["a", "b", "c"]);
        assert_eq!(result[1], vec!["1", "2", "3"]);
    }

    /// Empty fields are kept, including leading and trailing ones.
    #[test]
    fn test_parse_csv_empty_fields() {
        let data = "a,,c\n,b,\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], vec!["a", "", "c"]);
        assert_eq!(result[1], vec!["", "b", ""]);
    }

    /// Input that ends inside a quoted section is rejected.
    #[test]
    fn test_parse_csv_unclosed_quote_error() {
        let data = "\"unclosed,quote\n";
        let result = parse_csv(data, ',', '"');

        assert!(result.is_err());
        if let Err(ExportError::CsvParseError { row, message }) = result {
            assert_eq!(row, 0);
            assert!(message.contains("Unclosed quote"));
        } else {
            panic!("Expected CsvParseError");
        }
    }

    /// Empty input produces no rows.
    #[test]
    fn test_parse_csv_empty_data() {
        let data = "";
        let result = parse_csv(data, ',', '"').unwrap();

        assert!(result.is_empty());
    }

    /// A row may consist of a single field.
    #[test]
    fn test_parse_csv_single_field() {
        let data = "single\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0], vec!["single"]);
    }

    /// The last row is returned even without a trailing line break.
    #[test]
    fn test_parse_csv_no_trailing_newline() {
        let data = "a,b,c";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0], vec!["a", "b", "c"]);
    }

    // ---- ExportError ----

    /// `Display` output contains the variant prefix and the payload.
    #[test]
    fn test_export_error_display() {
        let err = ExportError::IoError(io::Error::new(io::ErrorKind::NotFound, "file not found"));
        assert!(err.to_string().contains("I/O error"));

        let err = ExportError::HttpTransportError {
            message: "connection refused".to_string(),
        };
        assert!(err.to_string().contains("HTTP transport error"));
        assert!(err.to_string().contains("connection refused"));

        let err = ExportError::CsvParseError {
            row: 5,
            message: "invalid data".to_string(),
        };
        assert!(err.to_string().contains("row 5"));
        assert!(err.to_string().contains("invalid data"));

        let err = ExportError::Timeout { timeout_ms: 5000 };
        assert!(err.to_string().contains("5000ms"));
    }

    /// `io::Error` converts into `ExportError::IoError`.
    #[test]
    fn test_export_error_from_io_error() {
        let io_err = io::Error::new(io::ErrorKind::PermissionDenied, "access denied");
        let export_err: ExportError = io_err.into();

        assert!(matches!(export_err, ExportError::IoError(_)));
    }

    // ---- DataPipeReceiver ----

    /// Chunks arrive in send order, followed by `None` once the sender is dropped.
    #[tokio::test]
    async fn test_data_pipe_receiver_recv() {
        let (tx, rx) = mpsc::channel::<Vec<u8>>(16);
        let mut receiver = DataPipeReceiver { rx };

        tx.send(vec![1, 2, 3]).await.unwrap();
        tx.send(vec![4, 5, 6]).await.unwrap();
        drop(tx);

        let chunk1 = receiver.recv().await;
        assert_eq!(chunk1, Some(vec![1, 2, 3]));

        let chunk2 = receiver.recv().await;
        assert_eq!(chunk2, Some(vec![4, 5, 6]));

        let chunk3 = receiver.recv().await;
        assert!(chunk3.is_none());
    }

    /// A pipe whose sender is dropped without sending yields `None` immediately.
    #[tokio::test]
    async fn test_data_pipe_receiver_empty() {
        let (tx, rx) = mpsc::channel::<Vec<u8>>(16);
        let mut receiver = DataPipeReceiver { rx };

        drop(tx);

        let chunk = receiver.recv().await;
        assert!(chunk.is_none());
    }

    // ---- Header rows ----

    /// A header row is parsed like any other row and comes first.
    #[test]
    fn test_parse_csv_with_column_headers() {
        let data = "id,name,email\n1,Alice,alice@example.com\n2,Bob,bob@example.com\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0], vec!["id", "name", "email"]);
        assert_eq!(result[1], vec!["1", "Alice", "alice@example.com"]);
        assert_eq!(result[2], vec!["2", "Bob", "bob@example.com"]);
    }

    /// Quoted header names may contain spaces.
    #[test]
    fn test_parse_csv_with_quoted_headers() {
        let data = "\"Column A\",\"Column B\",\"Column C\"\n1,2,3\n";
        let result = parse_csv(data, ',', '"').unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], vec!["Column A", "Column B", "Column C"]);
        assert_eq!(result[1], vec!["1", "2", "3"]);
    }

    // ---- Decompression round trips ----

    /// Gzip-compressed CSV decodes back to the original bytes and parses.
    #[test]
    fn test_gzip_decompression() {
        use flate2::write::GzEncoder;
        use std::io::Write;

        let original = b"hello,world\n1,2\n";
        let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::default());
        encoder.write_all(original).unwrap();
        let compressed = encoder.finish().unwrap();

        // Same decoding steps as the export functions use.
        let decoder = GzDecoder::new(compressed.as_slice());
        let mut decompressed = Vec::new();
        std::io::Read::read_to_end(&mut std::io::BufReader::new(decoder), &mut decompressed)
            .unwrap();

        assert_eq!(decompressed, original);

        let csv_string = String::from_utf8(decompressed).unwrap();
        let rows = parse_csv(&csv_string, ',', '"').unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0], vec!["hello", "world"]);
        assert_eq!(rows[1], vec!["1", "2"]);
    }

    /// Bzip2-compressed CSV decodes back to the original bytes and parses.
    #[test]
    fn test_bzip2_decompression() {
        use bzip2::write::BzEncoder;
        use std::io::Write;

        let original = b"foo,bar\na,b\n";
        let mut encoder = BzEncoder::new(Vec::new(), bzip2::Compression::default());
        encoder.write_all(original).unwrap();
        let compressed = encoder.finish().unwrap();

        // Same decoding steps as the export functions use.
        let decoder = BzDecoder::new(compressed.as_slice());
        let mut decompressed = Vec::new();
        std::io::Read::read_to_end(&mut std::io::BufReader::new(decoder), &mut decompressed)
            .unwrap();

        assert_eq!(decompressed, original);

        let csv_string = String::from_utf8(decompressed).unwrap();
        let rows = parse_csv(&csv_string, ',', '"').unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0], vec!["foo", "bar"]);
        assert_eq!(rows[1], vec!["a", "b"]);
    }

    // ---- Individual option setters ----

    /// The header-row flag can be switched on and off.
    #[test]
    fn test_csv_export_options_with_column_names() {
        let options = CsvExportOptions::new().with_column_names(true);
        assert!(options.with_column_names);

        let options = CsvExportOptions::new().with_column_names(false);
        assert!(!options.with_column_names);
    }

    /// Gzip compression can be selected.
    #[test]
    fn test_csv_export_options_compression_gzip() {
        let options = CsvExportOptions::new().compression(Compression::Gzip);
        assert_eq!(options.compression, Compression::Gzip);
    }

    /// Bzip2 compression can be selected.
    #[test]
    fn test_csv_export_options_compression_bzip2() {
        let options = CsvExportOptions::new().compression(Compression::Bzip2);
        assert_eq!(options.compression, Compression::Bzip2);
    }

    /// Compression can be explicitly disabled.
    #[test]
    fn test_csv_export_options_compression_none() {
        let options = CsvExportOptions::new().compression(Compression::None);
        assert_eq!(options.compression, Compression::None);
    }

    // ---- Remaining error variants ----

    /// Decompression errors show their prefix and detail text.
    #[test]
    fn test_export_error_decompression() {
        let err = ExportError::DecompressionError("invalid gzip header".to_string());
        assert!(err.to_string().contains("Decompression error"));
        assert!(err.to_string().contains("invalid gzip header"));
    }

    /// Channel errors show their prefix and detail text.
    #[test]
    fn test_export_error_channel() {
        let err = ExportError::ChannelError("receiver dropped".to_string());
        assert!(err.to_string().contains("Channel error"));
        assert!(err.to_string().contains("receiver dropped"));
    }

    /// The cancellation message mentions that the export was cancelled.
    #[test]
    fn test_export_error_cancelled() {
        let err = ExportError::Cancelled;
        assert!(err.to_string().contains("cancelled"));
    }

    // ---- Pipe to parser ----

    /// Chunks received from the pipe concatenate into parseable CSV.
    #[tokio::test]
    async fn test_data_pipe_receiver_multiple_chunks() {
        let (tx, rx) = mpsc::channel::<Vec<u8>>(DEFAULT_PIPE_BUFFER_SIZE);
        let mut receiver = DataPipeReceiver { rx };

        let chunk1 = b"id,name\n".to_vec();
        let chunk2 = b"1,Alice\n".to_vec();
        let chunk3 = b"2,Bob\n".to_vec();

        tx.send(chunk1.clone()).await.unwrap();
        tx.send(chunk2.clone()).await.unwrap();
        tx.send(chunk3.clone()).await.unwrap();
        drop(tx);

        // Collect everything the same way the export functions do.
        let mut buffer = Vec::new();
        while let Some(chunk) = receiver.recv().await {
            buffer.extend_from_slice(&chunk);
        }

        let csv_string = String::from_utf8(buffer).unwrap();
        let rows = parse_csv(&csv_string, ',', '"').unwrap();

        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0], vec!["id", "name"]);
        assert_eq!(rows[1], vec!["1", "Alice"]);
        assert_eq!(rows[2], vec!["2", "Bob"]);
    }
}