csv_managed/
frequency.rs

1use std::collections::HashMap;
2
3use anyhow::{Context, Result, anyhow};
4use encoding_rs::Encoding;
5use log::info;
6
7use crate::{
8    cli::FrequencyArgs,
9    data::{Value, parse_typed_value},
10    io_utils,
11    schema::{self, Schema},
12    table,
13};
14
15pub fn execute(args: &FrequencyArgs) -> Result<()> {
16    if args.schema.is_none() && io_utils::is_dash(&args.input) {
17        return Err(anyhow!(
18            "Reading from stdin requires --schema (or --meta) for frequency analysis"
19        ));
20    }
21
22    let delimiter = io_utils::resolve_input_delimiter(&args.input, args.delimiter);
23    let encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
24
25    let schema = load_or_infer_schema(args, delimiter, encoding)?;
26    let columns = resolve_columns(&schema, &args.columns)?;
27    if columns.is_empty() {
28        return Err(anyhow!(
29            "No columns available for frequency analysis. Supply --columns to continue."
30        ));
31    }
32
33    let mut reader = io_utils::open_csv_reader_from_path(&args.input, delimiter, true)?;
34    let headers = io_utils::reader_headers(&mut reader, encoding)?;
35    schema
36        .validate_headers(&headers)
37        .with_context(|| format!("Validating headers for {input:?}", input = args.input))?;
38
39    let mut stats = FrequencyAccumulator::new(&columns, &schema);
40
41    for (row_idx, record) in reader.byte_records().enumerate() {
42        let record = record.with_context(|| format!("Reading row {}", row_idx + 2))?;
43        let mut decoded = io_utils::decode_record(&record, encoding)?;
44        schema.apply_replacements_to_row(&mut decoded);
45        stats
46            .ingest(&schema, &decoded)
47            .with_context(|| format!("Processing row {}", row_idx + 2))?;
48    }
49
50    let mut rows = Vec::new();
51    for column_index in &columns {
52        rows.extend(stats.render_rows(*column_index, args.top));
53    }
54
55    let headers = vec![
56        "column".to_string(),
57        "value".to_string(),
58        "count".to_string(),
59        "percent".to_string(),
60    ];
61    table::print_table(&headers, &rows);
62    info!("Computed frequency counts for {} column(s)", columns.len());
63    Ok(())
64}
65
66fn load_or_infer_schema(
67    args: &FrequencyArgs,
68    delimiter: u8,
69    encoding: &'static Encoding,
70) -> Result<Schema> {
71    if let Some(path) = &args.schema {
72        Schema::load(path).with_context(|| format!("Loading schema from {path:?}"))
73    } else {
74        schema::infer_schema(&args.input, 0, delimiter, encoding)
75            .with_context(|| format!("Inferring schema from {input:?}", input = args.input))
76    }
77}
78
79fn resolve_columns(schema: &Schema, specified: &[String]) -> Result<Vec<usize>> {
80    if specified.is_empty() {
81        Ok((0..schema.columns.len()).collect())
82    } else {
83        specified
84            .iter()
85            .map(|name| {
86                schema
87                    .column_index(name)
88                    .ok_or_else(|| anyhow!("Column '{name}' not found in schema"))
89            })
90            .collect()
91    }
92}
93
94struct FrequencyAccumulator {
95    columns: Vec<usize>,
96    totals: HashMap<usize, usize>,
97    counts: HashMap<usize, HashMap<String, usize>>,
98    names: HashMap<usize, String>,
99}
100
101impl FrequencyAccumulator {
102    fn new(columns: &[usize], schema: &Schema) -> Self {
103        let mut totals = HashMap::new();
104        let mut counts = HashMap::new();
105        let mut names = HashMap::new();
106        for idx in columns {
107            totals.insert(*idx, 0);
108            counts.insert(*idx, HashMap::new());
109            names.insert(*idx, schema.columns[*idx].output_name().to_string());
110        }
111        Self {
112            columns: columns.to_vec(),
113            totals,
114            counts,
115            names,
116        }
117    }
118
119    fn ingest(&mut self, schema: &Schema, record: &[String]) -> Result<()> {
120        for column_index in &self.columns {
121            let column = &schema.columns[*column_index];
122            let raw = record.get(*column_index).map(|s| s.as_str()).unwrap_or("");
123            let normalized = column.normalize_value(raw);
124            let value = if normalized.is_empty() {
125                String::from("<empty>")
126            } else {
127                match parse_typed_value(normalized.as_ref(), &column.datatype)
128                    .with_context(|| format!("Column '{}'", column.output_name()))?
129                {
130                    Some(Value::String(s)) => s,
131                    Some(Value::Integer(i)) => i.to_string(),
132                    Some(Value::Float(f)) => format_number(f),
133                    Some(Value::Boolean(b)) => b.to_string(),
134                    Some(Value::Date(d)) => d.format("%Y-%m-%d").to_string(),
135                    Some(Value::DateTime(dt)) => dt.format("%Y-%m-%d %H:%M:%S").to_string(),
136                    Some(Value::Guid(g)) => g.to_string(),
137                    None => String::from("<empty>"),
138                }
139            };
140            let total = self
141                .totals
142                .get_mut(column_index)
143                .expect("Column should exist in totals");
144            *total += 1;
145            let counter = self
146                .counts
147                .get_mut(column_index)
148                .expect("Column should exist in counts");
149            *counter.entry(value).or_insert(0) += 1;
150        }
151        Ok(())
152    }
153
154    fn render_rows(&self, column_index: usize, top: usize) -> Vec<Vec<String>> {
155        let total = match self.totals.get(&column_index) {
156            Some(total) if *total > 0 => *total,
157            _ => return Vec::new(),
158        };
159        let mut items = self
160            .counts
161            .get(&column_index)
162            .cloned()
163            .unwrap_or_default()
164            .into_iter()
165            .collect::<Vec<_>>();
166        items.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
167        if top > 0 && items.len() > top {
168            items.truncate(top);
169        }
170        items
171            .into_iter()
172            .map(|(value, count)| {
173                let percent = (count as f64 / total as f64) * 100.0;
174                vec![
175                    self.names
176                        .get(&column_index)
177                        .cloned()
178                        .unwrap_or_else(|| column_index.to_string()),
179                    value,
180                    count.to_string(),
181                    format!("{percent:.2}%"),
182                ]
183            })
184            .collect()
185    }
186}
187
/// Renders a float as the string key it is counted under.
///
/// Integral values print without a fractional part (`3.0` → `"3"`). All other
/// values are rounded to four decimal places (`2.5` → `"2.5000"`), so distinct
/// floats that agree to four places share a bucket. Negative zero is folded
/// into `"0"` so that `-0.0` and `0.0` land in the same bucket.
fn format_number(value: f64) -> String {
    // `-0.0 == 0.0` holds, but `{:.0}` would print negative zero as "-0",
    // splitting one numeric value across two frequency buckets.
    let value = if value == 0.0 { 0.0 } else { value };
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        format!("{value:.4}")
    }
}
195
#[cfg(test)]
mod tests {
    use super::*;
    use encoding_rs::UTF_8;

    /// CSV fixture expected under `tests/data` in the crate root.
    const DATA_FILE: &str = "big_5_players_stats_2023_2024.csv";
    /// Column exercised by the accumulator test.
    const GOALS_COL: &str = "Performance_Gls";
    /// Number of data records fed to the accumulator, to keep the test fast.
    const ROW_LIMIT: usize = 100;
    /// `top` limit passed to `render_rows`.
    const TOP: usize = 3;

    /// Absolute path to the CSV fixture.
    fn fixture_path() -> std::path::PathBuf {
        std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests")
            .join("data")
            .join(DATA_FILE)
    }

    #[test]
    fn accumulator_counts_goal_totals() {
        let path = fixture_path();
        assert!(path.exists(), "fixture missing: {path:?}");
        let delimiter = crate::io_utils::resolve_input_delimiter(&path, None);
        let schema =
            crate::schema::infer_schema(&path, 200, delimiter, UTF_8).expect("infer schema");
        let column_index = schema.column_index(GOALS_COL).expect("column index");
        let mut accumulator = FrequencyAccumulator::new(&[column_index], &schema);
        let mut reader =
            crate::io_utils::open_csv_reader_from_path(&path, delimiter, true).expect("open csv");
        crate::io_utils::reader_headers(&mut reader, UTF_8).expect("headers");

        // Mirror the per-row steps of `execute`, including replacements.
        for record in reader.byte_records().take(ROW_LIMIT) {
            let record = record.expect("record");
            let mut decoded = crate::io_utils::decode_record(&record, UTF_8).expect("decode");
            schema.apply_replacements_to_row(&mut decoded);
            accumulator.ingest(&schema, &decoded).expect("ingest row");
        }

        let rows = accumulator.render_rows(column_index, TOP);
        assert!(!rows.is_empty());
        assert!(
            rows.len() <= TOP,
            "top={TOP} should cap output, got {} rows",
            rows.len()
        );
        for row in &rows {
            assert_eq!(row.len(), 4, "expected column/value/count/percent: {row:?}");
            assert_eq!(row[0], GOALS_COL);
            assert!(row[3].ends_with('%'), "percent should end with '%': {row:?}");
        }

        let counts: Vec<usize> = rows
            .iter()
            .map(|row| row[2].parse().expect("count should be an integer"))
            .collect();
        assert!(
            counts.windows(2).all(|pair| pair[0] >= pair[1]),
            "counts should be sorted in descending order: {counts:?}"
        );
        assert!(counts.iter().sum::<usize>() <= ROW_LIMIT);
    }

    #[test]
    fn format_number_drops_fraction_only_for_integral_values() {
        assert_eq!(format_number(3.0), "3");
        assert_eq!(format_number(-7.0), "-7");
        assert_eq!(format_number(2.5), "2.5000");
        assert_eq!(format_number(-1.25), "-1.2500");
    }
}