use crate::error::{Error, FormatError, Result};
use dsq_shared::value::Value;
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
use polars::prelude::*;

#[cfg(feature = "parquet")]
use polars::prelude::ParquetWriter;

use std::io::Write;

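/// Format-agnostic options that apply to every writer.
///
/// A minimal usage sketch (assumes the items of this module are in scope):
///
/// ```ignore
/// let options = WriteOptions {
///     overwrite: true,
///     ..Default::default()
/// };
/// assert!(options.include_header);
/// ```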
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// Write a header row where the format supports one.
    pub include_header: bool,
    /// Replace an existing output file instead of refusing to write.
    pub overwrite: bool,
    /// Optional compression level hint.
    pub compression: Option<CompressionLevel>,
    /// Optional explicit output schema (Polars `Schema`).
    #[cfg(any(
        feature = "csv",
        feature = "json",
        feature = "parquet",
        feature = "avro"
    ))]
    pub schema: Option<Schema>,
    /// Optional batch size hint for writers that chunk their output.
    pub batch_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        Self {
            include_header: true,
            overwrite: false,
            compression: None,
            #[cfg(any(
                feature = "csv",
                feature = "json",
                feature = "parquet",
                feature = "avro"
            ))]
            schema: None,
            batch_size: None,
        }
    }
}

/// Coarse, format-independent compression level.
#[derive(Debug, Clone, Copy)]
pub enum CompressionLevel {
    None,
    Fast,
    Balanced,
    High,
}

/// Per-format serialization options.
#[derive(Debug, Clone)]
pub enum FormatWriteOptions {
    Csv {
        separator: u8,
        quote_char: Option<u8>,
        line_terminator: Option<String>,
        quote_style: Option<String>,
        null_value: Option<String>,
        datetime_format: Option<String>,
        date_format: Option<String>,
        time_format: Option<String>,
        float_precision: Option<usize>,
        null_values: Option<Vec<String>>,
        encoding: CsvEncoding,
    },
    #[cfg(feature = "parquet")]
    Parquet {
        compression: ParquetCompression,
    },
    Json {
        lines: bool,
        pretty: bool,
    },
    Avro {
        compression: AvroCompression,
    },
    Arrow,
    Excel {
        worksheet_name: String,
        include_header: bool,
        autofit: bool,
        float_precision: Option<usize>,
    },
    Orc {
        compression: OrcCompression,
    },
}

/// Compression codecs for ORC output.
#[derive(Debug, Clone)]
pub enum OrcCompression {
    Uncompressed,
    Zlib,
    Snappy,
    Lzo,
    Lz4,
    Zstd,
}

/// Text encoding for CSV/TSV output.
#[derive(Debug, Clone)]
pub enum CsvEncoding {
    Utf8,
    Utf8Lossy,
}

/// Compression codecs for Parquet output.
#[cfg(feature = "parquet")]
#[derive(Debug, Clone)]
pub enum ParquetCompression {
    Uncompressed,
    Snappy,
    Gzip,
    Lzo,
    Brotli,
    Lz4,
    Zstd,
}

/// Compression codecs for Avro output.
#[derive(Debug, Clone)]
pub enum AvroCompression {
    Null,
    Deflate,
    Snappy,
    Bzip2,
    Xz,
    Zstandard,
}

impl Default for FormatWriteOptions {
    fn default() -> Self {
        FormatWriteOptions::Csv {
            separator: b',',
            quote_char: Some(b'"'),
            line_terminator: None,
            quote_style: None,
            null_value: None,
            datetime_format: None,
            date_format: None,
            time_format: None,
            float_precision: None,
            null_values: None,
            encoding: CsvEncoding::Utf8,
        }
    }
}

/// Serializes a `Value` as CSV by delegating to `crate::csv`.
#[cfg(feature = "csv")]
pub fn serialize_csv<W: Write>(
    writer: W,
    value: &Value,
    options: &WriteOptions,
    format_options: &FormatWriteOptions,
) -> Result<()> {
    crate::csv::serialize_csv(writer, value, options, format_options)
}

/// Serializes a `Value` as JSON by delegating to `crate::json`.
#[cfg(feature = "json")]
pub fn serialize_json<W: Write>(
    writer: W,
    value: &Value,
    options: &WriteOptions,
    format_options: &FormatWriteOptions,
) -> Result<()> {
    crate::json::serialize_json(writer, value, options, format_options)
}

/// Serializes a `Value` as Parquet using Polars' `ParquetWriter`.
#[cfg(feature = "parquet")]
pub fn serialize_parquet<W: Write>(
    writer: W,
    value: &Value,
    _options: &WriteOptions,
    format_options: &FormatWriteOptions,
) -> Result<()> {
    let mut df = match value {
        Value::DataFrame(df) => df.clone(),
        Value::LazyFrame(lf) => (*lf).clone().collect().map_err(Error::from)?,
        _ => {
            return Err(Error::operation(
                "Expected DataFrame for Parquet serialization",
            ));
        }
    };

    let parquet_opts = match format_options {
        FormatWriteOptions::Parquet { compression } => compression,
        _ => &ParquetCompression::Snappy,
    };

    let compression = match parquet_opts {
        ParquetCompression::Uncompressed => polars::prelude::ParquetCompression::Uncompressed,
        ParquetCompression::Snappy => polars::prelude::ParquetCompression::Snappy,
        ParquetCompression::Gzip => polars::prelude::ParquetCompression::Gzip(None),
        ParquetCompression::Lzo => polars::prelude::ParquetCompression::Lzo,
        ParquetCompression::Brotli => polars::prelude::ParquetCompression::Brotli(None),
        ParquetCompression::Lz4 => polars::prelude::ParquetCompression::Lz4Raw,
        ParquetCompression::Zstd => polars::prelude::ParquetCompression::Zstd(None),
    };

    let parquet_writer = ParquetWriter::new(writer).with_compression(compression);

    // `finish` needs a mutable frame; `df` is already an owned copy, so no
    // extra clone is required here.
    parquet_writer.finish(&mut df).map_err(Error::from)?;
    Ok(())
}

/// Serializes a `Value` as Avro, building one Avro record per DataFrame row.
#[cfg(feature = "avro")]
pub fn serialize_avro<W: Write>(
    mut writer: W,
    value: &Value,
    _options: &WriteOptions,
    format_options: &FormatWriteOptions,
) -> Result<()> {
    use apache_avro::{types::Value as AvroValue, Codec, Writer as AvroWriter};

    let df = match value {
        Value::DataFrame(df) => df.clone(),
        Value::LazyFrame(lf) => (*lf).clone().collect().map_err(Error::from)?,
        _ => {
            return Err(Error::operation(
                "Expected DataFrame for Avro serialization",
            ));
        }
    };

    let codec = match format_options {
        FormatWriteOptions::Avro { compression } => match compression {
            AvroCompression::Null => Codec::Null,
            AvroCompression::Deflate => Codec::Deflate(Default::default()),
            AvroCompression::Snappy => Codec::Snappy,
            // Codecs without a matching `apache_avro` codec here fall back to
            // no compression.
            #[allow(unreachable_patterns)]
            _ => Codec::Null,
        },
        _ => Codec::Null,
    };

    let schema = dataframe_to_avro_schema(&df)?;

    // Encode into an intermediate buffer; the Avro writer borrows it mutably
    // until it is dropped.
    let mut buffer = Vec::new();
    let mut avro_writer = AvroWriter::with_codec(&schema, &mut buffer, codec);

    for row_idx in 0..df.height() {
        let mut record_fields = Vec::new();

        for (col_idx, column) in df.get_columns().iter().enumerate() {
            let field_name = df.get_column_names()[col_idx];
            let avro_value = polars_value_to_avro(column.get(row_idx).map_err(Error::from)?)?;
            record_fields.push((field_name.to_string(), avro_value));
        }

        let record = AvroValue::Record(record_fields);
        avro_writer.append(record).map_err(Error::from)?;
    }

    avro_writer.flush().map_err(Error::from)?;
    // Release the mutable borrow on `buffer` before copying it to the caller's writer.
    drop(avro_writer);
    writer.write_all(&buffer)?;
    Ok(())
}

#[cfg(feature = "avro")]
fn dataframe_to_avro_schema(df: &DataFrame) -> Result<apache_avro::Schema> {
    use apache_avro::Schema;

    let mut fields = Vec::new();

    for (col_idx, column) in df.get_columns().iter().enumerate() {
        let field_name = df.get_column_names()[col_idx];
        let field_schema = polars_dtype_to_avro_schema(column.dtype())?;

        fields.push(apache_avro::schema::RecordField {
            name: field_name.to_string(),
            doc: None,
            aliases: None,
            default: None,
            schema: field_schema,
            order: apache_avro::schema::RecordFieldOrder::Ascending,
            position: col_idx,
            custom_attributes: Default::default(),
        });
    }

    Ok(Schema::Record(apache_avro::schema::RecordSchema {
        name: apache_avro::schema::Name::new("record").map_err(Error::from)?,
        aliases: None,
        doc: None,
        fields,
        lookup: Default::default(),
        attributes: Default::default(),
    }))
}

#[cfg(feature = "avro")]
fn polars_dtype_to_avro_schema(dtype: &DataType) -> Result<apache_avro::Schema> {
    use apache_avro::Schema;

    match dtype {
        DataType::Boolean => Ok(Schema::Boolean),
        DataType::Int8 | DataType::Int16 | DataType::Int32 => Ok(Schema::Int),
        DataType::Int64 => Ok(Schema::Long),
        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 => Ok(Schema::Int),
        DataType::UInt64 => Ok(Schema::Long),
        DataType::Float32 => Ok(Schema::Float),
        DataType::Float64 => Ok(Schema::Double),
        DataType::String => Ok(Schema::String),
        DataType::Binary => Ok(Schema::Bytes),
        DataType::Null => Ok(Schema::Null),
        // Anything without a direct Avro counterpart is written as a string.
        _ => Ok(Schema::String),
    }
}

#[cfg(feature = "avro")]
fn polars_value_to_avro(value: AnyValue) -> Result<apache_avro::types::Value> {
    use apache_avro::types::Value as AvroValue;

    match value {
        AnyValue::Null => Ok(AvroValue::Null),
        AnyValue::Boolean(b) => Ok(AvroValue::Boolean(b)),
        AnyValue::Int8(i) => Ok(AvroValue::Int(i as i32)),
        AnyValue::Int16(i) => Ok(AvroValue::Int(i as i32)),
        AnyValue::Int32(i) => Ok(AvroValue::Int(i)),
        AnyValue::Int64(i) => Ok(AvroValue::Long(i)),
        AnyValue::UInt8(u) => Ok(AvroValue::Int(u as i32)),
        AnyValue::UInt16(u) => Ok(AvroValue::Int(u as i32)),
        AnyValue::UInt32(u) => Ok(AvroValue::Int(u as i32)),
        AnyValue::UInt64(u) => Ok(AvroValue::Long(u as i64)),
        AnyValue::Float32(f) => Ok(AvroValue::Float(f)),
        AnyValue::Float64(f) => Ok(AvroValue::Double(f)),
        AnyValue::String(s) => Ok(AvroValue::String(s.to_string())),
        AnyValue::Binary(b) => Ok(AvroValue::Bytes(b.to_vec())),
        // Fallback: render values without a direct Avro mapping as strings.
        _ => Ok(AvroValue::String(format!("{}", value))),
    }
}

/// Serializes a `Value` by delegating to `crate::adt`.
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
pub fn serialize_adt<W: Write>(
    writer: W,
    value: &Value,
    options: &WriteOptions,
    format_options: &FormatWriteOptions,
) -> Result<()> {
    crate::adt::serialize_adt(writer, value, options, format_options)
}

#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
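/// Serializes `value` into `writer` using the requested `format`.
///
/// Formats whose feature is disabled at build time return an
/// `UnsupportedFeature` error. A minimal sketch, assuming the items of this
/// module are in scope and the `csv` feature is enabled:
///
/// ```ignore
/// let value = Value::DataFrame(df);
/// let mut buffer = Vec::new();
/// serialize(
///     &mut buffer,
///     &value,
///     DataFormat::Csv,
///     &WriteOptions::default(),
///     &FormatWriteOptions::default(),
/// )?;
/// ```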
pub fn serialize<W: Write>(
    writer: W,
    value: &Value,
    format: DataFormat,
    options: &WriteOptions,
    format_options: &FormatWriteOptions,
) -> Result<()> {
    match format {
        #[cfg(feature = "csv")]
        DataFormat::Csv => serialize_csv(writer, value, options, format_options),
        #[cfg(not(feature = "csv"))]
        DataFormat::Csv => Err(Error::Format(FormatError::UnsupportedFeature(
            "CSV not supported in this build".to_string(),
        ))),
        #[cfg(feature = "csv")]
        DataFormat::Tsv => {
            // Reuse the caller's CSV options where present, but force a tab separator.
            let tsv_options = match format_options {
                FormatWriteOptions::Csv {
                    quote_char,
                    line_terminator,
                    quote_style,
                    null_value,
                    datetime_format,
                    date_format,
                    time_format,
                    float_precision,
                    null_values,
                    encoding,
                    ..
                } => FormatWriteOptions::Csv {
                    separator: b'\t',
                    quote_char: *quote_char,
                    line_terminator: line_terminator.clone(),
                    quote_style: quote_style.clone(),
                    null_value: null_value.clone(),
                    datetime_format: datetime_format.clone(),
                    date_format: date_format.clone(),
                    time_format: time_format.clone(),
                    float_precision: *float_precision,
                    null_values: null_values.clone(),
                    encoding: encoding.clone(),
                },
                _ => FormatWriteOptions::Csv {
                    separator: b'\t',
                    quote_char: Some(b'"'),
                    line_terminator: None,
                    quote_style: None,
                    null_value: None,
                    datetime_format: None,
                    date_format: None,
                    time_format: None,
                    float_precision: None,
                    null_values: None,
                    encoding: CsvEncoding::Utf8,
                },
            };
            serialize_csv(writer, value, options, &tsv_options)
        }
        #[cfg(not(feature = "csv"))]
        DataFormat::Tsv => Err(Error::Format(FormatError::UnsupportedFeature(
            "TSV not supported in this build".to_string(),
        ))),
        #[cfg(feature = "json")]
        DataFormat::Json | DataFormat::JsonLines => {
            serialize_json(writer, value, options, format_options)
        }
        #[cfg(not(feature = "json"))]
        DataFormat::Json | DataFormat::JsonLines => Err(Error::Format(
            FormatError::UnsupportedFeature("JSON not supported in this build".to_string()),
        )),
        #[cfg(feature = "parquet")]
        DataFormat::Parquet => serialize_parquet(writer, value, options, format_options),
        #[cfg(not(feature = "parquet"))]
        DataFormat::Parquet => Err(Error::Format(FormatError::UnsupportedFeature(
            "Parquet not supported in this build".to_string(),
        ))),
        DataFormat::Arrow => Err(Error::Format(FormatError::UnsupportedFeature(
            "Arrow serialization not yet implemented".to_string(),
        ))),
        #[cfg(feature = "avro")]
        DataFormat::Avro => serialize_avro(writer, value, options, format_options),
        #[cfg(not(feature = "avro"))]
        DataFormat::Avro => Err(Error::Format(FormatError::UnsupportedFeature(
            "Avro not supported in this build".to_string(),
        ))),
        DataFormat::Excel => Err(Error::Format(FormatError::UnsupportedFeature(
            "Excel serialization not yet implemented".to_string(),
        ))),
        DataFormat::Orc => Err(Error::Format(FormatError::UnsupportedFeature(
            "ORC serialization not yet implemented".to_string(),
        ))),
        _ => Err(Error::Format(FormatError::Unknown(format.to_string()))),
    }
}

// Re-exported so downstream code can name formats without importing `crate::format` directly.
pub use crate::format::DataFormat;

/// Common interface for sinks that serialize a `Value` in one fixed format.
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
pub trait DataWriter {
    /// Writes `value` to the underlying destination.
    fn write(&mut self, value: &Value, options: &WriteOptions) -> Result<()>;
    /// Reports the output format this writer produces.
    fn format(&self) -> DataFormat;
}

#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
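/// Writes a `Value` to a file, inferring the format from the path unless one
/// is given explicitly.
///
/// A minimal sketch (the path is illustrative; assumes the `DataWriter` trait
/// is in scope and the call sits in a `Result` context):
///
/// ```ignore
/// let mut writer = FileWriter::new("output.csv")?;
/// writer.write(&value, &WriteOptions::default())?;
/// ```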
pub struct FileWriter {
    path: String,
    format: DataFormat,
    format_options: FormatWriteOptions,
}

#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
impl FileWriter {
    /// Creates a writer for `path`, inferring the output format from the path.
    pub fn new<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
        let path_ref = path.as_ref();
        let format = crate::format::DataFormat::from_path(path_ref)?;

        Ok(Self {
            path: path_ref.to_string_lossy().to_string(),
            format,
            format_options: Self::default_format_options_for_format(format),
        })
    }

    /// Creates a writer for `path` with an explicitly chosen format.
    pub fn with_format<P: AsRef<std::path::Path>>(path: P, format: DataFormat) -> Self {
        Self {
            path: path.as_ref().to_string_lossy().to_string(),
            format,
            format_options: Self::default_format_options_for_format(format),
        }
    }

    /// Default per-format options used until `with_format_options` overrides them.
    fn default_format_options_for_format(format: DataFormat) -> FormatWriteOptions {
        match format {
            DataFormat::Csv => FormatWriteOptions::Csv {
                separator: b',',
                quote_char: Some(b'"'),
                line_terminator: None,
                quote_style: None,
                null_value: None,
                datetime_format: None,
                date_format: None,
                time_format: None,
                float_precision: None,
                null_values: None,
                encoding: CsvEncoding::Utf8,
            },
            DataFormat::Tsv => FormatWriteOptions::Csv {
                separator: b'\t',
                quote_char: Some(b'"'),
                line_terminator: None,
                quote_style: None,
                null_value: None,
                datetime_format: None,
                date_format: None,
                time_format: None,
                float_precision: None,
                null_values: None,
                encoding: CsvEncoding::Utf8,
            },
            DataFormat::Json => FormatWriteOptions::Json {
                lines: false,
                pretty: false,
            },
            DataFormat::JsonLines => FormatWriteOptions::Json {
                lines: true,
                pretty: false,
            },
            #[cfg(feature = "parquet")]
            DataFormat::Parquet => FormatWriteOptions::Parquet {
                compression: ParquetCompression::Snappy,
            },
            // Without the parquet feature these options are never used: the
            // `serialize` dispatcher rejects Parquet before they matter.
            #[cfg(not(feature = "parquet"))]
            DataFormat::Parquet => FormatWriteOptions::Json {
                lines: false,
                pretty: false,
            },
            DataFormat::Arrow => FormatWriteOptions::Arrow,
            DataFormat::Avro => FormatWriteOptions::Avro {
                compression: AvroCompression::Null,
            },
            _ => FormatWriteOptions::default(),
        }
    }

    /// Replaces the per-format options, returning the modified writer.
    pub fn with_format_options(mut self, options: FormatWriteOptions) -> Self {
        self.format_options = options;
        self
    }

    /// Refuses to clobber an existing file unless `overwrite` is set.
    fn check_overwrite(&self, options: &WriteOptions) -> Result<()> {
        use std::path::Path;
        let path = Path::new(&self.path);
        if path.exists() && !options.overwrite {
            return Err(Error::operation(format!(
                "File {} already exists. Use --overwrite to replace it.",
                self.path
            )));
        }
        Ok(())
    }
}

#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
impl DataWriter for FileWriter {
    fn write(&mut self, value: &Value, options: &WriteOptions) -> Result<()> {
        use std::fs::File;
        use std::io::BufWriter;

        self.check_overwrite(options)?;
        let file = File::create(&self.path)?;
        let writer = BufWriter::new(file);
        serialize(writer, value, self.format, options, &self.format_options)
    }

    fn format(&self) -> DataFormat {
        self.format
    }
}

#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
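/// Writes a `Value` into an in-memory byte buffer instead of a file.
///
/// A minimal sketch, assuming the `csv` feature is enabled and the items of
/// this module (including the `DataWriter` trait) are in scope:
///
/// ```ignore
/// let mut writer = MemoryWriter::new(DataFormat::Csv);
/// writer.write(&value, &WriteOptions::default())?;
/// let bytes = writer.into_inner();
/// ```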
pub struct MemoryWriter {
    buffer: Vec<u8>,
    format: DataFormat,
    format_options: FormatWriteOptions,
}

#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
impl MemoryWriter {
    /// Creates an empty in-memory writer for the given format.
    pub fn new(format: DataFormat) -> Self {
        Self {
            buffer: Vec::new(),
            format,
            format_options: FormatWriteOptions::default(),
        }
    }

    /// Replaces the per-format options, returning the modified writer.
    pub fn with_format_options(mut self, options: FormatWriteOptions) -> Self {
        self.format_options = options;
        self
    }

    /// Consumes the writer and returns the serialized bytes.
    pub fn into_inner(self) -> Vec<u8> {
        self.buffer
    }

    /// Returns the serialized bytes without consuming the writer.
    pub fn as_slice(&self) -> &[u8] {
        &self.buffer
    }
}

#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
impl DataWriter for MemoryWriter {
    fn write(&mut self, value: &Value, options: &WriteOptions) -> Result<()> {
        use std::io::Cursor;
        // The cursor starts at position 0, so a repeated `write` overwrites the
        // front of the buffer rather than appending.
        let cursor = Cursor::new(&mut self.buffer);
        serialize(cursor, value, self.format, options, &self.format_options)
    }

    fn format(&self) -> DataFormat {
        self.format
    }
}

#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
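/// Convenience constructor: builds a `FileWriter` for `path`, inferring the
/// format from the path.
///
/// A minimal sketch (path illustrative, `DataWriter` assumed in scope):
///
/// ```ignore
/// let mut writer = to_path("report.csv")?;
/// writer.write(&value, &WriteOptions::default())?;
/// ```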
pub fn to_path<P: AsRef<std::path::Path>>(path: P) -> Result<FileWriter> {
    FileWriter::new(path)
}

/// Convenience constructor: builds a `FileWriter` for `path` with an explicit format.
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
pub fn to_path_with_format<P: AsRef<std::path::Path>>(path: P, format: DataFormat) -> FileWriter {
    FileWriter::with_format(path, format)
}

/// Convenience constructor: builds a `MemoryWriter` for the given format.
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
pub fn to_memory(format: DataFormat) -> MemoryWriter {
    MemoryWriter::new(format)
}

#[cfg(test)]
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
mod tests {
    use super::*;
    use dsq_shared::value::Value;

    use std::io::Cursor;

    fn create_test_dataframe() -> DataFrame {
        let s1 = Series::new("name".into(), &["Alice", "Bob", "Charlie"]);
        let s2 = Series::new("age".into(), &[25i64, 30, 35]);
        let s3 = Series::new("active".into(), &[true, false, true]);
        DataFrame::new(vec![s1.into(), s2.into(), s3.into()]).unwrap()
    }

    #[cfg(feature = "csv")]
    #[test]
    fn test_serialize_csv() {
        let df = create_test_dataframe();
        let value = Value::DataFrame(df);
        let options = WriteOptions::default();
        let format_options = FormatWriteOptions::default();

        let mut buffer = Vec::new();
        let result = serialize_csv(Cursor::new(&mut buffer), &value, &options, &format_options);
        assert!(result.is_ok());

        let output = String::from_utf8(buffer).unwrap();
        assert!(output.contains("name,age,active"));
        assert!(output.contains("Alice,25,true"));
        assert!(output.contains("Bob,30,false"));
        assert!(output.contains("Charlie,35,true"));
    }
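
    // Sketch test (added): drives the `DataWriter` trait end-to-end through
    // `MemoryWriter`; the expected output mirrors `test_serialize_csv` above.
    #[cfg(feature = "csv")]
    #[test]
    fn test_memory_writer_csv_roundtrip() {
        let value = Value::DataFrame(create_test_dataframe());
        let mut writer = MemoryWriter::new(DataFormat::Csv);

        let result = writer.write(&value, &WriteOptions::default());
        assert!(result.is_ok());
        assert!(matches!(writer.format(), DataFormat::Csv));

        let output = String::from_utf8(writer.into_inner()).unwrap();
        assert!(output.contains("name,age,active"));
        assert!(output.contains("Alice,25,true"));
    }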

    #[cfg(feature = "csv")]
    #[test]
    fn test_serialize_csv_with_header_false() {
        let df = create_test_dataframe();
        let value = Value::DataFrame(df);
        let options = WriteOptions {
            include_header: false,
            ..Default::default()
        };
        let format_options = FormatWriteOptions::default();

        let mut buffer = Vec::new();
        let result = serialize_csv(Cursor::new(&mut buffer), &value, &options, &format_options);
        assert!(result.is_ok());

        let output = String::from_utf8(buffer).unwrap();
        assert!(!output.contains("name,age,active"));
        assert!(output.contains("Alice,25,true"));
    }

    #[cfg(feature = "csv")]
    #[test]
    fn test_serialize_csv_custom_separator() {
        let df = create_test_dataframe();
        let value = Value::DataFrame(df);
        let options = WriteOptions::default();
        let format_options = FormatWriteOptions::Csv {
            separator: b';',
            quote_char: Some(b'"'),
            line_terminator: None,
            quote_style: None,
            null_value: None,
            datetime_format: None,
            date_format: None,
            time_format: None,
            float_precision: None,
            null_values: None,
            encoding: CsvEncoding::Utf8,
        };

        let mut buffer = Vec::new();
        let result = serialize_csv(Cursor::new(&mut buffer), &value, &options, &format_options);
        assert!(result.is_ok());

        let output = String::from_utf8(buffer).unwrap();
        assert!(output.contains("name;age;active"));
        assert!(output.contains("Alice;25;true"));
    }
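
    // Sketch test (added): the `DataFormat::Tsv` branch of `serialize` should
    // reuse the default CSV options but switch the separator to a tab; the
    // expected cell rendering follows the CSV tests above.
    #[cfg(feature = "csv")]
    #[test]
    fn test_serialize_tsv_uses_tab_separator() {
        let value = Value::DataFrame(create_test_dataframe());
        let options = WriteOptions::default();
        let format_options = FormatWriteOptions::default();

        let mut buffer = Vec::new();
        let result = serialize(
            Cursor::new(&mut buffer),
            &value,
            DataFormat::Tsv,
            &options,
            &format_options,
        );
        assert!(result.is_ok());

        let output = String::from_utf8(buffer).unwrap();
        assert!(output.contains("name\tage\tactive"));
        assert!(output.contains("Alice\t25\ttrue"));
    }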

    #[cfg(feature = "csv")]
    #[test]
    fn test_serialize_csv_lazy_frame() {
        let df = create_test_dataframe();
        let lf = df.clone().lazy();
        let value = Value::LazyFrame(Box::new(lf));
        let options = WriteOptions::default();
        let format_options = FormatWriteOptions::default();

        let mut buffer = Vec::new();
        let result = serialize_csv(Cursor::new(&mut buffer), &value, &options, &format_options);
        assert!(result.is_ok());

        let output = String::from_utf8(buffer).unwrap();
        assert!(output.contains("Alice,25,true"));
    }

    #[cfg(feature = "csv")]
    #[test]
    fn test_serialize_csv_wrong_value_type() {
        let value = Value::String("not a dataframe".to_string());
        let options = WriteOptions::default();
        let format_options = FormatWriteOptions::default();

        let mut buffer = Vec::new();
        let result = serialize_csv(Cursor::new(&mut buffer), &value, &options, &format_options);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Expected DataFrame"));
    }

    #[cfg(feature = "json")]
    #[test]
    fn test_serialize_json() {
        let df = create_test_dataframe();
        let value = Value::DataFrame(df);
        let options = WriteOptions::default();
        let format_options = FormatWriteOptions::Json {
            lines: false,
            pretty: false,
        };

        let mut buffer = Vec::new();
        let result = serialize_json(Cursor::new(&mut buffer), &value, &options, &format_options);
        assert!(result.is_ok());

        let output = String::from_utf8(buffer).unwrap();
        assert!(output.contains(r#""name":"Alice""#));
        assert!(output.contains(r#""age":25"#));
        assert!(output.contains(r#""active":true"#));
        assert!(output.contains(r#""name":"Bob""#));
        assert!(output.contains(r#""age":30"#));
        assert!(output.contains(r#""active":false"#));
    }

    #[cfg(feature = "json")]
    #[test]
    fn test_serialize_json_lines() {
        let df = create_test_dataframe();
        let value = Value::DataFrame(df);
        let options = WriteOptions::default();
        let format_options = FormatWriteOptions::Json {
            lines: true,
            pretty: false,
        };

        let mut buffer = Vec::new();
        let result = serialize_json(Cursor::new(&mut buffer), &value, &options, &format_options);
        assert!(result.is_ok());

        let output = String::from_utf8(buffer).unwrap();
        let lines: Vec<&str> = output.lines().collect();
        assert_eq!(lines.len(), 3);
        assert!(lines[0].contains(r#""name":"Alice""#));
        assert!(lines[0].contains(r#""age":25"#));
        assert!(lines[0].contains(r#""active":true"#));
        assert!(lines[1].contains(r#""name":"Bob""#));
        assert!(lines[1].contains(r#""age":30"#));
        assert!(lines[1].contains(r#""active":false"#));
        assert!(lines[2].contains(r#""name":"Charlie""#));
        assert!(lines[2].contains(r#""age":35"#));
        assert!(lines[2].contains(r#""active":true"#));
    }

    #[cfg(feature = "json")]
    #[test]
    fn test_serialize_json_pretty() {
        let df = create_test_dataframe();
        let value = Value::DataFrame(df);
        let options = WriteOptions::default();
        let format_options = FormatWriteOptions::Json {
            lines: false,
            pretty: true,
        };

        let mut buffer = Vec::new();
        let result = serialize_json(Cursor::new(&mut buffer), &value, &options, &format_options);
        assert!(result.is_ok());

        let output = String::from_utf8(buffer).unwrap();
        assert!(output.contains(" \"name\": \"Alice\""));
        assert!(output.contains(" \"age\": 25"));
    }
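
    // Sketch test (added): exercises the `FileWriter` overwrite guard against a
    // file in the system temp directory; the file name is arbitrary.
    #[cfg(feature = "csv")]
    #[test]
    fn test_file_writer_overwrite_guard() {
        let path = std::env::temp_dir().join("dsq_writer_overwrite_guard_test.csv");
        let _ = std::fs::remove_file(&path);

        let value = Value::DataFrame(create_test_dataframe());
        let mut writer = FileWriter::with_format(&path, DataFormat::Csv);

        // The first write creates the file.
        assert!(writer.write(&value, &WriteOptions::default()).is_ok());
        // A second write without `overwrite` is rejected.
        let err = writer.write(&value, &WriteOptions::default()).unwrap_err();
        assert!(err.to_string().contains("already exists"));
        // With `overwrite: true` the write succeeds again.
        let overwrite = WriteOptions {
            overwrite: true,
            ..Default::default()
        };
        assert!(writer.write(&value, &overwrite).is_ok());

        let _ = std::fs::remove_file(&path);
    }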

    #[cfg(feature = "parquet")]
    #[test]
    fn test_serialize_parquet() {
        let df = create_test_dataframe();
        let value = Value::DataFrame(df);
        let options = WriteOptions::default();
        let format_options = FormatWriteOptions::Parquet {
            compression: ParquetCompression::Uncompressed,
        };

        let mut buffer = Vec::new();
        let result = serialize_parquet(Cursor::new(&mut buffer), &value, &options, &format_options);
        assert!(result.is_ok());
        assert!(!buffer.is_empty());
    }

    #[cfg(feature = "parquet")]
    #[test]
    fn test_serialize_parquet_compression() {
        let df = create_test_dataframe();
        let value = Value::DataFrame(df);
        let options = WriteOptions::default();
        let format_options = FormatWriteOptions::Parquet {
            compression: ParquetCompression::Snappy,
        };

        let mut buffer = Vec::new();
        let result = serialize_parquet(Cursor::new(&mut buffer), &value, &options, &format_options);
        assert!(result.is_ok());
        assert!(!buffer.is_empty());
    }

    #[cfg(feature = "json")]
    #[test]
    fn test_row_to_json_value() {
        use crate::json::row_to_json_value;

        let df = create_test_dataframe();
        let row = df.get_row(0).unwrap();
        let column_names = df
            .get_column_names()
            .iter()
            .map(|s| s.to_string())
            .collect::<Vec<_>>();
        let json_value = row_to_json_value(&row.0, &column_names);

        if let serde_json::Value::Object(map) = json_value {
            assert_eq!(
                map.get("name"),
                Some(&serde_json::Value::String("Alice".to_string()))
            );
            assert_eq!(map.get("age"), Some(&serde_json::Value::Number(25.into())));
            assert_eq!(map.get("active"), Some(&serde_json::Value::Bool(true)));
        } else {
            panic!("Expected object");
        }
    }

    #[cfg(feature = "json")]
    #[test]
    fn test_row_to_json_value_with_nulls() {
        use crate::json::row_to_json_value;

        let s1 = Series::new("name".into(), &["Alice", "Bob"]);
        let s2 = Series::new("age".into(), &[Some(25i64), None]);
        let df = DataFrame::new(vec![s1.into(), s2.into()]).unwrap();
        let row = df.get_row(1).unwrap();
        let column_names = df
            .get_column_names()
            .iter()
            .map(|s| s.to_string())
            .collect::<Vec<_>>();
        let json_value = row_to_json_value(&row.0, &column_names);

        if let serde_json::Value::Object(map) = json_value {
            assert_eq!(
                map.get("name"),
                Some(&serde_json::Value::String("Bob".to_string()))
            );
            assert_eq!(map.get("age"), Some(&serde_json::Value::Null));
        } else {
            panic!("Expected object");
        }
    }

    #[test]
    fn test_write_options_default() {
        let opts = WriteOptions::default();
        assert!(opts.include_header);
        assert!(!opts.overwrite);
        assert!(opts.compression.is_none());
        assert!(opts.schema.is_none());
        assert!(opts.batch_size.is_none());
    }
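
    // Sketch test (added): the private per-format defaults helper should hand
    // TSV a tab separator while keeping the CSV option shape.
    #[test]
    fn test_default_format_options_for_tsv() {
        match FileWriter::default_format_options_for_format(DataFormat::Tsv) {
            FormatWriteOptions::Csv { separator, .. } => assert_eq!(separator, b'\t'),
            _ => panic!("Expected Csv options for TSV"),
        }
    }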

    #[test]
    fn test_format_write_options_default() {
        let opts = FormatWriteOptions::default();
        match opts {
            FormatWriteOptions::Csv {
                separator,
                quote_char,
                line_terminator,
                quote_style,
                null_value,
                datetime_format,
                date_format,
                time_format,
                float_precision,
                null_values,
                encoding,
            } => {
                assert_eq!(separator, b',');
                assert_eq!(quote_char, Some(b'"'));
                assert!(line_terminator.is_none());
                assert!(quote_style.is_none());
                assert!(null_value.is_none());
                assert!(datetime_format.is_none());
                assert!(date_format.is_none());
                assert!(time_format.is_none());
                assert!(float_precision.is_none());
                assert!(null_values.is_none());
                assert!(matches!(encoding, CsvEncoding::Utf8));
            }
            _ => panic!("Expected Csv"),
        }
    }
}