1use std::io::Write;
2
3use arrow_array::RecordBatch;
4use arrow_schema::SchemaRef;
5
6use crate::Result;
7use crate::cli::BinaryFormat;
8use crate::output::{RowWriter, value};
9
10pub struct CsvRowWriter<W: Write> {
11 writer: Option<Box<csv::Writer<W>>>,
12 binary_format: BinaryFormat,
13}
14
15impl<W: Write> CsvRowWriter<W> {
16 pub fn new(writer: W, binary_format: BinaryFormat) -> Self {
17 let inner = csv::WriterBuilder::new()
18 .has_headers(false)
19 .from_writer(writer);
20 Self {
21 writer: Some(Box::new(inner)),
22 binary_format,
23 }
24 }
25}
26
27impl<W: Write> RowWriter for CsvRowWriter<W> {
28 fn start(&mut self, schema: &SchemaRef) -> Result<()> {
29 value::validate_csv_schema(schema)?;
30 let w = self.writer.as_mut().expect("start() called twice");
31 let names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
32 w.write_record(&names)?;
33 Ok(())
34 }
35
36 fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
37 let w = self.writer.as_mut().expect("writer already finished");
38 let num_rows = batch.num_rows();
39 let num_cols = batch.num_columns();
40 let mut row_buf: Vec<String> = Vec::with_capacity(num_cols);
41 for row in 0..num_rows {
42 row_buf.clear();
43 for col in 0..num_cols {
44 let arr = batch.column(col);
45 row_buf.push(
46 value::csv_cell(arr.as_ref(), row, self.binary_format)?.unwrap_or_default(),
47 );
48 }
49 w.write_record(&row_buf)?;
50 }
51 Ok(())
52 }
53
54 fn finish(&mut self) -> Result<()> {
55 if let Some(w) = self.writer.take() {
56 let mut inner = w.into_inner().map_err(|e| e.into_error())?;
57 inner.flush()?;
58 }
59 Ok(())
60 }
61}