1use crate::args::Search;
2use crate::utils::cli_result::CliResult;
3use crate::utils::column::Columns;
4use crate::utils::filename::new_path;
5use crate::utils::progress::Progress;
6use crate::utils::reader::ChunkReader;
7use crate::utils::regex::Re;
8use crate::utils::writer::Writer;
9use crossbeam_channel::bounded;
10use rayon::prelude::*;
11use std::thread;
12
13impl Search {
14 pub fn csv_run(&self) -> CliResult {
15 let path = &self.path();
16 let cols = Columns::new(&self.cols)
17 .total_col_of(path, self.sep, self.quote)
18 .parse();
19 let filter = Columns::new(&self.filter)
20 .total_col_of(path, self.sep, self.quote)
21 .parse();
22
23 let out = new_path(path, "-searched");
25 let mut wtr = Writer::file_or_stdout(self.export, &out)?;
26 let mut rdr = ChunkReader::new(path)?;
27
28 if !self.no_header {
30 let Some(r) = rdr.next() else { return Ok(()) };
31 let r = r?;
32 if cols.select_all {
33 wtr.write_str_unchecked(&r)
34 } else {
35 let mut r = self.split_row_to_vec(&r);
36 r = cols.iter().map(|&i| r[i]).collect();
37 wtr.write_fields_unchecked(&r);
38 }
39 }
40
41 let (tx, rx) = bounded(2);
43 thread::spawn(move || rdr.send_to_channel_by_chunks(tx, 10_000));
44
45 let mut prog = Progress::new();
47
48 let re = Re::new(&self.pattern)?;
50 let mut matched_n = 0;
51 for task in rx {
52 matched_n += match (filter.select_all, cols.select_all) {
53 (true, true) => {
54 let lines = task
55 .lines
56 .par_iter()
57 .filter(|&i| re.is_match(i))
58 .collect::<Vec<_>>();
59 wtr.write_strings_unchecked(&lines);
60 lines.len()
61 }
62 (true, false) => {
63 let lines = task
64 .lines
65 .par_iter()
66 .filter_map(|r| {
67 re.is_match(r).then_some({
68 let r = self.split_row_to_vec(r);
69 cols.iter().map(|&i| r[i]).collect::<Vec<_>>()
70 })
71 })
72 .collect::<Vec<_>>();
73 wtr.write_fields_of_lines_unchecked(&lines);
74 lines.len()
75 }
76 (false, true) => {
77 let lines = task
78 .lines
79 .par_iter()
80 .filter(|r| {
81 let r = self.split_row_to_vec(r);
82 filter.iter().any(|&i| re.is_match(r[i]))
83 })
84 .collect::<Vec<_>>();
85 wtr.write_strings_unchecked(&lines);
86 lines.len()
87 }
88 (false, false) => {
89 let lines = task
90 .lines
91 .par_iter()
92 .filter_map(|r| {
93 let r = self.split_row_to_vec(r);
94 filter
95 .iter()
96 .any(|&i| re.is_match(r[i]))
97 .then_some(cols.iter().map(|&i| r[i]).collect::<Vec<_>>())
98 })
99 .collect::<Vec<_>>();
100 wtr.write_fields_of_lines_unchecked(&lines);
101 lines.len()
102 }
103 };
104
105 if self.export {
106 prog.add_chunks(1);
107 prog.add_bytes(task.bytes);
108 prog.print();
109 }
110 }
111
112 if self.export {
113 println!("\nMatched rows: {matched_n}");
114 println!("Saved to file: {}", out.display());
115 }
116
117 Ok(())
118 }
119}