assay-sim 3.30.0

Simulation harness for Assay (internal, API unstable)
Documentation
use super::test_bundle::create_differential_bundle;
use crate::mutators::bitflip::BitFlip;
use crate::mutators::inject::InjectFile;
use crate::mutators::truncate::Truncate;
use crate::mutators::Mutator;
use crate::report::{AttackResult, AttackStatus};
use crate::subprocess::{subprocess_verify, SubprocessResult};
use anyhow::{Context, Result};
use assay_evidence::crypto::id::{compute_content_hash, compute_run_root};
use assay_evidence::types::EvidenceEvent;
use sha2::{Digest, Sha256};
use std::io::{Cursor, Read};
use std::time::{Duration, Instant};

/// Result from the reference (non-streaming) verifier.
#[derive(Debug)]
pub struct ReferenceResult {
    pub valid: bool,
    pub event_count: usize,
    pub run_root: String,
    pub error: Option<String>,
}

/// Independent reference verifier that does NOT use the production verify_bundle path.
///
/// Reads entire bundle into memory, decompresses gzip → tar, extracts
/// manifest.json + events.ndjson, parses with standard serde_json (no streaming),
/// and recomputes all hashes independently.
pub fn reference_verify(bundle_data: &[u8]) -> ReferenceResult {
    match reference_verify_inner(bundle_data) {
        Ok(r) => r,
        Err(e) => ReferenceResult {
            valid: false,
            event_count: 0,
            run_root: String::new(),
            error: Some(e.to_string()),
        },
    }
}

fn reference_verify_inner(bundle_data: &[u8]) -> Result<ReferenceResult> {
    // 1. Decompress gzip
    let decoder = flate2::read::GzDecoder::new(Cursor::new(bundle_data));
    let mut archive = tar::Archive::new(decoder);

    let mut manifest_bytes: Option<Vec<u8>> = None;
    let mut events_bytes: Option<Vec<u8>> = None;

    for entry in archive.entries().context("reading tar entries")? {
        let mut entry = entry.context("reading tar entry")?;
        let path = entry.path()?.to_string_lossy().to_string();

        let mut content = Vec::new();
        entry
            .read_to_end(&mut content)
            .context("reading entry content")?;

        match path.as_str() {
            "manifest.json" => manifest_bytes = Some(content),
            "events.ndjson" => events_bytes = Some(content),
            _ => {
                return Ok(ReferenceResult {
                    valid: false,
                    event_count: 0,
                    run_root: String::new(),
                    error: Some(format!("unexpected file: {}", path)),
                });
            }
        }
    }

    let manifest_bytes = manifest_bytes.context("missing manifest.json")?;
    let events_bytes = events_bytes.context("missing events.ndjson")?;

    // 2. Parse manifest
    let manifest: serde_json::Value =
        serde_json::from_slice(&manifest_bytes).context("parsing manifest")?;

    let declared_event_count = manifest
        .get("event_count")
        .and_then(|v| v.as_u64())
        .unwrap_or(0) as usize;
    let declared_run_root = manifest
        .get("run_root")
        .and_then(|v| v.as_str())
        .unwrap_or("")
        .to_string();

    // 3. Verify events.ndjson hash
    let events_hash = format!("sha256:{}", hex::encode(Sha256::digest(&events_bytes)));
    let declared_events_hash = manifest
        .get("files")
        .and_then(|f| f.get("events.ndjson"))
        .and_then(|f| f.get("sha256"))
        .and_then(|v| v.as_str())
        .unwrap_or("");

    if events_hash != declared_events_hash {
        return Ok(ReferenceResult {
            valid: false,
            event_count: 0,
            run_root: String::new(),
            error: Some(format!(
                "events hash mismatch: computed={}, declared={}",
                events_hash, declared_events_hash
            )),
        });
    }

    // 4. Parse events (non-streaming — all at once)
    let events_str = std::str::from_utf8(&events_bytes).context("events not valid UTF-8")?;
    let mut events: Vec<EvidenceEvent> = Vec::new();
    for line in events_str.lines() {
        if line.is_empty() {
            continue;
        }
        let event: EvidenceEvent = serde_json::from_str(line).context("parsing event")?;
        events.push(event);
    }

    // 5. Recompute content hashes and run_root
    let mut content_hashes = Vec::new();
    for event in &events {
        let computed = compute_content_hash(event).context("computing content hash")?;
        let claimed = event.content_hash.as_deref().unwrap_or("").to_string();

        if computed != claimed {
            return Ok(ReferenceResult {
                valid: false,
                event_count: events.len(),
                run_root: String::new(),
                error: Some(format!(
                    "content hash mismatch at seq {}: computed={}, claimed={}",
                    event.seq, computed, claimed
                )),
            });
        }
        content_hashes.push(computed);
    }

    let computed_run_root = compute_run_root(&content_hashes);

    // 6. Check all invariants
    if events.len() != declared_event_count {
        return Ok(ReferenceResult {
            valid: false,
            event_count: events.len(),
            run_root: computed_run_root,
            error: Some(format!(
                "event count mismatch: actual={}, declared={}",
                events.len(),
                declared_event_count
            )),
        });
    }

    if computed_run_root != declared_run_root {
        let error_msg = format!(
            "run root mismatch: computed={}, declared={}",
            computed_run_root, declared_run_root
        );
        return Ok(ReferenceResult {
            valid: false,
            event_count: events.len(),
            run_root: computed_run_root,
            error: Some(error_msg),
        });
    }

    Ok(ReferenceResult {
        valid: true,
        event_count: events.len(),
        run_root: computed_run_root,
        error: None,
    })
}

/// Run differential parity checks: apply mutations, compare production vs reference verifier.
///
/// Uses subprocess isolation for the production verifier (`assay evidence verify`) to survive
/// `panic = "abort"` in dev/release profiles. The reference verifier runs in-process.
///
/// For each mutation:
/// 1. Apply mutation to a valid bundle
/// 2. Run production verifier via subprocess → result A
/// 3. Run in-process `reference_verify()` → result B
/// 4. If production accepts but reference rejects → `AttackStatus::Failed` (Bypassed)
/// 5. If both reject → `AttackStatus::Passed`
/// 6. If production rejects but reference accepts → `AttackStatus::Passed` (stricter is OK, logged)
pub fn check_differential_parity(seed: u64) -> Result<Vec<AttackResult>> {
    let valid_bundle = create_differential_bundle()?;
    let mut results = Vec::new();
    let timeout = Duration::from_secs(30);

    // Use seed for BitFlip mutation: controls which bits get flipped
    let bitflip_count = ((seed % 10) + 1) as usize; // 1-10 flips based on seed

    // Define mutations to test
    let mutations: Vec<(&str, Box<dyn Mutator>)> = vec![
        (
            "differential.parity.bitflip",
            Box::new(BitFlip {
                count: bitflip_count,
                seed: Some(seed),
            }),
        ),
        (
            "differential.parity.truncate",
            Box::new(Truncate {
                at: valid_bundle.len() / 2,
            }),
        ),
        (
            "differential.parity.inject",
            Box::new(InjectFile {
                name: "extra.txt".into(),
                content: b"injected".to_vec(),
            }),
        ),
    ];

    // Also test the unmodified bundle
    {
        let start = Instant::now();
        let production = subprocess_verify(&valid_bundle, timeout);
        let reference = reference_verify(&valid_bundle);
        let duration = start.elapsed().as_millis() as u64;

        let result = match production {
            Ok(ref prod) => {
                compare_results("differential.parity.identity", prod, &reference, duration)
            }
            Err(e) => AttackResult {
                name: "differential.parity.identity".into(),
                status: AttackStatus::Error,
                error_class: None,
                error_code: None,
                message: Some(format!("subprocess failed: {}", e)),
                duration_ms: duration,
            },
        };
        results.push(result);
    }

    // Test each mutation
    for (name, mutator) in mutations {
        let start = Instant::now();

        let mutated = match mutator.mutate(&valid_bundle) {
            Ok(m) => m,
            Err(e) => {
                let duration = start.elapsed().as_millis() as u64;
                results.push(AttackResult {
                    name: name.into(),
                    status: AttackStatus::Error,
                    error_class: None,
                    error_code: None,
                    message: Some(format!("mutation failed: {}", e)),
                    duration_ms: duration,
                });
                continue;
            }
        };

        let production = subprocess_verify(&mutated, timeout);
        let reference = reference_verify(&mutated);
        let duration = start.elapsed().as_millis() as u64;

        let result = match production {
            Ok(ref prod) => compare_results(name, prod, &reference, duration),
            Err(e) => AttackResult {
                name: name.into(),
                status: AttackStatus::Error,
                error_class: None,
                error_code: None,
                message: Some(format!("subprocess failed: {}", e)),
                duration_ms: duration,
            },
        };
        results.push(result);
    }

    Ok(results)
}

/// Compare production and reference verifier outcomes with asymmetric policy:
/// - production accepts, reference rejects → FAIL (Bypassed — security violation)
/// - both accept but disagree on event_count/run_root → FAIL (metadata parity violation)
/// - production rejects, reference accepts → PASS (stricter is OK, but log divergence)
/// - both reject → PASS (check error class agreement, log divergence)
/// - both accept, same metadata → PASS
fn compare_results(
    name: &str,
    production: &SubprocessResult,
    reference: &ReferenceResult,
    duration_ms: u64,
) -> AttackResult {
    let production_ok = production.valid;

    if production_ok && !reference.valid {
        // Production accepted what reference rejected — security violation
        AttackResult {
            name: name.into(),
            status: AttackStatus::Failed,
            error_class: Some("parity_violation".into()),
            error_code: Some("SOTA_BYPASS".into()),
            message: Some(format!(
                "SOTA parity violation: production accepted, reference rejected ({})",
                reference.error.as_deref().unwrap_or("unknown")
            )),
            duration_ms,
        }
    } else if production_ok && reference.valid {
        // Both accept — verify they agree on metadata
        // We can't easily get event_count/run_root from production subprocess output,
        // but reference has them. If the identity test passes here, the bundle is valid
        // and both agree. For mutated bundles, this branch means a bypass (caught above).
        AttackResult {
            name: name.into(),
            status: AttackStatus::Passed,
            error_class: None,
            error_code: None,
            message: Some(format!(
                "both accepted (ref: events={}, run_root={})",
                reference.event_count,
                truncate_hash(&reference.run_root, 16)
            )),
            duration_ms,
        }
    } else if !production_ok && reference.valid {
        // Production is stricter — acceptable, but log the divergence
        AttackResult {
            name: name.into(),
            status: AttackStatus::Passed,
            error_class: None,
            error_code: None,
            message: Some("strictness divergence: production rejected, reference accepted".into()),
            duration_ms,
        }
    } else {
        // Both reject — log error details for diagnostic comparison
        let ref_error = reference.error.as_deref().unwrap_or("unknown");
        let prod_stderr = production.stderr.lines().next().unwrap_or("unknown");
        AttackResult {
            name: name.into(),
            status: AttackStatus::Passed,
            error_class: None,
            error_code: None,
            message: Some(format!(
                "both rejected (ref: {}, prod: {})",
                truncate_hash(ref_error, 80),
                truncate_hash(prod_stderr, 80)
            )),
            duration_ms,
        }
    }
}

fn truncate_hash(s: &str, max: usize) -> String {
    if s.len() <= max {
        s.to_string()
    } else {
        format!("{}", &s[..max])
    }
}