1use std::collections::HashMap;
2
3use anyhow::{Context, Result, anyhow};
4use encoding_rs::Encoding;
5use log::info;
6
7use crate::{
8 cli::FrequencyArgs,
9 data::{Value, parse_typed_value},
10 io_utils,
11 schema::{self, Schema},
12 table,
13};
14
15pub fn execute(args: &FrequencyArgs) -> Result<()> {
16 if args.schema.is_none() && io_utils::is_dash(&args.input) {
17 return Err(anyhow!(
18 "Reading from stdin requires --schema (or --meta) for frequency analysis"
19 ));
20 }
21
22 let delimiter = io_utils::resolve_input_delimiter(&args.input, args.delimiter);
23 let encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
24
25 let schema = load_or_infer_schema(args, delimiter, encoding)?;
26 let columns = resolve_columns(&schema, &args.columns)?;
27 if columns.is_empty() {
28 return Err(anyhow!(
29 "No columns available for frequency analysis. Supply --columns to continue."
30 ));
31 }
32
33 let mut reader = io_utils::open_csv_reader_from_path(&args.input, delimiter, true)?;
34 let headers = io_utils::reader_headers(&mut reader, encoding)?;
35 schema
36 .validate_headers(&headers)
37 .with_context(|| format!("Validating headers for {input:?}", input = args.input))?;
38
39 let mut stats = FrequencyAccumulator::new(&columns, &schema);
40
41 for (row_idx, record) in reader.byte_records().enumerate() {
42 let record = record.with_context(|| format!("Reading row {}", row_idx + 2))?;
43 let mut decoded = io_utils::decode_record(&record, encoding)?;
44 schema.apply_replacements_to_row(&mut decoded);
45 stats
46 .ingest(&schema, &decoded)
47 .with_context(|| format!("Processing row {}", row_idx + 2))?;
48 }
49
50 let mut rows = Vec::new();
51 for column_index in &columns {
52 rows.extend(stats.render_rows(*column_index, args.top));
53 }
54
55 let headers = vec![
56 "column".to_string(),
57 "value".to_string(),
58 "count".to_string(),
59 "percent".to_string(),
60 ];
61 table::print_table(&headers, &rows);
62 info!("Computed frequency counts for {} column(s)", columns.len());
63 Ok(())
64}
65
66fn load_or_infer_schema(
67 args: &FrequencyArgs,
68 delimiter: u8,
69 encoding: &'static Encoding,
70) -> Result<Schema> {
71 if let Some(path) = &args.schema {
72 Schema::load(path).with_context(|| format!("Loading schema from {path:?}"))
73 } else {
74 schema::infer_schema(&args.input, 0, delimiter, encoding)
75 .with_context(|| format!("Inferring schema from {input:?}", input = args.input))
76 }
77}
78
79fn resolve_columns(schema: &Schema, specified: &[String]) -> Result<Vec<usize>> {
80 if specified.is_empty() {
81 Ok((0..schema.columns.len()).collect())
82 } else {
83 specified
84 .iter()
85 .map(|name| {
86 schema
87 .column_index(name)
88 .ok_or_else(|| anyhow!("Column '{name}' not found in schema"))
89 })
90 .collect()
91 }
92}
93
94struct FrequencyAccumulator {
95 columns: Vec<usize>,
96 totals: HashMap<usize, usize>,
97 counts: HashMap<usize, HashMap<String, usize>>,
98 names: HashMap<usize, String>,
99}
100
101impl FrequencyAccumulator {
102 fn new(columns: &[usize], schema: &Schema) -> Self {
103 let mut totals = HashMap::new();
104 let mut counts = HashMap::new();
105 let mut names = HashMap::new();
106 for idx in columns {
107 totals.insert(*idx, 0);
108 counts.insert(*idx, HashMap::new());
109 names.insert(*idx, schema.columns[*idx].output_name().to_string());
110 }
111 Self {
112 columns: columns.to_vec(),
113 totals,
114 counts,
115 names,
116 }
117 }
118
119 fn ingest(&mut self, schema: &Schema, record: &[String]) -> Result<()> {
120 for column_index in &self.columns {
121 let column = &schema.columns[*column_index];
122 let raw = record.get(*column_index).map(|s| s.as_str()).unwrap_or("");
123 let normalized = column.normalize_value(raw);
124 let value = if normalized.is_empty() {
125 String::from("<empty>")
126 } else {
127 match parse_typed_value(normalized.as_ref(), &column.datatype)
128 .with_context(|| format!("Column '{}'", column.output_name()))?
129 {
130 Some(Value::String(s)) => s,
131 Some(Value::Integer(i)) => i.to_string(),
132 Some(Value::Float(f)) => format_number(f),
133 Some(Value::Boolean(b)) => b.to_string(),
134 Some(Value::Date(d)) => d.format("%Y-%m-%d").to_string(),
135 Some(Value::DateTime(dt)) => dt.format("%Y-%m-%d %H:%M:%S").to_string(),
136 Some(Value::Guid(g)) => g.to_string(),
137 None => String::from("<empty>"),
138 }
139 };
140 let total = self
141 .totals
142 .get_mut(column_index)
143 .expect("Column should exist in totals");
144 *total += 1;
145 let counter = self
146 .counts
147 .get_mut(column_index)
148 .expect("Column should exist in counts");
149 *counter.entry(value).or_insert(0) += 1;
150 }
151 Ok(())
152 }
153
154 fn render_rows(&self, column_index: usize, top: usize) -> Vec<Vec<String>> {
155 let total = match self.totals.get(&column_index) {
156 Some(total) if *total > 0 => *total,
157 _ => return Vec::new(),
158 };
159 let mut items = self
160 .counts
161 .get(&column_index)
162 .cloned()
163 .unwrap_or_default()
164 .into_iter()
165 .collect::<Vec<_>>();
166 items.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
167 if top > 0 && items.len() > top {
168 items.truncate(top);
169 }
170 items
171 .into_iter()
172 .map(|(value, count)| {
173 let percent = (count as f64 / total as f64) * 100.0;
174 vec![
175 self.names
176 .get(&column_index)
177 .cloned()
178 .unwrap_or_else(|| column_index.to_string()),
179 value,
180 count.to_string(),
181 format!("{percent:.2}%"),
182 ]
183 })
184 .collect()
185 }
186}
187
/// Renders a float as text for use as a frequency bucket key.
///
/// Values with no fractional part are printed without decimals (`3.0` -> `"3"`).
/// Every other value, including NaN and infinities, is printed with exactly four
/// decimal places (`2.5` -> `"2.5000"`).
fn format_number(value: f64) -> String {
    let precision = if value.fract() == 0.0 { 0 } else { 4 };
    format!("{value:.precision$}")
}
195
#[cfg(test)]
mod tests {
    use super::*;
    use encoding_rs::UTF_8;

    const DATA_FILE: &str = "big_5_players_stats_2023_2024.csv";
    const GOALS_COL: &str = "Performance_Gls";
    /// Maximum number of data records fed to the accumulator, to keep the test fast.
    const SAMPLE_ROWS: usize = 100;
    /// `top` limit passed to `render_rows`.
    const TOP: usize = 3;

    /// Path of the CSV fixture under `tests/data` in the crate root.
    fn fixture_path() -> std::path::PathBuf {
        std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests")
            .join("data")
            .join(DATA_FILE)
    }

    #[test]
    fn accumulator_counts_goal_totals() {
        let path = fixture_path();
        assert!(path.exists(), "fixture missing: {path:?}");
        let delimiter = crate::io_utils::resolve_input_delimiter(&path, None);
        let schema =
            crate::schema::infer_schema(&path, 200, delimiter, UTF_8).expect("infer schema");
        let column_index = schema.column_index(GOALS_COL).expect("column index");
        let mut accumulator = FrequencyAccumulator::new(&[column_index], &schema);
        let mut reader =
            crate::io_utils::open_csv_reader_from_path(&path, delimiter, true).expect("open csv");
        crate::io_utils::reader_headers(&mut reader, UTF_8).expect("headers");

        let mut ingested = 0usize;
        for record in reader.byte_records().take(SAMPLE_ROWS) {
            let record = record.expect("record");
            let decoded = crate::io_utils::decode_record(&record, UTF_8).expect("decode");
            accumulator.ingest(&schema, &decoded).expect("ingest row");
            ingested += 1;
        }

        let rows = accumulator.render_rows(column_index, TOP);
        assert!(!rows.is_empty());
        assert!(rows.len() <= TOP, "top limit not applied: {} rows", rows.len());
        assert!(rows.iter().all(|row| row[0] == GOALS_COL));

        let counts: Vec<usize> = rows
            .iter()
            .map(|row| row[2].parse().expect("count column is an integer"))
            .collect();
        assert!(
            counts.windows(2).all(|pair| pair[0] >= pair[1]),
            "rows not sorted by descending count: {counts:?}"
        );
        assert!(counts.iter().sum::<usize>() <= ingested);
        assert!(rows.iter().all(|row| row[3].ends_with('%')));
    }
}