csv_managed/
frequency.rs

1use std::{collections::HashMap, path::Path};
2
3use anyhow::{Context, Result};
4use encoding_rs::Encoding;
5
6use crate::{
7    data::Value,
8    filter::{FilterCondition, evaluate_conditions},
9    io_utils,
10    rows::{evaluate_filter_expressions, parse_typed_row},
11    schema::{self, Schema},
12};
13
14pub struct FrequencyOptions<'a> {
15    pub top: usize,
16    pub row_limit: Option<usize>,
17    pub filters: &'a [FilterCondition],
18    pub filter_exprs: &'a [String],
19}
20
21pub fn compute_frequency_rows(
22    input: &Path,
23    schema: &Schema,
24    delimiter: u8,
25    encoding: &'static Encoding,
26    columns: &[usize],
27    options: &FrequencyOptions,
28) -> Result<Vec<Vec<String>>> {
29    let expects_headers = schema.expects_headers();
30    let mut reader = io_utils::open_csv_reader_from_path(input, delimiter, expects_headers)?;
31    let headers = if expects_headers {
32        let headers = io_utils::reader_headers(&mut reader, encoding)?;
33        schema
34            .validate_headers(&headers)
35            .with_context(|| format!("Validating headers for {input:?}", input = input))?;
36        headers
37    } else {
38        schema.headers()
39    };
40    let header_aliases = schema.header_alias_sets();
41
42    let mut stats = FrequencyAccumulator::new(columns, schema);
43
44    for (row_idx, record) in reader.byte_records().enumerate() {
45        if let Some(limit) = options.row_limit
46            && row_idx >= limit
47        {
48            break;
49        }
50        let record = record.with_context(|| format!("Reading row {}", row_idx + 2))?;
51        let mut decoded = io_utils::decode_record(&record, encoding)?;
52        if schema::row_looks_like_header(&decoded, &header_aliases) {
53            continue;
54        }
55        if schema.has_transformations() {
56            schema
57                .apply_transformations_to_row(&mut decoded)
58                .with_context(|| {
59                    format!(
60                        "Applying datatype mappings to row {} in {input:?}",
61                        row_idx + 2
62                    )
63                })?;
64        }
65        schema.apply_replacements_to_row(&mut decoded);
66        let typed = parse_typed_row(schema, &decoded)?;
67        if !options.filters.is_empty()
68            && !evaluate_conditions(options.filters, schema, &headers, &decoded, &typed)?
69        {
70            continue;
71        }
72        if !options.filter_exprs.is_empty()
73            && !evaluate_filter_expressions(
74                options.filter_exprs,
75                &headers,
76                &decoded,
77                &typed,
78                Some(row_idx + 1),
79            )?
80        {
81            continue;
82        }
83        stats
84            .ingest(schema, &decoded, &typed)
85            .with_context(|| format!("Processing row {}", row_idx + 2))?;
86    }
87
88    let mut rows = Vec::new();
89    for &column_index in columns {
90        rows.extend(stats.render_rows(column_index, options.top));
91    }
92    Ok(rows)
93}
94
95struct FrequencyAccumulator {
96    columns: Vec<usize>,
97    totals: HashMap<usize, usize>,
98    counts: HashMap<usize, HashMap<String, usize>>,
99    names: HashMap<usize, String>,
100}
101
102impl FrequencyAccumulator {
103    fn new(columns: &[usize], schema: &Schema) -> Self {
104        let mut totals = HashMap::new();
105        let mut counts = HashMap::new();
106        let mut names = HashMap::new();
107        for idx in columns {
108            totals.insert(*idx, 0);
109            counts.insert(*idx, HashMap::new());
110            names.insert(*idx, schema.columns[*idx].output_name().to_string());
111        }
112        Self {
113            columns: columns.to_vec(),
114            totals,
115            counts,
116            names,
117        }
118    }
119
120    fn ingest(
121        &mut self,
122        schema: &Schema,
123        raw_row: &[String],
124        typed_row: &[Option<Value>],
125    ) -> Result<()> {
126        for column_index in &self.columns {
127            let column = &schema.columns[*column_index];
128            let raw = raw_row.get(*column_index).map(|s| s.as_str()).unwrap_or("");
129            let normalized = column.normalize_value(raw);
130            let value = if normalized.is_empty() {
131                String::from("<empty>")
132            } else if let Some(typed) = typed_row.get(*column_index).and_then(|v| v.as_ref()) {
133                display_value(typed)
134            } else {
135                normalized.into_owned()
136            };
137            let total = self
138                .totals
139                .get_mut(column_index)
140                .expect("Column should exist in totals");
141            *total += 1;
142            let counter = self
143                .counts
144                .get_mut(column_index)
145                .expect("Column should exist in counts");
146            *counter.entry(value).or_insert(0) += 1;
147        }
148        Ok(())
149    }
150
151    fn render_rows(&self, column_index: usize, top: usize) -> Vec<Vec<String>> {
152        let total = match self.totals.get(&column_index) {
153            Some(total) if *total > 0 => *total,
154            _ => return Vec::new(),
155        };
156        let mut items = self
157            .counts
158            .get(&column_index)
159            .cloned()
160            .unwrap_or_default()
161            .into_iter()
162            .collect::<Vec<_>>();
163        items.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
164        if top > 0 && items.len() > top {
165            items.truncate(top);
166        }
167        items
168            .into_iter()
169            .map(|(value, count)| {
170                let percent = (count as f64 / total as f64) * 100.0;
171                vec![
172                    self.names
173                        .get(&column_index)
174                        .cloned()
175                        .unwrap_or_else(|| column_index.to_string()),
176                    value,
177                    count.to_string(),
178                    format!("{percent:.2}%"),
179                ]
180            })
181            .collect()
182    }
183}
184
185fn display_value(value: &Value) -> String {
186    match value {
187        Value::String(s) => s.clone(),
188        Value::Integer(i) => i.to_string(),
189        Value::Float(f) => format_number(*f),
190        Value::Boolean(b) => b.to_string(),
191        Value::Date(d) => d.format("%Y-%m-%d").to_string(),
192        Value::DateTime(dt) => dt.format("%Y-%m-%d %H:%M:%S").to_string(),
193        Value::Time(t) => t.format("%H:%M:%S").to_string(),
194        Value::Guid(g) => g.to_string(),
195        Value::Decimal(d) => d.to_string_fixed(),
196        Value::Currency(c) => c.to_string_fixed(),
197    }
198}
199
/// Formats a float for display in the frequency table.
///
/// Values without a fractional part are printed as whole numbers
/// (`3.0` becomes `"3"`); all others are rounded to four decimals
/// (`3.14159` becomes `"3.1416"`). Non-finite values keep Rust's default
/// rendering (`"NaN"`, `"inf"`, `"-inf"`).
///
/// A result that is numerically zero never carries a minus sign: `-0.0`
/// yields `"0"` and a tiny negative such as `-0.00001` yields `"0.0000"`.
/// Without this, equal-looking zeros would land in separate frequency
/// buckets (`"-0"` next to `"0"`).
fn format_number(value: f64) -> String {
    let rendered = if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        format!("{value:.4}")
    };
    // Drop the sign only when every remaining character is a zero digit or
    // the decimal point, i.e. the printed magnitude is exactly zero.
    match rendered.strip_prefix('-') {
        Some(magnitude) if magnitude.bytes().all(|b| b == b'0' || b == b'.') => {
            magnitude.to_string()
        }
        _ => rendered,
    }
}
207
#[cfg(test)]
mod tests {
    use super::*;
    use encoding_rs::UTF_8;

    /// File name of the sample dataset under `tests/data`.
    const DATA_FILE: &str = "big_5_players_stats_2023_2024.csv";
    /// Header of the column tallied by the test.
    const GOALS_COL: &str = "Performance_Gls";
    /// Number of records read from the fixture.
    const SAMPLE_ROWS: usize = 100;

    /// Path of the fixture inside the crate's `tests/data` directory.
    fn fixture_path() -> std::path::PathBuf {
        let mut path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        path.extend(["tests", "data", DATA_FILE]);
        path
    }

    /// Feeds the first records of the fixture into an accumulator and checks
    /// that the goals column renders at least one row under its own name.
    #[test]
    fn accumulator_counts_goal_totals() {
        let path = fixture_path();
        assert!(path.exists(), "fixture missing: {path:?}");

        let delimiter = crate::io_utils::resolve_input_delimiter(&path, None);
        let schema =
            crate::schema::infer_schema(&path, 200, delimiter, UTF_8, None).expect("infer schema");
        let column_index = schema.column_index(GOALS_COL).expect("column index");
        let mut accumulator = FrequencyAccumulator::new(&[column_index], &schema);

        let mut reader =
            crate::io_utils::open_csv_reader_from_path(&path, delimiter, true).expect("open csv");
        crate::io_utils::reader_headers(&mut reader, UTF_8).expect("headers");

        for record in reader.byte_records().take(SAMPLE_ROWS) {
            let record = record.expect("record");
            let mut decoded = crate::io_utils::decode_record(&record, UTF_8).expect("decode");

            // The FBref export repeats header rows ("Rk,Player,…") within the dataset;
            // a first cell that is not an integer marks one of them, so skip it.
            let repeated_header = decoded
                .first()
                .map(|cell| cell.parse::<i64>().is_err())
                .unwrap_or(false);
            if repeated_header {
                continue;
            }

            schema.apply_replacements_to_row(&mut decoded);
            let typed = crate::rows::parse_typed_row(&schema, &decoded).expect("parse typed row");
            accumulator
                .ingest(&schema, &decoded, &typed)
                .expect("ingest row");
        }

        let rows = accumulator.render_rows(column_index, 3);
        assert!(!rows.is_empty());
        assert_eq!(rows[0][0], GOALS_COL);
    }
}