Skip to main content

dsfb_gpu_debug_demo/cli/
ingest.rs

1//! S-REAL.1 — Deterministic ingest of `# residual-projection v2` TSV fixtures.
2//!
3//! WHY: The S-REAL.1 audit gauntlet runs DSFB-GPU on real public datasets
4//! whose canonical released form is **pre-projected residuals** in a
5//! TAB-delimited text file with `# key=value` comment headers. DSFB-GPU's
6//! production pipeline normally takes a `Vec<TraceEvent>` (trace-event
7//! stream) and projects events into residuals via the window-feature kernel.
8//! The TSV fixtures are already past that projection stage. To run the
9//! deterministic engine on this form without modifying the dispatcher, we
10//! deterministically **lower** each (window, signal) cell into a synthetic
11//! `TraceEvent` whose latency value carries the residual magnitude. The
12//! lowering rule is byte-replayable: same TSV bytes → same `Vec<TraceEvent>`
13//! → same `CaseFile`.
14//!
15//! Discipline:
16//! - **No probabilistic ingest**: every cell maps to one event by a fixed rule.
17//! - **No silent NaN handling**: NaN cells are skipped (no event emitted for
18//!   that cell), and the count of skipped cells is reported in the
19//!   `IngestReport` so the audit report can disclose the projection loss.
20//! - **SHA-256 byte-pin**: the loader returns an error if the file bytes
21//!   do not hash to the expected pin. The audit's `dataset_manifest.toml`
22//!   records the pin alongside the upstream DOI/URL.
23//! - **No domain-truth claim**: the lowering interprets cell values as
24//!   "milliseconds-scaled signal magnitude" and reports that convention in
25//!   the audit; we make no claim about what the cell value "is" in the
26//!   upstream domain.
27//!
28//! Non-claims (preserved into the audit's `limitations.md`):
29//! - This loader does NOT recover the upstream's original trace events. It
30//!   produces a deterministic event sequence whose post-projection residual
31//!   matches the fixture's residual at each cell, under the documented
32//!   lowering rule.
33//! - This loader does NOT validate the upstream dataset's labels, semantics,
34//!   ground truth, or fitness for any downstream use.
35//!
36//! License: Apache-2.0. Background IP: Invariant Forge LLC.
37
38use std::collections::BTreeMap;
39use std::fmt;
40
41use dsfb_gpu_debug_core::event::TraceEvent;
42use dsfb_gpu_debug_core::hash::sha256;
43
/// Parsed `# residual-projection v2` fixture.
///
/// WHY: The TSV header carries provenance metadata (DOI, URL, archive
/// SHA-256, num_windows, num_signals, healthy_window_end, license) that the
/// audit's `dataset_manifest.toml` must cite verbatim, and the data rows
/// are the residual projection that the event-lowering rule consumes.
/// Carrying both in one struct keeps the metadata + data co-located so the
/// audit report cannot accidentally claim metadata from one fixture against
/// data from another.
///
/// `Eq` is intentionally not derived because cell values are `f64` (NaN is
/// already collapsed to `None` by the parser, so the remaining floats are
/// finite, but f64 still rejects `Eq` at the type level). `PartialEq` is
/// sufficient for the byte-replay determinism we test.
///
/// All fields are public, so a value built by hand (or via `Default`) is
/// not guaranteed to satisfy the invariants the loader enforces.
#[derive(Clone, PartialEq, Debug, Default)]
pub struct ResidualProjectionFixture {
    /// SHA-256 of the input file bytes (hex lowercase, 64 chars). This is
    /// the load-bearing provenance pin: the loader rejects mismatched
    /// bytes before parsing, so a corrupted fixture cannot silently
    /// produce a valid-looking CaseFile.
    pub fixture_sha256_hex: String,
    /// Every `# key=value` comment line, keyed by the trimmed text before
    /// the first `=`. Stored in a `BTreeMap` so iteration order is the
    /// sorted key order, independent of the order lines appear in the
    /// file. If a key appears more than once, the last occurrence wins.
    ///
    /// Comment lines without `=` are NOT recorded, with one exception: a
    /// bare line starting with `residual-projection` (the format marker)
    /// is stored under the synthetic key `"format"`.
    pub metadata: BTreeMap<String, String>,
    /// Declared row count from the `# num_windows=` header. May not equal
    /// `rows.len()`: some fixtures publish a num_windows < observed rows
    /// (extra metadata-noise rows past the declared count). The loader
    /// does not cross-check this value against `rows.len()`; the lowering
    /// iterates the observed `rows`.
    pub declared_num_windows: u32,
    /// Declared column count from the `# num_signals=` header. Must equal
    /// every data row's column count; the parser rejects rows that diverge.
    pub declared_num_signals: u32,
    /// Declared healthy-window boundary from `# healthy_window_end=`.
    /// Recorded for the audit report; the loader does NOT use it to split
    /// the data (the dispatcher's bank stage performs admission, not
    /// pre-classification by the loader).
    pub declared_healthy_window_end: u32,
    /// Window-major × signal-minor matrix of cell values: `rows[w][s]` is
    /// signal `s` of window `w`. `None` represents a `nan` token in the
    /// TSV — the lowering rule skips these cells.
    pub rows: Vec<Vec<Option<f64>>>,
}
87
/// What the loader did. Surfaced into the audit report so the human reader
/// can see exactly how many cells were skipped (NaN), how many events were
/// emitted, and what the input/output shapes were.
///
/// WHY: The audit's central honesty claim is "DSFB-GPU saw exactly this".
/// A summary that hides cell-skip counts would let a future engineer
/// over-interpret the case file; making the projection loss explicit
/// prevents that.
#[derive(Clone, Eq, PartialEq, Debug, Default)]
pub struct IngestReport {
    /// Lowercase-hex SHA-256 pin, copied from the fixture.
    pub fixture_sha256_hex: String,
    /// Size of the fixture file in bytes, as supplied by the caller of
    /// `build_ingest_report` (the report builder does not re-measure it).
    pub fixture_byte_size: u64,
    /// Value of the `# num_windows=` header, copied from the fixture.
    pub declared_num_windows: u32,
    /// Value of the `# num_signals=` header, copied from the fixture.
    pub declared_num_signals: u32,
    /// Number of data rows actually present in the fixture.
    pub observed_num_windows: u32,
    /// Column count of the data rows. For a fixture produced by
    /// `load_residual_projection_tsv` this always equals
    /// `declared_num_signals`, because the loader rejects any mismatch.
    pub observed_num_signals: u32,
    /// Number of cells that were `nan` in the TSV (`None` in the fixture).
    pub nan_cell_count: u32,
    /// Number of cells holding a finite value (total cells minus NaN cells).
    pub finite_cell_count: u32,
    /// Length of the event slice handed to `build_ingest_report`.
    pub emitted_event_count: u32,
}
108
/// Loader errors. Every variant has an explicit message so the CLI can
/// surface a human-actionable diagnosis without inventing free-form text.
///
/// The type implements [`std::error::Error`], so it can be propagated with
/// `?` into `Box<dyn std::error::Error>` or wrapped by error-context
/// libraries.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum IngestError {
    /// The input byte slice was empty.
    EmptyFile,
    /// The input is not valid UTF-8.
    NotUtf8 {
        /// Number of leading bytes that *are* valid UTF-8, i.e. the offset
        /// of the first invalid byte.
        byte_offset: usize,
    },
    /// A required `# key=value` header is absent.
    MissingHeader {
        /// Name of the missing header key.
        key: &'static str,
    },
    /// A required numeric header is present but does not parse as `u32`.
    MalformedNumericHeader {
        /// Name of the offending header key.
        key: String,
        /// The header's value after trimming surrounding whitespace.
        value: String,
    },
    /// A data row's column count disagrees with the expected count.
    RowColumnCountMismatch {
        /// 1-based line number of the row when one is known; treat `0` as
        /// "no specific line".
        line: usize,
        /// Column count the row was expected to have.
        expected: usize,
        /// Column count actually found.
        found: usize,
    },
    /// A cell is neither a finite float nor the token `nan`.
    BadCell {
        /// 1-based line number of the row.
        line: usize,
        /// 1-based column number within the row.
        column: usize,
        /// The cell text after trimming surrounding whitespace.
        token: String,
    },
    /// The file contains no data rows at all.
    NoDataRows,
    /// The file bytes do not hash to the expected pin.
    Sha256Mismatch {
        /// The pin the caller supplied.
        expected: String,
        /// The lowercase-hex digest computed from the bytes.
        actual: String,
    },
    /// The supplied pin is not exactly 64 lowercase hex characters.
    Sha256NotLowerHex64 {
        /// The pin as supplied by the caller.
        provided: String,
    },
}

impl fmt::Display for IngestError {
    /// Renders the fixed, human-actionable message for each variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::EmptyFile => write!(f, "fixture file is empty"),
            Self::NotUtf8 { byte_offset } => {
                write!(
                    f,
                    "fixture file is not UTF-8 (invalid at byte {byte_offset})"
                )
            }
            Self::MissingHeader { key } => {
                write!(f, "fixture missing required `# {key}=...` header")
            }
            Self::MalformedNumericHeader { key, value } => write!(
                f,
                "fixture header `{key}` is not a non-negative integer: {value:?}"
            ),
            Self::RowColumnCountMismatch {
                line,
                expected,
                found,
            } => write!(
                f,
                "fixture data row on line {line} has {found} columns; \
                 declared num_signals = {expected}"
            ),
            Self::BadCell {
                line,
                column,
                token,
            } => write!(
                f,
                "fixture cell on line {line} column {column} is neither a \
                 finite float nor `nan`: {token:?}"
            ),
            Self::NoDataRows => write!(
                f,
                "fixture file contains comment headers but no TAB-delimited \
                 data rows"
            ),
            Self::Sha256Mismatch { expected, actual } => write!(
                f,
                "fixture SHA-256 mismatch (expected {expected}, computed {actual})"
            ),
            Self::Sha256NotLowerHex64 { provided } => write!(
                f,
                "expected fixture SHA-256 must be 64 lowercase hex characters; \
                 got {provided:?}"
            ),
        }
    }
}

// No variant wraps an underlying error, so the default `source()` (None)
// is the correct answer and the impl body stays empty.
impl std::error::Error for IngestError {}
196
/// Knobs for the deterministic cell → event lowering.
///
/// WHY: The S-REAL.1 datasets do not share a cell-value scale (the
/// original notes cite TADBench latency_p50_ms ≈ 50.0 versus AIOps KPI
/// ≈ 0.04), so the scale factor is a recorded parameter rather than a
/// magic number buried in the lowering. With the default factor of `1000`
/// a cell value is read as milliseconds and lowered to microseconds for
/// the `TraceEvent.latency_us` field.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct LoweringConfig {
    /// Multiplier applied to each cell value:
    /// `latency_us = clamp(value * scale, 0, latency_clamp_us)`. The
    /// audit's `schema_map.toml` records this value.
    pub value_to_microsecond_scale: u32,
    /// Ceiling for the lowered `latency_us`. Documented as matching the
    /// contract's `latency_clamp_ms * 1000`; defaults to 32,767,000 us
    /// (32.767 seconds).
    pub latency_clamp_us: u32,
    /// Width of one window in nanoseconds; the lowering emits
    /// `ts_ns = window_index as u64 * window_size_ns`. Defaults to
    /// 1_000_000_000 ns = 1 s, documented as matching
    /// `Contract::canonical().window_size_ms = 1000`.
    pub window_size_ns: u64,
}

impl Default for LoweringConfig {
    /// Milliseconds-in, microseconds-out lowering with a 32.767 s latency
    /// ceiling and 1 s windows.
    fn default() -> Self {
        const US_PER_MS: u32 = 1_000;
        const NS_PER_MS: u64 = 1_000_000;
        const LATENCY_CLAMP_MS: u32 = 32_767;
        const WINDOW_SIZE_MS: u64 = 1_000;

        Self {
            value_to_microsecond_scale: US_PER_MS,
            latency_clamp_us: LATENCY_CLAMP_MS * US_PER_MS,
            window_size_ns: WINDOW_SIZE_MS * NS_PER_MS,
        }
    }
}
229
/// Render a 32-byte SHA-256 digest as 64 lowercase hex characters.
///
/// WHY: The MANIFEST pin format and the audit receipts use lowercase hex.
/// Producing it in exactly one place keeps the encoding canonical, so
/// callers can compare against a pinned constant with plain `==` and no
/// case folding.
#[must_use]
pub fn sha256_to_hex_lower(bytes: &[u8; 32]) -> String {
    bytes
        .iter()
        // High nibble first, then low nibble: two hex digits per byte.
        .flat_map(|&byte| [byte >> 4, byte & 0x0f])
        .map(|nibble| {
            // `from_digit` yields `0-9a-f` (lowercase) for radix 16.
            char::from_digit(u32::from(nibble), 16).expect("a nibble is always below 16")
        })
        .collect()
}
246
247/// Verify file bytes hash to the expected pin.
248///
249/// WHY: The S-REAL.1 audit's load-bearing provenance claim is "DSFB
250/// processed these specific bytes". The loader rejects mismatched bytes
251/// before any parsing happens, so a corrupted fixture cannot produce a
252/// valid-looking but actually-wrong CaseFile that later replays cleanly.
253///
254/// # Errors
255///
256/// Returns `Sha256NotLowerHex64` if `expected_hex` isn't exactly 64
257/// lowercase hex characters; `Sha256Mismatch` if the bytes don't match.
258pub fn verify_fixture_sha256(bytes: &[u8], expected_hex: &str) -> Result<String, IngestError> {
259    if expected_hex.len() != 64 || !expected_hex.bytes().all(|c| c.is_ascii_hexdigit()) {
260        return Err(IngestError::Sha256NotLowerHex64 {
261            provided: expected_hex.to_string(),
262        });
263    }
264    if !expected_hex.bytes().all(|c| !c.is_ascii_uppercase()) {
265        return Err(IngestError::Sha256NotLowerHex64 {
266            provided: expected_hex.to_string(),
267        });
268    }
269    let actual = sha256_to_hex_lower(&sha256(bytes));
270    if actual != expected_hex {
271        return Err(IngestError::Sha256Mismatch {
272            expected: expected_hex.to_string(),
273            actual,
274        });
275    }
276    Ok(actual)
277}
278
279/// Parse a `# residual-projection v2` TSV fixture.
280///
281/// WHY: This is the single entry point for ingesting an S-REAL.1 dataset.
282/// It performs the SHA-256 pin check first (no surprises after bytes are
283/// trusted), then walks the file once collecting metadata and data rows.
284/// The loader is intentionally pedantic: it rejects malformed headers,
285/// row-column mismatches, and non-finite/non-nan cell tokens by structural
286/// error variants so the audit report can attribute any failure to a
287/// specific (line, column) pair.
288///
289/// # Errors
290///
291/// Returns the matching `IngestError` variant if the file is empty, not
292/// UTF-8, missing a required header, has malformed numeric headers,
293/// has rows whose column count diverges from `num_signals`, or has cell
294/// tokens that are neither finite floats nor `nan`.
295pub fn load_residual_projection_tsv(
296    bytes: &[u8],
297    expected_sha256_hex: &str,
298) -> Result<ResidualProjectionFixture, IngestError> {
299    if bytes.is_empty() {
300        return Err(IngestError::EmptyFile);
301    }
302    let actual_hex = verify_fixture_sha256(bytes, expected_sha256_hex)?;
303
304    let text = match std::str::from_utf8(bytes) {
305        Ok(s) => s,
306        Err(e) => {
307            return Err(IngestError::NotUtf8 {
308                byte_offset: e.valid_up_to(),
309            });
310        }
311    };
312
313    let mut metadata: BTreeMap<String, String> = BTreeMap::new();
314    let mut rows: Vec<Vec<Option<f64>>> = Vec::new();
315    let mut expected_columns: Option<usize> = None;
316
317    for (idx, raw_line) in text.lines().enumerate() {
318        let line_no = idx + 1;
319        let line = raw_line.trim_end_matches('\r');
320        if line.is_empty() {
321            continue;
322        }
323        if let Some(rest) = line.strip_prefix('#') {
324            // Metadata line. Format: `# key=value` (extra whitespace
325            // after `#` is tolerated). Lines without `=` are silently
326            // skipped — they are commentary text, not metadata.
327            let trimmed = rest.trim_start();
328            if let Some((k, v)) = trimmed.split_once('=') {
329                metadata.insert(k.trim().to_string(), v.trim().to_string());
330            } else if trimmed.is_empty() {
331                // bare `#` is fine; ignore
332            } else if trimmed.starts_with("residual-projection") {
333                // bare format-marker line, no `=` expected
334                metadata.insert("format".to_string(), trimmed.to_string());
335            }
336            // Other bare comment text is ignored without error.
337            continue;
338        }
339
340        // Data row. TAB-split, parse each cell as f64 or `nan`.
341        let cols: Vec<&str> = line.split('\t').collect();
342        let cell_count = cols.len();
343        if let Some(prev) = expected_columns {
344            if cell_count != prev {
345                return Err(IngestError::RowColumnCountMismatch {
346                    line: line_no,
347                    expected: prev,
348                    found: cell_count,
349                });
350            }
351        } else {
352            expected_columns = Some(cell_count);
353        }
354
355        let mut row: Vec<Option<f64>> = Vec::with_capacity(cell_count);
356        for (col_idx, tok) in cols.iter().enumerate() {
357            let token = tok.trim();
358            if token.eq_ignore_ascii_case("nan") {
359                row.push(None);
360            } else {
361                match token.parse::<f64>() {
362                    Ok(v) if v.is_finite() => row.push(Some(v)),
363                    _ => {
364                        return Err(IngestError::BadCell {
365                            line: line_no,
366                            column: col_idx + 1,
367                            token: token.to_string(),
368                        });
369                    }
370                }
371            }
372        }
373        rows.push(row);
374    }
375
376    if rows.is_empty() {
377        return Err(IngestError::NoDataRows);
378    }
379
380    let declared_num_windows = require_u32_header(&metadata, "num_windows")?;
381    let declared_num_signals = require_u32_header(&metadata, "num_signals")?;
382    let declared_healthy_window_end = require_u32_header(&metadata, "healthy_window_end")?;
383
384    // Cross-check: every row must have `declared_num_signals` columns.
385    let observed_cols = expected_columns.unwrap_or(0);
386    if observed_cols != declared_num_signals as usize {
387        return Err(IngestError::RowColumnCountMismatch {
388            line: 0,
389            expected: declared_num_signals as usize,
390            found: observed_cols,
391        });
392    }
393
394    Ok(ResidualProjectionFixture {
395        fixture_sha256_hex: actual_hex,
396        metadata,
397        declared_num_windows,
398        declared_num_signals,
399        declared_healthy_window_end,
400        rows,
401    })
402}
403
404fn require_u32_header(
405    meta: &BTreeMap<String, String>,
406    key: &'static str,
407) -> Result<u32, IngestError> {
408    let v = meta
409        .get(key)
410        .ok_or(IngestError::MissingHeader { key })?
411        .trim();
412    v.parse::<u32>()
413        .map_err(|_| IngestError::MalformedNumericHeader {
414            key: key.to_string(),
415            value: v.to_string(),
416        })
417}
418
419/// Apply the panel-locked deterministic lowering rule.
420///
421/// WHY: One cell → at most one event. NaN cells produce no event. The rule
422/// is intentionally simple and replayable: future engineers (including the
423/// user months later) can read this function and know exactly what events
424/// the dispatcher saw for any given TSV cell. The audit's `schema_map.toml`
425/// records the lowering parameters so the route from cell value to event
426/// bytes is fully cited.
427///
428/// Lowering law:
429///
430/// ```text
431/// For each (window_idx, signal_idx, value) in rows.iter().enumerate()
432///                                              .flat_map(|(w, row)| row.iter().enumerate()
433///                                                                   .map(move |(s, v)| (w, s, v))):
434///   if value is None (nan): skip; no event emitted
435///   else:
436///     ts_ns         = window_idx * config.window_size_ns
437///     entity_id     = signal_idx
438///     route_id      = 0
439///     span_id       = window_idx * 65536 + signal_idx           (deterministic, unique per cell)
440///     parent_span_id = 0
441///     latency_us    = clamp(value * config.value_to_microsecond_scale,
442///                           0, config.latency_clamp_us)
443///     status_code   = 200                                       (canonical "ok")
444///     error_code    = 0                                         (no error flag from projection)
445///     event_kind    = 0
446///     flags         = 0
447/// ```
448///
449/// The returned events are in row-major × column-major canonical order
450/// (window ascending, signal ascending within window). The dispatcher's
451/// window-feature kernel groups by `entity_id` × `window_index`, so the
452/// emit order does not affect the resulting case file; we still pin the
453/// order for byte-level replay determinism.
454#[must_use]
455#[allow(
456    clippy::cast_precision_loss,
457    clippy::cast_possible_truncation,
458    clippy::cast_sign_loss,
459    reason = "Cell-value lowering is deterministic-by-construction and recorded \
460              in audit_report.html section 2; precision / sign / truncation \
461              losses are intentional under the documented rule."
462)]
463pub fn lower_to_trace_events(
464    fixture: &ResidualProjectionFixture,
465    config: &LoweringConfig,
466) -> Vec<TraceEvent> {
467    let mut events: Vec<TraceEvent> = Vec::new();
468    let scale = config.value_to_microsecond_scale as u64;
469    let clamp = config.latency_clamp_us;
470    let win_ns = config.window_size_ns;
471
472    for (w_idx, row) in fixture.rows.iter().enumerate() {
473        let ts_ns = (w_idx as u64).saturating_mul(win_ns);
474        for (s_idx, cell) in row.iter().enumerate() {
475            let Some(v) = cell else {
476                continue;
477            };
478            // Deterministic clamp via integer math: floor(v * scale)
479            // with non-negative saturation to [0, clamp].
480            let latency_us = if !v.is_finite() || *v <= 0.0 {
481                0_u32
482            } else {
483                // Multiply in f64 then cast; the audit report records this
484                // explicitly so the conversion is non-mysterious. The
485                // `scale` constant is well under 2^52 (default 1000) so
486                // the f64 mantissa losses are not a concern in practice.
487                let scaled = *v * scale as f64;
488                if scaled >= clamp as f64 {
489                    clamp
490                } else {
491                    scaled as u32
492                }
493            };
494            let span_id = (w_idx as u64).saturating_mul(65_536) + s_idx as u64;
495            events.push(TraceEvent::new(
496                ts_ns,
497                s_idx as u32, // entity_id
498                0,            // route_id
499                span_id,
500                0, // parent_span_id
501                latency_us,
502                200, // status_code (ok)
503                0,   // error_code
504                0,   // event_kind
505                0,   // flags
506            ));
507        }
508    }
509    events
510}
511
512/// Build the ingest report from a fixture + emitted events.
513///
514/// WHY: Audit transparency. The HTML report shows the operator exactly how
515/// many cells were skipped and how many events were emitted so they can
516/// see the projection's information loss before reading any episode list.
517#[must_use]
518pub fn build_ingest_report(
519    fixture: &ResidualProjectionFixture,
520    events: &[TraceEvent],
521    fixture_byte_size: u64,
522) -> IngestReport {
523    let observed_windows = fixture.rows.len() as u32;
524    let observed_signals = fixture.declared_num_signals;
525    let total_cells = fixture.rows.iter().map(Vec::len).sum::<usize>() as u32;
526    let nan_cell_count = fixture
527        .rows
528        .iter()
529        .flat_map(|r| r.iter())
530        .filter(|c| c.is_none())
531        .count() as u32;
532    IngestReport {
533        fixture_sha256_hex: fixture.fixture_sha256_hex.clone(),
534        fixture_byte_size,
535        declared_num_windows: fixture.declared_num_windows,
536        declared_num_signals: fixture.declared_num_signals,
537        observed_num_windows: observed_windows,
538        observed_num_signals: observed_signals,
539        nan_cell_count,
540        finite_cell_count: total_cells.saturating_sub(nan_cell_count),
541        emitted_event_count: events.len() as u32,
542    }
543}
544
#[cfg(test)]
mod tests {
    use super::*;

    // SHA-256 pin for the vendored `data/fixtures/aiops_challenge.tsv`.
    // Mirrors the `aiops_kpi` entry in `s_real_audit::AUDIT_DATASETS`.
    // Refreshed at S-REAL.3.1.2 — the prior hard-coded value
    // (29961b8b6...) had become stale relative to the on-disk
    // fixture, breaking 4 ingest unit tests in the post-PAPER.1c
    // baseline. The mirror is now load-bearing: the
    // `audit_dataset_tier_dir_matches_bundle_manifest` test in
    // `s_real_audit::tests` cross-validates the audit driver's table
    // against `reports/s_real_3/bundle_manifest.toml`; a future SHA
    // change to the AIOps fixture requires refreshing both pins
    // (this constant AND `AUDIT_DATASETS[aiops_kpi].fixture_sha256_hex`)
    // in the same atomic commit.
    const PIN_AIOPS: &str = "be17110ebe6647d00fad79dc1ca69b1b01b22788773202bad6e3322e97b0602e";

    // Comment headers shared by the in-memory fixtures below. They occupy
    // lines 1-4, so the first data row appended after them is line 5.
    const INLINE_HEADER: &str = "# residual-projection v2\n\
                                 # num_windows=2\n\
                                 # num_signals=2\n\
                                 # healthy_window_end=1\n";

    fn aiops_path() -> std::path::PathBuf {
        // Resolve the vendored AIOps fixture relative to this crate's
        // manifest dir so unit tests do not depend on any external
        // repository being mounted at a fixed absolute path. The
        // workspace root is two levels up from the crate manifest dir;
        // the fixture lives at `<workspace>/data/fixtures/...`.
        let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR");
        std::path::PathBuf::from(manifest_dir)
            .parent()
            .and_then(std::path::Path::parent)
            .expect("workspace root")
            .join("data/fixtures/aiops_challenge.tsv")
    }

    // Build an in-memory fixture (shared headers + `data_rows`) together
    // with its freshly computed pin, so parser tests stay hermetic and do
    // not depend on any file on disk.
    fn inline_fixture(data_rows: &str) -> (Vec<u8>, String) {
        pinned([INLINE_HEADER, data_rows].concat().into_bytes())
    }

    // Pair arbitrary bytes with their own lowercase-hex SHA-256.
    fn pinned(bytes: Vec<u8>) -> (Vec<u8>, String) {
        let pin = sha256_to_hex_lower(&sha256(bytes.as_slice()));
        (bytes, pin)
    }

    #[test]
    fn sha256_to_hex_lower_roundtrip() {
        let bytes = sha256(b"hello world");
        let hex = sha256_to_hex_lower(&bytes);
        assert_eq!(hex.len(), 64);
        assert!(hex
            .chars()
            .all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase()));
    }

    #[test]
    fn verify_fixture_sha256_admits_correct_pin() {
        let bytes = std::fs::read(aiops_path()).expect("read aiops fixture");
        let actual = verify_fixture_sha256(&bytes, PIN_AIOPS).expect("admit");
        assert_eq!(actual, PIN_AIOPS);
    }

    #[test]
    fn verify_fixture_sha256_rejects_uppercase_hex() {
        let bytes = std::fs::read(aiops_path()).expect("read aiops fixture");
        let upper: String = PIN_AIOPS.chars().map(|c| c.to_ascii_uppercase()).collect();
        match verify_fixture_sha256(&bytes, &upper) {
            Err(IngestError::Sha256NotLowerHex64 { .. }) => {}
            other => panic!("expected Sha256NotLowerHex64, got {other:?}"),
        }
    }

    #[test]
    fn verify_fixture_sha256_rejects_wrong_length_pin() {
        match verify_fixture_sha256(b"payload", "abc123") {
            Err(IngestError::Sha256NotLowerHex64 { provided }) => assert_eq!(provided, "abc123"),
            other => panic!("expected Sha256NotLowerHex64, got {other:?}"),
        }
    }

    #[test]
    fn verify_fixture_sha256_rejects_corrupted_bytes() {
        let mut bytes = std::fs::read(aiops_path()).expect("read aiops fixture");
        // Flip a single bit in the middle of the file to simulate
        // corruption; any one-bit change must produce a different digest.
        if !bytes.is_empty() {
            let mid = bytes.len() / 2;
            bytes[mid] ^= 0x01;
        }
        match verify_fixture_sha256(&bytes, PIN_AIOPS) {
            Err(IngestError::Sha256Mismatch { .. }) => {}
            other => panic!("expected Sha256Mismatch, got {other:?}"),
        }
    }

    #[test]
    fn load_aiops_admits_canonical_shape() {
        let bytes = std::fs::read(aiops_path()).expect("read aiops fixture");
        let fixture = load_residual_projection_tsv(&bytes, PIN_AIOPS).expect("load");
        assert_eq!(fixture.declared_num_signals, 4);
        assert_eq!(fixture.declared_num_windows, 32);
        // AIOps file has no NaN; every cell parses.
        for row in &fixture.rows {
            assert_eq!(row.len(), 4);
            assert!(row.iter().all(Option::is_some));
        }
        assert!(fixture.metadata.contains_key("license"));
    }

    #[test]
    fn lower_to_trace_events_canonical_ordering() {
        let bytes = std::fs::read(aiops_path()).expect("read aiops fixture");
        let fixture = load_residual_projection_tsv(&bytes, PIN_AIOPS).expect("load");
        let cfg = LoweringConfig::default();
        let events = lower_to_trace_events(&fixture, &cfg);
        assert_eq!(
            events.len(),
            fixture
                .rows
                .iter()
                .map(|r| r.iter().filter(|c| c.is_some()).count())
                .sum::<usize>()
        );
        // First event maps to window 0 / entity 0
        assert_eq!(events[0].ts_ns, 0);
        assert_eq!(events[0].entity_id, 0);
        // Within a window, entity_id ascends.
        let signals = fixture.declared_num_signals as usize;
        for i in 1..signals.min(events.len()) {
            assert!(events[i].entity_id >= events[i - 1].entity_id);
        }
    }

    #[test]
    fn lowering_is_deterministic_across_two_runs() {
        let bytes = std::fs::read(aiops_path()).expect("read aiops fixture");
        let a = load_residual_projection_tsv(&bytes, PIN_AIOPS).expect("load");
        let b = load_residual_projection_tsv(&bytes, PIN_AIOPS).expect("load");
        let cfg = LoweringConfig::default();
        let ea = lower_to_trace_events(&a, &cfg);
        let eb = lower_to_trace_events(&b, &cfg);
        assert_eq!(ea, eb);
    }

    #[test]
    fn empty_file_rejected() {
        match load_residual_projection_tsv(b"", "0".repeat(64).as_str()) {
            Err(IngestError::EmptyFile) => {}
            other => panic!("expected EmptyFile, got {other:?}"),
        }
    }

    #[test]
    fn inline_fixture_parses_nan_and_format_marker() {
        let (bytes, pin) = inline_fixture("1.0\tnan\n0.5\t2.0\n");
        let fixture = load_residual_projection_tsv(&bytes, &pin).expect("load");
        assert_eq!(fixture.fixture_sha256_hex, pin);
        assert_eq!(fixture.declared_num_windows, 2);
        assert_eq!(fixture.declared_num_signals, 2);
        assert_eq!(fixture.declared_healthy_window_end, 1);
        assert_eq!(
            fixture.rows,
            vec![vec![Some(1.0), None], vec![Some(0.5), Some(2.0)]]
        );
        // The bare format-marker line is recorded under "format".
        assert_eq!(
            fixture.metadata.get("format").map(String::as_str),
            Some("residual-projection v2")
        );
    }

    #[test]
    fn inline_fixture_nan_cells_emit_no_event_and_are_reported() {
        let (bytes, pin) = inline_fixture("1.0\tnan\n0.5\t2.0\n");
        let fixture = load_residual_projection_tsv(&bytes, &pin).expect("load");
        let events = lower_to_trace_events(&fixture, &LoweringConfig::default());
        // Cells in order: (w0,s0), (w0,s1)=nan → skipped, (w1,s0), (w1,s1).
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].ts_ns, 0);
        assert_eq!(events[0].entity_id, 0);
        assert_eq!(events[1].ts_ns, 1_000_000_000);
        assert_eq!(events[1].entity_id, 0);
        assert_eq!(events[2].ts_ns, 1_000_000_000);
        assert_eq!(events[2].entity_id, 1);

        let report = build_ingest_report(&fixture, &events, bytes.len() as u64);
        assert_eq!(report.fixture_byte_size, bytes.len() as u64);
        assert_eq!(report.observed_num_windows, 2);
        assert_eq!(report.observed_num_signals, 2);
        assert_eq!(report.nan_cell_count, 1);
        assert_eq!(report.finite_cell_count, 3);
        assert_eq!(report.emitted_event_count, 3);
    }

    #[test]
    fn ragged_row_rejected_at_its_own_line() {
        let (bytes, pin) = inline_fixture("1.0\t2.0\n3.0\n");
        assert_eq!(
            load_residual_projection_tsv(&bytes, &pin).unwrap_err(),
            IngestError::RowColumnCountMismatch {
                line: 6,
                expected: 2,
                found: 1,
            }
        );
    }

    #[test]
    fn rows_narrower_than_declared_num_signals_rejected() {
        let (bytes, pin) = inline_fixture("1.0\n2.0\n");
        match load_residual_projection_tsv(&bytes, &pin) {
            Err(IngestError::RowColumnCountMismatch {
                expected: 2,
                found: 1,
                ..
            }) => {}
            other => panic!("expected RowColumnCountMismatch, got {other:?}"),
        }
    }

    #[test]
    fn non_finite_cell_rejected_with_position() {
        let (bytes, pin) = inline_fixture("1.0\tinf\n");
        assert_eq!(
            load_residual_projection_tsv(&bytes, &pin).unwrap_err(),
            IngestError::BadCell {
                line: 5,
                column: 2,
                token: "inf".to_string(),
            }
        );
    }

    #[test]
    fn header_only_file_has_no_data_rows() {
        let (bytes, pin) = inline_fixture("");
        assert_eq!(
            load_residual_projection_tsv(&bytes, &pin).unwrap_err(),
            IngestError::NoDataRows
        );
    }

    #[test]
    fn missing_required_header_rejected() {
        let (bytes, pin) = pinned(b"# num_windows=1\n# num_signals=1\n1.0\n".to_vec());
        assert_eq!(
            load_residual_projection_tsv(&bytes, &pin).unwrap_err(),
            IngestError::MissingHeader {
                key: "healthy_window_end",
            }
        );
    }

    #[test]
    fn malformed_numeric_header_rejected() {
        let text = "# num_windows=many\n# num_signals=1\n# healthy_window_end=0\n1.0\n";
        let (bytes, pin) = pinned(text.as_bytes().to_vec());
        assert_eq!(
            load_residual_projection_tsv(&bytes, &pin).unwrap_err(),
            IngestError::MalformedNumericHeader {
                key: "num_windows".to_string(),
                value: "many".to_string(),
            }
        );
    }
}