csv_managed/
stats.rs

1use std::collections::HashMap;
2
3use anyhow::{Context, Result, anyhow};
4use encoding_rs::Encoding;
5use log::info;
6
7use crate::{
8    cli::StatsArgs,
9    data::{Value, parse_typed_value},
10    io_utils,
11    schema::{self, ColumnType, Schema},
12    table,
13};
14
15pub fn execute(args: &StatsArgs) -> Result<()> {
16    if args.schema.is_none() && io_utils::is_dash(&args.input) {
17        return Err(anyhow!(
18            "Reading from stdin requires --schema (or --meta) for typed statistics"
19        ));
20    }
21
22    let delimiter = io_utils::resolve_input_delimiter(&args.input, args.delimiter);
23    let encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
24
25    let schema = load_or_infer_schema(args, delimiter, encoding)?;
26
27    let columns = resolve_columns(&schema, &args.columns)?;
28    if columns.is_empty() {
29        return Err(anyhow!(
30            "No numeric columns available. Provide a schema file or explicit column list."
31        ));
32    }
33
34    let mut reader = io_utils::open_csv_reader_from_path(&args.input, delimiter, true)?;
35    let headers = io_utils::reader_headers(&mut reader, encoding)?;
36    schema
37        .validate_headers(&headers)
38        .with_context(|| format!("Validating headers for {:?}", args.input))?;
39
40    let mut stats = StatsAccumulator::new(&columns, &schema);
41
42    for (row_idx, record) in reader.byte_records().enumerate() {
43        if args.limit > 0 && row_idx >= args.limit {
44            break;
45        }
46        let record = record.with_context(|| format!("Reading row {}", row_idx + 2))?;
47        let mut decoded = io_utils::decode_record(&record, encoding)?;
48        schema.apply_replacements_to_row(&mut decoded);
49        stats
50            .ingest(&schema, &decoded)
51            .with_context(|| format!("Processing row {}", row_idx + 2))?;
52    }
53
54    let rows = stats.render_rows();
55    let headers = vec![
56        "column".to_string(),
57        "count".to_string(),
58        "min".to_string(),
59        "max".to_string(),
60        "mean".to_string(),
61        "median".to_string(),
62        "std_dev".to_string(),
63    ];
64    table::print_table(&headers, &rows);
65    info!("Computed summary statistics for {} column(s)", rows.len());
66    Ok(())
67}
68
69fn load_or_infer_schema(
70    args: &StatsArgs,
71    delimiter: u8,
72    encoding: &'static Encoding,
73) -> Result<Schema> {
74    if let Some(path) = &args.schema {
75        Schema::load(path).with_context(|| format!("Loading schema from {path:?}"))
76    } else {
77        schema::infer_schema(&args.input, 0, delimiter, encoding)
78            .with_context(|| format!("Inferring schema from {input:?}", input = args.input))
79    }
80}
81
82fn resolve_columns(schema: &Schema, specified: &[String]) -> Result<Vec<usize>> {
83    if specified.is_empty() {
84        Ok(schema
85            .columns
86            .iter()
87            .enumerate()
88            .filter(|(_, col)| matches!(col.datatype, ColumnType::Integer | ColumnType::Float))
89            .map(|(idx, _)| idx)
90            .collect())
91    } else {
92        specified
93            .iter()
94            .map(|name| {
95                let idx = schema
96                    .column_index(name)
97                    .ok_or_else(|| anyhow!("Column '{name}' not found in schema"))?;
98                let column = &schema.columns[idx];
99                if !matches!(column.datatype, ColumnType::Integer | ColumnType::Float) {
100                    return Err(anyhow!(
101                        "Column '{}' is type {:?} and cannot be profiled as numeric",
102                        column.output_name(),
103                        column.datatype
104                    ));
105                }
106                Ok(idx)
107            })
108            .collect()
109    }
110}
111
112struct StatsAccumulator {
113    columns: Vec<usize>,
114    data: HashMap<usize, ColumnStats>,
115}
116
117impl StatsAccumulator {
118    fn new(columns: &[usize], schema: &Schema) -> Self {
119        let mut data = HashMap::new();
120        for idx in columns {
121            let stats = ColumnStats::with_name(schema.columns[*idx].output_name().to_string());
122            data.insert(*idx, stats);
123        }
124        Self {
125            columns: columns.to_vec(),
126            data,
127        }
128    }
129
130    fn ingest(&mut self, schema: &Schema, record: &[String]) -> Result<()> {
131        for column_index in &self.columns {
132            let column = &schema.columns[*column_index];
133            let value = record.get(*column_index).map(|s| s.as_str()).unwrap_or("");
134            let normalized = column.normalize_value(value);
135            if normalized.is_empty() {
136                continue;
137            }
138            if let Some(parsed) = parse_typed_value(normalized.as_ref(), &column.datatype)
139                .with_context(|| format!("Column '{}'", column.output_name()))?
140            {
141                let numeric = match parsed {
142                    Value::Integer(i) => i as f64,
143                    Value::Float(f) => f,
144                    other => {
145                        return Err(anyhow!(
146                            "Column '{}' expected numeric type but encountered {:?}",
147                            column.output_name(),
148                            other
149                        ));
150                    }
151                };
152                if let Some(stats) = self.data.get_mut(column_index) {
153                    stats.add(numeric);
154                }
155            }
156        }
157        Ok(())
158    }
159
160    fn render_rows(&self) -> Vec<Vec<String>> {
161        let mut rows = Vec::new();
162        for column_index in &self.columns {
163            if let Some(stats) = self.data.get(column_index) {
164                rows.push(vec![
165                    stats.name.clone(),
166                    stats.count.to_string(),
167                    stats
168                        .min
169                        .map(format_number)
170                        .unwrap_or_else(|| "".to_string()),
171                    stats
172                        .max
173                        .map(format_number)
174                        .unwrap_or_else(|| "".to_string()),
175                    stats
176                        .mean()
177                        .map(format_number)
178                        .unwrap_or_else(|| "".to_string()),
179                    stats
180                        .median()
181                        .map(format_number)
182                        .unwrap_or_else(|| "".to_string()),
183                    stats
184                        .std_dev()
185                        .map(format_number)
186                        .unwrap_or_else(|| "".to_string()),
187                ]);
188            }
189        }
190        rows
191    }
192}
193
/// Running summary of the numeric values observed for a single column.
#[derive(Default)]
struct ColumnStats {
    /// Display name of the column.
    name: String,
    /// Every value added so far; kept for the median and the standard deviation.
    values: Vec<f64>,
    /// Sum of all added values.
    sum: f64,
    /// Number of added values.
    count: usize,
    /// Smallest value added so far, `None` until the first value arrives.
    min: Option<f64>,
    /// Largest value added so far, `None` until the first value arrives.
    max: Option<f64>,
}

impl ColumnStats {
    /// Creates an empty summary labelled with `name`.
    fn with_name(name: String) -> Self {
        Self {
            name,
            ..Self::default()
        }
    }

    /// Records one value, updating count, sum, min and max.
    fn add(&mut self, value: f64) {
        self.count += 1;
        self.sum += value;
        self.min = Some(self.min.map_or(value, |current| current.min(value)));
        self.max = Some(self.max.map_or(value, |current| current.max(value)));
        self.values.push(value);
    }

    /// Arithmetic mean, or `None` when no value has been added.
    fn mean(&self) -> Option<f64> {
        if self.count > 0 {
            Some(self.sum / self.count as f64)
        } else {
            None
        }
    }

    /// Median of the added values, or `None` when no value has been added.
    ///
    /// For an even number of values the two middle values are averaged.
    fn median(&self) -> Option<f64> {
        if self.values.is_empty() {
            return None;
        }
        let mut sorted = self.values.clone();
        // `total_cmp` is a total order over all floats, so a NaN in the data
        // cannot make the comparison fail (`partial_cmp(..).unwrap()` panics).
        sorted.sort_unstable_by(f64::total_cmp);
        let mid = sorted.len() / 2;
        if sorted.len() % 2 == 1 {
            Some(sorted[mid])
        } else {
            Some((sorted[mid - 1] + sorted[mid]) / 2.0)
        }
    }

    /// Sample standard deviation (divisor `n - 1`), or `None` with fewer than
    /// two values.
    fn std_dev(&self) -> Option<f64> {
        if self.count < 2 {
            return None;
        }
        let mean = self.mean()?;
        // Two-pass form: summing squared deviations from the mean avoids the
        // catastrophic cancellation of `sum(x^2) - n * mean^2`, which loses all
        // precision when the values are large compared to their spread.
        let squared_deviations: f64 = self
            .values
            .iter()
            .map(|value| {
                let delta = value - mean;
                delta * delta
            })
            .sum();
        Some((squared_deviations / (self.count as f64 - 1.0)).sqrt())
    }
}
260
/// Formats a statistic for display: values without a fractional part are
/// printed with no decimals, everything else with four decimal places.
fn format_number(value: f64) -> String {
    let precision: usize = if value.fract() == 0.0 { 0 } else { 4 };
    format!("{value:.precision$}")
}
268
#[cfg(test)]
mod tests {
    use super::*;
    use encoding_rs::UTF_8;

    const DATA_FILE: &str = "big_5_players_stats_2023_2024.csv";
    const GOALS_COL: &str = "Performance_Gls";
    const ASSISTS_COL: &str = "Performance_Ast";

    /// Location of the CSV fixture under `<crate root>/tests/data`.
    fn fixture_path() -> std::path::PathBuf {
        let mut path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        path.push("tests");
        path.push("data");
        path.push(DATA_FILE);
        path
    }

    /// Feeds the first 100 fixture rows through the accumulator and checks
    /// that the goals column ends up with a non-zero count and a mean.
    #[test]
    fn accumulator_computes_stats_for_big5_subset() {
        let path = fixture_path();
        assert!(path.exists(), "fixture missing: {path:?}");

        let delimiter = crate::io_utils::resolve_input_delimiter(&path, None);
        let mut schema =
            crate::schema::infer_schema(&path, 200, delimiter, UTF_8).expect("infer schema");
        let goals_index = schema.column_index(GOALS_COL).expect("goals index");
        let assists_index = schema.column_index(ASSISTS_COL).expect("assists index");
        // Force both columns to Integer regardless of what inference decided.
        for idx in [goals_index, assists_index] {
            schema.columns[idx].datatype = crate::schema::ColumnType::Integer;
        }

        let columns = vec![goals_index, assists_index];
        let mut accumulator = StatsAccumulator::new(&columns, &schema);
        let mut reader =
            crate::io_utils::open_csv_reader_from_path(&path, delimiter, true).expect("open csv");
        crate::io_utils::reader_headers(&mut reader, UTF_8).expect("headers");

        for record in reader.byte_records().take(100) {
            let record = record.expect("record");
            let decoded = crate::io_utils::decode_record(&record, UTF_8).expect("decode");
            // Rows the accumulator rejects are deliberately ignored here.
            let _ = accumulator.ingest(&schema, &decoded);
        }

        let rows = accumulator.render_rows();
        assert_eq!(rows.len(), columns.len());
        let goal_stats = rows
            .iter()
            .find(|row| row[0] == GOALS_COL)
            .expect("goal stats");
        assert_ne!(goal_stats[1], "0");
        assert!(!goal_stats[4].is_empty());
    }
}