1use crate::args::Select;
2use crate::utils::cli_result::CliResult;
3use crate::utils::column::Columns;
4use crate::utils::filename::new_path;
5use crate::utils::filter::Filter;
6use crate::utils::progress::Progress;
7use crate::utils::reader::{ChunkReader, Task};
8use crate::utils::writer::Writer;
9use crossbeam_channel::bounded;
10use rayon::prelude::*;
11use std::thread;
12
13impl Select {
14 pub fn csv_run(&self) -> CliResult {
15 let path = &self.path();
16
17 let filter = Filter::new(&self.filter)
19 .total_col_of(path, self.sep, self.quote)
20 .parse();
21 let cols = Columns::new(&self.cols)
22 .total_col_of(path, self.sep, self.quote)
23 .parse();
24
25 let out = new_path(path, "-selected");
27 let mut wtr = Writer::file_or_stdout(self.export, &out)?;
28 let mut rdr = ChunkReader::new(path)?;
29
30 if !self.no_header {
32 let Some(r) = rdr.next() else { return Ok(()) };
33 let r = r?;
34 if cols.select_all {
35 wtr.write_str_unchecked(&r)
36 } else {
37 let mut r = self.split_row_to_vec(&r);
38 r = cols.iter().map(|&i| r[i]).collect();
39 wtr.write_fields_unchecked(&r);
40 }
41 }
42
43 let (tx, rx) = bounded(1);
45
46 thread::spawn(move || rdr.send_to_channel_by_chunks(tx, 10_000));
48
49 let mut prog = Progress::new();
51 for task in rx {
52 handle_task(self, task, &filter, &cols, &mut wtr, &mut prog)
53 }
54
55 if self.export {
56 println!("\nSaved to file: {}", out.display())
57 }
58
59 Ok(())
60 }
61}
62
63#[allow(clippy::too_many_arguments)]
64fn handle_task(
65 args: &Select,
66 task: Task,
67 filter: &Filter,
68 cols: &Columns,
69 wtr: &mut Writer,
70 prog: &mut Progress,
71) {
72 let filtered = task
74 .lines
75 .par_iter()
76 .filter_map(|row| filter.record_valid_map(row, args.sep, args.quote))
77 .collect::<Vec<(_, _)>>();
78
79 for (r, f) in filtered {
81 if cols.select_all {
83 wtr.write_str_unchecked(r.unwrap());
84 continue;
85 }
86
87 let f = f.unwrap_or_else(|| args.split_row_to_vec(r.unwrap()));
89 let row = cols.iter().map(|&i| f[i]).collect::<Vec<_>>();
90 wtr.write_fields_unchecked(&row);
91 }
92
93 if args.export {
94 prog.add_chunks(1);
95 prog.add_bytes(task.bytes);
96 prog.print();
97 }
98}