1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use std::{
cmp::min,
io::{self, Write},
};
use ripline::LineTerminator;
use crate::{core::JoinAppend, field_range::FieldRange};
/// Line-oriented field selector for input whose fields are separated by a
/// single byte.
///
/// The parser borrows its configuration (`output_delimiter`, `fields`) for
/// `'a` and keeps a small amount of mutable state describing its position in
/// the buffer currently being processed.
pub struct SingleByteDelimParser<'a> {
    // --- configuration, fixed at construction ---
    /// Terminator passed to the writer after every output line.
    line_terminator: LineTerminator,
    /// Byte value of `line_terminator`; this is the byte searched for in the
    /// input to find line ends.
    newline: u8,
    /// Input field separator.
    sep: u8,
    /// Bytes written between the selected fields of an output line.
    output_delimiter: &'a [u8],
    /// The field ranges to emit for each line, in output order.
    fields: &'a [FieldRange],
    /// Number of leading fields recorded per line before the rest of the
    /// line is skipped; derived from the last entry of `fields`.
    max_field: usize,

    // --- per-buffer parsing state ---
    /// Position in the current buffer at which the next line starts.
    offset: usize,
    /// Byte positions (as pairs of buffer indices) of the fields found on
    /// the line currently being parsed. Reused across lines to avoid
    /// reallocating.
    line: Vec<(usize, usize)>,
}
impl<'a> SingleByteDelimParser<'a> {
    /// Build a parser.
    ///
    /// * `line_terminator` - terminator looked for in the input and written
    ///   after every output line.
    /// * `output_delimiter` - bytes written between selected fields.
    /// * `fields` - the field ranges to select. The `high` bound of the
    ///   *last* entry decides how many fields are recorded per line
    ///   (presumably `fields` is ordered so that the last entry has the
    ///   largest `high`; confirm against the caller).
    /// * `sep` - the single-byte input field separator.
    pub fn new(
        line_terminator: LineTerminator,
        output_delimiter: &'a [u8],
        fields: &'a [FieldRange],
        sep: u8,
    ) -> Self {
        Self {
            line_terminator,
            output_delimiter,
            fields,
            sep,
            // `high` is a zero-based inclusive bound, so `high + 1` fields are
            // needed. Saturate so that an unbounded range
            // (`high == usize::MAX`) cannot overflow.
            max_field: fields
                .last()
                .map_or(usize::MAX, |f| f.high.saturating_add(1)),
            offset: 0,
            newline: line_terminator.as_byte(),
            line: vec![],
        }
    }

    /// Rewind the parser to the start of a buffer.
    ///
    /// `process_buffer` never rewinds `offset` itself, so call this before
    /// handing the parser a new buffer.
    #[inline]
    pub fn reset(&mut self) {
        self.offset = 0;
    }

    /// Parse every line in `buffer` and write the selected fields of each
    /// line to `output`, joined by the output delimiter and followed by the
    /// line terminator.
    ///
    /// Fields that a range asks for but the line does not contain are
    /// silently omitted.
    ///
    /// # Errors
    ///
    /// Returns any error produced while writing to `output`, and an
    /// [`io::ErrorKind::InvalidData`] error if a line in `buffer` is not
    /// ended by the line terminator.
    #[inline]
    pub fn process_buffer<W: Write>(
        &mut self,
        buffer: &[u8],
        mut output: W,
    ) -> Result<(), io::Error> {
        // A buffer that opens with the line terminator is an empty first
        // line: emit it directly and step past it.
        if buffer.first() == Some(&self.newline) {
            output.join_append(
                self.output_delimiter,
                std::iter::empty(),
                &self.line_terminator,
            )?;
            self.offset += 1;
        }

        while self.offset < buffer.len() {
            self.fill_line(buffer)?;

            let ranges = &self.line;
            let items = self.fields.iter().flat_map(|f| {
                // Clamp the requested range to the fields actually present;
                // `get` yields `None` (-> nothing emitted) when even the low
                // bound is past the end of the line.
                let last = min(f.high, ranges.len().saturating_sub(1));
                ranges
                    .get(f.low..=last)
                    .unwrap_or(&[])
                    .iter()
                    .map(|&(start, end)| &buffer[start..end])
            });
            output.join_append(self.output_delimiter, items, &self.line_terminator)?;
        }
        Ok(())
    }

    /// Record the byte ranges of the fields of the line starting at
    /// `self.offset` into `self.line`, then advance `self.offset` to the
    /// first byte after that line's terminator.
    ///
    /// Ranges are stored half-open (`start..end`, `end` being the position of
    /// the separator or terminator that closed the field). Using an exclusive
    /// end means an empty field at the very start of the buffer is
    /// representable as `(0, 0)` without subtracting from zero.
    ///
    /// At most `self.max_field` fields are recorded; the remainder of the
    /// line is skipped without being split.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidData`] when no line terminator is
    /// found between `self.offset` and the end of `buffer`. `self.offset` is
    /// left unchanged in that case.
    #[inline]
    fn fill_line(&mut self, buffer: &[u8]) -> Result<(), io::Error> {
        // Start from a clean slate, also discarding ranges left behind by a
        // previous call that failed part-way through.
        self.line.clear();

        let base = self.offset;
        let mut field_count = 0;
        // Start of the next field, relative to `base`.
        let mut line_offset = 0;
        let mut found_newline = false;

        for index in memchr::memchr2_iter(self.sep, self.newline, &buffer[base..]) {
            if buffer[base + index] == self.sep {
                field_count += 1;
            } else {
                found_newline = true;
            }
            self.line.push((base + line_offset, base + index));
            line_offset = index + 1;
            if found_newline || field_count == self.max_field {
                break;
            }
        }

        if found_newline {
            self.offset += line_offset;
        } else {
            // Either enough fields were collected or the delimiters ran out:
            // skip whatever is left of the line up to its terminator.
            let end = memchr::memchr(self.newline, &buffer[base + line_offset..])
                .ok_or(io::ErrorKind::InvalidData)?;
            self.offset += line_offset + end + 1;
        }
        Ok(())
    }
}