1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use std::fs::File;
use std::io::{self, BufWriter, Write};
use std::path::PathBuf;

use crate::chopper::chopper::{DataSink, HeaderSink};
use crate::chopper::header_graph::PinId;
use crate::chopper::types::{FieldValue, Header, Row};
use crate::error::{CliResult, Error};
use crate::source::csv_configs::CSVOutputConfig;

/// Sink that serializes a header and rows as delimited text.
///
/// Output goes through a buffered writer wrapping either standard output or a
/// newly created file (see `CSVSink::new`).
pub struct CSVSink {
    /// Buffered destination for all header and row output.
    writer: BufWriter<Box<dyn io::Write>>,
    /// Output options consulted while writing: whether to emit the timestamp
    /// column and which delimiter separates row values.
    csv_output_config: CSVOutputConfig,
}

impl CSVSink {
    /// Creates a sink that writes to the file at `path`, or to standard output
    /// when `path` is `None`.
    ///
    /// # Errors
    ///
    /// Returns an error when the output file cannot be created.
    pub fn new(path: &Option<String>, csv_output_config: CSVOutputConfig) -> CliResult<Self> {
        let writer = BufWriter::new(CSVSink::into_writer(path)?);
        Ok(CSVSink {
            writer,
            csv_output_config,
        })
    }

    /// Opens the raw (unbuffered) output target: standard output for `None`,
    /// otherwise a file created via `File::create` at the given path.
    fn into_writer(path: &Option<String>) -> io::Result<Box<dyn io::Write>> {
        match path {
            None => Ok(Box::new(io::stdout())),
            Some(p) => {
                let path = PathBuf::from(p);
                let file = File::create(path)?;
                Ok(Box::new(file))
            }
        }
    }

    /// Writes the header line: an optional `timestamp` column followed by the
    /// header's field names, terminated by a newline.
    ///
    /// Columns are separated with the configured delimiter, the same one
    /// `write_row` uses, so the header and the data rows always line up. A
    /// separator is only written *between* columns, never after the last one.
    ///
    /// # Errors
    ///
    /// Returns an error when writing to the underlying writer fails.
    fn write_csv_header(&mut self, header: &mut Header) -> CliResult<()> {
        let delimiter = self.csv_output_config.delimiter();
        let mut first_col = true;

        if self.csv_output_config.print_timestamp() {
            write!(self.writer, "timestamp")?;
            first_col = false;
        }
        // The names are only displayed, so iterate them in place instead of
        // cloning the whole collection.
        for name in header.field_names() {
            if first_col {
                first_col = false;
            } else {
                write!(self.writer, "{}", delimiter)?;
            }
            write!(self.writer, "{}", name)?;
        }
        writeln!(self.writer)?;
        Ok(())
    }
}

impl HeaderSink for CSVSink {
    /// Emits the header line, then hands this sink back as a boxed `DataSink`
    /// ready to receive rows.
    ///
    /// # Errors
    ///
    /// Propagates any failure from writing the header line.
    fn process_header(self: Box<Self>, header: &mut Header) -> CliResult<Box<dyn DataSink>> {
        let mut sink = self;
        sink.write_csv_header(header)?;
        let data_sink: Box<dyn DataSink> = sink.boxed();
        Ok(data_sink)
    }
}

impl DataSink for CSVSink {
    fn write_row(&mut self, row: Row) -> CliResult<Option<Row>> {
        let mut first_col = true;
        if self.csv_output_config.print_timestamp() {
            write!(self.writer, "{}", row.timestamp)?;
            first_col = false;
        }
        let field_values = &row.field_values;
        let delimiter = self.csv_output_config.delimiter();
        for value in field_values {
            if first_col {
                first_col = false;
            } else {
                write!(self.writer, "{}", delimiter)?;
            }

            match value {
                FieldValue::Boolean(_x) => {
                    return Err(Error::from(
                        "CSVSink -- boolean field type is not supported",
                    ))
                }
                FieldValue::Byte(x) => write!(self.writer, "{}", x)?,
                FieldValue::ByteBuf(_x) => {
                    return Err(Error::from(
                        "CSVSink -- ByteBuffer field type is not supported",
                    ))
                }
                FieldValue::Char(x) => write!(self.writer, "{}", x)?,
                FieldValue::Double(x) => {
                    dtoa::write(&mut self.writer, *x)?;
                }
                FieldValue::Float(x) => {
                    dtoa::write(&mut self.writer, *x)?;
                }
                FieldValue::Int(x) => write!(self.writer, "{}", x)?,
                FieldValue::Long(x) => write!(self.writer, "{}", x)?,
                FieldValue::Short(x) => write!(self.writer, "{}", x)?,
                FieldValue::String(x) => write!(self.writer, "{}", x)?,
                FieldValue::None => (),
            };
        }
        write!(self.writer, "\n")?;
        Ok(None)
    }

    fn write_row_to_pin(&mut self, _pin_id: PinId, row: Row) -> CliResult<Option<Row>> {
        self.write_row(row)
    }

    fn flush(&mut self) -> CliResult<()> {
        self.writer.flush()?;
        Ok(())
    }

    fn boxed(self) -> Box<dyn DataSink> {
        Box::new(self)
    }
}