1use std::{collections::HashMap, path::Path};
2
3use anyhow::{Context, Result};
4use encoding_rs::Encoding;
5
6use crate::{
7 data::Value,
8 filter::{FilterCondition, evaluate_conditions},
9 io_utils,
10 rows::{evaluate_filter_expressions, parse_typed_row},
11 schema::{self, Schema},
12};
13
/// Caller-supplied settings for `compute_frequency_rows`.
pub struct FrequencyOptions<'a> {
    /// Maximum number of distinct values reported per column.
    /// `0` disables the cap and reports every distinct value.
    pub top: usize,
    /// Stop scanning once this many records have been pulled from the reader;
    /// `None` scans the whole input. Records that are later skipped (header
    /// look-alikes, filtered-out rows) still count toward the limit.
    pub row_limit: Option<usize>,
    /// Conditions handed to `evaluate_conditions`; a row is skipped when that
    /// call returns `false`. An empty slice disables this check.
    pub filters: &'a [FilterCondition],
    /// Expression strings handed to `evaluate_filter_expressions`; a row is
    /// skipped when that call returns `false`. An empty slice disables this check.
    pub filter_exprs: &'a [String],
}
20
21pub fn compute_frequency_rows(
22 input: &Path,
23 schema: &Schema,
24 delimiter: u8,
25 encoding: &'static Encoding,
26 columns: &[usize],
27 options: &FrequencyOptions,
28) -> Result<Vec<Vec<String>>> {
29 let expects_headers = schema.expects_headers();
30 let mut reader = io_utils::open_csv_reader_from_path(input, delimiter, expects_headers)?;
31 let headers = if expects_headers {
32 let headers = io_utils::reader_headers(&mut reader, encoding)?;
33 schema
34 .validate_headers(&headers)
35 .with_context(|| format!("Validating headers for {input:?}", input = input))?;
36 headers
37 } else {
38 schema.headers()
39 };
40 let header_aliases = schema.header_alias_sets();
41
42 let mut stats = FrequencyAccumulator::new(columns, schema);
43
44 for (row_idx, record) in reader.byte_records().enumerate() {
45 if let Some(limit) = options.row_limit
46 && row_idx >= limit
47 {
48 break;
49 }
50 let record = record.with_context(|| format!("Reading row {}", row_idx + 2))?;
51 let mut decoded = io_utils::decode_record(&record, encoding)?;
52 if schema::row_looks_like_header(&decoded, &header_aliases) {
53 continue;
54 }
55 if schema.has_transformations() {
56 schema
57 .apply_transformations_to_row(&mut decoded)
58 .with_context(|| {
59 format!(
60 "Applying datatype mappings to row {} in {input:?}",
61 row_idx + 2
62 )
63 })?;
64 }
65 schema.apply_replacements_to_row(&mut decoded);
66 let typed = parse_typed_row(schema, &decoded)?;
67 if !options.filters.is_empty()
68 && !evaluate_conditions(options.filters, schema, &headers, &decoded, &typed)?
69 {
70 continue;
71 }
72 if !options.filter_exprs.is_empty()
73 && !evaluate_filter_expressions(
74 options.filter_exprs,
75 &headers,
76 &decoded,
77 &typed,
78 Some(row_idx + 1),
79 )?
80 {
81 continue;
82 }
83 stats
84 .ingest(schema, &decoded, &typed)
85 .with_context(|| format!("Processing row {}", row_idx + 2))?;
86 }
87
88 let mut rows = Vec::new();
89 for &column_index in columns {
90 rows.extend(stats.render_rows(column_index, options.top));
91 }
92 Ok(rows)
93}
94
/// Running per-column value tallies from which frequency rows are rendered.
struct FrequencyAccumulator {
    /// Schema column indices being tallied, in the order they were passed to `new`.
    columns: Vec<usize>,
    /// Number of values counted so far, keyed by column index.
    totals: HashMap<usize, usize>,
    /// Occurrences of each displayed value, keyed by column index.
    counts: HashMap<usize, HashMap<String, usize>>,
    /// Output name of each column as reported by the schema, keyed by column index.
    names: HashMap<usize, String>,
}
101
102impl FrequencyAccumulator {
103 fn new(columns: &[usize], schema: &Schema) -> Self {
104 let mut totals = HashMap::new();
105 let mut counts = HashMap::new();
106 let mut names = HashMap::new();
107 for idx in columns {
108 totals.insert(*idx, 0);
109 counts.insert(*idx, HashMap::new());
110 names.insert(*idx, schema.columns[*idx].output_name().to_string());
111 }
112 Self {
113 columns: columns.to_vec(),
114 totals,
115 counts,
116 names,
117 }
118 }
119
120 fn ingest(
121 &mut self,
122 schema: &Schema,
123 raw_row: &[String],
124 typed_row: &[Option<Value>],
125 ) -> Result<()> {
126 for column_index in &self.columns {
127 let column = &schema.columns[*column_index];
128 let raw = raw_row.get(*column_index).map(|s| s.as_str()).unwrap_or("");
129 let normalized = column.normalize_value(raw);
130 let value = if normalized.is_empty() {
131 String::from("<empty>")
132 } else if let Some(typed) = typed_row.get(*column_index).and_then(|v| v.as_ref()) {
133 display_value(typed)
134 } else {
135 normalized.into_owned()
136 };
137 let total = self
138 .totals
139 .get_mut(column_index)
140 .expect("Column should exist in totals");
141 *total += 1;
142 let counter = self
143 .counts
144 .get_mut(column_index)
145 .expect("Column should exist in counts");
146 *counter.entry(value).or_insert(0) += 1;
147 }
148 Ok(())
149 }
150
151 fn render_rows(&self, column_index: usize, top: usize) -> Vec<Vec<String>> {
152 let total = match self.totals.get(&column_index) {
153 Some(total) if *total > 0 => *total,
154 _ => return Vec::new(),
155 };
156 let mut items = self
157 .counts
158 .get(&column_index)
159 .cloned()
160 .unwrap_or_default()
161 .into_iter()
162 .collect::<Vec<_>>();
163 items.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
164 if top > 0 && items.len() > top {
165 items.truncate(top);
166 }
167 items
168 .into_iter()
169 .map(|(value, count)| {
170 let percent = (count as f64 / total as f64) * 100.0;
171 vec![
172 self.names
173 .get(&column_index)
174 .cloned()
175 .unwrap_or_else(|| column_index.to_string()),
176 value,
177 count.to_string(),
178 format!("{percent:.2}%"),
179 ]
180 })
181 .collect()
182 }
183}
184
185fn display_value(value: &Value) -> String {
186 match value {
187 Value::String(s) => s.clone(),
188 Value::Integer(i) => i.to_string(),
189 Value::Float(f) => format_number(*f),
190 Value::Boolean(b) => b.to_string(),
191 Value::Date(d) => d.format("%Y-%m-%d").to_string(),
192 Value::DateTime(dt) => dt.format("%Y-%m-%d %H:%M:%S").to_string(),
193 Value::Time(t) => t.format("%H:%M:%S").to_string(),
194 Value::Guid(g) => g.to_string(),
195 Value::Decimal(d) => d.to_string_fixed(),
196 Value::Currency(c) => c.to_string_fixed(),
197 }
198}
199
/// Formats a float for display: no decimals when the value has no fractional
/// part, four decimals otherwise.
fn format_number(value: f64) -> String {
    let decimals: usize = if value.fract() == 0.0 { 0 } else { 4 };
    format!("{value:.decimals$}")
}
207
#[cfg(test)]
mod tests {
    use super::*;
    use encoding_rs::UTF_8;

    const DATA_FILE: &str = "big_5_players_stats_2023_2024.csv";
    const GOALS_COL: &str = "Performance_Gls";

    /// Path of the CSV fixture: `<manifest dir>/tests/data/<DATA_FILE>`.
    fn fixture_path() -> std::path::PathBuf {
        let mut location = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        location.extend(["tests", "data", DATA_FILE]);
        location
    }

    /// Feeds the first 100 fixture records into an accumulator for the goals
    /// column and checks that rendering yields rows labelled with that column.
    #[test]
    fn accumulator_counts_goal_totals() {
        let path = fixture_path();
        assert!(path.exists(), "fixture missing: {path:?}");

        let delimiter = crate::io_utils::resolve_input_delimiter(&path, None);
        let schema =
            crate::schema::infer_schema(&path, 200, delimiter, UTF_8, None).expect("infer schema");
        let column_index = schema.column_index(GOALS_COL).expect("column index");
        let mut accumulator = FrequencyAccumulator::new(&[column_index], &schema);

        let mut reader =
            crate::io_utils::open_csv_reader_from_path(&path, delimiter, true).expect("open csv");
        crate::io_utils::reader_headers(&mut reader, UTF_8).expect("headers");

        for record in reader.byte_records().take(100) {
            let record = record.expect("record");
            let mut decoded = crate::io_utils::decode_record(&record, UTF_8).expect("decode");

            // Skip records whose first cell is not an integer.
            let first_cell_not_integer =
                matches!(decoded.first(), Some(cell) if cell.parse::<i64>().is_err());
            if first_cell_not_integer {
                continue;
            }

            schema.apply_replacements_to_row(&mut decoded);
            let typed = crate::rows::parse_typed_row(&schema, &decoded).expect("parse typed row");
            accumulator
                .ingest(&schema, &decoded, &typed)
                .expect("ingest row");
        }

        let rows = accumulator.render_rows(column_index, 3);
        assert!(!rows.is_empty());
        assert_eq!(rows[0][0], GOALS_COL);
    }
}