// rsv_lib/csv/stats.rs

1use crate::args::Stats;
2use crate::utils::cli_result::CliResult;
3use crate::utils::column::Columns;
4use crate::utils::column_stats::ColumnStats;
5use crate::utils::column_type::ColumnTypes;
6use crate::utils::file::column_n;
7use crate::utils::filename::new_path;
8use crate::utils::progress::Progress;
9use crate::utils::reader::{ChunkReader, Task};
10use crossbeam_channel::{bounded, unbounded, Sender};
11use rayon::ThreadPoolBuilder;
12use std::fs::File;
13use std::io::{BufWriter, Write};
14
impl Stats {
    /// Compute per-column statistics for a CSV file in parallel.
    ///
    /// Pipeline: a reader task streams 50 000-line chunks into a bounded
    /// channel; worker tasks each parse one chunk into a partial
    /// `ColumnStats`; the main thread merges the partials while driving a
    /// progress bar. The final table is printed to stdout, or written to a
    /// `<file>-stats` sibling file when `self.export` is set.
    pub fn csv_run(&self) -> CliResult {
        let path = &self.path();

        // Resolve the selected columns and guess each column's type from the
        // file contents; bail out early (Ok) when guessing yields nothing.
        let cols = Columns::new(&self.cols)
            .total_col_of(path, self.sep, self.quote)
            .parse();
        let Some(col_type) =
            ColumnTypes::guess_from_csv(path, self.sep, self.quote, self.no_header, &cols)?
        else {
            return Ok(());
        };

        // Open the file as a chunked line reader.
        let mut rdr = ChunkReader::new(path)?;

        // Column names: synthesize artificial names when the file has no
        // header row, otherwise consume the first row as the header.
        let name = if self.no_header {
            let Some(n) = column_n(path, self.sep, self.quote)? else {
                return Ok(());
            };
            cols.artificial_n_cols(n)
        } else {
            let Some(r) = rdr.next() else { return Ok(()) };
            self.split_row_to_owned_vec(&r?)
        };

        // `stat` accumulates the merged result; `empty_stat` is the zeroed
        // template cloned once per worker task.
        let mut stat = ColumnStats::new(&col_type, &name);
        let empty_stat = stat.clone();

        // Channels:
        // - chunk:   reader -> dispatcher; bounded(2) backpressures the reader
        // - control: token per in-flight worker, capping them at 200
        // - result:  workers -> main thread, merged as they arrive
        let (tx_chunk, rx_chunk) = bounded(2);
        let (tx_chunk_n_control, rx_chunk_n_control) = bounded(200);
        let (tx_result, rx_result) = unbounded();

        // progress bar
        let mut prog = Progress::new();

        // Shared rayon pool for the reader, the dispatcher, and the workers.
        let pool = ThreadPoolBuilder::new().build().unwrap();

        // Reader task: moves `rdr` and `tx_chunk`; dropping `tx_chunk` when
        // the file is exhausted closes the chunk channel.
        pool.spawn(move || rdr.send_to_channel_by_chunks(tx_chunk, 50_000));

        // parallel process
        pool.scope(|s| {
            // Dispatcher: pulls chunks and spawns one worker per chunk.
            // The control send blocks once 200 workers are outstanding.
            s.spawn(|_| {
                for task in rx_chunk {
                    tx_chunk_n_control.send(()).unwrap();

                    let tx = tx_result.clone();
                    let st = empty_stat.clone();
                    let sep_inner = self.sep;
                    let quote_inner = self.quote;
                    // println!("dispatch........");
                    pool.spawn(move || parse_chunk(task, tx, st, sep_inner, quote_inner));
                }

                // Dropping the last senders closes both channels: the result
                // loop below ends once every worker's clone is gone too.
                drop(tx_result);
                drop(tx_chunk_n_control);
            });

            // Merge worker results on this thread; each received result
            // releases one control token, admitting another worker.
            for ChunkResult { bytes, stat: o } in rx_result {
                rx_chunk_n_control.recv().unwrap();
                // println!("result-----------");
                // this is bottleneck, merge two hashset is very slow.
                stat.merge(o);

                prog.add_bytes(bytes);
                prog.add_chunks(1);
                prog.print();
            }

            prog.clear();
        });

        // Finalize derived figures (unique counts, means) after all merges.
        stat.cal_unique_and_mean();

        // Print to stdout, or save next to the input when exporting.
        if self.export {
            let out = new_path(path, "-stats");
            let mut wtr = BufWriter::new(File::create(&out)?);
            wtr.write_all(stat.to_string().as_bytes())?;
            println!("Saved to file: {}", out.display());
        } else {
            stat.print();
        }

        println!("Total rows: {}", stat.rows);
        prog.print_elapsed_time();

        Ok(())
    }
}
114
115struct ChunkResult {
116    bytes: usize,
117    stat: ColumnStats,
118}
119
120fn parse_chunk(task: Task, tx: Sender<ChunkResult>, mut stat: ColumnStats, sep: char, quote: char) {
121    for l in task.lines {
122        stat.parse_line(&l, sep, quote)
123    }
124
125    tx.send(ChunkResult {
126        bytes: task.bytes,
127        stat,
128    })
129    .unwrap()
130}