1use crate::residual::{plan_regression, ResidualClass, ResidualSample, ResidualStream};
54use anyhow::{anyhow, Context, Result};
55use std::collections::HashMap;
56use std::path::Path;
57
/// Number of leading (time-ordered) rows of each channel whose values are
/// averaged to form that channel's baseline. A channel needs more rows than
/// this to contribute anything when a baseline has to be computed.
const BASELINE_WINDOW: usize = 3;

/// Upper bound on the number of data records read from one CSV file; records
/// beyond this count are never pulled from the reader.
const MAX_ROWS: usize = 100_000_000;
64
/// Caller-supplied overrides for how [`load_generic_csv`] reads a CSV file.
///
/// The `Default` value leaves every column to auto-detection and treats the
/// value column as raw measurements that still need a baseline.
#[derive(Debug, Clone, Default)]
pub struct GenericCsvOptions {
    /// Header name of the timestamp column, compared ASCII-case-insensitively.
    /// `None` lets the loader auto-detect the column.
    pub time_col: Option<String>,
    /// Header name of the value column, compared ASCII-case-insensitively.
    /// `None` lets the loader auto-detect the column.
    pub value_col: Option<String>,
    /// Header name of the channel (grouping) column, compared
    /// ASCII-case-insensitively. `None` lets the loader look for a
    /// conventionally named column; rows without a channel column are grouped
    /// under `"generic"`.
    pub channel_col: Option<String>,
    /// When `true`, each value is pushed into the stream as a residual as-is;
    /// when `false`, a per-channel baseline is computed from the first rows
    /// and the remaining rows are pushed relative to it.
    pub pre_residualized: bool,
}
73
74pub fn load_generic_csv(path: &Path, opts: &GenericCsvOptions) -> Result<ResidualStream> {
80 let mut rdr = csv::Reader::from_path(path)
81 .with_context(|| format!("opening generic csv at {}", path.display()))?;
82 let headers: Vec<String> = rdr
83 .headers()
84 .context("reading CSV headers")?
85 .iter()
86 .map(str::to_owned)
87 .collect();
88 if headers.is_empty() {
89 return Err(anyhow!(
90 "generic csv at {} has no header row",
91 path.display()
92 ));
93 }
94
95 let all_rows: Vec<csv::StringRecord> = rdr
96 .records()
97 .take(MAX_ROWS)
98 .collect::<std::result::Result<Vec<_>, _>>()
99 .context("parsing generic csv rows")?;
100 if all_rows.len() < 2 {
101 return Err(anyhow!(
102 "generic csv at {} has fewer than 2 data rows; need ≥2 to compute a baseline",
103 path.display()
104 ));
105 }
106
107 let t_idx = pick_time_col(&headers, &all_rows[0], opts.time_col.as_deref())?;
108 let v_idx = pick_value_col(&headers, t_idx, opts.value_col.as_deref())?;
109 let c_idx = pick_channel_col(&headers, opts.channel_col.as_deref());
110
111 let filename = path
112 .file_name()
113 .and_then(|n| n.to_str())
114 .unwrap_or("anonymous.csv");
115 let mut stream = ResidualStream::new(format!("generic-csv@{}", filename));
116
117 let mut by_channel: HashMap<String, Vec<(f64, f64)>> = HashMap::new();
120 for rec in &all_rows {
121 let Some(t_raw) = rec.get(t_idx) else {
122 continue;
123 };
124 let Some(v_raw) = rec.get(v_idx) else {
125 continue;
126 };
127 let Ok(t) = t_raw.trim().parse::<f64>() else {
128 continue;
129 };
130 let Ok(v) = v_raw.trim().parse::<f64>() else {
131 continue;
132 };
133 if !t.is_finite() || !v.is_finite() {
134 continue;
135 }
136 let channel = c_idx
137 .and_then(|i| rec.get(i))
138 .map(str::to_owned)
139 .unwrap_or_else(|| "generic".to_string());
140 by_channel.entry(channel).or_default().push((t, v));
141 }
142
143 if by_channel.is_empty() {
144 return Err(anyhow!(
145 "generic csv at {} produced no parseable (t, value) pairs",
146 path.display()
147 ));
148 }
149
150 let mut channels_sorted: Vec<String> = by_channel.keys().cloned().collect();
151 channels_sorted.sort();
152
153 for ch in &channels_sorted {
154 let rows = by_channel.get_mut(ch).unwrap();
155 rows.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
156 if opts.pre_residualized {
157 for (t, r) in rows.iter() {
158 stream.push(
159 ResidualSample::new(*t, ResidualClass::PlanRegression, *r)
160 .with_channel(ch.clone()),
161 );
162 }
163 } else {
164 if rows.len() <= BASELINE_WINDOW {
165 continue;
166 }
167 let baseline: f64 =
168 rows.iter().take(BASELINE_WINDOW).map(|(_, v)| *v).sum::<f64>()
169 / BASELINE_WINDOW as f64;
170 debug_assert!(baseline.is_finite(), "finite baseline from finite inputs");
171 for (i, (t, v)) in rows.iter().enumerate() {
172 if i < BASELINE_WINDOW {
173 continue;
174 }
175 plan_regression::push_latency(&mut stream, *t, ch, *v, baseline);
176 }
177 }
178 }
179
180 if stream.is_empty() {
181 return Err(anyhow!(
182 "generic csv at {} produced no residuals (every channel had < {} + 1 rows)",
183 path.display(),
184 BASELINE_WINDOW
185 ));
186 }
187
188 stream.sort();
189 Ok(stream)
190}
191
192fn pick_time_col(
193 headers: &[String],
194 first_row: &csv::StringRecord,
195 override_name: Option<&str>,
196) -> Result<usize> {
197 if let Some(name) = override_name {
198 return find_header(headers, name)
199 .ok_or_else(|| anyhow!("--time-col '{}' not found in {:?}", name, headers));
200 }
201 let tokens = ["t", "time", "timestamp", "ts"];
202 for (i, h) in headers.iter().enumerate() {
203 let lo = h.to_ascii_lowercase();
204 if tokens.iter().any(|tok| lo == *tok || lo.contains(tok)) {
205 return Ok(i);
206 }
207 }
208 for (i, cell) in first_row.iter().enumerate() {
209 if cell.trim().parse::<f64>().is_ok() {
210 return Ok(i);
211 }
212 }
213 Err(anyhow!(
214 "could not auto-detect a timestamp column in {:?}; pass --time-col <name>",
215 headers
216 ))
217}
218
219fn pick_value_col(
220 headers: &[String],
221 t_idx: usize,
222 override_name: Option<&str>,
223) -> Result<usize> {
224 if let Some(name) = override_name {
225 return find_header(headers, name)
226 .ok_or_else(|| anyhow!("--value-col '{}' not found in {:?}", name, headers));
227 }
228 let key_tokens = ["id", "key", "uuid", "hash", "channel", "group", "qclass", "series"];
229 for (i, h) in headers.iter().enumerate() {
230 if i == t_idx {
231 continue;
232 }
233 let lo = h.to_ascii_lowercase();
234 if key_tokens.iter().any(|tok| lo == *tok) {
235 continue;
236 }
237 let value_tokens = ["value", "residual", "latency", "metric", "amount", "v", "y"];
238 if value_tokens.iter().any(|tok| lo == *tok || lo.contains(tok)) {
239 return Ok(i);
240 }
241 }
242 for (i, h) in headers.iter().enumerate() {
243 if i == t_idx {
244 continue;
245 }
246 let lo = h.to_ascii_lowercase();
247 if key_tokens.iter().any(|tok| lo == *tok) {
248 continue;
249 }
250 return Ok(i);
251 }
252 Err(anyhow!(
253 "could not auto-detect a value column in {:?}; pass --value-col <name>",
254 headers
255 ))
256}
257
258fn pick_channel_col(headers: &[String], override_name: Option<&str>) -> Option<usize> {
259 if let Some(name) = override_name {
260 return find_header(headers, name);
261 }
262 let tokens = ["channel", "qclass", "group", "series"];
263 for (i, h) in headers.iter().enumerate() {
264 let lo = h.to_ascii_lowercase();
265 if tokens.iter().any(|tok| lo == *tok) {
266 return Some(i);
267 }
268 }
269 None
270}
271
/// Returns the index of the first header equal to `name`, ignoring ASCII case,
/// or `None` when no header matches.
fn find_header(headers: &[String], name: &str) -> Option<usize> {
    headers
        .iter()
        .enumerate()
        .find_map(|(idx, header)| header.eq_ignore_ascii_case(name).then_some(idx))
}
275
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Writes `content` into a fresh temporary file with a `.csv` suffix and
    /// returns the handle so the caller can pass its path to the loader.
    fn tmp_csv(content: &str) -> tempfile::NamedTempFile {
        let builder_result = tempfile::Builder::new().suffix(".csv").tempfile();
        let mut file = builder_result.expect("tempfile");
        file.write_all(content.as_bytes()).expect("write");
        file
    }

    /// Six rows, one implicit channel: the first three rows form the baseline,
    /// so exactly three residuals come out, all on the "generic" channel.
    #[test]
    fn autodetects_time_value_single_channel() {
        let file = tmp_csv("t,value\n0,1.0\n1,1.0\n2,1.0\n3,1.5\n4,2.0\n5,2.5\n");
        let options = GenericCsvOptions::default();
        let stream = load_generic_csv(file.path(), &options).expect("load");

        assert_eq!(stream.len(), 3);
        for sample in stream.samples.iter() {
            assert!(sample.class == ResidualClass::PlanRegression);
            assert!(sample.channel.as_deref() == Some("generic"));
        }
    }

    /// A `channel` header splits the rows into separate channels.
    #[test]
    fn uses_channel_column_when_present() {
        let file =
            tmp_csv("time,channel,y\n0,a,1\n1,a,1\n2,a,1\n3,a,2\n0,b,2\n1,b,2\n2,b,2\n3,b,3\n");
        let options = GenericCsvOptions::default();
        let stream = load_generic_csv(file.path(), &options).expect("load");

        let has_channel = |wanted: &str| {
            stream
                .samples
                .iter()
                .any(|sample| sample.channel.as_deref() == Some(wanted))
        };
        assert!(has_channel("a"));
        assert!(has_channel("b"));
    }

    /// With `pre_residualized` every row becomes a sample; no rows are spent
    /// on a baseline.
    #[test]
    fn pre_residualized_skips_baseline() {
        let file = tmp_csv("t,residual\n0,0.1\n1,0.2\n2,0.3\n");
        let options = GenericCsvOptions {
            pre_residualized: true,
            ..Default::default()
        };
        let stream = load_generic_csv(file.path(), &options).expect("load");

        assert_eq!(stream.len(), 3);
    }

    /// Explicit column names win even when no header looks conventional.
    #[test]
    fn explicit_overrides_are_honoured() {
        let file =
            tmp_csv("alpha,beta,gamma\n0,1.0,x\n1,1.0,x\n2,1.0,x\n3,1.5,x\n4,2.0,x\n5,2.5,x\n");
        let options = GenericCsvOptions {
            time_col: Some("alpha".into()),
            value_col: Some("beta".into()),
            channel_col: Some("gamma".into()),
            pre_residualized: false,
        };
        let stream = load_generic_csv(file.path(), &options).expect("load");

        for sample in stream.samples.iter() {
            assert!(sample.channel.as_deref() == Some("x"));
        }
    }

    /// A single data row is rejected before any column detection happens.
    #[test]
    fn rejects_csv_with_one_row() {
        let file = tmp_csv("t,value\n0,1.0\n");
        let message = load_generic_csv(file.path(), &GenericCsvOptions::default())
            .unwrap_err()
            .to_string();

        assert!(message.contains("fewer than 2"));
    }
}