// csv_lib/csv/csv_reader.rs
2#[cfg(target_arch = "x86_64")]
3use crate::helpers::bytes_helper::locate_line_break_avx2;
4#[cfg(target_arch = "aarch64")]
5use crate::helpers::bytes_helper::locate_line_break_neon;
6use crate::helpers::bytes_helper::locate_line_break_memchr3;
7use crate::models::csv_config::CsvConfig;
8use crate::models::csv_error::CsvError;
9use crate::models::platform_info::PlatformInfo;
10use memmap2::Mmap;
11use std::fs::File;
12use std::path::Path;
13use crate::models::row::Row;
14#[derive(Debug)]
15#[repr(C)]
16pub struct CsvReaderWithMap {
17 config: CsvConfig,
18 mmap: Mmap,
19 platform: PlatformInfo,
20 cursor: usize,
21}
22
23impl CsvReaderWithMap {
24 pub fn get_config(&self) -> &CsvConfig {
27 &self.config
28 }
29
30 pub fn get_slice(&self) -> &[u8] {
33 &self.mmap[..]
34 }
35
36 pub fn open<P: AsRef<Path>>(path: P, config: &CsvConfig) -> Result<CsvReaderWithMap, CsvError> {
40 let file = File::open(path).map_err(|err| {
41 CsvError::FileError(format!("Cannot open file. Detail: {}", err))
42 })?;
43
44 let mmap = unsafe {
45 Mmap::map(&file).map_err(|bad| {
46 CsvError::FileError(format!("Cannot map file. Detail: {}", bad))
47 })?
48 };
49
50 Ok(CsvReaderWithMap {
51 config: config.clone(),
52 platform: PlatformInfo::new(),
53 mmap,
54 cursor: 0,
55 })
56 }
57
58 pub fn next_raw(&mut self) -> Option<Row<'_>> {
62 let string_separator = self.config.string_separator;
63 let delimiter = self.config.delimiter;
64 let fm = self.config.force_memcach3;
65 let slice = if &self.config.force_memcach3 == &true {
66 self.next_raw_memchr3()
67 } else {
68 #[cfg(target_arch = "x86_64")]
69 {
70 if is_x86_feature_detected!("avx2") {
71 unsafe { self.new_raw_avx2() }
72 } else {
73 self.next_raw_memchr3()
74 }
75 }
76 #[cfg(target_arch = "aarch64")]
77 {
78 self.new_raw_neon()
79 }
80 }?;
81 Some(Row::new(slice, delimiter, string_separator, fm))
82 }
83
84 pub fn peek_raw(&self) -> Option<Row<'_>> {
87 let string_separator = self.config.string_separator;
88 let delimiter = self.config.delimiter;
89 let fm = self.config.force_memcach3;
90 let slice = if fm {
91 Self::peek_raw_memchr3(&self.mmap, self.cursor, self.config.line_break)
92 } else {
93 #[cfg(target_arch = "x86_64")]
94 {
95 if is_x86_feature_detected!("avx2") {
96 unsafe { Self::peek_raw_avx2(&self.mmap, self.cursor, self.config.line_break) }
97 } else {
98 Self::peek_raw_memchr3(&self.mmap, self.cursor, self.config.line_break)
99 }
100 }
101 #[cfg(target_arch = "aarch64")]
102 {
103 Self::peek_raw_neon(&self.mmap, self.cursor, self.config.line_break)
104 }
105 }?;
106 Some(Row::new(slice, delimiter, string_separator, fm))
107 }
108
109 pub fn advance_next(&mut self) {
112 let _ = self.next_raw();
113 }
114
115 #[cfg(target_arch = "aarch64")]
117 pub(crate) fn new_raw_neon(&mut self) -> Option<&[u8]> {
118 unsafe {
119 let slice = &self.mmap[self.cursor..];
120 match locate_line_break_neon(slice, self.config.line_break) {
121 0 => {
122 self.reset_cursor();
123 None
124 }
125 sep_index => {
126 let row = &self.mmap[self.cursor..self.cursor + sep_index];
127 let end = if row.ends_with(b"\r\n") {
128 2
129 } else if row.ends_with(&[b'\n']) || row.ends_with(&[b'\r']) {
130 1
131 } else {
132 0
133 };
134 let row = &row[..row.len() - end];
135 self.cursor += sep_index;
136 Some(row)
137 }
138 }
139 }
140 }
141
142 #[cfg(target_arch = "x86_64")]
143 #[target_feature(enable = "avx2")]
144 pub(crate) unsafe fn new_raw_avx2(&mut self) -> Option<&[u8]> { unsafe {
145 let slice = &self.mmap[self.cursor..];
146 let sep_index = locate_line_break_avx2(slice, self.config.line_break);
147
148 if sep_index == 0 {
149 self.reset_cursor();
150 return None;
151 }
152
153 let full_row = &self.mmap[self.cursor..self.cursor + sep_index];
154 let trim_len = if full_row.ends_with(b"\r\n") {
155 2
156 } else if full_row.ends_with(&[b'\r']) || full_row.ends_with(&[b'\n']) {
157 1
158 } else {
159 0
160 };
161
162 let valid_len = full_row.len().saturating_sub(trim_len);
163 let row = &full_row[..valid_len];
164 self.cursor += sep_index;
165 Some(row)
166 }
167 }
168
169 pub(crate) fn next_raw_memchr3(&mut self) -> Option<&[u8]> {
170 let slice = &self.mmap[self.cursor..];
171 match locate_line_break_memchr3(slice, self.cursor, self.config.line_break) {
172 0 => {
173 self.reset_cursor();
174 None
175 }
176 i => {
177 let row = &self.mmap[self.cursor..i];
178 self.cursor = i;
179 Some(row)
180 }
181 }
182 }
183
184
185 #[cfg(target_arch = "x86_64")]
186 #[target_feature(enable = "avx2")]
187 unsafe fn peek_raw_avx2(mmap: &[u8], cursor: usize, line_break: u8) -> Option<&[u8]> {
188 let slice = &mmap[cursor..];
189 let sep_index = locate_line_break_avx2(slice, line_break);
190
191 if sep_index == 0 {
192 return None;
193 }
194
195 let full_row = &mmap[cursor..cursor + sep_index];
196 let trim_len = if full_row.ends_with(b"\r\n") {
197 2
198 } else if full_row.ends_with(&[b'\r']) || full_row.ends_with(&[b'\n']) {
199 1
200 } else {
201 0
202 };
203 Some(&full_row[..full_row.len().saturating_sub(trim_len)])
204 }
205
206 #[cfg(target_arch = "aarch64")]
207 fn peek_raw_neon(mmap: &[u8], cursor: usize, line_break: u8) -> Option<&[u8]> {
208 unsafe {
209 let slice = &mmap[cursor..];
210 match locate_line_break_neon(slice, line_break) {
211 0 => None,
212 sep_index => {
213 let row = &mmap[cursor..cursor + sep_index];
214 let end = if row.ends_with(b"\r\n") {
215 2
216 } else if row.ends_with(&[b'\n']) || row.ends_with(&[b'\r']) {
217 1
218 } else {
219 0
220 };
221 Some(&row[..row.len() - end])
222 }
223 }
224 }
225 }
226
227 fn peek_raw_memchr3(mmap: &[u8], cursor: usize, line_break: u8) -> Option<&[u8]> {
228 let slice = &mmap[cursor..];
229 match locate_line_break_memchr3(slice, cursor, line_break) {
230 0 => None,
231 i => Some(&mmap[cursor..i]),
232 }
233 }
234
235
236 pub(crate) fn reset_cursor(&mut self) {
237 self.cursor = 0;
238 }
239}
240
#[cfg(test)]
mod tests {
    use crate::csv::csv_reader::CsvReaderWithMap;
    use crate::models::csv_config::CsvConfig;
    use std::time::Instant;

    /// Opening the `data.csv` fixture with the default configuration succeeds.
    #[test]
    fn test_open_correct_file() {
        let cfg = CsvConfig::default();
        let time = Instant::now();
        let file = CsvReaderWithMap::open("data.csv", &cfg);
        println!("Performed in :{:?}", time.elapsed());
        assert!(file.is_ok());
    }

    /// Opening a path that does not exist reports an error.
    #[test]
    fn test_open_file_dont_exists() {
        let cfg = CsvConfig::default();
        let time = Instant::now();
        let file = CsvReaderWithMap::open("no_existo.csv", &cfg);
        println!("Performed in :{:?}", time.elapsed());
        assert!(file.is_err());
    }

    /// Walks every row of `data.csv` through `next_raw` and prints the row
    /// count and elapsed time.
    #[test]
    fn test_file_raw() {
        let mut cfg = CsvConfig::default();
        cfg.line_break = b'\n';
        cfg.delimiter = b',';
        cfg.force_memcach3 = false;

        // Fail loudly when the fixture cannot be opened: printing and returning
        // would let the test pass without exercising `next_raw` at all.
        let mut reader = match CsvReaderWithMap::open("data.csv", &cfg) {
            Ok(reader) => reader,
            Err(_) => panic!("File err"),
        };

        let mut ctr = 0;
        let t = Instant::now();
        while let Some(_row) = reader.next_raw() {
            ctr += 1;
        }
        println!(
            "Finished after {} milisecs, and {} iterations",
            t.elapsed().as_millis(),
            ctr
        );
    }
}