1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
use std::io::Read;
use csv::{self, Trim};
use crate::chopper::chopper::Source;
use crate::chopper::types::{FieldType, FieldValue, Header, Nanos, Row};
use crate::cli::util::YesNoAuto;
use crate::error::CliResult;
use crate::source::csv_configs::CSVInputConfig;
use crate::source::csv_timestamp::{self, TimestampCol, TimestampFmt};
use crate::util::csv_util;
use crate::util::preview::Preview;
use crate::util::tz::ChopperTz;
/// Candidate field delimiters tried when guessing the delimiter from the first previewed line:
/// comma, tab and space. The first entry (comma) is the fallback when no line is available.
const DELIMITERS: &[u8] = &[b',', b'\t', b' '];
/// A `Source` that yields rows parsed from delimited text via a `csv::Reader`.
///
/// The source keeps one row buffered ahead of the caller (`next_row`), so the
/// first data record is already parsed by the time the source is constructed.
pub struct CSVSource {
    /// Underlying CSV reader over the boxed input stream.
    reader: csv::Reader<Box<dyn Read>>,
    /// Column names and types reported through `Source::header`.
    header: Header,
    /// Timestamp column selector; used together with `timestamp_fmt` and
    /// `timezone` to compute each row's timestamp.
    timestamp_col: TimestampCol,
    /// Format used to parse the timestamp column.
    timestamp_fmt: TimestampFmt,
    /// Timezone applied when parsing timestamps.
    timezone: ChopperTz,
    /// The row that the next call to `next_row` will return.
    next_row: Row,
    /// Whether `next_row` still holds a row that has not been returned yet.
    has_next_row: bool,
}
impl CSVSource {
pub fn new(previewer: Box<dyn Preview>, csv_config: &CSVInputConfig) -> CliResult<Self> {
let (line1, line2) = match previewer.get_lines() {
None => (None, None),
Some(lines) => (lines.get(0), lines.get(1)),
};
let delimiter = match csv_config.delimiter() {
None => {
match line1 {
Some(line) => csv_util::guess_delimiter(line.as_str(), DELIMITERS),
None => DELIMITERS[0],
}
}
Some(d) => d,
};
let has_header = match csv_config.has_header() {
YesNoAuto::Yes => true,
YesNoAuto::No => false,
YesNoAuto::Auto => csv_util::guess_has_header(line1, line2, delimiter),
};
let reader = previewer.get_reader();
let mut reader = csv::ReaderBuilder::new()
.delimiter(delimiter)
.has_headers(has_header)
.trim(Trim::All)
.from_reader(reader);
let mut field_names: Vec<String> = Vec::new();
if reader.has_headers() {
let header_record = reader.headers()?;
for i in header_record {
field_names.push(i.to_string());
}
}
let first_row: csv::StringRecord = reader.records().next().unwrap()?;
let field_count = first_row.len();
if !reader.has_headers() {
for i in 0..field_count {
field_names.push(format!("col_{}", i));
}
}
let timestamp: Nanos = 0;
let field_values: Vec<FieldValue> = vec![FieldValue::None; field_count];
let next_row = Row {
timestamp,
field_values,
};
let field_types: Vec<FieldType> = vec![FieldType::String; field_count];
let header: Header = Header::new(field_names, field_types);
let timestamp_config = csv_config.timestamp_config();
let timezone = timestamp_config.timezone();
let (timestamp_col, timestamp_fmt) = csv_timestamp::get_timestamp_col_and_fmt(
&header,
&first_row,
timestamp_config.timestamp_col(),
timestamp_config.timestamp_fmt(),
timezone,
)?;
let mut csv_reader = CSVSource {
reader,
header,
timestamp_col,
timestamp_fmt,
timezone: timezone.clone(),
next_row,
has_next_row: true,
};
csv_reader.update_row(first_row)?;
Ok(csv_reader)
}
fn update_row(&mut self, next_record: csv::StringRecord) -> CliResult<()> {
for i in 0..next_record.len() {
self.next_row.field_values[i] =
FieldValue::String(next_record.get(i).unwrap().to_string());
}
self.next_row.timestamp = csv_timestamp::get_timestamp(
&next_record,
&self.timestamp_col,
&self.timestamp_fmt,
&self.timezone,
)?;
Ok(())
}
fn next_row(&mut self) -> CliResult<Option<Row>> {
if !self.has_next_row {
return Ok(None);
}
let current_row = self.next_row.clone();
match self.reader.records().next() {
Some(r) => self.update_row(r?)?,
None => self.has_next_row = false,
}
Ok(Some(current_row))
}
}
impl Source for CSVSource {
fn header(&self) -> &Header {
&self.header
}
fn next_row(&mut self) -> CliResult<Option<Row>> {
self.next_row()
}
fn has_native_timestamp_column(&self) -> bool {
false
}
}