use std::sync::Arc;

use arrow::array::builder::{
    BooleanBuilder, Date32Builder, Decimal128Builder, Float64Builder, Int64Builder, StringBuilder,
    TimestampMicrosecondBuilder,
};
use arrow::array::{ArrayRef, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use thiserror::Error;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader};

use crate::types::{
    conversion::{
        exasol_type_to_arrow as exasol_type_to_arrow_impl, parse_date_to_days,
        parse_decimal_to_i128, parse_timestamp_to_micros,
    },
    ExasolType,
};
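/// Errors that can occur while converting an Exasol CSV export into Arrow data.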
#[derive(Error, Debug)]
pub enum ExportError {
    #[error("CSV parsing error at row {row}: {message}")]
    CsvParseError { row: usize, message: String },

    #[error("Type conversion error at row {row}, column {column}: {message}")]
    TypeConversionError {
        row: usize,
        column: usize,
        message: String,
    },

    #[error("Schema error: {0}")]
    SchemaError(String),

    #[error("I/O error: {0}")]
    IoError(String),

    #[error("Arrow error: {0}")]
    ArrowError(String),

    #[error("Transport error: {0}")]
    TransportError(String),
}

impl From<std::io::Error> for ExportError {
    fn from(err: std::io::Error) -> Self {
        ExportError::IoError(err.to_string())
    }
}

impl From<arrow::error::ArrowError> for ExportError {
    fn from(err: arrow::error::ArrowError) -> Self {
        ExportError::ArrowError(err.to_string())
    }
}
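/// Configuration for exporting Exasol query results as Arrow record batches.
///
/// A minimal usage sketch of the builder methods defined below (marked
/// `ignore` because the crate path depends on how this module is consumed;
/// the host is hypothetical):
///
/// ```ignore
/// let options = ArrowExportOptions::new()
///     .with_batch_size(4096)
///     .with_null_value("NULL")
///     .exasol_host("exasol.example.com")
///     .exasol_port(8563)
///     .use_encryption(true);
/// ```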
#[derive(Debug, Clone)]
pub struct ArrowExportOptions {
    pub batch_size: usize,
    pub null_value: Option<String>,
    pub schema: Option<Arc<Schema>>,
    pub column_separator: char,
    pub column_delimiter: char,
    pub host: String,
    pub port: u16,
    pub use_encryption: bool,
}

impl Default for ArrowExportOptions {
    fn default() -> Self {
        Self {
            batch_size: 1024,
            null_value: None,
            schema: None,
            column_separator: ',',
            column_delimiter: '"',
            host: String::new(),
            port: 0,
            use_encryption: false,
        }
    }
}

impl ArrowExportOptions {
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    #[must_use]
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    #[must_use]
    pub fn with_null_value(mut self, null_value: impl Into<String>) -> Self {
        self.null_value = Some(null_value.into());
        self
    }

    #[must_use]
    pub fn with_schema(mut self, schema: Arc<Schema>) -> Self {
        self.schema = Some(schema);
        self
    }

    #[must_use]
    pub fn with_column_separator(mut self, sep: char) -> Self {
        self.column_separator = sep;
        self
    }

    #[must_use]
    pub fn with_column_delimiter(mut self, delim: char) -> Self {
        self.column_delimiter = delim;
        self
    }

    #[must_use]
    pub fn exasol_host(mut self, host: impl Into<String>) -> Self {
        self.host = host.into();
        self
    }

    #[must_use]
    pub fn exasol_port(mut self, port: u16) -> Self {
        self.port = port;
        self
    }

    #[must_use]
    pub fn use_encryption(mut self, use_encryption: bool) -> Self {
        self.use_encryption = use_encryption;
        self
    }
}

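/// Streaming reader that decodes CSV rows into Arrow `RecordBatch`es using a
/// caller-supplied schema.
///
/// A usage sketch (marked `ignore` since the surrounding crate name is not
/// fixed here, and `process` is a hypothetical consumer):
///
/// ```ignore
/// let mut reader = CsvToArrowReader::from_buffered(buf_reader, schema, &options);
/// while let Some(batch) = reader.next_batch().await? {
///     process(batch);
/// }
/// ```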
pub struct CsvToArrowReader<R> {
    reader: BufReader<R>,
    schema: Arc<Schema>,
    batch_size: usize,
    null_value: Option<String>,
    column_separator: char,
    column_delimiter: char,
    current_row: usize,
    finished: bool,
}

impl<R: AsyncBufRead + Unpin> CsvToArrowReader<R> {
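    /// Creates a reader that wraps `reader` in a new `BufReader`.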
    pub fn new(reader: R, schema: Arc<Schema>, options: &ArrowExportOptions) -> Self
    where
        R: tokio::io::AsyncRead,
    {
        Self {
            reader: BufReader::new(reader),
            schema,
            batch_size: options.batch_size,
            null_value: options.null_value.clone(),
            column_separator: options.column_separator,
            column_delimiter: options.column_delimiter,
            current_row: 0,
            finished: false,
        }
    }

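    /// Creates a reader from an existing `BufReader`, avoiding double buffering.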
    pub fn from_buffered(
        reader: BufReader<R>,
        schema: Arc<Schema>,
        options: &ArrowExportOptions,
    ) -> Self
    where
        R: tokio::io::AsyncRead,
    {
        Self {
            reader,
            schema,
            batch_size: options.batch_size,
            null_value: options.null_value.clone(),
            column_separator: options.column_separator,
            column_delimiter: options.column_delimiter,
            current_row: 0,
            finished: false,
        }
    }

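    /// Returns a shared handle to the schema this reader decodes into.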
    #[must_use]
    pub fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }

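    /// Reads up to `batch_size` CSV rows and converts them into a `RecordBatch`.
    ///
    /// Blank lines are skipped. Returns `Ok(None)` once the input is exhausted.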
    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>, ExportError> {
        if self.finished {
            return Ok(None);
        }

        let mut rows: Vec<Vec<String>> = Vec::with_capacity(self.batch_size);

        for _ in 0..self.batch_size {
            let mut line = String::new();
            let bytes_read = self.reader.read_line(&mut line).await?;

            if bytes_read == 0 {
                self.finished = true;
                break;
            }

            let line = line.trim_end_matches(&['\r', '\n'][..]);
            if line.is_empty() {
                continue;
            }

            let fields = parse_csv_row(
                line,
                self.column_separator,
                self.column_delimiter,
                self.current_row,
            )?;
            rows.push(fields);
            self.current_row += 1;
        }

        if rows.is_empty() {
            return Ok(None);
        }

        let batch = self.build_record_batch(&rows)?;
        Ok(Some(batch))
    }

    fn build_record_batch(&self, rows: &[Vec<String>]) -> Result<RecordBatch, ExportError> {
        let num_columns = self.schema.fields().len();
        let num_rows = rows.len();

        let arrays: Result<Vec<ArrayRef>, ExportError> = (0..num_columns)
            .map(|col_idx| {
                let field = self.schema.field(col_idx);
                let values: Vec<&str> = rows
                    .iter()
                    .map(|row| row.get(col_idx).map(|s| s.as_str()).unwrap_or(""))
                    .collect();

                build_array_from_strings(
                    &values,
                    field.data_type(),
                    field.is_nullable(),
                    &self.null_value,
                    self.current_row - num_rows,
                    col_idx,
                )
            })
            .collect();

        let arrays = arrays?;

        RecordBatch::try_new(Arc::clone(&self.schema), arrays)
            .map_err(|e| ExportError::ArrowError(e.to_string()))
    }
}

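/// Splits one CSV line into fields, honoring `delimiter`-quoted fields and
/// doubled delimiters as escapes (e.g. `"he said ""hi"""`).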
fn parse_csv_row(
    line: &str,
    separator: char,
    delimiter: char,
    row: usize,
) -> Result<Vec<String>, ExportError> {
    let mut fields = Vec::new();
    let mut current_field = String::new();
    let mut in_quotes = false;
    let mut chars = line.chars().peekable();

    while let Some(c) = chars.next() {
        if in_quotes {
            if c == delimiter {
                if chars.peek() == Some(&delimiter) {
                    current_field.push(delimiter);
                    chars.next();
                } else {
                    in_quotes = false;
                }
            } else {
                current_field.push(c);
            }
        } else if c == delimiter {
            in_quotes = true;
        } else if c == separator {
            fields.push(current_field);
            current_field = String::new();
        } else {
            current_field.push(c);
        }
    }

    fields.push(current_field);

    if in_quotes {
        return Err(ExportError::CsvParseError {
            row,
            message: "Unclosed quote in CSV row".to_string(),
        });
    }

    Ok(fields)
}

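/// Dispatches to the type-specific builder for `data_type`, producing one
/// Arrow array from the string values of a single column.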
fn build_array_from_strings(
    values: &[&str],
    data_type: &DataType,
    nullable: bool,
    null_value: &Option<String>,
    start_row: usize,
    column: usize,
) -> Result<ArrayRef, ExportError> {
    match data_type {
        DataType::Boolean => build_boolean_array(values, nullable, null_value, start_row, column),
        DataType::Int64 => build_int64_array(values, nullable, null_value, start_row, column),
        DataType::Float64 => build_float64_array(values, nullable, null_value, start_row, column),
        DataType::Utf8 => build_string_array(values, nullable, null_value),
        DataType::Date32 => build_date32_array(values, nullable, null_value, start_row, column),
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            build_timestamp_array(values, nullable, null_value, start_row, column)
        }
        DataType::Decimal128(precision, scale) => build_decimal128_array(
            values, *precision, *scale, nullable, null_value, start_row, column,
        ),
        _ => Err(ExportError::SchemaError(format!(
            "Unsupported data type: {:?}",
            data_type
        ))),
    }
}

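/// A value is NULL if it is empty, or if it equals the configured null marker.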
fn is_null_value(value: &str, null_value: &Option<String>) -> bool {
    if value.is_empty() {
        return true;
    }
    if let Some(nv) = null_value {
        return value == nv;
    }
    false
}

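/// Builds a `BooleanArray`, accepting the common textual spellings of true/false.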
fn build_boolean_array(
    values: &[&str],
    nullable: bool,
    null_value: &Option<String>,
    start_row: usize,
    column: usize,
) -> Result<ArrayRef, ExportError> {
    let mut builder = BooleanBuilder::with_capacity(values.len());

    for (i, value) in values.iter().enumerate() {
        if is_null_value(value, null_value) {
            if nullable {
                builder.append_null();
            } else {
                return Err(ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: "NULL value in non-nullable column".to_string(),
                });
            }
        } else {
            let lower = value.to_lowercase();
            let b = match lower.as_str() {
                "true" | "1" | "t" | "yes" | "y" => true,
                "false" | "0" | "f" | "no" | "n" => false,
                _ => {
                    return Err(ExportError::TypeConversionError {
                        row: start_row + i,
                        column,
                        message: format!("Invalid boolean value: {}", value),
                    });
                }
            };
            builder.append_value(b);
        }
    }

    Ok(Arc::new(builder.finish()))
}

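/// Builds an `Int64Array` from decimal integer strings.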
fn build_int64_array(
    values: &[&str],
    nullable: bool,
    null_value: &Option<String>,
    start_row: usize,
    column: usize,
) -> Result<ArrayRef, ExportError> {
    let mut builder = Int64Builder::with_capacity(values.len());

    for (i, value) in values.iter().enumerate() {
        if is_null_value(value, null_value) {
            if nullable {
                builder.append_null();
            } else {
                return Err(ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: "NULL value in non-nullable column".to_string(),
                });
            }
        } else {
            let n = value
                .parse::<i64>()
                .map_err(|e| ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: format!("Invalid integer value '{}': {}", value, e),
                })?;
            builder.append_value(n);
        }
    }

    Ok(Arc::new(builder.finish()))
}

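/// Builds a `Float64Array`, with special cases for infinities and NaN.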
fn build_float64_array(
    values: &[&str],
    nullable: bool,
    null_value: &Option<String>,
    start_row: usize,
    column: usize,
) -> Result<ArrayRef, ExportError> {
    let mut builder = Float64Builder::with_capacity(values.len());

    for (i, value) in values.iter().enumerate() {
        if is_null_value(value, null_value) {
            if nullable {
                builder.append_null();
            } else {
                return Err(ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: "NULL value in non-nullable column".to_string(),
                });
            }
        } else {
            let f = match *value {
                "Infinity" | "inf" => f64::INFINITY,
                "-Infinity" | "-inf" => f64::NEG_INFINITY,
                "NaN" | "nan" => f64::NAN,
                _ => value
                    .parse::<f64>()
                    .map_err(|e| ExportError::TypeConversionError {
                        row: start_row + i,
                        column,
                        message: format!("Invalid float value '{}': {}", value, e),
                    })?,
            };
            builder.append_value(f);
        }
    }

    Ok(Arc::new(builder.finish()))
}

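/// Builds a `StringArray`; for non-nullable columns, NULLs become empty strings.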
fn build_string_array(
    values: &[&str],
    nullable: bool,
    null_value: &Option<String>,
) -> Result<ArrayRef, ExportError> {
    let mut builder =
        StringBuilder::with_capacity(values.len(), values.iter().map(|s| s.len()).sum());

    for value in values.iter() {
        if is_null_value(value, null_value) {
            if nullable {
                builder.append_null();
            } else {
                builder.append_value("");
            }
        } else {
            builder.append_value(value);
        }
    }

    Ok(Arc::new(builder.finish()))
}

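/// Builds a `Date32Array` of days since the Unix epoch.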
fn build_date32_array(
    values: &[&str],
    nullable: bool,
    null_value: &Option<String>,
    start_row: usize,
    column: usize,
) -> Result<ArrayRef, ExportError> {
    let mut builder = Date32Builder::with_capacity(values.len());

    for (i, value) in values.iter().enumerate() {
        if is_null_value(value, null_value) {
            if nullable {
                builder.append_null();
            } else {
                return Err(ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: "NULL value in non-nullable column".to_string(),
                });
            }
        } else {
            let days = parse_date_to_days(value).map_err(|e| ExportError::TypeConversionError {
                row: start_row + i,
                column,
                message: e,
            })?;
            builder.append_value(days);
        }
    }

    Ok(Arc::new(builder.finish()))
}

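/// Builds a `TimestampMicrosecondArray` of microseconds since the Unix epoch.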
fn build_timestamp_array(
    values: &[&str],
    nullable: bool,
    null_value: &Option<String>,
    start_row: usize,
    column: usize,
) -> Result<ArrayRef, ExportError> {
    let mut builder = TimestampMicrosecondBuilder::with_capacity(values.len());

    for (i, value) in values.iter().enumerate() {
        if is_null_value(value, null_value) {
            if nullable {
                builder.append_null();
            } else {
                return Err(ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: "NULL value in non-nullable column".to_string(),
                });
            }
        } else {
            let micros =
                parse_timestamp_to_micros(value).map_err(|e| ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: e,
                })?;
            builder.append_value(micros);
        }
    }

    Ok(Arc::new(builder.finish()))
}

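/// Builds a `Decimal128Array` with the given precision and scale.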
fn build_decimal128_array(
    values: &[&str],
    precision: u8,
    scale: i8,
    nullable: bool,
    null_value: &Option<String>,
    start_row: usize,
    column: usize,
) -> Result<ArrayRef, ExportError> {
    let mut builder = Decimal128Builder::with_capacity(values.len())
        .with_precision_and_scale(precision, scale)
        .map_err(|e| ExportError::ArrowError(e.to_string()))?;

    for (i, value) in values.iter().enumerate() {
        if is_null_value(value, null_value) {
            if nullable {
                builder.append_null();
            } else {
                return Err(ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: "NULL value in non-nullable column".to_string(),
                });
            }
        } else {
            let decimal = parse_decimal_to_i128(value, scale).map_err(|e| {
                ExportError::TypeConversionError {
                    row: start_row + i,
                    column,
                    message: e,
                }
            })?;
            builder.append_value(decimal);
        }
    }

    Ok(Arc::new(builder.finish()))
}

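/// Maps an Exasol column type to the corresponding Arrow `DataType`.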
pub fn exasol_type_to_arrow(exasol_type: &ExasolType) -> Result<DataType, ExportError> {
    exasol_type_to_arrow_impl(exasol_type).map_err(ExportError::SchemaError)
}

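/// Builds an Arrow `Schema` from `(name, exasol_type, nullable)` column triples.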
pub fn build_schema_from_exasol_types(
    columns: &[(String, ExasolType, bool)],
) -> Result<Schema, ExportError> {
    let fields: Result<Vec<Field>, ExportError> = columns
        .iter()
        .map(|(name, exasol_type, nullable)| {
            let data_type = exasol_type_to_arrow(exasol_type)?;
            Ok(Field::new(name, data_type, *nullable))
        })
        .collect();

    Ok(Schema::new(fields?))
}

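/// Writes `batches` to `writer` in the Arrow IPC *stream* format and returns
/// the total row count.
///
/// The IPC payload is assembled in memory first and flushed in one write, so
/// peak memory is proportional to the encoded size of all batches.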
pub async fn write_arrow_ipc<W, I>(
    writer: &mut W,
    schema: Arc<Schema>,
    batches: I,
) -> Result<u64, ExportError>
where
    W: AsyncWrite + Unpin + Send,
    I: IntoIterator<Item = Result<RecordBatch, ExportError>>,
{
    use arrow::ipc::writer::StreamWriter;
    use std::io::Cursor;

    let mut total_rows = 0u64;

    let mut buffer = Cursor::new(Vec::new());
    {
        let mut ipc_writer = StreamWriter::try_new(&mut buffer, &schema)
            .map_err(|e| ExportError::ArrowError(e.to_string()))?;

        for batch_result in batches {
            let batch = batch_result?;
            total_rows += batch.num_rows() as u64;
            ipc_writer
                .write(&batch)
                .map_err(|e| ExportError::ArrowError(e.to_string()))?;
        }

        ipc_writer
            .finish()
            .map_err(|e| ExportError::ArrowError(e.to_string()))?;
    }

    let data = buffer.into_inner();
    writer
        .write_all(&data)
        .await
        .map_err(|e| ExportError::IoError(e.to_string()))?;
    writer
        .flush()
        .await
        .map_err(|e| ExportError::IoError(e.to_string()))?;

    Ok(total_rows)
}

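/// Writes `batches` to `writer` in the Arrow IPC *file* format (the payload
/// starts with the `ARROW1` magic bytes) and returns the total row count.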
pub async fn write_arrow_ipc_file<W, I>(
    writer: &mut W,
    schema: Arc<Schema>,
    batches: I,
) -> Result<u64, ExportError>
where
    W: AsyncWrite + Unpin + Send,
    I: IntoIterator<Item = Result<RecordBatch, ExportError>>,
{
    use arrow::ipc::writer::FileWriter;
    use std::io::Cursor;

    let mut total_rows = 0u64;

    let mut buffer = Cursor::new(Vec::new());
    {
        let mut ipc_writer = FileWriter::try_new(&mut buffer, &schema)
            .map_err(|e| ExportError::ArrowError(e.to_string()))?;

        for batch_result in batches {
            let batch = batch_result?;
            total_rows += batch.num_rows() as u64;
            ipc_writer
                .write(&batch)
                .map_err(|e| ExportError::ArrowError(e.to_string()))?;
        }

        ipc_writer
            .finish()
            .map_err(|e| ExportError::ArrowError(e.to_string()))?;
    }

    let data = buffer.into_inner();
    writer
        .write_all(&data)
        .await
        .map_err(|e| ExportError::IoError(e.to_string()))?;
    writer
        .flush()
        .await
        .map_err(|e| ExportError::IoError(e.to_string()))?;

    Ok(total_rows)
}

use crate::query::export::ExportSource;
use crate::transport::TransportProtocol;

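/// Runs a CSV export over `transport` and converts the resulting rows into
/// Arrow `RecordBatch`es of at most `options.batch_size` rows each.
///
/// `options.schema` is required; without it the function returns an error.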
pub async fn export_to_record_batches<T: TransportProtocol + ?Sized>(
    transport: &mut T,
    source: ExportSource,
    options: ArrowExportOptions,
) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> {
    use crate::export::csv::{export_to_list, CsvExportOptions};

    let csv_options = CsvExportOptions::default()
        .column_separator(options.column_separator)
        .column_delimiter(options.column_delimiter)
        .with_column_names(false)
        .exasol_host(&options.host)
        .exasol_port(options.port)
        .use_tls(options.use_encryption);

    let rows = export_to_list(transport, source, csv_options).await?;

    let schema = match options.schema {
        Some(s) => s,
        None => {
            return Err(crate::export::csv::ExportError::CsvParseError {
                row: 0,
                message: "Schema required for Arrow export".to_string(),
            });
        }
    };

    let mut batches = Vec::new();
    for chunk in rows.chunks(options.batch_size) {
        let arrays: Result<Vec<ArrayRef>, ExportError> = (0..schema.fields().len())
            .map(|col_idx| {
                let field = schema.field(col_idx);
                let values: Vec<&str> = chunk
                    .iter()
                    .map(|row| row.get(col_idx).map(|s| s.as_str()).unwrap_or(""))
                    .collect();

                build_array_from_strings(
                    &values,
                    field.data_type(),
                    field.is_nullable(),
                    &options.null_value,
                    0,
                    col_idx,
                )
            })
            .collect();

        let arrays = arrays.map_err(|e| crate::export::csv::ExportError::CsvParseError {
            row: 0,
            message: e.to_string(),
        })?;

        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays).map_err(|e| {
            crate::export::csv::ExportError::CsvParseError {
                row: 0,
                message: e.to_string(),
            }
        })?;

        batches.push(batch);
    }

    Ok(batches)
}

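/// Runs an export and writes the batches to `file_path` in the Arrow IPC file
/// format, returning the number of rows written. Like
/// `export_to_record_batches`, this requires `options.schema` to be set.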
pub async fn export_to_arrow_ipc<T: TransportProtocol + ?Sized>(
    transport: &mut T,
    source: ExportSource,
    file_path: &std::path::Path,
    options: ArrowExportOptions,
) -> Result<u64, crate::export::csv::ExportError> {
    let batches = export_to_record_batches(transport, source, options.clone()).await?;

    let schema = options
        .schema
        .ok_or_else(|| crate::export::csv::ExportError::CsvParseError {
            row: 0,
            message: "Schema required for Arrow IPC export".to_string(),
        })?;

    let file = tokio::fs::File::create(file_path).await?;

    let mut file = tokio::io::BufWriter::new(file);

    let batch_results: Vec<Result<RecordBatch, ExportError>> =
        batches.into_iter().map(Ok).collect();

    let rows = write_arrow_ipc_file(&mut file, schema, batch_results)
        .await
        .map_err(|e| crate::export::csv::ExportError::CsvParseError {
            row: 0,
            message: e.to_string(),
        })?;

    Ok(rows)
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::BufReader;

    #[test]
    fn test_arrow_export_options_default() {
        let options = ArrowExportOptions::default();
        assert_eq!(options.batch_size, 1024);
        assert!(options.null_value.is_none());
        assert!(options.schema.is_none());
        assert_eq!(options.column_separator, ',');
        assert_eq!(options.column_delimiter, '"');
        assert_eq!(options.host, "");
        assert_eq!(options.port, 0);
        assert!(!options.use_encryption);
    }

    #[test]
    fn test_arrow_export_options_builder() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let options = ArrowExportOptions::new()
            .with_batch_size(2048)
            .with_null_value("NULL")
            .with_schema(Arc::clone(&schema))
            .with_column_separator(';')
            .with_column_delimiter('\'')
            .exasol_host("exasol.example.com")
            .exasol_port(8563)
            .use_encryption(true);

        assert_eq!(options.batch_size, 2048);
        assert_eq!(options.null_value, Some("NULL".to_string()));
        assert!(options.schema.is_some());
        assert_eq!(options.column_separator, ';');
        assert_eq!(options.column_delimiter, '\'');
        assert_eq!(options.host, "exasol.example.com");
        assert_eq!(options.port, 8563);
        assert!(options.use_encryption);
    }

    #[test]
    fn test_parse_csv_row_simple() {
        let line = "1,hello,world";
        let fields = parse_csv_row(line, ',', '"', 0).unwrap();
        assert_eq!(fields, vec!["1", "hello", "world"]);
    }

    #[test]
    fn test_parse_csv_row_quoted() {
        let line = r#"1,"hello, world","test""#;
        let fields = parse_csv_row(line, ',', '"', 0).unwrap();
        assert_eq!(fields, vec!["1", "hello, world", "test"]);
    }

    #[test]
    fn test_parse_csv_row_escaped_quote() {
        let line = r#"1,"hello ""world""","test""#;
        let fields = parse_csv_row(line, ',', '"', 0).unwrap();
        assert_eq!(fields, vec!["1", r#"hello "world""#, "test"]);
    }

    #[test]
    fn test_parse_csv_row_empty_fields() {
        let line = "1,,3";
        let fields = parse_csv_row(line, ',', '"', 0).unwrap();
        assert_eq!(fields, vec!["1", "", "3"]);
    }

    #[test]
    fn test_parse_csv_row_custom_separator() {
        let line = "1;hello;world";
        let fields = parse_csv_row(line, ';', '"', 0).unwrap();
        assert_eq!(fields, vec!["1", "hello", "world"]);
    }

    #[test]
    fn test_parse_csv_row_unclosed_quote_error() {
        let line = r#"1,"hello"#;
        let result = parse_csv_row(line, ',', '"', 0);
        assert!(result.is_err());
        match result.unwrap_err() {
            ExportError::CsvParseError { row, message } => {
                assert_eq!(row, 0);
                assert!(message.contains("Unclosed quote"));
            }
            _ => panic!("Expected CsvParseError"),
        }
    }

    #[test]
    fn test_is_null_value_empty() {
        assert!(is_null_value("", &None));
        assert!(is_null_value("", &Some("NULL".to_string())));
    }

    #[test]
    fn test_is_null_value_custom() {
        assert!(is_null_value("NULL", &Some("NULL".to_string())));
        assert!(!is_null_value("null", &Some("NULL".to_string())));
        assert!(!is_null_value("value", &Some("NULL".to_string())));
    }

    #[test]
    fn test_is_null_value_no_custom() {
        assert!(!is_null_value("NULL", &None));
        assert!(!is_null_value("value", &None));
    }

    #[test]
    fn test_parse_date_to_days_epoch() {
        let days = parse_date_to_days("1970-01-01").unwrap();
        assert_eq!(days, 0);
    }

    #[test]
    fn test_parse_date_to_days_after_epoch() {
        let days = parse_date_to_days("1970-01-02").unwrap();
        assert_eq!(days, 1);
    }

    #[test]
    fn test_parse_date_to_days_before_epoch() {
        let days = parse_date_to_days("1969-12-31").unwrap();
        assert_eq!(days, -1);
    }

    #[test]
    fn test_parse_date_to_days_leap_year() {
        let mar1_2000 = parse_date_to_days("2000-03-01").unwrap();
        let feb28_2000 = parse_date_to_days("2000-02-28").unwrap();
        // 2000 is a leap year, so Feb 29 sits between the two dates.
        assert_eq!(mar1_2000 - feb28_2000, 2);
    }

    #[test]
    fn test_parse_date_to_days_invalid_format() {
        assert!(parse_date_to_days("2024/01/15").is_err());
        assert!(parse_date_to_days("2024-01").is_err());
        assert!(parse_date_to_days("invalid").is_err());
    }

    #[test]
    fn test_parse_date_to_days_invalid_values() {
        assert!(parse_date_to_days("2024-13-01").is_err()); // month 13
        assert!(parse_date_to_days("2024-01-32").is_err()); // day 32
        assert!(parse_date_to_days("2024-00-15").is_err()); // month 00
    }

    #[test]
    fn test_parse_timestamp_to_micros_epoch() {
        let micros = parse_timestamp_to_micros("1970-01-01 00:00:00").unwrap();
        assert_eq!(micros, 0);
    }

    #[test]
    fn test_parse_timestamp_to_micros_with_time() {
        let micros = parse_timestamp_to_micros("1970-01-01 00:00:01").unwrap();
        assert_eq!(micros, 1_000_000);
    }

    #[test]
    fn test_parse_timestamp_to_micros_with_fractional() {
        let micros = parse_timestamp_to_micros("1970-01-01 00:00:00.123456").unwrap();
        assert_eq!(micros, 123_456);
    }

    #[test]
    fn test_parse_timestamp_to_micros_date_only() {
        let micros = parse_timestamp_to_micros("1970-01-02").unwrap();
        assert_eq!(micros, 86400 * 1_000_000);
    }

    #[test]
    fn test_parse_decimal_to_i128_integer() {
        let result = parse_decimal_to_i128("123", 2).unwrap();
        assert_eq!(result, 12300); // 123 scaled by 10^2
    }

    #[test]
    fn test_parse_decimal_to_i128_with_fraction() {
        let result = parse_decimal_to_i128("123.45", 2).unwrap();
        assert_eq!(result, 12345);
    }

    #[test]
    fn test_parse_decimal_to_i128_negative() {
        let result = parse_decimal_to_i128("-123.45", 2).unwrap();
        assert_eq!(result, -12345);
    }

    #[test]
    fn test_parse_decimal_to_i128_short_fraction() {
        let result = parse_decimal_to_i128("123.4", 2).unwrap();
        assert_eq!(result, 12340);
    }

    #[test]
    fn test_parse_decimal_to_i128_invalid() {
        assert!(parse_decimal_to_i128("abc", 2).is_err());
        assert!(parse_decimal_to_i128("1.2.3", 2).is_err());
    }

    #[test]
    fn test_build_boolean_array() {
        let values = vec!["true", "false", "", "1", "0"];
        let null_value = None;
        let array = build_boolean_array(&values, true, &null_value, 0, 0).unwrap();

        assert_eq!(array.len(), 5);
        assert_eq!(array.null_count(), 1);
    }

    #[test]
    fn test_build_boolean_array_invalid() {
        let values = vec!["invalid"];
        let null_value = None;
        let result = build_boolean_array(&values, true, &null_value, 0, 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_build_int64_array() {
        let values = vec!["1", "2", "", "3"];
        let null_value = None;
        let array = build_int64_array(&values, true, &null_value, 0, 0).unwrap();

        assert_eq!(array.len(), 4);
        assert_eq!(array.null_count(), 1);
    }

    #[test]
    fn test_build_float64_array() {
        let values = vec!["1.5", "2.5", "Infinity", "-Infinity", "NaN", ""];
        let null_value = None;
        let array = build_float64_array(&values, true, &null_value, 0, 0).unwrap();

        assert_eq!(array.len(), 6);
        assert_eq!(array.null_count(), 1);
    }

    #[test]
    fn test_build_string_array() {
        let values = vec!["hello", "world", "", "test"];
        let null_value = None;
        let array = build_string_array(&values, true, &null_value).unwrap();

        assert_eq!(array.len(), 4);
        assert_eq!(array.null_count(), 1);
    }

    #[test]
    fn test_build_date32_array() {
        let values = vec!["2024-01-15", "2024-06-20", ""];
        let null_value = None;
        let array = build_date32_array(&values, true, &null_value, 0, 0).unwrap();

        assert_eq!(array.len(), 3);
        assert_eq!(array.null_count(), 1);
    }

    #[test]
    fn test_build_timestamp_array() {
        let values = vec!["2024-01-15 10:30:00", "2024-06-20 14:45:30.123456", ""];
        let null_value = None;
        let array = build_timestamp_array(&values, true, &null_value, 0, 0).unwrap();

        assert_eq!(array.len(), 3);
        assert_eq!(array.null_count(), 1);
    }

    #[test]
    fn test_build_decimal128_array() {
        let values = vec!["123.45", "678.90", ""];
        let null_value = None;
        let array = build_decimal128_array(&values, 10, 2, true, &null_value, 0, 0).unwrap();

        assert_eq!(array.len(), 3);
        assert_eq!(array.null_count(), 1);
    }

    #[test]
    fn test_exasol_type_to_arrow() {
        assert_eq!(
            exasol_type_to_arrow(&ExasolType::Boolean).unwrap(),
            DataType::Boolean
        );
        assert_eq!(
            exasol_type_to_arrow(&ExasolType::Varchar { size: 100 }).unwrap(),
            DataType::Utf8
        );
        assert_eq!(
            exasol_type_to_arrow(&ExasolType::Double).unwrap(),
            DataType::Float64
        );
        assert_eq!(
            exasol_type_to_arrow(&ExasolType::Date).unwrap(),
            DataType::Date32
        );
    }

    #[test]
    fn test_build_schema_from_exasol_types() {
        let columns = vec![
            (
                "id".to_string(),
                ExasolType::Decimal {
                    precision: 18,
                    scale: 0,
                },
                false,
            ),
            ("name".to_string(), ExasolType::Varchar { size: 100 }, true),
            ("active".to_string(), ExasolType::Boolean, true),
        ];

        let schema = build_schema_from_exasol_types(&columns).unwrap();
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(1).name(), "name");
        assert_eq!(schema.field(2).name(), "active");
    }

    #[tokio::test]
    async fn test_csv_to_arrow_reader_simple() {
        let csv_data = "1,hello,true\n2,world,false\n";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("active", DataType::Boolean, false),
        ]));

        let options = ArrowExportOptions::default().with_batch_size(10);
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        let batch = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 3);

        assert!(arrow_reader.next_batch().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_csv_to_arrow_reader_with_nulls() {
        let csv_data = "1,hello\n2,\n";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let options = ArrowExportOptions::default();
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        let batch = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.column(1).null_count(), 1);
    }

    #[tokio::test]
    async fn test_csv_to_arrow_reader_batching() {
        let csv_data = "1\n2\n3\n4\n5\n";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let options = ArrowExportOptions::default().with_batch_size(2);
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        let batch1 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch1.num_rows(), 2);

        let batch2 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch2.num_rows(), 2);

        let batch3 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch3.num_rows(), 1);

        assert!(arrow_reader.next_batch().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_csv_to_arrow_reader_empty() {
        let csv_data = "";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let options = ArrowExportOptions::default();
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        assert!(arrow_reader.next_batch().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_write_arrow_ipc() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(arrow::array::Int64Array::from(vec![1, 2, 3])),
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("a"),
                    Some("b"),
                    None,
                ])),
            ],
        )
        .unwrap();

        let mut buffer = Vec::new();
        let rows = write_arrow_ipc(&mut buffer, schema, vec![Ok(batch)])
            .await
            .unwrap();

        assert_eq!(rows, 3);
        assert!(!buffer.is_empty());
    }

    #[tokio::test]
    async fn test_write_arrow_ipc_file() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(arrow::array::Int64Array::from(vec![1, 2, 3]))],
        )
        .unwrap();

        let mut buffer = Vec::new();
        let rows = write_arrow_ipc_file(&mut buffer, schema, vec![Ok(batch)])
            .await
            .unwrap();

        assert_eq!(rows, 3);
        assert!(!buffer.is_empty());
        assert_eq!(&buffer[0..6], b"ARROW1");
    }

    #[test]
    fn test_boolean_conversion_variants() {
        let values = vec!["true", "false", "TRUE", "FALSE", "True", "False"];
        let null_value = None;
        let array = build_boolean_array(&values, false, &null_value, 0, 0).unwrap();
        assert_eq!(array.len(), 6);

        let values = vec!["1", "0", "t", "f", "yes", "no", "y", "n"];
        let array = build_boolean_array(&values, false, &null_value, 0, 0).unwrap();
        assert_eq!(array.len(), 8);
    }

    #[test]
    fn test_non_nullable_null_error() {
        let values = vec![""];
        let null_value = None;
        let result = build_int64_array(&values, false, &null_value, 0, 0);
        assert!(result.is_err());
        match result.unwrap_err() {
            ExportError::TypeConversionError { message, .. } => {
                assert!(message.contains("NULL value in non-nullable column"));
            }
            _ => panic!("Expected TypeConversionError"),
        }
    }

    #[test]
    fn test_custom_null_value() {
        let values = vec!["1", "NULL", "3"];
        let null_value = Some("NULL".to_string());
        let array = build_int64_array(&values, true, &null_value, 0, 0).unwrap();

        assert_eq!(array.len(), 3);
        assert_eq!(array.null_count(), 1);
    }

    #[test]
    fn test_exasol_type_to_arrow_decimal() {
        let result = exasol_type_to_arrow(&ExasolType::Decimal {
            precision: 18,
            scale: 4,
        })
        .unwrap();
        assert_eq!(result, DataType::Decimal128(18, 4));
    }

    #[test]
    fn test_exasol_type_to_arrow_char() {
        let result = exasol_type_to_arrow(&ExasolType::Char { size: 50 }).unwrap();
        assert_eq!(result, DataType::Utf8);
    }

    #[test]
    fn test_exasol_type_to_arrow_timestamp() {
        let result = exasol_type_to_arrow(&ExasolType::Timestamp {
            with_local_time_zone: false,
        })
        .unwrap();
        assert_eq!(result, DataType::Timestamp(TimeUnit::Microsecond, None));
    }

    #[test]
    fn test_exasol_type_to_arrow_timestamp_with_tz() {
        let result = exasol_type_to_arrow(&ExasolType::Timestamp {
            with_local_time_zone: true,
        })
        .unwrap();
        assert_eq!(
            result,
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
        );
    }

    #[test]
    fn test_exasol_type_to_arrow_interval_year_to_month() {
        let result = exasol_type_to_arrow(&ExasolType::IntervalYearToMonth).unwrap();
        assert_eq!(result, DataType::Int64);
    }

    #[test]
    fn test_exasol_type_to_arrow_interval_day_to_second() {
        let result =
            exasol_type_to_arrow(&ExasolType::IntervalDayToSecond { precision: 3 }).unwrap();
        assert_eq!(result, DataType::Int64);
    }

    #[test]
    fn test_exasol_type_to_arrow_geometry() {
        let result = exasol_type_to_arrow(&ExasolType::Geometry { srid: Some(4326) }).unwrap();
        assert_eq!(result, DataType::Binary);
    }

    #[test]
    fn test_exasol_type_to_arrow_hashtype() {
        let result = exasol_type_to_arrow(&ExasolType::Hashtype { byte_size: 32 }).unwrap();
        assert_eq!(result, DataType::Binary);
    }

    #[test]
    fn test_build_schema_from_exasol_types_all_types() {
        let columns = vec![
            ("bool_col".to_string(), ExasolType::Boolean, false),
            ("char_col".to_string(), ExasolType::Char { size: 10 }, true),
            (
                "varchar_col".to_string(),
                ExasolType::Varchar { size: 100 },
                true,
            ),
            (
                "decimal_col".to_string(),
                ExasolType::Decimal {
                    precision: 18,
                    scale: 4,
                },
                true,
            ),
            ("double_col".to_string(), ExasolType::Double, true),
            ("date_col".to_string(), ExasolType::Date, true),
            (
                "timestamp_col".to_string(),
                ExasolType::Timestamp {
                    with_local_time_zone: false,
                },
                true,
            ),
            (
                "timestamp_tz_col".to_string(),
                ExasolType::Timestamp {
                    with_local_time_zone: true,
                },
                true,
            ),
            (
                "interval_ym_col".to_string(),
                ExasolType::IntervalYearToMonth,
                true,
            ),
            (
                "interval_ds_col".to_string(),
                ExasolType::IntervalDayToSecond { precision: 3 },
                true,
            ),
            (
                "geometry_col".to_string(),
                ExasolType::Geometry { srid: Some(4326) },
                true,
            ),
            (
                "hashtype_col".to_string(),
                ExasolType::Hashtype { byte_size: 32 },
                true,
            ),
        ];

        let schema = build_schema_from_exasol_types(&columns).unwrap();

        assert_eq!(schema.fields().len(), 12);

        assert_eq!(schema.field(0).name(), "bool_col");
        assert_eq!(schema.field(0).data_type(), &DataType::Boolean);
        assert!(!schema.field(0).is_nullable());

        assert_eq!(schema.field(1).name(), "char_col");
        assert_eq!(schema.field(1).data_type(), &DataType::Utf8);
        assert!(schema.field(1).is_nullable());

        assert_eq!(schema.field(2).name(), "varchar_col");
        assert_eq!(schema.field(2).data_type(), &DataType::Utf8);

        assert_eq!(schema.field(3).name(), "decimal_col");
        assert_eq!(schema.field(3).data_type(), &DataType::Decimal128(18, 4));

        assert_eq!(schema.field(4).name(), "double_col");
        assert_eq!(schema.field(4).data_type(), &DataType::Float64);

        assert_eq!(schema.field(5).name(), "date_col");
        assert_eq!(schema.field(5).data_type(), &DataType::Date32);

        assert_eq!(schema.field(6).name(), "timestamp_col");
        assert_eq!(
            schema.field(6).data_type(),
            &DataType::Timestamp(TimeUnit::Microsecond, None)
        );

        assert_eq!(schema.field(7).name(), "timestamp_tz_col");
        assert_eq!(
            schema.field(7).data_type(),
            &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
        );

        assert_eq!(schema.field(8).name(), "interval_ym_col");
        assert_eq!(schema.field(8).data_type(), &DataType::Int64);

        assert_eq!(schema.field(9).name(), "interval_ds_col");
        assert_eq!(schema.field(9).data_type(), &DataType::Int64);

        assert_eq!(schema.field(10).name(), "geometry_col");
        assert_eq!(schema.field(10).data_type(), &DataType::Binary);

        assert_eq!(schema.field(11).name(), "hashtype_col");
        assert_eq!(schema.field(11).data_type(), &DataType::Binary);
    }

    #[tokio::test]
    async fn test_csv_to_arrow_reader_batch_size_1() {
        let csv_data = "1,a\n2,b\n3,c\n";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let options = ArrowExportOptions::default().with_batch_size(1);
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        let batch1 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch1.num_rows(), 1);

        let batch2 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch2.num_rows(), 1);

        let batch3 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch3.num_rows(), 1);

        assert!(arrow_reader.next_batch().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_csv_to_arrow_reader_batch_size_larger_than_data() {
        let csv_data = "1,a\n2,b\n";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let options = ArrowExportOptions::default().with_batch_size(1000);
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        let batch = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 2);

        assert!(arrow_reader.next_batch().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_csv_to_arrow_reader_batch_size_exact_multiple() {
        let csv_data = "1\n2\n3\n4\n5\n6\n";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let options = ArrowExportOptions::default().with_batch_size(3);
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        let batch1 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch1.num_rows(), 3);

        let batch2 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch2.num_rows(), 3);

        assert!(arrow_reader.next_batch().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_csv_to_arrow_reader_batch_size_with_partial_last_batch() {
        let csv_data = "1\n2\n3\n4\n5\n6\n7\n";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let options = ArrowExportOptions::default().with_batch_size(3);
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        let batch1 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch1.num_rows(), 3);

        let batch2 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch2.num_rows(), 3);

        let batch3 = arrow_reader.next_batch().await.unwrap().unwrap();
        assert_eq!(batch3.num_rows(), 1);

        assert!(arrow_reader.next_batch().await.unwrap().is_none());
    }

    #[test]
    fn test_arrow_export_options_batch_size_default() {
        let options = ArrowExportOptions::default();
        assert_eq!(options.batch_size, 1024);
    }

    #[test]
    fn test_arrow_export_options_batch_size_custom() {
        let options = ArrowExportOptions::default().with_batch_size(500);
        assert_eq!(options.batch_size, 500);
    }

    #[tokio::test]
    async fn test_csv_to_arrow_reader_total_row_count_across_batches() {
        let csv_data = "1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let options = ArrowExportOptions::default().with_batch_size(3);
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        let mut total_rows = 0;
        while let Some(batch) = arrow_reader.next_batch().await.unwrap() {
            total_rows += batch.num_rows();
        }

        assert_eq!(total_rows, 10);
    }

    #[tokio::test]
    async fn test_csv_to_arrow_reader_preserves_data_across_batches() {
        let csv_data = "1\n2\n3\n4\n5\n";
        let reader = BufReader::new(csv_data.as_bytes());

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let options = ArrowExportOptions::default().with_batch_size(2);
        let mut arrow_reader = CsvToArrowReader::from_buffered(reader, schema, &options);

        let mut all_values: Vec<i64> = Vec::new();
        while let Some(batch) = arrow_reader.next_batch().await.unwrap() {
            let array = batch
                .column(0)
                .as_any()
                .downcast_ref::<arrow::array::Int64Array>()
                .unwrap();
            for i in 0..array.len() {
                all_values.push(array.value(i));
            }
        }

        assert_eq!(all_values, vec![1, 2, 3, 4, 5]);
    }
}