rsv_lib/csv/
frequency.rs

1use crate::args::Frequency;
2use crate::utils::cli_result::CliResult;
3use crate::utils::column::Columns;
4use crate::utils::file::{self};
5use crate::utils::filename;
6use crate::utils::progress::Progress;
7use crate::utils::reader::ChunkReader;
8use crate::utils::util::print_frequency_table;
9use crossbeam_channel::bounded;
10use dashmap::DashMap;
11use rayon::prelude::*;
12use std::thread;
13
14impl Frequency {
15    pub fn csv_run(&self) -> CliResult {
16        let path = &self.path();
17
18        // cols
19        let col = Columns::new(&self.cols)
20            .total_col_of(path, self.sep, self.quote)
21            .parse();
22
23        // open file and header
24        let mut rdr = ChunkReader::new(path)?;
25        let names: Vec<String> = if self.no_header {
26            col.artificial_cols_with_appended_n()
27        } else {
28            let Some(r) = rdr.next() else { return Ok(()) };
29            let r = r?;
30            let r = self.split_row_to_vec(&r);
31            if col.max >= r.len() {
32                println!("[info] ignore a bad line # {r:?}!");
33                col.artificial_cols_with_appended_n()
34            } else {
35                col.select_owned_vector_and_append_n(&r)
36            }
37        };
38
39        // read file
40        let (tx, rx) = bounded(1);
41        thread::spawn(move || rdr.send_to_channel_by_chunks(tx, 10_000));
42
43        // process
44        let freq = DashMap::new();
45        let mut prog = Progress::new();
46        for task in rx {
47            task.lines.par_iter().for_each(|r| {
48                let r = self.split_row_to_vec(r);
49                if col.max >= r.len() {
50                    println!("[info] ignore a bad line # {r:?}!");
51                } else {
52                    let r = col.select_owned_string(&r);
53                    *freq.entry(r).or_insert(0) += 1;
54                }
55            });
56
57            if self.export {
58                prog.add_chunks(1);
59                prog.add_bytes(task.bytes);
60                prog.print();
61            }
62        }
63
64        let mut freq = freq.into_iter().collect::<Vec<(_, _)>>();
65        if self.ascending {
66            freq.sort_by(|a, b| a.1.cmp(&b.1));
67        } else {
68            freq.sort_by(|a, b| b.1.cmp(&a.1));
69        }
70
71        // apply head n
72        if self.n > 0 {
73            freq.truncate(self.n as usize)
74        }
75
76        // export or print
77        if self.export {
78            let new_path = filename::new_path(path, "-frequency");
79            file::write_frequency_to_csv(&new_path, &names, freq);
80            println!("\nSaved to file: {}", new_path.display());
81        } else {
82            print_frequency_table(&names, freq)
83        }
84
85        Ok(())
86    }
87}