dsq_formats/
writer.rs

1use crate::error::{Error, FormatError, Result};
2use dsq_shared::value::Value;
3#[cfg(any(
4    feature = "csv",
5    feature = "json",
6    feature = "parquet",
7    feature = "avro"
8))]
9use polars::prelude::*;
10
11#[cfg(feature = "parquet")]
12use polars::prelude::ParquetWriter;
13
14use std::io::Write;
15
/// Format-independent options for writing data.
///
/// Usually built from [`WriteOptions::default`] and adjusted with
/// struct-update syntax, e.g. `WriteOptions { include_header: false, ..Default::default() }`.
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// Whether to include a header row (for CSV/TSV).
    pub include_header: bool,
    /// Whether an existing output file may be replaced.
    ///
    /// `FileWriter` refuses to write to a path that already exists when this is `false`.
    pub overwrite: bool,
    /// Compression level (if supported by the format).
    ///
    /// NOTE(review): the Parquet and Avro serializers in this module ignore
    /// `WriteOptions` and take their codec from `FormatWriteOptions` instead.
    pub compression: Option<CompressionLevel>,
    /// Custom schema to enforce.
    #[cfg(any(
        feature = "csv",
        feature = "json",
        feature = "parquet",
        feature = "avro"
    ))]
    pub schema: Option<Schema>,
    /// Batch size for streaming writes.
    pub batch_size: Option<usize>,
}
36
37impl Default for WriteOptions {
38    fn default() -> Self {
39        Self {
40            include_header: true,
41            overwrite: false,
42            compression: None,
43            #[cfg(any(
44                feature = "csv",
45                feature = "json",
46                feature = "parquet",
47                feature = "avro"
48            ))]
49            schema: None,
50            batch_size: None,
51        }
52    }
53}
54
/// Generic compression levels for output formats.
///
/// Equality is derived so callers can compare and match on configured levels.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionLevel {
    /// No compression
    None,
    /// Fast compression
    Fast,
    /// Balanced compression
    Balanced,
    /// High compression
    High,
}
67
/// Format-specific write options, one variant per output format.
///
/// `serialize` forwards these unchanged to the per-format serializer, except
/// for TSV, where the CSV separator is forced to a tab.
#[derive(Debug, Clone)]
pub enum FormatWriteOptions {
    /// CSV/TSV options, consumed by `crate::csv::serialize_csv`.
    ///
    /// NOTE(review): which of these fields the CSV writer actually honors is
    /// decided in `crate::csv` — confirm there before relying on one.
    Csv {
        /// Field separator byte (`b','` for CSV, `b'\t'` for TSV).
        separator: u8,
        /// Quote character, if any.
        quote_char: Option<u8>,
        /// Line terminator override.
        line_terminator: Option<String>,
        /// Quote style, given as a string.
        quote_style: Option<String>,
        /// String written for null values.
        null_value: Option<String>,
        /// DateTime format string.
        datetime_format: Option<String>,
        /// Date format string.
        date_format: Option<String>,
        /// Time format string.
        time_format: Option<String>,
        /// Number of digits written after the decimal point for floats.
        float_precision: Option<usize>,
        /// Strings to treat as null values.
        null_values: Option<Vec<String>>,
        /// Text encoding.
        encoding: CsvEncoding,
    },
    /// Parquet options (only available with the `parquet` feature).
    #[cfg(feature = "parquet")]
    Parquet {
        /// Compression codec used for the column chunks.
        compression: ParquetCompression,
    },
    /// JSON options.
    Json {
        /// Emit one JSON object per line (JSON Lines) instead of a single document.
        lines: bool,
        /// Pretty-print with indentation.
        pretty: bool,
    },
    /// Avro options.
    Avro {
        /// Codec for the container file blocks.
        compression: AvroCompression,
    },
    /// Arrow format (no options; `serialize` does not implement Arrow output yet).
    Arrow,
    /// Excel options (`serialize` does not implement Excel output yet).
    Excel {
        /// Worksheet name
        worksheet_name: String,
        /// Include header
        include_header: bool,
        /// Autofit columns
        autofit: bool,
        /// Float precision
        float_precision: Option<usize>,
    },
    /// ORC options (`serialize` does not implement ORC output yet).
    Orc {
        /// Compression type
        compression: OrcCompression,
    },
}
133
/// ORC compression options.
///
/// Equality is derived so configured codecs can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OrcCompression {
    /// No compression
    Uncompressed,
    /// Zlib compression
    Zlib,
    /// Snappy compression
    Snappy,
    /// LZO compression
    Lzo,
    /// LZ4 compression
    Lz4,
    /// Zstandard compression
    Zstd,
}
150
/// CSV text encoding options.
///
/// Equality is derived so configured encodings can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CsvEncoding {
    /// UTF-8 encoding
    Utf8,
    /// UTF-8 with lossy conversion
    Utf8Lossy,
}
159
/// Parquet compression options.
///
/// `serialize_parquet` maps these onto Polars' codec enum:
/// - `Lz4` becomes Polars' `Lz4Raw`.
/// - `Gzip`, `Brotli` and `Zstd` are passed with no explicit level (`None`).
///
/// Equality is derived so configured codecs can be compared directly.
#[cfg(feature = "parquet")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ParquetCompression {
    /// No compression
    Uncompressed,
    /// Snappy compression
    Snappy,
    /// Gzip compression
    Gzip,
    /// LZO compression
    Lzo,
    /// Brotli compression
    Brotli,
    /// LZ4 compression
    Lz4,
    /// Zstandard compression
    Zstd,
}
179
/// Avro compression options.
///
/// NOTE(review): `serialize_avro` currently maps only `Null`, `Deflate` and
/// `Snappy`. `Bzip2`, `Xz` and `Zstandard` fall back to no compression.
///
/// Equality is derived so configured codecs can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AvroCompression {
    /// No compression
    Null,
    /// Deflate compression
    Deflate,
    /// Snappy compression
    Snappy,
    /// Bzip2 compression
    Bzip2,
    /// XZ compression
    Xz,
    /// Zstandard compression
    Zstandard,
}
196
197impl Default for FormatWriteOptions {
198    fn default() -> Self {
199        FormatWriteOptions::Csv {
200            separator: b',',
201            quote_char: Some(b'"'),
202            line_terminator: None,
203            quote_style: None,
204            null_value: None,
205            datetime_format: None,
206            date_format: None,
207            time_format: None,
208            float_precision: None,
209            null_values: None,
210            encoding: CsvEncoding::Utf8,
211        }
212    }
213}
214
/// Serialize a tabular value to `writer` as delimited text (CSV/TSV).
///
/// Thin wrapper around `crate::csv::serialize_csv`. Header emission follows
/// `options.include_header`; the separator and other formatting come from
/// `format_options`. A non-tabular value (e.g. a plain string) yields an error.
#[cfg(feature = "csv")]
pub fn serialize_csv<W: Write>(
    writer: W,
    value: &Value,
    options: &WriteOptions,
    format_options: &FormatWriteOptions,
) -> Result<()> {
    crate::csv::serialize_csv(writer, value, options, format_options)
}
225
/// Serialize a value to `writer` as JSON.
///
/// Thin wrapper around `crate::json::serialize_json`. With
/// `FormatWriteOptions::Json { lines: true, .. }` each row is written on its
/// own line; `pretty: true` requests indented output.
#[cfg(feature = "json")]
pub fn serialize_json<W: Write>(
    writer: W,
    value: &Value,
    options: &WriteOptions,
    format_options: &FormatWriteOptions,
) -> Result<()> {
    crate::json::serialize_json(writer, value, options, format_options)
}
236
/// Serialize a tabular value to `writer` in Apache Parquet format.
///
/// Accepts `Value::DataFrame` or `Value::LazyFrame`; a lazy frame is collected
/// first. The codec comes from `FormatWriteOptions::Parquet`, and any other
/// `format_options` variant falls back to Snappy. `_options` is not consulted.
///
/// # Errors
///
/// Returns an error if:
/// - `value` is not a DataFrame/LazyFrame;
/// - collecting a LazyFrame fails;
/// - Polars fails to encode or write the file.
#[cfg(feature = "parquet")]
pub fn serialize_parquet<W: Write>(
    writer: W,
    value: &Value,
    _options: &WriteOptions,
    format_options: &FormatWriteOptions,
) -> Result<()> {
    // `ParquetWriter::finish` takes `&mut DataFrame`, so we need one owned frame;
    // build it once here rather than cloning again at the call site.
    let mut df = match value {
        Value::DataFrame(df) => df.clone(),
        Value::LazyFrame(lf) => (*lf).clone().collect().map_err(Error::from)?,
        _ => {
            return Err(Error::operation(
                "Expected DataFrame for Parquet serialization",
            ));
        }
    };

    let requested = match format_options {
        FormatWriteOptions::Parquet { compression } => compression,
        _ => &ParquetCompression::Snappy,
    };

    // Translate this crate's codec enum into Polars' (levels left at `None`).
    let compression = match requested {
        ParquetCompression::Uncompressed => polars::prelude::ParquetCompression::Uncompressed,
        ParquetCompression::Snappy => polars::prelude::ParquetCompression::Snappy,
        ParquetCompression::Gzip => polars::prelude::ParquetCompression::Gzip(None),
        ParquetCompression::Lzo => polars::prelude::ParquetCompression::Lzo,
        ParquetCompression::Brotli => polars::prelude::ParquetCompression::Brotli(None),
        ParquetCompression::Lz4 => polars::prelude::ParquetCompression::Lz4Raw,
        ParquetCompression::Zstd => polars::prelude::ParquetCompression::Zstd(None),
    };

    ParquetWriter::new(writer)
        .with_compression(compression)
        .finish(&mut df)
        .map_err(Error::from)?;
    Ok(())
}
277
/// Serialize a tabular value to `writer` as an Avro object container file.
///
/// Accepts `Value::DataFrame` or `Value::LazyFrame`; a lazy frame is collected
/// first. Each row becomes one Avro record whose fields are the DataFrame's
/// columns, using the schema from `dataframe_to_avro_schema`.
///
/// The codec comes from `FormatWriteOptions::Avro`. Any other `format_options`
/// variant means no compression. `_options` is not consulted.
///
/// # Errors
///
/// Returns an error if:
/// - `value` is not a DataFrame/LazyFrame;
/// - collection or schema construction fails;
/// - a cell cannot be converted or appended;
/// - writing the encoded bytes to `writer` fails.
#[cfg(feature = "avro")]
pub fn serialize_avro<W: Write>(
    mut writer: W,
    value: &Value,
    _options: &WriteOptions,
    format_options: &FormatWriteOptions,
) -> Result<()> {
    use apache_avro::{types::Value as AvroValue, Codec, Writer as AvroWriter};

    let df = match value {
        Value::DataFrame(df) => df.clone(),
        Value::LazyFrame(lf) => (*lf).clone().collect().map_err(Error::from)?,
        _ => {
            return Err(Error::operation(
                "Expected DataFrame for Avro serialization",
            ));
        }
    };

    let codec = match format_options {
        FormatWriteOptions::Avro { compression } => match compression {
            AvroCompression::Null => Codec::Null,
            AvroCompression::Deflate => Codec::Deflate(Default::default()),
            AvroCompression::Snappy => Codec::Snappy,
            // NOTE(review): Bzip2, Xz and Zstandard are not mapped yet and
            // silently fall back to an uncompressed file.
            #[allow(unreachable_patterns)]
            _ => Codec::Null,
        },
        _ => Codec::Null,
    };

    let schema = dataframe_to_avro_schema(&df)?;

    // Column names are fixed for the whole frame: collect them once instead of
    // re-allocating the name list for every cell.
    let field_names: Vec<String> = df
        .get_column_names()
        .iter()
        .map(|name| name.to_string())
        .collect();
    let columns = df.get_columns();

    // Encode into an in-memory buffer first so nothing reaches `writer` unless
    // every row was encoded successfully.
    let mut buffer = Vec::new();
    {
        let mut avro_writer = AvroWriter::with_codec(&schema, &mut buffer, codec);

        for row_idx in 0..df.height() {
            let record_fields = columns
                .iter()
                .zip(&field_names)
                .map(|(column, name)| -> Result<(String, AvroValue)> {
                    let cell = column.get(row_idx).map_err(Error::from)?;
                    Ok((name.clone(), polars_value_to_avro(cell)?))
                })
                .collect::<Result<Vec<_>>>()?;

            avro_writer
                .append(AvroValue::Record(record_fields))
                .map_err(Error::from)?;
        }

        avro_writer.flush().map_err(Error::from)?;
        // `avro_writer` goes out of scope here, releasing its borrow of `buffer`.
    }

    writer.write_all(&buffer)?;
    Ok(())
}
336
/// Build an Avro record schema named `record` that mirrors `df`'s columns.
///
/// Each column becomes a field with the column's name and its index as the
/// field position. The field type is chosen by `polars_dtype_to_avro_schema`.
/// The record's name→position `lookup` table is filled in as well, so records
/// can be validated by field name.
///
/// NOTE(review): field types are not wrapped in a `["null", T]` union, while
/// `polars_value_to_avro` emits `Null` for missing cells. Columns containing
/// nulls are therefore likely rejected on append — confirm and consider
/// nullable unions.
///
/// # Errors
///
/// Propagates errors from type mapping or from building the record name.
#[cfg(feature = "avro")]
fn dataframe_to_avro_schema(df: &DataFrame) -> Result<apache_avro::Schema> {
    use apache_avro::Schema;

    let fields = df
        .get_columns()
        .iter()
        .zip(df.get_column_names())
        .enumerate()
        .map(
            |(position, (column, name))| -> Result<apache_avro::schema::RecordField> {
                Ok(apache_avro::schema::RecordField {
                    name: name.to_string(),
                    doc: None,
                    aliases: None,
                    default: None,
                    schema: polars_dtype_to_avro_schema(column.dtype())?,
                    order: apache_avro::schema::RecordFieldOrder::Ascending,
                    position,
                    custom_attributes: Default::default(),
                })
            },
        )
        .collect::<Result<Vec<_>>>()?;

    // Record validation resolves value fields by name through this table, so it
    // must list every field; an empty table matches no field.
    let lookup = fields
        .iter()
        .map(|field| (field.name.clone(), field.position))
        .collect();

    Ok(Schema::Record(apache_avro::schema::RecordSchema {
        name: apache_avro::schema::Name::new("record").map_err(Error::from)?,
        aliases: None,
        doc: None,
        fields,
        lookup,
        attributes: Default::default(),
    }))
}
369
/// Map a Polars `DataType` to the Avro schema used for a column of that type.
///
/// Signed and unsigned integers that fit in 32 signed bits map to `int`.
/// `Int64`, `UInt32` and `UInt64` map to `long`; `UInt32` needs 64 bits to
/// stay lossless. Every other dtype without a dedicated mapping is written as
/// `string`, matching the `Display` fallback in `polars_value_to_avro`.
#[cfg(feature = "avro")]
fn polars_dtype_to_avro_schema(dtype: &DataType) -> Result<apache_avro::Schema> {
    use apache_avro::Schema;

    let schema = match dtype {
        DataType::Boolean => Schema::Boolean,
        DataType::Int8 | DataType::Int16 | DataType::Int32 => Schema::Int,
        DataType::UInt8 | DataType::UInt16 => Schema::Int,
        DataType::Int64 | DataType::UInt32 | DataType::UInt64 => Schema::Long,
        DataType::Float32 => Schema::Float,
        DataType::Float64 => Schema::Double,
        DataType::String => Schema::String,
        DataType::Binary => Schema::Bytes,
        DataType::Null => Schema::Null,
        // Unsupported dtypes are written as their string rendering.
        _ => Schema::String,
    };
    Ok(schema)
}

/// Convert one Polars cell into the Avro value matching `polars_dtype_to_avro_schema`.
///
/// Integer conversions are lossless: `UInt32` is widened to `long`, and a
/// `UInt64` above `i64::MAX` is rejected rather than wrapped to a negative value.
/// Values of any other type are written via their `Display` form.
///
/// # Errors
///
/// Returns an error for a `UInt64` value that does not fit in an Avro `long`.
#[cfg(feature = "avro")]
fn polars_value_to_avro(value: AnyValue) -> Result<apache_avro::types::Value> {
    use apache_avro::types::Value as AvroValue;

    let converted = match value {
        AnyValue::Null => AvroValue::Null,
        AnyValue::Boolean(b) => AvroValue::Boolean(b),
        AnyValue::Int8(i) => AvroValue::Int(i32::from(i)),
        AnyValue::Int16(i) => AvroValue::Int(i32::from(i)),
        AnyValue::Int32(i) => AvroValue::Int(i),
        AnyValue::Int64(i) => AvroValue::Long(i),
        AnyValue::UInt8(u) => AvroValue::Int(i32::from(u)),
        AnyValue::UInt16(u) => AvroValue::Int(i32::from(u)),
        // u32 may exceed i32::MAX, so it is widened to `long` (schema agrees).
        AnyValue::UInt32(u) => AvroValue::Long(i64::from(u)),
        AnyValue::UInt64(u) => AvroValue::Long(i64::try_from(u).map_err(|_| {
            Error::operation(format!(
                "UInt64 value {u} does not fit in an Avro long"
            ))
        })?),
        AnyValue::Float32(f) => AvroValue::Float(f),
        AnyValue::Float64(f) => AvroValue::Double(f),
        AnyValue::String(s) => AvroValue::String(s.to_string()),
        AnyValue::Binary(b) => AvroValue::Bytes(b.to_vec()),
        other => AvroValue::String(other.to_string()),
    };
    Ok(converted)
}
413
/// Serialize a value to `writer` as ADT (ASCII Delimited Text).
///
/// Thin wrapper around `crate::adt::serialize_adt`.
///
/// NOTE(review): `serialize` in this module has no arm that calls this
/// function — confirm how ADT output is meant to be dispatched.
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
pub fn serialize_adt<W: Write>(
    writer: W,
    value: &Value,
    options: &WriteOptions,
    format_options: &FormatWriteOptions,
) -> Result<()> {
    crate::adt::serialize_adt(writer, value, options, format_options)
}
429
/// Serialize `value` to `writer` in the requested `format`.
///
/// Dispatches to the matching per-format serializer. TSV goes through the CSV
/// serializer with the separator forced to a tab. Caller-supplied CSV options
/// are otherwise kept; any non-CSV `format_options` is replaced by the CSV
/// defaults.
///
/// # Errors
///
/// Returns `FormatError::UnsupportedFeature` in two cases:
/// - the format's Cargo feature is disabled in this build;
/// - the format is Arrow, Excel or ORC, which are not implemented yet.
///
/// Returns `FormatError::Unknown` for any other unhandled format. Otherwise,
/// returns whatever the selected serializer returns.
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
pub fn serialize<W: Write>(
    writer: W,
    value: &Value,
    format: DataFormat,
    options: &WriteOptions,
    format_options: &FormatWriteOptions,
) -> Result<()> {
    match format {
        #[cfg(feature = "csv")]
        DataFormat::Csv => serialize_csv(writer, value, options, format_options),
        #[cfg(not(feature = "csv"))]
        DataFormat::Csv => Err(Error::Format(FormatError::UnsupportedFeature(
            "CSV not supported in this build".to_string(),
        ))),
        #[cfg(feature = "csv")]
        DataFormat::Tsv => {
            // Reuse the caller's CSV settings when present, otherwise the CSV
            // defaults, and then swap the separator for a tab.
            let mut tsv_options = if matches!(format_options, FormatWriteOptions::Csv { .. }) {
                format_options.clone()
            } else {
                FormatWriteOptions::default()
            };
            if let FormatWriteOptions::Csv { separator, .. } = &mut tsv_options {
                *separator = b'\t';
            }
            serialize_csv(writer, value, options, &tsv_options)
        }
        #[cfg(not(feature = "csv"))]
        DataFormat::Tsv => Err(Error::Format(FormatError::UnsupportedFeature(
            "TSV not supported in this build".to_string(),
        ))),
        #[cfg(feature = "json")]
        DataFormat::Json | DataFormat::JsonLines => {
            serialize_json(writer, value, options, format_options)
        }
        #[cfg(not(feature = "json"))]
        DataFormat::Json | DataFormat::JsonLines => Err(Error::Format(
            FormatError::UnsupportedFeature("JSON not supported in this build".to_string()),
        )),
        #[cfg(feature = "parquet")]
        DataFormat::Parquet => serialize_parquet(writer, value, options, format_options),
        #[cfg(not(feature = "parquet"))]
        DataFormat::Parquet => Err(Error::Format(FormatError::UnsupportedFeature(
            "Parquet not supported in this build".to_string(),
        ))),
        DataFormat::Arrow => Err(Error::Format(FormatError::UnsupportedFeature(
            "Arrow serialization not yet implemented".to_string(),
        ))),
        #[cfg(feature = "avro")]
        DataFormat::Avro => serialize_avro(writer, value, options, format_options),
        #[cfg(not(feature = "avro"))]
        DataFormat::Avro => Err(Error::Format(FormatError::UnsupportedFeature(
            "Avro not supported in this build".to_string(),
        ))),
        DataFormat::Excel => Err(Error::Format(FormatError::UnsupportedFeature(
            "Excel serialization not yet implemented".to_string(),
        ))),
        DataFormat::Orc => Err(Error::Format(FormatError::UnsupportedFeature(
            "ORC serialization not yet implemented".to_string(),
        ))),
        other => Err(Error::Format(FormatError::Unknown(other.to_string()))),
    }
}
532
533/// Legacy compatibility - these will be removed in future versions
534pub use crate::format::DataFormat;
535
/// Common interface for sinks that serialize a [`Value`] in one fixed format.
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
pub trait DataWriter {
    /// Serialize `value` to this writer's destination using `options`.
    fn write(&mut self, value: &Value, options: &WriteOptions) -> Result<()>;
    /// The output format this writer produces.
    fn format(&self) -> DataFormat;
}
549
/// A [`DataWriter`] that writes to a file on disk.
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
pub struct FileWriter {
    /// Destination path. The constructors store it via `to_string_lossy`, so
    /// non-UTF-8 bytes have already been replaced here.
    path: String,
    /// Output format used by `write`.
    format: DataFormat,
    /// Format-specific options passed through to `serialize`.
    format_options: FormatWriteOptions,
}
562
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
impl FileWriter {
    /// Create a writer for `path`, inferring the format from the path.
    ///
    /// # Errors
    ///
    /// Propagates the error from `DataFormat::from_path` when no format can be
    /// determined.
    pub fn new<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
        let format = crate::format::DataFormat::from_path(path.as_ref())?;
        Ok(Self::with_format(path, format))
    }

    /// Create a writer for `path` with an explicit `format` and that format's
    /// default options.
    pub fn with_format<P: AsRef<std::path::Path>>(path: P, format: DataFormat) -> Self {
        // NOTE(review): `to_string_lossy` replaces non-UTF-8 bytes, so such a
        // path would be written under a different name — consider a PathBuf.
        let path = path.as_ref().to_string_lossy().into_owned();
        Self {
            format_options: Self::default_format_options_for_format(format),
            path,
            format,
        }
    }

    /// Default format-specific options for `format`.
    ///
    /// Defaults per format:
    /// - CSV and TSV: identical apart from the separator.
    /// - JSON Lines: line-delimited JSON.
    /// - Parquet: Snappy (with the `parquet` feature).
    /// - Avro: uncompressed.
    /// - Arrow: `FormatWriteOptions::Arrow`.
    /// - Anything else: `FormatWriteOptions::default()`.
    fn default_format_options_for_format(format: DataFormat) -> FormatWriteOptions {
        let delimited = |separator: u8| FormatWriteOptions::Csv {
            separator,
            quote_char: Some(b'"'),
            line_terminator: None,
            quote_style: None,
            null_value: None,
            datetime_format: None,
            date_format: None,
            time_format: None,
            float_precision: None,
            null_values: None,
            encoding: CsvEncoding::Utf8,
        };
        let json = |lines: bool| FormatWriteOptions::Json {
            lines,
            pretty: false,
        };

        match format {
            DataFormat::Csv => delimited(b','),
            DataFormat::Tsv => delimited(b'\t'),
            DataFormat::Json => json(false),
            DataFormat::JsonLines => json(true),
            #[cfg(feature = "parquet")]
            DataFormat::Parquet => FormatWriteOptions::Parquet {
                compression: ParquetCompression::Snappy,
            },
            // Without Parquet support the options are irrelevant: `serialize`
            // rejects the format. A JSON placeholder keeps the match total.
            #[cfg(not(feature = "parquet"))]
            DataFormat::Parquet => json(false),
            DataFormat::Arrow => FormatWriteOptions::Arrow,
            DataFormat::Avro => FormatWriteOptions::Avro {
                compression: AvroCompression::Null,
            },
            _ => FormatWriteOptions::default(),
        }
    }

    /// Replace the format-specific options.
    pub fn with_format_options(self, options: FormatWriteOptions) -> Self {
        Self {
            format_options: options,
            ..self
        }
    }

    /// Fail if the target file already exists and `options.overwrite` is off.
    fn check_overwrite(&self, options: &WriteOptions) -> Result<()> {
        let already_exists = std::path::Path::new(&self.path).exists();
        if already_exists && !options.overwrite {
            Err(Error::operation(format!(
                "File {} already exists. Use --overwrite to replace it.",
                self.path
            )))
        } else {
            Ok(())
        }
    }
}
664
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
impl DataWriter for FileWriter {
    /// Serialize `value` into this writer's file.
    ///
    /// With `options.overwrite` set, any existing file is truncated. Otherwise an
    /// existing file is never touched:
    /// - the up-front check reports a descriptive error;
    /// - the file is opened with `create_new`, so one created between the check
    ///   and the open is not clobbered either.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - the file exists and overwriting is disabled;
    /// - the file cannot be created;
    /// - serialization fails;
    /// - the buffered output cannot be flushed.
    fn write(&mut self, value: &Value, options: &WriteOptions) -> Result<()> {
        use std::fs::{File, OpenOptions};
        use std::io::{BufWriter, Write};

        self.check_overwrite(options)?;

        let file = if options.overwrite {
            File::create(&self.path)?
        } else {
            OpenOptions::new()
                .write(true)
                .create_new(true)
                .open(&self.path)?
        };

        let mut writer = BufWriter::new(file);
        serialize(&mut writer, value, self.format, options, &self.format_options)?;
        // Flush explicitly: dropping a BufWriter also flushes, but discards any
        // error, which would report success for a truncated file.
        writer.flush()?;
        Ok(())
    }

    fn format(&self) -> DataFormat {
        self.format
    }
}
686
/// A [`DataWriter`] that serializes into an in-memory byte buffer.
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
pub struct MemoryWriter {
    /// Serialized output produced by `write`.
    buffer: Vec<u8>,
    /// Output format used by `write`.
    format: DataFormat,
    /// Format-specific options passed through to `serialize`.
    format_options: FormatWriteOptions,
}
699
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
impl MemoryWriter {
    /// Create an empty in-memory writer for `format`.
    ///
    /// Options start from the same per-format defaults `FileWriter` uses.
    /// For example, `DataFormat::JsonLines` gets `lines: true` without an
    /// explicit `with_format_options` call; `serialize` cannot infer that
    /// from the format alone.
    pub fn new(format: DataFormat) -> Self {
        Self {
            buffer: Vec::new(),
            format,
            format_options: FileWriter::default_format_options_for_format(format),
        }
    }

    /// Replace the format-specific options.
    pub fn with_format_options(mut self, options: FormatWriteOptions) -> Self {
        self.format_options = options;
        self
    }

    /// Consume the writer and return the serialized bytes.
    pub fn into_inner(self) -> Vec<u8> {
        self.buffer
    }

    /// Borrow the serialized bytes.
    pub fn as_slice(&self) -> &[u8] {
        &self.buffer
    }
}
732
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
impl DataWriter for MemoryWriter {
    /// Serialize `value` into the buffer, replacing any previous output.
    ///
    /// The buffer is cleared first so it always holds exactly one complete
    /// document. Writing through a cursor positioned at 0 over a non-empty
    /// buffer would overwrite it in place. That leaves stale trailing bytes
    /// whenever the new output is shorter.
    fn write(&mut self, value: &Value, options: &WriteOptions) -> Result<()> {
        self.buffer.clear();
        serialize(
            &mut self.buffer,
            value,
            self.format,
            options,
            &self.format_options,
        )
    }

    fn format(&self) -> DataFormat {
        self.format
    }
}
750
/// Create a [`FileWriter`] for `path`, inferring the format from the path.
///
/// # Errors
///
/// Propagates the error from `FileWriter::new` when no format can be determined.
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
pub fn to_path<P: AsRef<std::path::Path>>(path: P) -> Result<FileWriter> {
    FileWriter::new(path)
}
761
/// Create a [`FileWriter`] for `path` with an explicit `format`.
///
/// The writer starts with that format's default options.
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
pub fn to_path_with_format<P: AsRef<std::path::Path>>(path: P, format: DataFormat) -> FileWriter {
    FileWriter::with_format(path, format)
}
772
/// Create a [`MemoryWriter`] that serializes `format` into an in-memory buffer.
#[cfg(any(
    feature = "csv",
    feature = "json",
    feature = "parquet",
    feature = "avro"
))]
pub fn to_memory(format: DataFormat) -> MemoryWriter {
    MemoryWriter::new(format)
}
783
784#[cfg(test)]
785#[cfg(any(
786    feature = "csv",
787    feature = "json",
788    feature = "parquet",
789    feature = "avro"
790))]
791mod tests {
792    use super::*;
793    use dsq_shared::value::Value;
794
795    use std::io::Cursor;
796
797    fn create_test_dataframe() -> DataFrame {
798        let s1 = Series::new("name".into(), &["Alice", "Bob", "Charlie"]);
799        let s2 = Series::new("age".into(), &[25i64, 30, 35]);
800        let s3 = Series::new("active".into(), &[true, false, true]);
801        DataFrame::new(vec![s1.into(), s2.into(), s3.into()]).unwrap()
802    }
803
804    #[test]
805    fn test_serialize_csv() {
806        let df = create_test_dataframe();
807        let value = Value::DataFrame(df);
808        let options = WriteOptions::default();
809        let format_options = FormatWriteOptions::default();
810
811        let mut buffer = Vec::new();
812        let result = serialize_csv(Cursor::new(&mut buffer), &value, &options, &format_options);
813        assert!(result.is_ok());
814
815        let output = String::from_utf8(buffer).unwrap();
816        assert!(output.contains("name,age,active"));
817        assert!(output.contains("Alice,25,true"));
818        assert!(output.contains("Bob,30,false"));
819        assert!(output.contains("Charlie,35,true"));
820    }
821
822    #[test]
823    fn test_serialize_csv_with_header_false() {
824        let df = create_test_dataframe();
825        let value = Value::DataFrame(df);
826        let options = WriteOptions {
827            include_header: false,
828            ..Default::default()
829        };
830        let format_options = FormatWriteOptions::default();
831
832        let mut buffer = Vec::new();
833        let result = serialize_csv(Cursor::new(&mut buffer), &value, &options, &format_options);
834        assert!(result.is_ok());
835
836        let output = String::from_utf8(buffer).unwrap();
837        assert!(!output.contains("name,age,active"));
838        assert!(output.contains("Alice,25,true"));
839    }
840
841    #[test]
842    fn test_serialize_csv_custom_separator() {
843        let df = create_test_dataframe();
844        let value = Value::DataFrame(df);
845        let options = WriteOptions::default();
846        let format_options = FormatWriteOptions::Csv {
847            separator: b';',
848            quote_char: Some(b'"'),
849            line_terminator: None,
850            quote_style: None,
851            null_value: None,
852            datetime_format: None,
853            date_format: None,
854            time_format: None,
855            float_precision: None,
856            null_values: None,
857            encoding: CsvEncoding::Utf8,
858        };
859
860        let mut buffer = Vec::new();
861        let result = serialize_csv(Cursor::new(&mut buffer), &value, &options, &format_options);
862        assert!(result.is_ok());
863
864        let output = String::from_utf8(buffer).unwrap();
865        assert!(output.contains("name;age;active"));
866        assert!(output.contains("Alice;25;true"));
867    }
868
869    #[test]
870    fn test_serialize_csv_lazy_frame() {
871        let df = create_test_dataframe();
872        let lf = df.clone().lazy();
873        let value = Value::LazyFrame(Box::new(lf));
874        let options = WriteOptions::default();
875        let format_options = FormatWriteOptions::default();
876
877        let mut buffer = Vec::new();
878        let result = serialize_csv(Cursor::new(&mut buffer), &value, &options, &format_options);
879        assert!(result.is_ok());
880
881        let output = String::from_utf8(buffer).unwrap();
882        assert!(output.contains("Alice,25,true"));
883    }
884
885    #[test]
886    fn test_serialize_csv_wrong_value_type() {
887        let value = Value::String("not a dataframe".to_string());
888        let options = WriteOptions::default();
889        let format_options = FormatWriteOptions::default();
890
891        let mut buffer = Vec::new();
892        let result = serialize_csv(Cursor::new(&mut buffer), &value, &options, &format_options);
893        assert!(result.is_err());
894        assert!(result
895            .unwrap_err()
896            .to_string()
897            .contains("Expected DataFrame"));
898    }
899
900    #[test]
901    fn test_serialize_json() {
902        let df = create_test_dataframe();
903        let value = Value::DataFrame(df);
904        let options = WriteOptions::default();
905        let format_options = FormatWriteOptions::Json {
906            lines: false,
907            pretty: false,
908        };
909
910        let mut buffer = Vec::new();
911        let result = serialize_json(Cursor::new(&mut buffer), &value, &options, &format_options);
912        assert!(result.is_ok());
913
914        let output = String::from_utf8(buffer).unwrap();
915        // Check that all expected key-value pairs are present (order may vary)
916        assert!(output.contains(r#""name":"Alice""#));
917        assert!(output.contains(r#""age":25"#));
918        assert!(output.contains(r#""active":true"#));
919        assert!(output.contains(r#""name":"Bob""#));
920        assert!(output.contains(r#""age":30"#));
921        assert!(output.contains(r#""active":false"#));
922    }
923
924    #[test]
925    fn test_serialize_json_lines() {
926        let df = create_test_dataframe();
927        let value = Value::DataFrame(df);
928        let options = WriteOptions::default();
929        let format_options = FormatWriteOptions::Json {
930            lines: true,
931            pretty: false,
932        };
933
934        let mut buffer = Vec::new();
935        let result = serialize_json(Cursor::new(&mut buffer), &value, &options, &format_options);
936        assert!(result.is_ok());
937
938        let output = String::from_utf8(buffer).unwrap();
939        let lines: Vec<&str> = output.lines().collect();
940        assert_eq!(lines.len(), 3);
941        // Check that each line contains the expected key-value pairs (order may vary)
942        assert!(lines[0].contains(r#""name":"Alice""#));
943        assert!(lines[0].contains(r#""age":25"#));
944        assert!(lines[0].contains(r#""active":true"#));
945        assert!(lines[1].contains(r#""name":"Bob""#));
946        assert!(lines[1].contains(r#""age":30"#));
947        assert!(lines[1].contains(r#""active":false"#));
948        assert!(lines[2].contains(r#""name":"Charlie""#));
949        assert!(lines[2].contains(r#""age":35"#));
950        assert!(lines[2].contains(r#""active":true"#));
951    }
952
953    #[test]
954    fn test_serialize_json_pretty() {
955        let df = create_test_dataframe();
956        let value = Value::DataFrame(df);
957        let options = WriteOptions::default();
958        let format_options = FormatWriteOptions::Json {
959            lines: false,
960            pretty: true,
961        };
962
963        let mut buffer = Vec::new();
964        let result = serialize_json(Cursor::new(&mut buffer), &value, &options, &format_options);
965        assert!(result.is_ok());
966
967        let output = String::from_utf8(buffer).unwrap();
968        assert!(output.contains("  \"name\": \"Alice\""));
969        assert!(output.contains("  \"age\": 25"));
970    }
971
972    #[cfg(feature = "parquet")]
973    #[test]
974    fn test_serialize_parquet() {
975        let df = create_test_dataframe();
976        let value = Value::DataFrame(df);
977        let options = WriteOptions::default();
978        let format_options = FormatWriteOptions::Parquet {
979            compression: ParquetCompression::Uncompressed,
980        };
981
982        let mut buffer = Vec::new();
983        let result = serialize_parquet(Cursor::new(&mut buffer), &value, &options, &format_options);
984        assert!(result.is_ok());
985        assert!(!buffer.is_empty());
986    }
987
988    #[cfg(feature = "parquet")]
989    #[test]
990    fn test_serialize_parquet_compression() {
991        let df = create_test_dataframe();
992        let value = Value::DataFrame(df);
993        let options = WriteOptions::default();
994        let format_options = FormatWriteOptions::Parquet {
995            compression: ParquetCompression::Snappy,
996        };
997
998        let mut buffer = Vec::new();
999        let result = serialize_parquet(Cursor::new(&mut buffer), &value, &options, &format_options);
1000        assert!(result.is_ok());
1001        assert!(!buffer.is_empty());
1002    }
1003
1004    #[test]
1005    fn test_row_to_json_value() {
1006        use crate::json::row_to_json_value;
1007
1008        let df = create_test_dataframe();
1009        let row = df.get_row(0).unwrap();
1010        let column_names = df
1011            .get_column_names()
1012            .iter()
1013            .map(|s| s.to_string())
1014            .collect::<Vec<_>>();
1015        let json_value = row_to_json_value(&row.0, &column_names);
1016
1017        if let serde_json::Value::Object(map) = json_value {
1018            assert_eq!(
1019                map.get("name"),
1020                Some(&serde_json::Value::String("Alice".to_string()))
1021            );
1022            assert_eq!(map.get("age"), Some(&serde_json::Value::Number(25.into())));
1023            assert_eq!(map.get("active"), Some(&serde_json::Value::Bool(true)));
1024        } else {
1025            panic!("Expected object");
1026        }
1027    }
1028
1029    #[test]
1030    fn test_row_to_json_value_with_nulls() {
1031        use crate::json::row_to_json_value;
1032
1033        let s1 = Series::new("name".into(), &["Alice", "Bob"]);
1034        let s2 = Series::new("age".into(), &[Some(25i64), None]);
1035        let df = DataFrame::new(vec![s1.into(), s2.into()]).unwrap();
1036        let row = df.get_row(1).unwrap();
1037        let column_names = df
1038            .get_column_names()
1039            .iter()
1040            .map(|s| s.to_string())
1041            .collect::<Vec<_>>();
1042        let json_value = row_to_json_value(&row.0, &column_names);
1043
1044        if let serde_json::Value::Object(map) = json_value {
1045            assert_eq!(
1046                map.get("name"),
1047                Some(&serde_json::Value::String("Bob".to_string()))
1048            );
1049            assert_eq!(map.get("age"), Some(&serde_json::Value::Null));
1050        } else {
1051            panic!("Expected object");
1052        }
1053    }
1054
1055    #[test]
1056    fn test_write_options_default() {
1057        let opts = WriteOptions::default();
1058        assert!(opts.include_header);
1059        assert!(!opts.overwrite);
1060        assert!(opts.compression.is_none());
1061        assert!(opts.schema.is_none());
1062        assert!(opts.batch_size.is_none());
1063    }
1064
1065    #[test]
1066    fn test_format_write_options_default() {
1067        let opts = FormatWriteOptions::default();
1068        match opts {
1069            FormatWriteOptions::Csv {
1070                separator,
1071                quote_char,
1072                line_terminator,
1073                quote_style,
1074                null_value,
1075                datetime_format,
1076                date_format,
1077                time_format,
1078                float_precision,
1079                null_values,
1080                encoding,
1081            } => {
1082                assert_eq!(separator, b',');
1083                assert_eq!(quote_char, Some(b'"'));
1084                assert!(line_terminator.is_none());
1085                assert!(quote_style.is_none());
1086                assert!(null_value.is_none());
1087                assert!(datetime_format.is_none());
1088                assert!(date_format.is_none());
1089                assert!(time_format.is_none());
1090                assert!(float_precision.is_none());
1091                assert!(null_values.is_none());
1092                assert!(matches!(encoding, CsvEncoding::Utf8));
1093            }
1094            _ => panic!("Expected Csv"),
1095        }
1096    }
1097}