rsv_lib/csv/
search.rs

1use crate::args::Search;
2use crate::utils::cli_result::CliResult;
3use crate::utils::column::Columns;
4use crate::utils::filename::new_path;
5use crate::utils::progress::Progress;
6use crate::utils::reader::ChunkReader;
7use crate::utils::regex::Re;
8use crate::utils::writer::Writer;
9use crossbeam_channel::bounded;
10use rayon::prelude::*;
11use std::thread;
12
13impl Search {
14    pub fn csv_run(&self) -> CliResult {
15        let path = &self.path();
16        let cols = Columns::new(&self.cols)
17            .total_col_of(path, self.sep, self.quote)
18            .parse();
19        let filter = Columns::new(&self.filter)
20            .total_col_of(path, self.sep, self.quote)
21            .parse();
22
23        // wtr and rdr
24        let out = new_path(path, "-searched");
25        let mut wtr = Writer::file_or_stdout(self.export, &out)?;
26        let mut rdr = ChunkReader::new(path)?;
27
28        // header
29        if !self.no_header {
30            let Some(r) = rdr.next() else { return Ok(()) };
31            let r = r?;
32            if cols.select_all {
33                wtr.write_str_unchecked(&r)
34            } else {
35                let mut r = self.split_row_to_vec(&r);
36                r = cols.iter().map(|&i| r[i]).collect();
37                wtr.write_fields_unchecked(&r);
38            }
39        }
40
41        // read file
42        let (tx, rx) = bounded(2);
43        thread::spawn(move || rdr.send_to_channel_by_chunks(tx, 10_000));
44
45        // progress for export option
46        let mut prog = Progress::new();
47
48        // regex search
49        let re = Re::new(&self.pattern)?;
50        let mut matched_n = 0;
51        for task in rx {
52            matched_n += match (filter.select_all, cols.select_all) {
53                (true, true) => {
54                    let lines = task
55                        .lines
56                        .par_iter()
57                        .filter(|&i| re.is_match(i))
58                        .collect::<Vec<_>>();
59                    wtr.write_strings_unchecked(&lines);
60                    lines.len()
61                }
62                (true, false) => {
63                    let lines = task
64                        .lines
65                        .par_iter()
66                        .filter_map(|r| {
67                            re.is_match(r).then_some({
68                                let r = self.split_row_to_vec(r);
69                                cols.iter().map(|&i| r[i]).collect::<Vec<_>>()
70                            })
71                        })
72                        .collect::<Vec<_>>();
73                    wtr.write_fields_of_lines_unchecked(&lines);
74                    lines.len()
75                }
76                (false, true) => {
77                    let lines = task
78                        .lines
79                        .par_iter()
80                        .filter(|r| {
81                            let r = self.split_row_to_vec(r);
82                            filter.iter().any(|&i| re.is_match(r[i]))
83                        })
84                        .collect::<Vec<_>>();
85                    wtr.write_strings_unchecked(&lines);
86                    lines.len()
87                }
88                (false, false) => {
89                    let lines = task
90                        .lines
91                        .par_iter()
92                        .filter_map(|r| {
93                            let r = self.split_row_to_vec(r);
94                            filter
95                                .iter()
96                                .any(|&i| re.is_match(r[i]))
97                                .then_some(cols.iter().map(|&i| r[i]).collect::<Vec<_>>())
98                        })
99                        .collect::<Vec<_>>();
100                    wtr.write_fields_of_lines_unchecked(&lines);
101                    lines.len()
102                }
103            };
104
105            if self.export {
106                prog.add_chunks(1);
107                prog.add_bytes(task.bytes);
108                prog.print();
109            }
110        }
111
112        if self.export {
113            println!("\nMatched rows: {matched_n}");
114            println!("Saved to file: {}", out.display());
115        }
116
117        Ok(())
118    }
119}