Skip to main content

dsfb_database/adapters/
generic_csv.rs

//! Generic CSV adapter — a single-domain worked example of applying
//! `dsfb-database`'s motif grammar to a residual stream that was not
//! captured from a SQL engine.
//!
//! ## What this adapter does
//!
//! Read an operator-supplied CSV with a timestamp column and a numeric
//! value column (and optionally a channel column), construct a residual
//! stream via the *same* rolling-baseline rule as the PostgreSQL
//! `pg_stat_statements` adapter (see [`crate::adapters::postgres`]), and
//! hand the resulting stream to the motif grammar.
//!
//! ## What this adapter does NOT do
//!
//! It does **not** validate that the operator-supplied grammar is
//! appropriate for the input signal, nor does it claim the five-motif
//! vocabulary has any universal meaning outside SQL telemetry. This
//! adapter is a **worked example** that lets an operator exercise the
//! deterministic machinery on their own residuals; it is not a
//! generalisation claim. See the pinned non-claim in
//! [`crate::non_claims`] that references this adapter by name.
//!
//! ## CSV contract
//!
//! Minimum: one timestamp column, one value column. The adapter
//! auto-detects both:
//!
//!  * Timestamp column — first column whose header names a time-like
//!    field (`t`, `ts`, `time`, `timestamp`; case-insensitive), or
//!    whose first data row parses as an `f64`. Operators can override
//!    with `--time-col <name>`.
//!  * Value column — first column that is not the timestamp and whose
//!    header is not recognisably a key (`id`, `key`, `uuid`, `hash`).
//!    Note that the header is not checked for numeric *content*;
//!    unparseable cells are skipped at load time. Operators can
//!    override with `--value-col <name>`.
//!  * Channel column — optional. If any column is named `channel`,
//!    `qclass`, `group`, or `series` (case-insensitive), the adapter
//!    emits one residual per (timestamp, channel) row; otherwise the
//!    channel defaults to `generic`.
//!
//! ## Residual construction
//!
//! The adapter emits every row's `value` as a `ResidualClass::PlanRegression`
//! residual in the dimensionless form `(value − baseline) / max(|baseline|, ε)`,
//! where `baseline` is the mean of the first `BASELINE_WINDOW = 3`
//! values on that channel. This is the *same* normalisation the
//! PostgreSQL adapter performs; it keeps the `drift_threshold` in
//! `spec/motifs.yaml` interpretable as a dimensionless fraction.
//!
//! `--pre-residualized` skips the baseline subtraction and emits
//! `value` verbatim. Use this when the operator's pipeline already
//! produces `(actual − expected)` residuals.

use std::collections::HashMap;
use std::path::Path;

use anyhow::{anyhow, Context, Result};

use crate::residual::{plan_regression, ResidualClass, ResidualSample, ResidualStream};

/// Number of leading samples per channel that form the rolling baseline.
/// Kept identical to [`crate::adapters::postgres::BASELINE_WINDOW`] so
/// both adapters normalise residuals the same way.
const BASELINE_WINDOW: usize = 3;

/// Hard cap on the number of data rows read from a generic CSV.
/// NOTE(review): documented as matching the pg_stat_statements
/// adapter's `MAX_PGSS_ROWS` order of magnitude so behaviour is
/// uniform — confirm against that constant before changing either.
const MAX_ROWS: usize = 100_000_000;

/// Options for the generic CSV loader.
///
/// The all-`None`/`false` value produced by `Default` means
/// "auto-detect every column and apply baseline subtraction".
#[derive(Debug, Clone, Default)]
pub struct GenericCsvOptions {
    // Exact (case-insensitive) header of the timestamp column; None auto-detects.
    pub time_col: Option<String>,
    // Exact (case-insensitive) header of the value column; None auto-detects.
    pub value_col: Option<String>,
    // Exact (case-insensitive) header of the channel column; None auto-detects.
    pub channel_col: Option<String>,
    // When true, values are emitted verbatim (no baseline subtraction).
    pub pre_residualized: bool,
}

74/// Load `path` as a generic CSV and produce a typed residual stream.
75///
76/// Errors if the file is missing, has fewer than two rows, the requested
77/// columns are absent, or auto-detection cannot identify a timestamp
78/// column.
79pub fn load_generic_csv(path: &Path, opts: &GenericCsvOptions) -> Result<ResidualStream> {
80    let mut rdr = csv::Reader::from_path(path)
81        .with_context(|| format!("opening generic csv at {}", path.display()))?;
82    let headers: Vec<String> = rdr
83        .headers()
84        .context("reading CSV headers")?
85        .iter()
86        .map(str::to_owned)
87        .collect();
88    if headers.is_empty() {
89        return Err(anyhow!(
90            "generic csv at {} has no header row",
91            path.display()
92        ));
93    }
94
95    let all_rows: Vec<csv::StringRecord> = rdr
96        .records()
97        .take(MAX_ROWS)
98        .collect::<std::result::Result<Vec<_>, _>>()
99        .context("parsing generic csv rows")?;
100    if all_rows.len() < 2 {
101        return Err(anyhow!(
102            "generic csv at {} has fewer than 2 data rows; need ≥2 to compute a baseline",
103            path.display()
104        ));
105    }
106
107    let t_idx = pick_time_col(&headers, &all_rows[0], opts.time_col.as_deref())?;
108    let v_idx = pick_value_col(&headers, t_idx, opts.value_col.as_deref())?;
109    let c_idx = pick_channel_col(&headers, opts.channel_col.as_deref());
110
111    let filename = path
112        .file_name()
113        .and_then(|n| n.to_str())
114        .unwrap_or("anonymous.csv");
115    let mut stream = ResidualStream::new(format!("generic-csv@{}", filename));
116
117    // Group (t, value) pairs by channel, then emit in sorted (channel, t) order
118    // so the residual stream is byte-stable across runs.
119    let mut by_channel: HashMap<String, Vec<(f64, f64)>> = HashMap::new();
120    for rec in &all_rows {
121        let Some(t_raw) = rec.get(t_idx) else {
122            continue;
123        };
124        let Some(v_raw) = rec.get(v_idx) else {
125            continue;
126        };
127        let Ok(t) = t_raw.trim().parse::<f64>() else {
128            continue;
129        };
130        let Ok(v) = v_raw.trim().parse::<f64>() else {
131            continue;
132        };
133        if !t.is_finite() || !v.is_finite() {
134            continue;
135        }
136        let channel = c_idx
137            .and_then(|i| rec.get(i))
138            .map(str::to_owned)
139            .unwrap_or_else(|| "generic".to_string());
140        by_channel.entry(channel).or_default().push((t, v));
141    }
142
143    if by_channel.is_empty() {
144        return Err(anyhow!(
145            "generic csv at {} produced no parseable (t, value) pairs",
146            path.display()
147        ));
148    }
149
150    let mut channels_sorted: Vec<String> = by_channel.keys().cloned().collect();
151    channels_sorted.sort();
152
153    for ch in &channels_sorted {
154        let rows = by_channel.get_mut(ch).unwrap();
155        rows.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
156        if opts.pre_residualized {
157            for (t, r) in rows.iter() {
158                stream.push(
159                    ResidualSample::new(*t, ResidualClass::PlanRegression, *r)
160                        .with_channel(ch.clone()),
161                );
162            }
163        } else {
164            if rows.len() <= BASELINE_WINDOW {
165                continue;
166            }
167            let baseline: f64 =
168                rows.iter().take(BASELINE_WINDOW).map(|(_, v)| *v).sum::<f64>()
169                    / BASELINE_WINDOW as f64;
170            debug_assert!(baseline.is_finite(), "finite baseline from finite inputs");
171            for (i, (t, v)) in rows.iter().enumerate() {
172                if i < BASELINE_WINDOW {
173                    continue;
174                }
175                plan_regression::push_latency(&mut stream, *t, ch, *v, baseline);
176            }
177        }
178    }
179
180    if stream.is_empty() {
181        return Err(anyhow!(
182            "generic csv at {} produced no residuals (every channel had < {} + 1 rows)",
183            path.display(),
184            BASELINE_WINDOW
185        ));
186    }
187
188    stream.sort();
189    Ok(stream)
190}
191
192fn pick_time_col(
193    headers: &[String],
194    first_row: &csv::StringRecord,
195    override_name: Option<&str>,
196) -> Result<usize> {
197    if let Some(name) = override_name {
198        return find_header(headers, name)
199            .ok_or_else(|| anyhow!("--time-col '{}' not found in {:?}", name, headers));
200    }
201    let tokens = ["t", "time", "timestamp", "ts"];
202    for (i, h) in headers.iter().enumerate() {
203        let lo = h.to_ascii_lowercase();
204        if tokens.iter().any(|tok| lo == *tok || lo.contains(tok)) {
205            return Ok(i);
206        }
207    }
208    for (i, cell) in first_row.iter().enumerate() {
209        if cell.trim().parse::<f64>().is_ok() {
210            return Ok(i);
211        }
212    }
213    Err(anyhow!(
214        "could not auto-detect a timestamp column in {:?}; pass --time-col <name>",
215        headers
216    ))
217}
218
219fn pick_value_col(
220    headers: &[String],
221    t_idx: usize,
222    override_name: Option<&str>,
223) -> Result<usize> {
224    if let Some(name) = override_name {
225        return find_header(headers, name)
226            .ok_or_else(|| anyhow!("--value-col '{}' not found in {:?}", name, headers));
227    }
228    let key_tokens = ["id", "key", "uuid", "hash", "channel", "group", "qclass", "series"];
229    for (i, h) in headers.iter().enumerate() {
230        if i == t_idx {
231            continue;
232        }
233        let lo = h.to_ascii_lowercase();
234        if key_tokens.iter().any(|tok| lo == *tok) {
235            continue;
236        }
237        let value_tokens = ["value", "residual", "latency", "metric", "amount", "v", "y"];
238        if value_tokens.iter().any(|tok| lo == *tok || lo.contains(tok)) {
239            return Ok(i);
240        }
241    }
242    for (i, h) in headers.iter().enumerate() {
243        if i == t_idx {
244            continue;
245        }
246        let lo = h.to_ascii_lowercase();
247        if key_tokens.iter().any(|tok| lo == *tok) {
248            continue;
249        }
250        return Ok(i);
251    }
252    Err(anyhow!(
253        "could not auto-detect a value column in {:?}; pass --value-col <name>",
254        headers
255    ))
256}
257
258fn pick_channel_col(headers: &[String], override_name: Option<&str>) -> Option<usize> {
259    if let Some(name) = override_name {
260        return find_header(headers, name);
261    }
262    let tokens = ["channel", "qclass", "group", "series"];
263    for (i, h) in headers.iter().enumerate() {
264        let lo = h.to_ascii_lowercase();
265        if tokens.iter().any(|tok| lo == *tok) {
266            return Some(i);
267        }
268    }
269    None
270}
271
/// Index of the first header equal to `name`, compared ASCII
/// case-insensitively; `None` if no header matches.
fn find_header(headers: &[String], name: &str) -> Option<usize> {
    for (idx, header) in headers.iter().enumerate() {
        if header.eq_ignore_ascii_case(name) {
            return Some(idx);
        }
    }
    None
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Persist `content` into a fresh `.csv` temp file; the returned
    /// handle keeps the file alive for the duration of the test.
    fn tmp_csv(content: &str) -> tempfile::NamedTempFile {
        let mut file = tempfile::Builder::new()
            .suffix(".csv")
            .tempfile()
            .expect("tempfile");
        file.write_all(content.as_bytes()).expect("write");
        file
    }

    #[test]
    fn autodetects_time_value_single_channel() {
        let file = tmp_csv("t,value\n0,1.0\n1,1.0\n2,1.0\n3,1.5\n4,2.0\n5,2.5\n");
        let stream = load_generic_csv(file.path(), &GenericCsvOptions::default()).expect("load");
        // Six rows minus the three baseline rows leaves three residuals.
        assert_eq!(stream.len(), 3);
        let all_ok = stream.samples.iter().all(|s| {
            s.class == ResidualClass::PlanRegression && s.channel.as_deref() == Some("generic")
        });
        assert!(all_ok);
    }

    #[test]
    fn uses_channel_column_when_present() {
        let file = tmp_csv(
            "time,channel,y\n0,a,1\n1,a,1\n2,a,1\n3,a,2\n0,b,2\n1,b,2\n2,b,2\n3,b,3\n",
        );
        let stream = load_generic_csv(file.path(), &GenericCsvOptions::default()).expect("load");
        let mut seen: Vec<String> = stream
            .samples
            .iter()
            .filter_map(|s| s.channel.clone())
            .collect();
        seen.sort();
        seen.dedup();
        assert!(seen.iter().any(|c| c == "a"));
        assert!(seen.iter().any(|c| c == "b"));
    }

    #[test]
    fn pre_residualized_skips_baseline() {
        let file = tmp_csv("t,residual\n0,0.1\n1,0.2\n2,0.3\n");
        let opts = GenericCsvOptions {
            pre_residualized: true,
            ..Default::default()
        };
        let stream = load_generic_csv(file.path(), &opts).expect("load");
        // All three rows survive: no baseline window is consumed.
        assert_eq!(stream.len(), 3);
    }

    #[test]
    fn explicit_overrides_are_honoured() {
        let file =
            tmp_csv("alpha,beta,gamma\n0,1.0,x\n1,1.0,x\n2,1.0,x\n3,1.5,x\n4,2.0,x\n5,2.5,x\n");
        let opts = GenericCsvOptions {
            time_col: Some("alpha".into()),
            value_col: Some("beta".into()),
            channel_col: Some("gamma".into()),
            pre_residualized: false,
        };
        let stream = load_generic_csv(file.path(), &opts).expect("load");
        assert!(stream
            .samples
            .iter()
            .all(|s| s.channel.as_deref() == Some("x")));
    }

    #[test]
    fn rejects_csv_with_one_row() {
        let file = tmp_csv("t,value\n0,1.0\n");
        let err = load_generic_csv(file.path(), &GenericCsvOptions::default()).unwrap_err();
        assert!(err.to_string().contains("fewer than 2"));
    }
}
359}