1use crate::args::Stats;
2use crate::utils::cli_result::CliResult;
3use crate::utils::column::Columns;
4use crate::utils::column_stats::ColumnStats;
5use crate::utils::column_type::ColumnTypes;
6use crate::utils::file::column_n;
7use crate::utils::filename::new_path;
8use crate::utils::progress::Progress;
9use crate::utils::reader::{ChunkReader, Task};
10use crossbeam_channel::{bounded, unbounded, Sender};
11use rayon::ThreadPoolBuilder;
12use std::fs::File;
13use std::io::{BufWriter, Write};
14
15impl Stats {
16 pub fn csv_run(&self) -> CliResult {
17 let path = &self.path();
18
19 let cols = Columns::new(&self.cols)
21 .total_col_of(path, self.sep, self.quote)
22 .parse();
23 let Some(col_type) =
24 ColumnTypes::guess_from_csv(path, self.sep, self.quote, self.no_header, &cols)?
25 else {
26 return Ok(());
27 };
28
29 let mut rdr = ChunkReader::new(path)?;
31
32 let name = if self.no_header {
34 let Some(n) = column_n(path, self.sep, self.quote)? else {
35 return Ok(());
36 };
37 cols.artificial_n_cols(n)
38 } else {
39 let Some(r) = rdr.next() else { return Ok(()) };
40 self.split_row_to_owned_vec(&r?)
41 };
42
43 let mut stat = ColumnStats::new(&col_type, &name);
45 let empty_stat = stat.clone();
46
47 let (tx_chunk, rx_chunk) = bounded(2);
49 let (tx_chunk_n_control, rx_chunk_n_control) = bounded(200);
50 let (tx_result, rx_result) = unbounded();
51
52 let mut prog = Progress::new();
54
55 let pool = ThreadPoolBuilder::new().build().unwrap();
57
58 pool.spawn(move || rdr.send_to_channel_by_chunks(tx_chunk, 50_000));
60
61 pool.scope(|s| {
63 s.spawn(|_| {
65 for task in rx_chunk {
66 tx_chunk_n_control.send(()).unwrap();
67
68 let tx = tx_result.clone();
69 let st = empty_stat.clone();
70 let sep_inner = self.sep;
71 let quote_inner = self.quote;
72 pool.spawn(move || parse_chunk(task, tx, st, sep_inner, quote_inner));
74 }
75
76 drop(tx_result);
77 drop(tx_chunk_n_control);
78 });
79
80 for ChunkResult { bytes, stat: o } in rx_result {
82 rx_chunk_n_control.recv().unwrap();
83 stat.merge(o);
86
87 prog.add_bytes(bytes);
88 prog.add_chunks(1);
89 prog.print();
90 }
91
92 prog.clear();
93 });
94
95 stat.cal_unique_and_mean();
97
98 if self.export {
100 let out = new_path(path, "-stats");
101 let mut wtr = BufWriter::new(File::create(&out)?);
102 wtr.write_all(stat.to_string().as_bytes())?;
103 println!("Saved to file: {}", out.display());
104 } else {
105 stat.print();
106 }
107
108 println!("Total rows: {}", stat.rows);
109 prog.print_elapsed_time();
110
111 Ok(())
112 }
113}
114
115struct ChunkResult {
116 bytes: usize,
117 stat: ColumnStats,
118}
119
120fn parse_chunk(task: Task, tx: Sender<ChunkResult>, mut stat: ColumnStats, sep: char, quote: char) {
121 for l in task.lines {
122 stat.parse_line(&l, sep, quote)
123 }
124
125 tx.send(ChunkResult {
126 bytes: task.bytes,
127 stat,
128 })
129 .unwrap()
130}