rsv_lib/csv/
select.rs

1use crate::args::Select;
2use crate::utils::cli_result::CliResult;
3use crate::utils::column::Columns;
4use crate::utils::filename::new_path;
5use crate::utils::filter::Filter;
6use crate::utils::progress::Progress;
7use crate::utils::reader::{ChunkReader, Task};
8use crate::utils::writer::Writer;
9use crossbeam_channel::bounded;
10use rayon::prelude::*;
11use std::thread;
12
13impl Select {
14    pub fn csv_run(&self) -> CliResult {
15        let path = &self.path();
16
17        // filters and cols
18        let filter = Filter::new(&self.filter)
19            .total_col_of(path, self.sep, self.quote)
20            .parse();
21        let cols = Columns::new(&self.cols)
22            .total_col_of(path, self.sep, self.quote)
23            .parse();
24
25        // wtr and rdr
26        let out = new_path(path, "-selected");
27        let mut wtr = Writer::file_or_stdout(self.export, &out)?;
28        let mut rdr = ChunkReader::new(path)?;
29
30        // header
31        if !self.no_header {
32            let Some(r) = rdr.next() else { return Ok(()) };
33            let r = r?;
34            if cols.select_all {
35                wtr.write_str_unchecked(&r)
36            } else {
37                let mut r = self.split_row_to_vec(&r);
38                r = cols.iter().map(|&i| r[i]).collect();
39                wtr.write_fields_unchecked(&r);
40            }
41        }
42
43        // parallel queue
44        let (tx, rx) = bounded(1);
45
46        // read
47        thread::spawn(move || rdr.send_to_channel_by_chunks(tx, 10_000));
48
49        // process
50        let mut prog = Progress::new();
51        for task in rx {
52            handle_task(self, task, &filter, &cols, &mut wtr, &mut prog)
53        }
54
55        if self.export {
56            println!("\nSaved to file: {}", out.display())
57        }
58
59        Ok(())
60    }
61}
62
63#[allow(clippy::too_many_arguments)]
64fn handle_task(
65    args: &Select,
66    task: Task,
67    filter: &Filter,
68    cols: &Columns,
69    wtr: &mut Writer,
70    prog: &mut Progress,
71) {
72    // filter
73    let filtered = task
74        .lines
75        .par_iter()
76        .filter_map(|row| filter.record_valid_map(row, args.sep, args.quote))
77        .collect::<Vec<(_, _)>>();
78
79    // write
80    for (r, f) in filtered {
81        // write the line directly
82        if cols.select_all {
83            wtr.write_str_unchecked(r.unwrap());
84            continue;
85        }
86
87        // write by fields
88        let f = f.unwrap_or_else(|| args.split_row_to_vec(r.unwrap()));
89        let row = cols.iter().map(|&i| f[i]).collect::<Vec<_>>();
90        wtr.write_fields_unchecked(&row);
91    }
92
93    if args.export {
94        prog.add_chunks(1);
95        prog.add_bytes(task.bytes);
96        prog.print();
97    }
98}