csv_lib/csv/
csv_reader.rs

1
2#[cfg(target_arch = "x86_64")]
3use crate::helpers::bytes_helper::locate_line_break_avx2;
4#[cfg(target_arch = "aarch64")]
5use crate::helpers::bytes_helper::locate_line_break_neon;
6use crate::helpers::bytes_helper::locate_line_break_memchr3;
7use crate::models::csv_config::CsvConfig;
8use crate::models::csv_error::CsvError;
9use crate::models::platform_info::PlatformInfo;
10use memmap2::Mmap;
11use std::fs::File;
12use std::path::Path;
13use crate::models::row::Row;
/// ## CsvReaderWithMap
/// - CSV reader backed by a memory map of the input file.
/// - Built by [`CsvReaderWithMap::open`]; rows are produced by walking `cursor`
///   through the mapped bytes.
#[derive(Debug)]
#[repr(C)]
pub struct CsvReaderWithMap {
    /// Copy of the configuration that was passed to `open`.
    config: CsvConfig,
    /// Memory map of the opened file; every returned row borrows from it.
    mmap: Mmap,
    /// Value of `PlatformInfo::new()` captured when the reader was opened.
    platform: PlatformInfo,
    /// Byte offset into `mmap` at which the next row starts. It is set back to
    /// `0` when one of the `next_*` scanners finds no further row.
    cursor: usize,
}
22
23impl CsvReaderWithMap {
24    /// ## Get Config
25    /// - Extracts configuration reference
26    pub fn get_config(&self) -> &CsvConfig {
27        &self.config
28    }
29
30    /// ## Get Slice
31    /// - Extracts the slice reference
32    pub fn get_slice(&self) -> &[u8] {
33        &self.mmap[..]
34    }
35
36    /// ## Open
37    /// - Sync execution.
38    /// - Open a CSV file and create a memory-mapped reader.
39    pub fn open<P: AsRef<Path>>(path: P, config: &CsvConfig) -> Result<CsvReaderWithMap, CsvError> {
40        let file = File::open(path).map_err(|err| {
41            CsvError::FileError(format!("Cannot open file. Detail: {}", err))
42        })?;
43
44        let mmap = unsafe {
45            Mmap::map(&file).map_err(|bad| {
46                CsvError::FileError(format!("Cannot map file. Detail: {}", bad))
47            })?
48        };
49
50        Ok(CsvReaderWithMap {
51            config: config.clone(),
52            platform: PlatformInfo::new(),
53            mmap,
54            cursor: 0,
55        })
56    }
57
58    /// ## Next Raw
59    /// - Sync execution.
60    /// - Returns the next row of data from the CSV file as a slice of bytes.
61    pub fn next_raw(&mut self) -> Option<Row<'_>> {
62        let string_separator = self.config.string_separator;
63        let delimiter = self.config.delimiter;
64        let fm = self.config.force_memcach3;
65        let slice = if &self.config.force_memcach3 == &true {
66            self.next_raw_memchr3()
67        } else {
68            #[cfg(target_arch = "x86_64")]
69            {
70                if is_x86_feature_detected!("avx2") {
71                    unsafe { self.new_raw_avx2() }
72                } else {
73                    self.next_raw_memchr3()
74                }
75            }
76            #[cfg(target_arch = "aarch64")]
77            {
78                self.new_raw_neon()
79            }
80        }?;
81        Some(Row::new(slice, delimiter, string_separator, fm))
82    }
83
84    /// ## Peek Raw
85    /// - Returns next line without moving th cursor
86    pub fn peek_raw(&self) -> Option<Row<'_>> {
87        let string_separator = self.config.string_separator;
88        let delimiter = self.config.delimiter;
89        let fm = self.config.force_memcach3;
90        let slice = if fm {
91            Self::peek_raw_memchr3(&self.mmap, self.cursor, self.config.line_break)
92        } else {
93            #[cfg(target_arch = "x86_64")]
94            {
95                if is_x86_feature_detected!("avx2") {
96                    unsafe { Self::peek_raw_avx2(&self.mmap, self.cursor, self.config.line_break) }
97                } else {
98                    Self::peek_raw_memchr3(&self.mmap, self.cursor, self.config.line_break)
99                }
100            }
101            #[cfg(target_arch = "aarch64")]
102            {
103                Self::peek_raw_neon(&self.mmap, self.cursor, self.config.line_break)
104            }
105        }?;
106        Some(Row::new(slice, delimiter, string_separator, fm))
107    }
108
109    /// ## Advance Next
110    /// - Advance one line without returning it
111    pub fn advance_next(&mut self) {
112        let _ = self.next_raw();
113    }
114
115    //--------------------- INTERNAL ---------------------------------------------------------------//
116    #[cfg(target_arch = "aarch64")]
117    pub(crate) fn new_raw_neon(&mut self) -> Option<&[u8]> {
118        unsafe {
119            let slice = &self.mmap[self.cursor..];
120            match locate_line_break_neon(slice, self.config.line_break) {
121                0 => {
122                    self.reset_cursor();
123                    None
124                }
125                sep_index => {
126                    let row = &self.mmap[self.cursor..self.cursor + sep_index];
127                    let end = if row.ends_with(b"\r\n") {
128                        2
129                    } else if row.ends_with(&[b'\n']) || row.ends_with(&[b'\r']) {
130                        1
131                    } else {
132                        0
133                    };
134                    let row = &row[..row.len() - end];
135                    self.cursor += sep_index;
136                    Some(row)
137                }
138            }
139        }
140    }
141
142    #[cfg(target_arch = "x86_64")]
143    #[target_feature(enable = "avx2")]
144    pub(crate) unsafe fn new_raw_avx2(&mut self) -> Option<&[u8]> { unsafe {
145        let slice = &self.mmap[self.cursor..];
146        let sep_index = locate_line_break_avx2(slice, self.config.line_break);
147
148        if sep_index == 0 {
149            self.reset_cursor();
150            return None;
151        }
152
153        let full_row = &self.mmap[self.cursor..self.cursor + sep_index];
154        let trim_len = if full_row.ends_with(b"\r\n") {
155            2
156        } else if full_row.ends_with(&[b'\r']) || full_row.ends_with(&[b'\n']) {
157            1
158        } else {
159            0
160        };
161
162        let valid_len = full_row.len().saturating_sub(trim_len);
163        let row = &full_row[..valid_len];
164        self.cursor += sep_index;
165        Some(row)
166        }
167    }
168
169    pub(crate) fn next_raw_memchr3(&mut self) -> Option<&[u8]> {
170        let slice = &self.mmap[self.cursor..];
171        match locate_line_break_memchr3(slice, self.cursor, self.config.line_break) {
172            0 => {
173                self.reset_cursor();
174                None
175            }
176            i => {
177                let row = &self.mmap[self.cursor..i];
178                self.cursor = i;
179                Some(row)
180            }
181        }
182    }
183
184
185    #[cfg(target_arch = "x86_64")]
186    #[target_feature(enable = "avx2")]
187    unsafe fn peek_raw_avx2(mmap: &[u8], cursor: usize, line_break: u8) -> Option<&[u8]> {
188        let slice = &mmap[cursor..];
189        let sep_index = locate_line_break_avx2(slice, line_break);
190
191        if sep_index == 0 {
192            return None;
193        }
194
195        let full_row = &mmap[cursor..cursor + sep_index];
196        let trim_len = if full_row.ends_with(b"\r\n") {
197            2
198        } else if full_row.ends_with(&[b'\r']) || full_row.ends_with(&[b'\n']) {
199            1
200        } else {
201            0
202        };
203        Some(&full_row[..full_row.len().saturating_sub(trim_len)])
204    }
205
206    #[cfg(target_arch = "aarch64")]
207    fn peek_raw_neon(mmap: &[u8], cursor: usize, line_break: u8) -> Option<&[u8]> {
208        unsafe {
209            let slice = &mmap[cursor..];
210            match locate_line_break_neon(slice, line_break) {
211                0 => None,
212                sep_index => {
213                    let row = &mmap[cursor..cursor + sep_index];
214                    let end = if row.ends_with(b"\r\n") {
215                        2
216                    } else if row.ends_with(&[b'\n']) || row.ends_with(&[b'\r']) {
217                        1
218                    } else {
219                        0
220                    };
221                    Some(&row[..row.len() - end])
222                }
223            }
224        }
225    }
226
227    fn peek_raw_memchr3(mmap: &[u8], cursor: usize, line_break: u8) -> Option<&[u8]> {
228        let slice = &mmap[cursor..];
229        match locate_line_break_memchr3(slice, cursor, line_break) {
230            0 => None,
231            i => Some(&mmap[cursor..i]),
232        }
233    }
234
235
    /// Moves the cursor back to offset `0`, the start of the mapped file.
    /// The `next_*` scanners call this when they find no further row.
    pub(crate) fn reset_cursor(&mut self) {
        self.cursor = 0;
    }
239}
240
241#[cfg(test)]
242mod tests {
243    use crate::csv::csv_reader::CsvReaderWithMap;
244    use crate::models::csv_config::CsvConfig;
245    use std::time::Instant;
246
247    #[test]
248    fn test_open_correct_file() {
249        let cfg = CsvConfig::default();
250        let time = Instant::now();
251        let file = CsvReaderWithMap::open("data.csv", &cfg);
252        println!("Performed in :{:?}", time.elapsed());
253        assert!(file.is_ok());
254    }
255
256    #[test]
257    fn test_open_file_dont_exists() {
258        let cfg = CsvConfig::default();
259        let time = Instant::now();
260        let file = CsvReaderWithMap::open("no_existo.csv", &cfg);
261        println!("Performed in :{:?}", time.elapsed());
262        assert!(file.is_err());
263    }
264    #[test]
265    fn test_file_raw() {
266        let mut cfg = CsvConfig::default();
267        cfg.line_break = b'\n';
268        cfg.delimiter = b',';
269        cfg.force_memcach3 = false;
270        let file = CsvReaderWithMap::open("data.csv", &cfg);
271        match file {
272            Ok(mut ok) => {
273                let mut ctr = 0 ;
274                let t = Instant::now();
275                while let Some(_row) = ok.next_raw() {
276                    
277                    ctr = ctr + 1;
278                }
279                println!("Finished after {} milisecs,  and  {} iterations",t.elapsed().as_millis(), ctr);
280            }
281            Err(_) => {
282                println!("File err");
283            }
284        }
285    }
286}