Skip to main content

arrs/output/
csv.rs

1use std::io::Write;
2
3use arrow_array::RecordBatch;
4use arrow_schema::SchemaRef;
5
6use crate::Result;
7use crate::cli::BinaryFormat;
8use crate::output::{RowWriter, value};
9
10pub struct CsvRowWriter<W: Write> {
11    writer: Option<Box<csv::Writer<W>>>,
12    binary_format: BinaryFormat,
13}
14
15impl<W: Write> CsvRowWriter<W> {
16    pub fn new(writer: W, binary_format: BinaryFormat) -> Self {
17        let inner = csv::WriterBuilder::new()
18            .has_headers(false)
19            .from_writer(writer);
20        Self {
21            writer: Some(Box::new(inner)),
22            binary_format,
23        }
24    }
25}
26
27impl<W: Write> RowWriter for CsvRowWriter<W> {
28    fn start(&mut self, schema: &SchemaRef) -> Result<()> {
29        value::validate_csv_schema(schema)?;
30        let w = self.writer.as_mut().expect("start() called twice");
31        let names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
32        w.write_record(&names)?;
33        Ok(())
34    }
35
36    fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
37        let w = self.writer.as_mut().expect("writer already finished");
38        let num_rows = batch.num_rows();
39        let num_cols = batch.num_columns();
40        let mut row_buf: Vec<String> = Vec::with_capacity(num_cols);
41        for row in 0..num_rows {
42            row_buf.clear();
43            for col in 0..num_cols {
44                let arr = batch.column(col);
45                row_buf.push(
46                    value::csv_cell(arr.as_ref(), row, self.binary_format)?.unwrap_or_default(),
47                );
48            }
49            w.write_record(&row_buf)?;
50        }
51        Ok(())
52    }
53
54    fn finish(&mut self) -> Result<()> {
55        if let Some(w) = self.writer.take() {
56            let mut inner = w.into_inner().map_err(|e| e.into_error())?;
57            inner.flush()?;
58        }
59        Ok(())
60    }
61}